rag_plusplus_core/observability/
spans.rs

1//! Tracing Spans
2//!
3//! Structured tracing for query execution.
4
5use std::time::{Duration, Instant};
6use tracing::{info_span, Span};
7
/// Propagation context carried alongside a query for distributed tracing.
///
/// Every field is optional: [`SpanContext::new`] and [`SpanContext::default`]
/// both produce a context with no trace ID, no parent span ID and no baggage.
#[derive(Debug, Clone, Default)]
pub struct SpanContext {
    /// Identifier of the distributed trace this work belongs to, if known.
    pub trace_id: Option<String>,
    /// Identifier of the parent span, if known.
    pub parent_span_id: Option<String>,
    /// Key/value pairs attached to the context, in insertion order.
    pub baggage: Vec<(String, String)>,
}

impl SpanContext {
    /// Build a context with every field unset.
    #[must_use]
    pub fn new() -> Self {
        Self {
            trace_id: None,
            parent_span_id: None,
            baggage: Vec::new(),
        }
    }

    /// Return the context with its trace ID set to `trace_id`.
    ///
    /// Any previously stored trace ID is replaced; the other fields are
    /// carried over unchanged.
    #[must_use]
    pub fn with_trace_id(self, trace_id: impl Into<String>) -> Self {
        Self {
            trace_id: Some(trace_id.into()),
            ..self
        }
    }

    /// Return the context with one more baggage entry appended.
    ///
    /// Entries are only ever appended, so calling this twice with the same
    /// `key` keeps both pairs.
    #[must_use]
    pub fn with_baggage(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        let entry = (key.into(), value.into());
        self.baggage.push(entry);
        self
    }
}
40
41/// Query execution span.
42///
43/// Tracks timing and attributes for a query.
44pub struct QuerySpan {
45    span: Span,
46    start: Instant,
47    #[allow(dead_code)]
48    k: usize,
49    index_count: usize,
50}
51
52impl QuerySpan {
53    /// Start a new query span.
54    #[must_use]
55    pub fn start(k: usize, has_filter: bool, context: Option<&SpanContext>) -> Self {
56        let span = info_span!(
57            "ragpp.query",
58            k = k,
59            has_filter = has_filter,
60            otel.kind = "client",
61        );
62
63        // Add trace context if provided
64        if let Some(ctx) = context {
65            if let Some(ref trace_id) = ctx.trace_id {
66                span.record("trace_id", trace_id.as_str());
67            }
68        }
69
70        Self {
71            span,
72            start: Instant::now(),
73            k,
74            index_count: 0,
75        }
76    }
77
78    /// Enter the span.
79    pub fn enter(&self) -> tracing::span::Entered<'_> {
80        self.span.enter()
81    }
82
83    /// Record index search.
84    pub fn record_index_search(&mut self, index_name: &str, candidates: usize, latency: Duration) {
85        self.index_count += 1;
86
87        tracing::info!(
88            parent: &self.span,
89            index = index_name,
90            candidates = candidates,
91            latency_ms = latency.as_millis() as u64,
92            "index search completed"
93        );
94    }
95
96    /// Record reranking.
97    pub fn record_rerank(&self, input_count: usize, output_count: usize, latency: Duration) {
98        tracing::info!(
99            parent: &self.span,
100            input_count = input_count,
101            output_count = output_count,
102            latency_ms = latency.as_millis() as u64,
103            "reranking completed"
104        );
105    }
106
107    /// Record filter application.
108    pub fn record_filter(&self, before: usize, after: usize) {
109        tracing::info!(
110            parent: &self.span,
111            before = before,
112            after = after,
113            filtered = before - after,
114            "filter applied"
115        );
116    }
117
118    /// Finish the span with results.
119    pub fn finish(self, result_count: usize, error: Option<&str>) {
120        let latency = self.start.elapsed();
121
122        if let Some(err) = error {
123            tracing::error!(
124                parent: &self.span,
125                result_count = result_count,
126                latency_ms = latency.as_millis() as u64,
127                indexes_searched = self.index_count,
128                error = err,
129                "query failed"
130            );
131        } else {
132            tracing::info!(
133                parent: &self.span,
134                result_count = result_count,
135                latency_ms = latency.as_millis() as u64,
136                indexes_searched = self.index_count,
137                "query completed"
138            );
139        }
140    }
141
142    /// Get elapsed time.
143    #[must_use]
144    pub fn elapsed(&self) -> Duration {
145        self.start.elapsed()
146    }
147}
148
149/// Create a span for index operations.
150#[allow(dead_code)]
151pub fn index_span(operation: &str, index_name: &str) -> Span {
152    info_span!(
153        "ragpp.index",
154        operation = operation,
155        index = index_name,
156    )
157}
158
159/// Create a span for store operations.
160#[allow(dead_code)]
161pub fn store_span(operation: &str) -> Span {
162    info_span!(
163        "ragpp.store",
164        operation = operation,
165    )
166}
167
168/// Create a span for WAL operations.
169#[allow(dead_code)]
170pub fn wal_span(operation: &str) -> Span {
171    info_span!(
172        "ragpp.wal",
173        operation = operation,
174    )
175}
176
/// Emit a structured `tracing` event at the given level.
///
/// The first argument is the level expression; every token after the comma is
/// forwarded unchanged to `tracing::event!`, so the usual field and message
/// syntax of that macro applies.
///
/// The expansion names `tracing::event!` by path, so `tracing` must be
/// resolvable at the call site.
///
/// ```ignore
/// ragpp_event!(tracing::Level::INFO, shard = 3, "compaction finished");
/// ```
#[macro_export]
macro_rules! ragpp_event {
    ($lvl:expr, $($rest:tt)*) => {
        tracing::event!($lvl, $($rest)*);
    };
}
184
#[cfg(test)]
mod tests {
    use super::*;

    /// Builder methods set the trace ID and append baggage in call order.
    #[test]
    fn test_span_context() {
        let ctx = SpanContext::new()
            .with_trace_id("abc123")
            .with_baggage("user_id", "42");

        assert_eq!(ctx.trace_id.as_deref(), Some("abc123"));
        assert!(ctx.parent_span_id.is_none());
        assert_eq!(ctx.baggage, vec![("user_id".to_string(), "42".to_string())]);
    }

    /// A span without context accepts every kind of record and finishes cleanly.
    #[test]
    fn test_query_span() {
        let mut span = QuerySpan::start(10, false, None);

        span.record_index_search("test", 100, Duration::from_millis(5));
        span.record_filter(100, 40);
        span.record_rerank(40, 10, Duration::from_millis(2));

        // Elapsed time is read from a monotonic clock, so a later reading can
        // never be smaller than an earlier one.
        let first = span.elapsed();
        let second = span.elapsed();
        assert!(second >= first);

        span.finish(10, None);
    }

    /// Starting with a trace context and entering the span must not panic.
    #[test]
    fn test_query_span_with_context() {
        let ctx = SpanContext::new().with_trace_id("abc123");
        let span = QuerySpan::start(5, true, Some(&ctx));

        {
            // The guard borrows `span`, so it has to be dropped before `finish`
            // consumes the span.
            let _guard = span.enter();
        }

        span.finish(5, None);
    }

    /// The error path of `finish` runs without panicking.
    #[test]
    fn test_query_span_with_error() {
        let span = QuerySpan::start(10, false, None);
        span.finish(0, Some("timeout"));
    }
}
215}