Skip to main content

unified_agent_sdk/providers/codex/
normalizer.rs

1//! Codex JSONL log normalizer.
2
3use codex::{CommandExecutionStatus, McpToolCallStatus, PatchApplyStatus, ThreadEvent, ThreadItem};
4use serde_json::json;
5
6use crate::{
7    error::{ExecutorError, Result},
8    log::{ActionType, LogNormalizer, NormalizedLog},
9    types::{Role, ToolStatus},
10};
11
/// Incremental normalizer that turns Codex `ThreadEvent` JSONL bytes into
/// [`NormalizedLog`] entries.
#[derive(Default, Debug)]
pub struct CodexLogNormalizer {
    // Bytes received so far that have not yet been emitted as a complete line.
    buffer: Vec<u8>,
}
17
18impl CodexLogNormalizer {
19    /// Creates a new Codex log normalizer.
20    pub fn new() -> Self {
21        Self::default()
22    }
23
24    fn consume_lines(&mut self) -> Vec<NormalizedLog> {
25        let mut output = Vec::new();
26
27        while let Some(newline_idx) = self.buffer.iter().position(|&byte| byte == b'\n') {
28            let mut line = self.buffer.drain(..=newline_idx).collect::<Vec<_>>();
29            if matches!(line.last(), Some(b'\n')) {
30                line.pop();
31            }
32            if matches!(line.last(), Some(b'\r')) {
33                line.pop();
34            }
35
36            if line.is_empty() {
37                continue;
38            }
39
40            output.extend(self.normalize_line(&line));
41        }
42
43        output
44    }
45
46    fn normalize_line(&self, line: &[u8]) -> Vec<NormalizedLog> {
47        match self.try_normalize_line(line) {
48            Ok(logs) => logs,
49            Err(error) => vec![Self::error_from_executor_error(error)],
50        }
51    }
52
53    fn try_normalize_line(&self, line: &[u8]) -> Result<Vec<NormalizedLog>> {
54        let event: ThreadEvent = serde_json::from_slice(line)?;
55        Ok(Self::map_event(event))
56    }
57
58    fn map_event(event: ThreadEvent) -> Vec<NormalizedLog> {
59        match event {
60            ThreadEvent::ThreadStarted { .. } | ThreadEvent::TurnStarted => Vec::new(),
61            ThreadEvent::TurnCompleted { usage } => {
62                let total_u64 = usage
63                    .input_tokens
64                    .saturating_add(usage.cached_input_tokens)
65                    .saturating_add(usage.output_tokens);
66
67                vec![NormalizedLog::TokenUsage {
68                    total: total_u64.min(u32::MAX as u64) as u32,
69                    // Codex events do not expose a token limit.
70                    limit: 0,
71                }]
72            }
73            ThreadEvent::TurnFailed { error } => vec![NormalizedLog::Error {
74                error_type: "turn_failed".to_string(),
75                message: error.message,
76            }],
77            ThreadEvent::Error { message } => vec![NormalizedLog::Error {
78                error_type: "stream_error".to_string(),
79                message,
80            }],
81            ThreadEvent::ItemStarted { item } => Self::map_item(item, ItemPhase::Started),
82            ThreadEvent::ItemUpdated { item } => Self::map_item(item, ItemPhase::Updated),
83            ThreadEvent::ItemCompleted { item } => Self::map_item(item, ItemPhase::Completed),
84        }
85    }
86
87    fn map_item(item: ThreadItem, phase: ItemPhase) -> Vec<NormalizedLog> {
88        match item {
89            ThreadItem::AgentMessage(message) => vec![NormalizedLog::Message {
90                role: Role::Assistant,
91                content: message.text,
92            }],
93            ThreadItem::CommandExecution(command) => vec![NormalizedLog::ToolCall {
94                name: "command_execution".to_string(),
95                args: json!({
96                    "id": command.id,
97                    "output": command.aggregated_output,
98                    "exit_code": command.exit_code,
99                }),
100                status: Self::map_command_status(command.status, phase),
101                action: ActionType::CommandRun {
102                    command: command.command,
103                },
104            }],
105            ThreadItem::FileChange(file_change) => {
106                let status = Self::map_patch_status(file_change.status, phase);
107                let changes_len = file_change.changes.len();
108
109                file_change
110                    .changes
111                    .into_iter()
112                    .map(|change| NormalizedLog::ToolCall {
113                        name: "file_change".to_string(),
114                        args: json!({
115                            "id": file_change.id,
116                            "kind": change.kind,
117                            "status": file_change.status,
118                            "change_count": changes_len,
119                        }),
120                        status,
121                        action: ActionType::FileEdit { path: change.path },
122                    })
123                    .collect()
124            }
125            ThreadItem::McpToolCall(tool_call) => {
126                let tool_name = format!("{}.{}", tool_call.server, tool_call.tool);
127                vec![NormalizedLog::ToolCall {
128                    name: tool_name,
129                    args: json!({
130                        "id": tool_call.id,
131                        "server": tool_call.server,
132                        "arguments": tool_call.arguments,
133                        "result": tool_call.result,
134                        "error": tool_call.error,
135                    }),
136                    status: Self::map_mcp_status(tool_call.status, phase),
137                    action: ActionType::McpTool {
138                        tool: tool_call.tool,
139                    },
140                }]
141            }
142            ThreadItem::Reasoning(reasoning) => vec![NormalizedLog::Thinking {
143                content: reasoning.text,
144            }],
145            ThreadItem::WebSearch(search) => vec![NormalizedLog::ToolCall {
146                name: "web_search".to_string(),
147                args: json!({ "id": search.id }),
148                status: Self::status_from_phase(phase, ToolStatus::Completed),
149                action: ActionType::WebSearch {
150                    query: search.query,
151                },
152            }],
153            ThreadItem::Error(error_item) => vec![NormalizedLog::Error {
154                error_type: "item_error".to_string(),
155                message: error_item.message,
156            }],
157            ThreadItem::TodoList(_) => Vec::new(),
158        }
159    }
160
161    fn map_command_status(status: CommandExecutionStatus, phase: ItemPhase) -> ToolStatus {
162        match phase {
163            ItemPhase::Started => ToolStatus::Started,
164            ItemPhase::Updated | ItemPhase::Completed => match status {
165                CommandExecutionStatus::InProgress => ToolStatus::Running,
166                CommandExecutionStatus::Completed => ToolStatus::Completed,
167                CommandExecutionStatus::Failed => ToolStatus::Failed,
168            },
169        }
170    }
171
172    fn map_patch_status(status: PatchApplyStatus, phase: ItemPhase) -> ToolStatus {
173        match phase {
174            ItemPhase::Started => ToolStatus::Started,
175            ItemPhase::Updated | ItemPhase::Completed => match status {
176                PatchApplyStatus::Completed => ToolStatus::Completed,
177                PatchApplyStatus::Failed => ToolStatus::Failed,
178            },
179        }
180    }
181
182    fn map_mcp_status(status: McpToolCallStatus, phase: ItemPhase) -> ToolStatus {
183        match phase {
184            ItemPhase::Started => ToolStatus::Started,
185            ItemPhase::Updated | ItemPhase::Completed => match status {
186                McpToolCallStatus::InProgress => ToolStatus::Running,
187                McpToolCallStatus::Completed => ToolStatus::Completed,
188                McpToolCallStatus::Failed => ToolStatus::Failed,
189            },
190        }
191    }
192
193    fn status_from_phase(phase: ItemPhase, fallback: ToolStatus) -> ToolStatus {
194        match phase {
195            ItemPhase::Started => ToolStatus::Started,
196            ItemPhase::Updated | ItemPhase::Completed => fallback,
197        }
198    }
199
200    fn error_from_executor_error(error: ExecutorError) -> NormalizedLog {
201        let error_type = error.error_type();
202
203        NormalizedLog::Error {
204            error_type: error_type.to_string(),
205            message: error.to_string(),
206        }
207    }
208}
209
210impl LogNormalizer for CodexLogNormalizer {
211    fn normalize(&mut self, chunk: &[u8]) -> Vec<NormalizedLog> {
212        self.buffer.extend_from_slice(chunk);
213        self.consume_lines()
214    }
215
216    fn flush(&mut self) -> Vec<NormalizedLog> {
217        let remaining = std::mem::take(&mut self.buffer);
218        if remaining.is_empty() || remaining.iter().all(u8::is_ascii_whitespace) {
219            return Vec::new();
220        }
221
222        self.normalize_line(&remaining)
223    }
224}
225
/// Identifies which item-carrying `ThreadEvent` variant delivered an item.
#[derive(Clone, Copy, Debug)]
enum ItemPhase {
    /// The item arrived in `ThreadEvent::ItemStarted`.
    Started,
    /// The item arrived in `ThreadEvent::ItemUpdated`.
    Updated,
    /// The item arrived in `ThreadEvent::ItemCompleted`.
    Completed,
}
232
#[cfg(test)]
mod tests {
    use super::*;

    /// A line without its terminating newline stays buffered until the
    /// newline arrives in a later chunk.
    #[test]
    fn normalize_is_incremental() {
        let payload =
            r#"{"type":"item.completed","item":{"type":"agent_message","id":"a1","text":"done"}}"#;
        let mut normalizer = CodexLogNormalizer::new();

        let before_newline = normalizer.normalize(payload.as_bytes());
        assert!(before_newline.is_empty());

        let after_newline = normalizer.normalize(b"\n");
        assert_eq!(after_newline.len(), 1);

        let other = &after_newline[0];
        if let NormalizedLog::Message { role, content } = other {
            assert_eq!(*role, Role::Assistant);
            assert_eq!(content, "done");
        } else {
            panic!("unexpected log: {other:?}");
        }
    }

    /// Each supported item type maps to the expected log variant and status.
    #[test]
    fn maps_required_codex_items() {
        let events = [
            r#"{"type":"item.completed","item":{"type":"agent_message","id":"m1","text":"hello"}}"#,
            r#"{"type":"item.updated","item":{"type":"command_execution","id":"c1","command":"ls -la","aggregated_output":"ok","status":"in_progress"}}"#,
            r#"{"type":"item.completed","item":{"type":"file_change","id":"f1","changes":[{"path":"src/lib.rs","kind":"update"}],"status":"completed"}}"#,
            r#"{"type":"item.completed","item":{"type":"mcp_tool_call","id":"t1","server":"filesystem","tool":"read_file","arguments":{"path":"README.md"},"status":"completed"}}"#,
            r#"{"type":"item.updated","item":{"type":"reasoning","id":"r1","text":"analyzing..."}}"#,
        ];
        // One event per line, each terminated by a newline.
        let jsonl = events.join("\n") + "\n";

        let mut normalizer = CodexLogNormalizer::new();
        let logs = normalizer.normalize(jsonl.as_bytes());
        assert_eq!(logs.len(), 5);

        let other = &logs[0];
        if let NormalizedLog::Message { role, content } = other {
            assert_eq!(*role, Role::Assistant);
            assert_eq!(content, "hello");
        } else {
            panic!("unexpected message mapping: {other:?}");
        }

        let other = &logs[1];
        if let NormalizedLog::ToolCall { action, status, .. } = other {
            assert!(matches!(
                action,
                ActionType::CommandRun { command } if command == "ls -la"
            ));
            assert_eq!(*status, ToolStatus::Running);
        } else {
            panic!("unexpected command mapping: {other:?}");
        }

        let other = &logs[2];
        if let NormalizedLog::ToolCall { action, status, .. } = other {
            assert!(matches!(
                action,
                ActionType::FileEdit { path } if path == "src/lib.rs"
            ));
            assert_eq!(*status, ToolStatus::Completed);
        } else {
            panic!("unexpected file change mapping: {other:?}");
        }

        let other = &logs[3];
        if let NormalizedLog::ToolCall { action, status, .. } = other {
            assert!(matches!(
                action,
                ActionType::McpTool { tool } if tool == "read_file"
            ));
            assert_eq!(*status, ToolStatus::Completed);
        } else {
            panic!("unexpected mcp mapping: {other:?}");
        }

        let other = &logs[4];
        if let NormalizedLog::Thinking { content } = other {
            assert_eq!(content, "analyzing...");
        } else {
            panic!("unexpected thinking mapping: {other:?}");
        }
    }

    /// `flush` normalizes buffered data that never received a newline.
    #[test]
    fn flush_processes_trailing_data() {
        let payload =
            r#"{"type":"item.completed","item":{"type":"reasoning","id":"r1","text":"pending"}}"#;
        let mut normalizer = CodexLogNormalizer::new();

        let buffered = normalizer.normalize(payload.as_bytes());
        assert!(buffered.is_empty());

        let flushed = normalizer.flush();
        assert_eq!(flushed.len(), 1);

        let other = &flushed[0];
        if let NormalizedLog::Thinking { content } = other {
            assert_eq!(content, "pending");
        } else {
            panic!("unexpected flush mapping: {other:?}");
        }
    }
}