rustkernel_core/observability/
tracing.rs

//! Distributed Tracing
//!
//! OpenTelemetry-compatible distributed tracing for kernel execution.
//!
//! # Features
//!
//! - Span creation for kernel execution
//! - Trace context propagation in K2K messages (W3C `traceparent` format)
//! - OTLP export over gRPC to OTLP-capable backends (e.g. Jaeger, or Zipkin via an
//!   OpenTelemetry Collector); requires the `otlp` cargo feature, without which
//!   `TracingConfig::init` is a no-op
//!
//! # Example
//!
//! ```rust,ignore
//! use rustkernel_core::observability::tracing::{KernelSpan, TracingConfig};
//!
//! let config = TracingConfig::otlp("http://jaeger:4317");
//! config.init().await?;
//!
//! let span = KernelSpan::start("graph/pagerank", "execute");
//! // ... kernel execution ...
//! let elapsed = span.end(); // `end` returns the span's duration
//! ```
23
24use serde::{Deserialize, Serialize};
25use std::time::{Duration, Instant};
26
27/// Tracing configuration
28#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct TracingConfig {
30    /// Enable tracing
31    pub enabled: bool,
32    /// OTLP endpoint URL
33    pub otlp_endpoint: Option<String>,
34    /// Sampling rate (0.0 - 1.0)
35    pub sampling_rate: f64,
36    /// Service name
37    pub service_name: String,
38    /// Include span events
39    pub include_events: bool,
40    /// Max attributes per span
41    pub max_attributes: u32,
42    /// Batch export settings
43    pub batch_size: usize,
44    /// Export timeout
45    pub export_timeout: Duration,
46}
47
48impl Default for TracingConfig {
49    fn default() -> Self {
50        Self {
51            enabled: true,
52            otlp_endpoint: None,
53            sampling_rate: 1.0, // 100% sampling in dev
54            service_name: "rustkernels".to_string(),
55            include_events: true,
56            max_attributes: 128,
57            batch_size: 512,
58            export_timeout: Duration::from_secs(30),
59        }
60    }
61}
62
63impl TracingConfig {
64    /// Create config for OTLP export
65    pub fn otlp(endpoint: impl Into<String>) -> Self {
66        Self {
67            otlp_endpoint: Some(endpoint.into()),
68            ..Default::default()
69        }
70    }
71
72    /// Set sampling rate
73    pub fn with_sampling(mut self, rate: f64) -> Self {
74        self.sampling_rate = rate.clamp(0.0, 1.0);
75        self
76    }
77
78    /// Initialize tracing
79    #[cfg(feature = "otlp")]
80    pub async fn init(&self) -> crate::error::Result<()> {
81        use opentelemetry_otlp::WithExportConfig;
82        use opentelemetry_sdk::trace::SdkTracerProvider;
83
84        if !self.enabled {
85            return Ok(());
86        }
87
88        if let Some(ref endpoint) = self.otlp_endpoint {
89            let exporter = opentelemetry_otlp::SpanExporter::builder()
90                .with_tonic()
91                .with_endpoint(endpoint)
92                .with_timeout(self.export_timeout)
93                .build()
94                .map_err(|e| crate::error::KernelError::ConfigError(e.to_string()))?;
95
96            let provider = SdkTracerProvider::builder()
97                .with_batch_exporter(exporter)
98                .build();
99
100            opentelemetry::global::set_tracer_provider(provider);
101        }
102
103        Ok(())
104    }
105
106    /// No-op init when OTLP feature is disabled
107    #[cfg(not(feature = "otlp"))]
108    pub async fn init(&self) -> crate::error::Result<()> {
109        Ok(())
110    }
111}
112
113/// Span context for trace propagation
114#[derive(Debug, Clone, Serialize, Deserialize)]
115pub struct SpanContext {
116    /// Trace ID (128-bit hex)
117    pub trace_id: String,
118    /// Span ID (64-bit hex)
119    pub span_id: String,
120    /// Trace flags
121    pub trace_flags: u8,
122    /// Trace state
123    pub trace_state: Option<String>,
124}
125
126impl SpanContext {
127    /// Create a new span context
128    pub fn new(trace_id: impl Into<String>, span_id: impl Into<String>) -> Self {
129        Self {
130            trace_id: trace_id.into(),
131            span_id: span_id.into(),
132            trace_flags: 0x01, // Sampled
133            trace_state: None,
134        }
135    }
136
137    /// Generate a new trace ID
138    pub fn generate_trace_id() -> String {
139        format!("{:032x}", rand::random::<u128>())
140    }
141
142    /// Generate a new span ID
143    pub fn generate_span_id() -> String {
144        format!("{:016x}", rand::random::<u64>())
145    }
146
147    /// Create a new root span context
148    pub fn new_root() -> Self {
149        Self::new(Self::generate_trace_id(), Self::generate_span_id())
150    }
151
152    /// Create a child span context
153    pub fn new_child(&self) -> Self {
154        Self {
155            trace_id: self.trace_id.clone(),
156            span_id: Self::generate_span_id(),
157            trace_flags: self.trace_flags,
158            trace_state: self.trace_state.clone(),
159        }
160    }
161
162    /// Convert to W3C trace context header value
163    pub fn to_traceparent(&self) -> String {
164        format!(
165            "00-{}-{}-{:02x}",
166            self.trace_id, self.span_id, self.trace_flags
167        )
168    }
169
170    /// Parse from W3C trace context header
171    pub fn from_traceparent(header: &str) -> Option<Self> {
172        let parts: Vec<&str> = header.split('-').collect();
173        if parts.len() != 4 {
174            return None;
175        }
176
177        Some(Self {
178            trace_id: parts[1].to_string(),
179            span_id: parts[2].to_string(),
180            trace_flags: u8::from_str_radix(parts[3], 16).ok()?,
181            trace_state: None,
182        })
183    }
184}
185
186/// A kernel execution span
187pub struct KernelSpan {
188    /// Kernel ID
189    pub kernel_id: String,
190    /// Operation name
191    pub operation: String,
192    /// Span context
193    pub context: SpanContext,
194    /// Start time
195    pub start: Instant,
196    /// Attributes
197    pub attributes: std::collections::HashMap<String, String>,
198    /// Events
199    pub events: Vec<SpanEvent>,
200}
201
202impl KernelSpan {
203    /// Start a new span
204    pub fn start(kernel_id: impl Into<String>, operation: impl Into<String>) -> Self {
205        Self {
206            kernel_id: kernel_id.into(),
207            operation: operation.into(),
208            context: SpanContext::new_root(),
209            start: Instant::now(),
210            attributes: std::collections::HashMap::new(),
211            events: Vec::new(),
212        }
213    }
214
215    /// Start a child span
216    pub fn start_child(
217        parent: &SpanContext,
218        kernel_id: impl Into<String>,
219        operation: impl Into<String>,
220    ) -> Self {
221        Self {
222            kernel_id: kernel_id.into(),
223            operation: operation.into(),
224            context: parent.new_child(),
225            start: Instant::now(),
226            attributes: std::collections::HashMap::new(),
227            events: Vec::new(),
228        }
229    }
230
231    /// Add an attribute
232    pub fn set_attribute(&mut self, key: impl Into<String>, value: impl Into<String>) {
233        self.attributes.insert(key.into(), value.into());
234    }
235
236    /// Add an event
237    pub fn add_event(&mut self, name: impl Into<String>) {
238        self.events.push(SpanEvent {
239            name: name.into(),
240            timestamp: Instant::now(),
241            attributes: std::collections::HashMap::new(),
242        });
243    }
244
245    /// Add an event with attributes
246    pub fn add_event_with_attributes(
247        &mut self,
248        name: impl Into<String>,
249        attributes: std::collections::HashMap<String, String>,
250    ) {
251        self.events.push(SpanEvent {
252            name: name.into(),
253            timestamp: Instant::now(),
254            attributes,
255        });
256    }
257
258    /// Record an error
259    pub fn record_error(&mut self, error: &dyn std::error::Error) {
260        self.set_attribute("error", "true");
261        self.set_attribute("error.message", error.to_string());
262        self.add_event("exception");
263    }
264
265    /// End the span
266    pub fn end(self) -> Duration {
267        let duration = self.start.elapsed();
268
269        #[cfg(feature = "otlp")]
270        {
271            use tracing::info_span;
272            // Record span to tracing
273            let span = info_span!(
274                "kernel_execution",
275                kernel_id = %self.kernel_id,
276                operation = %self.operation,
277                trace_id = %self.context.trace_id,
278                span_id = %self.context.span_id,
279                duration_us = duration.as_micros() as u64,
280            );
281            span.in_scope(|| {
282                for (key, value) in &self.attributes {
283                    tracing::info!(key = %key, value = %value, "span attribute");
284                }
285            });
286        }
287
288        duration
289    }
290
291    /// Get duration so far
292    pub fn elapsed(&self) -> Duration {
293        self.start.elapsed()
294    }
295}
296
/// A named, timestamped occurrence recorded within a `KernelSpan`.
#[derive(Debug, Clone)]
pub struct SpanEvent {
    /// What happened (e.g. `"exception"`).
    pub name: String,
    /// Monotonic time at which the event was recorded.
    pub timestamp: Instant,
    /// Extra key/value context for the event.
    pub attributes: std::collections::HashMap<String, String>,
}
307
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_span_context() {
        let ctx = SpanContext::new_root();
        assert_eq!(ctx.trace_id.len(), 32);
        assert_eq!(ctx.span_id.len(), 16);
        assert_eq!(ctx.trace_flags, 0x01);

        let child = ctx.new_child();
        assert_eq!(child.trace_id, ctx.trace_id);
        assert_ne!(child.span_id, ctx.span_id);
        assert_eq!(child.trace_flags, ctx.trace_flags);
    }

    #[test]
    fn test_traceparent() {
        let ctx = SpanContext::new("0af7651916cd43dd8448eb211c80319c", "b7ad6b7169203331");
        let header = ctx.to_traceparent();
        assert_eq!(
            header,
            "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"
        );

        let parsed = SpanContext::from_traceparent(&header).unwrap();
        assert_eq!(parsed.trace_id, ctx.trace_id);
        assert_eq!(parsed.span_id, ctx.span_id);
        assert_eq!(parsed.trace_flags, ctx.trace_flags);
        assert!(parsed.trace_state.is_none());
    }

    #[test]
    fn test_traceparent_rejects_malformed() {
        assert!(SpanContext::from_traceparent("").is_none());
        assert!(SpanContext::from_traceparent("00-abc-def").is_none());
        assert!(SpanContext::from_traceparent(
            "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-zz"
        )
        .is_none());
    }

    #[test]
    fn test_kernel_span() {
        let mut span = KernelSpan::start("graph/pagerank", "execute");
        span.set_attribute("input_size", "1000");
        span.add_event("started");
        assert_eq!(span.attributes.get("input_size").map(String::as_str), Some("1000"));
        assert_eq!(span.events.len(), 1);

        std::thread::sleep(std::time::Duration::from_millis(10));

        let duration = span.end();
        assert!(duration >= std::time::Duration::from_millis(10));
    }

    #[test]
    fn test_child_span_shares_trace() {
        let parent = KernelSpan::start("graph/pagerank", "execute");
        let child = KernelSpan::start_child(&parent.context, "graph/pagerank", "iterate");
        assert_eq!(child.context.trace_id, parent.context.trace_id);
        assert_ne!(child.context.span_id, parent.context.span_id);
    }

    #[test]
    fn test_record_error() {
        let mut span = KernelSpan::start("graph/pagerank", "execute");
        let err = std::io::Error::new(std::io::ErrorKind::NotFound, "missing input");
        span.record_error(&err);
        assert_eq!(span.attributes.get("error").map(String::as_str), Some("true"));
        assert_eq!(
            span.attributes.get("error.message").map(String::as_str),
            Some("missing input")
        );
        assert_eq!(span.events.len(), 1);
        assert_eq!(span.events[0].name, "exception");
    }

    #[test]
    fn test_tracing_config() {
        let config = TracingConfig::otlp("http://jaeger:4317").with_sampling(0.5);

        assert_eq!(config.otlp_endpoint, Some("http://jaeger:4317".to_string()));
        assert_eq!(config.sampling_rate, 0.5);
        assert_eq!(TracingConfig::default().with_sampling(2.0).sampling_rate, 1.0);
        assert_eq!(TracingConfig::default().with_sampling(-0.5).sampling_rate, 0.0);
    }
}