Skip to main content

harness/codex/
parser.rs

//! Codex's `codex exec --json` parser — its JSONL wire format → the neutral
//! [`crate::RunEvent`] vocabulary.
//!
//! Two layers: the stateless [`parse_codex_line`] decodes one line into a
//! [`ParsedLine`] (tool cards, session, usage), and the stateful
//! [`CodexStreamParser`] wraps it per-run to resolve codex's
//! preamble-vs-answer ambiguity and drop its stderr noise.
//!
//! Wire format reference (verified against the official docs,
//! https://developers.openai.com/codex/noninteractive): `--json` emits one
//! JSON object per line. The assistant's reply is an `item.completed` event
//! whose `item.type == "agent_message"` with the full text in `item.text` —
//! Codex sends the whole message at once, not token deltas. Command
//! executions arrive as `command_execution` items; `thread.started` carries
//! the session (thread id) and `turn.completed` the token usage.
16
17use serde_json::{Map, Value};
18
19use crate::events::run_events_from_parsed;
20use crate::{
21    ParsedLine, ProcessEvent, RunEvent, SessionInfo, ToolCallEnd, ToolCallStart, UsageInfo,
22};
23
24/// A stable tool *identifier* for a codex item — the card's `name`, which the
25/// consumer humanizes (see Compose's `toolLabels`). Returning an identifier
26/// here rather than a display phrase keeps codex consistent with the other
27/// adapters (bob's `read_file`, …) and keeps the raw command — which lives in
28/// `input` — out of the `name`, so a live status never echoes a shell command.
29/// `None` for non-tool items.
30fn codex_tool_kind(item: &Map<String, Value>) -> Option<&'static str> {
31    match item.get("type").and_then(Value::as_str)? {
32        "command_execution" => Some("command_execution"),
33        "file_change" => Some("file_change"),
34        "web_search" => Some("web_search"),
35        "mcp_tool_call" => Some("mcp_tool_call"),
36        _ => None,
37    }
38}
39
40/// A human-readable fallback label for a codex tool item, used only when it
41/// carries no `id` to key a card (rare — codex 0.125.0 always sends one). The
42/// normal path emits [`codex_tool_kind`] as the `name` and lets the consumer
43/// phrase it; this stays command-free so even the fallback can't leak a shell
44/// command into the status line.
45fn codex_tool_label(item: &Map<String, Value>) -> Option<String> {
46    Some(
47        match item.get("type").and_then(Value::as_str)? {
48            "command_execution" => "Running a command",
49            "file_change" => "Editing files",
50            "web_search" => "Searching the web",
51            "mcp_tool_call" => "Running a tool",
52            _ => return None,
53        }
54        .to_owned(),
55    )
56}
57
58/// The tool call's input, lifted inline. Only `command_execution`
59/// carries a literal we can ground against (`command`); other item
60/// types stream/structure their args differently, so leave them `None`
61/// rather than guess.
62fn codex_tool_input(item: &Map<String, Value>) -> Option<String> {
63    if item.get("type").and_then(Value::as_str) == Some("command_execution") {
64        return item
65            .get("command")
66            .and_then(Value::as_str)
67            .filter(|s| !s.is_empty())
68            .map(str::to_owned);
69    }
70    None
71}
72
73/// The tool call's output, lifted inline. `command_execution` reports
74/// `aggregated_output`; other item types are left `None`.
75fn codex_tool_output(item: &Map<String, Value>) -> Option<String> {
76    if item.get("type").and_then(Value::as_str) == Some("command_execution") {
77        return item
78            .get("aggregated_output")
79            .and_then(Value::as_str)
80            .filter(|s| !s.is_empty())
81            .map(str::to_owned);
82    }
83    None
84}
85
86/// Did a codex tool item succeed? `command_execution` reports
87/// `exit_code` (0 = ok); otherwise fall back to `status` (anything but
88/// an explicit failure is treated as ok, since not every tool type
89/// carries an exit code).
90fn codex_tool_ok(item: &Map<String, Value>) -> bool {
91    if let Some(code) = item.get("exit_code").and_then(Value::as_i64) {
92        return code == 0;
93    }
94    !matches!(
95        item.get("status").and_then(Value::as_str),
96        Some("failed") | Some("error")
97    )
98}
99
/// Per-run, stateful layer on top of [`parse_codex_line`]. Create one per run.
///
/// # Preamble vs. answer
///
/// Within a single turn codex emits several *complete* `agent_message`
/// items: short preambles ahead of tool calls ("I'll read the file first")
/// and then the final answer. The items themselves look identical; position
/// is the only signal. The last `agent_message` before `turn.completed` is
/// the answer, every earlier one is a preamble.
///
/// The parser therefore holds on to the most recent `agent_message` and
/// classifies it once it sees what comes next:
///
/// * another item follows → it was a preamble, emitted as
///   [`RunEvent::Activity`] (transient narration);
/// * `turn.completed` (or the end of the stream) follows → it was the
///   answer, emitted as [`RunEvent::Text`].
///
/// Without this step the preambles would be concatenated onto the answer in
/// the bubble.
///
/// # Stderr
///
/// Stderr is dropped entirely. In `--json` mode codex only writes tracing
/// logs there ("Reading additional input…", internal
/// `ERROR codex_core::…` lines); real failures are reported as stdout
/// `error` items, so stderr carries noise rather than status.
#[derive(Debug, Default)]
pub struct CodexStreamParser {
    /// Text of the latest `agent_message` whose role is still undecided:
    /// it becomes Activity if superseded, Text if the turn ends.
    pending_message: Option<String>,
}
123
124impl CodexStreamParser {
125    pub fn new() -> Self {
126        Self::default()
127    }
128
129    /// Normalize one raw process event, applying the agent_message state
130    /// machine to stdout and dropping stderr noise.
131    pub fn on_process_event(&mut self, event: ProcessEvent) -> Vec<RunEvent> {
132        match event {
133            // codex's stderr is tracing noise in `--json` mode; real errors
134            // arrive as stdout `error` items. Don't surface it as status.
135            ProcessEvent::Stderr { .. } => Vec::new(),
136            ProcessEvent::Started { run_id } => vec![RunEvent::Started { run_id }],
137            ProcessEvent::Error { run_id, message } => {
138                // Flush a held message as the answer before the terminal error.
139                let mut out = self.take_pending_as_answer(&run_id);
140                out.push(RunEvent::Error { run_id, message });
141                out
142            }
143            ProcessEvent::Exited {
144                run_id,
145                exit_code,
146                cancelled,
147            } => {
148                // Defensive: a turn normally ends with `turn.completed` (which
149                // flushes the answer); if the stream ended without it, don't
150                // lose a held final message.
151                let mut out = self.take_pending_as_answer(&run_id);
152                out.push(RunEvent::Exited {
153                    run_id,
154                    exit_code,
155                    cancelled,
156                });
157                out
158            }
159            ProcessEvent::Stdout { run_id, line } => self.on_stdout(&run_id, &line),
160            // `ProcessEvent` is #[non_exhaustive]; ignore any future variant.
161            _ => Vec::new(),
162        }
163    }
164
165    fn on_stdout(&mut self, run_id: &str, line: &str) -> Vec<RunEvent> {
166        let value = serde_json::from_str::<Value>(line.trim()).ok();
167        let typ = value
168            .as_ref()
169            .and_then(Value::as_object)
170            .and_then(|o| o.get("type"))
171            .and_then(Value::as_str);
172
173        // A new assistant message arrived: whatever we held is now known to
174        // be a preamble (it was superseded). Hold the new one.
175        if let Some(text) = value.as_ref().and_then(codex_agent_message_text) {
176            let out = self.take_pending_as_preamble(run_id);
177            if !text.is_empty() {
178                self.pending_message = Some(text);
179            }
180            return out;
181        }
182
183        // Any other line: a held message is a preamble — unless the turn just
184        // ended, when it's the answer.
185        let mut out = if typ == Some("turn.completed") {
186            self.take_pending_as_answer(run_id)
187        } else {
188            self.take_pending_as_preamble(run_id)
189        };
190        // Non-`agent_message` lines still decode normally (tool cards,
191        // session, usage, error).
192        out.extend(run_events_from_parsed(run_id, parse_codex_line(line)));
193        out
194    }
195
196    /// Emit a held message as transient narration (it was a preamble).
197    fn take_pending_as_preamble(&mut self, run_id: &str) -> Vec<RunEvent> {
198        match self.pending_message.take() {
199            Some(text) if !text.is_empty() => vec![RunEvent::Activity {
200                run_id: run_id.to_owned(),
201                message: text,
202            }],
203            _ => Vec::new(),
204        }
205    }
206
207    /// Emit a held message as the answer (the final assistant message).
208    fn take_pending_as_answer(&mut self, run_id: &str) -> Vec<RunEvent> {
209        match self.pending_message.take() {
210            Some(text) if !text.is_empty() => vec![RunEvent::Text {
211                run_id: run_id.to_owned(),
212                delta: text,
213            }],
214            _ => Vec::new(),
215        }
216    }
217}
218
219/// The text of an `agent_message` `item.completed` line, else `None`. Returns
220/// `Some("")` for an empty/absent `text` so the caller still treats the line
221/// as a (superseding) message.
222fn codex_agent_message_text(value: &Value) -> Option<String> {
223    let obj = value.as_object()?;
224    if obj.get("type").and_then(Value::as_str) != Some("item.completed") {
225        return None;
226    }
227    let item = obj.get("item").and_then(Value::as_object)?;
228    if item.get("type").and_then(Value::as_str) != Some("agent_message") {
229        return None;
230    }
231    Some(
232        item.get("text")
233            .and_then(Value::as_str)
234            .unwrap_or_default()
235            .to_owned(),
236    )
237}
238
239/// Parse one line of `codex exec --json` JSONL into the shared
240/// [`ParsedLine`]. Assistant text is the full `agent_message` on
241/// `item.completed`; tool items (`command_execution`, `file_change`,
242/// `web_search`, `mcp_tool_call`) become structured tool cards
243/// (`ToolStart`/`ToolEnd`). Codex edits files directly via tools
244/// (reflected on disk by the file watcher), so it never emits
245/// suggested-edit previews — `edits` stays empty.
246pub fn parse_codex_line(line: &str) -> ParsedLine {
247    let trimmed = line.trim();
248    if trimmed.is_empty() {
249        return ParsedLine::default();
250    }
251    let Ok(value) = serde_json::from_str::<Value>(trimmed) else {
252        return ParsedLine::default();
253    };
254    let Some(obj) = value.as_object() else {
255        return ParsedLine::default();
256    };
257
258    match obj.get("type").and_then(Value::as_str) {
259        Some("item.completed") => {
260            let Some(item) = obj.get("item").and_then(Value::as_object) else {
261                return ParsedLine::default();
262            };
263            // The assistant's reply: full text in one shot.
264            if item.get("type").and_then(Value::as_str) == Some("agent_message") {
265                if let Some(text) = item.get("text").and_then(Value::as_str) {
266                    if !text.is_empty() {
267                        return ParsedLine {
268                            text: Some(text.to_owned()),
269                            ..ParsedLine::default()
270                        };
271                    }
272                }
273            }
274            // A tool item finished. Grounded in codex 0.125.0's
275            // `--json` schema: command_execution / web_search /
276            // file_change / mcp_tool_call items arrive on
277            // `item.completed` carrying `id` + `status` (+ `exit_code`
278            // for commands). It does NOT emit an `item.started` for
279            // these, so emit BOTH start and end from this one event —
280            // that's what makes a card appear. The frontend dedups a
281            // repeated start by id, so a future codex that *does* send
282            // `item.started` still renders correctly.
283            if let Some(kind) = codex_tool_kind(item) {
284                return match item.get("id").and_then(Value::as_str) {
285                    Some(id) => {
286                        let id = id.to_owned();
287                        ParsedLine {
288                            tool_start: Some(ToolCallStart {
289                                tool_call_id: id.clone(),
290                                name: kind.to_owned(),
291                                input: codex_tool_input(item),
292                            }),
293                            tool_end: Some(ToolCallEnd {
294                                tool_call_id: id,
295                                ok: codex_tool_ok(item),
296                                output: codex_tool_output(item),
297                            }),
298                            ..ParsedLine::default()
299                        }
300                    }
301                    None => ParsedLine {
302                        activity: codex_tool_label(item),
303                        ..ParsedLine::default()
304                    },
305                };
306            }
307            ParsedLine::default()
308        }
309        Some("item.started") => {
310            let Some(item) = obj.get("item").and_then(Value::as_object) else {
311                return ParsedLine::default();
312            };
313            // A tool announced before it finishes → a "running" card; the
314            // matching `item.completed` flips it to done/error. If the
315            // item has no `id` to key the card, degrade to a plain
316            // activity line rather than drop it.
317            if let Some(kind) = codex_tool_kind(item) {
318                return match item.get("id").and_then(Value::as_str) {
319                    Some(id) => ParsedLine {
320                        tool_start: Some(ToolCallStart {
321                            tool_call_id: id.to_owned(),
322                            name: kind.to_owned(),
323                            input: codex_tool_input(item),
324                        }),
325                        ..ParsedLine::default()
326                    },
327                    None => ParsedLine {
328                        activity: codex_tool_label(item),
329                        ..ParsedLine::default()
330                    },
331                };
332            }
333            ParsedLine::default()
334        }
335        Some("error") => {
336            let message = obj
337                .get("message")
338                .and_then(Value::as_str)
339                .unwrap_or("Codex error");
340            ParsedLine {
341                activity: Some(truncate(message, 240)),
342                ..ParsedLine::default()
343            }
344        }
345        // thread.started → session. Codex gives a `thread_id` (its session)
346        // but no model in this event, so `model` stays None.
347        Some("thread.started") => ParsedLine {
348            session: Some(SessionInfo {
349                session_id: obj
350                    .get("thread_id")
351                    .and_then(Value::as_str)
352                    .filter(|s| !s.is_empty())
353                    .map(str::to_owned),
354                model: None,
355            }),
356            ..ParsedLine::default()
357        },
358        // turn.completed → token usage (codex reports input/output tokens).
359        Some("turn.completed") => {
360            let usage = obj.get("usage").and_then(Value::as_object);
361            let input_tokens = usage.and_then(|u| u.get("input_tokens")).and_then(Value::as_u64);
362            let output_tokens = usage.and_then(|u| u.get("output_tokens")).and_then(Value::as_u64);
363            if input_tokens.is_none() && output_tokens.is_none() {
364                return ParsedLine::default();
365            }
366            let total_tokens = match (input_tokens, output_tokens) {
367                (Some(i), Some(o)) => Some(i + o),
368                _ => None,
369            };
370            ParsedLine {
371                usage: Some(UsageInfo {
372                    input_tokens,
373                    output_tokens,
374                    total_tokens,
375                }),
376                ..ParsedLine::default()
377            }
378        }
379        // turn.started / turn.failed / item.updated: lifecycle / partials — ignored.
380        _ => ParsedLine::default(),
381    }
382}
383
/// Return at most the first `max_chars` characters of `s` as an owned string.
///
/// The limit counts `char`s, not bytes, so the cut always lands on a UTF-8
/// character boundary. Strings with `max_chars` characters or fewer are
/// returned whole.
fn truncate(s: &str, max_chars: usize) -> String {
    // Byte offset where the (max_chars + 1)-th character starts, or the end
    // of the string when there is no such character.
    let cut = s
        .char_indices()
        .nth(max_chars)
        .map_or(s.len(), |(byte_index, _)| byte_index);
    s[..cut].to_owned()
}
387
#[cfg(test)]
mod tests {
    use super::*;

    // --- shared helpers ------------------------------------------------------

    /// Push one stdout line for run `"r"` through the stream parser.
    fn stdout(p: &mut CodexStreamParser, line: &str) -> Vec<RunEvent> {
        let event = ProcessEvent::Stdout {
            run_id: "r".to_owned(),
            line: line.to_owned(),
        };
        p.on_process_event(event)
    }

    /// Push several stdout lines in order and gather everything emitted.
    fn stdout_all(p: &mut CodexStreamParser, lines: &[&str]) -> Vec<RunEvent> {
        let mut events = Vec::new();
        for line in lines {
            events.extend(stdout(p, line));
        }
        events
    }

    /// The `delta` of every `RunEvent::Text`, in emission order.
    fn text_deltas(events: &[RunEvent]) -> Vec<&str> {
        events
            .iter()
            .filter_map(|event| match event {
                RunEvent::Text { delta, .. } => Some(delta.as_str()),
                _ => None,
            })
            .collect()
    }

    /// The `message` of every `RunEvent::Activity`, in emission order.
    fn activity_messages(events: &[RunEvent]) -> Vec<&str> {
        events
            .iter()
            .filter_map(|event| match event {
                RunEvent::Activity { message, .. } => Some(message.as_str()),
                _ => None,
            })
            .collect()
    }

    // --- parse_codex_line ----------------------------------------------------

    #[test]
    fn agent_message_completed_becomes_text() {
        let line = serde_json::json!({
            "type": "item.completed",
            "item": { "id": "item_3", "type": "agent_message", "text": "Repo has docs and sdk." }
        })
        .to_string();

        let parsed = parse_codex_line(&line);

        assert_eq!(parsed.text.as_deref(), Some("Repo has docs and sdk."));
        assert!(parsed.edits.is_empty());
        assert!(parsed.activity.is_none());
    }

    // Tool-card tests grounded in codex-cli 0.125.0's `--json` schema:
    // tool items (command_execution / web_search / file_change /
    // mcp_tool_call) arrive on `item.completed` carrying `id`, `status`
    // (in_progress|completed|failed) and, for commands, `exit_code`.
    // Example (verbatim from the documented schema):
    //   {"type":"item.completed","item":{"id":"item_2","type":
    //    "command_execution","command":"bash -lc false",
    //    "aggregated_output":"","exit_code":1,"status":"failed"}}

    #[test]
    fn command_execution_completed_becomes_finished_tool_card() {
        let line = serde_json::json!({
            "type": "item.completed",
            "item": {
                "id": "item_2",
                "type": "command_execution",
                "command": "bash -lc 'echo hi'",
                "aggregated_output": "hi\n",
                "exit_code": 0,
                "status": "completed"
            }
        })
        .to_string();

        let parsed = parse_codex_line(&line);
        let start = parsed.tool_start.expect("tool_start");
        let end = parsed.tool_end.expect("tool_end");

        // Start and end are keyed by the same item id.
        assert_eq!(start.tool_call_id, "item_2");
        assert_eq!(end.tool_call_id, "item_2");
        // `name` is the tool *identifier* (the UI humanizes it); the command
        // rides in `input`, never in the name — so it can't leak into a
        // status line.
        assert_eq!(start.name, "command_execution");
        assert_eq!(start.input.as_deref(), Some("bash -lc 'echo hi'"));
        assert_eq!(end.output.as_deref(), Some("hi\n"));
        assert!(end.ok, "exit_code 0 → ok");
        assert!(parsed.activity.is_none());
        assert!(parsed.text.is_none());
    }

    #[test]
    fn command_execution_nonzero_exit_is_error_card() {
        let line = r#"{"type":"item.completed","item":{"id":"item_2","type":"command_execution","command":"bash -lc false","aggregated_output":"","exit_code":1,"status":"failed"}}"#;

        let parsed = parse_codex_line(line);
        let end = parsed.tool_end.expect("tool_end");

        assert!(!end.ok, "exit_code 1 / status failed → error");
    }

    #[test]
    fn web_search_completed_becomes_tool_card() {
        let line = serde_json::json!({
            "type": "item.completed",
            "item": { "id": "item_5", "type": "web_search", "status": "completed" }
        })
        .to_string();

        let parsed = parse_codex_line(&line);

        assert_eq!(parsed.tool_start.expect("start").name, "web_search");
        assert!(parsed.tool_end.expect("end").ok, "no exit_code, status completed → ok");
    }

    #[test]
    fn started_tool_with_id_becomes_running_card() {
        let line = serde_json::json!({
            "type": "item.started",
            "item": { "id": "item_1", "type": "command_execution", "command": "bash -lc ls", "status": "in_progress" }
        })
        .to_string();

        let parsed = parse_codex_line(&line);
        let start = parsed.tool_start.expect("start");

        assert_eq!(start.name, "command_execution");
        // The input is already known when the tool starts.
        assert_eq!(start.input.as_deref(), Some("bash -lc ls"));
        assert!(parsed.tool_end.is_none(), "started → running (no end yet)");
        assert!(parsed.activity.is_none());
    }

    #[test]
    fn tool_without_id_degrades_to_activity() {
        // Defensive: an item lacking `id` can't key a card, so it falls
        // back to a plain activity line rather than vanishing.
        let line = serde_json::json!({
            "type": "item.completed",
            "item": { "type": "command_execution", "command": "ls -la", "exit_code": 0 }
        })
        .to_string();

        let parsed = parse_codex_line(&line);

        // No id to key a card → a command-free human fallback (never the
        // raw command).
        assert_eq!(parsed.activity.as_deref(), Some("Running a command"));
        assert!(parsed.tool_start.is_none() && parsed.tool_end.is_none());
    }

    #[test]
    fn thread_started_yields_session_and_turn_completed_yields_usage() {
        // thread.started → Session (thread id; codex reports no model here).
        let started = parse_codex_line(r#"{"type":"thread.started","thread_id":"abc"}"#);
        let session = started.session.expect("session");
        assert_eq!(session.session_id.as_deref(), Some("abc"));
        assert_eq!(session.model, None);

        // turn.completed → Usage.
        let completed = parse_codex_line(
            r#"{"type":"turn.completed","usage":{"input_tokens":100,"output_tokens":40}}"#,
        );
        let usage = completed.usage.expect("usage");
        assert_eq!(usage.input_tokens, Some(100));
        assert_eq!(usage.output_tokens, Some(40));
        assert_eq!(usage.total_tokens, Some(140));

        // turn.started remains pure lifecycle.
        assert!(parse_codex_line(r#"{"type":"turn.started"}"#).is_empty());
    }

    #[test]
    fn error_event_becomes_activity() {
        let parsed = parse_codex_line(r#"{"type":"error","message":"rate limited"}"#);
        assert_eq!(parsed.activity.as_deref(), Some("rate limited"));
    }

    #[test]
    fn non_json_is_ignored() {
        let parsed = parse_codex_line("plain text");
        assert!(parsed.text.is_none());
    }

    // --- CodexStreamParser: preamble-vs-answer + stderr drop ----------------

    #[test]
    fn codex_preambles_are_narration_and_only_final_message_is_the_answer() {
        // The grounded multi-message turn (codex-cli 0.125.0): two preambles
        // before tool calls, then the final answer — nothing on the items
        // distinguishes them, only position does.
        let mut p = CodexStreamParser::new();
        let events = stdout_all(
            &mut p,
            &[
                r#"{"type":"thread.started","thread_id":"t"}"#,
                r#"{"type":"item.completed","item":{"id":"m1","type":"agent_message","text":"I’m going to read a.txt first."}}"#,
                r#"{"type":"item.completed","item":{"id":"c1","type":"command_execution","command":"cat a.txt","aggregated_output":"alpha\n","exit_code":0,"status":"completed"}}"#,
                r#"{"type":"item.completed","item":{"id":"m2","type":"agent_message","text":"I’m going to read b.txt next."}}"#,
                r#"{"type":"item.completed","item":{"id":"c2","type":"command_execution","command":"cat b.txt","aggregated_output":"one\n","exit_code":0,"status":"completed"}}"#,
                r#"{"type":"item.completed","item":{"id":"m3","type":"agent_message","text":"a.txt has more lines."}}"#,
                r#"{"type":"turn.completed","usage":{"input_tokens":10,"output_tokens":5}}"#,
            ],
        );

        // Exactly one answer (Text) — the FINAL message; preambles are not in it.
        assert_eq!(text_deltas(&events), vec!["a.txt has more lines."]);

        // The two preambles surface as transient Activity (narration), in order.
        assert_eq!(
            activity_messages(&events),
            vec![
                "I’m going to read a.txt first.",
                "I’m going to read b.txt next."
            ]
        );

        // Tool cards, session, and usage still flow through unchanged.
        let tool_starts = events
            .iter()
            .filter(|e| matches!(e, RunEvent::ToolStart { .. }))
            .count();
        assert_eq!(tool_starts, 2);
        assert!(events.iter().any(|e| matches!(e, RunEvent::Session { .. })));
        assert!(events.iter().any(|e| matches!(e, RunEvent::Usage { .. })));
    }

    #[test]
    fn codex_single_message_turn_is_the_answer() {
        // No preamble: one agent_message → the answer, no spurious narration.
        let mut p = CodexStreamParser::new();
        let events = stdout_all(
            &mut p,
            &[
                r#"{"type":"item.completed","item":{"id":"m1","type":"agent_message","text":"Done."}}"#,
                r#"{"type":"turn.completed","usage":{"input_tokens":1,"output_tokens":1}}"#,
            ],
        );

        assert_eq!(text_deltas(&events), vec!["Done."]);
        assert!(!events.iter().any(|e| matches!(e, RunEvent::Activity { .. })));
    }

    #[test]
    fn codex_stderr_is_dropped_as_noise() {
        let mut p = CodexStreamParser::new();
        let noise = ProcessEvent::Stderr {
            run_id: "r".to_owned(),
            line: "2026-05-31T05:20:28Z ERROR codex_core::memories::phase2::job: failed to claim job"
                .to_owned(),
        };

        let out = p.on_process_event(noise);

        assert!(out.is_empty(), "codex stderr is tracing noise → dropped, got {out:?}");
    }

    #[test]
    fn codex_held_answer_is_flushed_if_stream_ends_without_turn_completed() {
        // Defensive: final message then the process exits with no
        // `turn.completed` — the answer must not be lost.
        let mut p = CodexStreamParser::new();
        let _ = stdout(
            &mut p,
            r#"{"type":"item.completed","item":{"id":"m1","type":"agent_message","text":"Final."}}"#,
        );

        let exited = ProcessEvent::Exited {
            run_id: "r".to_owned(),
            exit_code: Some(0),
            cancelled: false,
        };
        let out = p.on_process_event(exited);

        assert!(
            matches!(out.first(), Some(RunEvent::Text { delta, .. }) if delta == "Final."),
            "held answer flushed as Text before Exited, got {out:?}"
        );
        assert!(matches!(out.last(), Some(RunEvent::Exited { .. })));
    }
}
648}