Skip to main content

tael_server/storage/backend/
mod.rs

//! `TaelBackend` — the purpose-built storage engine (see
//! `docs/tael-backend-design.md`). Built incrementally behind the `Store`
//! trait so it ships opt-in alongside `DuckDbStore`.
//!
//! **Phase 3:** durable WAL write path + crash-gap replay.
//! **Phase 4 (this file):** an `fjall` LSM hot tier serves the core per-signal
//! reads (`query_traces`, `get_trace`, `list_services`, `query_logs`,
//! `query_metrics`). Aged spans, logs, and metrics are compacted out of the hot
//! tier into a Parquet cold tier, and the trace/log/metric reads union hot with
//! cold; a full-text `SearchIndex` over LLM payloads backs `TraceQuery::text`.
//! Writes fan out to the WAL, the hot tier, and an inner `DuckDbStore`
//! projection that still backs the heavier analytics
//! (`query_summary`/`anomalies`/`correlate`, PromQL) until DataFusion (Phase 6)
//! serves those from hot+cold. The double-write is the explicit transitional
//! state; `--storage=tael-backend` is always a complete backend.
13
14mod cold;
15mod hot;
16mod wal;
17
18use anyhow::Result;
19
20use super::DuckDbStore;
21use super::Store;
22use super::models::{
23    AnomalyReport, CorrelateReport, LogQuery, LogRecord, MetricPoint, MetricQuery, ServiceInfo,
24    Span, SummaryReport, TraceComment, TraceQuery,
25};
26use std::sync::Arc;
27
28use super::SearchIndex;
29use cold::ColdTier;
30use hot::HotTier;
31use wal::WalLog;
32pub use wal::{WalRecord, WalSink};
33
34pub struct TaelBackend {
35    /// LSM hot tier — serves recent reads.
36    hot: HotTier,
37    /// Parquet cold tier — aged spans rolled out of the hot tier.
38    cold: ColdTier,
39    /// Projection backing analytics not yet ported to the hot tier.
40    inner: DuckDbStore,
41    /// Full-text index over LLM payloads (shared with the ingest path).
42    search: Arc<SearchIndex>,
43    wal: WalLog,
44}
45
46impl TaelBackend {
47    pub fn new(data_dir: &str) -> Result<Self> {
48        Self::with_wal_key(data_dir, "tael-backend")
49    }
50
51    /// Like [`Self::new`] but with an explicit WAL namespace key — lets tests
52    /// run isolated instances (the WAL key is process-global in walrus).
53    pub fn with_wal_key(data_dir: &str, wal_key: &str) -> Result<Self> {
54        Self::with_wal_key_and_sinks(data_dir, wal_key, Vec::new(), None)
55    }
56
57    /// Like [`Self::with_wal_key`] but with WAL replication sinks attached: this
58    /// backend runs as a **leader** that ships every appended record to its
59    /// standbys before acking the write (`docs/tael-server-scaling-ha.md` §5.1).
60    /// `required_acks` is how many standbys must confirm before a write returns
61    /// (`None` = all = fully synchronous; `Some(0)` = async best-effort). With
62    /// no sinks the write path is unchanged. A standby on the receiving end
63    /// applies shipped records via `Store::apply_framed_wal`.
64    pub fn with_wal_key_and_sinks(
65        data_dir: &str,
66        wal_key: &str,
67        sinks: Vec<Arc<dyn WalSink>>,
68        required_acks: Option<usize>,
69    ) -> Result<Self> {
70        let hot = HotTier::open(data_dir)?;
71        let cold = ColdTier::open(data_dir)?;
72        let inner = DuckDbStore::new(data_dir)?;
73        let search = Arc::new(SearchIndex::open(data_dir)?);
74        let mut wal = if sinks.is_empty() {
75            WalLog::new_for_key(wal_key)?
76        } else {
77            WalLog::new_for_key_with_sinks(wal_key, sinks)?
78        };
79        if let Some(n) = required_acks {
80            wal = wal.with_required_acks(n);
81        }
82        let backend = Self {
83            hot,
84            cold,
85            inner,
86            search,
87            wal,
88        };
89        backend.replay()?;
90        Ok(backend)
91    }
92
93    /// The shared payload search index — handed to the ingest path so prompt/
94    /// completion text is indexed at write time (the text isn't retained on the
95    /// span itself, only its blob hashes).
96    pub fn search_index(&self) -> Arc<SearchIndex> {
97        Arc::clone(&self.search)
98    }
99
100    /// Roll spans older than `cutoff` out of the LSM hot tier into Parquet.
101    /// Returns the number of spans compacted. Safe to call repeatedly.
102    pub fn compact_spans(&self, cutoff: chrono::DateTime<chrono::Utc>) -> Result<usize> {
103        let cutoff_ns = cutoff.timestamp_nanos_opt().unwrap_or(0);
104        let evicted = self.hot.evict_spans_before(cutoff_ns)?;
105        if evicted.is_empty() {
106            return Ok(0);
107        }
108        // Write to cold first, then the hot eviction is already done; if we
109        // crash between, the spans remain in the DuckDB projection and the WAL.
110        self.cold.write_spans(&evicted)?;
111        tracing::info!(
112            spans = evicted.len(),
113            "tael-backend: compacted spans to cold tier"
114        );
115        Ok(evicted.len())
116    }
117
118    /// Roll aged logs/metrics out of the hot tier into Parquet. Returns the
119    /// total number of records compacted across both signals.
120    pub fn compact_logs_metrics(&self, cutoff: chrono::DateTime<chrono::Utc>) -> Result<usize> {
121        let cutoff_ns = cutoff.timestamp_nanos_opt().unwrap_or(0);
122        let logs = self.hot.evict_logs_before(cutoff_ns)?;
123        if !logs.is_empty() {
124            self.cold.write_logs(&logs)?;
125        }
126        let metrics = self.hot.evict_metrics_before(cutoff_ns)?;
127        if !metrics.is_empty() {
128            self.cold.write_metrics(&metrics)?;
129            // Downsample to 5m rollups alongside the raw cold write, so trends
130            // survive once raw points are dropped by retention.
131            self.cold.write_downsampled(&metrics)?;
132        }
133        let n = logs.len() + metrics.len();
134        if n > 0 {
135            tracing::info!(
136                logs = logs.len(),
137                metrics = metrics.len(),
138                "tael-backend: compacted logs/metrics to cold tier"
139            );
140        }
141        Ok(n)
142    }
143
144    /// Collect every blob hash still referenced by a live row — LLM prompt and
145    /// completion hashes on spans, and `body_sha256` on logs — across hot and
146    /// cold tiers. Drives blob GC (anything not here is unreferenced).
147    pub fn collect_live_blob_hashes(&self) -> Result<std::collections::HashSet<String>> {
148        use super::models::{LogQuery, TraceQuery};
149        let mut live = std::collections::HashSet::new();
150        // Spans (hot∪cold via the unioned read path), with a high limit.
151        let spans = self.query_traces(&TraceQuery {
152            limit: Some(u32::MAX),
153            ..Default::default()
154        })?;
155        for s in spans {
156            if let Some(llm) = s.llm {
157                live.extend(llm.prompt_sha256);
158                live.extend(llm.completion_sha256);
159            }
160        }
161        let logs = self.query_logs(&LogQuery {
162            limit: Some(u32::MAX),
163            ..Default::default()
164        })?;
165        for l in logs {
166            live.extend(l.body_sha256);
167        }
168        Ok(live)
169    }
170
171    /// Drop cold partitions (spans/logs/metrics) whose date is older than
172    /// `keep`. Returns the total number of partitions removed. (Metadata GC;
173    /// payload-blob GC runs separately in the maintenance task.)
174    pub fn enforce_span_retention(&self, keep: chrono::DateTime<chrono::Utc>) -> Result<usize> {
175        let cutoff_date = keep.format("%Y-%m-%d").to_string();
176        let dropped = self.cold.drop_partitions_before(&cutoff_date)?;
177        if dropped > 0 {
178            tracing::info!(
179                partitions = dropped,
180                "tael-backend: dropped expired cold partitions"
181            );
182        }
183        Ok(dropped)
184    }
185
186    /// Apply a batch to every projection (hot tier + DuckDB). Used by both the
187    /// live write path and WAL replay.
188    fn apply_spans(&self, spans: &[Span]) -> Result<()> {
189        self.hot.insert_spans(spans)?;
190        self.inner.insert_spans(spans)
191    }
192    fn apply_logs(&self, logs: &[LogRecord]) -> Result<()> {
193        self.hot.insert_logs(logs)?;
194        self.inner.insert_logs(logs)
195    }
196    fn apply_metrics(&self, metrics: &[MetricPoint]) -> Result<()> {
197        self.hot.insert_metrics(metrics)?;
198        self.inner.insert_metrics(metrics)
199    }
200
201    /// Re-apply any WAL records left unconsumed by a crash, then advance past
202    /// them (they are consumed by `drain`).
203    fn replay(&self) -> Result<()> {
204        let records = self.wal.drain()?;
205        if records.is_empty() {
206            return Ok(());
207        }
208        let mut spans = 0usize;
209        let mut logs = 0usize;
210        let mut metrics = 0usize;
211        for record in records {
212            match record {
213                WalRecord::Spans(s) => {
214                    spans += s.len();
215                    self.apply_spans(&s)?;
216                }
217                WalRecord::Logs(l) => {
218                    logs += l.len();
219                    self.apply_logs(&l)?;
220                }
221                WalRecord::Metrics(m) => {
222                    metrics += m.len();
223                    self.apply_metrics(&m)?;
224                }
225            }
226        }
227        tracing::info!(spans, logs, metrics, "tael-backend: replayed WAL");
228        Ok(())
229    }
230}
231
232impl Store for TaelBackend {
233    // ── Writes: WAL → apply (hot + projection) → mark applied ───────
234    fn insert_spans(&self, spans: &[Span]) -> Result<()> {
235        self.wal.append_spans(spans)?;
236        self.apply_spans(spans)?;
237        self.wal.mark_applied()?;
238        Ok(())
239    }
240
241    fn insert_logs(&self, logs: &[LogRecord]) -> Result<()> {
242        self.wal.append_logs(logs)?;
243        self.apply_logs(logs)?;
244        self.wal.mark_applied()?;
245        Ok(())
246    }
247
248    fn insert_metrics(&self, metrics: &[MetricPoint]) -> Result<()> {
249        self.wal.append_metrics(metrics)?;
250        self.apply_metrics(metrics)?;
251        self.wal.mark_applied()?;
252        Ok(())
253    }
254
255    // ── Core reads: hot tier, unioned with the cold tier ────────────
256    fn query_traces(&self, query: &TraceQuery) -> Result<Vec<Span>> {
257        // Full-text payload filter: restrict to traces whose LLM prompts/
258        // completions match, then apply the rest of the query over those spans.
259        if let Some(ref text) = query.text {
260            let trace_ids = self.search.search_trace_ids(text, 1000)?;
261            if trace_ids.is_empty() {
262                return Ok(Vec::new());
263            }
264            let cutoff = query
265                .last_seconds
266                .map(|s| chrono::Utc::now() - chrono::Duration::seconds(s));
267            let limit = query.limit.unwrap_or(100) as usize;
268            let mut matched: Vec<Span> = Vec::new();
269            for tid in &trace_ids {
270                for s in self.get_trace(tid)? {
271                    if hot::span_matches(&s, query, cutoff) {
272                        matched.push(s);
273                    }
274                }
275            }
276            matched.sort_by(|a, b| b.start_time.cmp(&a.start_time));
277            matched.truncate(limit);
278            return Ok(matched);
279        }
280        // Hot holds the most-recent spans; cold holds older ones. Newest-first
281        // ordering means hot results lead; only dip into cold to fill the limit.
282        let mut results = self.hot.query_traces(query)?;
283        let limit = query.limit.unwrap_or(100) as usize;
284        if results.len() < limit {
285            let cutoff = query
286                .last_seconds
287                .map(|s| chrono::Utc::now() - chrono::Duration::seconds(s));
288            let mut cold: Vec<Span> = self
289                .cold
290                .all_spans()?
291                .into_iter()
292                .filter(|s| hot::span_matches(s, query, cutoff))
293                .collect();
294            cold.sort_by(|a, b| b.start_time.cmp(&a.start_time));
295            for s in cold {
296                if results.len() >= limit {
297                    break;
298                }
299                results.push(s);
300            }
301        }
302        Ok(results)
303    }
304    fn get_trace(&self, trace_id: &str) -> Result<Vec<Span>> {
305        let mut spans = self.hot.get_trace(trace_id)?;
306        let mut seen: std::collections::HashSet<String> =
307            spans.iter().map(|s| s.span_id.clone()).collect();
308        // Union with cold; dedup by span_id in case of transient overlap during
309        // compaction.
310        for s in self.cold.get_trace(trace_id)? {
311            if seen.insert(s.span_id.clone()) {
312                spans.push(s);
313            }
314        }
315        spans.sort_by_key(|s| s.start_time);
316        Ok(spans)
317    }
318    fn list_services(&self) -> Result<Vec<ServiceInfo>> {
319        self.hot.list_services()
320    }
321    fn query_logs(&self, query: &LogQuery) -> Result<Vec<LogRecord>> {
322        let mut results = self.hot.query_logs(query)?;
323        let limit = query.limit.unwrap_or(100) as usize;
324        if results.len() < limit {
325            let cutoff = query
326                .last_seconds
327                .map(|s| chrono::Utc::now() - chrono::Duration::seconds(s));
328            let mut cold: Vec<LogRecord> = self
329                .cold
330                .all_logs()?
331                .into_iter()
332                .filter(|l| hot::log_matches(l, query, cutoff))
333                .collect();
334            cold.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
335            for l in cold {
336                if results.len() >= limit {
337                    break;
338                }
339                results.push(l);
340            }
341        }
342        Ok(results)
343    }
344    fn query_metrics(&self, query: &MetricQuery) -> Result<Vec<MetricPoint>> {
345        let mut results = self.hot.query_metrics(query)?;
346        let limit = query.limit.unwrap_or(500) as usize;
347        if results.len() < limit {
348            let cutoff = query
349                .last_seconds
350                .map(|s| chrono::Utc::now() - chrono::Duration::seconds(s));
351            let mut cold: Vec<MetricPoint> = self
352                .cold
353                .all_metrics()?
354                .into_iter()
355                .filter(|m| hot::metric_matches(m, query, cutoff))
356                .collect();
357            cold.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
358            for m in cold {
359                if results.len() >= limit {
360                    break;
361                }
362                results.push(m);
363            }
364        }
365        Ok(results)
366    }
367
368    // ── Comments & heavier analytics: projection (for now) ──────────
369    fn add_comment(
370        &self,
371        trace_id: &str,
372        span_id: Option<&str>,
373        author: &str,
374        body: &str,
375    ) -> Result<TraceComment> {
376        self.inner.add_comment(trace_id, span_id, author, body)
377    }
378    fn get_comments(&self, trace_id: &str) -> Result<Vec<TraceComment>> {
379        self.inner.get_comments(trace_id)
380    }
381    fn query_summary(&self, last_seconds: i64, service: Option<&str>) -> Result<SummaryReport> {
382        self.inner.query_summary(last_seconds, service)
383    }
384    fn query_anomalies(
385        &self,
386        current_seconds: i64,
387        baseline_seconds: i64,
388        service: Option<&str>,
389    ) -> Result<AnomalyReport> {
390        self.inner
391            .query_anomalies(current_seconds, baseline_seconds, service)
392    }
393    fn query_correlate(&self, trace_id: &str) -> Result<Option<CorrelateReport>> {
394        self.inner.query_correlate(trace_id)
395    }
396    fn query_sql(&self, sql: &str) -> Result<Vec<serde_json::Value>> {
397        // SQL runs over the DuckDB projection, which retains all signals.
398        self.inner.query_sql(sql)
399    }
400
401    fn flush(&self) -> Result<()> {
402        // Graceful-shutdown flush: tighten the hot tier so a restart/standby
403        // replays less WAL. WAL fsync already guarantees durability.
404        self.hot.flush()
405    }
406
407    /// Standby entrypoint: durably accept a framed WAL record shipped from a
408    /// leader and bring local state up to it. Mirrors the leader's write
409    /// discipline (append → apply → consume) so the standby's WAL, hot tier, and
410    /// projection stay byte-identical and itself replayable — the basis for
411    /// promotion on leader loss (§5.1).
412    fn apply_framed_wal(&self, framed: &[u8]) -> Result<()> {
413        let record = WalRecord::decode(framed)?;
414        self.wal.append_framed(framed)?;
415        match &record {
416            WalRecord::Spans(s) => self.apply_spans(s)?,
417            WalRecord::Logs(l) => self.apply_logs(l)?,
418            WalRecord::Metrics(m) => self.apply_metrics(m)?,
419        }
420        self.wal.mark_applied()?;
421        Ok(())
422    }
423}
424
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::models::{SpanKind, SpanStatus};
    use chrono::Utc;
    use std::collections::HashMap;

    /// Removes a walrus namespace dir (`wal_files/<key>`) on drop.
    struct NsGuard(String);
    impl Drop for NsGuard {
        fn drop(&mut self) {
            let _ = std::fs::remove_dir_all(format!("wal_files/{}", self.0));
        }
    }

    /// Fresh backend in a temp dir with a unique WAL namespace.
    fn backend() -> (TaelBackend, tempfile::TempDir, NsGuard) {
        let dir = tempfile::tempdir().unwrap();
        let key = format!("tael-test-backend-{}", uuid::Uuid::new_v4());
        let b = TaelBackend::with_wal_key(dir.path().to_str().unwrap(), &key).unwrap();
        (b, dir, NsGuard(key))
    }

    /// A zero-length span starting now.
    fn span(trace: &str, sid: &str, svc: &str, dur: f64, status: SpanStatus) -> Span {
        let now = Utc::now();
        Span {
            trace_id: trace.into(),
            span_id: sid.into(),
            parent_span_id: None,
            service: svc.into(),
            operation: "op".into(),
            start_time: now,
            end_time: now,
            duration_ms: dur,
            status,
            attributes: HashMap::new(),
            events: vec![],
            kind: SpanKind::Internal,
            llm: None,
        }
    }

    #[test]
    fn get_trace_reconstructs_span_tree_from_hot_tier() {
        let (b, _d, _g) = backend();
        b.insert_spans(&[
            span("t1", "s1", "api", 10.0, SpanStatus::Ok),
            span("t1", "s2", "db", 20.0, SpanStatus::Ok),
            span("t2", "s3", "api", 5.0, SpanStatus::Error),
        ])
        .unwrap();

        let trace = b.get_trace("t1").unwrap();
        assert_eq!(trace.len(), 2);
        assert!(trace.iter().all(|s| s.trace_id == "t1"));
        assert_eq!(b.get_trace("t2").unwrap().len(), 1);
        assert!(b.get_trace("missing").unwrap().is_empty());
    }

    #[test]
    fn query_traces_filters_match_duckdb() {
        let (b, _d, _g) = backend();
        let spans = vec![
            span("t1", "s1", "api", 10.0, SpanStatus::Ok),
            span("t2", "s2", "db", 600.0, SpanStatus::Error),
            span("t3", "s3", "api", 50.0, SpanStatus::Error),
        ];
        b.insert_spans(&spans).unwrap();

        // Independent DuckDB with identical data = the parity oracle.
        let oracle_dir = tempfile::tempdir().unwrap();
        let oracle = DuckDbStore::new(oracle_dir.path().to_str().unwrap()).unwrap();
        oracle.insert_spans(&spans).unwrap();

        let queries = [
            TraceQuery {
                service: Some("api".into()),
                ..Default::default()
            },
            TraceQuery {
                status: Some("error".into()),
                ..Default::default()
            },
            TraceQuery {
                min_duration_ms: Some(100.0),
                ..Default::default()
            },
            TraceQuery::default(),
        ];
        for q in &queries {
            let mut hot: Vec<String> = b
                .query_traces(q)
                .unwrap()
                .into_iter()
                .map(|s| s.span_id)
                .collect();
            let mut duck: Vec<String> = oracle
                .query_traces(q)
                .unwrap()
                .into_iter()
                .map(|s| s.span_id)
                .collect();
            hot.sort();
            duck.sort();
            assert_eq!(hot, duck, "hot tier and DuckDB disagree for {q:?}");
        }
    }

    #[test]
    fn list_services_aggregates_from_hot_tier() {
        let (b, _d, _g) = backend();
        b.insert_spans(&[
            span("t1", "s1", "api", 10.0, SpanStatus::Ok),
            span("t1", "s2", "api", 30.0, SpanStatus::Error),
            span("t2", "s3", "db", 20.0, SpanStatus::Ok),
        ])
        .unwrap();

        let services = b.list_services().unwrap();
        let api = services.iter().find(|s| s.name == "api").unwrap();
        assert_eq!(api.span_count, 2);
        assert_eq!(api.trace_count, 1);
        assert_eq!(api.avg_duration_ms, 20.0);
        assert!((api.error_rate - 0.5).abs() < 1e-9);
    }

    #[test]
    fn standby_rebuilds_identical_state_from_shipped_wal() {
        use crate::storage::models::{LogRecord, LogSeverity};

        // The standby: a normal backend that is never written to directly.
        let (standby, _sd, _sg) = backend();
        let standby = Arc::new(standby);

        // A WAL sink that ships each framed record into the standby — the
        // in-process stand-in for the (deferred) network transport.
        struct ReplicaSink(Arc<TaelBackend>);
        impl WalSink for ReplicaSink {
            fn append_framed(&self, framed: &[u8]) -> Result<()> {
                self.0.apply_framed_wal(framed)
            }
        }

        // The leader, with the standby attached as a replication sink.
        let leader_dir = tempfile::tempdir().unwrap();
        let leader_key = format!("tael-test-leader-{}", uuid::Uuid::new_v4());
        let _lg = NsGuard(leader_key.clone());
        let leader = TaelBackend::with_wal_key_and_sinks(
            leader_dir.path().to_str().unwrap(),
            &leader_key,
            vec![Arc::new(ReplicaSink(Arc::clone(&standby)))],
            None, // synchronous: require all (one) standbys
        )
        .unwrap();

        // Write a mix of signals to the leader only.
        leader
            .insert_spans(&[
                span("t1", "s1", "api", 10.0, SpanStatus::Ok),
                span("t1", "s2", "db", 20.0, SpanStatus::Ok),
                span("t2", "s3", "api", 5.0, SpanStatus::Error),
            ])
            .unwrap();
        leader
            .insert_logs(&[LogRecord {
                timestamp: Utc::now(),
                observed_timestamp: Utc::now(),
                trace_id: Some("t1".into()),
                span_id: None,
                severity: LogSeverity::Error,
                severity_text: "ERROR".into(),
                body: "boom".into(),
                service: "api".into(),
                attributes: HashMap::new(),
                body_sha256: None,
            }])
            .unwrap();

        // The standby reconstructed identical state purely from the shipped WAL.
        assert_eq!(standby.get_trace("t1").unwrap().len(), 2);
        assert_eq!(standby.get_trace("t2").unwrap().len(), 1);
        let leader_traces = leader.query_traces(&TraceQuery::default()).unwrap();
        let standby_traces = standby.query_traces(&TraceQuery::default()).unwrap();
        assert_eq!(leader_traces.len(), standby_traces.len());
        assert_eq!(standby_traces.len(), 3);
    }

    #[test]
    fn logs_and_metrics_round_trip_through_hot_tier() {
        use crate::storage::models::{LogRecord, LogSeverity, MetricPoint, MetricType};
        let (b, _d, _g) = backend();

        b.insert_logs(&[LogRecord {
            timestamp: Utc::now(),
            observed_timestamp: Utc::now(),
            trace_id: Some("t1".into()),
            span_id: None,
            severity: LogSeverity::Error,
            severity_text: "ERROR".into(),
            body: "connection refused".into(),
            service: "api".into(),
            attributes: HashMap::new(),
            body_sha256: None,
        }])
        .unwrap();
        let logs = b
            .query_logs(&LogQuery {
                severity: Some("error".into()),
                ..Default::default()
            })
            .unwrap();
        assert_eq!(logs.len(), 1);
        assert_eq!(logs[0].body, "connection refused");

        b.insert_metrics(&[MetricPoint {
            timestamp: Utc::now(),
            service: "api".into(),
            name: "http_requests_total".into(),
            metric_type: MetricType::Sum,
            value: 42.0,
            unit: "1".into(),
            attributes: HashMap::new(),
        }])
        .unwrap();
        let metrics = b
            .query_metrics(&MetricQuery {
                name: Some("http_requests_total".into()),
                ..Default::default()
            })
            .unwrap();
        assert_eq!(metrics.len(), 1);
        assert_eq!(metrics[0].value, 42.0);
    }

    #[test]
    fn compaction_moves_spans_to_cold_and_reads_still_union() {
        let (b, _d, _g) = backend();
        b.insert_spans(&[
            span("t1", "s1", "api", 10.0, SpanStatus::Ok),
            span("t1", "s2", "db", 20.0, SpanStatus::Ok),
            span("t2", "s3", "api", 5.0, SpanStatus::Error),
        ])
        .unwrap();

        // Compact everything (cutoff in the future) → all spans roll to cold.
        let moved = b
            .compact_spans(Utc::now() + chrono::Duration::seconds(60))
            .unwrap();
        assert_eq!(moved, 3);
        // Hot tier is now empty...
        assert!(b.hot.get_trace("t1").unwrap().is_empty());
        // ...but the unioned reads still see everything.
        assert_eq!(b.get_trace("t1").unwrap().len(), 2);
        let all = b.query_traces(&TraceQuery::default()).unwrap();
        assert_eq!(all.len(), 3);
        let errors = b
            .query_traces(&TraceQuery {
                status: Some("error".into()),
                ..Default::default()
            })
            .unwrap();
        assert_eq!(errors.len(), 1);
        assert_eq!(errors[0].span_id, "s3");

        // Re-compaction is a no-op (nothing left in hot).
        assert_eq!(b.compact_spans(Utc::now()).unwrap(), 0);
    }

    #[test]
    fn logs_metrics_compact_to_cold_and_union_reads() {
        use crate::storage::models::{LogRecord, LogSeverity, MetricPoint, MetricType};
        let (b, _d, _g) = backend();
        b.insert_logs(&[LogRecord {
            timestamp: Utc::now(),
            observed_timestamp: Utc::now(),
            trace_id: Some("t1".into()),
            span_id: None,
            severity: LogSeverity::Error,
            severity_text: "ERROR".into(),
            body: "boom".into(),
            service: "api".into(),
            attributes: HashMap::new(),
            body_sha256: None,
        }])
        .unwrap();
        b.insert_metrics(&[MetricPoint {
            timestamp: Utc::now(),
            service: "api".into(),
            name: "rps".into(),
            metric_type: MetricType::Sum,
            value: 7.0,
            unit: "1".into(),
            attributes: HashMap::new(),
        }])
        .unwrap();

        let moved = b
            .compact_logs_metrics(Utc::now() + chrono::Duration::seconds(60))
            .unwrap();
        assert_eq!(moved, 2);

        // Served from cold via union after the hot tier emptied.
        let logs = b.query_logs(&LogQuery::default()).unwrap();
        assert_eq!(logs.len(), 1);
        assert_eq!(logs[0].body, "boom");
        let metrics = b.query_metrics(&MetricQuery::default()).unwrap();
        assert_eq!(metrics.len(), 1);
        assert_eq!(metrics[0].value, 7.0);
    }

    #[test]
    fn full_text_search_filters_traces_by_payload() {
        let (b, _d, _g) = backend();
        b.insert_spans(&[
            span("t1", "s1", "llm-proxy", 100.0, SpanStatus::Ok),
            span("t2", "s2", "llm-proxy", 100.0, SpanStatus::Ok),
        ])
        .unwrap();
        // Index payload text the way ingestion would.
        let idx = b.search_index();
        idx.index_span("t1", "s1", "summarize the rate limit policy")
            .unwrap();
        idx.index_span("t2", "s2", "translate to French").unwrap();
        idx.commit().unwrap();

        let hits = b
            .query_traces(&TraceQuery {
                text: Some("rate limit".into()),
                ..Default::default()
            })
            .unwrap();
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].trace_id, "t1");

        // Non-matching query → empty.
        let none = b
            .query_traces(&TraceQuery {
                text: Some("quantum".into()),
                ..Default::default()
            })
            .unwrap();
        assert!(none.is_empty());
    }

    #[test]
    fn survives_reopen_via_persistent_hot_tier() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().to_str().unwrap();
        let key = format!("tael-test-reopen-{}", uuid::Uuid::new_v4());
        // Declared before either backend so it drops after them, and still
        // removes the WAL namespace if an assertion below panics.
        let _guard = NsGuard(key.clone());
        {
            let b = TaelBackend::with_wal_key(path, &key).unwrap();
            b.insert_spans(&[span("t1", "s1", "api", 10.0, SpanStatus::Ok)])
                .unwrap();
        }
        // Reopen: data persists (hot tier + DuckDB are durable).
        let b2 = TaelBackend::with_wal_key(path, &key).unwrap();
        assert_eq!(b2.get_trace("t1").unwrap().len(), 1);
    }
}