// mecha10_core/tracing_otel.rs

1//! Distributed Tracing with OpenTelemetry
2//!
3//! Traces message flows across nodes for debugging, performance profiling, and understanding
4//! distributed system behavior.
5//!
6//! # Features
7//!
8//! - OpenTelemetry integration
9//! - Automatic trace context propagation
10//! - Per-message span tracking
//! - Jaeger/Zipkin exporter interfaces (currently stubs that only log)
12//! - Low overhead
13//! - Sampling support
14//!
15//! # Example
16//!
17//! ```rust
18//! use mecha10::prelude::*;
19//!
20//! # async fn example(ctx: &Context) -> Result<()> {
21//! // Initialize tracing
22//! let tracer = TracingConfig::default()
23//!     .with_service_name("my-robot")
24//!     .with_jaeger("http://localhost:14268/api/traces")
25//!     .init()?;
26//!
//! // Publish with a trace context attached to the message envelope
//! ctx.publish_traced("/camera/rgb", &image).await?;
//!
//! // Subscribe and continue the trace carried by each envelope
//! let mut images = ctx.subscribe_traced("/camera/rgb").await?;
//! while let Some(msg) = images.recv().await {
//!     let span = msg.create_span("process_image");
//!     span.in_scope(|| {
//!         // Process within span
//!         process_image(&msg.payload);
//!     });
//! }
38//! # Ok(())
39//! # }
40//! ```
41
42use crate::error::{Mecha10Error, Result};
43use serde::de::DeserializeOwned;
44use serde::{Deserialize, Serialize};
45use std::collections::HashMap;
46use std::sync::Arc;
47use tracing::{debug, info, span, Level, Span};
48use tracing_subscriber::layer::SubscriberExt;
49use tracing_subscriber::util::SubscriberInitExt;
50
51// ============================================================================
52// Trace Context
53// ============================================================================
54
55/// Trace context for propagating spans across message boundaries
56///
57/// This is embedded in message metadata to enable distributed tracing.
58#[derive(Debug, Clone, Serialize, Deserialize)]
59pub struct TraceContext {
60    /// Trace ID (unique per request/flow)
61    pub trace_id: String,
62    /// Span ID (unique per operation)
63    pub span_id: String,
64    /// Parent span ID (for hierarchy)
65    pub parent_span_id: Option<String>,
66    /// Sampling decision (whether to record this trace)
67    pub sampled: bool,
68    /// Baggage (key-value metadata)
69    #[serde(default)]
70    pub baggage: HashMap<String, String>,
71}
72
73impl TraceContext {
74    /// Create a new root trace context
75    pub fn new() -> Self {
76        Self {
77            trace_id: generate_trace_id(),
78            span_id: generate_span_id(),
79            parent_span_id: None,
80            sampled: true,
81            baggage: HashMap::new(),
82        }
83    }
84
85    /// Create a child trace context
86    pub fn child(&self) -> Self {
87        Self {
88            trace_id: self.trace_id.clone(),
89            span_id: generate_span_id(),
90            parent_span_id: Some(self.span_id.clone()),
91            sampled: self.sampled,
92            baggage: self.baggage.clone(),
93        }
94    }
95
96    /// Add baggage item
97    pub fn with_baggage(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
98        self.baggage.insert(key.into(), value.into());
99        self
100    }
101
102    /// Convert to W3C Trace Context format
103    pub fn to_w3c_traceparent(&self) -> String {
104        let flags = if self.sampled { "01" } else { "00" };
105        format!("00-{}-{}-{}", self.trace_id, self.span_id, flags)
106    }
107
108    /// Parse from W3C Trace Context format
109    pub fn from_w3c_traceparent(traceparent: &str) -> Option<Self> {
110        let parts: Vec<&str> = traceparent.split('-').collect();
111        if parts.len() != 4 || parts[0] != "00" {
112            return None;
113        }
114
115        Some(Self {
116            trace_id: parts[1].to_string(),
117            span_id: parts[2].to_string(),
118            parent_span_id: None,
119            sampled: parts[3] == "01",
120            baggage: HashMap::new(),
121        })
122    }
123}
124
125impl Default for TraceContext {
126    fn default() -> Self {
127        Self::new()
128    }
129}
130
131// ============================================================================
132// Message Envelope with Trace Context
133// ============================================================================
134
135/// Message envelope that includes trace context
136#[derive(Debug, Clone, Serialize, Deserialize)]
137pub struct TracedMessage<T> {
138    /// The actual message payload
139    pub payload: T,
140    /// Trace context for distributed tracing
141    pub trace_context: TraceContext,
142    /// Timestamp when message was sent
143    pub timestamp_us: u64,
144    /// Source node ID
145    pub source_node: String,
146}
147
148impl<T> TracedMessage<T> {
149    /// Create a new traced message
150    pub fn new(payload: T, source_node: impl Into<String>) -> Self {
151        Self {
152            payload,
153            trace_context: TraceContext::new(),
154            timestamp_us: crate::prelude::now_micros(),
155            source_node: source_node.into(),
156        }
157    }
158
159    /// Create from existing trace context
160    pub fn with_context(payload: T, trace_context: TraceContext, source_node: impl Into<String>) -> Self {
161        Self {
162            payload,
163            trace_context,
164            timestamp_us: crate::prelude::now_micros(),
165            source_node: source_node.into(),
166        }
167    }
168
169    /// Create a span for this message
170    pub fn create_span(&self, operation: &str) -> Span {
171        span!(
172            Level::INFO,
173            "message",
174            operation = operation,
175            trace_id = %self.trace_context.trace_id,
176            span_id = %self.trace_context.span_id,
177            source = %self.source_node,
178        )
179    }
180}
181
182// ============================================================================
183// Tracing Configuration
184// ============================================================================
185
186/// Tracing system configuration
187#[derive(Debug, Clone)]
188pub struct TracingConfig {
189    /// Service name for this instance
190    pub service_name: String,
191    /// Jaeger collector endpoint
192    pub jaeger_endpoint: Option<String>,
193    /// Zipkin collector endpoint
194    pub zipkin_endpoint: Option<String>,
195    /// Sampling rate (0.0 to 1.0)
196    pub sampling_rate: f64,
197    /// Enable console output
198    pub console_output: bool,
199}
200
201impl Default for TracingConfig {
202    fn default() -> Self {
203        Self {
204            service_name: "mecha10".to_string(),
205            jaeger_endpoint: None,
206            zipkin_endpoint: None,
207            sampling_rate: 1.0,
208            console_output: true,
209        }
210    }
211}
212
213impl TracingConfig {
214    /// Set service name
215    pub fn with_service_name(mut self, name: impl Into<String>) -> Self {
216        self.service_name = name.into();
217        self
218    }
219
220    /// Set Jaeger endpoint
221    pub fn with_jaeger(mut self, endpoint: impl Into<String>) -> Self {
222        self.jaeger_endpoint = Some(endpoint.into());
223        self
224    }
225
226    /// Set Zipkin endpoint
227    pub fn with_zipkin(mut self, endpoint: impl Into<String>) -> Self {
228        self.zipkin_endpoint = Some(endpoint.into());
229        self
230    }
231
232    /// Set sampling rate
233    pub fn with_sampling_rate(mut self, rate: f64) -> Self {
234        self.sampling_rate = rate.clamp(0.0, 1.0);
235        self
236    }
237
238    /// Disable console output
239    pub fn without_console(mut self) -> Self {
240        self.console_output = false;
241        self
242    }
243
244    /// Initialize the tracing system
245    ///
246    /// This sets up the global subscriber with the configured exporters.
247    pub fn init(self) -> Result<TracingHandle> {
248        info!("Initializing distributed tracing: {}", self.service_name);
249
250        // Build subscriber layers
251        let fmt_layer = if self.console_output {
252            Some(tracing_subscriber::fmt::layer())
253        } else {
254            None
255        };
256
257        // Initialize subscriber
258        tracing_subscriber::registry()
259            .with(fmt_layer)
260            .try_init()
261            .map_err(|e| Mecha10Error::Runtime(format!("Failed to initialize tracing: {}", e)))?;
262
263        info!(
264            "Tracing initialized - Service: {}, Jaeger: {:?}, Sampling: {}",
265            self.service_name, self.jaeger_endpoint, self.sampling_rate
266        );
267
268        let sampling_rate = self.sampling_rate;
269        Ok(TracingHandle {
270            config: self,
271            sampler: Arc::new(ProbabilitySampler::new(sampling_rate)),
272        })
273    }
274}
275
276// ============================================================================
277// Tracing Handle
278// ============================================================================
279
280/// Handle to the tracing system
281pub struct TracingHandle {
282    config: TracingConfig,
283    sampler: Arc<ProbabilitySampler>,
284}
285
286impl TracingHandle {
287    /// Check if a trace should be sampled
288    pub fn should_sample(&self) -> bool {
289        self.sampler.should_sample()
290    }
291
292    /// Create a root span for an operation
293    pub fn start_span(&self, name: &str) -> Span {
294        let trace_ctx = TraceContext::new();
295        span!(
296            Level::INFO,
297            "operation",
298            name = name,
299            trace_id = %trace_ctx.trace_id,
300            span_id = %trace_ctx.span_id,
301        )
302    }
303
304    /// Get the service name
305    pub fn service_name(&self) -> &str {
306        &self.config.service_name
307    }
308}
309
310// ============================================================================
311// Sampling
312// ============================================================================
313
/// Probability-based sampler: accepts roughly a `rate` fraction of the traces it is
/// asked about, using a pseudo-random draw per call.
struct ProbabilitySampler {
    rate: f64,
}

impl ProbabilitySampler {
    /// Build a sampler whose `rate` is clamped into `[0.0, 1.0]`.
    fn new(rate: f64) -> Self {
        let rate = rate.clamp(0.0, 1.0);
        Self { rate }
    }

    /// Decide whether to sample.
    ///
    /// Rates at or above 1.0 always sample and rates at or below 0.0 never do. Otherwise
    /// the current time is hashed with a freshly keyed `RandomState`, the hash is scaled
    /// into `[0, 1]`, and the call samples when that fraction is below the rate.
    fn should_sample(&self) -> bool {
        use std::collections::hash_map::RandomState;
        use std::hash::{BuildHasher, Hash, Hasher};

        match self.rate {
            r if r >= 1.0 => true,
            r if r <= 0.0 => false,
            r => {
                let mut hasher = RandomState::new().build_hasher();
                std::time::SystemTime::now().hash(&mut hasher);
                let fraction = hasher.finish() as f64 / u64::MAX as f64;
                fraction < r
            }
        }
    }
}
342
343// ============================================================================
344// Utilities
345// ============================================================================
346
/// Generate a trace ID: exactly 32 lowercase hex characters (16 bytes), never all zeros.
///
/// The high 8 bytes are the low 64 bits of the wall-clock time in nanoseconds since the
/// Unix epoch; the low 8 bytes come from [`rand_u64`]. A clock set before the epoch
/// contributes zero instead of panicking.
fn generate_trace_id() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};

    // `as_nanos()` is a u128 and `{:016x}` is only a minimum width, so truncate to
    // 64 bits to keep the ID at exactly 32 hex characters (the W3C trace-id width).
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_nanos() as u64)
        .unwrap_or_default();
    // W3C forbids an all-zero trace-id; forcing the random half non-zero rules it out.
    format!("{:016x}{:016x}", nanos, rand_u64().max(1))
}

/// Generate a span ID: exactly 16 lowercase hex characters (8 bytes), never all zeros.
fn generate_span_id() -> String {
    // W3C forbids an all-zero parent-id, so map the (vanishingly unlikely) 0 to 1.
    format!("{:016x}", rand_u64().max(1))
}

/// Pseudo-random u64: hashes the current time with a freshly keyed std `RandomState`.
///
/// Intended only for trace/span identifiers; not a general-purpose or cryptographic RNG.
fn rand_u64() -> u64 {
    use std::collections::hash_map::RandomState;
    use std::hash::{BuildHasher, Hash, Hasher};

    let mut hasher = RandomState::new().build_hasher();
    std::time::SystemTime::now().hash(&mut hasher);
    hasher.finish()
}
368
369// ============================================================================
370// Context Extensions
371// ============================================================================
372
373/// Extension trait for Context to add tracing capabilities
374pub trait TracingExt {
375    /// Publish a message with automatic trace context
376    ///
377    /// Creates a new trace context or continues an existing one,
378    /// embedding it in the message for distributed tracing.
379    fn publish_traced<T: crate::messages::Message + Serialize + Clone + 'static>(
380        &self,
381        topic: &str,
382        message: &T,
383    ) -> impl std::future::Future<Output = Result<()>> + Send;
384
385    /// Publish with explicit trace context (for continuing traces)
386    fn publish_with_context<T: crate::messages::Message + Serialize + Clone + 'static>(
387        &self,
388        topic: &str,
389        message: &T,
390        trace_context: TraceContext,
391    ) -> impl std::future::Future<Output = Result<()>> + Send;
392
393    /// Subscribe with trace context propagation
394    ///
395    /// Messages received will include trace context for span continuation.
396    fn subscribe_traced<T: crate::messages::Message + DeserializeOwned + Send + 'static>(
397        &self,
398        topic: &str,
399    ) -> impl std::future::Future<Output = Result<crate::context::Receiver<TracedMessage<T>>>> + Send;
400
401    /// Get current trace context from span (if any)
402    fn current_trace_context(&self) -> Option<TraceContext>;
403
404    /// Extract trace context from message metadata
405    fn extract_trace_context(&self, metadata: &HashMap<String, String>) -> Option<TraceContext>;
406
407    /// Inject trace context into message metadata
408    fn inject_trace_context(&self, metadata: &mut HashMap<String, String>, ctx: &TraceContext);
409}
410
411impl TracingExt for crate::context::Context {
412    async fn publish_traced<T: crate::messages::Message + Serialize + Clone + 'static>(
413        &self,
414        topic: &str,
415        message: &T,
416    ) -> Result<()> {
417        // Get or create trace context
418        let trace_context = self.current_trace_context().unwrap_or_default();
419        self.publish_with_context(topic, message, trace_context).await
420    }
421
422    async fn publish_with_context<T: crate::messages::Message + Serialize + Clone + 'static>(
423        &self,
424        topic: &str,
425        message: &T,
426        trace_context: TraceContext,
427    ) -> Result<()> {
428        // Create traced message envelope
429        let traced_msg = TracedMessage::with_context(
430            message.clone(),
431            trace_context.child(), // Create child span for this publish
432            self.node_id(),
433        );
434
435        // Publish the traced message
436        self.publish_raw(topic, &traced_msg).await
437    }
438
439    async fn subscribe_traced<T: crate::messages::Message + DeserializeOwned + Send + 'static>(
440        &self,
441        topic: &str,
442    ) -> Result<crate::context::Receiver<TracedMessage<T>>> {
443        // Subscribe to traced messages
444        self.subscribe_raw::<TracedMessage<T>>(topic).await
445    }
446
447    fn current_trace_context(&self) -> Option<TraceContext> {
448        // Try to extract from current tracing span
449        use tracing::field::Visit;
450
451        struct TraceIdVisitor {
452            trace_id: Option<String>,
453            span_id: Option<String>,
454        }
455
456        impl Visit for TraceIdVisitor {
457            fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
458                match field.name() {
459                    "trace_id" => self.trace_id = Some(value.to_string()),
460                    "span_id" => self.span_id = Some(value.to_string()),
461                    _ => {}
462                }
463            }
464
465            fn record_debug(&mut self, _field: &tracing::field::Field, _value: &dyn std::fmt::Debug) {}
466        }
467
468        // Extract from current span if available
469        let _visitor = TraceIdVisitor {
470            trace_id: None,
471            span_id: None,
472        };
473
474        // Get current span and try to extract IDs
475        let span = Span::current();
476        span.record("trace_id", &tracing::field::Empty);
477        // Note: In production, this would use proper OpenTelemetry context extraction
478
479        // For now, create a new context if none exists
480        None
481    }
482
483    fn extract_trace_context(&self, metadata: &HashMap<String, String>) -> Option<TraceContext> {
484        // Check for W3C Trace Context header
485        if let Some(traceparent) = metadata.get("traceparent") {
486            return TraceContext::from_w3c_traceparent(traceparent);
487        }
488
489        // Check for direct trace context JSON
490        if let Some(trace_ctx_json) = metadata.get("trace_context") {
491            return serde_json::from_str(trace_ctx_json).ok();
492        }
493
494        None
495    }
496
497    fn inject_trace_context(&self, metadata: &mut HashMap<String, String>, ctx: &TraceContext) {
498        // Inject W3C Trace Context
499        metadata.insert("traceparent".to_string(), ctx.to_w3c_traceparent());
500
501        // Also inject as JSON for full context
502        if let Ok(json) = serde_json::to_string(ctx) {
503            metadata.insert("trace_context".to_string(), json);
504        }
505
506        // Inject baggage
507        for (key, value) in &ctx.baggage {
508            metadata.insert(format!("baggage_{}", key), value.clone());
509        }
510    }
511}
512
513// ============================================================================
514// Span Builder
515// ============================================================================
516
517/// Builder for creating traced operations
518pub struct SpanBuilder {
519    name: String,
520    trace_context: Option<TraceContext>,
521    attributes: HashMap<String, String>,
522}
523
524impl SpanBuilder {
525    /// Create a new span builder
526    pub fn new(name: impl Into<String>) -> Self {
527        Self {
528            name: name.into(),
529            trace_context: None,
530            attributes: HashMap::new(),
531        }
532    }
533
534    /// Set trace context (for child spans)
535    pub fn with_context(mut self, ctx: TraceContext) -> Self {
536        self.trace_context = Some(ctx);
537        self
538    }
539
540    /// Add attribute
541    pub fn with_attribute(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
542        self.attributes.insert(key.into(), value.into());
543        self
544    }
545
546    /// Start the span
547    pub fn start(self) -> (Span, TraceContext) {
548        let trace_ctx = self.trace_context.unwrap_or_default();
549
550        let span = span!(
551            Level::INFO,
552            "span",
553            name = %self.name,
554            trace_id = %trace_ctx.trace_id,
555            span_id = %trace_ctx.span_id,
556        );
557
558        (span, trace_ctx)
559    }
560}
561
562// ============================================================================
563// Trace Exporter (Stub for future OpenTelemetry integration)
564// ============================================================================
565
/// Trace exporter interface.
///
/// Implementors receive finished spans via [`TraceExporter::export_span`] and may
/// buffer them until [`TraceExporter::flush`] is called.
pub trait TraceExporter: Send + Sync {
    /// Export a span
    fn export_span(&self, span_data: &SpanData);

    /// Flush pending spans
    fn flush(&self);
}

/// Span data for export.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct SpanData {
    /// Trace the span belongs to
    pub trace_id: String,
    /// Identifier of this span
    pub span_id: String,
    /// Parent span, `None` for a root span
    pub parent_span_id: Option<String>,
    /// Operation name
    pub name: String,
    /// Start timestamp (units not fixed here; presumably microseconds like
    /// `TracedMessage::timestamp_us` — TODO confirm)
    pub start_time: u64,
    /// End timestamp, in the same units as `start_time`
    pub end_time: u64,
    /// Free-form key/value attributes
    pub attributes: HashMap<String, String>,
}
586
587/// Jaeger exporter (stub - would use real OpenTelemetry in production)
588pub struct JaegerExporter {
589    #[allow(dead_code)]
590    endpoint: String,
591    #[allow(dead_code)]
592    service_name: String,
593}
594
595impl JaegerExporter {
596    /// Create a new Jaeger exporter
597    pub fn new(endpoint: impl Into<String>, service_name: impl Into<String>) -> Self {
598        Self {
599            endpoint: endpoint.into(),
600            service_name: service_name.into(),
601        }
602    }
603}
604
605impl TraceExporter for JaegerExporter {
606    fn export_span(&self, span_data: &SpanData) {
607        // In production, this would send to Jaeger via HTTP
608        debug!(
609            "Exporting span to Jaeger {}: {} (trace: {})",
610            self.endpoint, span_data.name, span_data.trace_id
611        );
612    }
613
614    fn flush(&self) {
615        debug!("Flushing Jaeger exporter");
616    }
617}
618
619/// Zipkin exporter (stub)
620pub struct ZipkinExporter {
621    #[allow(dead_code)]
622    endpoint: String,
623    #[allow(dead_code)]
624    service_name: String,
625}
626
627impl ZipkinExporter {
628    /// Create a new Zipkin exporter
629    pub fn new(endpoint: impl Into<String>, service_name: impl Into<String>) -> Self {
630        Self {
631            endpoint: endpoint.into(),
632            service_name: service_name.into(),
633        }
634    }
635}
636
637impl TraceExporter for ZipkinExporter {
638    fn export_span(&self, span_data: &SpanData) {
639        debug!(
640            "Exporting span to Zipkin {}: {} (trace: {})",
641            self.endpoint, span_data.name, span_data.trace_id
642        );
643    }
644
645    fn flush(&self) {
646        debug!("Flushing Zipkin exporter");
647    }
648}