ant_quic/monitoring/
distributed_tracing.rs

1//! Distributed Tracing System
2//!
3//! This module implements distributed tracing for NAT traversal operations
4//! to provide end-to-end visibility across the entire connection establishment process.
5
6use std::{
7    collections::HashMap,
8    sync::Arc,
9    time::{Duration, Instant, SystemTime},
10};
11
12use tokio::sync::{RwLock, Mutex};
13use tracing::{debug, info, warn, Span};
14use uuid::Uuid;
15
16use crate::monitoring::{
17    MonitoringError, NatTraversalAttempt, NatTraversalResult,
18};
19
/// Distributed trace collector for NAT traversal operations
///
/// Owns the set of in-flight traces (keyed by the attempt ID of the NAT
/// traversal attempt they belong to) together with the helpers used to
/// sample, build, correlate and export them.
pub struct DistributedTraceCollector {
    /// Tracing configuration
    config: TracingConfig,
    /// Active traces storage, keyed by NAT traversal attempt ID
    active_traces: Arc<RwLock<HashMap<String, TraceContext>>>,
    /// Trace exporter that receives completed (or flushed) traces
    exporter: Arc<TraceExporter>,
    /// Sampling decision engine
    sampler: Arc<TraceSampler>,
    /// Span builder for creating structured spans
    span_builder: Arc<SpanBuilder>,
    /// Correlation ID manager
    correlation_manager: Arc<CorrelationManager>,
}
35
36impl DistributedTraceCollector {
37    /// Create new distributed trace collector
38    pub async fn new(config: TracingConfig) -> Result<Self, MonitoringError> {
39        let exporter = Arc::new(TraceExporter::new(config.export.clone()));
40        let sampler = Arc::new(TraceSampler::new(config.sampling.clone()));
41        let span_builder = Arc::new(SpanBuilder::new());
42        let correlation_manager = Arc::new(CorrelationManager::new());
43        
44        Ok(Self {
45            config,
46            active_traces: Arc::new(RwLock::new(HashMap::new())),
47            exporter,
48            sampler,
49            span_builder,
50            correlation_manager,
51        })
52    }
53    
54    /// Start tracing system
55    pub async fn start(&self) -> Result<(), MonitoringError> {
56        info!("Starting distributed trace collector");
57        
58        // Initialize exporter
59        self.exporter.start().await?;
60        
61        info!("Distributed trace collector started");
62        Ok(())
63    }
64    
65    /// Stop tracing system
66    pub async fn stop(&self) -> Result<(), MonitoringError> {
67        info!("Stopping distributed trace collector");
68        
69        // Flush remaining traces
70        self.flush_active_traces().await?;
71        
72        // Stop exporter
73        self.exporter.stop().await?;
74        
75        info!("Distributed trace collector stopped");
76        Ok(())
77    }
78    
79    /// Start NAT traversal trace
80    pub async fn start_nat_trace(&self, attempt: &NatTraversalAttempt) -> Result<TraceId, MonitoringError> {
81        // Check sampling decision
82        if !self.sampler.should_sample_nat_trace(attempt).await {
83            return Ok(TraceId::new()); // Return empty trace ID
84        }
85        
86        let trace_id = TraceId::new();
87        let correlation_id = self.correlation_manager.generate_correlation_id().await;
88        
89        // Create root span for NAT traversal
90        let root_span = self.span_builder.create_nat_traversal_span(
91            &trace_id,
92            None, // No parent span
93            attempt,
94        ).await?;
95        
96        // Create trace context
97        let trace_context = TraceContext {
98            trace_id: trace_id.clone(),
99            correlation_id,
100            root_span: root_span.clone(),
101            active_spans: HashMap::new(),
102            start_time: SystemTime::now(),
103            client_info: attempt.client_info.clone(),
104            server_info: attempt.server_info.clone(),
105            bootstrap_nodes: attempt.bootstrap_nodes.clone(),
106            events: Vec::new(),
107        };
108        
109        // Store active trace
110        {
111            let mut active_traces = self.active_traces.write().await;
112            active_traces.insert(attempt.attempt_id.clone(), trace_context);
113        }
114        
115        debug!("Started NAT traversal trace: {} (attempt: {})", trace_id, attempt.attempt_id);
116        Ok(trace_id)
117    }
118    
119    /// Complete NAT traversal trace
120    pub async fn complete_nat_trace(&self, result: &NatTraversalResult) -> Result<(), MonitoringError> {
121        let mut active_traces = self.active_traces.write().await;
122        
123        if let Some(mut trace_context) = active_traces.remove(&result.attempt_id) {
124            // Add final result information
125            let result_event = TraceEvent {
126                timestamp: SystemTime::now(),
127                event_type: TraceEventType::NatTraversalCompleted,
128                span_id: trace_context.root_span.span_id.clone(),
129                attributes: self.result_to_attributes(result),
130                duration: Some(result.duration),
131            };
132            
133            trace_context.events.push(result_event);
134            
135            // Close root span
136            trace_context.root_span.end_time = Some(SystemTime::now());
137            trace_context.root_span.status = if result.success {
138                SpanStatus::Ok
139            } else {
140                SpanStatus::Error
141            };
142            
143            // Export completed trace
144            self.exporter.export_trace(trace_context).await?;
145            
146            debug!("Completed NAT traversal trace for attempt: {}", result.attempt_id);
147        } else {
148            warn!("No active trace found for attempt: {}", result.attempt_id);
149        }
150        
151        Ok(())
152    }
153    
154    /// Add span to existing trace
155    pub async fn add_span(
156        &self,
157        attempt_id: &str,
158        span_name: &str,
159        parent_span_id: Option<SpanId>,
160        attributes: HashMap<String, AttributeValue>,
161    ) -> Result<SpanId, MonitoringError> {
162        let mut active_traces = self.active_traces.write().await;
163        
164        if let Some(trace_context) = active_traces.get_mut(attempt_id) {
165            let span = self.span_builder.create_child_span(
166                &trace_context.trace_id,
167                parent_span_id.as_ref().unwrap_or(&trace_context.root_span.span_id),
168                span_name,
169                attributes,
170            ).await?;
171            
172            let span_id = span.span_id.clone();
173            trace_context.active_spans.insert(span_id.clone(), span);
174            
175            Ok(span_id)
176        } else {
177            Err(MonitoringError::TracingError(
178                format!("No active trace found for attempt: {}", attempt_id)
179            ))
180        }
181    }
182    
183    /// Add event to trace
184    pub async fn add_event(
185        &self,
186        attempt_id: &str,
187        span_id: &SpanId,
188        event_type: TraceEventType,
189        attributes: HashMap<String, AttributeValue>,
190    ) -> Result<(), MonitoringError> {
191        let mut active_traces = self.active_traces.write().await;
192        
193        if let Some(trace_context) = active_traces.get_mut(attempt_id) {
194            debug!("Adding event {:?} to trace for attempt: {}", event_type, attempt_id);
195            
196            let event = TraceEvent {
197                timestamp: SystemTime::now(),
198                event_type,
199                span_id: span_id.clone(),
200                attributes,
201                duration: None,
202            };
203            
204            trace_context.events.push(event);
205        } else {
206            warn!("No active trace found for attempt: {}", attempt_id);
207        }
208        
209        Ok(())
210    }
211    
212    /// Complete span in trace
213    pub async fn complete_span(
214        &self,
215        attempt_id: &str,
216        span_id: &SpanId,
217        status: SpanStatus,
218    ) -> Result<(), MonitoringError> {
219        let mut active_traces = self.active_traces.write().await;
220        
221        if let Some(trace_context) = active_traces.get_mut(attempt_id) {
222            if let Some(span) = trace_context.active_spans.get_mut(span_id) {
223                span.end_time = Some(SystemTime::now());
224                span.status = status;
225                
226                debug!("Completed span {} in trace for attempt: {}", span_id, attempt_id);
227            }
228        }
229        
230        Ok(())
231    }
232    
233    /// Get trace status
234    pub async fn get_status(&self) -> String {
235        let active_traces = self.active_traces.read().await;
236        format!("Active traces: {}", active_traces.len())
237    }
238    
239    /// Flush active traces
240    async fn flush_active_traces(&self) -> Result<(), MonitoringError> {
241        let mut active_traces = self.active_traces.write().await;
242        
243        for (attempt_id, mut trace_context) in active_traces.drain() {
244            // Mark as incomplete
245            trace_context.root_span.status = SpanStatus::Cancelled;
246            trace_context.root_span.end_time = Some(SystemTime::now());
247            
248            // Export incomplete trace
249            if let Err(e) = self.exporter.export_trace(trace_context).await {
250                warn!("Failed to export incomplete trace for {}: {}", attempt_id, e);
251            }
252        }
253        
254        Ok(())
255    }
256    
257    /// Convert result to trace attributes
258    fn result_to_attributes(&self, result: &NatTraversalResult) -> HashMap<String, AttributeValue> {
259        let mut attributes = HashMap::new();
260        
261        attributes.insert("nat.success".to_string(), AttributeValue::Bool(result.success));
262        attributes.insert("nat.duration_ms".to_string(), AttributeValue::Int(result.duration.as_millis() as i64));
263        
264        if let Some(error_info) = &result.error_info {
265            attributes.insert("error.category".to_string(), AttributeValue::String(format!("{:?}", error_info.error_category)));
266            attributes.insert("error.code".to_string(), AttributeValue::String(error_info.error_code.clone()));
267            attributes.insert("error.message".to_string(), AttributeValue::String(error_info.error_message.clone()));
268        }
269        
270        let perf = &result.performance_metrics;
271        attributes.insert("nat.connection_time_ms".to_string(), AttributeValue::Int(perf.connection_time_ms as i64));
272        attributes.insert("nat.candidates_tried".to_string(), AttributeValue::Int(perf.candidates_tried as i64));
273        attributes.insert("nat.round_trips".to_string(), AttributeValue::Int(perf.round_trips as i64));
274        
275        if let Some(conn_info) = &result.connection_info {
276            attributes.insert("connection.latency_ms".to_string(), AttributeValue::Int(conn_info.quality.latency_ms as i64));
277            attributes.insert("connection.throughput_mbps".to_string(), AttributeValue::Float(conn_info.quality.throughput_mbps as f64));
278            attributes.insert("connection.path_type".to_string(), AttributeValue::String(format!("{:?}", conn_info.path.path_type)));
279        }
280        
281        attributes
282    }
283}
284
285/// Tracing configuration
286#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
287pub struct TracingConfig {
288    /// Enable distributed tracing
289    pub enabled: bool,
290    /// Sampling configuration
291    pub sampling: TraceSamplingConfig,
292    /// Export configuration
293    pub export: TraceExportConfig,
294    /// Correlation settings
295    pub correlation: CorrelationConfig,
296    /// Resource limits
297    pub resource_limits: TraceResourceLimits,
298}
299
300impl Default for TracingConfig {
301    fn default() -> Self {
302        Self {
303            enabled: true,
304            sampling: TraceSamplingConfig::default(),
305            export: TraceExportConfig::default(),
306            correlation: CorrelationConfig::default(),
307            resource_limits: TraceResourceLimits::default(),
308        }
309    }
310}
311
312/// Trace sampling configuration
313#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
314pub struct TraceSamplingConfig {
315    /// Base sampling rate for NAT traversal traces
316    pub nat_traversal_rate: f64,
317    /// Sampling rate for successful operations
318    pub success_rate: f64,
319    /// Sampling rate for failed operations
320    pub failure_rate: f64,
321    /// Adaptive sampling settings
322    pub adaptive: AdaptiveTraceSamplingConfig,
323}
324
325impl Default for TraceSamplingConfig {
326    fn default() -> Self {
327        Self {
328            nat_traversal_rate: 0.1,  // 10% of NAT traversals
329            success_rate: 0.05,       // 5% of successful operations
330            failure_rate: 1.0,        // 100% of failures
331            adaptive: AdaptiveTraceSamplingConfig::default(),
332        }
333    }
334}
335
336/// Adaptive trace sampling configuration
337#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
338pub struct AdaptiveTraceSamplingConfig {
339    /// Enable adaptive sampling
340    pub enabled: bool,
341    /// Target traces per second
342    pub target_traces_per_second: f64,
343    /// Adjustment interval
344    pub adjustment_interval: Duration,
345}
346
347impl Default for AdaptiveTraceSamplingConfig {
348    fn default() -> Self {
349        Self {
350            enabled: true,
351            target_traces_per_second: 100.0,
352            adjustment_interval: Duration::from_secs(60),
353        }
354    }
355}
356
357/// Trace export configuration
358#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
359pub struct TraceExportConfig {
360    /// Export destinations
361    pub destinations: Vec<TraceExportDestination>,
362    /// Batch size for export
363    pub batch_size: usize,
364    /// Export interval
365    pub export_interval: Duration,
366    /// Export timeout
367    pub export_timeout: Duration,
368}
369
370impl Default for TraceExportConfig {
371    fn default() -> Self {
372        Self {
373            destinations: vec![TraceExportDestination::Jaeger {
374                endpoint: "http://localhost:14268/api/traces".to_string(),
375            }],
376            batch_size: 100,
377            export_interval: Duration::from_secs(10),
378            export_timeout: Duration::from_secs(30),
379        }
380    }
381}
382
/// Trace export destinations
///
/// Each variant carries the single setting that the exporter passes to the
/// matching export routine. Those routines are stubs in this file, so how
/// the value is ultimately interpreted is not established here.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum TraceExportDestination {
    /// Jaeger, identified by `endpoint`
    Jaeger { endpoint: String },
    /// Zipkin, identified by `endpoint`
    Zipkin { endpoint: String },
    /// OTLP receiver, identified by `endpoint`
    OTLP { endpoint: String },
    /// Cloud Trace, identified by `project_id`
    CloudTrace { project_id: String },
    /// X-Ray, identified by `region`
    XRay { region: String },
}
392
393/// Correlation configuration
394#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
395pub struct CorrelationConfig {
396    /// Correlation ID header name
397    pub correlation_header: String,
398    /// Enable cross-service correlation
399    pub cross_service: bool,
400    /// Correlation ID format
401    pub id_format: CorrelationIdFormat,
402}
403
404impl Default for CorrelationConfig {
405    fn default() -> Self {
406        Self {
407            correlation_header: "X-Correlation-ID".to_string(),
408            cross_service: true,
409            id_format: CorrelationIdFormat::UUID,
410        }
411    }
412}
413
/// Correlation ID formats
///
/// NOTE(review): the ID generation visible in this file always produces a
/// UUID v4 string and does not read this setting — confirm whether the
/// other formats are honoured elsewhere.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum CorrelationIdFormat {
    /// UUID-formatted identifiers
    UUID,
    /// Snowflake-style identifiers
    Snowflake,
    /// Custom format; the meaning of the string is not defined in this file
    Custom(String),
}
421
422/// Trace resource limits
423#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
424pub struct TraceResourceLimits {
425    /// Maximum active traces
426    pub max_active_traces: usize,
427    /// Maximum spans per trace
428    pub max_spans_per_trace: usize,
429    /// Maximum events per trace
430    pub max_events_per_trace: usize,
431    /// Maximum trace duration
432    pub max_trace_duration: Duration,
433}
434
435impl Default for TraceResourceLimits {
436    fn default() -> Self {
437        Self {
438            max_active_traces: 10000,
439            max_spans_per_trace: 100,
440            max_events_per_trace: 500,
441            max_trace_duration: Duration::from_secs(300), // 5 minutes
442        }
443    }
444}
445
446/// Unique trace identifier
447#[derive(Debug, Clone, PartialEq, Eq, Hash)]
448#[allow(dead_code)] // Used for distributed tracing correlation
449pub struct TraceId(String);
450
451impl TraceId {
452    fn new() -> Self {
453        Self(Uuid::new_v4().to_string())
454    }
455}
456
457impl std::fmt::Display for TraceId {
458    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
459        write!(f, "{}", self.0)
460    }
461}
462
463/// Unique span identifier
464#[derive(Debug, Clone, PartialEq, Eq, Hash)]
465#[allow(dead_code)] // Used for distributed tracing hierarchy
466pub struct SpanId(String);
467
468impl SpanId {
469    fn new() -> Self {
470        Self(Uuid::new_v4().to_string())
471    }
472}
473
474impl std::fmt::Display for SpanId {
475    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
476        write!(f, "{}", self.0)
477    }
478}
479
480/// Correlation identifier for cross-service tracing
481#[derive(Debug, Clone)]
482#[allow(dead_code)] // Used for cross-service request correlation
483pub struct CorrelationId(String);
484
485impl CorrelationId {
486    fn new() -> Self {
487        Self(Uuid::new_v4().to_string())
488    }
489}
490
491impl std::fmt::Display for CorrelationId {
492    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
493        write!(f, "{}", self.0)
494    }
495}
496
/// Trace context containing all trace information
///
/// One context exists per recorded NAT traversal attempt. The collector
/// stores it in its active-trace map and moves it to the exporter when the
/// trace is completed or flushed.
#[derive(Debug, Clone)]
#[allow(dead_code)] // Fields used for distributed trace state management
struct TraceContext {
    /// Unique trace identifier
    trace_id: TraceId,
    /// Correlation identifier
    correlation_id: CorrelationId,
    /// Root span for the trace
    root_span: TraceSpan,
    /// Child spans added to the trace, keyed by their span ID
    active_spans: HashMap<SpanId, TraceSpan>,
    /// Trace start time (system time at which the context was created)
    start_time: SystemTime,
    /// Client endpoint information
    client_info: crate::monitoring::EndpointInfo,
    /// Server endpoint information
    server_info: crate::monitoring::EndpointInfo,
    /// Bootstrap nodes involved
    bootstrap_nodes: Vec<String>,
    /// Trace events, in the order they were added
    events: Vec<TraceEvent>,
}
520
/// Individual span in a trace
#[derive(Debug, Clone)]
#[allow(dead_code)] // Fields populated during trace lifecycle
struct TraceSpan {
    /// Unique span identifier
    span_id: SpanId,
    /// Parent span identifier (`None` for a span without a parent)
    parent_span_id: Option<SpanId>,
    /// Span name/operation
    name: String,
    /// Span start time
    start_time: SystemTime,
    /// Span end time (`None` while the span is still open)
    end_time: Option<SystemTime>,
    /// Span status; the span builder creates spans with `SpanStatus::Ok`
    status: SpanStatus,
    /// Span attributes
    attributes: HashMap<String, AttributeValue>,
    /// Span tags; the span builder creates spans with an empty tag map
    tags: HashMap<String, String>,
}
542
/// Span status
#[derive(Debug, Clone, PartialEq)]
pub enum SpanStatus {
    /// Initial status of every span; also set on the root span of a
    /// successful NAT traversal
    Ok,
    /// Set on the root span of a failed NAT traversal
    Error,
    /// Set on the root span of a trace flushed before completion
    Cancelled,
    /// Not assigned anywhere in this file; presumably for operations that
    /// timed out — confirm with callers
    Timeout,
}
551
/// Trace event
///
/// A timestamped occurrence recorded against one span of a trace.
#[derive(Debug, Clone)]
#[allow(dead_code)] // Fields used for recording trace events
struct TraceEvent {
    /// Event timestamp
    timestamp: SystemTime,
    /// Event type
    event_type: TraceEventType,
    /// Associated span ID
    span_id: SpanId,
    /// Event attributes
    attributes: HashMap<String, AttributeValue>,
    /// Event duration (if applicable)
    duration: Option<Duration>,
}
567
/// Trace event types
///
/// Within this file only `NatTraversalCompleted` is emitted by the
/// collector itself; the remaining variants are supplied by callers when
/// they add events to a trace.
#[derive(Debug, Clone)]
#[allow(dead_code)] // All variants used for comprehensive trace event categorization
pub enum TraceEventType {
    NatTraversalStarted,
    /// Appended by the collector when a NAT traversal trace is completed
    NatTraversalCompleted,
    CandidateDiscoveryStarted,
    CandidateDiscoveryCompleted,
    CandidateTestStarted,
    CandidateTestCompleted,
    HolePunchingStarted,
    HolePunchingCompleted,
    ConnectionEstablished,
    BootstrapNodeContacted,
    ErrorOccurred,
    /// Caller-defined event type carrying a free-form label
    Custom(String),
}
585
/// Attribute value types
///
/// Value of a single span or event attribute.
#[derive(Debug, Clone)]
#[allow(dead_code)] // All variants used for flexible attribute storage
pub enum AttributeValue {
    /// Text value
    String(String),
    /// Signed 64-bit integer value
    Int(i64),
    /// 64-bit floating point value
    Float(f64),
    /// Boolean value
    Bool(bool),
    /// List of nested attribute values
    Array(Vec<AttributeValue>),
}
596
597/// Trace sampler for sampling decisions
598struct TraceSampler {
599    config: TraceSamplingConfig,
600    current_rate: Arc<RwLock<f64>>,
601    traces_this_period: Arc<RwLock<u32>>,
602    last_adjustment: Arc<RwLock<Instant>>,
603}
604
605impl TraceSampler {
606    fn new(config: TraceSamplingConfig) -> Self {
607        Self {
608            current_rate: Arc::new(RwLock::new(config.nat_traversal_rate)),
609            config,
610            traces_this_period: Arc::new(RwLock::new(0)),
611            last_adjustment: Arc::new(RwLock::new(Instant::now())),
612        }
613    }
614    
615    async fn should_sample_nat_trace(&self, _attempt: &NatTraversalAttempt) -> bool {
616        // Adjust sampling rate if adaptive sampling is enabled
617        if self.config.adaptive.enabled {
618            self.adjust_sampling_rate().await;
619        }
620        
621        let current_rate = *self.current_rate.read().await;
622        let should_sample = rand::random::<f64>() < current_rate;
623        
624        if should_sample {
625            let mut traces_count = self.traces_this_period.write().await;
626            *traces_count += 1;
627        }
628        
629        should_sample
630    }
631    
632    async fn adjust_sampling_rate(&self) {
633        let mut last_adjustment = self.last_adjustment.write().await;
634        
635        if last_adjustment.elapsed() < self.config.adaptive.adjustment_interval {
636            return;
637        }
638        
639        let traces_count = {
640            let mut count = self.traces_this_period.write().await;
641            let current_count = *count;
642            *count = 0; // Reset for next period
643            current_count
644        };
645        
646        let period_seconds = self.config.adaptive.adjustment_interval.as_secs_f64();
647        let current_traces_per_second = traces_count as f64 / period_seconds;
648        let target_traces_per_second = self.config.adaptive.target_traces_per_second;
649        
650        let mut current_rate = self.current_rate.write().await;
651        let adjustment_factor = target_traces_per_second / current_traces_per_second.max(1.0);
652        *current_rate = (*current_rate * adjustment_factor).min(1.0).max(0.001);
653        
654        *last_adjustment = Instant::now();
655        
656        debug!("Adjusted trace sampling rate to {:.4} (current rate: {:.2} traces/sec, target: {:.2})",
657            *current_rate, current_traces_per_second, target_traces_per_second);
658    }
659}
660
661/// Span builder for creating structured spans
662struct SpanBuilder;
663
664impl SpanBuilder {
665    fn new() -> Self {
666        Self
667    }
668    
669    async fn create_nat_traversal_span(
670        &self,
671        _trace_id: &TraceId,
672        parent_span_id: Option<SpanId>,
673        attempt: &NatTraversalAttempt,
674    ) -> Result<TraceSpan, MonitoringError> {
675        let mut attributes = HashMap::new();
676        
677        // Add NAT traversal specific attributes
678        attributes.insert("nat.attempt_id".to_string(), AttributeValue::String(attempt.attempt_id.clone()));
679        attributes.insert("nat.client.region".to_string(), AttributeValue::String(
680            attempt.client_info.region.as_deref().unwrap_or("unknown").to_string()
681        ));
682        attributes.insert("nat.server.region".to_string(), AttributeValue::String(
683            attempt.server_info.region.as_deref().unwrap_or("unknown").to_string()
684        ));
685        attributes.insert("nat.bootstrap_nodes".to_string(), AttributeValue::Array(
686            attempt.bootstrap_nodes.iter()
687                .map(|node| AttributeValue::String(node.clone()))
688                .collect()
689        ));
690        
691        if let Some(client_nat_type) = &attempt.client_info.nat_type {
692            attributes.insert("nat.client.type".to_string(), AttributeValue::String(format!("{:?}", client_nat_type)));
693        }
694        
695        if let Some(server_nat_type) = &attempt.server_info.nat_type {
696            attributes.insert("nat.server.type".to_string(), AttributeValue::String(format!("{:?}", server_nat_type)));
697        }
698        
699        // Add network conditions
700        if let Some(rtt) = attempt.network_conditions.rtt_ms {
701            attributes.insert("network.rtt_ms".to_string(), AttributeValue::Int(rtt as i64));
702        }
703        
704        if let Some(loss_rate) = attempt.network_conditions.packet_loss_rate {
705            attributes.insert("network.packet_loss_rate".to_string(), AttributeValue::Float(loss_rate as f64));
706        }
707        
708        Ok(TraceSpan {
709            span_id: SpanId::new(),
710            parent_span_id,
711            name: "nat_traversal".to_string(),
712            start_time: attempt.timestamp,
713            end_time: None,
714            status: SpanStatus::Ok,
715            attributes,
716            tags: HashMap::new(),
717        })
718    }
719    
720    async fn create_child_span(
721        &self,
722        _trace_id: &TraceId,
723        parent_span_id: &SpanId,
724        span_name: &str,
725        attributes: HashMap<String, AttributeValue>,
726    ) -> Result<TraceSpan, MonitoringError> {
727        Ok(TraceSpan {
728            span_id: SpanId::new(),
729            parent_span_id: Some(parent_span_id.clone()),
730            name: span_name.to_string(),
731            start_time: SystemTime::now(),
732            end_time: None,
733            status: SpanStatus::Ok,
734            attributes,
735            tags: HashMap::new(),
736        })
737    }
738}
739
740/// Trace exporter for sending traces to external systems
741struct TraceExporter {
742    config: TraceExportConfig,
743    pending_traces: Arc<Mutex<Vec<TraceContext>>>,
744}
745
746impl TraceExporter {
747    fn new(config: TraceExportConfig) -> Self {
748        Self {
749            config,
750            pending_traces: Arc::new(Mutex::new(Vec::new())),
751        }
752    }
753    
754    async fn start(&self) -> Result<(), MonitoringError> {
755        // Start background export task
756        info!("Trace exporter started");
757        Ok(())
758    }
759    
760    async fn stop(&self) -> Result<(), MonitoringError> {
761        // Flush remaining traces
762        self.flush_pending_traces().await?;
763        info!("Trace exporter stopped");
764        Ok(())
765    }
766    
767    async fn export_trace(&self, trace_context: TraceContext) -> Result<(), MonitoringError> {
768        // Add to pending traces
769        {
770            let mut pending = self.pending_traces.lock().await;
771            pending.push(trace_context);
772            
773            // Export if batch size reached
774            if pending.len() >= self.config.batch_size {
775                let traces = pending.drain(..).collect::<Vec<_>>();
776                drop(pending); // Release lock early
777                self.export_batch(traces).await?;
778            }
779        }
780        
781        Ok(())
782    }
783    
784    async fn export_batch(&self, traces: Vec<TraceContext>) -> Result<(), MonitoringError> {
785        for destination in &self.config.destinations {
786            if let Err(e) = self.export_to_destination(destination, &traces).await {
787                warn!("Failed to export traces to {:?}: {}", destination, e);
788            }
789        }
790        
791        debug!("Exported batch of {} traces", traces.len());
792        Ok(())
793    }
794    
795    async fn export_to_destination(
796        &self,
797        destination: &TraceExportDestination,
798        traces: &[TraceContext],
799    ) -> Result<(), MonitoringError> {
800        match destination {
801            TraceExportDestination::Jaeger { endpoint } => {
802                self.export_to_jaeger(endpoint, traces).await
803            }
804            TraceExportDestination::Zipkin { endpoint } => {
805                self.export_to_zipkin(endpoint, traces).await
806            }
807            TraceExportDestination::OTLP { endpoint } => {
808                self.export_to_otlp(endpoint, traces).await
809            }
810            TraceExportDestination::CloudTrace { project_id } => {
811                self.export_to_cloud_trace(project_id, traces).await
812            }
813            TraceExportDestination::XRay { region } => {
814                self.export_to_xray(region, traces).await
815            }
816        }
817    }
818    
819    async fn export_to_jaeger(&self, endpoint: &str, traces: &[TraceContext]) -> Result<(), MonitoringError> {
820        debug!("Exporting {} traces to Jaeger at {}", traces.len(), endpoint);
821        // Would implement actual Jaeger export
822        Ok(())
823    }
824    
825    async fn export_to_zipkin(&self, endpoint: &str, traces: &[TraceContext]) -> Result<(), MonitoringError> {
826        debug!("Exporting {} traces to Zipkin at {}", traces.len(), endpoint);
827        // Would implement actual Zipkin export
828        Ok(())
829    }
830    
831    async fn export_to_otlp(&self, endpoint: &str, traces: &[TraceContext]) -> Result<(), MonitoringError> {
832        debug!("Exporting {} traces to OTLP at {}", traces.len(), endpoint);
833        // Would implement actual OTLP export
834        Ok(())
835    }
836    
837    async fn export_to_cloud_trace(&self, project_id: &str, traces: &[TraceContext]) -> Result<(), MonitoringError> {
838        debug!("Exporting {} traces to Cloud Trace (project: {})", traces.len(), project_id);
839        // Would implement actual Cloud Trace export
840        Ok(())
841    }
842    
843    async fn export_to_xray(&self, region: &str, traces: &[TraceContext]) -> Result<(), MonitoringError> {
844        debug!("Exporting {} traces to X-Ray (region: {})", traces.len(), region);
845        // Would implement actual X-Ray export
846        Ok(())
847    }
848    
849    async fn flush_pending_traces(&self) -> Result<(), MonitoringError> {
850        let traces = {
851            let mut pending = self.pending_traces.lock().await;
852            pending.drain(..).collect::<Vec<_>>()
853        };
854        
855        if !traces.is_empty() {
856            self.export_batch(traces).await?;
857        }
858        
859        Ok(())
860    }
861}
862
863/// Correlation manager for managing correlation IDs
864struct CorrelationManager {
865    current_correlation: Arc<RwLock<Option<CorrelationId>>>,
866}
867
868impl CorrelationManager {
869    fn new() -> Self {
870        Self {
871            current_correlation: Arc::new(RwLock::new(None)),
872        }
873    }
874    
875    async fn generate_correlation_id(&self) -> CorrelationId {
876        let correlation_id = CorrelationId::new();
877        
878        // Store current correlation ID
879        {
880            let mut current = self.current_correlation.write().await;
881            *current = Some(correlation_id.clone());
882        }
883        
884        correlation_id
885    }
886    
887    async fn get_current_correlation(&self) -> Option<CorrelationId> {
888        let current = self.current_correlation.read().await;
889        current.clone()
890    }
891}
892
893/// Tracing utilities for manual instrumentation
894pub struct TracingUtils;
895
896impl TracingUtils {
897    /// Create a new trace span with the current tracing context
898    pub fn create_span(name: &'static str) -> Span {
899        tracing::info_span!("{}", name)
900    }
901    
902    /// Add attributes to current span
903    pub fn add_span_attributes(attributes: HashMap<String, AttributeValue>) {
904        let span = Span::current();
905        for (key, value) in attributes {
906            match value {
907                AttributeValue::String(s) => { span.record(key.as_str(), &s); }
908                AttributeValue::Int(i) => { span.record(key.as_str(), &i); }
909                AttributeValue::Float(f) => { span.record(key.as_str(), &f); }
910                AttributeValue::Bool(b) => { span.record(key.as_str(), &b); }
911                _ => {} // Complex types not supported by tracing
912            }
913        }
914    }
915    
916    /// Record an event in the current span
917    pub fn record_event(event_name: &str, attributes: HashMap<String, AttributeValue>) {
918        // Convert attributes to tracing format and record
919        info!("Event: {} with {} attributes", event_name, attributes.len());
920    }
921}
922
#[cfg(test)]
mod tests {
    use super::*;
    use crate::monitoring::{
        CongestionLevel, EndpointInfo, EndpointRole, NatType, NetworkConditions,
    };

    /// Build an endpoint fixture with a fixed placeholder address hash.
    fn endpoint(
        id: &str,
        role: EndpointRole,
        nat_type: Option<NatType>,
        region: Option<&str>,
    ) -> EndpointInfo {
        EndpointInfo {
            id: id.to_string(),
            role,
            address_hash: "hash".to_string(),
            nat_type,
            region: region.map(str::to_string),
        }
    }

    /// Build a NAT traversal attempt fixture around the given endpoints.
    fn attempt(
        client_info: EndpointInfo,
        server_info: EndpointInfo,
        bootstrap_nodes: Vec<String>,
        network_conditions: NetworkConditions,
    ) -> NatTraversalAttempt {
        NatTraversalAttempt {
            attempt_id: "test".to_string(),
            timestamp: SystemTime::now(),
            client_info,
            server_info,
            nat_config: crate::nat_traversal_api::NatTraversalConfig::default(),
            bootstrap_nodes,
            network_conditions,
        }
    }

    /// A freshly created collector reports zero active traces.
    #[tokio::test]
    async fn test_trace_collector_creation() {
        let config = TracingConfig::default();
        let collector = DistributedTraceCollector::new(config).await.unwrap();

        let status = collector.get_status().await;
        assert!(status.contains("Active traces: 0"));
    }

    /// Smoke test: the sampler produces a decision for a minimal attempt.
    #[tokio::test]
    async fn test_trace_sampling() {
        let config = TraceSamplingConfig {
            nat_traversal_rate: 0.5, // 50% sampling
            success_rate: 0.5,
            failure_rate: 1.0,
            adaptive: AdaptiveTraceSamplingConfig::default(),
        };

        let sampler = TraceSampler::new(config);

        let attempt = attempt(
            endpoint("client", EndpointRole::Client, None, None),
            endpoint("server", EndpointRole::Server, None, None),
            vec![],
            NetworkConditions {
                rtt_ms: None,
                packet_loss_rate: None,
                bandwidth_mbps: None,
                congestion_level: CongestionLevel::Low,
            },
        );

        // With a 50% rate the outcome is probabilistic, so no particular value
        // can be asserted. Reaching this binding without a panic is the whole
        // check; the previous `x || !x` assertion could never fail.
        let _sampled: bool = sampler.should_sample_nat_trace(&attempt).await;
    }

    /// The span builder names the span and records attempt/region attributes.
    #[tokio::test]
    async fn test_span_builder() {
        let span_builder = SpanBuilder::new();
        let trace_id = TraceId::new();

        let attempt = attempt(
            endpoint(
                "client",
                EndpointRole::Client,
                Some(NatType::FullCone),
                Some("us-east"),
            ),
            endpoint(
                "server",
                EndpointRole::Server,
                Some(NatType::Symmetric),
                Some("eu-west"),
            ),
            vec!["bootstrap1".to_string()],
            NetworkConditions {
                rtt_ms: Some(50),
                packet_loss_rate: Some(0.01),
                bandwidth_mbps: Some(100),
                congestion_level: CongestionLevel::Low,
            },
        );

        let span = span_builder
            .create_nat_traversal_span(&trace_id, None, &attempt)
            .await
            .unwrap();

        assert_eq!(span.name, "nat_traversal");
        assert!(span.attributes.contains_key("nat.attempt_id"));
        assert!(span.attributes.contains_key("nat.client.region"));
        assert!(span.attributes.contains_key("nat.server.region"));
    }
}