1mod cold;
13mod hot;
14mod wal;
15
16use anyhow::{Result, bail};
17
18#[cfg(feature = "duckdb")]
19use super::DuckDbStore;
20use super::Store;
21use super::models::{
22 Anomaly, AnomalyReport, CorrelateReport, ErrorOperation, LogQuery, LogRecord, LogSeverity,
23 LogSummary, MetricPoint, MetricQuery, MetricSummary, ServiceInfo, ServiceSummary, Span,
24 SpanStatus, SummaryReport, TraceComment, TraceQuery, TraceSummary,
25};
26use std::collections::{HashMap, HashSet};
27use std::sync::Arc;
28
29use super::DynObjectBackend;
30use super::SearchIndex;
31use super::{CommentsStore, JsonlComments};
32use cold::ColdTier;
33use hot::HotTier;
34use wal::WalLog;
35pub use wal::{WalRecord, WalSink};
36
/// Tiered telemetry store combining a hot tier, a cold tier, a write-ahead
/// log and a full-text search index behind the [`Store`] trait.
pub struct TaelBackend {
    /// Hot tier: receives every applied insert and is queried first.
    hot: HotTier,
    /// Cold tier: receives records evicted by the `compact_*` methods and is
    /// consulted when the hot tier returns fewer rows than the query limit.
    cold: ColdTier,
    /// DuckDB store, present only with the `duckdb` feature. It mirrors every
    /// insert and serves comments, summary, anomaly, correlate and SQL queries.
    #[cfg(feature = "duckdb")]
    inner: DuckDbStore,
    /// Comment store used by `add_comment` / `get_comments` when the `duckdb`
    /// feature is disabled.
    comments: Box<dyn CommentsStore>,
    /// Shared full-text index used by text queries in `query_traces`.
    search: Arc<SearchIndex>,
    /// Write-ahead log: appended to before each insert is applied and drained
    /// by `replay` when the backend is constructed.
    wal: WalLog,
}
51
52impl TaelBackend {
53 pub fn new(data_dir: &str) -> Result<Self> {
54 Self::with_wal_key(data_dir, "tael-backend")
55 }
56
57 pub fn with_wal_key(data_dir: &str, wal_key: &str) -> Result<Self> {
60 Self::with_wal_key_and_sinks(data_dir, wal_key, Vec::new(), None)
61 }
62
63 pub fn with_wal_key_and_sinks(
71 data_dir: &str,
72 wal_key: &str,
73 sinks: Vec<Arc<dyn WalSink>>,
74 required_acks: Option<usize>,
75 ) -> Result<Self> {
76 let comments: Box<dyn CommentsStore> = Box::new(JsonlComments::open(data_dir)?);
78 Self::with_components(data_dir, wal_key, sinks, required_acks, None, comments)
79 }
80
81 pub fn with_components(
86 data_dir: &str,
87 wal_key: &str,
88 sinks: Vec<Arc<dyn WalSink>>,
89 required_acks: Option<usize>,
90 cold_backend: Option<DynObjectBackend>,
91 comments: Box<dyn CommentsStore>,
92 ) -> Result<Self> {
93 let hot = HotTier::open(data_dir)?;
94 let cold = match cold_backend {
95 Some(backend) => ColdTier::with_backend(backend)?,
96 None => ColdTier::open(data_dir)?,
97 };
98 #[cfg(feature = "duckdb")]
99 let inner = DuckDbStore::new(data_dir)?;
100 let search = Arc::new(SearchIndex::open(data_dir)?);
101 let mut wal = if sinks.is_empty() {
102 WalLog::new_for_key(wal_key)?
103 } else {
104 WalLog::new_for_key_with_sinks(wal_key, sinks)?
105 };
106 if let Some(n) = required_acks {
107 wal = wal.with_required_acks(n);
108 }
109 let backend = Self {
110 hot,
111 cold,
112 #[cfg(feature = "duckdb")]
113 inner,
114 comments,
115 search,
116 wal,
117 };
118 backend.replay()?;
119 Ok(backend)
120 }
121
122 pub fn search_index(&self) -> Arc<SearchIndex> {
126 Arc::clone(&self.search)
127 }
128
129 pub fn compact_spans(&self, cutoff: chrono::DateTime<chrono::Utc>) -> Result<usize> {
132 let cutoff_ns = cutoff.timestamp_nanos_opt().unwrap_or(0);
133 let evicted = self.hot.evict_spans_before(cutoff_ns)?;
134 if evicted.is_empty() {
135 return Ok(0);
136 }
137 self.cold.write_spans(&evicted)?;
140 tracing::info!(
141 spans = evicted.len(),
142 "tael-backend: compacted spans to cold tier"
143 );
144 Ok(evicted.len())
145 }
146
147 pub fn compact_logs_metrics(&self, cutoff: chrono::DateTime<chrono::Utc>) -> Result<usize> {
150 let cutoff_ns = cutoff.timestamp_nanos_opt().unwrap_or(0);
151 let logs = self.hot.evict_logs_before(cutoff_ns)?;
152 if !logs.is_empty() {
153 self.cold.write_logs(&logs)?;
154 }
155 let metrics = self.hot.evict_metrics_before(cutoff_ns)?;
156 if !metrics.is_empty() {
157 self.cold.write_metrics(&metrics)?;
158 self.cold.write_downsampled(&metrics)?;
161 }
162 let n = logs.len() + metrics.len();
163 if n > 0 {
164 tracing::info!(
165 logs = logs.len(),
166 metrics = metrics.len(),
167 "tael-backend: compacted logs/metrics to cold tier"
168 );
169 }
170 Ok(n)
171 }
172
173 pub fn collect_live_blob_hashes(&self) -> Result<std::collections::HashSet<String>> {
177 use super::models::{LogQuery, TraceQuery};
178 let mut live = std::collections::HashSet::new();
179 let spans = self.query_traces(&TraceQuery {
181 limit: Some(u32::MAX),
182 ..Default::default()
183 })?;
184 for s in spans {
185 if let Some(llm) = s.llm {
186 live.extend(llm.prompt_sha256);
187 live.extend(llm.completion_sha256);
188 }
189 }
190 let logs = self.query_logs(&LogQuery {
191 limit: Some(u32::MAX),
192 ..Default::default()
193 })?;
194 for l in logs {
195 live.extend(l.body_sha256);
196 }
197 Ok(live)
198 }
199
200 pub fn enforce_span_retention(&self, keep: chrono::DateTime<chrono::Utc>) -> Result<usize> {
204 let cutoff_date = keep.format("%Y-%m-%d").to_string();
205 let dropped = self.cold.drop_partitions_before(&cutoff_date)?;
206 if dropped > 0 {
207 tracing::info!(
208 partitions = dropped,
209 "tael-backend: dropped expired cold partitions"
210 );
211 }
212 Ok(dropped)
213 }
214
215 fn apply_spans(&self, spans: &[Span]) -> Result<()> {
218 self.hot.insert_spans(spans)?;
219 #[cfg(feature = "duckdb")]
220 {
221 self.inner.insert_spans(spans)
222 }
223 #[cfg(not(feature = "duckdb"))]
224 {
225 Ok(())
226 }
227 }
228 fn apply_logs(&self, logs: &[LogRecord]) -> Result<()> {
229 self.hot.insert_logs(logs)?;
230 #[cfg(feature = "duckdb")]
231 {
232 self.inner.insert_logs(logs)
233 }
234 #[cfg(not(feature = "duckdb"))]
235 {
236 Ok(())
237 }
238 }
239 fn apply_metrics(&self, metrics: &[MetricPoint]) -> Result<()> {
240 self.hot.insert_metrics(metrics)?;
241 #[cfg(feature = "duckdb")]
242 {
243 self.inner.insert_metrics(metrics)
244 }
245 #[cfg(not(feature = "duckdb"))]
246 {
247 Ok(())
248 }
249 }
250
251 fn replay(&self) -> Result<()> {
254 let records = self.wal.drain()?;
255 if records.is_empty() {
256 return Ok(());
257 }
258 let mut spans = 0usize;
259 let mut logs = 0usize;
260 let mut metrics = 0usize;
261 for record in records {
262 match record {
263 WalRecord::Spans(s) => {
264 spans += s.len();
265 self.apply_spans(&s)?;
266 }
267 WalRecord::Logs(l) => {
268 logs += l.len();
269 self.apply_logs(&l)?;
270 }
271 WalRecord::Metrics(m) => {
272 metrics += m.len();
273 self.apply_metrics(&m)?;
274 }
275 }
276 }
277 tracing::info!(spans, logs, metrics, "tael-backend: replayed WAL");
278 Ok(())
279 }
280}
281
282impl Store for TaelBackend {
    /// Inserts `spans`: appends them to the WAL, applies them to the
    /// storage tiers, then marks the WAL entry as applied.
    ///
    /// # Errors
    /// Returns the first failure; later steps are skipped.
    fn insert_spans(&self, spans: &[Span]) -> Result<()> {
        // WAL append comes first so the batch is logged before any tier sees it.
        self.wal.append_spans(spans)?;
        self.apply_spans(spans)?;
        // NOTE(review): presumably tells the WAL this entry no longer needs
        // replay — confirm against the `wal` module.
        self.wal.mark_applied()?;
        Ok(())
    }
290
    /// Inserts `logs`: appends them to the WAL, applies them to the storage
    /// tiers, then marks the WAL entry as applied.
    ///
    /// # Errors
    /// Returns the first failure; later steps are skipped.
    fn insert_logs(&self, logs: &[LogRecord]) -> Result<()> {
        // WAL append comes first so the batch is logged before any tier sees it.
        self.wal.append_logs(logs)?;
        self.apply_logs(logs)?;
        // NOTE(review): presumably tells the WAL this entry no longer needs
        // replay — confirm against the `wal` module.
        self.wal.mark_applied()?;
        Ok(())
    }
297
    /// Inserts `metrics`: appends them to the WAL, applies them to the
    /// storage tiers, then marks the WAL entry as applied.
    ///
    /// # Errors
    /// Returns the first failure; later steps are skipped.
    fn insert_metrics(&self, metrics: &[MetricPoint]) -> Result<()> {
        // WAL append comes first so the batch is logged before any tier sees it.
        self.wal.append_metrics(metrics)?;
        self.apply_metrics(metrics)?;
        // NOTE(review): presumably tells the WAL this entry no longer needs
        // replay — confirm against the `wal` module.
        self.wal.mark_applied()?;
        Ok(())
    }
304
305 fn query_traces(&self, query: &TraceQuery) -> Result<Vec<Span>> {
307 if let Some(ref text) = query.text {
310 let trace_ids = self.search.search_trace_ids(text, 1000)?;
311 if trace_ids.is_empty() {
312 return Ok(Vec::new());
313 }
314 let cutoff = query
315 .last_seconds
316 .map(|s| chrono::Utc::now() - chrono::Duration::seconds(s));
317 let limit = query.limit.unwrap_or(100) as usize;
318 let mut matched: Vec<Span> = Vec::new();
319 for tid in &trace_ids {
320 for s in self.get_trace(tid)? {
321 if hot::span_matches(&s, query, cutoff) {
322 matched.push(s);
323 }
324 }
325 }
326 matched.sort_by(|a, b| b.start_time.cmp(&a.start_time));
327 matched.truncate(limit);
328 return Ok(matched);
329 }
330 let mut results = self.hot.query_traces(query)?;
333 let limit = query.limit.unwrap_or(100) as usize;
334 if results.len() < limit {
335 let cutoff = query
336 .last_seconds
337 .map(|s| chrono::Utc::now() - chrono::Duration::seconds(s));
338 let mut cold: Vec<Span> = self
339 .cold
340 .all_spans()?
341 .into_iter()
342 .filter(|s| hot::span_matches(s, query, cutoff))
343 .collect();
344 cold.sort_by(|a, b| b.start_time.cmp(&a.start_time));
345 for s in cold {
346 if results.len() >= limit {
347 break;
348 }
349 results.push(s);
350 }
351 }
352 Ok(results)
353 }
354 fn get_trace(&self, trace_id: &str) -> Result<Vec<Span>> {
355 let mut spans = self.hot.get_trace(trace_id)?;
356 let mut seen: std::collections::HashSet<String> =
357 spans.iter().map(|s| s.span_id.clone()).collect();
358 for s in self.cold.get_trace(trace_id)? {
361 if seen.insert(s.span_id.clone()) {
362 spans.push(s);
363 }
364 }
365 spans.sort_by_key(|s| s.start_time);
366 Ok(spans)
367 }
368 fn list_services(&self) -> Result<Vec<ServiceInfo>> {
369 self.hot.list_services()
370 }
371 fn query_logs(&self, query: &LogQuery) -> Result<Vec<LogRecord>> {
372 let mut results = self.hot.query_logs(query)?;
373 let limit = query.limit.unwrap_or(100) as usize;
374 if results.len() < limit {
375 let cutoff = query
376 .last_seconds
377 .map(|s| chrono::Utc::now() - chrono::Duration::seconds(s));
378 let mut cold: Vec<LogRecord> = self
379 .cold
380 .all_logs()?
381 .into_iter()
382 .filter(|l| hot::log_matches(l, query, cutoff))
383 .collect();
384 cold.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
385 for l in cold {
386 if results.len() >= limit {
387 break;
388 }
389 results.push(l);
390 }
391 }
392 Ok(results)
393 }
394 fn query_metrics(&self, query: &MetricQuery) -> Result<Vec<MetricPoint>> {
395 let mut results = self.hot.query_metrics(query)?;
396 let limit = query.limit.unwrap_or(500) as usize;
397 if results.len() < limit {
398 let cutoff = query
399 .last_seconds
400 .map(|s| chrono::Utc::now() - chrono::Duration::seconds(s));
401 let mut cold: Vec<MetricPoint> = self
402 .cold
403 .all_metrics()?
404 .into_iter()
405 .filter(|m| hot::metric_matches(m, query, cutoff))
406 .collect();
407 cold.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
408 for m in cold {
409 if results.len() >= limit {
410 break;
411 }
412 results.push(m);
413 }
414 }
415 Ok(results)
416 }
417
418 fn add_comment(
420 &self,
421 trace_id: &str,
422 span_id: Option<&str>,
423 author: &str,
424 body: &str,
425 ) -> Result<TraceComment> {
426 #[cfg(feature = "duckdb")]
427 {
428 self.inner.add_comment(trace_id, span_id, author, body)
429 }
430 #[cfg(not(feature = "duckdb"))]
431 {
432 self.comments.add(trace_id, span_id, author, body)
433 }
434 }
435 fn get_comments(&self, trace_id: &str) -> Result<Vec<TraceComment>> {
436 #[cfg(feature = "duckdb")]
437 {
438 self.inner.get_comments(trace_id)
439 }
440 #[cfg(not(feature = "duckdb"))]
441 {
442 self.comments.get(trace_id)
443 }
444 }
445 fn query_summary(&self, last_seconds: i64, service: Option<&str>) -> Result<SummaryReport> {
446 #[cfg(feature = "duckdb")]
447 {
448 self.inner.query_summary(last_seconds, service)
449 }
450 #[cfg(not(feature = "duckdb"))]
451 {
452 self.summary_native(last_seconds, service)
453 }
454 }
455 fn query_anomalies(
456 &self,
457 current_seconds: i64,
458 baseline_seconds: i64,
459 service: Option<&str>,
460 ) -> Result<AnomalyReport> {
461 #[cfg(feature = "duckdb")]
462 {
463 self
464 .inner
465 .query_anomalies(current_seconds, baseline_seconds, service)
466 }
467 #[cfg(not(feature = "duckdb"))]
468 {
469 self.anomalies_native(current_seconds, baseline_seconds, service)
470 }
471 }
472 fn query_correlate(&self, trace_id: &str) -> Result<Option<CorrelateReport>> {
473 #[cfg(feature = "duckdb")]
474 {
475 self.inner.query_correlate(trace_id)
476 }
477 #[cfg(not(feature = "duckdb"))]
478 {
479 self.correlate_native(trace_id)
480 }
481 }
    /// Runs a raw SQL query against the DuckDB store.
    ///
    /// # Errors
    /// Without the `duckdb` feature this always fails with an error stating
    /// that SQL queries need that feature. With it, errors come from the
    /// DuckDB store.
    fn query_sql(&self, sql: &str) -> Result<Vec<serde_json::Value>> {
        #[cfg(feature = "duckdb")]
        {
            self.inner.query_sql(sql)
        }
        #[cfg(not(feature = "duckdb"))]
        {
            // Touch `sql` so the parameter is not reported as unused.
            let _ = sql;
            bail!("SQL queries require a build with the `duckdb` feature")
        }
    }
493
494 fn flush(&self) -> Result<()> {
495 self.hot.flush()
498 }
499
    /// Applies one already-framed WAL record (as shipped by a `WalSink`) to
    /// this backend.
    ///
    /// The frame is decoded first, so an undecodable frame is rejected before
    /// anything is written. The raw frame is then appended to the local WAL,
    /// the decoded record is applied to the storage tiers, and the WAL entry
    /// is marked as applied.
    ///
    /// # Errors
    /// Returns the first decode, WAL or apply failure.
    fn apply_framed_wal(&self, framed: &[u8]) -> Result<()> {
        let record = WalRecord::decode(framed)?;
        self.wal.append_framed(framed)?;
        match &record {
            WalRecord::Spans(s) => self.apply_spans(s)?,
            WalRecord::Logs(l) => self.apply_logs(l)?,
            WalRecord::Metrics(m) => self.apply_metrics(m)?,
        }
        self.wal.mark_applied()?;
        Ok(())
    }
516}
517
518impl TaelBackend {
519 fn summary_native(&self, last_seconds: i64, service: Option<&str>) -> Result<SummaryReport> {
520 let spans = self.query_traces(&TraceQuery {
521 service: service.map(str::to_string),
522 last_seconds: Some(last_seconds),
523 limit: Some(u32::MAX),
524 ..Default::default()
525 })?;
526 let logs = self.query_logs(&LogQuery {
527 service: service.map(str::to_string),
528 last_seconds: Some(last_seconds),
529 limit: Some(u32::MAX),
530 ..Default::default()
531 })?;
532 let metrics = self.query_metrics(&MetricQuery {
533 service: service.map(str::to_string),
534 last_seconds: Some(last_seconds),
535 limit: Some(u32::MAX),
536 ..Default::default()
537 })?;
538 Ok(SummaryReport {
539 window_seconds: last_seconds,
540 service_filter: service.map(str::to_string),
541 traces: trace_summary(&spans),
542 top_services: service_summaries(&spans),
543 top_error_operations: error_operations(&spans),
544 logs: log_summary(&logs),
545 metrics: metric_summary(&metrics),
546 })
547 }
548
549 fn anomalies_native(
550 &self,
551 current_seconds: i64,
552 baseline_seconds: i64,
553 service: Option<&str>,
554 ) -> Result<AnomalyReport> {
555 let current = self.query_traces(&TraceQuery {
556 service: service.map(str::to_string),
557 last_seconds: Some(current_seconds),
558 limit: Some(u32::MAX),
559 ..Default::default()
560 })?;
561 let baseline = self.query_traces(&TraceQuery {
562 service: service.map(str::to_string),
563 last_seconds: Some(baseline_seconds),
564 limit: Some(u32::MAX),
565 ..Default::default()
566 })?;
567 Ok(AnomalyReport {
568 current_seconds,
569 baseline_seconds,
570 service_filter: service.map(str::to_string),
571 anomalies: anomalies(¤t, &baseline),
572 })
573 }
574
575 fn correlate_native(&self, trace_id: &str) -> Result<Option<CorrelateReport>> {
576 let spans = self.get_trace(trace_id)?;
577 if spans.is_empty() {
578 return Ok(None);
579 }
580 let start = spans
581 .iter()
582 .map(|s| s.start_time)
583 .min()
584 .unwrap_or_else(chrono::Utc::now);
585 let end = spans
586 .iter()
587 .map(|s| s.end_time)
588 .max()
589 .unwrap_or_else(chrono::Utc::now);
590 let services = spans
591 .iter()
592 .map(|s| s.service.clone())
593 .collect::<HashSet<_>>()
594 .into_iter()
595 .collect();
596 let logs = self.query_logs(&LogQuery {
597 trace_id: Some(trace_id.to_string()),
598 limit: Some(500),
599 ..Default::default()
600 })?;
601 let service = spans.first().map(|s| s.service.clone());
602 let metrics = self.query_metrics(&MetricQuery {
603 service,
604 limit: Some(500),
605 ..Default::default()
606 })?;
607 Ok(Some(CorrelateReport {
608 trace_id: trace_id.to_string(),
609 span_count: spans.len(),
610 services,
611 start_time: start.to_rfc3339(),
612 end_time: end.to_rfc3339(),
613 duration_ms: (end - start).num_microseconds().unwrap_or(0) as f64 / 1000.0,
614 error_count: spans
615 .iter()
616 .filter(|s| matches!(s.status, SpanStatus::Error))
617 .count() as i64,
618 logs,
619 metrics,
620 }))
621 }
622}
623
624fn trace_summary(spans: &[Span]) -> TraceSummary {
625 let mut durations: Vec<f64> = spans.iter().map(|s| s.duration_ms).collect();
626 durations.sort_by(|a, b| a.total_cmp(b));
627 let trace_count = spans.iter().map(|s| &s.trace_id).collect::<HashSet<_>>().len() as i64;
628 let error_count = spans
629 .iter()
630 .filter(|s| matches!(s.status, SpanStatus::Error))
631 .count() as i64;
632 TraceSummary {
633 span_count: spans.len() as i64,
634 trace_count,
635 error_count,
636 error_rate: ratio(error_count, spans.len() as i64),
637 avg_ms: if durations.is_empty() {
638 0.0
639 } else {
640 durations.iter().sum::<f64>() / durations.len() as f64
641 },
642 max_ms: durations.last().copied().unwrap_or(0.0),
643 p50_ms: percentile(&durations, 0.50),
644 p95_ms: percentile(&durations, 0.95),
645 p99_ms: percentile(&durations, 0.99),
646 }
647}
648
649fn service_summaries(spans: &[Span]) -> Vec<ServiceSummary> {
650 let mut by_service: HashMap<String, Vec<Span>> = HashMap::new();
651 for span in spans {
652 by_service
653 .entry(span.service.clone())
654 .or_default()
655 .push(span.clone());
656 }
657 let mut rows: Vec<ServiceSummary> = by_service
658 .into_iter()
659 .map(|(service, spans)| {
660 let summary = trace_summary(&spans);
661 ServiceSummary {
662 service,
663 span_count: summary.span_count,
664 error_rate: summary.error_rate,
665 p95_ms: summary.p95_ms,
666 }
667 })
668 .collect();
669 rows.sort_by(|a, b| b.span_count.cmp(&a.span_count));
670 rows.truncate(10);
671 rows
672}
673
674fn error_operations(spans: &[Span]) -> Vec<ErrorOperation> {
675 let mut counts: HashMap<(String, String), i64> = HashMap::new();
676 for span in spans
677 .iter()
678 .filter(|s| matches!(s.status, SpanStatus::Error))
679 {
680 *counts
681 .entry((span.service.clone(), span.operation.clone()))
682 .or_default() += 1;
683 }
684 let mut rows: Vec<ErrorOperation> = counts
685 .into_iter()
686 .map(|((service, operation), error_count)| ErrorOperation {
687 service,
688 operation,
689 error_count,
690 })
691 .collect();
692 rows.sort_by(|a, b| b.error_count.cmp(&a.error_count));
693 rows.truncate(10);
694 rows
695}
696
697fn log_summary(logs: &[LogRecord]) -> LogSummary {
698 let mut summary = LogSummary {
699 total: logs.len() as i64,
700 ..Default::default()
701 };
702 for log in logs {
703 match log.severity {
704 LogSeverity::Error | LogSeverity::Fatal => summary.error += 1,
705 LogSeverity::Warn => summary.warn += 1,
706 LogSeverity::Info => summary.info += 1,
707 LogSeverity::Debug => summary.debug += 1,
708 _ => {}
709 }
710 }
711 summary
712}
713
714fn metric_summary(metrics: &[MetricPoint]) -> MetricSummary {
715 MetricSummary {
716 point_count: metrics.len() as i64,
717 unique_names: metrics.iter().map(|m| &m.name).collect::<HashSet<_>>().len() as i64,
718 }
719}
720
721fn anomalies(current: &[Span], baseline: &[Span]) -> Vec<Anomaly> {
722 let cur = service_summaries(current);
723 let base = service_summaries(baseline);
724 let base_map: HashMap<String, ServiceSummary> =
725 base.into_iter().map(|s| (s.service.clone(), s)).collect();
726 let mut rows = Vec::new();
727 for c in cur {
728 let Some(b) = base_map.get(&c.service) else {
729 continue;
730 };
731 if c.error_rate > b.error_rate + 0.05 {
732 rows.push(Anomaly {
733 service: c.service.clone(),
734 kind: "error_rate".to_string(),
735 severity: if c.error_rate > b.error_rate + 0.20 {
736 "high".to_string()
737 } else {
738 "medium".to_string()
739 },
740 current: c.error_rate,
741 baseline: b.error_rate,
742 delta: c.error_rate - b.error_rate,
743 description: format!(
744 "{} error rate increased from {:.1}% to {:.1}%",
745 c.service,
746 b.error_rate * 100.0,
747 c.error_rate * 100.0
748 ),
749 });
750 }
751 if c.p95_ms > b.p95_ms * 1.5 && c.p95_ms - b.p95_ms > 25.0 {
752 rows.push(Anomaly {
753 service: c.service.clone(),
754 kind: "p95_latency".to_string(),
755 severity: if c.p95_ms > b.p95_ms * 2.0 {
756 "high".to_string()
757 } else {
758 "medium".to_string()
759 },
760 current: c.p95_ms,
761 baseline: b.p95_ms,
762 delta: c.p95_ms - b.p95_ms,
763 description: format!(
764 "{} p95 latency increased from {:.1}ms to {:.1}ms",
765 c.service, b.p95_ms, c.p95_ms
766 ),
767 });
768 }
769 }
770 rows
771}
772
/// Returns the nearest-rank percentile `p` of an ascending-sorted slice.
///
/// * `sorted` — values in ascending order; an empty slice yields `0.0`.
/// * `p` — fraction in `[0.0, 1.0]`. Values outside that range are clamped
///   (previously `p > 1.0` indexed past the end and panicked); a NaN `p`
///   selects the first element.
fn percentile(sorted: &[f64], p: f64) -> f64 {
    let Some(last) = sorted.len().checked_sub(1) else {
        return 0.0;
    };
    let rank = (last as f64 * p.clamp(0.0, 1.0)).round() as usize;
    // `min` guards against any float rounding beyond the final index.
    sorted[rank.min(last)]
}
780
/// Divides `numerator` by `denominator` as floating point, returning `0.0`
/// when the denominator is zero.
fn ratio(numerator: i64, denominator: i64) -> f64 {
    match denominator {
        0 => 0.0,
        nonzero => numerator as f64 / nonzero as f64,
    }
}
788
789#[cfg(test)]
790mod tests {
791 use super::*;
792 use crate::storage::models::{SpanKind, SpanStatus};
793 use chrono::Utc;
794 use std::collections::HashMap;
795
796 struct NsGuard(String);
798 impl Drop for NsGuard {
799 fn drop(&mut self) {
800 let _ = std::fs::remove_dir_all(format!("wal_files/{}", self.0));
801 }
802 }
803
804 fn backend() -> (TaelBackend, tempfile::TempDir, NsGuard) {
805 let dir = tempfile::tempdir().unwrap();
806 let key = format!("tael-test-backend-{}", uuid::Uuid::new_v4());
807 let b = TaelBackend::with_wal_key(dir.path().to_str().unwrap(), &key).unwrap();
808 (b, dir, NsGuard(key))
809 }
810
811 fn span(trace: &str, sid: &str, svc: &str, dur: f64, status: SpanStatus) -> Span {
812 let now = Utc::now();
813 Span {
814 trace_id: trace.into(),
815 span_id: sid.into(),
816 parent_span_id: None,
817 service: svc.into(),
818 operation: "op".into(),
819 start_time: now,
820 end_time: now,
821 duration_ms: dur,
822 status,
823 attributes: HashMap::new(),
824 events: vec![],
825 kind: SpanKind::Internal,
826 llm: None,
827 }
828 }
829
830 #[test]
831 fn get_trace_reconstructs_span_tree_from_hot_tier() {
832 let (b, _d, _g) = backend();
833 b.insert_spans(&[
834 span("t1", "s1", "api", 10.0, SpanStatus::Ok),
835 span("t1", "s2", "db", 20.0, SpanStatus::Ok),
836 span("t2", "s3", "api", 5.0, SpanStatus::Error),
837 ])
838 .unwrap();
839
840 let trace = b.get_trace("t1").unwrap();
841 assert_eq!(trace.len(), 2);
842 assert!(trace.iter().all(|s| s.trace_id == "t1"));
843 assert_eq!(b.get_trace("t2").unwrap().len(), 1);
844 assert!(b.get_trace("missing").unwrap().is_empty());
845 }
846
847 #[test]
848 #[cfg(feature = "duckdb")]
849 fn query_traces_filters_match_duckdb() {
850 let (b, _d, _g) = backend();
851 let spans = vec![
852 span("t1", "s1", "api", 10.0, SpanStatus::Ok),
853 span("t2", "s2", "db", 600.0, SpanStatus::Error),
854 span("t3", "s3", "api", 50.0, SpanStatus::Error),
855 ];
856 b.insert_spans(&spans).unwrap();
857
858 let oracle_dir = tempfile::tempdir().unwrap();
860 let oracle = DuckDbStore::new(oracle_dir.path().to_str().unwrap()).unwrap();
861 oracle.insert_spans(&spans).unwrap();
862
863 let queries = [
864 TraceQuery {
865 service: Some("api".into()),
866 ..Default::default()
867 },
868 TraceQuery {
869 status: Some("error".into()),
870 ..Default::default()
871 },
872 TraceQuery {
873 min_duration_ms: Some(100.0),
874 ..Default::default()
875 },
876 TraceQuery::default(),
877 ];
878 for q in &queries {
879 let mut hot: Vec<String> = b
880 .query_traces(q)
881 .unwrap()
882 .into_iter()
883 .map(|s| s.span_id)
884 .collect();
885 let mut duck: Vec<String> = oracle
886 .query_traces(q)
887 .unwrap()
888 .into_iter()
889 .map(|s| s.span_id)
890 .collect();
891 hot.sort();
892 duck.sort();
893 assert_eq!(hot, duck, "hot tier and DuckDB disagree for {q:?}");
894 }
895 }
896
897 #[test]
898 fn list_services_aggregates_from_hot_tier() {
899 let (b, _d, _g) = backend();
900 b.insert_spans(&[
901 span("t1", "s1", "api", 10.0, SpanStatus::Ok),
902 span("t1", "s2", "api", 30.0, SpanStatus::Error),
903 span("t2", "s3", "db", 20.0, SpanStatus::Ok),
904 ])
905 .unwrap();
906
907 let services = b.list_services().unwrap();
908 let api = services.iter().find(|s| s.name == "api").unwrap();
909 assert_eq!(api.span_count, 2);
910 assert_eq!(api.trace_count, 1);
911 assert_eq!(api.avg_duration_ms, 20.0);
912 assert!((api.error_rate - 0.5).abs() < 1e-9);
913 }
914
915 #[test]
916 fn standby_rebuilds_identical_state_from_shipped_wal() {
917 use crate::storage::models::{LogRecord, LogSeverity};
918
919 let (standby, _sd, _sg) = backend();
921 let standby = Arc::new(standby);
922
923 struct ReplicaSink(Arc<TaelBackend>);
926 impl WalSink for ReplicaSink {
927 fn append_framed(&self, framed: &[u8]) -> Result<()> {
928 self.0.apply_framed_wal(framed)
929 }
930 }
931
932 let leader_dir = tempfile::tempdir().unwrap();
934 let leader_key = format!("tael-test-leader-{}", uuid::Uuid::new_v4());
935 let _lg = NsGuard(leader_key.clone());
936 let leader = TaelBackend::with_wal_key_and_sinks(
937 leader_dir.path().to_str().unwrap(),
938 &leader_key,
939 vec![Arc::new(ReplicaSink(Arc::clone(&standby)))],
940 None, )
942 .unwrap();
943
944 leader
946 .insert_spans(&[
947 span("t1", "s1", "api", 10.0, SpanStatus::Ok),
948 span("t1", "s2", "db", 20.0, SpanStatus::Ok),
949 span("t2", "s3", "api", 5.0, SpanStatus::Error),
950 ])
951 .unwrap();
952 leader
953 .insert_logs(&[LogRecord {
954 timestamp: Utc::now(),
955 observed_timestamp: Utc::now(),
956 trace_id: Some("t1".into()),
957 span_id: None,
958 severity: LogSeverity::Error,
959 severity_text: "ERROR".into(),
960 body: "boom".into(),
961 service: "api".into(),
962 attributes: HashMap::new(),
963 body_sha256: None,
964 }])
965 .unwrap();
966
967 assert_eq!(standby.get_trace("t1").unwrap().len(), 2);
969 assert_eq!(standby.get_trace("t2").unwrap().len(), 1);
970 let leader_traces = leader.query_traces(&TraceQuery::default()).unwrap();
971 let standby_traces = standby.query_traces(&TraceQuery::default()).unwrap();
972 assert_eq!(leader_traces.len(), standby_traces.len());
973 assert_eq!(standby_traces.len(), 3);
974 }
975
976 #[test]
977 fn logs_and_metrics_round_trip_through_hot_tier() {
978 use crate::storage::models::{LogRecord, LogSeverity, MetricPoint, MetricType};
979 let (b, _d, _g) = backend();
980
981 b.insert_logs(&[LogRecord {
982 timestamp: Utc::now(),
983 observed_timestamp: Utc::now(),
984 trace_id: Some("t1".into()),
985 span_id: None,
986 severity: LogSeverity::Error,
987 severity_text: "ERROR".into(),
988 body: "connection refused".into(),
989 service: "api".into(),
990 attributes: HashMap::new(),
991 body_sha256: None,
992 }])
993 .unwrap();
994 let logs = b
995 .query_logs(&LogQuery {
996 severity: Some("error".into()),
997 ..Default::default()
998 })
999 .unwrap();
1000 assert_eq!(logs.len(), 1);
1001 assert_eq!(logs[0].body, "connection refused");
1002
1003 b.insert_metrics(&[MetricPoint {
1004 timestamp: Utc::now(),
1005 service: "api".into(),
1006 name: "http_requests_total".into(),
1007 metric_type: MetricType::Sum,
1008 value: 42.0,
1009 unit: "1".into(),
1010 attributes: HashMap::new(),
1011 }])
1012 .unwrap();
1013 let metrics = b
1014 .query_metrics(&MetricQuery {
1015 name: Some("http_requests_total".into()),
1016 ..Default::default()
1017 })
1018 .unwrap();
1019 assert_eq!(metrics.len(), 1);
1020 assert_eq!(metrics[0].value, 42.0);
1021 }
1022
1023 #[test]
1024 fn compaction_moves_spans_to_cold_and_reads_still_union() {
1025 let (b, _d, _g) = backend();
1026 b.insert_spans(&[
1027 span("t1", "s1", "api", 10.0, SpanStatus::Ok),
1028 span("t1", "s2", "db", 20.0, SpanStatus::Ok),
1029 span("t2", "s3", "api", 5.0, SpanStatus::Error),
1030 ])
1031 .unwrap();
1032
1033 let moved = b
1035 .compact_spans(Utc::now() + chrono::Duration::seconds(60))
1036 .unwrap();
1037 assert_eq!(moved, 3);
1038 assert!(b.hot.get_trace("t1").unwrap().is_empty());
1040 assert_eq!(b.get_trace("t1").unwrap().len(), 2);
1042 let all = b.query_traces(&TraceQuery::default()).unwrap();
1043 assert_eq!(all.len(), 3);
1044 let errors = b
1045 .query_traces(&TraceQuery {
1046 status: Some("error".into()),
1047 ..Default::default()
1048 })
1049 .unwrap();
1050 assert_eq!(errors.len(), 1);
1051 assert_eq!(errors[0].span_id, "s3");
1052
1053 assert_eq!(b.compact_spans(Utc::now()).unwrap(), 0);
1055 }
1056
1057 #[test]
1058 fn logs_metrics_compact_to_cold_and_union_reads() {
1059 use crate::storage::models::{LogRecord, LogSeverity, MetricPoint, MetricType};
1060 let (b, _d, _g) = backend();
1061 b.insert_logs(&[LogRecord {
1062 timestamp: Utc::now(),
1063 observed_timestamp: Utc::now(),
1064 trace_id: Some("t1".into()),
1065 span_id: None,
1066 severity: LogSeverity::Error,
1067 severity_text: "ERROR".into(),
1068 body: "boom".into(),
1069 service: "api".into(),
1070 attributes: HashMap::new(),
1071 body_sha256: None,
1072 }])
1073 .unwrap();
1074 b.insert_metrics(&[MetricPoint {
1075 timestamp: Utc::now(),
1076 service: "api".into(),
1077 name: "rps".into(),
1078 metric_type: MetricType::Sum,
1079 value: 7.0,
1080 unit: "1".into(),
1081 attributes: HashMap::new(),
1082 }])
1083 .unwrap();
1084
1085 let moved = b
1086 .compact_logs_metrics(Utc::now() + chrono::Duration::seconds(60))
1087 .unwrap();
1088 assert_eq!(moved, 2);
1089
1090 let logs = b.query_logs(&LogQuery::default()).unwrap();
1092 assert_eq!(logs.len(), 1);
1093 assert_eq!(logs[0].body, "boom");
1094 let metrics = b.query_metrics(&MetricQuery::default()).unwrap();
1095 assert_eq!(metrics.len(), 1);
1096 assert_eq!(metrics[0].value, 7.0);
1097 }
1098
1099 #[test]
1100 fn full_text_search_filters_traces_by_payload() {
1101 let (b, _d, _g) = backend();
1102 b.insert_spans(&[
1103 span("t1", "s1", "llm-proxy", 100.0, SpanStatus::Ok),
1104 span("t2", "s2", "llm-proxy", 100.0, SpanStatus::Ok),
1105 ])
1106 .unwrap();
1107 let idx = b.search_index();
1109 idx.index_span("t1", "s1", "summarize the rate limit policy")
1110 .unwrap();
1111 idx.index_span("t2", "s2", "translate to French").unwrap();
1112 idx.commit().unwrap();
1113
1114 let hits = b
1115 .query_traces(&TraceQuery {
1116 text: Some("rate limit".into()),
1117 ..Default::default()
1118 })
1119 .unwrap();
1120 assert_eq!(hits.len(), 1);
1121 assert_eq!(hits[0].trace_id, "t1");
1122
1123 let none = b
1125 .query_traces(&TraceQuery {
1126 text: Some("quantum".into()),
1127 ..Default::default()
1128 })
1129 .unwrap();
1130 assert!(none.is_empty());
1131 }
1132
1133 #[test]
1134 fn survives_reopen_via_persistent_hot_tier() {
1135 let dir = tempfile::tempdir().unwrap();
1136 let path = dir.path().to_str().unwrap();
1137 let key = format!("tael-test-reopen-{}", uuid::Uuid::new_v4());
1138 {
1139 let b = TaelBackend::with_wal_key(path, &key).unwrap();
1140 b.insert_spans(&[span("t1", "s1", "api", 10.0, SpanStatus::Ok)])
1141 .unwrap();
1142 }
1143 let b2 = TaelBackend::with_wal_key(path, &key).unwrap();
1145 assert_eq!(b2.get_trace("t1").unwrap().len(), 1);
1146 let _ = std::fs::remove_dir_all(format!("wal_files/{key}"));
1147 }
1148}