1mod cold;
13mod hot;
14mod wal;
15
16use anyhow::{Context, Result, bail};
17
18#[cfg(feature = "duckdb")]
19use super::DuckDbStore;
20use super::Store;
21use super::models::{
22 Anomaly, AnomalyReport, CorrelateReport, ErrorOperation, LogQuery, LogRecord, LogSeverity,
23 LogSummary, MetricPoint, MetricQuery, MetricSummary, ServiceInfo, ServiceSummary, Span,
24 SpanStatus, SummaryReport, TraceComment, TraceQuery, TraceSummary,
25};
26use std::collections::{HashMap, HashSet};
27use std::fs::OpenOptions;
28use std::io::{BufRead, Write};
29use std::path::{Path, PathBuf};
30use std::sync::{Arc, Mutex};
31
32use super::SearchIndex;
33use cold::ColdTier;
34use hot::HotTier;
35use wal::WalLog;
36pub use wal::{WalRecord, WalSink};
37
/// Storage backend that fronts a hot tier, a cold tier, an optional DuckDB
/// mirror, a full-text search index and a write-ahead log.
///
/// Writes go to the WAL first and are then applied to the hot tier (and the
/// DuckDB mirror when the `duckdb` feature is enabled). Reads union the hot
/// and cold tiers.
pub struct TaelBackend {
    /// Recent data; rows are evicted from here into `cold` by the compaction methods.
    hot: HotTier,
    /// Compacted data, also dropped by date partition in `enforce_span_retention`.
    cold: ColdTier,
    /// DuckDB mirror that serves comments, summaries, anomalies, correlation and SQL
    /// when the `duckdb` feature is enabled.
    #[cfg(feature = "duckdb")]
    inner: DuckDbStore,
    /// JSONL-backed comment store; only consulted when the `duckdb` feature is disabled.
    comments: CommentStore,
    /// Full-text search index, shared with callers through `search_index()`.
    search: Arc<SearchIndex>,
    /// Write-ahead log: records are appended before being applied and replayed on open.
    wal: WalLog,
}
51
52impl TaelBackend {
53 pub fn new(data_dir: &str) -> Result<Self> {
54 Self::with_wal_key(data_dir, "tael-backend")
55 }
56
57 pub fn with_wal_key(data_dir: &str, wal_key: &str) -> Result<Self> {
60 Self::with_wal_key_and_sinks(data_dir, wal_key, Vec::new(), None)
61 }
62
63 pub fn with_wal_key_and_sinks(
71 data_dir: &str,
72 wal_key: &str,
73 sinks: Vec<Arc<dyn WalSink>>,
74 required_acks: Option<usize>,
75 ) -> Result<Self> {
76 let hot = HotTier::open(data_dir)?;
77 let cold = ColdTier::open(data_dir)?;
78 #[cfg(feature = "duckdb")]
79 let inner = DuckDbStore::new(data_dir)?;
80 let comments = CommentStore::open(data_dir)?;
81 let search = Arc::new(SearchIndex::open(data_dir)?);
82 let mut wal = if sinks.is_empty() {
83 WalLog::new_for_key(wal_key)?
84 } else {
85 WalLog::new_for_key_with_sinks(wal_key, sinks)?
86 };
87 if let Some(n) = required_acks {
88 wal = wal.with_required_acks(n);
89 }
90 let backend = Self {
91 hot,
92 cold,
93 #[cfg(feature = "duckdb")]
94 inner,
95 comments,
96 search,
97 wal,
98 };
99 backend.replay()?;
100 Ok(backend)
101 }
102
103 pub fn search_index(&self) -> Arc<SearchIndex> {
107 Arc::clone(&self.search)
108 }
109
110 pub fn compact_spans(&self, cutoff: chrono::DateTime<chrono::Utc>) -> Result<usize> {
113 let cutoff_ns = cutoff.timestamp_nanos_opt().unwrap_or(0);
114 let evicted = self.hot.evict_spans_before(cutoff_ns)?;
115 if evicted.is_empty() {
116 return Ok(0);
117 }
118 self.cold.write_spans(&evicted)?;
121 tracing::info!(
122 spans = evicted.len(),
123 "tael-backend: compacted spans to cold tier"
124 );
125 Ok(evicted.len())
126 }
127
128 pub fn compact_logs_metrics(&self, cutoff: chrono::DateTime<chrono::Utc>) -> Result<usize> {
131 let cutoff_ns = cutoff.timestamp_nanos_opt().unwrap_or(0);
132 let logs = self.hot.evict_logs_before(cutoff_ns)?;
133 if !logs.is_empty() {
134 self.cold.write_logs(&logs)?;
135 }
136 let metrics = self.hot.evict_metrics_before(cutoff_ns)?;
137 if !metrics.is_empty() {
138 self.cold.write_metrics(&metrics)?;
139 self.cold.write_downsampled(&metrics)?;
142 }
143 let n = logs.len() + metrics.len();
144 if n > 0 {
145 tracing::info!(
146 logs = logs.len(),
147 metrics = metrics.len(),
148 "tael-backend: compacted logs/metrics to cold tier"
149 );
150 }
151 Ok(n)
152 }
153
154 pub fn collect_live_blob_hashes(&self) -> Result<std::collections::HashSet<String>> {
158 use super::models::{LogQuery, TraceQuery};
159 let mut live = std::collections::HashSet::new();
160 let spans = self.query_traces(&TraceQuery {
162 limit: Some(u32::MAX),
163 ..Default::default()
164 })?;
165 for s in spans {
166 if let Some(llm) = s.llm {
167 live.extend(llm.prompt_sha256);
168 live.extend(llm.completion_sha256);
169 }
170 }
171 let logs = self.query_logs(&LogQuery {
172 limit: Some(u32::MAX),
173 ..Default::default()
174 })?;
175 for l in logs {
176 live.extend(l.body_sha256);
177 }
178 Ok(live)
179 }
180
181 pub fn enforce_span_retention(&self, keep: chrono::DateTime<chrono::Utc>) -> Result<usize> {
185 let cutoff_date = keep.format("%Y-%m-%d").to_string();
186 let dropped = self.cold.drop_partitions_before(&cutoff_date)?;
187 if dropped > 0 {
188 tracing::info!(
189 partitions = dropped,
190 "tael-backend: dropped expired cold partitions"
191 );
192 }
193 Ok(dropped)
194 }
195
196 fn apply_spans(&self, spans: &[Span]) -> Result<()> {
199 self.hot.insert_spans(spans)?;
200 #[cfg(feature = "duckdb")]
201 {
202 self.inner.insert_spans(spans)
203 }
204 #[cfg(not(feature = "duckdb"))]
205 {
206 Ok(())
207 }
208 }
209 fn apply_logs(&self, logs: &[LogRecord]) -> Result<()> {
210 self.hot.insert_logs(logs)?;
211 #[cfg(feature = "duckdb")]
212 {
213 self.inner.insert_logs(logs)
214 }
215 #[cfg(not(feature = "duckdb"))]
216 {
217 Ok(())
218 }
219 }
220 fn apply_metrics(&self, metrics: &[MetricPoint]) -> Result<()> {
221 self.hot.insert_metrics(metrics)?;
222 #[cfg(feature = "duckdb")]
223 {
224 self.inner.insert_metrics(metrics)
225 }
226 #[cfg(not(feature = "duckdb"))]
227 {
228 Ok(())
229 }
230 }
231
232 fn replay(&self) -> Result<()> {
235 let records = self.wal.drain()?;
236 if records.is_empty() {
237 return Ok(());
238 }
239 let mut spans = 0usize;
240 let mut logs = 0usize;
241 let mut metrics = 0usize;
242 for record in records {
243 match record {
244 WalRecord::Spans(s) => {
245 spans += s.len();
246 self.apply_spans(&s)?;
247 }
248 WalRecord::Logs(l) => {
249 logs += l.len();
250 self.apply_logs(&l)?;
251 }
252 WalRecord::Metrics(m) => {
253 metrics += m.len();
254 self.apply_metrics(&m)?;
255 }
256 }
257 }
258 tracing::info!(spans, logs, metrics, "tael-backend: replayed WAL");
259 Ok(())
260 }
261}
262
263impl Store for TaelBackend {
264 fn insert_spans(&self, spans: &[Span]) -> Result<()> {
266 self.wal.append_spans(spans)?;
267 self.apply_spans(spans)?;
268 self.wal.mark_applied()?;
269 Ok(())
270 }
271
272 fn insert_logs(&self, logs: &[LogRecord]) -> Result<()> {
273 self.wal.append_logs(logs)?;
274 self.apply_logs(logs)?;
275 self.wal.mark_applied()?;
276 Ok(())
277 }
278
279 fn insert_metrics(&self, metrics: &[MetricPoint]) -> Result<()> {
280 self.wal.append_metrics(metrics)?;
281 self.apply_metrics(metrics)?;
282 self.wal.mark_applied()?;
283 Ok(())
284 }
285
286 fn query_traces(&self, query: &TraceQuery) -> Result<Vec<Span>> {
288 if let Some(ref text) = query.text {
291 let trace_ids = self.search.search_trace_ids(text, 1000)?;
292 if trace_ids.is_empty() {
293 return Ok(Vec::new());
294 }
295 let cutoff = query
296 .last_seconds
297 .map(|s| chrono::Utc::now() - chrono::Duration::seconds(s));
298 let limit = query.limit.unwrap_or(100) as usize;
299 let mut matched: Vec<Span> = Vec::new();
300 for tid in &trace_ids {
301 for s in self.get_trace(tid)? {
302 if hot::span_matches(&s, query, cutoff) {
303 matched.push(s);
304 }
305 }
306 }
307 matched.sort_by(|a, b| b.start_time.cmp(&a.start_time));
308 matched.truncate(limit);
309 return Ok(matched);
310 }
311 let mut results = self.hot.query_traces(query)?;
314 let limit = query.limit.unwrap_or(100) as usize;
315 if results.len() < limit {
316 let cutoff = query
317 .last_seconds
318 .map(|s| chrono::Utc::now() - chrono::Duration::seconds(s));
319 let mut cold: Vec<Span> = self
320 .cold
321 .all_spans()?
322 .into_iter()
323 .filter(|s| hot::span_matches(s, query, cutoff))
324 .collect();
325 cold.sort_by(|a, b| b.start_time.cmp(&a.start_time));
326 for s in cold {
327 if results.len() >= limit {
328 break;
329 }
330 results.push(s);
331 }
332 }
333 Ok(results)
334 }
335 fn get_trace(&self, trace_id: &str) -> Result<Vec<Span>> {
336 let mut spans = self.hot.get_trace(trace_id)?;
337 let mut seen: std::collections::HashSet<String> =
338 spans.iter().map(|s| s.span_id.clone()).collect();
339 for s in self.cold.get_trace(trace_id)? {
342 if seen.insert(s.span_id.clone()) {
343 spans.push(s);
344 }
345 }
346 spans.sort_by_key(|s| s.start_time);
347 Ok(spans)
348 }
349 fn list_services(&self) -> Result<Vec<ServiceInfo>> {
350 self.hot.list_services()
351 }
352 fn query_logs(&self, query: &LogQuery) -> Result<Vec<LogRecord>> {
353 let mut results = self.hot.query_logs(query)?;
354 let limit = query.limit.unwrap_or(100) as usize;
355 if results.len() < limit {
356 let cutoff = query
357 .last_seconds
358 .map(|s| chrono::Utc::now() - chrono::Duration::seconds(s));
359 let mut cold: Vec<LogRecord> = self
360 .cold
361 .all_logs()?
362 .into_iter()
363 .filter(|l| hot::log_matches(l, query, cutoff))
364 .collect();
365 cold.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
366 for l in cold {
367 if results.len() >= limit {
368 break;
369 }
370 results.push(l);
371 }
372 }
373 Ok(results)
374 }
375 fn query_metrics(&self, query: &MetricQuery) -> Result<Vec<MetricPoint>> {
376 let mut results = self.hot.query_metrics(query)?;
377 let limit = query.limit.unwrap_or(500) as usize;
378 if results.len() < limit {
379 let cutoff = query
380 .last_seconds
381 .map(|s| chrono::Utc::now() - chrono::Duration::seconds(s));
382 let mut cold: Vec<MetricPoint> = self
383 .cold
384 .all_metrics()?
385 .into_iter()
386 .filter(|m| hot::metric_matches(m, query, cutoff))
387 .collect();
388 cold.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
389 for m in cold {
390 if results.len() >= limit {
391 break;
392 }
393 results.push(m);
394 }
395 }
396 Ok(results)
397 }
398
399 fn add_comment(
401 &self,
402 trace_id: &str,
403 span_id: Option<&str>,
404 author: &str,
405 body: &str,
406 ) -> Result<TraceComment> {
407 #[cfg(feature = "duckdb")]
408 {
409 self.inner.add_comment(trace_id, span_id, author, body)
410 }
411 #[cfg(not(feature = "duckdb"))]
412 {
413 self.comments.add(trace_id, span_id, author, body)
414 }
415 }
416 fn get_comments(&self, trace_id: &str) -> Result<Vec<TraceComment>> {
417 #[cfg(feature = "duckdb")]
418 {
419 self.inner.get_comments(trace_id)
420 }
421 #[cfg(not(feature = "duckdb"))]
422 {
423 self.comments.get(trace_id)
424 }
425 }
426 fn query_summary(&self, last_seconds: i64, service: Option<&str>) -> Result<SummaryReport> {
427 #[cfg(feature = "duckdb")]
428 {
429 self.inner.query_summary(last_seconds, service)
430 }
431 #[cfg(not(feature = "duckdb"))]
432 {
433 self.summary_native(last_seconds, service)
434 }
435 }
436 fn query_anomalies(
437 &self,
438 current_seconds: i64,
439 baseline_seconds: i64,
440 service: Option<&str>,
441 ) -> Result<AnomalyReport> {
442 #[cfg(feature = "duckdb")]
443 {
444 self
445 .inner
446 .query_anomalies(current_seconds, baseline_seconds, service)
447 }
448 #[cfg(not(feature = "duckdb"))]
449 {
450 self.anomalies_native(current_seconds, baseline_seconds, service)
451 }
452 }
453 fn query_correlate(&self, trace_id: &str) -> Result<Option<CorrelateReport>> {
454 #[cfg(feature = "duckdb")]
455 {
456 self.inner.query_correlate(trace_id)
457 }
458 #[cfg(not(feature = "duckdb"))]
459 {
460 self.correlate_native(trace_id)
461 }
462 }
463 fn query_sql(&self, sql: &str) -> Result<Vec<serde_json::Value>> {
464 #[cfg(feature = "duckdb")]
465 {
466 self.inner.query_sql(sql)
467 }
468 #[cfg(not(feature = "duckdb"))]
469 {
470 let _ = sql;
471 bail!("SQL queries require a build with the `duckdb` feature")
472 }
473 }
474
475 fn flush(&self) -> Result<()> {
476 self.hot.flush()
479 }
480
481 fn apply_framed_wal(&self, framed: &[u8]) -> Result<()> {
487 let record = WalRecord::decode(framed)?;
488 self.wal.append_framed(framed)?;
489 match &record {
490 WalRecord::Spans(s) => self.apply_spans(s)?,
491 WalRecord::Logs(l) => self.apply_logs(l)?,
492 WalRecord::Metrics(m) => self.apply_metrics(m)?,
493 }
494 self.wal.mark_applied()?;
495 Ok(())
496 }
497}
498
499impl TaelBackend {
500 fn summary_native(&self, last_seconds: i64, service: Option<&str>) -> Result<SummaryReport> {
501 let spans = self.query_traces(&TraceQuery {
502 service: service.map(str::to_string),
503 last_seconds: Some(last_seconds),
504 limit: Some(u32::MAX),
505 ..Default::default()
506 })?;
507 let logs = self.query_logs(&LogQuery {
508 service: service.map(str::to_string),
509 last_seconds: Some(last_seconds),
510 limit: Some(u32::MAX),
511 ..Default::default()
512 })?;
513 let metrics = self.query_metrics(&MetricQuery {
514 service: service.map(str::to_string),
515 last_seconds: Some(last_seconds),
516 limit: Some(u32::MAX),
517 ..Default::default()
518 })?;
519 Ok(SummaryReport {
520 window_seconds: last_seconds,
521 service_filter: service.map(str::to_string),
522 traces: trace_summary(&spans),
523 top_services: service_summaries(&spans),
524 top_error_operations: error_operations(&spans),
525 logs: log_summary(&logs),
526 metrics: metric_summary(&metrics),
527 })
528 }
529
530 fn anomalies_native(
531 &self,
532 current_seconds: i64,
533 baseline_seconds: i64,
534 service: Option<&str>,
535 ) -> Result<AnomalyReport> {
536 let current = self.query_traces(&TraceQuery {
537 service: service.map(str::to_string),
538 last_seconds: Some(current_seconds),
539 limit: Some(u32::MAX),
540 ..Default::default()
541 })?;
542 let baseline = self.query_traces(&TraceQuery {
543 service: service.map(str::to_string),
544 last_seconds: Some(baseline_seconds),
545 limit: Some(u32::MAX),
546 ..Default::default()
547 })?;
548 Ok(AnomalyReport {
549 current_seconds,
550 baseline_seconds,
551 service_filter: service.map(str::to_string),
552 anomalies: anomalies(¤t, &baseline),
553 })
554 }
555
556 fn correlate_native(&self, trace_id: &str) -> Result<Option<CorrelateReport>> {
557 let spans = self.get_trace(trace_id)?;
558 if spans.is_empty() {
559 return Ok(None);
560 }
561 let start = spans
562 .iter()
563 .map(|s| s.start_time)
564 .min()
565 .unwrap_or_else(chrono::Utc::now);
566 let end = spans
567 .iter()
568 .map(|s| s.end_time)
569 .max()
570 .unwrap_or_else(chrono::Utc::now);
571 let services = spans
572 .iter()
573 .map(|s| s.service.clone())
574 .collect::<HashSet<_>>()
575 .into_iter()
576 .collect();
577 let logs = self.query_logs(&LogQuery {
578 trace_id: Some(trace_id.to_string()),
579 limit: Some(500),
580 ..Default::default()
581 })?;
582 let service = spans.first().map(|s| s.service.clone());
583 let metrics = self.query_metrics(&MetricQuery {
584 service,
585 limit: Some(500),
586 ..Default::default()
587 })?;
588 Ok(Some(CorrelateReport {
589 trace_id: trace_id.to_string(),
590 span_count: spans.len(),
591 services,
592 start_time: start.to_rfc3339(),
593 end_time: end.to_rfc3339(),
594 duration_ms: (end - start).num_microseconds().unwrap_or(0) as f64 / 1000.0,
595 error_count: spans
596 .iter()
597 .filter(|s| matches!(s.status, SpanStatus::Error))
598 .count() as i64,
599 logs,
600 metrics,
601 }))
602 }
603}
604
605#[derive(Debug)]
606struct CommentStore {
607 path: PathBuf,
608 comments: Mutex<HashMap<String, Vec<TraceComment>>>,
609}
610
611impl CommentStore {
612 fn open(data_dir: &str) -> Result<Self> {
613 let path = Path::new(data_dir).join("trace_comments.jsonl");
614 let mut comments: HashMap<String, Vec<TraceComment>> = HashMap::new();
615 if path.exists() {
616 let file = std::fs::File::open(&path)
617 .with_context(|| format!("opening {}", path.display()))?;
618 for line in std::io::BufReader::new(file).lines() {
619 let line = line?;
620 if line.trim().is_empty() {
621 continue;
622 }
623 let comment: TraceComment = serde_json::from_str(&line)
624 .with_context(|| format!("decoding {}", path.display()))?;
625 comments
626 .entry(comment.trace_id.clone())
627 .or_default()
628 .push(comment);
629 }
630 }
631 Ok(Self {
632 path,
633 comments: Mutex::new(comments),
634 })
635 }
636
637 fn add(
638 &self,
639 trace_id: &str,
640 span_id: Option<&str>,
641 author: &str,
642 body: &str,
643 ) -> Result<TraceComment> {
644 if let Some(parent) = self.path.parent() {
645 std::fs::create_dir_all(parent)?;
646 }
647 let comment = TraceComment {
648 id: uuid::Uuid::new_v4().to_string(),
649 trace_id: trace_id.to_string(),
650 span_id: span_id.map(str::to_string),
651 author: author.to_string(),
652 body: body.to_string(),
653 created_at: chrono::Utc::now().to_rfc3339(),
654 };
655 let mut file = OpenOptions::new()
656 .create(true)
657 .append(true)
658 .open(&self.path)
659 .with_context(|| format!("opening {}", self.path.display()))?;
660 writeln!(file, "{}", serde_json::to_string(&comment)?)?;
661 self.comments
662 .lock()
663 .expect("comment store lock poisoned")
664 .entry(trace_id.to_string())
665 .or_default()
666 .push(comment.clone());
667 Ok(comment)
668 }
669
670 fn get(&self, trace_id: &str) -> Result<Vec<TraceComment>> {
671 Ok(self
672 .comments
673 .lock()
674 .expect("comment store lock poisoned")
675 .get(trace_id)
676 .cloned()
677 .unwrap_or_default())
678 }
679}
680
681fn trace_summary(spans: &[Span]) -> TraceSummary {
682 let mut durations: Vec<f64> = spans.iter().map(|s| s.duration_ms).collect();
683 durations.sort_by(|a, b| a.total_cmp(b));
684 let trace_count = spans.iter().map(|s| &s.trace_id).collect::<HashSet<_>>().len() as i64;
685 let error_count = spans
686 .iter()
687 .filter(|s| matches!(s.status, SpanStatus::Error))
688 .count() as i64;
689 TraceSummary {
690 span_count: spans.len() as i64,
691 trace_count,
692 error_count,
693 error_rate: ratio(error_count, spans.len() as i64),
694 avg_ms: if durations.is_empty() {
695 0.0
696 } else {
697 durations.iter().sum::<f64>() / durations.len() as f64
698 },
699 max_ms: durations.last().copied().unwrap_or(0.0),
700 p50_ms: percentile(&durations, 0.50),
701 p95_ms: percentile(&durations, 0.95),
702 p99_ms: percentile(&durations, 0.99),
703 }
704}
705
706fn service_summaries(spans: &[Span]) -> Vec<ServiceSummary> {
707 let mut by_service: HashMap<String, Vec<Span>> = HashMap::new();
708 for span in spans {
709 by_service
710 .entry(span.service.clone())
711 .or_default()
712 .push(span.clone());
713 }
714 let mut rows: Vec<ServiceSummary> = by_service
715 .into_iter()
716 .map(|(service, spans)| {
717 let summary = trace_summary(&spans);
718 ServiceSummary {
719 service,
720 span_count: summary.span_count,
721 error_rate: summary.error_rate,
722 p95_ms: summary.p95_ms,
723 }
724 })
725 .collect();
726 rows.sort_by(|a, b| b.span_count.cmp(&a.span_count));
727 rows.truncate(10);
728 rows
729}
730
731fn error_operations(spans: &[Span]) -> Vec<ErrorOperation> {
732 let mut counts: HashMap<(String, String), i64> = HashMap::new();
733 for span in spans
734 .iter()
735 .filter(|s| matches!(s.status, SpanStatus::Error))
736 {
737 *counts
738 .entry((span.service.clone(), span.operation.clone()))
739 .or_default() += 1;
740 }
741 let mut rows: Vec<ErrorOperation> = counts
742 .into_iter()
743 .map(|((service, operation), error_count)| ErrorOperation {
744 service,
745 operation,
746 error_count,
747 })
748 .collect();
749 rows.sort_by(|a, b| b.error_count.cmp(&a.error_count));
750 rows.truncate(10);
751 rows
752}
753
754fn log_summary(logs: &[LogRecord]) -> LogSummary {
755 let mut summary = LogSummary {
756 total: logs.len() as i64,
757 ..Default::default()
758 };
759 for log in logs {
760 match log.severity {
761 LogSeverity::Error | LogSeverity::Fatal => summary.error += 1,
762 LogSeverity::Warn => summary.warn += 1,
763 LogSeverity::Info => summary.info += 1,
764 LogSeverity::Debug => summary.debug += 1,
765 _ => {}
766 }
767 }
768 summary
769}
770
771fn metric_summary(metrics: &[MetricPoint]) -> MetricSummary {
772 MetricSummary {
773 point_count: metrics.len() as i64,
774 unique_names: metrics.iter().map(|m| &m.name).collect::<HashSet<_>>().len() as i64,
775 }
776}
777
778fn anomalies(current: &[Span], baseline: &[Span]) -> Vec<Anomaly> {
779 let cur = service_summaries(current);
780 let base = service_summaries(baseline);
781 let base_map: HashMap<String, ServiceSummary> =
782 base.into_iter().map(|s| (s.service.clone(), s)).collect();
783 let mut rows = Vec::new();
784 for c in cur {
785 let Some(b) = base_map.get(&c.service) else {
786 continue;
787 };
788 if c.error_rate > b.error_rate + 0.05 {
789 rows.push(Anomaly {
790 service: c.service.clone(),
791 kind: "error_rate".to_string(),
792 severity: if c.error_rate > b.error_rate + 0.20 {
793 "high".to_string()
794 } else {
795 "medium".to_string()
796 },
797 current: c.error_rate,
798 baseline: b.error_rate,
799 delta: c.error_rate - b.error_rate,
800 description: format!(
801 "{} error rate increased from {:.1}% to {:.1}%",
802 c.service,
803 b.error_rate * 100.0,
804 c.error_rate * 100.0
805 ),
806 });
807 }
808 if c.p95_ms > b.p95_ms * 1.5 && c.p95_ms - b.p95_ms > 25.0 {
809 rows.push(Anomaly {
810 service: c.service.clone(),
811 kind: "p95_latency".to_string(),
812 severity: if c.p95_ms > b.p95_ms * 2.0 {
813 "high".to_string()
814 } else {
815 "medium".to_string()
816 },
817 current: c.p95_ms,
818 baseline: b.p95_ms,
819 delta: c.p95_ms - b.p95_ms,
820 description: format!(
821 "{} p95 latency increased from {:.1}ms to {:.1}ms",
822 c.service, b.p95_ms, c.p95_ms
823 ),
824 });
825 }
826 }
827 rows
828}
829
/// Nearest-rank percentile of an ascending-sorted slice.
///
/// `p` is a fraction in `[0, 1]` (e.g. `0.95` for p95). Values outside that
/// range are clamped, so `p > 1` can no longer index past the end. Returns
/// `0.0` for an empty slice.
fn percentile(sorted: &[f64], p: f64) -> f64 {
    let Some(last) = sorted.len().checked_sub(1) else {
        return 0.0;
    };
    let rank = (last as f64 * p.clamp(0.0, 1.0)).round() as usize;
    sorted[rank.min(last)]
}
837
/// `numerator / denominator` as a float, defined as `0.0` when the
/// denominator is zero (e.g. an error rate over zero spans).
fn ratio(numerator: i64, denominator: i64) -> f64 {
    match denominator {
        0 => 0.0,
        d => numerator as f64 / d as f64,
    }
}
845
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::models::{SpanKind, SpanStatus};
    use chrono::Utc;
    use std::collections::HashMap;

    /// Removes `wal_files/<key>` on drop so a test cleans up its WAL
    /// namespace even when an assertion panics. This assumes `WalLog` keeps
    /// per-key files under `./wal_files/<key>`.
    struct NsGuard(String);

    impl Drop for NsGuard {
        fn drop(&mut self) {
            let _ = std::fs::remove_dir_all(format!("wal_files/{}", self.0));
        }
    }

    /// Backend in a fresh temp dir with a unique WAL key. Keep all three
    /// returned values alive for the whole test.
    fn backend() -> (TaelBackend, tempfile::TempDir, NsGuard) {
        let dir = tempfile::tempdir().unwrap();
        let key = format!("tael-test-backend-{}", uuid::Uuid::new_v4());
        let b = TaelBackend::with_wal_key(dir.path().to_str().unwrap(), &key).unwrap();
        (b, dir, NsGuard(key))
    }

    /// A root span starting and ending "now" with the given identity, duration and status.
    fn span(trace: &str, sid: &str, svc: &str, dur: f64, status: SpanStatus) -> Span {
        let now = Utc::now();
        Span {
            trace_id: trace.into(),
            span_id: sid.into(),
            parent_span_id: None,
            service: svc.into(),
            operation: "op".into(),
            start_time: now,
            end_time: now,
            duration_ms: dur,
            status,
            attributes: HashMap::new(),
            events: vec![],
            kind: SpanKind::Internal,
            llm: None,
        }
    }

    #[test]
    fn get_trace_reconstructs_span_tree_from_hot_tier() {
        let (b, _d, _g) = backend();
        b.insert_spans(&[
            span("t1", "s1", "api", 10.0, SpanStatus::Ok),
            span("t1", "s2", "db", 20.0, SpanStatus::Ok),
            span("t2", "s3", "api", 5.0, SpanStatus::Error),
        ])
        .unwrap();

        let trace = b.get_trace("t1").unwrap();
        assert_eq!(trace.len(), 2);
        assert!(trace.iter().all(|s| s.trace_id == "t1"));
        assert_eq!(b.get_trace("t2").unwrap().len(), 1);
        assert!(b.get_trace("missing").unwrap().is_empty());
    }

    #[test]
    #[cfg(feature = "duckdb")]
    fn query_traces_filters_match_duckdb() {
        let (b, _d, _g) = backend();
        let spans = vec![
            span("t1", "s1", "api", 10.0, SpanStatus::Ok),
            span("t2", "s2", "db", 600.0, SpanStatus::Error),
            span("t3", "s3", "api", 50.0, SpanStatus::Error),
        ];
        b.insert_spans(&spans).unwrap();

        // A standalone DuckDB store acts as the oracle for filter semantics.
        let oracle_dir = tempfile::tempdir().unwrap();
        let oracle = DuckDbStore::new(oracle_dir.path().to_str().unwrap()).unwrap();
        oracle.insert_spans(&spans).unwrap();

        let queries = [
            TraceQuery {
                service: Some("api".into()),
                ..Default::default()
            },
            TraceQuery {
                status: Some("error".into()),
                ..Default::default()
            },
            TraceQuery {
                min_duration_ms: Some(100.0),
                ..Default::default()
            },
            TraceQuery::default(),
        ];
        for q in &queries {
            let mut hot: Vec<String> = b
                .query_traces(q)
                .unwrap()
                .into_iter()
                .map(|s| s.span_id)
                .collect();
            let mut duck: Vec<String> = oracle
                .query_traces(q)
                .unwrap()
                .into_iter()
                .map(|s| s.span_id)
                .collect();
            hot.sort();
            duck.sort();
            assert_eq!(hot, duck, "hot tier and DuckDB disagree for {q:?}");
        }
    }

    #[test]
    fn list_services_aggregates_from_hot_tier() {
        let (b, _d, _g) = backend();
        b.insert_spans(&[
            span("t1", "s1", "api", 10.0, SpanStatus::Ok),
            span("t1", "s2", "api", 30.0, SpanStatus::Error),
            span("t2", "s3", "db", 20.0, SpanStatus::Ok),
        ])
        .unwrap();

        let services = b.list_services().unwrap();
        let api = services.iter().find(|s| s.name == "api").unwrap();
        assert_eq!(api.span_count, 2);
        assert_eq!(api.trace_count, 1);
        assert_eq!(api.avg_duration_ms, 20.0);
        assert!((api.error_rate - 0.5).abs() < 1e-9);
    }

    #[test]
    fn standby_rebuilds_identical_state_from_shipped_wal() {
        use crate::storage::models::{LogRecord, LogSeverity};

        let (standby, _sd, _sg) = backend();
        let standby = Arc::new(standby);

        /// Ships every framed WAL record from the leader straight into the standby.
        struct ReplicaSink(Arc<TaelBackend>);

        impl WalSink for ReplicaSink {
            fn append_framed(&self, framed: &[u8]) -> Result<()> {
                self.0.apply_framed_wal(framed)
            }
        }

        let leader_dir = tempfile::tempdir().unwrap();
        let leader_key = format!("tael-test-leader-{}", uuid::Uuid::new_v4());
        let _lg = NsGuard(leader_key.clone());
        let leader = TaelBackend::with_wal_key_and_sinks(
            leader_dir.path().to_str().unwrap(),
            &leader_key,
            vec![Arc::new(ReplicaSink(Arc::clone(&standby)))],
            None,
        )
        .unwrap();

        leader
            .insert_spans(&[
                span("t1", "s1", "api", 10.0, SpanStatus::Ok),
                span("t1", "s2", "db", 20.0, SpanStatus::Ok),
                span("t2", "s3", "api", 5.0, SpanStatus::Error),
            ])
            .unwrap();
        leader
            .insert_logs(&[LogRecord {
                timestamp: Utc::now(),
                observed_timestamp: Utc::now(),
                trace_id: Some("t1".into()),
                span_id: None,
                severity: LogSeverity::Error,
                severity_text: "ERROR".into(),
                body: "boom".into(),
                service: "api".into(),
                attributes: HashMap::new(),
                body_sha256: None,
            }])
            .unwrap();

        assert_eq!(standby.get_trace("t1").unwrap().len(), 2);
        assert_eq!(standby.get_trace("t2").unwrap().len(), 1);
        let leader_traces = leader.query_traces(&TraceQuery::default()).unwrap();
        let standby_traces = standby.query_traces(&TraceQuery::default()).unwrap();
        assert_eq!(leader_traces.len(), standby_traces.len());
        assert_eq!(standby_traces.len(), 3);
    }

    #[test]
    fn logs_and_metrics_round_trip_through_hot_tier() {
        use crate::storage::models::{LogRecord, LogSeverity, MetricPoint, MetricType};
        let (b, _d, _g) = backend();

        b.insert_logs(&[LogRecord {
            timestamp: Utc::now(),
            observed_timestamp: Utc::now(),
            trace_id: Some("t1".into()),
            span_id: None,
            severity: LogSeverity::Error,
            severity_text: "ERROR".into(),
            body: "connection refused".into(),
            service: "api".into(),
            attributes: HashMap::new(),
            body_sha256: None,
        }])
        .unwrap();
        let logs = b
            .query_logs(&LogQuery {
                severity: Some("error".into()),
                ..Default::default()
            })
            .unwrap();
        assert_eq!(logs.len(), 1);
        assert_eq!(logs[0].body, "connection refused");

        b.insert_metrics(&[MetricPoint {
            timestamp: Utc::now(),
            service: "api".into(),
            name: "http_requests_total".into(),
            metric_type: MetricType::Sum,
            value: 42.0,
            unit: "1".into(),
            attributes: HashMap::new(),
        }])
        .unwrap();
        let metrics = b
            .query_metrics(&MetricQuery {
                name: Some("http_requests_total".into()),
                ..Default::default()
            })
            .unwrap();
        assert_eq!(metrics.len(), 1);
        assert_eq!(metrics[0].value, 42.0);
    }

    #[test]
    fn compaction_moves_spans_to_cold_and_reads_still_union() {
        let (b, _d, _g) = backend();
        b.insert_spans(&[
            span("t1", "s1", "api", 10.0, SpanStatus::Ok),
            span("t1", "s2", "db", 20.0, SpanStatus::Ok),
            span("t2", "s3", "api", 5.0, SpanStatus::Error),
        ])
        .unwrap();

        // A cutoff in the future evicts everything from the hot tier.
        let moved = b
            .compact_spans(Utc::now() + chrono::Duration::seconds(60))
            .unwrap();
        assert_eq!(moved, 3);
        assert!(b.hot.get_trace("t1").unwrap().is_empty());
        // Reads still find the spans via the cold tier.
        assert_eq!(b.get_trace("t1").unwrap().len(), 2);
        let all = b.query_traces(&TraceQuery::default()).unwrap();
        assert_eq!(all.len(), 3);
        let errors = b
            .query_traces(&TraceQuery {
                status: Some("error".into()),
                ..Default::default()
            })
            .unwrap();
        assert_eq!(errors.len(), 1);
        assert_eq!(errors[0].span_id, "s3");

        // Nothing is left in the hot tier to move.
        assert_eq!(b.compact_spans(Utc::now()).unwrap(), 0);
    }

    #[test]
    fn logs_metrics_compact_to_cold_and_union_reads() {
        use crate::storage::models::{LogRecord, LogSeverity, MetricPoint, MetricType};
        let (b, _d, _g) = backend();
        b.insert_logs(&[LogRecord {
            timestamp: Utc::now(),
            observed_timestamp: Utc::now(),
            trace_id: Some("t1".into()),
            span_id: None,
            severity: LogSeverity::Error,
            severity_text: "ERROR".into(),
            body: "boom".into(),
            service: "api".into(),
            attributes: HashMap::new(),
            body_sha256: None,
        }])
        .unwrap();
        b.insert_metrics(&[MetricPoint {
            timestamp: Utc::now(),
            service: "api".into(),
            name: "rps".into(),
            metric_type: MetricType::Sum,
            value: 7.0,
            unit: "1".into(),
            attributes: HashMap::new(),
        }])
        .unwrap();

        let moved = b
            .compact_logs_metrics(Utc::now() + chrono::Duration::seconds(60))
            .unwrap();
        assert_eq!(moved, 2);

        let logs = b.query_logs(&LogQuery::default()).unwrap();
        assert_eq!(logs.len(), 1);
        assert_eq!(logs[0].body, "boom");
        let metrics = b.query_metrics(&MetricQuery::default()).unwrap();
        assert_eq!(metrics.len(), 1);
        assert_eq!(metrics[0].value, 7.0);
    }

    #[test]
    fn full_text_search_filters_traces_by_payload() {
        let (b, _d, _g) = backend();
        b.insert_spans(&[
            span("t1", "s1", "llm-proxy", 100.0, SpanStatus::Ok),
            span("t2", "s2", "llm-proxy", 100.0, SpanStatus::Ok),
        ])
        .unwrap();
        let idx = b.search_index();
        idx.index_span("t1", "s1", "summarize the rate limit policy")
            .unwrap();
        idx.index_span("t2", "s2", "translate to French").unwrap();
        idx.commit().unwrap();

        let hits = b
            .query_traces(&TraceQuery {
                text: Some("rate limit".into()),
                ..Default::default()
            })
            .unwrap();
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].trace_id, "t1");

        // A term that matches nothing yields an empty result, not all spans.
        let none = b
            .query_traces(&TraceQuery {
                text: Some("quantum".into()),
                ..Default::default()
            })
            .unwrap();
        assert!(none.is_empty());
    }

    #[test]
    fn survives_reopen_via_persistent_hot_tier() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().to_str().unwrap();
        let key = format!("tael-test-reopen-{}", uuid::Uuid::new_v4());
        // Declared before both backends so it drops after them, and still
        // runs when an assertion below panics.
        let _guard = NsGuard(key.clone());
        {
            let b = TaelBackend::with_wal_key(path, &key).unwrap();
            b.insert_spans(&[span("t1", "s1", "api", 10.0, SpanStatus::Ok)])
                .unwrap();
        }
        let b2 = TaelBackend::with_wal_key(path, &key).unwrap();
        assert_eq!(b2.get_trace("t1").unwrap().len(), 1);
    }
}