1use serde_json::{Map, Value};
18
19use crate::events::run_events_from_parsed;
20use crate::{
21 ParsedLine, ProcessEvent, RunEvent, SessionInfo, ToolCallEnd, ToolCallStart, UsageInfo,
22};
23
24fn codex_tool_kind(item: &Map<String, Value>) -> Option<&'static str> {
31 match item.get("type").and_then(Value::as_str)? {
32 "command_execution" => Some("command_execution"),
33 "file_change" => Some("file_change"),
34 "web_search" => Some("web_search"),
35 "mcp_tool_call" => Some("mcp_tool_call"),
36 _ => None,
37 }
38}
39
40fn codex_tool_label(item: &Map<String, Value>) -> Option<String> {
46 Some(
47 match item.get("type").and_then(Value::as_str)? {
48 "command_execution" => "Running a command",
49 "file_change" => "Editing files",
50 "web_search" => "Searching the web",
51 "mcp_tool_call" => "Running a tool",
52 _ => return None,
53 }
54 .to_owned(),
55 )
56}
57
58fn codex_tool_input(item: &Map<String, Value>) -> Option<String> {
63 if item.get("type").and_then(Value::as_str) == Some("command_execution") {
64 return item
65 .get("command")
66 .and_then(Value::as_str)
67 .filter(|s| !s.is_empty())
68 .map(str::to_owned);
69 }
70 None
71}
72
73fn codex_tool_output(item: &Map<String, Value>) -> Option<String> {
76 if item.get("type").and_then(Value::as_str) == Some("command_execution") {
77 return item
78 .get("aggregated_output")
79 .and_then(Value::as_str)
80 .filter(|s| !s.is_empty())
81 .map(str::to_owned);
82 }
83 None
84}
85
86fn codex_tool_ok(item: &Map<String, Value>) -> bool {
91 if let Some(code) = item.get("exit_code").and_then(Value::as_i64) {
92 return code == 0;
93 }
94 !matches!(
95 item.get("status").and_then(Value::as_str),
96 Some("failed") | Some("error")
97 )
98}
99
100#[derive(Debug, Default)]
118pub struct CodexStreamParser {
119 pending_message: Option<String>,
122}
123
124impl CodexStreamParser {
125 pub fn new() -> Self {
126 Self::default()
127 }
128
129 pub fn on_process_event(&mut self, event: ProcessEvent) -> Vec<RunEvent> {
132 match event {
133 ProcessEvent::Stderr { .. } => Vec::new(),
136 ProcessEvent::Started { run_id } => vec![RunEvent::Started { run_id }],
137 ProcessEvent::Error { run_id, message } => {
138 let mut out = self.take_pending_as_answer(&run_id);
140 out.push(RunEvent::Error { run_id, message });
141 out
142 }
143 ProcessEvent::Exited {
144 run_id,
145 exit_code,
146 cancelled,
147 } => {
148 let mut out = self.take_pending_as_answer(&run_id);
152 out.push(RunEvent::Exited {
153 run_id,
154 exit_code,
155 cancelled,
156 });
157 out
158 }
159 ProcessEvent::Stdout { run_id, line } => self.on_stdout(&run_id, &line),
160 _ => Vec::new(),
162 }
163 }
164
165 fn on_stdout(&mut self, run_id: &str, line: &str) -> Vec<RunEvent> {
166 let value = serde_json::from_str::<Value>(line.trim()).ok();
167 let typ = value
168 .as_ref()
169 .and_then(Value::as_object)
170 .and_then(|o| o.get("type"))
171 .and_then(Value::as_str);
172
173 if let Some(text) = value.as_ref().and_then(codex_agent_message_text) {
176 let out = self.take_pending_as_preamble(run_id);
177 if !text.is_empty() {
178 self.pending_message = Some(text);
179 }
180 return out;
181 }
182
183 let mut out = if typ == Some("turn.completed") {
186 self.take_pending_as_answer(run_id)
187 } else {
188 self.take_pending_as_preamble(run_id)
189 };
190 out.extend(run_events_from_parsed(run_id, parse_codex_line(line)));
193 out
194 }
195
196 fn take_pending_as_preamble(&mut self, run_id: &str) -> Vec<RunEvent> {
198 match self.pending_message.take() {
199 Some(text) if !text.is_empty() => vec![RunEvent::Activity {
200 run_id: run_id.to_owned(),
201 message: text,
202 }],
203 _ => Vec::new(),
204 }
205 }
206
207 fn take_pending_as_answer(&mut self, run_id: &str) -> Vec<RunEvent> {
209 match self.pending_message.take() {
210 Some(text) if !text.is_empty() => vec![RunEvent::Text {
211 run_id: run_id.to_owned(),
212 delta: text,
213 }],
214 _ => Vec::new(),
215 }
216 }
217}
218
219fn codex_agent_message_text(value: &Value) -> Option<String> {
223 let obj = value.as_object()?;
224 if obj.get("type").and_then(Value::as_str) != Some("item.completed") {
225 return None;
226 }
227 let item = obj.get("item").and_then(Value::as_object)?;
228 if item.get("type").and_then(Value::as_str) != Some("agent_message") {
229 return None;
230 }
231 Some(
232 item.get("text")
233 .and_then(Value::as_str)
234 .unwrap_or_default()
235 .to_owned(),
236 )
237}
238
239pub fn parse_codex_line(line: &str) -> ParsedLine {
247 let trimmed = line.trim();
248 if trimmed.is_empty() {
249 return ParsedLine::default();
250 }
251 let Ok(value) = serde_json::from_str::<Value>(trimmed) else {
252 return ParsedLine::default();
253 };
254 let Some(obj) = value.as_object() else {
255 return ParsedLine::default();
256 };
257
258 match obj.get("type").and_then(Value::as_str) {
259 Some("item.completed") => {
260 let Some(item) = obj.get("item").and_then(Value::as_object) else {
261 return ParsedLine::default();
262 };
263 if item.get("type").and_then(Value::as_str) == Some("agent_message") {
265 if let Some(text) = item.get("text").and_then(Value::as_str) {
266 if !text.is_empty() {
267 return ParsedLine {
268 text: Some(text.to_owned()),
269 ..ParsedLine::default()
270 };
271 }
272 }
273 }
274 if let Some(kind) = codex_tool_kind(item) {
284 return match item.get("id").and_then(Value::as_str) {
285 Some(id) => {
286 let id = id.to_owned();
287 ParsedLine {
288 tool_start: Some(ToolCallStart {
289 tool_call_id: id.clone(),
290 name: kind.to_owned(),
291 input: codex_tool_input(item),
292 }),
293 tool_end: Some(ToolCallEnd {
294 tool_call_id: id,
295 ok: codex_tool_ok(item),
296 output: codex_tool_output(item),
297 }),
298 ..ParsedLine::default()
299 }
300 }
301 None => ParsedLine {
302 activity: codex_tool_label(item),
303 ..ParsedLine::default()
304 },
305 };
306 }
307 ParsedLine::default()
308 }
309 Some("item.started") => {
310 let Some(item) = obj.get("item").and_then(Value::as_object) else {
311 return ParsedLine::default();
312 };
313 if let Some(kind) = codex_tool_kind(item) {
318 return match item.get("id").and_then(Value::as_str) {
319 Some(id) => ParsedLine {
320 tool_start: Some(ToolCallStart {
321 tool_call_id: id.to_owned(),
322 name: kind.to_owned(),
323 input: codex_tool_input(item),
324 }),
325 ..ParsedLine::default()
326 },
327 None => ParsedLine {
328 activity: codex_tool_label(item),
329 ..ParsedLine::default()
330 },
331 };
332 }
333 ParsedLine::default()
334 }
335 Some("error") => {
336 let message = obj
337 .get("message")
338 .and_then(Value::as_str)
339 .unwrap_or("Codex error");
340 ParsedLine {
341 activity: Some(truncate(message, 240)),
342 ..ParsedLine::default()
343 }
344 }
345 Some("thread.started") => ParsedLine {
348 session: Some(SessionInfo {
349 session_id: obj
350 .get("thread_id")
351 .and_then(Value::as_str)
352 .filter(|s| !s.is_empty())
353 .map(str::to_owned),
354 model: None,
355 }),
356 ..ParsedLine::default()
357 },
358 Some("turn.completed") => {
360 let usage = obj.get("usage").and_then(Value::as_object);
361 let input_tokens = usage.and_then(|u| u.get("input_tokens")).and_then(Value::as_u64);
362 let output_tokens = usage.and_then(|u| u.get("output_tokens")).and_then(Value::as_u64);
363 if input_tokens.is_none() && output_tokens.is_none() {
364 return ParsedLine::default();
365 }
366 let total_tokens = match (input_tokens, output_tokens) {
367 (Some(i), Some(o)) => Some(i + o),
368 _ => None,
369 };
370 ParsedLine {
371 usage: Some(UsageInfo {
372 input_tokens,
373 output_tokens,
374 total_tokens,
375 }),
376 ..ParsedLine::default()
377 }
378 }
379 _ => ParsedLine::default(),
381 }
382}
383
384fn truncate(s: &str, max_chars: usize) -> String {
385 s.chars().take(max_chars).collect()
386}
387
388#[cfg(test)]
389mod tests {
390 use super::*;
391
392 #[test]
393 fn agent_message_completed_becomes_text() {
394 let line = serde_json::json!({
395 "type": "item.completed",
396 "item": { "id": "item_3", "type": "agent_message", "text": "Repo has docs and sdk." }
397 })
398 .to_string();
399 let parsed = parse_codex_line(&line);
400 assert_eq!(parsed.text.as_deref(), Some("Repo has docs and sdk."));
401 assert!(parsed.edits.is_empty());
402 assert!(parsed.activity.is_none());
403 }
404
405 #[test]
415 fn command_execution_completed_becomes_finished_tool_card() {
416 let line = serde_json::json!({
417 "type": "item.completed",
418 "item": {
419 "id": "item_2",
420 "type": "command_execution",
421 "command": "bash -lc 'echo hi'",
422 "aggregated_output": "hi\n",
423 "exit_code": 0,
424 "status": "completed"
425 }
426 })
427 .to_string();
428 let parsed = parse_codex_line(&line);
429 let start = parsed.tool_start.expect("tool_start");
430 let end = parsed.tool_end.expect("tool_end");
431 assert_eq!(start.tool_call_id, "item_2");
432 assert_eq!(end.tool_call_id, "item_2");
433 assert_eq!(start.name, "command_execution");
437 assert_eq!(start.input.as_deref(), Some("bash -lc 'echo hi'"));
438 assert_eq!(end.output.as_deref(), Some("hi\n"));
439 assert!(end.ok, "exit_code 0 → ok");
440 assert!(parsed.activity.is_none());
441 assert!(parsed.text.is_none());
442 }
443
444 #[test]
445 fn command_execution_nonzero_exit_is_error_card() {
446 let line = r#"{"type":"item.completed","item":{"id":"item_2","type":"command_execution","command":"bash -lc false","aggregated_output":"","exit_code":1,"status":"failed"}}"#;
447 let end = parse_codex_line(line).tool_end.expect("tool_end");
448 assert!(!end.ok, "exit_code 1 / status failed → error");
449 }
450
451 #[test]
452 fn web_search_completed_becomes_tool_card() {
453 let line = serde_json::json!({
454 "type": "item.completed",
455 "item": { "id": "item_5", "type": "web_search", "status": "completed" }
456 })
457 .to_string();
458 let parsed = parse_codex_line(&line);
459 assert_eq!(parsed.tool_start.expect("start").name, "web_search");
460 assert!(parsed.tool_end.expect("end").ok, "no exit_code, status completed → ok");
461 }
462
463 #[test]
464 fn started_tool_with_id_becomes_running_card() {
465 let line = serde_json::json!({
466 "type": "item.started",
467 "item": { "id": "item_1", "type": "command_execution", "command": "bash -lc ls", "status": "in_progress" }
468 })
469 .to_string();
470 let parsed = parse_codex_line(&line);
471 let start = parsed.tool_start.expect("start");
472 assert_eq!(start.name, "command_execution");
473 assert_eq!(start.input.as_deref(), Some("bash -lc ls")); assert!(parsed.tool_end.is_none(), "started → running (no end yet)");
475 assert!(parsed.activity.is_none());
476 }
477
478 #[test]
479 fn tool_without_id_degrades_to_activity() {
480 let line = serde_json::json!({
483 "type": "item.completed",
484 "item": { "type": "command_execution", "command": "ls -la", "exit_code": 0 }
485 })
486 .to_string();
487 let parsed = parse_codex_line(&line);
488 assert_eq!(parsed.activity.as_deref(), Some("Running a command"));
491 assert!(parsed.tool_start.is_none() && parsed.tool_end.is_none());
492 }
493
494 #[test]
495 fn thread_started_yields_session_and_turn_completed_yields_usage() {
496 let session = parse_codex_line(r#"{"type":"thread.started","thread_id":"abc"}"#)
498 .session
499 .expect("session");
500 assert_eq!(session.session_id.as_deref(), Some("abc"));
501 assert_eq!(session.model, None);
502
503 let usage =
505 parse_codex_line(r#"{"type":"turn.completed","usage":{"input_tokens":100,"output_tokens":40}}"#)
506 .usage
507 .expect("usage");
508 assert_eq!(usage.input_tokens, Some(100));
509 assert_eq!(usage.output_tokens, Some(40));
510 assert_eq!(usage.total_tokens, Some(140));
511
512 assert!(parse_codex_line(r#"{"type":"turn.started"}"#).is_empty());
514 }
515
516 #[test]
517 fn error_event_becomes_activity() {
518 let line = r#"{"type":"error","message":"rate limited"}"#;
519 assert_eq!(parse_codex_line(line).activity.as_deref(), Some("rate limited"));
520 }
521
522 #[test]
523 fn non_json_is_ignored() {
524 assert!(parse_codex_line("plain text").text.is_none());
525 }
526
527 fn stdout(p: &mut CodexStreamParser, line: &str) -> Vec<RunEvent> {
530 p.on_process_event(ProcessEvent::Stdout {
531 run_id: "r".to_owned(),
532 line: line.to_owned(),
533 })
534 }
535
536 #[test]
537 fn codex_preambles_are_narration_and_only_final_message_is_the_answer() {
538 let mut p = CodexStreamParser::new();
542 let mut events = Vec::new();
543 for line in [
544 r#"{"type":"thread.started","thread_id":"t"}"#,
545 r#"{"type":"item.completed","item":{"id":"m1","type":"agent_message","text":"I’m going to read a.txt first."}}"#,
546 r#"{"type":"item.completed","item":{"id":"c1","type":"command_execution","command":"cat a.txt","aggregated_output":"alpha\n","exit_code":0,"status":"completed"}}"#,
547 r#"{"type":"item.completed","item":{"id":"m2","type":"agent_message","text":"I’m going to read b.txt next."}}"#,
548 r#"{"type":"item.completed","item":{"id":"c2","type":"command_execution","command":"cat b.txt","aggregated_output":"one\n","exit_code":0,"status":"completed"}}"#,
549 r#"{"type":"item.completed","item":{"id":"m3","type":"agent_message","text":"a.txt has more lines."}}"#,
550 r#"{"type":"turn.completed","usage":{"input_tokens":10,"output_tokens":5}}"#,
551 ] {
552 events.extend(stdout(&mut p, line));
553 }
554
555 let texts: Vec<&str> = events
557 .iter()
558 .filter_map(|e| match e {
559 RunEvent::Text { delta, .. } => Some(delta.as_str()),
560 _ => None,
561 })
562 .collect();
563 assert_eq!(texts, vec!["a.txt has more lines."]);
564
565 let activity: Vec<&str> = events
567 .iter()
568 .filter_map(|e| match e {
569 RunEvent::Activity { message, .. } => Some(message.as_str()),
570 _ => None,
571 })
572 .collect();
573 assert_eq!(
574 activity,
575 vec![
576 "I’m going to read a.txt first.",
577 "I’m going to read b.txt next."
578 ]
579 );
580
581 assert_eq!(
583 events
584 .iter()
585 .filter(|e| matches!(e, RunEvent::ToolStart { .. }))
586 .count(),
587 2
588 );
589 assert!(events.iter().any(|e| matches!(e, RunEvent::Session { .. })));
590 assert!(events.iter().any(|e| matches!(e, RunEvent::Usage { .. })));
591 }
592
593 #[test]
594 fn codex_single_message_turn_is_the_answer() {
595 let mut p = CodexStreamParser::new();
597 let mut events = Vec::new();
598 events.extend(stdout(
599 &mut p,
600 r#"{"type":"item.completed","item":{"id":"m1","type":"agent_message","text":"Done."}}"#,
601 ));
602 events.extend(stdout(
603 &mut p,
604 r#"{"type":"turn.completed","usage":{"input_tokens":1,"output_tokens":1}}"#,
605 ));
606 let texts: Vec<&str> = events
607 .iter()
608 .filter_map(|e| match e {
609 RunEvent::Text { delta, .. } => Some(delta.as_str()),
610 _ => None,
611 })
612 .collect();
613 assert_eq!(texts, vec!["Done."]);
614 assert!(!events.iter().any(|e| matches!(e, RunEvent::Activity { .. })));
615 }
616
617 #[test]
618 fn codex_stderr_is_dropped_as_noise() {
619 let mut p = CodexStreamParser::new();
620 let out = p.on_process_event(ProcessEvent::Stderr {
621 run_id: "r".to_owned(),
622 line: "2026-05-31T05:20:28Z ERROR codex_core::memories::phase2::job: failed to claim job"
623 .to_owned(),
624 });
625 assert!(out.is_empty(), "codex stderr is tracing noise → dropped, got {out:?}");
626 }
627
628 #[test]
629 fn codex_held_answer_is_flushed_if_stream_ends_without_turn_completed() {
630 let mut p = CodexStreamParser::new();
633 let _ = stdout(
634 &mut p,
635 r#"{"type":"item.completed","item":{"id":"m1","type":"agent_message","text":"Final."}}"#,
636 );
637 let out = p.on_process_event(ProcessEvent::Exited {
638 run_id: "r".to_owned(),
639 exit_code: Some(0),
640 cancelled: false,
641 });
642 assert!(
643 matches!(out.first(), Some(RunEvent::Text { delta, .. }) if delta == "Final."),
644 "held answer flushed as Text before Exited, got {out:?}"
645 );
646 assert!(matches!(out.last(), Some(RunEvent::Exited { .. })));
647 }
648}