Skip to main content

trace_share_core/
parser.rs

1use anyhow::Result;
2use chrono::{DateTime, Utc};
3use serde_json::Value;
4use std::{
5    fs::File,
6    io::{BufRead, BufReader, Seek, SeekFrom},
7    path::Path,
8};
9
10use crate::models::{CanonicalEvent, EventMeta, ToolInfo};
11
12pub fn parse_jsonl_file(path: &Path, source: &str) -> Result<Vec<CanonicalEvent>> {
13    let (events, _) = parse_jsonl_file_from_offset(path, source, 0)?;
14    Ok(events)
15}
16
17pub fn parse_source_file(
18    path: &Path,
19    source: &str,
20    format: &str,
21    parser_hint: Option<&str>,
22) -> Result<Vec<CanonicalEvent>> {
23    match format {
24        "jsonl" => parse_jsonl_file(path, source),
25        "json" => parse_json_file(path, source, parser_hint),
26        "mixed" => parse_mixed_file(path, source, parser_hint),
27        other => anyhow::bail!("unsupported source format: {other}"),
28    }
29}
30
31pub fn parse_jsonl_file_from_offset(
32    path: &Path,
33    source: &str,
34    start_offset: u64,
35) -> Result<(Vec<CanonicalEvent>, u64)> {
36    let file = File::open(path)?;
37    let mut reader = BufReader::new(file);
38    reader.seek(SeekFrom::Start(start_offset))?;
39    let session_id = path
40        .file_stem()
41        .and_then(|s| s.to_str())
42        .unwrap_or("unknown-session")
43        .to_string();
44
45    let mut out = Vec::new();
46    let mut next_offset = start_offset;
47    let mut line = String::new();
48
49    loop {
50        line.clear();
51        let bytes_read = reader.read_line(&mut line)?;
52        if bytes_read == 0 {
53            break;
54        }
55        next_offset += bytes_read as u64;
56        let line = line.trim_end_matches(['\n', '\r']).to_string();
57        if line.trim().is_empty() {
58            continue;
59        }
60
61        let value = match serde_json::from_str::<Value>(&line) {
62            Ok(v) => v,
63            Err(_) => {
64                out.push(fallback_event(source, &session_id, line));
65                continue;
66            }
67        };
68
69        out.push(value_to_event(source, &session_id, value));
70    }
71
72    Ok((out, next_offset))
73}
74
75fn value_to_event(source: &str, session_id: &str, v: Value) -> CanonicalEvent {
76    let ts = extract_ts(&v).unwrap_or_else(Utc::now);
77    let payload = v.get("payload").unwrap_or(&v);
78    let top_type = v.get("type").and_then(Value::as_str).unwrap_or_default();
79    let kind = infer_kind(top_type, payload);
80    let text = {
81        let from_payload = extract_text(payload);
82        if !from_payload.trim().is_empty() {
83            from_payload
84        } else {
85            extract_text(&v)
86        }
87    };
88
89    let tool_name = payload
90        .get("tool")
91        .and_then(|t| t.get("name").or(Some(t)))
92        .or_else(|| payload.get("name"))
93        .and_then(Value::as_str)
94        .map(str::to_string);
95
96    let tool = tool_name.map(|name| ToolInfo {
97        name,
98        args_json: payload.get("args").map(|a| a.to_string()),
99        result_json: payload.get("result").map(|r| r.to_string()),
100    });
101
102    CanonicalEvent {
103        source: source.to_string(),
104        session_id: session_id.to_string(),
105        ts,
106        kind,
107        text,
108        tool,
109        meta: Some(EventMeta {
110            cwd: payload
111                .get("cwd")
112                .or_else(|| v.get("cwd"))
113                .and_then(Value::as_str)
114                .map(str::to_string),
115            repo: payload
116                .get("repo")
117                .or_else(|| v.get("repo"))
118                .and_then(Value::as_str)
119                .map(str::to_string),
120            exit_code: payload
121                .get("exit_code")
122                .or_else(|| v.get("exit_code"))
123                .and_then(Value::as_i64)
124                .map(|n| n as i32),
125            model: payload
126                .get("model")
127                .or_else(|| v.get("model"))
128                .and_then(Value::as_str)
129                .map(str::to_string),
130            tags: Vec::new(),
131        }),
132    }
133}
134
135fn fallback_event(source: &str, session_id: &str, text: String) -> CanonicalEvent {
136    CanonicalEvent {
137        source: source.to_string(),
138        session_id: session_id.to_string(),
139        ts: Utc::now(),
140        kind: "system".to_string(),
141        text,
142        tool: None,
143        meta: None,
144    }
145}
146
147fn extract_ts(v: &Value) -> Option<DateTime<Utc>> {
148    let candidate = v
149        .get("ts")
150        .or_else(|| v.get("timestamp"))
151        .or_else(|| v.get("time"))
152        .and_then(Value::as_str)?;
153
154    DateTime::parse_from_rfc3339(candidate)
155        .ok()
156        .map(|dt| dt.with_timezone(&Utc))
157}
158
159fn parse_json_file(
160    path: &Path,
161    source: &str,
162    parser_hint: Option<&str>,
163) -> Result<Vec<CanonicalEvent>> {
164    let value = read_json_with_retry(path)?;
165    if parser_hint == Some("tandem_v1") {
166        return parse_tandem_v1(source, &value);
167    }
168    Ok(vec![value_to_event(
169        source,
170        path.file_stem()
171            .and_then(|s| s.to_str())
172            .unwrap_or("unknown-session"),
173        value,
174    )])
175}
176
177fn parse_mixed_file(
178    path: &Path,
179    source: &str,
180    parser_hint: Option<&str>,
181) -> Result<Vec<CanonicalEvent>> {
182    if let Some(ext) = path.extension().and_then(|e| e.to_str()) {
183        let ext = ext.to_ascii_lowercase();
184        if ext == "jsonl" || ext == "ndjson" {
185            return parse_jsonl_file(path, source);
186        }
187        if ext == "json" {
188            return parse_json_file(path, source, parser_hint);
189        }
190    }
191
192    parse_json_file(path, source, parser_hint).or_else(|_| parse_jsonl_file(path, source))
193}
194
195fn read_json_with_retry(path: &Path) -> Result<Value> {
196    let mut last_err: Option<serde_json::Error> = None;
197    for attempt in 0..3 {
198        let text = std::fs::read_to_string(path)?;
199        match serde_json::from_str::<Value>(&text) {
200            Ok(v) => return Ok(v),
201            Err(e) => {
202                if e.is_eof() && attempt < 2 {
203                    std::thread::sleep(std::time::Duration::from_millis(50));
204                    last_err = Some(e);
205                    continue;
206                }
207                return Err(e.into());
208            }
209        }
210    }
211    match last_err {
212        Some(e) => Err(e.into()),
213        None => anyhow::bail!("failed to parse json file"),
214    }
215}
216
217fn parse_tandem_v1(source: &str, root: &Value) -> Result<Vec<CanonicalEvent>> {
218    let mut out = Vec::new();
219    let Some(map) = root.as_object() else {
220        return Ok(out);
221    };
222
223    for (session_key, session) in map {
224        let session_id = session
225            .get("id")
226            .and_then(Value::as_str)
227            .unwrap_or(session_key)
228            .to_string();
229        let cwd = session
230            .get("workspace_root")
231            .or_else(|| session.get("directory"))
232            .and_then(Value::as_str)
233            .map(str::to_string);
234
235        let messages = session
236            .get("messages")
237            .and_then(Value::as_array)
238            .cloned()
239            .unwrap_or_default();
240
241        for msg in messages {
242            let role = msg.get("role").and_then(Value::as_str).unwrap_or("system");
243            let kind = match role {
244                "user" => "user_msg",
245                "assistant" => "assistant_msg",
246                _ => "system",
247            }
248            .to_string();
249
250            let ts = msg
251                .get("created_at")
252                .and_then(Value::as_str)
253                .and_then(parse_rfc3339)
254                .or_else(|| {
255                    session
256                        .get("time")
257                        .and_then(|t| t.get("updated").or_else(|| t.get("created")))
258                        .and_then(Value::as_str)
259                        .and_then(parse_rfc3339)
260                })
261                .unwrap_or_else(Utc::now);
262
263            let text = extract_tandem_message_text(&msg);
264            if text.trim().is_empty() {
265                continue;
266            }
267
268            out.push(CanonicalEvent {
269                source: source.to_string(),
270                session_id: session_id.clone(),
271                ts,
272                kind,
273                text,
274                tool: None,
275                meta: Some(EventMeta {
276                    cwd: cwd.clone(),
277                    repo: None,
278                    exit_code: None,
279                    model: session
280                        .get("model")
281                        .and_then(Value::as_str)
282                        .map(str::to_string),
283                    tags: vec!["tandem_v1".to_string()],
284                }),
285            });
286        }
287    }
288
289    Ok(out)
290}
291
292fn extract_tandem_message_text(msg: &Value) -> String {
293    if let Some(parts) = msg.get("parts").and_then(Value::as_array) {
294        let joined = parts
295            .iter()
296            .filter_map(|p| {
297                let ptype = p.get("type").and_then(Value::as_str).unwrap_or("");
298                if ptype == "text" || ptype == "input_text" || ptype == "output_text" {
299                    return p.get("text").and_then(Value::as_str).map(str::to_string);
300                }
301                None
302            })
303            .collect::<Vec<_>>()
304            .join("\n");
305        if !joined.trim().is_empty() {
306            return joined;
307        }
308    }
309    extract_text(msg)
310}
311
312fn parse_rfc3339(s: &str) -> Option<DateTime<Utc>> {
313    DateTime::parse_from_rfc3339(s)
314        .ok()
315        .map(|dt| dt.with_timezone(&Utc))
316}
317
318fn extract_text(v: &Value) -> String {
319    for key in [
320        "text",
321        "message",
322        "content",
323        "delta",
324        "output_text",
325        "input",
326    ] {
327        if let Some(raw) = v.get(key) {
328            let value = flatten_text(raw, 0);
329            if !value.trim().is_empty() {
330                return value;
331            }
332        }
333    }
334    if let Some(item) = v.get("item") {
335        let value = flatten_text(item, 0);
336        if !value.trim().is_empty() {
337            return value;
338        }
339    }
340    String::new()
341}
342
343fn infer_kind(top_type: &str, payload: &Value) -> String {
344    if top_type == "response_item" && payload.get("type").and_then(Value::as_str) == Some("message")
345    {
346        return match payload.get("role").and_then(Value::as_str) {
347            Some("user") => "user_msg".to_string(),
348            Some("assistant") => "assistant_msg".to_string(),
349            _ => "system".to_string(),
350        };
351    }
352    if top_type == "event_msg" {
353        return match payload.get("type").and_then(Value::as_str) {
354            Some("user_message") => "user_msg".to_string(),
355            Some("tool_call") => "tool_call".to_string(),
356            Some("tool_result") => "tool_result".to_string(),
357            Some("error") => "error".to_string(),
358            _ => "system".to_string(),
359        };
360    }
361    if let Some(kind) = payload
362        .get("kind")
363        .or_else(|| payload.get("type"))
364        .and_then(Value::as_str)
365    {
366        return kind.to_string();
367    }
368    if top_type.is_empty() {
369        "system".to_string()
370    } else {
371        top_type.to_string()
372    }
373}
374
375fn flatten_text(v: &Value, depth: usize) -> String {
376    if depth > 5 {
377        return String::new();
378    }
379    match v {
380        Value::String(s) => s.clone(),
381        Value::Array(items) => {
382            let parts = items
383                .iter()
384                .map(|item| flatten_text(item, depth + 1))
385                .filter(|s| !s.trim().is_empty())
386                .collect::<Vec<_>>();
387            parts.join(" ")
388        }
389        Value::Object(map) => {
390            for key in ["text", "content", "value", "output_text", "message"] {
391                if let Some(raw) = map.get(key) {
392                    let value = flatten_text(raw, depth + 1);
393                    if !value.trim().is_empty() {
394                        return value;
395                    }
396                }
397            }
398            String::new()
399        }
400        _ => String::new(),
401    }
402}
403
#[cfg(test)]
mod tests {
    use super::{extract_text, parse_tandem_v1, value_to_event};
    use serde_json::json;

    /// Text spread over several `content` parts must all end up in the result.
    #[test]
    fn extracts_nested_content_arrays() {
        let input = json!({
            "content": [
                {"type":"output_text","text":"hello"},
                {"type":"output_text","text":"world"}
            ]
        });

        let flattened = extract_text(&input);

        for expected in ["hello", "world"] {
            assert!(flattened.contains(expected));
        }
    }

    /// A `response_item` wrapping a user message maps to `user_msg` and keeps
    /// the text found inside the payload.
    #[test]
    fn maps_payload_wrapped_user_message() {
        let input = json!({
            "timestamp":"2026-02-25T09:51:59.245Z",
            "type":"response_item",
            "payload":{"type":"message","role":"user","content":[{"type":"input_text","text":"hello from user"}]}
        });

        let event = value_to_event("codex_cli", "s", input);

        assert_eq!(event.kind, "user_msg");
        assert!(event.text.contains("hello from user"));
    }

    /// Each tandem message becomes one event, in document order.
    #[test]
    fn parses_tandem_v1_sessions_json() {
        let input = json!({
            "s-1": {
                "id":"s-1",
                "workspace_root":"/tmp/proj",
                "messages":[
                    {"role":"user","created_at":"2026-02-25T00:00:00Z","parts":[{"type":"text","text":"hello"}]},
                    {"role":"assistant","created_at":"2026-02-25T00:00:01Z","parts":[{"type":"text","text":"world"}]}
                ]
            }
        });

        let events = parse_tandem_v1("tandem_sessions", &input).expect("parse");

        assert_eq!(events.len(), 2);
        let (first, second) = (&events[0], &events[1]);
        assert_eq!(first.kind, "user_msg");
        assert_eq!(second.kind, "assistant_msg");
        assert!(first.text.contains("hello"));
        assert!(second.text.contains("world"));
    }
}