1mod cold;
15mod hot;
16mod wal;
17
18use anyhow::Result;
19
20use super::DuckDbStore;
21use super::Store;
22use super::models::{
23 AnomalyReport, CorrelateReport, LogQuery, LogRecord, MetricPoint, MetricQuery, ServiceInfo,
24 Span, SummaryReport, TraceComment, TraceQuery,
25};
26use std::sync::Arc;
27
28use super::SearchIndex;
29use cold::ColdTier;
30use hot::HotTier;
31use wal::WalLog;
32pub use wal::{WalRecord, WalSink};
33
/// Storage backend that fronts every write with a write-ahead log and serves
/// reads from a hot tier, a cold tier and a DuckDB store.
///
/// Writes are appended to the WAL, then applied to both the hot tier and the
/// DuckDB store (`apply_*`). Compaction moves data selected by a cutoff from
/// the hot tier into the cold tier, and the span/log/metric queries union the
/// two tiers.
pub struct TaelBackend {
    /// Tier that receives every write and is consulted first by reads.
    hot: HotTier,
    /// Tier holding data moved out of `hot` by compaction; read as a fallback.
    cold: ColdTier,
    /// DuckDB store that also receives every write; serves comments, summary,
    /// anomaly and correlation reports, and raw SQL.
    inner: DuckDbStore,
    /// Full-text index consulted when a `TraceQuery` carries `text`.
    search: Arc<SearchIndex>,
    /// Write-ahead log: records are appended before being applied, and any
    /// records it still holds are replayed when the backend is constructed.
    wal: WalLog,
}
45
46impl TaelBackend {
47 pub fn new(data_dir: &str) -> Result<Self> {
48 Self::with_wal_key(data_dir, "tael-backend")
49 }
50
51 pub fn with_wal_key(data_dir: &str, wal_key: &str) -> Result<Self> {
54 Self::with_wal_key_and_sinks(data_dir, wal_key, Vec::new(), None)
55 }
56
57 pub fn with_wal_key_and_sinks(
65 data_dir: &str,
66 wal_key: &str,
67 sinks: Vec<Arc<dyn WalSink>>,
68 required_acks: Option<usize>,
69 ) -> Result<Self> {
70 let hot = HotTier::open(data_dir)?;
71 let cold = ColdTier::open(data_dir)?;
72 let inner = DuckDbStore::new(data_dir)?;
73 let search = Arc::new(SearchIndex::open(data_dir)?);
74 let mut wal = if sinks.is_empty() {
75 WalLog::new_for_key(wal_key)?
76 } else {
77 WalLog::new_for_key_with_sinks(wal_key, sinks)?
78 };
79 if let Some(n) = required_acks {
80 wal = wal.with_required_acks(n);
81 }
82 let backend = Self {
83 hot,
84 cold,
85 inner,
86 search,
87 wal,
88 };
89 backend.replay()?;
90 Ok(backend)
91 }
92
93
94 pub fn search_index(&self) -> Arc<SearchIndex> {
98 Arc::clone(&self.search)
99 }
100
101 pub fn compact_spans(&self, cutoff: chrono::DateTime<chrono::Utc>) -> Result<usize> {
104 let cutoff_ns = cutoff.timestamp_nanos_opt().unwrap_or(0);
105 let evicted = self.hot.evict_spans_before(cutoff_ns)?;
106 if evicted.is_empty() {
107 return Ok(0);
108 }
109 self.cold.write_spans(&evicted)?;
112 tracing::info!(spans = evicted.len(), "tael-backend: compacted spans to cold tier");
113 Ok(evicted.len())
114 }
115
116 pub fn compact_logs_metrics(&self, cutoff: chrono::DateTime<chrono::Utc>) -> Result<usize> {
119 let cutoff_ns = cutoff.timestamp_nanos_opt().unwrap_or(0);
120 let logs = self.hot.evict_logs_before(cutoff_ns)?;
121 if !logs.is_empty() {
122 self.cold.write_logs(&logs)?;
123 }
124 let metrics = self.hot.evict_metrics_before(cutoff_ns)?;
125 if !metrics.is_empty() {
126 self.cold.write_metrics(&metrics)?;
127 self.cold.write_downsampled(&metrics)?;
130 }
131 let n = logs.len() + metrics.len();
132 if n > 0 {
133 tracing::info!(logs = logs.len(), metrics = metrics.len(), "tael-backend: compacted logs/metrics to cold tier");
134 }
135 Ok(n)
136 }
137
138 pub fn collect_live_blob_hashes(&self) -> Result<std::collections::HashSet<String>> {
142 use super::models::{LogQuery, TraceQuery};
143 let mut live = std::collections::HashSet::new();
144 let spans = self.query_traces(&TraceQuery {
146 limit: Some(u32::MAX),
147 ..Default::default()
148 })?;
149 for s in spans {
150 if let Some(llm) = s.llm {
151 live.extend(llm.prompt_sha256);
152 live.extend(llm.completion_sha256);
153 }
154 }
155 let logs = self.query_logs(&LogQuery {
156 limit: Some(u32::MAX),
157 ..Default::default()
158 })?;
159 for l in logs {
160 live.extend(l.body_sha256);
161 }
162 Ok(live)
163 }
164
165 pub fn enforce_span_retention(&self, keep: chrono::DateTime<chrono::Utc>) -> Result<usize> {
169 let cutoff_date = keep.format("%Y-%m-%d").to_string();
170 let dropped = self.cold.drop_partitions_before(&cutoff_date)?;
171 if dropped > 0 {
172 tracing::info!(partitions = dropped, "tael-backend: dropped expired cold partitions");
173 }
174 Ok(dropped)
175 }
176
177 fn apply_spans(&self, spans: &[Span]) -> Result<()> {
180 self.hot.insert_spans(spans)?;
181 self.inner.insert_spans(spans)
182 }
183 fn apply_logs(&self, logs: &[LogRecord]) -> Result<()> {
184 self.hot.insert_logs(logs)?;
185 self.inner.insert_logs(logs)
186 }
187 fn apply_metrics(&self, metrics: &[MetricPoint]) -> Result<()> {
188 self.hot.insert_metrics(metrics)?;
189 self.inner.insert_metrics(metrics)
190 }
191
192 fn replay(&self) -> Result<()> {
195 let records = self.wal.drain()?;
196 if records.is_empty() {
197 return Ok(());
198 }
199 let mut spans = 0usize;
200 let mut logs = 0usize;
201 let mut metrics = 0usize;
202 for record in records {
203 match record {
204 WalRecord::Spans(s) => {
205 spans += s.len();
206 self.apply_spans(&s)?;
207 }
208 WalRecord::Logs(l) => {
209 logs += l.len();
210 self.apply_logs(&l)?;
211 }
212 WalRecord::Metrics(m) => {
213 metrics += m.len();
214 self.apply_metrics(&m)?;
215 }
216 }
217 }
218 tracing::info!(spans, logs, metrics, "tael-backend: replayed WAL");
219 Ok(())
220 }
221}
222
223impl Store for TaelBackend {
224 fn insert_spans(&self, spans: &[Span]) -> Result<()> {
226 self.wal.append_spans(spans)?;
227 self.apply_spans(spans)?;
228 self.wal.mark_applied()?;
229 Ok(())
230 }
231
232 fn insert_logs(&self, logs: &[LogRecord]) -> Result<()> {
233 self.wal.append_logs(logs)?;
234 self.apply_logs(logs)?;
235 self.wal.mark_applied()?;
236 Ok(())
237 }
238
239 fn insert_metrics(&self, metrics: &[MetricPoint]) -> Result<()> {
240 self.wal.append_metrics(metrics)?;
241 self.apply_metrics(metrics)?;
242 self.wal.mark_applied()?;
243 Ok(())
244 }
245
246 fn query_traces(&self, query: &TraceQuery) -> Result<Vec<Span>> {
248 if let Some(ref text) = query.text {
251 let trace_ids = self.search.search_trace_ids(text, 1000)?;
252 if trace_ids.is_empty() {
253 return Ok(Vec::new());
254 }
255 let cutoff = query
256 .last_seconds
257 .map(|s| chrono::Utc::now() - chrono::Duration::seconds(s));
258 let limit = query.limit.unwrap_or(100) as usize;
259 let mut matched: Vec<Span> = Vec::new();
260 for tid in &trace_ids {
261 for s in self.get_trace(tid)? {
262 if hot::span_matches(&s, query, cutoff) {
263 matched.push(s);
264 }
265 }
266 }
267 matched.sort_by(|a, b| b.start_time.cmp(&a.start_time));
268 matched.truncate(limit);
269 return Ok(matched);
270 }
271 let mut results = self.hot.query_traces(query)?;
274 let limit = query.limit.unwrap_or(100) as usize;
275 if results.len() < limit {
276 let cutoff = query
277 .last_seconds
278 .map(|s| chrono::Utc::now() - chrono::Duration::seconds(s));
279 let mut cold: Vec<Span> = self
280 .cold
281 .all_spans()?
282 .into_iter()
283 .filter(|s| hot::span_matches(s, query, cutoff))
284 .collect();
285 cold.sort_by(|a, b| b.start_time.cmp(&a.start_time));
286 for s in cold {
287 if results.len() >= limit {
288 break;
289 }
290 results.push(s);
291 }
292 }
293 Ok(results)
294 }
295 fn get_trace(&self, trace_id: &str) -> Result<Vec<Span>> {
296 let mut spans = self.hot.get_trace(trace_id)?;
297 let mut seen: std::collections::HashSet<String> =
298 spans.iter().map(|s| s.span_id.clone()).collect();
299 for s in self.cold.get_trace(trace_id)? {
302 if seen.insert(s.span_id.clone()) {
303 spans.push(s);
304 }
305 }
306 spans.sort_by_key(|s| s.start_time);
307 Ok(spans)
308 }
309 fn list_services(&self) -> Result<Vec<ServiceInfo>> {
310 self.hot.list_services()
311 }
312 fn query_logs(&self, query: &LogQuery) -> Result<Vec<LogRecord>> {
313 let mut results = self.hot.query_logs(query)?;
314 let limit = query.limit.unwrap_or(100) as usize;
315 if results.len() < limit {
316 let cutoff = query
317 .last_seconds
318 .map(|s| chrono::Utc::now() - chrono::Duration::seconds(s));
319 let mut cold: Vec<LogRecord> = self
320 .cold
321 .all_logs()?
322 .into_iter()
323 .filter(|l| hot::log_matches(l, query, cutoff))
324 .collect();
325 cold.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
326 for l in cold {
327 if results.len() >= limit {
328 break;
329 }
330 results.push(l);
331 }
332 }
333 Ok(results)
334 }
335 fn query_metrics(&self, query: &MetricQuery) -> Result<Vec<MetricPoint>> {
336 let mut results = self.hot.query_metrics(query)?;
337 let limit = query.limit.unwrap_or(500) as usize;
338 if results.len() < limit {
339 let cutoff = query
340 .last_seconds
341 .map(|s| chrono::Utc::now() - chrono::Duration::seconds(s));
342 let mut cold: Vec<MetricPoint> = self
343 .cold
344 .all_metrics()?
345 .into_iter()
346 .filter(|m| hot::metric_matches(m, query, cutoff))
347 .collect();
348 cold.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
349 for m in cold {
350 if results.len() >= limit {
351 break;
352 }
353 results.push(m);
354 }
355 }
356 Ok(results)
357 }
358
359 fn add_comment(
361 &self,
362 trace_id: &str,
363 span_id: Option<&str>,
364 author: &str,
365 body: &str,
366 ) -> Result<TraceComment> {
367 self.inner.add_comment(trace_id, span_id, author, body)
368 }
369 fn get_comments(&self, trace_id: &str) -> Result<Vec<TraceComment>> {
370 self.inner.get_comments(trace_id)
371 }
372 fn query_summary(&self, last_seconds: i64, service: Option<&str>) -> Result<SummaryReport> {
373 self.inner.query_summary(last_seconds, service)
374 }
375 fn query_anomalies(
376 &self,
377 current_seconds: i64,
378 baseline_seconds: i64,
379 service: Option<&str>,
380 ) -> Result<AnomalyReport> {
381 self.inner
382 .query_anomalies(current_seconds, baseline_seconds, service)
383 }
384 fn query_correlate(&self, trace_id: &str) -> Result<Option<CorrelateReport>> {
385 self.inner.query_correlate(trace_id)
386 }
387 fn query_sql(&self, sql: &str) -> Result<Vec<serde_json::Value>> {
388 self.inner.query_sql(sql)
390 }
391
392 fn flush(&self) -> Result<()> {
393 self.hot.flush()
396 }
397
398 fn apply_framed_wal(&self, framed: &[u8]) -> Result<()> {
404 let record = WalRecord::decode(framed)?;
405 self.wal.append_framed(framed)?;
406 match &record {
407 WalRecord::Spans(s) => self.apply_spans(s)?,
408 WalRecord::Logs(l) => self.apply_logs(l)?,
409 WalRecord::Metrics(m) => self.apply_metrics(m)?,
410 }
411 self.wal.mark_applied()?;
412 Ok(())
413 }
414}
415
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::models::{LogSeverity, MetricType, SpanKind, SpanStatus};
    use chrono::Utc;
    use std::collections::HashMap;

    /// Deletes the WAL namespace directory `wal_files/<key>` on drop, so a test
    /// cleans up after itself even when an assertion panics.
    struct NsGuard(String);

    impl Drop for NsGuard {
        fn drop(&mut self) {
            let _ = std::fs::remove_dir_all(format!("wal_files/{}", self.0));
        }
    }

    /// Opens a backend in a fresh temp dir with a unique WAL key. The temp dir
    /// and WAL guard are returned so they stay alive for the whole test.
    fn backend() -> (TaelBackend, tempfile::TempDir, NsGuard) {
        let dir = tempfile::tempdir().unwrap();
        let key = format!("tael-test-backend-{}", uuid::Uuid::new_v4());
        let b = TaelBackend::with_wal_key(dir.path().to_str().unwrap(), &key).unwrap();
        (b, dir, NsGuard(key))
    }

    /// Builds an internal span stamped with the current time.
    fn span(trace: &str, sid: &str, svc: &str, dur: f64, status: SpanStatus) -> Span {
        let now = Utc::now();
        Span {
            trace_id: trace.into(),
            span_id: sid.into(),
            parent_span_id: None,
            service: svc.into(),
            operation: "op".into(),
            start_time: now,
            end_time: now,
            duration_ms: dur,
            status,
            attributes: HashMap::new(),
            events: vec![],
            kind: SpanKind::Internal,
            llm: None,
        }
    }

    /// Builds an `Error`-severity log for service `api`, attached to trace `t1`.
    fn error_log(body: &str) -> LogRecord {
        LogRecord {
            timestamp: Utc::now(),
            observed_timestamp: Utc::now(),
            trace_id: Some("t1".into()),
            span_id: None,
            severity: LogSeverity::Error,
            severity_text: "ERROR".into(),
            body: body.into(),
            service: "api".into(),
            attributes: HashMap::new(),
            body_sha256: None,
        }
    }

    /// Builds a `Sum` metric point for service `api` with unit `"1"`.
    fn sum_metric(name: &str, value: f64) -> MetricPoint {
        MetricPoint {
            timestamp: Utc::now(),
            service: "api".into(),
            name: name.into(),
            metric_type: MetricType::Sum,
            value,
            unit: "1".into(),
            attributes: HashMap::new(),
        }
    }

    #[test]
    fn get_trace_reconstructs_span_tree_from_hot_tier() {
        let (b, _d, _g) = backend();
        b.insert_spans(&[
            span("t1", "s1", "api", 10.0, SpanStatus::Ok),
            span("t1", "s2", "db", 20.0, SpanStatus::Ok),
            span("t2", "s3", "api", 5.0, SpanStatus::Error),
        ])
        .unwrap();

        let trace = b.get_trace("t1").unwrap();
        assert_eq!(trace.len(), 2);
        assert!(trace.iter().all(|s| s.trace_id == "t1"));
        assert_eq!(b.get_trace("t2").unwrap().len(), 1);
        assert!(b.get_trace("missing").unwrap().is_empty());
    }

    #[test]
    fn query_traces_filters_match_duckdb() {
        let (b, _d, _g) = backend();
        let spans = vec![
            span("t1", "s1", "api", 10.0, SpanStatus::Ok),
            span("t2", "s2", "db", 600.0, SpanStatus::Error),
            span("t3", "s3", "api", 50.0, SpanStatus::Error),
        ];
        b.insert_spans(&spans).unwrap();

        // DuckDB alone acts as the oracle for the hot tier's filter semantics.
        let oracle_dir = tempfile::tempdir().unwrap();
        let oracle = DuckDbStore::new(oracle_dir.path().to_str().unwrap()).unwrap();
        oracle.insert_spans(&spans).unwrap();

        let queries = [
            TraceQuery {
                service: Some("api".into()),
                ..Default::default()
            },
            TraceQuery {
                status: Some("error".into()),
                ..Default::default()
            },
            TraceQuery {
                min_duration_ms: Some(100.0),
                ..Default::default()
            },
            TraceQuery::default(),
        ];
        for q in &queries {
            let mut hot: Vec<String> = b
                .query_traces(q)
                .unwrap()
                .into_iter()
                .map(|s| s.span_id)
                .collect();
            let mut duck: Vec<String> = oracle
                .query_traces(q)
                .unwrap()
                .into_iter()
                .map(|s| s.span_id)
                .collect();
            hot.sort();
            duck.sort();
            assert_eq!(hot, duck, "hot tier and DuckDB disagree for {q:?}");
        }
    }

    #[test]
    fn list_services_aggregates_from_hot_tier() {
        let (b, _d, _g) = backend();
        b.insert_spans(&[
            span("t1", "s1", "api", 10.0, SpanStatus::Ok),
            span("t1", "s2", "api", 30.0, SpanStatus::Error),
            span("t2", "s3", "db", 20.0, SpanStatus::Ok),
        ])
        .unwrap();

        let services = b.list_services().unwrap();
        let api = services.iter().find(|s| s.name == "api").unwrap();
        assert_eq!(api.span_count, 2);
        assert_eq!(api.trace_count, 1);
        assert_eq!(api.avg_duration_ms, 20.0);
        assert!((api.error_rate - 0.5).abs() < 1e-9);
    }

    #[test]
    fn standby_rebuilds_identical_state_from_shipped_wal() {
        let (standby, _sd, _sg) = backend();
        let standby = Arc::new(standby);

        /// Ships every framed WAL record from the leader straight into the standby.
        struct ReplicaSink(Arc<TaelBackend>);

        impl WalSink for ReplicaSink {
            fn append_framed(&self, framed: &[u8]) -> Result<()> {
                self.0.apply_framed_wal(framed)
            }
        }

        let leader_dir = tempfile::tempdir().unwrap();
        let leader_key = format!("tael-test-leader-{}", uuid::Uuid::new_v4());
        let _lg = NsGuard(leader_key.clone());
        let leader = TaelBackend::with_wal_key_and_sinks(
            leader_dir.path().to_str().unwrap(),
            &leader_key,
            vec![Arc::new(ReplicaSink(Arc::clone(&standby)))],
            None,
        )
        .unwrap();

        leader
            .insert_spans(&[
                span("t1", "s1", "api", 10.0, SpanStatus::Ok),
                span("t1", "s2", "db", 20.0, SpanStatus::Ok),
                span("t2", "s3", "api", 5.0, SpanStatus::Error),
            ])
            .unwrap();
        leader.insert_logs(&[error_log("boom")]).unwrap();

        assert_eq!(standby.get_trace("t1").unwrap().len(), 2);
        assert_eq!(standby.get_trace("t2").unwrap().len(), 1);
        let leader_traces = leader.query_traces(&TraceQuery::default()).unwrap();
        let standby_traces = standby.query_traces(&TraceQuery::default()).unwrap();
        assert_eq!(leader_traces.len(), standby_traces.len());
        assert_eq!(standby_traces.len(), 3);
    }

    #[test]
    fn logs_and_metrics_round_trip_through_hot_tier() {
        let (b, _d, _g) = backend();

        b.insert_logs(&[error_log("connection refused")]).unwrap();
        let logs = b
            .query_logs(&LogQuery {
                severity: Some("error".into()),
                ..Default::default()
            })
            .unwrap();
        assert_eq!(logs.len(), 1);
        assert_eq!(logs[0].body, "connection refused");

        b.insert_metrics(&[sum_metric("http_requests_total", 42.0)])
            .unwrap();
        let metrics = b
            .query_metrics(&MetricQuery {
                name: Some("http_requests_total".into()),
                ..Default::default()
            })
            .unwrap();
        assert_eq!(metrics.len(), 1);
        assert_eq!(metrics[0].value, 42.0);
    }

    #[test]
    fn compaction_moves_spans_to_cold_and_reads_still_union() {
        let (b, _d, _g) = backend();
        b.insert_spans(&[
            span("t1", "s1", "api", 10.0, SpanStatus::Ok),
            span("t1", "s2", "db", 20.0, SpanStatus::Ok),
            span("t2", "s3", "api", 5.0, SpanStatus::Error),
        ])
        .unwrap();

        // A cutoff in the future makes every span eligible for compaction.
        let moved = b
            .compact_spans(Utc::now() + chrono::Duration::seconds(60))
            .unwrap();
        assert_eq!(moved, 3);
        assert!(b.hot.get_trace("t1").unwrap().is_empty());
        assert_eq!(b.get_trace("t1").unwrap().len(), 2);

        let all = b.query_traces(&TraceQuery::default()).unwrap();
        assert_eq!(all.len(), 3);
        let errors = b
            .query_traces(&TraceQuery {
                status: Some("error".into()),
                ..Default::default()
            })
            .unwrap();
        assert_eq!(errors.len(), 1);
        assert_eq!(errors[0].span_id, "s3");

        // Nothing left in the hot tier, so a second compaction is a no-op.
        assert_eq!(b.compact_spans(Utc::now()).unwrap(), 0);
    }

    #[test]
    fn logs_metrics_compact_to_cold_and_union_reads() {
        let (b, _d, _g) = backend();
        b.insert_logs(&[error_log("boom")]).unwrap();
        b.insert_metrics(&[sum_metric("rps", 7.0)]).unwrap();

        let moved = b
            .compact_logs_metrics(Utc::now() + chrono::Duration::seconds(60))
            .unwrap();
        assert_eq!(moved, 2);

        let logs = b.query_logs(&LogQuery::default()).unwrap();
        assert_eq!(logs.len(), 1);
        assert_eq!(logs[0].body, "boom");
        let metrics = b.query_metrics(&MetricQuery::default()).unwrap();
        assert_eq!(metrics.len(), 1);
        assert_eq!(metrics[0].value, 7.0);
    }

    #[test]
    fn full_text_search_filters_traces_by_payload() {
        let (b, _d, _g) = backend();
        b.insert_spans(&[
            span("t1", "s1", "llm-proxy", 100.0, SpanStatus::Ok),
            span("t2", "s2", "llm-proxy", 100.0, SpanStatus::Ok),
        ])
        .unwrap();
        let idx = b.search_index();
        idx.index_span("t1", "s1", "summarize the rate limit policy").unwrap();
        idx.index_span("t2", "s2", "translate to French").unwrap();
        idx.commit().unwrap();

        let hits = b
            .query_traces(&TraceQuery {
                text: Some("rate limit".into()),
                ..Default::default()
            })
            .unwrap();
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].trace_id, "t1");

        let none = b
            .query_traces(&TraceQuery {
                text: Some("quantum".into()),
                ..Default::default()
            })
            .unwrap();
        assert!(none.is_empty());
    }

    #[test]
    fn survives_reopen_via_persistent_hot_tier() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().to_str().unwrap();
        let key = format!("tael-test-reopen-{}", uuid::Uuid::new_v4());
        // Declared first so it drops last: the WAL dir is removed after both
        // backends are closed, and also when an assertion below panics.
        let _g = NsGuard(key.clone());
        {
            let b = TaelBackend::with_wal_key(path, &key).unwrap();
            b.insert_spans(&[span("t1", "s1", "api", 10.0, SpanStatus::Ok)])
                .unwrap();
        }
        let b2 = TaelBackend::with_wal_key(path, &key).unwrap();
        assert_eq!(b2.get_trace("t1").unwrap().len(), 1);
    }
}