Skip to main content

lifeloop/telemetry/
codex.rs

//! Codex CLI lifecycle telemetry reader.
//!
//! Parses Codex session JSONL logs, looking for `event_msg` entries that
//! carry a `token_count` payload, and extracts the lifecycle kernel: latest
//! prompt-token count, model name, and the natively reported context window.
6
7use std::collections::HashSet;
8use std::fs;
9use std::io::{BufRead, BufReader};
10use std::path::{Path, PathBuf};
11
12use serde::Deserialize;
13use serde_json::Value;
14
15use super::{
16    EnvAlias, PressureObservation, TelemetryError, TelemetryResult, TokenUsage, compute_pct,
17    file_mtime_epoch_s, general_context_window, general_host_model, home_dir, read_file_bounded,
18    resolve_env_string, string_key,
19};
20
21const ADAPTER_ID: &str = "codex";
22const MAX_SESSION_SCAN_DEPTH: usize = 8;
23const MAX_SESSION_SCAN_ENTRIES: usize = 10_000;
24
25const CODEX_HOME_ALIASES: &[EnvAlias] = &[EnvAlias {
26    lifeloop: "LIFELOOP_CODEX_HOME",
27    ccd_compat: "CODEX_HOME",
28}];
29
30const CODEX_THREAD_ID_ALIASES: &[EnvAlias] = &[EnvAlias {
31    lifeloop: "LIFELOOP_CODEX_THREAD_ID",
32    ccd_compat: "CODEX_THREAD_ID",
33}];
34
35const CODEX_MODEL_ALIASES: &[EnvAlias] = &[EnvAlias {
36    lifeloop: "LIFELOOP_CODEX_MODEL",
37    ccd_compat: "CCD_CODEX_MODEL",
38}];
39
40/// Probe the active Codex session log for the configured thread id.
41/// Returns `Ok(None)` when no thread id is configured or the session
42/// log has no `token_count` events yet.
43pub fn current() -> TelemetryResult<Option<PressureObservation>> {
44    let Some(thread_id) = resolve_env_string(CODEX_THREAD_ID_ALIASES) else {
45        return Ok(None);
46    };
47
48    let codex_home = codex_home()?;
49    let Some(session_path) = find_session_log(&codex_home.join("sessions"), &thread_id)? else {
50        return Ok(None);
51    };
52
53    let observed_at_epoch_s = match file_mtime_epoch_s(&session_path)? {
54        Some(epoch_s) => epoch_s,
55        None => return Ok(None),
56    };
57
58    let bytes = read_file_bounded(&session_path, "Codex session log")?;
59    parse_session_log(&bytes, observed_at_epoch_s)
60}
61
62/// Parse a Codex session log byte slice. Public for tests and for
63/// callers that read the file themselves.
64pub fn parse_session_log(
65    bytes: &[u8],
66    observed_at_epoch_s: u64,
67) -> TelemetryResult<Option<PressureObservation>> {
68    let reader = BufReader::new(bytes);
69    let mut latest: Option<PressureObservation> = None;
70    let mut latest_model = resolve_env_string(CODEX_MODEL_ALIASES).or_else(general_host_model);
71
72    for line in reader.lines() {
73        let line = line.map_err(TelemetryError::from)?;
74        let value: Value = match serde_json::from_str(&line) {
75            Ok(v) => v,
76            Err(_) => continue,
77        };
78        if let Some(model_name) = string_key(
79            &value,
80            &[
81                "model",
82                "model_name",
83                "modelName",
84                "model_slug",
85                "modelSlug",
86            ],
87        ) {
88            latest_model = Some(model_name);
89        }
90        let entry: SessionEntry = match serde_json::from_value(value) {
91            Ok(e) => e,
92            Err(_) => continue,
93        };
94        if entry.entry_type != "event_msg" {
95            continue;
96        }
97        let Some(payload) = entry.payload else {
98            continue;
99        };
100        if payload.payload_type != "token_count" {
101            continue;
102        }
103
104        let native_window = match payload.info.model_context_window {
105            0 => None,
106            value => Some(value),
107        };
108        let context_window = native_window.or_else(general_context_window);
109
110        let total_tokens = payload
111            .info
112            .last_token_usage
113            .as_ref()
114            .map(|u| u.total_tokens)
115            .unwrap_or(payload.info.total_token_usage.total_tokens);
116
117        latest = Some(PressureObservation {
118            adapter_id: ADAPTER_ID.into(),
119            adapter_version: None,
120            observed_at_epoch_s,
121            model_name: latest_model.clone(),
122            total_tokens: Some(total_tokens),
123            context_window_tokens: context_window,
124            context_used_pct: compute_pct(total_tokens, context_window),
125            compaction_signal: None,
126            usage: TokenUsage {
127                blended_total_tokens: Some(payload.info.total_token_usage.total_tokens),
128                ..TokenUsage::default()
129            },
130        });
131    }
132
133    Ok(latest)
134}
135
136fn codex_home() -> TelemetryResult<PathBuf> {
137    if let Some(path) = resolve_env_string(CODEX_HOME_ALIASES) {
138        return Ok(PathBuf::from(path));
139    }
140    Ok(home_dir()?.join(".codex"))
141}
142
143fn find_session_log(root: &Path, thread_id: &str) -> TelemetryResult<Option<PathBuf>> {
144    let root_meta = match fs::symlink_metadata(root) {
145        Ok(meta) => meta,
146        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
147        Err(e) => return Err(TelemetryError::from(e)),
148    };
149    if root_meta.file_type().is_symlink() || !root_meta.is_dir() {
150        return Ok(None);
151    }
152    let mut visited = HashSet::new();
153    let mut stack = vec![(root.to_path_buf(), 0usize)];
154    let mut entries_seen = 0usize;
155    while let Some((dir, depth)) = stack.pop() {
156        let canonical = fs::canonicalize(&dir).map_err(TelemetryError::from)?;
157        if !visited.insert(canonical) {
158            continue;
159        }
160        for entry in fs::read_dir(&dir).map_err(TelemetryError::from)? {
161            let entry = entry.map_err(TelemetryError::from)?;
162            entries_seen += 1;
163            if entries_seen > MAX_SESSION_SCAN_ENTRIES {
164                return Err(TelemetryError::Unavailable(format!(
165                    "Codex session search exceeded {MAX_SESSION_SCAN_ENTRIES} entries under {}",
166                    root.display()
167                )));
168            }
169            let path = entry.path();
170            let file_type = entry.file_type().map_err(TelemetryError::from)?;
171            if file_type.is_symlink() {
172                continue;
173            }
174            if file_type.is_dir() {
175                if depth < MAX_SESSION_SCAN_DEPTH {
176                    stack.push((path, depth + 1));
177                }
178                continue;
179            }
180            if !file_type.is_file() {
181                continue;
182            }
183            let matches = path
184                .file_name()
185                .and_then(|n| n.to_str())
186                .map(|n| n.contains(thread_id) && n.ends_with(".jsonl"))
187                .unwrap_or(false);
188            if matches {
189                return Ok(Some(path));
190            }
191        }
192    }
193    Ok(None)
194}
195
196#[derive(Deserialize)]
197struct SessionEntry {
198    #[serde(rename = "type")]
199    entry_type: String,
200    payload: Option<TokenCountPayload>,
201}
202
203#[derive(Deserialize)]
204struct TokenCountPayload {
205    #[serde(rename = "type")]
206    payload_type: String,
207    info: TokenCountInfo,
208}
209
210#[derive(Deserialize)]
211struct TokenCountInfo {
212    total_token_usage: TotalTokenUsage,
213    last_token_usage: Option<TotalTokenUsage>,
214    #[serde(default)]
215    model_context_window: u64,
216}
217
218#[derive(Deserialize)]
219struct TotalTokenUsage {
220    total_tokens: u64,
221}
222
#[cfg(test)]
mod tests {
    use super::*;

    /// A log reachable only through a symlinked directory must stay hidden.
    #[cfg(unix)]
    #[test]
    fn find_session_log_skips_symlink_directories() {
        let sessions = tempfile::tempdir().expect("temp root");
        let elsewhere = tempfile::tempdir().expect("temp outside");

        let hidden_log = elsewhere.path().join("thread-123.jsonl");
        fs::write(hidden_log, "{}\n").expect("outside log");

        let link = sessions.path().join("linked");
        std::os::unix::fs::symlink(elsewhere.path(), link).expect("symlink dir");

        let outcome = find_session_log(sessions.path(), "thread-123").expect("search ok");
        assert!(
            outcome.is_none(),
            "symlinked session dirs must not be followed"
        );
    }

    /// A regular file nested in real directories is found and its path returned.
    #[test]
    fn find_session_log_finds_regular_nested_file() {
        let sessions = tempfile::tempdir().expect("temp root");

        let month_dir = sessions.path().join("2026").join("05");
        fs::create_dir_all(&month_dir).expect("nested dirs");

        let wanted = month_dir.join("thread-abc.jsonl");
        fs::write(&wanted, "{}\n").expect("session log");

        let outcome = find_session_log(sessions.path(), "thread-abc").expect("search ok");
        assert_eq!(outcome, Some(wanted));
    }
}
254}