agtrace_providers/codex/
discovery.rs

1use crate::traits::{LogDiscovery, ProbeResult, SessionIndex};
2use crate::{Error, Result};
3use agtrace_types::SpawnContext;
4use chrono::DateTime;
5use std::collections::HashMap;
6use std::path::{Path, PathBuf};
7use walkdir::WalkDir;
8
9use super::io::{SpawnEvent, extract_codex_header, extract_spawn_events, is_empty_codex_session};
10
11pub struct CodexDiscovery;
12
13impl LogDiscovery for CodexDiscovery {
14    fn id(&self) -> &'static str {
15        "codex"
16    }
17
18    fn probe(&self, path: &Path) -> ProbeResult {
19        if !path.is_file() {
20            return ProbeResult::NoMatch;
21        }
22
23        if let Ok(metadata) = std::fs::metadata(path)
24            && metadata.len() == 0
25        {
26            return ProbeResult::NoMatch;
27        }
28
29        let is_jsonl = path.extension().is_some_and(|e| e == "jsonl");
30        let filename = path.file_name().and_then(|f| f.to_str()).unwrap_or("");
31
32        if is_jsonl && filename.starts_with("rollout-") && !is_empty_codex_session(path) {
33            ProbeResult::match_high()
34        } else {
35            ProbeResult::NoMatch
36        }
37    }
38
39    fn resolve_log_root(&self, _project_root: &Path) -> Option<PathBuf> {
40        None
41    }
42
43    fn scan_sessions(&self, log_root: &Path) -> Result<Vec<SessionIndex>> {
44        if !log_root.exists() {
45            return Ok(Vec::new());
46        }
47
48        // Phase 1: Collect all sessions, separating CLI and subagent sessions
49        let mut cli_sessions: Vec<(SessionIndex, PathBuf)> = Vec::new();
50        let mut subagent_sessions: Vec<(SessionIndex, String)> = Vec::new(); // (session, timestamp)
51
52        for entry in WalkDir::new(log_root).into_iter().filter_map(|e| e.ok()) {
53            let path = entry.path();
54
55            if self.probe(path) == ProbeResult::NoMatch {
56                continue;
57            }
58
59            let header = match extract_codex_header(path) {
60                Ok(h) => h,
61                Err(_) => continue,
62            };
63
64            let session_id = match header.session_id {
65                Some(id) => id,
66                None => continue,
67            };
68
69            let session = SessionIndex {
70                session_id: session_id.clone(),
71                timestamp: header.timestamp.clone(),
72                latest_mod_time: None,
73                main_file: path.to_path_buf(),
74                sidechain_files: Vec::new(),
75                project_root: header.cwd.clone().map(PathBuf::from),
76                snippet: header.snippet.clone(),
77                parent_session_id: None,
78                spawned_by: None,
79            };
80
81            if header.subagent_type.is_some() {
82                // Subagent session - store with timestamp for correlation
83                let ts = header.timestamp.clone().unwrap_or_default();
84                subagent_sessions.push((session, ts));
85            } else {
86                // CLI session - store with path for spawn event extraction
87                cli_sessions.push((session, path.to_path_buf()));
88            }
89        }
90
91        // Phase 2: Build spawn event map from CLI sessions
92        // Map: (parent_session_id, spawn_timestamp) -> SpawnContext
93        let mut spawn_map: HashMap<String, Vec<SpawnEvent>> = HashMap::new();
94
95        for (session, path) in &cli_sessions {
96            if let Ok(spawn_events) = extract_spawn_events(path)
97                && !spawn_events.is_empty()
98            {
99                spawn_map.insert(session.session_id.clone(), spawn_events);
100            }
101        }
102
103        // Phase 3: Correlate subagent sessions to parent spawn events
104        for (session, subagent_ts) in &mut subagent_sessions {
105            if let Some((parent_id, spawn_ctx)) =
106                find_matching_spawn(&spawn_map, subagent_ts, session.project_root.as_ref())
107            {
108                session.parent_session_id = Some(parent_id);
109                session.spawned_by = Some(spawn_ctx);
110            }
111        }
112
113        // Combine all sessions
114        let mut all_sessions: HashMap<String, SessionIndex> = HashMap::new();
115
116        for (session, _) in cli_sessions {
117            all_sessions.insert(session.session_id.clone(), session);
118        }
119        for (session, _) in subagent_sessions {
120            all_sessions.insert(session.session_id.clone(), session);
121        }
122
123        // Compute latest_mod_time for each session
124        for session in all_sessions.values_mut() {
125            let all_files = vec![session.main_file.as_path()];
126            session.latest_mod_time = crate::get_latest_mod_time_rfc3339(&all_files);
127        }
128
129        Ok(all_sessions.into_values().collect())
130    }
131
132    fn extract_session_id(&self, path: &Path) -> Result<String> {
133        let header = extract_codex_header(path)?;
134        header
135            .session_id
136            .ok_or_else(|| Error::Parse(format!("No session_id in file: {}", path.display())))
137    }
138
139    fn extract_project_hash(&self, path: &Path) -> Result<Option<agtrace_types::ProjectHash>> {
140        let header = extract_codex_header(path)?;
141        Ok(header
142            .cwd
143            .map(|cwd| agtrace_core::project_hash_from_root(&cwd)))
144    }
145
146    fn find_session_files(&self, log_root: &Path, session_id: &str) -> Result<Vec<PathBuf>> {
147        let mut matching_files = Vec::new();
148
149        for entry in WalkDir::new(log_root).into_iter().filter_map(|e| e.ok()) {
150            let path = entry.path();
151
152            if self.probe(path) == ProbeResult::NoMatch {
153                continue;
154            }
155
156            if let Ok(header) = extract_codex_header(path)
157                && header.session_id.as_deref() == Some(session_id)
158            {
159                matching_files.push(path.to_path_buf());
160            }
161        }
162
163        Ok(matching_files)
164    }
165
166    fn is_sidechain_file(&self, path: &Path) -> Result<bool> {
167        let header = extract_codex_header(path)?;
168        Ok(header.subagent_type.is_some())
169    }
170}
171
172/// Find a matching spawn event for a subagent based on timestamp correlation.
173/// Returns (parent_session_id, SpawnContext) if found within 100ms window.
174fn find_matching_spawn(
175    spawn_map: &HashMap<String, Vec<SpawnEvent>>,
176    subagent_ts: &str,
177    subagent_project: Option<&PathBuf>,
178) -> Option<(String, SpawnContext)> {
179    let subagent_dt = DateTime::parse_from_rfc3339(subagent_ts).ok()?;
180
181    // Maximum time difference for correlation (100ms)
182    const MAX_DIFF_MS: i64 = 100;
183
184    for (parent_id, spawn_events) in spawn_map {
185        for spawn in spawn_events {
186            let spawn_dt = match DateTime::parse_from_rfc3339(&spawn.timestamp) {
187                Ok(dt) => dt,
188                Err(_) => continue,
189            };
190
191            // Calculate time difference in milliseconds
192            let diff = (subagent_dt.timestamp_millis() - spawn_dt.timestamp_millis()).abs();
193
194            if diff <= MAX_DIFF_MS {
195                // Found a match within the time window
196                return Some((parent_id.clone(), spawn.spawn_context.clone()));
197            }
198        }
199    }
200
201    // No match found - subagent might be from a different parent or standalone
202    let _ = subagent_project; // Future: could also match by project_root
203    None
204}