use std::path::Path;
use std::sync::Mutex;
use anyhow::Result;
use chrono::{DateTime, NaiveDateTime, Utc};
use duckdb::{Connection, params};
use super::Store;
use super::models::{
Anomaly, AnomalyReport, CorrelateReport, ErrorOperation, LogQuery, LogRecord, LogSeverity,
LogSummary, MetricPoint, MetricQuery, MetricSummary, MetricType, ServiceInfo, ServiceSummary,
Span, SpanKind, SpanStatus, SummaryReport, TraceComment, TraceQuery, TraceSummary,
};
fn json_path_for_key(key: &str) -> String {
let escaped = key.replace('\\', r"\\").replace('"', r#"\""#);
format!(r#"$."{escaped}""#)
}
fn parse_timestamp(s: &str) -> DateTime<Utc> {
NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f")
.or_else(|_| NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S"))
.map(|dt| dt.and_utc())
.unwrap_or_default()
}
fn row_to_span(row: &duckdb::Row<'_>) -> duckdb::Result<Span> {
let attrs_str: String = row.get(9)?;
let events_str: String = row.get(10)?;
let status_str: String = row.get(8)?;
let parent: Option<String> = row.get(2)?;
let start_str: String = row.get(5)?;
let end_str: String = row.get(6)?;
let kind_str: Option<String> = row.get(11)?;
let llm_str: Option<String> = row.get(12)?;
Ok(Span {
trace_id: row.get(0)?,
span_id: row.get(1)?,
parent_span_id: if parent.as_deref() == Some("") {
None
} else {
parent
},
service: row.get(3)?,
operation: row.get(4)?,
start_time: parse_timestamp(&start_str),
end_time: parse_timestamp(&end_str),
duration_ms: row.get(7)?,
status: SpanStatus::from_str(&status_str),
attributes: serde_json::from_str(&attrs_str).unwrap_or_default(),
events: serde_json::from_str(&events_str).unwrap_or_default(),
kind: kind_str
.map(|s| SpanKind::from_str(&s))
.unwrap_or(SpanKind::Internal),
llm: llm_str.and_then(|s| serde_json::from_str(&s).ok()),
})
}
pub struct DuckDbStore {
conn: Mutex<Connection>,
}
impl DuckDbStore {
pub fn new(data_dir: &str) -> Result<Self> {
std::fs::create_dir_all(data_dir)?;
let db_path = Path::new(data_dir).join("tael.duckdb");
let conn = Connection::open(db_path)?;
let store = Self {
conn: Mutex::new(conn),
};
store.init_schema()?;
Ok(store)
}
fn init_schema(&self) -> Result<()> {
let conn = self.conn.lock().unwrap();
conn.execute_batch(
"
CREATE TABLE IF NOT EXISTS spans (
trace_id VARCHAR NOT NULL,
span_id VARCHAR NOT NULL,
parent_span_id VARCHAR,
service VARCHAR NOT NULL,
operation VARCHAR NOT NULL,
start_time TIMESTAMP NOT NULL,
end_time TIMESTAMP NOT NULL,
duration_ms DOUBLE NOT NULL,
status VARCHAR NOT NULL DEFAULT 'unset',
attributes JSON,
events JSON,
kind VARCHAR NOT NULL DEFAULT 'internal',
llm JSON,
PRIMARY KEY (trace_id, span_id)
);
-- Migrate pre-existing databases created before the LLM columns.
-- DuckDB's ALTER ... ADD COLUMN rejects constraints, so these are
-- added nullable; fresh tables get the constraints from CREATE above,
-- and migrated NULL `kind` values are read back as 'internal'.
ALTER TABLE spans ADD COLUMN IF NOT EXISTS kind VARCHAR;
ALTER TABLE spans ADD COLUMN IF NOT EXISTS llm JSON;
CREATE INDEX IF NOT EXISTS idx_spans_service ON spans(service);
CREATE INDEX IF NOT EXISTS idx_spans_start_time ON spans(start_time);
CREATE INDEX IF NOT EXISTS idx_spans_trace_id ON spans(trace_id);
CREATE INDEX IF NOT EXISTS idx_spans_status ON spans(status);
CREATE INDEX IF NOT EXISTS idx_spans_kind ON spans(kind);
CREATE TABLE IF NOT EXISTS trace_comments (
id VARCHAR NOT NULL PRIMARY KEY,
trace_id VARCHAR NOT NULL,
span_id VARCHAR,
author VARCHAR NOT NULL,
body VARCHAR NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT current_timestamp::TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_comments_trace_id ON trace_comments(trace_id);
CREATE TABLE IF NOT EXISTS logs (
timestamp TIMESTAMP NOT NULL,
observed_timestamp TIMESTAMP NOT NULL,
trace_id VARCHAR,
span_id VARCHAR,
severity VARCHAR NOT NULL DEFAULT 'unspecified',
severity_text VARCHAR NOT NULL DEFAULT '',
body VARCHAR NOT NULL DEFAULT '',
service VARCHAR NOT NULL,
attributes JSON,
body_sha256 VARCHAR
);
-- Migrate pre-existing databases (nullable, no constraint).
ALTER TABLE logs ADD COLUMN IF NOT EXISTS body_sha256 VARCHAR;
CREATE INDEX IF NOT EXISTS idx_logs_service ON logs(service);
CREATE INDEX IF NOT EXISTS idx_logs_timestamp ON logs(timestamp);
CREATE INDEX IF NOT EXISTS idx_logs_severity ON logs(severity);
CREATE INDEX IF NOT EXISTS idx_logs_trace_id ON logs(trace_id);
CREATE TABLE IF NOT EXISTS metrics (
timestamp TIMESTAMP NOT NULL,
service VARCHAR NOT NULL,
name VARCHAR NOT NULL,
metric_type VARCHAR NOT NULL DEFAULT 'unknown',
value DOUBLE NOT NULL,
unit VARCHAR NOT NULL DEFAULT '',
attributes JSON
);
CREATE INDEX IF NOT EXISTS idx_metrics_service ON metrics(service);
CREATE INDEX IF NOT EXISTS idx_metrics_name ON metrics(name);
CREATE INDEX IF NOT EXISTS idx_metrics_timestamp ON metrics(timestamp);
",
)?;
Ok(())
}
pub fn insert_spans(&self, spans: &[Span]) -> Result<()> {
let conn = self.conn.lock().unwrap();
let mut stmt = conn.prepare(
"INSERT OR REPLACE INTO spans
(trace_id, span_id, parent_span_id, service, operation,
start_time, end_time, duration_ms, status, attributes, events,
kind, llm)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
)?;
for span in spans {
let attrs = serde_json::to_string(&span.attributes)?;
let events = serde_json::to_string(&span.events)?;
let llm = span.llm.as_ref().map(serde_json::to_string).transpose()?;
stmt.execute(params![
span.trace_id,
span.span_id,
span.parent_span_id,
span.service,
span.operation,
span.start_time.format("%Y-%m-%d %H:%M:%S%.6f").to_string(),
span.end_time.format("%Y-%m-%d %H:%M:%S%.6f").to_string(),
span.duration_ms,
span.status.to_string(),
attrs,
events,
span.kind.to_string(),
llm,
])?;
}
Ok(())
}
pub fn query_traces(&self, query: &TraceQuery) -> Result<Vec<Span>> {
let conn = self.conn.lock().unwrap();
let mut sql = String::from(
"SELECT trace_id, span_id, parent_span_id, service, operation,
start_time::VARCHAR, end_time::VARCHAR, duration_ms, status, attributes, events,
kind, llm
FROM spans WHERE 1=1",
);
let mut param_values: Vec<Box<dyn duckdb::ToSql>> = Vec::new();
if let Some(ref svc) = query.service {
sql.push_str(" AND service = ?");
param_values.push(Box::new(svc.clone()));
}
if let Some(ref op) = query.operation {
sql.push_str(" AND operation LIKE ?");
param_values.push(Box::new(format!("%{op}%")));
}
if let Some(min) = query.min_duration_ms {
sql.push_str(" AND duration_ms >= ?");
param_values.push(Box::new(min));
}
if let Some(max) = query.max_duration_ms {
sql.push_str(" AND duration_ms <= ?");
param_values.push(Box::new(max));
}
if let Some(ref status) = query.status {
sql.push_str(" AND status = ?");
param_values.push(Box::new(status.clone()));
}
if let Some(secs) = query.last_seconds {
sql.push_str(&format!(
" AND start_time >= current_timestamp::TIMESTAMP - INTERVAL '{secs} seconds'"
));
}
for (k, v) in &query.attributes {
sql.push_str(" AND json_extract_string(attributes, ?) = ?");
param_values.push(Box::new(json_path_for_key(k)));
param_values.push(Box::new(v.clone()));
}
sql.push_str(" ORDER BY start_time DESC");
let limit = query.limit.unwrap_or(100);
sql.push_str(&format!(" LIMIT {limit}"));
let params_ref: Vec<&dyn duckdb::ToSql> = param_values.iter().map(|p| p.as_ref()).collect();
let mut stmt = conn.prepare(&sql)?;
let rows = stmt.query_map(params_ref.as_slice(), row_to_span)?;
let mut spans = Vec::new();
for row in rows {
spans.push(row?);
}
Ok(spans)
}
pub fn get_trace(&self, trace_id: &str) -> Result<Vec<Span>> {
let conn = self.conn.lock().unwrap();
let mut stmt = conn.prepare(
"SELECT trace_id, span_id, parent_span_id, service, operation,
start_time::VARCHAR, end_time::VARCHAR, duration_ms, status, attributes, events,
kind, llm
FROM spans
WHERE trace_id = ?
ORDER BY start_time ASC",
)?;
let rows = stmt.query_map(params![trace_id], row_to_span)?;
let mut spans = Vec::new();
for row in rows {
spans.push(row?);
}
Ok(spans)
}
pub fn add_comment(
&self,
trace_id: &str,
span_id: Option<&str>,
author: &str,
body: &str,
) -> Result<TraceComment> {
let conn = self.conn.lock().unwrap();
let id = uuid::Uuid::new_v4().to_string();
conn.execute(
"INSERT INTO trace_comments (id, trace_id, span_id, author, body)
VALUES (?, ?, ?, ?, ?)",
params![id, trace_id, span_id, author, body],
)?;
let mut stmt = conn.prepare(
"SELECT id, trace_id, span_id, author, body, created_at::VARCHAR
FROM trace_comments WHERE id = ?",
)?;
let comment = stmt.query_row(params![id], |row| {
Ok(TraceComment {
id: row.get(0)?,
trace_id: row.get(1)?,
span_id: row.get(2)?,
author: row.get(3)?,
body: row.get(4)?,
created_at: row.get(5)?,
})
})?;
Ok(comment)
}
pub fn get_comments(&self, trace_id: &str) -> Result<Vec<TraceComment>> {
let conn = self.conn.lock().unwrap();
let mut stmt = conn.prepare(
"SELECT id, trace_id, span_id, author, body, created_at::VARCHAR
FROM trace_comments
WHERE trace_id = ?
ORDER BY created_at ASC",
)?;
let rows = stmt.query_map(params![trace_id], |row| {
Ok(TraceComment {
id: row.get(0)?,
trace_id: row.get(1)?,
span_id: row.get(2)?,
author: row.get(3)?,
body: row.get(4)?,
created_at: row.get(5)?,
})
})?;
let mut comments = Vec::new();
for row in rows {
comments.push(row?);
}
Ok(comments)
}
pub fn list_services(&self) -> Result<Vec<ServiceInfo>> {
let conn = self.conn.lock().unwrap();
let mut stmt = conn.prepare(
"SELECT service,
COUNT(*) as span_count,
COUNT(DISTINCT trace_id) as trace_count,
AVG(duration_ms) as avg_duration,
SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END)::DOUBLE / COUNT(*)::DOUBLE as error_rate
FROM spans
GROUP BY service
ORDER BY span_count DESC",
)?;
let rows = stmt.query_map([], |row| {
Ok(ServiceInfo {
name: row.get(0)?,
span_count: row.get(1)?,
trace_count: row.get(2)?,
avg_duration_ms: row.get(3)?,
error_rate: row.get(4)?,
})
})?;
let mut services = Vec::new();
for row in rows {
services.push(row?);
}
Ok(services)
}
pub fn insert_logs(&self, logs: &[LogRecord]) -> Result<()> {
let conn = self.conn.lock().unwrap();
let mut stmt = conn.prepare(
"INSERT INTO logs
(timestamp, observed_timestamp, trace_id, span_id, severity,
severity_text, body, service, attributes, body_sha256)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
)?;
for log in logs {
let attrs = serde_json::to_string(&log.attributes)?;
stmt.execute(params![
log.timestamp.format("%Y-%m-%d %H:%M:%S%.6f").to_string(),
log.observed_timestamp
.format("%Y-%m-%d %H:%M:%S%.6f")
.to_string(),
log.trace_id,
log.span_id,
log.severity.to_string(),
log.severity_text,
log.body,
log.service,
attrs,
log.body_sha256,
])?;
}
Ok(())
}
pub fn query_logs(&self, query: &LogQuery) -> Result<Vec<LogRecord>> {
let conn = self.conn.lock().unwrap();
let mut sql = String::from(
"SELECT timestamp::VARCHAR, observed_timestamp::VARCHAR, trace_id, span_id,
severity, severity_text, body, service, attributes, body_sha256
FROM logs WHERE 1=1",
);
let mut param_values: Vec<Box<dyn duckdb::ToSql>> = Vec::new();
if let Some(ref svc) = query.service {
sql.push_str(" AND service = ?");
param_values.push(Box::new(svc.clone()));
}
if let Some(ref sev) = query.severity {
sql.push_str(" AND severity = ?");
param_values.push(Box::new(sev.clone()));
}
if let Some(ref body) = query.body_contains {
sql.push_str(" AND body LIKE ?");
param_values.push(Box::new(format!("%{body}%")));
}
if let Some(ref tid) = query.trace_id {
sql.push_str(" AND trace_id = ?");
param_values.push(Box::new(tid.clone()));
}
if let Some(secs) = query.last_seconds {
sql.push_str(&format!(
" AND timestamp >= current_timestamp::TIMESTAMP - INTERVAL '{secs} seconds'"
));
}
sql.push_str(" ORDER BY timestamp DESC");
let limit = query.limit.unwrap_or(100);
sql.push_str(&format!(" LIMIT {limit}"));
let params_ref: Vec<&dyn duckdb::ToSql> = param_values.iter().map(|p| p.as_ref()).collect();
let mut stmt = conn.prepare(&sql)?;
let rows = stmt.query_map(params_ref.as_slice(), |row| {
let ts_str: String = row.get(0)?;
let obs_str: String = row.get(1)?;
let attrs_str: String = row.get(8)?;
let severity_str: String = row.get(4)?;
Ok(LogRecord {
timestamp: parse_timestamp(&ts_str),
observed_timestamp: parse_timestamp(&obs_str),
trace_id: row.get(2)?,
span_id: row.get(3)?,
severity: LogSeverity::from_str(&severity_str),
severity_text: row.get(5)?,
body: row.get(6)?,
service: row.get(7)?,
attributes: serde_json::from_str(&attrs_str).unwrap_or_default(),
body_sha256: row.get(9)?,
})
})?;
let mut logs = Vec::new();
for row in rows {
logs.push(row?);
}
Ok(logs)
}
pub fn insert_metrics(&self, metrics: &[MetricPoint]) -> Result<()> {
let conn = self.conn.lock().unwrap();
let mut stmt = conn.prepare(
"INSERT INTO metrics
(timestamp, service, name, metric_type, value, unit, attributes)
VALUES (?, ?, ?, ?, ?, ?, ?)",
)?;
for m in metrics {
let attrs = serde_json::to_string(&m.attributes)?;
stmt.execute(params![
m.timestamp.format("%Y-%m-%d %H:%M:%S%.6f").to_string(),
m.service,
m.name,
m.metric_type.to_string(),
m.value,
m.unit,
attrs,
])?;
}
Ok(())
}
pub fn query_metrics(&self, query: &MetricQuery) -> Result<Vec<MetricPoint>> {
let conn = self.conn.lock().unwrap();
let mut sql = String::from(
"SELECT timestamp::VARCHAR, service, name, metric_type, value, unit, attributes
FROM metrics WHERE 1=1",
);
let mut param_values: Vec<Box<dyn duckdb::ToSql>> = Vec::new();
if let Some(ref svc) = query.service {
sql.push_str(" AND service = ?");
param_values.push(Box::new(svc.clone()));
}
if let Some(ref name) = query.name {
sql.push_str(" AND name = ?");
param_values.push(Box::new(name.clone()));
}
if let Some(ref mt) = query.metric_type {
sql.push_str(" AND metric_type = ?");
param_values.push(Box::new(mt.clone()));
}
if let Some(secs) = query.last_seconds {
sql.push_str(&format!(
" AND timestamp >= current_timestamp::TIMESTAMP - INTERVAL '{secs} seconds'"
));
}
sql.push_str(" ORDER BY timestamp DESC");
let limit = query.limit.unwrap_or(500);
sql.push_str(&format!(" LIMIT {limit}"));
let params_ref: Vec<&dyn duckdb::ToSql> = param_values.iter().map(|p| p.as_ref()).collect();
let mut stmt = conn.prepare(&sql)?;
let rows = stmt.query_map(params_ref.as_slice(), |row| {
let ts_str: String = row.get(0)?;
let mt_str: String = row.get(3)?;
let attrs_str: String = row.get(6)?;
Ok(MetricPoint {
timestamp: parse_timestamp(&ts_str),
service: row.get(1)?,
name: row.get(2)?,
metric_type: MetricType::from_str(&mt_str),
value: row.get(4)?,
unit: row.get(5)?,
attributes: serde_json::from_str(&attrs_str).unwrap_or_default(),
})
})?;
let mut metrics = Vec::new();
for row in rows {
metrics.push(row?);
}
Ok(metrics)
}
pub fn query_summary(&self, last_seconds: i64, service: Option<&str>) -> Result<SummaryReport> {
let conn = self.conn.lock().unwrap();
let span_time_clause = format!(
"start_time >= current_timestamp::TIMESTAMP - INTERVAL '{last_seconds} seconds'"
);
let ts_time_clause = format!(
"timestamp >= current_timestamp::TIMESTAMP - INTERVAL '{last_seconds} seconds'"
);
let svc_clause = if service.is_some() {
" AND service = ?"
} else {
""
};
let svc_owned = service.map(|s| s.to_string());
let build_params = || -> Vec<&dyn duckdb::ToSql> {
match &svc_owned {
Some(s) => vec![s as &dyn duckdb::ToSql],
None => vec![],
}
};
let traces_sql = format!(
"SELECT
COUNT(*)::BIGINT,
COUNT(DISTINCT trace_id)::BIGINT,
SUM(CASE WHEN status='error' THEN 1 ELSE 0 END)::BIGINT,
COALESCE(AVG(duration_ms), 0.0),
COALESCE(MAX(duration_ms), 0.0),
COALESCE(quantile_cont(duration_ms, 0.5), 0.0),
COALESCE(quantile_cont(duration_ms, 0.95), 0.0),
COALESCE(quantile_cont(duration_ms, 0.99), 0.0)
FROM spans WHERE {span_time_clause}{svc_clause}"
);
let mut stmt = conn.prepare(&traces_sql)?;
let traces: TraceSummary = stmt.query_row(build_params().as_slice(), |row| {
let span_count: i64 = row.get::<_, Option<i64>>(0)?.unwrap_or(0);
let error_count: i64 = row.get::<_, Option<i64>>(2)?.unwrap_or(0);
let error_rate = if span_count > 0 {
error_count as f64 / span_count as f64
} else {
0.0
};
Ok(TraceSummary {
span_count,
trace_count: row.get::<_, Option<i64>>(1)?.unwrap_or(0),
error_count,
error_rate,
avg_ms: row.get(3)?,
max_ms: row.get(4)?,
p50_ms: row.get(5)?,
p95_ms: row.get(6)?,
p99_ms: row.get(7)?,
})
})?;
drop(stmt);
let top_svc_sql = format!(
"SELECT service,
COUNT(*)::BIGINT,
SUM(CASE WHEN status='error' THEN 1 ELSE 0 END)::DOUBLE / NULLIF(COUNT(*), 0)::DOUBLE,
COALESCE(quantile_cont(duration_ms, 0.95), 0.0)
FROM spans WHERE {span_time_clause}{svc_clause}
GROUP BY service
ORDER BY 2 DESC
LIMIT 5"
);
let mut stmt = conn.prepare(&top_svc_sql)?;
let top_services: Vec<ServiceSummary> = stmt
.query_map(build_params().as_slice(), |row| {
Ok(ServiceSummary {
service: row.get(0)?,
span_count: row.get(1)?,
error_rate: row.get::<_, Option<f64>>(2)?.unwrap_or(0.0),
p95_ms: row.get(3)?,
})
})?
.collect::<duckdb::Result<_>>()?;
drop(stmt);
let err_op_sql = format!(
"SELECT service, operation, COUNT(*)::BIGINT
FROM spans
WHERE status='error' AND {span_time_clause}{svc_clause}
GROUP BY service, operation
ORDER BY 3 DESC
LIMIT 5"
);
let mut stmt = conn.prepare(&err_op_sql)?;
let top_error_operations: Vec<ErrorOperation> = stmt
.query_map(build_params().as_slice(), |row| {
Ok(ErrorOperation {
service: row.get(0)?,
operation: row.get(1)?,
error_count: row.get(2)?,
})
})?
.collect::<duckdb::Result<_>>()?;
drop(stmt);
let logs_sql = format!(
"SELECT
COUNT(*)::BIGINT,
SUM(CASE WHEN severity IN ('error','fatal') THEN 1 ELSE 0 END)::BIGINT,
SUM(CASE WHEN severity='warn' THEN 1 ELSE 0 END)::BIGINT,
SUM(CASE WHEN severity='info' THEN 1 ELSE 0 END)::BIGINT,
SUM(CASE WHEN severity IN ('debug','trace') THEN 1 ELSE 0 END)::BIGINT
FROM logs WHERE {ts_time_clause}{svc_clause}"
);
let mut stmt = conn.prepare(&logs_sql)?;
let logs: LogSummary = stmt.query_row(build_params().as_slice(), |row| {
Ok(LogSummary {
total: row.get::<_, Option<i64>>(0)?.unwrap_or(0),
error: row.get::<_, Option<i64>>(1)?.unwrap_or(0),
warn: row.get::<_, Option<i64>>(2)?.unwrap_or(0),
info: row.get::<_, Option<i64>>(3)?.unwrap_or(0),
debug: row.get::<_, Option<i64>>(4)?.unwrap_or(0),
})
})?;
drop(stmt);
let metrics_sql = format!(
"SELECT COUNT(*)::BIGINT, COUNT(DISTINCT name)::BIGINT
FROM metrics WHERE {ts_time_clause}{svc_clause}"
);
let mut stmt = conn.prepare(&metrics_sql)?;
let metrics: MetricSummary = stmt.query_row(build_params().as_slice(), |row| {
Ok(MetricSummary {
point_count: row.get::<_, Option<i64>>(0)?.unwrap_or(0),
unique_names: row.get::<_, Option<i64>>(1)?.unwrap_or(0),
})
})?;
drop(stmt);
Ok(SummaryReport {
window_seconds: last_seconds,
service_filter: service.map(|s| s.to_string()),
traces,
top_services,
top_error_operations,
logs,
metrics,
})
}
pub fn query_anomalies(
&self,
current_seconds: i64,
baseline_seconds: i64,
service: Option<&str>,
) -> Result<AnomalyReport> {
let conn = self.conn.lock().unwrap();
let svc_owned = service.map(|s| s.to_string());
let svc_clause = if service.is_some() {
" AND service = ?"
} else {
""
};
let stats_sql = |window: i64| -> String {
format!(
"SELECT service,
COUNT(*)::BIGINT AS span_count,
SUM(CASE WHEN status='error' THEN 1 ELSE 0 END)::DOUBLE
/ NULLIF(COUNT(*), 0)::DOUBLE AS error_rate,
COALESCE(quantile_cont(duration_ms, 0.95), 0.0) AS p95_ms
FROM spans
WHERE start_time >= current_timestamp::TIMESTAMP - INTERVAL '{window} seconds'{svc_clause}
GROUP BY service"
)
};
let build_params = || -> Vec<&dyn duckdb::ToSql> {
match &svc_owned {
Some(s) => vec![s as &dyn duckdb::ToSql],
None => vec![],
}
};
let mut current_stats: std::collections::HashMap<String, (i64, f64, f64)> =
std::collections::HashMap::new();
let mut stmt = conn.prepare(&stats_sql(current_seconds))?;
let rows = stmt.query_map(build_params().as_slice(), |row| {
Ok((
row.get::<_, String>(0)?,
row.get::<_, i64>(1)?,
row.get::<_, Option<f64>>(2)?.unwrap_or(0.0),
row.get::<_, f64>(3)?,
))
})?;
for r in rows {
let (svc, span_count, error_rate, p95) = r?;
current_stats.insert(svc, (span_count, error_rate, p95));
}
drop(stmt);
let mut baseline_stats: std::collections::HashMap<String, (i64, f64, f64)> =
std::collections::HashMap::new();
let mut stmt = conn.prepare(&stats_sql(baseline_seconds))?;
let rows = stmt.query_map(build_params().as_slice(), |row| {
Ok((
row.get::<_, String>(0)?,
row.get::<_, i64>(1)?,
row.get::<_, Option<f64>>(2)?.unwrap_or(0.0),
row.get::<_, f64>(3)?,
))
})?;
for r in rows {
let (svc, span_count, error_rate, p95) = r?;
baseline_stats.insert(svc, (span_count, error_rate, p95));
}
drop(stmt);
let mut anomalies = Vec::new();
for (svc, (cur_count, cur_err, cur_p95)) in ¤t_stats {
if *cur_count < 5 {
continue;
}
let (base_count, base_err, base_p95) =
baseline_stats.get(svc).copied().unwrap_or((0, 0.0, 0.0));
let err_delta = cur_err - base_err;
if err_delta >= 0.05 && *cur_err > 0.0 {
let severity = if err_delta >= 0.25 {
"critical"
} else if err_delta >= 0.10 {
"warning"
} else {
"info"
};
anomalies.push(Anomaly {
service: svc.clone(),
kind: "error_rate".into(),
severity: severity.into(),
current: *cur_err,
baseline: base_err,
delta: err_delta,
description: format!(
"Error rate rose {:.1}% → {:.1}% (baseline {:.1}%, {} spans)",
base_err * 100.0,
cur_err * 100.0,
base_err * 100.0,
cur_count
),
});
}
if base_p95 > 0.0 && base_count >= 5 {
let ratio = cur_p95 / base_p95;
if ratio >= 1.5 && *cur_p95 > 50.0 {
let severity = if ratio >= 3.0 {
"critical"
} else if ratio >= 2.0 {
"warning"
} else {
"info"
};
anomalies.push(Anomaly {
service: svc.clone(),
kind: "latency_p95".into(),
severity: severity.into(),
current: *cur_p95,
baseline: base_p95,
delta: cur_p95 - base_p95,
description: format!(
"p95 latency {:.1}ms → {:.1}ms ({:.1}× baseline)",
base_p95, cur_p95, ratio
),
});
}
}
}
anomalies.sort_by(|a, b| {
let rank = |s: &str| match s {
"critical" => 0,
"warning" => 1,
_ => 2,
};
rank(&a.severity).cmp(&rank(&b.severity))
});
Ok(AnomalyReport {
current_seconds,
baseline_seconds,
service_filter: svc_owned,
anomalies,
})
}
pub fn query_correlate(&self, trace_id: &str) -> Result<Option<CorrelateReport>> {
let spans = self.get_trace(trace_id)?;
if spans.is_empty() {
return Ok(None);
}
let start_time = spans
.iter()
.map(|s| s.start_time)
.min()
.unwrap_or_else(Utc::now);
let end_time = spans
.iter()
.map(|s| s.end_time)
.max()
.unwrap_or_else(Utc::now);
let duration_ms = (end_time - start_time).num_milliseconds() as f64;
let error_count = spans
.iter()
.filter(|s| matches!(s.status, SpanStatus::Error))
.count() as i64;
let mut services: Vec<String> = spans.iter().map(|s| s.service.clone()).collect();
services.sort();
services.dedup();
let logs = self.query_logs(&LogQuery {
trace_id: Some(trace_id.to_string()),
limit: Some(500),
..Default::default()
})?;
let metrics = {
let conn = self.conn.lock().unwrap();
let start = start_time
.naive_utc()
.format("%Y-%m-%d %H:%M:%S%.6f")
.to_string();
let end = end_time
.naive_utc()
.format("%Y-%m-%d %H:%M:%S%.6f")
.to_string();
let placeholders = std::iter::repeat("?")
.take(services.len())
.collect::<Vec<_>>()
.join(",");
let sql = format!(
"SELECT timestamp::VARCHAR, service, name, metric_type, value, unit, attributes
FROM metrics
WHERE timestamp BETWEEN ?::TIMESTAMP AND ?::TIMESTAMP
AND service IN ({placeholders})
ORDER BY timestamp ASC
LIMIT 500"
);
let mut stmt = conn.prepare(&sql)?;
let mut params_vec: Vec<&dyn duckdb::ToSql> =
vec![&start as &dyn duckdb::ToSql, &end as &dyn duckdb::ToSql];
for s in &services {
params_vec.push(s as &dyn duckdb::ToSql);
}
let rows = stmt.query_map(params_vec.as_slice(), |row| {
let ts_str: String = row.get(0)?;
let mt_str: String = row.get(3)?;
let attrs_str: String = row.get(6)?;
Ok(MetricPoint {
timestamp: parse_timestamp(&ts_str),
service: row.get(1)?,
name: row.get(2)?,
metric_type: MetricType::from_str(&mt_str),
value: row.get(4)?,
unit: row.get(5)?,
attributes: serde_json::from_str(&attrs_str).unwrap_or_default(),
})
})?;
let mut out = Vec::new();
for r in rows {
out.push(r?);
}
out
};
Ok(Some(CorrelateReport {
trace_id: trace_id.to_string(),
span_count: spans.len(),
services,
start_time: start_time.to_rfc3339(),
end_time: end_time.to_rfc3339(),
duration_ms,
error_count,
logs,
metrics,
}))
}
}
impl DuckDbStore {
pub fn query_sql(&self, sql: &str) -> Result<Vec<serde_json::Value>> {
let trimmed = sql.trim_start();
let head = trimmed
.split_whitespace()
.next()
.unwrap_or("")
.to_ascii_uppercase();
if head != "SELECT" && head != "WITH" {
anyhow::bail!("only read-only SELECT/WITH queries are allowed");
}
let conn = self.conn.lock().unwrap();
let mut stmt = conn.prepare(sql)?;
let mut rows = stmt.query([])?;
let column_names: Vec<String> = match rows.as_ref() {
Some(s) => (0..s.column_count())
.map(|i| {
s.column_name(i)
.cloned()
.unwrap_or_else(|_| "?".to_string())
})
.collect(),
None => Vec::new(),
};
let mut out = Vec::new();
while let Some(row) = rows.next()? {
let mut obj = serde_json::Map::with_capacity(column_names.len());
for (i, name) in column_names.iter().enumerate() {
let value: duckdb::types::Value = row.get(i)?;
obj.insert(name.clone(), duck_value_to_json(value));
}
out.push(serde_json::Value::Object(obj));
}
Ok(out)
}
}
fn duck_value_to_json(v: duckdb::types::Value) -> serde_json::Value {
use duckdb::types::Value as V;
use serde_json::Value as J;
match v {
V::Null => J::Null,
V::Boolean(b) => J::Bool(b),
V::TinyInt(n) => J::from(n),
V::SmallInt(n) => J::from(n),
V::Int(n) => J::from(n),
V::BigInt(n) => J::from(n),
V::UTinyInt(n) => J::from(n),
V::USmallInt(n) => J::from(n),
V::UInt(n) => J::from(n),
V::UBigInt(n) => J::from(n),
V::Float(f) => serde_json::Number::from_f64(f as f64)
.map(J::Number)
.unwrap_or(J::Null),
V::Double(f) => serde_json::Number::from_f64(f)
.map(J::Number)
.unwrap_or(J::Null),
V::Text(s) => J::String(s),
other => J::String(format!("{other:?}")),
}
}
impl Store for DuckDbStore {
fn insert_spans(&self, spans: &[Span]) -> Result<()> {
DuckDbStore::insert_spans(self, spans)
}
fn query_traces(&self, query: &TraceQuery) -> Result<Vec<Span>> {
DuckDbStore::query_traces(self, query)
}
fn get_trace(&self, trace_id: &str) -> Result<Vec<Span>> {
DuckDbStore::get_trace(self, trace_id)
}
fn add_comment(
&self,
trace_id: &str,
span_id: Option<&str>,
author: &str,
body: &str,
) -> Result<TraceComment> {
DuckDbStore::add_comment(self, trace_id, span_id, author, body)
}
fn get_comments(&self, trace_id: &str) -> Result<Vec<TraceComment>> {
DuckDbStore::get_comments(self, trace_id)
}
fn list_services(&self) -> Result<Vec<ServiceInfo>> {
DuckDbStore::list_services(self)
}
fn insert_logs(&self, logs: &[LogRecord]) -> Result<()> {
DuckDbStore::insert_logs(self, logs)
}
fn query_logs(&self, query: &LogQuery) -> Result<Vec<LogRecord>> {
DuckDbStore::query_logs(self, query)
}
fn insert_metrics(&self, metrics: &[MetricPoint]) -> Result<()> {
DuckDbStore::insert_metrics(self, metrics)
}
fn query_metrics(&self, query: &MetricQuery) -> Result<Vec<MetricPoint>> {
DuckDbStore::query_metrics(self, query)
}
fn query_summary(&self, last_seconds: i64, service: Option<&str>) -> Result<SummaryReport> {
DuckDbStore::query_summary(self, last_seconds, service)
}
fn query_anomalies(
&self,
current_seconds: i64,
baseline_seconds: i64,
service: Option<&str>,
) -> Result<AnomalyReport> {
DuckDbStore::query_anomalies(self, current_seconds, baseline_seconds, service)
}
fn query_correlate(&self, trace_id: &str) -> Result<Option<CorrelateReport>> {
DuckDbStore::query_correlate(self, trace_id)
}
fn query_sql(&self, sql: &str) -> Result<Vec<serde_json::Value>> {
DuckDbStore::query_sql(self, sql)
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashMap;
fn span(trace_id: &str, attrs: &[(&str, &str)]) -> Span {
let now = Utc::now();
let mut attributes = HashMap::new();
for (k, v) in attrs {
attributes.insert((*k).to_string(), (*v).to_string());
}
Span {
trace_id: trace_id.into(),
span_id: format!("{trace_id}-span"),
parent_span_id: None,
service: "svc".into(),
operation: "op".into(),
start_time: now,
end_time: now,
duration_ms: 1.0,
status: SpanStatus::Ok,
attributes,
events: vec![],
kind: SpanKind::Internal,
llm: None,
}
}
fn store() -> DuckDbStore {
let dir = tempfile::tempdir().unwrap();
DuckDbStore::new(dir.path().to_str().unwrap()).unwrap()
}
#[test]
fn attribute_filter_matches_dotted_key() {
let s = store();
s.insert_spans(&[
span("a", &[("http.method", "GET"), ("http.status_code", "200")]),
span("b", &[("http.method", "POST"), ("http.status_code", "500")]),
])
.unwrap();
let q = TraceQuery {
attributes: vec![("http.method".into(), "GET".into())],
..Default::default()
};
let got = s.query_traces(&q).unwrap();
assert_eq!(got.len(), 1);
assert_eq!(got[0].trace_id, "a");
}
#[test]
fn attribute_filters_are_anded() {
let s = store();
s.insert_spans(&[
span("a", &[("http.method", "GET"), ("http.status_code", "200")]),
span("b", &[("http.method", "GET"), ("http.status_code", "500")]),
])
.unwrap();
let q = TraceQuery {
attributes: vec![
("http.method".into(), "GET".into()),
("http.status_code".into(), "500".into()),
],
..Default::default()
};
let got = s.query_traces(&q).unwrap();
assert_eq!(got.len(), 1);
assert_eq!(got[0].trace_id, "b");
}
#[test]
fn attribute_filter_misses_when_value_differs() {
let s = store();
s.insert_spans(&[span("a", &[("env", "prod")])]).unwrap();
let q = TraceQuery {
attributes: vec![("env".into(), "staging".into())],
..Default::default()
};
assert!(s.query_traces(&q).unwrap().is_empty());
}
#[test]
fn llm_span_round_trips() {
use super::super::models::{LlmOperation, LlmSpan};
let s = store();
let mut sp = span("llm", &[]);
sp.kind = SpanKind::Llm;
sp.llm = Some(LlmSpan {
provider: "anthropic".into(),
model: "claude-opus-4-7".into(),
operation: LlmOperation::Chat,
input_tokens: Some(1200),
output_tokens: Some(340),
total_tokens: Some(1540),
cost_usd: Some(0.0185),
..Default::default()
});
s.insert_spans(&[sp]).unwrap();
let got = s.get_trace("llm").unwrap();
assert_eq!(got.len(), 1);
assert_eq!(got[0].kind, SpanKind::Llm);
let llm = got[0].llm.as_ref().expect("llm extension preserved");
assert_eq!(llm.model, "claude-opus-4-7");
assert_eq!(llm.total_tokens, Some(1540));
assert_eq!(llm.cost_usd, Some(0.0185));
assert_eq!(llm.operation, LlmOperation::Chat);
}
#[test]
fn non_llm_span_has_no_extension_after_round_trip() {
let s = store();
s.insert_spans(&[span("plain", &[("k", "v")])]).unwrap();
let got = s.get_trace("plain").unwrap();
assert_eq!(got.len(), 1);
assert_eq!(got[0].kind, SpanKind::Internal);
assert!(got[0].llm.is_none());
}
#[test]
fn query_sql_reads_and_rejects_mutations() {
let s = store();
s.insert_spans(&[span("a", &[("env", "prod")]), span("b", &[("env", "prod")])])
.unwrap();
let rows = s
.query_sql("SELECT service, COUNT(*) AS n FROM spans GROUP BY service")
.unwrap();
assert_eq!(rows.len(), 1);
assert_eq!(rows[0]["service"], serde_json::json!("svc"));
assert_eq!(rows[0]["n"], serde_json::json!(2));
assert!(s.query_sql("DELETE FROM spans").is_err());
assert!(s.query_sql("DROP TABLE spans").is_err());
}
#[test]
fn log_body_sha256_round_trips() {
let s = store();
let log = LogRecord {
timestamp: Utc::now(),
observed_timestamp: Utc::now(),
trace_id: Some("t1".into()),
span_id: None,
severity: LogSeverity::Error,
severity_text: "ERROR".into(),
body: String::new(), service: "svc".into(),
attributes: HashMap::new(),
body_sha256: Some("abc123".into()),
};
s.insert_logs(&[log]).unwrap();
let got = s
.query_logs(&LogQuery {
trace_id: Some("t1".into()),
..Default::default()
})
.unwrap();
assert_eq!(got.len(), 1);
assert_eq!(got[0].body_sha256.as_deref(), Some("abc123"));
assert!(got[0].body.is_empty());
}
}