Skip to main content

kaizen/store/sqlite/
trace_spans.rs

1use super::rows::*;
2use super::*;
3
4impl Store {
5    pub fn upsert_trace_span(&self, span: &TraceSpanRecord) -> Result<()> {
6        self.conn.execute(
7            "INSERT INTO trace_spans (
8                span_id, trace_id, parent_span_id, session_id, kind, name, status,
9                started_at_ms, ended_at_ms, duration_ms, model, tool, tokens_in, tokens_out,
10                reasoning_tokens, cost_usd_e6, context_used_tokens, context_max_tokens, payload
11             ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19)
12             ON CONFLICT(span_id) DO UPDATE SET
13                trace_id=excluded.trace_id, parent_span_id=excluded.parent_span_id,
14                session_id=excluded.session_id, kind=excluded.kind, name=excluded.name,
15                status=excluded.status, started_at_ms=excluded.started_at_ms,
16                ended_at_ms=excluded.ended_at_ms, duration_ms=excluded.duration_ms,
17                model=excluded.model, tool=excluded.tool, tokens_in=excluded.tokens_in,
18                tokens_out=excluded.tokens_out, reasoning_tokens=excluded.reasoning_tokens,
19                cost_usd_e6=excluded.cost_usd_e6,
20                context_used_tokens=excluded.context_used_tokens,
21                context_max_tokens=excluded.context_max_tokens, payload=excluded.payload",
22            params![
23                span.span_id.as_str(),
24                span.trace_id.as_str(),
25                span.parent_span_id.as_deref(),
26                span.session_id.as_str(),
27                span.kind.as_str(),
28                span.name.as_str(),
29                span.status.as_str(),
30                span.started_at_ms.map(|v| v as i64),
31                span.ended_at_ms.map(|v| v as i64),
32                span.duration_ms.map(|v| v as i64),
33                span.model.as_deref(),
34                span.tool.as_deref(),
35                span.tokens_in.map(|v| v as i64),
36                span.tokens_out.map(|v| v as i64),
37                span.reasoning_tokens.map(|v| v as i64),
38                span.cost_usd_e6,
39                span.context_used_tokens.map(|v| v as i64),
40                span.context_max_tokens.map(|v| v as i64),
41                serde_json::to_string(&span.payload)?,
42            ],
43        )?;
44        Ok(())
45    }
46
47    pub fn trace_spans_for_session(&self, session_id: &str) -> Result<Vec<TraceSpanRecord>> {
48        let mut stmt = self.conn.prepare(
49            "SELECT span_id, trace_id, parent_span_id, session_id, kind, name, status,
50                    started_at_ms, ended_at_ms, duration_ms, model, tool, tokens_in, tokens_out,
51                    reasoning_tokens, cost_usd_e6, context_used_tokens, context_max_tokens, payload
52             FROM trace_spans WHERE session_id = ?1
53             ORDER BY COALESCE(started_at_ms, ended_at_ms, 0), span_id",
54        )?;
55        let rows = stmt.query_map(params![session_id], trace_span_from_row)?;
56        Ok(rows.filter_map(|row| row.ok()).collect())
57    }
58
59    pub(crate) fn capture_quality_rows(
60        &self,
61        workspace: &str,
62        start_ms: u64,
63        end_ms: u64,
64    ) -> Result<Vec<CaptureQualityRow>> {
65        let mut stmt = self.conn.prepare(
66            "SELECT e.source,
67                    e.tokens_in IS NOT NULL OR e.tokens_out IS NOT NULL OR e.reasoning_tokens IS NOT NULL,
68                    e.latency_ms IS NOT NULL OR e.ttft_ms IS NOT NULL,
69                    e.context_used_tokens IS NOT NULL AND e.context_max_tokens IS NOT NULL
70             FROM events e JOIN sessions s ON s.id = e.session_id
71             WHERE s.workspace = ?1 AND e.ts_ms >= ?2 AND e.ts_ms <= ?3",
72        )?;
73        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
74            Ok(CaptureQualityRow {
75                source: row.get(0)?,
76                has_tokens: row.get::<_, i64>(1)? != 0,
77                has_latency: row.get::<_, i64>(2)? != 0,
78                has_context: row.get::<_, i64>(3)? != 0,
79            })
80        })?;
81        Ok(rows.filter_map(|row| row.ok()).collect())
82    }
83
84    pub(crate) fn trace_span_quality_rows(
85        &self,
86        workspace: &str,
87        start_ms: u64,
88        end_ms: u64,
89    ) -> Result<Vec<TraceSpanQualityRow>> {
90        let mut stmt = self.conn.prepare(
91            "SELECT ts.kind,
92                    ts.parent_span_id IS NOT NULL
93                    AND parent.span_id IS NULL
94                    AND ts.kind NOT IN ('session', 'agent')
95             FROM trace_spans ts
96             JOIN sessions s ON s.id = ts.session_id
97             LEFT JOIN trace_spans parent ON parent.span_id = ts.parent_span_id
98             WHERE s.workspace = ?1
99               AND COALESCE(ts.started_at_ms, ts.ended_at_ms, 0) >= ?2
100               AND COALESCE(ts.started_at_ms, ts.ended_at_ms, 0) <= ?3",
101        )?;
102        let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
103            Ok(TraceSpanQualityRow {
104                kind: row.get(0)?,
105                is_orphan: row.get::<_, i64>(1)? != 0,
106            })
107        })?;
108        Ok(rows.filter_map(|row| row.ok()).collect())
109    }
110}