1use super::rows::*;
2use super::*;
3
4impl Store {
5 pub fn upsert_trace_span(&self, span: &TraceSpanRecord) -> Result<()> {
6 self.conn.execute(
7 "INSERT INTO trace_spans (
8 span_id, trace_id, parent_span_id, session_id, kind, name, status,
9 started_at_ms, ended_at_ms, duration_ms, model, tool, tokens_in, tokens_out,
10 reasoning_tokens, cost_usd_e6, context_used_tokens, context_max_tokens, payload
11 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19)
12 ON CONFLICT(span_id) DO UPDATE SET
13 trace_id=excluded.trace_id, parent_span_id=excluded.parent_span_id,
14 session_id=excluded.session_id, kind=excluded.kind, name=excluded.name,
15 status=excluded.status, started_at_ms=excluded.started_at_ms,
16 ended_at_ms=excluded.ended_at_ms, duration_ms=excluded.duration_ms,
17 model=excluded.model, tool=excluded.tool, tokens_in=excluded.tokens_in,
18 tokens_out=excluded.tokens_out, reasoning_tokens=excluded.reasoning_tokens,
19 cost_usd_e6=excluded.cost_usd_e6,
20 context_used_tokens=excluded.context_used_tokens,
21 context_max_tokens=excluded.context_max_tokens, payload=excluded.payload",
22 params![
23 span.span_id.as_str(),
24 span.trace_id.as_str(),
25 span.parent_span_id.as_deref(),
26 span.session_id.as_str(),
27 span.kind.as_str(),
28 span.name.as_str(),
29 span.status.as_str(),
30 span.started_at_ms.map(|v| v as i64),
31 span.ended_at_ms.map(|v| v as i64),
32 span.duration_ms.map(|v| v as i64),
33 span.model.as_deref(),
34 span.tool.as_deref(),
35 span.tokens_in.map(|v| v as i64),
36 span.tokens_out.map(|v| v as i64),
37 span.reasoning_tokens.map(|v| v as i64),
38 span.cost_usd_e6,
39 span.context_used_tokens.map(|v| v as i64),
40 span.context_max_tokens.map(|v| v as i64),
41 serde_json::to_string(&span.payload)?,
42 ],
43 )?;
44 Ok(())
45 }
46
47 pub fn trace_spans_for_session(&self, session_id: &str) -> Result<Vec<TraceSpanRecord>> {
48 let mut stmt = self.conn.prepare(
49 "SELECT span_id, trace_id, parent_span_id, session_id, kind, name, status,
50 started_at_ms, ended_at_ms, duration_ms, model, tool, tokens_in, tokens_out,
51 reasoning_tokens, cost_usd_e6, context_used_tokens, context_max_tokens, payload
52 FROM trace_spans WHERE session_id = ?1
53 ORDER BY COALESCE(started_at_ms, ended_at_ms, 0), span_id",
54 )?;
55 let rows = stmt.query_map(params![session_id], trace_span_from_row)?;
56 Ok(rows.filter_map(|row| row.ok()).collect())
57 }
58
59 pub(crate) fn capture_quality_rows(
60 &self,
61 workspace: &str,
62 start_ms: u64,
63 end_ms: u64,
64 ) -> Result<Vec<CaptureQualityRow>> {
65 let mut stmt = self.conn.prepare(
66 "SELECT e.source,
67 e.tokens_in IS NOT NULL OR e.tokens_out IS NOT NULL OR e.reasoning_tokens IS NOT NULL,
68 e.cost_usd_e6 IS NOT NULL,
69 e.latency_ms IS NOT NULL OR e.ttft_ms IS NOT NULL,
70 e.context_used_tokens IS NOT NULL AND e.context_max_tokens IS NOT NULL,
71 COALESCE(e.cache_read_tokens, 0),
72 COALESCE(e.cache_creation_tokens, 0)
73 FROM events e JOIN sessions s ON s.id = e.session_id
74 WHERE s.workspace = ?1 AND e.ts_ms >= ?2 AND e.ts_ms <= ?3",
75 )?;
76 let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
77 Ok(CaptureQualityRow {
78 source: row.get(0)?,
79 has_tokens: row.get::<_, i64>(1)? != 0,
80 has_cost: row.get::<_, i64>(2)? != 0,
81 has_latency: row.get::<_, i64>(3)? != 0,
82 has_context: row.get::<_, i64>(4)? != 0,
83 cache_read_tokens: row.get::<_, i64>(5)? as u64,
84 cache_creation_tokens: row.get::<_, i64>(6)? as u64,
85 })
86 })?;
87 Ok(rows.filter_map(|row| row.ok()).collect())
88 }
89
90 pub(crate) fn trace_span_quality_rows(
91 &self,
92 workspace: &str,
93 start_ms: u64,
94 end_ms: u64,
95 ) -> Result<Vec<TraceSpanQualityRow>> {
96 let mut stmt = self.conn.prepare(
97 "SELECT ts.kind,
98 ts.parent_span_id IS NOT NULL
99 AND parent.span_id IS NULL
100 AND ts.kind NOT IN ('session', 'agent')
101 FROM trace_spans ts
102 JOIN sessions s ON s.id = ts.session_id
103 LEFT JOIN trace_spans parent ON parent.span_id = ts.parent_span_id
104 WHERE s.workspace = ?1
105 AND COALESCE(ts.started_at_ms, ts.ended_at_ms, 0) >= ?2
106 AND COALESCE(ts.started_at_ms, ts.ended_at_ms, 0) <= ?3",
107 )?;
108 let rows = stmt.query_map(params![workspace, start_ms as i64, end_ms as i64], |row| {
109 Ok(TraceSpanQualityRow {
110 kind: row.get(0)?,
111 is_orphan: row.get::<_, i64>(1)? != 0,
112 })
113 })?;
114 Ok(rows.filter_map(|row| row.ok()).collect())
115 }
116}