Skip to main content

tael_server/storage/
duckdb_store.rs

1use std::path::Path;
2use std::sync::Mutex;
3
4use anyhow::Result;
5use chrono::{DateTime, NaiveDateTime, Utc};
6use duckdb::{Connection, params};
7
8use super::Store;
9use super::models::{
10    Anomaly, AnomalyReport, CorrelateReport, ErrorOperation, LogQuery, LogRecord, LogSeverity,
11    LogSummary, MetricPoint, MetricQuery, MetricSummary, MetricType, ServiceInfo, ServiceSummary,
12    Span, SpanKind, SpanStatus, SummaryReport, TraceComment, TraceQuery, TraceSummary,
13};
14
/// Returns the JSON path expression selecting one top-level attribute `key`.
///
/// The key is always written in the quoted form `$."<key>"`, so a dotted key
/// such as `http.method` is matched literally; the unquoted `$.http.method`
/// would instead descend into a nested object. Backslashes and double quotes
/// occurring in the key are each prefixed with a backslash.
fn json_path_for_key(key: &str) -> String {
    let mut path = String::with_capacity(key.len() + 4);
    path.push_str("$.\"");
    for ch in key.chars() {
        // Only these two characters are significant inside the quoted segment.
        if matches!(ch, '\\' | '"') {
            path.push('\\');
        }
        path.push(ch);
    }
    path.push('"');
    path
}
22
23fn parse_timestamp(s: &str) -> DateTime<Utc> {
24    NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f")
25        .or_else(|_| NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S"))
26        .map(|dt| dt.and_utc())
27        .unwrap_or_default()
28}
29
30fn row_to_span(row: &duckdb::Row<'_>) -> duckdb::Result<Span> {
31    let attrs_str: String = row.get(9)?;
32    let events_str: String = row.get(10)?;
33    let status_str: String = row.get(8)?;
34    let parent: Option<String> = row.get(2)?;
35    let start_str: String = row.get(5)?;
36    let end_str: String = row.get(6)?;
37    let kind_str: Option<String> = row.get(11)?;
38    let llm_str: Option<String> = row.get(12)?;
39
40    Ok(Span {
41        trace_id: row.get(0)?,
42        span_id: row.get(1)?,
43        parent_span_id: if parent.as_deref() == Some("") {
44            None
45        } else {
46            parent
47        },
48        service: row.get(3)?,
49        operation: row.get(4)?,
50        start_time: parse_timestamp(&start_str),
51        end_time: parse_timestamp(&end_str),
52        duration_ms: row.get(7)?,
53        status: SpanStatus::from_str(&status_str),
54        attributes: serde_json::from_str(&attrs_str).unwrap_or_default(),
55        events: serde_json::from_str(&events_str).unwrap_or_default(),
56        kind: kind_str
57            .map(|s| SpanKind::from_str(&s))
58            .unwrap_or(SpanKind::Internal),
59        llm: llm_str.and_then(|s| serde_json::from_str(&s).ok()),
60    })
61}
62
63pub struct DuckDbStore {
64    conn: Mutex<Connection>,
65}
66
67impl DuckDbStore {
68    pub fn new(data_dir: &str) -> Result<Self> {
69        std::fs::create_dir_all(data_dir)?;
70        let db_path = Path::new(data_dir).join("tael.duckdb");
71        let conn = Connection::open(db_path)?;
72        let store = Self {
73            conn: Mutex::new(conn),
74        };
75        store.init_schema()?;
76        Ok(store)
77    }
78
79    fn init_schema(&self) -> Result<()> {
80        let conn = self.conn.lock().unwrap();
81        conn.execute_batch(
82            "
83            CREATE TABLE IF NOT EXISTS spans (
84                trace_id       VARCHAR NOT NULL,
85                span_id        VARCHAR NOT NULL,
86                parent_span_id VARCHAR,
87                service        VARCHAR NOT NULL,
88                operation      VARCHAR NOT NULL,
89                start_time     TIMESTAMP NOT NULL,
90                end_time       TIMESTAMP NOT NULL,
91                duration_ms    DOUBLE NOT NULL,
92                status         VARCHAR NOT NULL DEFAULT 'unset',
93                attributes     JSON,
94                events         JSON,
95                kind           VARCHAR NOT NULL DEFAULT 'internal',
96                llm            JSON,
97                PRIMARY KEY (trace_id, span_id)
98            );
99
100            -- Migrate pre-existing databases created before the LLM columns.
101            -- DuckDB's ALTER ... ADD COLUMN rejects constraints, so these are
102            -- added nullable; fresh tables get the constraints from CREATE above,
103            -- and migrated NULL `kind` values are read back as 'internal'.
104            ALTER TABLE spans ADD COLUMN IF NOT EXISTS kind VARCHAR;
105            ALTER TABLE spans ADD COLUMN IF NOT EXISTS llm JSON;
106
107            CREATE INDEX IF NOT EXISTS idx_spans_service ON spans(service);
108            CREATE INDEX IF NOT EXISTS idx_spans_start_time ON spans(start_time);
109            CREATE INDEX IF NOT EXISTS idx_spans_trace_id ON spans(trace_id);
110            CREATE INDEX IF NOT EXISTS idx_spans_status ON spans(status);
111            CREATE INDEX IF NOT EXISTS idx_spans_kind ON spans(kind);
112
113            CREATE TABLE IF NOT EXISTS trace_comments (
114                id         VARCHAR NOT NULL PRIMARY KEY,
115                trace_id   VARCHAR NOT NULL,
116                span_id    VARCHAR,
117                author     VARCHAR NOT NULL,
118                body       VARCHAR NOT NULL,
119                created_at TIMESTAMP NOT NULL DEFAULT current_timestamp::TIMESTAMP
120            );
121
122            CREATE INDEX IF NOT EXISTS idx_comments_trace_id ON trace_comments(trace_id);
123
124            CREATE TABLE IF NOT EXISTS logs (
125                timestamp          TIMESTAMP NOT NULL,
126                observed_timestamp TIMESTAMP NOT NULL,
127                trace_id           VARCHAR,
128                span_id            VARCHAR,
129                severity           VARCHAR NOT NULL DEFAULT 'unspecified',
130                severity_text      VARCHAR NOT NULL DEFAULT '',
131                body               VARCHAR NOT NULL DEFAULT '',
132                service            VARCHAR NOT NULL,
133                attributes         JSON,
134                body_sha256        VARCHAR
135            );
136
137            -- Migrate pre-existing databases (nullable, no constraint).
138            ALTER TABLE logs ADD COLUMN IF NOT EXISTS body_sha256 VARCHAR;
139
140            CREATE INDEX IF NOT EXISTS idx_logs_service ON logs(service);
141            CREATE INDEX IF NOT EXISTS idx_logs_timestamp ON logs(timestamp);
142            CREATE INDEX IF NOT EXISTS idx_logs_severity ON logs(severity);
143            CREATE INDEX IF NOT EXISTS idx_logs_trace_id ON logs(trace_id);
144
145            CREATE TABLE IF NOT EXISTS metrics (
146                timestamp   TIMESTAMP NOT NULL,
147                service     VARCHAR NOT NULL,
148                name        VARCHAR NOT NULL,
149                metric_type VARCHAR NOT NULL DEFAULT 'unknown',
150                value       DOUBLE NOT NULL,
151                unit        VARCHAR NOT NULL DEFAULT '',
152                attributes  JSON
153            );
154
155            CREATE INDEX IF NOT EXISTS idx_metrics_service ON metrics(service);
156            CREATE INDEX IF NOT EXISTS idx_metrics_name ON metrics(name);
157            CREATE INDEX IF NOT EXISTS idx_metrics_timestamp ON metrics(timestamp);
158            ",
159        )?;
160        Ok(())
161    }
162
163    pub fn insert_spans(&self, spans: &[Span]) -> Result<()> {
164        let conn = self.conn.lock().unwrap();
165        let mut stmt = conn.prepare(
166            "INSERT OR REPLACE INTO spans
167             (trace_id, span_id, parent_span_id, service, operation,
168              start_time, end_time, duration_ms, status, attributes, events,
169              kind, llm)
170             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
171        )?;
172
173        for span in spans {
174            let attrs = serde_json::to_string(&span.attributes)?;
175            let events = serde_json::to_string(&span.events)?;
176            let llm = span.llm.as_ref().map(serde_json::to_string).transpose()?;
177            stmt.execute(params![
178                span.trace_id,
179                span.span_id,
180                span.parent_span_id,
181                span.service,
182                span.operation,
183                span.start_time.format("%Y-%m-%d %H:%M:%S%.6f").to_string(),
184                span.end_time.format("%Y-%m-%d %H:%M:%S%.6f").to_string(),
185                span.duration_ms,
186                span.status.to_string(),
187                attrs,
188                events,
189                span.kind.to_string(),
190                llm,
191            ])?;
192        }
193        Ok(())
194    }
195
196    pub fn query_traces(&self, query: &TraceQuery) -> Result<Vec<Span>> {
197        let conn = self.conn.lock().unwrap();
198
199        let mut sql = String::from(
200            "SELECT trace_id, span_id, parent_span_id, service, operation,
201                    start_time::VARCHAR, end_time::VARCHAR, duration_ms, status, attributes, events,
202                    kind, llm
203             FROM spans WHERE 1=1",
204        );
205        let mut param_values: Vec<Box<dyn duckdb::ToSql>> = Vec::new();
206
207        if let Some(ref svc) = query.service {
208            sql.push_str(" AND service = ?");
209            param_values.push(Box::new(svc.clone()));
210        }
211        if let Some(ref op) = query.operation {
212            sql.push_str(" AND operation LIKE ?");
213            param_values.push(Box::new(format!("%{op}%")));
214        }
215        if let Some(min) = query.min_duration_ms {
216            sql.push_str(" AND duration_ms >= ?");
217            param_values.push(Box::new(min));
218        }
219        if let Some(max) = query.max_duration_ms {
220            sql.push_str(" AND duration_ms <= ?");
221            param_values.push(Box::new(max));
222        }
223        if let Some(ref status) = query.status {
224            sql.push_str(" AND status = ?");
225            param_values.push(Box::new(status.clone()));
226        }
227        if let Some(secs) = query.last_seconds {
228            sql.push_str(&format!(
229                " AND start_time >= current_timestamp::TIMESTAMP - INTERVAL '{secs} seconds'"
230            ));
231        }
232        for (k, v) in &query.attributes {
233            sql.push_str(" AND json_extract_string(attributes, ?) = ?");
234            param_values.push(Box::new(json_path_for_key(k)));
235            param_values.push(Box::new(v.clone()));
236        }
237
238        sql.push_str(" ORDER BY start_time DESC");
239
240        let limit = query.limit.unwrap_or(100);
241        sql.push_str(&format!(" LIMIT {limit}"));
242
243        let params_ref: Vec<&dyn duckdb::ToSql> = param_values.iter().map(|p| p.as_ref()).collect();
244        let mut stmt = conn.prepare(&sql)?;
245        let rows = stmt.query_map(params_ref.as_slice(), row_to_span)?;
246
247        let mut spans = Vec::new();
248        for row in rows {
249            spans.push(row?);
250        }
251        Ok(spans)
252    }
253
254    pub fn get_trace(&self, trace_id: &str) -> Result<Vec<Span>> {
255        let conn = self.conn.lock().unwrap();
256        let mut stmt = conn.prepare(
257            "SELECT trace_id, span_id, parent_span_id, service, operation,
258                    start_time::VARCHAR, end_time::VARCHAR, duration_ms, status, attributes, events,
259                    kind, llm
260             FROM spans
261             WHERE trace_id = ?
262             ORDER BY start_time ASC",
263        )?;
264
265        let rows = stmt.query_map(params![trace_id], row_to_span)?;
266
267        let mut spans = Vec::new();
268        for row in rows {
269            spans.push(row?);
270        }
271        Ok(spans)
272    }
273
274    pub fn add_comment(
275        &self,
276        trace_id: &str,
277        span_id: Option<&str>,
278        author: &str,
279        body: &str,
280    ) -> Result<TraceComment> {
281        let conn = self.conn.lock().unwrap();
282        let id = uuid::Uuid::new_v4().to_string();
283        conn.execute(
284            "INSERT INTO trace_comments (id, trace_id, span_id, author, body)
285             VALUES (?, ?, ?, ?, ?)",
286            params![id, trace_id, span_id, author, body],
287        )?;
288
289        let mut stmt = conn.prepare(
290            "SELECT id, trace_id, span_id, author, body, created_at::VARCHAR
291             FROM trace_comments WHERE id = ?",
292        )?;
293        let comment = stmt.query_row(params![id], |row| {
294            Ok(TraceComment {
295                id: row.get(0)?,
296                trace_id: row.get(1)?,
297                span_id: row.get(2)?,
298                author: row.get(3)?,
299                body: row.get(4)?,
300                created_at: row.get(5)?,
301            })
302        })?;
303        Ok(comment)
304    }
305
306    pub fn get_comments(&self, trace_id: &str) -> Result<Vec<TraceComment>> {
307        let conn = self.conn.lock().unwrap();
308        let mut stmt = conn.prepare(
309            "SELECT id, trace_id, span_id, author, body, created_at::VARCHAR
310             FROM trace_comments
311             WHERE trace_id = ?
312             ORDER BY created_at ASC",
313        )?;
314        let rows = stmt.query_map(params![trace_id], |row| {
315            Ok(TraceComment {
316                id: row.get(0)?,
317                trace_id: row.get(1)?,
318                span_id: row.get(2)?,
319                author: row.get(3)?,
320                body: row.get(4)?,
321                created_at: row.get(5)?,
322            })
323        })?;
324        let mut comments = Vec::new();
325        for row in rows {
326            comments.push(row?);
327        }
328        Ok(comments)
329    }
330
331    pub fn list_services(&self) -> Result<Vec<ServiceInfo>> {
332        let conn = self.conn.lock().unwrap();
333        let mut stmt = conn.prepare(
334            "SELECT service,
335                    COUNT(*) as span_count,
336                    COUNT(DISTINCT trace_id) as trace_count,
337                    AVG(duration_ms) as avg_duration,
338                    SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END)::DOUBLE / COUNT(*)::DOUBLE as error_rate
339             FROM spans
340             GROUP BY service
341             ORDER BY span_count DESC",
342        )?;
343
344        let rows = stmt.query_map([], |row| {
345            Ok(ServiceInfo {
346                name: row.get(0)?,
347                span_count: row.get(1)?,
348                trace_count: row.get(2)?,
349                avg_duration_ms: row.get(3)?,
350                error_rate: row.get(4)?,
351            })
352        })?;
353
354        let mut services = Vec::new();
355        for row in rows {
356            services.push(row?);
357        }
358        Ok(services)
359    }
360
361    // ── Log storage ─────────────────────────────────────────────────
362
363    pub fn insert_logs(&self, logs: &[LogRecord]) -> Result<()> {
364        let conn = self.conn.lock().unwrap();
365        let mut stmt = conn.prepare(
366            "INSERT INTO logs
367             (timestamp, observed_timestamp, trace_id, span_id, severity,
368              severity_text, body, service, attributes, body_sha256)
369             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
370        )?;
371
372        for log in logs {
373            let attrs = serde_json::to_string(&log.attributes)?;
374            stmt.execute(params![
375                log.timestamp.format("%Y-%m-%d %H:%M:%S%.6f").to_string(),
376                log.observed_timestamp
377                    .format("%Y-%m-%d %H:%M:%S%.6f")
378                    .to_string(),
379                log.trace_id,
380                log.span_id,
381                log.severity.to_string(),
382                log.severity_text,
383                log.body,
384                log.service,
385                attrs,
386                log.body_sha256,
387            ])?;
388        }
389        Ok(())
390    }
391
392    pub fn query_logs(&self, query: &LogQuery) -> Result<Vec<LogRecord>> {
393        let conn = self.conn.lock().unwrap();
394
395        let mut sql = String::from(
396            "SELECT timestamp::VARCHAR, observed_timestamp::VARCHAR, trace_id, span_id,
397                    severity, severity_text, body, service, attributes, body_sha256
398             FROM logs WHERE 1=1",
399        );
400        let mut param_values: Vec<Box<dyn duckdb::ToSql>> = Vec::new();
401
402        if let Some(ref svc) = query.service {
403            sql.push_str(" AND service = ?");
404            param_values.push(Box::new(svc.clone()));
405        }
406        if let Some(ref sev) = query.severity {
407            sql.push_str(" AND severity = ?");
408            param_values.push(Box::new(sev.clone()));
409        }
410        if let Some(ref body) = query.body_contains {
411            sql.push_str(" AND body LIKE ?");
412            param_values.push(Box::new(format!("%{body}%")));
413        }
414        if let Some(ref tid) = query.trace_id {
415            sql.push_str(" AND trace_id = ?");
416            param_values.push(Box::new(tid.clone()));
417        }
418        if let Some(secs) = query.last_seconds {
419            sql.push_str(&format!(
420                " AND timestamp >= current_timestamp::TIMESTAMP - INTERVAL '{secs} seconds'"
421            ));
422        }
423
424        sql.push_str(" ORDER BY timestamp DESC");
425
426        let limit = query.limit.unwrap_or(100);
427        sql.push_str(&format!(" LIMIT {limit}"));
428
429        let params_ref: Vec<&dyn duckdb::ToSql> = param_values.iter().map(|p| p.as_ref()).collect();
430        let mut stmt = conn.prepare(&sql)?;
431        let rows = stmt.query_map(params_ref.as_slice(), |row| {
432            let ts_str: String = row.get(0)?;
433            let obs_str: String = row.get(1)?;
434            let attrs_str: String = row.get(8)?;
435            let severity_str: String = row.get(4)?;
436
437            Ok(LogRecord {
438                timestamp: parse_timestamp(&ts_str),
439                observed_timestamp: parse_timestamp(&obs_str),
440                trace_id: row.get(2)?,
441                span_id: row.get(3)?,
442                severity: LogSeverity::from_str(&severity_str),
443                severity_text: row.get(5)?,
444                body: row.get(6)?,
445                service: row.get(7)?,
446                attributes: serde_json::from_str(&attrs_str).unwrap_or_default(),
447                body_sha256: row.get(9)?,
448            })
449        })?;
450
451        let mut logs = Vec::new();
452        for row in rows {
453            logs.push(row?);
454        }
455        Ok(logs)
456    }
457
458    // ── Metric storage ──────────────────────────────────────────────
459
460    pub fn insert_metrics(&self, metrics: &[MetricPoint]) -> Result<()> {
461        let conn = self.conn.lock().unwrap();
462        let mut stmt = conn.prepare(
463            "INSERT INTO metrics
464             (timestamp, service, name, metric_type, value, unit, attributes)
465             VALUES (?, ?, ?, ?, ?, ?, ?)",
466        )?;
467
468        for m in metrics {
469            let attrs = serde_json::to_string(&m.attributes)?;
470            stmt.execute(params![
471                m.timestamp.format("%Y-%m-%d %H:%M:%S%.6f").to_string(),
472                m.service,
473                m.name,
474                m.metric_type.to_string(),
475                m.value,
476                m.unit,
477                attrs,
478            ])?;
479        }
480        Ok(())
481    }
482
483    pub fn query_metrics(&self, query: &MetricQuery) -> Result<Vec<MetricPoint>> {
484        let conn = self.conn.lock().unwrap();
485
486        let mut sql = String::from(
487            "SELECT timestamp::VARCHAR, service, name, metric_type, value, unit, attributes
488             FROM metrics WHERE 1=1",
489        );
490        let mut param_values: Vec<Box<dyn duckdb::ToSql>> = Vec::new();
491
492        if let Some(ref svc) = query.service {
493            sql.push_str(" AND service = ?");
494            param_values.push(Box::new(svc.clone()));
495        }
496        if let Some(ref name) = query.name {
497            sql.push_str(" AND name = ?");
498            param_values.push(Box::new(name.clone()));
499        }
500        if let Some(ref mt) = query.metric_type {
501            sql.push_str(" AND metric_type = ?");
502            param_values.push(Box::new(mt.clone()));
503        }
504        if let Some(secs) = query.last_seconds {
505            sql.push_str(&format!(
506                " AND timestamp >= current_timestamp::TIMESTAMP - INTERVAL '{secs} seconds'"
507            ));
508        }
509
510        sql.push_str(" ORDER BY timestamp DESC");
511
512        let limit = query.limit.unwrap_or(500);
513        sql.push_str(&format!(" LIMIT {limit}"));
514
515        let params_ref: Vec<&dyn duckdb::ToSql> = param_values.iter().map(|p| p.as_ref()).collect();
516        let mut stmt = conn.prepare(&sql)?;
517        let rows = stmt.query_map(params_ref.as_slice(), |row| {
518            let ts_str: String = row.get(0)?;
519            let mt_str: String = row.get(3)?;
520            let attrs_str: String = row.get(6)?;
521
522            Ok(MetricPoint {
523                timestamp: parse_timestamp(&ts_str),
524                service: row.get(1)?,
525                name: row.get(2)?,
526                metric_type: MetricType::from_str(&mt_str),
527                value: row.get(4)?,
528                unit: row.get(5)?,
529                attributes: serde_json::from_str(&attrs_str).unwrap_or_default(),
530            })
531        })?;
532
533        let mut metrics = Vec::new();
534        for row in rows {
535            metrics.push(row?);
536        }
537        Ok(metrics)
538    }
539
540    pub fn query_summary(&self, last_seconds: i64, service: Option<&str>) -> Result<SummaryReport> {
541        let conn = self.conn.lock().unwrap();
542
543        let span_time_clause = format!(
544            "start_time >= current_timestamp::TIMESTAMP - INTERVAL '{last_seconds} seconds'"
545        );
546        let ts_time_clause = format!(
547            "timestamp >= current_timestamp::TIMESTAMP - INTERVAL '{last_seconds} seconds'"
548        );
549
550        let svc_clause = if service.is_some() {
551            " AND service = ?"
552        } else {
553            ""
554        };
555        let svc_owned = service.map(|s| s.to_string());
556        let build_params = || -> Vec<&dyn duckdb::ToSql> {
557            match &svc_owned {
558                Some(s) => vec![s as &dyn duckdb::ToSql],
559                None => vec![],
560            }
561        };
562
563        // ── Traces aggregate ─────────────────────────────────────────
564        let traces_sql = format!(
565            "SELECT
566                COUNT(*)::BIGINT,
567                COUNT(DISTINCT trace_id)::BIGINT,
568                SUM(CASE WHEN status='error' THEN 1 ELSE 0 END)::BIGINT,
569                COALESCE(AVG(duration_ms), 0.0),
570                COALESCE(MAX(duration_ms), 0.0),
571                COALESCE(quantile_cont(duration_ms, 0.5), 0.0),
572                COALESCE(quantile_cont(duration_ms, 0.95), 0.0),
573                COALESCE(quantile_cont(duration_ms, 0.99), 0.0)
574             FROM spans WHERE {span_time_clause}{svc_clause}"
575        );
576        let mut stmt = conn.prepare(&traces_sql)?;
577        let traces: TraceSummary = stmt.query_row(build_params().as_slice(), |row| {
578            let span_count: i64 = row.get::<_, Option<i64>>(0)?.unwrap_or(0);
579            let error_count: i64 = row.get::<_, Option<i64>>(2)?.unwrap_or(0);
580            let error_rate = if span_count > 0 {
581                error_count as f64 / span_count as f64
582            } else {
583                0.0
584            };
585            Ok(TraceSummary {
586                span_count,
587                trace_count: row.get::<_, Option<i64>>(1)?.unwrap_or(0),
588                error_count,
589                error_rate,
590                avg_ms: row.get(3)?,
591                max_ms: row.get(4)?,
592                p50_ms: row.get(5)?,
593                p95_ms: row.get(6)?,
594                p99_ms: row.get(7)?,
595            })
596        })?;
597        drop(stmt);
598
599        // ── Top services by span count ───────────────────────────────
600        let top_svc_sql = format!(
601            "SELECT service,
602                    COUNT(*)::BIGINT,
603                    SUM(CASE WHEN status='error' THEN 1 ELSE 0 END)::DOUBLE / NULLIF(COUNT(*), 0)::DOUBLE,
604                    COALESCE(quantile_cont(duration_ms, 0.95), 0.0)
605             FROM spans WHERE {span_time_clause}{svc_clause}
606             GROUP BY service
607             ORDER BY 2 DESC
608             LIMIT 5"
609        );
610        let mut stmt = conn.prepare(&top_svc_sql)?;
611        let top_services: Vec<ServiceSummary> = stmt
612            .query_map(build_params().as_slice(), |row| {
613                Ok(ServiceSummary {
614                    service: row.get(0)?,
615                    span_count: row.get(1)?,
616                    error_rate: row.get::<_, Option<f64>>(2)?.unwrap_or(0.0),
617                    p95_ms: row.get(3)?,
618                })
619            })?
620            .collect::<duckdb::Result<_>>()?;
621        drop(stmt);
622
623        // ── Top error operations ─────────────────────────────────────
624        let err_op_sql = format!(
625            "SELECT service, operation, COUNT(*)::BIGINT
626             FROM spans
627             WHERE status='error' AND {span_time_clause}{svc_clause}
628             GROUP BY service, operation
629             ORDER BY 3 DESC
630             LIMIT 5"
631        );
632        let mut stmt = conn.prepare(&err_op_sql)?;
633        let top_error_operations: Vec<ErrorOperation> = stmt
634            .query_map(build_params().as_slice(), |row| {
635                Ok(ErrorOperation {
636                    service: row.get(0)?,
637                    operation: row.get(1)?,
638                    error_count: row.get(2)?,
639                })
640            })?
641            .collect::<duckdb::Result<_>>()?;
642        drop(stmt);
643
644        // ── Logs aggregate ───────────────────────────────────────────
645        let logs_sql = format!(
646            "SELECT
647                COUNT(*)::BIGINT,
648                SUM(CASE WHEN severity IN ('error','fatal') THEN 1 ELSE 0 END)::BIGINT,
649                SUM(CASE WHEN severity='warn' THEN 1 ELSE 0 END)::BIGINT,
650                SUM(CASE WHEN severity='info' THEN 1 ELSE 0 END)::BIGINT,
651                SUM(CASE WHEN severity IN ('debug','trace') THEN 1 ELSE 0 END)::BIGINT
652             FROM logs WHERE {ts_time_clause}{svc_clause}"
653        );
654        let mut stmt = conn.prepare(&logs_sql)?;
655        let logs: LogSummary = stmt.query_row(build_params().as_slice(), |row| {
656            Ok(LogSummary {
657                total: row.get::<_, Option<i64>>(0)?.unwrap_or(0),
658                error: row.get::<_, Option<i64>>(1)?.unwrap_or(0),
659                warn: row.get::<_, Option<i64>>(2)?.unwrap_or(0),
660                info: row.get::<_, Option<i64>>(3)?.unwrap_or(0),
661                debug: row.get::<_, Option<i64>>(4)?.unwrap_or(0),
662            })
663        })?;
664        drop(stmt);
665
666        // ── Metrics aggregate ────────────────────────────────────────
667        let metrics_sql = format!(
668            "SELECT COUNT(*)::BIGINT, COUNT(DISTINCT name)::BIGINT
669             FROM metrics WHERE {ts_time_clause}{svc_clause}"
670        );
671        let mut stmt = conn.prepare(&metrics_sql)?;
672        let metrics: MetricSummary = stmt.query_row(build_params().as_slice(), |row| {
673            Ok(MetricSummary {
674                point_count: row.get::<_, Option<i64>>(0)?.unwrap_or(0),
675                unique_names: row.get::<_, Option<i64>>(1)?.unwrap_or(0),
676            })
677        })?;
678        drop(stmt);
679
680        Ok(SummaryReport {
681            window_seconds: last_seconds,
682            service_filter: service.map(|s| s.to_string()),
683            traces,
684            top_services,
685            top_error_operations,
686            logs,
687            metrics,
688        })
689    }
690
691    pub fn query_anomalies(
692        &self,
693        current_seconds: i64,
694        baseline_seconds: i64,
695        service: Option<&str>,
696    ) -> Result<AnomalyReport> {
697        let conn = self.conn.lock().unwrap();
698        let svc_owned = service.map(|s| s.to_string());
699        let svc_clause = if service.is_some() {
700            " AND service = ?"
701        } else {
702            ""
703        };
704
705        // Per-service stats for a window whose end = now and start = now - window.
706        let stats_sql = |window: i64| -> String {
707            format!(
708                "SELECT service,
709                        COUNT(*)::BIGINT AS span_count,
710                        SUM(CASE WHEN status='error' THEN 1 ELSE 0 END)::DOUBLE
711                            / NULLIF(COUNT(*), 0)::DOUBLE AS error_rate,
712                        COALESCE(quantile_cont(duration_ms, 0.95), 0.0) AS p95_ms
713                 FROM spans
714                 WHERE start_time >= current_timestamp::TIMESTAMP - INTERVAL '{window} seconds'{svc_clause}
715                 GROUP BY service"
716            )
717        };
718
719        let build_params = || -> Vec<&dyn duckdb::ToSql> {
720            match &svc_owned {
721                Some(s) => vec![s as &dyn duckdb::ToSql],
722                None => vec![],
723            }
724        };
725
726        let mut current_stats: std::collections::HashMap<String, (i64, f64, f64)> =
727            std::collections::HashMap::new();
728        let mut stmt = conn.prepare(&stats_sql(current_seconds))?;
729        let rows = stmt.query_map(build_params().as_slice(), |row| {
730            Ok((
731                row.get::<_, String>(0)?,
732                row.get::<_, i64>(1)?,
733                row.get::<_, Option<f64>>(2)?.unwrap_or(0.0),
734                row.get::<_, f64>(3)?,
735            ))
736        })?;
737        for r in rows {
738            let (svc, span_count, error_rate, p95) = r?;
739            current_stats.insert(svc, (span_count, error_rate, p95));
740        }
741        drop(stmt);
742
743        let mut baseline_stats: std::collections::HashMap<String, (i64, f64, f64)> =
744            std::collections::HashMap::new();
745        let mut stmt = conn.prepare(&stats_sql(baseline_seconds))?;
746        let rows = stmt.query_map(build_params().as_slice(), |row| {
747            Ok((
748                row.get::<_, String>(0)?,
749                row.get::<_, i64>(1)?,
750                row.get::<_, Option<f64>>(2)?.unwrap_or(0.0),
751                row.get::<_, f64>(3)?,
752            ))
753        })?;
754        for r in rows {
755            let (svc, span_count, error_rate, p95) = r?;
756            baseline_stats.insert(svc, (span_count, error_rate, p95));
757        }
758        drop(stmt);
759
760        let mut anomalies = Vec::new();
761        for (svc, (cur_count, cur_err, cur_p95)) in &current_stats {
762            if *cur_count < 5 {
763                continue;
764            }
765            let (base_count, base_err, base_p95) =
766                baseline_stats.get(svc).copied().unwrap_or((0, 0.0, 0.0));
767
768            let err_delta = cur_err - base_err;
769            if err_delta >= 0.05 && *cur_err > 0.0 {
770                let severity = if err_delta >= 0.25 {
771                    "critical"
772                } else if err_delta >= 0.10 {
773                    "warning"
774                } else {
775                    "info"
776                };
777                anomalies.push(Anomaly {
778                    service: svc.clone(),
779                    kind: "error_rate".into(),
780                    severity: severity.into(),
781                    current: *cur_err,
782                    baseline: base_err,
783                    delta: err_delta,
784                    description: format!(
785                        "Error rate rose {:.1}% → {:.1}% (baseline {:.1}%, {} spans)",
786                        base_err * 100.0,
787                        cur_err * 100.0,
788                        base_err * 100.0,
789                        cur_count
790                    ),
791                });
792            }
793
794            if base_p95 > 0.0 && base_count >= 5 {
795                let ratio = cur_p95 / base_p95;
796                if ratio >= 1.5 && *cur_p95 > 50.0 {
797                    let severity = if ratio >= 3.0 {
798                        "critical"
799                    } else if ratio >= 2.0 {
800                        "warning"
801                    } else {
802                        "info"
803                    };
804                    anomalies.push(Anomaly {
805                        service: svc.clone(),
806                        kind: "latency_p95".into(),
807                        severity: severity.into(),
808                        current: *cur_p95,
809                        baseline: base_p95,
810                        delta: cur_p95 - base_p95,
811                        description: format!(
812                            "p95 latency {:.1}ms → {:.1}ms ({:.1}× baseline)",
813                            base_p95, cur_p95, ratio
814                        ),
815                    });
816                }
817            }
818        }
819
820        anomalies.sort_by(|a, b| {
821            let rank = |s: &str| match s {
822                "critical" => 0,
823                "warning" => 1,
824                _ => 2,
825            };
826            rank(&a.severity).cmp(&rank(&b.severity))
827        });
828
829        Ok(AnomalyReport {
830            current_seconds,
831            baseline_seconds,
832            service_filter: svc_owned,
833            anomalies,
834        })
835    }
836
837    pub fn query_correlate(&self, trace_id: &str) -> Result<Option<CorrelateReport>> {
838        let spans = self.get_trace(trace_id)?;
839        if spans.is_empty() {
840            return Ok(None);
841        }
842
843        let start_time = spans
844            .iter()
845            .map(|s| s.start_time)
846            .min()
847            .unwrap_or_else(Utc::now);
848        let end_time = spans
849            .iter()
850            .map(|s| s.end_time)
851            .max()
852            .unwrap_or_else(Utc::now);
853        let duration_ms = (end_time - start_time).num_milliseconds() as f64;
854        let error_count = spans
855            .iter()
856            .filter(|s| matches!(s.status, SpanStatus::Error))
857            .count() as i64;
858
859        let mut services: Vec<String> = spans.iter().map(|s| s.service.clone()).collect();
860        services.sort();
861        services.dedup();
862
863        let logs = self.query_logs(&LogQuery {
864            trace_id: Some(trace_id.to_string()),
865            limit: Some(500),
866            ..Default::default()
867        })?;
868
869        // Metrics overlapping the trace's time range, scoped to touched services.
870        let metrics = {
871            let conn = self.conn.lock().unwrap();
872            let start = start_time
873                .naive_utc()
874                .format("%Y-%m-%d %H:%M:%S%.6f")
875                .to_string();
876            let end = end_time
877                .naive_utc()
878                .format("%Y-%m-%d %H:%M:%S%.6f")
879                .to_string();
880
881            let placeholders = std::iter::repeat("?")
882                .take(services.len())
883                .collect::<Vec<_>>()
884                .join(",");
885            let sql = format!(
886                "SELECT timestamp::VARCHAR, service, name, metric_type, value, unit, attributes
887                 FROM metrics
888                 WHERE timestamp BETWEEN ?::TIMESTAMP AND ?::TIMESTAMP
889                   AND service IN ({placeholders})
890                 ORDER BY timestamp ASC
891                 LIMIT 500"
892            );
893            let mut stmt = conn.prepare(&sql)?;
894            let mut params_vec: Vec<&dyn duckdb::ToSql> =
895                vec![&start as &dyn duckdb::ToSql, &end as &dyn duckdb::ToSql];
896            for s in &services {
897                params_vec.push(s as &dyn duckdb::ToSql);
898            }
899            let rows = stmt.query_map(params_vec.as_slice(), |row| {
900                let ts_str: String = row.get(0)?;
901                let mt_str: String = row.get(3)?;
902                let attrs_str: String = row.get(6)?;
903                Ok(MetricPoint {
904                    timestamp: parse_timestamp(&ts_str),
905                    service: row.get(1)?,
906                    name: row.get(2)?,
907                    metric_type: MetricType::from_str(&mt_str),
908                    value: row.get(4)?,
909                    unit: row.get(5)?,
910                    attributes: serde_json::from_str(&attrs_str).unwrap_or_default(),
911                })
912            })?;
913            let mut out = Vec::new();
914            for r in rows {
915                out.push(r?);
916            }
917            out
918        };
919
920        Ok(Some(CorrelateReport {
921            trace_id: trace_id.to_string(),
922            span_count: spans.len(),
923            services,
924            start_time: start_time.to_rfc3339(),
925            end_time: end_time.to_rfc3339(),
926            duration_ms,
927            error_count,
928            logs,
929            metrics,
930        }))
931    }
932}
933
934impl DuckDbStore {
935    /// Run a read-only SQL query and return rows as JSON objects. Only
936    /// `SELECT`/`WITH` statements are allowed — this is an analyst/agent query
937    /// surface over the `spans`/`logs`/`metrics`/`trace_comments` tables, not a
938    /// mutation path.
939    pub fn query_sql(&self, sql: &str) -> Result<Vec<serde_json::Value>> {
940        let trimmed = sql.trim_start();
941        let head = trimmed
942            .split_whitespace()
943            .next()
944            .unwrap_or("")
945            .to_ascii_uppercase();
946        if head != "SELECT" && head != "WITH" {
947            anyhow::bail!("only read-only SELECT/WITH queries are allowed");
948        }
949
950        let conn = self.conn.lock().unwrap();
951        let mut stmt = conn.prepare(sql)?;
952        // Column metadata is only valid after execution, so query first, then
953        // read names from the active statement.
954        let mut rows = stmt.query([])?;
955        let column_names: Vec<String> = match rows.as_ref() {
956            Some(s) => (0..s.column_count())
957                .map(|i| {
958                    s.column_name(i)
959                        .cloned()
960                        .unwrap_or_else(|_| "?".to_string())
961                })
962                .collect(),
963            None => Vec::new(),
964        };
965
966        let mut out = Vec::new();
967        while let Some(row) = rows.next()? {
968            let mut obj = serde_json::Map::with_capacity(column_names.len());
969            for (i, name) in column_names.iter().enumerate() {
970                let value: duckdb::types::Value = row.get(i)?;
971                obj.insert(name.clone(), duck_value_to_json(value));
972            }
973            out.push(serde_json::Value::Object(obj));
974        }
975        Ok(out)
976    }
977}
978
979/// Map a DuckDB dynamic value to JSON. Uncommon types fall back to their debug
980/// string so a query never fails on an exotic column.
981fn duck_value_to_json(v: duckdb::types::Value) -> serde_json::Value {
982    use duckdb::types::Value as V;
983    use serde_json::Value as J;
984    match v {
985        V::Null => J::Null,
986        V::Boolean(b) => J::Bool(b),
987        V::TinyInt(n) => J::from(n),
988        V::SmallInt(n) => J::from(n),
989        V::Int(n) => J::from(n),
990        V::BigInt(n) => J::from(n),
991        V::UTinyInt(n) => J::from(n),
992        V::USmallInt(n) => J::from(n),
993        V::UInt(n) => J::from(n),
994        V::UBigInt(n) => J::from(n),
995        V::Float(f) => serde_json::Number::from_f64(f as f64)
996            .map(J::Number)
997            .unwrap_or(J::Null),
998        V::Double(f) => serde_json::Number::from_f64(f)
999            .map(J::Number)
1000            .unwrap_or(J::Null),
1001        V::Text(s) => J::String(s),
1002        other => J::String(format!("{other:?}")),
1003    }
1004}
1005
1006/// Forward the inherent `DuckDbStore` methods through the `Store` trait so the
1007/// server can depend on `dyn Store`. Bodies live in the inherent impl above so
1008/// internal callers and the unit tests keep working unchanged.
1009impl Store for DuckDbStore {
1010    fn insert_spans(&self, spans: &[Span]) -> Result<()> {
1011        DuckDbStore::insert_spans(self, spans)
1012    }
1013    fn query_traces(&self, query: &TraceQuery) -> Result<Vec<Span>> {
1014        DuckDbStore::query_traces(self, query)
1015    }
1016    fn get_trace(&self, trace_id: &str) -> Result<Vec<Span>> {
1017        DuckDbStore::get_trace(self, trace_id)
1018    }
1019    fn add_comment(
1020        &self,
1021        trace_id: &str,
1022        span_id: Option<&str>,
1023        author: &str,
1024        body: &str,
1025    ) -> Result<TraceComment> {
1026        DuckDbStore::add_comment(self, trace_id, span_id, author, body)
1027    }
1028    fn get_comments(&self, trace_id: &str) -> Result<Vec<TraceComment>> {
1029        DuckDbStore::get_comments(self, trace_id)
1030    }
1031    fn list_services(&self) -> Result<Vec<ServiceInfo>> {
1032        DuckDbStore::list_services(self)
1033    }
1034    fn insert_logs(&self, logs: &[LogRecord]) -> Result<()> {
1035        DuckDbStore::insert_logs(self, logs)
1036    }
1037    fn query_logs(&self, query: &LogQuery) -> Result<Vec<LogRecord>> {
1038        DuckDbStore::query_logs(self, query)
1039    }
1040    fn insert_metrics(&self, metrics: &[MetricPoint]) -> Result<()> {
1041        DuckDbStore::insert_metrics(self, metrics)
1042    }
1043    fn query_metrics(&self, query: &MetricQuery) -> Result<Vec<MetricPoint>> {
1044        DuckDbStore::query_metrics(self, query)
1045    }
1046    fn query_summary(&self, last_seconds: i64, service: Option<&str>) -> Result<SummaryReport> {
1047        DuckDbStore::query_summary(self, last_seconds, service)
1048    }
1049    fn query_anomalies(
1050        &self,
1051        current_seconds: i64,
1052        baseline_seconds: i64,
1053        service: Option<&str>,
1054    ) -> Result<AnomalyReport> {
1055        DuckDbStore::query_anomalies(self, current_seconds, baseline_seconds, service)
1056    }
1057    fn query_correlate(&self, trace_id: &str) -> Result<Option<CorrelateReport>> {
1058        DuckDbStore::query_correlate(self, trace_id)
1059    }
1060    fn query_sql(&self, sql: &str) -> Result<Vec<serde_json::Value>> {
1061        DuckDbStore::query_sql(self, sql)
1062    }
1063}
1064
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Build a minimal successful span for `trace_id` carrying `attrs` as
    /// string attributes. All other fields use fixed placeholder values.
    fn span(trace_id: &str, attrs: &[(&str, &str)]) -> Span {
        let now = Utc::now();
        let attributes: HashMap<String, String> = attrs
            .iter()
            .map(|(k, v)| ((*k).to_string(), (*v).to_string()))
            .collect();
        Span {
            trace_id: trace_id.into(),
            span_id: format!("{trace_id}-span"),
            parent_span_id: None,
            service: "svc".into(),
            operation: "op".into(),
            start_time: now,
            end_time: now,
            duration_ms: 1.0,
            status: SpanStatus::Ok,
            attributes,
            events: vec![],
            kind: SpanKind::Internal,
            llm: None,
        }
    }

    /// Open a fresh store inside a new temporary directory.
    ///
    /// The directory guard is returned alongside the store because dropping a
    /// `TempDir` deletes the directory: it has to outlive the store. Bind the
    /// result as `let (_dir, s) = store();` — locals drop in reverse order of
    /// declaration, so the store is closed before the directory is removed.
    fn store() -> (tempfile::TempDir, DuckDbStore) {
        let dir = tempfile::tempdir().unwrap();
        let store = DuckDbStore::new(dir.path().to_str().unwrap()).unwrap();
        (dir, store)
    }

    /// A dotted attribute key is matched as one top-level key.
    #[test]
    fn attribute_filter_matches_dotted_key() {
        let (_dir, s) = store();
        s.insert_spans(&[
            span("a", &[("http.method", "GET"), ("http.status_code", "200")]),
            span("b", &[("http.method", "POST"), ("http.status_code", "500")]),
        ])
        .unwrap();

        let q = TraceQuery {
            attributes: vec![("http.method".into(), "GET".into())],
            ..Default::default()
        };
        let got = s.query_traces(&q).unwrap();
        assert_eq!(got.len(), 1);
        assert_eq!(got[0].trace_id, "a");
    }

    /// Several attribute filters must all match for a span to be returned.
    #[test]
    fn attribute_filters_are_anded() {
        let (_dir, s) = store();
        s.insert_spans(&[
            span("a", &[("http.method", "GET"), ("http.status_code", "200")]),
            span("b", &[("http.method", "GET"), ("http.status_code", "500")]),
        ])
        .unwrap();

        let q = TraceQuery {
            attributes: vec![
                ("http.method".into(), "GET".into()),
                ("http.status_code".into(), "500".into()),
            ],
            ..Default::default()
        };
        let got = s.query_traces(&q).unwrap();
        assert_eq!(got.len(), 1);
        assert_eq!(got[0].trace_id, "b");
    }

    /// A filter on an existing key with a different value matches nothing.
    #[test]
    fn attribute_filter_misses_when_value_differs() {
        let (_dir, s) = store();
        s.insert_spans(&[span("a", &[("env", "prod")])]).unwrap();

        let q = TraceQuery {
            attributes: vec![("env".into(), "staging".into())],
            ..Default::default()
        };
        assert!(s.query_traces(&q).unwrap().is_empty());
    }

    /// The LLM extension of a span survives an insert/read round trip.
    #[test]
    fn llm_span_round_trips() {
        use super::super::models::{LlmOperation, LlmSpan};

        let (_dir, s) = store();
        let mut sp = span("llm", &[]);
        sp.kind = SpanKind::Llm;
        sp.llm = Some(LlmSpan {
            provider: "anthropic".into(),
            model: "claude-opus-4-7".into(),
            operation: LlmOperation::Chat,
            input_tokens: Some(1200),
            output_tokens: Some(340),
            total_tokens: Some(1540),
            cost_usd: Some(0.0185),
            ..Default::default()
        });
        s.insert_spans(&[sp]).unwrap();

        let got = s.get_trace("llm").unwrap();
        assert_eq!(got.len(), 1);
        assert_eq!(got[0].kind, SpanKind::Llm);
        let llm = got[0].llm.as_ref().expect("llm extension preserved");
        assert_eq!(llm.model, "claude-opus-4-7");
        assert_eq!(llm.total_tokens, Some(1540));
        assert_eq!(llm.cost_usd, Some(0.0185));
        assert_eq!(llm.operation, LlmOperation::Chat);
    }

    /// A span stored without an LLM extension reads back without one.
    #[test]
    fn non_llm_span_has_no_extension_after_round_trip() {
        let (_dir, s) = store();
        s.insert_spans(&[span("plain", &[("k", "v")])]).unwrap();
        let got = s.get_trace("plain").unwrap();
        assert_eq!(got.len(), 1);
        assert_eq!(got[0].kind, SpanKind::Internal);
        assert!(got[0].llm.is_none());
    }

    /// `query_sql` returns rows as JSON objects and refuses mutations.
    #[test]
    fn query_sql_reads_and_rejects_mutations() {
        let (_dir, s) = store();
        s.insert_spans(&[span("a", &[("env", "prod")]), span("b", &[("env", "prod")])])
            .unwrap();

        let rows = s
            .query_sql("SELECT service, COUNT(*) AS n FROM spans GROUP BY service")
            .unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0]["service"], serde_json::json!("svc"));
        assert_eq!(rows[0]["n"], serde_json::json!(2));

        // Mutations are rejected.
        assert!(s.query_sql("DELETE FROM spans").is_err());
        assert!(s.query_sql("DROP TABLE spans").is_err());
    }

    /// A log whose body was offloaded keeps its `body_sha256` reference and
    /// reads back with an empty body.
    #[test]
    fn log_body_sha256_round_trips() {
        let (_dir, s) = store();
        let log = LogRecord {
            timestamp: Utc::now(),
            observed_timestamp: Utc::now(),
            trace_id: Some("t1".into()),
            span_id: None,
            severity: LogSeverity::Error,
            severity_text: "ERROR".into(),
            body: String::new(), // offloaded to blob
            service: "svc".into(),
            attributes: HashMap::new(),
            body_sha256: Some("abc123".into()),
        };
        s.insert_logs(&[log]).unwrap();

        let got = s
            .query_logs(&LogQuery {
                trace_id: Some("t1".into()),
                ..Default::default()
            })
            .unwrap();
        assert_eq!(got.len(), 1);
        assert_eq!(got[0].body_sha256.as_deref(), Some("abc123"));
        assert!(got[0].body.is_empty());
    }
}