use async_trait::async_trait;
use serde::Deserialize;
use crate::agent::log_line::{Level, LogLine};
use crate::agent::{LogProcessor, TaskResult};
#[derive(Debug, Deserialize)]
struct CodexEvent {
#[serde(rename = "type")]
event_type: String,
item: Option<ItemData>,
usage: Option<UsageData>,
error: Option<ErrorData>,
}
#[derive(Debug, Deserialize)]
struct ItemData {
#[serde(rename = "type")]
item_type: String,
command: Option<String>,
text: Option<String>,
aggregated_output: Option<String>,
exit_code: Option<i32>,
changes: Option<Vec<FileChange>>,
}
#[derive(Debug, Deserialize)]
struct FileChange {
path: String,
}
#[derive(Debug, Deserialize)]
struct UsageData {
input_tokens: Option<u64>,
output_tokens: Option<u64>,
cached_input_tokens: Option<u64>,
}
#[derive(Debug, Deserialize)]
struct ErrorData {
message: String,
}
pub struct CodexLogProcessor {
final_result: Option<TaskResult>,
json_mode_active: bool,
last_line_was_parse_error: bool,
}
impl CodexLogProcessor {
pub fn new() -> Self {
Self {
final_result: None,
json_mode_active: false,
last_line_was_parse_error: false,
}
}
fn tags() -> Vec<String> {
vec!["codex".into()]
}
fn simplify_command(&self, command: &str) -> String {
if let Some(inner) = command.strip_prefix("bash -lc '")
&& let Some(cmd) = inner.strip_suffix('\'')
{
return cmd.to_string();
}
if let Some(inner) = command.strip_prefix("bash -lc \"")
&& let Some(cmd) = inner.strip_suffix('"')
{
return cmd.to_string();
}
if let Some(inner) = command.strip_prefix("bash -lc ") {
return inner.to_string();
}
command.to_string()
}
fn format_event(&mut self, event: CodexEvent) -> Option<LogLine> {
match event.event_type.as_str() {
"thread.started" => None, "turn.started" => None, "turn.completed" => self.format_turn_completed(event),
"turn.failed" => self.format_turn_failed(event),
"item.started" => self.format_item_started(event),
"item.updated" => None, "item.completed" => self.format_item_completed(event),
"error" => self.format_error(event),
_ => {
None
}
}
}
fn format_turn_completed(&mut self, event: CodexEvent) -> Option<LogLine> {
if let Some(usage) = event.usage {
self.final_result = Some(TaskResult {
success: true,
message: "Task completed successfully".to_string(),
cost_usd: None,
duration_ms: None,
});
let input = usage.input_tokens.unwrap_or(0);
let output = usage.output_tokens.unwrap_or(0);
let cached = usage.cached_input_tokens.unwrap_or(0);
Some(LogLine::summary(
true,
format!(
"Task completed - {} input tokens, {} output tokens, {} cached tokens",
input, output, cached
),
None,
None,
None,
))
} else {
Some(LogLine::summary(
true,
"Task completed".into(),
None,
None,
None,
))
}
}
fn format_turn_failed(&mut self, event: CodexEvent) -> Option<LogLine> {
let error_msg = event
.error
.as_ref()
.map(|e| e.message.as_str())
.unwrap_or("Unknown error");
self.final_result = Some(TaskResult {
success: false,
message: format!("Turn failed: {}", error_msg),
cost_usd: None,
duration_ms: None,
});
Some(LogLine::summary(
false,
format!("Turn failed: {}", error_msg),
None,
None,
None,
))
}
fn format_item_started(&mut self, event: CodexEvent) -> Option<LogLine> {
if let Some(item) = event.item {
let tags = Self::tags();
match item.item_type.as_str() {
"command_execution" => {
let cmd = item.command.as_deref().unwrap_or("unknown");
let simplified_cmd = self.simplify_command(cmd);
Some(LogLine::message(
tags,
Some("command_execution".into()),
format!("Running: {}", simplified_cmd),
))
}
"agent_message" => None, "reasoning" => Some(LogLine::message(
tags,
Some("reasoning".into()),
"Reasoning...".into(),
)),
"file_change" => Some(LogLine::message(
tags,
Some("file_change".into()),
"Modifying file...".into(),
)),
"mcp_tool_call" => Some(LogLine::message(
tags,
Some("mcp_tool_call".into()),
"Calling tool...".into(),
)),
"web_search" => Some(LogLine::message(
tags,
Some("web_search".into()),
"Searching web...".into(),
)),
"todo_list" => None, _ => {
Some(LogLine::message(
tags,
Some(item.item_type.clone()),
format!("{}: started", item.item_type),
))
}
}
} else {
None
}
}
fn format_item_completed(&mut self, event: CodexEvent) -> Option<LogLine> {
if let Some(item) = event.item {
let tags = Self::tags();
match item.item_type.as_str() {
"command_execution" => {
let exit_code = item.exit_code.unwrap_or(-1);
let mut result_msg = String::new();
if let Some(stdout) = &item.aggregated_output {
if stdout.contains("test result: ok") {
result_msg = " - Tests passed".to_string();
} else if stdout.contains("test result: FAILED") {
result_msg = " - Tests failed".to_string();
}
}
let level = if result_msg.contains("Tests passed") {
Level::Success
} else if result_msg.contains("Tests failed") || exit_code != 0 {
Level::Error
} else {
Level::Info
};
if exit_code != 0
&& result_msg.is_empty()
&& let Some(stdout) = &item.aggregated_output
{
let preview = stdout.lines().next().unwrap_or("").trim();
if !preview.is_empty() {
if preview.len() > 60 {
result_msg = format!(" - {}...", &preview[..60]);
} else {
result_msg = format!(" - {}", preview);
}
}
}
Some(LogLine::Message {
level,
tags,
tool: Some("command_execution".into()),
message: format!("Command completed (exit: {}){}", exit_code, result_msg),
})
}
"agent_message" => {
if let Some(text) = item.text {
if !text.trim().is_empty() {
Some(LogLine::message(tags, None, text.trim().to_string()))
} else {
None
}
} else {
None
}
}
"reasoning" => {
if let Some(text) = item.text {
let snippet = text.lines().next().unwrap_or("").trim();
if snippet.len() > 80 {
Some(LogLine::message(
tags,
Some("reasoning".into()),
format!("{}...", &snippet[..77]),
))
} else if !snippet.is_empty() {
Some(LogLine::message(
tags,
Some("reasoning".into()),
snippet.to_string(),
))
} else {
None
}
} else {
None
}
}
"file_change" => {
if let Some(changes) = &item.changes {
let filenames: Vec<String> = changes
.iter()
.map(|c| {
c.path
.strip_prefix("/workspace/")
.and_then(|rest| rest.split_once('/').map(|(_, after)| after))
.unwrap_or(&c.path)
.to_string()
})
.collect();
if filenames.is_empty() {
Some(LogLine::success(
tags,
Some("file_change".into()),
"File modified".into(),
))
} else if filenames.len() == 1 {
Some(LogLine::success(
tags,
Some("file_change".into()),
format!("Modified: {}", filenames[0]),
))
} else {
Some(LogLine::success(
tags,
Some("file_change".into()),
format!(
"Modified {} files: {}, ...",
filenames.len(),
filenames[0]
),
))
}
} else {
Some(LogLine::success(
tags,
Some("file_change".into()),
"File modified".into(),
))
}
}
"mcp_tool_call" => Some(LogLine::message(
tags,
Some("mcp_tool_call".into()),
"Tool completed".into(),
)),
"web_search" => Some(LogLine::message(
tags,
Some("web_search".into()),
"Search completed".into(),
)),
"todo_list" => {
if let Some(text) = item.text {
let summary = text
.lines()
.find(|line| !line.trim().is_empty())
.unwrap_or("TODO updated");
Some(LogLine::message(
tags,
Some("todo_list".into()),
format!("TODO: {}", summary.trim()),
))
} else {
Some(LogLine::message(
tags,
Some("todo_list".into()),
"TODO updated".into(),
))
}
}
_ => {
Some(LogLine::message(
tags,
Some(item.item_type.clone()),
format!("{}: completed", item.item_type),
))
}
}
} else {
None
}
}
fn format_error(&mut self, event: CodexEvent) -> Option<LogLine> {
if let Some(error) = event.error {
Some(LogLine::error(
Self::tags(),
None,
format!("Error: {}", error.message),
))
} else {
Some(LogLine::error(Self::tags(), None, "Error occurred".into()))
}
}
}
#[async_trait]
impl LogProcessor for CodexLogProcessor {
fn process_line(&mut self, line: &str) -> Option<LogLine> {
if line.trim().is_empty() {
return None;
}
match serde_json::from_str::<CodexEvent>(line) {
Ok(event) => {
if !self.json_mode_active {
self.json_mode_active = true;
}
self.last_line_was_parse_error = false;
self.format_event(event)
}
Err(_) => {
if self.json_mode_active {
if self.last_line_was_parse_error {
None
} else {
self.last_line_was_parse_error = true;
Some(LogLine::warning(Self::tags(), None, "parsing error".into()))
}
} else {
Some(LogLine::message(vec![], None, line.to_string()))
}
}
}
}
fn get_final_result(&self) -> Option<&TaskResult> {
self.final_result.as_ref()
}
}
#[cfg(test)]
mod tests {
use super::*;
fn get_message_text(line: &LogLine) -> &str {
match line {
LogLine::Message { message, .. } => message,
_ => panic!("Expected Message variant, got {:?}", line),
}
}
fn has_tag(line: &LogLine, tag: &str) {
let tags = match line {
LogLine::Message { tags, .. } => tags,
LogLine::Todo { tags, .. } => tags,
LogLine::Summary { .. } => panic!("Summary has no tags"),
};
assert!(
tags.iter().any(|t| t == tag),
"Expected tag '{}' in {:?}",
tag,
tags
);
}
#[test]
fn test_thread_and_turn_events() {
let mut processor = CodexLogProcessor::new();
let thread_started = r#"{"type":"thread.started","thread_id":"test-123"}"#;
assert_eq!(processor.process_line(thread_started), None);
let turn_started = r#"{"type":"turn.started"}"#;
assert_eq!(processor.process_line(turn_started), None);
}
#[test]
fn test_command_execution_events() {
let mut processor = CodexLogProcessor::new();
let started = r#"{"type":"item.started","item":{"id":"item_0","type":"command_execution","command":"ls -la","status":"in_progress"}}"#;
let output = processor.process_line(started).unwrap();
has_tag(&output, "codex");
assert!(get_message_text(&output).contains("Running: ls -la"));
let completed = r#"{"type":"item.completed","item":{"id":"item_0","type":"command_execution","command":"ls -la","aggregated_output":"file1.txt\nfile2.txt","exit_code":0,"status":"completed"}}"#;
let output = processor.process_line(completed).unwrap();
has_tag(&output, "codex");
assert!(get_message_text(&output).contains("Command completed (exit: 0)"));
assert!(!get_message_text(&output).contains("file1.txt")); }
#[test]
fn test_agent_message_events() {
let mut processor = CodexLogProcessor::new();
let completed = r#"{"type":"item.completed","item":{"id":"item_1","type":"agent_message","text":"This is my response"}}"#;
let output = processor.process_line(completed).unwrap();
has_tag(&output, "codex");
assert_eq!(get_message_text(&output), "This is my response");
}
#[test]
fn test_turn_completed_with_usage() {
let mut processor = CodexLogProcessor::new();
let completed = r#"{"type":"turn.completed","usage":{"input_tokens":1000,"output_tokens":500,"cached_input_tokens":200}}"#;
let output = processor.process_line(completed).unwrap();
if let LogLine::Summary {
success, message, ..
} = &output
{
assert!(success);
assert!(message.contains("1000 input tokens"));
assert!(message.contains("500 output tokens"));
assert!(message.contains("200 cached tokens"));
} else {
panic!("Expected Summary variant");
}
let result = processor.get_final_result().unwrap();
assert!(result.success);
assert!(result.cost_usd.is_none());
}
#[test]
fn test_turn_failed() {
let mut processor = CodexLogProcessor::new();
let failed = r#"{"type":"turn.failed","error":{"message":"API request failed"}}"#;
let output = processor.process_line(failed).unwrap();
if let LogLine::Summary {
success, message, ..
} = &output
{
assert!(!success);
assert!(message.contains("Turn failed: API request failed"));
} else {
panic!("Expected Summary variant");
}
let result = processor.get_final_result().unwrap();
assert!(!result.success);
}
#[test]
fn test_json_mode_behavior() {
let mut processor = CodexLogProcessor::new();
let result = processor.process_line("Configuration error");
assert_eq!(get_message_text(&result.unwrap()), "Configuration error");
assert!(!processor.json_mode_active);
let json = r#"{"type":"turn.started"}"#;
processor.process_line(json);
assert!(processor.json_mode_active);
let result = processor.process_line("Not JSON").unwrap();
if let LogLine::Message { level, message, .. } = &result {
assert_eq!(*level, Level::Warning);
assert_eq!(message, "parsing error");
}
}
#[test]
fn test_parse_error_deduplication() {
let mut processor = CodexLogProcessor::new();
processor.process_line(r#"{"type":"turn.started"}"#);
let result = processor.process_line("Bad line 1").unwrap();
if let LogLine::Message { level, .. } = &result {
assert_eq!(*level, Level::Warning);
}
assert_eq!(processor.process_line("Bad line 2"), None);
assert_eq!(processor.process_line("Bad line 3"), None);
processor.process_line(r#"{"type":"turn.started"}"#);
let result = processor.process_line("Bad line 4").unwrap();
if let LogLine::Message { level, .. } = &result {
assert_eq!(*level, Level::Warning);
}
}
#[test]
fn test_reasoning_and_file_change() {
let mut processor = CodexLogProcessor::new();
let started = r#"{"type":"item.started","item":{"id":"item_1","type":"reasoning"}}"#;
let output = processor.process_line(started).unwrap();
has_tag(&output, "codex");
assert!(get_message_text(&output).contains("Reasoning..."));
let completed = r#"{"type":"item.completed","item":{"id":"item_1","type":"reasoning","text":"I need to analyze the code structure"}}"#;
let output = processor.process_line(completed).unwrap();
has_tag(&output, "codex");
assert!(get_message_text(&output).contains("I need to analyze"));
let file_completed = r#"{"type":"item.completed","item":{"id":"item_2","type":"file_change","changes":[{"path":"/workspace/test-project/src/main.rs","kind":"update"}]}}"#;
let output = processor.process_line(file_completed).unwrap();
has_tag(&output, "codex");
if let LogLine::Message { level, message, .. } = &output {
assert_eq!(*level, Level::Success);
assert!(message.contains("Modified: src/main.rs"));
}
let file_completed_no_changes =
r#"{"type":"item.completed","item":{"id":"item_3","type":"file_change"}}"#;
let output = processor.process_line(file_completed_no_changes).unwrap();
if let LogLine::Message { level, message, .. } = &output {
assert_eq!(*level, Level::Success);
assert!(message.contains("File modified"));
}
}
#[test]
fn test_multiple_file_changes() {
let mut processor = CodexLogProcessor::new();
let file_completed = r#"{"type":"item.completed","item":{"id":"item_1","type":"file_change","changes":[{"path":"/workspace/test-project/src/main.rs","kind":"update"},{"path":"/workspace/test-project/src/lib.rs","kind":"update"},{"path":"/workspace/test-project/Cargo.toml","kind":"update"}]}}"#;
let output = processor.process_line(file_completed).unwrap();
if let LogLine::Message { level, message, .. } = &output {
assert_eq!(*level, Level::Success);
assert!(message.contains("Modified 3 files: src/main.rs, ..."));
}
}
#[test]
fn test_empty_lines_dont_reset_error_tracking() {
let mut processor = CodexLogProcessor::new();
let json = r#"{"type":"turn.started"}"#;
processor.process_line(json);
let result = processor.process_line("Bad line 1").unwrap();
if let LogLine::Message { level, .. } = &result {
assert_eq!(*level, Level::Warning);
}
let result = processor.process_line("");
assert_eq!(result, None);
let result = processor.process_line("Bad line 2");
assert_eq!(result, None);
}
#[test]
fn test_unknown_item_types() {
let mut processor = CodexLogProcessor::new();
let started = r#"{"type":"item.started","item":{"id":"item_1","type":"unknown_type"}}"#;
let output = processor.process_line(started).unwrap();
has_tag(&output, "codex");
assert!(get_message_text(&output).contains("unknown_type: started"));
let completed = r#"{"type":"item.completed","item":{"id":"item_1","type":"unknown_type"}}"#;
let output = processor.process_line(completed).unwrap();
has_tag(&output, "codex");
assert!(get_message_text(&output).contains("unknown_type: completed"));
}
#[test]
fn test_todo_list() {
let mut processor = CodexLogProcessor::new();
let completed = r#"{"type":"item.completed","item":{"id":"item_1","type":"todo_list","text":"- Implement feature A\n- Test feature A\n- Document feature A"}}"#;
let output = processor.process_line(completed).unwrap();
has_tag(&output, "codex");
assert!(get_message_text(&output).contains("TODO:"));
assert!(get_message_text(&output).contains("Implement feature A"));
}
#[test]
fn test_long_output_truncation() {
let mut processor = CodexLogProcessor::new();
let long_output = "a".repeat(100);
let completed = format!(
r#"{{"type":"item.completed","item":{{"id":"item_0","type":"command_execution","aggregated_output":"{}","exit_code":0}}}}"#,
long_output
);
let output = processor.process_line(&completed).unwrap();
let text = get_message_text(&output);
assert!(text.contains("Command completed (exit: 0)"));
assert!(!text.contains("aaa"));
let long_reasoning = "b".repeat(100);
let completed = format!(
r#"{{"type":"item.completed","item":{{"id":"item_1","type":"reasoning","text":"{}"}}}}"#,
long_reasoning
);
let output = processor.process_line(&completed).unwrap();
let text = get_message_text(&output);
assert!(text.contains("..."));
assert!(text.len() < long_reasoning.len() + 50); }
#[test]
fn test_empty_agent_message() {
let mut processor = CodexLogProcessor::new();
let completed = r#"{"type":"item.completed","item":{"id":"item_1","type":"agent_message","text":" "}}"#;
let output = processor.process_line(completed);
assert_eq!(output, None); }
#[test]
fn test_command_simplification() {
let mut processor = CodexLogProcessor::new();
let event = r#"{"type":"item.started","item":{"id":"item_0","type":"command_execution","command":"bash -lc 'ls -la'","status":"in_progress"}}"#;
let output = processor.process_line(event).unwrap();
let text = get_message_text(&output);
assert!(text.contains("Running: ls -la"));
assert!(!text.contains("bash -lc"));
let event = r#"{"type":"item.started","item":{"id":"item_1","type":"command_execution","command":"bash -lc \"echo hello\"","status":"in_progress"}}"#;
let output = processor.process_line(event).unwrap();
assert!(get_message_text(&output).contains("Running: echo hello"));
let event = r#"{"type":"item.started","item":{"id":"item_2","type":"command_execution","command":"bash -lc 'rg \"fn main\" src'","status":"in_progress"}}"#;
let output = processor.process_line(event).unwrap();
assert!(get_message_text(&output).contains("Running: rg \"fn main\" src"));
let event = r#"{"type":"item.started","item":{"id":"item_3","type":"command_execution","command":"bash -lc ls","status":"in_progress"}}"#;
let output = processor.process_line(event).unwrap();
let text = get_message_text(&output);
assert!(text.contains("Running: ls"));
assert!(!text.contains("bash -lc"));
let event = r#"{"type":"item.started","item":{"id":"item_4","type":"command_execution","command":"bash -lc cat file.txt","status":"in_progress"}}"#;
let output = processor.process_line(event).unwrap();
let text = get_message_text(&output);
assert!(text.contains("Running: cat file.txt"));
assert!(!text.contains("bash -lc"));
}
#[test]
fn test_verbose_output_filtering() {
let mut processor = CodexLogProcessor::new();
let event = r#"{"type":"item.completed","item":{"type":"command_execution","aggregated_output":"file1.txt\nfile2.txt","exit_code":0}}"#;
let output = processor.process_line(event).unwrap();
let text = get_message_text(&output);
assert!(text.contains("Command completed (exit: 0)"));
assert!(!text.contains("file1.txt"));
let event = r#"{"type":"item.completed","item":{"type":"command_execution","aggregated_output":"test result: ok. 13 passed","exit_code":0}}"#;
let output = processor.process_line(event).unwrap();
let text = get_message_text(&output);
assert!(text.contains("Tests passed"));
let event = r#"{"type":"item.completed","item":{"type":"command_execution","aggregated_output":"error: file not found","exit_code":1}}"#;
let output = processor.process_line(event).unwrap();
let text = get_message_text(&output);
assert!(text.contains("error: file not found"));
}
}