Skip to main content

tael_server/storage/backend/
mod.rs

1//! `TaelBackend` — the purpose-built storage engine (see
2//! `docs/tael-backend-design.md`). Built incrementally behind the `Store`
3//! trait so it ships opt-in alongside `DuckDbStore`.
4//!
5//! **Phase 3:** durable WAL write path + crash-gap replay.
6//! **Phase 4 (this file):** an `fjall` LSM hot tier serves the core per-signal
7//! reads (`query_traces`, `get_trace`, `list_services`, `query_logs`,
8//! `query_metrics`). The legacy DuckDB projection is available behind the
9//! `duckdb` Cargo feature, but the default backend is self-contained so the
10//! default CLI install does not compile or link DuckDB.
11
12mod cold;
13mod hot;
14mod wal;
15
16use anyhow::{Result, bail};
17
18#[cfg(feature = "duckdb")]
19use super::DuckDbStore;
20use super::Store;
21use super::models::{
22    Anomaly, AnomalyReport, CorrelateReport, ErrorOperation, LogQuery, LogRecord, LogSeverity,
23    LogSummary, MetricPoint, MetricQuery, MetricSummary, ServiceInfo, ServiceSummary, Span,
24    SpanStatus, SummaryReport, TraceComment, TraceQuery, TraceSummary,
25};
26use std::collections::{HashMap, HashSet};
27use std::sync::Arc;
28
29use super::DynObjectBackend;
30use super::SearchIndex;
31use super::{CommentsStore, JsonlComments};
32use cold::ColdTier;
33use hot::HotTier;
34use wal::WalLog;
35pub use wal::{WalRecord, WalSink};
36
37pub struct TaelBackend {
38    /// LSM hot tier — serves recent reads.
39    hot: HotTier,
40    /// Parquet cold tier — aged spans rolled out of the hot tier.
41    cold: ColdTier,
42    /// Optional legacy projection used only when the `duckdb` feature is enabled.
43    #[cfg(feature = "duckdb")]
44    inner: DuckDbStore,
45    /// Pluggable comments store (JSONL locally, Postgres in the cloud).
46    comments: Box<dyn CommentsStore>,
47    /// Full-text index over LLM payloads (shared with the ingest path).
48    search: Arc<SearchIndex>,
49    wal: WalLog,
50}
51
52impl TaelBackend {
53    pub fn new(data_dir: &str) -> Result<Self> {
54        Self::with_wal_key(data_dir, "tael-backend")
55    }
56
57    /// Like [`Self::new`] but with an explicit WAL namespace key — lets tests
58    /// run isolated instances (the WAL key is process-global in walrus).
59    pub fn with_wal_key(data_dir: &str, wal_key: &str) -> Result<Self> {
60        Self::with_wal_key_and_sinks(data_dir, wal_key, Vec::new(), None)
61    }
62
63    /// Like [`Self::with_wal_key`] but with WAL replication sinks attached: this
64    /// backend runs as a **leader** that ships every appended record to its
65    /// standbys before acking the write (`docs/tael-server-scaling-ha.md` §5.1).
66    /// `required_acks` is how many standbys must confirm before a write returns
67    /// (`None` = all = fully synchronous; `Some(0)` = async best-effort). With
68    /// no sinks the write path is unchanged. A standby on the receiving end
69    /// applies shipped records via `Store::apply_framed_wal`.
70    pub fn with_wal_key_and_sinks(
71        data_dir: &str,
72        wal_key: &str,
73        sinks: Vec<Arc<dyn WalSink>>,
74        required_acks: Option<usize>,
75    ) -> Result<Self> {
76        // Default components: local cold tier + local JSONL comments.
77        let comments: Box<dyn CommentsStore> = Box::new(JsonlComments::open(data_dir)?);
78        Self::with_components(data_dir, wal_key, sinks, required_acks, None, comments)
79    }
80
81    /// Full constructor used by the cloud deployment path: pass an optional
82    /// cold-tier object backend (`None` = local filesystem) and an already-built
83    /// comments store (JSONL or Postgres). The local constructors above forward
84    /// here with the single-binary defaults, so their behavior is unchanged.
85    pub fn with_components(
86        data_dir: &str,
87        wal_key: &str,
88        sinks: Vec<Arc<dyn WalSink>>,
89        required_acks: Option<usize>,
90        cold_backend: Option<DynObjectBackend>,
91        comments: Box<dyn CommentsStore>,
92    ) -> Result<Self> {
93        let hot = HotTier::open(data_dir)?;
94        let cold = match cold_backend {
95            Some(backend) => ColdTier::with_backend(backend)?,
96            None => ColdTier::open(data_dir)?,
97        };
98        #[cfg(feature = "duckdb")]
99        let inner = DuckDbStore::new(data_dir)?;
100        let search = Arc::new(SearchIndex::open(data_dir)?);
101        let mut wal = if sinks.is_empty() {
102            WalLog::new_for_key(wal_key)?
103        } else {
104            WalLog::new_for_key_with_sinks(wal_key, sinks)?
105        };
106        if let Some(n) = required_acks {
107            wal = wal.with_required_acks(n);
108        }
109        let backend = Self {
110            hot,
111            cold,
112            #[cfg(feature = "duckdb")]
113            inner,
114            comments,
115            search,
116            wal,
117        };
118        backend.replay()?;
119        Ok(backend)
120    }
121
122    /// The shared payload search index — handed to the ingest path so prompt/
123    /// completion text is indexed at write time (the text isn't retained on the
124    /// span itself, only its blob hashes).
125    pub fn search_index(&self) -> Arc<SearchIndex> {
126        Arc::clone(&self.search)
127    }
128
129    /// Roll spans older than `cutoff` out of the LSM hot tier into Parquet.
130    /// Returns the number of spans compacted. Safe to call repeatedly.
131    pub fn compact_spans(&self, cutoff: chrono::DateTime<chrono::Utc>) -> Result<usize> {
132        let cutoff_ns = cutoff.timestamp_nanos_opt().unwrap_or(0);
133        let evicted = self.hot.evict_spans_before(cutoff_ns)?;
134        if evicted.is_empty() {
135            return Ok(0);
136        }
137        // Write to cold first, then the hot eviction is already done; if we
138        // crash between, the spans remain in the DuckDB projection and the WAL.
139        self.cold.write_spans(&evicted)?;
140        tracing::info!(
141            spans = evicted.len(),
142            "tael-backend: compacted spans to cold tier"
143        );
144        Ok(evicted.len())
145    }
146
147    /// Roll aged logs/metrics out of the hot tier into Parquet. Returns the
148    /// total number of records compacted across both signals.
149    pub fn compact_logs_metrics(&self, cutoff: chrono::DateTime<chrono::Utc>) -> Result<usize> {
150        let cutoff_ns = cutoff.timestamp_nanos_opt().unwrap_or(0);
151        let logs = self.hot.evict_logs_before(cutoff_ns)?;
152        if !logs.is_empty() {
153            self.cold.write_logs(&logs)?;
154        }
155        let metrics = self.hot.evict_metrics_before(cutoff_ns)?;
156        if !metrics.is_empty() {
157            self.cold.write_metrics(&metrics)?;
158            // Downsample to 5m rollups alongside the raw cold write, so trends
159            // survive once raw points are dropped by retention.
160            self.cold.write_downsampled(&metrics)?;
161        }
162        let n = logs.len() + metrics.len();
163        if n > 0 {
164            tracing::info!(
165                logs = logs.len(),
166                metrics = metrics.len(),
167                "tael-backend: compacted logs/metrics to cold tier"
168            );
169        }
170        Ok(n)
171    }
172
173    /// Collect every blob hash still referenced by a live row — LLM prompt and
174    /// completion hashes on spans, and `body_sha256` on logs — across hot and
175    /// cold tiers. Drives blob GC (anything not here is unreferenced).
176    pub fn collect_live_blob_hashes(&self) -> Result<std::collections::HashSet<String>> {
177        use super::models::{LogQuery, TraceQuery};
178        let mut live = std::collections::HashSet::new();
179        // Spans (hot∪cold via the unioned read path), with a high limit.
180        let spans = self.query_traces(&TraceQuery {
181            limit: Some(u32::MAX),
182            ..Default::default()
183        })?;
184        for s in spans {
185            if let Some(llm) = s.llm {
186                live.extend(llm.prompt_sha256);
187                live.extend(llm.completion_sha256);
188            }
189        }
190        let logs = self.query_logs(&LogQuery {
191            limit: Some(u32::MAX),
192            ..Default::default()
193        })?;
194        for l in logs {
195            live.extend(l.body_sha256);
196        }
197        Ok(live)
198    }
199
200    /// Drop cold partitions (spans/logs/metrics) whose date is older than
201    /// `keep`. Returns the total number of partitions removed. (Metadata GC;
202    /// payload-blob GC runs separately in the maintenance task.)
203    pub fn enforce_span_retention(&self, keep: chrono::DateTime<chrono::Utc>) -> Result<usize> {
204        let cutoff_date = keep.format("%Y-%m-%d").to_string();
205        let dropped = self.cold.drop_partitions_before(&cutoff_date)?;
206        if dropped > 0 {
207            tracing::info!(
208                partitions = dropped,
209                "tael-backend: dropped expired cold partitions"
210            );
211        }
212        Ok(dropped)
213    }
214
215    /// Apply a batch to every projection. Used by both the live write path and
216    /// WAL replay.
217    fn apply_spans(&self, spans: &[Span]) -> Result<()> {
218        self.hot.insert_spans(spans)?;
219        #[cfg(feature = "duckdb")]
220        {
221            self.inner.insert_spans(spans)
222        }
223        #[cfg(not(feature = "duckdb"))]
224        {
225            Ok(())
226        }
227    }
228    fn apply_logs(&self, logs: &[LogRecord]) -> Result<()> {
229        self.hot.insert_logs(logs)?;
230        #[cfg(feature = "duckdb")]
231        {
232            self.inner.insert_logs(logs)
233        }
234        #[cfg(not(feature = "duckdb"))]
235        {
236            Ok(())
237        }
238    }
239    fn apply_metrics(&self, metrics: &[MetricPoint]) -> Result<()> {
240        self.hot.insert_metrics(metrics)?;
241        #[cfg(feature = "duckdb")]
242        {
243            self.inner.insert_metrics(metrics)
244        }
245        #[cfg(not(feature = "duckdb"))]
246        {
247            Ok(())
248        }
249    }
250
251    /// Re-apply any WAL records left unconsumed by a crash, then advance past
252    /// them (they are consumed by `drain`).
253    fn replay(&self) -> Result<()> {
254        let records = self.wal.drain()?;
255        if records.is_empty() {
256            return Ok(());
257        }
258        let mut spans = 0usize;
259        let mut logs = 0usize;
260        let mut metrics = 0usize;
261        for record in records {
262            match record {
263                WalRecord::Spans(s) => {
264                    spans += s.len();
265                    self.apply_spans(&s)?;
266                }
267                WalRecord::Logs(l) => {
268                    logs += l.len();
269                    self.apply_logs(&l)?;
270                }
271                WalRecord::Metrics(m) => {
272                    metrics += m.len();
273                    self.apply_metrics(&m)?;
274                }
275            }
276        }
277        tracing::info!(spans, logs, metrics, "tael-backend: replayed WAL");
278        Ok(())
279    }
280}
281
282impl Store for TaelBackend {
283    // ── Writes: WAL → apply (hot + projection) → mark applied ───────
284    fn insert_spans(&self, spans: &[Span]) -> Result<()> {
285        self.wal.append_spans(spans)?;
286        self.apply_spans(spans)?;
287        self.wal.mark_applied()?;
288        Ok(())
289    }
290
291    fn insert_logs(&self, logs: &[LogRecord]) -> Result<()> {
292        self.wal.append_logs(logs)?;
293        self.apply_logs(logs)?;
294        self.wal.mark_applied()?;
295        Ok(())
296    }
297
298    fn insert_metrics(&self, metrics: &[MetricPoint]) -> Result<()> {
299        self.wal.append_metrics(metrics)?;
300        self.apply_metrics(metrics)?;
301        self.wal.mark_applied()?;
302        Ok(())
303    }
304
305    // ── Core reads: hot tier, unioned with the cold tier ────────────
306    fn query_traces(&self, query: &TraceQuery) -> Result<Vec<Span>> {
307        // Full-text payload filter: restrict to traces whose LLM prompts/
308        // completions match, then apply the rest of the query over those spans.
309        if let Some(ref text) = query.text {
310            let trace_ids = self.search.search_trace_ids(text, 1000)?;
311            if trace_ids.is_empty() {
312                return Ok(Vec::new());
313            }
314            let cutoff = query
315                .last_seconds
316                .map(|s| chrono::Utc::now() - chrono::Duration::seconds(s));
317            let limit = query.limit.unwrap_or(100) as usize;
318            let mut matched: Vec<Span> = Vec::new();
319            for tid in &trace_ids {
320                for s in self.get_trace(tid)? {
321                    if hot::span_matches(&s, query, cutoff) {
322                        matched.push(s);
323                    }
324                }
325            }
326            matched.sort_by(|a, b| b.start_time.cmp(&a.start_time));
327            matched.truncate(limit);
328            return Ok(matched);
329        }
330        // Hot holds the most-recent spans; cold holds older ones. Newest-first
331        // ordering means hot results lead; only dip into cold to fill the limit.
332        let mut results = self.hot.query_traces(query)?;
333        let limit = query.limit.unwrap_or(100) as usize;
334        if results.len() < limit {
335            let cutoff = query
336                .last_seconds
337                .map(|s| chrono::Utc::now() - chrono::Duration::seconds(s));
338            let mut cold: Vec<Span> = self
339                .cold
340                .all_spans()?
341                .into_iter()
342                .filter(|s| hot::span_matches(s, query, cutoff))
343                .collect();
344            cold.sort_by(|a, b| b.start_time.cmp(&a.start_time));
345            for s in cold {
346                if results.len() >= limit {
347                    break;
348                }
349                results.push(s);
350            }
351        }
352        Ok(results)
353    }
354    fn get_trace(&self, trace_id: &str) -> Result<Vec<Span>> {
355        let mut spans = self.hot.get_trace(trace_id)?;
356        let mut seen: std::collections::HashSet<String> =
357            spans.iter().map(|s| s.span_id.clone()).collect();
358        // Union with cold; dedup by span_id in case of transient overlap during
359        // compaction.
360        for s in self.cold.get_trace(trace_id)? {
361            if seen.insert(s.span_id.clone()) {
362                spans.push(s);
363            }
364        }
365        spans.sort_by_key(|s| s.start_time);
366        Ok(spans)
367    }
368    fn list_services(&self) -> Result<Vec<ServiceInfo>> {
369        self.hot.list_services()
370    }
371    fn query_logs(&self, query: &LogQuery) -> Result<Vec<LogRecord>> {
372        let mut results = self.hot.query_logs(query)?;
373        let limit = query.limit.unwrap_or(100) as usize;
374        if results.len() < limit {
375            let cutoff = query
376                .last_seconds
377                .map(|s| chrono::Utc::now() - chrono::Duration::seconds(s));
378            let mut cold: Vec<LogRecord> = self
379                .cold
380                .all_logs()?
381                .into_iter()
382                .filter(|l| hot::log_matches(l, query, cutoff))
383                .collect();
384            cold.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
385            for l in cold {
386                if results.len() >= limit {
387                    break;
388                }
389                results.push(l);
390            }
391        }
392        Ok(results)
393    }
394    fn query_metrics(&self, query: &MetricQuery) -> Result<Vec<MetricPoint>> {
395        let mut results = self.hot.query_metrics(query)?;
396        let limit = query.limit.unwrap_or(500) as usize;
397        if results.len() < limit {
398            let cutoff = query
399                .last_seconds
400                .map(|s| chrono::Utc::now() - chrono::Duration::seconds(s));
401            let mut cold: Vec<MetricPoint> = self
402                .cold
403                .all_metrics()?
404                .into_iter()
405                .filter(|m| hot::metric_matches(m, query, cutoff))
406                .collect();
407            cold.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
408            for m in cold {
409                if results.len() >= limit {
410                    break;
411                }
412                results.push(m);
413            }
414        }
415        Ok(results)
416    }
417
418    // ── Comments & cross-signal analytics ───────────────────────────
419    fn add_comment(
420        &self,
421        trace_id: &str,
422        span_id: Option<&str>,
423        author: &str,
424        body: &str,
425    ) -> Result<TraceComment> {
426        #[cfg(feature = "duckdb")]
427        {
428            self.inner.add_comment(trace_id, span_id, author, body)
429        }
430        #[cfg(not(feature = "duckdb"))]
431        {
432            self.comments.add(trace_id, span_id, author, body)
433        }
434    }
435    fn get_comments(&self, trace_id: &str) -> Result<Vec<TraceComment>> {
436        #[cfg(feature = "duckdb")]
437        {
438            self.inner.get_comments(trace_id)
439        }
440        #[cfg(not(feature = "duckdb"))]
441        {
442            self.comments.get(trace_id)
443        }
444    }
445    fn query_summary(&self, last_seconds: i64, service: Option<&str>) -> Result<SummaryReport> {
446        #[cfg(feature = "duckdb")]
447        {
448            self.inner.query_summary(last_seconds, service)
449        }
450        #[cfg(not(feature = "duckdb"))]
451        {
452            self.summary_native(last_seconds, service)
453        }
454    }
455    fn query_anomalies(
456        &self,
457        current_seconds: i64,
458        baseline_seconds: i64,
459        service: Option<&str>,
460    ) -> Result<AnomalyReport> {
461        #[cfg(feature = "duckdb")]
462        {
463            self
464                .inner
465                .query_anomalies(current_seconds, baseline_seconds, service)
466        }
467        #[cfg(not(feature = "duckdb"))]
468        {
469            self.anomalies_native(current_seconds, baseline_seconds, service)
470        }
471    }
472    fn query_correlate(&self, trace_id: &str) -> Result<Option<CorrelateReport>> {
473        #[cfg(feature = "duckdb")]
474        {
475            self.inner.query_correlate(trace_id)
476        }
477        #[cfg(not(feature = "duckdb"))]
478        {
479            self.correlate_native(trace_id)
480        }
481    }
482    fn query_sql(&self, sql: &str) -> Result<Vec<serde_json::Value>> {
483        #[cfg(feature = "duckdb")]
484        {
485            self.inner.query_sql(sql)
486        }
487        #[cfg(not(feature = "duckdb"))]
488        {
489            let _ = sql;
490            bail!("SQL queries require a build with the `duckdb` feature")
491        }
492    }
493
494    fn flush(&self) -> Result<()> {
495        // Graceful-shutdown flush: tighten the hot tier so a restart/standby
496        // replays less WAL. WAL fsync already guarantees durability.
497        self.hot.flush()
498    }
499
500    /// Standby entrypoint: durably accept a framed WAL record shipped from a
501    /// leader and bring local state up to it. Mirrors the leader's write
502    /// discipline (append → apply → consume) so the standby's WAL, hot tier, and
503    /// projection stay byte-identical and itself replayable — the basis for
504    /// promotion on leader loss (§5.1).
505    fn apply_framed_wal(&self, framed: &[u8]) -> Result<()> {
506        let record = WalRecord::decode(framed)?;
507        self.wal.append_framed(framed)?;
508        match &record {
509            WalRecord::Spans(s) => self.apply_spans(s)?,
510            WalRecord::Logs(l) => self.apply_logs(l)?,
511            WalRecord::Metrics(m) => self.apply_metrics(m)?,
512        }
513        self.wal.mark_applied()?;
514        Ok(())
515    }
516}
517
518impl TaelBackend {
519    fn summary_native(&self, last_seconds: i64, service: Option<&str>) -> Result<SummaryReport> {
520        let spans = self.query_traces(&TraceQuery {
521            service: service.map(str::to_string),
522            last_seconds: Some(last_seconds),
523            limit: Some(u32::MAX),
524            ..Default::default()
525        })?;
526        let logs = self.query_logs(&LogQuery {
527            service: service.map(str::to_string),
528            last_seconds: Some(last_seconds),
529            limit: Some(u32::MAX),
530            ..Default::default()
531        })?;
532        let metrics = self.query_metrics(&MetricQuery {
533            service: service.map(str::to_string),
534            last_seconds: Some(last_seconds),
535            limit: Some(u32::MAX),
536            ..Default::default()
537        })?;
538        Ok(SummaryReport {
539            window_seconds: last_seconds,
540            service_filter: service.map(str::to_string),
541            traces: trace_summary(&spans),
542            top_services: service_summaries(&spans),
543            top_error_operations: error_operations(&spans),
544            logs: log_summary(&logs),
545            metrics: metric_summary(&metrics),
546        })
547    }
548
549    fn anomalies_native(
550        &self,
551        current_seconds: i64,
552        baseline_seconds: i64,
553        service: Option<&str>,
554    ) -> Result<AnomalyReport> {
555        let current = self.query_traces(&TraceQuery {
556            service: service.map(str::to_string),
557            last_seconds: Some(current_seconds),
558            limit: Some(u32::MAX),
559            ..Default::default()
560        })?;
561        let baseline = self.query_traces(&TraceQuery {
562            service: service.map(str::to_string),
563            last_seconds: Some(baseline_seconds),
564            limit: Some(u32::MAX),
565            ..Default::default()
566        })?;
567        Ok(AnomalyReport {
568            current_seconds,
569            baseline_seconds,
570            service_filter: service.map(str::to_string),
571            anomalies: anomalies(&current, &baseline),
572        })
573    }
574
575    fn correlate_native(&self, trace_id: &str) -> Result<Option<CorrelateReport>> {
576        let spans = self.get_trace(trace_id)?;
577        if spans.is_empty() {
578            return Ok(None);
579        }
580        let start = spans
581            .iter()
582            .map(|s| s.start_time)
583            .min()
584            .unwrap_or_else(chrono::Utc::now);
585        let end = spans
586            .iter()
587            .map(|s| s.end_time)
588            .max()
589            .unwrap_or_else(chrono::Utc::now);
590        let services = spans
591            .iter()
592            .map(|s| s.service.clone())
593            .collect::<HashSet<_>>()
594            .into_iter()
595            .collect();
596        let logs = self.query_logs(&LogQuery {
597            trace_id: Some(trace_id.to_string()),
598            limit: Some(500),
599            ..Default::default()
600        })?;
601        let service = spans.first().map(|s| s.service.clone());
602        let metrics = self.query_metrics(&MetricQuery {
603            service,
604            limit: Some(500),
605            ..Default::default()
606        })?;
607        Ok(Some(CorrelateReport {
608            trace_id: trace_id.to_string(),
609            span_count: spans.len(),
610            services,
611            start_time: start.to_rfc3339(),
612            end_time: end.to_rfc3339(),
613            duration_ms: (end - start).num_microseconds().unwrap_or(0) as f64 / 1000.0,
614            error_count: spans
615                .iter()
616                .filter(|s| matches!(s.status, SpanStatus::Error))
617                .count() as i64,
618            logs,
619            metrics,
620        }))
621    }
622}
623
624fn trace_summary(spans: &[Span]) -> TraceSummary {
625    let mut durations: Vec<f64> = spans.iter().map(|s| s.duration_ms).collect();
626    durations.sort_by(|a, b| a.total_cmp(b));
627    let trace_count = spans.iter().map(|s| &s.trace_id).collect::<HashSet<_>>().len() as i64;
628    let error_count = spans
629        .iter()
630        .filter(|s| matches!(s.status, SpanStatus::Error))
631        .count() as i64;
632    TraceSummary {
633        span_count: spans.len() as i64,
634        trace_count,
635        error_count,
636        error_rate: ratio(error_count, spans.len() as i64),
637        avg_ms: if durations.is_empty() {
638            0.0
639        } else {
640            durations.iter().sum::<f64>() / durations.len() as f64
641        },
642        max_ms: durations.last().copied().unwrap_or(0.0),
643        p50_ms: percentile(&durations, 0.50),
644        p95_ms: percentile(&durations, 0.95),
645        p99_ms: percentile(&durations, 0.99),
646    }
647}
648
649fn service_summaries(spans: &[Span]) -> Vec<ServiceSummary> {
650    let mut by_service: HashMap<String, Vec<Span>> = HashMap::new();
651    for span in spans {
652        by_service
653            .entry(span.service.clone())
654            .or_default()
655            .push(span.clone());
656    }
657    let mut rows: Vec<ServiceSummary> = by_service
658        .into_iter()
659        .map(|(service, spans)| {
660            let summary = trace_summary(&spans);
661            ServiceSummary {
662                service,
663                span_count: summary.span_count,
664                error_rate: summary.error_rate,
665                p95_ms: summary.p95_ms,
666            }
667        })
668        .collect();
669    rows.sort_by(|a, b| b.span_count.cmp(&a.span_count));
670    rows.truncate(10);
671    rows
672}
673
674fn error_operations(spans: &[Span]) -> Vec<ErrorOperation> {
675    let mut counts: HashMap<(String, String), i64> = HashMap::new();
676    for span in spans
677        .iter()
678        .filter(|s| matches!(s.status, SpanStatus::Error))
679    {
680        *counts
681            .entry((span.service.clone(), span.operation.clone()))
682            .or_default() += 1;
683    }
684    let mut rows: Vec<ErrorOperation> = counts
685        .into_iter()
686        .map(|((service, operation), error_count)| ErrorOperation {
687            service,
688            operation,
689            error_count,
690        })
691        .collect();
692    rows.sort_by(|a, b| b.error_count.cmp(&a.error_count));
693    rows.truncate(10);
694    rows
695}
696
697fn log_summary(logs: &[LogRecord]) -> LogSummary {
698    let mut summary = LogSummary {
699        total: logs.len() as i64,
700        ..Default::default()
701    };
702    for log in logs {
703        match log.severity {
704            LogSeverity::Error | LogSeverity::Fatal => summary.error += 1,
705            LogSeverity::Warn => summary.warn += 1,
706            LogSeverity::Info => summary.info += 1,
707            LogSeverity::Debug => summary.debug += 1,
708            _ => {}
709        }
710    }
711    summary
712}
713
714fn metric_summary(metrics: &[MetricPoint]) -> MetricSummary {
715    MetricSummary {
716        point_count: metrics.len() as i64,
717        unique_names: metrics.iter().map(|m| &m.name).collect::<HashSet<_>>().len() as i64,
718    }
719}
720
721fn anomalies(current: &[Span], baseline: &[Span]) -> Vec<Anomaly> {
722    let cur = service_summaries(current);
723    let base = service_summaries(baseline);
724    let base_map: HashMap<String, ServiceSummary> =
725        base.into_iter().map(|s| (s.service.clone(), s)).collect();
726    let mut rows = Vec::new();
727    for c in cur {
728        let Some(b) = base_map.get(&c.service) else {
729            continue;
730        };
731        if c.error_rate > b.error_rate + 0.05 {
732            rows.push(Anomaly {
733                service: c.service.clone(),
734                kind: "error_rate".to_string(),
735                severity: if c.error_rate > b.error_rate + 0.20 {
736                    "high".to_string()
737                } else {
738                    "medium".to_string()
739                },
740                current: c.error_rate,
741                baseline: b.error_rate,
742                delta: c.error_rate - b.error_rate,
743                description: format!(
744                    "{} error rate increased from {:.1}% to {:.1}%",
745                    c.service,
746                    b.error_rate * 100.0,
747                    c.error_rate * 100.0
748                ),
749            });
750        }
751        if c.p95_ms > b.p95_ms * 1.5 && c.p95_ms - b.p95_ms > 25.0 {
752            rows.push(Anomaly {
753                service: c.service.clone(),
754                kind: "p95_latency".to_string(),
755                severity: if c.p95_ms > b.p95_ms * 2.0 {
756                    "high".to_string()
757                } else {
758                    "medium".to_string()
759                },
760                current: c.p95_ms,
761                baseline: b.p95_ms,
762                delta: c.p95_ms - b.p95_ms,
763                description: format!(
764                    "{} p95 latency increased from {:.1}ms to {:.1}ms",
765                    c.service, b.p95_ms, c.p95_ms
766                ),
767            });
768        }
769    }
770    rows
771}
772
/// Percentile of an ascending-sorted slice: the element at the rounded linear
/// position `p * (len - 1)`.
///
/// `p` is a fraction in `[0, 1]` (e.g. `0.95` for p95). Values outside that
/// range select the first/last element instead of indexing out of bounds.
/// Returns `0.0` for an empty slice.
fn percentile(sorted: &[f64], p: f64) -> f64 {
    let Some(last) = sorted.len().checked_sub(1) else {
        return 0.0;
    };
    // `as usize` saturates (negative or NaN → 0); `min` caps p > 1 at the end.
    let idx = ((last as f64 * p).round() as usize).min(last);
    sorted[idx]
}
780
/// `numerator / denominator` as a float, defining the ratio as `0.0` when the
/// denominator is zero (e.g. an error rate over no spans).
fn ratio(numerator: i64, denominator: i64) -> f64 {
    match denominator {
        0 => 0.0,
        d => numerator as f64 / d as f64,
    }
}
788
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::models::{SpanKind, SpanStatus};
    use chrono::Utc;
    use std::collections::HashMap;

    /// Removes a walrus namespace dir (`wal_files/<key>`) on drop.
    ///
    /// Best-effort: removal errors are ignored so cleanup never masks the
    /// test's own outcome.
    struct NsGuard(String);
    impl Drop for NsGuard {
        fn drop(&mut self) {
            let _ = std::fs::remove_dir_all(format!("wal_files/{}", self.0));
        }
    }

    /// A backend together with the on-disk state it owns for one test.
    ///
    /// Struct fields are dropped in declaration order, so `backend` (and the
    /// handles it holds on its data dir and WAL namespace) is dropped *before*
    /// `_dir` and `_ns` delete those directories. Keeping the three as separate
    /// locals would make that order depend on how each call site binds them.
    struct Fixture {
        backend: TaelBackend,
        _dir: tempfile::TempDir,
        _ns: NsGuard,
    }

    impl std::ops::Deref for Fixture {
        type Target = TaelBackend;

        fn deref(&self) -> &TaelBackend {
            &self.backend
        }
    }

    /// Opens a fresh backend in a temp dir under a unique WAL namespace.
    fn backend() -> Fixture {
        let dir = tempfile::tempdir().unwrap();
        // Guard created first so the namespace is removed even if opening fails.
        let ns = NsGuard(format!("tael-test-backend-{}", uuid::Uuid::new_v4()));
        let backend = TaelBackend::with_wal_key(dir.path().to_str().unwrap(), &ns.0).unwrap();
        Fixture {
            backend,
            _dir: dir,
            _ns: ns,
        }
    }

    fn span(trace: &str, sid: &str, svc: &str, dur: f64, status: SpanStatus) -> Span {
        let now = Utc::now();
        Span {
            trace_id: trace.into(),
            span_id: sid.into(),
            parent_span_id: None,
            service: svc.into(),
            operation: "op".into(),
            start_time: now,
            end_time: now,
            duration_ms: dur,
            status,
            attributes: HashMap::new(),
            events: vec![],
            kind: SpanKind::Internal,
            llm: None,
        }
    }

    #[test]
    fn get_trace_reconstructs_span_tree_from_hot_tier() {
        let b = backend();
        b.insert_spans(&[
            span("t1", "s1", "api", 10.0, SpanStatus::Ok),
            span("t1", "s2", "db", 20.0, SpanStatus::Ok),
            span("t2", "s3", "api", 5.0, SpanStatus::Error),
        ])
        .unwrap();

        let trace = b.get_trace("t1").unwrap();
        assert_eq!(trace.len(), 2);
        assert!(trace.iter().all(|s| s.trace_id == "t1"));
        assert_eq!(b.get_trace("t2").unwrap().len(), 1);
        assert!(b.get_trace("missing").unwrap().is_empty());
    }

    #[test]
    #[cfg(feature = "duckdb")]
    fn query_traces_filters_match_duckdb() {
        let b = backend();
        let spans = vec![
            span("t1", "s1", "api", 10.0, SpanStatus::Ok),
            span("t2", "s2", "db", 600.0, SpanStatus::Error),
            span("t3", "s3", "api", 50.0, SpanStatus::Error),
        ];
        b.insert_spans(&spans).unwrap();

        // Independent DuckDB with identical data = the parity oracle.
        let oracle_dir = tempfile::tempdir().unwrap();
        let oracle = DuckDbStore::new(oracle_dir.path().to_str().unwrap()).unwrap();
        oracle.insert_spans(&spans).unwrap();

        let queries = [
            TraceQuery {
                service: Some("api".into()),
                ..Default::default()
            },
            TraceQuery {
                status: Some("error".into()),
                ..Default::default()
            },
            TraceQuery {
                min_duration_ms: Some(100.0),
                ..Default::default()
            },
            TraceQuery::default(),
        ];
        for q in &queries {
            let mut hot: Vec<String> = b
                .query_traces(q)
                .unwrap()
                .into_iter()
                .map(|s| s.span_id)
                .collect();
            let mut duck: Vec<String> = oracle
                .query_traces(q)
                .unwrap()
                .into_iter()
                .map(|s| s.span_id)
                .collect();
            hot.sort();
            duck.sort();
            assert_eq!(hot, duck, "hot tier and DuckDB disagree for {q:?}");
        }
    }

    #[test]
    fn list_services_aggregates_from_hot_tier() {
        let b = backend();
        b.insert_spans(&[
            span("t1", "s1", "api", 10.0, SpanStatus::Ok),
            span("t1", "s2", "api", 30.0, SpanStatus::Error),
            span("t2", "s3", "db", 20.0, SpanStatus::Ok),
        ])
        .unwrap();

        let services = b.list_services().unwrap();
        let api = services.iter().find(|s| s.name == "api").unwrap();
        assert_eq!(api.span_count, 2);
        assert_eq!(api.trace_count, 1);
        assert_eq!(api.avg_duration_ms, 20.0);
        assert!((api.error_rate - 0.5).abs() < 1e-9);
    }

    #[test]
    fn standby_rebuilds_identical_state_from_shipped_wal() {
        use crate::storage::models::{LogRecord, LogSeverity};

        // The standby: a normal backend that is never written to directly.
        let standby = Arc::new(backend());

        // A WAL sink that ships each framed record into the standby — the
        // in-process stand-in for the (deferred) network transport.
        struct ReplicaSink(Arc<Fixture>);
        impl WalSink for ReplicaSink {
            fn append_framed(&self, framed: &[u8]) -> Result<()> {
                self.0.apply_framed_wal(framed)
            }
        }

        // The leader, with the standby attached as a replication sink. The
        // guard is declared before `leader`, so the leader is dropped (and its
        // WAL closed) before its namespace dir is removed.
        let leader_dir = tempfile::tempdir().unwrap();
        let leader_ns = NsGuard(format!("tael-test-leader-{}", uuid::Uuid::new_v4()));
        let leader = TaelBackend::with_wal_key_and_sinks(
            leader_dir.path().to_str().unwrap(),
            &leader_ns.0,
            vec![Arc::new(ReplicaSink(Arc::clone(&standby)))],
            None, // synchronous: require all (one) standbys
        )
        .unwrap();

        // Write a mix of signals to the leader only.
        leader
            .insert_spans(&[
                span("t1", "s1", "api", 10.0, SpanStatus::Ok),
                span("t1", "s2", "db", 20.0, SpanStatus::Ok),
                span("t2", "s3", "api", 5.0, SpanStatus::Error),
            ])
            .unwrap();
        leader
            .insert_logs(&[LogRecord {
                timestamp: Utc::now(),
                observed_timestamp: Utc::now(),
                trace_id: Some("t1".into()),
                span_id: None,
                severity: LogSeverity::Error,
                severity_text: "ERROR".into(),
                body: "boom".into(),
                service: "api".into(),
                attributes: HashMap::new(),
                body_sha256: None,
            }])
            .unwrap();

        // The standby reconstructed identical state purely from the shipped WAL.
        assert_eq!(standby.get_trace("t1").unwrap().len(), 2);
        assert_eq!(standby.get_trace("t2").unwrap().len(), 1);
        let leader_traces = leader.query_traces(&TraceQuery::default()).unwrap();
        let standby_traces = standby.query_traces(&TraceQuery::default()).unwrap();
        assert_eq!(leader_traces.len(), standby_traces.len());
        assert_eq!(standby_traces.len(), 3);
    }

    #[test]
    fn logs_and_metrics_round_trip_through_hot_tier() {
        use crate::storage::models::{LogRecord, LogSeverity, MetricPoint, MetricType};
        let b = backend();

        b.insert_logs(&[LogRecord {
            timestamp: Utc::now(),
            observed_timestamp: Utc::now(),
            trace_id: Some("t1".into()),
            span_id: None,
            severity: LogSeverity::Error,
            severity_text: "ERROR".into(),
            body: "connection refused".into(),
            service: "api".into(),
            attributes: HashMap::new(),
            body_sha256: None,
        }])
        .unwrap();
        let logs = b
            .query_logs(&LogQuery {
                severity: Some("error".into()),
                ..Default::default()
            })
            .unwrap();
        assert_eq!(logs.len(), 1);
        assert_eq!(logs[0].body, "connection refused");

        b.insert_metrics(&[MetricPoint {
            timestamp: Utc::now(),
            service: "api".into(),
            name: "http_requests_total".into(),
            metric_type: MetricType::Sum,
            value: 42.0,
            unit: "1".into(),
            attributes: HashMap::new(),
        }])
        .unwrap();
        let metrics = b
            .query_metrics(&MetricQuery {
                name: Some("http_requests_total".into()),
                ..Default::default()
            })
            .unwrap();
        assert_eq!(metrics.len(), 1);
        assert_eq!(metrics[0].value, 42.0);
    }

    #[test]
    fn compaction_moves_spans_to_cold_and_reads_still_union() {
        let b = backend();
        b.insert_spans(&[
            span("t1", "s1", "api", 10.0, SpanStatus::Ok),
            span("t1", "s2", "db", 20.0, SpanStatus::Ok),
            span("t2", "s3", "api", 5.0, SpanStatus::Error),
        ])
        .unwrap();

        // Compact everything (cutoff in the future) → all spans roll to cold.
        let moved = b
            .compact_spans(Utc::now() + chrono::Duration::seconds(60))
            .unwrap();
        assert_eq!(moved, 3);
        // Hot tier is now empty...
        assert!(b.hot.get_trace("t1").unwrap().is_empty());
        // ...but the unioned reads still see everything.
        assert_eq!(b.get_trace("t1").unwrap().len(), 2);
        let all = b.query_traces(&TraceQuery::default()).unwrap();
        assert_eq!(all.len(), 3);
        let errors = b
            .query_traces(&TraceQuery {
                status: Some("error".into()),
                ..Default::default()
            })
            .unwrap();
        assert_eq!(errors.len(), 1);
        assert_eq!(errors[0].span_id, "s3");

        // Re-compaction is a no-op (nothing left in hot).
        assert_eq!(b.compact_spans(Utc::now()).unwrap(), 0);
    }

    #[test]
    fn logs_metrics_compact_to_cold_and_union_reads() {
        use crate::storage::models::{LogRecord, LogSeverity, MetricPoint, MetricType};
        let b = backend();
        b.insert_logs(&[LogRecord {
            timestamp: Utc::now(),
            observed_timestamp: Utc::now(),
            trace_id: Some("t1".into()),
            span_id: None,
            severity: LogSeverity::Error,
            severity_text: "ERROR".into(),
            body: "boom".into(),
            service: "api".into(),
            attributes: HashMap::new(),
            body_sha256: None,
        }])
        .unwrap();
        b.insert_metrics(&[MetricPoint {
            timestamp: Utc::now(),
            service: "api".into(),
            name: "rps".into(),
            metric_type: MetricType::Sum,
            value: 7.0,
            unit: "1".into(),
            attributes: HashMap::new(),
        }])
        .unwrap();

        let moved = b
            .compact_logs_metrics(Utc::now() + chrono::Duration::seconds(60))
            .unwrap();
        assert_eq!(moved, 2);

        // Served from cold via union after the hot tier emptied.
        let logs = b.query_logs(&LogQuery::default()).unwrap();
        assert_eq!(logs.len(), 1);
        assert_eq!(logs[0].body, "boom");
        let metrics = b.query_metrics(&MetricQuery::default()).unwrap();
        assert_eq!(metrics.len(), 1);
        assert_eq!(metrics[0].value, 7.0);
    }

    #[test]
    fn full_text_search_filters_traces_by_payload() {
        let b = backend();
        b.insert_spans(&[
            span("t1", "s1", "llm-proxy", 100.0, SpanStatus::Ok),
            span("t2", "s2", "llm-proxy", 100.0, SpanStatus::Ok),
        ])
        .unwrap();
        // Index payload text the way ingestion would.
        let idx = b.search_index();
        idx.index_span("t1", "s1", "summarize the rate limit policy")
            .unwrap();
        idx.index_span("t2", "s2", "translate to French").unwrap();
        idx.commit().unwrap();

        let hits = b
            .query_traces(&TraceQuery {
                text: Some("rate limit".into()),
                ..Default::default()
            })
            .unwrap();
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].trace_id, "t1");

        // Non-matching query → empty.
        let none = b
            .query_traces(&TraceQuery {
                text: Some("quantum".into()),
                ..Default::default()
            })
            .unwrap();
        assert!(none.is_empty());
    }

    #[test]
    fn survives_reopen_via_persistent_hot_tier() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().to_str().unwrap();
        // Declared before either backend so it drops after both of them, and so
        // the WAL namespace is still removed if an assertion below panics.
        let ns = NsGuard(format!("tael-test-reopen-{}", uuid::Uuid::new_v4()));
        {
            let b = TaelBackend::with_wal_key(path, &ns.0).unwrap();
            b.insert_spans(&[span("t1", "s1", "api", 10.0, SpanStatus::Ok)])
                .unwrap();
        }
        // Reopen against the same data dir and WAL key: the span must still be
        // readable after the first backend was dropped.
        let b2 = TaelBackend::with_wal_key(path, &ns.0).unwrap();
        assert_eq!(b2.get_trace("t1").unwrap().len(), 1);
    }
}