1use std::path::Path;
2use std::sync::Mutex;
3
4use anyhow::Result;
5use chrono::{DateTime, NaiveDateTime, Utc};
6use duckdb::{Connection, params};
7
8use super::Store;
9use super::models::{
10 Anomaly, AnomalyReport, CorrelateReport, ErrorOperation, LogQuery, LogRecord, LogSeverity,
11 LogSummary, MetricPoint, MetricQuery, MetricSummary, MetricType, ServiceInfo, ServiceSummary,
12 Span, SpanKind, SpanStatus, SummaryReport, TraceComment, TraceQuery, TraceSummary,
13};
14
/// Builds a JSON path of the form `$."<key>"` that addresses `key` as a single
/// quoted member name.
///
/// Quoting lets a key that itself contains `.` (for example `http.method`) be
/// passed as one name. Backslashes and double quotes inside the key are
/// backslash-escaped so they cannot terminate the quoted section.
fn json_path_for_key(key: &str) -> String {
    // `$."` + key + `"`, plus a little slack for escapes.
    let mut path = String::with_capacity(key.len() + 4);
    path.push_str("$.\"");
    for ch in key.chars() {
        match ch {
            '\\' => path.push_str(r"\\"),
            '"' => path.push_str(r#"\""#),
            other => path.push(other),
        }
    }
    path.push('"');
    path
}
22
23fn parse_timestamp(s: &str) -> DateTime<Utc> {
24 NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f")
25 .or_else(|_| NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S"))
26 .map(|dt| dt.and_utc())
27 .unwrap_or_default()
28}
29
30fn row_to_span(row: &duckdb::Row<'_>) -> duckdb::Result<Span> {
31 let attrs_str: String = row.get(9)?;
32 let events_str: String = row.get(10)?;
33 let status_str: String = row.get(8)?;
34 let parent: Option<String> = row.get(2)?;
35 let start_str: String = row.get(5)?;
36 let end_str: String = row.get(6)?;
37 let kind_str: Option<String> = row.get(11)?;
38 let llm_str: Option<String> = row.get(12)?;
39
40 Ok(Span {
41 trace_id: row.get(0)?,
42 span_id: row.get(1)?,
43 parent_span_id: if parent.as_deref() == Some("") {
44 None
45 } else {
46 parent
47 },
48 service: row.get(3)?,
49 operation: row.get(4)?,
50 start_time: parse_timestamp(&start_str),
51 end_time: parse_timestamp(&end_str),
52 duration_ms: row.get(7)?,
53 status: SpanStatus::from_str(&status_str),
54 attributes: serde_json::from_str(&attrs_str).unwrap_or_default(),
55 events: serde_json::from_str(&events_str).unwrap_or_default(),
56 kind: kind_str
57 .map(|s| SpanKind::from_str(&s))
58 .unwrap_or(SpanKind::Internal),
59 llm: llm_str.and_then(|s| serde_json::from_str(&s).ok()),
60 })
61}
62
/// Telemetry store backed by a single DuckDB connection.
///
/// The connection sits behind a `Mutex`; the methods on this type lock it
/// before issuing any statement.
pub struct DuckDbStore {
    /// Connection to the `tael.duckdb` database file opened by [`DuckDbStore::new`].
    conn: Mutex<Connection>,
}
66
67impl DuckDbStore {
68 pub fn new(data_dir: &str) -> Result<Self> {
69 std::fs::create_dir_all(data_dir)?;
70 let db_path = Path::new(data_dir).join("tael.duckdb");
71 let conn = Connection::open(db_path)?;
72 let store = Self {
73 conn: Mutex::new(conn),
74 };
75 store.init_schema()?;
76 Ok(store)
77 }
78
    /// Creates the `spans`, `trace_comments`, `logs` and `metrics` tables and
    /// their indexes, and adds columns that older database files may lack.
    ///
    /// Every statement is guarded by `IF NOT EXISTS`, and the whole script is
    /// submitted through a single `execute_batch` call. It is invoked from
    /// [`DuckDbStore::new`].
    ///
    /// # Errors
    ///
    /// Returns the `duckdb` error if the batch fails.
    ///
    /// # Panics
    ///
    /// Panics if the connection mutex is poisoned.
    fn init_schema(&self) -> Result<()> {
        let conn = self.conn.lock().unwrap();
        // The `--` lines below are SQL comments inside the string literal and
        // are sent to the database together with the statements.
        conn.execute_batch(
            "
            CREATE TABLE IF NOT EXISTS spans (
                trace_id VARCHAR NOT NULL,
                span_id VARCHAR NOT NULL,
                parent_span_id VARCHAR,
                service VARCHAR NOT NULL,
                operation VARCHAR NOT NULL,
                start_time TIMESTAMP NOT NULL,
                end_time TIMESTAMP NOT NULL,
                duration_ms DOUBLE NOT NULL,
                status VARCHAR NOT NULL DEFAULT 'unset',
                attributes JSON,
                events JSON,
                kind VARCHAR NOT NULL DEFAULT 'internal',
                llm JSON,
                PRIMARY KEY (trace_id, span_id)
            );

            -- Migrate pre-existing databases created before the LLM columns.
            -- DuckDB's ALTER ... ADD COLUMN rejects constraints, so these are
            -- added nullable; fresh tables get the constraints from CREATE above,
            -- and migrated NULL `kind` values are read back as 'internal'.
            ALTER TABLE spans ADD COLUMN IF NOT EXISTS kind VARCHAR;
            ALTER TABLE spans ADD COLUMN IF NOT EXISTS llm JSON;

            CREATE INDEX IF NOT EXISTS idx_spans_service ON spans(service);
            CREATE INDEX IF NOT EXISTS idx_spans_start_time ON spans(start_time);
            CREATE INDEX IF NOT EXISTS idx_spans_trace_id ON spans(trace_id);
            CREATE INDEX IF NOT EXISTS idx_spans_status ON spans(status);
            CREATE INDEX IF NOT EXISTS idx_spans_kind ON spans(kind);

            CREATE TABLE IF NOT EXISTS trace_comments (
                id VARCHAR NOT NULL PRIMARY KEY,
                trace_id VARCHAR NOT NULL,
                span_id VARCHAR,
                author VARCHAR NOT NULL,
                body VARCHAR NOT NULL,
                created_at TIMESTAMP NOT NULL DEFAULT current_timestamp::TIMESTAMP
            );

            CREATE INDEX IF NOT EXISTS idx_comments_trace_id ON trace_comments(trace_id);

            CREATE TABLE IF NOT EXISTS logs (
                timestamp TIMESTAMP NOT NULL,
                observed_timestamp TIMESTAMP NOT NULL,
                trace_id VARCHAR,
                span_id VARCHAR,
                severity VARCHAR NOT NULL DEFAULT 'unspecified',
                severity_text VARCHAR NOT NULL DEFAULT '',
                body VARCHAR NOT NULL DEFAULT '',
                service VARCHAR NOT NULL,
                attributes JSON,
                body_sha256 VARCHAR
            );

            -- Migrate pre-existing databases (nullable, no constraint).
            ALTER TABLE logs ADD COLUMN IF NOT EXISTS body_sha256 VARCHAR;

            CREATE INDEX IF NOT EXISTS idx_logs_service ON logs(service);
            CREATE INDEX IF NOT EXISTS idx_logs_timestamp ON logs(timestamp);
            CREATE INDEX IF NOT EXISTS idx_logs_severity ON logs(severity);
            CREATE INDEX IF NOT EXISTS idx_logs_trace_id ON logs(trace_id);

            CREATE TABLE IF NOT EXISTS metrics (
                timestamp TIMESTAMP NOT NULL,
                service VARCHAR NOT NULL,
                name VARCHAR NOT NULL,
                metric_type VARCHAR NOT NULL DEFAULT 'unknown',
                value DOUBLE NOT NULL,
                unit VARCHAR NOT NULL DEFAULT '',
                attributes JSON
            );

            CREATE INDEX IF NOT EXISTS idx_metrics_service ON metrics(service);
            CREATE INDEX IF NOT EXISTS idx_metrics_name ON metrics(name);
            CREATE INDEX IF NOT EXISTS idx_metrics_timestamp ON metrics(timestamp);
            ",
        )?;
        Ok(())
    }
162
163 pub fn insert_spans(&self, spans: &[Span]) -> Result<()> {
164 let conn = self.conn.lock().unwrap();
165 let mut stmt = conn.prepare(
166 "INSERT OR REPLACE INTO spans
167 (trace_id, span_id, parent_span_id, service, operation,
168 start_time, end_time, duration_ms, status, attributes, events,
169 kind, llm)
170 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
171 )?;
172
173 for span in spans {
174 let attrs = serde_json::to_string(&span.attributes)?;
175 let events = serde_json::to_string(&span.events)?;
176 let llm = span.llm.as_ref().map(serde_json::to_string).transpose()?;
177 stmt.execute(params![
178 span.trace_id,
179 span.span_id,
180 span.parent_span_id,
181 span.service,
182 span.operation,
183 span.start_time.format("%Y-%m-%d %H:%M:%S%.6f").to_string(),
184 span.end_time.format("%Y-%m-%d %H:%M:%S%.6f").to_string(),
185 span.duration_ms,
186 span.status.to_string(),
187 attrs,
188 events,
189 span.kind.to_string(),
190 llm,
191 ])?;
192 }
193 Ok(())
194 }
195
196 pub fn query_traces(&self, query: &TraceQuery) -> Result<Vec<Span>> {
197 let conn = self.conn.lock().unwrap();
198
199 let mut sql = String::from(
200 "SELECT trace_id, span_id, parent_span_id, service, operation,
201 start_time::VARCHAR, end_time::VARCHAR, duration_ms, status, attributes, events,
202 kind, llm
203 FROM spans WHERE 1=1",
204 );
205 let mut param_values: Vec<Box<dyn duckdb::ToSql>> = Vec::new();
206
207 if let Some(ref svc) = query.service {
208 sql.push_str(" AND service = ?");
209 param_values.push(Box::new(svc.clone()));
210 }
211 if let Some(ref op) = query.operation {
212 sql.push_str(" AND operation LIKE ?");
213 param_values.push(Box::new(format!("%{op}%")));
214 }
215 if let Some(min) = query.min_duration_ms {
216 sql.push_str(" AND duration_ms >= ?");
217 param_values.push(Box::new(min));
218 }
219 if let Some(max) = query.max_duration_ms {
220 sql.push_str(" AND duration_ms <= ?");
221 param_values.push(Box::new(max));
222 }
223 if let Some(ref status) = query.status {
224 sql.push_str(" AND status = ?");
225 param_values.push(Box::new(status.clone()));
226 }
227 if let Some(secs) = query.last_seconds {
228 sql.push_str(&format!(
229 " AND start_time >= current_timestamp::TIMESTAMP - INTERVAL '{secs} seconds'"
230 ));
231 }
232 for (k, v) in &query.attributes {
233 sql.push_str(" AND json_extract_string(attributes, ?) = ?");
234 param_values.push(Box::new(json_path_for_key(k)));
235 param_values.push(Box::new(v.clone()));
236 }
237
238 sql.push_str(" ORDER BY start_time DESC");
239
240 let limit = query.limit.unwrap_or(100);
241 sql.push_str(&format!(" LIMIT {limit}"));
242
243 let params_ref: Vec<&dyn duckdb::ToSql> = param_values.iter().map(|p| p.as_ref()).collect();
244 let mut stmt = conn.prepare(&sql)?;
245 let rows = stmt.query_map(params_ref.as_slice(), row_to_span)?;
246
247 let mut spans = Vec::new();
248 for row in rows {
249 spans.push(row?);
250 }
251 Ok(spans)
252 }
253
254 pub fn get_trace(&self, trace_id: &str) -> Result<Vec<Span>> {
255 let conn = self.conn.lock().unwrap();
256 let mut stmt = conn.prepare(
257 "SELECT trace_id, span_id, parent_span_id, service, operation,
258 start_time::VARCHAR, end_time::VARCHAR, duration_ms, status, attributes, events,
259 kind, llm
260 FROM spans
261 WHERE trace_id = ?
262 ORDER BY start_time ASC",
263 )?;
264
265 let rows = stmt.query_map(params![trace_id], row_to_span)?;
266
267 let mut spans = Vec::new();
268 for row in rows {
269 spans.push(row?);
270 }
271 Ok(spans)
272 }
273
274 pub fn add_comment(
275 &self,
276 trace_id: &str,
277 span_id: Option<&str>,
278 author: &str,
279 body: &str,
280 ) -> Result<TraceComment> {
281 let conn = self.conn.lock().unwrap();
282 let id = uuid::Uuid::new_v4().to_string();
283 conn.execute(
284 "INSERT INTO trace_comments (id, trace_id, span_id, author, body)
285 VALUES (?, ?, ?, ?, ?)",
286 params![id, trace_id, span_id, author, body],
287 )?;
288
289 let mut stmt = conn.prepare(
290 "SELECT id, trace_id, span_id, author, body, created_at::VARCHAR
291 FROM trace_comments WHERE id = ?",
292 )?;
293 let comment = stmt.query_row(params![id], |row| {
294 Ok(TraceComment {
295 id: row.get(0)?,
296 trace_id: row.get(1)?,
297 span_id: row.get(2)?,
298 author: row.get(3)?,
299 body: row.get(4)?,
300 created_at: row.get(5)?,
301 })
302 })?;
303 Ok(comment)
304 }
305
306 pub fn get_comments(&self, trace_id: &str) -> Result<Vec<TraceComment>> {
307 let conn = self.conn.lock().unwrap();
308 let mut stmt = conn.prepare(
309 "SELECT id, trace_id, span_id, author, body, created_at::VARCHAR
310 FROM trace_comments
311 WHERE trace_id = ?
312 ORDER BY created_at ASC",
313 )?;
314 let rows = stmt.query_map(params![trace_id], |row| {
315 Ok(TraceComment {
316 id: row.get(0)?,
317 trace_id: row.get(1)?,
318 span_id: row.get(2)?,
319 author: row.get(3)?,
320 body: row.get(4)?,
321 created_at: row.get(5)?,
322 })
323 })?;
324 let mut comments = Vec::new();
325 for row in rows {
326 comments.push(row?);
327 }
328 Ok(comments)
329 }
330
331 pub fn list_services(&self) -> Result<Vec<ServiceInfo>> {
332 let conn = self.conn.lock().unwrap();
333 let mut stmt = conn.prepare(
334 "SELECT service,
335 COUNT(*) as span_count,
336 COUNT(DISTINCT trace_id) as trace_count,
337 AVG(duration_ms) as avg_duration,
338 SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END)::DOUBLE / COUNT(*)::DOUBLE as error_rate
339 FROM spans
340 GROUP BY service
341 ORDER BY span_count DESC",
342 )?;
343
344 let rows = stmt.query_map([], |row| {
345 Ok(ServiceInfo {
346 name: row.get(0)?,
347 span_count: row.get(1)?,
348 trace_count: row.get(2)?,
349 avg_duration_ms: row.get(3)?,
350 error_rate: row.get(4)?,
351 })
352 })?;
353
354 let mut services = Vec::new();
355 for row in rows {
356 services.push(row?);
357 }
358 Ok(services)
359 }
360
361 pub fn insert_logs(&self, logs: &[LogRecord]) -> Result<()> {
364 let conn = self.conn.lock().unwrap();
365 let mut stmt = conn.prepare(
366 "INSERT INTO logs
367 (timestamp, observed_timestamp, trace_id, span_id, severity,
368 severity_text, body, service, attributes, body_sha256)
369 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
370 )?;
371
372 for log in logs {
373 let attrs = serde_json::to_string(&log.attributes)?;
374 stmt.execute(params![
375 log.timestamp.format("%Y-%m-%d %H:%M:%S%.6f").to_string(),
376 log.observed_timestamp
377 .format("%Y-%m-%d %H:%M:%S%.6f")
378 .to_string(),
379 log.trace_id,
380 log.span_id,
381 log.severity.to_string(),
382 log.severity_text,
383 log.body,
384 log.service,
385 attrs,
386 log.body_sha256,
387 ])?;
388 }
389 Ok(())
390 }
391
392 pub fn query_logs(&self, query: &LogQuery) -> Result<Vec<LogRecord>> {
393 let conn = self.conn.lock().unwrap();
394
395 let mut sql = String::from(
396 "SELECT timestamp::VARCHAR, observed_timestamp::VARCHAR, trace_id, span_id,
397 severity, severity_text, body, service, attributes, body_sha256
398 FROM logs WHERE 1=1",
399 );
400 let mut param_values: Vec<Box<dyn duckdb::ToSql>> = Vec::new();
401
402 if let Some(ref svc) = query.service {
403 sql.push_str(" AND service = ?");
404 param_values.push(Box::new(svc.clone()));
405 }
406 if let Some(ref sev) = query.severity {
407 sql.push_str(" AND severity = ?");
408 param_values.push(Box::new(sev.clone()));
409 }
410 if let Some(ref body) = query.body_contains {
411 sql.push_str(" AND body LIKE ?");
412 param_values.push(Box::new(format!("%{body}%")));
413 }
414 if let Some(ref tid) = query.trace_id {
415 sql.push_str(" AND trace_id = ?");
416 param_values.push(Box::new(tid.clone()));
417 }
418 if let Some(secs) = query.last_seconds {
419 sql.push_str(&format!(
420 " AND timestamp >= current_timestamp::TIMESTAMP - INTERVAL '{secs} seconds'"
421 ));
422 }
423
424 sql.push_str(" ORDER BY timestamp DESC");
425
426 let limit = query.limit.unwrap_or(100);
427 sql.push_str(&format!(" LIMIT {limit}"));
428
429 let params_ref: Vec<&dyn duckdb::ToSql> = param_values.iter().map(|p| p.as_ref()).collect();
430 let mut stmt = conn.prepare(&sql)?;
431 let rows = stmt.query_map(params_ref.as_slice(), |row| {
432 let ts_str: String = row.get(0)?;
433 let obs_str: String = row.get(1)?;
434 let attrs_str: String = row.get(8)?;
435 let severity_str: String = row.get(4)?;
436
437 Ok(LogRecord {
438 timestamp: parse_timestamp(&ts_str),
439 observed_timestamp: parse_timestamp(&obs_str),
440 trace_id: row.get(2)?,
441 span_id: row.get(3)?,
442 severity: LogSeverity::from_str(&severity_str),
443 severity_text: row.get(5)?,
444 body: row.get(6)?,
445 service: row.get(7)?,
446 attributes: serde_json::from_str(&attrs_str).unwrap_or_default(),
447 body_sha256: row.get(9)?,
448 })
449 })?;
450
451 let mut logs = Vec::new();
452 for row in rows {
453 logs.push(row?);
454 }
455 Ok(logs)
456 }
457
458 pub fn insert_metrics(&self, metrics: &[MetricPoint]) -> Result<()> {
461 let conn = self.conn.lock().unwrap();
462 let mut stmt = conn.prepare(
463 "INSERT INTO metrics
464 (timestamp, service, name, metric_type, value, unit, attributes)
465 VALUES (?, ?, ?, ?, ?, ?, ?)",
466 )?;
467
468 for m in metrics {
469 let attrs = serde_json::to_string(&m.attributes)?;
470 stmt.execute(params![
471 m.timestamp.format("%Y-%m-%d %H:%M:%S%.6f").to_string(),
472 m.service,
473 m.name,
474 m.metric_type.to_string(),
475 m.value,
476 m.unit,
477 attrs,
478 ])?;
479 }
480 Ok(())
481 }
482
483 pub fn query_metrics(&self, query: &MetricQuery) -> Result<Vec<MetricPoint>> {
484 let conn = self.conn.lock().unwrap();
485
486 let mut sql = String::from(
487 "SELECT timestamp::VARCHAR, service, name, metric_type, value, unit, attributes
488 FROM metrics WHERE 1=1",
489 );
490 let mut param_values: Vec<Box<dyn duckdb::ToSql>> = Vec::new();
491
492 if let Some(ref svc) = query.service {
493 sql.push_str(" AND service = ?");
494 param_values.push(Box::new(svc.clone()));
495 }
496 if let Some(ref name) = query.name {
497 sql.push_str(" AND name = ?");
498 param_values.push(Box::new(name.clone()));
499 }
500 if let Some(ref mt) = query.metric_type {
501 sql.push_str(" AND metric_type = ?");
502 param_values.push(Box::new(mt.clone()));
503 }
504 if let Some(secs) = query.last_seconds {
505 sql.push_str(&format!(
506 " AND timestamp >= current_timestamp::TIMESTAMP - INTERVAL '{secs} seconds'"
507 ));
508 }
509
510 sql.push_str(" ORDER BY timestamp DESC");
511
512 let limit = query.limit.unwrap_or(500);
513 sql.push_str(&format!(" LIMIT {limit}"));
514
515 let params_ref: Vec<&dyn duckdb::ToSql> = param_values.iter().map(|p| p.as_ref()).collect();
516 let mut stmt = conn.prepare(&sql)?;
517 let rows = stmt.query_map(params_ref.as_slice(), |row| {
518 let ts_str: String = row.get(0)?;
519 let mt_str: String = row.get(3)?;
520 let attrs_str: String = row.get(6)?;
521
522 Ok(MetricPoint {
523 timestamp: parse_timestamp(&ts_str),
524 service: row.get(1)?,
525 name: row.get(2)?,
526 metric_type: MetricType::from_str(&mt_str),
527 value: row.get(4)?,
528 unit: row.get(5)?,
529 attributes: serde_json::from_str(&attrs_str).unwrap_or_default(),
530 })
531 })?;
532
533 let mut metrics = Vec::new();
534 for row in rows {
535 metrics.push(row?);
536 }
537 Ok(metrics)
538 }
539
540 pub fn query_summary(&self, last_seconds: i64, service: Option<&str>) -> Result<SummaryReport> {
541 let conn = self.conn.lock().unwrap();
542
543 let span_time_clause = format!(
544 "start_time >= current_timestamp::TIMESTAMP - INTERVAL '{last_seconds} seconds'"
545 );
546 let ts_time_clause = format!(
547 "timestamp >= current_timestamp::TIMESTAMP - INTERVAL '{last_seconds} seconds'"
548 );
549
550 let svc_clause = if service.is_some() {
551 " AND service = ?"
552 } else {
553 ""
554 };
555 let svc_owned = service.map(|s| s.to_string());
556 let build_params = || -> Vec<&dyn duckdb::ToSql> {
557 match &svc_owned {
558 Some(s) => vec![s as &dyn duckdb::ToSql],
559 None => vec![],
560 }
561 };
562
563 let traces_sql = format!(
565 "SELECT
566 COUNT(*)::BIGINT,
567 COUNT(DISTINCT trace_id)::BIGINT,
568 SUM(CASE WHEN status='error' THEN 1 ELSE 0 END)::BIGINT,
569 COALESCE(AVG(duration_ms), 0.0),
570 COALESCE(MAX(duration_ms), 0.0),
571 COALESCE(quantile_cont(duration_ms, 0.5), 0.0),
572 COALESCE(quantile_cont(duration_ms, 0.95), 0.0),
573 COALESCE(quantile_cont(duration_ms, 0.99), 0.0)
574 FROM spans WHERE {span_time_clause}{svc_clause}"
575 );
576 let mut stmt = conn.prepare(&traces_sql)?;
577 let traces: TraceSummary = stmt.query_row(build_params().as_slice(), |row| {
578 let span_count: i64 = row.get::<_, Option<i64>>(0)?.unwrap_or(0);
579 let error_count: i64 = row.get::<_, Option<i64>>(2)?.unwrap_or(0);
580 let error_rate = if span_count > 0 {
581 error_count as f64 / span_count as f64
582 } else {
583 0.0
584 };
585 Ok(TraceSummary {
586 span_count,
587 trace_count: row.get::<_, Option<i64>>(1)?.unwrap_or(0),
588 error_count,
589 error_rate,
590 avg_ms: row.get(3)?,
591 max_ms: row.get(4)?,
592 p50_ms: row.get(5)?,
593 p95_ms: row.get(6)?,
594 p99_ms: row.get(7)?,
595 })
596 })?;
597 drop(stmt);
598
599 let top_svc_sql = format!(
601 "SELECT service,
602 COUNT(*)::BIGINT,
603 SUM(CASE WHEN status='error' THEN 1 ELSE 0 END)::DOUBLE / NULLIF(COUNT(*), 0)::DOUBLE,
604 COALESCE(quantile_cont(duration_ms, 0.95), 0.0)
605 FROM spans WHERE {span_time_clause}{svc_clause}
606 GROUP BY service
607 ORDER BY 2 DESC
608 LIMIT 5"
609 );
610 let mut stmt = conn.prepare(&top_svc_sql)?;
611 let top_services: Vec<ServiceSummary> = stmt
612 .query_map(build_params().as_slice(), |row| {
613 Ok(ServiceSummary {
614 service: row.get(0)?,
615 span_count: row.get(1)?,
616 error_rate: row.get::<_, Option<f64>>(2)?.unwrap_or(0.0),
617 p95_ms: row.get(3)?,
618 })
619 })?
620 .collect::<duckdb::Result<_>>()?;
621 drop(stmt);
622
623 let err_op_sql = format!(
625 "SELECT service, operation, COUNT(*)::BIGINT
626 FROM spans
627 WHERE status='error' AND {span_time_clause}{svc_clause}
628 GROUP BY service, operation
629 ORDER BY 3 DESC
630 LIMIT 5"
631 );
632 let mut stmt = conn.prepare(&err_op_sql)?;
633 let top_error_operations: Vec<ErrorOperation> = stmt
634 .query_map(build_params().as_slice(), |row| {
635 Ok(ErrorOperation {
636 service: row.get(0)?,
637 operation: row.get(1)?,
638 error_count: row.get(2)?,
639 })
640 })?
641 .collect::<duckdb::Result<_>>()?;
642 drop(stmt);
643
644 let logs_sql = format!(
646 "SELECT
647 COUNT(*)::BIGINT,
648 SUM(CASE WHEN severity IN ('error','fatal') THEN 1 ELSE 0 END)::BIGINT,
649 SUM(CASE WHEN severity='warn' THEN 1 ELSE 0 END)::BIGINT,
650 SUM(CASE WHEN severity='info' THEN 1 ELSE 0 END)::BIGINT,
651 SUM(CASE WHEN severity IN ('debug','trace') THEN 1 ELSE 0 END)::BIGINT
652 FROM logs WHERE {ts_time_clause}{svc_clause}"
653 );
654 let mut stmt = conn.prepare(&logs_sql)?;
655 let logs: LogSummary = stmt.query_row(build_params().as_slice(), |row| {
656 Ok(LogSummary {
657 total: row.get::<_, Option<i64>>(0)?.unwrap_or(0),
658 error: row.get::<_, Option<i64>>(1)?.unwrap_or(0),
659 warn: row.get::<_, Option<i64>>(2)?.unwrap_or(0),
660 info: row.get::<_, Option<i64>>(3)?.unwrap_or(0),
661 debug: row.get::<_, Option<i64>>(4)?.unwrap_or(0),
662 })
663 })?;
664 drop(stmt);
665
666 let metrics_sql = format!(
668 "SELECT COUNT(*)::BIGINT, COUNT(DISTINCT name)::BIGINT
669 FROM metrics WHERE {ts_time_clause}{svc_clause}"
670 );
671 let mut stmt = conn.prepare(&metrics_sql)?;
672 let metrics: MetricSummary = stmt.query_row(build_params().as_slice(), |row| {
673 Ok(MetricSummary {
674 point_count: row.get::<_, Option<i64>>(0)?.unwrap_or(0),
675 unique_names: row.get::<_, Option<i64>>(1)?.unwrap_or(0),
676 })
677 })?;
678 drop(stmt);
679
680 Ok(SummaryReport {
681 window_seconds: last_seconds,
682 service_filter: service.map(|s| s.to_string()),
683 traces,
684 top_services,
685 top_error_operations,
686 logs,
687 metrics,
688 })
689 }
690
691 pub fn query_anomalies(
692 &self,
693 current_seconds: i64,
694 baseline_seconds: i64,
695 service: Option<&str>,
696 ) -> Result<AnomalyReport> {
697 let conn = self.conn.lock().unwrap();
698 let svc_owned = service.map(|s| s.to_string());
699 let svc_clause = if service.is_some() {
700 " AND service = ?"
701 } else {
702 ""
703 };
704
705 let stats_sql = |window: i64| -> String {
707 format!(
708 "SELECT service,
709 COUNT(*)::BIGINT AS span_count,
710 SUM(CASE WHEN status='error' THEN 1 ELSE 0 END)::DOUBLE
711 / NULLIF(COUNT(*), 0)::DOUBLE AS error_rate,
712 COALESCE(quantile_cont(duration_ms, 0.95), 0.0) AS p95_ms
713 FROM spans
714 WHERE start_time >= current_timestamp::TIMESTAMP - INTERVAL '{window} seconds'{svc_clause}
715 GROUP BY service"
716 )
717 };
718
719 let build_params = || -> Vec<&dyn duckdb::ToSql> {
720 match &svc_owned {
721 Some(s) => vec![s as &dyn duckdb::ToSql],
722 None => vec![],
723 }
724 };
725
726 let mut current_stats: std::collections::HashMap<String, (i64, f64, f64)> =
727 std::collections::HashMap::new();
728 let mut stmt = conn.prepare(&stats_sql(current_seconds))?;
729 let rows = stmt.query_map(build_params().as_slice(), |row| {
730 Ok((
731 row.get::<_, String>(0)?,
732 row.get::<_, i64>(1)?,
733 row.get::<_, Option<f64>>(2)?.unwrap_or(0.0),
734 row.get::<_, f64>(3)?,
735 ))
736 })?;
737 for r in rows {
738 let (svc, span_count, error_rate, p95) = r?;
739 current_stats.insert(svc, (span_count, error_rate, p95));
740 }
741 drop(stmt);
742
743 let mut baseline_stats: std::collections::HashMap<String, (i64, f64, f64)> =
744 std::collections::HashMap::new();
745 let mut stmt = conn.prepare(&stats_sql(baseline_seconds))?;
746 let rows = stmt.query_map(build_params().as_slice(), |row| {
747 Ok((
748 row.get::<_, String>(0)?,
749 row.get::<_, i64>(1)?,
750 row.get::<_, Option<f64>>(2)?.unwrap_or(0.0),
751 row.get::<_, f64>(3)?,
752 ))
753 })?;
754 for r in rows {
755 let (svc, span_count, error_rate, p95) = r?;
756 baseline_stats.insert(svc, (span_count, error_rate, p95));
757 }
758 drop(stmt);
759
760 let mut anomalies = Vec::new();
761 for (svc, (cur_count, cur_err, cur_p95)) in ¤t_stats {
762 if *cur_count < 5 {
763 continue;
764 }
765 let (base_count, base_err, base_p95) =
766 baseline_stats.get(svc).copied().unwrap_or((0, 0.0, 0.0));
767
768 let err_delta = cur_err - base_err;
769 if err_delta >= 0.05 && *cur_err > 0.0 {
770 let severity = if err_delta >= 0.25 {
771 "critical"
772 } else if err_delta >= 0.10 {
773 "warning"
774 } else {
775 "info"
776 };
777 anomalies.push(Anomaly {
778 service: svc.clone(),
779 kind: "error_rate".into(),
780 severity: severity.into(),
781 current: *cur_err,
782 baseline: base_err,
783 delta: err_delta,
784 description: format!(
785 "Error rate rose {:.1}% → {:.1}% (baseline {:.1}%, {} spans)",
786 base_err * 100.0,
787 cur_err * 100.0,
788 base_err * 100.0,
789 cur_count
790 ),
791 });
792 }
793
794 if base_p95 > 0.0 && base_count >= 5 {
795 let ratio = cur_p95 / base_p95;
796 if ratio >= 1.5 && *cur_p95 > 50.0 {
797 let severity = if ratio >= 3.0 {
798 "critical"
799 } else if ratio >= 2.0 {
800 "warning"
801 } else {
802 "info"
803 };
804 anomalies.push(Anomaly {
805 service: svc.clone(),
806 kind: "latency_p95".into(),
807 severity: severity.into(),
808 current: *cur_p95,
809 baseline: base_p95,
810 delta: cur_p95 - base_p95,
811 description: format!(
812 "p95 latency {:.1}ms → {:.1}ms ({:.1}× baseline)",
813 base_p95, cur_p95, ratio
814 ),
815 });
816 }
817 }
818 }
819
820 anomalies.sort_by(|a, b| {
821 let rank = |s: &str| match s {
822 "critical" => 0,
823 "warning" => 1,
824 _ => 2,
825 };
826 rank(&a.severity).cmp(&rank(&b.severity))
827 });
828
829 Ok(AnomalyReport {
830 current_seconds,
831 baseline_seconds,
832 service_filter: svc_owned,
833 anomalies,
834 })
835 }
836
837 pub fn query_correlate(&self, trace_id: &str) -> Result<Option<CorrelateReport>> {
838 let spans = self.get_trace(trace_id)?;
839 if spans.is_empty() {
840 return Ok(None);
841 }
842
843 let start_time = spans
844 .iter()
845 .map(|s| s.start_time)
846 .min()
847 .unwrap_or_else(Utc::now);
848 let end_time = spans
849 .iter()
850 .map(|s| s.end_time)
851 .max()
852 .unwrap_or_else(Utc::now);
853 let duration_ms = (end_time - start_time).num_milliseconds() as f64;
854 let error_count = spans
855 .iter()
856 .filter(|s| matches!(s.status, SpanStatus::Error))
857 .count() as i64;
858
859 let mut services: Vec<String> = spans.iter().map(|s| s.service.clone()).collect();
860 services.sort();
861 services.dedup();
862
863 let logs = self.query_logs(&LogQuery {
864 trace_id: Some(trace_id.to_string()),
865 limit: Some(500),
866 ..Default::default()
867 })?;
868
869 let metrics = {
871 let conn = self.conn.lock().unwrap();
872 let start = start_time
873 .naive_utc()
874 .format("%Y-%m-%d %H:%M:%S%.6f")
875 .to_string();
876 let end = end_time
877 .naive_utc()
878 .format("%Y-%m-%d %H:%M:%S%.6f")
879 .to_string();
880
881 let placeholders = std::iter::repeat("?")
882 .take(services.len())
883 .collect::<Vec<_>>()
884 .join(",");
885 let sql = format!(
886 "SELECT timestamp::VARCHAR, service, name, metric_type, value, unit, attributes
887 FROM metrics
888 WHERE timestamp BETWEEN ?::TIMESTAMP AND ?::TIMESTAMP
889 AND service IN ({placeholders})
890 ORDER BY timestamp ASC
891 LIMIT 500"
892 );
893 let mut stmt = conn.prepare(&sql)?;
894 let mut params_vec: Vec<&dyn duckdb::ToSql> =
895 vec![&start as &dyn duckdb::ToSql, &end as &dyn duckdb::ToSql];
896 for s in &services {
897 params_vec.push(s as &dyn duckdb::ToSql);
898 }
899 let rows = stmt.query_map(params_vec.as_slice(), |row| {
900 let ts_str: String = row.get(0)?;
901 let mt_str: String = row.get(3)?;
902 let attrs_str: String = row.get(6)?;
903 Ok(MetricPoint {
904 timestamp: parse_timestamp(&ts_str),
905 service: row.get(1)?,
906 name: row.get(2)?,
907 metric_type: MetricType::from_str(&mt_str),
908 value: row.get(4)?,
909 unit: row.get(5)?,
910 attributes: serde_json::from_str(&attrs_str).unwrap_or_default(),
911 })
912 })?;
913 let mut out = Vec::new();
914 for r in rows {
915 out.push(r?);
916 }
917 out
918 };
919
920 Ok(Some(CorrelateReport {
921 trace_id: trace_id.to_string(),
922 span_count: spans.len(),
923 services,
924 start_time: start_time.to_rfc3339(),
925 end_time: end_time.to_rfc3339(),
926 duration_ms,
927 error_count,
928 logs,
929 metrics,
930 }))
931 }
932}
933
934impl DuckDbStore {
935 pub fn query_sql(&self, sql: &str) -> Result<Vec<serde_json::Value>> {
940 let trimmed = sql.trim_start();
941 let head = trimmed
942 .split_whitespace()
943 .next()
944 .unwrap_or("")
945 .to_ascii_uppercase();
946 if head != "SELECT" && head != "WITH" {
947 anyhow::bail!("only read-only SELECT/WITH queries are allowed");
948 }
949
950 let conn = self.conn.lock().unwrap();
951 let mut stmt = conn.prepare(sql)?;
952 let mut rows = stmt.query([])?;
955 let column_names: Vec<String> = match rows.as_ref() {
956 Some(s) => (0..s.column_count())
957 .map(|i| {
958 s.column_name(i)
959 .cloned()
960 .unwrap_or_else(|_| "?".to_string())
961 })
962 .collect(),
963 None => Vec::new(),
964 };
965
966 let mut out = Vec::new();
967 while let Some(row) = rows.next()? {
968 let mut obj = serde_json::Map::with_capacity(column_names.len());
969 for (i, name) in column_names.iter().enumerate() {
970 let value: duckdb::types::Value = row.get(i)?;
971 obj.insert(name.clone(), duck_value_to_json(value));
972 }
973 out.push(serde_json::Value::Object(obj));
974 }
975 Ok(out)
976 }
977}
978
979fn duck_value_to_json(v: duckdb::types::Value) -> serde_json::Value {
982 use duckdb::types::Value as V;
983 use serde_json::Value as J;
984 match v {
985 V::Null => J::Null,
986 V::Boolean(b) => J::Bool(b),
987 V::TinyInt(n) => J::from(n),
988 V::SmallInt(n) => J::from(n),
989 V::Int(n) => J::from(n),
990 V::BigInt(n) => J::from(n),
991 V::UTinyInt(n) => J::from(n),
992 V::USmallInt(n) => J::from(n),
993 V::UInt(n) => J::from(n),
994 V::UBigInt(n) => J::from(n),
995 V::Float(f) => serde_json::Number::from_f64(f as f64)
996 .map(J::Number)
997 .unwrap_or(J::Null),
998 V::Double(f) => serde_json::Number::from_f64(f)
999 .map(J::Number)
1000 .unwrap_or(J::Null),
1001 V::Text(s) => J::String(s),
1002 other => J::String(format!("{other:?}")),
1003 }
1004}
1005
/// [`Store`] implementation for [`DuckDbStore`].
///
/// Every trait method forwards its arguments unchanged to the inherent method
/// of the same name. The calls are written as `DuckDbStore::method(self, ..)`;
/// for a path on the type, Rust resolves to the inherent method before the
/// trait method, so these calls do not recurse into the trait implementation.
impl Store for DuckDbStore {
    /// Forwards to [`DuckDbStore::insert_spans`].
    fn insert_spans(&self, spans: &[Span]) -> Result<()> {
        DuckDbStore::insert_spans(self, spans)
    }
    /// Forwards to [`DuckDbStore::query_traces`].
    fn query_traces(&self, query: &TraceQuery) -> Result<Vec<Span>> {
        DuckDbStore::query_traces(self, query)
    }
    /// Forwards to [`DuckDbStore::get_trace`].
    fn get_trace(&self, trace_id: &str) -> Result<Vec<Span>> {
        DuckDbStore::get_trace(self, trace_id)
    }
    /// Forwards to [`DuckDbStore::add_comment`].
    fn add_comment(
        &self,
        trace_id: &str,
        span_id: Option<&str>,
        author: &str,
        body: &str,
    ) -> Result<TraceComment> {
        DuckDbStore::add_comment(self, trace_id, span_id, author, body)
    }
    /// Forwards to [`DuckDbStore::get_comments`].
    fn get_comments(&self, trace_id: &str) -> Result<Vec<TraceComment>> {
        DuckDbStore::get_comments(self, trace_id)
    }
    /// Forwards to [`DuckDbStore::list_services`].
    fn list_services(&self) -> Result<Vec<ServiceInfo>> {
        DuckDbStore::list_services(self)
    }
    /// Forwards to [`DuckDbStore::insert_logs`].
    fn insert_logs(&self, logs: &[LogRecord]) -> Result<()> {
        DuckDbStore::insert_logs(self, logs)
    }
    /// Forwards to [`DuckDbStore::query_logs`].
    fn query_logs(&self, query: &LogQuery) -> Result<Vec<LogRecord>> {
        DuckDbStore::query_logs(self, query)
    }
    /// Forwards to [`DuckDbStore::insert_metrics`].
    fn insert_metrics(&self, metrics: &[MetricPoint]) -> Result<()> {
        DuckDbStore::insert_metrics(self, metrics)
    }
    /// Forwards to [`DuckDbStore::query_metrics`].
    fn query_metrics(&self, query: &MetricQuery) -> Result<Vec<MetricPoint>> {
        DuckDbStore::query_metrics(self, query)
    }
    /// Forwards to [`DuckDbStore::query_summary`].
    fn query_summary(&self, last_seconds: i64, service: Option<&str>) -> Result<SummaryReport> {
        DuckDbStore::query_summary(self, last_seconds, service)
    }
    /// Forwards to [`DuckDbStore::query_anomalies`].
    fn query_anomalies(
        &self,
        current_seconds: i64,
        baseline_seconds: i64,
        service: Option<&str>,
    ) -> Result<AnomalyReport> {
        DuckDbStore::query_anomalies(self, current_seconds, baseline_seconds, service)
    }
    /// Forwards to [`DuckDbStore::query_correlate`].
    fn query_correlate(&self, trace_id: &str) -> Result<Option<CorrelateReport>> {
        DuckDbStore::query_correlate(self, trace_id)
    }
    /// Forwards to [`DuckDbStore::query_sql`].
    fn query_sql(&self, sql: &str) -> Result<Vec<serde_json::Value>> {
        DuckDbStore::query_sql(self, sql)
    }
}
1064
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Builds a minimal span for `trace_id` carrying the given string
    /// attributes.
    ///
    /// Every other field is a fixed placeholder (`svc` / `op`, `Ok` status,
    /// `Internal` kind, no LLM extension); start and end share one timestamp.
    fn span(trace_id: &str, attrs: &[(&str, &str)]) -> Span {
        let now = Utc::now();
        let attributes: HashMap<String, String> = attrs
            .iter()
            .map(|&(key, value)| (key.to_string(), value.to_string()))
            .collect();
        Span {
            trace_id: trace_id.into(),
            span_id: format!("{trace_id}-span"),
            parent_span_id: None,
            service: "svc".into(),
            operation: "op".into(),
            start_time: now,
            end_time: now,
            duration_ms: 1.0,
            status: SpanStatus::Ok,
            attributes,
            events: vec![],
            kind: SpanKind::Internal,
            llm: None,
        }
    }

    /// Opens a fresh store rooted in a new temporary directory.
    ///
    /// The `TempDir` guard is returned together with the store because
    /// dropping the guard deletes the directory. Callers must bind it to a
    /// named variable (`_dir`, not `_`) so the directory outlives the store;
    /// with `let (_dir, s) = store();` the store is dropped first.
    fn store() -> (tempfile::TempDir, DuckDbStore) {
        let dir = tempfile::tempdir().unwrap();
        let store = DuckDbStore::new(dir.path().to_str().unwrap()).unwrap();
        (dir, store)
    }

    /// An attribute key containing `.` still selects only the matching trace.
    #[test]
    fn attribute_filter_matches_dotted_key() {
        let (_dir, s) = store();
        s.insert_spans(&[
            span("a", &[("http.method", "GET"), ("http.status_code", "200")]),
            span("b", &[("http.method", "POST"), ("http.status_code", "500")]),
        ])
        .unwrap();

        let q = TraceQuery {
            attributes: vec![("http.method".into(), "GET".into())],
            ..Default::default()
        };
        let got = s.query_traces(&q).unwrap();
        assert_eq!(got.len(), 1);
        assert_eq!(got[0].trace_id, "a");
    }

    /// Several attribute filters must all hold for a trace to be returned.
    #[test]
    fn attribute_filters_are_anded() {
        let (_dir, s) = store();
        s.insert_spans(&[
            span("a", &[("http.method", "GET"), ("http.status_code", "200")]),
            span("b", &[("http.method", "GET"), ("http.status_code", "500")]),
        ])
        .unwrap();

        let q = TraceQuery {
            attributes: vec![
                ("http.method".into(), "GET".into()),
                ("http.status_code".into(), "500".into()),
            ],
            ..Default::default()
        };
        let got = s.query_traces(&q).unwrap();
        assert_eq!(got.len(), 1);
        assert_eq!(got[0].trace_id, "b");
    }

    /// A filter whose value differs from the stored one yields no traces.
    #[test]
    fn attribute_filter_misses_when_value_differs() {
        let (_dir, s) = store();
        s.insert_spans(&[span("a", &[("env", "prod")])]).unwrap();

        let q = TraceQuery {
            attributes: vec![("env".into(), "staging".into())],
            ..Default::default()
        };
        assert!(s.query_traces(&q).unwrap().is_empty());
    }

    /// The span kind and the LLM extension survive an insert / read cycle.
    #[test]
    fn llm_span_round_trips() {
        use super::super::models::{LlmOperation, LlmSpan};

        let (_dir, s) = store();
        let mut sp = span("llm", &[]);
        sp.kind = SpanKind::Llm;
        sp.llm = Some(LlmSpan {
            provider: "anthropic".into(),
            model: "claude-opus-4-7".into(),
            operation: LlmOperation::Chat,
            input_tokens: Some(1200),
            output_tokens: Some(340),
            total_tokens: Some(1540),
            cost_usd: Some(0.0185),
            ..Default::default()
        });
        s.insert_spans(&[sp]).unwrap();

        let got = s.get_trace("llm").unwrap();
        assert_eq!(got.len(), 1);
        assert_eq!(got[0].kind, SpanKind::Llm);
        let llm = got[0].llm.as_ref().expect("llm extension preserved");
        assert_eq!(llm.model, "claude-opus-4-7");
        assert_eq!(llm.total_tokens, Some(1540));
        assert_eq!(llm.cost_usd, Some(0.0185));
        assert_eq!(llm.operation, LlmOperation::Chat);
    }

    /// A span stored without an LLM extension reads back as `Internal` with
    /// `llm == None`.
    #[test]
    fn non_llm_span_has_no_extension_after_round_trip() {
        let (_dir, s) = store();
        s.insert_spans(&[span("plain", &[("k", "v")])]).unwrap();
        let got = s.get_trace("plain").unwrap();
        assert_eq!(got.len(), 1);
        assert_eq!(got[0].kind, SpanKind::Internal);
        assert!(got[0].llm.is_none());
    }

    /// `query_sql` returns rows for a `SELECT` and errors on mutating
    /// statements.
    #[test]
    fn query_sql_reads_and_rejects_mutations() {
        let (_dir, s) = store();
        s.insert_spans(&[span("a", &[("env", "prod")]), span("b", &[("env", "prod")])])
            .unwrap();

        let rows = s
            .query_sql("SELECT service, COUNT(*) AS n FROM spans GROUP BY service")
            .unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0]["service"], serde_json::json!("svc"));
        assert_eq!(rows[0]["n"], serde_json::json!(2));

        // Statements that would modify data or schema must be refused.
        assert!(s.query_sql("DELETE FROM spans").is_err());
        assert!(s.query_sql("DROP TABLE spans").is_err());
    }

    /// A log stored with an empty body and a `body_sha256` value reads back
    /// with both unchanged.
    #[test]
    fn log_body_sha256_round_trips() {
        let (_dir, s) = store();
        let log = LogRecord {
            timestamp: Utc::now(),
            observed_timestamp: Utc::now(),
            trace_id: Some("t1".into()),
            span_id: None,
            severity: LogSeverity::Error,
            severity_text: "ERROR".into(),
            body: String::new(),
            service: "svc".into(),
            attributes: HashMap::new(),
            body_sha256: Some("abc123".into()),
        };
        s.insert_logs(&[log]).unwrap();

        let got = s
            .query_logs(&LogQuery {
                trace_id: Some("t1".into()),
                ..Default::default()
            })
            .unwrap();
        assert_eq!(got.len(), 1);
        assert_eq!(got[0].body_sha256.as_deref(), Some("abc123"));
        assert!(got[0].body.is_empty());
    }
}