Skip to main content

omni_dev/voice/reflect/
mod.rs

1//! `voice reflect` — transcript-to-events Claude consumer.
2//!
3//! Consumes `TranscriptEvent::Final` events from a `transcript.jsonl`
4//! source (file path, stdin, or session directory), calls Claude via the
5//! existing [`AiClient`], parses the YAML response into [`Event`]s, and
6//! appends them to `events.jsonl` (or stdout, for one-shot mode).
7//!
8//! Per #799: this is step 1 of the build order — text-in / events-out,
9//! no audio code. See [the umbrella issue](https://github.com/rust-works/omni-dev/issues/799)
10//! for the load-bearing event schema and the rationale for event-sourced
11//! reflection.
12
13pub mod prompt;
14pub mod validate;
15
16use std::collections::HashSet;
17use std::io::{Read, Write};
18use std::path::{Path, PathBuf};
19use std::time::Instant;
20
21use anyhow::{Context, Result};
22use tracing::warn;
23
24use crate::claude::ai::AiClient;
25use crate::voice::clock::Clock;
26use crate::voice::det::UlidRng;
27use crate::voice::events::{
28    Event, EventKind, ItemId, Provenance, ReflectionError, ReflectionId, TranscriptSpan,
29};
30use crate::voice::session::{self, Session};
31use crate::voice::{EventId, TranscriptEvent};
32
/// Where to read the transcript from.
///
/// `resolve_input` turns the chosen variant into the list of
/// `TranscriptEvent::Final` events that a reflection run consumes.
#[derive(Debug, Clone)]
pub enum TranscriptSource {
    /// Read from a JSONL file at this path. A path of exactly `-` is
    /// treated as standard input by `resolve_input`.
    Path(PathBuf),
    /// Read from standard input.
    Stdin,
    /// Open or create the named session and read its `transcript.jsonl`
    /// (incrementally, after `meta.last_reflected_event_id`).
    Session(String),
}
44
/// Driver options. The trait fields ([`UlidRng`], [`Clock`], [`AiClient`])
/// are injected so tests can pin them with deterministic implementations.
///
/// The whole struct is consumed by value by [`run_reflect`].
pub struct ReflectOptions {
    /// Where to read the input transcript from.
    pub source: TranscriptSource,
    /// ULID source for `event_id` / `reflection_id` generation.
    ///
    /// `run_reflect` draws one ULID for the reflection id before calling
    /// the AI, then one more per emitted event.
    pub ulid_rng: Box<dyn UlidRng>,
    /// Wall-clock source for `ts` fields.
    ///
    /// Read once per emitted event. NOTE(review): the session log line
    /// written by `emit_events` is stamped with `chrono::Utc::now()`, not
    /// with this clock.
    pub clock: Box<dyn Clock>,
    /// AI client to invoke for the reflection prompt.
    ///
    /// Not invoked at all when the source yields no `Final` events.
    pub ai: Box<dyn AiClient>,
    /// Override the session root directory (test hook). When `None` and
    /// `source = Session(_)`, the standard `~/.omni-dev/voice/` root
    /// (or `OMNI_DEV_VOICE_ROOT`) is used.
    pub session_root_override: Option<PathBuf>,
}
61
/// System prompt — short, fixed. All schema and behaviour rules live in
/// the user prompt (`src/voice/prompts/reflect.md`) so the
/// `prompt_version` hash captures the full contract.
///
/// Passed as the first argument to `AiClient::send_request` by
/// `run_reflect`; the rendered user prompt is the second.
const SYSTEM_PROMPT: &str = "You convert a voice transcript into structured reflection \
                              events. Follow the format and rules in the user prompt \
                              exactly. Emit ONLY the YAML document — no commentary, no \
                              code fences.";
69
70/// Drives one reflection invocation end-to-end.
71///
72/// Writes resulting JSONL events to `stdout` for non-session sources,
73/// or appends them to the session's `events.jsonl` and updates
74/// `meta.last_reflected_event_id` for session-backed runs.
75pub async fn run_reflect<W: Write>(opts: ReflectOptions, stdout: &mut W) -> Result<()> {
76    let ReflectOptions {
77        source,
78        mut ulid_rng,
79        clock,
80        ai,
81        session_root_override,
82    } = opts;
83
84    // Resolve transcript and existing state.
85    let (finals, session, existing_ids) = resolve_input(&source, session_root_override.as_deref())?;
86
87    let Some(span) = compute_span(&finals) else {
88        // No Finals consumed — nothing to reflect on. Quiet exit.
89        return Ok(());
90    };
91
92    // Build the prompt.
93    let current_state_body = match &session {
94        Some(sess) => {
95            let prior = sess.read_events()?;
96            let projected = crate::voice::events::project(prior);
97            prompt::format_current_state(&projected)
98        }
99        None => prompt::format_current_state(&crate::voice::events::ProjectedState::default()),
100    };
101    let new_transcript_body = prompt::format_new_transcript(&finals);
102    let user_prompt = prompt::render(&current_state_body, &new_transcript_body);
103
104    // Invoke the AI and time it.
105    let reflection_id = ReflectionId::Ulid(ulid_rng.next_ulid());
106    let started = Instant::now();
107    let ai_response = ai.send_request(SYSTEM_PROMPT, &user_prompt).await;
108    let latency_ms = started.elapsed().as_millis();
109    let model = ai.get_metadata().model;
110    let prompt_version = prompt::prompt_version().to_string();
111
112    let raw_response = match ai_response {
113        Ok(s) => s,
114        Err(e) => {
115            // The AI call itself failed (subprocess crash, timeout, …).
116            // Surface it as a reflection.error so the operator can audit.
117            let err_event = mint_error_event(
118                ulid_rng.as_mut(),
119                clock.as_ref(),
120                &reflection_id,
121                &span,
122                &model,
123                &prompt_version,
124                ReflectionError {
125                    raw_output: String::new(),
126                    error: format!("AI invocation failed: {e}"),
127                },
128            );
129            return emit_events(
130                &session,
131                stdout,
132                &[err_event],
133                /*new_marker*/ Some(span.end_event_id),
134                /*reflection_id*/ &reflection_id,
135                &model,
136                latency_ms,
137                /*status*/ "error",
138            );
139        }
140    };
141
142    // Validate; on failure, emit a single reflection.error event.
143    let events = match validate::parse_and_validate(&raw_response, &existing_ids) {
144        Ok(kinds) => kinds
145            .into_iter()
146            .map(|kind| {
147                build_event(
148                    ulid_rng.as_mut(),
149                    clock.as_ref(),
150                    &reflection_id,
151                    &span,
152                    &model,
153                    &prompt_version,
154                    kind,
155                )
156            })
157            .collect::<Vec<_>>(),
158        Err(verr) => vec![mint_error_event(
159            ulid_rng.as_mut(),
160            clock.as_ref(),
161            &reflection_id,
162            &span,
163            &model,
164            &prompt_version,
165            ReflectionError {
166                raw_output: verr.raw_output,
167                error: verr.error,
168            },
169        )],
170    };
171
172    let status = if events
173        .iter()
174        .any(|e| matches!(e.kind, EventKind::ReflectionError(_)))
175    {
176        "error"
177    } else {
178        "ok"
179    };
180
181    emit_events(
182        &session,
183        stdout,
184        &events,
185        Some(span.end_event_id),
186        &reflection_id,
187        &model,
188        latency_ms,
189        status,
190    )
191}
192
193fn resolve_input(
194    source: &TranscriptSource,
195    session_root_override: Option<&Path>,
196) -> Result<(Vec<TranscriptEvent>, Option<Session>, HashSet<ItemId>)> {
197    match source {
198        TranscriptSource::Path(p) if p.as_os_str() == "-" => {
199            let finals = read_finals_from_stdin()?;
200            Ok((finals, None, HashSet::new()))
201        }
202        TranscriptSource::Path(p) => {
203            let finals = session::read_transcript_finals_after(p, None)?;
204            Ok((finals, None, HashSet::new()))
205        }
206        TranscriptSource::Stdin => {
207            let finals = read_finals_from_stdin()?;
208            Ok((finals, None, HashSet::new()))
209        }
210        TranscriptSource::Session(id) => {
211            let sess = match session_root_override {
212                Some(root) => session::open_or_create_under(root, id)?,
213                None => session::open_or_create(id)?,
214            };
215            let finals = sess.read_transcript_finals_after()?;
216            let existing_state = crate::voice::events::project(sess.read_events()?);
217            let existing_ids: HashSet<ItemId> = existing_state.items.keys().copied().collect();
218            Ok((finals, Some(sess), existing_ids))
219        }
220    }
221}
222
223fn read_finals_from_stdin() -> Result<Vec<TranscriptEvent>> {
224    parse_finals_from_reader(&mut std::io::stdin(), "stdin")
225}
226
227/// Reads all `TranscriptEvent`s from `reader`, returning only the `Final`
228/// variants. Split out from [`read_finals_from_stdin`] so unit tests can
229/// drive it with a `&[u8]` rather than needing a real stdin pipe.
230fn parse_finals_from_reader<R: Read>(
231    reader: &mut R,
232    source_label: &str,
233) -> Result<Vec<TranscriptEvent>> {
234    let mut body = String::new();
235    reader
236        .read_to_string(&mut body)
237        .with_context(|| format!("reading transcript from {source_label}"))?;
238    let mut events = Vec::new();
239    for (idx, line) in body.lines().enumerate() {
240        if line.trim().is_empty() {
241            continue;
242        }
243        let event: TranscriptEvent = serde_json::from_str(line)
244            .with_context(|| format!("parsing {source_label} transcript line {}", idx + 1))?;
245        if matches!(event, TranscriptEvent::Final { .. }) {
246            events.push(event);
247        }
248    }
249    Ok(events)
250}
251
252fn compute_span(finals: &[TranscriptEvent]) -> Option<TranscriptSpan> {
253    let first = finals.iter().find_map(|e| match e {
254        TranscriptEvent::Final { event_id, .. } => Some(*event_id),
255        _ => None,
256    })?;
257    let last = finals.iter().rev().find_map(|e| match e {
258        TranscriptEvent::Final { event_id, .. } => Some(*event_id),
259        _ => None,
260    })?;
261    Some(TranscriptSpan {
262        start_event_id: first,
263        end_event_id: last,
264    })
265}
266
267fn build_event(
268    rng: &mut dyn UlidRng,
269    clock: &dyn Clock,
270    reflection_id: &ReflectionId,
271    span: &TranscriptSpan,
272    model: &str,
273    prompt_version: &str,
274    kind: EventKind,
275) -> Event {
276    Event {
277        event_id: rng.next_ulid(),
278        ts: clock.now(),
279        reflection_id: reflection_id.clone(),
280        provenance: Provenance {
281            transcript_span: Some(span.clone()),
282            model: Some(model.to_string()),
283            prompt_version: Some(prompt_version.to_string()),
284        },
285        kind: rewrite_kind_with_omitted_optionals(kind),
286    }
287}
288
289fn mint_error_event(
290    rng: &mut dyn UlidRng,
291    clock: &dyn Clock,
292    reflection_id: &ReflectionId,
293    span: &TranscriptSpan,
294    model: &str,
295    prompt_version: &str,
296    err: ReflectionError,
297) -> Event {
298    build_event(
299        rng,
300        clock,
301        reflection_id,
302        span,
303        model,
304        prompt_version,
305        EventKind::ReflectionError(err),
306    )
307}
308
/// No-op pass-through today: returns `kind` unchanged.
///
/// Hook point for future normalisation (e.g. trimming whitespace in
/// user-supplied text). Kept centralised so any later sanitisation
/// lives in one place.
///
/// Called by `build_event` on every event payload, including
/// `reflection.error` payloads.
fn rewrite_kind_with_omitted_optionals(kind: EventKind) -> EventKind {
    kind
}
317
318#[allow(clippy::too_many_arguments)]
319fn emit_events<W: Write>(
320    session: &Option<Session>,
321    stdout: &mut W,
322    events: &[Event],
323    new_marker: Option<EventId>,
324    reflection_id: &ReflectionId,
325    model: &str,
326    latency_ms: u128,
327    status: &str,
328) -> Result<()> {
329    if let Some(sess) = session {
330        sess.append_events(events)?;
331        if let Some(marker) = new_marker {
332            // Clone is cheap; we need a mutable session to update meta.
333            let mut sess_mut = sess.clone();
334            sess_mut.set_last_reflected(marker)?;
335        }
336        let refl_id_str = match reflection_id {
337            ReflectionId::Ulid(u) => u.to_string(),
338            ReflectionId::Review => "review".to_string(),
339        };
340        let line = format!(
341            "{ts} {refl_id_str} model={model} cost_usd=unknown latency_ms={latency_ms} events={n} status={status}",
342            ts = chrono::Utc::now().to_rfc3339(),
343            n = events.len(),
344        );
345        sess.append_log(&line)?;
346    } else {
347        for event in events {
348            serde_json::to_writer(&mut *stdout, event)
349                .context("serialising reflection event to stdout")?;
350            stdout
351                .write_all(b"\n")
352                .context("writing newline to stdout")?;
353        }
354        stdout
355            .flush()
356            .context("flushing reflection events to stdout")?;
357        // No log line for non-session mode — the events.jsonl on stdout
358        // is the audit trail.
359        if status == "error" {
360            warn!(
361                model = %model,
362                latency_ms,
363                "reflect completed with errors (see emitted reflection.error event)"
364            );
365        }
366    }
367    Ok(())
368}
369
/// Unit tests for the reflect driver. The AI, ULID source and clock are
/// replaced with the mock / counting / fixed implementations imported
/// below, and all filesystem state lives under a `TempDir`.
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;
    use crate::claude::test_utils::ConfigurableMockAiClient;
    use crate::voice::clock::FixedClock;
    use crate::voice::det::CountingUlidRng;
    use crate::voice::events::ItemClass;
    use std::time::Duration;
    use tempfile::TempDir;

    /// Builds a `TranscriptEvent::Final` with the given text. The event id
    /// is a ULID with timestamp part 0 and `event_id` as its random part;
    /// all other fields are fixed placeholder values.
    fn make_final(event_id: u128, text: &str) -> TranscriptEvent {
        TranscriptEvent::Final {
            event_id: ulid::Ulid::from_parts(0, event_id),
            text: text.to_string(),
            start: Duration::ZERO,
            end: Duration::from_millis(500),
            confidence: 0.95,
            words: None,
            speaker: None,
            revisable: false,
        }
    }

    /// Writes `finals` as one JSON object per line to `transcript.jsonl`
    /// inside `tmp` and returns the file's path.
    fn write_transcript(tmp: &TempDir, finals: &[TranscriptEvent]) -> PathBuf {
        let path = tmp.path().join("transcript.jsonl");
        let mut body = String::new();
        for e in finals {
            body.push_str(&serde_json::to_string(e).unwrap());
            body.push('\n');
        }
        std::fs::write(&path, body).unwrap();
        path
    }

    /// Builds `ReflectOptions` with a counting ULID source, a clock fixed
    /// at 2026-01-01T00:00:00Z, and a mock AI client primed with
    /// `ai_responses`. No session root override is set.
    fn fixed_opts(source: TranscriptSource, ai_responses: Vec<Result<String>>) -> ReflectOptions {
        ReflectOptions {
            source,
            ulid_rng: Box::new(CountingUlidRng::new()),
            clock: Box::new(FixedClock::from_rfc3339("2026-01-01T00:00:00Z")),
            ai: Box::new(ConfigurableMockAiClient::new(ai_responses)),
            session_root_override: None,
        }
    }

    /// A path source with one Final and a valid canned reply writes exactly
    /// one `item.create` event line to the supplied writer.
    #[tokio::test]
    async fn path_source_emits_events_to_stdout() {
        let tmp = TempDir::new().unwrap();
        let transcript = write_transcript(&tmp, &[make_final(1, "wire it up")]);
        let canned = r"events:
  - event_type: item.create
    payload:
      item_id: 00000000000000000000000007
      class: todo
      text: wire it up
";
        let opts = fixed_opts(
            TranscriptSource::Path(transcript),
            vec![Ok(canned.to_string())],
        );
        let mut out: Vec<u8> = Vec::new();
        run_reflect(opts, &mut out).await.unwrap();
        let body = String::from_utf8(out).unwrap();
        assert_eq!(body.lines().count(), 1, "expected exactly one event line");
        let event: Event = serde_json::from_str(body.lines().next().unwrap()).unwrap();
        assert!(matches!(event.kind, EventKind::ItemCreate(_)));
    }

    /// An empty transcript file produces no output and succeeds; the mock
    /// AI is given no responses because it must not be called.
    #[tokio::test]
    async fn empty_transcript_exits_quietly() {
        let tmp = TempDir::new().unwrap();
        let transcript = write_transcript(&tmp, &[]);
        let opts = fixed_opts(
            TranscriptSource::Path(transcript),
            vec![/* AI never called */],
        );
        let mut out: Vec<u8> = Vec::new();
        run_reflect(opts, &mut out).await.unwrap();
        assert!(out.is_empty(), "no events expected when no Finals consumed");
    }

    /// A reply that fails validation yields a single `ReflectionError`
    /// event that carries the raw reply and a YAML parse failure message.
    #[tokio::test]
    async fn malformed_response_yields_reflection_error_event() {
        let tmp = TempDir::new().unwrap();
        let transcript = write_transcript(&tmp, &[make_final(1, "talk")]);
        let canned = "this is not yaml: - definitely not";
        let opts = fixed_opts(
            TranscriptSource::Path(transcript),
            vec![Ok(canned.to_string())],
        );
        let mut out: Vec<u8> = Vec::new();
        run_reflect(opts, &mut out).await.unwrap();
        let body = String::from_utf8(out).unwrap();
        assert_eq!(body.lines().count(), 1);
        let event: Event = serde_json::from_str(body.lines().next().unwrap()).unwrap();
        match &event.kind {
            EventKind::ReflectionError(e) => {
                assert!(e.raw_output.contains("not yaml"));
                assert!(e.error.contains("YAML parse failure"));
            }
            other => panic!("expected ReflectionError, got {other:?}"),
        }
    }

    /// When the AI client itself returns an error, `run_reflect` still
    /// succeeds and emits a single `ReflectionError` event whose message
    /// includes the underlying error text.
    #[tokio::test]
    async fn ai_invocation_failure_yields_reflection_error_event() {
        let tmp = TempDir::new().unwrap();
        let transcript = write_transcript(&tmp, &[make_final(1, "talk")]);
        let opts = fixed_opts(
            TranscriptSource::Path(transcript),
            vec![Err(anyhow::anyhow!("simulated subprocess crash"))],
        );
        let mut out: Vec<u8> = Vec::new();
        run_reflect(opts, &mut out).await.unwrap();
        let body = String::from_utf8(out).unwrap();
        assert_eq!(body.lines().count(), 1);
        let event: Event = serde_json::from_str(body.lines().next().unwrap()).unwrap();
        match &event.kind {
            EventKind::ReflectionError(e) => {
                assert!(e.error.contains("simulated subprocess crash"));
            }
            other => panic!("expected ReflectionError, got {other:?}"),
        }
    }

    /// A session source writes nothing to the supplied writer; instead the
    /// event lands in the session's events file, the last-reflected marker
    /// moves to the consumed Final, and a `status=ok` log line is written.
    #[tokio::test]
    async fn session_source_appends_to_events_jsonl_and_advances_marker() {
        let tmp = TempDir::new().unwrap();
        let voice_root = tmp.path().join("voice-root");
        std::fs::create_dir_all(&voice_root).unwrap();

        // Pre-populate the session with a transcript.
        let sess = session::open_or_create_under(&voice_root, "s1").unwrap();
        std::fs::write(
            &sess.paths.transcript,
            serde_json::to_string(&make_final(1, "first")).unwrap() + "\n",
        )
        .unwrap();

        let canned = r"events:
  - event_type: item.create
    payload:
      item_id: 00000000000000000000000007
      class: todo
      text: first
";
        let mut opts = fixed_opts(
            TranscriptSource::Session("s1".to_string()),
            vec![Ok(canned.to_string())],
        );
        opts.session_root_override = Some(voice_root.clone());
        let mut out: Vec<u8> = Vec::new();
        run_reflect(opts, &mut out).await.unwrap();

        assert!(out.is_empty(), "session writes go to disk, not stdout");
        let appended = std::fs::read_to_string(&sess.paths.events).unwrap();
        assert_eq!(appended.lines().count(), 1);
        // Re-open from disk so the assertion sees the persisted marker.
        let reopened = session::open_or_create_under(&voice_root, "s1").unwrap();
        assert_eq!(
            reopened.meta.last_reflected_event_id,
            Some(ulid::Ulid::from_parts(0, 1))
        );
        let log = std::fs::read_to_string(&sess.paths.log).unwrap();
        assert!(log.contains("status=ok"), "log line missing: {log}");
        assert!(log.contains("events=1"));
    }

    /// Two reflections over the same session: the second run's prompt must
    /// contain only the Final appended after the first run, verified by
    /// inspecting the prompts captured by the mock AI client.
    #[tokio::test]
    async fn session_second_reflection_skips_already_consumed_finals() {
        let tmp = TempDir::new().unwrap();
        let voice_root = tmp.path().join("voice-root");
        let sess = session::open_or_create_under(&voice_root, "s1").unwrap();

        // First reflection consumes one final.
        std::fs::write(
            &sess.paths.transcript,
            serde_json::to_string(&make_final(1, "first")).unwrap() + "\n",
        )
        .unwrap();
        let canned1 = r"events:
  - event_type: item.create
    payload:
      item_id: 00000000000000000000000007
      class: todo
      text: first
";
        let mut opts1 = fixed_opts(
            TranscriptSource::Session("s1".to_string()),
            vec![Ok(canned1.to_string())],
        );
        opts1.session_root_override = Some(voice_root.clone());
        let mut sink: Vec<u8> = Vec::new();
        run_reflect(opts1, &mut sink).await.unwrap();

        // A second final is appended (as if the user dictated more).
        use std::io::Write as _;
        let mut transcript_file = std::fs::OpenOptions::new()
            .append(true)
            .open(&sess.paths.transcript)
            .unwrap();
        writeln!(
            transcript_file,
            "{}",
            serde_json::to_string(&make_final(2, "second")).unwrap()
        )
        .unwrap();
        drop(transcript_file);

        // Second reflection should see only the "second" final because
        // meta.last_reflected_event_id now points at ulid 1.
        let canned2 = r"events:
  - event_type: item.create
    payload:
      item_id: 00000000000000000000000008
      class: todo
      text: second
";
        let ai = ConfigurableMockAiClient::new(vec![Ok(canned2.to_string())]);
        // Grab the handle before the client is boxed and moved into opts.
        let prompts = ai.prompt_handle();
        let opts2 = ReflectOptions {
            source: TranscriptSource::Session("s1".to_string()),
            ulid_rng: Box::new(CountingUlidRng::new()),
            clock: Box::new(FixedClock::from_rfc3339("2026-01-01T00:00:00Z")),
            ai: Box::new(ai),
            session_root_override: Some(voice_root.clone()),
        };
        run_reflect(opts2, &mut sink).await.unwrap();

        let prompts = prompts.prompts();
        assert_eq!(prompts.len(), 1);
        let (_sys, user) = &prompts[0];
        assert!(
            user.contains("second"),
            "second prompt should include 'second'"
        );
        assert!(
            !user.contains("] first"),
            "second prompt should NOT include the already-consumed 'first' transcript line, got: {user}"
        );
    }

    /// Two runs with identically seeded options over the same transcript
    /// and canned reply must produce byte-identical output.
    #[tokio::test]
    async fn same_seed_twice_produces_byte_equal_output() {
        let tmp = TempDir::new().unwrap();
        let transcript_path = write_transcript(&tmp, &[make_final(1, "wire it up")]);
        let canned = r"events:
  - event_type: item.create
    payload:
      item_id: 00000000000000000000000007
      class: todo
      text: wire it up
";

        let mut out1: Vec<u8> = Vec::new();
        let opts1 = fixed_opts(
            TranscriptSource::Path(transcript_path.clone()),
            vec![Ok(canned.to_string())],
        );
        run_reflect(opts1, &mut out1).await.unwrap();

        let mut out2: Vec<u8> = Vec::new();
        let opts2 = fixed_opts(
            TranscriptSource::Path(transcript_path),
            vec![Ok(canned.to_string())],
        );
        run_reflect(opts2, &mut out2).await.unwrap();

        assert_eq!(
            out1, out2,
            "deterministic seeds should produce identical output"
        );
    }

    /// Exists only to reference `ItemClass` so its import is not reported
    /// as unused; it asserts nothing.
    #[test]
    fn item_class_is_used_in_test() {
        // Pin imports so the compiler doesn't warn about ItemClass being
        // imported but unused — it's used in `format_current_state`'s
        // pattern matching, which we don't otherwise exercise here.
        let _ = ItemClass::Todo;
    }

    /// Input containing a Partial, a Final and an Endpoint yields only the
    /// Final.
    #[test]
    fn parse_finals_from_reader_filters_partials_and_endpoints() {
        let body = format!(
            "{}\n{}\n{}\n\n",
            serde_json::to_string(&TranscriptEvent::Partial {
                text: "ignored".into(),
                start: Duration::ZERO,
                end: Duration::from_millis(50),
                words: None,
                speaker: None,
            })
            .unwrap(),
            serde_json::to_string(&make_final(1, "kept")).unwrap(),
            serde_json::to_string(&TranscriptEvent::Endpoint {
                at: Duration::from_secs(1),
                kind: crate::voice::EndpointKind::StreamEnd,
            })
            .unwrap(),
        );
        let mut bytes = body.as_bytes();
        let finals = super::parse_finals_from_reader(&mut bytes, "test-source").unwrap();
        assert_eq!(finals.len(), 1, "only the Final should be kept");
        match &finals[0] {
            TranscriptEvent::Final { text, .. } => assert_eq!(text, "kept"),
            other => panic!("expected Final, got {other:?}"),
        }
    }

    /// Empty and whitespace-only lines around a single Final are ignored
    /// rather than treated as parse errors.
    #[test]
    fn parse_finals_from_reader_skips_blank_lines() {
        let body = format!(
            "\n  \n{}\n\n",
            serde_json::to_string(&make_final(1, "only")).unwrap()
        );
        let mut bytes = body.as_bytes();
        let finals = super::parse_finals_from_reader(&mut bytes, "test-source").unwrap();
        assert_eq!(finals.len(), 1);
    }

    /// A non-JSON second line fails the whole parse, and the error message
    /// names both the source label and the 1-based line number.
    #[test]
    fn parse_finals_from_reader_reports_parse_failure_with_line_number() {
        let body = format!(
            "{}\nnot json\n",
            serde_json::to_string(&make_final(1, "ok")).unwrap()
        );
        let mut bytes = body.as_bytes();
        let err = super::parse_finals_from_reader(&mut bytes, "test-source").unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains("test-source") && msg.contains("line 2"),
            "error should point at line 2: {msg}"
        );
    }
}
700            msg.contains("test-source") && msg.contains("line 2"),
701            "error should point at line 2: {msg}"
702        );
703    }
704}