1use std::io::Write;
6
7use serde::{Deserialize, Serialize};
8use time::OffsetDateTime;
9use time::format_description::well_known::Rfc3339;
10
11use crate::model::ANALYTICS_JSONL_SCHEMA_VERSION;
12use crate::store::AnalyticsStore;
13
14#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
15#[serde(tag = "kind")]
16pub enum AnalyticsJsonlRecord {
17 #[serde(rename = "tool_call", rename_all = "camelCase")]
18 ToolCall {
19 schema_version: u32,
20 thread_id: String,
21 turn_id: String,
22 tool_id: String,
23 tool_name: Option<String>,
24 started_at: Option<String>,
25 completed_at: Option<String>,
26 duration_ms: Option<i64>,
27 status: String,
28 },
29 #[serde(rename = "token_usage", rename_all = "camelCase")]
30 TokenUsage {
31 schema_version: u32,
32 thread_id: String,
33 turn_id: String,
34 provider: Option<String>,
35 model: Option<String>,
36 recorded_at: String,
37 prompt_tokens: u32,
38 completion_tokens: u32,
39 total_tokens: u32,
40 cached_prompt_tokens: u32,
41 },
42 #[serde(rename = "turn", rename_all = "camelCase")]
43 Turn {
44 schema_version: u32,
45 thread_id: String,
46 turn_id: String,
47 provider: Option<String>,
48 model: Option<String>,
49 started_at: Option<String>,
50 completed_at: Option<String>,
51 status: String,
52 error_kind: Option<String>,
53 },
54}
55
56fn rfc3339(ms: i64) -> String {
57 OffsetDateTime::from_unix_timestamp_nanos(i128::from(ms) * 1_000_000)
58 .unwrap_or(OffsetDateTime::UNIX_EPOCH)
59 .format(&Rfc3339)
60 .unwrap_or_default()
61}
62
63impl AnalyticsStore {
64 pub fn export_jsonl(&self, output: &mut dyn Write) -> anyhow::Result<u64> {
67 let conn = self.conn.lock().unwrap();
68 let mut written = 0_u64;
69
70 let mut statement = conn.prepare(
71 "SELECT thread_id, turn_id, provider, model, started_at_ms, completed_at_ms, status, \
72 error_kind FROM turns ORDER BY thread_id, turn_id",
73 )?;
74 let turns = statement.query_map([], |row| {
75 Ok(AnalyticsJsonlRecord::Turn {
76 schema_version: ANALYTICS_JSONL_SCHEMA_VERSION,
77 thread_id: row.get(0)?,
78 turn_id: row.get(1)?,
79 provider: row.get(2)?,
80 model: row.get(3)?,
81 started_at: row.get::<_, Option<i64>>(4)?.map(rfc3339),
82 completed_at: row.get::<_, Option<i64>>(5)?.map(rfc3339),
83 status: row.get(6)?,
84 error_kind: row.get(7)?,
85 })
86 })?;
87 for record in turns {
88 serde_json::to_writer(&mut *output, &record?)?;
89 output.write_all(b"\n")?;
90 written += 1;
91 }
92
93 let mut statement = conn.prepare(
94 "SELECT thread_id, turn_id, tool_id, tool_name, started_at_ms, completed_at_ms, \
95 duration_ms, status FROM tool_calls ORDER BY thread_id, turn_id, tool_id",
96 )?;
97 let tool_calls = statement.query_map([], |row| {
98 Ok(AnalyticsJsonlRecord::ToolCall {
99 schema_version: ANALYTICS_JSONL_SCHEMA_VERSION,
100 thread_id: row.get(0)?,
101 turn_id: row.get(1)?,
102 tool_id: row.get(2)?,
103 tool_name: row.get(3)?,
104 started_at: row.get::<_, Option<i64>>(4)?.map(rfc3339),
105 completed_at: row.get::<_, Option<i64>>(5)?.map(rfc3339),
106 duration_ms: row.get(6)?,
107 status: row.get(7)?,
108 })
109 })?;
110 for record in tool_calls {
111 serde_json::to_writer(&mut *output, &record?)?;
112 output.write_all(b"\n")?;
113 written += 1;
114 }
115
116 let mut statement = conn.prepare(
117 "SELECT u.thread_id, u.turn_id, tu.provider, tu.model, u.recorded_at_ms, \
118 u.prompt_tokens, u.completion_tokens, u.total_tokens, u.cached_prompt_tokens
119 FROM token_usage u
120 LEFT JOIN turns tu ON tu.thread_id = u.thread_id AND tu.turn_id = u.turn_id
121 ORDER BY u.thread_id, u.turn_id",
122 )?;
123 let usage = statement.query_map([], |row| {
124 Ok(AnalyticsJsonlRecord::TokenUsage {
125 schema_version: ANALYTICS_JSONL_SCHEMA_VERSION,
126 thread_id: row.get(0)?,
127 turn_id: row.get(1)?,
128 provider: row.get(2)?,
129 model: row.get(3)?,
130 recorded_at: rfc3339(row.get(4)?),
131 prompt_tokens: row.get::<_, i64>(5)? as u32,
132 completion_tokens: row.get::<_, i64>(6)? as u32,
133 total_tokens: row.get::<_, i64>(7)? as u32,
134 cached_prompt_tokens: row.get::<_, i64>(8)? as u32,
135 })
136 })?;
137 for record in usage {
138 serde_json::to_writer(&mut *output, &record?)?;
139 output.write_all(b"\n")?;
140 written += 1;
141 }
142 Ok(written)
143 }
144}
145
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::{TokenUsageRecord, ToolCallRecord, TurnRecord, WorkspaceLabelMode};

    /// Seeds one turn, one tool call and one token-usage row, exports them,
    /// and checks that the output has three parseable, schema-versioned lines
    /// with none of the body-like keys listed at the end of the test.
    #[test]
    fn jsonl_export_is_schema_versioned_and_body_free() {
        // Unique scratch directory so parallel test runs do not collide.
        let scratch_dir = std::env::temp_dir()
            .join(format!("roder-analytics-jsonl-{}", uuid::Uuid::new_v4()));
        let db_path = AnalyticsStore::default_path(&scratch_dir);
        let store = AnalyticsStore::open(&db_path, WorkspaceLabelMode::FullPath).unwrap();

        let turn = TurnRecord {
            thread_id: "t1".into(),
            turn_id: "u1".into(),
            provider: Some("mock".into()),
            model: Some("mock".into()),
            runtime_profile: None,
            started_at_ms: Some(1_000),
            completed_at_ms: Some(2_000),
            status: "completed".into(),
            error_kind: None,
        };
        store.upsert_turn(&turn).unwrap();

        let tool_call = ToolCallRecord {
            thread_id: "t1".into(),
            turn_id: "u1".into(),
            tool_id: "call-1".into(),
            tool_name: Some("read_file".into()),
            started_at_ms: Some(1_100),
            completed_at_ms: Some(1_225),
            duration_ms: Some(125),
            status: "success".into(),
            is_error: false,
        };
        store.upsert_tool_call(&tool_call).unwrap();

        let usage = TokenUsageRecord {
            thread_id: "t1".into(),
            turn_id: "u1".into(),
            provider: None,
            model: None,
            recorded_at_ms: 2_000,
            prompt_tokens: 100,
            completion_tokens: 20,
            total_tokens: 120,
            cached_prompt_tokens: 80,
        };
        store.upsert_token_usage(&usage).unwrap();

        let mut exported = Vec::new();
        let line_count = store.export_jsonl(&mut exported).unwrap();
        assert_eq!(line_count, 3);

        let text = String::from_utf8(exported).unwrap();

        // Every line must parse back into a record.
        let mut parsed: Vec<AnalyticsJsonlRecord> = Vec::new();
        for line in text.lines() {
            parsed.push(serde_json::from_str(line).unwrap());
        }
        assert_eq!(parsed.len(), 3);

        assert!(text.contains("\"schemaVersion\":1"));
        assert!(text.contains("\"kind\":\"tool_call\""));
        assert!(text.contains("\"durationMs\":125"));

        // Substrings that would indicate body content in the export.
        let body_markers = ["output", "arguments", "prompt\"", "text\""];
        for forbidden in body_markers {
            assert!(!text.contains(forbidden), "export contains {forbidden}");
        }

        // Best-effort cleanup; a failure to remove the directory is ignored.
        let _ = std::fs::remove_dir_all(&scratch_dir);
    }
}