mi6_core/input/codex_session/
parser.rs

1//! Incremental Codex session file parser.
2//!
3//! Parses Codex CLI JSONL session files efficiently by tracking
4//! file positions and only reading new content.
5
6use std::collections::HashSet;
7use std::fs::File;
8use std::io::{BufRead, BufReader, Seek, SeekFrom};
9use std::path::Path;
10
11use crate::model::error::TranscriptError;
12use crate::model::{Event, EventBuilder, EventType};
13
14use super::{CodexSessionEntry, FunctionCallMap, SessionMeta, parse_session_meta};
15
/// Resume point for incremental parsing of a session file.
///
/// The default value denotes the start of a file with nothing processed yet.
#[derive(Debug, Clone, Default)]
pub struct FilePosition {
    /// Byte offset in the file at which the next read begins.
    pub offset: u64,
    /// Number of the last line processed (1-based; 0 before any line), kept for debugging.
    pub line_number: u64,
    /// Dedup key of the last entry processed, if any (for validation).
    pub last_key: Option<String>,
}

impl FilePosition {
    /// Position at the very start of a file, before any line has been read.
    pub fn new() -> Self {
        FilePosition {
            offset: 0,
            line_number: 0,
            last_key: None,
        }
    }
}
33
/// Result of parsing (part of) a Codex session file.
#[derive(Debug)]
pub struct ParseResult {
    /// Events extracted from the lines read, followed by any synthesized
    /// `PermissionRequest` events for calls still awaiting approval.
    pub events: Vec<Event>,
    /// Position to pass to the next incremental parse.
    pub position: FilePosition,
    /// Dedup keys of every entry successfully deserialized during this parse.
    pub keys: HashSet<String>,
    /// Number of lines that produced at least one event.
    pub lines_parsed: u64,
    /// Number of valid entries that produced no events (including the
    /// `session_meta` entry consumed for context). Blank lines are not counted.
    pub lines_skipped: u64,
    /// Number of non-blank lines that could not be deserialized as a session
    /// entry (malformed JSON or an unexpected shape).
    pub parse_errors: u64,
    /// Session metadata, if a `session_meta` entry was found (by the pre-scan
    /// when parsing from offset 0, or among the lines read).
    pub session_meta: Option<SessionMeta>,
}
52
/// Parser for Codex CLI session JSONL files.
///
/// Holds no per-file state: the resume point is supplied by the caller on each
/// call and returned in [`ParseResult::position`].
pub struct CodexSessionParser {
    /// Machine identifier handed to every event this parser builds.
    machine_id: String,
}
57
58impl CodexSessionParser {
59    /// Create a new parser with the given machine ID.
60    pub fn new(machine_id: impl Into<String>) -> Self {
61        Self {
62            machine_id: machine_id.into(),
63        }
64    }
65
66    /// Parse new entries from a session file starting at the given position.
67    ///
68    /// Returns events, updated position, and dedup keys seen.
69    pub fn parse_incremental(
70        &self,
71        path: &Path,
72        session_id: &str,
73        start_position: &FilePosition,
74    ) -> Result<ParseResult, TranscriptError> {
75        let file = File::open(path).map_err(|e| {
76            if e.kind() == std::io::ErrorKind::NotFound {
77                TranscriptError::NotFound(path.display().to_string())
78            } else {
79                TranscriptError::Io(e)
80            }
81        })?;
82
83        let file_len = file.metadata()?.len();
84
85        // If position is past file end, file may have been truncated - reset
86        let start_offset = if start_position.offset > file_len {
87            0
88        } else {
89            start_position.offset
90        };
91
92        let mut reader = BufReader::new(file);
93        reader.seek(SeekFrom::Start(start_offset))?;
94
95        let mut events = Vec::new();
96        let mut keys = HashSet::new();
97        let mut current_offset = start_offset;
98        let mut line_number = start_position.line_number;
99        let mut last_key = start_position.last_key.clone();
100        let mut lines_parsed = 0u64;
101        let mut lines_skipped = 0u64;
102        let mut parse_errors = 0u64;
103
104        // Track state across entries
105        let mut function_call_map: FunctionCallMap = Default::default();
106        let mut session_meta: Option<SessionMeta> = None;
107        let mut turn_model: Option<String> = None;
108        // Track which function calls have completed (have function_call_output)
109        let mut completed_call_ids: HashSet<String> = HashSet::new();
110
111        // If starting from offset 0, we need to scan for session_meta first
112        // to get context. We'll do this in a first pass if at start.
113        if start_offset == 0 {
114            // Quick scan for session_meta (usually first line)
115            let scan_file = File::open(path)?;
116            let scan_reader = BufReader::new(scan_file);
117            for line in scan_reader.lines() {
118                let line = match line {
119                    Ok(l) => l,
120                    Err(_) => break,
121                };
122                let trimmed = line.trim();
123                if trimmed.is_empty() {
124                    continue;
125                }
126                if let Ok(entry) = serde_json::from_str::<CodexSessionEntry>(trimmed)
127                    && entry.entry_type == "session_meta"
128                {
129                    session_meta = parse_session_meta(&entry);
130                    break;
131                }
132            }
133        }
134
135        let mut line = String::new();
136        loop {
137            line.clear();
138            let bytes_read = reader.read_line(&mut line)?;
139            if bytes_read == 0 {
140                break; // EOF
141            }
142
143            let line_bytes = bytes_read as u64;
144            line_number += 1;
145
146            // Skip empty lines
147            let trimmed = line.trim();
148            if trimmed.is_empty() {
149                current_offset += line_bytes;
150                continue;
151            }
152
153            // Try to parse as CodexSessionEntry
154            match serde_json::from_str::<CodexSessionEntry>(trimmed) {
155                Ok(entry) => {
156                    // Track key for deduplication
157                    let key = entry.dedup_key();
158                    keys.insert(key.clone());
159                    last_key = Some(key);
160
161                    // Track completed function calls (those with function_call_output)
162                    if entry.entry_type == "response_item"
163                        && let Some(payload_type) =
164                            entry.payload.get("type").and_then(|v| v.as_str())
165                        && payload_type == "function_call_output"
166                        && let Some(call_id) = entry.payload.get("call_id").and_then(|v| v.as_str())
167                    {
168                        completed_call_ids.insert(call_id.to_string());
169                    }
170
171                    // Check for session_meta to update context
172                    if entry.entry_type == "session_meta" && session_meta.is_none() {
173                        session_meta = parse_session_meta(&entry);
174                        lines_skipped += 1;
175                    } else {
176                        // Convert to events
177                        let entry_events = entry.into_events(
178                            &self.machine_id,
179                            session_id,
180                            &mut function_call_map,
181                            &session_meta,
182                            &mut turn_model,
183                        );
184
185                        if entry_events.is_empty() {
186                            lines_skipped += 1;
187                        } else {
188                            events.extend(entry_events);
189                            lines_parsed += 1;
190                        }
191                    }
192                }
193                Err(_) => {
194                    parse_errors += 1;
195                }
196            }
197
198            current_offset += line_bytes;
199        }
200
201        // Generate PermissionRequest events for pending approval calls
202        // A pending approval call is one that:
203        // 1. Has requires_approval = true (sandbox_permissions: "require_escalated")
204        // 2. Does not have a corresponding function_call_output
205        for (call_id, call_info) in &function_call_map {
206            if call_info.requires_approval && !completed_call_ids.contains(call_id) {
207                let mut builder =
208                    EventBuilder::new(&self.machine_id, EventType::PermissionRequest, session_id)
209                        .source("codex_session")
210                        .framework("codex")
211                        .timestamp_opt(call_info.timestamp)
212                        .tool(call_id.clone(), call_info.name.clone());
213
214                // Add cwd and git from session meta
215                if let Some(meta) = &session_meta {
216                    builder = builder.cwd_opt(meta.cwd.clone());
217                    if let Some(ref git) = meta.git {
218                        builder = builder.git_branch_opt(git.branch.clone());
219                    }
220                }
221
222                events.push(builder.build());
223            }
224        }
225
226        let new_position = FilePosition {
227            offset: current_offset,
228            line_number,
229            last_key,
230        };
231
232        Ok(ParseResult {
233            events,
234            position: new_position,
235            keys,
236            lines_parsed,
237            lines_skipped,
238            parse_errors,
239            session_meta,
240        })
241    }
242
243    /// Parse entire session file from the beginning.
244    pub fn parse_full(
245        &self,
246        path: &Path,
247        session_id: &str,
248    ) -> Result<ParseResult, TranscriptError> {
249        self.parse_incremental(path, session_id, &FilePosition::new())
250    }
251}
252
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    const MACHINE: &str = "machine-1";
    const SESSION: &str = "test-session";

    /// Write `content` verbatim (no trailing newline added) to a fresh temp file.
    fn create_test_file(content: &str) -> NamedTempFile {
        let mut tmp = NamedTempFile::new().unwrap();
        tmp.write_all(content.as_bytes()).unwrap();
        tmp.flush().unwrap();
        tmp
    }

    /// Parse `content` from the beginning with a fresh parser.
    fn parse_str(content: &str) -> ParseResult {
        let tmp = create_test_file(content);
        CodexSessionParser::new(MACHINE)
            .parse_full(tmp.path(), SESSION)
            .unwrap()
    }

    #[test]
    fn test_parse_empty_file() {
        let result = parse_str("");

        assert!(result.events.is_empty());
        assert_eq!(result.lines_parsed, 0);
    }

    #[test]
    fn test_parse_session_with_token_count() {
        let content = r#"{"timestamp":"2025-11-27T01:55:56.451Z","type":"session_meta","payload":{"id":"test-123","timestamp":"2025-11-27T01:55:56.369Z","cwd":"/test"}}
{"timestamp":"2025-11-27T01:56:10.186Z","type":"event_msg","payload":{"type":"token_count","info":{"last_token_usage":{"input_tokens":100,"cached_input_tokens":50,"output_tokens":25},"model_context_window":128000}}}"#;

        let result = parse_str(content);

        assert_eq!(result.events.len(), 1);
        let meta = result
            .session_meta
            .as_ref()
            .expect("session_meta should be captured");
        assert_eq!(meta.id, "test-123");

        let usage = &result.events[0];
        assert_eq!(usage.tokens_input, Some(100));
        assert_eq!(usage.tokens_output, Some(25));
        assert_eq!(usage.tokens_cache_read, Some(50));
    }

    #[test]
    fn test_parse_function_calls() {
        let content = r#"{"timestamp":"2025-11-27T01:56:10.186Z","type":"response_item","payload":{"type":"function_call","name":"shell_command","arguments":"{\"command\":\"ls\"}","call_id":"call_1"}}
{"timestamp":"2025-11-27T01:56:10.300Z","type":"response_item","payload":{"type":"function_call_output","call_id":"call_1","output":"file.txt"}}"#;

        let result = parse_str(content);

        assert_eq!(result.events.len(), 2);
        assert_eq!(result.events[0].event_type, EventType::PreToolUse);
        assert_eq!(result.events[1].event_type, EventType::PostToolUse);
    }

    #[test]
    fn test_pending_approval_generates_permission_request() {
        // An escalated call with no output yet is waiting for approval.
        let content = r#"{"timestamp":"2025-11-27T01:56:10.186Z","type":"response_item","payload":{"type":"function_call","name":"shell_command","arguments":"{\"command\":\"rm /tmp/test\",\"sandbox_permissions\":\"require_escalated\"}","call_id":"call_1"}}"#;

        let result = parse_str(content);

        // PreToolUse for the call itself, then the synthesized PermissionRequest.
        assert_eq!(result.events.len(), 2);
        assert_eq!(result.events[0].event_type, EventType::PreToolUse);
        let request = &result.events[1];
        assert_eq!(request.event_type, EventType::PermissionRequest);
        assert_eq!(request.tool_name.as_deref(), Some("shell_command"));
    }

    #[test]
    fn test_completed_approval_no_permission_request() {
        // An escalated call that already has output needs no PermissionRequest.
        let content = r#"{"timestamp":"2025-11-27T01:56:10.186Z","type":"response_item","payload":{"type":"function_call","name":"shell_command","arguments":"{\"command\":\"rm /tmp/test\",\"sandbox_permissions\":\"require_escalated\"}","call_id":"call_1"}}
{"timestamp":"2025-11-27T01:56:10.300Z","type":"response_item","payload":{"type":"function_call_output","call_id":"call_1","output":"done"}}"#;

        let result = parse_str(content);

        assert_eq!(result.events.len(), 2);
        assert_eq!(result.events[0].event_type, EventType::PreToolUse);
        assert_eq!(result.events[1].event_type, EventType::PostToolUse);
    }

    #[test]
    fn test_pending_call_without_escalation_no_permission_request() {
        // A non-escalated call with no output is simply still executing.
        let content = r#"{"timestamp":"2025-11-27T01:56:10.186Z","type":"response_item","payload":{"type":"function_call","name":"shell_command","arguments":"{\"command\":\"ls\"}","call_id":"call_1"}}"#;

        let result = parse_str(content);

        assert_eq!(result.events.len(), 1);
        assert_eq!(result.events[0].event_type, EventType::PreToolUse);
    }

    #[test]
    fn test_incremental_parsing() {
        let first = r#"{"timestamp":"2025-11-27T01:56:10.186Z","type":"event_msg","payload":{"type":"token_count","info":{"last_token_usage":{"input_tokens":100,"output_tokens":25}}}}"#;
        let tmp = create_test_file(&format!("{first}\n"));
        let parser = CodexSessionParser::new(MACHINE);

        let initial = parser.parse_full(tmp.path(), SESSION).unwrap();
        assert_eq!(initial.events.len(), 1);

        // Append a second entry, as a live session would.
        let second = r#"{"timestamp":"2025-11-27T01:56:11.000Z","type":"event_msg","payload":{"type":"token_count","info":{"last_token_usage":{"input_tokens":200,"output_tokens":50}}}}"#;
        let mut appender = std::fs::OpenOptions::new()
            .append(true)
            .open(tmp.path())
            .unwrap();
        writeln!(appender, "{second}").unwrap();

        let resumed = parser
            .parse_incremental(tmp.path(), SESSION, &initial.position)
            .unwrap();
        assert_eq!(resumed.events.len(), 1);
        assert_eq!(resumed.events[0].tokens_input, Some(200));
    }

    #[test]
    fn test_file_not_found() {
        let outcome = CodexSessionParser::new(MACHINE)
            .parse_full(Path::new("/nonexistent/path.jsonl"), "test");

        assert!(matches!(outcome, Err(TranscriptError::NotFound(_))));
    }

    #[test]
    fn test_parse_errors_tracked() {
        let content = r#"{"timestamp":"2025-11-27T01:56:10.186Z","type":"event_msg","payload":{"type":"token_count","info":{"last_token_usage":{"input_tokens":100,"output_tokens":25}}}}
not valid json
{"incomplete"#;

        let result = parse_str(content);

        assert_eq!(result.events.len(), 1);
        assert_eq!(result.parse_errors, 2);
    }

    #[test]
    fn test_user_message_parsing() {
        let content = r#"{"timestamp":"2025-11-27T01:56:07.612Z","type":"event_msg","payload":{"type":"user_message","message":"Hello!","images":[]}}"#;

        let result = parse_str(content);

        assert_eq!(result.events.len(), 1);
        assert_eq!(result.events[0].event_type, EventType::UserPromptSubmit);
    }
}