Skip to main content

kaizen/store/sqlite/
tool_spans.rs

1use super::rows::*;
2use super::*;
3mod view_row;
4use view_row::tool_span_view_row;
5
/// Query that loads the span rows of one session for tree construction.
///
/// Parameters: `?1` is the session id, `?2` is the maximum number of rows.
/// Rows are ordered by `depth`, then `started_at_ms`, then `span_id` (all
/// ascending). The column order is what `tool_span_view_row` is handed when
/// this statement is executed by `query_session_span_tree`.
const SESSION_SPAN_TREE_SQL: &str =
    "SELECT span_id, tool, status, lead_time_ms, tokens_in, tokens_out,
            reasoning_tokens, cost_usd_e6, paths_json,
            parent_span_id, depth, subtree_cost_usd_e6, subtree_token_count
     FROM tool_spans
     WHERE session_id = ?1
     ORDER BY depth ASC, started_at_ms ASC, span_id ASC
     LIMIT ?2";
14
/// Query that loads the flat list of tool spans for one session.
///
/// Parameters: `?1` is the session id, `?2` is the maximum number of rows.
/// Rows are ordered by `started_at_ms`, then `span_id` (both ascending).
/// Column positions matter: `tool_span_sync_row` and `sync_paths` read the
/// result by index (0 = `span_id` ... 12 = `paths_json`).
const SESSION_TOOL_SPANS_SQL: &str =
    "SELECT span_id, session_id, tool, tool_call_id, status, started_at_ms, ended_at_ms, lead_time_ms,
            tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, paths_json
     FROM tool_spans WHERE session_id = ?1 ORDER BY started_at_ms ASC, span_id ASC
     LIMIT ?2";
20
21impl Store {
22    pub fn tool_rank_rows_in_window(
23        &self,
24        workspace: &str,
25        start_ms: u64,
26        end_ms: u64,
27    ) -> Result<Vec<RankedTool>> {
28        let mut stmt = self.conn.prepare(TOOL_RANK_ROWS_SQL)?;
29        let rows = stmt.query_map(
30            params![workspace, start_ms as i64, end_ms as i64],
31            ranked_tool_row,
32        )?;
33        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
34    }
35
36    pub fn tool_spans_in_window(
37        &self,
38        workspace: &str,
39        start_ms: u64,
40        end_ms: u64,
41    ) -> Result<Vec<ToolSpanView>> {
42        let mut stmt = self.conn.prepare(
43            "SELECT span_id, tool, status, lead_time_ms, tokens_in, tokens_out,
44                    reasoning_tokens, cost_usd_e6, paths_json,
45                    parent_span_id, depth, subtree_cost_usd_e6, subtree_token_count
46             FROM (
47                 SELECT ts.span_id, ts.tool, ts.status, ts.lead_time_ms,
48                        ts.tokens_in, ts.tokens_out, ts.reasoning_tokens,
49                        ts.cost_usd_e6, ts.paths_json, ts.parent_span_id,
50                        ts.depth, ts.subtree_cost_usd_e6, ts.subtree_token_count,
51                        ts.started_at_ms AS sort_ms
52                 FROM tool_spans ts
53                 JOIN sessions s ON s.id = ts.session_id
54                 WHERE s.workspace = ?1
55                   AND ts.started_at_ms >= ?2
56                   AND ts.started_at_ms <= ?3
57                 UNION ALL
58                 SELECT ts.span_id, ts.tool, ts.status, ts.lead_time_ms,
59                        ts.tokens_in, ts.tokens_out, ts.reasoning_tokens,
60                        ts.cost_usd_e6, ts.paths_json, ts.parent_span_id,
61                        ts.depth, ts.subtree_cost_usd_e6, ts.subtree_token_count,
62                        ts.ended_at_ms AS sort_ms
63                 FROM tool_spans ts
64                 JOIN sessions s ON s.id = ts.session_id
65                 WHERE s.workspace = ?1
66                   AND ts.started_at_ms IS NULL
67                   AND ts.ended_at_ms >= ?2
68                   AND ts.ended_at_ms <= ?3
69             )
70             ORDER BY sort_ms DESC",
71        )?;
72        let rows = stmt.query_map(
73            params![workspace, start_ms as i64, end_ms as i64],
74            tool_span_view_row,
75        )?;
76        Ok(rows.filter_map(|row| row.ok()).collect())
77    }
78
79    pub fn session_span_tree(
80        &self,
81        session_id: &str,
82    ) -> Result<Vec<crate::store::span_tree::SpanNode>> {
83        let last_event_seq = self.last_event_seq_for_session(session_id)?;
84        if let Some(nodes) = cached_span_tree(self, session_id, last_event_seq) {
85            return Ok(nodes);
86        }
87        let nodes = query_session_span_tree(self, session_id, usize::MAX)?;
88        cache_span_tree(self, session_id, last_event_seq, &nodes);
89        Ok(nodes)
90    }
91
92    pub(crate) fn limited_session_span_tree(
93        &self,
94        session_id: &str,
95        limit: usize,
96    ) -> Result<Vec<crate::store::span_tree::SpanNode>> {
97        query_session_span_tree(self, session_id, limit)
98    }
99
100    pub fn tool_spans_for_session(&self, session_id: &str) -> Result<Vec<ToolSpanSyncRow>> {
101        query_tool_spans_for_session(self, session_id, usize::MAX)
102    }
103
104    pub(crate) fn tool_spans_for_session_limited(
105        &self,
106        session_id: &str,
107        limit: usize,
108    ) -> Result<Vec<ToolSpanSyncRow>> {
109        query_tool_spans_for_session(self, session_id, limit)
110    }
111}
112
113fn query_tool_spans_for_session(
114    store: &Store,
115    session_id: &str,
116    limit: usize,
117) -> Result<Vec<ToolSpanSyncRow>> {
118    let mut stmt = store.conn.prepare(SESSION_TOOL_SPANS_SQL)?;
119    let rows = stmt.query_map(params![session_id, sql_limit(limit)], tool_span_sync_row)?;
120    rows.map(|row| row.map_err(Into::into)).collect()
121}
122
123fn tool_span_sync_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<ToolSpanSyncRow> {
124    Ok(ToolSpanSyncRow {
125        span_id: row.get(0)?,
126        session_id: row.get(1)?,
127        tool: row.get(2)?,
128        tool_call_id: row.get(3)?,
129        status: row.get(4)?,
130        paths: sync_paths(row)?,
131        started_at_ms: sync_u64(row, 5)?,
132        ended_at_ms: sync_u64(row, 6)?,
133        lead_time_ms: sync_u64(row, 7)?,
134        tokens_in: sync_u32(row, 8)?,
135        tokens_out: sync_u32(row, 9)?,
136        reasoning_tokens: sync_u32(row, 10)?,
137        cost_usd_e6: row.get(11)?,
138    })
139}
140
141fn sync_paths(row: &rusqlite::Row<'_>) -> rusqlite::Result<Vec<String>> {
142    let paths_json: String = row.get(12)?;
143    Ok(serde_json::from_str(&paths_json).unwrap_or_default())
144}
145
146fn sync_u64(row: &rusqlite::Row<'_>, index: usize) -> rusqlite::Result<Option<u64>> {
147    row.get::<_, Option<i64>>(index)
148        .map(|value| value.map(|value| value as u64))
149}
150
151fn sync_u32(row: &rusqlite::Row<'_>, index: usize) -> rusqlite::Result<Option<u32>> {
152    row.get::<_, Option<i64>>(index)
153        .map(|value| value.map(|value| value as u32))
154}
155
156fn query_session_span_tree(
157    store: &Store,
158    session_id: &str,
159    limit: usize,
160) -> Result<Vec<crate::store::span_tree::SpanNode>> {
161    let mut stmt = store.conn.prepare(SESSION_SPAN_TREE_SQL)?;
162    let rows = stmt.query_map(params![session_id, sql_limit(limit)], tool_span_view_row)?;
163    let spans = rows
164        .map(|row| row.map_err(anyhow::Error::from))
165        .collect::<Result<Vec<_>>>()?;
166    Ok(crate::store::span_tree::build_tree(spans))
167}
168
169fn cached_span_tree(
170    store: &Store,
171    session_id: &str,
172    last_event_seq: Option<u64>,
173) -> Option<Vec<crate::store::span_tree::SpanNode>> {
174    store
175        .span_tree_cache
176        .borrow()
177        .as_ref()
178        .filter(|entry| entry.session_id == session_id && entry.last_event_seq == last_event_seq)
179        .map(|entry| entry.nodes.clone())
180}
181
182fn cache_span_tree(
183    store: &Store,
184    session_id: &str,
185    last_event_seq: Option<u64>,
186    nodes: &[crate::store::span_tree::SpanNode],
187) {
188    *store.span_tree_cache.borrow_mut() = Some(SpanTreeCacheEntry {
189        session_id: session_id.to_string(),
190        last_event_seq,
191        nodes: nodes.to_vec(),
192    });
193}
194
/// Converts a row limit into the signed 64-bit value bound to a SQL `LIMIT`
/// parameter, capping it at `i64::MAX` so the conversion can never go
/// negative.
fn sql_limit(limit: usize) -> i64 {
    let ceiling = i64::MAX as usize;
    if limit > ceiling {
        i64::MAX
    } else {
        limit as i64
    }
}