Skip to main content

kaizen/collect/tail/
codex.rs

// SPDX-License-Identifier: AGPL-3.0-or-later
//! Parse Codex (OpenAI) transcript `.jsonl` files into Events.
//! Pure parser — no notify dependency, no IO beyond file reads.
4
5use crate::collect::model_from_json;
6use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
7use anyhow::{Context, Result};
8use serde_json::Value;
9use std::path::Path;
10
11/// Parse one `.jsonl` line. Returns `Some(Event)` for action-bearing lines.
12/// Extracts `usage.prompt_tokens` / `usage.completion_tokens` when present.
13pub fn parse_codex_line(
14    session_id: &str,
15    seq: u64,
16    base_ts: u64,
17    line: &str,
18) -> Result<Option<Event>> {
19    let v: Value = serde_json::from_str(line.trim()).context("codex transcript: invalid JSON")?;
20    let obj = match v.as_object() {
21        Some(o) => o,
22        None => return Ok(None),
23    };
24
25    let tokens_in = obj
26        .get("usage")
27        .and_then(|u| u.get("prompt_tokens"))
28        .and_then(|v| v.as_u64())
29        .map(|v| v as u32);
30
31    let tokens_out = obj
32        .get("usage")
33        .and_then(|u| u.get("completion_tokens"))
34        .and_then(|v| v.as_u64())
35        .map(|v| v as u32);
36
37    let reasoning_tokens = obj
38        .get("usage")
39        .and_then(|u| u.get("completion_tokens_details"))
40        .and_then(|d| d.get("reasoning_tokens"))
41        .and_then(|v| v.as_u64())
42        .map(|v| v as u32)
43        .or_else(|| {
44            obj.get("usage")
45                .and_then(|u| u.get("output_tokens_details"))
46                .and_then(|d| d.get("reasoning_tokens"))
47                .and_then(|v| v.as_u64())
48                .map(|v| v as u32)
49        });
50
51    let ts_ms = line_ts_ms(obj).unwrap_or(base_ts + seq * 100);
52    let ts_exact = line_ts_ms(obj).is_some();
53
54    // Tool calls are in top-level `tool_calls` array
55    if let Some(first) = obj
56        .get("tool_calls")
57        .and_then(|tc| tc.as_array())
58        .and_then(|arr| arr.first())
59    {
60        let tool_name = first
61            .get("function")
62            .and_then(|f| f.get("name"))
63            .and_then(|n| n.as_str())
64            .unwrap_or("")
65            .to_string();
66        return Ok(Some(Event {
67            session_id: session_id.to_string(),
68            seq,
69            ts_ms,
70            ts_exact,
71            kind: EventKind::ToolCall,
72            source: EventSource::Tail,
73            tool: Some(tool_name),
74            tool_call_id: first
75                .get("id")
76                .and_then(|v| v.as_str())
77                .map(ToOwned::to_owned),
78            tokens_in,
79            tokens_out,
80            reasoning_tokens,
81            cost_usd_e6: None,
82            stop_reason: None,
83            latency_ms: None,
84            ttft_ms: None,
85            retry_count: None,
86            context_used_tokens: None,
87            context_max_tokens: None,
88            cache_creation_tokens: None,
89            cache_read_tokens: None,
90            system_prompt_tokens: None,
91            payload: first.clone(),
92        }));
93    }
94
95    Ok(None)
96}
97
98fn line_ts_ms(obj: &serde_json::Map<String, Value>) -> Option<u64> {
99    if let Some(t) = ["timestamp_ms", "ts_ms", "created_at_ms"]
100        .iter()
101        .find_map(|k| obj.get(*k).and_then(|v| v.as_u64()))
102    {
103        return Some(t);
104    }
105    if let Some(t) = obj.get("timestamp").and_then(|v| v.as_u64()) {
106        return Some(if t < 1_000_000_000_000 {
107            t.saturating_mul(1000)
108        } else {
109            t
110        });
111    }
112    None
113}
114
115/// Walk all `.jsonl` files under `dir`; return inferred `SessionRecord` + events.
116/// Agent = "codex". Session id = dir name.
117pub fn scan_codex_session_dir(dir: &Path) -> Result<(SessionRecord, Vec<Event>)> {
118    let session_id = dir
119        .file_name()
120        .and_then(|n| n.to_str())
121        .unwrap_or("")
122        .to_string();
123
124    let workspace = dir
125        .parent()
126        .and_then(|p| p.parent())
127        .map(|p| p.to_string_lossy().to_string())
128        .unwrap_or_default();
129
130    let mut events = Vec::new();
131    let mut seq: u64 = 0;
132    let mut model: Option<String> = None;
133
134    let base_ts = crate::collect::tail::dir_mtime_ms(dir);
135    let mut entries: Vec<_> = std::fs::read_dir(dir)
136        .with_context(|| format!("read dir: {}", dir.display()))?
137        .filter_map(|e| e.ok())
138        .filter(|e| e.path().extension().map(|x| x == "jsonl").unwrap_or(false))
139        .collect();
140    entries.sort_by_key(|e| e.file_name());
141
142    for entry in entries {
143        let content = std::fs::read_to_string(entry.path())?;
144        for line in content.lines() {
145            if line.trim().is_empty() {
146                continue;
147            }
148            if model.is_none() {
149                model = model_from_json::from_line(line);
150            }
151            if let Some(ev) = parse_codex_line(&session_id, seq, base_ts, line)? {
152                events.push(ev);
153            }
154            seq += 1;
155        }
156    }
157
158    let record = SessionRecord {
159        id: session_id.clone(),
160        agent: "codex".to_string(),
161        model,
162        workspace,
163        started_at_ms: crate::collect::tail::dir_mtime_ms(dir),
164        ended_at_ms: None,
165        status: SessionStatus::Done,
166        trace_path: dir.to_string_lossy().to_string(),
167        start_commit: None,
168        end_commit: None,
169        branch: None,
170        dirty_start: None,
171        dirty_end: None,
172        repo_binding_source: None,
173        prompt_fingerprint: None,
174        parent_session_id: None,
175        agent_version: None,
176        os: None,
177        arch: None,
178        repo_file_count: None,
179        repo_total_loc: None,
180    };
181    Ok((record, events))
182}
183
#[cfg(test)]
mod tests {
    //! Unit tests for the Codex transcript parser.
    use super::*;

    /// Assistant line with a single tool call and no `usage` block.
    const TOOL_USE_LINE: &str = r#"{"role":"assistant","content":null,"tool_calls":[{"id":"call_01","type":"function","function":{"name":"shell","arguments":"{\"command\":\"ls\"}"}}]}"#;
    /// Assistant line with a tool call plus prompt/completion token usage.
    const WITH_USAGE_LINE: &str = r#"{"role":"assistant","content":null,"tool_calls":[{"id":"call_02","type":"function","function":{"name":"read_file","arguments":"{\"path\":\"src/main.rs\"}"}}],"usage":{"prompt_tokens":500,"completion_tokens":300,"total_tokens":800}}"#;

    #[test]
    fn parse_tool_use() {
        let ev = parse_codex_line("s1", 0, 0, TOOL_USE_LINE)
            .unwrap()
            .unwrap();
        assert_eq!(ev.kind, EventKind::ToolCall);
        assert_eq!(ev.tool.as_deref(), Some("shell"));
        assert_eq!(ev.tool_call_id.as_deref(), Some("call_01"));
        assert_eq!(ev.session_id, "s1");
    }

    #[test]
    fn parse_with_usage() {
        let ev = parse_codex_line("s1", 0, 0, WITH_USAGE_LINE)
            .unwrap()
            .unwrap();
        assert_eq!(ev.tokens_in, Some(500));
        assert_eq!(ev.tokens_out, Some(300));
        assert_eq!(ev.cost_usd_e6, None);
    }

    #[test]
    fn line_without_tool_calls_yields_no_event() {
        let parsed = parse_codex_line("s1", 0, 0, r#"{"role":"user","content":"hi"}"#).unwrap();
        assert!(parsed.is_none());
    }

    #[test]
    fn non_object_json_yields_no_event() {
        let parsed = parse_codex_line("s1", 0, 0, "[1, 2, 3]").unwrap();
        assert!(parsed.is_none());
    }

    #[test]
    fn invalid_json_is_an_error() {
        assert!(parse_codex_line("s1", 0, 0, "not json").is_err());
    }

    #[test]
    fn synthesizes_timestamp_when_line_has_none() {
        let ev = parse_codex_line("s1", 3, 1_000, TOOL_USE_LINE)
            .unwrap()
            .unwrap();
        assert_eq!(ev.ts_ms, 1_300);
        assert!(!ev.ts_exact);
    }

    #[test]
    fn second_resolution_timestamp_is_scaled_to_ms() {
        let line = r#"{"timestamp":1700000000,"tool_calls":[{"id":"c","function":{"name":"shell"}}]}"#;
        let ev = parse_codex_line("s1", 0, 0, line).unwrap().unwrap();
        assert_eq!(ev.ts_ms, 1_700_000_000_000);
        assert!(ev.ts_exact);
    }

    #[test]
    fn millisecond_key_is_used_verbatim() {
        let line = r#"{"ts_ms":42,"tool_calls":[{"id":"c","function":{"name":"shell"}}]}"#;
        let ev = parse_codex_line("s1", 5, 9_000, line).unwrap().unwrap();
        assert_eq!(ev.ts_ms, 42);
        assert!(ev.ts_exact);
    }

    #[test]
    fn reasoning_tokens_read_from_either_details_key() {
        let completion = r#"{"tool_calls":[{"id":"c","function":{"name":"shell"}}],"usage":{"completion_tokens_details":{"reasoning_tokens":7}}}"#;
        let output = r#"{"tool_calls":[{"id":"c","function":{"name":"shell"}}],"usage":{"output_tokens_details":{"reasoning_tokens":9}}}"#;
        let from_completion = parse_codex_line("s1", 0, 0, completion).unwrap().unwrap();
        let from_output = parse_codex_line("s1", 0, 0, output).unwrap().unwrap();
        assert_eq!(from_completion.reasoning_tokens, Some(7));
        assert_eq!(from_output.reasoning_tokens, Some(9));
    }

    #[test]
    fn scan_fixture_dir() {
        let fixture_dir =
            std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("tests/fixtures/codex");
        let (record, events) = scan_codex_session_dir(&fixture_dir).unwrap();
        assert_eq!(record.agent, "codex");
        assert_eq!(record.model.as_deref(), Some("gpt-4o-fixture"));
        assert!(!events.is_empty(), "expected events from fixture files");
    }
}