kaizen/collect/tail/
codex.rs1use crate::collect::model_from_json;
6use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
7use anyhow::{Context, Result};
8use serde_json::Value;
9use std::path::Path;
10
11pub fn parse_codex_line(
14 session_id: &str,
15 seq: u64,
16 base_ts: u64,
17 line: &str,
18) -> Result<Option<Event>> {
19 let v: Value = serde_json::from_str(line.trim()).context("codex transcript: invalid JSON")?;
20 let obj = match v.as_object() {
21 Some(o) => o,
22 None => return Ok(None),
23 };
24
25 let tokens_in = obj
26 .get("usage")
27 .and_then(|u| u.get("prompt_tokens"))
28 .and_then(|v| v.as_u64())
29 .map(|v| v as u32);
30
31 let tokens_out = obj
32 .get("usage")
33 .and_then(|u| u.get("completion_tokens"))
34 .and_then(|v| v.as_u64())
35 .map(|v| v as u32);
36
37 let reasoning_tokens = obj
38 .get("usage")
39 .and_then(|u| u.get("completion_tokens_details"))
40 .and_then(|d| d.get("reasoning_tokens"))
41 .and_then(|v| v.as_u64())
42 .map(|v| v as u32)
43 .or_else(|| {
44 obj.get("usage")
45 .and_then(|u| u.get("output_tokens_details"))
46 .and_then(|d| d.get("reasoning_tokens"))
47 .and_then(|v| v.as_u64())
48 .map(|v| v as u32)
49 });
50
51 let ts_ms = line_ts_ms(obj).unwrap_or(base_ts + seq * 100);
52 let ts_exact = line_ts_ms(obj).is_some();
53
54 if let Some(first) = obj
56 .get("tool_calls")
57 .and_then(|tc| tc.as_array())
58 .and_then(|arr| arr.first())
59 {
60 let tool_name = first
61 .get("function")
62 .and_then(|f| f.get("name"))
63 .and_then(|n| n.as_str())
64 .unwrap_or("")
65 .to_string();
66 return Ok(Some(Event {
67 session_id: session_id.to_string(),
68 seq,
69 ts_ms,
70 ts_exact,
71 kind: EventKind::ToolCall,
72 source: EventSource::Tail,
73 tool: Some(tool_name),
74 tool_call_id: first
75 .get("id")
76 .and_then(|v| v.as_str())
77 .map(ToOwned::to_owned),
78 tokens_in,
79 tokens_out,
80 reasoning_tokens,
81 cost_usd_e6: None,
82 payload: first.clone(),
83 }));
84 }
85
86 Ok(None)
87}
88
89fn line_ts_ms(obj: &serde_json::Map<String, Value>) -> Option<u64> {
90 if let Some(t) = ["timestamp_ms", "ts_ms", "created_at_ms"]
91 .iter()
92 .find_map(|k| obj.get(*k).and_then(|v| v.as_u64()))
93 {
94 return Some(t);
95 }
96 if let Some(t) = obj.get("timestamp").and_then(|v| v.as_u64()) {
97 return Some(if t < 1_000_000_000_000 {
98 t.saturating_mul(1000)
99 } else {
100 t
101 });
102 }
103 None
104}
105
106pub fn scan_codex_session_dir(dir: &Path) -> Result<(SessionRecord, Vec<Event>)> {
109 let session_id = dir
110 .file_name()
111 .and_then(|n| n.to_str())
112 .unwrap_or("")
113 .to_string();
114
115 let workspace = dir
116 .parent()
117 .and_then(|p| p.parent())
118 .map(|p| p.to_string_lossy().to_string())
119 .unwrap_or_default();
120
121 let mut events = Vec::new();
122 let mut seq: u64 = 0;
123 let mut model: Option<String> = None;
124
125 let base_ts = crate::collect::tail::dir_mtime_ms(dir);
126 let mut entries: Vec<_> = std::fs::read_dir(dir)
127 .with_context(|| format!("read dir: {}", dir.display()))?
128 .filter_map(|e| e.ok())
129 .filter(|e| e.path().extension().map(|x| x == "jsonl").unwrap_or(false))
130 .collect();
131 entries.sort_by_key(|e| e.file_name());
132
133 for entry in entries {
134 let content = std::fs::read_to_string(entry.path())?;
135 for line in content.lines() {
136 if line.trim().is_empty() {
137 continue;
138 }
139 if model.is_none() {
140 model = model_from_json::from_line(line);
141 }
142 if let Some(ev) = parse_codex_line(&session_id, seq, base_ts, line)? {
143 events.push(ev);
144 }
145 seq += 1;
146 }
147 }
148
149 let record = SessionRecord {
150 id: session_id.clone(),
151 agent: "codex".to_string(),
152 model,
153 workspace,
154 started_at_ms: crate::collect::tail::dir_mtime_ms(dir),
155 ended_at_ms: None,
156 status: SessionStatus::Done,
157 trace_path: dir.to_string_lossy().to_string(),
158 start_commit: None,
159 end_commit: None,
160 branch: None,
161 dirty_start: None,
162 dirty_end: None,
163 repo_binding_source: None,
164 };
165 Ok((record, events))
166}
167
#[cfg(test)]
mod tests {
    use super::*;

    const TOOL_USE_LINE: &str = r#"{"role":"assistant","content":null,"tool_calls":[{"id":"call_01","type":"function","function":{"name":"shell","arguments":"{\"command\":\"ls\"}"}}]}"#;
    const WITH_USAGE_LINE: &str = r#"{"role":"assistant","content":null,"tool_calls":[{"id":"call_02","type":"function","function":{"name":"read_file","arguments":"{\"path\":\"src/main.rs\"}"}}],"usage":{"prompt_tokens":500,"completion_tokens":300,"total_tokens":800}}"#;

    #[test]
    fn parse_tool_use() {
        let ev = parse_codex_line("s1", 0, 0, TOOL_USE_LINE)
            .unwrap()
            .unwrap();
        assert_eq!(ev.kind, EventKind::ToolCall);
        assert_eq!(ev.tool.as_deref(), Some("shell"));
        assert_eq!(ev.tool_call_id.as_deref(), Some("call_01"));
        assert_eq!(ev.session_id, "s1");
    }

    #[test]
    fn parse_with_usage() {
        let ev = parse_codex_line("s1", 0, 0, WITH_USAGE_LINE)
            .unwrap()
            .unwrap();
        assert_eq!(ev.tokens_in, Some(500));
        assert_eq!(ev.tokens_out, Some(300));
        assert_eq!(ev.cost_usd_e6, None);
    }

    #[test]
    fn parse_without_usage_has_no_token_counts() {
        let ev = parse_codex_line("s1", 0, 0, TOOL_USE_LINE).unwrap().unwrap();
        assert_eq!(ev.tokens_in, None);
        assert_eq!(ev.tokens_out, None);
        assert_eq!(ev.reasoning_tokens, None);
    }

    #[test]
    fn parse_reasoning_tokens_from_either_details_key() {
        let chat = r#"{"tool_calls":[{"id":"c1","function":{"name":"shell"}}],"usage":{"completion_tokens_details":{"reasoning_tokens":42}}}"#;
        let responses = r#"{"tool_calls":[{"id":"c2","function":{"name":"shell"}}],"usage":{"output_tokens_details":{"reasoning_tokens":7}}}"#;
        let chat_ev = parse_codex_line("s1", 0, 0, chat).unwrap().unwrap();
        let responses_ev = parse_codex_line("s1", 0, 0, responses).unwrap().unwrap();
        assert_eq!(chat_ev.reasoning_tokens, Some(42));
        assert_eq!(responses_ev.reasoning_tokens, Some(7));
    }

    #[test]
    fn only_first_tool_call_is_reported() {
        let line = r#"{"tool_calls":[{"id":"a","function":{"name":"first"}},{"id":"b","function":{"name":"second"}}]}"#;
        let ev = parse_codex_line("s1", 0, 0, line).unwrap().unwrap();
        assert_eq!(ev.tool.as_deref(), Some("first"));
        assert_eq!(ev.tool_call_id.as_deref(), Some("a"));
    }

    #[test]
    fn missing_function_name_yields_empty_tool() {
        let line = r#"{"tool_calls":[{"type":"function"}]}"#;
        let ev = parse_codex_line("s1", 0, 0, line).unwrap().unwrap();
        assert_eq!(ev.tool.as_deref(), Some(""));
        assert_eq!(ev.tool_call_id, None);
    }

    #[test]
    fn timestamp_from_line_is_exact() {
        let secs = r#"{"timestamp":1700000000,"tool_calls":[{"id":"c"}]}"#;
        let ev = parse_codex_line("s1", 5, 0, secs).unwrap().unwrap();
        assert_eq!(ev.ts_ms, 1_700_000_000_000);
        assert!(ev.ts_exact);

        let millis = r#"{"ts_ms":1700000000123,"tool_calls":[{"id":"c"}]}"#;
        let ev = parse_codex_line("s1", 5, 0, millis).unwrap().unwrap();
        assert_eq!(ev.ts_ms, 1_700_000_000_123);
        assert!(ev.ts_exact);
    }

    #[test]
    fn timestamp_falls_back_to_base_plus_seq() {
        let ev = parse_codex_line("s1", 3, 1_000, TOOL_USE_LINE).unwrap().unwrap();
        assert_eq!(ev.ts_ms, 1_300);
        assert!(!ev.ts_exact);
    }

    #[test]
    fn non_tool_lines_yield_none() {
        for line in [
            r#"[1,2,3]"#,
            r#"{"role":"user","content":"hi"}"#,
            r#"{"tool_calls":[]}"#,
        ] {
            assert!(
                parse_codex_line("s1", 0, 0, line).unwrap().is_none(),
                "expected no event for {}",
                line
            );
        }
    }

    #[test]
    fn invalid_json_is_an_error() {
        assert!(parse_codex_line("s1", 0, 0, "{not json").is_err());
    }

    #[test]
    fn scan_fixture_dir() {
        let fixture_dir =
            std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("tests/fixtures/codex");
        let (record, events) = scan_codex_session_dir(&fixture_dir).unwrap();
        assert_eq!(record.agent, "codex");
        assert_eq!(record.model.as_deref(), Some("gpt-4o-fixture"));
        assert!(!events.is_empty(), "expected events from fixture files");
    }
}