1use rusqlite::params;
6
7use crate::model::DailyRollupRow;
8use crate::query::percentile;
9use crate::store::AnalyticsStore;
10
11impl AnalyticsStore {
12 pub fn refresh_daily_rollups(&self) -> anyhow::Result<u64> {
15 let conn = self.conn.lock().unwrap();
16 conn.execute("DELETE FROM daily_rollups", [])?;
17
18 type RollupKey = (String, String, String, String, String);
20 type RollupAggregate = (Vec<i64>, u64, u64);
21 let mut grouped: std::collections::BTreeMap<RollupKey, RollupAggregate> =
22 std::collections::BTreeMap::new();
23 {
24 let mut statement = conn.prepare(
25 "SELECT strftime('%Y-%m-%d', COALESCE(tc.started_at_ms, tc.completed_at_ms) / 1000, \
26 'unixepoch'),
27 COALESCE(s.workspace_key, ''), COALESCE(tu.provider, ''),
28 COALESCE(tu.model, ''), COALESCE(tc.tool_name, ''),
29 tc.duration_ms, tc.is_error
30 FROM tool_calls tc
31 LEFT JOIN turns tu ON tu.thread_id = tc.thread_id AND tu.turn_id = tc.turn_id
32 LEFT JOIN sessions s ON s.thread_id = tc.thread_id
33 WHERE COALESCE(tc.started_at_ms, tc.completed_at_ms) IS NOT NULL",
34 )?;
35 let rows = statement.query_map([], |row| {
36 Ok((
37 (
38 row.get::<_, String>(0)?,
39 row.get::<_, String>(1)?,
40 row.get::<_, String>(2)?,
41 row.get::<_, String>(3)?,
42 row.get::<_, String>(4)?,
43 ),
44 row.get::<_, Option<i64>>(5)?,
45 row.get::<_, bool>(6)?,
46 ))
47 })?;
48 for row in rows {
49 let (key, duration, is_error) = row?;
50 let entry = grouped.entry(key).or_default();
51 entry.1 += 1;
52 if is_error {
53 entry.2 += 1;
54 }
55 if let Some(duration) = duration {
56 entry.0.push(duration);
57 }
58 }
59 }
60 let mut written = 0_u64;
61 for ((day, workspace, provider, model, tool), (mut durations, calls, errors)) in grouped {
62 durations.sort_unstable();
63 conn.execute(
64 "INSERT INTO daily_rollups (day, workspace_key, provider, model, tool_name, \
65 call_count, error_count, total_duration_ms, p50_duration_ms, p95_duration_ms, \
66 p99_duration_ms)
67 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)",
68 params![
69 day,
70 workspace,
71 provider,
72 model,
73 tool,
74 calls as i64,
75 errors as i64,
76 durations.iter().sum::<i64>(),
77 percentile(&durations, 0.50),
78 percentile(&durations, 0.95),
79 percentile(&durations, 0.99),
80 ],
81 )?;
82 written += 1;
83 }
84
85 let mut statement = conn.prepare(
87 "SELECT strftime('%Y-%m-%d', u.recorded_at_ms / 1000, 'unixepoch'),
88 COALESCE(s.workspace_key, ''), COALESCE(tu.provider, ''),
89 COALESCE(tu.model, ''),
90 SUM(u.prompt_tokens), SUM(u.completion_tokens), SUM(u.total_tokens),
91 SUM(u.cached_prompt_tokens)
92 FROM token_usage u
93 LEFT JOIN turns tu ON tu.thread_id = u.thread_id AND tu.turn_id = u.turn_id
94 LEFT JOIN sessions s ON s.thread_id = u.thread_id
95 GROUP BY 1, 2, 3, 4",
96 )?;
97 let rows = statement.query_map([], |row| {
98 Ok((
99 row.get::<_, String>(0)?,
100 row.get::<_, String>(1)?,
101 row.get::<_, String>(2)?,
102 row.get::<_, String>(3)?,
103 row.get::<_, i64>(4)?,
104 row.get::<_, i64>(5)?,
105 row.get::<_, i64>(6)?,
106 row.get::<_, i64>(7)?,
107 ))
108 })?;
109 for row in rows {
110 let (day, workspace, provider, model, prompt, completion, total, cached) = row?;
111 conn.execute(
112 "INSERT INTO daily_rollups (day, workspace_key, provider, model, tool_name, \
113 prompt_tokens, completion_tokens, total_tokens, cached_prompt_tokens)
114 VALUES (?1, ?2, ?3, ?4, '', ?5, ?6, ?7, ?8)
115 ON CONFLICT(day, workspace_key, provider, model, tool_name) DO UPDATE SET
116 prompt_tokens = excluded.prompt_tokens,
117 completion_tokens = excluded.completion_tokens,
118 total_tokens = excluded.total_tokens,
119 cached_prompt_tokens = excluded.cached_prompt_tokens",
120 params![
121 day, workspace, provider, model, prompt, completion, total, cached
122 ],
123 )?;
124 written += 1;
125 }
126 Ok(written)
127 }
128
129 pub fn daily_rollups(&self) -> anyhow::Result<Vec<DailyRollupRow>> {
130 let conn = self.conn.lock().unwrap();
131 let mut statement = conn.prepare(
132 "SELECT day, workspace_key, provider, model, tool_name, call_count, error_count, \
133 total_duration_ms, p50_duration_ms, p95_duration_ms, p99_duration_ms, prompt_tokens, \
134 completion_tokens, total_tokens, cached_prompt_tokens
135 FROM daily_rollups ORDER BY day, tool_name",
136 )?;
137 let rows = statement.query_map([], |row| {
138 let optional = |value: String| if value.is_empty() { None } else { Some(value) };
139 Ok(DailyRollupRow {
140 day: row.get(0)?,
141 workspace_key: optional(row.get(1)?),
142 provider: optional(row.get(2)?),
143 model: optional(row.get(3)?),
144 tool_name: optional(row.get(4)?),
145 call_count: row.get::<_, i64>(5)? as u64,
146 error_count: row.get::<_, i64>(6)? as u64,
147 total_duration_ms: row.get(7)?,
148 p50_duration_ms: row.get(8)?,
149 p95_duration_ms: row.get(9)?,
150 p99_duration_ms: row.get(10)?,
151 prompt_tokens: row.get::<_, i64>(11)? as u64,
152 completion_tokens: row.get::<_, i64>(12)? as u64,
153 total_tokens: row.get::<_, i64>(13)? as u64,
154 cached_prompt_tokens: row.get::<_, i64>(14)? as u64,
155 })
156 })?;
157 Ok(rows.collect::<Result<Vec<_>, _>>()?)
158 }
159}
160
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::{ToolCallRecord, WorkspaceLabelMode};

    /// Refreshing twice over unchanged data must report the same count and
    /// produce the same rows; the grep rollup is then spot-checked.
    #[test]
    fn rollup_refresh_is_deterministic_and_idempotent() {
        // A fresh, uniquely named scratch directory per run.
        let scratch_dir = std::env::temp_dir()
            .join(format!("roder-analytics-rollup-{}", uuid::Uuid::new_v4()));
        let db_path = AnalyticsStore::default_path(&scratch_dir);
        let store = AnalyticsStore::open(&db_path, WorkspaceLabelMode::FullPath).unwrap();

        // Twenty successful grep calls sharing one start timestamp, with
        // durations 1..=20 and tool ids "call-0" through "call-19".
        let started_at = 1_750_000_000_000_i64;
        for duration in 1..=20_i64 {
            let record = ToolCallRecord {
                thread_id: "t1".into(),
                turn_id: "u1".into(),
                tool_id: format!("call-{}", duration - 1),
                tool_name: Some("grep".into()),
                started_at_ms: Some(started_at),
                completed_at_ms: Some(started_at + duration),
                duration_ms: Some(duration),
                status: "success".into(),
                is_error: false,
            };
            store.upsert_tool_call(&record).unwrap();
        }

        let written_first = store.refresh_daily_rollups().unwrap();
        let snapshot_first = store.daily_rollups().unwrap();
        let written_second = store.refresh_daily_rollups().unwrap();
        let snapshot_second = store.daily_rollups().unwrap();
        assert_eq!(written_first, written_second);
        assert_eq!(snapshot_first, snapshot_second);

        let grep_row = snapshot_first
            .iter()
            .find(|candidate| candidate.tool_name.as_deref() == Some("grep"))
            .unwrap();
        assert_eq!(grep_row.call_count, 20);
        assert_eq!(grep_row.p50_duration_ms, Some(10));
        assert_eq!(grep_row.p95_duration_ms, Some(19));

        // Best-effort cleanup; a failure to remove the directory is ignored.
        let _ = std::fs::remove_dir_all(&scratch_dir);
    }
}