Skip to main content

elara_runtime/observability/
tracing.rs

1//! Distributed tracing system with OpenTelemetry support.
2//!
3//! This module provides distributed tracing capabilities for the ELARA Protocol,
4//! enabling end-to-end request tracing across nodes. It supports multiple exporter
5//! backends (Jaeger, Zipkin, OTLP) and configurable sampling rates.
6//!
7//! # Examples
8//!
9//! ```no_run
10//! use elara_runtime::observability::tracing::{TracingConfig, TracingExporter, init_tracing};
11//!
12//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
13//! let config = TracingConfig {
14//!     service_name: "elara-node".to_string(),
15//!     exporter: TracingExporter::Jaeger {
16//!         endpoint: "http://localhost:14268/api/traces".to_string(),
17//!     },
18//!     sampling_rate: 1.0,
19//!     resource_attributes: vec![
20//!         ("environment".to_string(), "production".to_string()),
21//!     ],
22//! };
23//!
24//! let handle = init_tracing(config).await?;
25//!
26//! // Use tracing throughout your application
27//! tracing::info!("Application started");
28//!
29//! // Graceful shutdown
30//! handle.shutdown().await?;
31//! # Ok(())
32//! # }
33//! ```
34
35use opentelemetry::global;
36use opentelemetry::trace::TracerProvider as _;
37use opentelemetry::KeyValue;
38use opentelemetry_sdk::trace::{RandomIdGenerator, Sampler, TracerProvider};
39use opentelemetry_sdk::Resource;
40use std::sync::atomic::{AtomicBool, Ordering};
41use std::sync::Arc;
42use thiserror::Error;
43use tracing::Span;
44use tracing_subscriber::layer::SubscriberExt;
45use tracing_subscriber::Registry;
46
/// Configuration for the distributed tracing system.
///
/// Consumed by `init_tracing`, which validates `sampling_rate`, selects an
/// exporter from `exporter`, and combines `service_name` with
/// `resource_attributes` into the OpenTelemetry resource attached to the
/// tracer provider.
#[derive(Debug, Clone)]
pub struct TracingConfig {
    /// Service name to identify this node in traces.
    /// Recorded as the `service.name` resource attribute.
    pub service_name: String,

    /// Exporter backend configuration.
    /// `TracingExporter::None` disables tracing entirely.
    pub exporter: TracingExporter,

    /// Sampling rate (0.0 to 1.0)
    /// - 0.0: No traces are sampled
    /// - 1.0: All traces are sampled
    /// - 0.1: 10% of traces are sampled
    ///
    /// Values outside this range are rejected by `init_tracing` with
    /// `TracingError::InvalidSamplingRate`.
    pub sampling_rate: f64,

    /// Additional resource attributes (key-value pairs).
    /// They are added to the resource after `service.name`, in the order given.
    pub resource_attributes: Vec<(String, String)>,
}
69
/// Exporter backend configuration.
///
/// Selects which OpenTelemetry-compatible backend `init_tracing` exports
/// spans to. Each exporting variant carries the endpoint string handed to the
/// corresponding exporter builder.
#[derive(Debug, Clone)]
pub enum TracingExporter {
    /// Jaeger exporter (good for development/testing)
    /// Endpoint format: "http://localhost:14268/api/traces"
    ///
    /// NOTE(review): `init_jaeger_exporter` builds an *agent* pipeline
    /// (`new_agent_pipeline`), while the format above looks like an HTTP
    /// collector URL — confirm which endpoint form is actually expected.
    Jaeger { endpoint: String },

    /// Zipkin exporter (for compatibility with Zipkin infrastructure)
    /// Endpoint format: "http://localhost:9411/api/v2/spans"
    Zipkin { endpoint: String },

    /// OTLP (OpenTelemetry Protocol) exporter (production standard)
    /// Endpoint format: "http://localhost:4317" (gRPC)
    Otlp { endpoint: String },

    /// No tracing (disabled).
    /// `init_tracing` returns an inactive handle and installs nothing.
    None,
}
90
/// Handle for managing the tracing system lifecycle.
///
/// Returned by `init_tracing`. Calling `shutdown` on it shuts down the global
/// tracer provider if this handle is still marked as initialized.
///
/// Cloning is cheap: all clones share the same `initialized` flag through the
/// `Arc`, so a shutdown performed through one clone is visible to the others.
#[derive(Clone)]
pub struct TracingHandle {
    // `true` while the tracer provider installed by `init_tracing` is live;
    // `false` for handles created with tracing disabled or after shutdown.
    initialized: Arc<AtomicBool>,
}
99
100impl TracingHandle {
101    /// Shutdown the tracing system gracefully.
102    ///
103    /// This flushes all pending spans and shuts down the exporter.
104    /// After calling this, no more traces will be exported.
105    pub async fn shutdown(self) -> Result<(), TracingError> {
106        if self.initialized.load(Ordering::SeqCst) {
107            global::shutdown_tracer_provider();
108            self.initialized.store(false, Ordering::SeqCst);
109        }
110        Ok(())
111    }
112}
113
/// Errors that can occur during tracing initialization or operation.
///
/// `Display` and `std::error::Error` are derived through `thiserror`; the
/// `#[error]` strings below are the user-visible messages.
#[derive(Debug, Error)]
pub enum TracingError {
    /// Tracing system has already been initialized.
    /// Returned by `init_tracing` when the initialization flag is already set.
    #[error("Tracing system already initialized")]
    AlreadyInitialized,

    /// Invalid sampling rate (must be between 0.0 and 1.0).
    /// Carries the rejected value.
    #[error("Invalid sampling rate: {0} (must be between 0.0 and 1.0)")]
    InvalidSamplingRate(f64),

    /// Failed to initialize the exporter.
    /// Carries the stringified error from the exporter builder.
    #[error("Failed to initialize exporter: {0}")]
    ExporterInitialization(String),

    /// Failed to set global tracer provider.
    /// Produced when installing the global `tracing` subscriber fails;
    /// carries the stringified underlying error.
    #[error("Failed to set global tracer provider: {0}")]
    GlobalTracerSetup(String),
}
133
/// Global flag to track if tracing has been initialized.
///
/// `init_tracing` swaps this to `true` to claim initialization, so a second
/// call while it is set fails with `TracingError::AlreadyInitialized`
/// (one-shot initialization rather than idempotency). `init_tracing` stores
/// `false` back when it rejects the configuration, when tracing is disabled,
/// or when installing the subscriber fails.
static TRACING_INITIALIZED: AtomicBool = AtomicBool::new(false);
137
138/// Initialize the distributed tracing system.
139///
140/// This function sets up OpenTelemetry tracing with the specified configuration.
141/// It is idempotent - calling it multiple times will return an error after the first call.
142///
143/// # Arguments
144///
145/// * `config` - Tracing configuration including service name, exporter, and sampling rate
146///
147/// # Returns
148///
149/// * `Ok(TracingHandle)` - Handle for graceful shutdown
150/// * `Err(TracingError)` - If initialization fails or tracing is already initialized
151///
152/// # Examples
153///
154/// ```no_run
155/// use elara_runtime::observability::tracing::{TracingConfig, TracingExporter, init_tracing};
156///
157/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
158/// let config = TracingConfig {
159///     service_name: "elara-node".to_string(),
160///     exporter: TracingExporter::Otlp {
161///         endpoint: "http://localhost:4317".to_string(),
162///     },
163///     sampling_rate: 0.1, // Sample 10% of traces
164///     resource_attributes: vec![
165///         ("node.id".to_string(), "node-1".to_string()),
166///         ("environment".to_string(), "production".to_string()),
167///     ],
168/// };
169///
170/// let handle = init_tracing(config).await?;
171/// # Ok(())
172/// # }
173/// ```
174pub async fn init_tracing(config: TracingConfig) -> Result<TracingHandle, TracingError> {
175    // Check if already initialized (idempotency)
176    if TRACING_INITIALIZED.swap(true, Ordering::SeqCst) {
177        return Err(TracingError::AlreadyInitialized);
178    }
179
180    // Validate sampling rate
181    if config.sampling_rate < 0.0 || config.sampling_rate > 1.0 {
182        TRACING_INITIALIZED.store(false, Ordering::SeqCst);
183        return Err(TracingError::InvalidSamplingRate(config.sampling_rate));
184    }
185
186    // Handle disabled tracing
187    if matches!(config.exporter, TracingExporter::None) {
188        TRACING_INITIALIZED.store(false, Ordering::SeqCst);
189        return Ok(TracingHandle {
190            initialized: Arc::new(AtomicBool::new(false)),
191        });
192    }
193
194    // Build resource with service name and custom attributes
195    let mut resource_kvs = vec![KeyValue::new("service.name", config.service_name.clone())];
196    for (key, value) in config.resource_attributes {
197        resource_kvs.push(KeyValue::new(key, value));
198    }
199    let resource = Resource::new(resource_kvs);
200
201    // Configure sampler based on sampling rate
202    let sampler = if config.sampling_rate >= 1.0 {
203        Sampler::AlwaysOn
204    } else if config.sampling_rate <= 0.0 {
205        Sampler::AlwaysOff
206    } else {
207        Sampler::TraceIdRatioBased(config.sampling_rate)
208    };
209
210    // Initialize the appropriate exporter
211    let tracer_provider = match config.exporter {
212        TracingExporter::Jaeger { endpoint } => {
213            init_jaeger_exporter(&endpoint, resource, sampler).await?
214        }
215        TracingExporter::Zipkin { endpoint } => {
216            init_zipkin_exporter(&endpoint, resource, sampler).await?
217        }
218        TracingExporter::Otlp { endpoint } => {
219            init_otlp_exporter(&endpoint, resource, sampler).await?
220        }
221        TracingExporter::None => unreachable!(), // Already handled above
222    };
223
224    // Set as global tracer provider
225    global::set_tracer_provider(tracer_provider.clone());
226
227    // Create OpenTelemetry tracing layer using a concrete tracer from the provider
228    // Note: We use the provider's tracer() method instead of global::tracer()
229    // to get a concrete Tracer type that implements PreSampledTracer
230    let tracer = tracer_provider.tracer("elara-runtime");
231    let telemetry_layer = tracing_opentelemetry::layer().with_tracer(tracer);
232
233    // Install the layer with the existing subscriber
234    // Note: This assumes a subscriber is already set up (e.g., by logging module)
235    // If not, we create a basic one
236    let subscriber = Registry::default().with(telemetry_layer);
237
238    tracing::subscriber::set_global_default(subscriber).map_err(|e| {
239        TRACING_INITIALIZED.store(false, Ordering::SeqCst);
240        TracingError::GlobalTracerSetup(e.to_string())
241    })?;
242
243    Ok(TracingHandle {
244        initialized: Arc::new(AtomicBool::new(true)),
245    })
246}
247
248/// Initialize Jaeger exporter.
249async fn init_jaeger_exporter(
250    endpoint: &str,
251    resource: Resource,
252    sampler: Sampler,
253) -> Result<TracerProvider, TracingError> {
254    let exporter = opentelemetry_jaeger::new_agent_pipeline()
255        .with_endpoint(endpoint)
256        .with_service_name("elara-node")
257        .build_async_agent_exporter(opentelemetry_sdk::runtime::Tokio)
258        .map_err(|e| TracingError::ExporterInitialization(e.to_string()))?;
259
260    let tracer_provider = TracerProvider::builder()
261        .with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
262        .with_config(
263            opentelemetry_sdk::trace::Config::default()
264                .with_sampler(sampler)
265                .with_id_generator(RandomIdGenerator::default())
266                .with_resource(resource),
267        )
268        .build();
269
270    Ok(tracer_provider)
271}
272
273/// Initialize Zipkin exporter.
274async fn init_zipkin_exporter(
275    endpoint: &str,
276    resource: Resource,
277    sampler: Sampler,
278) -> Result<TracerProvider, TracingError> {
279    let exporter = opentelemetry_zipkin::new_pipeline()
280        .with_collector_endpoint(endpoint)
281        .init_exporter()
282        .map_err(|e| TracingError::ExporterInitialization(e.to_string()))?;
283
284    let tracer_provider = TracerProvider::builder()
285        .with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
286        .with_config(
287            opentelemetry_sdk::trace::Config::default()
288                .with_sampler(sampler)
289                .with_id_generator(RandomIdGenerator::default())
290                .with_resource(resource),
291        )
292        .build();
293
294    Ok(tracer_provider)
295}
296
297/// Initialize OTLP exporter.
298async fn init_otlp_exporter(
299    endpoint: &str,
300    resource: Resource,
301    sampler: Sampler,
302) -> Result<TracerProvider, TracingError> {
303    use opentelemetry_otlp::WithExportConfig;
304
305    let exporter = opentelemetry_otlp::new_exporter()
306        .tonic()
307        .with_endpoint(endpoint)
308        .build_span_exporter()
309        .map_err(|e| TracingError::ExporterInitialization(e.to_string()))?;
310
311    let tracer_provider = TracerProvider::builder()
312        .with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
313        .with_config(
314            opentelemetry_sdk::trace::Config::default()
315                .with_sampler(sampler)
316                .with_id_generator(RandomIdGenerator::default())
317                .with_resource(resource),
318        )
319        .build();
320
321    Ok(tracer_provider)
322}
323
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a config with tracing disabled and the given sampling rate.
    fn disabled_config(sampling_rate: f64) -> TracingConfig {
        TracingConfig {
            service_name: String::from("test"),
            exporter: TracingExporter::None,
            sampling_rate,
            resource_attributes: Vec::new(),
        }
    }

    /// A sampling rate above 1.0 must be rejected.
    #[tokio::test]
    async fn test_tracing_config_validation() {
        let outcome = init_tracing(disabled_config(1.5)).await;
        assert!(matches!(
            outcome,
            Err(TracingError::InvalidSamplingRate(_))
        ));
    }

    /// Initializing with the `None` exporter succeeds.
    #[tokio::test]
    async fn test_disabled_tracing() {
        let outcome = init_tracing(disabled_config(1.0)).await;
        assert!(outcome.is_ok());
    }
}
355
356/// Span helper functions for common tracing patterns.
357///
358/// These functions create pre-configured spans for common operations in the ELARA Protocol,
359/// making it easier to maintain consistent tracing across the codebase.
360
361/// Create a span for message send operations.
362///
363/// # Arguments
364///
365/// * `node_id` - The ID of the node sending the message
366/// * `session_id` - Optional session ID
367/// * `message_count` - Number of messages being sent
368///
369/// # Examples
370///
371/// ```no_run
372/// use elara_runtime::observability::tracing::span_message_send;
373/// use elara_core::NodeId;
374///
375/// let span = span_message_send(NodeId(1), Some(42), 5);
376/// let _enter = span.enter();
377/// // ... send messages ...
378/// ```
379pub fn span_message_send(node_id: u64, session_id: Option<u64>, message_count: usize) -> Span {
380    tracing::span!(
381        tracing::Level::DEBUG,
382        "message_send",
383        node_id = node_id,
384        session_id = ?session_id,
385        message_count = message_count,
386    )
387}
388
389/// Create a span for message receive operations.
390///
391/// # Arguments
392///
393/// * `node_id` - The ID of the node receiving the message
394/// * `session_id` - Optional session ID
395/// * `message_count` - Number of messages being received
396///
397/// # Examples
398///
399/// ```no_run
400/// use elara_runtime::observability::tracing::span_message_receive;
401/// use elara_core::NodeId;
402///
403/// let span = span_message_receive(NodeId(1), Some(42), 3);
404/// let _enter = span.enter();
405/// // ... receive messages ...
406/// ```
407pub fn span_message_receive(node_id: u64, session_id: Option<u64>, message_count: usize) -> Span {
408    tracing::span!(
409        tracing::Level::DEBUG,
410        "message_receive",
411        node_id = node_id,
412        session_id = ?session_id,
413        message_count = message_count,
414    )
415}
416
417/// Create a span for state synchronization operations.
418///
419/// # Arguments
420///
421/// * `node_id` - The ID of the node performing state sync
422/// * `session_id` - Optional session ID
423/// * `event_count` - Number of events being processed
424///
425/// # Examples
426///
427/// ```no_run
428/// use elara_runtime::observability::tracing::span_state_sync;
429/// use elara_core::NodeId;
430///
431/// let span = span_state_sync(NodeId(1), Some(42), 10);
432/// let _enter = span.enter();
433/// // ... synchronize state ...
434/// ```
435pub fn span_state_sync(node_id: u64, session_id: Option<u64>, event_count: usize) -> Span {
436    tracing::span!(
437        tracing::Level::DEBUG,
438        "state_sync",
439        node_id = node_id,
440        session_id = ?session_id,
441        event_count = event_count,
442    )
443}
444
445/// Create a span for connection establishment operations.
446///
447/// # Arguments
448///
449/// * `node_id` - The ID of the node establishing the connection
450/// * `session_id` - The session ID being joined
451///
452/// # Examples
453///
454/// ```no_run
455/// use elara_runtime::observability::tracing::span_connection_establish;
456/// use elara_core::NodeId;
457///
458/// let span = span_connection_establish(NodeId(1), 42);
459/// let _enter = span.enter();
460/// // ... establish connection ...
461/// ```
462pub fn span_connection_establish(node_id: u64, session_id: u64) -> Span {
463    tracing::span!(
464        tracing::Level::INFO,
465        "connection_establish",
466        node_id = node_id,
467        session_id = session_id,
468    )
469}
470
471/// Create a span for connection teardown operations.
472///
473/// # Arguments
474///
475/// * `node_id` - The ID of the node tearing down the connection
476/// * `session_id` - Optional session ID being left
477///
478/// # Examples
479///
480/// ```no_run
481/// use elara_runtime::observability::tracing::span_connection_teardown;
482/// use elara_core::NodeId;
483///
484/// let span = span_connection_teardown(NodeId(1), Some(42));
485/// let _enter = span.enter();
486/// // ... teardown connection ...
487/// ```
488pub fn span_connection_teardown(node_id: u64, session_id: Option<u64>) -> Span {
489    tracing::span!(
490        tracing::Level::INFO,
491        "connection_teardown",
492        node_id = node_id,
493        session_id = ?session_id,
494    )
495}
496
497/// Create a span for node tick operations.
498///
499/// # Arguments
500///
501/// * `node_id` - The ID of the node performing the tick
502/// * `session_id` - Optional session ID
503///
504/// # Examples
505///
506/// ```no_run
507/// use elara_runtime::observability::tracing::span_node_tick;
508/// use elara_core::NodeId;
509///
510/// let span = span_node_tick(NodeId(1), Some(42));
511/// let _enter = span.enter();
512/// // ... perform tick ...
513/// ```
514pub fn span_node_tick(node_id: u64, session_id: Option<u64>) -> Span {
515    tracing::span!(
516        tracing::Level::INFO,
517        "node_tick",
518        node_id = node_id,
519        session_id = ?session_id,
520    )
521}
522
523/// Create a span for decryption operations.
524///
525/// # Arguments
526///
527/// * `node_id` - The ID of the node performing decryption
528/// * `packet_count` - Number of packets being decrypted
529///
530/// # Examples
531///
532/// ```no_run
533/// use elara_runtime::observability::tracing::span_decrypt;
534/// use elara_core::NodeId;
535///
536/// let span = span_decrypt(NodeId(1), 5);
537/// let _enter = span.enter();
538/// // ... decrypt packets ...
539/// ```
540pub fn span_decrypt(node_id: u64, packet_count: usize) -> Span {
541    tracing::span!(
542        tracing::Level::DEBUG,
543        "decrypt",
544        node_id = node_id,
545        packet_count = packet_count,
546    )
547}
548
549/// Create a span for event classification operations.
550///
551/// # Arguments
552///
553/// * `node_id` - The ID of the node classifying events
554/// * `packet_count` - Number of packets being classified
555///
556/// # Examples
557///
558/// ```no_run
559/// use elara_runtime::observability::tracing::span_classify_events;
560/// use elara_core::NodeId;
561///
562/// let span = span_classify_events(NodeId(1), 3);
563/// let _enter = span.enter();
564/// // ... classify events ...
565/// ```
566pub fn span_classify_events(node_id: u64, packet_count: usize) -> Span {
567    tracing::span!(
568        tracing::Level::DEBUG,
569        "classify_events",
570        node_id = node_id,
571        packet_count = packet_count,
572    )
573}