1use crate::store::Store;
4use anyhow::Result;
5use rusqlite::{OptionalExtension, params};
6use serde::{Deserialize, Serialize};
7
8#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
9pub struct SessionAggregate {
10 pub session_id: String,
11 pub event_count: u64,
12 pub tool_call_count: u64,
13 pub error_count: u64,
14 pub tokens_in: u64,
15 pub tokens_out: u64,
16 pub reasoning_tokens: u64,
17 pub cache_read_tokens: u64,
18 pub cache_creation_tokens: u64,
19 pub cost_usd_e6: i64,
20 pub first_event_ms: Option<u64>,
21 pub last_event_ms: Option<u64>,
22 pub rebuilt_at_ms: u64,
23}
24
25pub fn rebuild_workspace(store: &Store, workspace: &str) -> Result<usize> {
26 store
27 .list_sessions(workspace)?
28 .iter()
29 .map(|s| upsert_session(store, &s.id).map(|_| 1usize))
30 .sum()
31}
32
33pub fn upsert_session(store: &Store, session_id: &str) -> Result<SessionAggregate> {
34 let row = aggregate_raw(store, session_id, now_ms())?;
35 store.conn().execute(
36 "INSERT INTO session_aggregates (
37 session_id, event_count, tool_call_count, error_count, tokens_in,
38 tokens_out, reasoning_tokens, cache_read_tokens, cache_creation_tokens,
39 cost_usd_e6, first_event_ms, last_event_ms, rebuilt_at_ms
40 ) VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13)
41 ON CONFLICT(session_id) DO UPDATE SET
42 event_count=excluded.event_count, tool_call_count=excluded.tool_call_count,
43 error_count=excluded.error_count, tokens_in=excluded.tokens_in,
44 tokens_out=excluded.tokens_out, reasoning_tokens=excluded.reasoning_tokens,
45 cache_read_tokens=excluded.cache_read_tokens,
46 cache_creation_tokens=excluded.cache_creation_tokens,
47 cost_usd_e6=excluded.cost_usd_e6, first_event_ms=excluded.first_event_ms,
48 last_event_ms=excluded.last_event_ms, rebuilt_at_ms=excluded.rebuilt_at_ms",
49 params![
50 row.session_id,
51 row.event_count as i64,
52 row.tool_call_count as i64,
53 row.error_count as i64,
54 row.tokens_in as i64,
55 row.tokens_out as i64,
56 row.reasoning_tokens as i64,
57 row.cache_read_tokens as i64,
58 row.cache_creation_tokens as i64,
59 row.cost_usd_e6,
60 row.first_event_ms.map(|v| v as i64),
61 row.last_event_ms.map(|v| v as i64),
62 row.rebuilt_at_ms as i64,
63 ],
64 )?;
65 Ok(row)
66}
67
68pub fn get(store: &Store, session_id: &str) -> Result<Option<SessionAggregate>> {
69 store
70 .conn()
71 .query_row(
72 "SELECT session_id, event_count, tool_call_count, error_count, tokens_in,
73 tokens_out, reasoning_tokens, cache_read_tokens, cache_creation_tokens,
74 cost_usd_e6, first_event_ms, last_event_ms, rebuilt_at_ms
75 FROM session_aggregates WHERE session_id = ?1",
76 [session_id],
77 map_aggregate,
78 )
79 .optional()
80 .map_err(Into::into)
81}
82
83fn aggregate_raw(store: &Store, session_id: &str, rebuilt_at_ms: u64) -> Result<SessionAggregate> {
84 store
85 .conn()
86 .query_row(
87 "SELECT COUNT(*), COALESCE(SUM(kind='ToolCall'),0), COALESCE(SUM(kind='Error'),0),
88 COALESCE(SUM(tokens_in),0), COALESCE(SUM(tokens_out),0),
89 COALESCE(SUM(reasoning_tokens),0), COALESCE(SUM(cache_read_tokens),0),
90 COALESCE(SUM(cache_creation_tokens),0), COALESCE(SUM(cost_usd_e6),0),
91 MIN(ts_ms), MAX(ts_ms)
92 FROM events WHERE session_id = ?1",
93 [session_id],
94 |row| {
95 Ok(SessionAggregate {
96 session_id: session_id.to_string(),
97 event_count: row.get::<_, i64>(0)? as u64,
98 tool_call_count: row.get::<_, i64>(1)? as u64,
99 error_count: row.get::<_, i64>(2)? as u64,
100 tokens_in: row.get::<_, i64>(3)? as u64,
101 tokens_out: row.get::<_, i64>(4)? as u64,
102 reasoning_tokens: row.get::<_, i64>(5)? as u64,
103 cache_read_tokens: row.get::<_, i64>(6)? as u64,
104 cache_creation_tokens: row.get::<_, i64>(7)? as u64,
105 cost_usd_e6: row.get(8)?,
106 first_event_ms: row.get::<_, Option<i64>>(9)?.map(|v| v as u64),
107 last_event_ms: row.get::<_, Option<i64>>(10)?.map(|v| v as u64),
108 rebuilt_at_ms,
109 })
110 },
111 )
112 .map_err(Into::into)
113}
114
115fn map_aggregate(row: &rusqlite::Row<'_>) -> rusqlite::Result<SessionAggregate> {
116 Ok(SessionAggregate {
117 session_id: row.get(0)?,
118 event_count: row.get::<_, i64>(1)? as u64,
119 tool_call_count: row.get::<_, i64>(2)? as u64,
120 error_count: row.get::<_, i64>(3)? as u64,
121 tokens_in: row.get::<_, i64>(4)? as u64,
122 tokens_out: row.get::<_, i64>(5)? as u64,
123 reasoning_tokens: row.get::<_, i64>(6)? as u64,
124 cache_read_tokens: row.get::<_, i64>(7)? as u64,
125 cache_creation_tokens: row.get::<_, i64>(8)? as u64,
126 cost_usd_e6: row.get(9)?,
127 first_event_ms: row.get::<_, Option<i64>>(10)?.map(|v| v as u64),
128 last_event_ms: row.get::<_, Option<i64>>(11)?.map(|v| v as u64),
129 rebuilt_at_ms: row.get::<_, i64>(12)? as u64,
130 })
131}
132
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields 0 if the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        // Clock is set before 1970: fall back to a zero duration.
        Err(_) => Duration::default(),
    };
    since_epoch.as_millis() as u64
}