Skip to main content

roder_usage_analytics/
query.rs

//! Typed query APIs over the analytics store: per-tool summaries with exact
//! percentiles, token summaries grouped by day, session, provider, model, or
//! workspace, per-session summaries, and underutilization helpers.
3
4use rusqlite::{Connection, ToSql, params_from_iter};
5
6use crate::model::{
7    SessionSummary, StatsFilter, TokenGroup, TokenSummaryRow, ToolSummary, UsageSummary,
8};
9use crate::store::AnalyticsStore;
10
11pub const DEFAULT_LIMIT: u64 = 50;
12/// Hard cap applied to all listing queries regardless of the caller.
13pub const MAX_LIMIT: u64 = 1_000;
14
15fn effective_limit(filter: &StatsFilter) -> u64 {
16    filter.limit.unwrap_or(DEFAULT_LIMIT).min(MAX_LIMIT)
17}
18
/// A SQL `WHERE` fragment together with the values bound to its `?`
/// placeholders.
struct Where {
    /// Conditions joined with ` AND `. The builders in this module seed it
    /// with `1=1`, so it is never empty and can follow `WHERE` directly.
    clause: String,
    /// Bound values, in the same order as the `?` placeholders in `clause`.
    params: Vec<Box<dyn ToSql>>,
}
23
24/// Builds a WHERE clause over `tool_calls` (alias `tc`) joined with
25/// `turns` (alias `tu`) and `sessions` (alias `s`).
26fn tool_call_filter(filter: &StatsFilter) -> Where {
27    let mut clauses = vec!["1=1".to_string()];
28    let mut params: Vec<Box<dyn ToSql>> = Vec::new();
29    if let Some(since) = filter.since_ms {
30        clauses.push("COALESCE(tc.started_at_ms, tc.completed_at_ms) >= ?".to_string());
31        params.push(Box::new(since));
32    }
33    if let Some(until) = filter.until_ms {
34        clauses.push("COALESCE(tc.started_at_ms, tc.completed_at_ms) < ?".to_string());
35        params.push(Box::new(until));
36    }
37    if let Some(thread) = &filter.thread_id {
38        clauses.push("tc.thread_id = ?".to_string());
39        params.push(Box::new(thread.clone()));
40    }
41    if let Some(tool) = &filter.tool_name {
42        clauses.push("tc.tool_name = ?".to_string());
43        params.push(Box::new(tool.clone()));
44    }
45    if let Some(provider) = &filter.provider {
46        clauses.push("tu.provider = ?".to_string());
47        params.push(Box::new(provider.clone()));
48    }
49    if let Some(model) = &filter.model {
50        clauses.push("tu.model = ?".to_string());
51        params.push(Box::new(model.clone()));
52    }
53    if let Some(workspace) = &filter.workspace_key {
54        clauses.push("s.workspace_key = ?".to_string());
55        params.push(Box::new(workspace.clone()));
56    }
57    Where {
58        clause: clauses.join(" AND "),
59        params,
60    }
61}
62
/// Exact nearest-rank percentile over a sorted ascending slice.
///
/// Returns the element at rank `ceil(quantile * len)`, with the rank clamped
/// to `1..=len`. A `quantile` outside `0.0..=1.0` therefore yields the minimum
/// or maximum element. Returns `None` for an empty slice.
pub(crate) fn percentile(sorted: &[i64], quantile: f64) -> Option<i64> {
    if sorted.is_empty() {
        return None;
    }
    let len = sorted.len();
    let exact = quantile * len as f64;
    // Binary floating point can land a hair above an integer (0.07 * 100 is
    // 7.000000000000001). A plain `ceil` would then skip a rank, so values
    // within a tiny relative tolerance of an integer are snapped to it first.
    let nearest = exact.round();
    let tolerance = 1e-9 * exact.abs().max(1.0);
    let rank = if (exact - nearest).abs() <= tolerance {
        nearest
    } else {
        exact.ceil()
    };
    // `as usize` saturates negatives to 0 and NaN to 0; the clamp then maps
    // those cases to the first element.
    Some(sorted[(rank as usize).clamp(1, len) - 1])
}
71
72impl AnalyticsStore {
73    /// Per-tool summaries with exact percentiles computed from raw
74    /// durations in the filtered window.
75    pub fn tool_summaries(&self, filter: &StatsFilter) -> anyhow::Result<Vec<ToolSummary>> {
76        let conn = self.conn.lock().unwrap();
77        let where_clause = tool_call_filter(filter);
78        let sql = format!(
79            "SELECT COALESCE(tc.tool_name, '(unknown)') AS name, tc.duration_ms, tc.is_error
80             FROM tool_calls tc
81             LEFT JOIN turns tu ON tu.thread_id = tc.thread_id AND tu.turn_id = tc.turn_id
82             LEFT JOIN sessions s ON s.thread_id = tc.thread_id
83             WHERE {}",
84            where_clause.clause
85        );
86        let mut statement = conn.prepare(&sql)?;
87        let mut grouped: std::collections::BTreeMap<String, (Vec<i64>, u64, u64)> =
88            std::collections::BTreeMap::new();
89        let rows = statement.query_map(params_from_iter(where_clause.params.iter()), |row| {
90            Ok((
91                row.get::<_, String>(0)?,
92                row.get::<_, Option<i64>>(1)?,
93                row.get::<_, bool>(2)?,
94            ))
95        })?;
96        for row in rows {
97            let (name, duration, is_error) = row?;
98            let entry = grouped.entry(name).or_default();
99            entry.1 += 1;
100            if is_error {
101                entry.2 += 1;
102            }
103            if let Some(duration) = duration {
104                entry.0.push(duration);
105            }
106        }
107        let min_calls = filter.min_calls.unwrap_or(0);
108        let mut summaries: Vec<ToolSummary> = grouped
109            .into_iter()
110            .filter(|(_, (_, calls, _))| *calls >= min_calls)
111            .map(|(tool_name, (mut durations, call_count, error_count))| {
112                durations.sort_unstable();
113                let total: i64 = durations.iter().sum();
114                ToolSummary {
115                    tool_name,
116                    call_count,
117                    error_count,
118                    error_rate: if call_count == 0 {
119                        0.0
120                    } else {
121                        error_count as f64 / call_count as f64
122                    },
123                    total_duration_ms: total,
124                    avg_duration_ms: if durations.is_empty() {
125                        None
126                    } else {
127                        Some(total as f64 / durations.len() as f64)
128                    },
129                    p50_duration_ms: percentile(&durations, 0.50),
130                    p95_duration_ms: percentile(&durations, 0.95),
131                    p99_duration_ms: percentile(&durations, 0.99),
132                }
133            })
134            .collect();
135        summaries.sort_by_key(|summary| std::cmp::Reverse(summary.call_count));
136        summaries.truncate(effective_limit(filter) as usize);
137        Ok(summaries)
138    }
139
140    /// Token summaries grouped by day, session, provider, model, or
141    /// workspace. Reads only the projected tables, never raw JSONL.
142    pub fn token_summaries(
143        &self,
144        group: TokenGroup,
145        filter: &StatsFilter,
146    ) -> anyhow::Result<Vec<TokenSummaryRow>> {
147        let group_expr = match group {
148            TokenGroup::Day => {
149                "strftime('%Y-%m-%d', token_usage.recorded_at_ms / 1000, 'unixepoch')"
150            }
151            TokenGroup::Session => "token_usage.thread_id",
152            TokenGroup::Provider => "COALESCE(tu.provider, '(unknown)')",
153            TokenGroup::Model => "COALESCE(tu.model, '(unknown)')",
154            TokenGroup::Workspace => "COALESCE(s.workspace_label, '(unknown)')",
155        };
156        let mut clauses = vec!["1=1".to_string()];
157        let mut params: Vec<Box<dyn ToSql>> = Vec::new();
158        if let Some(since) = filter.since_ms {
159            clauses.push("token_usage.recorded_at_ms >= ?".to_string());
160            params.push(Box::new(since));
161        }
162        if let Some(until) = filter.until_ms {
163            clauses.push("token_usage.recorded_at_ms < ?".to_string());
164            params.push(Box::new(until));
165        }
166        if let Some(thread) = &filter.thread_id {
167            clauses.push("token_usage.thread_id = ?".to_string());
168            params.push(Box::new(thread.clone()));
169        }
170        if let Some(provider) = &filter.provider {
171            clauses.push("tu.provider = ?".to_string());
172            params.push(Box::new(provider.clone()));
173        }
174        if let Some(model) = &filter.model {
175            clauses.push("tu.model = ?".to_string());
176            params.push(Box::new(model.clone()));
177        }
178        if let Some(workspace) = &filter.workspace_key {
179            clauses.push("s.workspace_key = ?".to_string());
180            params.push(Box::new(workspace.clone()));
181        }
182        let sql = format!(
183            "SELECT {group_expr} AS grp,
184                    SUM(token_usage.prompt_tokens),
185                    SUM(token_usage.completion_tokens),
186                    SUM(token_usage.total_tokens),
187                    SUM(token_usage.cached_prompt_tokens),
188                    COUNT(*)
189             FROM token_usage
190             LEFT JOIN turns tu ON tu.thread_id = token_usage.thread_id
191                               AND tu.turn_id = token_usage.turn_id
192             LEFT JOIN sessions s ON s.thread_id = token_usage.thread_id
193             WHERE {}
194             GROUP BY grp
195             ORDER BY SUM(token_usage.total_tokens) DESC
196             LIMIT {}",
197            clauses.join(" AND "),
198            effective_limit(filter)
199        );
200        let conn = self.conn.lock().unwrap();
201        let mut statement = conn.prepare(&sql)?;
202        let rows = statement.query_map(params_from_iter(params.iter()), |row| {
203            Ok(TokenSummaryRow {
204                group: row.get(0)?,
205                prompt_tokens: row.get::<_, i64>(1)? as u64,
206                completion_tokens: row.get::<_, i64>(2)? as u64,
207                total_tokens: row.get::<_, i64>(3)? as u64,
208                cached_prompt_tokens: row.get::<_, i64>(4)? as u64,
209                turn_count: row.get::<_, i64>(5)? as u64,
210            })
211        })?;
212        Ok(rows.collect::<Result<Vec<_>, _>>()?)
213    }
214
215    /// Session summaries sortable by tokens, tool calls, duration, errors.
216    pub fn session_summaries(&self, filter: &StatsFilter) -> anyhow::Result<Vec<SessionSummary>> {
217        let mut clauses = vec!["1=1".to_string()];
218        let mut params: Vec<Box<dyn ToSql>> = Vec::new();
219        if let Some(thread) = &filter.thread_id {
220            clauses.push("s.thread_id = ?".to_string());
221            params.push(Box::new(thread.clone()));
222        }
223        if let Some(workspace) = &filter.workspace_key {
224            clauses.push("s.workspace_key = ?".to_string());
225            params.push(Box::new(workspace.clone()));
226        }
227        if let Some(since) = filter.since_ms {
228            clauses.push("s.updated_at_ms >= ?".to_string());
229            params.push(Box::new(since));
230        }
231        if let Some(until) = filter.until_ms {
232            clauses.push("s.created_at_ms < ?".to_string());
233            params.push(Box::new(until));
234        }
235        let sql = format!(
236            "SELECT s.thread_id, s.workspace_label, s.provider, s.model,
237                    (SELECT COUNT(*) FROM turns t WHERE t.thread_id = s.thread_id),
238                    (SELECT COUNT(*) FROM tool_calls tc WHERE tc.thread_id = s.thread_id),
239                    (SELECT COUNT(*) FROM tool_calls tc
240                      WHERE tc.thread_id = s.thread_id AND tc.is_error = 1),
241                    (SELECT COALESCE(SUM(u.total_tokens), 0) FROM token_usage u
242                      WHERE u.thread_id = s.thread_id),
243                    (SELECT COALESCE(SUM(tc.duration_ms), 0) FROM tool_calls tc
244                      WHERE tc.thread_id = s.thread_id),
245                    (SELECT MIN(t.started_at_ms) FROM turns t WHERE t.thread_id = s.thread_id),
246                    (SELECT MAX(t.completed_at_ms) FROM turns t WHERE t.thread_id = s.thread_id)
247             FROM sessions s
248             WHERE {}
249             ORDER BY 8 DESC
250             LIMIT {}",
251            clauses.join(" AND "),
252            effective_limit(filter)
253        );
254        let conn = self.conn.lock().unwrap();
255        let mut statement = conn.prepare(&sql)?;
256        let rows = statement.query_map(params_from_iter(params.iter()), |row| {
257            Ok(SessionSummary {
258                thread_id: row.get(0)?,
259                workspace_label: row.get(1)?,
260                provider: row.get(2)?,
261                model: row.get(3)?,
262                turn_count: row.get::<_, i64>(4)? as u64,
263                tool_call_count: row.get::<_, i64>(5)? as u64,
264                tool_error_count: row.get::<_, i64>(6)? as u64,
265                total_tokens: row.get::<_, i64>(7)? as u64,
266                total_tool_duration_ms: row.get(8)?,
267                first_activity_ms: row.get(9)?,
268                last_activity_ms: row.get(10)?,
269            })
270        })?;
271        Ok(rows.collect::<Result<Vec<_>, _>>()?)
272    }
273
274    /// Overall window summary for dashboards and `stats summary`.
275    pub fn usage_summary(&self, filter: &StatsFilter) -> anyhow::Result<UsageSummary> {
276        let tools = self.tool_summaries(&StatsFilter {
277            limit: Some(MAX_LIMIT),
278            min_calls: None,
279            ..filter.clone()
280        })?;
281        let tool_call_count = tools.iter().map(|tool| tool.call_count).sum();
282        let tool_error_count = tools.iter().map(|tool| tool.error_count).sum();
283        let most_called_tool = tools.first().map(|tool| tool.tool_name.clone());
284
285        let mut clauses = vec!["1=1".to_string()];
286        let mut params: Vec<Box<dyn ToSql>> = Vec::new();
287        if let Some(since) = filter.since_ms {
288            clauses.push("COALESCE(started_at_ms, completed_at_ms) >= ?".to_string());
289            params.push(Box::new(since));
290        }
291        if let Some(until) = filter.until_ms {
292            clauses.push("COALESCE(started_at_ms, completed_at_ms) < ?".to_string());
293            params.push(Box::new(until));
294        }
295        if let Some(thread) = &filter.thread_id {
296            clauses.push("thread_id = ?".to_string());
297            params.push(Box::new(thread.clone()));
298        }
299        let conn = self.conn.lock().unwrap();
300        let (turn_count, completed, failed): (i64, i64, i64) = conn.query_row(
301            &format!(
302                "SELECT COUNT(*),
303                        SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END),
304                        SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END)
305                 FROM turns WHERE {}",
306                clauses.join(" AND ")
307            ),
308            params_from_iter(params.iter()),
309            |row| {
310                Ok((
311                    row.get(0)?,
312                    row.get::<_, Option<i64>>(1)?.unwrap_or(0),
313                    row.get::<_, Option<i64>>(2)?.unwrap_or(0),
314                ))
315            },
316        )?;
317
318        let mut usage_clauses = vec!["1=1".to_string()];
319        let mut usage_params: Vec<Box<dyn ToSql>> = Vec::new();
320        if let Some(since) = filter.since_ms {
321            usage_clauses.push("recorded_at_ms >= ?".to_string());
322            usage_params.push(Box::new(since));
323        }
324        if let Some(until) = filter.until_ms {
325            usage_clauses.push("recorded_at_ms < ?".to_string());
326            usage_params.push(Box::new(until));
327        }
328        if let Some(thread) = &filter.thread_id {
329            usage_clauses.push("thread_id = ?".to_string());
330            usage_params.push(Box::new(thread.clone()));
331        }
332        let (prompt, completion, total, cached): (i64, i64, i64, i64) = conn.query_row(
333            &format!(
334                "SELECT COALESCE(SUM(prompt_tokens), 0), COALESCE(SUM(completion_tokens), 0),
335                        COALESCE(SUM(total_tokens), 0), COALESCE(SUM(cached_prompt_tokens), 0)
336                 FROM token_usage WHERE {}",
337                usage_clauses.join(" AND ")
338            ),
339            params_from_iter(usage_params.iter()),
340            |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
341        )?;
342        let session_count: i64 =
343            conn.query_row("SELECT COUNT(*) FROM sessions", [], |row| row.get(0))?;
344
345        Ok(UsageSummary {
346            turn_count: turn_count as u64,
347            completed_turn_count: completed as u64,
348            failed_turn_count: failed as u64,
349            tool_call_count,
350            tool_error_count,
351            prompt_tokens: prompt as u64,
352            completion_tokens: completion as u64,
353            total_tokens: total as u64,
354            cached_prompt_tokens: cached as u64,
355            session_count: session_count as u64,
356            most_called_tool,
357        })
358    }
359
360    /// Registered tools that were never called in the window.
361    pub fn never_used_tools(
362        &self,
363        registered: &[String],
364        filter: &StatsFilter,
365    ) -> anyhow::Result<Vec<String>> {
366        let used: std::collections::BTreeSet<String> = self
367            .tool_summaries(&StatsFilter {
368                limit: Some(MAX_LIMIT),
369                min_calls: None,
370                ..filter.clone()
371            })?
372            .into_iter()
373            .map(|summary| summary.tool_name)
374            .collect();
375        Ok(registered
376            .iter()
377            .filter(|tool| !used.contains(*tool))
378            .cloned()
379            .collect())
380    }
381}
382
383/// Sort orders shared by the CLI and app-server tool listings.
384pub fn sort_tool_summaries(summaries: &mut [ToolSummary], sort: &str) {
385    match sort {
386        "p95" => summaries.sort_by_key(|summary| std::cmp::Reverse(summary.p95_duration_ms)),
387        "errors" => summaries.sort_by(|a, b| {
388            b.error_rate
389                .partial_cmp(&a.error_rate)
390                .unwrap_or(std::cmp::Ordering::Equal)
391        }),
392        "underused" => summaries.sort_by_key(|summary| summary.call_count),
393        _ => summaries.sort_by_key(|summary| std::cmp::Reverse(summary.call_count)),
394    }
395}
396
397pub(crate) fn _connection_for_tests(
398    store: &AnalyticsStore,
399) -> std::sync::MutexGuard<'_, Connection> {
400    store.conn.lock().unwrap()
401}
402
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::{TokenUsageRecord, ToolCallRecord, TurnRecord, WorkspaceLabelMode};

    /// Opens a store in a fresh, uniquely named temp directory. Also returns
    /// the directory so each test can remove it when it finishes.
    fn temp_store() -> (AnalyticsStore, std::path::PathBuf) {
        let dir =
            std::env::temp_dir().join(format!("roder-analytics-query-{}", uuid::Uuid::new_v4()));
        let store = AnalyticsStore::open(
            &AnalyticsStore::default_path(&dir),
            WorkspaceLabelMode::FullPath,
        )
        .unwrap();
        (store, dir)
    }

    /// Records one call of `tool` per entry in `durations`, all in thread
    /// `t1` / turn `u1` and starting at 1_000 ms. The first `errors` calls
    /// are marked as failed (`status = "error"`, `is_error = true`).
    fn seed_tool_calls(store: &AnalyticsStore, tool: &str, durations: &[i64], errors: u64) {
        for (index, duration) in durations.iter().enumerate() {
            store
                .upsert_tool_call(&ToolCallRecord {
                    thread_id: "t1".into(),
                    turn_id: "u1".into(),
                    // Unique per call so repeated seeds don't overwrite each other.
                    tool_id: format!("{tool}-{index}"),
                    tool_name: Some(tool.to_string()),
                    started_at_ms: Some(1_000),
                    completed_at_ms: Some(1_000 + duration),
                    duration_ms: Some(*duration),
                    status: if (index as u64) < errors {
                        "error".into()
                    } else {
                        "success".into()
                    },
                    is_error: (index as u64) < errors,
                })
                .unwrap();
        }
    }

    #[test]
    fn percentiles_use_exact_nearest_rank() {
        // 1..=100 -> p50 = 50, p95 = 95, p99 = 99.
        let durations: Vec<i64> = (1..=100).collect();
        assert_eq!(percentile(&durations, 0.50), Some(50));
        assert_eq!(percentile(&durations, 0.95), Some(95));
        assert_eq!(percentile(&durations, 0.99), Some(99));
        // Empty input has no percentile; a single sample is every percentile.
        assert_eq!(percentile(&[], 0.95), None);
        assert_eq!(percentile(&[7], 0.95), Some(7));
    }

    #[test]
    fn tool_summaries_compute_counts_errors_and_p95() {
        let (store, dir) = temp_store();
        // read_file: 100 calls lasting 1..=100 ms, 5 of them errors.
        // shell: 2 calls, both errors.
        seed_tool_calls(&store, "read_file", &(1..=100).collect::<Vec<_>>(), 5);
        seed_tool_calls(&store, "shell", &[10, 20], 2);

        // Default ordering is by call count, so read_file comes first.
        let summaries = store.tool_summaries(&StatsFilter::default()).unwrap();
        assert_eq!(summaries[0].tool_name, "read_file");
        assert_eq!(summaries[0].call_count, 100);
        assert_eq!(summaries[0].error_count, 5);
        assert_eq!(summaries[0].p50_duration_ms, Some(50));
        assert_eq!(summaries[0].p95_duration_ms, Some(95));
        assert_eq!(summaries[0].p99_duration_ms, Some(99));
        assert_eq!(summaries[1].tool_name, "shell");
        // Every shell call failed.
        assert!((summaries[1].error_rate - 1.0).abs() < f64::EPSILON);

        // Sort helpers are deterministic.
        let mut by_errors = summaries.clone();
        sort_tool_summaries(&mut by_errors, "errors");
        assert_eq!(by_errors[0].tool_name, "shell");
        let mut underused = summaries.clone();
        sort_tool_summaries(&mut underused, "underused");
        assert_eq!(underused[0].tool_name, "shell");

        // Never-used helper compares against the registered tool list.
        let registered = vec![
            "read_file".to_string(),
            "shell".to_string(),
            "write_file".to_string(),
        ];
        assert_eq!(
            store
                .never_used_tools(&registered, &StatsFilter::default())
                .unwrap(),
            vec!["write_file".to_string()]
        );
        // Best-effort cleanup; a leftover temp dir must not fail the test.
        let _ = std::fs::remove_dir_all(&dir);
    }

    #[test]
    fn token_summaries_group_by_day_and_session() {
        let (store, dir) = temp_store();
        let day1 = 1_750_000_000_000_i64; // 2025-06-15
        let day2 = day1 + 86_400_000;
        // Thread t1 has two turns on day1 (100 + 50 tokens); thread t2 has one
        // turn on day2 (30 tokens). Every turn uses the same provider/model.
        for (thread, turn, at, total) in [
            ("t1", "u1", day1, 100_u32),
            ("t1", "u2", day1 + 1_000, 50),
            ("t2", "u1", day2, 30),
        ] {
            store
                .upsert_turn(&TurnRecord {
                    thread_id: thread.into(),
                    turn_id: turn.into(),
                    provider: Some("mock".into()),
                    model: Some("mock-model".into()),
                    runtime_profile: None,
                    started_at_ms: Some(at),
                    completed_at_ms: Some(at + 10),
                    status: "completed".into(),
                    error_kind: None,
                })
                .unwrap();
            store
                .upsert_token_usage(&TokenUsageRecord {
                    thread_id: thread.into(),
                    turn_id: turn.into(),
                    // Left unset here, so model grouping below must come from the turn.
                    provider: None,
                    model: None,
                    recorded_at_ms: at,
                    prompt_tokens: total - 10,
                    completion_tokens: 10,
                    total_tokens: total,
                    cached_prompt_tokens: 0,
                })
                .unwrap();
        }

        // day1 has the larger total, so it sorts first.
        let by_day = store
            .token_summaries(TokenGroup::Day, &StatsFilter::default())
            .unwrap();
        assert_eq!(by_day.len(), 2);
        assert_eq!(by_day[0].total_tokens, 150);
        assert_eq!(by_day[0].turn_count, 2);

        let by_session = store
            .token_summaries(TokenGroup::Session, &StatsFilter::default())
            .unwrap();
        assert_eq!(by_session[0].group, "t1");
        assert_eq!(by_session[0].total_tokens, 150);

        // All three turns share one model, so every usage lands in one group.
        let by_model = store
            .token_summaries(TokenGroup::Model, &StatsFilter::default())
            .unwrap();
        assert_eq!(by_model[0].group, "mock-model");
        assert_eq!(by_model[0].total_tokens, 180);
        let _ = std::fs::remove_dir_all(&dir);
    }
}
545        assert_eq!(by_model[0].group, "mock-model");
546        assert_eq!(by_model[0].total_tokens, 180);
547        let _ = std::fs::remove_dir_all(&dir);
548    }
549}