// ccstat_provider_codex/data_loader.rs

1//! Codex data loader
2//!
3//! Discovers and parses Codex session JSONL files from `~/.codex/sessions/`.
4//! Codex uses cumulative token counts that must be converted to per-event deltas.
5
use async_trait::async_trait;
use ccstat_core::error::{CcstatError, Result};
use ccstat_core::provider::ProviderDataLoader;
use ccstat_core::types::{ISOTimestamp, ModelName, SessionId, TokenCounts, UsageEntry};
use chrono::DateTime;
use futures::stream::Stream;
use serde::Deserialize;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use tokio::io::{AsyncBufReadExt, BufReader};
use tracing::{debug, warn};
18
/// Default model name used when a session contains no `turn_context`
/// event naming the active model.
const FALLBACK_MODEL: &str = "gpt-5";
21
/// Data loader for Codex usage data.
///
/// Discovers and parses JSONL session files under the directory resolved
/// at construction time.
pub struct DataLoader {
    // Root directory containing session JSONL files
    // (e.g. `~/.codex/sessions`, or `$CODEX_HOME/sessions` when set).
    session_dir: PathBuf,
}
26
27#[async_trait]
28impl ProviderDataLoader for DataLoader {
29    async fn new() -> Result<Self> {
30        let base = if let Ok(home) = std::env::var("CODEX_HOME") {
31            PathBuf::from(home)
32        } else {
33            dirs::home_dir()
34                .ok_or_else(|| CcstatError::Config("Cannot determine home directory".into()))?
35                .join(".codex")
36        };
37
38        let session_dir = base.join("sessions");
39        if !session_dir.exists() {
40            debug!(
41                "Codex sessions directory not found: {}",
42                session_dir.display()
43            );
44        }
45
46        Ok(DataLoader { session_dir })
47    }
48
49    fn load_entries(&self) -> Pin<Box<dyn Stream<Item = Result<UsageEntry>> + Send + '_>> {
50        Box::pin(async_stream::try_stream! {
51            if !self.session_dir.exists() {
52                return;
53            }
54
55            let mut jsonl_files = Vec::new();
56            for entry in walkdir::WalkDir::new(&self.session_dir)
57                .min_depth(1)
58                .max_depth(1)
59                .into_iter()
60                .filter_map(|e| e.ok())
61            {
62                let path = entry.path().to_path_buf();
63                if path.extension().is_some_and(|ext| ext == "jsonl") {
64                    jsonl_files.push(path);
65                }
66            }
67
68            debug!("Found {} Codex session files", jsonl_files.len());
69
70            for path in jsonl_files {
71                let session_id = path
72                    .file_stem()
73                    .and_then(|s| s.to_str())
74                    .unwrap_or("unknown")
75                    .to_string();
76
77                let entries = parse_session_file(&path, &session_id).await;
78                match entries {
79                    Ok(parsed) => {
80                        for entry in parsed {
81                            yield entry;
82                        }
83                    }
84                    Err(e) => {
85                        warn!("Failed to parse Codex session {}: {}", session_id, e);
86                    }
87                }
88            }
89        })
90    }
91}
92
93// ---------------------------------------------------------------------------
94// JSONL event types
95// ---------------------------------------------------------------------------
96
/// One line of a Codex session JSONL file.
///
/// Only the fields needed for usage accounting are modeled; serde ignores
/// any other fields present on the line.
#[derive(Deserialize)]
struct CodexEvent {
    // Event discriminator, e.g. "turn_context" or "event_msg".
    #[serde(rename = "type")]
    event_type: String,
    // RFC 3339 timestamp; absent on some event types.
    timestamp: Option<String>,
    // Model identifier carried by "turn_context" events.
    #[serde(default)]
    model_id: Option<String>,
    // Nested payload carried by "event_msg" events.
    #[serde(default)]
    payload: Option<EventPayload>,
}
107
/// Payload of an `event_msg` event.
#[derive(Deserialize)]
struct EventPayload {
    // Payload discriminator; only "token_count" payloads carry usage data.
    #[serde(rename = "type")]
    payload_type: Option<String>,
    // Token usage details for "token_count" payloads.
    #[serde(default)]
    info: Option<TokenInfo>,
}
115
/// Token usage block inside a "token_count" payload.
#[derive(Deserialize)]
struct TokenInfo {
    // Cumulative totals since the start of the session.
    total_token_usage: Option<CumulativeTokens>,
    // Usage of just the most recent turn (already a delta), when present.
    last_token_usage: Option<CumulativeTokens>,
}
121
/// Raw token counters as emitted by Codex.
///
/// Cache reads may appear under `cached_input_tokens` or
/// `cache_read_input_tokens` (presumably varies by Codex version —
/// TODO confirm); both are modeled and `compute_delta` picks whichever
/// is populated.
#[derive(Deserialize, Clone, Default)]
#[allow(dead_code)]
struct CumulativeTokens {
    #[serde(default)]
    input_tokens: u64,
    // Cache-read tokens, first field spelling.
    #[serde(default)]
    cached_input_tokens: u64,
    // Cache-read tokens, alternate field spelling (accepted via alias).
    #[serde(alias = "cache_read_input_tokens")]
    #[serde(default)]
    cache_read_tokens: u64,
    #[serde(default)]
    output_tokens: u64,
    // Reported grand total; parsed but currently unused (hence dead_code).
    #[serde(default)]
    total_tokens: u64,
}
137
138// ---------------------------------------------------------------------------
139// Session file parsing
140// ---------------------------------------------------------------------------
141
142async fn parse_session_file(path: &PathBuf, session_id: &str) -> Result<Vec<UsageEntry>> {
143    let file = tokio::fs::File::open(path).await.map_err(|e| {
144        CcstatError::Io(std::io::Error::new(
145            e.kind(),
146            format!("{}: {}", path.display(), e),
147        ))
148    })?;
149    let reader = BufReader::new(file);
150    let mut lines = reader.lines();
151
152    let mut entries = Vec::new();
153    let mut current_model: Option<String> = None;
154    let mut prev_cumulative = HashMap::<String, CumulativeTokens>::new();
155
156    while let Some(line) = lines.next_line().await? {
157        if line.trim().is_empty() {
158            continue;
159        }
160
161        let event: CodexEvent = match serde_json::from_str(&line) {
162            Ok(e) => e,
163            Err(_) => continue,
164        };
165
166        match event.event_type.as_str() {
167            "turn_context" => {
168                if let Some(model) = event.model_id {
169                    current_model = Some(normalize_model(&model));
170                }
171            }
172            "event_msg" => {
173                let Some(payload) = &event.payload else {
174                    continue;
175                };
176                if payload.payload_type.as_deref() != Some("token_count") {
177                    continue;
178                }
179                let Some(info) = &payload.info else {
180                    continue;
181                };
182                let Some(timestamp_str) = &event.timestamp else {
183                    continue;
184                };
185
186                let timestamp = match DateTime::parse_from_rfc3339(timestamp_str) {
187                    Ok(dt) => ISOTimestamp::new(dt.to_utc()),
188                    Err(_) => {
189                        warn!(
190                            "Invalid timestamp in Codex session {}: {}",
191                            session_id, timestamp_str
192                        );
193                        continue;
194                    }
195                };
196
197                let model_name = current_model
198                    .as_deref()
199                    .unwrap_or(FALLBACK_MODEL)
200                    .to_string();
201
202                // Compute delta tokens
203                let delta = compute_delta(info, prev_cumulative.get(&model_name));
204
205                // Update cumulative state
206                if let Some(total) = &info.total_token_usage {
207                    prev_cumulative.insert(model_name.clone(), total.clone());
208                }
209
210                // Skip zero-token entries
211                if delta.input_tokens == 0 && delta.output_tokens == 0 {
212                    continue;
213                }
214
215                entries.push(UsageEntry {
216                    session_id: SessionId::new(session_id.to_string()),
217                    timestamp,
218                    model: ModelName::new(model_name),
219                    tokens: delta,
220                    total_cost: None,
221                    project: None,
222                    instance_id: None,
223                });
224            }
225            _ => {}
226        }
227    }
228
229    Ok(entries)
230}
231
232/// Compute per-event delta tokens from cumulative or last_token_usage data.
233fn compute_delta(info: &TokenInfo, prev: Option<&CumulativeTokens>) -> TokenCounts {
234    // Prefer last_token_usage when available (already a delta)
235    if let Some(last) = &info.last_token_usage {
236        let cache_read = if last.cached_input_tokens > 0 {
237            last.cached_input_tokens
238        } else {
239            last.cache_read_tokens
240        };
241        return TokenCounts::new(last.input_tokens, last.output_tokens, 0, cache_read);
242    }
243
244    // Fall back to cumulative-to-delta conversion
245    if let Some(total) = &info.total_token_usage {
246        let cache_read_total = if total.cached_input_tokens > 0 {
247            total.cached_input_tokens
248        } else {
249            total.cache_read_tokens
250        };
251
252        let (prev_input, prev_output, prev_cache_read) = match prev {
253            Some(p) => {
254                let p_cache = if p.cached_input_tokens > 0 {
255                    p.cached_input_tokens
256                } else {
257                    p.cache_read_tokens
258                };
259                (p.input_tokens, p.output_tokens, p_cache)
260            }
261            None => (0, 0, 0),
262        };
263
264        TokenCounts::new(
265            total.input_tokens.saturating_sub(prev_input),
266            total.output_tokens.saturating_sub(prev_output),
267            0,
268            cache_read_total.saturating_sub(prev_cache_read),
269        )
270    } else {
271        TokenCounts::new(0, 0, 0, 0)
272    }
273}
274
/// Normalize Codex model names (gpt-5-codex → gpt-5).
///
/// Any name without a known alias is returned unchanged.
fn normalize_model(model: &str) -> String {
    match model {
        "gpt-5-codex" => "gpt-5",
        other => other,
    }
    .to_string()
}
283
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::TempDir;

    /// Build a "token_count" event line with cumulative totals only.
    fn make_token_event(ts: &str, input: u64, output: u64, cached: u64) -> String {
        format!(
            r#"{{"type":"event_msg","timestamp":"{}","payload":{{"type":"token_count","info":{{"total_token_usage":{{"input_tokens":{},"cached_input_tokens":{},"output_tokens":{},"total_tokens":{}}}}}}}}}"#,
            ts,
            input,
            cached,
            output,
            input + output
        )
    }

    /// Build a "token_count" event line carrying both cumulative totals
    /// and a last_token_usage delta.
    fn make_token_event_with_last(
        ts: &str,
        total_input: u64,
        total_output: u64,
        last_input: u64,
        last_output: u64,
    ) -> String {
        format!(
            r#"{{"type":"event_msg","timestamp":"{}","payload":{{"type":"token_count","info":{{"total_token_usage":{{"input_tokens":{},"output_tokens":{},"total_tokens":{}}},"last_token_usage":{{"input_tokens":{},"output_tokens":{},"total_tokens":{}}}}}}}}}"#,
            ts,
            total_input,
            total_output,
            total_input + total_output,
            last_input,
            last_output,
            last_input + last_output,
        )
    }

    /// Build a "turn_context" event line naming a model.
    fn make_turn_context(model: &str) -> String {
        format!(r#"{{"type":"turn_context","model_id":"{}"}}"#, model)
    }

    #[tokio::test]
    async fn test_cumulative_to_delta() {
        let dir = TempDir::new().unwrap();
        let sessions_dir = dir.path().join("sessions");
        std::fs::create_dir_all(&sessions_dir).unwrap();

        let session_file = sessions_dir.join("test-session.jsonl");
        let mut f = std::fs::File::create(&session_file).unwrap();

        writeln!(f, "{}", make_turn_context("gpt-5")).unwrap();
        // First event: cumulative 100 input, 50 output
        writeln!(
            f,
            "{}",
            make_token_event("2025-01-01T10:00:00Z", 100, 50, 0)
        )
        .unwrap();
        // Second event: cumulative 300 input, 150 output → delta = 200 input, 100 output
        writeln!(
            f,
            "{}",
            make_token_event("2025-01-01T10:05:00Z", 300, 150, 0)
        )
        .unwrap();

        let entries = parse_session_file(&session_file, "test-session")
            .await
            .unwrap();
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].tokens.input_tokens, 100);
        assert_eq!(entries[0].tokens.output_tokens, 50);
        assert_eq!(entries[1].tokens.input_tokens, 200);
        assert_eq!(entries[1].tokens.output_tokens, 100);
    }

    #[tokio::test]
    async fn test_last_token_usage_preferred() {
        let dir = TempDir::new().unwrap();
        let sessions_dir = dir.path().join("sessions");
        std::fs::create_dir_all(&sessions_dir).unwrap();

        let session_file = sessions_dir.join("test-last.jsonl");
        let mut f = std::fs::File::create(&session_file).unwrap();

        writeln!(f, "{}", make_turn_context("gpt-5")).unwrap();
        writeln!(
            f,
            "{}",
            make_token_event_with_last("2025-01-01T10:00:00Z", 500, 200, 50, 20)
        )
        .unwrap();

        let entries = parse_session_file(&session_file, "test-last")
            .await
            .unwrap();
        assert_eq!(entries.len(), 1);
        // Should use last_token_usage (50, 20), not cumulative (500, 200)
        assert_eq!(entries[0].tokens.input_tokens, 50);
        assert_eq!(entries[0].tokens.output_tokens, 20);
    }

    #[tokio::test]
    async fn test_model_fallback() {
        let dir = TempDir::new().unwrap();
        let sessions_dir = dir.path().join("sessions");
        std::fs::create_dir_all(&sessions_dir).unwrap();

        let session_file = sessions_dir.join("no-model.jsonl");
        let mut f = std::fs::File::create(&session_file).unwrap();

        // No turn_context → should use fallback model
        writeln!(
            f,
            "{}",
            make_token_event("2025-01-01T10:00:00Z", 100, 50, 0)
        )
        .unwrap();

        let entries = parse_session_file(&session_file, "no-model").await.unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].model.as_str(), FALLBACK_MODEL);
    }

    // Pure synchronous logic: a plain #[test] is correct here — the
    // previous #[tokio::test] spun up an async runtime for nothing.
    #[test]
    fn test_model_alias() {
        assert_eq!(normalize_model("gpt-5-codex"), "gpt-5");
        assert_eq!(normalize_model("gpt-5"), "gpt-5");
        assert_eq!(normalize_model("o3-mini"), "o3-mini");
    }

    #[tokio::test]
    async fn test_data_loader_no_dir() {
        // When CODEX_HOME points to nonexistent dir, new() should succeed
        // but load_entries should return empty stream.
        // NOTE(review): env mutation is process-global — if the assertion
        // panics, CODEX_HOME is not restored, and parallel tests reading
        // it could race. Consider a scoped-env helper.
        unsafe {
            std::env::set_var("CODEX_HOME", "/tmp/nonexistent-codex-test-dir");
        }
        let loader = DataLoader::new().await.unwrap();
        let entries: Vec<_> = futures::StreamExt::collect::<Vec<_>>(loader.load_entries()).await;
        assert!(entries.is_empty());
        unsafe {
            std::env::remove_var("CODEX_HOME");
        }
    }
}