Skip to main content

kaizen/store/sqlite/
tool_span_sync.rs

1use super::*;
2
3impl Store {
4    /// Sync-shaped tool spans whose session falls in `[start_ms, end_ms]`. Mirrors
5    /// `retro_events_in_window` for the spans table so `kaizen telemetry push` can ship
6    /// `IngestExportBatch::ToolSpans` next to the events batch. Window matches on
7    /// `started_at_ms` first, falling back to `ended_at_ms` for spans that never started a
8    /// timer (status-only rows). Workspace filter joins through `sessions.workspace`.
9    pub fn tool_spans_sync_rows_in_window(
10        &self,
11        workspace: &str,
12        start_ms: u64,
13        end_ms: u64,
14    ) -> Result<Vec<ToolSpanSyncRow>> {
15        let mut stmt = self.conn.prepare(
16            "SELECT span_id, session_id, tool, tool_call_id, status, started_at_ms, ended_at_ms,
17                    lead_time_ms, tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, paths_json
18             FROM (
19                 SELECT ts.span_id, ts.session_id, ts.tool, ts.tool_call_id, ts.status,
20                        ts.started_at_ms, ts.ended_at_ms, ts.lead_time_ms, ts.tokens_in,
21                        ts.tokens_out, ts.reasoning_tokens, ts.cost_usd_e6, ts.paths_json,
22                        ts.started_at_ms AS sort_ms
23                 FROM tool_spans ts
24                 JOIN sessions s ON s.id = ts.session_id
25                 WHERE s.workspace = ?1
26                   AND ts.started_at_ms IS NOT NULL
27                   AND ts.started_at_ms >= ?2
28                   AND ts.started_at_ms <= ?3
29                 UNION ALL
30                 SELECT ts.span_id, ts.session_id, ts.tool, ts.tool_call_id, ts.status,
31                        ts.started_at_ms, ts.ended_at_ms, ts.lead_time_ms, ts.tokens_in,
32                        ts.tokens_out, ts.reasoning_tokens, ts.cost_usd_e6, ts.paths_json,
33                        ts.ended_at_ms AS sort_ms
34                 FROM tool_spans ts
35                 JOIN sessions s ON s.id = ts.session_id
36                 WHERE s.workspace = ?1
37                   AND ts.started_at_ms IS NULL
38                   AND ts.ended_at_ms IS NOT NULL
39                   AND ts.ended_at_ms >= ?2
40                   AND ts.ended_at_ms <= ?3
41             )
42             ORDER BY sort_ms ASC, span_id ASC",
43        )?;
44        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
45            let paths_json: String = row.get(12)?;
46            Ok(ToolSpanSyncRow {
47                span_id: row.get(0)?,
48                session_id: row.get(1)?,
49                tool: row.get(2)?,
50                tool_call_id: row.get(3)?,
51                status: row.get(4)?,
52                started_at_ms: row.get::<_, Option<i64>>(5)?.map(|v| v as u64),
53                ended_at_ms: row.get::<_, Option<i64>>(6)?.map(|v| v as u64),
54                lead_time_ms: row.get::<_, Option<i64>>(7)?.map(|v| v as u64),
55                tokens_in: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
56                tokens_out: row.get::<_, Option<i64>>(9)?.map(|v| v as u32),
57                reasoning_tokens: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
58                cost_usd_e6: row.get(11)?,
59                paths: serde_json::from_str(&paths_json).unwrap_or_default(),
60            })
61        })?;
62        Ok(rows.filter_map(|row| row.ok()).collect())
63    }
64}