1use rusqlite::{Connection, ToSql, params_from_iter};
5
6use crate::model::{
7 SessionSummary, StatsFilter, TokenGroup, TokenSummaryRow, ToolSummary, UsageSummary,
8};
9use crate::store::AnalyticsStore;
10
/// Row limit used by `effective_limit` when the filter does not specify one.
pub const DEFAULT_LIMIT: u64 = 50;
/// Upper bound on the row limit; `effective_limit` clamps larger requests to this value.
pub const MAX_LIMIT: u64 = 1_000;
14
15fn effective_limit(filter: &StatsFilter) -> u64 {
16 filter.limit.unwrap_or(DEFAULT_LIMIT).min(MAX_LIMIT)
17}
18
/// A SQL `WHERE` fragment together with the values bound to its `?` placeholders.
struct Where {
    /// Conditions joined with ` AND `; contains one `?` per entry in `params`.
    clause: String,
    /// Values to bind, in the same order as the placeholders in `clause`.
    params: Vec<Box<dyn ToSql>>,
}
23
24fn tool_call_filter(filter: &StatsFilter) -> Where {
27 let mut clauses = vec!["1=1".to_string()];
28 let mut params: Vec<Box<dyn ToSql>> = Vec::new();
29 if let Some(since) = filter.since_ms {
30 clauses.push("COALESCE(tc.started_at_ms, tc.completed_at_ms) >= ?".to_string());
31 params.push(Box::new(since));
32 }
33 if let Some(until) = filter.until_ms {
34 clauses.push("COALESCE(tc.started_at_ms, tc.completed_at_ms) < ?".to_string());
35 params.push(Box::new(until));
36 }
37 if let Some(thread) = &filter.thread_id {
38 clauses.push("tc.thread_id = ?".to_string());
39 params.push(Box::new(thread.clone()));
40 }
41 if let Some(tool) = &filter.tool_name {
42 clauses.push("tc.tool_name = ?".to_string());
43 params.push(Box::new(tool.clone()));
44 }
45 if let Some(provider) = &filter.provider {
46 clauses.push("tu.provider = ?".to_string());
47 params.push(Box::new(provider.clone()));
48 }
49 if let Some(model) = &filter.model {
50 clauses.push("tu.model = ?".to_string());
51 params.push(Box::new(model.clone()));
52 }
53 if let Some(workspace) = &filter.workspace_key {
54 clauses.push("s.workspace_key = ?".to_string());
55 params.push(Box::new(workspace.clone()));
56 }
57 Where {
58 clause: clauses.join(" AND "),
59 params,
60 }
61}
62
/// Returns the nearest-rank percentile of an ascending-sorted slice.
///
/// The rank is `ceil(quantile * len)`, clamped to `1..=len`, so quantiles at
/// or below zero select the smallest value and quantiles of one or more select
/// the largest. Returns `None` for an empty slice. The slice must already be
/// sorted; this function does not check it.
pub(crate) fn percentile(sorted: &[i64], quantile: f64) -> Option<i64> {
    let count = sorted.len();
    if count == 0 {
        return None;
    }
    let nearest_rank = (quantile * count as f64).ceil() as usize;
    // Ranks are 1-based; the clamp keeps the 0-based index inside the slice.
    let index = nearest_rank.clamp(1, count) - 1;
    sorted.get(index).copied()
}
71
72impl AnalyticsStore {
73 pub fn tool_summaries(&self, filter: &StatsFilter) -> anyhow::Result<Vec<ToolSummary>> {
76 let conn = self.conn.lock().unwrap();
77 let where_clause = tool_call_filter(filter);
78 let sql = format!(
79 "SELECT COALESCE(tc.tool_name, '(unknown)') AS name, tc.duration_ms, tc.is_error
80 FROM tool_calls tc
81 LEFT JOIN turns tu ON tu.thread_id = tc.thread_id AND tu.turn_id = tc.turn_id
82 LEFT JOIN sessions s ON s.thread_id = tc.thread_id
83 WHERE {}",
84 where_clause.clause
85 );
86 let mut statement = conn.prepare(&sql)?;
87 let mut grouped: std::collections::BTreeMap<String, (Vec<i64>, u64, u64)> =
88 std::collections::BTreeMap::new();
89 let rows = statement.query_map(params_from_iter(where_clause.params.iter()), |row| {
90 Ok((
91 row.get::<_, String>(0)?,
92 row.get::<_, Option<i64>>(1)?,
93 row.get::<_, bool>(2)?,
94 ))
95 })?;
96 for row in rows {
97 let (name, duration, is_error) = row?;
98 let entry = grouped.entry(name).or_default();
99 entry.1 += 1;
100 if is_error {
101 entry.2 += 1;
102 }
103 if let Some(duration) = duration {
104 entry.0.push(duration);
105 }
106 }
107 let min_calls = filter.min_calls.unwrap_or(0);
108 let mut summaries: Vec<ToolSummary> = grouped
109 .into_iter()
110 .filter(|(_, (_, calls, _))| *calls >= min_calls)
111 .map(|(tool_name, (mut durations, call_count, error_count))| {
112 durations.sort_unstable();
113 let total: i64 = durations.iter().sum();
114 ToolSummary {
115 tool_name,
116 call_count,
117 error_count,
118 error_rate: if call_count == 0 {
119 0.0
120 } else {
121 error_count as f64 / call_count as f64
122 },
123 total_duration_ms: total,
124 avg_duration_ms: if durations.is_empty() {
125 None
126 } else {
127 Some(total as f64 / durations.len() as f64)
128 },
129 p50_duration_ms: percentile(&durations, 0.50),
130 p95_duration_ms: percentile(&durations, 0.95),
131 p99_duration_ms: percentile(&durations, 0.99),
132 }
133 })
134 .collect();
135 summaries.sort_by_key(|summary| std::cmp::Reverse(summary.call_count));
136 summaries.truncate(effective_limit(filter) as usize);
137 Ok(summaries)
138 }
139
140 pub fn token_summaries(
143 &self,
144 group: TokenGroup,
145 filter: &StatsFilter,
146 ) -> anyhow::Result<Vec<TokenSummaryRow>> {
147 let group_expr = match group {
148 TokenGroup::Day => {
149 "strftime('%Y-%m-%d', token_usage.recorded_at_ms / 1000, 'unixepoch')"
150 }
151 TokenGroup::Session => "token_usage.thread_id",
152 TokenGroup::Provider => "COALESCE(tu.provider, '(unknown)')",
153 TokenGroup::Model => "COALESCE(tu.model, '(unknown)')",
154 TokenGroup::Workspace => "COALESCE(s.workspace_label, '(unknown)')",
155 };
156 let mut clauses = vec!["1=1".to_string()];
157 let mut params: Vec<Box<dyn ToSql>> = Vec::new();
158 if let Some(since) = filter.since_ms {
159 clauses.push("token_usage.recorded_at_ms >= ?".to_string());
160 params.push(Box::new(since));
161 }
162 if let Some(until) = filter.until_ms {
163 clauses.push("token_usage.recorded_at_ms < ?".to_string());
164 params.push(Box::new(until));
165 }
166 if let Some(thread) = &filter.thread_id {
167 clauses.push("token_usage.thread_id = ?".to_string());
168 params.push(Box::new(thread.clone()));
169 }
170 if let Some(provider) = &filter.provider {
171 clauses.push("tu.provider = ?".to_string());
172 params.push(Box::new(provider.clone()));
173 }
174 if let Some(model) = &filter.model {
175 clauses.push("tu.model = ?".to_string());
176 params.push(Box::new(model.clone()));
177 }
178 if let Some(workspace) = &filter.workspace_key {
179 clauses.push("s.workspace_key = ?".to_string());
180 params.push(Box::new(workspace.clone()));
181 }
182 let sql = format!(
183 "SELECT {group_expr} AS grp,
184 SUM(token_usage.prompt_tokens),
185 SUM(token_usage.completion_tokens),
186 SUM(token_usage.total_tokens),
187 SUM(token_usage.cached_prompt_tokens),
188 COUNT(*)
189 FROM token_usage
190 LEFT JOIN turns tu ON tu.thread_id = token_usage.thread_id
191 AND tu.turn_id = token_usage.turn_id
192 LEFT JOIN sessions s ON s.thread_id = token_usage.thread_id
193 WHERE {}
194 GROUP BY grp
195 ORDER BY SUM(token_usage.total_tokens) DESC
196 LIMIT {}",
197 clauses.join(" AND "),
198 effective_limit(filter)
199 );
200 let conn = self.conn.lock().unwrap();
201 let mut statement = conn.prepare(&sql)?;
202 let rows = statement.query_map(params_from_iter(params.iter()), |row| {
203 Ok(TokenSummaryRow {
204 group: row.get(0)?,
205 prompt_tokens: row.get::<_, i64>(1)? as u64,
206 completion_tokens: row.get::<_, i64>(2)? as u64,
207 total_tokens: row.get::<_, i64>(3)? as u64,
208 cached_prompt_tokens: row.get::<_, i64>(4)? as u64,
209 turn_count: row.get::<_, i64>(5)? as u64,
210 })
211 })?;
212 Ok(rows.collect::<Result<Vec<_>, _>>()?)
213 }
214
215 pub fn session_summaries(&self, filter: &StatsFilter) -> anyhow::Result<Vec<SessionSummary>> {
217 let mut clauses = vec!["1=1".to_string()];
218 let mut params: Vec<Box<dyn ToSql>> = Vec::new();
219 if let Some(thread) = &filter.thread_id {
220 clauses.push("s.thread_id = ?".to_string());
221 params.push(Box::new(thread.clone()));
222 }
223 if let Some(workspace) = &filter.workspace_key {
224 clauses.push("s.workspace_key = ?".to_string());
225 params.push(Box::new(workspace.clone()));
226 }
227 if let Some(since) = filter.since_ms {
228 clauses.push("s.updated_at_ms >= ?".to_string());
229 params.push(Box::new(since));
230 }
231 if let Some(until) = filter.until_ms {
232 clauses.push("s.created_at_ms < ?".to_string());
233 params.push(Box::new(until));
234 }
235 let sql = format!(
236 "SELECT s.thread_id, s.workspace_label, s.provider, s.model,
237 (SELECT COUNT(*) FROM turns t WHERE t.thread_id = s.thread_id),
238 (SELECT COUNT(*) FROM tool_calls tc WHERE tc.thread_id = s.thread_id),
239 (SELECT COUNT(*) FROM tool_calls tc
240 WHERE tc.thread_id = s.thread_id AND tc.is_error = 1),
241 (SELECT COALESCE(SUM(u.total_tokens), 0) FROM token_usage u
242 WHERE u.thread_id = s.thread_id),
243 (SELECT COALESCE(SUM(tc.duration_ms), 0) FROM tool_calls tc
244 WHERE tc.thread_id = s.thread_id),
245 (SELECT MIN(t.started_at_ms) FROM turns t WHERE t.thread_id = s.thread_id),
246 (SELECT MAX(t.completed_at_ms) FROM turns t WHERE t.thread_id = s.thread_id)
247 FROM sessions s
248 WHERE {}
249 ORDER BY 8 DESC
250 LIMIT {}",
251 clauses.join(" AND "),
252 effective_limit(filter)
253 );
254 let conn = self.conn.lock().unwrap();
255 let mut statement = conn.prepare(&sql)?;
256 let rows = statement.query_map(params_from_iter(params.iter()), |row| {
257 Ok(SessionSummary {
258 thread_id: row.get(0)?,
259 workspace_label: row.get(1)?,
260 provider: row.get(2)?,
261 model: row.get(3)?,
262 turn_count: row.get::<_, i64>(4)? as u64,
263 tool_call_count: row.get::<_, i64>(5)? as u64,
264 tool_error_count: row.get::<_, i64>(6)? as u64,
265 total_tokens: row.get::<_, i64>(7)? as u64,
266 total_tool_duration_ms: row.get(8)?,
267 first_activity_ms: row.get(9)?,
268 last_activity_ms: row.get(10)?,
269 })
270 })?;
271 Ok(rows.collect::<Result<Vec<_>, _>>()?)
272 }
273
274 pub fn usage_summary(&self, filter: &StatsFilter) -> anyhow::Result<UsageSummary> {
276 let tools = self.tool_summaries(&StatsFilter {
277 limit: Some(MAX_LIMIT),
278 min_calls: None,
279 ..filter.clone()
280 })?;
281 let tool_call_count = tools.iter().map(|tool| tool.call_count).sum();
282 let tool_error_count = tools.iter().map(|tool| tool.error_count).sum();
283 let most_called_tool = tools.first().map(|tool| tool.tool_name.clone());
284
285 let mut clauses = vec!["1=1".to_string()];
286 let mut params: Vec<Box<dyn ToSql>> = Vec::new();
287 if let Some(since) = filter.since_ms {
288 clauses.push("COALESCE(started_at_ms, completed_at_ms) >= ?".to_string());
289 params.push(Box::new(since));
290 }
291 if let Some(until) = filter.until_ms {
292 clauses.push("COALESCE(started_at_ms, completed_at_ms) < ?".to_string());
293 params.push(Box::new(until));
294 }
295 if let Some(thread) = &filter.thread_id {
296 clauses.push("thread_id = ?".to_string());
297 params.push(Box::new(thread.clone()));
298 }
299 let conn = self.conn.lock().unwrap();
300 let (turn_count, completed, failed): (i64, i64, i64) = conn.query_row(
301 &format!(
302 "SELECT COUNT(*),
303 SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END),
304 SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END)
305 FROM turns WHERE {}",
306 clauses.join(" AND ")
307 ),
308 params_from_iter(params.iter()),
309 |row| {
310 Ok((
311 row.get(0)?,
312 row.get::<_, Option<i64>>(1)?.unwrap_or(0),
313 row.get::<_, Option<i64>>(2)?.unwrap_or(0),
314 ))
315 },
316 )?;
317
318 let mut usage_clauses = vec!["1=1".to_string()];
319 let mut usage_params: Vec<Box<dyn ToSql>> = Vec::new();
320 if let Some(since) = filter.since_ms {
321 usage_clauses.push("recorded_at_ms >= ?".to_string());
322 usage_params.push(Box::new(since));
323 }
324 if let Some(until) = filter.until_ms {
325 usage_clauses.push("recorded_at_ms < ?".to_string());
326 usage_params.push(Box::new(until));
327 }
328 if let Some(thread) = &filter.thread_id {
329 usage_clauses.push("thread_id = ?".to_string());
330 usage_params.push(Box::new(thread.clone()));
331 }
332 let (prompt, completion, total, cached): (i64, i64, i64, i64) = conn.query_row(
333 &format!(
334 "SELECT COALESCE(SUM(prompt_tokens), 0), COALESCE(SUM(completion_tokens), 0),
335 COALESCE(SUM(total_tokens), 0), COALESCE(SUM(cached_prompt_tokens), 0)
336 FROM token_usage WHERE {}",
337 usage_clauses.join(" AND ")
338 ),
339 params_from_iter(usage_params.iter()),
340 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
341 )?;
342 let session_count: i64 =
343 conn.query_row("SELECT COUNT(*) FROM sessions", [], |row| row.get(0))?;
344
345 Ok(UsageSummary {
346 turn_count: turn_count as u64,
347 completed_turn_count: completed as u64,
348 failed_turn_count: failed as u64,
349 tool_call_count,
350 tool_error_count,
351 prompt_tokens: prompt as u64,
352 completion_tokens: completion as u64,
353 total_tokens: total as u64,
354 cached_prompt_tokens: cached as u64,
355 session_count: session_count as u64,
356 most_called_tool,
357 })
358 }
359
360 pub fn never_used_tools(
362 &self,
363 registered: &[String],
364 filter: &StatsFilter,
365 ) -> anyhow::Result<Vec<String>> {
366 let used: std::collections::BTreeSet<String> = self
367 .tool_summaries(&StatsFilter {
368 limit: Some(MAX_LIMIT),
369 min_calls: None,
370 ..filter.clone()
371 })?
372 .into_iter()
373 .map(|summary| summary.tool_name)
374 .collect();
375 Ok(registered
376 .iter()
377 .filter(|tool| !used.contains(*tool))
378 .cloned()
379 .collect())
380 }
381}
382
383pub fn sort_tool_summaries(summaries: &mut [ToolSummary], sort: &str) {
385 match sort {
386 "p95" => summaries.sort_by_key(|summary| std::cmp::Reverse(summary.p95_duration_ms)),
387 "errors" => summaries.sort_by(|a, b| {
388 b.error_rate
389 .partial_cmp(&a.error_rate)
390 .unwrap_or(std::cmp::Ordering::Equal)
391 }),
392 "underused" => summaries.sort_by_key(|summary| summary.call_count),
393 _ => summaries.sort_by_key(|summary| std::cmp::Reverse(summary.call_count)),
394 }
395}
396
/// Locks the store's connection mutex and returns the guard.
///
/// NOTE(review): judging by the name this is meant for test code that needs
/// direct database access; it is not gated behind `cfg(test)` — confirm
/// whether non-test callers exist before restricting it.
///
/// # Panics
/// Panics if the connection mutex is poisoned.
pub(crate) fn _connection_for_tests(
    store: &AnalyticsStore,
) -> std::sync::MutexGuard<'_, Connection> {
    store.conn.lock().unwrap()
}
402
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::{TokenUsageRecord, ToolCallRecord, TurnRecord, WorkspaceLabelMode};

    /// Opens a store under a fresh, uniquely named temp directory.
    ///
    /// Returns the directory as well so the caller can remove it afterwards.
    fn temp_store() -> (AnalyticsStore, std::path::PathBuf) {
        let dir =
            std::env::temp_dir().join(format!("roder-analytics-query-{}", uuid::Uuid::new_v4()));
        let store = AnalyticsStore::open(
            &AnalyticsStore::default_path(&dir),
            WorkspaceLabelMode::FullPath,
        )
        .unwrap();
        (store, dir)
    }

    /// Inserts one call to `tool` per entry in `durations`, all in thread `t1`
    /// / turn `u1`; the first `errors` calls are marked as failed.
    fn seed_tool_calls(store: &AnalyticsStore, tool: &str, durations: &[i64], errors: u64) {
        for (index, duration) in durations.iter().enumerate() {
            store
                .upsert_tool_call(&ToolCallRecord {
                    thread_id: "t1".into(),
                    turn_id: "u1".into(),
                    tool_id: format!("{tool}-{index}"),
                    tool_name: Some(tool.to_string()),
                    started_at_ms: Some(1_000),
                    completed_at_ms: Some(1_000 + duration),
                    duration_ms: Some(*duration),
                    status: if (index as u64) < errors {
                        "error".into()
                    } else {
                        "success".into()
                    },
                    is_error: (index as u64) < errors,
                })
                .unwrap();
        }
    }

    #[test]
    fn percentiles_use_exact_nearest_rank() {
        let durations: Vec<i64> = (1..=100).collect();
        assert_eq!(percentile(&durations, 0.50), Some(50));
        assert_eq!(percentile(&durations, 0.95), Some(95));
        assert_eq!(percentile(&durations, 0.99), Some(99));
        assert_eq!(percentile(&[], 0.95), None);
        assert_eq!(percentile(&[7], 0.95), Some(7));
    }

    #[test]
    fn tool_summaries_compute_counts_errors_and_p95() {
        let (store, dir) = temp_store();
        seed_tool_calls(&store, "read_file", &(1..=100).collect::<Vec<_>>(), 5);
        seed_tool_calls(&store, "shell", &[10, 20], 2);

        let summaries = store.tool_summaries(&StatsFilter::default()).unwrap();
        assert_eq!(summaries[0].tool_name, "read_file");
        assert_eq!(summaries[0].call_count, 100);
        assert_eq!(summaries[0].error_count, 5);
        assert_eq!(summaries[0].p50_duration_ms, Some(50));
        assert_eq!(summaries[0].p95_duration_ms, Some(95));
        assert_eq!(summaries[0].p99_duration_ms, Some(99));
        assert_eq!(summaries[1].tool_name, "shell");
        assert!((summaries[1].error_rate - 1.0).abs() < f64::EPSILON);

        // `shell` fails on every call and has the fewest calls, so it leads
        // both the error-rate and the underused orderings.
        let mut by_errors = summaries.clone();
        sort_tool_summaries(&mut by_errors, "errors");
        assert_eq!(by_errors[0].tool_name, "shell");
        let mut underused = summaries.clone();
        sort_tool_summaries(&mut underused, "underused");
        assert_eq!(underused[0].tool_name, "shell");

        // Only `write_file` was never seeded.
        let registered = vec![
            "read_file".to_string(),
            "shell".to_string(),
            "write_file".to_string(),
        ];
        assert_eq!(
            store
                .never_used_tools(&registered, &StatsFilter::default())
                .unwrap(),
            vec!["write_file".to_string()]
        );
        let _ = std::fs::remove_dir_all(&dir);
    }

    #[test]
    fn token_summaries_group_by_day_and_session() {
        let (store, dir) = temp_store();
        // Two timestamps exactly one day apart, so day grouping yields two buckets.
        let day1 = 1_750_000_000_000_i64;
        let day2 = day1 + 86_400_000;
        for (thread, turn, at, total) in [
            ("t1", "u1", day1, 100_u32),
            ("t1", "u2", day1 + 1_000, 50),
            ("t2", "u1", day2, 30),
        ] {
            store
                .upsert_turn(&TurnRecord {
                    thread_id: thread.into(),
                    turn_id: turn.into(),
                    provider: Some("mock".into()),
                    model: Some("mock-model".into()),
                    runtime_profile: None,
                    started_at_ms: Some(at),
                    completed_at_ms: Some(at + 10),
                    status: "completed".into(),
                    error_kind: None,
                })
                .unwrap();
            store
                .upsert_token_usage(&TokenUsageRecord {
                    thread_id: thread.into(),
                    turn_id: turn.into(),
                    provider: None,
                    model: None,
                    recorded_at_ms: at,
                    prompt_tokens: total - 10,
                    completion_tokens: 10,
                    total_tokens: total,
                    cached_prompt_tokens: 0,
                })
                .unwrap();
        }

        let by_day = store
            .token_summaries(TokenGroup::Day, &StatsFilter::default())
            .unwrap();
        assert_eq!(by_day.len(), 2);
        assert_eq!(by_day[0].total_tokens, 150);
        assert_eq!(by_day[0].turn_count, 2);

        let by_session = store
            .token_summaries(TokenGroup::Session, &StatsFilter::default())
            .unwrap();
        assert_eq!(by_session[0].group, "t1");
        assert_eq!(by_session[0].total_tokens, 150);

        // The usage rows carry no model; it is taken from the matching turn.
        let by_model = store
            .token_summaries(TokenGroup::Model, &StatsFilter::default())
            .unwrap();
        assert_eq!(by_model[0].group, "mock-model");
        assert_eq!(by_model[0].total_tokens, 180);
        let _ = std::fs::remove_dir_all(&dir);
    }
}