1pub mod prompt;
14pub mod validate;
15
16use std::collections::HashSet;
17use std::io::{Read, Write};
18use std::path::{Path, PathBuf};
19use std::time::Instant;
20
21use anyhow::{Context, Result};
22use tracing::warn;
23
24use crate::claude::ai::AiClient;
25use crate::voice::clock::Clock;
26use crate::voice::det::UlidRng;
27use crate::voice::events::{
28 Event, EventKind, ItemId, Provenance, ReflectionError, ReflectionId, TranscriptSpan,
29};
30use crate::voice::session::{self, Session};
31use crate::voice::{EventId, TranscriptEvent};
32
/// Where [`run_reflect`] reads the voice transcript from.
///
/// Every source yields JSONL-encoded `TranscriptEvent`s; only `Final` events
/// are reflected on.
///
/// Equality is structural: `Path("-")` compares unequal to `Stdin` even though
/// both end up reading standard input.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TranscriptSource {
    /// A JSONL transcript file. The special path `-` is read from standard
    /// input instead, exactly like [`TranscriptSource::Stdin`].
    Path(PathBuf),
    /// A JSONL transcript piped on standard input.
    Stdin,
    /// A named session. Finals not yet reflected on are read from the session's
    /// transcript, and the resulting events are appended to the session rather
    /// than printed.
    Session(String),
}
44
/// Inputs and injected dependencies for one [`run_reflect`] pass.
///
/// The RNG, clock and AI client are trait objects so callers (and the tests in
/// this module) can substitute deterministic implementations.
pub struct ReflectOptions {
    /// Where to read the transcript from.
    pub source: TranscriptSource,
    /// Source of ULIDs: one for the reflection id, then one per emitted event.
    pub ulid_rng: Box<dyn UlidRng>,
    /// Supplies the `ts` timestamp stamped on every emitted event.
    pub clock: Box<dyn Clock>,
    /// Backend the reflection prompt is sent to; its metadata also supplies the
    /// model name recorded in each event's provenance.
    pub ai: Box<dyn AiClient>,
    /// When `Some`, sessions are opened/created under this root via
    /// `session::open_or_create_under`; when `None`, `session::open_or_create`
    /// picks the location. Only consulted for [`TranscriptSource::Session`].
    pub session_root_override: Option<PathBuf>,
}
61
/// System prompt sent with every reflection request.
///
/// It only frames the task. The output format and rules are carried by the user
/// prompt built in the `prompt` module. The model is told to reply with a bare
/// YAML document (no commentary, no code fences), because the reply is fed
/// straight into `validate::parse_and_validate`.
const SYSTEM_PROMPT: &str = "You convert a voice transcript into structured reflection \
                             events. Follow the format and rules in the user prompt \
                             exactly. Emit ONLY the YAML document — no commentary, no \
                             code fences.";
69
70pub async fn run_reflect<W: Write>(opts: ReflectOptions, stdout: &mut W) -> Result<()> {
76 let ReflectOptions {
77 source,
78 mut ulid_rng,
79 clock,
80 ai,
81 session_root_override,
82 } = opts;
83
84 let (finals, session, existing_ids) = resolve_input(&source, session_root_override.as_deref())?;
86
87 let Some(span) = compute_span(&finals) else {
88 return Ok(());
90 };
91
92 let current_state_body = match &session {
94 Some(sess) => {
95 let prior = sess.read_events()?;
96 let projected = crate::voice::events::project(prior);
97 prompt::format_current_state(&projected)
98 }
99 None => prompt::format_current_state(&crate::voice::events::ProjectedState::default()),
100 };
101 let new_transcript_body = prompt::format_new_transcript(&finals);
102 let user_prompt = prompt::render(¤t_state_body, &new_transcript_body);
103
104 let reflection_id = ReflectionId::Ulid(ulid_rng.next_ulid());
106 let started = Instant::now();
107 let ai_response = ai.send_request(SYSTEM_PROMPT, &user_prompt).await;
108 let latency_ms = started.elapsed().as_millis();
109 let model = ai.get_metadata().model;
110 let prompt_version = prompt::prompt_version().to_string();
111
112 let raw_response = match ai_response {
113 Ok(s) => s,
114 Err(e) => {
115 let err_event = mint_error_event(
118 ulid_rng.as_mut(),
119 clock.as_ref(),
120 &reflection_id,
121 &span,
122 &model,
123 &prompt_version,
124 ReflectionError {
125 raw_output: String::new(),
126 error: format!("AI invocation failed: {e}"),
127 },
128 );
129 return emit_events(
130 &session,
131 stdout,
132 &[err_event],
133 Some(span.end_event_id),
134 &reflection_id,
135 &model,
136 latency_ms,
137 "error",
138 );
139 }
140 };
141
142 let events = match validate::parse_and_validate(&raw_response, &existing_ids) {
144 Ok(kinds) => kinds
145 .into_iter()
146 .map(|kind| {
147 build_event(
148 ulid_rng.as_mut(),
149 clock.as_ref(),
150 &reflection_id,
151 &span,
152 &model,
153 &prompt_version,
154 kind,
155 )
156 })
157 .collect::<Vec<_>>(),
158 Err(verr) => vec![mint_error_event(
159 ulid_rng.as_mut(),
160 clock.as_ref(),
161 &reflection_id,
162 &span,
163 &model,
164 &prompt_version,
165 ReflectionError {
166 raw_output: verr.raw_output,
167 error: verr.error,
168 },
169 )],
170 };
171
172 let status = if events
173 .iter()
174 .any(|e| matches!(e.kind, EventKind::ReflectionError(_)))
175 {
176 "error"
177 } else {
178 "ok"
179 };
180
181 emit_events(
182 &session,
183 stdout,
184 &events,
185 Some(span.end_event_id),
186 &reflection_id,
187 &model,
188 latency_ms,
189 status,
190 )
191}
192
193fn resolve_input(
194 source: &TranscriptSource,
195 session_root_override: Option<&Path>,
196) -> Result<(Vec<TranscriptEvent>, Option<Session>, HashSet<ItemId>)> {
197 match source {
198 TranscriptSource::Path(p) if p.as_os_str() == "-" => {
199 let finals = read_finals_from_stdin()?;
200 Ok((finals, None, HashSet::new()))
201 }
202 TranscriptSource::Path(p) => {
203 let finals = session::read_transcript_finals_after(p, None)?;
204 Ok((finals, None, HashSet::new()))
205 }
206 TranscriptSource::Stdin => {
207 let finals = read_finals_from_stdin()?;
208 Ok((finals, None, HashSet::new()))
209 }
210 TranscriptSource::Session(id) => {
211 let sess = match session_root_override {
212 Some(root) => session::open_or_create_under(root, id)?,
213 None => session::open_or_create(id)?,
214 };
215 let finals = sess.read_transcript_finals_after()?;
216 let existing_state = crate::voice::events::project(sess.read_events()?);
217 let existing_ids: HashSet<ItemId> = existing_state.items.keys().copied().collect();
218 Ok((finals, Some(sess), existing_ids))
219 }
220 }
221}
222
223fn read_finals_from_stdin() -> Result<Vec<TranscriptEvent>> {
224 parse_finals_from_reader(&mut std::io::stdin(), "stdin")
225}
226
227fn parse_finals_from_reader<R: Read>(
231 reader: &mut R,
232 source_label: &str,
233) -> Result<Vec<TranscriptEvent>> {
234 let mut body = String::new();
235 reader
236 .read_to_string(&mut body)
237 .with_context(|| format!("reading transcript from {source_label}"))?;
238 let mut events = Vec::new();
239 for (idx, line) in body.lines().enumerate() {
240 if line.trim().is_empty() {
241 continue;
242 }
243 let event: TranscriptEvent = serde_json::from_str(line)
244 .with_context(|| format!("parsing {source_label} transcript line {}", idx + 1))?;
245 if matches!(event, TranscriptEvent::Final { .. }) {
246 events.push(event);
247 }
248 }
249 Ok(events)
250}
251
252fn compute_span(finals: &[TranscriptEvent]) -> Option<TranscriptSpan> {
253 let first = finals.iter().find_map(|e| match e {
254 TranscriptEvent::Final { event_id, .. } => Some(*event_id),
255 _ => None,
256 })?;
257 let last = finals.iter().rev().find_map(|e| match e {
258 TranscriptEvent::Final { event_id, .. } => Some(*event_id),
259 _ => None,
260 })?;
261 Some(TranscriptSpan {
262 start_event_id: first,
263 end_event_id: last,
264 })
265}
266
267fn build_event(
268 rng: &mut dyn UlidRng,
269 clock: &dyn Clock,
270 reflection_id: &ReflectionId,
271 span: &TranscriptSpan,
272 model: &str,
273 prompt_version: &str,
274 kind: EventKind,
275) -> Event {
276 Event {
277 event_id: rng.next_ulid(),
278 ts: clock.now(),
279 reflection_id: reflection_id.clone(),
280 provenance: Provenance {
281 transcript_span: Some(span.clone()),
282 model: Some(model.to_string()),
283 prompt_version: Some(prompt_version.to_string()),
284 },
285 kind: rewrite_kind_with_omitted_optionals(kind),
286 }
287}
288
289fn mint_error_event(
290 rng: &mut dyn UlidRng,
291 clock: &dyn Clock,
292 reflection_id: &ReflectionId,
293 span: &TranscriptSpan,
294 model: &str,
295 prompt_version: &str,
296 err: ReflectionError,
297) -> Event {
298 build_event(
299 rng,
300 clock,
301 reflection_id,
302 span,
303 model,
304 prompt_version,
305 EventKind::ReflectionError(err),
306 )
307}
308
309fn rewrite_kind_with_omitted_optionals(kind: EventKind) -> EventKind {
315 kind
316}
317
318#[allow(clippy::too_many_arguments)]
319fn emit_events<W: Write>(
320 session: &Option<Session>,
321 stdout: &mut W,
322 events: &[Event],
323 new_marker: Option<EventId>,
324 reflection_id: &ReflectionId,
325 model: &str,
326 latency_ms: u128,
327 status: &str,
328) -> Result<()> {
329 if let Some(sess) = session {
330 sess.append_events(events)?;
331 if let Some(marker) = new_marker {
332 let mut sess_mut = sess.clone();
334 sess_mut.set_last_reflected(marker)?;
335 }
336 let refl_id_str = match reflection_id {
337 ReflectionId::Ulid(u) => u.to_string(),
338 ReflectionId::Review => "review".to_string(),
339 };
340 let line = format!(
341 "{ts} {refl_id_str} model={model} cost_usd=unknown latency_ms={latency_ms} events={n} status={status}",
342 ts = chrono::Utc::now().to_rfc3339(),
343 n = events.len(),
344 );
345 sess.append_log(&line)?;
346 } else {
347 for event in events {
348 serde_json::to_writer(&mut *stdout, event)
349 .context("serialising reflection event to stdout")?;
350 stdout
351 .write_all(b"\n")
352 .context("writing newline to stdout")?;
353 }
354 stdout
355 .flush()
356 .context("flushing reflection events to stdout")?;
357 if status == "error" {
360 warn!(
361 model = %model,
362 latency_ms,
363 "reflect completed with errors (see emitted reflection.error event)"
364 );
365 }
366 }
367 Ok(())
368}
369
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;
    use crate::claude::test_utils::ConfigurableMockAiClient;
    use crate::voice::clock::FixedClock;
    use crate::voice::det::CountingUlidRng;
    use std::time::Duration;
    use tempfile::TempDir;

    /// Builds a non-revisable `Final` event whose id is `Ulid::from_parts(0, event_id)`.
    fn make_final(event_id: u128, text: &str) -> TranscriptEvent {
        TranscriptEvent::Final {
            event_id: ulid::Ulid::from_parts(0, event_id),
            text: text.to_string(),
            start: Duration::ZERO,
            end: Duration::from_millis(500),
            confidence: 0.95,
            words: None,
            speaker: None,
            revisable: false,
        }
    }

    /// Writes `finals` as a JSONL transcript inside `tmp` and returns its path.
    fn write_transcript(tmp: &TempDir, finals: &[TranscriptEvent]) -> PathBuf {
        let path = tmp.path().join("transcript.jsonl");
        let mut body = String::new();
        for e in finals {
            body.push_str(&serde_json::to_string(e).unwrap());
            body.push('\n');
        }
        std::fs::write(&path, body).unwrap();
        path
    }

    /// Options with a counting RNG, a fixed clock, and a mock AI client
    /// configured with `ai_responses`.
    fn fixed_opts(source: TranscriptSource, ai_responses: Vec<Result<String>>) -> ReflectOptions {
        ReflectOptions {
            source,
            ulid_rng: Box::new(CountingUlidRng::new()),
            clock: Box::new(FixedClock::from_rfc3339("2026-01-01T00:00:00Z")),
            ai: Box::new(ConfigurableMockAiClient::new(ai_responses)),
            session_root_override: None,
        }
    }

    #[tokio::test]
    async fn path_source_emits_events_to_stdout() {
        let tmp = TempDir::new().unwrap();
        let transcript = write_transcript(&tmp, &[make_final(1, "wire it up")]);
        let canned = r"events:
  - event_type: item.create
    payload:
      item_id: 00000000000000000000000007
      class: todo
      text: wire it up
";
        let opts = fixed_opts(
            TranscriptSource::Path(transcript),
            vec![Ok(canned.to_string())],
        );
        let mut out: Vec<u8> = Vec::new();
        run_reflect(opts, &mut out).await.unwrap();
        let body = String::from_utf8(out).unwrap();
        assert_eq!(body.lines().count(), 1, "expected exactly one event line");
        let event: Event = serde_json::from_str(body.lines().next().unwrap()).unwrap();
        assert!(matches!(event.kind, EventKind::ItemCreate(_)));
    }

    #[tokio::test]
    async fn empty_transcript_exits_quietly() {
        let tmp = TempDir::new().unwrap();
        let transcript = write_transcript(&tmp, &[]);
        let opts = fixed_opts(TranscriptSource::Path(transcript), vec![]);
        let mut out: Vec<u8> = Vec::new();
        run_reflect(opts, &mut out).await.unwrap();
        assert!(out.is_empty(), "no events expected when no Finals consumed");
    }

    #[tokio::test]
    async fn malformed_response_yields_reflection_error_event() {
        let tmp = TempDir::new().unwrap();
        let transcript = write_transcript(&tmp, &[make_final(1, "talk")]);
        let canned = "this is not yaml: - definitely not";
        let opts = fixed_opts(
            TranscriptSource::Path(transcript),
            vec![Ok(canned.to_string())],
        );
        let mut out: Vec<u8> = Vec::new();
        run_reflect(opts, &mut out).await.unwrap();
        let body = String::from_utf8(out).unwrap();
        assert_eq!(body.lines().count(), 1);
        let event: Event = serde_json::from_str(body.lines().next().unwrap()).unwrap();
        match &event.kind {
            EventKind::ReflectionError(e) => {
                assert!(e.raw_output.contains("not yaml"));
                assert!(e.error.contains("YAML parse failure"));
            }
            other => panic!("expected ReflectionError, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn ai_invocation_failure_yields_reflection_error_event() {
        let tmp = TempDir::new().unwrap();
        let transcript = write_transcript(&tmp, &[make_final(1, "talk")]);
        let opts = fixed_opts(
            TranscriptSource::Path(transcript),
            vec![Err(anyhow::anyhow!("simulated subprocess crash"))],
        );
        let mut out: Vec<u8> = Vec::new();
        run_reflect(opts, &mut out).await.unwrap();
        let body = String::from_utf8(out).unwrap();
        assert_eq!(body.lines().count(), 1);
        let event: Event = serde_json::from_str(body.lines().next().unwrap()).unwrap();
        match &event.kind {
            EventKind::ReflectionError(e) => {
                assert!(e.error.contains("simulated subprocess crash"));
            }
            other => panic!("expected ReflectionError, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn session_source_appends_to_events_jsonl_and_advances_marker() {
        let tmp = TempDir::new().unwrap();
        let voice_root = tmp.path().join("voice-root");
        std::fs::create_dir_all(&voice_root).unwrap();

        let sess = session::open_or_create_under(&voice_root, "s1").unwrap();
        std::fs::write(
            &sess.paths.transcript,
            serde_json::to_string(&make_final(1, "first")).unwrap() + "\n",
        )
        .unwrap();

        let canned = r"events:
  - event_type: item.create
    payload:
      item_id: 00000000000000000000000007
      class: todo
      text: first
";
        let mut opts = fixed_opts(
            TranscriptSource::Session("s1".to_string()),
            vec![Ok(canned.to_string())],
        );
        opts.session_root_override = Some(voice_root.clone());
        let mut out: Vec<u8> = Vec::new();
        run_reflect(opts, &mut out).await.unwrap();

        assert!(out.is_empty(), "session writes go to disk, not stdout");
        let appended = std::fs::read_to_string(&sess.paths.events).unwrap();
        assert_eq!(appended.lines().count(), 1);
        let reopened = session::open_or_create_under(&voice_root, "s1").unwrap();
        assert_eq!(
            reopened.meta.last_reflected_event_id,
            Some(ulid::Ulid::from_parts(0, 1))
        );
        let log = std::fs::read_to_string(&sess.paths.log).unwrap();
        assert!(log.contains("status=ok"), "log line missing: {log}");
        assert!(log.contains("events=1"));
    }

    #[tokio::test]
    async fn session_second_reflection_skips_already_consumed_finals() {
        let tmp = TempDir::new().unwrap();
        let voice_root = tmp.path().join("voice-root");
        let sess = session::open_or_create_under(&voice_root, "s1").unwrap();

        // First pass consumes the Final with id 1.
        std::fs::write(
            &sess.paths.transcript,
            serde_json::to_string(&make_final(1, "first")).unwrap() + "\n",
        )
        .unwrap();
        let canned1 = r"events:
  - event_type: item.create
    payload:
      item_id: 00000000000000000000000007
      class: todo
      text: first
";
        let mut opts1 = fixed_opts(
            TranscriptSource::Session("s1".to_string()),
            vec![Ok(canned1.to_string())],
        );
        opts1.session_root_override = Some(voice_root.clone());
        let mut sink: Vec<u8> = Vec::new();
        run_reflect(opts1, &mut sink).await.unwrap();

        // Append a second Final after the first pass advanced the marker.
        use std::io::Write as _;
        let mut transcript_file = std::fs::OpenOptions::new()
            .append(true)
            .open(&sess.paths.transcript)
            .unwrap();
        writeln!(
            transcript_file,
            "{}",
            serde_json::to_string(&make_final(2, "second")).unwrap()
        )
        .unwrap();
        drop(transcript_file);

        // Uses a different item id (…08) from the first pass (…07).
        let canned2 = r"events:
  - event_type: item.create
    payload:
      item_id: 00000000000000000000000008
      class: todo
      text: second
";
        let ai = ConfigurableMockAiClient::new(vec![Ok(canned2.to_string())]);
        let prompts = ai.prompt_handle();
        let opts2 = ReflectOptions {
            source: TranscriptSource::Session("s1".to_string()),
            ulid_rng: Box::new(CountingUlidRng::new()),
            clock: Box::new(FixedClock::from_rfc3339("2026-01-01T00:00:00Z")),
            ai: Box::new(ai),
            session_root_override: Some(voice_root.clone()),
        };
        run_reflect(opts2, &mut sink).await.unwrap();

        let prompts = prompts.prompts();
        assert_eq!(prompts.len(), 1);
        let (_sys, user) = &prompts[0];
        assert!(
            user.contains("second"),
            "second prompt should include 'second'"
        );
        assert!(
            !user.contains("] first"),
            "second prompt should NOT include the already-consumed 'first' transcript line, got: {user}"
        );
    }

    #[tokio::test]
    async fn same_seed_twice_produces_byte_equal_output() {
        let tmp = TempDir::new().unwrap();
        let transcript_path = write_transcript(&tmp, &[make_final(1, "wire it up")]);
        let canned = r"events:
  - event_type: item.create
    payload:
      item_id: 00000000000000000000000007
      class: todo
      text: wire it up
";

        let mut out1: Vec<u8> = Vec::new();
        let opts1 = fixed_opts(
            TranscriptSource::Path(transcript_path.clone()),
            vec![Ok(canned.to_string())],
        );
        run_reflect(opts1, &mut out1).await.unwrap();

        let mut out2: Vec<u8> = Vec::new();
        let opts2 = fixed_opts(
            TranscriptSource::Path(transcript_path),
            vec![Ok(canned.to_string())],
        );
        run_reflect(opts2, &mut out2).await.unwrap();

        assert_eq!(
            out1, out2,
            "deterministic seeds should produce identical output"
        );
    }

    #[test]
    fn compute_span_covers_first_and_last_final() {
        let finals = [
            make_final(1, "a"),
            make_final(2, "b"),
            make_final(3, "c"),
        ];
        let span = compute_span(&finals).expect("non-empty finals yield a span");
        assert_eq!(span.start_event_id, ulid::Ulid::from_parts(0, 1));
        assert_eq!(span.end_event_id, ulid::Ulid::from_parts(0, 3));
    }

    #[test]
    fn compute_span_of_single_final_starts_and_ends_on_it() {
        let span = compute_span(&[make_final(5, "only")]).unwrap();
        assert_eq!(span.start_event_id, ulid::Ulid::from_parts(0, 5));
        assert_eq!(span.end_event_id, ulid::Ulid::from_parts(0, 5));
    }

    #[test]
    fn compute_span_of_no_finals_is_none() {
        assert!(compute_span(&[]).is_none());
    }

    #[test]
    fn parse_finals_from_reader_filters_partials_and_endpoints() {
        let body = format!(
            "{}\n{}\n{}\n\n",
            serde_json::to_string(&TranscriptEvent::Partial {
                text: "ignored".into(),
                start: Duration::ZERO,
                end: Duration::from_millis(50),
                words: None,
                speaker: None,
            })
            .unwrap(),
            serde_json::to_string(&make_final(1, "kept")).unwrap(),
            serde_json::to_string(&TranscriptEvent::Endpoint {
                at: Duration::from_secs(1),
                kind: crate::voice::EndpointKind::StreamEnd,
            })
            .unwrap(),
        );
        let mut bytes = body.as_bytes();
        let finals = super::parse_finals_from_reader(&mut bytes, "test-source").unwrap();
        assert_eq!(finals.len(), 1, "only the Final should be kept");
        match &finals[0] {
            TranscriptEvent::Final { text, .. } => assert_eq!(text, "kept"),
            other => panic!("expected Final, got {other:?}"),
        }
    }

    #[test]
    fn parse_finals_from_reader_skips_blank_lines() {
        let body = format!(
            "\n \n{}\n\n",
            serde_json::to_string(&make_final(1, "only")).unwrap()
        );
        let mut bytes = body.as_bytes();
        let finals = super::parse_finals_from_reader(&mut bytes, "test-source").unwrap();
        assert_eq!(finals.len(), 1);
    }

    #[test]
    fn parse_finals_from_reader_reports_parse_failure_with_line_number() {
        let body = format!(
            "{}\nnot json\n",
            serde_json::to_string(&make_final(1, "ok")).unwrap()
        );
        let mut bytes = body.as_bytes();
        let err = super::parse_finals_from_reader(&mut bytes, "test-source").unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains("test-source") && msg.contains("line 2"),
            "error should point at line 2: {msg}"
        );
    }
}