1use super::rows::*;
2use super::*;
3
4impl Store {
5 pub fn upsert_trace_span(&self, span: &TraceSpanRecord) -> Result<()> {
6 self.conn.execute(
7 "INSERT INTO trace_spans (
8 span_id, trace_id, parent_span_id, session_id, kind, name, status,
9 started_at_ms, ended_at_ms, duration_ms, model, tool, tokens_in, tokens_out,
10 reasoning_tokens, cost_usd_e6, context_used_tokens, context_max_tokens, payload
11 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19)
12 ON CONFLICT(span_id) DO UPDATE SET
13 trace_id=excluded.trace_id, parent_span_id=excluded.parent_span_id,
14 session_id=excluded.session_id, kind=excluded.kind, name=excluded.name,
15 status=excluded.status, started_at_ms=excluded.started_at_ms,
16 ended_at_ms=excluded.ended_at_ms, duration_ms=excluded.duration_ms,
17 model=excluded.model, tool=excluded.tool, tokens_in=excluded.tokens_in,
18 tokens_out=excluded.tokens_out, reasoning_tokens=excluded.reasoning_tokens,
19 cost_usd_e6=excluded.cost_usd_e6,
20 context_used_tokens=excluded.context_used_tokens,
21 context_max_tokens=excluded.context_max_tokens, payload=excluded.payload",
22 params![
23 span.span_id.as_str(),
24 span.trace_id.as_str(),
25 span.parent_span_id.as_deref(),
26 span.session_id.as_str(),
27 span.kind.as_str(),
28 span.name.as_str(),
29 span.status.as_str(),
30 span.started_at_ms.map(|v| v as i64),
31 span.ended_at_ms.map(|v| v as i64),
32 span.duration_ms.map(|v| v as i64),
33 span.model.as_deref(),
34 span.tool.as_deref(),
35 span.tokens_in.map(|v| v as i64),
36 span.tokens_out.map(|v| v as i64),
37 span.reasoning_tokens.map(|v| v as i64),
38 span.cost_usd_e6,
39 span.context_used_tokens.map(|v| v as i64),
40 span.context_max_tokens.map(|v| v as i64),
41 serde_json::to_string(&span.payload)?,
42 ],
43 )?;
44 Ok(())
45 }
46
47 pub fn trace_spans_for_session(&self, session_id: &str) -> Result<Vec<TraceSpanRecord>> {
48 let mut stmt = self.conn.prepare(
49 "SELECT span_id, trace_id, parent_span_id, session_id, kind, name, status,
50 started_at_ms, ended_at_ms, duration_ms, model, tool, tokens_in, tokens_out,
51 reasoning_tokens, cost_usd_e6, context_used_tokens, context_max_tokens, payload
52 FROM trace_spans WHERE session_id = ?1
53 ORDER BY COALESCE(started_at_ms, ended_at_ms, 0), span_id",
54 )?;
55 let rows = stmt.query_map(params![session_id], trace_span_from_row)?;
56 Ok(rows.filter_map(|row| row.ok()).collect())
57 }
58
59 pub(crate) fn capture_quality_rows(
60 &self,
61 workspace: &str,
62 start_ms: u64,
63 end_ms: u64,
64 ) -> Result<Vec<CaptureQualityRow>> {
65 let mut stmt = self.conn.prepare(
66 "SELECT e.source,
67 e.tokens_in IS NOT NULL OR e.tokens_out IS NOT NULL OR e.reasoning_tokens IS NOT NULL,
68 e.latency_ms IS NOT NULL OR e.ttft_ms IS NOT NULL,
69 e.context_used_tokens IS NOT NULL AND e.context_max_tokens IS NOT NULL
70 FROM events e JOIN sessions s ON s.id = e.session_id
71 WHERE s.workspace = ?1 AND e.ts_ms >= ?2 AND e.ts_ms <= ?3",
72 )?;
73 let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
74 Ok(CaptureQualityRow {
75 source: row.get(0)?,
76 has_tokens: row.get::<_, i64>(1)? != 0,
77 has_latency: row.get::<_, i64>(2)? != 0,
78 has_context: row.get::<_, i64>(3)? != 0,
79 })
80 })?;
81 Ok(rows.filter_map(|row| row.ok()).collect())
82 }
83
84 pub(crate) fn trace_span_quality_rows(
85 &self,
86 workspace: &str,
87 start_ms: u64,
88 end_ms: u64,
89 ) -> Result<Vec<TraceSpanQualityRow>> {
90 let mut stmt = self.conn.prepare(
91 "SELECT ts.kind,
92 ts.parent_span_id IS NOT NULL
93 AND parent.span_id IS NULL
94 AND ts.kind NOT IN ('session', 'agent')
95 FROM trace_spans ts
96 JOIN sessions s ON s.id = ts.session_id
97 LEFT JOIN trace_spans parent ON parent.span_id = ts.parent_span_id
98 WHERE s.workspace = ?1
99 AND COALESCE(ts.started_at_ms, ts.ended_at_ms, 0) >= ?2
100 AND COALESCE(ts.started_at_ms, ts.ended_at_ms, 0) <= ?3",
101 )?;
102 let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
103 Ok(TraceSpanQualityRow {
104 kind: row.get(0)?,
105 is_orphan: row.get::<_, i64>(1)? != 0,
106 })
107 })?;
108 Ok(rows.filter_map(|row| row.ok()).collect())
109 }
110}