Skip to main content

tael_server/storage/backend/
mod.rs

1//! `TaelBackend` — the purpose-built storage engine (see
2//! `docs/tael-backend-design.md`). Built incrementally behind the `Store`
3//! trait so it ships opt-in alongside `DuckDbStore`.
4//!
5//! **Phase 3:** durable WAL write path + crash-gap replay.
6//! **Phase 4 (this file):** an `fjall` LSM hot tier serves the core per-signal
7//! reads (`query_traces`, `get_trace`, `list_services`, `query_logs`,
8//! `query_metrics`). Writes fan out to the WAL, the hot tier, and an inner
9//! `DuckDbStore` projection that still backs the heavier analytics
10//! (`query_summary`/`anomalies`/`correlate`, PromQL) until DataFusion (Phase 6)
11//! serves those from hot+cold. The double-write is the explicit transitional
12//! state; `--storage=tael-backend` is always a complete backend.
13
14mod cold;
15mod hot;
16mod wal;
17
18use anyhow::Result;
19
20use super::DuckDbStore;
21use super::Store;
22use super::models::{
23    AnomalyReport, CorrelateReport, LogQuery, LogRecord, MetricPoint, MetricQuery, ServiceInfo,
24    Span, SummaryReport, TraceComment, TraceQuery,
25};
26use std::sync::Arc;
27
28use super::SearchIndex;
29use cold::ColdTier;
30use hot::HotTier;
31use wal::WalLog;
32pub use wal::{WalRecord, WalSink};
33
34pub struct TaelBackend {
35    /// LSM hot tier — serves recent reads.
36    hot: HotTier,
37    /// Parquet cold tier — aged spans rolled out of the hot tier.
38    cold: ColdTier,
39    /// Projection backing analytics not yet ported to the hot tier.
40    inner: DuckDbStore,
41    /// Full-text index over LLM payloads (shared with the ingest path).
42    search: Arc<SearchIndex>,
43    wal: WalLog,
44}
45
46impl TaelBackend {
47    pub fn new(data_dir: &str) -> Result<Self> {
48        Self::with_wal_key(data_dir, "tael-backend")
49    }
50
51    /// Like [`Self::new`] but with an explicit WAL namespace key — lets tests
52    /// run isolated instances (the WAL key is process-global in walrus).
53    pub fn with_wal_key(data_dir: &str, wal_key: &str) -> Result<Self> {
54        Self::with_wal_key_and_sinks(data_dir, wal_key, Vec::new(), None)
55    }
56
57    /// Like [`Self::with_wal_key`] but with WAL replication sinks attached: this
58    /// backend runs as a **leader** that ships every appended record to its
59    /// standbys before acking the write (`docs/tael-server-scaling-ha.md` §5.1).
60    /// `required_acks` is how many standbys must confirm before a write returns
61    /// (`None` = all = fully synchronous; `Some(0)` = async best-effort). With
62    /// no sinks the write path is unchanged. A standby on the receiving end
63    /// applies shipped records via `Store::apply_framed_wal`.
64    pub fn with_wal_key_and_sinks(
65        data_dir: &str,
66        wal_key: &str,
67        sinks: Vec<Arc<dyn WalSink>>,
68        required_acks: Option<usize>,
69    ) -> Result<Self> {
70        let hot = HotTier::open(data_dir)?;
71        let cold = ColdTier::open(data_dir)?;
72        let inner = DuckDbStore::new(data_dir)?;
73        let search = Arc::new(SearchIndex::open(data_dir)?);
74        let mut wal = if sinks.is_empty() {
75            WalLog::new_for_key(wal_key)?
76        } else {
77            WalLog::new_for_key_with_sinks(wal_key, sinks)?
78        };
79        if let Some(n) = required_acks {
80            wal = wal.with_required_acks(n);
81        }
82        let backend = Self {
83            hot,
84            cold,
85            inner,
86            search,
87            wal,
88        };
89        backend.replay()?;
90        Ok(backend)
91    }
92
93
94    /// The shared payload search index — handed to the ingest path so prompt/
95    /// completion text is indexed at write time (the text isn't retained on the
96    /// span itself, only its blob hashes).
97    pub fn search_index(&self) -> Arc<SearchIndex> {
98        Arc::clone(&self.search)
99    }
100
101    /// Roll spans older than `cutoff` out of the LSM hot tier into Parquet.
102    /// Returns the number of spans compacted. Safe to call repeatedly.
103    pub fn compact_spans(&self, cutoff: chrono::DateTime<chrono::Utc>) -> Result<usize> {
104        let cutoff_ns = cutoff.timestamp_nanos_opt().unwrap_or(0);
105        let evicted = self.hot.evict_spans_before(cutoff_ns)?;
106        if evicted.is_empty() {
107            return Ok(0);
108        }
109        // Write to cold first, then the hot eviction is already done; if we
110        // crash between, the spans remain in the DuckDB projection and the WAL.
111        self.cold.write_spans(&evicted)?;
112        tracing::info!(spans = evicted.len(), "tael-backend: compacted spans to cold tier");
113        Ok(evicted.len())
114    }
115
116    /// Roll aged logs/metrics out of the hot tier into Parquet. Returns the
117    /// total number of records compacted across both signals.
118    pub fn compact_logs_metrics(&self, cutoff: chrono::DateTime<chrono::Utc>) -> Result<usize> {
119        let cutoff_ns = cutoff.timestamp_nanos_opt().unwrap_or(0);
120        let logs = self.hot.evict_logs_before(cutoff_ns)?;
121        if !logs.is_empty() {
122            self.cold.write_logs(&logs)?;
123        }
124        let metrics = self.hot.evict_metrics_before(cutoff_ns)?;
125        if !metrics.is_empty() {
126            self.cold.write_metrics(&metrics)?;
127            // Downsample to 5m rollups alongside the raw cold write, so trends
128            // survive once raw points are dropped by retention.
129            self.cold.write_downsampled(&metrics)?;
130        }
131        let n = logs.len() + metrics.len();
132        if n > 0 {
133            tracing::info!(logs = logs.len(), metrics = metrics.len(), "tael-backend: compacted logs/metrics to cold tier");
134        }
135        Ok(n)
136    }
137
138    /// Collect every blob hash still referenced by a live row — LLM prompt and
139    /// completion hashes on spans, and `body_sha256` on logs — across hot and
140    /// cold tiers. Drives blob GC (anything not here is unreferenced).
141    pub fn collect_live_blob_hashes(&self) -> Result<std::collections::HashSet<String>> {
142        use super::models::{LogQuery, TraceQuery};
143        let mut live = std::collections::HashSet::new();
144        // Spans (hot∪cold via the unioned read path), with a high limit.
145        let spans = self.query_traces(&TraceQuery {
146            limit: Some(u32::MAX),
147            ..Default::default()
148        })?;
149        for s in spans {
150            if let Some(llm) = s.llm {
151                live.extend(llm.prompt_sha256);
152                live.extend(llm.completion_sha256);
153            }
154        }
155        let logs = self.query_logs(&LogQuery {
156            limit: Some(u32::MAX),
157            ..Default::default()
158        })?;
159        for l in logs {
160            live.extend(l.body_sha256);
161        }
162        Ok(live)
163    }
164
165    /// Drop cold partitions (spans/logs/metrics) whose date is older than
166    /// `keep`. Returns the total number of partitions removed. (Metadata GC;
167    /// payload-blob GC runs separately in the maintenance task.)
168    pub fn enforce_span_retention(&self, keep: chrono::DateTime<chrono::Utc>) -> Result<usize> {
169        let cutoff_date = keep.format("%Y-%m-%d").to_string();
170        let dropped = self.cold.drop_partitions_before(&cutoff_date)?;
171        if dropped > 0 {
172            tracing::info!(partitions = dropped, "tael-backend: dropped expired cold partitions");
173        }
174        Ok(dropped)
175    }
176
177    /// Apply a batch to every projection (hot tier + DuckDB). Used by both the
178    /// live write path and WAL replay.
179    fn apply_spans(&self, spans: &[Span]) -> Result<()> {
180        self.hot.insert_spans(spans)?;
181        self.inner.insert_spans(spans)
182    }
183    fn apply_logs(&self, logs: &[LogRecord]) -> Result<()> {
184        self.hot.insert_logs(logs)?;
185        self.inner.insert_logs(logs)
186    }
187    fn apply_metrics(&self, metrics: &[MetricPoint]) -> Result<()> {
188        self.hot.insert_metrics(metrics)?;
189        self.inner.insert_metrics(metrics)
190    }
191
192    /// Re-apply any WAL records left unconsumed by a crash, then advance past
193    /// them (they are consumed by `drain`).
194    fn replay(&self) -> Result<()> {
195        let records = self.wal.drain()?;
196        if records.is_empty() {
197            return Ok(());
198        }
199        let mut spans = 0usize;
200        let mut logs = 0usize;
201        let mut metrics = 0usize;
202        for record in records {
203            match record {
204                WalRecord::Spans(s) => {
205                    spans += s.len();
206                    self.apply_spans(&s)?;
207                }
208                WalRecord::Logs(l) => {
209                    logs += l.len();
210                    self.apply_logs(&l)?;
211                }
212                WalRecord::Metrics(m) => {
213                    metrics += m.len();
214                    self.apply_metrics(&m)?;
215                }
216            }
217        }
218        tracing::info!(spans, logs, metrics, "tael-backend: replayed WAL");
219        Ok(())
220    }
221}
222
223impl Store for TaelBackend {
224    // ── Writes: WAL → apply (hot + projection) → mark applied ───────
225    fn insert_spans(&self, spans: &[Span]) -> Result<()> {
226        self.wal.append_spans(spans)?;
227        self.apply_spans(spans)?;
228        self.wal.mark_applied()?;
229        Ok(())
230    }
231
232    fn insert_logs(&self, logs: &[LogRecord]) -> Result<()> {
233        self.wal.append_logs(logs)?;
234        self.apply_logs(logs)?;
235        self.wal.mark_applied()?;
236        Ok(())
237    }
238
239    fn insert_metrics(&self, metrics: &[MetricPoint]) -> Result<()> {
240        self.wal.append_metrics(metrics)?;
241        self.apply_metrics(metrics)?;
242        self.wal.mark_applied()?;
243        Ok(())
244    }
245
246    // ── Core reads: hot tier, unioned with the cold tier ────────────
247    fn query_traces(&self, query: &TraceQuery) -> Result<Vec<Span>> {
248        // Full-text payload filter: restrict to traces whose LLM prompts/
249        // completions match, then apply the rest of the query over those spans.
250        if let Some(ref text) = query.text {
251            let trace_ids = self.search.search_trace_ids(text, 1000)?;
252            if trace_ids.is_empty() {
253                return Ok(Vec::new());
254            }
255            let cutoff = query
256                .last_seconds
257                .map(|s| chrono::Utc::now() - chrono::Duration::seconds(s));
258            let limit = query.limit.unwrap_or(100) as usize;
259            let mut matched: Vec<Span> = Vec::new();
260            for tid in &trace_ids {
261                for s in self.get_trace(tid)? {
262                    if hot::span_matches(&s, query, cutoff) {
263                        matched.push(s);
264                    }
265                }
266            }
267            matched.sort_by(|a, b| b.start_time.cmp(&a.start_time));
268            matched.truncate(limit);
269            return Ok(matched);
270        }
271        // Hot holds the most-recent spans; cold holds older ones. Newest-first
272        // ordering means hot results lead; only dip into cold to fill the limit.
273        let mut results = self.hot.query_traces(query)?;
274        let limit = query.limit.unwrap_or(100) as usize;
275        if results.len() < limit {
276            let cutoff = query
277                .last_seconds
278                .map(|s| chrono::Utc::now() - chrono::Duration::seconds(s));
279            let mut cold: Vec<Span> = self
280                .cold
281                .all_spans()?
282                .into_iter()
283                .filter(|s| hot::span_matches(s, query, cutoff))
284                .collect();
285            cold.sort_by(|a, b| b.start_time.cmp(&a.start_time));
286            for s in cold {
287                if results.len() >= limit {
288                    break;
289                }
290                results.push(s);
291            }
292        }
293        Ok(results)
294    }
295    fn get_trace(&self, trace_id: &str) -> Result<Vec<Span>> {
296        let mut spans = self.hot.get_trace(trace_id)?;
297        let mut seen: std::collections::HashSet<String> =
298            spans.iter().map(|s| s.span_id.clone()).collect();
299        // Union with cold; dedup by span_id in case of transient overlap during
300        // compaction.
301        for s in self.cold.get_trace(trace_id)? {
302            if seen.insert(s.span_id.clone()) {
303                spans.push(s);
304            }
305        }
306        spans.sort_by_key(|s| s.start_time);
307        Ok(spans)
308    }
309    fn list_services(&self) -> Result<Vec<ServiceInfo>> {
310        self.hot.list_services()
311    }
312    fn query_logs(&self, query: &LogQuery) -> Result<Vec<LogRecord>> {
313        let mut results = self.hot.query_logs(query)?;
314        let limit = query.limit.unwrap_or(100) as usize;
315        if results.len() < limit {
316            let cutoff = query
317                .last_seconds
318                .map(|s| chrono::Utc::now() - chrono::Duration::seconds(s));
319            let mut cold: Vec<LogRecord> = self
320                .cold
321                .all_logs()?
322                .into_iter()
323                .filter(|l| hot::log_matches(l, query, cutoff))
324                .collect();
325            cold.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
326            for l in cold {
327                if results.len() >= limit {
328                    break;
329                }
330                results.push(l);
331            }
332        }
333        Ok(results)
334    }
335    fn query_metrics(&self, query: &MetricQuery) -> Result<Vec<MetricPoint>> {
336        let mut results = self.hot.query_metrics(query)?;
337        let limit = query.limit.unwrap_or(500) as usize;
338        if results.len() < limit {
339            let cutoff = query
340                .last_seconds
341                .map(|s| chrono::Utc::now() - chrono::Duration::seconds(s));
342            let mut cold: Vec<MetricPoint> = self
343                .cold
344                .all_metrics()?
345                .into_iter()
346                .filter(|m| hot::metric_matches(m, query, cutoff))
347                .collect();
348            cold.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
349            for m in cold {
350                if results.len() >= limit {
351                    break;
352                }
353                results.push(m);
354            }
355        }
356        Ok(results)
357    }
358
359    // ── Comments & heavier analytics: projection (for now) ──────────
360    fn add_comment(
361        &self,
362        trace_id: &str,
363        span_id: Option<&str>,
364        author: &str,
365        body: &str,
366    ) -> Result<TraceComment> {
367        self.inner.add_comment(trace_id, span_id, author, body)
368    }
369    fn get_comments(&self, trace_id: &str) -> Result<Vec<TraceComment>> {
370        self.inner.get_comments(trace_id)
371    }
372    fn query_summary(&self, last_seconds: i64, service: Option<&str>) -> Result<SummaryReport> {
373        self.inner.query_summary(last_seconds, service)
374    }
375    fn query_anomalies(
376        &self,
377        current_seconds: i64,
378        baseline_seconds: i64,
379        service: Option<&str>,
380    ) -> Result<AnomalyReport> {
381        self.inner
382            .query_anomalies(current_seconds, baseline_seconds, service)
383    }
384    fn query_correlate(&self, trace_id: &str) -> Result<Option<CorrelateReport>> {
385        self.inner.query_correlate(trace_id)
386    }
387    fn query_sql(&self, sql: &str) -> Result<Vec<serde_json::Value>> {
388        // SQL runs over the DuckDB projection, which retains all signals.
389        self.inner.query_sql(sql)
390    }
391
392    fn flush(&self) -> Result<()> {
393        // Graceful-shutdown flush: tighten the hot tier so a restart/standby
394        // replays less WAL. WAL fsync already guarantees durability.
395        self.hot.flush()
396    }
397
398    /// Standby entrypoint: durably accept a framed WAL record shipped from a
399    /// leader and bring local state up to it. Mirrors the leader's write
400    /// discipline (append → apply → consume) so the standby's WAL, hot tier, and
401    /// projection stay byte-identical and itself replayable — the basis for
402    /// promotion on leader loss (§5.1).
403    fn apply_framed_wal(&self, framed: &[u8]) -> Result<()> {
404        let record = WalRecord::decode(framed)?;
405        self.wal.append_framed(framed)?;
406        match &record {
407            WalRecord::Spans(s) => self.apply_spans(s)?,
408            WalRecord::Logs(l) => self.apply_logs(l)?,
409            WalRecord::Metrics(m) => self.apply_metrics(m)?,
410        }
411        self.wal.mark_applied()?;
412        Ok(())
413    }
414}
415
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::models::{
        LogRecord, LogSeverity, MetricPoint, MetricType, SpanKind, SpanStatus,
    };
    use chrono::Utc;
    use std::collections::HashMap;

    /// Removes a walrus namespace dir (`wal_files/<key>`) on drop, so cleanup
    /// happens even when a test panics.
    struct NsGuard(String);
    impl Drop for NsGuard {
        fn drop(&mut self) {
            let _ = std::fs::remove_dir_all(format!("wal_files/{}", self.0));
        }
    }

    /// A fresh backend in a temp dir with a unique WAL namespace. The tuple's
    /// drop order (backend, dir, guard) closes the backend before cleanup.
    fn backend() -> (TaelBackend, tempfile::TempDir, NsGuard) {
        let dir = tempfile::tempdir().unwrap();
        let key = format!("tael-test-backend-{}", uuid::Uuid::new_v4());
        let b = TaelBackend::with_wal_key(dir.path().to_str().unwrap(), &key).unwrap();
        (b, dir, NsGuard(key))
    }

    /// A root span timestamped "now".
    fn span(trace: &str, sid: &str, svc: &str, dur: f64, status: SpanStatus) -> Span {
        let now = Utc::now();
        Span {
            trace_id: trace.into(),
            span_id: sid.into(),
            parent_span_id: None,
            service: svc.into(),
            operation: "op".into(),
            start_time: now,
            end_time: now,
            duration_ms: dur,
            status,
            attributes: HashMap::new(),
            events: vec![],
            kind: SpanKind::Internal,
            llm: None,
        }
    }

    /// An ERROR-severity `api` log on trace `t1`, timestamped "now".
    fn log(body: &str) -> LogRecord {
        let now = Utc::now();
        LogRecord {
            timestamp: now,
            observed_timestamp: now,
            trace_id: Some("t1".into()),
            span_id: None,
            severity: LogSeverity::Error,
            severity_text: "ERROR".into(),
            body: body.into(),
            service: "api".into(),
            attributes: HashMap::new(),
            body_sha256: None,
        }
    }

    /// A Sum metric point for service `api`, timestamped "now".
    fn metric(name: &str, value: f64) -> MetricPoint {
        MetricPoint {
            timestamp: Utc::now(),
            service: "api".into(),
            name: name.into(),
            metric_type: MetricType::Sum,
            value,
            unit: "1".into(),
            attributes: HashMap::new(),
        }
    }

    #[test]
    fn get_trace_reconstructs_span_tree_from_hot_tier() {
        let (b, _d, _g) = backend();
        b.insert_spans(&[
            span("t1", "s1", "api", 10.0, SpanStatus::Ok),
            span("t1", "s2", "db", 20.0, SpanStatus::Ok),
            span("t2", "s3", "api", 5.0, SpanStatus::Error),
        ])
        .unwrap();

        let trace = b.get_trace("t1").unwrap();
        assert_eq!(trace.len(), 2);
        assert!(trace.iter().all(|s| s.trace_id == "t1"));
        assert_eq!(b.get_trace("t2").unwrap().len(), 1);
        assert!(b.get_trace("missing").unwrap().is_empty());
    }

    #[test]
    fn query_traces_filters_match_duckdb() {
        let (b, _d, _g) = backend();
        let spans = vec![
            span("t1", "s1", "api", 10.0, SpanStatus::Ok),
            span("t2", "s2", "db", 600.0, SpanStatus::Error),
            span("t3", "s3", "api", 50.0, SpanStatus::Error),
        ];
        b.insert_spans(&spans).unwrap();

        // Independent DuckDB with identical data = the parity oracle.
        let oracle_dir = tempfile::tempdir().unwrap();
        let oracle = DuckDbStore::new(oracle_dir.path().to_str().unwrap()).unwrap();
        oracle.insert_spans(&spans).unwrap();

        let queries = [
            TraceQuery {
                service: Some("api".into()),
                ..Default::default()
            },
            TraceQuery {
                status: Some("error".into()),
                ..Default::default()
            },
            TraceQuery {
                min_duration_ms: Some(100.0),
                ..Default::default()
            },
            TraceQuery::default(),
        ];
        // Compare as sorted id sets: the two stores needn't agree on order.
        let sorted_ids = |found: Vec<Span>| {
            let mut ids: Vec<String> = found.into_iter().map(|s| s.span_id).collect();
            ids.sort();
            ids
        };
        for q in &queries {
            let hot = sorted_ids(b.query_traces(q).unwrap());
            let duck = sorted_ids(oracle.query_traces(q).unwrap());
            assert_eq!(hot, duck, "hot tier and DuckDB disagree for {q:?}");
        }
    }

    #[test]
    fn list_services_aggregates_from_hot_tier() {
        let (b, _d, _g) = backend();
        b.insert_spans(&[
            span("t1", "s1", "api", 10.0, SpanStatus::Ok),
            span("t1", "s2", "api", 30.0, SpanStatus::Error),
            span("t2", "s3", "db", 20.0, SpanStatus::Ok),
        ])
        .unwrap();

        let services = b.list_services().unwrap();
        let api = services.iter().find(|s| s.name == "api").unwrap();
        assert_eq!(api.span_count, 2);
        assert_eq!(api.trace_count, 1);
        assert_eq!(api.avg_duration_ms, 20.0);
        assert!((api.error_rate - 0.5).abs() < 1e-9);
    }

    #[test]
    fn standby_rebuilds_identical_state_from_shipped_wal() {
        // The standby: a normal backend that is never written to directly.
        let (standby, _sd, _sg) = backend();
        let standby = Arc::new(standby);

        // A WAL sink that ships each framed record into the standby — the
        // in-process stand-in for the (deferred) network transport.
        struct ReplicaSink(Arc<TaelBackend>);
        impl WalSink for ReplicaSink {
            fn append_framed(&self, framed: &[u8]) -> Result<()> {
                self.0.apply_framed_wal(framed)
            }
        }

        // The leader, with the standby attached as a replication sink.
        let leader_dir = tempfile::tempdir().unwrap();
        let leader_key = format!("tael-test-leader-{}", uuid::Uuid::new_v4());
        let _lg = NsGuard(leader_key.clone());
        let leader = TaelBackend::with_wal_key_and_sinks(
            leader_dir.path().to_str().unwrap(),
            &leader_key,
            vec![Arc::new(ReplicaSink(Arc::clone(&standby)))],
            None, // synchronous: require all (one) standbys
        )
        .unwrap();

        // Write a mix of signals to the leader only.
        leader
            .insert_spans(&[
                span("t1", "s1", "api", 10.0, SpanStatus::Ok),
                span("t1", "s2", "db", 20.0, SpanStatus::Ok),
                span("t2", "s3", "api", 5.0, SpanStatus::Error),
            ])
            .unwrap();
        leader.insert_logs(&[log("boom")]).unwrap();

        // The standby reconstructed identical state purely from the shipped WAL.
        assert_eq!(standby.get_trace("t1").unwrap().len(), 2);
        assert_eq!(standby.get_trace("t2").unwrap().len(), 1);
        let leader_traces = leader.query_traces(&TraceQuery::default()).unwrap();
        let standby_traces = standby.query_traces(&TraceQuery::default()).unwrap();
        assert_eq!(leader_traces.len(), standby_traces.len());
        assert_eq!(standby_traces.len(), 3);
    }

    #[test]
    fn logs_and_metrics_round_trip_through_hot_tier() {
        let (b, _d, _g) = backend();

        b.insert_logs(&[log("connection refused")]).unwrap();
        let logs = b
            .query_logs(&LogQuery {
                severity: Some("error".into()),
                ..Default::default()
            })
            .unwrap();
        assert_eq!(logs.len(), 1);
        assert_eq!(logs[0].body, "connection refused");

        b.insert_metrics(&[metric("http_requests_total", 42.0)])
            .unwrap();
        let metrics = b
            .query_metrics(&MetricQuery {
                name: Some("http_requests_total".into()),
                ..Default::default()
            })
            .unwrap();
        assert_eq!(metrics.len(), 1);
        assert_eq!(metrics[0].value, 42.0);
    }

    #[test]
    fn compaction_moves_spans_to_cold_and_reads_still_union() {
        let (b, _d, _g) = backend();
        b.insert_spans(&[
            span("t1", "s1", "api", 10.0, SpanStatus::Ok),
            span("t1", "s2", "db", 20.0, SpanStatus::Ok),
            span("t2", "s3", "api", 5.0, SpanStatus::Error),
        ])
        .unwrap();

        // Compact everything (cutoff in the future) → all spans roll to cold.
        let moved = b
            .compact_spans(Utc::now() + chrono::Duration::seconds(60))
            .unwrap();
        assert_eq!(moved, 3);
        // Hot tier is now empty...
        assert!(b.hot.get_trace("t1").unwrap().is_empty());
        // ...but the unioned reads still see everything.
        assert_eq!(b.get_trace("t1").unwrap().len(), 2);
        let all = b.query_traces(&TraceQuery::default()).unwrap();
        assert_eq!(all.len(), 3);
        let errors = b
            .query_traces(&TraceQuery {
                status: Some("error".into()),
                ..Default::default()
            })
            .unwrap();
        assert_eq!(errors.len(), 1);
        assert_eq!(errors[0].span_id, "s3");

        // Re-compaction is a no-op (nothing left in hot).
        assert_eq!(b.compact_spans(Utc::now()).unwrap(), 0);
    }

    #[test]
    fn logs_metrics_compact_to_cold_and_union_reads() {
        let (b, _d, _g) = backend();
        b.insert_logs(&[log("boom")]).unwrap();
        b.insert_metrics(&[metric("rps", 7.0)]).unwrap();

        let moved = b
            .compact_logs_metrics(Utc::now() + chrono::Duration::seconds(60))
            .unwrap();
        assert_eq!(moved, 2);

        // Served from cold via union after the hot tier emptied.
        let logs = b.query_logs(&LogQuery::default()).unwrap();
        assert_eq!(logs.len(), 1);
        assert_eq!(logs[0].body, "boom");
        let metrics = b.query_metrics(&MetricQuery::default()).unwrap();
        assert_eq!(metrics.len(), 1);
        assert_eq!(metrics[0].value, 7.0);
    }

    #[test]
    fn full_text_search_filters_traces_by_payload() {
        let (b, _d, _g) = backend();
        b.insert_spans(&[
            span("t1", "s1", "llm-proxy", 100.0, SpanStatus::Ok),
            span("t2", "s2", "llm-proxy", 100.0, SpanStatus::Ok),
        ])
        .unwrap();
        // Index payload text the way ingestion would.
        let idx = b.search_index();
        idx.index_span("t1", "s1", "summarize the rate limit policy")
            .unwrap();
        idx.index_span("t2", "s2", "translate to French").unwrap();
        idx.commit().unwrap();

        let hits = b
            .query_traces(&TraceQuery {
                text: Some("rate limit".into()),
                ..Default::default()
            })
            .unwrap();
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].trace_id, "t1");

        // Non-matching query → empty.
        let none = b
            .query_traces(&TraceQuery {
                text: Some("quantum".into()),
                ..Default::default()
            })
            .unwrap();
        assert!(none.is_empty());
    }

    #[test]
    fn survives_reopen_via_persistent_hot_tier() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().to_str().unwrap();
        let key = format!("tael-test-reopen-{}", uuid::Uuid::new_v4());
        // Declared before the backends so it drops after them, and so the WAL
        // namespace is removed even if an assertion below panics.
        let _g = NsGuard(key.clone());
        {
            let b = TaelBackend::with_wal_key(path, &key).unwrap();
            b.insert_spans(&[span("t1", "s1", "api", 10.0, SpanStatus::Ok)])
                .unwrap();
        }
        // Reopen: data persists (hot tier + DuckDB are durable).
        let b2 = TaelBackend::with_wal_key(path, &key).unwrap();
        assert_eq!(b2.get_trace("t1").unwrap().len(), 1);
    }
}