agtrace_providers/codex/
parser.rs

1use crate::Result;
2use agtrace_types::*;
3use chrono::DateTime;
4use regex::Regex;
5use std::path::Path;
6use std::sync::LazyLock;
7use uuid::Uuid;
8
9use crate::builder::{EventBuilder, SemanticSuffix};
10use crate::codex::schema;
11use crate::codex::schema::CodexRecord;
12
13/// Regex for extracting exit codes from Codex output
14/// Example: "Exit code: 0" or "Exit Code: 0" (case-insensitive)
15static EXIT_CODE_REGEX: LazyLock<Regex> =
16    LazyLock::new(|| Regex::new(r"(?i)Exit Code:\s*(\d+)").unwrap());
17
18/// Determine StreamId from Codex subagent_type
19fn determine_stream_id(subagent_type: &Option<String>) -> StreamId {
20    match subagent_type {
21        Some(name) => StreamId::Subagent { name: name.clone() },
22        None => StreamId::Main,
23    }
24}
25
26/// Attach model to event metadata when available
27fn attach_model_metadata(
28    metadata: Option<serde_json::Value>,
29    model: Option<&String>,
30) -> Option<serde_json::Value> {
31    let model = match model {
32        Some(m) => m.clone(),
33        None => return metadata,
34    };
35
36    match metadata {
37        Some(serde_json::Value::Object(mut map)) => {
38            map.entry("model")
39                .or_insert_with(|| serde_json::Value::String(model));
40            Some(serde_json::Value::Object(map))
41        }
42        Some(other) => {
43            let mut map = serde_json::Map::new();
44            map.insert("raw".to_string(), other);
45            map.insert("model".to_string(), serde_json::Value::String(model));
46            Some(serde_json::Value::Object(map))
47        }
48        None => {
49            let mut map = serde_json::Map::new();
50            map.insert("model".to_string(), serde_json::Value::String(model));
51            Some(serde_json::Value::Object(map))
52        }
53    }
54}
55
56/// Normalize Codex session records to events
57/// Handles async token notifications, JSON string parsing, and exit code extraction
58pub(crate) fn normalize_codex_session(
59    records: Vec<CodexRecord>,
60    session_id: &str,
61    subagent_type: Option<String>,
62) -> Vec<AgentEvent> {
63    // Create session_id UUID from session_id string (deterministic)
64    let session_id_uuid = Uuid::new_v5(&Uuid::NAMESPACE_OID, session_id.as_bytes());
65    let mut builder = EventBuilder::new(session_id_uuid);
66    let mut events = Vec::new();
67    let mut last_seen_model: Option<String> = None;
68
69    // Determine stream_id from subagent_type
70    let stream_id = determine_stream_id(&subagent_type);
71
72    // Track last generation event for attaching TokenUsage (future use)
73    let mut _last_generation_event_id: Option<Uuid> = None;
74
75    // Track last seen token usage to deduplicate
76    // Codex sends duplicate token_count events with same last_token_usage values
77    let mut last_seen_token_usage: Option<(i32, i32, i32)> = None;
78
79    for (row_index, record) in records.iter().enumerate() {
80        // Generate base_id from session_id + row_index (deterministic)
81        let base_id = format!("{}:row_{}", session_id, row_index);
82        match record {
83            CodexRecord::SessionMeta(_meta) => {
84                // SessionMeta doesn't generate events
85                // Metadata is preserved in raw field if needed
86                // Subagent information is extracted during header scanning (see io::extract_codex_header)
87            }
88
89            CodexRecord::EventMsg(event_msg) => {
90                let timestamp = parse_timestamp(&event_msg.timestamp);
91                let raw_value = attach_model_metadata(
92                    serde_json::to_value(event_msg).ok(),
93                    last_seen_model.as_ref(),
94                );
95
96                match &event_msg.payload {
97                    // Skip user_message, agent_message, agent_reasoning
98                    // These are duplicated in ResponseItem with richer data (encrypted_content, etc.)
99                    schema::EventMsgPayload::UserMessage(_) => {
100                        // Skip: duplicated in ResponseItem::Message(user)
101                    }
102
103                    schema::EventMsgPayload::AgentMessage(_) => {
104                        // Skip: duplicated in ResponseItem::Message(assistant)
105                    }
106
107                    schema::EventMsgPayload::AgentReasoning(_) => {
108                        // Skip: duplicated in ResponseItem::Reasoning
109                    }
110
111                    schema::EventMsgPayload::TokenCount(token_count) => {
112                        // TokenUsage sidecar event
113                        // IMPORTANT: Keep this - token_count only exists in event_msg, not in response_item
114                        if let Some(info) = &token_count.info {
115                            let usage = &info.last_token_usage;
116                            let usage_triple = (
117                                usage.input_tokens as i32,
118                                usage.output_tokens as i32,
119                                usage.total_tokens as i32,
120                            );
121
122                            // Deduplicate: Codex often sends duplicate token_count with same last_token_usage
123                            if last_seen_token_usage == Some(usage_triple) {
124                                // Skip duplicate
125                                continue;
126                            }
127                            last_seen_token_usage = Some(usage_triple);
128
129                            // Codex Token Conversion Rationale:
130                            //
131                            // Input mapping (verified from codex-rs implementation):
132                            //   cached   = cached_input_tokens (explicit field)
133                            //   uncached = input_tokens - cached_input_tokens
134                            //              (codex-rs provides non_cached_input() helper for this)
135                            //
136                            // Output mapping (verified from codex-rs schema):
137                            //   generated = output_tokens (normal generation)
138                            //   reasoning = reasoning_output_tokens (explicit field for o1-style reasoning)
139                            //   tool      = 0 (Codex does not separate tool call tokens)
140                            builder.build_and_push(
141                                &mut events,
142                                &base_id,
143                                SemanticSuffix::TokenUsage,
144                                timestamp,
145                                EventPayload::TokenUsage(TokenUsagePayload::new(
146                                    TokenInput::new(
147                                        usage.cached_input_tokens as u64,
148                                        usage.input_tokens.saturating_sub(usage.cached_input_tokens)
149                                            as u64,
150                                    ),
151                                    TokenOutput::new(
152                                        usage.output_tokens as u64,
153                                        usage.reasoning_output_tokens as u64,
154                                        0, // Codex doesn't separate tool tokens
155                                    ),
156                                )),
157                                raw_value.clone(),
158                                stream_id.clone(),
159                            );
160                        }
161                    }
162
163                    schema::EventMsgPayload::EnteredReviewMode(_) => {
164                        // Skip: this is a spawn signal, not content
165                        // Used for parent-child correlation in discovery phase
166                    }
167
168                    schema::EventMsgPayload::Unknown => {
169                        // Skip unknown event types
170                    }
171                }
172            }
173
174            CodexRecord::ResponseItem(response_item) => {
175                let timestamp = parse_timestamp(&response_item.timestamp);
176                let raw_value = attach_model_metadata(
177                    serde_json::to_value(response_item).ok(),
178                    last_seen_model.as_ref(),
179                );
180
181                match &response_item.payload {
182                    schema::ResponseItemPayload::Message(message) => {
183                        // Extract text from content blocks
184                        let text = extract_message_text(&message.content);
185
186                        let (payload, suffix) = if message.role == "user" {
187                            (
188                                EventPayload::User(UserPayload { text }),
189                                SemanticSuffix::User,
190                            )
191                        } else {
192                            (
193                                EventPayload::Message(MessagePayload { text }),
194                                SemanticSuffix::Message,
195                            )
196                        };
197
198                        let event_id = builder.build_and_push(
199                            &mut events,
200                            &base_id,
201                            suffix,
202                            timestamp,
203                            payload,
204                            raw_value.clone(),
205                            stream_id.clone(),
206                        );
207
208                        if message.role == "assistant" {
209                            _last_generation_event_id = Some(event_id);
210                        }
211                    }
212
213                    schema::ResponseItemPayload::Reasoning(reasoning) => {
214                        // Extract text from summary blocks
215                        let text = extract_reasoning_text(reasoning);
216
217                        builder.build_and_push(
218                            &mut events,
219                            &base_id,
220                            SemanticSuffix::Reasoning,
221                            timestamp,
222                            EventPayload::Reasoning(ReasoningPayload { text }),
223                            raw_value.clone(),
224                            stream_id.clone(),
225                        );
226                    }
227
228                    schema::ResponseItemPayload::FunctionCall(func_call) => {
229                        // Parse JSON string arguments to Value
230                        let arguments = parse_json_arguments(&func_call.arguments);
231
232                        let event_id = builder.build_and_push(
233                            &mut events,
234                            &base_id,
235                            SemanticSuffix::ToolCall,
236                            timestamp,
237                            EventPayload::ToolCall(super::mapper::normalize_codex_tool_call(
238                                func_call.name.clone(),
239                                arguments,
240                                Some(func_call.call_id.clone()),
241                            )),
242                            raw_value.clone(),
243                            stream_id.clone(),
244                        );
245
246                        // Register tool call mapping
247                        builder.register_tool_call(func_call.call_id.clone(), event_id);
248                        _last_generation_event_id = Some(event_id);
249                    }
250
251                    schema::ResponseItemPayload::FunctionCallOutput(output) => {
252                        // Extract exit code from output text
253                        let exit_code = extract_exit_code(&output.output);
254
255                        if let Some(tool_call_id) = builder.get_tool_call_uuid(&output.call_id) {
256                            builder.build_and_push(
257                                &mut events,
258                                &base_id,
259                                SemanticSuffix::ToolResult,
260                                timestamp,
261                                EventPayload::ToolResult(ToolResultPayload {
262                                    output: output.output.clone(),
263                                    tool_call_id,
264                                    is_error: exit_code.map(|code| code != 0).unwrap_or(false),
265                                    agent_id: None,
266                                }),
267                                raw_value.clone(),
268                                stream_id.clone(),
269                            );
270                        }
271                    }
272
273                    schema::ResponseItemPayload::CustomToolCall(tool_call) => {
274                        // Parse JSON string input to Value
275                        let arguments = parse_json_arguments(&tool_call.input);
276
277                        let event_id = builder.build_and_push(
278                            &mut events,
279                            &base_id,
280                            SemanticSuffix::ToolCall,
281                            timestamp,
282                            EventPayload::ToolCall(super::mapper::normalize_codex_tool_call(
283                                tool_call.name.clone(),
284                                arguments,
285                                Some(tool_call.call_id.clone()),
286                            )),
287                            raw_value.clone(),
288                            stream_id.clone(),
289                        );
290
291                        builder.register_tool_call(tool_call.call_id.clone(), event_id);
292                        _last_generation_event_id = Some(event_id);
293                    }
294
295                    schema::ResponseItemPayload::CustomToolCallOutput(output) => {
296                        let exit_code = extract_exit_code(&output.output);
297
298                        if let Some(tool_call_id) = builder.get_tool_call_uuid(&output.call_id) {
299                            builder.build_and_push(
300                                &mut events,
301                                &base_id,
302                                SemanticSuffix::ToolResult,
303                                timestamp,
304                                EventPayload::ToolResult(ToolResultPayload {
305                                    output: output.output.clone(),
306                                    tool_call_id,
307                                    is_error: exit_code.map(|code| code != 0).unwrap_or(false),
308                                    agent_id: None,
309                                }),
310                                raw_value.clone(),
311                                stream_id.clone(),
312                            );
313                        }
314                    }
315
316                    schema::ResponseItemPayload::GhostSnapshot(_snapshot) => {
317                        // Skip ghost snapshots for now (file system events)
318                    }
319
320                    schema::ResponseItemPayload::Unknown => {
321                        // Skip unknown payload types
322                    }
323                }
324            }
325
326            CodexRecord::TurnContext(turn_context) => {
327                // Track model for downstream token usage + message events
328                last_seen_model = Some(turn_context.payload.model.clone());
329            }
330
331            CodexRecord::Unknown => {
332                // Skip unknown record types
333            }
334        }
335    }
336
337    events
338}
339
340/// Extract text from message content blocks
341fn extract_message_text(content: &[schema::MessageContent]) -> String {
342    content
343        .iter()
344        .filter_map(|c| match c {
345            schema::MessageContent::InputText { text } => Some(text.as_str()),
346            schema::MessageContent::OutputText { text } => Some(text.as_str()),
347            schema::MessageContent::Unknown => None,
348        })
349        .collect::<Vec<_>>()
350        .join("\n")
351}
352
353/// Extract text from reasoning summary blocks
354fn extract_reasoning_text(reasoning: &schema::ReasoningPayload) -> String {
355    let summary_text = reasoning
356        .summary
357        .iter()
358        .filter_map(|s| match s {
359            schema::SummaryText::SummaryText { text } => Some(text.as_str()),
360            schema::SummaryText::Unknown => None,
361        })
362        .collect::<Vec<_>>()
363        .join("\n");
364
365    // Prefer content over summary if available
366    reasoning
367        .content
368        .as_ref()
369        .unwrap_or(&summary_text)
370        .to_string()
371}
372
373/// Parse JSON string arguments to serde_json::Value
374/// If parsing fails, wrap the string in a JSON object
375fn parse_json_arguments(args: &str) -> serde_json::Value {
376    serde_json::from_str(args).unwrap_or_else(|_| {
377        // If not valid JSON, wrap in object
378        serde_json::json!({ "raw": args })
379    })
380}
381
382/// Extract exit code from output text using regex
383fn extract_exit_code(output: &str) -> Option<i32> {
384    EXIT_CODE_REGEX
385        .captures(output)
386        .and_then(|cap| cap.get(1))
387        .and_then(|m| m.as_str().parse().ok())
388}
389
390/// Parse Codex timestamp to DateTime<Utc>
391fn parse_timestamp(ts: &str) -> DateTime<chrono::Utc> {
392    DateTime::parse_from_rfc3339(ts)
393        .map(|dt| dt.with_timezone(&chrono::Utc))
394        .unwrap_or_else(|_| chrono::Utc::now())
395}
396
397/// Codex session parser implementation
398pub struct CodexParser;
399
400impl crate::traits::SessionParser for CodexParser {
401    fn parse_file(&self, path: &Path) -> Result<Vec<AgentEvent>> {
402        super::io::normalize_codex_file(path)
403    }
404
405    fn parse_record(&self, content: &str) -> Result<Option<AgentEvent>> {
406        // Codex uses JSONL format, parse as AgentEvent
407        match serde_json::from_str::<AgentEvent>(content) {
408            Ok(event) => Ok(Some(event)),
409            Err(_) => Ok(None), // Skip malformed lines
410        }
411    }
412}
413
#[cfg(test)]
mod tests {
    use super::*;

    /// Valid JSON is passed through; anything else is wrapped under `"raw"`.
    #[test]
    fn test_parse_json_arguments() {
        // Valid JSON object
        let object = parse_json_arguments(r#"{"command": "ls -la"}"#);
        assert_eq!(object["command"], "ls -la");

        // Valid JSON array
        let list = parse_json_arguments(r#"["arg1", "arg2"]"#);
        assert!(list.is_array());

        // Invalid JSON - should wrap in object
        let wrapped = parse_json_arguments("not json");
        assert_eq!(wrapped["raw"], "not json");
    }

    /// The exit-code marker is matched regardless of letter case.
    #[test]
    fn test_extract_exit_code() {
        let cases: [(&str, Option<i32>); 8] = [
            // Uppercase (legacy format)
            ("Exit Code: 0", Some(0)),
            ("Exit Code: 127", Some(127)),
            ("Some output\nExit Code: 1\n", Some(1)),
            // Lowercase (actual Codex format)
            ("Exit code: 0", Some(0)),
            ("Exit code: 127", Some(127)),
            ("Some output\nExit code: 1\n", Some(1)),
            // Mixed case
            ("EXIT CODE: 42", Some(42)),
            // No match
            ("No exit code here", None),
        ];

        for (output, expected) in cases {
            assert_eq!(extract_exit_code(output), expected);
        }
    }

    /// Input and output text blocks are joined with a newline, in order.
    #[test]
    fn test_extract_message_text() {
        let hello = schema::MessageContent::InputText {
            text: "Hello".to_string(),
        };
        let world = schema::MessageContent::OutputText {
            text: "World".to_string(),
        };
        let content = vec![hello, world];

        assert_eq!(extract_message_text(&content), "Hello\nWorld");
    }
}
467}