elara_runtime/observability/tracing.rs
1//! Distributed tracing system with OpenTelemetry support.
2//!
3//! This module provides distributed tracing capabilities for the ELARA Protocol,
4//! enabling end-to-end request tracing across nodes. It supports multiple exporter
5//! backends (Jaeger, Zipkin, OTLP) and configurable sampling rates.
6//!
7//! # Examples
8//!
9//! ```no_run
10//! use elara_runtime::observability::tracing::{TracingConfig, TracingExporter, init_tracing};
11//!
12//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
13//! let config = TracingConfig {
14//! service_name: "elara-node".to_string(),
15//! exporter: TracingExporter::Jaeger {
//!         // Jaeger *agent* address (UDP host:port), not the HTTP collector URL.
//!         endpoint: "localhost:6831".to_string(),
17//! },
18//! sampling_rate: 1.0,
19//! resource_attributes: vec![
20//! ("environment".to_string(), "production".to_string()),
21//! ],
22//! };
23//!
24//! let handle = init_tracing(config).await?;
25//!
26//! // Use tracing throughout your application
27//! tracing::info!("Application started");
28//!
29//! // Graceful shutdown
30//! handle.shutdown().await?;
31//! # Ok(())
32//! # }
33//! ```
34
35use opentelemetry::global;
36use opentelemetry::trace::TracerProvider as _;
37use opentelemetry::KeyValue;
38use opentelemetry_sdk::trace::{RandomIdGenerator, Sampler, TracerProvider};
39use opentelemetry_sdk::Resource;
40use std::sync::atomic::{AtomicBool, Ordering};
41use std::sync::Arc;
42use thiserror::Error;
43use tracing::Span;
44use tracing_subscriber::layer::SubscriberExt;
45use tracing_subscriber::Registry;
46
47/// Configuration for the distributed tracing system.
48///
49/// This struct defines all parameters needed to initialize OpenTelemetry tracing,
50/// including the service name, exporter backend, sampling rate, and resource attributes.
51#[derive(Debug, Clone)]
52pub struct TracingConfig {
53 /// Service name to identify this node in traces
54 pub service_name: String,
55
56 /// Exporter backend configuration
57 pub exporter: TracingExporter,
58
59 /// Sampling rate (0.0 to 1.0)
60 /// - 0.0: No traces are sampled
61 /// - 1.0: All traces are sampled
62 /// - 0.1: 10% of traces are sampled
63 pub sampling_rate: f64,
64
65 /// Additional resource attributes (key-value pairs)
66 /// These are attached to all spans and can be used for filtering/grouping
67 pub resource_attributes: Vec<(String, String)>,
68}
69
70/// Exporter backend configuration.
71///
72/// Supports multiple OpenTelemetry-compatible backends for trace export.
73#[derive(Debug, Clone)]
74pub enum TracingExporter {
75 /// Jaeger exporter (good for development/testing)
76 /// Endpoint format: "http://localhost:14268/api/traces"
77 Jaeger { endpoint: String },
78
79 /// Zipkin exporter (for compatibility with Zipkin infrastructure)
80 /// Endpoint format: "http://localhost:9411/api/v2/spans"
81 Zipkin { endpoint: String },
82
83 /// OTLP (OpenTelemetry Protocol) exporter (production standard)
84 /// Endpoint format: "http://localhost:4317" (gRPC)
85 Otlp { endpoint: String },
86
87 /// No tracing (disabled)
88 None,
89}
90
91/// Handle for managing the tracing system lifecycle.
92///
93/// This handle allows for graceful shutdown of the tracing system,
94/// ensuring all pending spans are flushed before termination.
95#[derive(Clone)]
96pub struct TracingHandle {
97 initialized: Arc<AtomicBool>,
98}
99
100impl TracingHandle {
101 /// Shutdown the tracing system gracefully.
102 ///
103 /// This flushes all pending spans and shuts down the exporter.
104 /// After calling this, no more traces will be exported.
105 pub async fn shutdown(self) -> Result<(), TracingError> {
106 if self.initialized.load(Ordering::SeqCst) {
107 global::shutdown_tracer_provider();
108 self.initialized.store(false, Ordering::SeqCst);
109 }
110 Ok(())
111 }
112}
113
114/// Errors that can occur during tracing initialization or operation.
115#[derive(Debug, Error)]
116pub enum TracingError {
117 /// Tracing system has already been initialized
118 #[error("Tracing system already initialized")]
119 AlreadyInitialized,
120
121 /// Invalid sampling rate (must be between 0.0 and 1.0)
122 #[error("Invalid sampling rate: {0} (must be between 0.0 and 1.0)")]
123 InvalidSamplingRate(f64),
124
125 /// Failed to initialize the exporter
126 #[error("Failed to initialize exporter: {0}")]
127 ExporterInitialization(String),
128
129 /// Failed to set global tracer provider
130 #[error("Failed to set global tracer provider: {0}")]
131 GlobalTracerSetup(String),
132}
133
134/// Global flag to track if tracing has been initialized.
135/// This ensures idempotency - init_tracing can only be called once.
136static TRACING_INITIALIZED: AtomicBool = AtomicBool::new(false);
137
138/// Initialize the distributed tracing system.
139///
140/// This function sets up OpenTelemetry tracing with the specified configuration.
141/// It is idempotent - calling it multiple times will return an error after the first call.
142///
143/// # Arguments
144///
145/// * `config` - Tracing configuration including service name, exporter, and sampling rate
146///
147/// # Returns
148///
149/// * `Ok(TracingHandle)` - Handle for graceful shutdown
150/// * `Err(TracingError)` - If initialization fails or tracing is already initialized
151///
152/// # Examples
153///
154/// ```no_run
155/// use elara_runtime::observability::tracing::{TracingConfig, TracingExporter, init_tracing};
156///
157/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
158/// let config = TracingConfig {
159/// service_name: "elara-node".to_string(),
160/// exporter: TracingExporter::Otlp {
161/// endpoint: "http://localhost:4317".to_string(),
162/// },
163/// sampling_rate: 0.1, // Sample 10% of traces
164/// resource_attributes: vec![
165/// ("node.id".to_string(), "node-1".to_string()),
166/// ("environment".to_string(), "production".to_string()),
167/// ],
168/// };
169///
170/// let handle = init_tracing(config).await?;
171/// # Ok(())
172/// # }
173/// ```
174pub async fn init_tracing(config: TracingConfig) -> Result<TracingHandle, TracingError> {
175 // Check if already initialized (idempotency)
176 if TRACING_INITIALIZED.swap(true, Ordering::SeqCst) {
177 return Err(TracingError::AlreadyInitialized);
178 }
179
180 // Validate sampling rate
181 if config.sampling_rate < 0.0 || config.sampling_rate > 1.0 {
182 TRACING_INITIALIZED.store(false, Ordering::SeqCst);
183 return Err(TracingError::InvalidSamplingRate(config.sampling_rate));
184 }
185
186 // Handle disabled tracing
187 if matches!(config.exporter, TracingExporter::None) {
188 TRACING_INITIALIZED.store(false, Ordering::SeqCst);
189 return Ok(TracingHandle {
190 initialized: Arc::new(AtomicBool::new(false)),
191 });
192 }
193
194 // Build resource with service name and custom attributes
195 let mut resource_kvs = vec![KeyValue::new("service.name", config.service_name.clone())];
196 for (key, value) in config.resource_attributes {
197 resource_kvs.push(KeyValue::new(key, value));
198 }
199 let resource = Resource::new(resource_kvs);
200
201 // Configure sampler based on sampling rate
202 let sampler = if config.sampling_rate >= 1.0 {
203 Sampler::AlwaysOn
204 } else if config.sampling_rate <= 0.0 {
205 Sampler::AlwaysOff
206 } else {
207 Sampler::TraceIdRatioBased(config.sampling_rate)
208 };
209
210 // Initialize the appropriate exporter
211 let tracer_provider = match config.exporter {
212 TracingExporter::Jaeger { endpoint } => {
213 init_jaeger_exporter(&endpoint, resource, sampler).await?
214 }
215 TracingExporter::Zipkin { endpoint } => {
216 init_zipkin_exporter(&endpoint, resource, sampler).await?
217 }
218 TracingExporter::Otlp { endpoint } => {
219 init_otlp_exporter(&endpoint, resource, sampler).await?
220 }
221 TracingExporter::None => unreachable!(), // Already handled above
222 };
223
224 // Set as global tracer provider
225 global::set_tracer_provider(tracer_provider.clone());
226
227 // Create OpenTelemetry tracing layer using a concrete tracer from the provider
228 // Note: We use the provider's tracer() method instead of global::tracer()
229 // to get a concrete Tracer type that implements PreSampledTracer
230 let tracer = tracer_provider.tracer("elara-runtime");
231 let telemetry_layer = tracing_opentelemetry::layer().with_tracer(tracer);
232
233 // Install the layer with the existing subscriber
234 // Note: This assumes a subscriber is already set up (e.g., by logging module)
235 // If not, we create a basic one
236 let subscriber = Registry::default().with(telemetry_layer);
237
238 tracing::subscriber::set_global_default(subscriber).map_err(|e| {
239 TRACING_INITIALIZED.store(false, Ordering::SeqCst);
240 TracingError::GlobalTracerSetup(e.to_string())
241 })?;
242
243 Ok(TracingHandle {
244 initialized: Arc::new(AtomicBool::new(true)),
245 })
246}
247
248/// Initialize Jaeger exporter.
249async fn init_jaeger_exporter(
250 endpoint: &str,
251 resource: Resource,
252 sampler: Sampler,
253) -> Result<TracerProvider, TracingError> {
254 let exporter = opentelemetry_jaeger::new_agent_pipeline()
255 .with_endpoint(endpoint)
256 .with_service_name("elara-node")
257 .build_async_agent_exporter(opentelemetry_sdk::runtime::Tokio)
258 .map_err(|e| TracingError::ExporterInitialization(e.to_string()))?;
259
260 let tracer_provider = TracerProvider::builder()
261 .with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
262 .with_config(
263 opentelemetry_sdk::trace::Config::default()
264 .with_sampler(sampler)
265 .with_id_generator(RandomIdGenerator::default())
266 .with_resource(resource),
267 )
268 .build();
269
270 Ok(tracer_provider)
271}
272
273/// Initialize Zipkin exporter.
274async fn init_zipkin_exporter(
275 endpoint: &str,
276 resource: Resource,
277 sampler: Sampler,
278) -> Result<TracerProvider, TracingError> {
279 let exporter = opentelemetry_zipkin::new_pipeline()
280 .with_collector_endpoint(endpoint)
281 .init_exporter()
282 .map_err(|e| TracingError::ExporterInitialization(e.to_string()))?;
283
284 let tracer_provider = TracerProvider::builder()
285 .with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
286 .with_config(
287 opentelemetry_sdk::trace::Config::default()
288 .with_sampler(sampler)
289 .with_id_generator(RandomIdGenerator::default())
290 .with_resource(resource),
291 )
292 .build();
293
294 Ok(tracer_provider)
295}
296
297/// Initialize OTLP exporter.
298async fn init_otlp_exporter(
299 endpoint: &str,
300 resource: Resource,
301 sampler: Sampler,
302) -> Result<TracerProvider, TracingError> {
303 use opentelemetry_otlp::WithExportConfig;
304
305 let exporter = opentelemetry_otlp::new_exporter()
306 .tonic()
307 .with_endpoint(endpoint)
308 .build_span_exporter()
309 .map_err(|e| TracingError::ExporterInitialization(e.to_string()))?;
310
311 let tracer_provider = TracerProvider::builder()
312 .with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
313 .with_config(
314 opentelemetry_sdk::trace::Config::default()
315 .with_sampler(sampler)
316 .with_id_generator(RandomIdGenerator::default())
317 .with_resource(resource),
318 )
319 .build();
320
321 Ok(tracer_provider)
322}
323
324#[cfg(test)]
325mod tests {
326 use super::*;
327
328 #[tokio::test]
329 async fn test_tracing_config_validation() {
330 // Test invalid sampling rate
331 let config = TracingConfig {
332 service_name: "test".to_string(),
333 exporter: TracingExporter::None,
334 sampling_rate: 1.5,
335 resource_attributes: vec![],
336 };
337
338 let result = init_tracing(config).await;
339 assert!(matches!(result, Err(TracingError::InvalidSamplingRate(_))));
340 }
341
342 #[tokio::test]
343 async fn test_disabled_tracing() {
344 let config = TracingConfig {
345 service_name: "test".to_string(),
346 exporter: TracingExporter::None,
347 sampling_rate: 1.0,
348 resource_attributes: vec![],
349 };
350
351 let result = init_tracing(config).await;
352 assert!(result.is_ok());
353 }
354}
355
356/// Span helper functions for common tracing patterns.
357///
358/// These functions create pre-configured spans for common operations in the ELARA Protocol,
359/// making it easier to maintain consistent tracing across the codebase.
360
361/// Create a span for message send operations.
362///
363/// # Arguments
364///
365/// * `node_id` - The ID of the node sending the message
366/// * `session_id` - Optional session ID
367/// * `message_count` - Number of messages being sent
368///
369/// # Examples
370///
371/// ```no_run
372/// use elara_runtime::observability::tracing::span_message_send;
373/// use elara_core::NodeId;
374///
375/// let span = span_message_send(NodeId(1), Some(42), 5);
376/// let _enter = span.enter();
377/// // ... send messages ...
378/// ```
379pub fn span_message_send(node_id: u64, session_id: Option<u64>, message_count: usize) -> Span {
380 tracing::span!(
381 tracing::Level::DEBUG,
382 "message_send",
383 node_id = node_id,
384 session_id = ?session_id,
385 message_count = message_count,
386 )
387}
388
389/// Create a span for message receive operations.
390///
391/// # Arguments
392///
393/// * `node_id` - The ID of the node receiving the message
394/// * `session_id` - Optional session ID
395/// * `message_count` - Number of messages being received
396///
397/// # Examples
398///
399/// ```no_run
400/// use elara_runtime::observability::tracing::span_message_receive;
401/// use elara_core::NodeId;
402///
403/// let span = span_message_receive(NodeId(1), Some(42), 3);
404/// let _enter = span.enter();
405/// // ... receive messages ...
406/// ```
407pub fn span_message_receive(node_id: u64, session_id: Option<u64>, message_count: usize) -> Span {
408 tracing::span!(
409 tracing::Level::DEBUG,
410 "message_receive",
411 node_id = node_id,
412 session_id = ?session_id,
413 message_count = message_count,
414 )
415}
416
417/// Create a span for state synchronization operations.
418///
419/// # Arguments
420///
421/// * `node_id` - The ID of the node performing state sync
422/// * `session_id` - Optional session ID
423/// * `event_count` - Number of events being processed
424///
425/// # Examples
426///
427/// ```no_run
428/// use elara_runtime::observability::tracing::span_state_sync;
429/// use elara_core::NodeId;
430///
431/// let span = span_state_sync(NodeId(1), Some(42), 10);
432/// let _enter = span.enter();
433/// // ... synchronize state ...
434/// ```
435pub fn span_state_sync(node_id: u64, session_id: Option<u64>, event_count: usize) -> Span {
436 tracing::span!(
437 tracing::Level::DEBUG,
438 "state_sync",
439 node_id = node_id,
440 session_id = ?session_id,
441 event_count = event_count,
442 )
443}
444
445/// Create a span for connection establishment operations.
446///
447/// # Arguments
448///
449/// * `node_id` - The ID of the node establishing the connection
450/// * `session_id` - The session ID being joined
451///
452/// # Examples
453///
454/// ```no_run
455/// use elara_runtime::observability::tracing::span_connection_establish;
456/// use elara_core::NodeId;
457///
458/// let span = span_connection_establish(NodeId(1), 42);
459/// let _enter = span.enter();
460/// // ... establish connection ...
461/// ```
462pub fn span_connection_establish(node_id: u64, session_id: u64) -> Span {
463 tracing::span!(
464 tracing::Level::INFO,
465 "connection_establish",
466 node_id = node_id,
467 session_id = session_id,
468 )
469}
470
471/// Create a span for connection teardown operations.
472///
473/// # Arguments
474///
475/// * `node_id` - The ID of the node tearing down the connection
476/// * `session_id` - Optional session ID being left
477///
478/// # Examples
479///
480/// ```no_run
481/// use elara_runtime::observability::tracing::span_connection_teardown;
482/// use elara_core::NodeId;
483///
484/// let span = span_connection_teardown(NodeId(1), Some(42));
485/// let _enter = span.enter();
486/// // ... teardown connection ...
487/// ```
488pub fn span_connection_teardown(node_id: u64, session_id: Option<u64>) -> Span {
489 tracing::span!(
490 tracing::Level::INFO,
491 "connection_teardown",
492 node_id = node_id,
493 session_id = ?session_id,
494 )
495}
496
497/// Create a span for node tick operations.
498///
499/// # Arguments
500///
501/// * `node_id` - The ID of the node performing the tick
502/// * `session_id` - Optional session ID
503///
504/// # Examples
505///
506/// ```no_run
507/// use elara_runtime::observability::tracing::span_node_tick;
508/// use elara_core::NodeId;
509///
510/// let span = span_node_tick(NodeId(1), Some(42));
511/// let _enter = span.enter();
512/// // ... perform tick ...
513/// ```
514pub fn span_node_tick(node_id: u64, session_id: Option<u64>) -> Span {
515 tracing::span!(
516 tracing::Level::INFO,
517 "node_tick",
518 node_id = node_id,
519 session_id = ?session_id,
520 )
521}
522
523/// Create a span for decryption operations.
524///
525/// # Arguments
526///
527/// * `node_id` - The ID of the node performing decryption
528/// * `packet_count` - Number of packets being decrypted
529///
530/// # Examples
531///
532/// ```no_run
533/// use elara_runtime::observability::tracing::span_decrypt;
534/// use elara_core::NodeId;
535///
536/// let span = span_decrypt(NodeId(1), 5);
537/// let _enter = span.enter();
538/// // ... decrypt packets ...
539/// ```
540pub fn span_decrypt(node_id: u64, packet_count: usize) -> Span {
541 tracing::span!(
542 tracing::Level::DEBUG,
543 "decrypt",
544 node_id = node_id,
545 packet_count = packet_count,
546 )
547}
548
549/// Create a span for event classification operations.
550///
551/// # Arguments
552///
553/// * `node_id` - The ID of the node classifying events
554/// * `packet_count` - Number of packets being classified
555///
556/// # Examples
557///
558/// ```no_run
559/// use elara_runtime::observability::tracing::span_classify_events;
560/// use elara_core::NodeId;
561///
562/// let span = span_classify_events(NodeId(1), 3);
563/// let _enter = span.enter();
564/// // ... classify events ...
565/// ```
566pub fn span_classify_events(node_id: u64, packet_count: usize) -> Span {
567 tracing::span!(
568 tracing::Level::DEBUG,
569 "classify_events",
570 node_id = node_id,
571 packet_count = packet_count,
572 )
573}