Skip to main content

toolhub_storage/
usage.rs

1//! `usage_events` writer + `tool_scores` aggregator. Phase 4.
2//!
3//! `insert_event` is idempotent on `uuid` — re-running `toolhub score` on the
4//! same JSONL files MUST NOT double-count. `recompute_scores` rebuilds
5//! `tool_scores` from scratch (DELETE + INSERT inside a transaction): the
6//! table is a derived projection, so a clean rebuild is simpler than
7//! incremental updates and stays correct as `usage_events` evolves.
8
9use anyhow::Context;
10use chrono::Utc;
11use rusqlite::{Connection, OptionalExtension, params};
12use toolhub_core::usage::{Outcome, UsageEvent};
13
14/// INSERT OR IGNORE — returns true if the row was new, false if `uuid`
15/// collided with an existing event. Events without `uuid` are always inserted
16/// (no dedupe key), so callers should prefer to set `uuid` whenever possible.
17///
18/// `usage_events.tool_id` has a FOREIGN KEY into `tools(id)` and SQLite
19/// foreign-key enforcement is on for this connection — callers MUST upsert a
20/// matching `tools` row before inserting events for it. The session_jsonl
21/// replay ingestor skips events for un-catalogued tool ids.
22pub fn insert_event(conn: &Connection, e: &UsageEvent) -> anyhow::Result<bool> {
23    let occurred = e.occurred_at.to_rfc3339();
24    let outcome = e.outcome.as_str();
25    let n = conn.execute(
26        "INSERT OR IGNORE INTO usage_events
27            (uuid, tool_id, session_id, project, task_text, outcome,
28             duration_ms, cost_usd, occurred_at)
29         VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
30        params![
31            e.uuid,
32            e.tool_id,
33            e.session_id,
34            e.project,
35            e.task_text,
36            outcome,
37            e.duration_ms,
38            e.cost_usd,
39            occurred,
40        ],
41    )?;
42    Ok(n > 0)
43}
44
45pub fn count_events(conn: &Connection) -> anyhow::Result<i64> {
46    let n: i64 = conn.query_row("SELECT COUNT(*) FROM usage_events", [], |r| r.get(0))?;
47    Ok(n)
48}
49
50#[derive(Debug, Clone)]
51pub struct EventRow {
52    pub id: i64,
53    pub uuid: Option<String>,
54    pub tool_id: String,
55    pub session_id: Option<String>,
56    pub project: Option<String>,
57    pub task_text: Option<String>,
58    pub outcome: Option<Outcome>,
59    pub duration_ms: Option<i64>,
60    pub cost_usd: Option<f64>,
61    pub occurred_at: String,
62}
63
64pub fn list_events(
65    conn: &Connection,
66    tool_id: Option<&str>,
67    limit: usize,
68) -> anyhow::Result<Vec<EventRow>> {
69    let (sql, params): (&str, Vec<Box<dyn rusqlite::ToSql>>) = match tool_id {
70        Some(id) => (
71            "SELECT id, uuid, tool_id, session_id, project, task_text, outcome,
72                    duration_ms, cost_usd, occurred_at
73             FROM usage_events
74             WHERE tool_id = ?
75             ORDER BY occurred_at DESC
76             LIMIT ?",
77            vec![Box::new(id.to_string()), Box::new(limit as i64)],
78        ),
79        None => (
80            "SELECT id, uuid, tool_id, session_id, project, task_text, outcome,
81                    duration_ms, cost_usd, occurred_at
82             FROM usage_events
83             ORDER BY occurred_at DESC
84             LIMIT ?",
85            vec![Box::new(limit as i64)],
86        ),
87    };
88    let mut stmt = conn.prepare(sql)?;
89    let param_refs: Vec<&dyn rusqlite::ToSql> = params
90        .iter()
91        .map(|b| b.as_ref() as &dyn rusqlite::ToSql)
92        .collect();
93    let rows = stmt
94        .query_map(param_refs.as_slice(), |row| {
95            let outcome: Option<String> = row.get(6)?;
96            Ok(EventRow {
97                id: row.get(0)?,
98                uuid: row.get(1)?,
99                tool_id: row.get(2)?,
100                session_id: row.get(3)?,
101                project: row.get(4)?,
102                task_text: row.get(5)?,
103                outcome: outcome.as_deref().and_then(Outcome::parse),
104                duration_ms: row.get(7)?,
105                cost_usd: row.get(8)?,
106                occurred_at: row.get(9)?,
107            })
108        })?
109        .collect::<Result<Vec<_>, _>>()?;
110    Ok(rows)
111}
112
/// Running per-tool totals gathered while scanning `usage_events`.
#[derive(Debug, Clone, Copy)]
struct ScoreAcc {
    /// Number of events counted as successes.
    successes: u64,
    /// Number of events seen for the tool.
    total: u64,
    /// Sum of the costs of the events that carried a cost.
    cost_sum: f64,
    /// Number of events that carried a cost.
    cost_count: u64,
}

impl ScoreAcc {
    /// Creates an accumulator with every counter and sum at zero.
    fn new() -> Self {
        let zero: u64 = 0;
        Self {
            successes: zero,
            total: zero,
            cost_sum: 0.0,
            cost_count: zero,
        }
    }
}
131
132/// Rebuild `tool_scores` from `usage_events`. Returns the number of tool rows
133/// written. Intended to be called after a batch of `insert_event` calls.
134pub fn recompute_scores(conn: &mut Connection) -> anyhow::Result<usize> {
135    // Pull every event in one query, aggregate Rust-side (SQLite has no median).
136    let mut stmt =
137        conn.prepare("SELECT tool_id, outcome, cost_usd, duration_ms FROM usage_events")?;
138    let rows = stmt
139        .query_map([], |row| {
140            Ok((
141                row.get::<_, String>(0)?,
142                row.get::<_, Option<String>>(1)?,
143                row.get::<_, Option<f64>>(2)?,
144                row.get::<_, Option<i64>>(3)?,
145            ))
146        })?
147        .collect::<Result<Vec<_>, _>>()?;
148    drop(stmt);
149
150    use std::collections::HashMap;
151    let mut accs: HashMap<String, ScoreAcc> = HashMap::new();
152    let mut durations: HashMap<String, Vec<i64>> = HashMap::new();
153
154    for (tool_id, outcome, cost, dur) in rows {
155        let acc = accs.entry(tool_id.clone()).or_insert_with(ScoreAcc::new);
156        acc.total += 1;
157        if outcome.as_deref() == Some("success") {
158            acc.successes += 1;
159        }
160        if let Some(c) = cost {
161            acc.cost_sum += c;
162            acc.cost_count += 1;
163        }
164        if let Some(d) = dur {
165            durations.entry(tool_id).or_default().push(d);
166        }
167    }
168
169    let now = Utc::now().to_rfc3339();
170    let tx = conn.transaction()?;
171    tx.execute("DELETE FROM tool_scores", [])
172        .context("clear tool_scores")?;
173    for (tool_id, acc) in &accs {
174        let success_rate = if acc.total == 0 {
175            None
176        } else {
177            Some(acc.successes as f64 / acc.total as f64)
178        };
179        let avg_cost = if acc.cost_count == 0 {
180            None
181        } else {
182            Some(acc.cost_sum / acc.cost_count as f64)
183        };
184        let median_dur = durations.get(tool_id).map(|v| {
185            let mut s = v.clone();
186            s.sort_unstable();
187            s[s.len() / 2]
188        });
189        tx.execute(
190            "INSERT INTO tool_scores
191                (tool_id, success_rate, sample_size, avg_cost_usd,
192                 median_duration_ms, score_updated_at)
193             VALUES (?, ?, ?, ?, ?, ?)",
194            params![
195                tool_id,
196                success_rate,
197                acc.total as i64,
198                avg_cost,
199                median_dur,
200                now,
201            ],
202        )?;
203    }
204    tx.commit()?;
205    Ok(accs.len())
206}
207
208/// Tools that have NOT been used in the last `older_than_days` days.
209pub fn dead_weight(
210    conn: &Connection,
211    older_than_days: u32,
212) -> anyhow::Result<Vec<(String, String, Option<String>)>> {
213    let cutoff = format!("-{older_than_days} days");
214    let mut stmt = conn.prepare(
215        "SELECT t.id, t.name,
216                (SELECT MAX(occurred_at) FROM usage_events WHERE tool_id = t.id) AS last_used
217         FROM tools t
218         WHERE NOT EXISTS (
219             SELECT 1 FROM usage_events u
220             WHERE u.tool_id = t.id
221               AND u.occurred_at >= datetime('now', ?1)
222         )
223         ORDER BY t.id",
224    )?;
225    let rows = stmt
226        .query_map([cutoff], |row| {
227            Ok((
228                row.get::<_, String>(0)?,
229                row.get::<_, String>(1)?,
230                row.get::<_, Option<String>>(2)?,
231            ))
232        })?
233        .collect::<Result<Vec<_>, _>>()?;
234    Ok(rows)
235}
236
237/// Most-recent `occurred_at` for a tool, used by `stats --tool`.
238pub fn last_used(conn: &Connection, tool_id: &str) -> anyhow::Result<Option<String>> {
239    let v: Option<String> = conn
240        .query_row(
241            "SELECT MAX(occurred_at) FROM usage_events WHERE tool_id = ?",
242            params![tool_id],
243            |r| r.get(0),
244        )
245        .optional()?
246        .flatten();
247    Ok(v)
248}
249
#[cfg(test)]
mod tests {
    use super::*;
    use crate::open;
    use chrono::TimeZone;

    /// Builds an event with fixed session, project, task text and timestamp;
    /// only the identifying and scored fields vary between tests.
    fn evt(
        uuid: &str,
        tool: &str,
        outcome: Outcome,
        cost: Option<f64>,
        dur: Option<i64>,
    ) -> UsageEvent {
        let occurred_at = Utc.with_ymd_and_hms(2026, 5, 3, 12, 0, 0).unwrap();
        UsageEvent {
            uuid: Some(uuid.to_string()),
            tool_id: tool.to_string(),
            session_id: Some("sess-1".into()),
            project: Some("quiver".into()),
            task_text: Some("a task".into()),
            outcome,
            duration_ms: dur,
            cost_usd: cost,
            occurred_at,
        }
    }

    /// Inserts a minimal `tools` row so events for `id` satisfy the foreign key.
    fn seed_tool(conn: &Connection, id: &str) {
        let sql = "INSERT OR IGNORE INTO tools (id, type, name, triggers, examples, requires,
                                          enabled, added_at, last_seen_at)
             VALUES (?, 'skill', ?, '[]', '[]', '[]', 1,
                     '2026-05-03T00:00:00+00:00', '2026-05-03T00:00:00+00:00')";
        conn.execute(sql, params![id, id]).unwrap();
    }

    /// Opens a database in a fresh temp dir; the dir guard is returned so the
    /// caller keeps it alive for the duration of the test.
    fn tmp_conn() -> (tempfile::TempDir, Connection) {
        let dir = tempfile::tempdir().unwrap();
        let db_path = dir.path().join("t.sqlite");
        let conn = open(&db_path).unwrap();
        (dir, conn)
    }

    #[test]
    fn insert_dedupes_on_uuid() {
        let (_guard, conn) = tmp_conn();
        seed_tool(&conn, "skill:caveman");
        let event = evt("u1", "skill:caveman", Outcome::Success, None, None);

        let first = insert_event(&conn, &event).unwrap();
        let second = insert_event(&conn, &event).unwrap();

        assert!(first);
        assert!(!second);
        assert_eq!(count_events(&conn).unwrap(), 1);
    }

    #[test]
    fn list_events_filters_by_tool() {
        let (_guard, conn) = tmp_conn();
        for tool in ["skill:a", "skill:b"] {
            seed_tool(&conn, tool);
        }
        insert_event(&conn, &evt("u1", "skill:a", Outcome::Success, None, None)).unwrap();
        insert_event(&conn, &evt("u2", "skill:b", Outcome::Failure, None, None)).unwrap();

        let everything = list_events(&conn, None, 10).unwrap();
        assert_eq!(everything.len(), 2);

        let only_b = list_events(&conn, Some("skill:b"), 10).unwrap();
        assert_eq!(only_b.len(), 1);
        assert_eq!(only_b[0].tool_id, "skill:b");
        assert_eq!(only_b[0].outcome, Some(Outcome::Failure));
    }

    #[test]
    fn recompute_scores_aggregates_correctly() {
        let (_guard, mut conn) = tmp_conn();
        seed_tool(&conn, "skill:x");
        seed_tool(&conn, "skill:y");

        let fixtures = [
            evt("u1", "skill:x", Outcome::Success, Some(0.10), Some(100)),
            evt("u2", "skill:x", Outcome::Success, Some(0.20), Some(200)),
            evt("u3", "skill:x", Outcome::Failure, Some(0.30), Some(300)),
            evt("u4", "skill:y", Outcome::Failure, None, None),
        ];
        for fixture in &fixtures {
            insert_event(&conn, fixture).unwrap();
        }

        let touched = recompute_scores(&mut conn).unwrap();
        assert_eq!(touched, 2);

        let scores = crate::scores::list(&conn, Some("skill:x")).unwrap();
        assert_eq!(scores.len(), 1);
        let x = &scores[0];
        assert!((x.success_rate.unwrap() - 2.0 / 3.0).abs() < 1e-9);
        assert_eq!(x.sample_size, Some(3));
        assert!((x.avg_cost_usd.unwrap() - 0.20).abs() < 1e-9);
        assert_eq!(x.median_duration_ms, Some(200));
    }

    #[test]
    fn recompute_clears_stale_rows() {
        let (_guard, mut conn) = tmp_conn();
        seed_tool(&conn, "skill:x");
        insert_event(&conn, &evt("u1", "skill:x", Outcome::Success, None, None)).unwrap();
        recompute_scores(&mut conn).unwrap();

        // With every event gone, a rebuild must leave no score rows behind.
        conn.execute("DELETE FROM usage_events", []).unwrap();
        let touched = recompute_scores(&mut conn).unwrap();

        assert_eq!(touched, 0);
        assert!(crate::scores::list(&conn, None).unwrap().is_empty());
    }
}