use serde_json::{Map, Value};
use crate::events::run_events_from_parsed;
use crate::{
ParsedLine, ProcessEvent, RunEvent, SessionInfo, ToolCallEnd, ToolCallStart, UsageInfo,
};
fn codex_tool_kind(item: &Map<String, Value>) -> Option<&'static str> {
match item.get("type").and_then(Value::as_str)? {
"command_execution" => Some("command_execution"),
"file_change" => Some("file_change"),
"web_search" => Some("web_search"),
"mcp_tool_call" => Some("mcp_tool_call"),
_ => None,
}
}
fn codex_tool_label(item: &Map<String, Value>) -> Option<String> {
Some(
match item.get("type").and_then(Value::as_str)? {
"command_execution" => "Running a command",
"file_change" => "Editing files",
"web_search" => "Searching the web",
"mcp_tool_call" => "Running a tool",
_ => return None,
}
.to_owned(),
)
}
/// Extracts the input to show for a tool item.
///
/// Only `command_execution` items have an input: their non-empty `"command"`
/// string. Every other item type, and a missing, non-string or empty command,
/// yields `None`.
fn codex_tool_input(item: &Map<String, Value>) -> Option<String> {
    let is_command = matches!(
        item.get("type").and_then(Value::as_str),
        Some("command_execution")
    );
    if !is_command {
        return None;
    }
    match item.get("command") {
        Some(Value::String(command)) if !command.is_empty() => Some(command.clone()),
        _ => None,
    }
}
fn codex_tool_output(item: &Map<String, Value>) -> Option<String> {
if item.get("type").and_then(Value::as_str) == Some("command_execution") {
return item
.get("aggregated_output")
.and_then(Value::as_str)
.filter(|s| !s.is_empty())
.map(str::to_owned);
}
None
}
/// Decides whether a finished tool item succeeded.
///
/// An integer `"exit_code"` is authoritative: zero means success. Without one,
/// the item counts as successful unless its `"status"` is `"failed"` or
/// `"error"`.
fn codex_tool_ok(item: &Map<String, Value>) -> bool {
    match item.get("exit_code").and_then(Value::as_i64) {
        Some(code) => code == 0,
        None => {
            let status = item.get("status").and_then(Value::as_str);
            status != Some("failed") && status != Some("error")
        }
    }
}
/// Stateful translator from Codex process events to `RunEvent`s.
///
/// The parser holds back the most recent completed agent message so that it
/// can be emitted either as activity narration (when more stream events
/// follow it) or as answer text (when the turn completes, the process
/// reports an error, or the process exits).
#[derive(Debug, Default)]
pub struct CodexStreamParser {
// Text of the latest completed `agent_message` that has not been emitted yet;
// `None` when nothing is being held.
pending_message: Option<String>,
}
impl CodexStreamParser {
pub fn new() -> Self {
Self::default()
}
pub fn on_process_event(&mut self, event: ProcessEvent) -> Vec<RunEvent> {
match event {
ProcessEvent::Stderr { .. } => Vec::new(),
ProcessEvent::Started { run_id } => vec![RunEvent::Started { run_id }],
ProcessEvent::Error { run_id, message } => {
let mut out = self.take_pending_as_answer(&run_id);
out.push(RunEvent::Error { run_id, message });
out
}
ProcessEvent::Exited {
run_id,
exit_code,
cancelled,
} => {
let mut out = self.take_pending_as_answer(&run_id);
out.push(RunEvent::Exited {
run_id,
exit_code,
cancelled,
});
out
}
ProcessEvent::Stdout { run_id, line } => self.on_stdout(&run_id, &line),
_ => Vec::new(),
}
}
fn on_stdout(&mut self, run_id: &str, line: &str) -> Vec<RunEvent> {
let value = serde_json::from_str::<Value>(line.trim()).ok();
let typ = value
.as_ref()
.and_then(Value::as_object)
.and_then(|o| o.get("type"))
.and_then(Value::as_str);
if let Some(text) = value.as_ref().and_then(codex_agent_message_text) {
let out = self.take_pending_as_preamble(run_id);
if !text.is_empty() {
self.pending_message = Some(text);
}
return out;
}
let mut out = if typ == Some("turn.completed") {
self.take_pending_as_answer(run_id)
} else {
self.take_pending_as_preamble(run_id)
};
out.extend(run_events_from_parsed(run_id, parse_codex_line(line)));
out
}
fn take_pending_as_preamble(&mut self, run_id: &str) -> Vec<RunEvent> {
match self.pending_message.take() {
Some(text) if !text.is_empty() => vec![RunEvent::Activity {
run_id: run_id.to_owned(),
message: text,
}],
_ => Vec::new(),
}
}
fn take_pending_as_answer(&mut self, run_id: &str) -> Vec<RunEvent> {
match self.pending_message.take() {
Some(text) if !text.is_empty() => vec![RunEvent::Text {
run_id: run_id.to_owned(),
delta: text,
}],
_ => Vec::new(),
}
}
}
/// Returns the text of a completed agent message event.
///
/// Yields `Some` only when `value` is an object of type `"item.completed"`
/// whose `"item"` is an object of type `"agent_message"`. A missing or
/// non-string `"text"` is reported as an empty string, so callers can tell
/// "agent message with no text" apart from "not an agent message" (`None`).
fn codex_agent_message_text(value: &Value) -> Option<String> {
    let event = value.as_object()?;
    let is_completed = matches!(
        event.get("type").and_then(Value::as_str),
        Some("item.completed")
    );
    if !is_completed {
        return None;
    }
    let item = event.get("item")?.as_object()?;
    match item.get("type").and_then(Value::as_str) {
        Some("agent_message") => {
            let text = item.get("text").and_then(Value::as_str).unwrap_or("");
            Some(text.to_owned())
        }
        _ => None,
    }
}
/// Parses one line of Codex JSONL output into a [`ParsedLine`].
///
/// Blank lines, non-JSON lines, JSON values that are not objects, and events
/// of an unrecognised `"type"` all produce an empty `ParsedLine`. Recognised
/// events map as follows:
///
/// * `item.completed` with an `agent_message` item and non-empty text: `text`.
/// * `item.completed` with a tool item: `tool_start` and `tool_end` sharing
///   the item's `"id"`, or just an `activity` label when the item has no id.
/// * `item.started` with a tool item: `tool_start`, or an `activity` label
///   when the item has no id.
/// * `error`: `activity` carrying the message, cut to 240 characters.
/// * `thread.started`: `session` with the non-empty `"thread_id"`, if any.
/// * `turn.completed`: `usage`, when at least one token count is present.
pub fn parse_codex_line(line: &str) -> ParsedLine {
    let trimmed = line.trim();
    if trimmed.is_empty() {
        return ParsedLine::default();
    }
    let Ok(value) = serde_json::from_str::<Value>(trimmed) else {
        return ParsedLine::default();
    };
    let Some(obj) = value.as_object() else {
        return ParsedLine::default();
    };
    match obj.get("type").and_then(Value::as_str) {
        Some("item.completed") => {
            let Some(item) = obj.get("item").and_then(Value::as_object) else {
                return ParsedLine::default();
            };
            if item.get("type").and_then(Value::as_str) == Some("agent_message") {
                if let Some(text) = item.get("text").and_then(Value::as_str) {
                    if !text.is_empty() {
                        return ParsedLine {
                            text: Some(text.to_owned()),
                            ..ParsedLine::default()
                        };
                    }
                }
            }
            if let Some(kind) = codex_tool_kind(item) {
                return match item.get("id").and_then(Value::as_str) {
                    // A completed item carries everything at once, so both
                    // the start and the end of the tool call are reported.
                    Some(id) => {
                        let id = id.to_owned();
                        ParsedLine {
                            tool_start: Some(ToolCallStart {
                                tool_call_id: id.clone(),
                                name: kind.to_owned(),
                                input: codex_tool_input(item),
                            }),
                            tool_end: Some(ToolCallEnd {
                                tool_call_id: id,
                                ok: codex_tool_ok(item),
                                output: codex_tool_output(item),
                            }),
                            ..ParsedLine::default()
                        }
                    }
                    // Without an id start and end cannot be correlated;
                    // degrade to a plain activity label.
                    None => ParsedLine {
                        activity: codex_tool_label(item),
                        ..ParsedLine::default()
                    },
                };
            }
            ParsedLine::default()
        }
        Some("item.started") => {
            let Some(item) = obj.get("item").and_then(Value::as_object) else {
                return ParsedLine::default();
            };
            if let Some(kind) = codex_tool_kind(item) {
                return match item.get("id").and_then(Value::as_str) {
                    Some(id) => ParsedLine {
                        tool_start: Some(ToolCallStart {
                            tool_call_id: id.to_owned(),
                            name: kind.to_owned(),
                            input: codex_tool_input(item),
                        }),
                        ..ParsedLine::default()
                    },
                    None => ParsedLine {
                        activity: codex_tool_label(item),
                        ..ParsedLine::default()
                    },
                };
            }
            ParsedLine::default()
        }
        Some("error") => {
            let message = obj
                .get("message")
                .and_then(Value::as_str)
                .unwrap_or("Codex error");
            ParsedLine {
                activity: Some(truncate(message, 240)),
                ..ParsedLine::default()
            }
        }
        Some("thread.started") => ParsedLine {
            session: Some(SessionInfo {
                session_id: obj
                    .get("thread_id")
                    .and_then(Value::as_str)
                    .filter(|s| !s.is_empty())
                    .map(str::to_owned),
                model: None,
            }),
            ..ParsedLine::default()
        },
        Some("turn.completed") => {
            let usage = obj.get("usage").and_then(Value::as_object);
            let input_tokens = usage
                .and_then(|u| u.get("input_tokens"))
                .and_then(Value::as_u64);
            let output_tokens = usage
                .and_then(|u| u.get("output_tokens"))
                .and_then(Value::as_u64);
            if input_tokens.is_none() && output_tokens.is_none() {
                return ParsedLine::default();
            }
            // The counts come straight from the stream, so the sum is checked:
            // an overflowing pair reports no total instead of panicking in
            // debug builds or wrapping in release builds.
            let total_tokens = input_tokens
                .zip(output_tokens)
                .and_then(|(input, output)| input.checked_add(output));
            ParsedLine {
                usage: Some(UsageInfo {
                    input_tokens,
                    output_tokens,
                    total_tokens,
                }),
                ..ParsedLine::default()
            }
        }
        _ => ParsedLine::default(),
    }
}
/// Returns at most the first `max_chars` characters of `s` as an owned string.
///
/// The limit counts Unicode scalar values, not bytes, so the cut never lands
/// inside a multi-byte character.
fn truncate(s: &str, max_chars: usize) -> String {
    // The byte offset of the character at index `max_chars` is where to cut;
    // if no such character exists the whole string already fits.
    match s.char_indices().nth(max_chars) {
        Some((cut, _)) => s[..cut].to_owned(),
        None => s.to_owned(),
    }
}
// Unit tests for the Codex line parser and the stateful stream parser.
#[cfg(test)]
mod tests {
use super::*;
// A completed agent_message item is surfaced as text, with no edits or activity.
#[test]
fn agent_message_completed_becomes_text() {
let line = serde_json::json!({
"type": "item.completed",
"item": { "id": "item_3", "type": "agent_message", "text": "Repo has docs and sdk." }
})
.to_string();
let parsed = parse_codex_line(&line);
assert_eq!(parsed.text.as_deref(), Some("Repo has docs and sdk."));
assert!(parsed.edits.is_empty());
assert!(parsed.activity.is_none());
}
// A completed command_execution with an id yields both tool_start and tool_end
// carrying the command, its output, and a success flag derived from exit_code.
#[test]
fn command_execution_completed_becomes_finished_tool_card() {
let line = serde_json::json!({
"type": "item.completed",
"item": {
"id": "item_2",
"type": "command_execution",
"command": "bash -lc 'echo hi'",
"aggregated_output": "hi\n",
"exit_code": 0,
"status": "completed"
}
})
.to_string();
let parsed = parse_codex_line(&line);
let start = parsed.tool_start.expect("tool_start");
let end = parsed.tool_end.expect("tool_end");
assert_eq!(start.tool_call_id, "item_2");
assert_eq!(end.tool_call_id, "item_2");
assert_eq!(start.name, "command_execution");
assert_eq!(start.input.as_deref(), Some("bash -lc 'echo hi'"));
assert_eq!(end.output.as_deref(), Some("hi\n"));
assert!(end.ok, "exit_code 0 → ok");
assert!(parsed.activity.is_none());
assert!(parsed.text.is_none());
}
// A non-zero exit code marks the tool end as failed.
#[test]
fn command_execution_nonzero_exit_is_error_card() {
let line = r#"{"type":"item.completed","item":{"id":"item_2","type":"command_execution","command":"bash -lc false","aggregated_output":"","exit_code":1,"status":"failed"}}"#;
let end = parse_codex_line(line).tool_end.expect("tool_end");
assert!(!end.ok, "exit_code 1 / status failed → error");
}
// Tool items without an exit_code fall back to the status field for success.
#[test]
fn web_search_completed_becomes_tool_card() {
let line = serde_json::json!({
"type": "item.completed",
"item": { "id": "item_5", "type": "web_search", "status": "completed" }
})
.to_string();
let parsed = parse_codex_line(&line);
assert_eq!(parsed.tool_start.expect("start").name, "web_search");
assert!(parsed.tool_end.expect("end").ok, "no exit_code, status completed → ok");
}
// item.started for a tool with an id yields only tool_start (no tool_end yet).
#[test]
fn started_tool_with_id_becomes_running_card() {
let line = serde_json::json!({
"type": "item.started",
"item": { "id": "item_1", "type": "command_execution", "command": "bash -lc ls", "status": "in_progress" }
})
.to_string();
let parsed = parse_codex_line(&line);
let start = parsed.tool_start.expect("start");
assert_eq!(start.name, "command_execution");
assert_eq!(start.input.as_deref(), Some("bash -lc ls")); assert!(parsed.tool_end.is_none(), "started → running (no end yet)");
assert!(parsed.activity.is_none());
}
// A tool item lacking an id cannot be tracked as a tool call, so it degrades to
// a generic activity label.
#[test]
fn tool_without_id_degrades_to_activity() {
let line = serde_json::json!({
"type": "item.completed",
"item": { "type": "command_execution", "command": "ls -la", "exit_code": 0 }
})
.to_string();
let parsed = parse_codex_line(&line);
assert_eq!(parsed.activity.as_deref(), Some("Running a command"));
assert!(parsed.tool_start.is_none() && parsed.tool_end.is_none());
}
// thread.started carries the session id; turn.completed carries token usage with
// a computed total; an unrecognised event type (turn.started) parses to nothing.
#[test]
fn thread_started_yields_session_and_turn_completed_yields_usage() {
let session = parse_codex_line(r#"{"type":"thread.started","thread_id":"abc"}"#)
.session
.expect("session");
assert_eq!(session.session_id.as_deref(), Some("abc"));
assert_eq!(session.model, None);
let usage =
parse_codex_line(r#"{"type":"turn.completed","usage":{"input_tokens":100,"output_tokens":40}}"#)
.usage
.expect("usage");
assert_eq!(usage.input_tokens, Some(100));
assert_eq!(usage.output_tokens, Some(40));
assert_eq!(usage.total_tokens, Some(140));
assert!(parse_codex_line(r#"{"type":"turn.started"}"#).is_empty());
}
// An error event is reported as activity text using its message.
#[test]
fn error_event_becomes_activity() {
let line = r#"{"type":"error","message":"rate limited"}"#;
assert_eq!(parse_codex_line(line).activity.as_deref(), Some("rate limited"));
}
// Lines that are not JSON produce no text.
#[test]
fn non_json_is_ignored() {
assert!(parse_codex_line("plain text").text.is_none());
}
// Helper: feeds one stdout line to the stream parser under the fixed run id "r".
fn stdout(p: &mut CodexStreamParser, line: &str) -> Vec<RunEvent> {
p.on_process_event(ProcessEvent::Stdout {
run_id: "r".to_owned(),
line: line.to_owned(),
})
}
// In a multi-message turn, agent messages followed by further events are emitted
// as Activity narration; only the last message before turn.completed is emitted
// as Text. Tool, session and usage events still pass through.
#[test]
fn codex_preambles_are_narration_and_only_final_message_is_the_answer() {
let mut p = CodexStreamParser::new();
let mut events = Vec::new();
for line in [
r#"{"type":"thread.started","thread_id":"t"}"#,
r#"{"type":"item.completed","item":{"id":"m1","type":"agent_message","text":"I’m going to read a.txt first."}}"#,
r#"{"type":"item.completed","item":{"id":"c1","type":"command_execution","command":"cat a.txt","aggregated_output":"alpha\n","exit_code":0,"status":"completed"}}"#,
r#"{"type":"item.completed","item":{"id":"m2","type":"agent_message","text":"I’m going to read b.txt next."}}"#,
r#"{"type":"item.completed","item":{"id":"c2","type":"command_execution","command":"cat b.txt","aggregated_output":"one\n","exit_code":0,"status":"completed"}}"#,
r#"{"type":"item.completed","item":{"id":"m3","type":"agent_message","text":"a.txt has more lines."}}"#,
r#"{"type":"turn.completed","usage":{"input_tokens":10,"output_tokens":5}}"#,
] {
events.extend(stdout(&mut p, line));
}
let texts: Vec<&str> = events
.iter()
.filter_map(|e| match e {
RunEvent::Text { delta, .. } => Some(delta.as_str()),
_ => None,
})
.collect();
assert_eq!(texts, vec!["a.txt has more lines."]);
let activity: Vec<&str> = events
.iter()
.filter_map(|e| match e {
RunEvent::Activity { message, .. } => Some(message.as_str()),
_ => None,
})
.collect();
assert_eq!(
activity,
vec![
"I’m going to read a.txt first.",
"I’m going to read b.txt next."
]
);
assert_eq!(
events
.iter()
.filter(|e| matches!(e, RunEvent::ToolStart { .. }))
.count(),
2
);
assert!(events.iter().any(|e| matches!(e, RunEvent::Session { .. })));
assert!(events.iter().any(|e| matches!(e, RunEvent::Usage { .. })));
}
// A turn with a single agent message emits it as Text and emits no Activity.
#[test]
fn codex_single_message_turn_is_the_answer() {
let mut p = CodexStreamParser::new();
let mut events = Vec::new();
events.extend(stdout(
&mut p,
r#"{"type":"item.completed","item":{"id":"m1","type":"agent_message","text":"Done."}}"#,
));
events.extend(stdout(
&mut p,
r#"{"type":"turn.completed","usage":{"input_tokens":1,"output_tokens":1}}"#,
));
let texts: Vec<&str> = events
.iter()
.filter_map(|e| match e {
RunEvent::Text { delta, .. } => Some(delta.as_str()),
_ => None,
})
.collect();
assert_eq!(texts, vec!["Done."]);
assert!(!events.iter().any(|e| matches!(e, RunEvent::Activity { .. })));
}
// Stderr lines produce no events at all.
#[test]
fn codex_stderr_is_dropped_as_noise() {
let mut p = CodexStreamParser::new();
let out = p.on_process_event(ProcessEvent::Stderr {
run_id: "r".to_owned(),
line: "2026-05-31T05:20:28Z ERROR codex_core::memories::phase2::job: failed to claim job"
.to_owned(),
});
assert!(out.is_empty(), "codex stderr is tracing noise → dropped, got {out:?}");
}
// If the process exits while a message is still held (no turn.completed seen),
// the message is emitted as Text ahead of the Exited event.
#[test]
fn codex_held_answer_is_flushed_if_stream_ends_without_turn_completed() {
let mut p = CodexStreamParser::new();
let _ = stdout(
&mut p,
r#"{"type":"item.completed","item":{"id":"m1","type":"agent_message","text":"Final."}}"#,
);
let out = p.on_process_event(ProcessEvent::Exited {
run_id: "r".to_owned(),
exit_code: Some(0),
cancelled: false,
});
assert!(
matches!(out.first(), Some(RunEvent::Text { delta, .. }) if delta == "Final."),
"held answer flushed as Text before Exited, got {out:?}"
);
assert!(matches!(out.last(), Some(RunEvent::Exited { .. })));
}
}