1use std::path::Path;
2use std::sync::Mutex;
3
4use anyhow::Result;
5use chrono::{DateTime, NaiveDateTime, Utc};
6use duckdb::{params, Connection};
7
8use super::Store;
9use super::models::{
10 Anomaly, AnomalyReport, CorrelateReport, ErrorOperation, LogQuery, LogRecord, LogSeverity,
11 LogSummary, MetricPoint, MetricQuery, MetricSummary, MetricType, ServiceInfo, ServiceSummary,
12 Span, SpanKind, SpanStatus, SummaryReport, TraceComment, TraceQuery, TraceSummary,
13};
14
/// Builds a DuckDB JSON path of the form `$."key"` that addresses `key` as a
/// single object member, even when the key itself contains dots
/// (e.g. `http.method`).
///
/// Backslashes and double quotes inside the key are backslash-escaped before
/// the key is wrapped in quotes.
fn json_path_for_key(key: &str) -> String {
    let mut path = String::with_capacity(key.len() + 4);
    path.push_str("$.\"");
    for ch in key.chars() {
        match ch {
            '\\' => path.push_str("\\\\"),
            '"' => path.push_str("\\\""),
            other => path.push(other),
        }
    }
    path.push('"');
    path
}
22
23fn parse_timestamp(s: &str) -> DateTime<Utc> {
24 NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f")
25 .or_else(|_| NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S"))
26 .map(|dt| dt.and_utc())
27 .unwrap_or_default()
28}
29
30fn row_to_span(row: &duckdb::Row<'_>) -> duckdb::Result<Span> {
31 let attrs_str: String = row.get(9)?;
32 let events_str: String = row.get(10)?;
33 let status_str: String = row.get(8)?;
34 let parent: Option<String> = row.get(2)?;
35 let start_str: String = row.get(5)?;
36 let end_str: String = row.get(6)?;
37 let kind_str: Option<String> = row.get(11)?;
38 let llm_str: Option<String> = row.get(12)?;
39
40 Ok(Span {
41 trace_id: row.get(0)?,
42 span_id: row.get(1)?,
43 parent_span_id: if parent.as_deref() == Some("") {
44 None
45 } else {
46 parent
47 },
48 service: row.get(3)?,
49 operation: row.get(4)?,
50 start_time: parse_timestamp(&start_str),
51 end_time: parse_timestamp(&end_str),
52 duration_ms: row.get(7)?,
53 status: SpanStatus::from_str(&status_str),
54 attributes: serde_json::from_str(&attrs_str).unwrap_or_default(),
55 events: serde_json::from_str(&events_str).unwrap_or_default(),
56 kind: kind_str
57 .map(|s| SpanKind::from_str(&s))
58 .unwrap_or(SpanKind::Internal),
59 llm: llm_str.and_then(|s| serde_json::from_str(&s).ok()),
60 })
61}
62
/// [`Store`] implementation backed by a single embedded DuckDB database file.
///
/// All reads and writes go through one `Connection` guarded by a `Mutex`, so
/// operations on the same store are serialized.
pub struct DuckDbStore {
    // The single connection to `<data_dir>/tael.duckdb`, opened in `DuckDbStore::new`.
    conn: Mutex<Connection>,
}
66
67impl DuckDbStore {
68 pub fn new(data_dir: &str) -> Result<Self> {
69 std::fs::create_dir_all(data_dir)?;
70 let db_path = Path::new(data_dir).join("tael.duckdb");
71 let conn = Connection::open(db_path)?;
72 let store = Self {
73 conn: Mutex::new(conn),
74 };
75 store.init_schema()?;
76 Ok(store)
77 }
78
    /// Creates the `spans`, `trace_comments`, `logs` and `metrics` tables and
    /// their indexes when missing. It also migrates databases created by
    /// older versions by adding later columns (`spans.kind`, `spans.llm`,
    /// `logs.body_sha256`).
    ///
    /// Every statement is guarded by `IF NOT EXISTS`, so this runs safely on
    /// every open.
    ///
    /// # Panics
    ///
    /// Panics if the connection mutex is poisoned.
    fn init_schema(&self) -> Result<()> {
        let conn = self.conn.lock().unwrap();
        // Statement order matters: the `ALTER TABLE ... ADD COLUMN` migrations
        // run before `idx_spans_kind` is created, so a legacy `spans` table
        // already has the `kind` column by the time it is indexed.
        conn.execute_batch(
            "
            CREATE TABLE IF NOT EXISTS spans (
                trace_id VARCHAR NOT NULL,
                span_id VARCHAR NOT NULL,
                parent_span_id VARCHAR,
                service VARCHAR NOT NULL,
                operation VARCHAR NOT NULL,
                start_time TIMESTAMP NOT NULL,
                end_time TIMESTAMP NOT NULL,
                duration_ms DOUBLE NOT NULL,
                status VARCHAR NOT NULL DEFAULT 'unset',
                attributes JSON,
                events JSON,
                kind VARCHAR NOT NULL DEFAULT 'internal',
                llm JSON,
                PRIMARY KEY (trace_id, span_id)
            );

            -- Migrate pre-existing databases created before the LLM columns.
            -- DuckDB's ALTER ... ADD COLUMN rejects constraints, so these are
            -- added nullable; fresh tables get the constraints from CREATE above,
            -- and migrated NULL `kind` values are read back as 'internal'.
            ALTER TABLE spans ADD COLUMN IF NOT EXISTS kind VARCHAR;
            ALTER TABLE spans ADD COLUMN IF NOT EXISTS llm JSON;

            CREATE INDEX IF NOT EXISTS idx_spans_service ON spans(service);
            CREATE INDEX IF NOT EXISTS idx_spans_start_time ON spans(start_time);
            CREATE INDEX IF NOT EXISTS idx_spans_trace_id ON spans(trace_id);
            CREATE INDEX IF NOT EXISTS idx_spans_status ON spans(status);
            CREATE INDEX IF NOT EXISTS idx_spans_kind ON spans(kind);

            CREATE TABLE IF NOT EXISTS trace_comments (
                id VARCHAR NOT NULL PRIMARY KEY,
                trace_id VARCHAR NOT NULL,
                span_id VARCHAR,
                author VARCHAR NOT NULL,
                body VARCHAR NOT NULL,
                created_at TIMESTAMP NOT NULL DEFAULT current_timestamp::TIMESTAMP
            );

            CREATE INDEX IF NOT EXISTS idx_comments_trace_id ON trace_comments(trace_id);

            CREATE TABLE IF NOT EXISTS logs (
                timestamp TIMESTAMP NOT NULL,
                observed_timestamp TIMESTAMP NOT NULL,
                trace_id VARCHAR,
                span_id VARCHAR,
                severity VARCHAR NOT NULL DEFAULT 'unspecified',
                severity_text VARCHAR NOT NULL DEFAULT '',
                body VARCHAR NOT NULL DEFAULT '',
                service VARCHAR NOT NULL,
                attributes JSON,
                body_sha256 VARCHAR
            );

            -- Migrate pre-existing databases (nullable, no constraint).
            ALTER TABLE logs ADD COLUMN IF NOT EXISTS body_sha256 VARCHAR;

            CREATE INDEX IF NOT EXISTS idx_logs_service ON logs(service);
            CREATE INDEX IF NOT EXISTS idx_logs_timestamp ON logs(timestamp);
            CREATE INDEX IF NOT EXISTS idx_logs_severity ON logs(severity);
            CREATE INDEX IF NOT EXISTS idx_logs_trace_id ON logs(trace_id);

            CREATE TABLE IF NOT EXISTS metrics (
                timestamp TIMESTAMP NOT NULL,
                service VARCHAR NOT NULL,
                name VARCHAR NOT NULL,
                metric_type VARCHAR NOT NULL DEFAULT 'unknown',
                value DOUBLE NOT NULL,
                unit VARCHAR NOT NULL DEFAULT '',
                attributes JSON
            );

            CREATE INDEX IF NOT EXISTS idx_metrics_service ON metrics(service);
            CREATE INDEX IF NOT EXISTS idx_metrics_name ON metrics(name);
            CREATE INDEX IF NOT EXISTS idx_metrics_timestamp ON metrics(timestamp);
            ",
        )?;
        Ok(())
    }
162
163 pub fn insert_spans(&self, spans: &[Span]) -> Result<()> {
164 let conn = self.conn.lock().unwrap();
165 let mut stmt = conn.prepare(
166 "INSERT OR REPLACE INTO spans
167 (trace_id, span_id, parent_span_id, service, operation,
168 start_time, end_time, duration_ms, status, attributes, events,
169 kind, llm)
170 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
171 )?;
172
173 for span in spans {
174 let attrs = serde_json::to_string(&span.attributes)?;
175 let events = serde_json::to_string(&span.events)?;
176 let llm = span
177 .llm
178 .as_ref()
179 .map(serde_json::to_string)
180 .transpose()?;
181 stmt.execute(params![
182 span.trace_id,
183 span.span_id,
184 span.parent_span_id,
185 span.service,
186 span.operation,
187 span.start_time.format("%Y-%m-%d %H:%M:%S%.6f").to_string(),
188 span.end_time.format("%Y-%m-%d %H:%M:%S%.6f").to_string(),
189 span.duration_ms,
190 span.status.to_string(),
191 attrs,
192 events,
193 span.kind.to_string(),
194 llm,
195 ])?;
196 }
197 Ok(())
198 }
199
200 pub fn query_traces(&self, query: &TraceQuery) -> Result<Vec<Span>> {
201 let conn = self.conn.lock().unwrap();
202
203 let mut sql = String::from(
204 "SELECT trace_id, span_id, parent_span_id, service, operation,
205 start_time::VARCHAR, end_time::VARCHAR, duration_ms, status, attributes, events,
206 kind, llm
207 FROM spans WHERE 1=1",
208 );
209 let mut param_values: Vec<Box<dyn duckdb::ToSql>> = Vec::new();
210
211 if let Some(ref svc) = query.service {
212 sql.push_str(" AND service = ?");
213 param_values.push(Box::new(svc.clone()));
214 }
215 if let Some(ref op) = query.operation {
216 sql.push_str(" AND operation LIKE ?");
217 param_values.push(Box::new(format!("%{op}%")));
218 }
219 if let Some(min) = query.min_duration_ms {
220 sql.push_str(" AND duration_ms >= ?");
221 param_values.push(Box::new(min));
222 }
223 if let Some(max) = query.max_duration_ms {
224 sql.push_str(" AND duration_ms <= ?");
225 param_values.push(Box::new(max));
226 }
227 if let Some(ref status) = query.status {
228 sql.push_str(" AND status = ?");
229 param_values.push(Box::new(status.clone()));
230 }
231 if let Some(secs) = query.last_seconds {
232 sql.push_str(&format!(" AND start_time >= current_timestamp::TIMESTAMP - INTERVAL '{secs} seconds'"));
233 }
234 for (k, v) in &query.attributes {
235 sql.push_str(" AND json_extract_string(attributes, ?) = ?");
236 param_values.push(Box::new(json_path_for_key(k)));
237 param_values.push(Box::new(v.clone()));
238 }
239
240 sql.push_str(" ORDER BY start_time DESC");
241
242 let limit = query.limit.unwrap_or(100);
243 sql.push_str(&format!(" LIMIT {limit}"));
244
245 let params_ref: Vec<&dyn duckdb::ToSql> = param_values.iter().map(|p| p.as_ref()).collect();
246 let mut stmt = conn.prepare(&sql)?;
247 let rows = stmt.query_map(params_ref.as_slice(), row_to_span)?;
248
249 let mut spans = Vec::new();
250 for row in rows {
251 spans.push(row?);
252 }
253 Ok(spans)
254 }
255
256 pub fn get_trace(&self, trace_id: &str) -> Result<Vec<Span>> {
257 let conn = self.conn.lock().unwrap();
258 let mut stmt = conn.prepare(
259 "SELECT trace_id, span_id, parent_span_id, service, operation,
260 start_time::VARCHAR, end_time::VARCHAR, duration_ms, status, attributes, events,
261 kind, llm
262 FROM spans
263 WHERE trace_id = ?
264 ORDER BY start_time ASC",
265 )?;
266
267 let rows = stmt.query_map(params![trace_id], row_to_span)?;
268
269 let mut spans = Vec::new();
270 for row in rows {
271 spans.push(row?);
272 }
273 Ok(spans)
274 }
275
276 pub fn add_comment(
277 &self,
278 trace_id: &str,
279 span_id: Option<&str>,
280 author: &str,
281 body: &str,
282 ) -> Result<TraceComment> {
283 let conn = self.conn.lock().unwrap();
284 let id = uuid::Uuid::new_v4().to_string();
285 conn.execute(
286 "INSERT INTO trace_comments (id, trace_id, span_id, author, body)
287 VALUES (?, ?, ?, ?, ?)",
288 params![id, trace_id, span_id, author, body],
289 )?;
290
291 let mut stmt = conn.prepare(
292 "SELECT id, trace_id, span_id, author, body, created_at::VARCHAR
293 FROM trace_comments WHERE id = ?",
294 )?;
295 let comment = stmt.query_row(params![id], |row| {
296 Ok(TraceComment {
297 id: row.get(0)?,
298 trace_id: row.get(1)?,
299 span_id: row.get(2)?,
300 author: row.get(3)?,
301 body: row.get(4)?,
302 created_at: row.get(5)?,
303 })
304 })?;
305 Ok(comment)
306 }
307
308 pub fn get_comments(&self, trace_id: &str) -> Result<Vec<TraceComment>> {
309 let conn = self.conn.lock().unwrap();
310 let mut stmt = conn.prepare(
311 "SELECT id, trace_id, span_id, author, body, created_at::VARCHAR
312 FROM trace_comments
313 WHERE trace_id = ?
314 ORDER BY created_at ASC",
315 )?;
316 let rows = stmt.query_map(params![trace_id], |row| {
317 Ok(TraceComment {
318 id: row.get(0)?,
319 trace_id: row.get(1)?,
320 span_id: row.get(2)?,
321 author: row.get(3)?,
322 body: row.get(4)?,
323 created_at: row.get(5)?,
324 })
325 })?;
326 let mut comments = Vec::new();
327 for row in rows {
328 comments.push(row?);
329 }
330 Ok(comments)
331 }
332
333 pub fn list_services(&self) -> Result<Vec<ServiceInfo>> {
334 let conn = self.conn.lock().unwrap();
335 let mut stmt = conn.prepare(
336 "SELECT service,
337 COUNT(*) as span_count,
338 COUNT(DISTINCT trace_id) as trace_count,
339 AVG(duration_ms) as avg_duration,
340 SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END)::DOUBLE / COUNT(*)::DOUBLE as error_rate
341 FROM spans
342 GROUP BY service
343 ORDER BY span_count DESC",
344 )?;
345
346 let rows = stmt.query_map([], |row| {
347 Ok(ServiceInfo {
348 name: row.get(0)?,
349 span_count: row.get(1)?,
350 trace_count: row.get(2)?,
351 avg_duration_ms: row.get(3)?,
352 error_rate: row.get(4)?,
353 })
354 })?;
355
356 let mut services = Vec::new();
357 for row in rows {
358 services.push(row?);
359 }
360 Ok(services)
361 }
362
363 pub fn insert_logs(&self, logs: &[LogRecord]) -> Result<()> {
366 let conn = self.conn.lock().unwrap();
367 let mut stmt = conn.prepare(
368 "INSERT INTO logs
369 (timestamp, observed_timestamp, trace_id, span_id, severity,
370 severity_text, body, service, attributes, body_sha256)
371 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
372 )?;
373
374 for log in logs {
375 let attrs = serde_json::to_string(&log.attributes)?;
376 stmt.execute(params![
377 log.timestamp.format("%Y-%m-%d %H:%M:%S%.6f").to_string(),
378 log.observed_timestamp.format("%Y-%m-%d %H:%M:%S%.6f").to_string(),
379 log.trace_id,
380 log.span_id,
381 log.severity.to_string(),
382 log.severity_text,
383 log.body,
384 log.service,
385 attrs,
386 log.body_sha256,
387 ])?;
388 }
389 Ok(())
390 }
391
392 pub fn query_logs(&self, query: &LogQuery) -> Result<Vec<LogRecord>> {
393 let conn = self.conn.lock().unwrap();
394
395 let mut sql = String::from(
396 "SELECT timestamp::VARCHAR, observed_timestamp::VARCHAR, trace_id, span_id,
397 severity, severity_text, body, service, attributes, body_sha256
398 FROM logs WHERE 1=1",
399 );
400 let mut param_values: Vec<Box<dyn duckdb::ToSql>> = Vec::new();
401
402 if let Some(ref svc) = query.service {
403 sql.push_str(" AND service = ?");
404 param_values.push(Box::new(svc.clone()));
405 }
406 if let Some(ref sev) = query.severity {
407 sql.push_str(" AND severity = ?");
408 param_values.push(Box::new(sev.clone()));
409 }
410 if let Some(ref body) = query.body_contains {
411 sql.push_str(" AND body LIKE ?");
412 param_values.push(Box::new(format!("%{body}%")));
413 }
414 if let Some(ref tid) = query.trace_id {
415 sql.push_str(" AND trace_id = ?");
416 param_values.push(Box::new(tid.clone()));
417 }
418 if let Some(secs) = query.last_seconds {
419 sql.push_str(&format!(
420 " AND timestamp >= current_timestamp::TIMESTAMP - INTERVAL '{secs} seconds'"
421 ));
422 }
423
424 sql.push_str(" ORDER BY timestamp DESC");
425
426 let limit = query.limit.unwrap_or(100);
427 sql.push_str(&format!(" LIMIT {limit}"));
428
429 let params_ref: Vec<&dyn duckdb::ToSql> =
430 param_values.iter().map(|p| p.as_ref()).collect();
431 let mut stmt = conn.prepare(&sql)?;
432 let rows = stmt.query_map(params_ref.as_slice(), |row| {
433 let ts_str: String = row.get(0)?;
434 let obs_str: String = row.get(1)?;
435 let attrs_str: String = row.get(8)?;
436 let severity_str: String = row.get(4)?;
437
438 Ok(LogRecord {
439 timestamp: parse_timestamp(&ts_str),
440 observed_timestamp: parse_timestamp(&obs_str),
441 trace_id: row.get(2)?,
442 span_id: row.get(3)?,
443 severity: LogSeverity::from_str(&severity_str),
444 severity_text: row.get(5)?,
445 body: row.get(6)?,
446 service: row.get(7)?,
447 attributes: serde_json::from_str(&attrs_str).unwrap_or_default(),
448 body_sha256: row.get(9)?,
449 })
450 })?;
451
452 let mut logs = Vec::new();
453 for row in rows {
454 logs.push(row?);
455 }
456 Ok(logs)
457 }
458
459 pub fn insert_metrics(&self, metrics: &[MetricPoint]) -> Result<()> {
462 let conn = self.conn.lock().unwrap();
463 let mut stmt = conn.prepare(
464 "INSERT INTO metrics
465 (timestamp, service, name, metric_type, value, unit, attributes)
466 VALUES (?, ?, ?, ?, ?, ?, ?)",
467 )?;
468
469 for m in metrics {
470 let attrs = serde_json::to_string(&m.attributes)?;
471 stmt.execute(params![
472 m.timestamp.format("%Y-%m-%d %H:%M:%S%.6f").to_string(),
473 m.service,
474 m.name,
475 m.metric_type.to_string(),
476 m.value,
477 m.unit,
478 attrs,
479 ])?;
480 }
481 Ok(())
482 }
483
484 pub fn query_metrics(&self, query: &MetricQuery) -> Result<Vec<MetricPoint>> {
485 let conn = self.conn.lock().unwrap();
486
487 let mut sql = String::from(
488 "SELECT timestamp::VARCHAR, service, name, metric_type, value, unit, attributes
489 FROM metrics WHERE 1=1",
490 );
491 let mut param_values: Vec<Box<dyn duckdb::ToSql>> = Vec::new();
492
493 if let Some(ref svc) = query.service {
494 sql.push_str(" AND service = ?");
495 param_values.push(Box::new(svc.clone()));
496 }
497 if let Some(ref name) = query.name {
498 sql.push_str(" AND name = ?");
499 param_values.push(Box::new(name.clone()));
500 }
501 if let Some(ref mt) = query.metric_type {
502 sql.push_str(" AND metric_type = ?");
503 param_values.push(Box::new(mt.clone()));
504 }
505 if let Some(secs) = query.last_seconds {
506 sql.push_str(&format!(
507 " AND timestamp >= current_timestamp::TIMESTAMP - INTERVAL '{secs} seconds'"
508 ));
509 }
510
511 sql.push_str(" ORDER BY timestamp DESC");
512
513 let limit = query.limit.unwrap_or(500);
514 sql.push_str(&format!(" LIMIT {limit}"));
515
516 let params_ref: Vec<&dyn duckdb::ToSql> =
517 param_values.iter().map(|p| p.as_ref()).collect();
518 let mut stmt = conn.prepare(&sql)?;
519 let rows = stmt.query_map(params_ref.as_slice(), |row| {
520 let ts_str: String = row.get(0)?;
521 let mt_str: String = row.get(3)?;
522 let attrs_str: String = row.get(6)?;
523
524 Ok(MetricPoint {
525 timestamp: parse_timestamp(&ts_str),
526 service: row.get(1)?,
527 name: row.get(2)?,
528 metric_type: MetricType::from_str(&mt_str),
529 value: row.get(4)?,
530 unit: row.get(5)?,
531 attributes: serde_json::from_str(&attrs_str).unwrap_or_default(),
532 })
533 })?;
534
535 let mut metrics = Vec::new();
536 for row in rows {
537 metrics.push(row?);
538 }
539 Ok(metrics)
540 }
541
542 pub fn query_summary(
543 &self,
544 last_seconds: i64,
545 service: Option<&str>,
546 ) -> Result<SummaryReport> {
547 let conn = self.conn.lock().unwrap();
548
549 let span_time_clause = format!(
550 "start_time >= current_timestamp::TIMESTAMP - INTERVAL '{last_seconds} seconds'"
551 );
552 let ts_time_clause = format!(
553 "timestamp >= current_timestamp::TIMESTAMP - INTERVAL '{last_seconds} seconds'"
554 );
555
556 let svc_clause = if service.is_some() { " AND service = ?" } else { "" };
557 let svc_owned = service.map(|s| s.to_string());
558 let build_params = || -> Vec<&dyn duckdb::ToSql> {
559 match &svc_owned {
560 Some(s) => vec![s as &dyn duckdb::ToSql],
561 None => vec![],
562 }
563 };
564
565 let traces_sql = format!(
567 "SELECT
568 COUNT(*)::BIGINT,
569 COUNT(DISTINCT trace_id)::BIGINT,
570 SUM(CASE WHEN status='error' THEN 1 ELSE 0 END)::BIGINT,
571 COALESCE(AVG(duration_ms), 0.0),
572 COALESCE(MAX(duration_ms), 0.0),
573 COALESCE(quantile_cont(duration_ms, 0.5), 0.0),
574 COALESCE(quantile_cont(duration_ms, 0.95), 0.0),
575 COALESCE(quantile_cont(duration_ms, 0.99), 0.0)
576 FROM spans WHERE {span_time_clause}{svc_clause}"
577 );
578 let mut stmt = conn.prepare(&traces_sql)?;
579 let traces: TraceSummary = stmt.query_row(build_params().as_slice(), |row| {
580 let span_count: i64 = row.get::<_, Option<i64>>(0)?.unwrap_or(0);
581 let error_count: i64 = row.get::<_, Option<i64>>(2)?.unwrap_or(0);
582 let error_rate = if span_count > 0 {
583 error_count as f64 / span_count as f64
584 } else {
585 0.0
586 };
587 Ok(TraceSummary {
588 span_count,
589 trace_count: row.get::<_, Option<i64>>(1)?.unwrap_or(0),
590 error_count,
591 error_rate,
592 avg_ms: row.get(3)?,
593 max_ms: row.get(4)?,
594 p50_ms: row.get(5)?,
595 p95_ms: row.get(6)?,
596 p99_ms: row.get(7)?,
597 })
598 })?;
599 drop(stmt);
600
601 let top_svc_sql = format!(
603 "SELECT service,
604 COUNT(*)::BIGINT,
605 SUM(CASE WHEN status='error' THEN 1 ELSE 0 END)::DOUBLE / NULLIF(COUNT(*), 0)::DOUBLE,
606 COALESCE(quantile_cont(duration_ms, 0.95), 0.0)
607 FROM spans WHERE {span_time_clause}{svc_clause}
608 GROUP BY service
609 ORDER BY 2 DESC
610 LIMIT 5"
611 );
612 let mut stmt = conn.prepare(&top_svc_sql)?;
613 let top_services: Vec<ServiceSummary> = stmt
614 .query_map(build_params().as_slice(), |row| {
615 Ok(ServiceSummary {
616 service: row.get(0)?,
617 span_count: row.get(1)?,
618 error_rate: row.get::<_, Option<f64>>(2)?.unwrap_or(0.0),
619 p95_ms: row.get(3)?,
620 })
621 })?
622 .collect::<duckdb::Result<_>>()?;
623 drop(stmt);
624
625 let err_op_sql = format!(
627 "SELECT service, operation, COUNT(*)::BIGINT
628 FROM spans
629 WHERE status='error' AND {span_time_clause}{svc_clause}
630 GROUP BY service, operation
631 ORDER BY 3 DESC
632 LIMIT 5"
633 );
634 let mut stmt = conn.prepare(&err_op_sql)?;
635 let top_error_operations: Vec<ErrorOperation> = stmt
636 .query_map(build_params().as_slice(), |row| {
637 Ok(ErrorOperation {
638 service: row.get(0)?,
639 operation: row.get(1)?,
640 error_count: row.get(2)?,
641 })
642 })?
643 .collect::<duckdb::Result<_>>()?;
644 drop(stmt);
645
646 let logs_sql = format!(
648 "SELECT
649 COUNT(*)::BIGINT,
650 SUM(CASE WHEN severity IN ('error','fatal') THEN 1 ELSE 0 END)::BIGINT,
651 SUM(CASE WHEN severity='warn' THEN 1 ELSE 0 END)::BIGINT,
652 SUM(CASE WHEN severity='info' THEN 1 ELSE 0 END)::BIGINT,
653 SUM(CASE WHEN severity IN ('debug','trace') THEN 1 ELSE 0 END)::BIGINT
654 FROM logs WHERE {ts_time_clause}{svc_clause}"
655 );
656 let mut stmt = conn.prepare(&logs_sql)?;
657 let logs: LogSummary = stmt.query_row(build_params().as_slice(), |row| {
658 Ok(LogSummary {
659 total: row.get::<_, Option<i64>>(0)?.unwrap_or(0),
660 error: row.get::<_, Option<i64>>(1)?.unwrap_or(0),
661 warn: row.get::<_, Option<i64>>(2)?.unwrap_or(0),
662 info: row.get::<_, Option<i64>>(3)?.unwrap_or(0),
663 debug: row.get::<_, Option<i64>>(4)?.unwrap_or(0),
664 })
665 })?;
666 drop(stmt);
667
668 let metrics_sql = format!(
670 "SELECT COUNT(*)::BIGINT, COUNT(DISTINCT name)::BIGINT
671 FROM metrics WHERE {ts_time_clause}{svc_clause}"
672 );
673 let mut stmt = conn.prepare(&metrics_sql)?;
674 let metrics: MetricSummary = stmt.query_row(build_params().as_slice(), |row| {
675 Ok(MetricSummary {
676 point_count: row.get::<_, Option<i64>>(0)?.unwrap_or(0),
677 unique_names: row.get::<_, Option<i64>>(1)?.unwrap_or(0),
678 })
679 })?;
680 drop(stmt);
681
682 Ok(SummaryReport {
683 window_seconds: last_seconds,
684 service_filter: service.map(|s| s.to_string()),
685 traces,
686 top_services,
687 top_error_operations,
688 logs,
689 metrics,
690 })
691 }
692
693 pub fn query_anomalies(
694 &self,
695 current_seconds: i64,
696 baseline_seconds: i64,
697 service: Option<&str>,
698 ) -> Result<AnomalyReport> {
699 let conn = self.conn.lock().unwrap();
700 let svc_owned = service.map(|s| s.to_string());
701 let svc_clause = if service.is_some() { " AND service = ?" } else { "" };
702
703 let stats_sql = |window: i64| -> String {
705 format!(
706 "SELECT service,
707 COUNT(*)::BIGINT AS span_count,
708 SUM(CASE WHEN status='error' THEN 1 ELSE 0 END)::DOUBLE
709 / NULLIF(COUNT(*), 0)::DOUBLE AS error_rate,
710 COALESCE(quantile_cont(duration_ms, 0.95), 0.0) AS p95_ms
711 FROM spans
712 WHERE start_time >= current_timestamp::TIMESTAMP - INTERVAL '{window} seconds'{svc_clause}
713 GROUP BY service"
714 )
715 };
716
717 let build_params = || -> Vec<&dyn duckdb::ToSql> {
718 match &svc_owned {
719 Some(s) => vec![s as &dyn duckdb::ToSql],
720 None => vec![],
721 }
722 };
723
724 let mut current_stats: std::collections::HashMap<String, (i64, f64, f64)> =
725 std::collections::HashMap::new();
726 let mut stmt = conn.prepare(&stats_sql(current_seconds))?;
727 let rows = stmt.query_map(build_params().as_slice(), |row| {
728 Ok((
729 row.get::<_, String>(0)?,
730 row.get::<_, i64>(1)?,
731 row.get::<_, Option<f64>>(2)?.unwrap_or(0.0),
732 row.get::<_, f64>(3)?,
733 ))
734 })?;
735 for r in rows {
736 let (svc, span_count, error_rate, p95) = r?;
737 current_stats.insert(svc, (span_count, error_rate, p95));
738 }
739 drop(stmt);
740
741 let mut baseline_stats: std::collections::HashMap<String, (i64, f64, f64)> =
742 std::collections::HashMap::new();
743 let mut stmt = conn.prepare(&stats_sql(baseline_seconds))?;
744 let rows = stmt.query_map(build_params().as_slice(), |row| {
745 Ok((
746 row.get::<_, String>(0)?,
747 row.get::<_, i64>(1)?,
748 row.get::<_, Option<f64>>(2)?.unwrap_or(0.0),
749 row.get::<_, f64>(3)?,
750 ))
751 })?;
752 for r in rows {
753 let (svc, span_count, error_rate, p95) = r?;
754 baseline_stats.insert(svc, (span_count, error_rate, p95));
755 }
756 drop(stmt);
757
758 let mut anomalies = Vec::new();
759 for (svc, (cur_count, cur_err, cur_p95)) in ¤t_stats {
760 if *cur_count < 5 {
761 continue;
762 }
763 let (base_count, base_err, base_p95) =
764 baseline_stats.get(svc).copied().unwrap_or((0, 0.0, 0.0));
765
766 let err_delta = cur_err - base_err;
767 if err_delta >= 0.05 && *cur_err > 0.0 {
768 let severity = if err_delta >= 0.25 {
769 "critical"
770 } else if err_delta >= 0.10 {
771 "warning"
772 } else {
773 "info"
774 };
775 anomalies.push(Anomaly {
776 service: svc.clone(),
777 kind: "error_rate".into(),
778 severity: severity.into(),
779 current: *cur_err,
780 baseline: base_err,
781 delta: err_delta,
782 description: format!(
783 "Error rate rose {:.1}% → {:.1}% (baseline {:.1}%, {} spans)",
784 base_err * 100.0,
785 cur_err * 100.0,
786 base_err * 100.0,
787 cur_count
788 ),
789 });
790 }
791
792 if base_p95 > 0.0 && base_count >= 5 {
793 let ratio = cur_p95 / base_p95;
794 if ratio >= 1.5 && *cur_p95 > 50.0 {
795 let severity = if ratio >= 3.0 {
796 "critical"
797 } else if ratio >= 2.0 {
798 "warning"
799 } else {
800 "info"
801 };
802 anomalies.push(Anomaly {
803 service: svc.clone(),
804 kind: "latency_p95".into(),
805 severity: severity.into(),
806 current: *cur_p95,
807 baseline: base_p95,
808 delta: cur_p95 - base_p95,
809 description: format!(
810 "p95 latency {:.1}ms → {:.1}ms ({:.1}× baseline)",
811 base_p95, cur_p95, ratio
812 ),
813 });
814 }
815 }
816 }
817
818 anomalies.sort_by(|a, b| {
819 let rank = |s: &str| match s {
820 "critical" => 0,
821 "warning" => 1,
822 _ => 2,
823 };
824 rank(&a.severity).cmp(&rank(&b.severity))
825 });
826
827 Ok(AnomalyReport {
828 current_seconds,
829 baseline_seconds,
830 service_filter: svc_owned,
831 anomalies,
832 })
833 }
834
835 pub fn query_correlate(&self, trace_id: &str) -> Result<Option<CorrelateReport>> {
836 let spans = self.get_trace(trace_id)?;
837 if spans.is_empty() {
838 return Ok(None);
839 }
840
841 let start_time = spans
842 .iter()
843 .map(|s| s.start_time)
844 .min()
845 .unwrap_or_else(Utc::now);
846 let end_time = spans
847 .iter()
848 .map(|s| s.end_time)
849 .max()
850 .unwrap_or_else(Utc::now);
851 let duration_ms = (end_time - start_time).num_milliseconds() as f64;
852 let error_count = spans
853 .iter()
854 .filter(|s| matches!(s.status, SpanStatus::Error))
855 .count() as i64;
856
857 let mut services: Vec<String> =
858 spans.iter().map(|s| s.service.clone()).collect();
859 services.sort();
860 services.dedup();
861
862 let logs = self.query_logs(&LogQuery {
863 trace_id: Some(trace_id.to_string()),
864 limit: Some(500),
865 ..Default::default()
866 })?;
867
868 let metrics = {
870 let conn = self.conn.lock().unwrap();
871 let start = start_time
872 .naive_utc()
873 .format("%Y-%m-%d %H:%M:%S%.6f")
874 .to_string();
875 let end = end_time
876 .naive_utc()
877 .format("%Y-%m-%d %H:%M:%S%.6f")
878 .to_string();
879
880 let placeholders = std::iter::repeat("?")
881 .take(services.len())
882 .collect::<Vec<_>>()
883 .join(",");
884 let sql = format!(
885 "SELECT timestamp::VARCHAR, service, name, metric_type, value, unit, attributes
886 FROM metrics
887 WHERE timestamp BETWEEN ?::TIMESTAMP AND ?::TIMESTAMP
888 AND service IN ({placeholders})
889 ORDER BY timestamp ASC
890 LIMIT 500"
891 );
892 let mut stmt = conn.prepare(&sql)?;
893 let mut params_vec: Vec<&dyn duckdb::ToSql> =
894 vec![&start as &dyn duckdb::ToSql, &end as &dyn duckdb::ToSql];
895 for s in &services {
896 params_vec.push(s as &dyn duckdb::ToSql);
897 }
898 let rows = stmt.query_map(params_vec.as_slice(), |row| {
899 let ts_str: String = row.get(0)?;
900 let mt_str: String = row.get(3)?;
901 let attrs_str: String = row.get(6)?;
902 Ok(MetricPoint {
903 timestamp: parse_timestamp(&ts_str),
904 service: row.get(1)?,
905 name: row.get(2)?,
906 metric_type: MetricType::from_str(&mt_str),
907 value: row.get(4)?,
908 unit: row.get(5)?,
909 attributes: serde_json::from_str(&attrs_str).unwrap_or_default(),
910 })
911 })?;
912 let mut out = Vec::new();
913 for r in rows {
914 out.push(r?);
915 }
916 out
917 };
918
919 Ok(Some(CorrelateReport {
920 trace_id: trace_id.to_string(),
921 span_count: spans.len(),
922 services,
923 start_time: start_time.to_rfc3339(),
924 end_time: end_time.to_rfc3339(),
925 duration_ms,
926 error_count,
927 logs,
928 metrics,
929 }))
930 }
931}
932
933impl DuckDbStore {
934 pub fn query_sql(&self, sql: &str) -> Result<Vec<serde_json::Value>> {
939 let trimmed = sql.trim_start();
940 let head = trimmed
941 .split_whitespace()
942 .next()
943 .unwrap_or("")
944 .to_ascii_uppercase();
945 if head != "SELECT" && head != "WITH" {
946 anyhow::bail!("only read-only SELECT/WITH queries are allowed");
947 }
948
949 let conn = self.conn.lock().unwrap();
950 let mut stmt = conn.prepare(sql)?;
951 let mut rows = stmt.query([])?;
954 let column_names: Vec<String> = match rows.as_ref() {
955 Some(s) => (0..s.column_count())
956 .map(|i| s.column_name(i).cloned().unwrap_or_else(|_| "?".to_string()))
957 .collect(),
958 None => Vec::new(),
959 };
960
961 let mut out = Vec::new();
962 while let Some(row) = rows.next()? {
963 let mut obj = serde_json::Map::with_capacity(column_names.len());
964 for (i, name) in column_names.iter().enumerate() {
965 let value: duckdb::types::Value = row.get(i)?;
966 obj.insert(name.clone(), duck_value_to_json(value));
967 }
968 out.push(serde_json::Value::Object(obj));
969 }
970 Ok(out)
971 }
972}
973
974fn duck_value_to_json(v: duckdb::types::Value) -> serde_json::Value {
977 use duckdb::types::Value as V;
978 use serde_json::Value as J;
979 match v {
980 V::Null => J::Null,
981 V::Boolean(b) => J::Bool(b),
982 V::TinyInt(n) => J::from(n),
983 V::SmallInt(n) => J::from(n),
984 V::Int(n) => J::from(n),
985 V::BigInt(n) => J::from(n),
986 V::UTinyInt(n) => J::from(n),
987 V::USmallInt(n) => J::from(n),
988 V::UInt(n) => J::from(n),
989 V::UBigInt(n) => J::from(n),
990 V::Float(f) => serde_json::Number::from_f64(f as f64).map(J::Number).unwrap_or(J::Null),
991 V::Double(f) => serde_json::Number::from_f64(f).map(J::Number).unwrap_or(J::Null),
992 V::Text(s) => J::String(s),
993 other => J::String(format!("{other:?}")),
994 }
995}
996
997impl Store for DuckDbStore {
1001 fn insert_spans(&self, spans: &[Span]) -> Result<()> {
1002 DuckDbStore::insert_spans(self, spans)
1003 }
1004 fn query_traces(&self, query: &TraceQuery) -> Result<Vec<Span>> {
1005 DuckDbStore::query_traces(self, query)
1006 }
1007 fn get_trace(&self, trace_id: &str) -> Result<Vec<Span>> {
1008 DuckDbStore::get_trace(self, trace_id)
1009 }
1010 fn add_comment(
1011 &self,
1012 trace_id: &str,
1013 span_id: Option<&str>,
1014 author: &str,
1015 body: &str,
1016 ) -> Result<TraceComment> {
1017 DuckDbStore::add_comment(self, trace_id, span_id, author, body)
1018 }
1019 fn get_comments(&self, trace_id: &str) -> Result<Vec<TraceComment>> {
1020 DuckDbStore::get_comments(self, trace_id)
1021 }
1022 fn list_services(&self) -> Result<Vec<ServiceInfo>> {
1023 DuckDbStore::list_services(self)
1024 }
1025 fn insert_logs(&self, logs: &[LogRecord]) -> Result<()> {
1026 DuckDbStore::insert_logs(self, logs)
1027 }
1028 fn query_logs(&self, query: &LogQuery) -> Result<Vec<LogRecord>> {
1029 DuckDbStore::query_logs(self, query)
1030 }
1031 fn insert_metrics(&self, metrics: &[MetricPoint]) -> Result<()> {
1032 DuckDbStore::insert_metrics(self, metrics)
1033 }
1034 fn query_metrics(&self, query: &MetricQuery) -> Result<Vec<MetricPoint>> {
1035 DuckDbStore::query_metrics(self, query)
1036 }
1037 fn query_summary(&self, last_seconds: i64, service: Option<&str>) -> Result<SummaryReport> {
1038 DuckDbStore::query_summary(self, last_seconds, service)
1039 }
1040 fn query_anomalies(
1041 &self,
1042 current_seconds: i64,
1043 baseline_seconds: i64,
1044 service: Option<&str>,
1045 ) -> Result<AnomalyReport> {
1046 DuckDbStore::query_anomalies(self, current_seconds, baseline_seconds, service)
1047 }
1048 fn query_correlate(&self, trace_id: &str) -> Result<Option<CorrelateReport>> {
1049 DuckDbStore::query_correlate(self, trace_id)
1050 }
1051 fn query_sql(&self, sql: &str) -> Result<Vec<serde_json::Value>> {
1052 DuckDbStore::query_sql(self, sql)
1053 }
1054}
1055
#[cfg(test)]
mod tests {
    //! Unit tests for `DuckDbStore`: attribute filtering on traces, span and LLM
    //! extension round-trips, the read-only SQL query path, and log body hashing.

    use super::*;
    use std::collections::HashMap;

    /// Builds a minimal span for `trace_id` carrying the given string attributes.
    ///
    /// The span is `Ok`, `Internal`, has no parent, events or LLM extension, uses
    /// service `"svc"` / operation `"op"`, and has `duration_ms` fixed at 1.0.
    fn span(trace_id: &str, attrs: &[(&str, &str)]) -> Span {
        let now = Utc::now();
        Span {
            trace_id: trace_id.into(),
            span_id: format!("{trace_id}-span"),
            parent_span_id: None,
            service: "svc".into(),
            operation: "op".into(),
            start_time: now,
            end_time: now,
            duration_ms: 1.0,
            status: SpanStatus::Ok,
            attributes: attrs
                .iter()
                .map(|&(k, v)| (k.to_string(), v.to_string()))
                .collect(),
            events: vec![],
            kind: SpanKind::Internal,
            llm: None,
        }
    }

    /// Opens a fresh store inside a new temporary directory.
    ///
    /// The `TempDir` guard is returned together with the store because dropping
    /// it deletes the directory. It must stay alive for as long as the store is
    /// in use. Bind it to a named variable (`let (_dir, s) = store();`), not to
    /// `_`, which would drop it immediately.
    fn store() -> (tempfile::TempDir, DuckDbStore) {
        let dir = tempfile::tempdir().expect("create temp dir");
        let path = dir.path().to_str().expect("temp dir path is valid UTF-8");
        let store = DuckDbStore::new(path).expect("open DuckDB store");
        (dir, store)
    }

    #[test]
    fn attribute_filter_matches_dotted_key() {
        let (_dir, s) = store();
        s.insert_spans(&[
            span("a", &[("http.method", "GET"), ("http.status_code", "200")]),
            span("b", &[("http.method", "POST"), ("http.status_code", "500")]),
        ])
        .unwrap();

        let q = TraceQuery {
            attributes: vec![("http.method".into(), "GET".into())],
            ..Default::default()
        };
        let got = s.query_traces(&q).unwrap();
        assert_eq!(got.len(), 1);
        assert_eq!(got[0].trace_id, "a");
    }

    #[test]
    fn attribute_filters_are_anded() {
        let (_dir, s) = store();
        s.insert_spans(&[
            span("a", &[("http.method", "GET"), ("http.status_code", "200")]),
            span("b", &[("http.method", "GET"), ("http.status_code", "500")]),
        ])
        .unwrap();

        let q = TraceQuery {
            attributes: vec![
                ("http.method".into(), "GET".into()),
                ("http.status_code".into(), "500".into()),
            ],
            ..Default::default()
        };
        let got = s.query_traces(&q).unwrap();
        assert_eq!(got.len(), 1);
        assert_eq!(got[0].trace_id, "b");
    }

    #[test]
    fn attribute_filter_misses_when_value_differs() {
        let (_dir, s) = store();
        s.insert_spans(&[span("a", &[("env", "prod")])]).unwrap();

        let q = TraceQuery {
            attributes: vec![("env".into(), "staging".into())],
            ..Default::default()
        };
        assert!(s.query_traces(&q).unwrap().is_empty());
    }

    #[test]
    fn llm_span_round_trips() {
        use super::super::models::{LlmOperation, LlmSpan};

        let (_dir, s) = store();
        let mut sp = span("llm", &[]);
        sp.kind = SpanKind::Llm;
        sp.llm = Some(LlmSpan {
            provider: "anthropic".into(),
            model: "claude-opus-4-7".into(),
            operation: LlmOperation::Chat,
            input_tokens: Some(1200),
            output_tokens: Some(340),
            total_tokens: Some(1540),
            cost_usd: Some(0.0185),
            ..Default::default()
        });
        s.insert_spans(&[sp]).unwrap();

        let got = s.get_trace("llm").unwrap();
        assert_eq!(got.len(), 1);
        assert_eq!(got[0].kind, SpanKind::Llm);
        let llm = got[0].llm.as_ref().expect("llm extension preserved");
        assert_eq!(llm.model, "claude-opus-4-7");
        assert_eq!(llm.total_tokens, Some(1540));
        assert_eq!(llm.cost_usd, Some(0.0185));
        assert_eq!(llm.operation, LlmOperation::Chat);
    }

    #[test]
    fn non_llm_span_has_no_extension_after_round_trip() {
        let (_dir, s) = store();
        s.insert_spans(&[span("plain", &[("k", "v")])]).unwrap();
        let got = s.get_trace("plain").unwrap();
        assert_eq!(got.len(), 1);
        assert_eq!(got[0].kind, SpanKind::Internal);
        assert!(got[0].llm.is_none());
    }

    #[test]
    fn query_sql_reads_and_rejects_mutations() {
        let (_dir, s) = store();
        s.insert_spans(&[
            span("a", &[("env", "prod")]),
            span("b", &[("env", "prod")]),
        ])
        .unwrap();

        let rows = s
            .query_sql("SELECT service, COUNT(*) AS n FROM spans GROUP BY service")
            .unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0]["service"], serde_json::json!("svc"));
        assert_eq!(rows[0]["n"], serde_json::json!(2));

        assert!(s.query_sql("DELETE FROM spans").is_err());
        assert!(s.query_sql("DROP TABLE spans").is_err());

        // A rejected statement must not have (partially) executed: data is intact.
        let rows = s.query_sql("SELECT COUNT(*) AS n FROM spans").unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0]["n"], serde_json::json!(2));
    }

    #[test]
    fn log_body_sha256_round_trips() {
        let (_dir, s) = store();
        let log = LogRecord {
            timestamp: Utc::now(),
            observed_timestamp: Utc::now(),
            trace_id: Some("t1".into()),
            span_id: None,
            severity: LogSeverity::Error,
            severity_text: "ERROR".into(),
            body: String::new(),
            service: "svc".into(),
            attributes: HashMap::new(),
            body_sha256: Some("abc123".into()),
        };
        s.insert_logs(&[log]).unwrap();

        let got = s
            .query_logs(&LogQuery {
                trace_id: Some("t1".into()),
                ..Default::default()
            })
            .unwrap();
        assert_eq!(got.len(), 1);
        assert_eq!(got[0].body_sha256.as_deref(), Some("abc123"));
        assert!(got[0].body.is_empty());
    }
}