Skip to main content

tael_server/storage/
duckdb_store.rs

1use std::path::Path;
2use std::sync::Mutex;
3
4use anyhow::Result;
5use chrono::{DateTime, NaiveDateTime, Utc};
6use duckdb::{params, Connection};
7
8use super::Store;
9use super::models::{
10    Anomaly, AnomalyReport, CorrelateReport, ErrorOperation, LogQuery, LogRecord, LogSeverity,
11    LogSummary, MetricPoint, MetricQuery, MetricSummary, MetricType, ServiceInfo, ServiceSummary,
12    Span, SpanKind, SpanStatus, SummaryReport, TraceComment, TraceQuery, TraceSummary,
13};
14
/// Build a JSON path expression that addresses one top-level attribute key.
///
/// The key is wrapped as `$."<key>"` so that a dotted key such as
/// `http.method` is treated as a single member name; the unquoted form
/// `$.http.method` would instead descend into a nested object. Backslashes
/// and double quotes inside the key are backslash-escaped so they cannot
/// terminate the quoted member name.
fn json_path_for_key(key: &str) -> String {
    let mut path = String::with_capacity(key.len() + 4);
    path.push_str("$.\"");
    for ch in key.chars() {
        if matches!(ch, '\\' | '"') {
            path.push('\\');
        }
        path.push(ch);
    }
    path.push('"');
    path
}
22
23fn parse_timestamp(s: &str) -> DateTime<Utc> {
24    NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f")
25        .or_else(|_| NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S"))
26        .map(|dt| dt.and_utc())
27        .unwrap_or_default()
28}
29
30fn row_to_span(row: &duckdb::Row<'_>) -> duckdb::Result<Span> {
31    let attrs_str: String = row.get(9)?;
32    let events_str: String = row.get(10)?;
33    let status_str: String = row.get(8)?;
34    let parent: Option<String> = row.get(2)?;
35    let start_str: String = row.get(5)?;
36    let end_str: String = row.get(6)?;
37    let kind_str: Option<String> = row.get(11)?;
38    let llm_str: Option<String> = row.get(12)?;
39
40    Ok(Span {
41        trace_id: row.get(0)?,
42        span_id: row.get(1)?,
43        parent_span_id: if parent.as_deref() == Some("") {
44            None
45        } else {
46            parent
47        },
48        service: row.get(3)?,
49        operation: row.get(4)?,
50        start_time: parse_timestamp(&start_str),
51        end_time: parse_timestamp(&end_str),
52        duration_ms: row.get(7)?,
53        status: SpanStatus::from_str(&status_str),
54        attributes: serde_json::from_str(&attrs_str).unwrap_or_default(),
55        events: serde_json::from_str(&events_str).unwrap_or_default(),
56        kind: kind_str
57            .map(|s| SpanKind::from_str(&s))
58            .unwrap_or(SpanKind::Internal),
59        llm: llm_str.and_then(|s| serde_json::from_str(&s).ok()),
60    })
61}
62
/// DuckDB-backed storage for spans, logs, metrics and trace comments.
///
/// All access goes through one `duckdb::Connection` guarded by a `Mutex`;
/// each method locks it while it runs its statements.
pub struct DuckDbStore {
    // Single shared connection; serialises all reads and writes.
    conn: Mutex<Connection>,
}
66
67impl DuckDbStore {
68    pub fn new(data_dir: &str) -> Result<Self> {
69        std::fs::create_dir_all(data_dir)?;
70        let db_path = Path::new(data_dir).join("tael.duckdb");
71        let conn = Connection::open(db_path)?;
72        let store = Self {
73            conn: Mutex::new(conn),
74        };
75        store.init_schema()?;
76        Ok(store)
77    }
78
79    fn init_schema(&self) -> Result<()> {
80        let conn = self.conn.lock().unwrap();
81        conn.execute_batch(
82            "
83            CREATE TABLE IF NOT EXISTS spans (
84                trace_id       VARCHAR NOT NULL,
85                span_id        VARCHAR NOT NULL,
86                parent_span_id VARCHAR,
87                service        VARCHAR NOT NULL,
88                operation      VARCHAR NOT NULL,
89                start_time     TIMESTAMP NOT NULL,
90                end_time       TIMESTAMP NOT NULL,
91                duration_ms    DOUBLE NOT NULL,
92                status         VARCHAR NOT NULL DEFAULT 'unset',
93                attributes     JSON,
94                events         JSON,
95                kind           VARCHAR NOT NULL DEFAULT 'internal',
96                llm            JSON,
97                PRIMARY KEY (trace_id, span_id)
98            );
99
100            -- Migrate pre-existing databases created before the LLM columns.
101            -- DuckDB's ALTER ... ADD COLUMN rejects constraints, so these are
102            -- added nullable; fresh tables get the constraints from CREATE above,
103            -- and migrated NULL `kind` values are read back as 'internal'.
104            ALTER TABLE spans ADD COLUMN IF NOT EXISTS kind VARCHAR;
105            ALTER TABLE spans ADD COLUMN IF NOT EXISTS llm JSON;
106
107            CREATE INDEX IF NOT EXISTS idx_spans_service ON spans(service);
108            CREATE INDEX IF NOT EXISTS idx_spans_start_time ON spans(start_time);
109            CREATE INDEX IF NOT EXISTS idx_spans_trace_id ON spans(trace_id);
110            CREATE INDEX IF NOT EXISTS idx_spans_status ON spans(status);
111            CREATE INDEX IF NOT EXISTS idx_spans_kind ON spans(kind);
112
113            CREATE TABLE IF NOT EXISTS trace_comments (
114                id         VARCHAR NOT NULL PRIMARY KEY,
115                trace_id   VARCHAR NOT NULL,
116                span_id    VARCHAR,
117                author     VARCHAR NOT NULL,
118                body       VARCHAR NOT NULL,
119                created_at TIMESTAMP NOT NULL DEFAULT current_timestamp::TIMESTAMP
120            );
121
122            CREATE INDEX IF NOT EXISTS idx_comments_trace_id ON trace_comments(trace_id);
123
124            CREATE TABLE IF NOT EXISTS logs (
125                timestamp          TIMESTAMP NOT NULL,
126                observed_timestamp TIMESTAMP NOT NULL,
127                trace_id           VARCHAR,
128                span_id            VARCHAR,
129                severity           VARCHAR NOT NULL DEFAULT 'unspecified',
130                severity_text      VARCHAR NOT NULL DEFAULT '',
131                body               VARCHAR NOT NULL DEFAULT '',
132                service            VARCHAR NOT NULL,
133                attributes         JSON,
134                body_sha256        VARCHAR
135            );
136
137            -- Migrate pre-existing databases (nullable, no constraint).
138            ALTER TABLE logs ADD COLUMN IF NOT EXISTS body_sha256 VARCHAR;
139
140            CREATE INDEX IF NOT EXISTS idx_logs_service ON logs(service);
141            CREATE INDEX IF NOT EXISTS idx_logs_timestamp ON logs(timestamp);
142            CREATE INDEX IF NOT EXISTS idx_logs_severity ON logs(severity);
143            CREATE INDEX IF NOT EXISTS idx_logs_trace_id ON logs(trace_id);
144
145            CREATE TABLE IF NOT EXISTS metrics (
146                timestamp   TIMESTAMP NOT NULL,
147                service     VARCHAR NOT NULL,
148                name        VARCHAR NOT NULL,
149                metric_type VARCHAR NOT NULL DEFAULT 'unknown',
150                value       DOUBLE NOT NULL,
151                unit        VARCHAR NOT NULL DEFAULT '',
152                attributes  JSON
153            );
154
155            CREATE INDEX IF NOT EXISTS idx_metrics_service ON metrics(service);
156            CREATE INDEX IF NOT EXISTS idx_metrics_name ON metrics(name);
157            CREATE INDEX IF NOT EXISTS idx_metrics_timestamp ON metrics(timestamp);
158            ",
159        )?;
160        Ok(())
161    }
162
163    pub fn insert_spans(&self, spans: &[Span]) -> Result<()> {
164        let conn = self.conn.lock().unwrap();
165        let mut stmt = conn.prepare(
166            "INSERT OR REPLACE INTO spans
167             (trace_id, span_id, parent_span_id, service, operation,
168              start_time, end_time, duration_ms, status, attributes, events,
169              kind, llm)
170             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
171        )?;
172
173        for span in spans {
174            let attrs = serde_json::to_string(&span.attributes)?;
175            let events = serde_json::to_string(&span.events)?;
176            let llm = span
177                .llm
178                .as_ref()
179                .map(serde_json::to_string)
180                .transpose()?;
181            stmt.execute(params![
182                span.trace_id,
183                span.span_id,
184                span.parent_span_id,
185                span.service,
186                span.operation,
187                span.start_time.format("%Y-%m-%d %H:%M:%S%.6f").to_string(),
188                span.end_time.format("%Y-%m-%d %H:%M:%S%.6f").to_string(),
189                span.duration_ms,
190                span.status.to_string(),
191                attrs,
192                events,
193                span.kind.to_string(),
194                llm,
195            ])?;
196        }
197        Ok(())
198    }
199
200    pub fn query_traces(&self, query: &TraceQuery) -> Result<Vec<Span>> {
201        let conn = self.conn.lock().unwrap();
202
203        let mut sql = String::from(
204            "SELECT trace_id, span_id, parent_span_id, service, operation,
205                    start_time::VARCHAR, end_time::VARCHAR, duration_ms, status, attributes, events,
206                    kind, llm
207             FROM spans WHERE 1=1",
208        );
209        let mut param_values: Vec<Box<dyn duckdb::ToSql>> = Vec::new();
210
211        if let Some(ref svc) = query.service {
212            sql.push_str(" AND service = ?");
213            param_values.push(Box::new(svc.clone()));
214        }
215        if let Some(ref op) = query.operation {
216            sql.push_str(" AND operation LIKE ?");
217            param_values.push(Box::new(format!("%{op}%")));
218        }
219        if let Some(min) = query.min_duration_ms {
220            sql.push_str(" AND duration_ms >= ?");
221            param_values.push(Box::new(min));
222        }
223        if let Some(max) = query.max_duration_ms {
224            sql.push_str(" AND duration_ms <= ?");
225            param_values.push(Box::new(max));
226        }
227        if let Some(ref status) = query.status {
228            sql.push_str(" AND status = ?");
229            param_values.push(Box::new(status.clone()));
230        }
231        if let Some(secs) = query.last_seconds {
232            sql.push_str(&format!(" AND start_time >= current_timestamp::TIMESTAMP - INTERVAL '{secs} seconds'"));
233        }
234        for (k, v) in &query.attributes {
235            sql.push_str(" AND json_extract_string(attributes, ?) = ?");
236            param_values.push(Box::new(json_path_for_key(k)));
237            param_values.push(Box::new(v.clone()));
238        }
239
240        sql.push_str(" ORDER BY start_time DESC");
241
242        let limit = query.limit.unwrap_or(100);
243        sql.push_str(&format!(" LIMIT {limit}"));
244
245        let params_ref: Vec<&dyn duckdb::ToSql> = param_values.iter().map(|p| p.as_ref()).collect();
246        let mut stmt = conn.prepare(&sql)?;
247        let rows = stmt.query_map(params_ref.as_slice(), row_to_span)?;
248
249        let mut spans = Vec::new();
250        for row in rows {
251            spans.push(row?);
252        }
253        Ok(spans)
254    }
255
256    pub fn get_trace(&self, trace_id: &str) -> Result<Vec<Span>> {
257        let conn = self.conn.lock().unwrap();
258        let mut stmt = conn.prepare(
259            "SELECT trace_id, span_id, parent_span_id, service, operation,
260                    start_time::VARCHAR, end_time::VARCHAR, duration_ms, status, attributes, events,
261                    kind, llm
262             FROM spans
263             WHERE trace_id = ?
264             ORDER BY start_time ASC",
265        )?;
266
267        let rows = stmt.query_map(params![trace_id], row_to_span)?;
268
269        let mut spans = Vec::new();
270        for row in rows {
271            spans.push(row?);
272        }
273        Ok(spans)
274    }
275
276    pub fn add_comment(
277        &self,
278        trace_id: &str,
279        span_id: Option<&str>,
280        author: &str,
281        body: &str,
282    ) -> Result<TraceComment> {
283        let conn = self.conn.lock().unwrap();
284        let id = uuid::Uuid::new_v4().to_string();
285        conn.execute(
286            "INSERT INTO trace_comments (id, trace_id, span_id, author, body)
287             VALUES (?, ?, ?, ?, ?)",
288            params![id, trace_id, span_id, author, body],
289        )?;
290
291        let mut stmt = conn.prepare(
292            "SELECT id, trace_id, span_id, author, body, created_at::VARCHAR
293             FROM trace_comments WHERE id = ?",
294        )?;
295        let comment = stmt.query_row(params![id], |row| {
296            Ok(TraceComment {
297                id: row.get(0)?,
298                trace_id: row.get(1)?,
299                span_id: row.get(2)?,
300                author: row.get(3)?,
301                body: row.get(4)?,
302                created_at: row.get(5)?,
303            })
304        })?;
305        Ok(comment)
306    }
307
308    pub fn get_comments(&self, trace_id: &str) -> Result<Vec<TraceComment>> {
309        let conn = self.conn.lock().unwrap();
310        let mut stmt = conn.prepare(
311            "SELECT id, trace_id, span_id, author, body, created_at::VARCHAR
312             FROM trace_comments
313             WHERE trace_id = ?
314             ORDER BY created_at ASC",
315        )?;
316        let rows = stmt.query_map(params![trace_id], |row| {
317            Ok(TraceComment {
318                id: row.get(0)?,
319                trace_id: row.get(1)?,
320                span_id: row.get(2)?,
321                author: row.get(3)?,
322                body: row.get(4)?,
323                created_at: row.get(5)?,
324            })
325        })?;
326        let mut comments = Vec::new();
327        for row in rows {
328            comments.push(row?);
329        }
330        Ok(comments)
331    }
332
333    pub fn list_services(&self) -> Result<Vec<ServiceInfo>> {
334        let conn = self.conn.lock().unwrap();
335        let mut stmt = conn.prepare(
336            "SELECT service,
337                    COUNT(*) as span_count,
338                    COUNT(DISTINCT trace_id) as trace_count,
339                    AVG(duration_ms) as avg_duration,
340                    SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END)::DOUBLE / COUNT(*)::DOUBLE as error_rate
341             FROM spans
342             GROUP BY service
343             ORDER BY span_count DESC",
344        )?;
345
346        let rows = stmt.query_map([], |row| {
347            Ok(ServiceInfo {
348                name: row.get(0)?,
349                span_count: row.get(1)?,
350                trace_count: row.get(2)?,
351                avg_duration_ms: row.get(3)?,
352                error_rate: row.get(4)?,
353            })
354        })?;
355
356        let mut services = Vec::new();
357        for row in rows {
358            services.push(row?);
359        }
360        Ok(services)
361    }
362
363    // ── Log storage ─────────────────────────────────────────────────
364
365    pub fn insert_logs(&self, logs: &[LogRecord]) -> Result<()> {
366        let conn = self.conn.lock().unwrap();
367        let mut stmt = conn.prepare(
368            "INSERT INTO logs
369             (timestamp, observed_timestamp, trace_id, span_id, severity,
370              severity_text, body, service, attributes, body_sha256)
371             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
372        )?;
373
374        for log in logs {
375            let attrs = serde_json::to_string(&log.attributes)?;
376            stmt.execute(params![
377                log.timestamp.format("%Y-%m-%d %H:%M:%S%.6f").to_string(),
378                log.observed_timestamp.format("%Y-%m-%d %H:%M:%S%.6f").to_string(),
379                log.trace_id,
380                log.span_id,
381                log.severity.to_string(),
382                log.severity_text,
383                log.body,
384                log.service,
385                attrs,
386                log.body_sha256,
387            ])?;
388        }
389        Ok(())
390    }
391
392    pub fn query_logs(&self, query: &LogQuery) -> Result<Vec<LogRecord>> {
393        let conn = self.conn.lock().unwrap();
394
395        let mut sql = String::from(
396            "SELECT timestamp::VARCHAR, observed_timestamp::VARCHAR, trace_id, span_id,
397                    severity, severity_text, body, service, attributes, body_sha256
398             FROM logs WHERE 1=1",
399        );
400        let mut param_values: Vec<Box<dyn duckdb::ToSql>> = Vec::new();
401
402        if let Some(ref svc) = query.service {
403            sql.push_str(" AND service = ?");
404            param_values.push(Box::new(svc.clone()));
405        }
406        if let Some(ref sev) = query.severity {
407            sql.push_str(" AND severity = ?");
408            param_values.push(Box::new(sev.clone()));
409        }
410        if let Some(ref body) = query.body_contains {
411            sql.push_str(" AND body LIKE ?");
412            param_values.push(Box::new(format!("%{body}%")));
413        }
414        if let Some(ref tid) = query.trace_id {
415            sql.push_str(" AND trace_id = ?");
416            param_values.push(Box::new(tid.clone()));
417        }
418        if let Some(secs) = query.last_seconds {
419            sql.push_str(&format!(
420                " AND timestamp >= current_timestamp::TIMESTAMP - INTERVAL '{secs} seconds'"
421            ));
422        }
423
424        sql.push_str(" ORDER BY timestamp DESC");
425
426        let limit = query.limit.unwrap_or(100);
427        sql.push_str(&format!(" LIMIT {limit}"));
428
429        let params_ref: Vec<&dyn duckdb::ToSql> =
430            param_values.iter().map(|p| p.as_ref()).collect();
431        let mut stmt = conn.prepare(&sql)?;
432        let rows = stmt.query_map(params_ref.as_slice(), |row| {
433            let ts_str: String = row.get(0)?;
434            let obs_str: String = row.get(1)?;
435            let attrs_str: String = row.get(8)?;
436            let severity_str: String = row.get(4)?;
437
438            Ok(LogRecord {
439                timestamp: parse_timestamp(&ts_str),
440                observed_timestamp: parse_timestamp(&obs_str),
441                trace_id: row.get(2)?,
442                span_id: row.get(3)?,
443                severity: LogSeverity::from_str(&severity_str),
444                severity_text: row.get(5)?,
445                body: row.get(6)?,
446                service: row.get(7)?,
447                attributes: serde_json::from_str(&attrs_str).unwrap_or_default(),
448                body_sha256: row.get(9)?,
449            })
450        })?;
451
452        let mut logs = Vec::new();
453        for row in rows {
454            logs.push(row?);
455        }
456        Ok(logs)
457    }
458
459    // ── Metric storage ──────────────────────────────────────────────
460
461    pub fn insert_metrics(&self, metrics: &[MetricPoint]) -> Result<()> {
462        let conn = self.conn.lock().unwrap();
463        let mut stmt = conn.prepare(
464            "INSERT INTO metrics
465             (timestamp, service, name, metric_type, value, unit, attributes)
466             VALUES (?, ?, ?, ?, ?, ?, ?)",
467        )?;
468
469        for m in metrics {
470            let attrs = serde_json::to_string(&m.attributes)?;
471            stmt.execute(params![
472                m.timestamp.format("%Y-%m-%d %H:%M:%S%.6f").to_string(),
473                m.service,
474                m.name,
475                m.metric_type.to_string(),
476                m.value,
477                m.unit,
478                attrs,
479            ])?;
480        }
481        Ok(())
482    }
483
484    pub fn query_metrics(&self, query: &MetricQuery) -> Result<Vec<MetricPoint>> {
485        let conn = self.conn.lock().unwrap();
486
487        let mut sql = String::from(
488            "SELECT timestamp::VARCHAR, service, name, metric_type, value, unit, attributes
489             FROM metrics WHERE 1=1",
490        );
491        let mut param_values: Vec<Box<dyn duckdb::ToSql>> = Vec::new();
492
493        if let Some(ref svc) = query.service {
494            sql.push_str(" AND service = ?");
495            param_values.push(Box::new(svc.clone()));
496        }
497        if let Some(ref name) = query.name {
498            sql.push_str(" AND name = ?");
499            param_values.push(Box::new(name.clone()));
500        }
501        if let Some(ref mt) = query.metric_type {
502            sql.push_str(" AND metric_type = ?");
503            param_values.push(Box::new(mt.clone()));
504        }
505        if let Some(secs) = query.last_seconds {
506            sql.push_str(&format!(
507                " AND timestamp >= current_timestamp::TIMESTAMP - INTERVAL '{secs} seconds'"
508            ));
509        }
510
511        sql.push_str(" ORDER BY timestamp DESC");
512
513        let limit = query.limit.unwrap_or(500);
514        sql.push_str(&format!(" LIMIT {limit}"));
515
516        let params_ref: Vec<&dyn duckdb::ToSql> =
517            param_values.iter().map(|p| p.as_ref()).collect();
518        let mut stmt = conn.prepare(&sql)?;
519        let rows = stmt.query_map(params_ref.as_slice(), |row| {
520            let ts_str: String = row.get(0)?;
521            let mt_str: String = row.get(3)?;
522            let attrs_str: String = row.get(6)?;
523
524            Ok(MetricPoint {
525                timestamp: parse_timestamp(&ts_str),
526                service: row.get(1)?,
527                name: row.get(2)?,
528                metric_type: MetricType::from_str(&mt_str),
529                value: row.get(4)?,
530                unit: row.get(5)?,
531                attributes: serde_json::from_str(&attrs_str).unwrap_or_default(),
532            })
533        })?;
534
535        let mut metrics = Vec::new();
536        for row in rows {
537            metrics.push(row?);
538        }
539        Ok(metrics)
540    }
541
542    pub fn query_summary(
543        &self,
544        last_seconds: i64,
545        service: Option<&str>,
546    ) -> Result<SummaryReport> {
547        let conn = self.conn.lock().unwrap();
548
549        let span_time_clause = format!(
550            "start_time >= current_timestamp::TIMESTAMP - INTERVAL '{last_seconds} seconds'"
551        );
552        let ts_time_clause = format!(
553            "timestamp >= current_timestamp::TIMESTAMP - INTERVAL '{last_seconds} seconds'"
554        );
555
556        let svc_clause = if service.is_some() { " AND service = ?" } else { "" };
557        let svc_owned = service.map(|s| s.to_string());
558        let build_params = || -> Vec<&dyn duckdb::ToSql> {
559            match &svc_owned {
560                Some(s) => vec![s as &dyn duckdb::ToSql],
561                None => vec![],
562            }
563        };
564
565        // ── Traces aggregate ─────────────────────────────────────────
566        let traces_sql = format!(
567            "SELECT
568                COUNT(*)::BIGINT,
569                COUNT(DISTINCT trace_id)::BIGINT,
570                SUM(CASE WHEN status='error' THEN 1 ELSE 0 END)::BIGINT,
571                COALESCE(AVG(duration_ms), 0.0),
572                COALESCE(MAX(duration_ms), 0.0),
573                COALESCE(quantile_cont(duration_ms, 0.5), 0.0),
574                COALESCE(quantile_cont(duration_ms, 0.95), 0.0),
575                COALESCE(quantile_cont(duration_ms, 0.99), 0.0)
576             FROM spans WHERE {span_time_clause}{svc_clause}"
577        );
578        let mut stmt = conn.prepare(&traces_sql)?;
579        let traces: TraceSummary = stmt.query_row(build_params().as_slice(), |row| {
580            let span_count: i64 = row.get::<_, Option<i64>>(0)?.unwrap_or(0);
581            let error_count: i64 = row.get::<_, Option<i64>>(2)?.unwrap_or(0);
582            let error_rate = if span_count > 0 {
583                error_count as f64 / span_count as f64
584            } else {
585                0.0
586            };
587            Ok(TraceSummary {
588                span_count,
589                trace_count: row.get::<_, Option<i64>>(1)?.unwrap_or(0),
590                error_count,
591                error_rate,
592                avg_ms: row.get(3)?,
593                max_ms: row.get(4)?,
594                p50_ms: row.get(5)?,
595                p95_ms: row.get(6)?,
596                p99_ms: row.get(7)?,
597            })
598        })?;
599        drop(stmt);
600
601        // ── Top services by span count ───────────────────────────────
602        let top_svc_sql = format!(
603            "SELECT service,
604                    COUNT(*)::BIGINT,
605                    SUM(CASE WHEN status='error' THEN 1 ELSE 0 END)::DOUBLE / NULLIF(COUNT(*), 0)::DOUBLE,
606                    COALESCE(quantile_cont(duration_ms, 0.95), 0.0)
607             FROM spans WHERE {span_time_clause}{svc_clause}
608             GROUP BY service
609             ORDER BY 2 DESC
610             LIMIT 5"
611        );
612        let mut stmt = conn.prepare(&top_svc_sql)?;
613        let top_services: Vec<ServiceSummary> = stmt
614            .query_map(build_params().as_slice(), |row| {
615                Ok(ServiceSummary {
616                    service: row.get(0)?,
617                    span_count: row.get(1)?,
618                    error_rate: row.get::<_, Option<f64>>(2)?.unwrap_or(0.0),
619                    p95_ms: row.get(3)?,
620                })
621            })?
622            .collect::<duckdb::Result<_>>()?;
623        drop(stmt);
624
625        // ── Top error operations ─────────────────────────────────────
626        let err_op_sql = format!(
627            "SELECT service, operation, COUNT(*)::BIGINT
628             FROM spans
629             WHERE status='error' AND {span_time_clause}{svc_clause}
630             GROUP BY service, operation
631             ORDER BY 3 DESC
632             LIMIT 5"
633        );
634        let mut stmt = conn.prepare(&err_op_sql)?;
635        let top_error_operations: Vec<ErrorOperation> = stmt
636            .query_map(build_params().as_slice(), |row| {
637                Ok(ErrorOperation {
638                    service: row.get(0)?,
639                    operation: row.get(1)?,
640                    error_count: row.get(2)?,
641                })
642            })?
643            .collect::<duckdb::Result<_>>()?;
644        drop(stmt);
645
646        // ── Logs aggregate ───────────────────────────────────────────
647        let logs_sql = format!(
648            "SELECT
649                COUNT(*)::BIGINT,
650                SUM(CASE WHEN severity IN ('error','fatal') THEN 1 ELSE 0 END)::BIGINT,
651                SUM(CASE WHEN severity='warn' THEN 1 ELSE 0 END)::BIGINT,
652                SUM(CASE WHEN severity='info' THEN 1 ELSE 0 END)::BIGINT,
653                SUM(CASE WHEN severity IN ('debug','trace') THEN 1 ELSE 0 END)::BIGINT
654             FROM logs WHERE {ts_time_clause}{svc_clause}"
655        );
656        let mut stmt = conn.prepare(&logs_sql)?;
657        let logs: LogSummary = stmt.query_row(build_params().as_slice(), |row| {
658            Ok(LogSummary {
659                total: row.get::<_, Option<i64>>(0)?.unwrap_or(0),
660                error: row.get::<_, Option<i64>>(1)?.unwrap_or(0),
661                warn: row.get::<_, Option<i64>>(2)?.unwrap_or(0),
662                info: row.get::<_, Option<i64>>(3)?.unwrap_or(0),
663                debug: row.get::<_, Option<i64>>(4)?.unwrap_or(0),
664            })
665        })?;
666        drop(stmt);
667
668        // ── Metrics aggregate ────────────────────────────────────────
669        let metrics_sql = format!(
670            "SELECT COUNT(*)::BIGINT, COUNT(DISTINCT name)::BIGINT
671             FROM metrics WHERE {ts_time_clause}{svc_clause}"
672        );
673        let mut stmt = conn.prepare(&metrics_sql)?;
674        let metrics: MetricSummary = stmt.query_row(build_params().as_slice(), |row| {
675            Ok(MetricSummary {
676                point_count: row.get::<_, Option<i64>>(0)?.unwrap_or(0),
677                unique_names: row.get::<_, Option<i64>>(1)?.unwrap_or(0),
678            })
679        })?;
680        drop(stmt);
681
682        Ok(SummaryReport {
683            window_seconds: last_seconds,
684            service_filter: service.map(|s| s.to_string()),
685            traces,
686            top_services,
687            top_error_operations,
688            logs,
689            metrics,
690        })
691    }
692
693    pub fn query_anomalies(
694        &self,
695        current_seconds: i64,
696        baseline_seconds: i64,
697        service: Option<&str>,
698    ) -> Result<AnomalyReport> {
699        let conn = self.conn.lock().unwrap();
700        let svc_owned = service.map(|s| s.to_string());
701        let svc_clause = if service.is_some() { " AND service = ?" } else { "" };
702
703        // Per-service stats for a window whose end = now and start = now - window.
704        let stats_sql = |window: i64| -> String {
705            format!(
706                "SELECT service,
707                        COUNT(*)::BIGINT AS span_count,
708                        SUM(CASE WHEN status='error' THEN 1 ELSE 0 END)::DOUBLE
709                            / NULLIF(COUNT(*), 0)::DOUBLE AS error_rate,
710                        COALESCE(quantile_cont(duration_ms, 0.95), 0.0) AS p95_ms
711                 FROM spans
712                 WHERE start_time >= current_timestamp::TIMESTAMP - INTERVAL '{window} seconds'{svc_clause}
713                 GROUP BY service"
714            )
715        };
716
717        let build_params = || -> Vec<&dyn duckdb::ToSql> {
718            match &svc_owned {
719                Some(s) => vec![s as &dyn duckdb::ToSql],
720                None => vec![],
721            }
722        };
723
724        let mut current_stats: std::collections::HashMap<String, (i64, f64, f64)> =
725            std::collections::HashMap::new();
726        let mut stmt = conn.prepare(&stats_sql(current_seconds))?;
727        let rows = stmt.query_map(build_params().as_slice(), |row| {
728            Ok((
729                row.get::<_, String>(0)?,
730                row.get::<_, i64>(1)?,
731                row.get::<_, Option<f64>>(2)?.unwrap_or(0.0),
732                row.get::<_, f64>(3)?,
733            ))
734        })?;
735        for r in rows {
736            let (svc, span_count, error_rate, p95) = r?;
737            current_stats.insert(svc, (span_count, error_rate, p95));
738        }
739        drop(stmt);
740
741        let mut baseline_stats: std::collections::HashMap<String, (i64, f64, f64)> =
742            std::collections::HashMap::new();
743        let mut stmt = conn.prepare(&stats_sql(baseline_seconds))?;
744        let rows = stmt.query_map(build_params().as_slice(), |row| {
745            Ok((
746                row.get::<_, String>(0)?,
747                row.get::<_, i64>(1)?,
748                row.get::<_, Option<f64>>(2)?.unwrap_or(0.0),
749                row.get::<_, f64>(3)?,
750            ))
751        })?;
752        for r in rows {
753            let (svc, span_count, error_rate, p95) = r?;
754            baseline_stats.insert(svc, (span_count, error_rate, p95));
755        }
756        drop(stmt);
757
758        let mut anomalies = Vec::new();
759        for (svc, (cur_count, cur_err, cur_p95)) in &current_stats {
760            if *cur_count < 5 {
761                continue;
762            }
763            let (base_count, base_err, base_p95) =
764                baseline_stats.get(svc).copied().unwrap_or((0, 0.0, 0.0));
765
766            let err_delta = cur_err - base_err;
767            if err_delta >= 0.05 && *cur_err > 0.0 {
768                let severity = if err_delta >= 0.25 {
769                    "critical"
770                } else if err_delta >= 0.10 {
771                    "warning"
772                } else {
773                    "info"
774                };
775                anomalies.push(Anomaly {
776                    service: svc.clone(),
777                    kind: "error_rate".into(),
778                    severity: severity.into(),
779                    current: *cur_err,
780                    baseline: base_err,
781                    delta: err_delta,
782                    description: format!(
783                        "Error rate rose {:.1}% → {:.1}% (baseline {:.1}%, {} spans)",
784                        base_err * 100.0,
785                        cur_err * 100.0,
786                        base_err * 100.0,
787                        cur_count
788                    ),
789                });
790            }
791
792            if base_p95 > 0.0 && base_count >= 5 {
793                let ratio = cur_p95 / base_p95;
794                if ratio >= 1.5 && *cur_p95 > 50.0 {
795                    let severity = if ratio >= 3.0 {
796                        "critical"
797                    } else if ratio >= 2.0 {
798                        "warning"
799                    } else {
800                        "info"
801                    };
802                    anomalies.push(Anomaly {
803                        service: svc.clone(),
804                        kind: "latency_p95".into(),
805                        severity: severity.into(),
806                        current: *cur_p95,
807                        baseline: base_p95,
808                        delta: cur_p95 - base_p95,
809                        description: format!(
810                            "p95 latency {:.1}ms → {:.1}ms ({:.1}× baseline)",
811                            base_p95, cur_p95, ratio
812                        ),
813                    });
814                }
815            }
816        }
817
818        anomalies.sort_by(|a, b| {
819            let rank = |s: &str| match s {
820                "critical" => 0,
821                "warning" => 1,
822                _ => 2,
823            };
824            rank(&a.severity).cmp(&rank(&b.severity))
825        });
826
827        Ok(AnomalyReport {
828            current_seconds,
829            baseline_seconds,
830            service_filter: svc_owned,
831            anomalies,
832        })
833    }
834
835    pub fn query_correlate(&self, trace_id: &str) -> Result<Option<CorrelateReport>> {
836        let spans = self.get_trace(trace_id)?;
837        if spans.is_empty() {
838            return Ok(None);
839        }
840
841        let start_time = spans
842            .iter()
843            .map(|s| s.start_time)
844            .min()
845            .unwrap_or_else(Utc::now);
846        let end_time = spans
847            .iter()
848            .map(|s| s.end_time)
849            .max()
850            .unwrap_or_else(Utc::now);
851        let duration_ms = (end_time - start_time).num_milliseconds() as f64;
852        let error_count = spans
853            .iter()
854            .filter(|s| matches!(s.status, SpanStatus::Error))
855            .count() as i64;
856
857        let mut services: Vec<String> =
858            spans.iter().map(|s| s.service.clone()).collect();
859        services.sort();
860        services.dedup();
861
862        let logs = self.query_logs(&LogQuery {
863            trace_id: Some(trace_id.to_string()),
864            limit: Some(500),
865            ..Default::default()
866        })?;
867
868        // Metrics overlapping the trace's time range, scoped to touched services.
869        let metrics = {
870            let conn = self.conn.lock().unwrap();
871            let start = start_time
872                .naive_utc()
873                .format("%Y-%m-%d %H:%M:%S%.6f")
874                .to_string();
875            let end = end_time
876                .naive_utc()
877                .format("%Y-%m-%d %H:%M:%S%.6f")
878                .to_string();
879
880            let placeholders = std::iter::repeat("?")
881                .take(services.len())
882                .collect::<Vec<_>>()
883                .join(",");
884            let sql = format!(
885                "SELECT timestamp::VARCHAR, service, name, metric_type, value, unit, attributes
886                 FROM metrics
887                 WHERE timestamp BETWEEN ?::TIMESTAMP AND ?::TIMESTAMP
888                   AND service IN ({placeholders})
889                 ORDER BY timestamp ASC
890                 LIMIT 500"
891            );
892            let mut stmt = conn.prepare(&sql)?;
893            let mut params_vec: Vec<&dyn duckdb::ToSql> =
894                vec![&start as &dyn duckdb::ToSql, &end as &dyn duckdb::ToSql];
895            for s in &services {
896                params_vec.push(s as &dyn duckdb::ToSql);
897            }
898            let rows = stmt.query_map(params_vec.as_slice(), |row| {
899                let ts_str: String = row.get(0)?;
900                let mt_str: String = row.get(3)?;
901                let attrs_str: String = row.get(6)?;
902                Ok(MetricPoint {
903                    timestamp: parse_timestamp(&ts_str),
904                    service: row.get(1)?,
905                    name: row.get(2)?,
906                    metric_type: MetricType::from_str(&mt_str),
907                    value: row.get(4)?,
908                    unit: row.get(5)?,
909                    attributes: serde_json::from_str(&attrs_str).unwrap_or_default(),
910                })
911            })?;
912            let mut out = Vec::new();
913            for r in rows {
914                out.push(r?);
915            }
916            out
917        };
918
919        Ok(Some(CorrelateReport {
920            trace_id: trace_id.to_string(),
921            span_count: spans.len(),
922            services,
923            start_time: start_time.to_rfc3339(),
924            end_time: end_time.to_rfc3339(),
925            duration_ms,
926            error_count,
927            logs,
928            metrics,
929        }))
930    }
931}
932
933impl DuckDbStore {
934    /// Run a read-only SQL query and return rows as JSON objects. Only
935    /// `SELECT`/`WITH` statements are allowed — this is an analyst/agent query
936    /// surface over the `spans`/`logs`/`metrics`/`trace_comments` tables, not a
937    /// mutation path.
938    pub fn query_sql(&self, sql: &str) -> Result<Vec<serde_json::Value>> {
939        let trimmed = sql.trim_start();
940        let head = trimmed
941            .split_whitespace()
942            .next()
943            .unwrap_or("")
944            .to_ascii_uppercase();
945        if head != "SELECT" && head != "WITH" {
946            anyhow::bail!("only read-only SELECT/WITH queries are allowed");
947        }
948
949        let conn = self.conn.lock().unwrap();
950        let mut stmt = conn.prepare(sql)?;
951        // Column metadata is only valid after execution, so query first, then
952        // read names from the active statement.
953        let mut rows = stmt.query([])?;
954        let column_names: Vec<String> = match rows.as_ref() {
955            Some(s) => (0..s.column_count())
956                .map(|i| s.column_name(i).cloned().unwrap_or_else(|_| "?".to_string()))
957                .collect(),
958            None => Vec::new(),
959        };
960
961        let mut out = Vec::new();
962        while let Some(row) = rows.next()? {
963            let mut obj = serde_json::Map::with_capacity(column_names.len());
964            for (i, name) in column_names.iter().enumerate() {
965                let value: duckdb::types::Value = row.get(i)?;
966                obj.insert(name.clone(), duck_value_to_json(value));
967            }
968            out.push(serde_json::Value::Object(obj));
969        }
970        Ok(out)
971    }
972}
973
974/// Map a DuckDB dynamic value to JSON. Uncommon types fall back to their debug
975/// string so a query never fails on an exotic column.
976fn duck_value_to_json(v: duckdb::types::Value) -> serde_json::Value {
977    use duckdb::types::Value as V;
978    use serde_json::Value as J;
979    match v {
980        V::Null => J::Null,
981        V::Boolean(b) => J::Bool(b),
982        V::TinyInt(n) => J::from(n),
983        V::SmallInt(n) => J::from(n),
984        V::Int(n) => J::from(n),
985        V::BigInt(n) => J::from(n),
986        V::UTinyInt(n) => J::from(n),
987        V::USmallInt(n) => J::from(n),
988        V::UInt(n) => J::from(n),
989        V::UBigInt(n) => J::from(n),
990        V::Float(f) => serde_json::Number::from_f64(f as f64).map(J::Number).unwrap_or(J::Null),
991        V::Double(f) => serde_json::Number::from_f64(f).map(J::Number).unwrap_or(J::Null),
992        V::Text(s) => J::String(s),
993        other => J::String(format!("{other:?}")),
994    }
995}
996
997/// Forward the inherent `DuckDbStore` methods through the `Store` trait so the
998/// server can depend on `dyn Store`. Bodies live in the inherent impl above so
999/// internal callers and the unit tests keep working unchanged.
1000impl Store for DuckDbStore {
1001    fn insert_spans(&self, spans: &[Span]) -> Result<()> {
1002        DuckDbStore::insert_spans(self, spans)
1003    }
1004    fn query_traces(&self, query: &TraceQuery) -> Result<Vec<Span>> {
1005        DuckDbStore::query_traces(self, query)
1006    }
1007    fn get_trace(&self, trace_id: &str) -> Result<Vec<Span>> {
1008        DuckDbStore::get_trace(self, trace_id)
1009    }
1010    fn add_comment(
1011        &self,
1012        trace_id: &str,
1013        span_id: Option<&str>,
1014        author: &str,
1015        body: &str,
1016    ) -> Result<TraceComment> {
1017        DuckDbStore::add_comment(self, trace_id, span_id, author, body)
1018    }
1019    fn get_comments(&self, trace_id: &str) -> Result<Vec<TraceComment>> {
1020        DuckDbStore::get_comments(self, trace_id)
1021    }
1022    fn list_services(&self) -> Result<Vec<ServiceInfo>> {
1023        DuckDbStore::list_services(self)
1024    }
1025    fn insert_logs(&self, logs: &[LogRecord]) -> Result<()> {
1026        DuckDbStore::insert_logs(self, logs)
1027    }
1028    fn query_logs(&self, query: &LogQuery) -> Result<Vec<LogRecord>> {
1029        DuckDbStore::query_logs(self, query)
1030    }
1031    fn insert_metrics(&self, metrics: &[MetricPoint]) -> Result<()> {
1032        DuckDbStore::insert_metrics(self, metrics)
1033    }
1034    fn query_metrics(&self, query: &MetricQuery) -> Result<Vec<MetricPoint>> {
1035        DuckDbStore::query_metrics(self, query)
1036    }
1037    fn query_summary(&self, last_seconds: i64, service: Option<&str>) -> Result<SummaryReport> {
1038        DuckDbStore::query_summary(self, last_seconds, service)
1039    }
1040    fn query_anomalies(
1041        &self,
1042        current_seconds: i64,
1043        baseline_seconds: i64,
1044        service: Option<&str>,
1045    ) -> Result<AnomalyReport> {
1046        DuckDbStore::query_anomalies(self, current_seconds, baseline_seconds, service)
1047    }
1048    fn query_correlate(&self, trace_id: &str) -> Result<Option<CorrelateReport>> {
1049        DuckDbStore::query_correlate(self, trace_id)
1050    }
1051    fn query_sql(&self, sql: &str) -> Result<Vec<serde_json::Value>> {
1052        DuckDbStore::query_sql(self, sql)
1053    }
1054}
1055
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// A minimal internal span for `trace_id` carrying the given string attributes.
    fn span(trace_id: &str, attrs: &[(&str, &str)]) -> Span {
        let now = Utc::now();
        let attributes: HashMap<String, String> = attrs
            .iter()
            .map(|(k, v)| ((*k).to_string(), (*v).to_string()))
            .collect();
        Span {
            trace_id: trace_id.into(),
            span_id: format!("{trace_id}-span"),
            parent_span_id: None,
            service: "svc".into(),
            operation: "op".into(),
            start_time: now,
            end_time: now,
            duration_ms: 1.0,
            status: SpanStatus::Ok,
            attributes,
            events: vec![],
            kind: SpanKind::Internal,
            llm: None,
        }
    }

    /// Open a store in a fresh temporary directory.
    ///
    /// The `TempDir` guard is returned alongside the store and must stay bound
    /// (e.g. `let (_dir, s) = store();`, not `_`) for the whole test. Dropping
    /// it deletes the directory the store was opened in while the connection is
    /// still using it.
    fn store() -> (tempfile::TempDir, DuckDbStore) {
        let dir = tempfile::tempdir().unwrap();
        let store = DuckDbStore::new(dir.path().to_str().unwrap()).unwrap();
        (dir, store)
    }

    #[test]
    fn attribute_filter_matches_dotted_key() {
        let (_dir, s) = store();
        s.insert_spans(&[
            span("a", &[("http.method", "GET"), ("http.status_code", "200")]),
            span("b", &[("http.method", "POST"), ("http.status_code", "500")]),
        ])
        .unwrap();

        let q = TraceQuery {
            attributes: vec![("http.method".into(), "GET".into())],
            ..Default::default()
        };
        let got = s.query_traces(&q).unwrap();
        assert_eq!(got.len(), 1);
        assert_eq!(got[0].trace_id, "a");
    }

    #[test]
    fn attribute_filters_are_anded() {
        let (_dir, s) = store();
        s.insert_spans(&[
            span("a", &[("http.method", "GET"), ("http.status_code", "200")]),
            span("b", &[("http.method", "GET"), ("http.status_code", "500")]),
        ])
        .unwrap();

        let q = TraceQuery {
            attributes: vec![
                ("http.method".into(), "GET".into()),
                ("http.status_code".into(), "500".into()),
            ],
            ..Default::default()
        };
        let got = s.query_traces(&q).unwrap();
        assert_eq!(got.len(), 1);
        assert_eq!(got[0].trace_id, "b");
    }

    #[test]
    fn attribute_filter_misses_when_value_differs() {
        let (_dir, s) = store();
        s.insert_spans(&[span("a", &[("env", "prod")])]).unwrap();

        let q = TraceQuery {
            attributes: vec![("env".into(), "staging".into())],
            ..Default::default()
        };
        assert!(s.query_traces(&q).unwrap().is_empty());
    }

    #[test]
    fn json_path_for_key_quotes_and_escapes() {
        assert_eq!(json_path_for_key("http.method"), r#"$."http.method""#);
        assert_eq!(json_path_for_key(r#"a"b"#), r#"$."a\"b""#);
        assert_eq!(json_path_for_key(r"a\b"), r#"$."a\\b""#);
    }

    #[test]
    fn llm_span_round_trips() {
        use super::super::models::{LlmOperation, LlmSpan};

        let (_dir, s) = store();
        let mut sp = span("llm", &[]);
        sp.kind = SpanKind::Llm;
        sp.llm = Some(LlmSpan {
            provider: "anthropic".into(),
            model: "claude-opus-4-7".into(),
            operation: LlmOperation::Chat,
            input_tokens: Some(1200),
            output_tokens: Some(340),
            total_tokens: Some(1540),
            cost_usd: Some(0.0185),
            ..Default::default()
        });
        s.insert_spans(&[sp]).unwrap();

        let got = s.get_trace("llm").unwrap();
        assert_eq!(got.len(), 1);
        assert_eq!(got[0].kind, SpanKind::Llm);
        let llm = got[0].llm.as_ref().expect("llm extension preserved");
        assert_eq!(llm.model, "claude-opus-4-7");
        assert_eq!(llm.total_tokens, Some(1540));
        assert_eq!(llm.cost_usd, Some(0.0185));
        assert_eq!(llm.operation, LlmOperation::Chat);
    }

    #[test]
    fn non_llm_span_has_no_extension_after_round_trip() {
        let (_dir, s) = store();
        s.insert_spans(&[span("plain", &[("k", "v")])]).unwrap();
        let got = s.get_trace("plain").unwrap();
        assert_eq!(got.len(), 1);
        assert_eq!(got[0].kind, SpanKind::Internal);
        assert!(got[0].llm.is_none());
    }

    #[test]
    fn query_sql_reads_and_rejects_mutations() {
        let (_dir, s) = store();
        s.insert_spans(&[
            span("a", &[("env", "prod")]),
            span("b", &[("env", "prod")]),
        ])
        .unwrap();

        let rows = s
            .query_sql("SELECT service, COUNT(*) AS n FROM spans GROUP BY service")
            .unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0]["service"], serde_json::json!("svc"));
        assert_eq!(rows[0]["n"], serde_json::json!(2));

        // Mutations are rejected.
        assert!(s.query_sql("DELETE FROM spans").is_err());
        assert!(s.query_sql("DROP TABLE spans").is_err());
    }

    #[test]
    fn correlate_unknown_trace_is_none() {
        let (_dir, s) = store();
        assert!(s.query_correlate("missing").unwrap().is_none());
    }

    #[test]
    fn correlate_summarises_stored_trace() {
        let (_dir, s) = store();
        s.insert_spans(&[span("t", &[])]).unwrap();

        let report = s
            .query_correlate("t")
            .unwrap()
            .expect("stored trace is found");
        assert_eq!(report.trace_id, "t");
        assert_eq!(report.span_count, 1);
        assert_eq!(report.services, vec!["svc".to_string()]);
        assert_eq!(report.error_count, 0);
        assert!(report.logs.is_empty());
    }

    #[test]
    fn log_body_sha256_round_trips() {
        let (_dir, s) = store();
        let log = LogRecord {
            timestamp: Utc::now(),
            observed_timestamp: Utc::now(),
            trace_id: Some("t1".into()),
            span_id: None,
            severity: LogSeverity::Error,
            severity_text: "ERROR".into(),
            body: String::new(), // offloaded to blob
            service: "svc".into(),
            attributes: HashMap::new(),
            body_sha256: Some("abc123".into()),
        };
        s.insert_logs(&[log]).unwrap();

        let got = s
            .query_logs(&LogQuery {
                trace_id: Some("t1".into()),
                ..Default::default()
            })
            .unwrap();
        assert_eq!(got.len(), 1);
        assert_eq!(got[0].body_sha256.as_deref(), Some("abc123"));
        assert!(got[0].body.is_empty());
    }
}