1use codex::{CommandExecutionStatus, McpToolCallStatus, PatchApplyStatus, ThreadEvent, ThreadItem};
4use serde_json::json;
5
6use crate::{
7 error::{ExecutorError, Result},
8 log::{ActionType, LogNormalizer, NormalizedLog},
9 types::{Role, ToolStatus},
10};
11
/// Incremental normalizer for a newline-delimited stream of `ThreadEvent` JSON objects.
///
/// Input may arrive in arbitrary byte chunks. Bytes are accumulated until a full
/// `\n`-terminated line is available, and each such line is decoded as one event.
#[derive(Default, Debug)]
pub struct CodexLogNormalizer {
    /// Bytes received so far that are not yet terminated by a newline.
    buffer: Vec<u8>,
}
17
18impl CodexLogNormalizer {
19 pub fn new() -> Self {
21 Self::default()
22 }
23
24 fn consume_lines(&mut self) -> Vec<NormalizedLog> {
25 let mut output = Vec::new();
26
27 while let Some(newline_idx) = self.buffer.iter().position(|&byte| byte == b'\n') {
28 let mut line = self.buffer.drain(..=newline_idx).collect::<Vec<_>>();
29 if matches!(line.last(), Some(b'\n')) {
30 line.pop();
31 }
32 if matches!(line.last(), Some(b'\r')) {
33 line.pop();
34 }
35
36 if line.is_empty() {
37 continue;
38 }
39
40 output.extend(self.normalize_line(&line));
41 }
42
43 output
44 }
45
46 fn normalize_line(&self, line: &[u8]) -> Vec<NormalizedLog> {
47 match self.try_normalize_line(line) {
48 Ok(logs) => logs,
49 Err(error) => vec![Self::error_from_executor_error(error)],
50 }
51 }
52
53 fn try_normalize_line(&self, line: &[u8]) -> Result<Vec<NormalizedLog>> {
54 let event: ThreadEvent = serde_json::from_slice(line)?;
55 Ok(Self::map_event(event))
56 }
57
58 fn map_event(event: ThreadEvent) -> Vec<NormalizedLog> {
59 match event {
60 ThreadEvent::ThreadStarted { .. } | ThreadEvent::TurnStarted => Vec::new(),
61 ThreadEvent::TurnCompleted { usage } => {
62 let total_u64 = usage
63 .input_tokens
64 .saturating_add(usage.cached_input_tokens)
65 .saturating_add(usage.output_tokens);
66
67 vec![NormalizedLog::TokenUsage {
68 total: total_u64.min(u32::MAX as u64) as u32,
69 limit: 0,
71 }]
72 }
73 ThreadEvent::TurnFailed { error } => vec![NormalizedLog::Error {
74 error_type: "turn_failed".to_string(),
75 message: error.message,
76 }],
77 ThreadEvent::Error { message } => vec![NormalizedLog::Error {
78 error_type: "stream_error".to_string(),
79 message,
80 }],
81 ThreadEvent::ItemStarted { item } => Self::map_item(item, ItemPhase::Started),
82 ThreadEvent::ItemUpdated { item } => Self::map_item(item, ItemPhase::Updated),
83 ThreadEvent::ItemCompleted { item } => Self::map_item(item, ItemPhase::Completed),
84 }
85 }
86
87 fn map_item(item: ThreadItem, phase: ItemPhase) -> Vec<NormalizedLog> {
88 match item {
89 ThreadItem::AgentMessage(message) => vec![NormalizedLog::Message {
90 role: Role::Assistant,
91 content: message.text,
92 }],
93 ThreadItem::CommandExecution(command) => vec![NormalizedLog::ToolCall {
94 name: "command_execution".to_string(),
95 args: json!({
96 "id": command.id,
97 "output": command.aggregated_output,
98 "exit_code": command.exit_code,
99 }),
100 status: Self::map_command_status(command.status, phase),
101 action: ActionType::CommandRun {
102 command: command.command,
103 },
104 }],
105 ThreadItem::FileChange(file_change) => {
106 let status = Self::map_patch_status(file_change.status, phase);
107 let changes_len = file_change.changes.len();
108
109 file_change
110 .changes
111 .into_iter()
112 .map(|change| NormalizedLog::ToolCall {
113 name: "file_change".to_string(),
114 args: json!({
115 "id": file_change.id,
116 "kind": change.kind,
117 "status": file_change.status,
118 "change_count": changes_len,
119 }),
120 status,
121 action: ActionType::FileEdit { path: change.path },
122 })
123 .collect()
124 }
125 ThreadItem::McpToolCall(tool_call) => {
126 let tool_name = format!("{}.{}", tool_call.server, tool_call.tool);
127 vec![NormalizedLog::ToolCall {
128 name: tool_name,
129 args: json!({
130 "id": tool_call.id,
131 "server": tool_call.server,
132 "arguments": tool_call.arguments,
133 "result": tool_call.result,
134 "error": tool_call.error,
135 }),
136 status: Self::map_mcp_status(tool_call.status, phase),
137 action: ActionType::McpTool {
138 tool: tool_call.tool,
139 },
140 }]
141 }
142 ThreadItem::Reasoning(reasoning) => vec![NormalizedLog::Thinking {
143 content: reasoning.text,
144 }],
145 ThreadItem::WebSearch(search) => vec![NormalizedLog::ToolCall {
146 name: "web_search".to_string(),
147 args: json!({ "id": search.id }),
148 status: Self::status_from_phase(phase, ToolStatus::Completed),
149 action: ActionType::WebSearch {
150 query: search.query,
151 },
152 }],
153 ThreadItem::Error(error_item) => vec![NormalizedLog::Error {
154 error_type: "item_error".to_string(),
155 message: error_item.message,
156 }],
157 ThreadItem::TodoList(_) => Vec::new(),
158 }
159 }
160
161 fn map_command_status(status: CommandExecutionStatus, phase: ItemPhase) -> ToolStatus {
162 match phase {
163 ItemPhase::Started => ToolStatus::Started,
164 ItemPhase::Updated | ItemPhase::Completed => match status {
165 CommandExecutionStatus::InProgress => ToolStatus::Running,
166 CommandExecutionStatus::Completed => ToolStatus::Completed,
167 CommandExecutionStatus::Failed => ToolStatus::Failed,
168 },
169 }
170 }
171
172 fn map_patch_status(status: PatchApplyStatus, phase: ItemPhase) -> ToolStatus {
173 match phase {
174 ItemPhase::Started => ToolStatus::Started,
175 ItemPhase::Updated | ItemPhase::Completed => match status {
176 PatchApplyStatus::Completed => ToolStatus::Completed,
177 PatchApplyStatus::Failed => ToolStatus::Failed,
178 },
179 }
180 }
181
182 fn map_mcp_status(status: McpToolCallStatus, phase: ItemPhase) -> ToolStatus {
183 match phase {
184 ItemPhase::Started => ToolStatus::Started,
185 ItemPhase::Updated | ItemPhase::Completed => match status {
186 McpToolCallStatus::InProgress => ToolStatus::Running,
187 McpToolCallStatus::Completed => ToolStatus::Completed,
188 McpToolCallStatus::Failed => ToolStatus::Failed,
189 },
190 }
191 }
192
193 fn status_from_phase(phase: ItemPhase, fallback: ToolStatus) -> ToolStatus {
194 match phase {
195 ItemPhase::Started => ToolStatus::Started,
196 ItemPhase::Updated | ItemPhase::Completed => fallback,
197 }
198 }
199
200 fn error_from_executor_error(error: ExecutorError) -> NormalizedLog {
201 let error_type = error.error_type();
202
203 NormalizedLog::Error {
204 error_type: error_type.to_string(),
205 message: error.to_string(),
206 }
207 }
208}
209
210impl LogNormalizer for CodexLogNormalizer {
211 fn normalize(&mut self, chunk: &[u8]) -> Vec<NormalizedLog> {
212 self.buffer.extend_from_slice(chunk);
213 self.consume_lines()
214 }
215
216 fn flush(&mut self) -> Vec<NormalizedLog> {
217 let remaining = std::mem::take(&mut self.buffer);
218 if remaining.is_empty() || remaining.iter().all(u8::is_ascii_whitespace) {
219 return Vec::new();
220 }
221
222 self.normalize_line(&remaining)
223 }
224}
225
/// The item lifecycle event in which a `ThreadItem` was reported.
#[derive(Clone, Copy, Debug)]
enum ItemPhase {
    /// Reported through `ThreadEvent::ItemStarted`.
    Started,
    /// Reported through `ThreadEvent::ItemUpdated`.
    Updated,
    /// Reported through `ThreadEvent::ItemCompleted`.
    Completed,
}
232
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that exactly one log was produced and returns a reference to it.
    fn single(logs: &[NormalizedLog]) -> &NormalizedLog {
        assert_eq!(logs.len(), 1, "expected exactly one log, got {logs:?}");
        &logs[0]
    }

    #[test]
    fn normalize_is_incremental() {
        let mut normalizer = CodexLogNormalizer::new();
        let event =
            br#"{"type":"item.completed","item":{"type":"agent_message","id":"a1","text":"done"}}"#;

        // Without a terminating newline the line is incomplete, so nothing is emitted.
        assert!(normalizer.normalize(event).is_empty());

        // The newline completes the buffered line.
        let logs = normalizer.normalize(b"\n");
        let log = single(&logs);
        assert!(
            matches!(
                log,
                NormalizedLog::Message { role, content }
                    if *role == Role::Assistant && content == "done"
            ),
            "unexpected log: {log:?}"
        );
    }

    #[test]
    fn maps_required_codex_items() {
        let mut normalizer = CodexLogNormalizer::new();
        let events = [
            r#"{"type":"item.completed","item":{"type":"agent_message","id":"m1","text":"hello"}}"#,
            r#"{"type":"item.updated","item":{"type":"command_execution","id":"c1","command":"ls -la","aggregated_output":"ok","status":"in_progress"}}"#,
            r#"{"type":"item.completed","item":{"type":"file_change","id":"f1","changes":[{"path":"src/lib.rs","kind":"update"}],"status":"completed"}}"#,
            r#"{"type":"item.completed","item":{"type":"mcp_tool_call","id":"t1","server":"filesystem","tool":"read_file","arguments":{"path":"README.md"},"status":"completed"}}"#,
            r#"{"type":"item.updated","item":{"type":"reasoning","id":"r1","text":"analyzing..."}}"#,
        ];
        // One event per line, every line terminated by `\n`.
        let jsonl: String = events.iter().map(|event| format!("{event}\n")).collect();

        let logs = normalizer.normalize(jsonl.as_bytes());
        assert_eq!(logs.len(), 5);

        assert!(
            matches!(
                &logs[0],
                NormalizedLog::Message { role, content }
                    if *role == Role::Assistant && content == "hello"
            ),
            "unexpected message mapping: {:?}",
            logs[0]
        );
        assert!(
            matches!(
                &logs[1],
                NormalizedLog::ToolCall {
                    action: ActionType::CommandRun { command },
                    status,
                    ..
                } if command == "ls -la" && *status == ToolStatus::Running
            ),
            "unexpected command mapping: {:?}",
            logs[1]
        );
        assert!(
            matches!(
                &logs[2],
                NormalizedLog::ToolCall {
                    action: ActionType::FileEdit { path },
                    status,
                    ..
                } if path == "src/lib.rs" && *status == ToolStatus::Completed
            ),
            "unexpected file change mapping: {:?}",
            logs[2]
        );
        assert!(
            matches!(
                &logs[3],
                NormalizedLog::ToolCall {
                    action: ActionType::McpTool { tool },
                    status,
                    ..
                } if tool == "read_file" && *status == ToolStatus::Completed
            ),
            "unexpected mcp mapping: {:?}",
            logs[3]
        );
        assert!(
            matches!(
                &logs[4],
                NormalizedLog::Thinking { content } if content == "analyzing..."
            ),
            "unexpected thinking mapping: {:?}",
            logs[4]
        );
    }

    #[test]
    fn flush_processes_trailing_data() {
        let mut normalizer = CodexLogNormalizer::new();
        let event =
            br#"{"type":"item.completed","item":{"type":"reasoning","id":"r1","text":"pending"}}"#;

        // No newline: the event stays buffered until flush.
        assert!(normalizer.normalize(event).is_empty());

        let logs = normalizer.flush();
        let log = single(&logs);
        assert!(
            matches!(log, NormalizedLog::Thinking { content } if content == "pending"),
            "unexpected flush mapping: {log:?}"
        );
    }
}