use async_trait::async_trait;
use ccstat_core::error::{CcstatError, Result};
use ccstat_core::provider::ProviderDataLoader;
use ccstat_core::types::{ISOTimestamp, ModelName, SessionId, TokenCounts, UsageEntry};
use chrono::DateTime;
use futures::stream::Stream;
use serde::Deserialize;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use tokio::io::{AsyncBufReadExt, BufReader};
use tracing::{debug, warn};
18
/// Model attributed to token events seen before any `turn_context` line
/// has announced the session's actual model.
const FALLBACK_MODEL: &str = "gpt-5";
21
/// Loads Codex CLI usage data from JSONL session logs.
pub struct DataLoader {
    // Directory holding `*.jsonl` session files: `$CODEX_HOME/sessions`
    // when the env var is set, otherwise `~/.codex/sessions`.
    session_dir: PathBuf,
}
26
27#[async_trait]
28impl ProviderDataLoader for DataLoader {
29 async fn new() -> Result<Self> {
30 let base = if let Ok(home) = std::env::var("CODEX_HOME") {
31 PathBuf::from(home)
32 } else {
33 dirs::home_dir()
34 .ok_or_else(|| CcstatError::Config("Cannot determine home directory".into()))?
35 .join(".codex")
36 };
37
38 let session_dir = base.join("sessions");
39 if !session_dir.exists() {
40 debug!(
41 "Codex sessions directory not found: {}",
42 session_dir.display()
43 );
44 }
45
46 Ok(DataLoader { session_dir })
47 }
48
49 fn load_entries(&self) -> Pin<Box<dyn Stream<Item = Result<UsageEntry>> + Send + '_>> {
50 Box::pin(async_stream::try_stream! {
51 if !self.session_dir.exists() {
52 return;
53 }
54
55 let mut jsonl_files = Vec::new();
56 for entry in walkdir::WalkDir::new(&self.session_dir)
57 .min_depth(1)
58 .max_depth(1)
59 .into_iter()
60 .filter_map(|e| e.ok())
61 {
62 let path = entry.path().to_path_buf();
63 if path.extension().is_some_and(|ext| ext == "jsonl") {
64 jsonl_files.push(path);
65 }
66 }
67
68 debug!("Found {} Codex session files", jsonl_files.len());
69
70 for path in jsonl_files {
71 let session_id = path
72 .file_stem()
73 .and_then(|s| s.to_str())
74 .unwrap_or("unknown")
75 .to_string();
76
77 let entries = parse_session_file(&path, &session_id).await;
78 match entries {
79 Ok(parsed) => {
80 for entry in parsed {
81 yield entry;
82 }
83 }
84 Err(e) => {
85 warn!("Failed to parse Codex session {}: {}", session_id, e);
86 }
87 }
88 }
89 })
90 }
91}
92
/// One line of a Codex session JSONL log. Only the fields this parser
/// needs are deserialized; everything else on the line is ignored.
#[derive(Deserialize)]
struct CodexEvent {
    // Event discriminator, e.g. "turn_context" or "event_msg".
    #[serde(rename = "type")]
    event_type: String,
    // RFC 3339 timestamp; not every event type carries one.
    timestamp: Option<String>,
    // Model announced by `turn_context` events.
    #[serde(default)]
    model_id: Option<String>,
    // Payload carried by `event_msg` events.
    #[serde(default)]
    payload: Option<EventPayload>,
}
107
/// Payload of an `event_msg` event. Only payloads whose `type` is
/// `"token_count"` carry the token usage this parser extracts.
#[derive(Deserialize)]
struct EventPayload {
    // Payload discriminator, e.g. "token_count".
    #[serde(rename = "type")]
    payload_type: Option<String>,
    // Token usage details; present on `token_count` payloads.
    #[serde(default)]
    info: Option<TokenInfo>,
}
115
/// Token usage attached to a `token_count` payload.
#[derive(Deserialize)]
struct TokenInfo {
    // Running totals for the session so far (cumulative).
    total_token_usage: Option<CumulativeTokens>,
    // Usage of just the most recent turn, when the log provides it.
    last_token_usage: Option<CumulativeTokens>,
}
121
/// Raw token counters exactly as they appear in the log. Every field
/// defaults to zero so partially-populated objects still deserialize.
#[derive(Deserialize, Clone, Default)]
#[allow(dead_code)]
struct CumulativeTokens {
    #[serde(default)]
    input_tokens: u64,
    // Cache-read tokens under one field name…
    #[serde(default)]
    cached_input_tokens: u64,
    // …or the alternate name; the alias accepts `cache_read_input_tokens`.
    #[serde(alias = "cache_read_input_tokens")]
    #[serde(default)]
    cache_read_tokens: u64,
    #[serde(default)]
    output_tokens: u64,
    // Present in logs but unused by this parser (hence `allow(dead_code)`).
    #[serde(default)]
    total_tokens: u64,
}
137
138async fn parse_session_file(path: &PathBuf, session_id: &str) -> Result<Vec<UsageEntry>> {
143 let file = tokio::fs::File::open(path).await.map_err(|e| {
144 CcstatError::Io(std::io::Error::new(
145 e.kind(),
146 format!("{}: {}", path.display(), e),
147 ))
148 })?;
149 let reader = BufReader::new(file);
150 let mut lines = reader.lines();
151
152 let mut entries = Vec::new();
153 let mut current_model: Option<String> = None;
154 let mut prev_cumulative = HashMap::<String, CumulativeTokens>::new();
155
156 while let Some(line) = lines.next_line().await? {
157 if line.trim().is_empty() {
158 continue;
159 }
160
161 let event: CodexEvent = match serde_json::from_str(&line) {
162 Ok(e) => e,
163 Err(_) => continue,
164 };
165
166 match event.event_type.as_str() {
167 "turn_context" => {
168 if let Some(model) = event.model_id {
169 current_model = Some(normalize_model(&model));
170 }
171 }
172 "event_msg" => {
173 let Some(payload) = &event.payload else {
174 continue;
175 };
176 if payload.payload_type.as_deref() != Some("token_count") {
177 continue;
178 }
179 let Some(info) = &payload.info else {
180 continue;
181 };
182 let Some(timestamp_str) = &event.timestamp else {
183 continue;
184 };
185
186 let timestamp = match DateTime::parse_from_rfc3339(timestamp_str) {
187 Ok(dt) => ISOTimestamp::new(dt.to_utc()),
188 Err(_) => {
189 warn!(
190 "Invalid timestamp in Codex session {}: {}",
191 session_id, timestamp_str
192 );
193 continue;
194 }
195 };
196
197 let model_name = current_model
198 .as_deref()
199 .unwrap_or(FALLBACK_MODEL)
200 .to_string();
201
202 let delta = compute_delta(info, prev_cumulative.get(&model_name));
204
205 if let Some(total) = &info.total_token_usage {
207 prev_cumulative.insert(model_name.clone(), total.clone());
208 }
209
210 if delta.input_tokens == 0 && delta.output_tokens == 0 {
212 continue;
213 }
214
215 entries.push(UsageEntry {
216 session_id: SessionId::new(session_id.to_string()),
217 timestamp,
218 model: ModelName::new(model_name),
219 tokens: delta,
220 total_cost: None,
221 project: None,
222 instance_id: None,
223 });
224 }
225 _ => {}
226 }
227 }
228
229 Ok(entries)
230}
231
232fn compute_delta(info: &TokenInfo, prev: Option<&CumulativeTokens>) -> TokenCounts {
234 if let Some(last) = &info.last_token_usage {
236 let cache_read = if last.cached_input_tokens > 0 {
237 last.cached_input_tokens
238 } else {
239 last.cache_read_tokens
240 };
241 return TokenCounts::new(last.input_tokens, last.output_tokens, 0, cache_read);
242 }
243
244 if let Some(total) = &info.total_token_usage {
246 let cache_read_total = if total.cached_input_tokens > 0 {
247 total.cached_input_tokens
248 } else {
249 total.cache_read_tokens
250 };
251
252 let (prev_input, prev_output, prev_cache_read) = match prev {
253 Some(p) => {
254 let p_cache = if p.cached_input_tokens > 0 {
255 p.cached_input_tokens
256 } else {
257 p.cache_read_tokens
258 };
259 (p.input_tokens, p.output_tokens, p_cache)
260 }
261 None => (0, 0, 0),
262 };
263
264 TokenCounts::new(
265 total.input_tokens.saturating_sub(prev_input),
266 total.output_tokens.saturating_sub(prev_output),
267 0,
268 cache_read_total.saturating_sub(prev_cache_read),
269 )
270 } else {
271 TokenCounts::new(0, 0, 0, 0)
272 }
273}
274
/// Canonicalizes a Codex model identifier, folding known aliases into
/// their base model name. Unrecognized names pass through unchanged.
fn normalize_model(model: &str) -> String {
    match model {
        "gpt-5-codex" => "gpt-5",
        other => other,
    }
    .to_string()
}
283
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::TempDir;

    /// Builds an `event_msg`/`token_count` line carrying cumulative totals only.
    fn make_token_event(ts: &str, input: u64, output: u64, cached: u64) -> String {
        format!(
            r#"{{"type":"event_msg","timestamp":"{}","payload":{{"type":"token_count","info":{{"total_token_usage":{{"input_tokens":{},"cached_input_tokens":{},"output_tokens":{},"total_tokens":{}}}}}}}}}"#,
            ts,
            input,
            cached,
            output,
            input + output
        )
    }

    /// Builds a `token_count` line with both cumulative totals and a
    /// per-turn `last_token_usage` object.
    fn make_token_event_with_last(
        ts: &str,
        total_input: u64,
        total_output: u64,
        last_input: u64,
        last_output: u64,
    ) -> String {
        format!(
            r#"{{"type":"event_msg","timestamp":"{}","payload":{{"type":"token_count","info":{{"total_token_usage":{{"input_tokens":{},"output_tokens":{},"total_tokens":{}}},"last_token_usage":{{"input_tokens":{},"output_tokens":{},"total_tokens":{}}}}}}}}}"#,
            ts,
            total_input,
            total_output,
            total_input + total_output,
            last_input,
            last_output,
            last_input + last_output,
        )
    }

    /// Builds a `turn_context` line announcing the active model.
    fn make_turn_context(model: &str) -> String {
        format!(r#"{{"type":"turn_context","model_id":"{}"}}"#, model)
    }

    /// Cumulative totals across events become per-event deltas.
    #[tokio::test]
    async fn test_cumulative_to_delta() {
        let dir = TempDir::new().unwrap();
        let sessions_dir = dir.path().join("sessions");
        std::fs::create_dir_all(&sessions_dir).unwrap();

        let session_file = sessions_dir.join("test-session.jsonl");
        let mut f = std::fs::File::create(&session_file).unwrap();

        writeln!(f, "{}", make_turn_context("gpt-5")).unwrap();
        writeln!(
            f,
            "{}",
            make_token_event("2025-01-01T10:00:00Z", 100, 50, 0)
        )
        .unwrap();
        writeln!(
            f,
            "{}",
            make_token_event("2025-01-01T10:05:00Z", 300, 150, 0)
        )
        .unwrap();

        let entries = parse_session_file(&session_file, "test-session")
            .await
            .unwrap();
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].tokens.input_tokens, 100);
        assert_eq!(entries[0].tokens.output_tokens, 50);
        // Second event: 300-100 input, 150-50 output.
        assert_eq!(entries[1].tokens.input_tokens, 200);
        assert_eq!(entries[1].tokens.output_tokens, 100);
    }

    /// When `last_token_usage` is present it wins over the cumulative diff.
    #[tokio::test]
    async fn test_last_token_usage_preferred() {
        let dir = TempDir::new().unwrap();
        let sessions_dir = dir.path().join("sessions");
        std::fs::create_dir_all(&sessions_dir).unwrap();

        let session_file = sessions_dir.join("test-last.jsonl");
        let mut f = std::fs::File::create(&session_file).unwrap();

        writeln!(f, "{}", make_turn_context("gpt-5")).unwrap();
        writeln!(
            f,
            "{}",
            make_token_event_with_last("2025-01-01T10:00:00Z", 500, 200, 50, 20)
        )
        .unwrap();

        let entries = parse_session_file(&session_file, "test-last")
            .await
            .unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].tokens.input_tokens, 50);
        assert_eq!(entries[0].tokens.output_tokens, 20);
    }

    /// With no `turn_context` the fallback model is attributed.
    #[tokio::test]
    async fn test_model_fallback() {
        let dir = TempDir::new().unwrap();
        let sessions_dir = dir.path().join("sessions");
        std::fs::create_dir_all(&sessions_dir).unwrap();

        let session_file = sessions_dir.join("no-model.jsonl");
        let mut f = std::fs::File::create(&session_file).unwrap();

        writeln!(
            f,
            "{}",
            make_token_event("2025-01-01T10:00:00Z", 100, 50, 0)
        )
        .unwrap();

        let entries = parse_session_file(&session_file, "no-model").await.unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].model.as_str(), FALLBACK_MODEL);
    }

    // Pure sync function — no runtime needed, so a plain #[test].
    #[test]
    fn test_model_alias() {
        assert_eq!(normalize_model("gpt-5-codex"), "gpt-5");
        assert_eq!(normalize_model("gpt-5"), "gpt-5");
        assert_eq!(normalize_model("o3-mini"), "o3-mini");
    }

    /// A nonexistent session directory yields an empty stream, not an error.
    #[tokio::test]
    async fn test_data_loader_no_dir() {
        // SAFETY: process-global env mutation; tests touching CODEX_HOME
        // must not run concurrently with others reading it.
        unsafe {
            std::env::set_var("CODEX_HOME", "/tmp/nonexistent-codex-test-dir");
        }
        let loader = DataLoader::new().await.unwrap();
        let entries: Vec<_> = futures::StreamExt::collect::<Vec<_>>(loader.load_entries()).await;
        assert!(entries.is_empty());
        unsafe {
            std::env::remove_var("CODEX_HOME");
        }
    }
}
428}