Skip to main content

roder_usage_analytics/
store.rs

1//! SQLite-backed analytics store with idempotent upserts.
2
3use std::path::{Path, PathBuf};
4use std::sync::Mutex;
5
6use anyhow::Context;
7use rusqlite::{Connection, params};
8
9use crate::model::{
10    SessionRecord, TokenUsageRecord, ToolCallRecord, TurnRecord, WorkspaceLabelMode,
11};
12
13pub(crate) fn now_ms() -> i64 {
14    (time::OffsetDateTime::now_utc().unix_timestamp_nanos() / 1_000_000) as i64
15}
16
17pub struct AnalyticsStore {
18    pub(crate) conn: Mutex<Connection>,
19    path: PathBuf,
20    pub workspace_label_mode: WorkspaceLabelMode,
21}
22
23impl AnalyticsStore {
24    /// Opens (creating directories and schema as needed) the analytics
25    /// database at `path`.
26    pub fn open(path: &Path, workspace_label_mode: WorkspaceLabelMode) -> anyhow::Result<Self> {
27        if let Some(parent) = path.parent() {
28            std::fs::create_dir_all(parent)
29                .with_context(|| format!("create analytics dir {}", parent.display()))?;
30        }
31        let conn = Connection::open(path)
32            .with_context(|| format!("open analytics database {}", path.display()))?;
33        conn.pragma_update(None, "journal_mode", "WAL")?;
34        conn.pragma_update(None, "synchronous", "NORMAL")?;
35        crate::schema::apply_migrations(&conn)?;
36        Ok(Self {
37            conn: Mutex::new(conn),
38            path: path.to_path_buf(),
39            workspace_label_mode,
40        })
41    }
42
43    /// Default location under a Roder data directory.
44    pub fn default_path(data_dir: &Path) -> PathBuf {
45        data_dir.join("analytics/usage.sqlite3")
46    }
47
48    pub fn path(&self) -> &Path {
49        &self.path
50    }
51
52    /// Upserts session metadata. Later non-null values win; `created_at_ms`
53    /// keeps the earliest observed value.
54    pub fn upsert_session(&self, record: &SessionRecord) -> anyhow::Result<()> {
55        let conn = self.conn.lock().unwrap();
56        conn.execute(
57            "INSERT INTO sessions (thread_id, workspace_key, workspace_label, provider, model, \
58             created_at_ms, updated_at_ms)
59             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
60             ON CONFLICT(thread_id) DO UPDATE SET
61               workspace_key = COALESCE(excluded.workspace_key, sessions.workspace_key),
62               workspace_label = COALESCE(excluded.workspace_label, sessions.workspace_label),
63               provider = COALESCE(excluded.provider, sessions.provider),
64               model = COALESCE(excluded.model, sessions.model),
65               created_at_ms = MIN(sessions.created_at_ms, excluded.created_at_ms),
66               updated_at_ms = MAX(sessions.updated_at_ms, excluded.updated_at_ms)",
67            params![
68                record.thread_id,
69                record.workspace_key,
70                record.workspace_label,
71                record.provider,
72                record.model,
73                record.created_at_ms,
74                record.updated_at_ms,
75            ],
76        )?;
77        Ok(())
78    }
79
80    /// Upserts a turn keyed by `(thread_id, turn_id)`. Terminal statuses
81    /// (`completed`/`failed`) are never downgraded back to `running`.
82    pub fn upsert_turn(&self, record: &TurnRecord) -> anyhow::Result<()> {
83        let conn = self.conn.lock().unwrap();
84        conn.execute(
85            "INSERT INTO turns (thread_id, turn_id, provider, model, runtime_profile, \
86             started_at_ms, completed_at_ms, status, error_kind)
87             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)
88             ON CONFLICT(thread_id, turn_id) DO UPDATE SET
89               provider = COALESCE(excluded.provider, turns.provider),
90               model = COALESCE(excluded.model, turns.model),
91               runtime_profile = COALESCE(excluded.runtime_profile, turns.runtime_profile),
92               started_at_ms = COALESCE(turns.started_at_ms, excluded.started_at_ms),
93               completed_at_ms = COALESCE(excluded.completed_at_ms, turns.completed_at_ms),
94               status = CASE
95                 WHEN turns.status IN ('completed', 'failed') AND excluded.status = 'running'
96                   THEN turns.status
97                 ELSE excluded.status
98               END,
99               error_kind = COALESCE(excluded.error_kind, turns.error_kind)",
100            params![
101                record.thread_id,
102                record.turn_id,
103                record.provider,
104                record.model,
105                record.runtime_profile,
106                record.started_at_ms,
107                record.completed_at_ms,
108                record.status,
109                record.error_kind,
110            ],
111        )?;
112        Ok(())
113    }
114
115    /// Upserts terminal token usage for a turn keyed by
116    /// `(thread_id, turn_id)`; replaying the same terminal event is a no-op
117    /// rather than a double count.
118    pub fn upsert_token_usage(&self, record: &TokenUsageRecord) -> anyhow::Result<()> {
119        let conn = self.conn.lock().unwrap();
120        conn.execute(
121            "INSERT INTO token_usage (thread_id, turn_id, provider, model, recorded_at_ms, \
122             prompt_tokens, completion_tokens, total_tokens, cached_prompt_tokens)
123             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)
124             ON CONFLICT(thread_id, turn_id) DO UPDATE SET
125               provider = COALESCE(excluded.provider, token_usage.provider),
126               model = COALESCE(excluded.model, token_usage.model),
127               recorded_at_ms = excluded.recorded_at_ms,
128               prompt_tokens = excluded.prompt_tokens,
129               completion_tokens = excluded.completion_tokens,
130               total_tokens = excluded.total_tokens,
131               cached_prompt_tokens = excluded.cached_prompt_tokens",
132            params![
133                record.thread_id,
134                record.turn_id,
135                record.provider,
136                record.model,
137                record.recorded_at_ms,
138                record.prompt_tokens,
139                record.completion_tokens,
140                record.total_tokens,
141                record.cached_prompt_tokens,
142            ],
143        )?;
144        Ok(())
145    }
146
147    /// Upserts a tool call keyed by `(thread_id, turn_id, tool_id)`,
148    /// merging start/completion halves into one logical record.
149    pub fn upsert_tool_call(&self, record: &ToolCallRecord) -> anyhow::Result<()> {
150        let conn = self.conn.lock().unwrap();
151        conn.execute(
152            "INSERT INTO tool_calls (thread_id, turn_id, tool_id, tool_name, started_at_ms, \
153             completed_at_ms, duration_ms, status, is_error)
154             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)
155             ON CONFLICT(thread_id, turn_id, tool_id) DO UPDATE SET
156               tool_name = COALESCE(excluded.tool_name, tool_calls.tool_name),
157               started_at_ms = COALESCE(tool_calls.started_at_ms, excluded.started_at_ms),
158               completed_at_ms = COALESCE(excluded.completed_at_ms, tool_calls.completed_at_ms),
159               duration_ms = COALESCE(
160                 excluded.duration_ms,
161                 tool_calls.duration_ms,
162                 CASE
163                   WHEN excluded.completed_at_ms IS NOT NULL
164                        AND tool_calls.started_at_ms IS NOT NULL
165                     THEN MAX(0, excluded.completed_at_ms - tool_calls.started_at_ms)
166                 END
167               ),
168               status = CASE
169                 WHEN tool_calls.status IN ('success', 'error') AND excluded.status = 'running'
170                   THEN tool_calls.status
171                 ELSE excluded.status
172               END,
173               is_error = MAX(tool_calls.is_error, excluded.is_error)",
174            params![
175                record.thread_id,
176                record.turn_id,
177                record.tool_id,
178                record.tool_name,
179                record.started_at_ms,
180                record.completed_at_ms,
181                record.duration_ms,
182                record.status,
183                record.is_error,
184            ],
185        )?;
186        Ok(())
187    }
188
189    // -- import offsets ---------------------------------------------------
190
191    pub fn import_offset(&self, source_path: &str) -> anyhow::Result<Option<u64>> {
192        let conn = self.conn.lock().unwrap();
193        let mut statement =
194            conn.prepare("SELECT last_line FROM ingested_event_offsets WHERE source_path = ?1")?;
195        let mut rows = statement.query([source_path])?;
196        match rows.next()? {
197            Some(row) => Ok(Some(row.get::<_, i64>(0)? as u64)),
198            None => Ok(None),
199        }
200    }
201
202    pub fn record_import_offset(
203        &self,
204        source_path: &str,
205        last_line: u64,
206        source_mtime_ms: Option<i64>,
207    ) -> anyhow::Result<()> {
208        let conn = self.conn.lock().unwrap();
209        conn.execute(
210            "INSERT INTO ingested_event_offsets (source_path, last_line, source_mtime_ms, \
211             updated_at_ms)
212             VALUES (?1, ?2, ?3, ?4)
213             ON CONFLICT(source_path) DO UPDATE SET
214               last_line = excluded.last_line,
215               source_mtime_ms = excluded.source_mtime_ms,
216               updated_at_ms = excluded.updated_at_ms",
217            params![source_path, last_line as i64, source_mtime_ms, now_ms()],
218        )?;
219        Ok(())
220    }
221
222    /// Clears all analytics rows (used by `--rebuild` before replaying
223    /// JSONL). The schema and migrations are kept.
224    pub fn clear_all(&self) -> anyhow::Result<()> {
225        let conn = self.conn.lock().unwrap();
226        conn.execute_batch(
227            "DELETE FROM sessions;
228             DELETE FROM turns;
229             DELETE FROM token_usage;
230             DELETE FROM tool_calls;
231             DELETE FROM ingested_event_offsets;
232             DELETE FROM daily_rollups;",
233        )?;
234        Ok(())
235    }
236
237    /**
238     * Deletes raw rows older than `retention_days` (sessions are kept while
239     * any of their activity remains). Returns the number of deleted rows.
240     * `0` days disables pruning. Rollups are not touched here; callers
241     * refresh them after pruning.
242     */
243    pub fn apply_retention(&self, retention_days: u32) -> anyhow::Result<u64> {
244        if retention_days == 0 {
245            return Ok(0);
246        }
247        let cutoff_ms = now_ms() - i64::from(retention_days) * 86_400_000;
248        let conn = self.conn.lock().unwrap();
249        let mut deleted = 0_u64;
250        deleted += conn.execute(
251            "DELETE FROM tool_calls WHERE COALESCE(started_at_ms, completed_at_ms) < ?1",
252            params![cutoff_ms],
253        )? as u64;
254        deleted += conn.execute(
255            "DELETE FROM token_usage WHERE recorded_at_ms < ?1",
256            params![cutoff_ms],
257        )? as u64;
258        deleted += conn.execute(
259            "DELETE FROM turns WHERE COALESCE(completed_at_ms, started_at_ms) < ?1",
260            params![cutoff_ms],
261        )? as u64;
262        deleted += conn.execute(
263            "DELETE FROM sessions WHERE updated_at_ms < ?1
264               AND NOT EXISTS (SELECT 1 FROM turns t WHERE t.thread_id = sessions.thread_id)
265               AND NOT EXISTS (SELECT 1 FROM tool_calls tc WHERE tc.thread_id = sessions.thread_id)",
266            params![cutoff_ms],
267        )? as u64;
268        Ok(deleted)
269    }
270
271    pub fn counts(&self) -> anyhow::Result<StoreCounts> {
272        let conn = self.conn.lock().unwrap();
273        let count = |table: &str| -> anyhow::Result<u64> {
274            Ok(
275                conn.query_row(&format!("SELECT COUNT(*) FROM {table}"), [], |row| {
276                    row.get::<_, i64>(0)
277                })? as u64,
278            )
279        };
280        Ok(StoreCounts {
281            sessions: count("sessions")?,
282            turns: count("turns")?,
283            token_usage: count("token_usage")?,
284            tool_calls: count("tool_calls")?,
285        })
286    }
287}
288
/// Row counts of the raw analytics tables, as reported by
/// `AnalyticsStore::counts`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct StoreCounts {
    /// Number of rows in `sessions`.
    pub sessions: u64,
    /// Number of rows in `turns`.
    pub turns: u64,
    /// Number of rows in `token_usage`.
    pub token_usage: u64,
    /// Number of rows in `tool_calls`.
    pub tool_calls: u64,
}
296
#[cfg(test)]
mod tests {
    use super::*;

    /// Test fixture: an [`AnalyticsStore`] inside a unique temporary
    /// directory. The directory is removed when the fixture is dropped, which
    /// also happens while unwinding from a failed assertion, so failing tests
    /// no longer leave directories behind.
    struct TempStore {
        // `Option` so `Drop` can close the connection before deleting files.
        store: Option<AnalyticsStore>,
        dir: PathBuf,
    }

    impl std::ops::Deref for TempStore {
        type Target = AnalyticsStore;

        fn deref(&self) -> &AnalyticsStore {
            self.store
                .as_ref()
                .expect("store is only taken out in Drop")
        }
    }

    impl Drop for TempStore {
        fn drop(&mut self) {
            // Close the SQLite connection first so the database files are no
            // longer open when the directory is removed.
            drop(self.store.take());
            let _ = std::fs::remove_dir_all(&self.dir);
        }
    }

    fn temp_store() -> TempStore {
        let dir =
            std::env::temp_dir().join(format!("roder-analytics-store-{}", uuid::Uuid::new_v4()));
        // Build the guard before opening so the directory is cleaned up even
        // if `open` panics through `unwrap`.
        let mut fixture = TempStore { store: None, dir };
        let store = AnalyticsStore::open(
            &AnalyticsStore::default_path(&fixture.dir),
            WorkspaceLabelMode::FullPath,
        )
        .unwrap();
        fixture.store = Some(store);
        fixture
    }

    #[test]
    fn store_upserts_are_idempotent_and_merge_partial_halves() {
        let store = temp_store();

        // Tool start + completion merge into one record with a duration.
        store
            .upsert_tool_call(&ToolCallRecord {
                thread_id: "t1".into(),
                turn_id: "u1".into(),
                tool_id: "call-1".into(),
                tool_name: Some("read_file".into()),
                started_at_ms: Some(1_000),
                completed_at_ms: None,
                duration_ms: None,
                status: "running".into(),
                is_error: false,
            })
            .unwrap();

        // The completion half is applied twice: the second pass is a replay
        // and must not double-count.
        for _ in 0..2 {
            store
                .upsert_tool_call(&ToolCallRecord {
                    thread_id: "t1".into(),
                    turn_id: "u1".into(),
                    tool_id: "call-1".into(),
                    tool_name: None,
                    started_at_ms: None,
                    completed_at_ms: Some(1_125),
                    duration_ms: None,
                    status: "success".into(),
                    is_error: false,
                })
                .unwrap();
        }

        let counts = store.counts().unwrap();
        assert_eq!(counts.tool_calls, 1);
        let (duration, status, name): (i64, String, String) = store
            .conn
            .lock()
            .unwrap()
            .query_row(
                "SELECT duration_ms, status, tool_name FROM tool_calls",
                [],
                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
            )
            .unwrap();
        assert_eq!(duration, 125);
        assert_eq!(status, "success");
        assert_eq!(name, "read_file");

        // Terminal turn status is not downgraded by a late running upsert.
        store
            .upsert_turn(&TurnRecord {
                thread_id: "t1".into(),
                turn_id: "u1".into(),
                provider: Some("mock".into()),
                model: Some("mock".into()),
                runtime_profile: None,
                started_at_ms: Some(900),
                completed_at_ms: Some(2_000),
                status: "completed".into(),
                error_kind: None,
            })
            .unwrap();
        store
            .upsert_turn(&TurnRecord {
                thread_id: "t1".into(),
                turn_id: "u1".into(),
                provider: None,
                model: None,
                runtime_profile: None,
                started_at_ms: Some(900),
                completed_at_ms: None,
                status: "running".into(),
                error_kind: None,
            })
            .unwrap();
        let status: String = store
            .conn
            .lock()
            .unwrap()
            .query_row("SELECT status FROM turns", [], |row| row.get(0))
            .unwrap();
        assert_eq!(status, "completed");

        // Token usage replays update in place.
        for _ in 0..2 {
            store
                .upsert_token_usage(&TokenUsageRecord {
                    thread_id: "t1".into(),
                    turn_id: "u1".into(),
                    provider: Some("mock".into()),
                    model: Some("mock".into()),
                    recorded_at_ms: 2_000,
                    prompt_tokens: 100,
                    completion_tokens: 20,
                    total_tokens: 120,
                    cached_prompt_tokens: 80,
                })
                .unwrap();
        }
        let counts = store.counts().unwrap();
        assert_eq!(counts.token_usage, 1);
    }

    #[test]
    fn store_records_no_payload_columns() {
        let store = temp_store();
        // The schema itself must not have any column that could hold prompt
        // or output bodies.
        let conn = store.conn.lock().unwrap();
        let mut statement = conn
            .prepare("SELECT name FROM pragma_table_info('tool_calls')")
            .unwrap();
        let columns: Vec<String> = statement
            .query_map([], |row| row.get(0))
            .unwrap()
            .map(Result::unwrap)
            .collect();
        for forbidden in ["output", "arguments", "payload", "prompt", "text"] {
            assert!(
                !columns.iter().any(|column| column.contains(forbidden)),
                "tool_calls must not store {forbidden}"
            );
        }
        // `statement`, `conn` and `store` are dropped in that order (reverse
        // declaration), so the connection is released before cleanup runs.
    }

    #[test]
    fn retention_prunes_old_rows_and_keeps_recent_ones() {
        let store = temp_store();
        let now = now_ms();
        let old = now - 100 * 86_400_000;
        for (suffix, at) in [("old", old), ("new", now)] {
            store
                .upsert_turn(&TurnRecord {
                    thread_id: format!("t-{suffix}"),
                    turn_id: "u1".into(),
                    provider: None,
                    model: None,
                    runtime_profile: None,
                    started_at_ms: Some(at),
                    completed_at_ms: Some(at + 10),
                    status: "completed".into(),
                    error_kind: None,
                })
                .unwrap();
            store
                .upsert_tool_call(&ToolCallRecord {
                    thread_id: format!("t-{suffix}"),
                    turn_id: "u1".into(),
                    tool_id: "call-1".into(),
                    tool_name: Some("grep".into()),
                    started_at_ms: Some(at),
                    completed_at_ms: Some(at + 5),
                    duration_ms: Some(5),
                    status: "success".into(),
                    is_error: false,
                })
                .unwrap();
            store
                .upsert_token_usage(&TokenUsageRecord {
                    thread_id: format!("t-{suffix}"),
                    turn_id: "u1".into(),
                    provider: None,
                    model: None,
                    recorded_at_ms: at,
                    prompt_tokens: 10,
                    completion_tokens: 5,
                    total_tokens: 15,
                    cached_prompt_tokens: 0,
                })
                .unwrap();
            store
                .upsert_session(&crate::model::SessionRecord {
                    thread_id: format!("t-{suffix}"),
                    workspace_key: None,
                    workspace_label: None,
                    provider: None,
                    model: None,
                    created_at_ms: at,
                    updated_at_ms: at,
                })
                .unwrap();
        }

        // Disabled retention prunes nothing.
        assert_eq!(store.apply_retention(0).unwrap(), 0);
        assert_eq!(store.counts().unwrap().turns, 2);

        // 30-day retention removes only the 100-day-old rows, including the
        // now-empty session.
        let deleted = store.apply_retention(30).unwrap();
        assert_eq!(deleted, 4);
        let counts = store.counts().unwrap();
        assert_eq!(counts.turns, 1);
        assert_eq!(counts.tool_calls, 1);
        assert_eq!(counts.token_usage, 1);
        assert_eq!(counts.sessions, 1);

        // Idempotent: a second pass deletes nothing further.
        assert_eq!(store.apply_retention(30).unwrap(), 0);
    }

    #[test]
    fn import_offsets_round_trip() {
        let store = temp_store();
        assert_eq!(store.import_offset("a/events.jsonl").unwrap(), None);
        store
            .record_import_offset("a/events.jsonl", 42, Some(1_000))
            .unwrap();
        assert_eq!(store.import_offset("a/events.jsonl").unwrap(), Some(42));
        store
            .record_import_offset("a/events.jsonl", 99, Some(2_000))
            .unwrap();
        assert_eq!(store.import_offset("a/events.jsonl").unwrap(), Some(99));
    }
}