1mod cold;
15mod hot;
16mod wal;
17
18use anyhow::Result;
19
20use super::DuckDbStore;
21use super::Store;
22use super::models::{
23 AnomalyReport, CorrelateReport, LogQuery, LogRecord, MetricPoint, MetricQuery, ServiceInfo,
24 Span, SummaryReport, TraceComment, TraceQuery,
25};
26use std::sync::Arc;
27
28use super::SearchIndex;
29use cold::ColdTier;
30use hot::HotTier;
31use wal::WalLog;
32pub use wal::{WalRecord, WalSink};
33
34pub struct TaelBackend {
35 hot: HotTier,
37 cold: ColdTier,
39 inner: DuckDbStore,
41 search: Arc<SearchIndex>,
43 wal: WalLog,
44}
45
46impl TaelBackend {
47 pub fn new(data_dir: &str) -> Result<Self> {
48 Self::with_wal_key(data_dir, "tael-backend")
49 }
50
51 pub fn with_wal_key(data_dir: &str, wal_key: &str) -> Result<Self> {
54 Self::with_wal_key_and_sinks(data_dir, wal_key, Vec::new(), None)
55 }
56
57 pub fn with_wal_key_and_sinks(
65 data_dir: &str,
66 wal_key: &str,
67 sinks: Vec<Arc<dyn WalSink>>,
68 required_acks: Option<usize>,
69 ) -> Result<Self> {
70 let hot = HotTier::open(data_dir)?;
71 let cold = ColdTier::open(data_dir)?;
72 let inner = DuckDbStore::new(data_dir)?;
73 let search = Arc::new(SearchIndex::open(data_dir)?);
74 let mut wal = if sinks.is_empty() {
75 WalLog::new_for_key(wal_key)?
76 } else {
77 WalLog::new_for_key_with_sinks(wal_key, sinks)?
78 };
79 if let Some(n) = required_acks {
80 wal = wal.with_required_acks(n);
81 }
82 let backend = Self {
83 hot,
84 cold,
85 inner,
86 search,
87 wal,
88 };
89 backend.replay()?;
90 Ok(backend)
91 }
92
93 pub fn search_index(&self) -> Arc<SearchIndex> {
97 Arc::clone(&self.search)
98 }
99
100 pub fn compact_spans(&self, cutoff: chrono::DateTime<chrono::Utc>) -> Result<usize> {
103 let cutoff_ns = cutoff.timestamp_nanos_opt().unwrap_or(0);
104 let evicted = self.hot.evict_spans_before(cutoff_ns)?;
105 if evicted.is_empty() {
106 return Ok(0);
107 }
108 self.cold.write_spans(&evicted)?;
111 tracing::info!(
112 spans = evicted.len(),
113 "tael-backend: compacted spans to cold tier"
114 );
115 Ok(evicted.len())
116 }
117
118 pub fn compact_logs_metrics(&self, cutoff: chrono::DateTime<chrono::Utc>) -> Result<usize> {
121 let cutoff_ns = cutoff.timestamp_nanos_opt().unwrap_or(0);
122 let logs = self.hot.evict_logs_before(cutoff_ns)?;
123 if !logs.is_empty() {
124 self.cold.write_logs(&logs)?;
125 }
126 let metrics = self.hot.evict_metrics_before(cutoff_ns)?;
127 if !metrics.is_empty() {
128 self.cold.write_metrics(&metrics)?;
129 self.cold.write_downsampled(&metrics)?;
132 }
133 let n = logs.len() + metrics.len();
134 if n > 0 {
135 tracing::info!(
136 logs = logs.len(),
137 metrics = metrics.len(),
138 "tael-backend: compacted logs/metrics to cold tier"
139 );
140 }
141 Ok(n)
142 }
143
144 pub fn collect_live_blob_hashes(&self) -> Result<std::collections::HashSet<String>> {
148 use super::models::{LogQuery, TraceQuery};
149 let mut live = std::collections::HashSet::new();
150 let spans = self.query_traces(&TraceQuery {
152 limit: Some(u32::MAX),
153 ..Default::default()
154 })?;
155 for s in spans {
156 if let Some(llm) = s.llm {
157 live.extend(llm.prompt_sha256);
158 live.extend(llm.completion_sha256);
159 }
160 }
161 let logs = self.query_logs(&LogQuery {
162 limit: Some(u32::MAX),
163 ..Default::default()
164 })?;
165 for l in logs {
166 live.extend(l.body_sha256);
167 }
168 Ok(live)
169 }
170
171 pub fn enforce_span_retention(&self, keep: chrono::DateTime<chrono::Utc>) -> Result<usize> {
175 let cutoff_date = keep.format("%Y-%m-%d").to_string();
176 let dropped = self.cold.drop_partitions_before(&cutoff_date)?;
177 if dropped > 0 {
178 tracing::info!(
179 partitions = dropped,
180 "tael-backend: dropped expired cold partitions"
181 );
182 }
183 Ok(dropped)
184 }
185
186 fn apply_spans(&self, spans: &[Span]) -> Result<()> {
189 self.hot.insert_spans(spans)?;
190 self.inner.insert_spans(spans)
191 }
192 fn apply_logs(&self, logs: &[LogRecord]) -> Result<()> {
193 self.hot.insert_logs(logs)?;
194 self.inner.insert_logs(logs)
195 }
196 fn apply_metrics(&self, metrics: &[MetricPoint]) -> Result<()> {
197 self.hot.insert_metrics(metrics)?;
198 self.inner.insert_metrics(metrics)
199 }
200
201 fn replay(&self) -> Result<()> {
204 let records = self.wal.drain()?;
205 if records.is_empty() {
206 return Ok(());
207 }
208 let mut spans = 0usize;
209 let mut logs = 0usize;
210 let mut metrics = 0usize;
211 for record in records {
212 match record {
213 WalRecord::Spans(s) => {
214 spans += s.len();
215 self.apply_spans(&s)?;
216 }
217 WalRecord::Logs(l) => {
218 logs += l.len();
219 self.apply_logs(&l)?;
220 }
221 WalRecord::Metrics(m) => {
222 metrics += m.len();
223 self.apply_metrics(&m)?;
224 }
225 }
226 }
227 tracing::info!(spans, logs, metrics, "tael-backend: replayed WAL");
228 Ok(())
229 }
230}
231
232impl Store for TaelBackend {
233 fn insert_spans(&self, spans: &[Span]) -> Result<()> {
235 self.wal.append_spans(spans)?;
236 self.apply_spans(spans)?;
237 self.wal.mark_applied()?;
238 Ok(())
239 }
240
241 fn insert_logs(&self, logs: &[LogRecord]) -> Result<()> {
242 self.wal.append_logs(logs)?;
243 self.apply_logs(logs)?;
244 self.wal.mark_applied()?;
245 Ok(())
246 }
247
248 fn insert_metrics(&self, metrics: &[MetricPoint]) -> Result<()> {
249 self.wal.append_metrics(metrics)?;
250 self.apply_metrics(metrics)?;
251 self.wal.mark_applied()?;
252 Ok(())
253 }
254
255 fn query_traces(&self, query: &TraceQuery) -> Result<Vec<Span>> {
257 if let Some(ref text) = query.text {
260 let trace_ids = self.search.search_trace_ids(text, 1000)?;
261 if trace_ids.is_empty() {
262 return Ok(Vec::new());
263 }
264 let cutoff = query
265 .last_seconds
266 .map(|s| chrono::Utc::now() - chrono::Duration::seconds(s));
267 let limit = query.limit.unwrap_or(100) as usize;
268 let mut matched: Vec<Span> = Vec::new();
269 for tid in &trace_ids {
270 for s in self.get_trace(tid)? {
271 if hot::span_matches(&s, query, cutoff) {
272 matched.push(s);
273 }
274 }
275 }
276 matched.sort_by(|a, b| b.start_time.cmp(&a.start_time));
277 matched.truncate(limit);
278 return Ok(matched);
279 }
280 let mut results = self.hot.query_traces(query)?;
283 let limit = query.limit.unwrap_or(100) as usize;
284 if results.len() < limit {
285 let cutoff = query
286 .last_seconds
287 .map(|s| chrono::Utc::now() - chrono::Duration::seconds(s));
288 let mut cold: Vec<Span> = self
289 .cold
290 .all_spans()?
291 .into_iter()
292 .filter(|s| hot::span_matches(s, query, cutoff))
293 .collect();
294 cold.sort_by(|a, b| b.start_time.cmp(&a.start_time));
295 for s in cold {
296 if results.len() >= limit {
297 break;
298 }
299 results.push(s);
300 }
301 }
302 Ok(results)
303 }
304 fn get_trace(&self, trace_id: &str) -> Result<Vec<Span>> {
305 let mut spans = self.hot.get_trace(trace_id)?;
306 let mut seen: std::collections::HashSet<String> =
307 spans.iter().map(|s| s.span_id.clone()).collect();
308 for s in self.cold.get_trace(trace_id)? {
311 if seen.insert(s.span_id.clone()) {
312 spans.push(s);
313 }
314 }
315 spans.sort_by_key(|s| s.start_time);
316 Ok(spans)
317 }
318 fn list_services(&self) -> Result<Vec<ServiceInfo>> {
319 self.hot.list_services()
320 }
321 fn query_logs(&self, query: &LogQuery) -> Result<Vec<LogRecord>> {
322 let mut results = self.hot.query_logs(query)?;
323 let limit = query.limit.unwrap_or(100) as usize;
324 if results.len() < limit {
325 let cutoff = query
326 .last_seconds
327 .map(|s| chrono::Utc::now() - chrono::Duration::seconds(s));
328 let mut cold: Vec<LogRecord> = self
329 .cold
330 .all_logs()?
331 .into_iter()
332 .filter(|l| hot::log_matches(l, query, cutoff))
333 .collect();
334 cold.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
335 for l in cold {
336 if results.len() >= limit {
337 break;
338 }
339 results.push(l);
340 }
341 }
342 Ok(results)
343 }
344 fn query_metrics(&self, query: &MetricQuery) -> Result<Vec<MetricPoint>> {
345 let mut results = self.hot.query_metrics(query)?;
346 let limit = query.limit.unwrap_or(500) as usize;
347 if results.len() < limit {
348 let cutoff = query
349 .last_seconds
350 .map(|s| chrono::Utc::now() - chrono::Duration::seconds(s));
351 let mut cold: Vec<MetricPoint> = self
352 .cold
353 .all_metrics()?
354 .into_iter()
355 .filter(|m| hot::metric_matches(m, query, cutoff))
356 .collect();
357 cold.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
358 for m in cold {
359 if results.len() >= limit {
360 break;
361 }
362 results.push(m);
363 }
364 }
365 Ok(results)
366 }
367
368 fn add_comment(
370 &self,
371 trace_id: &str,
372 span_id: Option<&str>,
373 author: &str,
374 body: &str,
375 ) -> Result<TraceComment> {
376 self.inner.add_comment(trace_id, span_id, author, body)
377 }
378 fn get_comments(&self, trace_id: &str) -> Result<Vec<TraceComment>> {
379 self.inner.get_comments(trace_id)
380 }
381 fn query_summary(&self, last_seconds: i64, service: Option<&str>) -> Result<SummaryReport> {
382 self.inner.query_summary(last_seconds, service)
383 }
384 fn query_anomalies(
385 &self,
386 current_seconds: i64,
387 baseline_seconds: i64,
388 service: Option<&str>,
389 ) -> Result<AnomalyReport> {
390 self.inner
391 .query_anomalies(current_seconds, baseline_seconds, service)
392 }
393 fn query_correlate(&self, trace_id: &str) -> Result<Option<CorrelateReport>> {
394 self.inner.query_correlate(trace_id)
395 }
396 fn query_sql(&self, sql: &str) -> Result<Vec<serde_json::Value>> {
397 self.inner.query_sql(sql)
399 }
400
401 fn flush(&self) -> Result<()> {
402 self.hot.flush()
405 }
406
407 fn apply_framed_wal(&self, framed: &[u8]) -> Result<()> {
413 let record = WalRecord::decode(framed)?;
414 self.wal.append_framed(framed)?;
415 match &record {
416 WalRecord::Spans(s) => self.apply_spans(s)?,
417 WalRecord::Logs(l) => self.apply_logs(l)?,
418 WalRecord::Metrics(m) => self.apply_metrics(m)?,
419 }
420 self.wal.mark_applied()?;
421 Ok(())
422 }
423}
424
425#[cfg(test)]
426mod tests {
427 use super::*;
428 use crate::storage::models::{SpanKind, SpanStatus};
429 use chrono::Utc;
430 use std::collections::HashMap;
431
432 struct NsGuard(String);
434 impl Drop for NsGuard {
435 fn drop(&mut self) {
436 let _ = std::fs::remove_dir_all(format!("wal_files/{}", self.0));
437 }
438 }
439
440 fn backend() -> (TaelBackend, tempfile::TempDir, NsGuard) {
441 let dir = tempfile::tempdir().unwrap();
442 let key = format!("tael-test-backend-{}", uuid::Uuid::new_v4());
443 let b = TaelBackend::with_wal_key(dir.path().to_str().unwrap(), &key).unwrap();
444 (b, dir, NsGuard(key))
445 }
446
447 fn span(trace: &str, sid: &str, svc: &str, dur: f64, status: SpanStatus) -> Span {
448 let now = Utc::now();
449 Span {
450 trace_id: trace.into(),
451 span_id: sid.into(),
452 parent_span_id: None,
453 service: svc.into(),
454 operation: "op".into(),
455 start_time: now,
456 end_time: now,
457 duration_ms: dur,
458 status,
459 attributes: HashMap::new(),
460 events: vec![],
461 kind: SpanKind::Internal,
462 llm: None,
463 }
464 }
465
466 #[test]
467 fn get_trace_reconstructs_span_tree_from_hot_tier() {
468 let (b, _d, _g) = backend();
469 b.insert_spans(&[
470 span("t1", "s1", "api", 10.0, SpanStatus::Ok),
471 span("t1", "s2", "db", 20.0, SpanStatus::Ok),
472 span("t2", "s3", "api", 5.0, SpanStatus::Error),
473 ])
474 .unwrap();
475
476 let trace = b.get_trace("t1").unwrap();
477 assert_eq!(trace.len(), 2);
478 assert!(trace.iter().all(|s| s.trace_id == "t1"));
479 assert_eq!(b.get_trace("t2").unwrap().len(), 1);
480 assert!(b.get_trace("missing").unwrap().is_empty());
481 }
482
483 #[test]
484 fn query_traces_filters_match_duckdb() {
485 let (b, _d, _g) = backend();
486 let spans = vec![
487 span("t1", "s1", "api", 10.0, SpanStatus::Ok),
488 span("t2", "s2", "db", 600.0, SpanStatus::Error),
489 span("t3", "s3", "api", 50.0, SpanStatus::Error),
490 ];
491 b.insert_spans(&spans).unwrap();
492
493 let oracle_dir = tempfile::tempdir().unwrap();
495 let oracle = DuckDbStore::new(oracle_dir.path().to_str().unwrap()).unwrap();
496 oracle.insert_spans(&spans).unwrap();
497
498 let queries = [
499 TraceQuery {
500 service: Some("api".into()),
501 ..Default::default()
502 },
503 TraceQuery {
504 status: Some("error".into()),
505 ..Default::default()
506 },
507 TraceQuery {
508 min_duration_ms: Some(100.0),
509 ..Default::default()
510 },
511 TraceQuery::default(),
512 ];
513 for q in &queries {
514 let mut hot: Vec<String> = b
515 .query_traces(q)
516 .unwrap()
517 .into_iter()
518 .map(|s| s.span_id)
519 .collect();
520 let mut duck: Vec<String> = oracle
521 .query_traces(q)
522 .unwrap()
523 .into_iter()
524 .map(|s| s.span_id)
525 .collect();
526 hot.sort();
527 duck.sort();
528 assert_eq!(hot, duck, "hot tier and DuckDB disagree for {q:?}");
529 }
530 }
531
532 #[test]
533 fn list_services_aggregates_from_hot_tier() {
534 let (b, _d, _g) = backend();
535 b.insert_spans(&[
536 span("t1", "s1", "api", 10.0, SpanStatus::Ok),
537 span("t1", "s2", "api", 30.0, SpanStatus::Error),
538 span("t2", "s3", "db", 20.0, SpanStatus::Ok),
539 ])
540 .unwrap();
541
542 let services = b.list_services().unwrap();
543 let api = services.iter().find(|s| s.name == "api").unwrap();
544 assert_eq!(api.span_count, 2);
545 assert_eq!(api.trace_count, 1);
546 assert_eq!(api.avg_duration_ms, 20.0);
547 assert!((api.error_rate - 0.5).abs() < 1e-9);
548 }
549
550 #[test]
551 fn standby_rebuilds_identical_state_from_shipped_wal() {
552 use crate::storage::models::{LogRecord, LogSeverity};
553
554 let (standby, _sd, _sg) = backend();
556 let standby = Arc::new(standby);
557
558 struct ReplicaSink(Arc<TaelBackend>);
561 impl WalSink for ReplicaSink {
562 fn append_framed(&self, framed: &[u8]) -> Result<()> {
563 self.0.apply_framed_wal(framed)
564 }
565 }
566
567 let leader_dir = tempfile::tempdir().unwrap();
569 let leader_key = format!("tael-test-leader-{}", uuid::Uuid::new_v4());
570 let _lg = NsGuard(leader_key.clone());
571 let leader = TaelBackend::with_wal_key_and_sinks(
572 leader_dir.path().to_str().unwrap(),
573 &leader_key,
574 vec![Arc::new(ReplicaSink(Arc::clone(&standby)))],
575 None, )
577 .unwrap();
578
579 leader
581 .insert_spans(&[
582 span("t1", "s1", "api", 10.0, SpanStatus::Ok),
583 span("t1", "s2", "db", 20.0, SpanStatus::Ok),
584 span("t2", "s3", "api", 5.0, SpanStatus::Error),
585 ])
586 .unwrap();
587 leader
588 .insert_logs(&[LogRecord {
589 timestamp: Utc::now(),
590 observed_timestamp: Utc::now(),
591 trace_id: Some("t1".into()),
592 span_id: None,
593 severity: LogSeverity::Error,
594 severity_text: "ERROR".into(),
595 body: "boom".into(),
596 service: "api".into(),
597 attributes: HashMap::new(),
598 body_sha256: None,
599 }])
600 .unwrap();
601
602 assert_eq!(standby.get_trace("t1").unwrap().len(), 2);
604 assert_eq!(standby.get_trace("t2").unwrap().len(), 1);
605 let leader_traces = leader.query_traces(&TraceQuery::default()).unwrap();
606 let standby_traces = standby.query_traces(&TraceQuery::default()).unwrap();
607 assert_eq!(leader_traces.len(), standby_traces.len());
608 assert_eq!(standby_traces.len(), 3);
609 }
610
611 #[test]
612 fn logs_and_metrics_round_trip_through_hot_tier() {
613 use crate::storage::models::{LogRecord, LogSeverity, MetricPoint, MetricType};
614 let (b, _d, _g) = backend();
615
616 b.insert_logs(&[LogRecord {
617 timestamp: Utc::now(),
618 observed_timestamp: Utc::now(),
619 trace_id: Some("t1".into()),
620 span_id: None,
621 severity: LogSeverity::Error,
622 severity_text: "ERROR".into(),
623 body: "connection refused".into(),
624 service: "api".into(),
625 attributes: HashMap::new(),
626 body_sha256: None,
627 }])
628 .unwrap();
629 let logs = b
630 .query_logs(&LogQuery {
631 severity: Some("error".into()),
632 ..Default::default()
633 })
634 .unwrap();
635 assert_eq!(logs.len(), 1);
636 assert_eq!(logs[0].body, "connection refused");
637
638 b.insert_metrics(&[MetricPoint {
639 timestamp: Utc::now(),
640 service: "api".into(),
641 name: "http_requests_total".into(),
642 metric_type: MetricType::Sum,
643 value: 42.0,
644 unit: "1".into(),
645 attributes: HashMap::new(),
646 }])
647 .unwrap();
648 let metrics = b
649 .query_metrics(&MetricQuery {
650 name: Some("http_requests_total".into()),
651 ..Default::default()
652 })
653 .unwrap();
654 assert_eq!(metrics.len(), 1);
655 assert_eq!(metrics[0].value, 42.0);
656 }
657
658 #[test]
659 fn compaction_moves_spans_to_cold_and_reads_still_union() {
660 let (b, _d, _g) = backend();
661 b.insert_spans(&[
662 span("t1", "s1", "api", 10.0, SpanStatus::Ok),
663 span("t1", "s2", "db", 20.0, SpanStatus::Ok),
664 span("t2", "s3", "api", 5.0, SpanStatus::Error),
665 ])
666 .unwrap();
667
668 let moved = b
670 .compact_spans(Utc::now() + chrono::Duration::seconds(60))
671 .unwrap();
672 assert_eq!(moved, 3);
673 assert!(b.hot.get_trace("t1").unwrap().is_empty());
675 assert_eq!(b.get_trace("t1").unwrap().len(), 2);
677 let all = b.query_traces(&TraceQuery::default()).unwrap();
678 assert_eq!(all.len(), 3);
679 let errors = b
680 .query_traces(&TraceQuery {
681 status: Some("error".into()),
682 ..Default::default()
683 })
684 .unwrap();
685 assert_eq!(errors.len(), 1);
686 assert_eq!(errors[0].span_id, "s3");
687
688 assert_eq!(b.compact_spans(Utc::now()).unwrap(), 0);
690 }
691
692 #[test]
693 fn logs_metrics_compact_to_cold_and_union_reads() {
694 use crate::storage::models::{LogRecord, LogSeverity, MetricPoint, MetricType};
695 let (b, _d, _g) = backend();
696 b.insert_logs(&[LogRecord {
697 timestamp: Utc::now(),
698 observed_timestamp: Utc::now(),
699 trace_id: Some("t1".into()),
700 span_id: None,
701 severity: LogSeverity::Error,
702 severity_text: "ERROR".into(),
703 body: "boom".into(),
704 service: "api".into(),
705 attributes: HashMap::new(),
706 body_sha256: None,
707 }])
708 .unwrap();
709 b.insert_metrics(&[MetricPoint {
710 timestamp: Utc::now(),
711 service: "api".into(),
712 name: "rps".into(),
713 metric_type: MetricType::Sum,
714 value: 7.0,
715 unit: "1".into(),
716 attributes: HashMap::new(),
717 }])
718 .unwrap();
719
720 let moved = b
721 .compact_logs_metrics(Utc::now() + chrono::Duration::seconds(60))
722 .unwrap();
723 assert_eq!(moved, 2);
724
725 let logs = b.query_logs(&LogQuery::default()).unwrap();
727 assert_eq!(logs.len(), 1);
728 assert_eq!(logs[0].body, "boom");
729 let metrics = b.query_metrics(&MetricQuery::default()).unwrap();
730 assert_eq!(metrics.len(), 1);
731 assert_eq!(metrics[0].value, 7.0);
732 }
733
734 #[test]
735 fn full_text_search_filters_traces_by_payload() {
736 let (b, _d, _g) = backend();
737 b.insert_spans(&[
738 span("t1", "s1", "llm-proxy", 100.0, SpanStatus::Ok),
739 span("t2", "s2", "llm-proxy", 100.0, SpanStatus::Ok),
740 ])
741 .unwrap();
742 let idx = b.search_index();
744 idx.index_span("t1", "s1", "summarize the rate limit policy")
745 .unwrap();
746 idx.index_span("t2", "s2", "translate to French").unwrap();
747 idx.commit().unwrap();
748
749 let hits = b
750 .query_traces(&TraceQuery {
751 text: Some("rate limit".into()),
752 ..Default::default()
753 })
754 .unwrap();
755 assert_eq!(hits.len(), 1);
756 assert_eq!(hits[0].trace_id, "t1");
757
758 let none = b
760 .query_traces(&TraceQuery {
761 text: Some("quantum".into()),
762 ..Default::default()
763 })
764 .unwrap();
765 assert!(none.is_empty());
766 }
767
768 #[test]
769 fn survives_reopen_via_persistent_hot_tier() {
770 let dir = tempfile::tempdir().unwrap();
771 let path = dir.path().to_str().unwrap();
772 let key = format!("tael-test-reopen-{}", uuid::Uuid::new_v4());
773 {
774 let b = TaelBackend::with_wal_key(path, &key).unwrap();
775 b.insert_spans(&[span("t1", "s1", "api", 10.0, SpanStatus::Ok)])
776 .unwrap();
777 }
778 let b2 = TaelBackend::with_wal_key(path, &key).unwrap();
780 assert_eq!(b2.get_trace("t1").unwrap().len(), 1);
781 let _ = std::fs::remove_dir_all(format!("wal_files/{key}"));
782 }
783}