1use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
4use crate::interchange::jsonl::{JsonlEvent, parse_jsonl_value};
5use crate::store::Store;
6use anyhow::{Context, Result};
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use std::path::Path;
10
11#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
12pub struct JsonlImportReport {
13 pub imported_events: u64,
14 pub sessions_created: u64,
15}
16
17pub fn import_file(store: &Store, path: &Path, workspace: &str) -> Result<JsonlImportReport> {
18 let text = std::fs::read_to_string(path)?;
19 text.lines()
20 .enumerate()
21 .filter(|(_, line)| !line.trim().is_empty())
22 .try_fold(JsonlImportReport::default(), |mut report, (idx, line)| {
23 import_line(store, workspace, idx + 1, line, &mut report)?;
24 Ok(report)
25 })
26}
27
28fn import_line(
29 store: &Store,
30 workspace: &str,
31 line_no: usize,
32 line: &str,
33 report: &mut JsonlImportReport,
34) -> Result<()> {
35 let event = parse_event(line).with_context(|| format!("line {line_no}"))?;
36 ensure_session(store, workspace, &event, report)?;
37 store.append_event(&event)?;
38 report.imported_events += 1;
39 Ok(())
40}
41
42fn parse_event(line: &str) -> Result<Event> {
43 let value: Value = serde_json::from_str(line)?;
44 serde_json::from_value(value.clone()).or_else(|_| generic_event(value))
45}
46
47fn generic_event(value: Value) -> Result<Event> {
48 serde_json::from_value(value.get("event").cloned().unwrap_or_else(|| value.clone())).or_else(
49 |_| {
50 parse_jsonl_value(0, value)
51 .map(event_from_generic)
52 .map_err(Into::into)
53 },
54 )
55}
56
57fn event_from_generic(row: JsonlEvent) -> Event {
58 Event {
59 session_id: row.session_id,
60 seq: row.seq,
61 ts_ms: row.ts_ms,
62 ts_exact: true,
63 kind: kind(&row.kind),
64 source: source(&row.source),
65 tool: row.tool,
66 tool_call_id: row.tool_call_id,
67 tokens_in: row.tokens_in,
68 tokens_out: row.tokens_out,
69 reasoning_tokens: row.reasoning_tokens,
70 cost_usd_e6: row.cost_usd_e6,
71 stop_reason: None,
72 latency_ms: None,
73 ttft_ms: None,
74 retry_count: None,
75 context_used_tokens: None,
76 context_max_tokens: None,
77 cache_creation_tokens: row.cache_creation_tokens,
78 cache_read_tokens: row.cache_read_tokens,
79 system_prompt_tokens: None,
80 payload: row.payload,
81 }
82}
83
84fn kind(value: &str) -> EventKind {
85 match value.to_ascii_lowercase().replace('-', "_").as_str() {
86 "tool_call" | "toolcall" => EventKind::ToolCall,
87 "tool_result" | "toolresult" => EventKind::ToolResult,
88 "error" => EventKind::Error,
89 "cost" => EventKind::Cost,
90 "hook" => EventKind::Hook,
91 "lifecycle" => EventKind::Lifecycle,
92 _ => EventKind::Message,
93 }
94}
95
96fn source(value: &str) -> EventSource {
97 match value.to_ascii_lowercase().as_str() {
98 "hook" => EventSource::Hook,
99 "proxy" => EventSource::Proxy,
100 _ => EventSource::Tail,
101 }
102}
103
104fn ensure_session(
105 store: &Store,
106 workspace: &str,
107 event: &Event,
108 report: &mut JsonlImportReport,
109) -> Result<()> {
110 if store.get_session(&event.session_id)?.is_some() {
111 return Ok(());
112 }
113 store.upsert_session(&default_session(event, workspace))?;
114 report.sessions_created += 1;
115 Ok(())
116}
117
118fn default_session(event: &Event, workspace: &str) -> SessionRecord {
119 SessionRecord {
120 id: event.session_id.clone(),
121 agent: "jsonl".into(),
122 model: None,
123 workspace: workspace.into(),
124 started_at_ms: event.ts_ms,
125 ended_at_ms: None,
126 status: SessionStatus::Running,
127 trace_path: "jsonl".into(),
128 start_commit: None,
129 end_commit: None,
130 branch: None,
131 dirty_start: None,
132 dirty_end: None,
133 repo_binding_source: None,
134 prompt_fingerprint: None,
135 parent_session_id: None,
136 agent_version: None,
137 os: None,
138 arch: None,
139 repo_file_count: None,
140 repo_total_loc: None,
141 }
142}