Skip to main content

kaizen/collect/tail/
codex.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! Parse Codex (OpenAI) transcript `.jsonl` files into Events.
3//! Pure parser — no notify dependency, no IO beyond file reads.
4
5use crate::collect::model_from_json;
6use crate::core::cost::estimate_tail_event_cost_usd_e6;
7use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
8use anyhow::{Context, Result};
9use serde_json::Value;
10use std::path::Path;
11
12/// Parse one `.jsonl` line. Returns `Some(Event)` for action-bearing lines.
13/// Extracts `usage.prompt_tokens` / `usage.completion_tokens` when present.
14pub fn parse_codex_line(
15    session_id: &str,
16    seq: u64,
17    base_ts: u64,
18    line: &str,
19) -> Result<Option<Event>> {
20    let v: Value = serde_json::from_str(line.trim()).context("codex transcript: invalid JSON")?;
21    let obj = match v.as_object() {
22        Some(o) => o,
23        None => return Ok(None),
24    };
25
26    let tokens_in = obj
27        .get("usage")
28        .and_then(|u| u.get("prompt_tokens"))
29        .and_then(|v| v.as_u64())
30        .map(|v| v as u32);
31
32    let tokens_out = obj
33        .get("usage")
34        .and_then(|u| u.get("completion_tokens"))
35        .and_then(|v| v.as_u64())
36        .map(|v| v as u32);
37
38    let reasoning_tokens = obj
39        .get("usage")
40        .and_then(|u| u.get("completion_tokens_details"))
41        .and_then(|d| d.get("reasoning_tokens"))
42        .and_then(|v| v.as_u64())
43        .map(|v| v as u32)
44        .or_else(|| {
45            obj.get("usage")
46                .and_then(|u| u.get("output_tokens_details"))
47                .and_then(|d| d.get("reasoning_tokens"))
48                .and_then(|v| v.as_u64())
49                .map(|v| v as u32)
50        });
51
52    let ts_ms = line_ts_ms(obj).unwrap_or(base_ts + seq * 100);
53    let ts_exact = line_ts_ms(obj).is_some();
54    let line_model = model_from_json::from_object(obj);
55    let cost_usd_e6 = estimate_tail_event_cost_usd_e6(
56        line_model.as_deref(),
57        tokens_in,
58        tokens_out,
59        reasoning_tokens,
60    );
61
62    // Tool calls are in top-level `tool_calls` array
63    if let Some(first) = obj
64        .get("tool_calls")
65        .and_then(|tc| tc.as_array())
66        .and_then(|arr| arr.first())
67    {
68        let tool_name = first
69            .get("function")
70            .and_then(|f| f.get("name"))
71            .and_then(|n| n.as_str())
72            .unwrap_or("")
73            .to_string();
74        return Ok(Some(Event {
75            session_id: session_id.to_string(),
76            seq,
77            ts_ms,
78            ts_exact,
79            kind: EventKind::ToolCall,
80            source: EventSource::Tail,
81            tool: Some(tool_name),
82            tool_call_id: first
83                .get("id")
84                .and_then(|v| v.as_str())
85                .map(ToOwned::to_owned),
86            tokens_in,
87            tokens_out,
88            reasoning_tokens,
89            cost_usd_e6,
90            stop_reason: None,
91            latency_ms: None,
92            ttft_ms: None,
93            retry_count: None,
94            context_used_tokens: None,
95            context_max_tokens: None,
96            cache_creation_tokens: None,
97            cache_read_tokens: None,
98            system_prompt_tokens: None,
99            payload: first.clone(),
100        }));
101    }
102
103    Ok(None)
104}
105
106fn line_ts_ms(obj: &serde_json::Map<String, Value>) -> Option<u64> {
107    if let Some(t) = ["timestamp_ms", "ts_ms", "created_at_ms"]
108        .iter()
109        .find_map(|k| obj.get(*k).and_then(|v| v.as_u64()))
110    {
111        return Some(t);
112    }
113    if let Some(t) = obj.get("timestamp").and_then(|v| v.as_u64()) {
114        return Some(if t < 1_000_000_000_000 {
115            t.saturating_mul(1000)
116        } else {
117            t
118        });
119    }
120    None
121}
122
123/// Walk all `.jsonl` files under `dir`; return inferred `SessionRecord` + events.
124/// Agent = "codex". Session id = dir name.
125pub fn scan_codex_session_dir(dir: &Path) -> Result<(SessionRecord, Vec<Event>)> {
126    let session_id = dir
127        .file_name()
128        .and_then(|n| n.to_str())
129        .unwrap_or("")
130        .to_string();
131
132    let workspace = dir
133        .parent()
134        .and_then(|p| p.parent())
135        .map(|p| p.to_string_lossy().to_string())
136        .unwrap_or_default();
137
138    let mut events = Vec::new();
139    let mut seq: u64 = 0;
140    let mut model: Option<String> = None;
141
142    let base_ts = crate::collect::tail::dir_mtime_ms(dir);
143    let mut entries: Vec<_> = std::fs::read_dir(dir)
144        .with_context(|| format!("read dir: {}", dir.display()))?
145        .filter_map(|e| e.ok())
146        .filter(|e| e.path().extension().map(|x| x == "jsonl").unwrap_or(false))
147        .collect();
148    entries.sort_by_key(|e| e.file_name());
149
150    for entry in entries {
151        let content = std::fs::read_to_string(entry.path())?;
152        for line in content.lines() {
153            if line.trim().is_empty() {
154                continue;
155            }
156            if let Some(m) = model_from_json::from_line(line) {
157                model = Some(m);
158            }
159            if let Some(ev) = parse_codex_line(&session_id, seq, base_ts, line)? {
160                events.push(ev);
161            }
162            seq += 1;
163        }
164    }
165
166    let record = SessionRecord {
167        id: session_id.clone(),
168        agent: "codex".to_string(),
169        model,
170        workspace,
171        started_at_ms: crate::collect::tail::dir_mtime_ms(dir),
172        ended_at_ms: None,
173        status: SessionStatus::Done,
174        trace_path: dir.to_string_lossy().to_string(),
175        start_commit: None,
176        end_commit: None,
177        branch: None,
178        dirty_start: None,
179        dirty_end: None,
180        repo_binding_source: None,
181        prompt_fingerprint: None,
182        parent_session_id: None,
183        agent_version: None,
184        os: None,
185        arch: None,
186        repo_file_count: None,
187        repo_total_loc: None,
188    };
189    Ok((record, events))
190}
191
#[cfg(test)]
mod tests {
    use super::*;

    const TOOL_USE_LINE: &str = r#"{"role":"assistant","content":null,"tool_calls":[{"id":"call_01","type":"function","function":{"name":"shell","arguments":"{\"command\":\"ls\"}"}}]}"#;
    const WITH_USAGE_LINE: &str = r#"{"role":"assistant","content":null,"tool_calls":[{"id":"call_02","type":"function","function":{"name":"read_file","arguments":"{\"path\":\"src/main.rs\"}"}}],"usage":{"prompt_tokens":500,"completion_tokens":300,"total_tokens":800}}"#;

    #[test]
    fn parse_tool_use() {
        let ev = parse_codex_line("s1", 0, 0, TOOL_USE_LINE)
            .unwrap()
            .unwrap();
        assert_eq!(ev.kind, EventKind::ToolCall);
        assert_eq!(ev.tool.as_deref(), Some("shell"));
        assert_eq!(ev.tool_call_id.as_deref(), Some("call_01"));
        assert_eq!(ev.session_id, "s1");
        assert_eq!(ev.payload["id"], "call_01");
    }

    #[test]
    fn parse_with_usage() {
        let ev = parse_codex_line("s1", 0, 0, WITH_USAGE_LINE)
            .unwrap()
            .unwrap();
        assert_eq!(ev.tokens_in, Some(500));
        assert_eq!(ev.tokens_out, Some(300));
        assert!(
            ev.cost_usd_e6.is_some_and(|c| c > 0),
            "expected tail cost from usage"
        );
    }

    #[test]
    fn non_tool_lines_yield_none() {
        let user = r#"{"role":"user","content":"hi"}"#;
        assert!(parse_codex_line("s1", 0, 0, user).unwrap().is_none());
        let empty_calls = r#"{"role":"assistant","tool_calls":[]}"#;
        assert!(parse_codex_line("s1", 0, 0, empty_calls).unwrap().is_none());
        assert!(parse_codex_line("s1", 0, 0, "[1,2,3]").unwrap().is_none());
    }

    #[test]
    fn invalid_json_is_an_error() {
        assert!(parse_codex_line("s1", 0, 0, "not json").is_err());
    }

    #[test]
    fn fallback_timestamp_is_synthesized_from_seq() {
        let ev = parse_codex_line("s1", 3, 1_000, TOOL_USE_LINE)
            .unwrap()
            .unwrap();
        assert_eq!(ev.seq, 3);
        assert_eq!(ev.ts_ms, 1_300);
        assert!(!ev.ts_exact);
    }

    #[test]
    fn line_timestamps_are_exact_and_normalized_to_ms() {
        let secs = r#"{"timestamp":1700000000,"tool_calls":[{"id":"c","function":{"name":"shell"}}]}"#;
        let ev = parse_codex_line("s1", 0, 0, secs).unwrap().unwrap();
        assert_eq!(ev.ts_ms, 1_700_000_000_000);
        assert!(ev.ts_exact);

        let millis = r#"{"timestamp":1700000000123,"tool_calls":[{"id":"c","function":{"name":"shell"}}]}"#;
        let ev = parse_codex_line("s1", 0, 0, millis).unwrap().unwrap();
        assert_eq!(ev.ts_ms, 1_700_000_000_123);

        let explicit = r#"{"timestamp_ms":42,"timestamp":1700000000,"tool_calls":[{"id":"c","function":{"name":"shell"}}]}"#;
        let ev = parse_codex_line("s1", 0, 0, explicit).unwrap().unwrap();
        assert_eq!(ev.ts_ms, 42);
        assert!(ev.ts_exact);
    }

    #[test]
    fn reasoning_tokens_from_either_details_field() {
        let chat = r#"{"tool_calls":[{"id":"c","function":{"name":"shell"}}],"usage":{"prompt_tokens":1,"completion_tokens":2,"completion_tokens_details":{"reasoning_tokens":7}}}"#;
        let ev = parse_codex_line("s1", 0, 0, chat).unwrap().unwrap();
        assert_eq!(ev.reasoning_tokens, Some(7));

        let responses = r#"{"tool_calls":[{"id":"c","function":{"name":"shell"}}],"usage":{"output_tokens_details":{"reasoning_tokens":9}}}"#;
        let ev = parse_codex_line("s1", 0, 0, responses).unwrap().unwrap();
        assert_eq!(ev.reasoning_tokens, Some(9));
    }

    #[test]
    fn missing_function_name_yields_empty_tool() {
        let line = r#"{"tool_calls":[{"id":"c"}]}"#;
        let ev = parse_codex_line("s1", 0, 0, line).unwrap().unwrap();
        assert_eq!(ev.tool.as_deref(), Some(""));
        assert_eq!(ev.tool_call_id.as_deref(), Some("c"));
    }

    #[test]
    fn scan_fixture_dir() {
        let fixture_dir =
            std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("tests/fixtures/codex");
        let (record, events) = scan_codex_session_dir(&fixture_dir).unwrap();
        assert_eq!(record.agent, "codex");
        assert_eq!(record.model.as_deref(), Some("gpt-4o-fixture"));
        assert!(!events.is_empty(), "expected events from fixture files");
        assert!(
            events.iter().any(|e| e.cost_usd_e6.is_some_and(|c| c > 0)),
            "with_usage fixture should yield cost"
        );
    }
}