1use std::path::{Path, PathBuf};
4use std::sync::Mutex;
5
6use anyhow::Context;
7use rusqlite::{Connection, params};
8
9use crate::model::{
10 SessionRecord, TokenUsageRecord, ToolCallRecord, TurnRecord, WorkspaceLabelMode,
11};
12
13pub(crate) fn now_ms() -> i64 {
14 (time::OffsetDateTime::now_utc().unix_timestamp_nanos() / 1_000_000) as i64
15}
16
17pub struct AnalyticsStore {
18 pub(crate) conn: Mutex<Connection>,
19 path: PathBuf,
20 pub workspace_label_mode: WorkspaceLabelMode,
21}
22
23impl AnalyticsStore {
24 pub fn open(path: &Path, workspace_label_mode: WorkspaceLabelMode) -> anyhow::Result<Self> {
27 if let Some(parent) = path.parent() {
28 std::fs::create_dir_all(parent)
29 .with_context(|| format!("create analytics dir {}", parent.display()))?;
30 }
31 let conn = Connection::open(path)
32 .with_context(|| format!("open analytics database {}", path.display()))?;
33 conn.pragma_update(None, "journal_mode", "WAL")?;
34 conn.pragma_update(None, "synchronous", "NORMAL")?;
35 crate::schema::apply_migrations(&conn)?;
36 Ok(Self {
37 conn: Mutex::new(conn),
38 path: path.to_path_buf(),
39 workspace_label_mode,
40 })
41 }
42
43 pub fn default_path(data_dir: &Path) -> PathBuf {
45 data_dir.join("analytics/usage.sqlite3")
46 }
47
48 pub fn path(&self) -> &Path {
49 &self.path
50 }
51
52 pub fn upsert_session(&self, record: &SessionRecord) -> anyhow::Result<()> {
55 let conn = self.conn.lock().unwrap();
56 conn.execute(
57 "INSERT INTO sessions (thread_id, workspace_key, workspace_label, provider, model, \
58 created_at_ms, updated_at_ms)
59 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
60 ON CONFLICT(thread_id) DO UPDATE SET
61 workspace_key = COALESCE(excluded.workspace_key, sessions.workspace_key),
62 workspace_label = COALESCE(excluded.workspace_label, sessions.workspace_label),
63 provider = COALESCE(excluded.provider, sessions.provider),
64 model = COALESCE(excluded.model, sessions.model),
65 created_at_ms = MIN(sessions.created_at_ms, excluded.created_at_ms),
66 updated_at_ms = MAX(sessions.updated_at_ms, excluded.updated_at_ms)",
67 params![
68 record.thread_id,
69 record.workspace_key,
70 record.workspace_label,
71 record.provider,
72 record.model,
73 record.created_at_ms,
74 record.updated_at_ms,
75 ],
76 )?;
77 Ok(())
78 }
79
80 pub fn upsert_turn(&self, record: &TurnRecord) -> anyhow::Result<()> {
83 let conn = self.conn.lock().unwrap();
84 conn.execute(
85 "INSERT INTO turns (thread_id, turn_id, provider, model, runtime_profile, \
86 started_at_ms, completed_at_ms, status, error_kind)
87 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)
88 ON CONFLICT(thread_id, turn_id) DO UPDATE SET
89 provider = COALESCE(excluded.provider, turns.provider),
90 model = COALESCE(excluded.model, turns.model),
91 runtime_profile = COALESCE(excluded.runtime_profile, turns.runtime_profile),
92 started_at_ms = COALESCE(turns.started_at_ms, excluded.started_at_ms),
93 completed_at_ms = COALESCE(excluded.completed_at_ms, turns.completed_at_ms),
94 status = CASE
95 WHEN turns.status IN ('completed', 'failed') AND excluded.status = 'running'
96 THEN turns.status
97 ELSE excluded.status
98 END,
99 error_kind = COALESCE(excluded.error_kind, turns.error_kind)",
100 params![
101 record.thread_id,
102 record.turn_id,
103 record.provider,
104 record.model,
105 record.runtime_profile,
106 record.started_at_ms,
107 record.completed_at_ms,
108 record.status,
109 record.error_kind,
110 ],
111 )?;
112 Ok(())
113 }
114
115 pub fn upsert_token_usage(&self, record: &TokenUsageRecord) -> anyhow::Result<()> {
119 let conn = self.conn.lock().unwrap();
120 conn.execute(
121 "INSERT INTO token_usage (thread_id, turn_id, provider, model, recorded_at_ms, \
122 prompt_tokens, completion_tokens, total_tokens, cached_prompt_tokens)
123 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)
124 ON CONFLICT(thread_id, turn_id) DO UPDATE SET
125 provider = COALESCE(excluded.provider, token_usage.provider),
126 model = COALESCE(excluded.model, token_usage.model),
127 recorded_at_ms = excluded.recorded_at_ms,
128 prompt_tokens = excluded.prompt_tokens,
129 completion_tokens = excluded.completion_tokens,
130 total_tokens = excluded.total_tokens,
131 cached_prompt_tokens = excluded.cached_prompt_tokens",
132 params![
133 record.thread_id,
134 record.turn_id,
135 record.provider,
136 record.model,
137 record.recorded_at_ms,
138 record.prompt_tokens,
139 record.completion_tokens,
140 record.total_tokens,
141 record.cached_prompt_tokens,
142 ],
143 )?;
144 Ok(())
145 }
146
147 pub fn upsert_tool_call(&self, record: &ToolCallRecord) -> anyhow::Result<()> {
150 let conn = self.conn.lock().unwrap();
151 conn.execute(
152 "INSERT INTO tool_calls (thread_id, turn_id, tool_id, tool_name, started_at_ms, \
153 completed_at_ms, duration_ms, status, is_error)
154 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)
155 ON CONFLICT(thread_id, turn_id, tool_id) DO UPDATE SET
156 tool_name = COALESCE(excluded.tool_name, tool_calls.tool_name),
157 started_at_ms = COALESCE(tool_calls.started_at_ms, excluded.started_at_ms),
158 completed_at_ms = COALESCE(excluded.completed_at_ms, tool_calls.completed_at_ms),
159 duration_ms = COALESCE(
160 excluded.duration_ms,
161 tool_calls.duration_ms,
162 CASE
163 WHEN excluded.completed_at_ms IS NOT NULL
164 AND tool_calls.started_at_ms IS NOT NULL
165 THEN MAX(0, excluded.completed_at_ms - tool_calls.started_at_ms)
166 END
167 ),
168 status = CASE
169 WHEN tool_calls.status IN ('success', 'error') AND excluded.status = 'running'
170 THEN tool_calls.status
171 ELSE excluded.status
172 END,
173 is_error = MAX(tool_calls.is_error, excluded.is_error)",
174 params![
175 record.thread_id,
176 record.turn_id,
177 record.tool_id,
178 record.tool_name,
179 record.started_at_ms,
180 record.completed_at_ms,
181 record.duration_ms,
182 record.status,
183 record.is_error,
184 ],
185 )?;
186 Ok(())
187 }
188
189 pub fn import_offset(&self, source_path: &str) -> anyhow::Result<Option<u64>> {
192 let conn = self.conn.lock().unwrap();
193 let mut statement =
194 conn.prepare("SELECT last_line FROM ingested_event_offsets WHERE source_path = ?1")?;
195 let mut rows = statement.query([source_path])?;
196 match rows.next()? {
197 Some(row) => Ok(Some(row.get::<_, i64>(0)? as u64)),
198 None => Ok(None),
199 }
200 }
201
202 pub fn record_import_offset(
203 &self,
204 source_path: &str,
205 last_line: u64,
206 source_mtime_ms: Option<i64>,
207 ) -> anyhow::Result<()> {
208 let conn = self.conn.lock().unwrap();
209 conn.execute(
210 "INSERT INTO ingested_event_offsets (source_path, last_line, source_mtime_ms, \
211 updated_at_ms)
212 VALUES (?1, ?2, ?3, ?4)
213 ON CONFLICT(source_path) DO UPDATE SET
214 last_line = excluded.last_line,
215 source_mtime_ms = excluded.source_mtime_ms,
216 updated_at_ms = excluded.updated_at_ms",
217 params![source_path, last_line as i64, source_mtime_ms, now_ms()],
218 )?;
219 Ok(())
220 }
221
222 pub fn clear_all(&self) -> anyhow::Result<()> {
225 let conn = self.conn.lock().unwrap();
226 conn.execute_batch(
227 "DELETE FROM sessions;
228 DELETE FROM turns;
229 DELETE FROM token_usage;
230 DELETE FROM tool_calls;
231 DELETE FROM ingested_event_offsets;
232 DELETE FROM daily_rollups;",
233 )?;
234 Ok(())
235 }
236
237 pub fn apply_retention(&self, retention_days: u32) -> anyhow::Result<u64> {
244 if retention_days == 0 {
245 return Ok(0);
246 }
247 let cutoff_ms = now_ms() - i64::from(retention_days) * 86_400_000;
248 let conn = self.conn.lock().unwrap();
249 let mut deleted = 0_u64;
250 deleted += conn.execute(
251 "DELETE FROM tool_calls WHERE COALESCE(started_at_ms, completed_at_ms) < ?1",
252 params![cutoff_ms],
253 )? as u64;
254 deleted += conn.execute(
255 "DELETE FROM token_usage WHERE recorded_at_ms < ?1",
256 params![cutoff_ms],
257 )? as u64;
258 deleted += conn.execute(
259 "DELETE FROM turns WHERE COALESCE(completed_at_ms, started_at_ms) < ?1",
260 params![cutoff_ms],
261 )? as u64;
262 deleted += conn.execute(
263 "DELETE FROM sessions WHERE updated_at_ms < ?1
264 AND NOT EXISTS (SELECT 1 FROM turns t WHERE t.thread_id = sessions.thread_id)
265 AND NOT EXISTS (SELECT 1 FROM tool_calls tc WHERE tc.thread_id = sessions.thread_id)",
266 params![cutoff_ms],
267 )? as u64;
268 Ok(deleted)
269 }
270
271 pub fn counts(&self) -> anyhow::Result<StoreCounts> {
272 let conn = self.conn.lock().unwrap();
273 let count = |table: &str| -> anyhow::Result<u64> {
274 Ok(
275 conn.query_row(&format!("SELECT COUNT(*) FROM {table}"), [], |row| {
276 row.get::<_, i64>(0)
277 })? as u64,
278 )
279 };
280 Ok(StoreCounts {
281 sessions: count("sessions")?,
282 turns: count("turns")?,
283 token_usage: count("token_usage")?,
284 tool_calls: count("tool_calls")?,
285 })
286 }
287}
288
/// Row counts for the four primary analytics tables, as reported by
/// `AnalyticsStore::counts`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct StoreCounts {
    /// Number of rows in the `sessions` table.
    pub sessions: u64,
    /// Number of rows in the `turns` table.
    pub turns: u64,
    /// Number of rows in the `token_usage` table.
    pub token_usage: u64,
    /// Number of rows in the `tool_calls` table.
    pub tool_calls: u64,
}
296
#[cfg(test)]
mod tests {
    use super::*;

    /// Milliseconds in one day, the unit the retention test shifts timestamps by.
    const DAY_MS: i64 = 86_400_000;

    /// Opens a fresh store under a uniquely named directory inside the system
    /// temp dir and returns it with that directory so the test can remove it.
    fn temp_store() -> (AnalyticsStore, PathBuf) {
        let unique = format!("roder-analytics-store-{}", uuid::Uuid::new_v4());
        let dir = std::env::temp_dir().join(unique);
        let db_path = AnalyticsStore::default_path(&dir);
        let store = AnalyticsStore::open(&db_path, WorkspaceLabelMode::FullPath).unwrap();
        (store, dir)
    }

    #[test]
    fn store_upserts_are_idempotent_and_merge_partial_halves() {
        let (store, dir) = temp_store();

        // Start half: carries the tool name and the start time only.
        let started = ToolCallRecord {
            thread_id: "t1".into(),
            turn_id: "u1".into(),
            tool_id: "call-1".into(),
            tool_name: Some("read_file".into()),
            started_at_ms: Some(1_000),
            completed_at_ms: None,
            duration_ms: None,
            status: "running".into(),
            is_error: false,
        };
        // Completion half: carries the end time and the final status only.
        let finished = ToolCallRecord {
            thread_id: "t1".into(),
            turn_id: "u1".into(),
            tool_id: "call-1".into(),
            tool_name: None,
            started_at_ms: None,
            completed_at_ms: Some(1_125),
            duration_ms: None,
            status: "success".into(),
            is_error: false,
        };
        store.upsert_tool_call(&started).unwrap();
        // Applying the completion twice must leave a single, unchanged row.
        for _ in 0..2 {
            store.upsert_tool_call(&finished).unwrap();
        }

        assert_eq!(store.counts().unwrap().tool_calls, 1);
        let (duration, status, name): (i64, String, String) = {
            let conn = store.conn.lock().unwrap();
            conn.query_row(
                "SELECT duration_ms, status, tool_name FROM tool_calls",
                [],
                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
            )
            .unwrap()
        };
        assert_eq!(duration, 125);
        assert_eq!(status, "success");
        assert_eq!(name, "read_file");

        // A late "running" update must not downgrade a completed turn.
        let completed_turn = TurnRecord {
            thread_id: "t1".into(),
            turn_id: "u1".into(),
            provider: Some("mock".into()),
            model: Some("mock".into()),
            runtime_profile: None,
            started_at_ms: Some(900),
            completed_at_ms: Some(2_000),
            status: "completed".into(),
            error_kind: None,
        };
        let late_running_turn = TurnRecord {
            thread_id: "t1".into(),
            turn_id: "u1".into(),
            provider: None,
            model: None,
            runtime_profile: None,
            started_at_ms: Some(900),
            completed_at_ms: None,
            status: "running".into(),
            error_kind: None,
        };
        store.upsert_turn(&completed_turn).unwrap();
        store.upsert_turn(&late_running_turn).unwrap();
        let turn_status: String = {
            let conn = store.conn.lock().unwrap();
            conn.query_row("SELECT status FROM turns", [], |row| row.get(0))
                .unwrap()
        };
        assert_eq!(turn_status, "completed");

        // Recording the same usage twice must not create a second row.
        let usage = TokenUsageRecord {
            thread_id: "t1".into(),
            turn_id: "u1".into(),
            provider: Some("mock".into()),
            model: Some("mock".into()),
            recorded_at_ms: 2_000,
            prompt_tokens: 100,
            completion_tokens: 20,
            total_tokens: 120,
            cached_prompt_tokens: 80,
        };
        store.upsert_token_usage(&usage).unwrap();
        store.upsert_token_usage(&usage).unwrap();
        assert_eq!(store.counts().unwrap().token_usage, 1);

        let _ = std::fs::remove_dir_all(&dir);
    }

    #[test]
    fn store_records_no_payload_columns() {
        let (store, dir) = temp_store();

        // Read the column names of `tool_calls`; the lock is released when
        // this block ends.
        let columns: Vec<String> = {
            let conn = store.conn.lock().unwrap();
            let mut statement = conn
                .prepare("SELECT name FROM pragma_table_info('tool_calls')")
                .unwrap();
            let names = statement
                .query_map([], |row| row.get::<_, String>(0))
                .unwrap()
                .map(Result::unwrap)
                .collect();
            names
        };

        for forbidden in ["output", "arguments", "payload", "prompt", "text"] {
            let offending = columns.iter().any(|column| column.contains(forbidden));
            assert!(!offending, "tool_calls must not store {forbidden}");
        }

        let _ = std::fs::remove_dir_all(&dir);
    }

    #[test]
    fn retention_prunes_old_rows_and_keeps_recent_ones() {
        let (store, dir) = temp_store();
        let now = now_ms();
        let old = now - 100 * DAY_MS;

        // Seed one full set of rows dated 100 days ago and one dated now.
        for (suffix, at) in [("old", old), ("new", now)] {
            let thread_id = format!("t-{suffix}");
            let turn = TurnRecord {
                thread_id: thread_id.clone(),
                turn_id: "u1".into(),
                provider: None,
                model: None,
                runtime_profile: None,
                started_at_ms: Some(at),
                completed_at_ms: Some(at + 10),
                status: "completed".into(),
                error_kind: None,
            };
            let tool_call = ToolCallRecord {
                thread_id: thread_id.clone(),
                turn_id: "u1".into(),
                tool_id: "call-1".into(),
                tool_name: Some("grep".into()),
                started_at_ms: Some(at),
                completed_at_ms: Some(at + 5),
                duration_ms: Some(5),
                status: "success".into(),
                is_error: false,
            };
            let usage = TokenUsageRecord {
                thread_id: thread_id.clone(),
                turn_id: "u1".into(),
                provider: None,
                model: None,
                recorded_at_ms: at,
                prompt_tokens: 10,
                completion_tokens: 5,
                total_tokens: 15,
                cached_prompt_tokens: 0,
            };
            let session = SessionRecord {
                thread_id,
                workspace_key: None,
                workspace_label: None,
                provider: None,
                model: None,
                created_at_ms: at,
                updated_at_ms: at,
            };
            store.upsert_turn(&turn).unwrap();
            store.upsert_tool_call(&tool_call).unwrap();
            store.upsert_token_usage(&usage).unwrap();
            store.upsert_session(&session).unwrap();
        }

        // Zero retention days deletes nothing.
        assert_eq!(store.apply_retention(0).unwrap(), 0);
        assert_eq!(store.counts().unwrap().turns, 2);

        // Thirty days removes the old tool call, usage row, turn and session.
        let deleted = store.apply_retention(30).unwrap();
        assert_eq!(deleted, 4);
        let remaining = store.counts().unwrap();
        assert_eq!(remaining.turns, 1);
        assert_eq!(remaining.tool_calls, 1);
        assert_eq!(remaining.token_usage, 1);
        assert_eq!(remaining.sessions, 1);

        // A second pass finds nothing left to delete.
        assert_eq!(store.apply_retention(30).unwrap(), 0);
        let _ = std::fs::remove_dir_all(&dir);
    }

    #[test]
    fn import_offsets_round_trip() {
        const SOURCE: &str = "a/events.jsonl";
        let (store, dir) = temp_store();

        assert_eq!(store.import_offset(SOURCE).unwrap(), None);
        // Each write must be visible to the next read, including the overwrite.
        for (line, mtime_ms) in [(42_u64, 1_000_i64), (99, 2_000)] {
            store
                .record_import_offset(SOURCE, line, Some(mtime_ms))
                .unwrap();
            assert_eq!(store.import_offset(SOURCE).unwrap(), Some(line));
        }

        let _ = std::fs::remove_dir_all(&dir);
    }
}