Skip to main content

kaizen/extensions/
jsonl.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2
3use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
4use crate::interchange::jsonl::{JsonlEvent, parse_jsonl_value};
5use crate::store::Store;
6use anyhow::{Context, Result};
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use std::path::Path;
10
11#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
12pub struct JsonlImportReport {
13    pub imported_events: u64,
14    pub sessions_created: u64,
15}
16
17pub fn import_file(store: &Store, path: &Path, workspace: &str) -> Result<JsonlImportReport> {
18    let text = std::fs::read_to_string(path)?;
19    text.lines()
20        .enumerate()
21        .filter(|(_, line)| !line.trim().is_empty())
22        .try_fold(JsonlImportReport::default(), |mut report, (idx, line)| {
23            import_line(store, workspace, idx + 1, line, &mut report)?;
24            Ok(report)
25        })
26}
27
28fn import_line(
29    store: &Store,
30    workspace: &str,
31    line_no: usize,
32    line: &str,
33    report: &mut JsonlImportReport,
34) -> Result<()> {
35    let event = parse_event(line).with_context(|| format!("line {line_no}"))?;
36    ensure_session(store, workspace, &event, report)?;
37    store.append_event(&event)?;
38    report.imported_events += 1;
39    Ok(())
40}
41
42fn parse_event(line: &str) -> Result<Event> {
43    let value: Value = serde_json::from_str(line)?;
44    serde_json::from_value(value.clone()).or_else(|_| generic_event(value))
45}
46
47fn generic_event(value: Value) -> Result<Event> {
48    serde_json::from_value(value.get("event").cloned().unwrap_or_else(|| value.clone())).or_else(
49        |_| {
50            parse_jsonl_value(0, value)
51                .map(event_from_generic)
52                .map_err(Into::into)
53        },
54    )
55}
56
57fn event_from_generic(row: JsonlEvent) -> Event {
58    Event {
59        session_id: row.session_id,
60        seq: row.seq,
61        ts_ms: row.ts_ms,
62        ts_exact: true,
63        kind: kind(&row.kind),
64        source: source(&row.source),
65        tool: row.tool,
66        tool_call_id: row.tool_call_id,
67        tokens_in: row.tokens_in,
68        tokens_out: row.tokens_out,
69        reasoning_tokens: row.reasoning_tokens,
70        cost_usd_e6: row.cost_usd_e6,
71        stop_reason: None,
72        latency_ms: None,
73        ttft_ms: None,
74        retry_count: None,
75        context_used_tokens: None,
76        context_max_tokens: None,
77        cache_creation_tokens: row.cache_creation_tokens,
78        cache_read_tokens: row.cache_read_tokens,
79        system_prompt_tokens: None,
80        payload: row.payload,
81    }
82}
83
84fn kind(value: &str) -> EventKind {
85    match value.to_ascii_lowercase().replace('-', "_").as_str() {
86        "tool_call" | "toolcall" => EventKind::ToolCall,
87        "tool_result" | "toolresult" => EventKind::ToolResult,
88        "error" => EventKind::Error,
89        "cost" => EventKind::Cost,
90        "hook" => EventKind::Hook,
91        "lifecycle" => EventKind::Lifecycle,
92        _ => EventKind::Message,
93    }
94}
95
96fn source(value: &str) -> EventSource {
97    match value.to_ascii_lowercase().as_str() {
98        "hook" => EventSource::Hook,
99        "proxy" => EventSource::Proxy,
100        _ => EventSource::Tail,
101    }
102}
103
104fn ensure_session(
105    store: &Store,
106    workspace: &str,
107    event: &Event,
108    report: &mut JsonlImportReport,
109) -> Result<()> {
110    if store.get_session(&event.session_id)?.is_some() {
111        return Ok(());
112    }
113    store.upsert_session(&default_session(event, workspace))?;
114    report.sessions_created += 1;
115    Ok(())
116}
117
118fn default_session(event: &Event, workspace: &str) -> SessionRecord {
119    SessionRecord {
120        id: event.session_id.clone(),
121        agent: "jsonl".into(),
122        model: None,
123        workspace: workspace.into(),
124        started_at_ms: event.ts_ms,
125        ended_at_ms: None,
126        status: SessionStatus::Running,
127        trace_path: "jsonl".into(),
128        start_commit: None,
129        end_commit: None,
130        branch: None,
131        dirty_start: None,
132        dirty_end: None,
133        repo_binding_source: None,
134        prompt_fingerprint: None,
135        parent_session_id: None,
136        agent_version: None,
137        os: None,
138        arch: None,
139        repo_file_count: None,
140        repo_total_loc: None,
141    }
142}