mi6_core/input/codex_session/
parser.rs1use std::collections::HashSet;
7use std::fs::File;
8use std::io::{BufRead, BufReader, Seek, SeekFrom};
9use std::path::Path;
10
11use crate::model::error::TranscriptError;
12use crate::model::{Event, EventBuilder, EventType};
13
14use super::{CodexSessionEntry, FunctionCallMap, SessionMeta, parse_session_meta};
15
/// Resume point inside a session file.
///
/// A value of this type is returned by a parse pass and can be handed back to
/// the parser so the next pass continues where the previous one stopped.
#[derive(Debug, Clone, Default)]
pub struct FilePosition {
    /// Byte offset, measured from the start of the file, where reading resumes.
    pub offset: u64,
    /// Number of lines consumed so far.
    pub line_number: u64,
    /// Dedup key of the most recently deserialized entry, if there was one.
    pub last_key: Option<String>,
}

impl FilePosition {
    /// Returns a position that points at the very beginning of a file.
    ///
    /// Equivalent to [`FilePosition::default`].
    pub fn new() -> Self {
        Self {
            offset: 0,
            line_number: 0,
            last_key: None,
        }
    }
}
33
34#[derive(Debug)]
36pub struct ParseResult {
37 pub events: Vec<Event>,
39 pub position: FilePosition,
41 pub keys: HashSet<String>,
43 pub lines_parsed: u64,
45 pub lines_skipped: u64,
47 pub parse_errors: u64,
49 pub session_meta: Option<SessionMeta>,
51}
52
/// Parser for Codex session files that produces events stamped with a
/// machine identifier.
pub struct CodexSessionParser {
    /// Identifier handed to every event this parser builds.
    machine_id: String,
}
57
58impl CodexSessionParser {
59 pub fn new(machine_id: impl Into<String>) -> Self {
61 Self {
62 machine_id: machine_id.into(),
63 }
64 }
65
66 pub fn parse_incremental(
70 &self,
71 path: &Path,
72 session_id: &str,
73 start_position: &FilePosition,
74 ) -> Result<ParseResult, TranscriptError> {
75 let file = File::open(path).map_err(|e| {
76 if e.kind() == std::io::ErrorKind::NotFound {
77 TranscriptError::NotFound(path.display().to_string())
78 } else {
79 TranscriptError::Io(e)
80 }
81 })?;
82
83 let file_len = file.metadata()?.len();
84
85 let start_offset = if start_position.offset > file_len {
87 0
88 } else {
89 start_position.offset
90 };
91
92 let mut reader = BufReader::new(file);
93 reader.seek(SeekFrom::Start(start_offset))?;
94
95 let mut events = Vec::new();
96 let mut keys = HashSet::new();
97 let mut current_offset = start_offset;
98 let mut line_number = start_position.line_number;
99 let mut last_key = start_position.last_key.clone();
100 let mut lines_parsed = 0u64;
101 let mut lines_skipped = 0u64;
102 let mut parse_errors = 0u64;
103
104 let mut function_call_map: FunctionCallMap = Default::default();
106 let mut session_meta: Option<SessionMeta> = None;
107 let mut turn_model: Option<String> = None;
108 let mut completed_call_ids: HashSet<String> = HashSet::new();
110
111 if start_offset == 0 {
114 let scan_file = File::open(path)?;
116 let scan_reader = BufReader::new(scan_file);
117 for line in scan_reader.lines() {
118 let line = match line {
119 Ok(l) => l,
120 Err(_) => break,
121 };
122 let trimmed = line.trim();
123 if trimmed.is_empty() {
124 continue;
125 }
126 if let Ok(entry) = serde_json::from_str::<CodexSessionEntry>(trimmed)
127 && entry.entry_type == "session_meta"
128 {
129 session_meta = parse_session_meta(&entry);
130 break;
131 }
132 }
133 }
134
135 let mut line = String::new();
136 loop {
137 line.clear();
138 let bytes_read = reader.read_line(&mut line)?;
139 if bytes_read == 0 {
140 break; }
142
143 let line_bytes = bytes_read as u64;
144 line_number += 1;
145
146 let trimmed = line.trim();
148 if trimmed.is_empty() {
149 current_offset += line_bytes;
150 continue;
151 }
152
153 match serde_json::from_str::<CodexSessionEntry>(trimmed) {
155 Ok(entry) => {
156 let key = entry.dedup_key();
158 keys.insert(key.clone());
159 last_key = Some(key);
160
161 if entry.entry_type == "response_item"
163 && let Some(payload_type) =
164 entry.payload.get("type").and_then(|v| v.as_str())
165 && payload_type == "function_call_output"
166 && let Some(call_id) = entry.payload.get("call_id").and_then(|v| v.as_str())
167 {
168 completed_call_ids.insert(call_id.to_string());
169 }
170
171 if entry.entry_type == "session_meta" && session_meta.is_none() {
173 session_meta = parse_session_meta(&entry);
174 lines_skipped += 1;
175 } else {
176 let entry_events = entry.into_events(
178 &self.machine_id,
179 session_id,
180 &mut function_call_map,
181 &session_meta,
182 &mut turn_model,
183 );
184
185 if entry_events.is_empty() {
186 lines_skipped += 1;
187 } else {
188 events.extend(entry_events);
189 lines_parsed += 1;
190 }
191 }
192 }
193 Err(_) => {
194 parse_errors += 1;
195 }
196 }
197
198 current_offset += line_bytes;
199 }
200
201 for (call_id, call_info) in &function_call_map {
206 if call_info.requires_approval && !completed_call_ids.contains(call_id) {
207 let mut builder =
208 EventBuilder::new(&self.machine_id, EventType::PermissionRequest, session_id)
209 .source("codex_session")
210 .framework("codex")
211 .timestamp_opt(call_info.timestamp)
212 .tool(call_id.clone(), call_info.name.clone());
213
214 if let Some(meta) = &session_meta {
216 builder = builder.cwd_opt(meta.cwd.clone());
217 if let Some(ref git) = meta.git {
218 builder = builder.git_branch_opt(git.branch.clone());
219 }
220 }
221
222 events.push(builder.build());
223 }
224 }
225
226 let new_position = FilePosition {
227 offset: current_offset,
228 line_number,
229 last_key,
230 };
231
232 Ok(ParseResult {
233 events,
234 position: new_position,
235 keys,
236 lines_parsed,
237 lines_skipped,
238 parse_errors,
239 session_meta,
240 })
241 }
242
243 pub fn parse_full(
245 &self,
246 path: &Path,
247 session_id: &str,
248 ) -> Result<ParseResult, TranscriptError> {
249 self.parse_incremental(path, session_id, &FilePosition::new())
250 }
251}
252
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    // JSONL fixtures shared by several tests. Each constant is one line of a
    // session file, without a trailing newline.
    const SESSION_META: &str = r#"{"timestamp":"2025-11-27T01:55:56.451Z","type":"session_meta","payload":{"id":"test-123","timestamp":"2025-11-27T01:55:56.369Z","cwd":"/test"}}"#;
    const TOKEN_COUNT_WITH_CACHE: &str = r#"{"timestamp":"2025-11-27T01:56:10.186Z","type":"event_msg","payload":{"type":"token_count","info":{"last_token_usage":{"input_tokens":100,"cached_input_tokens":50,"output_tokens":25},"model_context_window":128000}}}"#;
    const TOKEN_COUNT_100: &str = r#"{"timestamp":"2025-11-27T01:56:10.186Z","type":"event_msg","payload":{"type":"token_count","info":{"last_token_usage":{"input_tokens":100,"output_tokens":25}}}}"#;
    const TOKEN_COUNT_200: &str = r#"{"timestamp":"2025-11-27T01:56:11.000Z","type":"event_msg","payload":{"type":"token_count","info":{"last_token_usage":{"input_tokens":200,"output_tokens":50}}}}"#;
    const CALL_LS: &str = r#"{"timestamp":"2025-11-27T01:56:10.186Z","type":"response_item","payload":{"type":"function_call","name":"shell_command","arguments":"{\"command\":\"ls\"}","call_id":"call_1"}}"#;
    const CALL_ESCALATED: &str = r#"{"timestamp":"2025-11-27T01:56:10.186Z","type":"response_item","payload":{"type":"function_call","name":"shell_command","arguments":"{\"command\":\"rm /tmp/test\",\"sandbox_permissions\":\"require_escalated\"}","call_id":"call_1"}}"#;
    const OUTPUT_FILE_TXT: &str = r#"{"timestamp":"2025-11-27T01:56:10.300Z","type":"response_item","payload":{"type":"function_call_output","call_id":"call_1","output":"file.txt"}}"#;
    const OUTPUT_DONE: &str = r#"{"timestamp":"2025-11-27T01:56:10.300Z","type":"response_item","payload":{"type":"function_call_output","call_id":"call_1","output":"done"}}"#;
    const USER_MESSAGE: &str = r#"{"timestamp":"2025-11-27T01:56:07.612Z","type":"event_msg","payload":{"type":"user_message","message":"Hello!","images":[]}}"#;

    /// Writes `content` to a fresh temporary file and returns its handle.
    fn create_test_file(content: &str) -> NamedTempFile {
        let mut file = NamedTempFile::new().unwrap();
        file.write_all(content.as_bytes()).unwrap();
        file.flush().unwrap();
        file
    }

    /// Runs a full parse of `content` with the standard test parser.
    fn parse_content(content: &str) -> ParseResult {
        let file = create_test_file(content);
        CodexSessionParser::new("machine-1")
            .parse_full(file.path(), "test-session")
            .unwrap()
    }

    /// Joins fixture lines with newlines, leaving no trailing newline.
    fn jsonl(lines: &[&str]) -> String {
        lines.join("\n")
    }

    #[test]
    fn test_parse_empty_file() {
        let result = parse_content("");

        assert!(result.events.is_empty());
        assert_eq!(result.lines_parsed, 0);
    }

    #[test]
    fn test_parse_session_with_token_count() {
        let result = parse_content(&jsonl(&[SESSION_META, TOKEN_COUNT_WITH_CACHE]));

        assert_eq!(result.events.len(), 1);
        assert!(result.session_meta.is_some());
        assert_eq!(result.session_meta.as_ref().unwrap().id, "test-123");

        let event = &result.events[0];
        assert_eq!(event.tokens_input, Some(100));
        assert_eq!(event.tokens_output, Some(25));
        assert_eq!(event.tokens_cache_read, Some(50));
    }

    #[test]
    fn test_parse_function_calls() {
        let result = parse_content(&jsonl(&[CALL_LS, OUTPUT_FILE_TXT]));

        assert_eq!(result.events.len(), 2);
        assert_eq!(result.events[0].event_type, EventType::PreToolUse);
        assert_eq!(result.events[1].event_type, EventType::PostToolUse);
    }

    #[test]
    fn test_pending_approval_generates_permission_request() {
        let result = parse_content(CALL_ESCALATED);

        // The call itself plus the synthesized permission request.
        assert_eq!(result.events.len(), 2);
        assert_eq!(result.events[0].event_type, EventType::PreToolUse);
        assert_eq!(result.events[1].event_type, EventType::PermissionRequest);
        assert_eq!(
            result.events[1].tool_name,
            Some("shell_command".to_string())
        );
    }

    #[test]
    fn test_completed_approval_no_permission_request() {
        let result = parse_content(&jsonl(&[CALL_ESCALATED, OUTPUT_DONE]));

        // The output marks the call as completed, so nothing is synthesized.
        assert_eq!(result.events.len(), 2);
        assert_eq!(result.events[0].event_type, EventType::PreToolUse);
        assert_eq!(result.events[1].event_type, EventType::PostToolUse);
    }

    #[test]
    fn test_pending_call_without_escalation_no_permission_request() {
        let result = parse_content(CALL_LS);

        assert_eq!(result.events.len(), 1);
        assert_eq!(result.events[0].event_type, EventType::PreToolUse);
    }

    #[test]
    fn test_incremental_parsing() {
        let file = create_test_file(&format!("{}\n", TOKEN_COUNT_100));
        let parser = CodexSessionParser::new("machine-1");

        let first = parser.parse_full(file.path(), "test-session").unwrap();
        assert_eq!(first.events.len(), 1);

        // Append a second record, then resume from where the first pass ended.
        let mut appender = std::fs::OpenOptions::new()
            .append(true)
            .open(file.path())
            .unwrap();
        writeln!(appender, "{}", TOKEN_COUNT_200).unwrap();

        let second = parser
            .parse_incremental(file.path(), "test-session", &first.position)
            .unwrap();
        assert_eq!(second.events.len(), 1);
        assert_eq!(second.events[0].tokens_input, Some(200));
    }

    #[test]
    fn test_file_not_found() {
        let parser = CodexSessionParser::new("machine-1");
        let result = parser.parse_full(Path::new("/nonexistent/path.jsonl"), "test");

        assert!(matches!(result, Err(TranscriptError::NotFound(_))));
    }

    #[test]
    fn test_parse_errors_tracked() {
        let result = parse_content(&jsonl(&[
            TOKEN_COUNT_100,
            "not valid json",
            r#"{"incomplete"#,
        ]));

        assert_eq!(result.events.len(), 1);
        assert_eq!(result.parse_errors, 2);
    }

    #[test]
    fn test_user_message_parsing() {
        let result = parse_content(USER_MESSAGE);

        assert_eq!(result.events.len(), 1);
        assert_eq!(result.events[0].event_type, EventType::UserPromptSubmit);
    }
}