Skip to main content

kaizen/store/sqlite/
tool_spans.rs

1use super::rows::*;
2use super::*;
3
4impl Store {
5    pub fn tool_rank_rows_in_window(
6        &self,
7        workspace: &str,
8        start_ms: u64,
9        end_ms: u64,
10    ) -> Result<Vec<RankedTool>> {
11        let mut stmt = self.conn.prepare(TOOL_RANK_ROWS_SQL)?;
12        let rows = stmt.query_map(
13            params![workspace, start_ms as i64, end_ms as i64],
14            ranked_tool_row,
15        )?;
16        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
17    }
18
19    pub fn tool_spans_in_window(
20        &self,
21        workspace: &str,
22        start_ms: u64,
23        end_ms: u64,
24    ) -> Result<Vec<ToolSpanView>> {
25        let mut stmt = self.conn.prepare(
26            "SELECT span_id, tool, status, lead_time_ms, tokens_in, tokens_out,
27                    reasoning_tokens, cost_usd_e6, paths_json,
28                    parent_span_id, depth, subtree_cost_usd_e6, subtree_token_count
29             FROM (
30                 SELECT ts.span_id, ts.tool, ts.status, ts.lead_time_ms,
31                        ts.tokens_in, ts.tokens_out, ts.reasoning_tokens,
32                        ts.cost_usd_e6, ts.paths_json, ts.parent_span_id,
33                        ts.depth, ts.subtree_cost_usd_e6, ts.subtree_token_count,
34                        ts.started_at_ms AS sort_ms
35                 FROM tool_spans ts
36                 JOIN sessions s ON s.id = ts.session_id
37                 WHERE s.workspace = ?1
38                   AND ts.started_at_ms >= ?2
39                   AND ts.started_at_ms <= ?3
40                 UNION ALL
41                 SELECT ts.span_id, ts.tool, ts.status, ts.lead_time_ms,
42                        ts.tokens_in, ts.tokens_out, ts.reasoning_tokens,
43                        ts.cost_usd_e6, ts.paths_json, ts.parent_span_id,
44                        ts.depth, ts.subtree_cost_usd_e6, ts.subtree_token_count,
45                        ts.ended_at_ms AS sort_ms
46                 FROM tool_spans ts
47                 JOIN sessions s ON s.id = ts.session_id
48                 WHERE s.workspace = ?1
49                   AND ts.started_at_ms IS NULL
50                   AND ts.ended_at_ms >= ?2
51                   AND ts.ended_at_ms <= ?3
52             )
53             ORDER BY sort_ms DESC",
54        )?;
55        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
56            let paths_json: String = row.get(8)?;
57            Ok(ToolSpanView {
58                span_id: row.get::<_, Option<String>>(0)?.unwrap_or_default(),
59                tool: row
60                    .get::<_, Option<String>>(1)?
61                    .unwrap_or_else(|| "unknown".into()),
62                status: row.get(2)?,
63                lead_time_ms: row.get::<_, Option<i64>>(3)?.map(|v| v as u64),
64                tokens_in: row.get::<_, Option<i64>>(4)?.map(|v| v as u32),
65                tokens_out: row.get::<_, Option<i64>>(5)?.map(|v| v as u32),
66                reasoning_tokens: row.get::<_, Option<i64>>(6)?.map(|v| v as u32),
67                cost_usd_e6: row.get(7)?,
68                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
69                parent_span_id: row.get(9)?,
70                depth: row.get::<_, Option<i64>>(10)?.unwrap_or(0) as u32,
71                subtree_cost_usd_e6: row.get(11)?,
72                subtree_token_count: row.get::<_, Option<i64>>(12)?.map(|v| v as u32),
73            })
74        })?;
75        Ok(rows.filter_map(|row| row.ok()).collect())
76    }
77
78    pub fn session_span_tree(
79        &self,
80        session_id: &str,
81    ) -> Result<Vec<crate::store::span_tree::SpanNode>> {
82        let last_event_seq = self.last_event_seq_for_session(session_id)?;
83        if let Some(entry) = self.span_tree_cache.borrow().as_ref()
84            && entry.session_id == session_id
85            && entry.last_event_seq == last_event_seq
86        {
87            return Ok(entry.nodes.clone());
88        }
89        let mut stmt = self.conn.prepare(
90            "SELECT span_id, tool, status, lead_time_ms, tokens_in, tokens_out,
91                    reasoning_tokens, cost_usd_e6, paths_json,
92                    parent_span_id, depth, subtree_cost_usd_e6, subtree_token_count
93             FROM tool_spans
94             WHERE session_id = ?1
95             ORDER BY depth ASC, started_at_ms ASC",
96        )?;
97        let rows = stmt.query_map(params![session_id], |row| {
98            let paths_json: String = row.get(8)?;
99            Ok(crate::metrics::types::ToolSpanView {
100                span_id: row.get::<_, Option<String>>(0)?.unwrap_or_default(),
101                tool: row
102                    .get::<_, Option<String>>(1)?
103                    .unwrap_or_else(|| "unknown".into()),
104                status: row.get(2)?,
105                lead_time_ms: row.get::<_, Option<i64>>(3)?.map(|v| v as u64),
106                tokens_in: row.get::<_, Option<i64>>(4)?.map(|v| v as u32),
107                tokens_out: row.get::<_, Option<i64>>(5)?.map(|v| v as u32),
108                reasoning_tokens: row.get::<_, Option<i64>>(6)?.map(|v| v as u32),
109                cost_usd_e6: row.get(7)?,
110                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
111                parent_span_id: row.get(9)?,
112                depth: row.get::<_, Option<i64>>(10)?.unwrap_or(0) as u32,
113                subtree_cost_usd_e6: row.get(11)?,
114                subtree_token_count: row.get::<_, Option<i64>>(12)?.map(|v| v as u32),
115            })
116        })?;
117        let spans: Vec<_> = rows.filter_map(|r| r.ok()).collect();
118        let nodes = crate::store::span_tree::build_tree(spans);
119        *self.span_tree_cache.borrow_mut() = Some(SpanTreeCacheEntry {
120            session_id: session_id.to_string(),
121            last_event_seq,
122            nodes: nodes.clone(),
123        });
124        Ok(nodes)
125    }
126
127    pub fn tool_spans_for_session(&self, session_id: &str) -> Result<Vec<ToolSpanSyncRow>> {
128        let mut stmt = self.conn.prepare(
129            "SELECT span_id, session_id, tool, tool_call_id, status, started_at_ms, ended_at_ms, lead_time_ms,
130                    tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, paths_json
131             FROM tool_spans WHERE session_id = ?1 ORDER BY started_at_ms ASC, span_id ASC",
132        )?;
133        let rows = stmt.query_map(params![session_id], |row| {
134            let paths_json: String = row.get(12)?;
135            Ok(ToolSpanSyncRow {
136                span_id: row.get(0)?,
137                session_id: row.get(1)?,
138                tool: row.get(2)?,
139                tool_call_id: row.get(3)?,
140                status: row.get(4)?,
141                started_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
142                ended_at_ms: row.get::<_, Option<i64>>(6)?.map(|v| v as u64),
143                lead_time_ms: row.get::<_, Option<i64>>(7)?.map(|v| v as u64),
144                tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
145                tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
146                reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
147                cost_usd_e6: row.get(11)?,
148                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
149            })
150        })?;
151        Ok(rows.filter_map(|row| row.ok()).collect())
152    }
153}