agtrace_providers/codex/io.rs

1use crate::Result;
2use agtrace_types::SpawnContext;
3use std::io::{BufRead, BufReader};
4use std::path::Path;
5
6use super::parser::normalize_codex_session;
7use super::schema::{CodexRecord, EventMsgPayload};
8
9/// Parse Codex JSONL file and normalize to AgentEvent
10pub fn normalize_codex_file(path: &Path) -> Result<Vec<agtrace_types::AgentEvent>> {
11    let text = std::fs::read_to_string(path)?;
12
13    let mut records: Vec<CodexRecord> = Vec::new();
14    let mut session_id_from_meta: Option<String> = None;
15    let mut subagent_type: Option<String> = None;
16
17    for line in text.lines() {
18        let line = line.trim();
19        if line.is_empty() {
20            continue;
21        }
22        let record: CodexRecord = serde_json::from_str(line)?;
23
24        // Extract session_id and subagent_type from session_meta record
25        if let CodexRecord::SessionMeta(ref meta) = record {
26            session_id_from_meta = Some(meta.payload.id.clone());
27            // Extract subagent information from source field
28            if let super::schema::SessionSource::Subagent { subagent } = &meta.payload.source {
29                subagent_type = Some(subagent.clone());
30            }
31        }
32
33        records.push(record);
34    }
35
36    // session_id should be extracted from file content, fallback to "unknown-session"
37    let session_id = session_id_from_meta.unwrap_or_else(|| "unknown-session".to_string());
38
39    Ok(normalize_codex_session(records, &session_id, subagent_type))
40}
41
42/// Extract cwd from a Codex session file by reading the first few records
43pub fn extract_cwd_from_codex_file(path: &Path) -> Option<String> {
44    let file = std::fs::File::open(path).ok()?;
45    let reader = BufReader::new(file);
46
47    for line in reader.lines().take(10).flatten() {
48        if let Ok(record) = serde_json::from_str::<CodexRecord>(&line) {
49            match record {
50                CodexRecord::SessionMeta(meta) => {
51                    return Some(meta.payload.cwd.clone());
52                }
53                CodexRecord::TurnContext(turn) => {
54                    return Some(turn.payload.cwd.clone());
55                }
56                _ => continue,
57            }
58        }
59    }
60    None
61}
62
63/// Spawn event extracted from a CLI session (e.g., entered_review_mode)
64#[derive(Debug, Clone)]
65pub struct SpawnEvent {
66    pub timestamp: String,
67    pub subagent_type: String,
68    pub spawn_context: SpawnContext,
69}
70
71#[derive(Debug)]
72pub struct CodexHeader {
73    pub session_id: Option<String>,
74    pub cwd: Option<String>,
75    pub timestamp: Option<String>,
76    pub snippet: Option<String>,
77    pub subagent_type: Option<String>,
78    pub parent_session_id: Option<String>,
79    /// Pre-computed spawn context for subagent sessions (set during discovery correlation)
80    pub spawned_by: Option<SpawnContext>,
81}
82
83/// Extract header information from Codex file (for scanning)
84pub fn extract_codex_header(path: &Path) -> Result<CodexHeader> {
85    let file = std::fs::File::open(path)?;
86    let reader = BufReader::new(file);
87
88    let mut session_id = None;
89    let mut cwd = None;
90    let mut timestamp = None;
91    let mut snippet = None;
92    let mut subagent_type = None;
93    let parent_session_id = None; // Not mutated (future use for Codex parent tracking)
94
95    for line in reader.lines().take(20).flatten() {
96        if let Ok(record) = serde_json::from_str::<CodexRecord>(&line) {
97            match &record {
98                CodexRecord::SessionMeta(meta) => {
99                    if session_id.is_none() {
100                        session_id = Some(meta.payload.id.clone());
101                    }
102                    if cwd.is_none() {
103                        cwd = Some(meta.payload.cwd.clone());
104                    }
105                    if timestamp.is_none() {
106                        timestamp = Some(meta.timestamp.clone());
107                    }
108                    // Extract subagent information from source field
109                    if subagent_type.is_none()
110                        && let super::schema::SessionSource::Subagent { subagent } =
111                            &meta.payload.source
112                    {
113                        subagent_type = Some(subagent.clone());
114                    }
115                }
116                CodexRecord::TurnContext(turn) => {
117                    if cwd.is_none() {
118                        cwd = Some(turn.payload.cwd.clone());
119                    }
120                    if timestamp.is_none() {
121                        timestamp = Some(turn.timestamp.clone());
122                    }
123                }
124                CodexRecord::EventMsg(event) => {
125                    if timestamp.is_none() {
126                        timestamp = Some(event.timestamp.clone());
127                    }
128                    if snippet.is_none()
129                        && let super::schema::EventMsgPayload::UserMessage(msg) = &event.payload
130                    {
131                        snippet = Some(agtrace_types::truncate(&msg.message, 200));
132                    }
133                }
134                CodexRecord::ResponseItem(response) => {
135                    if timestamp.is_none() {
136                        timestamp = Some(response.timestamp.clone());
137                    }
138                    if snippet.is_none()
139                        && let super::schema::ResponseItemPayload::Message(msg) = &response.payload
140                        && msg.role == "user"
141                    {
142                        let text = msg.content.iter().find_map(|c| match c {
143                            super::schema::MessageContent::InputText { text } => {
144                                Some(agtrace_types::truncate(text, 200))
145                            }
146                            super::schema::MessageContent::OutputText { text } => {
147                                Some(agtrace_types::truncate(text, 200))
148                            }
149                            _ => None,
150                        });
151                        if let Some(t) = &text
152                            && !t.contains("<environment_context>")
153                        {
154                            snippet = text;
155                        }
156                    }
157                }
158                _ => {}
159            }
160
161            if session_id.is_some() && cwd.is_some() && timestamp.is_some() && snippet.is_some() {
162                break;
163            }
164        }
165    }
166
167    Ok(CodexHeader {
168        session_id,
169        cwd,
170        timestamp,
171        snippet,
172        subagent_type,
173        parent_session_id,
174        spawned_by: None, // Set during discovery correlation
175    })
176}
177
178/// Extract spawn events from a CLI session file with turn/step context
179/// Used to correlate subagent sessions back to their parent turns
180pub fn extract_spawn_events(path: &Path) -> Result<Vec<SpawnEvent>> {
181    let text = std::fs::read_to_string(path)?;
182    let mut spawn_events = Vec::new();
183
184    // Track turn/step indices
185    // A new turn starts with TurnContext or UserMessage
186    let mut current_turn: usize = 0;
187    let mut current_step: usize = 0;
188    let mut in_turn = false;
189
190    for line in text.lines() {
191        let line = line.trim();
192        if line.is_empty() {
193            continue;
194        }
195
196        let record: CodexRecord = match serde_json::from_str(line) {
197            Ok(r) => r,
198            Err(_) => continue,
199        };
200
201        match &record {
202            CodexRecord::TurnContext(_) => {
203                // New turn starts
204                if in_turn {
205                    current_turn += 1;
206                }
207                current_step = 0;
208                in_turn = true;
209            }
210            CodexRecord::EventMsg(event) => {
211                match &event.payload {
212                    EventMsgPayload::UserMessage(_) => {
213                        // User message also starts a new turn (if no TurnContext)
214                        if in_turn {
215                            current_turn += 1;
216                            current_step = 0;
217                        }
218                        in_turn = true;
219                    }
220                    EventMsgPayload::EnteredReviewMode(_) => {
221                        // Found a spawn event!
222                        spawn_events.push(SpawnEvent {
223                            timestamp: event.timestamp.clone(),
224                            subagent_type: "review".to_string(),
225                            spawn_context: SpawnContext {
226                                turn_index: current_turn,
227                                step_index: current_step,
228                            },
229                        });
230                        current_step += 1;
231                    }
232                    _ => {
233                        // Other events increment step within current turn
234                        if in_turn {
235                            current_step += 1;
236                        }
237                    }
238                }
239            }
240            CodexRecord::ResponseItem(_) => {
241                // Response items are part of current step
242                if in_turn {
243                    current_step += 1;
244                }
245            }
246            _ => {}
247        }
248    }
249
250    Ok(spawn_events)
251}
252
253/// Check if a Codex session file is empty or incomplete
254pub fn is_empty_codex_session(path: &Path) -> bool {
255    let Ok(file) = std::fs::File::open(path) else {
256        return true;
257    };
258    let reader = BufReader::new(file);
259
260    let mut line_count = 0;
261    let mut has_event = false;
262
263    for line in reader.lines().take(20).flatten() {
264        line_count += 1;
265        if let Ok(record) = serde_json::from_str::<CodexRecord>(&line) {
266            match record {
267                CodexRecord::SessionMeta(_) | CodexRecord::TurnContext(_) => {
268                    has_event = true;
269                    break;
270                }
271                CodexRecord::EventMsg(_) | CodexRecord::ResponseItem(_) => {
272                    has_event = true;
273                    break;
274                }
275                _ => {}
276            }
277        }
278    }
279
280    line_count <= 2 && !has_event
281}
282
#[cfg(test)]
mod tests {
    use super::*;

    /// Write `contents` to a temp file whose name is unique per test and process,
    /// so concurrent test runs cannot clobber each other's fixtures.
    fn write_temp_jsonl(name: &str, contents: &str) -> std::path::PathBuf {
        let path = std::env::temp_dir().join(format!("{name}-{}.jsonl", std::process::id()));
        std::fs::write(&path, contents).unwrap();
        path
    }

    #[test]
    fn test_extract_subagent_header() {
        // A single session_meta record describing a "review" subagent session.
        let tmpfile = write_temp_jsonl(
            "agtrace_codex_test_subagent",
            r#"{"timestamp":"2025-01-01T00:00:00Z","type":"session_meta","payload":{"id":"test-id","timestamp":"2025-01-01T00:00:00Z","cwd":"/test","originator":"test","cli_version":"1.0.0","source":{"subagent":"review"}}}
"#,
        );

        let header = extract_codex_header(&tmpfile);
        let cwd = extract_cwd_from_codex_file(&tmpfile);
        let empty = is_empty_codex_session(&tmpfile);

        // Clean up before asserting so a failing assertion does not leak the file.
        std::fs::remove_file(&tmpfile).unwrap();

        let header = header.unwrap();
        assert_eq!(header.session_id, Some("test-id".to_string()));
        assert_eq!(header.subagent_type, Some("review".to_string()));
        assert_eq!(header.cwd, Some("/test".to_string()));
        assert_eq!(header.timestamp, Some("2025-01-01T00:00:00Z".to_string()));
        assert!(header.snippet.is_none());
        assert!(header.parent_session_id.is_none());
        assert!(header.spawned_by.is_none());

        assert_eq!(cwd, Some("/test".to_string()));
        assert!(!empty);
        // A file that no longer exists cannot be opened and is reported as empty.
        assert!(is_empty_codex_session(&tmpfile));
    }
}