Skip to main content

kaizen/interchange/
jsonl.rs

// SPDX-License-Identifier: AGPL-3.0-or-later
//! Generic JSONL event parser. No filesystem access.
3
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6use std::error::Error;
7use std::fmt::{Display, Formatter};
8
/// One event decoded from a single JSONL record.
///
/// The parser in this module fills each field from the record's top-level
/// keys (or from the keys of a nested `event` object when one is present),
/// accepting the key aliases listed on the individual fields.
///
/// Optional fields are left out of the serialized form when they are `None`
/// and default to `None` when absent on deserialization.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct JsonlEvent {
    /// Line number supplied by the parser; `parse_jsonl_events` numbers lines from 1.
    pub line: usize,
    /// Required. Read from the `session_id` key.
    pub session_id: String,
    /// Required. Read from `seq`, `sequence`, or `event_seq`.
    pub seq: u64,
    /// Required. Read from `ts_ms` or `timestamp_ms`.
    /// NOTE(review): presumably milliseconds since the Unix epoch; confirm with producers.
    pub ts_ms: u64,
    /// Required. Read from `kind`, `type`, or `event_type`.
    pub kind: String,
    /// Read from `source`; the parser substitutes `"jsonl"` when it is missing or empty.
    pub source: String,
    /// Read from `tool` or `tool_name`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tool: Option<String>,
    /// Read from `tool_call_id` or `call_id`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tool_call_id: Option<String>,
    /// Read from `tokens_in` or `input_tokens`; values that do not fit in `u32` become `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tokens_in: Option<u32>,
    /// Read from `tokens_out` or `output_tokens`; values that do not fit in `u32` become `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tokens_out: Option<u32>,
    /// Read from `reasoning_tokens`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub reasoning_tokens: Option<u32>,
    /// Read from `cost_usd_e6`.
    /// NOTE(review): the name suggests millionths of a US dollar; confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub cost_usd_e6: Option<i64>,
    /// Read from `cache_creation_tokens`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub cache_creation_tokens: Option<u32>,
    /// Read from `cache_read_tokens`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub cache_read_tokens: Option<u32>,
    /// The record's nested `payload` value when it has one, otherwise a copy of
    /// the whole record the event was built from.
    #[serde(default)]
    pub payload: Value,
}
36
/// Error produced when a JSONL line cannot be turned into a [`JsonlEvent`].
///
/// Rendered by its `Display` impl as `line <line>: <message>`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JsonlParseError {
    /// Line number the failure was reported for (as passed to the parsing function).
    pub line: usize,
    /// Human-readable description: either the JSON decoder's error text or a
    /// "missing JSONL event field" message naming the field.
    pub message: String,
}
42
43pub fn parse_jsonl_events(input: &str) -> Result<Vec<JsonlEvent>, JsonlParseError> {
44    input
45        .lines()
46        .enumerate()
47        .filter_map(|(idx, line)| parse_jsonl_line(idx + 1, line).transpose())
48        .collect()
49}
50
51pub fn parse_jsonl_line(line: usize, raw: &str) -> Result<Option<JsonlEvent>, JsonlParseError> {
52    let trimmed = raw.trim();
53    if trimmed.is_empty() {
54        return Ok(None);
55    }
56    let value = parse_value(line, trimmed)?;
57    Ok(Some(parse_jsonl_value(line, value)?))
58}
59
60pub fn parse_jsonl_value(line: usize, value: Value) -> Result<JsonlEvent, JsonlParseError> {
61    let payload = value.get("event").cloned().unwrap_or(value);
62    event_from_value(line, payload)
63}
64
65fn event_from_value(line: usize, value: Value) -> Result<JsonlEvent, JsonlParseError> {
66    Ok(JsonlEvent {
67        line,
68        session_id: req_str(&value, line, &["session_id"], "session_id")?,
69        seq: req_u64(&value, line, &["seq", "sequence", "event_seq"], "seq")?,
70        ts_ms: req_u64(&value, line, &["ts_ms", "timestamp_ms"], "ts_ms")?,
71        kind: req_str(&value, line, &["kind", "type", "event_type"], "kind")?,
72        source: opt_str(&value, &["source"]).unwrap_or_else(|| "jsonl".into()),
73        tool: opt_str(&value, &["tool", "tool_name"]),
74        tool_call_id: opt_str(&value, &["tool_call_id", "call_id"]),
75        tokens_in: opt_u32(&value, &["tokens_in", "input_tokens"]),
76        tokens_out: opt_u32(&value, &["tokens_out", "output_tokens"]),
77        reasoning_tokens: opt_u32(&value, &["reasoning_tokens"]),
78        cost_usd_e6: opt_i64(&value, &["cost_usd_e6"]),
79        cache_creation_tokens: opt_u32(&value, &["cache_creation_tokens"]),
80        cache_read_tokens: opt_u32(&value, &["cache_read_tokens"]),
81        payload: payload(&value),
82    })
83}
84
85fn parse_value(line: usize, raw: &str) -> Result<Value, JsonlParseError> {
86    serde_json::from_str(raw).map_err(|err| JsonlParseError {
87        line,
88        message: err.to_string(),
89    })
90}
91
92fn req_str(v: &Value, line: usize, keys: &[&str], name: &str) -> Result<String, JsonlParseError> {
93    opt_str(v, keys).ok_or_else(|| JsonlParseError::missing(line, name))
94}
95
96fn req_u64(v: &Value, line: usize, keys: &[&str], name: &str) -> Result<u64, JsonlParseError> {
97    field(v, keys)
98        .and_then(Value::as_u64)
99        .ok_or_else(|| JsonlParseError::missing(line, name))
100}
101
102fn opt_str(v: &Value, keys: &[&str]) -> Option<String> {
103    field(v, keys)
104        .and_then(Value::as_str)
105        .filter(|s| !s.is_empty())
106        .map(str::to_owned)
107}
108
109fn opt_u32(v: &Value, keys: &[&str]) -> Option<u32> {
110    field(v, keys)?.as_u64()?.try_into().ok()
111}
112
113fn opt_i64(v: &Value, keys: &[&str]) -> Option<i64> {
114    field(v, keys)?.as_i64()
115}
116
117fn field<'a>(v: &'a Value, keys: &[&str]) -> Option<&'a Value> {
118    keys.iter().find_map(|key| v.get(*key))
119}
120
121fn payload(v: &Value) -> Value {
122    v.get("payload").cloned().unwrap_or_else(|| v.clone())
123}
124
125impl JsonlParseError {
126    fn missing(line: usize, name: &str) -> Self {
127        Self {
128            line,
129            message: format!("missing JSONL event field: {name}"),
130        }
131    }
132}
133
134impl Display for JsonlParseError {
135    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
136        write!(f, "line {}: {}", self.line, self.message)
137    }
138}
139
// Empty impl: the trait's default methods are used as-is, which is enough for
// `JsonlParseError` to be handled as a `std::error::Error` (e.g. boxed as
// `dyn Error`).
impl Error for JsonlParseError {}