Skip to main content

roder_usage_analytics/
rollup.rs

1//! Daily rollup generation: cached per-day/tool/provider/model/workspace
2//! aggregates for fast dashboards. Raw `tool_calls`/`token_usage` rows stay
3//! the source of truth; rollups are recomputed from them on refresh.
4
5use rusqlite::params;
6
7use crate::model::DailyRollupRow;
8use crate::query::percentile;
9use crate::store::AnalyticsStore;
10
11impl AnalyticsStore {
12    /// Recomputes all daily rollups from raw rows. Deterministic and
13    /// idempotent: the rollup table always reflects the raw tables.
14    pub fn refresh_daily_rollups(&self) -> anyhow::Result<u64> {
15        let conn = self.conn.lock().unwrap();
16        conn.execute("DELETE FROM daily_rollups", [])?;
17
18        // Tool-call aggregates per (day, workspace, provider, model, tool).
19        type RollupKey = (String, String, String, String, String);
20        type RollupAggregate = (Vec<i64>, u64, u64);
21        let mut grouped: std::collections::BTreeMap<RollupKey, RollupAggregate> =
22            std::collections::BTreeMap::new();
23        {
24            let mut statement = conn.prepare(
25                "SELECT strftime('%Y-%m-%d', COALESCE(tc.started_at_ms, tc.completed_at_ms) / 1000, \
26                 'unixepoch'),
27                        COALESCE(s.workspace_key, ''), COALESCE(tu.provider, ''),
28                        COALESCE(tu.model, ''), COALESCE(tc.tool_name, ''),
29                        tc.duration_ms, tc.is_error
30                 FROM tool_calls tc
31                 LEFT JOIN turns tu ON tu.thread_id = tc.thread_id AND tu.turn_id = tc.turn_id
32                 LEFT JOIN sessions s ON s.thread_id = tc.thread_id
33                 WHERE COALESCE(tc.started_at_ms, tc.completed_at_ms) IS NOT NULL",
34            )?;
35            let rows = statement.query_map([], |row| {
36                Ok((
37                    (
38                        row.get::<_, String>(0)?,
39                        row.get::<_, String>(1)?,
40                        row.get::<_, String>(2)?,
41                        row.get::<_, String>(3)?,
42                        row.get::<_, String>(4)?,
43                    ),
44                    row.get::<_, Option<i64>>(5)?,
45                    row.get::<_, bool>(6)?,
46                ))
47            })?;
48            for row in rows {
49                let (key, duration, is_error) = row?;
50                let entry = grouped.entry(key).or_default();
51                entry.1 += 1;
52                if is_error {
53                    entry.2 += 1;
54                }
55                if let Some(duration) = duration {
56                    entry.0.push(duration);
57                }
58            }
59        }
60        let mut written = 0_u64;
61        for ((day, workspace, provider, model, tool), (mut durations, calls, errors)) in grouped {
62            durations.sort_unstable();
63            conn.execute(
64                "INSERT INTO daily_rollups (day, workspace_key, provider, model, tool_name, \
65                 call_count, error_count, total_duration_ms, p50_duration_ms, p95_duration_ms, \
66                 p99_duration_ms)
67                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)",
68                params![
69                    day,
70                    workspace,
71                    provider,
72                    model,
73                    tool,
74                    calls as i64,
75                    errors as i64,
76                    durations.iter().sum::<i64>(),
77                    percentile(&durations, 0.50),
78                    percentile(&durations, 0.95),
79                    percentile(&durations, 0.99),
80                ],
81            )?;
82            written += 1;
83        }
84
85        // Token aggregates land on the tool_name = '' row per group.
86        let mut statement = conn.prepare(
87            "SELECT strftime('%Y-%m-%d', u.recorded_at_ms / 1000, 'unixepoch'),
88                    COALESCE(s.workspace_key, ''), COALESCE(tu.provider, ''),
89                    COALESCE(tu.model, ''),
90                    SUM(u.prompt_tokens), SUM(u.completion_tokens), SUM(u.total_tokens),
91                    SUM(u.cached_prompt_tokens)
92             FROM token_usage u
93             LEFT JOIN turns tu ON tu.thread_id = u.thread_id AND tu.turn_id = u.turn_id
94             LEFT JOIN sessions s ON s.thread_id = u.thread_id
95             GROUP BY 1, 2, 3, 4",
96        )?;
97        let rows = statement.query_map([], |row| {
98            Ok((
99                row.get::<_, String>(0)?,
100                row.get::<_, String>(1)?,
101                row.get::<_, String>(2)?,
102                row.get::<_, String>(3)?,
103                row.get::<_, i64>(4)?,
104                row.get::<_, i64>(5)?,
105                row.get::<_, i64>(6)?,
106                row.get::<_, i64>(7)?,
107            ))
108        })?;
109        for row in rows {
110            let (day, workspace, provider, model, prompt, completion, total, cached) = row?;
111            conn.execute(
112                "INSERT INTO daily_rollups (day, workspace_key, provider, model, tool_name, \
113                 prompt_tokens, completion_tokens, total_tokens, cached_prompt_tokens)
114                 VALUES (?1, ?2, ?3, ?4, '', ?5, ?6, ?7, ?8)
115                 ON CONFLICT(day, workspace_key, provider, model, tool_name) DO UPDATE SET
116                   prompt_tokens = excluded.prompt_tokens,
117                   completion_tokens = excluded.completion_tokens,
118                   total_tokens = excluded.total_tokens,
119                   cached_prompt_tokens = excluded.cached_prompt_tokens",
120                params![
121                    day, workspace, provider, model, prompt, completion, total, cached
122                ],
123            )?;
124            written += 1;
125        }
126        Ok(written)
127    }
128
129    pub fn daily_rollups(&self) -> anyhow::Result<Vec<DailyRollupRow>> {
130        let conn = self.conn.lock().unwrap();
131        let mut statement = conn.prepare(
132            "SELECT day, workspace_key, provider, model, tool_name, call_count, error_count, \
133             total_duration_ms, p50_duration_ms, p95_duration_ms, p99_duration_ms, prompt_tokens, \
134             completion_tokens, total_tokens, cached_prompt_tokens
135             FROM daily_rollups ORDER BY day, tool_name",
136        )?;
137        let rows = statement.query_map([], |row| {
138            let optional = |value: String| if value.is_empty() { None } else { Some(value) };
139            Ok(DailyRollupRow {
140                day: row.get(0)?,
141                workspace_key: optional(row.get(1)?),
142                provider: optional(row.get(2)?),
143                model: optional(row.get(3)?),
144                tool_name: optional(row.get(4)?),
145                call_count: row.get::<_, i64>(5)? as u64,
146                error_count: row.get::<_, i64>(6)? as u64,
147                total_duration_ms: row.get(7)?,
148                p50_duration_ms: row.get(8)?,
149                p95_duration_ms: row.get(9)?,
150                p99_duration_ms: row.get(10)?,
151                prompt_tokens: row.get::<_, i64>(11)? as u64,
152                completion_tokens: row.get::<_, i64>(12)? as u64,
153                total_tokens: row.get::<_, i64>(13)? as u64,
154                cached_prompt_tokens: row.get::<_, i64>(14)? as u64,
155            })
156        })?;
157        Ok(rows.collect::<Result<Vec<_>, _>>()?)
158    }
159}
160
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::{ToolCallRecord, WorkspaceLabelMode};

    /// Refreshing twice over unchanged raw rows must report the same write
    /// count and produce identical rollup rows. The fixture is twenty `grep`
    /// calls on one day with durations 1..=20 ms, which also pins the call
    /// count and the p50/p95 values.
    #[test]
    fn rollup_refresh_is_deterministic_and_idempotent() {
        let scratch_dir =
            std::env::temp_dir().join(format!("roder-analytics-rollup-{}", uuid::Uuid::new_v4()));
        let db_path = AnalyticsStore::default_path(&scratch_dir);
        let store = AnalyticsStore::open(&db_path, WorkspaceLabelMode::FullPath).unwrap();

        // Every call starts at the same instant, so all of them fall on the
        // same rollup day.
        let started_at = 1_750_000_000_000_i64;
        for (index, duration) in (1..=20).enumerate() {
            let record = ToolCallRecord {
                thread_id: "t1".into(),
                turn_id: "u1".into(),
                tool_id: format!("call-{index}"),
                tool_name: Some("grep".into()),
                started_at_ms: Some(started_at),
                completed_at_ms: Some(started_at + duration),
                duration_ms: Some(duration),
                status: "success".into(),
                is_error: false,
            };
            store.upsert_tool_call(&record).unwrap();
        }

        let written_first = store.refresh_daily_rollups().unwrap();
        let snapshot_first = store.daily_rollups().unwrap();
        let written_second = store.refresh_daily_rollups().unwrap();
        let snapshot_second = store.daily_rollups().unwrap();
        assert_eq!(written_first, written_second);
        assert_eq!(snapshot_first, snapshot_second);

        let grep_row = snapshot_first
            .iter()
            .find(|row| matches!(row.tool_name.as_deref(), Some("grep")))
            .unwrap();
        assert_eq!(grep_row.call_count, 20);
        assert_eq!(grep_row.p50_duration_ms, Some(10));
        assert_eq!(grep_row.p95_duration_ms, Some(19));

        // Best-effort cleanup; a failure to remove the directory is ignored.
        let _ = std::fs::remove_dir_all(&scratch_dir);
    }
}