1use agtrace_types::*;
2use anyhow::Result;
3use chrono::DateTime;
4use regex::Regex;
5use std::path::Path;
6use std::sync::LazyLock;
7use uuid::Uuid;
8
9use crate::builder::{EventBuilder, SemanticSuffix};
10use crate::codex::schema;
11use crate::codex::schema::CodexRecord;
12
13static EXIT_CODE_REGEX: LazyLock<Regex> =
16 LazyLock::new(|| Regex::new(r"Exit Code:\s*(\d+)").unwrap());
17
18fn attach_model_metadata(
20 metadata: Option<serde_json::Value>,
21 model: Option<&String>,
22) -> Option<serde_json::Value> {
23 let model = match model {
24 Some(m) => m.clone(),
25 None => return metadata,
26 };
27
28 match metadata {
29 Some(serde_json::Value::Object(mut map)) => {
30 map.entry("model")
31 .or_insert_with(|| serde_json::Value::String(model));
32 Some(serde_json::Value::Object(map))
33 }
34 Some(other) => {
35 let mut map = serde_json::Map::new();
36 map.insert("raw".to_string(), other);
37 map.insert("model".to_string(), serde_json::Value::String(model));
38 Some(serde_json::Value::Object(map))
39 }
40 None => {
41 let mut map = serde_json::Map::new();
42 map.insert("model".to_string(), serde_json::Value::String(model));
43 Some(serde_json::Value::Object(map))
44 }
45 }
46}
47
48pub(crate) fn normalize_codex_session(
51 records: Vec<CodexRecord>,
52 session_id: &str,
53) -> Vec<AgentEvent> {
54 let session_id_uuid = Uuid::new_v5(&Uuid::NAMESPACE_OID, session_id.as_bytes());
56 let mut builder = EventBuilder::new(session_id_uuid);
57 let mut events = Vec::new();
58 let mut last_seen_model: Option<String> = None;
59
60 let mut _last_generation_event_id: Option<Uuid> = None;
62
63 let mut last_seen_token_usage: Option<(i32, i32, i32)> = None;
66
67 for (row_index, record) in records.iter().enumerate() {
68 let base_id = format!("{}:row_{}", session_id, row_index);
70 match record {
71 CodexRecord::SessionMeta(_meta) => {
72 }
75
76 CodexRecord::EventMsg(event_msg) => {
77 let timestamp = parse_timestamp(&event_msg.timestamp);
78 let raw_value = attach_model_metadata(
79 serde_json::to_value(event_msg).ok(),
80 last_seen_model.as_ref(),
81 );
82
83 match &event_msg.payload {
84 schema::EventMsgPayload::UserMessage(_) => {
87 }
89
90 schema::EventMsgPayload::AgentMessage(_) => {
91 }
93
94 schema::EventMsgPayload::AgentReasoning(_) => {
95 }
97
98 schema::EventMsgPayload::TokenCount(token_count) => {
99 if let Some(info) = &token_count.info {
102 let usage = &info.last_token_usage;
103 let usage_triple = (
104 usage.input_tokens as i32,
105 usage.output_tokens as i32,
106 usage.total_tokens as i32,
107 );
108
109 if last_seen_token_usage == Some(usage_triple) {
111 continue;
113 }
114 last_seen_token_usage = Some(usage_triple);
115
116 builder.build_and_push(
117 &mut events,
118 &base_id,
119 SemanticSuffix::TokenUsage,
120 timestamp,
121 EventPayload::TokenUsage(TokenUsagePayload {
122 input_tokens: usage.input_tokens as i32,
123 output_tokens: usage.output_tokens as i32,
124 total_tokens: usage.total_tokens as i32,
125 details: Some(TokenUsageDetails {
126 cache_creation_input_tokens: None, cache_read_input_tokens: Some(
128 usage.cached_input_tokens as i32,
129 ),
130 reasoning_output_tokens: Some(
131 usage.reasoning_output_tokens as i32,
132 ),
133 }),
134 }),
135 raw_value.clone(),
136 StreamId::Main,
137 );
138 }
139 }
140
141 schema::EventMsgPayload::Unknown => {
142 }
144 }
145 }
146
147 CodexRecord::ResponseItem(response_item) => {
148 let timestamp = parse_timestamp(&response_item.timestamp);
149 let raw_value = attach_model_metadata(
150 serde_json::to_value(response_item).ok(),
151 last_seen_model.as_ref(),
152 );
153
154 match &response_item.payload {
155 schema::ResponseItemPayload::Message(message) => {
156 let text = extract_message_text(&message.content);
158
159 let (payload, suffix) = if message.role == "user" {
160 (
161 EventPayload::User(UserPayload { text }),
162 SemanticSuffix::User,
163 )
164 } else {
165 (
166 EventPayload::Message(MessagePayload { text }),
167 SemanticSuffix::Message,
168 )
169 };
170
171 let event_id = builder.build_and_push(
172 &mut events,
173 &base_id,
174 suffix,
175 timestamp,
176 payload,
177 raw_value.clone(),
178 StreamId::Main,
179 );
180
181 if message.role == "assistant" {
182 _last_generation_event_id = Some(event_id);
183 }
184 }
185
186 schema::ResponseItemPayload::Reasoning(reasoning) => {
187 let text = extract_reasoning_text(reasoning);
189
190 builder.build_and_push(
191 &mut events,
192 &base_id,
193 SemanticSuffix::Reasoning,
194 timestamp,
195 EventPayload::Reasoning(ReasoningPayload { text }),
196 raw_value.clone(),
197 StreamId::Main,
198 );
199 }
200
201 schema::ResponseItemPayload::FunctionCall(func_call) => {
202 let arguments = parse_json_arguments(&func_call.arguments);
204
205 let event_id = builder.build_and_push(
206 &mut events,
207 &base_id,
208 SemanticSuffix::ToolCall,
209 timestamp,
210 EventPayload::ToolCall(super::mapper::normalize_codex_tool_call(
211 func_call.name.clone(),
212 arguments,
213 Some(func_call.call_id.clone()),
214 )),
215 raw_value.clone(),
216 StreamId::Main,
217 );
218
219 builder.register_tool_call(func_call.call_id.clone(), event_id);
221 _last_generation_event_id = Some(event_id);
222 }
223
224 schema::ResponseItemPayload::FunctionCallOutput(output) => {
225 let exit_code = extract_exit_code(&output.output);
227
228 if let Some(tool_call_id) = builder.get_tool_call_uuid(&output.call_id) {
229 builder.build_and_push(
230 &mut events,
231 &base_id,
232 SemanticSuffix::ToolResult,
233 timestamp,
234 EventPayload::ToolResult(ToolResultPayload {
235 output: output.output.clone(),
236 tool_call_id,
237 is_error: exit_code.map(|code| code != 0).unwrap_or(false),
238 }),
239 raw_value.clone(),
240 StreamId::Main,
241 );
242 }
243 }
244
245 schema::ResponseItemPayload::CustomToolCall(tool_call) => {
246 let arguments = parse_json_arguments(&tool_call.input);
248
249 let event_id = builder.build_and_push(
250 &mut events,
251 &base_id,
252 SemanticSuffix::ToolCall,
253 timestamp,
254 EventPayload::ToolCall(super::mapper::normalize_codex_tool_call(
255 tool_call.name.clone(),
256 arguments,
257 Some(tool_call.call_id.clone()),
258 )),
259 raw_value.clone(),
260 StreamId::Main,
261 );
262
263 builder.register_tool_call(tool_call.call_id.clone(), event_id);
264 _last_generation_event_id = Some(event_id);
265 }
266
267 schema::ResponseItemPayload::CustomToolCallOutput(output) => {
268 let exit_code = extract_exit_code(&output.output);
269
270 if let Some(tool_call_id) = builder.get_tool_call_uuid(&output.call_id) {
271 builder.build_and_push(
272 &mut events,
273 &base_id,
274 SemanticSuffix::ToolResult,
275 timestamp,
276 EventPayload::ToolResult(ToolResultPayload {
277 output: output.output.clone(),
278 tool_call_id,
279 is_error: exit_code.map(|code| code != 0).unwrap_or(false),
280 }),
281 raw_value.clone(),
282 StreamId::Main,
283 );
284 }
285 }
286
287 schema::ResponseItemPayload::GhostSnapshot(_snapshot) => {
288 }
290
291 schema::ResponseItemPayload::Unknown => {
292 }
294 }
295 }
296
297 CodexRecord::TurnContext(turn_context) => {
298 last_seen_model = Some(turn_context.payload.model.clone());
300 }
301
302 CodexRecord::Unknown => {
303 }
305 }
306 }
307
308 events
309}
310
311fn extract_message_text(content: &[schema::MessageContent]) -> String {
313 content
314 .iter()
315 .filter_map(|c| match c {
316 schema::MessageContent::InputText { text } => Some(text.as_str()),
317 schema::MessageContent::OutputText { text } => Some(text.as_str()),
318 schema::MessageContent::Unknown => None,
319 })
320 .collect::<Vec<_>>()
321 .join("\n")
322}
323
324fn extract_reasoning_text(reasoning: &schema::ReasoningPayload) -> String {
326 let summary_text = reasoning
327 .summary
328 .iter()
329 .filter_map(|s| match s {
330 schema::SummaryText::SummaryText { text } => Some(text.as_str()),
331 schema::SummaryText::Unknown => None,
332 })
333 .collect::<Vec<_>>()
334 .join("\n");
335
336 reasoning
338 .content
339 .as_ref()
340 .unwrap_or(&summary_text)
341 .to_string()
342}
343
344fn parse_json_arguments(args: &str) -> serde_json::Value {
347 serde_json::from_str(args).unwrap_or_else(|_| {
348 serde_json::json!({ "raw": args })
350 })
351}
352
353fn extract_exit_code(output: &str) -> Option<i32> {
355 EXIT_CODE_REGEX
356 .captures(output)
357 .and_then(|cap| cap.get(1))
358 .and_then(|m| m.as_str().parse().ok())
359}
360
361fn parse_timestamp(ts: &str) -> DateTime<chrono::Utc> {
363 DateTime::parse_from_rfc3339(ts)
364 .map(|dt| dt.with_timezone(&chrono::Utc))
365 .unwrap_or_else(|_| chrono::Utc::now())
366}
367
368pub struct CodexParser;
370
371impl crate::traits::SessionParser for CodexParser {
372 fn parse_file(&self, path: &Path) -> Result<Vec<AgentEvent>> {
373 super::io::normalize_codex_file(path)
374 }
375
376 fn parse_record(&self, content: &str) -> Result<Option<AgentEvent>> {
377 match serde_json::from_str::<AgentEvent>(content) {
379 Ok(event) => Ok(Some(event)),
380 Err(_) => Ok(None), }
382 }
383}
384
#[cfg(test)]
mod tests {
    use super::*;

    /// Objects and arrays parse as JSON; anything else is wrapped under `"raw"`.
    #[test]
    fn test_parse_json_arguments() {
        let object = parse_json_arguments(r#"{"command": "ls -la"}"#);
        assert_eq!(object["command"], "ls -la");

        let list = parse_json_arguments(r#"["arg1", "arg2"]"#);
        assert!(list.is_array());

        let fallback = parse_json_arguments("not json");
        assert_eq!(fallback["raw"], "not json");
    }

    /// The exit code is found anywhere in the output, or reported as absent.
    #[test]
    fn test_extract_exit_code() {
        let cases = [
            ("Exit Code: 0", Some(0)),
            ("Exit Code: 127", Some(127)),
            ("Some output\nExit Code: 1\n", Some(1)),
            ("No exit code here", None),
        ];

        for (output, expected) in cases {
            assert_eq!(extract_exit_code(output), expected);
        }
    }

    /// Input and output text parts are joined with a newline, in order.
    #[test]
    fn test_extract_message_text() {
        let hello = schema::MessageContent::InputText {
            text: "Hello".to_string(),
        };
        let world = schema::MessageContent::OutputText {
            text: "World".to_string(),
        };

        assert_eq!(extract_message_text(&[hello, world]), "Hello\nWorld");
    }
}