1use anyhow::Result;
2use chrono::{DateTime, Utc};
3use serde_json::Value;
4use std::{
5 fs::File,
6 io::{BufRead, BufReader, Seek, SeekFrom},
7 path::Path,
8};
9
10use crate::models::{CanonicalEvent, EventMeta, ToolInfo};
11
12pub fn parse_jsonl_file(path: &Path, source: &str) -> Result<Vec<CanonicalEvent>> {
13 let (events, _) = parse_jsonl_file_from_offset(path, source, 0)?;
14 Ok(events)
15}
16
17pub fn parse_source_file(
18 path: &Path,
19 source: &str,
20 format: &str,
21 parser_hint: Option<&str>,
22) -> Result<Vec<CanonicalEvent>> {
23 match format {
24 "jsonl" => parse_jsonl_file(path, source),
25 "json" => parse_json_file(path, source, parser_hint),
26 "mixed" => parse_mixed_file(path, source, parser_hint),
27 other => anyhow::bail!("unsupported source format: {other}"),
28 }
29}
30
31pub fn parse_jsonl_file_from_offset(
32 path: &Path,
33 source: &str,
34 start_offset: u64,
35) -> Result<(Vec<CanonicalEvent>, u64)> {
36 let file = File::open(path)?;
37 let mut reader = BufReader::new(file);
38 reader.seek(SeekFrom::Start(start_offset))?;
39 let session_id = path
40 .file_stem()
41 .and_then(|s| s.to_str())
42 .unwrap_or("unknown-session")
43 .to_string();
44
45 let mut out = Vec::new();
46 let mut next_offset = start_offset;
47 let mut line = String::new();
48
49 loop {
50 line.clear();
51 let bytes_read = reader.read_line(&mut line)?;
52 if bytes_read == 0 {
53 break;
54 }
55 next_offset += bytes_read as u64;
56 let line = line.trim_end_matches(['\n', '\r']).to_string();
57 if line.trim().is_empty() {
58 continue;
59 }
60
61 let value = match serde_json::from_str::<Value>(&line) {
62 Ok(v) => v,
63 Err(_) => {
64 out.push(fallback_event(source, &session_id, line));
65 continue;
66 }
67 };
68
69 out.push(value_to_event(source, &session_id, value));
70 }
71
72 Ok((out, next_offset))
73}
74
75fn value_to_event(source: &str, session_id: &str, v: Value) -> CanonicalEvent {
76 let ts = extract_ts(&v).unwrap_or_else(Utc::now);
77 let payload = v.get("payload").unwrap_or(&v);
78 let top_type = v.get("type").and_then(Value::as_str).unwrap_or_default();
79 let kind = infer_kind(top_type, payload);
80 let text = {
81 let from_payload = extract_text(payload);
82 if !from_payload.trim().is_empty() {
83 from_payload
84 } else {
85 extract_text(&v)
86 }
87 };
88
89 let tool_name = payload
90 .get("tool")
91 .and_then(|t| t.get("name").or(Some(t)))
92 .or_else(|| payload.get("name"))
93 .and_then(Value::as_str)
94 .map(str::to_string);
95
96 let tool = tool_name.map(|name| ToolInfo {
97 name,
98 args_json: payload.get("args").map(|a| a.to_string()),
99 result_json: payload.get("result").map(|r| r.to_string()),
100 });
101
102 CanonicalEvent {
103 source: source.to_string(),
104 session_id: session_id.to_string(),
105 ts,
106 kind,
107 text,
108 tool,
109 meta: Some(EventMeta {
110 cwd: payload
111 .get("cwd")
112 .or_else(|| v.get("cwd"))
113 .and_then(Value::as_str)
114 .map(str::to_string),
115 repo: payload
116 .get("repo")
117 .or_else(|| v.get("repo"))
118 .and_then(Value::as_str)
119 .map(str::to_string),
120 exit_code: payload
121 .get("exit_code")
122 .or_else(|| v.get("exit_code"))
123 .and_then(Value::as_i64)
124 .map(|n| n as i32),
125 model: payload
126 .get("model")
127 .or_else(|| v.get("model"))
128 .and_then(Value::as_str)
129 .map(str::to_string),
130 tags: Vec::new(),
131 }),
132 }
133}
134
135fn fallback_event(source: &str, session_id: &str, text: String) -> CanonicalEvent {
136 CanonicalEvent {
137 source: source.to_string(),
138 session_id: session_id.to_string(),
139 ts: Utc::now(),
140 kind: "system".to_string(),
141 text,
142 tool: None,
143 meta: None,
144 }
145}
146
147fn extract_ts(v: &Value) -> Option<DateTime<Utc>> {
148 let candidate = v
149 .get("ts")
150 .or_else(|| v.get("timestamp"))
151 .or_else(|| v.get("time"))
152 .and_then(Value::as_str)?;
153
154 DateTime::parse_from_rfc3339(candidate)
155 .ok()
156 .map(|dt| dt.with_timezone(&Utc))
157}
158
159fn parse_json_file(
160 path: &Path,
161 source: &str,
162 parser_hint: Option<&str>,
163) -> Result<Vec<CanonicalEvent>> {
164 let value = read_json_with_retry(path)?;
165 if parser_hint == Some("tandem_v1") {
166 return parse_tandem_v1(source, &value);
167 }
168 Ok(vec![value_to_event(
169 source,
170 path.file_stem()
171 .and_then(|s| s.to_str())
172 .unwrap_or("unknown-session"),
173 value,
174 )])
175}
176
177fn parse_mixed_file(
178 path: &Path,
179 source: &str,
180 parser_hint: Option<&str>,
181) -> Result<Vec<CanonicalEvent>> {
182 if let Some(ext) = path.extension().and_then(|e| e.to_str()) {
183 let ext = ext.to_ascii_lowercase();
184 if ext == "jsonl" || ext == "ndjson" {
185 return parse_jsonl_file(path, source);
186 }
187 if ext == "json" {
188 return parse_json_file(path, source, parser_hint);
189 }
190 }
191
192 parse_json_file(path, source, parser_hint).or_else(|_| parse_jsonl_file(path, source))
193}
194
195fn read_json_with_retry(path: &Path) -> Result<Value> {
196 let mut last_err: Option<serde_json::Error> = None;
197 for attempt in 0..3 {
198 let text = std::fs::read_to_string(path)?;
199 match serde_json::from_str::<Value>(&text) {
200 Ok(v) => return Ok(v),
201 Err(e) => {
202 if e.is_eof() && attempt < 2 {
203 std::thread::sleep(std::time::Duration::from_millis(50));
204 last_err = Some(e);
205 continue;
206 }
207 return Err(e.into());
208 }
209 }
210 }
211 match last_err {
212 Some(e) => Err(e.into()),
213 None => anyhow::bail!("failed to parse json file"),
214 }
215}
216
217fn parse_tandem_v1(source: &str, root: &Value) -> Result<Vec<CanonicalEvent>> {
218 let mut out = Vec::new();
219 let Some(map) = root.as_object() else {
220 return Ok(out);
221 };
222
223 for (session_key, session) in map {
224 let session_id = session
225 .get("id")
226 .and_then(Value::as_str)
227 .unwrap_or(session_key)
228 .to_string();
229 let cwd = session
230 .get("workspace_root")
231 .or_else(|| session.get("directory"))
232 .and_then(Value::as_str)
233 .map(str::to_string);
234
235 let messages = session
236 .get("messages")
237 .and_then(Value::as_array)
238 .cloned()
239 .unwrap_or_default();
240
241 for msg in messages {
242 let role = msg.get("role").and_then(Value::as_str).unwrap_or("system");
243 let kind = match role {
244 "user" => "user_msg",
245 "assistant" => "assistant_msg",
246 _ => "system",
247 }
248 .to_string();
249
250 let ts = msg
251 .get("created_at")
252 .and_then(Value::as_str)
253 .and_then(parse_rfc3339)
254 .or_else(|| {
255 session
256 .get("time")
257 .and_then(|t| t.get("updated").or_else(|| t.get("created")))
258 .and_then(Value::as_str)
259 .and_then(parse_rfc3339)
260 })
261 .unwrap_or_else(Utc::now);
262
263 let text = extract_tandem_message_text(&msg);
264 if text.trim().is_empty() {
265 continue;
266 }
267
268 out.push(CanonicalEvent {
269 source: source.to_string(),
270 session_id: session_id.clone(),
271 ts,
272 kind,
273 text,
274 tool: None,
275 meta: Some(EventMeta {
276 cwd: cwd.clone(),
277 repo: None,
278 exit_code: None,
279 model: session
280 .get("model")
281 .and_then(Value::as_str)
282 .map(str::to_string),
283 tags: vec!["tandem_v1".to_string()],
284 }),
285 });
286 }
287 }
288
289 Ok(out)
290}
291
292fn extract_tandem_message_text(msg: &Value) -> String {
293 if let Some(parts) = msg.get("parts").and_then(Value::as_array) {
294 let joined = parts
295 .iter()
296 .filter_map(|p| {
297 let ptype = p.get("type").and_then(Value::as_str).unwrap_or("");
298 if ptype == "text" || ptype == "input_text" || ptype == "output_text" {
299 return p.get("text").and_then(Value::as_str).map(str::to_string);
300 }
301 None
302 })
303 .collect::<Vec<_>>()
304 .join("\n");
305 if !joined.trim().is_empty() {
306 return joined;
307 }
308 }
309 extract_text(msg)
310}
311
312fn parse_rfc3339(s: &str) -> Option<DateTime<Utc>> {
313 DateTime::parse_from_rfc3339(s)
314 .ok()
315 .map(|dt| dt.with_timezone(&Utc))
316}
317
318fn extract_text(v: &Value) -> String {
319 for key in [
320 "text",
321 "message",
322 "content",
323 "delta",
324 "output_text",
325 "input",
326 ] {
327 if let Some(raw) = v.get(key) {
328 let value = flatten_text(raw, 0);
329 if !value.trim().is_empty() {
330 return value;
331 }
332 }
333 }
334 if let Some(item) = v.get("item") {
335 let value = flatten_text(item, 0);
336 if !value.trim().is_empty() {
337 return value;
338 }
339 }
340 String::new()
341}
342
343fn infer_kind(top_type: &str, payload: &Value) -> String {
344 if top_type == "response_item" && payload.get("type").and_then(Value::as_str) == Some("message")
345 {
346 return match payload.get("role").and_then(Value::as_str) {
347 Some("user") => "user_msg".to_string(),
348 Some("assistant") => "assistant_msg".to_string(),
349 _ => "system".to_string(),
350 };
351 }
352 if top_type == "event_msg" {
353 return match payload.get("type").and_then(Value::as_str) {
354 Some("user_message") => "user_msg".to_string(),
355 Some("tool_call") => "tool_call".to_string(),
356 Some("tool_result") => "tool_result".to_string(),
357 Some("error") => "error".to_string(),
358 _ => "system".to_string(),
359 };
360 }
361 if let Some(kind) = payload
362 .get("kind")
363 .or_else(|| payload.get("type"))
364 .and_then(Value::as_str)
365 {
366 return kind.to_string();
367 }
368 if top_type.is_empty() {
369 "system".to_string()
370 } else {
371 top_type.to_string()
372 }
373}
374
375fn flatten_text(v: &Value, depth: usize) -> String {
376 if depth > 5 {
377 return String::new();
378 }
379 match v {
380 Value::String(s) => s.clone(),
381 Value::Array(items) => {
382 let parts = items
383 .iter()
384 .map(|item| flatten_text(item, depth + 1))
385 .filter(|s| !s.trim().is_empty())
386 .collect::<Vec<_>>();
387 parts.join(" ")
388 }
389 Value::Object(map) => {
390 for key in ["text", "content", "value", "output_text", "message"] {
391 if let Some(raw) = map.get(key) {
392 let value = flatten_text(raw, depth + 1);
393 if !value.trim().is_empty() {
394 return value;
395 }
396 }
397 }
398 String::new()
399 }
400 _ => String::new(),
401 }
402}
403
#[cfg(test)]
mod tests {
    use super::{extract_text, parse_tandem_v1, value_to_event};
    use serde_json::json;

    /// A `content` array of typed text parts is flattened into one string that
    /// contains every part.
    #[test]
    fn extracts_nested_content_arrays() {
        let doc = json!({
            "content": [
                {"type":"output_text","text":"hello"},
                {"type":"output_text","text":"world"}
            ]
        });

        let flattened = extract_text(&doc);

        for word in ["hello", "world"] {
            assert!(flattened.contains(word));
        }
    }

    /// A `response_item` record wrapping a user message in `payload` is mapped
    /// to a `user_msg` event carrying the message text.
    #[test]
    fn maps_payload_wrapped_user_message() {
        let record = json!({
            "timestamp":"2026-02-25T09:51:59.245Z",
            "type":"response_item",
            "payload":{"type":"message","role":"user","content":[{"type":"input_text","text":"hello from user"}]}
        });

        let event = value_to_event("codex_cli", "s", record);

        assert_eq!(event.kind, "user_msg");
        assert!(event.text.contains("hello from user"));
    }

    /// A tandem sessions document yields one event per message, in order, with
    /// the kind derived from each message's role.
    #[test]
    fn parses_tandem_v1_sessions_json() {
        let sessions = json!({
            "s-1": {
                "id":"s-1",
                "workspace_root":"/tmp/proj",
                "messages":[
                    {"role":"user","created_at":"2026-02-25T00:00:00Z","parts":[{"type":"text","text":"hello"}]},
                    {"role":"assistant","created_at":"2026-02-25T00:00:01Z","parts":[{"type":"text","text":"world"}]}
                ]
            }
        });

        let events = parse_tandem_v1("tandem_sessions", &sessions).expect("parse");

        assert_eq!(events.len(), 2);
        let expected = [("user_msg", "hello"), ("assistant_msg", "world")];
        for (event, (kind, snippet)) in events.iter().zip(expected) {
            assert_eq!(event.kind, kind);
            assert!(event.text.contains(snippet));
        }
    }
}