Skip to main content

roder_usage_analytics/
jsonl.rs

1//! Schema-versioned normalized JSONL export (and import for external
2//! aggregation round-trips). Exported rows carry ids, names, timestamps,
3//! durations, status, and counts only — never prompt/output text.
4
5use std::io::Write;
6
7use serde::{Deserialize, Serialize};
8use time::OffsetDateTime;
9use time::format_description::well_known::Rfc3339;
10
11use crate::model::ANALYTICS_JSONL_SCHEMA_VERSION;
12use crate::store::AnalyticsStore;
13
14#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
15#[serde(tag = "kind")]
16pub enum AnalyticsJsonlRecord {
17    #[serde(rename = "tool_call", rename_all = "camelCase")]
18    ToolCall {
19        schema_version: u32,
20        thread_id: String,
21        turn_id: String,
22        tool_id: String,
23        tool_name: Option<String>,
24        started_at: Option<String>,
25        completed_at: Option<String>,
26        duration_ms: Option<i64>,
27        status: String,
28    },
29    #[serde(rename = "token_usage", rename_all = "camelCase")]
30    TokenUsage {
31        schema_version: u32,
32        thread_id: String,
33        turn_id: String,
34        provider: Option<String>,
35        model: Option<String>,
36        recorded_at: String,
37        prompt_tokens: u32,
38        completion_tokens: u32,
39        total_tokens: u32,
40        cached_prompt_tokens: u32,
41    },
42    #[serde(rename = "turn", rename_all = "camelCase")]
43    Turn {
44        schema_version: u32,
45        thread_id: String,
46        turn_id: String,
47        provider: Option<String>,
48        model: Option<String>,
49        started_at: Option<String>,
50        completed_at: Option<String>,
51        status: String,
52        error_kind: Option<String>,
53    },
54}
55
56fn rfc3339(ms: i64) -> String {
57    OffsetDateTime::from_unix_timestamp_nanos(i128::from(ms) * 1_000_000)
58        .unwrap_or(OffsetDateTime::UNIX_EPOCH)
59        .format(&Rfc3339)
60        .unwrap_or_default()
61}
62
63impl AnalyticsStore {
64    /// Streams every turn, tool-call, and token-usage row as normalized
65    /// JSONL into `output`. Returns the record count.
66    pub fn export_jsonl(&self, output: &mut dyn Write) -> anyhow::Result<u64> {
67        let conn = self.conn.lock().unwrap();
68        let mut written = 0_u64;
69
70        let mut statement = conn.prepare(
71            "SELECT thread_id, turn_id, provider, model, started_at_ms, completed_at_ms, status, \
72             error_kind FROM turns ORDER BY thread_id, turn_id",
73        )?;
74        let turns = statement.query_map([], |row| {
75            Ok(AnalyticsJsonlRecord::Turn {
76                schema_version: ANALYTICS_JSONL_SCHEMA_VERSION,
77                thread_id: row.get(0)?,
78                turn_id: row.get(1)?,
79                provider: row.get(2)?,
80                model: row.get(3)?,
81                started_at: row.get::<_, Option<i64>>(4)?.map(rfc3339),
82                completed_at: row.get::<_, Option<i64>>(5)?.map(rfc3339),
83                status: row.get(6)?,
84                error_kind: row.get(7)?,
85            })
86        })?;
87        for record in turns {
88            serde_json::to_writer(&mut *output, &record?)?;
89            output.write_all(b"\n")?;
90            written += 1;
91        }
92
93        let mut statement = conn.prepare(
94            "SELECT thread_id, turn_id, tool_id, tool_name, started_at_ms, completed_at_ms, \
95             duration_ms, status FROM tool_calls ORDER BY thread_id, turn_id, tool_id",
96        )?;
97        let tool_calls = statement.query_map([], |row| {
98            Ok(AnalyticsJsonlRecord::ToolCall {
99                schema_version: ANALYTICS_JSONL_SCHEMA_VERSION,
100                thread_id: row.get(0)?,
101                turn_id: row.get(1)?,
102                tool_id: row.get(2)?,
103                tool_name: row.get(3)?,
104                started_at: row.get::<_, Option<i64>>(4)?.map(rfc3339),
105                completed_at: row.get::<_, Option<i64>>(5)?.map(rfc3339),
106                duration_ms: row.get(6)?,
107                status: row.get(7)?,
108            })
109        })?;
110        for record in tool_calls {
111            serde_json::to_writer(&mut *output, &record?)?;
112            output.write_all(b"\n")?;
113            written += 1;
114        }
115
116        let mut statement = conn.prepare(
117            "SELECT u.thread_id, u.turn_id, tu.provider, tu.model, u.recorded_at_ms, \
118             u.prompt_tokens, u.completion_tokens, u.total_tokens, u.cached_prompt_tokens
119             FROM token_usage u
120             LEFT JOIN turns tu ON tu.thread_id = u.thread_id AND tu.turn_id = u.turn_id
121             ORDER BY u.thread_id, u.turn_id",
122        )?;
123        let usage = statement.query_map([], |row| {
124            Ok(AnalyticsJsonlRecord::TokenUsage {
125                schema_version: ANALYTICS_JSONL_SCHEMA_VERSION,
126                thread_id: row.get(0)?,
127                turn_id: row.get(1)?,
128                provider: row.get(2)?,
129                model: row.get(3)?,
130                recorded_at: rfc3339(row.get(4)?),
131                prompt_tokens: row.get::<_, i64>(5)? as u32,
132                completion_tokens: row.get::<_, i64>(6)? as u32,
133                total_tokens: row.get::<_, i64>(7)? as u32,
134                cached_prompt_tokens: row.get::<_, i64>(8)? as u32,
135            })
136        })?;
137        for record in usage {
138            serde_json::to_writer(&mut *output, &record?)?;
139            output.write_all(b"\n")?;
140            written += 1;
141        }
142        Ok(written)
143    }
144}
145
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::{TokenUsageRecord, ToolCallRecord, TurnRecord, WorkspaceLabelMode};

    /// Seeds one turn, one tool call, and one token-usage row, exports them,
    /// and checks that the output parses back into records, carries the
    /// schema version and camelCase keys, and contains no body-like keys.
    #[test]
    fn jsonl_export_is_schema_versioned_and_body_free() {
        // A uniquely named scratch directory keeps parallel test runs apart.
        let scratch_name = format!("roder-analytics-jsonl-{}", uuid::Uuid::new_v4());
        let dir = std::env::temp_dir().join(scratch_name);
        let db_path = AnalyticsStore::default_path(&dir);
        let store = AnalyticsStore::open(&db_path, WorkspaceLabelMode::FullPath).unwrap();

        let turn = TurnRecord {
            thread_id: "t1".into(),
            turn_id: "u1".into(),
            provider: Some("mock".into()),
            model: Some("mock".into()),
            runtime_profile: None,
            started_at_ms: Some(1_000),
            completed_at_ms: Some(2_000),
            status: "completed".into(),
            error_kind: None,
        };
        store.upsert_turn(&turn).unwrap();

        let tool_call = ToolCallRecord {
            thread_id: "t1".into(),
            turn_id: "u1".into(),
            tool_id: "call-1".into(),
            tool_name: Some("read_file".into()),
            started_at_ms: Some(1_100),
            completed_at_ms: Some(1_225),
            duration_ms: Some(125),
            status: "success".into(),
            is_error: false,
        };
        store.upsert_tool_call(&tool_call).unwrap();

        let token_usage = TokenUsageRecord {
            thread_id: "t1".into(),
            turn_id: "u1".into(),
            provider: None,
            model: None,
            recorded_at_ms: 2_000,
            prompt_tokens: 100,
            completion_tokens: 20,
            total_tokens: 120,
            cached_prompt_tokens: 80,
        };
        store.upsert_token_usage(&token_usage).unwrap();

        let mut buffer = Vec::new();
        let count = store.export_jsonl(&mut buffer).unwrap();
        assert_eq!(count, 3);

        let text = String::from_utf8(buffer).unwrap();
        // Every exported line must deserialize back into a record.
        let mut parsed: Vec<AnalyticsJsonlRecord> = Vec::new();
        for line in text.lines() {
            parsed.push(serde_json::from_str(line).unwrap());
        }
        assert_eq!(parsed.len(), 3);

        assert!(text.contains("\"schemaVersion\":1"));
        assert!(text.contains("\"kind\":\"tool_call\""));
        assert!(text.contains("\"durationMs\":125"));

        // No body-ish keys exist in the export vocabulary.
        let body_like = ["output", "arguments", "prompt\"", "text\""];
        for forbidden in body_like {
            assert!(!text.contains(forbidden), "export contains {forbidden}");
        }

        // Best-effort cleanup; a failure to remove the directory is ignored.
        let _ = std::fs::remove_dir_all(&dir);
    }
}