agtrace_providers/codex/
parser.rs

1use agtrace_types::*;
2use anyhow::Result;
3use chrono::DateTime;
4use regex::Regex;
5use std::path::Path;
6use std::sync::LazyLock;
7use uuid::Uuid;
8
9use crate::builder::{EventBuilder, SemanticSuffix};
10use crate::codex::schema;
11use crate::codex::schema::CodexRecord;
12
13/// Regex for extracting exit codes from Codex output
14/// Example: "Exit Code: 0" or similar patterns
15static EXIT_CODE_REGEX: LazyLock<Regex> =
16    LazyLock::new(|| Regex::new(r"Exit Code:\s*(\d+)").unwrap());
17
18/// Attach model to event metadata when available
19fn attach_model_metadata(
20    metadata: Option<serde_json::Value>,
21    model: Option<&String>,
22) -> Option<serde_json::Value> {
23    let model = match model {
24        Some(m) => m.clone(),
25        None => return metadata,
26    };
27
28    match metadata {
29        Some(serde_json::Value::Object(mut map)) => {
30            map.entry("model")
31                .or_insert_with(|| serde_json::Value::String(model));
32            Some(serde_json::Value::Object(map))
33        }
34        Some(other) => {
35            let mut map = serde_json::Map::new();
36            map.insert("raw".to_string(), other);
37            map.insert("model".to_string(), serde_json::Value::String(model));
38            Some(serde_json::Value::Object(map))
39        }
40        None => {
41            let mut map = serde_json::Map::new();
42            map.insert("model".to_string(), serde_json::Value::String(model));
43            Some(serde_json::Value::Object(map))
44        }
45    }
46}
47
48/// Normalize Codex session records to events
49/// Handles async token notifications, JSON string parsing, and exit code extraction
50pub(crate) fn normalize_codex_session(
51    records: Vec<CodexRecord>,
52    session_id: &str,
53) -> Vec<AgentEvent> {
54    // Create session_id UUID from session_id string (deterministic)
55    let session_id_uuid = Uuid::new_v5(&Uuid::NAMESPACE_OID, session_id.as_bytes());
56    let mut builder = EventBuilder::new(session_id_uuid);
57    let mut events = Vec::new();
58    let mut last_seen_model: Option<String> = None;
59
60    // Track last generation event for attaching TokenUsage (future use)
61    let mut _last_generation_event_id: Option<Uuid> = None;
62
63    // Track last seen token usage to deduplicate
64    // Codex sends duplicate token_count events with same last_token_usage values
65    let mut last_seen_token_usage: Option<(i32, i32, i32)> = None;
66
67    for (row_index, record) in records.iter().enumerate() {
68        // Generate base_id from session_id + row_index (deterministic)
69        let base_id = format!("{}:row_{}", session_id, row_index);
70        match record {
71            CodexRecord::SessionMeta(_meta) => {
72                // SessionMeta doesn't generate events
73                // Metadata is preserved in raw field if needed
74            }
75
76            CodexRecord::EventMsg(event_msg) => {
77                let timestamp = parse_timestamp(&event_msg.timestamp);
78                let raw_value = attach_model_metadata(
79                    serde_json::to_value(event_msg).ok(),
80                    last_seen_model.as_ref(),
81                );
82
83                match &event_msg.payload {
84                    // Skip user_message, agent_message, agent_reasoning
85                    // These are duplicated in ResponseItem with richer data (encrypted_content, etc.)
86                    schema::EventMsgPayload::UserMessage(_) => {
87                        // Skip: duplicated in ResponseItem::Message(user)
88                    }
89
90                    schema::EventMsgPayload::AgentMessage(_) => {
91                        // Skip: duplicated in ResponseItem::Message(assistant)
92                    }
93
94                    schema::EventMsgPayload::AgentReasoning(_) => {
95                        // Skip: duplicated in ResponseItem::Reasoning
96                    }
97
98                    schema::EventMsgPayload::TokenCount(token_count) => {
99                        // TokenUsage sidecar event
100                        // IMPORTANT: Keep this - token_count only exists in event_msg, not in response_item
101                        if let Some(info) = &token_count.info {
102                            let usage = &info.last_token_usage;
103                            let usage_triple = (
104                                usage.input_tokens as i32,
105                                usage.output_tokens as i32,
106                                usage.total_tokens as i32,
107                            );
108
109                            // Deduplicate: Codex often sends duplicate token_count with same last_token_usage
110                            if last_seen_token_usage == Some(usage_triple) {
111                                // Skip duplicate
112                                continue;
113                            }
114                            last_seen_token_usage = Some(usage_triple);
115
116                            builder.build_and_push(
117                                &mut events,
118                                &base_id,
119                                SemanticSuffix::TokenUsage,
120                                timestamp,
121                                EventPayload::TokenUsage(TokenUsagePayload {
122                                    input_tokens: usage.input_tokens as i32,
123                                    output_tokens: usage.output_tokens as i32,
124                                    total_tokens: usage.total_tokens as i32,
125                                    details: Some(TokenUsageDetails {
126                                        cache_creation_input_tokens: None, // Codex doesn't track cache creation separately
127                                        cache_read_input_tokens: Some(
128                                            usage.cached_input_tokens as i32,
129                                        ),
130                                        reasoning_output_tokens: Some(
131                                            usage.reasoning_output_tokens as i32,
132                                        ),
133                                    }),
134                                }),
135                                raw_value.clone(),
136                                StreamId::Main,
137                            );
138                        }
139                    }
140
141                    schema::EventMsgPayload::Unknown => {
142                        // Skip unknown event types
143                    }
144                }
145            }
146
147            CodexRecord::ResponseItem(response_item) => {
148                let timestamp = parse_timestamp(&response_item.timestamp);
149                let raw_value = attach_model_metadata(
150                    serde_json::to_value(response_item).ok(),
151                    last_seen_model.as_ref(),
152                );
153
154                match &response_item.payload {
155                    schema::ResponseItemPayload::Message(message) => {
156                        // Extract text from content blocks
157                        let text = extract_message_text(&message.content);
158
159                        let (payload, suffix) = if message.role == "user" {
160                            (
161                                EventPayload::User(UserPayload { text }),
162                                SemanticSuffix::User,
163                            )
164                        } else {
165                            (
166                                EventPayload::Message(MessagePayload { text }),
167                                SemanticSuffix::Message,
168                            )
169                        };
170
171                        let event_id = builder.build_and_push(
172                            &mut events,
173                            &base_id,
174                            suffix,
175                            timestamp,
176                            payload,
177                            raw_value.clone(),
178                            StreamId::Main,
179                        );
180
181                        if message.role == "assistant" {
182                            _last_generation_event_id = Some(event_id);
183                        }
184                    }
185
186                    schema::ResponseItemPayload::Reasoning(reasoning) => {
187                        // Extract text from summary blocks
188                        let text = extract_reasoning_text(reasoning);
189
190                        builder.build_and_push(
191                            &mut events,
192                            &base_id,
193                            SemanticSuffix::Reasoning,
194                            timestamp,
195                            EventPayload::Reasoning(ReasoningPayload { text }),
196                            raw_value.clone(),
197                            StreamId::Main,
198                        );
199                    }
200
201                    schema::ResponseItemPayload::FunctionCall(func_call) => {
202                        // Parse JSON string arguments to Value
203                        let arguments = parse_json_arguments(&func_call.arguments);
204
205                        let event_id = builder.build_and_push(
206                            &mut events,
207                            &base_id,
208                            SemanticSuffix::ToolCall,
209                            timestamp,
210                            EventPayload::ToolCall(super::mapper::normalize_codex_tool_call(
211                                func_call.name.clone(),
212                                arguments,
213                                Some(func_call.call_id.clone()),
214                            )),
215                            raw_value.clone(),
216                            StreamId::Main,
217                        );
218
219                        // Register tool call mapping
220                        builder.register_tool_call(func_call.call_id.clone(), event_id);
221                        _last_generation_event_id = Some(event_id);
222                    }
223
224                    schema::ResponseItemPayload::FunctionCallOutput(output) => {
225                        // Extract exit code from output text
226                        let exit_code = extract_exit_code(&output.output);
227
228                        if let Some(tool_call_id) = builder.get_tool_call_uuid(&output.call_id) {
229                            builder.build_and_push(
230                                &mut events,
231                                &base_id,
232                                SemanticSuffix::ToolResult,
233                                timestamp,
234                                EventPayload::ToolResult(ToolResultPayload {
235                                    output: output.output.clone(),
236                                    tool_call_id,
237                                    is_error: exit_code.map(|code| code != 0).unwrap_or(false),
238                                }),
239                                raw_value.clone(),
240                                StreamId::Main,
241                            );
242                        }
243                    }
244
245                    schema::ResponseItemPayload::CustomToolCall(tool_call) => {
246                        // Parse JSON string input to Value
247                        let arguments = parse_json_arguments(&tool_call.input);
248
249                        let event_id = builder.build_and_push(
250                            &mut events,
251                            &base_id,
252                            SemanticSuffix::ToolCall,
253                            timestamp,
254                            EventPayload::ToolCall(super::mapper::normalize_codex_tool_call(
255                                tool_call.name.clone(),
256                                arguments,
257                                Some(tool_call.call_id.clone()),
258                            )),
259                            raw_value.clone(),
260                            StreamId::Main,
261                        );
262
263                        builder.register_tool_call(tool_call.call_id.clone(), event_id);
264                        _last_generation_event_id = Some(event_id);
265                    }
266
267                    schema::ResponseItemPayload::CustomToolCallOutput(output) => {
268                        let exit_code = extract_exit_code(&output.output);
269
270                        if let Some(tool_call_id) = builder.get_tool_call_uuid(&output.call_id) {
271                            builder.build_and_push(
272                                &mut events,
273                                &base_id,
274                                SemanticSuffix::ToolResult,
275                                timestamp,
276                                EventPayload::ToolResult(ToolResultPayload {
277                                    output: output.output.clone(),
278                                    tool_call_id,
279                                    is_error: exit_code.map(|code| code != 0).unwrap_or(false),
280                                }),
281                                raw_value.clone(),
282                                StreamId::Main,
283                            );
284                        }
285                    }
286
287                    schema::ResponseItemPayload::GhostSnapshot(_snapshot) => {
288                        // Skip ghost snapshots for now (file system events)
289                    }
290
291                    schema::ResponseItemPayload::Unknown => {
292                        // Skip unknown payload types
293                    }
294                }
295            }
296
297            CodexRecord::TurnContext(turn_context) => {
298                // Track model for downstream token usage + message events
299                last_seen_model = Some(turn_context.payload.model.clone());
300            }
301
302            CodexRecord::Unknown => {
303                // Skip unknown record types
304            }
305        }
306    }
307
308    events
309}
310
311/// Extract text from message content blocks
312fn extract_message_text(content: &[schema::MessageContent]) -> String {
313    content
314        .iter()
315        .filter_map(|c| match c {
316            schema::MessageContent::InputText { text } => Some(text.as_str()),
317            schema::MessageContent::OutputText { text } => Some(text.as_str()),
318            schema::MessageContent::Unknown => None,
319        })
320        .collect::<Vec<_>>()
321        .join("\n")
322}
323
324/// Extract text from reasoning summary blocks
325fn extract_reasoning_text(reasoning: &schema::ReasoningPayload) -> String {
326    let summary_text = reasoning
327        .summary
328        .iter()
329        .filter_map(|s| match s {
330            schema::SummaryText::SummaryText { text } => Some(text.as_str()),
331            schema::SummaryText::Unknown => None,
332        })
333        .collect::<Vec<_>>()
334        .join("\n");
335
336    // Prefer content over summary if available
337    reasoning
338        .content
339        .as_ref()
340        .unwrap_or(&summary_text)
341        .to_string()
342}
343
344/// Parse JSON string arguments to serde_json::Value
345/// If parsing fails, wrap the string in a JSON object
346fn parse_json_arguments(args: &str) -> serde_json::Value {
347    serde_json::from_str(args).unwrap_or_else(|_| {
348        // If not valid JSON, wrap in object
349        serde_json::json!({ "raw": args })
350    })
351}
352
353/// Extract exit code from output text using regex
354fn extract_exit_code(output: &str) -> Option<i32> {
355    EXIT_CODE_REGEX
356        .captures(output)
357        .and_then(|cap| cap.get(1))
358        .and_then(|m| m.as_str().parse().ok())
359}
360
361/// Parse Codex timestamp to DateTime<Utc>
362fn parse_timestamp(ts: &str) -> DateTime<chrono::Utc> {
363    DateTime::parse_from_rfc3339(ts)
364        .map(|dt| dt.with_timezone(&chrono::Utc))
365        .unwrap_or_else(|_| chrono::Utc::now())
366}
367
368/// Codex session parser implementation
369pub struct CodexParser;
370
371impl crate::traits::SessionParser for CodexParser {
372    fn parse_file(&self, path: &Path) -> Result<Vec<AgentEvent>> {
373        super::io::normalize_codex_file(path)
374    }
375
376    fn parse_record(&self, content: &str) -> Result<Option<AgentEvent>> {
377        // Codex uses JSONL format, parse as AgentEvent
378        match serde_json::from_str::<AgentEvent>(content) {
379            Ok(event) => Ok(Some(event)),
380            Err(_) => Ok(None), // Skip malformed lines
381        }
382    }
383}
384
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parse_json_arguments() {
        // A JSON object is passed through as parsed.
        let object = parse_json_arguments(r#"{"command": "ls -la"}"#);
        assert_eq!(object["command"], "ls -la");

        // Other valid JSON values (here an array) are passed through as well.
        assert!(parse_json_arguments(r#"["arg1", "arg2"]"#).is_array());

        // Non-JSON input is preserved under the "raw" key.
        let fallback = parse_json_arguments("not json");
        assert_eq!(fallback["raw"], "not json");
    }

    #[test]
    fn test_extract_exit_code() {
        let cases: [(&str, Option<i32>); 4] = [
            ("Exit Code: 0", Some(0)),
            ("Exit Code: 127", Some(127)),
            ("Some output\nExit Code: 1\n", Some(1)),
            ("No exit code here", None),
        ];
        for (input, expected) in cases {
            assert_eq!(extract_exit_code(input), expected, "input: {input:?}");
        }
    }

    #[test]
    fn test_extract_message_text() {
        let blocks = [
            schema::MessageContent::InputText {
                text: "Hello".to_string(),
            },
            schema::MessageContent::OutputText {
                text: "World".to_string(),
            },
        ];
        assert_eq!(extract_message_text(&blocks), "Hello\nWorld");
    }
}