Skip to main content

tael_server/storage/backend/
mod.rs

1//! `TaelBackend` — the purpose-built storage engine (see
2//! `docs/tael-backend-design.md`). Built incrementally behind the `Store`
3//! trait so it ships opt-in alongside `DuckDbStore`.
4//!
5//! **Phase 3:** durable WAL write path + crash-gap replay.
6//! **Phase 4 (this file):** an `fjall` LSM hot tier serves the core per-signal
7//! reads (`query_traces`, `get_trace`, `list_services`, `query_logs`,
8//! `query_metrics`). The legacy DuckDB projection is available behind the
9//! `duckdb` Cargo feature, but the default backend is self-contained so the
10//! default CLI install does not compile or link DuckDB.
11
12mod cold;
13mod hot;
14mod wal;
15
16use anyhow::{Context, Result, bail};
17
18#[cfg(feature = "duckdb")]
19use super::DuckDbStore;
20use super::Store;
21use super::models::{
22    Anomaly, AnomalyReport, CorrelateReport, ErrorOperation, LogQuery, LogRecord, LogSeverity,
23    LogSummary, MetricPoint, MetricQuery, MetricSummary, ServiceInfo, ServiceSummary, Span,
24    SpanStatus, SummaryReport, TraceComment, TraceQuery, TraceSummary,
25};
26use std::collections::{HashMap, HashSet};
27use std::fs::OpenOptions;
28use std::io::{BufRead, Write};
29use std::path::{Path, PathBuf};
30use std::sync::{Arc, Mutex};
31
32use super::SearchIndex;
33use cold::ColdTier;
34use hot::HotTier;
35use wal::WalLog;
36pub use wal::{WalRecord, WalSink};
37
38pub struct TaelBackend {
39    /// LSM hot tier — serves recent reads.
40    hot: HotTier,
41    /// Parquet cold tier — aged spans rolled out of the hot tier.
42    cold: ColdTier,
43    /// Optional legacy projection used only when the `duckdb` feature is enabled.
44    #[cfg(feature = "duckdb")]
45    inner: DuckDbStore,
46    comments: CommentStore,
47    /// Full-text index over LLM payloads (shared with the ingest path).
48    search: Arc<SearchIndex>,
49    wal: WalLog,
50}
51
52impl TaelBackend {
53    pub fn new(data_dir: &str) -> Result<Self> {
54        Self::with_wal_key(data_dir, "tael-backend")
55    }
56
57    /// Like [`Self::new`] but with an explicit WAL namespace key — lets tests
58    /// run isolated instances (the WAL key is process-global in walrus).
59    pub fn with_wal_key(data_dir: &str, wal_key: &str) -> Result<Self> {
60        Self::with_wal_key_and_sinks(data_dir, wal_key, Vec::new(), None)
61    }
62
63    /// Like [`Self::with_wal_key`] but with WAL replication sinks attached: this
64    /// backend runs as a **leader** that ships every appended record to its
65    /// standbys before acking the write (`docs/tael-server-scaling-ha.md` §5.1).
66    /// `required_acks` is how many standbys must confirm before a write returns
67    /// (`None` = all = fully synchronous; `Some(0)` = async best-effort). With
68    /// no sinks the write path is unchanged. A standby on the receiving end
69    /// applies shipped records via `Store::apply_framed_wal`.
70    pub fn with_wal_key_and_sinks(
71        data_dir: &str,
72        wal_key: &str,
73        sinks: Vec<Arc<dyn WalSink>>,
74        required_acks: Option<usize>,
75    ) -> Result<Self> {
76        let hot = HotTier::open(data_dir)?;
77        let cold = ColdTier::open(data_dir)?;
78        #[cfg(feature = "duckdb")]
79        let inner = DuckDbStore::new(data_dir)?;
80        let comments = CommentStore::open(data_dir)?;
81        let search = Arc::new(SearchIndex::open(data_dir)?);
82        let mut wal = if sinks.is_empty() {
83            WalLog::new_for_key(wal_key)?
84        } else {
85            WalLog::new_for_key_with_sinks(wal_key, sinks)?
86        };
87        if let Some(n) = required_acks {
88            wal = wal.with_required_acks(n);
89        }
90        let backend = Self {
91            hot,
92            cold,
93            #[cfg(feature = "duckdb")]
94            inner,
95            comments,
96            search,
97            wal,
98        };
99        backend.replay()?;
100        Ok(backend)
101    }
102
103    /// The shared payload search index — handed to the ingest path so prompt/
104    /// completion text is indexed at write time (the text isn't retained on the
105    /// span itself, only its blob hashes).
106    pub fn search_index(&self) -> Arc<SearchIndex> {
107        Arc::clone(&self.search)
108    }
109
110    /// Roll spans older than `cutoff` out of the LSM hot tier into Parquet.
111    /// Returns the number of spans compacted. Safe to call repeatedly.
112    pub fn compact_spans(&self, cutoff: chrono::DateTime<chrono::Utc>) -> Result<usize> {
113        let cutoff_ns = cutoff.timestamp_nanos_opt().unwrap_or(0);
114        let evicted = self.hot.evict_spans_before(cutoff_ns)?;
115        if evicted.is_empty() {
116            return Ok(0);
117        }
118        // Write to cold first, then the hot eviction is already done; if we
119        // crash between, the spans remain in the DuckDB projection and the WAL.
120        self.cold.write_spans(&evicted)?;
121        tracing::info!(
122            spans = evicted.len(),
123            "tael-backend: compacted spans to cold tier"
124        );
125        Ok(evicted.len())
126    }
127
128    /// Roll aged logs/metrics out of the hot tier into Parquet. Returns the
129    /// total number of records compacted across both signals.
130    pub fn compact_logs_metrics(&self, cutoff: chrono::DateTime<chrono::Utc>) -> Result<usize> {
131        let cutoff_ns = cutoff.timestamp_nanos_opt().unwrap_or(0);
132        let logs = self.hot.evict_logs_before(cutoff_ns)?;
133        if !logs.is_empty() {
134            self.cold.write_logs(&logs)?;
135        }
136        let metrics = self.hot.evict_metrics_before(cutoff_ns)?;
137        if !metrics.is_empty() {
138            self.cold.write_metrics(&metrics)?;
139            // Downsample to 5m rollups alongside the raw cold write, so trends
140            // survive once raw points are dropped by retention.
141            self.cold.write_downsampled(&metrics)?;
142        }
143        let n = logs.len() + metrics.len();
144        if n > 0 {
145            tracing::info!(
146                logs = logs.len(),
147                metrics = metrics.len(),
148                "tael-backend: compacted logs/metrics to cold tier"
149            );
150        }
151        Ok(n)
152    }
153
154    /// Collect every blob hash still referenced by a live row — LLM prompt and
155    /// completion hashes on spans, and `body_sha256` on logs — across hot and
156    /// cold tiers. Drives blob GC (anything not here is unreferenced).
157    pub fn collect_live_blob_hashes(&self) -> Result<std::collections::HashSet<String>> {
158        use super::models::{LogQuery, TraceQuery};
159        let mut live = std::collections::HashSet::new();
160        // Spans (hot∪cold via the unioned read path), with a high limit.
161        let spans = self.query_traces(&TraceQuery {
162            limit: Some(u32::MAX),
163            ..Default::default()
164        })?;
165        for s in spans {
166            if let Some(llm) = s.llm {
167                live.extend(llm.prompt_sha256);
168                live.extend(llm.completion_sha256);
169            }
170        }
171        let logs = self.query_logs(&LogQuery {
172            limit: Some(u32::MAX),
173            ..Default::default()
174        })?;
175        for l in logs {
176            live.extend(l.body_sha256);
177        }
178        Ok(live)
179    }
180
181    /// Drop cold partitions (spans/logs/metrics) whose date is older than
182    /// `keep`. Returns the total number of partitions removed. (Metadata GC;
183    /// payload-blob GC runs separately in the maintenance task.)
184    pub fn enforce_span_retention(&self, keep: chrono::DateTime<chrono::Utc>) -> Result<usize> {
185        let cutoff_date = keep.format("%Y-%m-%d").to_string();
186        let dropped = self.cold.drop_partitions_before(&cutoff_date)?;
187        if dropped > 0 {
188            tracing::info!(
189                partitions = dropped,
190                "tael-backend: dropped expired cold partitions"
191            );
192        }
193        Ok(dropped)
194    }
195
196    /// Apply a batch to every projection. Used by both the live write path and
197    /// WAL replay.
198    fn apply_spans(&self, spans: &[Span]) -> Result<()> {
199        self.hot.insert_spans(spans)?;
200        #[cfg(feature = "duckdb")]
201        {
202            self.inner.insert_spans(spans)
203        }
204        #[cfg(not(feature = "duckdb"))]
205        {
206            Ok(())
207        }
208    }
209    fn apply_logs(&self, logs: &[LogRecord]) -> Result<()> {
210        self.hot.insert_logs(logs)?;
211        #[cfg(feature = "duckdb")]
212        {
213            self.inner.insert_logs(logs)
214        }
215        #[cfg(not(feature = "duckdb"))]
216        {
217            Ok(())
218        }
219    }
220    fn apply_metrics(&self, metrics: &[MetricPoint]) -> Result<()> {
221        self.hot.insert_metrics(metrics)?;
222        #[cfg(feature = "duckdb")]
223        {
224            self.inner.insert_metrics(metrics)
225        }
226        #[cfg(not(feature = "duckdb"))]
227        {
228            Ok(())
229        }
230    }
231
232    /// Re-apply any WAL records left unconsumed by a crash, then advance past
233    /// them (they are consumed by `drain`).
234    fn replay(&self) -> Result<()> {
235        let records = self.wal.drain()?;
236        if records.is_empty() {
237            return Ok(());
238        }
239        let mut spans = 0usize;
240        let mut logs = 0usize;
241        let mut metrics = 0usize;
242        for record in records {
243            match record {
244                WalRecord::Spans(s) => {
245                    spans += s.len();
246                    self.apply_spans(&s)?;
247                }
248                WalRecord::Logs(l) => {
249                    logs += l.len();
250                    self.apply_logs(&l)?;
251                }
252                WalRecord::Metrics(m) => {
253                    metrics += m.len();
254                    self.apply_metrics(&m)?;
255                }
256            }
257        }
258        tracing::info!(spans, logs, metrics, "tael-backend: replayed WAL");
259        Ok(())
260    }
261}
262
263impl Store for TaelBackend {
264    // ── Writes: WAL → apply (hot + projection) → mark applied ───────
265    fn insert_spans(&self, spans: &[Span]) -> Result<()> {
266        self.wal.append_spans(spans)?;
267        self.apply_spans(spans)?;
268        self.wal.mark_applied()?;
269        Ok(())
270    }
271
272    fn insert_logs(&self, logs: &[LogRecord]) -> Result<()> {
273        self.wal.append_logs(logs)?;
274        self.apply_logs(logs)?;
275        self.wal.mark_applied()?;
276        Ok(())
277    }
278
279    fn insert_metrics(&self, metrics: &[MetricPoint]) -> Result<()> {
280        self.wal.append_metrics(metrics)?;
281        self.apply_metrics(metrics)?;
282        self.wal.mark_applied()?;
283        Ok(())
284    }
285
286    // ── Core reads: hot tier, unioned with the cold tier ────────────
287    fn query_traces(&self, query: &TraceQuery) -> Result<Vec<Span>> {
288        // Full-text payload filter: restrict to traces whose LLM prompts/
289        // completions match, then apply the rest of the query over those spans.
290        if let Some(ref text) = query.text {
291            let trace_ids = self.search.search_trace_ids(text, 1000)?;
292            if trace_ids.is_empty() {
293                return Ok(Vec::new());
294            }
295            let cutoff = query
296                .last_seconds
297                .map(|s| chrono::Utc::now() - chrono::Duration::seconds(s));
298            let limit = query.limit.unwrap_or(100) as usize;
299            let mut matched: Vec<Span> = Vec::new();
300            for tid in &trace_ids {
301                for s in self.get_trace(tid)? {
302                    if hot::span_matches(&s, query, cutoff) {
303                        matched.push(s);
304                    }
305                }
306            }
307            matched.sort_by(|a, b| b.start_time.cmp(&a.start_time));
308            matched.truncate(limit);
309            return Ok(matched);
310        }
311        // Hot holds the most-recent spans; cold holds older ones. Newest-first
312        // ordering means hot results lead; only dip into cold to fill the limit.
313        let mut results = self.hot.query_traces(query)?;
314        let limit = query.limit.unwrap_or(100) as usize;
315        if results.len() < limit {
316            let cutoff = query
317                .last_seconds
318                .map(|s| chrono::Utc::now() - chrono::Duration::seconds(s));
319            let mut cold: Vec<Span> = self
320                .cold
321                .all_spans()?
322                .into_iter()
323                .filter(|s| hot::span_matches(s, query, cutoff))
324                .collect();
325            cold.sort_by(|a, b| b.start_time.cmp(&a.start_time));
326            for s in cold {
327                if results.len() >= limit {
328                    break;
329                }
330                results.push(s);
331            }
332        }
333        Ok(results)
334    }
335    fn get_trace(&self, trace_id: &str) -> Result<Vec<Span>> {
336        let mut spans = self.hot.get_trace(trace_id)?;
337        let mut seen: std::collections::HashSet<String> =
338            spans.iter().map(|s| s.span_id.clone()).collect();
339        // Union with cold; dedup by span_id in case of transient overlap during
340        // compaction.
341        for s in self.cold.get_trace(trace_id)? {
342            if seen.insert(s.span_id.clone()) {
343                spans.push(s);
344            }
345        }
346        spans.sort_by_key(|s| s.start_time);
347        Ok(spans)
348    }
349    fn list_services(&self) -> Result<Vec<ServiceInfo>> {
350        self.hot.list_services()
351    }
352    fn query_logs(&self, query: &LogQuery) -> Result<Vec<LogRecord>> {
353        let mut results = self.hot.query_logs(query)?;
354        let limit = query.limit.unwrap_or(100) as usize;
355        if results.len() < limit {
356            let cutoff = query
357                .last_seconds
358                .map(|s| chrono::Utc::now() - chrono::Duration::seconds(s));
359            let mut cold: Vec<LogRecord> = self
360                .cold
361                .all_logs()?
362                .into_iter()
363                .filter(|l| hot::log_matches(l, query, cutoff))
364                .collect();
365            cold.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
366            for l in cold {
367                if results.len() >= limit {
368                    break;
369                }
370                results.push(l);
371            }
372        }
373        Ok(results)
374    }
375    fn query_metrics(&self, query: &MetricQuery) -> Result<Vec<MetricPoint>> {
376        let mut results = self.hot.query_metrics(query)?;
377        let limit = query.limit.unwrap_or(500) as usize;
378        if results.len() < limit {
379            let cutoff = query
380                .last_seconds
381                .map(|s| chrono::Utc::now() - chrono::Duration::seconds(s));
382            let mut cold: Vec<MetricPoint> = self
383                .cold
384                .all_metrics()?
385                .into_iter()
386                .filter(|m| hot::metric_matches(m, query, cutoff))
387                .collect();
388            cold.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
389            for m in cold {
390                if results.len() >= limit {
391                    break;
392                }
393                results.push(m);
394            }
395        }
396        Ok(results)
397    }
398
399    // ── Comments & cross-signal analytics ───────────────────────────
400    fn add_comment(
401        &self,
402        trace_id: &str,
403        span_id: Option<&str>,
404        author: &str,
405        body: &str,
406    ) -> Result<TraceComment> {
407        #[cfg(feature = "duckdb")]
408        {
409            self.inner.add_comment(trace_id, span_id, author, body)
410        }
411        #[cfg(not(feature = "duckdb"))]
412        {
413            self.comments.add(trace_id, span_id, author, body)
414        }
415    }
416    fn get_comments(&self, trace_id: &str) -> Result<Vec<TraceComment>> {
417        #[cfg(feature = "duckdb")]
418        {
419            self.inner.get_comments(trace_id)
420        }
421        #[cfg(not(feature = "duckdb"))]
422        {
423            self.comments.get(trace_id)
424        }
425    }
426    fn query_summary(&self, last_seconds: i64, service: Option<&str>) -> Result<SummaryReport> {
427        #[cfg(feature = "duckdb")]
428        {
429            self.inner.query_summary(last_seconds, service)
430        }
431        #[cfg(not(feature = "duckdb"))]
432        {
433            self.summary_native(last_seconds, service)
434        }
435    }
436    fn query_anomalies(
437        &self,
438        current_seconds: i64,
439        baseline_seconds: i64,
440        service: Option<&str>,
441    ) -> Result<AnomalyReport> {
442        #[cfg(feature = "duckdb")]
443        {
444            self
445                .inner
446                .query_anomalies(current_seconds, baseline_seconds, service)
447        }
448        #[cfg(not(feature = "duckdb"))]
449        {
450            self.anomalies_native(current_seconds, baseline_seconds, service)
451        }
452    }
453    fn query_correlate(&self, trace_id: &str) -> Result<Option<CorrelateReport>> {
454        #[cfg(feature = "duckdb")]
455        {
456            self.inner.query_correlate(trace_id)
457        }
458        #[cfg(not(feature = "duckdb"))]
459        {
460            self.correlate_native(trace_id)
461        }
462    }
463    fn query_sql(&self, sql: &str) -> Result<Vec<serde_json::Value>> {
464        #[cfg(feature = "duckdb")]
465        {
466            self.inner.query_sql(sql)
467        }
468        #[cfg(not(feature = "duckdb"))]
469        {
470            let _ = sql;
471            bail!("SQL queries require a build with the `duckdb` feature")
472        }
473    }
474
475    fn flush(&self) -> Result<()> {
476        // Graceful-shutdown flush: tighten the hot tier so a restart/standby
477        // replays less WAL. WAL fsync already guarantees durability.
478        self.hot.flush()
479    }
480
481    /// Standby entrypoint: durably accept a framed WAL record shipped from a
482    /// leader and bring local state up to it. Mirrors the leader's write
483    /// discipline (append → apply → consume) so the standby's WAL, hot tier, and
484    /// projection stay byte-identical and itself replayable — the basis for
485    /// promotion on leader loss (§5.1).
486    fn apply_framed_wal(&self, framed: &[u8]) -> Result<()> {
487        let record = WalRecord::decode(framed)?;
488        self.wal.append_framed(framed)?;
489        match &record {
490            WalRecord::Spans(s) => self.apply_spans(s)?,
491            WalRecord::Logs(l) => self.apply_logs(l)?,
492            WalRecord::Metrics(m) => self.apply_metrics(m)?,
493        }
494        self.wal.mark_applied()?;
495        Ok(())
496    }
497}
498
499impl TaelBackend {
500    fn summary_native(&self, last_seconds: i64, service: Option<&str>) -> Result<SummaryReport> {
501        let spans = self.query_traces(&TraceQuery {
502            service: service.map(str::to_string),
503            last_seconds: Some(last_seconds),
504            limit: Some(u32::MAX),
505            ..Default::default()
506        })?;
507        let logs = self.query_logs(&LogQuery {
508            service: service.map(str::to_string),
509            last_seconds: Some(last_seconds),
510            limit: Some(u32::MAX),
511            ..Default::default()
512        })?;
513        let metrics = self.query_metrics(&MetricQuery {
514            service: service.map(str::to_string),
515            last_seconds: Some(last_seconds),
516            limit: Some(u32::MAX),
517            ..Default::default()
518        })?;
519        Ok(SummaryReport {
520            window_seconds: last_seconds,
521            service_filter: service.map(str::to_string),
522            traces: trace_summary(&spans),
523            top_services: service_summaries(&spans),
524            top_error_operations: error_operations(&spans),
525            logs: log_summary(&logs),
526            metrics: metric_summary(&metrics),
527        })
528    }
529
530    fn anomalies_native(
531        &self,
532        current_seconds: i64,
533        baseline_seconds: i64,
534        service: Option<&str>,
535    ) -> Result<AnomalyReport> {
536        let current = self.query_traces(&TraceQuery {
537            service: service.map(str::to_string),
538            last_seconds: Some(current_seconds),
539            limit: Some(u32::MAX),
540            ..Default::default()
541        })?;
542        let baseline = self.query_traces(&TraceQuery {
543            service: service.map(str::to_string),
544            last_seconds: Some(baseline_seconds),
545            limit: Some(u32::MAX),
546            ..Default::default()
547        })?;
548        Ok(AnomalyReport {
549            current_seconds,
550            baseline_seconds,
551            service_filter: service.map(str::to_string),
552            anomalies: anomalies(&current, &baseline),
553        })
554    }
555
556    fn correlate_native(&self, trace_id: &str) -> Result<Option<CorrelateReport>> {
557        let spans = self.get_trace(trace_id)?;
558        if spans.is_empty() {
559            return Ok(None);
560        }
561        let start = spans
562            .iter()
563            .map(|s| s.start_time)
564            .min()
565            .unwrap_or_else(chrono::Utc::now);
566        let end = spans
567            .iter()
568            .map(|s| s.end_time)
569            .max()
570            .unwrap_or_else(chrono::Utc::now);
571        let services = spans
572            .iter()
573            .map(|s| s.service.clone())
574            .collect::<HashSet<_>>()
575            .into_iter()
576            .collect();
577        let logs = self.query_logs(&LogQuery {
578            trace_id: Some(trace_id.to_string()),
579            limit: Some(500),
580            ..Default::default()
581        })?;
582        let service = spans.first().map(|s| s.service.clone());
583        let metrics = self.query_metrics(&MetricQuery {
584            service,
585            limit: Some(500),
586            ..Default::default()
587        })?;
588        Ok(Some(CorrelateReport {
589            trace_id: trace_id.to_string(),
590            span_count: spans.len(),
591            services,
592            start_time: start.to_rfc3339(),
593            end_time: end.to_rfc3339(),
594            duration_ms: (end - start).num_microseconds().unwrap_or(0) as f64 / 1000.0,
595            error_count: spans
596                .iter()
597                .filter(|s| matches!(s.status, SpanStatus::Error))
598                .count() as i64,
599            logs,
600            metrics,
601        }))
602    }
603}
604
605#[derive(Debug)]
606struct CommentStore {
607    path: PathBuf,
608    comments: Mutex<HashMap<String, Vec<TraceComment>>>,
609}
610
611impl CommentStore {
612    fn open(data_dir: &str) -> Result<Self> {
613        let path = Path::new(data_dir).join("trace_comments.jsonl");
614        let mut comments: HashMap<String, Vec<TraceComment>> = HashMap::new();
615        if path.exists() {
616            let file = std::fs::File::open(&path)
617                .with_context(|| format!("opening {}", path.display()))?;
618            for line in std::io::BufReader::new(file).lines() {
619                let line = line?;
620                if line.trim().is_empty() {
621                    continue;
622                }
623                let comment: TraceComment = serde_json::from_str(&line)
624                    .with_context(|| format!("decoding {}", path.display()))?;
625                comments
626                    .entry(comment.trace_id.clone())
627                    .or_default()
628                    .push(comment);
629            }
630        }
631        Ok(Self {
632            path,
633            comments: Mutex::new(comments),
634        })
635    }
636
637    fn add(
638        &self,
639        trace_id: &str,
640        span_id: Option<&str>,
641        author: &str,
642        body: &str,
643    ) -> Result<TraceComment> {
644        if let Some(parent) = self.path.parent() {
645            std::fs::create_dir_all(parent)?;
646        }
647        let comment = TraceComment {
648            id: uuid::Uuid::new_v4().to_string(),
649            trace_id: trace_id.to_string(),
650            span_id: span_id.map(str::to_string),
651            author: author.to_string(),
652            body: body.to_string(),
653            created_at: chrono::Utc::now().to_rfc3339(),
654        };
655        let mut file = OpenOptions::new()
656            .create(true)
657            .append(true)
658            .open(&self.path)
659            .with_context(|| format!("opening {}", self.path.display()))?;
660        writeln!(file, "{}", serde_json::to_string(&comment)?)?;
661        self.comments
662            .lock()
663            .expect("comment store lock poisoned")
664            .entry(trace_id.to_string())
665            .or_default()
666            .push(comment.clone());
667        Ok(comment)
668    }
669
670    fn get(&self, trace_id: &str) -> Result<Vec<TraceComment>> {
671        Ok(self
672            .comments
673            .lock()
674            .expect("comment store lock poisoned")
675            .get(trace_id)
676            .cloned()
677            .unwrap_or_default())
678    }
679}
680
681fn trace_summary(spans: &[Span]) -> TraceSummary {
682    let mut durations: Vec<f64> = spans.iter().map(|s| s.duration_ms).collect();
683    durations.sort_by(|a, b| a.total_cmp(b));
684    let trace_count = spans.iter().map(|s| &s.trace_id).collect::<HashSet<_>>().len() as i64;
685    let error_count = spans
686        .iter()
687        .filter(|s| matches!(s.status, SpanStatus::Error))
688        .count() as i64;
689    TraceSummary {
690        span_count: spans.len() as i64,
691        trace_count,
692        error_count,
693        error_rate: ratio(error_count, spans.len() as i64),
694        avg_ms: if durations.is_empty() {
695            0.0
696        } else {
697            durations.iter().sum::<f64>() / durations.len() as f64
698        },
699        max_ms: durations.last().copied().unwrap_or(0.0),
700        p50_ms: percentile(&durations, 0.50),
701        p95_ms: percentile(&durations, 0.95),
702        p99_ms: percentile(&durations, 0.99),
703    }
704}
705
706fn service_summaries(spans: &[Span]) -> Vec<ServiceSummary> {
707    let mut by_service: HashMap<String, Vec<Span>> = HashMap::new();
708    for span in spans {
709        by_service
710            .entry(span.service.clone())
711            .or_default()
712            .push(span.clone());
713    }
714    let mut rows: Vec<ServiceSummary> = by_service
715        .into_iter()
716        .map(|(service, spans)| {
717            let summary = trace_summary(&spans);
718            ServiceSummary {
719                service,
720                span_count: summary.span_count,
721                error_rate: summary.error_rate,
722                p95_ms: summary.p95_ms,
723            }
724        })
725        .collect();
726    rows.sort_by(|a, b| b.span_count.cmp(&a.span_count));
727    rows.truncate(10);
728    rows
729}
730
731fn error_operations(spans: &[Span]) -> Vec<ErrorOperation> {
732    let mut counts: HashMap<(String, String), i64> = HashMap::new();
733    for span in spans
734        .iter()
735        .filter(|s| matches!(s.status, SpanStatus::Error))
736    {
737        *counts
738            .entry((span.service.clone(), span.operation.clone()))
739            .or_default() += 1;
740    }
741    let mut rows: Vec<ErrorOperation> = counts
742        .into_iter()
743        .map(|((service, operation), error_count)| ErrorOperation {
744            service,
745            operation,
746            error_count,
747        })
748        .collect();
749    rows.sort_by(|a, b| b.error_count.cmp(&a.error_count));
750    rows.truncate(10);
751    rows
752}
753
754fn log_summary(logs: &[LogRecord]) -> LogSummary {
755    let mut summary = LogSummary {
756        total: logs.len() as i64,
757        ..Default::default()
758    };
759    for log in logs {
760        match log.severity {
761            LogSeverity::Error | LogSeverity::Fatal => summary.error += 1,
762            LogSeverity::Warn => summary.warn += 1,
763            LogSeverity::Info => summary.info += 1,
764            LogSeverity::Debug => summary.debug += 1,
765            _ => {}
766        }
767    }
768    summary
769}
770
771fn metric_summary(metrics: &[MetricPoint]) -> MetricSummary {
772    MetricSummary {
773        point_count: metrics.len() as i64,
774        unique_names: metrics.iter().map(|m| &m.name).collect::<HashSet<_>>().len() as i64,
775    }
776}
777
778fn anomalies(current: &[Span], baseline: &[Span]) -> Vec<Anomaly> {
779    let cur = service_summaries(current);
780    let base = service_summaries(baseline);
781    let base_map: HashMap<String, ServiceSummary> =
782        base.into_iter().map(|s| (s.service.clone(), s)).collect();
783    let mut rows = Vec::new();
784    for c in cur {
785        let Some(b) = base_map.get(&c.service) else {
786            continue;
787        };
788        if c.error_rate > b.error_rate + 0.05 {
789            rows.push(Anomaly {
790                service: c.service.clone(),
791                kind: "error_rate".to_string(),
792                severity: if c.error_rate > b.error_rate + 0.20 {
793                    "high".to_string()
794                } else {
795                    "medium".to_string()
796                },
797                current: c.error_rate,
798                baseline: b.error_rate,
799                delta: c.error_rate - b.error_rate,
800                description: format!(
801                    "{} error rate increased from {:.1}% to {:.1}%",
802                    c.service,
803                    b.error_rate * 100.0,
804                    c.error_rate * 100.0
805                ),
806            });
807        }
808        if c.p95_ms > b.p95_ms * 1.5 && c.p95_ms - b.p95_ms > 25.0 {
809            rows.push(Anomaly {
810                service: c.service.clone(),
811                kind: "p95_latency".to_string(),
812                severity: if c.p95_ms > b.p95_ms * 2.0 {
813                    "high".to_string()
814                } else {
815                    "medium".to_string()
816                },
817                current: c.p95_ms,
818                baseline: b.p95_ms,
819                delta: c.p95_ms - b.p95_ms,
820                description: format!(
821                    "{} p95 latency increased from {:.1}ms to {:.1}ms",
822                    c.service, b.p95_ms, c.p95_ms
823                ),
824            });
825        }
826    }
827    rows
828}
829
/// Nearest-rank percentile of an ascending-sorted slice.
///
/// `p` is a fraction (0.5 = median). Values outside `[0, 1]` are clamped
/// instead of indexing out of bounds, and a NaN `p` selects the first
/// element. Returns 0.0 for an empty slice.
fn percentile(sorted: &[f64], p: f64) -> f64 {
    let Some(last) = sorted.len().checked_sub(1) else {
        return 0.0;
    };
    // NaN survives `clamp` and then casts to index 0.
    let idx = (last as f64 * p.clamp(0.0, 1.0)).round() as usize;
    sorted[idx.min(last)]
}
837
/// `numerator / denominator` as a float, defined as 0.0 when the denominator
/// is zero (e.g. the error rate of an empty span set).
fn ratio(numerator: i64, denominator: i64) -> f64 {
    match denominator {
        0 => 0.0,
        d => numerator as f64 / d as f64,
    }
}
845
#[cfg(test)]
mod tests {
    //! Unit tests for `TaelBackend`. They cover:
    //! - hot-tier reads;
    //! - DuckDB parity (feature-gated);
    //! - WAL shipping to a standby;
    //! - cold-tier compaction;
    //! - full-text search;
    //! - reopen durability.

    use super::*;
    use crate::storage::models::{LogRecord, LogSeverity, MetricPoint, MetricType, SpanKind, SpanStatus};
    use chrono::Utc;
    use std::collections::HashMap;

    /// Removes a walrus namespace dir (`wal_files/<key>`) on drop.
    ///
    /// Cleanup runs even when the owning test panics.
    struct NsGuard(String);
    impl Drop for NsGuard {
        fn drop(&mut self) {
            let _ = std::fs::remove_dir_all(format!("wal_files/{}", self.0));
        }
    }

    /// Opens a fresh backend in a temp dir under a unique WAL namespace.
    ///
    /// The backend is the LAST tuple element. Callers bind it as
    /// `let (_g, _d, b) = backend();`. Bindings drop in reverse order of
    /// declaration, so the store is dropped before its temp dir and WAL
    /// namespace are deleted.
    fn backend() -> (NsGuard, tempfile::TempDir, TaelBackend) {
        let dir = tempfile::tempdir().unwrap();
        let key = format!("tael-test-backend-{}", uuid::Uuid::new_v4());
        // Create the guard before opening, so a failed open still cleans up
        // any WAL files it wrote.
        let guard = NsGuard(key.clone());
        let b = TaelBackend::with_wal_key(dir.path().to_str().unwrap(), &key).unwrap();
        (guard, dir, b)
    }

    /// A root `Internal` span stamped "now", with the given ids, service,
    /// duration and status.
    fn span(trace: &str, sid: &str, svc: &str, dur: f64, status: SpanStatus) -> Span {
        let now = Utc::now();
        Span {
            trace_id: trace.into(),
            span_id: sid.into(),
            parent_span_id: None,
            service: svc.into(),
            operation: "op".into(),
            start_time: now,
            end_time: now,
            duration_ms: dur,
            status,
            attributes: HashMap::new(),
            events: vec![],
            kind: SpanKind::Internal,
            llm: None,
        }
    }

    /// An ERROR-severity log for service `api` in trace `t1` with the given body.
    fn error_log(body: &str) -> LogRecord {
        LogRecord {
            timestamp: Utc::now(),
            observed_timestamp: Utc::now(),
            trace_id: Some("t1".into()),
            span_id: None,
            severity: LogSeverity::Error,
            severity_text: "ERROR".into(),
            body: body.into(),
            service: "api".into(),
            attributes: HashMap::new(),
            body_sha256: None,
        }
    }

    #[test]
    fn get_trace_reconstructs_span_tree_from_hot_tier() {
        let (_g, _d, b) = backend();
        b.insert_spans(&[
            span("t1", "s1", "api", 10.0, SpanStatus::Ok),
            span("t1", "s2", "db", 20.0, SpanStatus::Ok),
            span("t2", "s3", "api", 5.0, SpanStatus::Error),
        ])
        .unwrap();

        let trace = b.get_trace("t1").unwrap();
        assert_eq!(trace.len(), 2);
        assert!(trace.iter().all(|s| s.trace_id == "t1"));
        assert_eq!(b.get_trace("t2").unwrap().len(), 1);
        assert!(b.get_trace("missing").unwrap().is_empty());
    }

    #[test]
    #[cfg(feature = "duckdb")]
    fn query_traces_filters_match_duckdb() {
        let (_g, _d, b) = backend();
        let spans = vec![
            span("t1", "s1", "api", 10.0, SpanStatus::Ok),
            span("t2", "s2", "db", 600.0, SpanStatus::Error),
            span("t3", "s3", "api", 50.0, SpanStatus::Error),
        ];
        b.insert_spans(&spans).unwrap();

        // Independent DuckDB with identical data = the parity oracle.
        let oracle_dir = tempfile::tempdir().unwrap();
        let oracle = DuckDbStore::new(oracle_dir.path().to_str().unwrap()).unwrap();
        oracle.insert_spans(&spans).unwrap();

        let queries = [
            TraceQuery {
                service: Some("api".into()),
                ..Default::default()
            },
            TraceQuery {
                status: Some("error".into()),
                ..Default::default()
            },
            TraceQuery {
                min_duration_ms: Some(100.0),
                ..Default::default()
            },
            TraceQuery::default(),
        ];
        for q in &queries {
            let mut hot: Vec<String> = b
                .query_traces(q)
                .unwrap()
                .into_iter()
                .map(|s| s.span_id)
                .collect();
            let mut duck: Vec<String> = oracle
                .query_traces(q)
                .unwrap()
                .into_iter()
                .map(|s| s.span_id)
                .collect();
            hot.sort();
            duck.sort();
            assert_eq!(hot, duck, "hot tier and DuckDB disagree for {q:?}");
        }
    }

    #[test]
    fn list_services_aggregates_from_hot_tier() {
        let (_g, _d, b) = backend();
        b.insert_spans(&[
            span("t1", "s1", "api", 10.0, SpanStatus::Ok),
            span("t1", "s2", "api", 30.0, SpanStatus::Error),
            span("t2", "s3", "db", 20.0, SpanStatus::Ok),
        ])
        .unwrap();

        let services = b.list_services().unwrap();
        let api = services.iter().find(|s| s.name == "api").unwrap();
        assert_eq!(api.span_count, 2);
        assert_eq!(api.trace_count, 1);
        assert_eq!(api.avg_duration_ms, 20.0);
        assert!((api.error_rate - 0.5).abs() < 1e-9);
    }

    #[test]
    fn standby_rebuilds_identical_state_from_shipped_wal() {
        // The standby: a normal backend that is never written to directly.
        let (_sg, _sd, standby) = backend();
        let standby = Arc::new(standby);

        // A WAL sink that ships each framed record into the standby — the
        // in-process stand-in for the (deferred) network transport.
        struct ReplicaSink(Arc<TaelBackend>);
        impl WalSink for ReplicaSink {
            fn append_framed(&self, framed: &[u8]) -> Result<()> {
                self.0.apply_framed_wal(framed)
            }
        }

        // The leader, with the standby attached as a replication sink.
        let leader_dir = tempfile::tempdir().unwrap();
        let leader_key = format!("tael-test-leader-{}", uuid::Uuid::new_v4());
        let _lg = NsGuard(leader_key.clone());
        let leader = TaelBackend::with_wal_key_and_sinks(
            leader_dir.path().to_str().unwrap(),
            &leader_key,
            vec![Arc::new(ReplicaSink(Arc::clone(&standby)))],
            None, // synchronous: require all (one) standbys
        )
        .unwrap();

        // Write a mix of signals to the leader only.
        leader
            .insert_spans(&[
                span("t1", "s1", "api", 10.0, SpanStatus::Ok),
                span("t1", "s2", "db", 20.0, SpanStatus::Ok),
                span("t2", "s3", "api", 5.0, SpanStatus::Error),
            ])
            .unwrap();
        leader.insert_logs(&[error_log("boom")]).unwrap();

        // The standby reconstructed identical state purely from the shipped WAL.
        assert_eq!(standby.get_trace("t1").unwrap().len(), 2);
        assert_eq!(standby.get_trace("t2").unwrap().len(), 1);
        let leader_traces = leader.query_traces(&TraceQuery::default()).unwrap();
        let standby_traces = standby.query_traces(&TraceQuery::default()).unwrap();
        assert_eq!(leader_traces.len(), standby_traces.len());
        assert_eq!(standby_traces.len(), 3);
    }

    #[test]
    fn logs_and_metrics_round_trip_through_hot_tier() {
        let (_g, _d, b) = backend();

        b.insert_logs(&[error_log("connection refused")]).unwrap();
        let logs = b
            .query_logs(&LogQuery {
                severity: Some("error".into()),
                ..Default::default()
            })
            .unwrap();
        assert_eq!(logs.len(), 1);
        assert_eq!(logs[0].body, "connection refused");

        b.insert_metrics(&[MetricPoint {
            timestamp: Utc::now(),
            service: "api".into(),
            name: "http_requests_total".into(),
            metric_type: MetricType::Sum,
            value: 42.0,
            unit: "1".into(),
            attributes: HashMap::new(),
        }])
        .unwrap();
        let metrics = b
            .query_metrics(&MetricQuery {
                name: Some("http_requests_total".into()),
                ..Default::default()
            })
            .unwrap();
        assert_eq!(metrics.len(), 1);
        assert_eq!(metrics[0].value, 42.0);
    }

    #[test]
    fn compaction_moves_spans_to_cold_and_reads_still_union() {
        let (_g, _d, b) = backend();
        b.insert_spans(&[
            span("t1", "s1", "api", 10.0, SpanStatus::Ok),
            span("t1", "s2", "db", 20.0, SpanStatus::Ok),
            span("t2", "s3", "api", 5.0, SpanStatus::Error),
        ])
        .unwrap();

        // Compact everything (cutoff in the future) → all spans roll to cold.
        let moved = b
            .compact_spans(Utc::now() + chrono::Duration::seconds(60))
            .unwrap();
        assert_eq!(moved, 3);
        // Hot tier is now empty...
        assert!(b.hot.get_trace("t1").unwrap().is_empty());
        // ...but the unioned reads still see everything.
        assert_eq!(b.get_trace("t1").unwrap().len(), 2);
        let all = b.query_traces(&TraceQuery::default()).unwrap();
        assert_eq!(all.len(), 3);
        let errors = b
            .query_traces(&TraceQuery {
                status: Some("error".into()),
                ..Default::default()
            })
            .unwrap();
        assert_eq!(errors.len(), 1);
        assert_eq!(errors[0].span_id, "s3");

        // Re-compaction is a no-op (nothing left in hot).
        assert_eq!(b.compact_spans(Utc::now()).unwrap(), 0);
    }

    #[test]
    fn logs_metrics_compact_to_cold_and_union_reads() {
        let (_g, _d, b) = backend();
        b.insert_logs(&[error_log("boom")]).unwrap();
        b.insert_metrics(&[MetricPoint {
            timestamp: Utc::now(),
            service: "api".into(),
            name: "rps".into(),
            metric_type: MetricType::Sum,
            value: 7.0,
            unit: "1".into(),
            attributes: HashMap::new(),
        }])
        .unwrap();

        let moved = b
            .compact_logs_metrics(Utc::now() + chrono::Duration::seconds(60))
            .unwrap();
        assert_eq!(moved, 2);

        // Served from cold via union after the hot tier emptied.
        let logs = b.query_logs(&LogQuery::default()).unwrap();
        assert_eq!(logs.len(), 1);
        assert_eq!(logs[0].body, "boom");
        let metrics = b.query_metrics(&MetricQuery::default()).unwrap();
        assert_eq!(metrics.len(), 1);
        assert_eq!(metrics[0].value, 7.0);
    }

    #[test]
    fn full_text_search_filters_traces_by_payload() {
        let (_g, _d, b) = backend();
        b.insert_spans(&[
            span("t1", "s1", "llm-proxy", 100.0, SpanStatus::Ok),
            span("t2", "s2", "llm-proxy", 100.0, SpanStatus::Ok),
        ])
        .unwrap();
        // Index payload text the way ingestion would.
        let idx = b.search_index();
        idx.index_span("t1", "s1", "summarize the rate limit policy")
            .unwrap();
        idx.index_span("t2", "s2", "translate to French").unwrap();
        idx.commit().unwrap();

        let hits = b
            .query_traces(&TraceQuery {
                text: Some("rate limit".into()),
                ..Default::default()
            })
            .unwrap();
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].trace_id, "t1");

        // Non-matching query → empty.
        let none = b
            .query_traces(&TraceQuery {
                text: Some("quantum".into()),
                ..Default::default()
            })
            .unwrap();
        assert!(none.is_empty());
    }

    #[test]
    fn survives_reopen_via_persistent_hot_tier() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().to_str().unwrap();
        let key = format!("tael-test-reopen-{}", uuid::Uuid::new_v4());
        // A guard rather than a trailing `remove_dir_all`, so the WAL namespace
        // is cleaned up even if an assertion below panics. It is declared before
        // both backends, so it drops after them.
        let _g = NsGuard(key.clone());
        {
            let b = TaelBackend::with_wal_key(path, &key).unwrap();
            b.insert_spans(&[span("t1", "s1", "api", 10.0, SpanStatus::Ok)])
                .unwrap();
        }
        // Reopen: the span is still readable from the persisted on-disk state.
        // DuckDB exists only under the `duckdb` feature and is not relied on here.
        let b2 = TaelBackend::with_wal_key(path, &key).unwrap();
        assert_eq!(b2.get_trace("t1").unwrap().len(), 1);
    }
}