1use std::{
7 collections::HashMap,
8 sync::Arc,
9 time::{Duration, Instant, SystemTime},
10};
11
12use tokio::sync::{RwLock, Mutex};
13use tracing::{debug, info, warn, Span};
14use uuid::Uuid;
15
16use crate::monitoring::{
17 MonitoringError, NatTraversalAttempt, NatTraversalResult,
18};
19
/// Collects distributed traces for NAT traversal attempts.
///
/// Traces that are still in flight live in `active_traces`, keyed by the
/// attempt id; finished (or flushed) traces are handed to the exporter.
pub struct DistributedTraceCollector {
    /// Configuration the collector was constructed with.
    config: TracingConfig,
    /// In-flight traces, keyed by the attempt id they were started for.
    active_traces: Arc<RwLock<HashMap<String, TraceContext>>>,
    /// Queues finished traces and forwards them in batches.
    exporter: Arc<TraceExporter>,
    /// Decides whether a new attempt gets traced.
    sampler: Arc<TraceSampler>,
    /// Creates root and child spans.
    span_builder: Arc<SpanBuilder>,
    /// Issues a correlation id for every started trace.
    correlation_manager: Arc<CorrelationManager>,
}
35
36impl DistributedTraceCollector {
37 pub async fn new(config: TracingConfig) -> Result<Self, MonitoringError> {
39 let exporter = Arc::new(TraceExporter::new(config.export.clone()));
40 let sampler = Arc::new(TraceSampler::new(config.sampling.clone()));
41 let span_builder = Arc::new(SpanBuilder::new());
42 let correlation_manager = Arc::new(CorrelationManager::new());
43
44 Ok(Self {
45 config,
46 active_traces: Arc::new(RwLock::new(HashMap::new())),
47 exporter,
48 sampler,
49 span_builder,
50 correlation_manager,
51 })
52 }
53
54 pub async fn start(&self) -> Result<(), MonitoringError> {
56 info!("Starting distributed trace collector");
57
58 self.exporter.start().await?;
60
61 info!("Distributed trace collector started");
62 Ok(())
63 }
64
65 pub async fn stop(&self) -> Result<(), MonitoringError> {
67 info!("Stopping distributed trace collector");
68
69 self.flush_active_traces().await?;
71
72 self.exporter.stop().await?;
74
75 info!("Distributed trace collector stopped");
76 Ok(())
77 }
78
79 pub async fn start_nat_trace(&self, attempt: &NatTraversalAttempt) -> Result<TraceId, MonitoringError> {
81 if !self.sampler.should_sample_nat_trace(attempt).await {
83 return Ok(TraceId::new()); }
85
86 let trace_id = TraceId::new();
87 let correlation_id = self.correlation_manager.generate_correlation_id().await;
88
89 let root_span = self.span_builder.create_nat_traversal_span(
91 &trace_id,
92 None, attempt,
94 ).await?;
95
96 let trace_context = TraceContext {
98 trace_id: trace_id.clone(),
99 correlation_id,
100 root_span: root_span.clone(),
101 active_spans: HashMap::new(),
102 start_time: SystemTime::now(),
103 client_info: attempt.client_info.clone(),
104 server_info: attempt.server_info.clone(),
105 bootstrap_nodes: attempt.bootstrap_nodes.clone(),
106 events: Vec::new(),
107 };
108
109 {
111 let mut active_traces = self.active_traces.write().await;
112 active_traces.insert(attempt.attempt_id.clone(), trace_context);
113 }
114
115 debug!("Started NAT traversal trace: {} (attempt: {})", trace_id, attempt.attempt_id);
116 Ok(trace_id)
117 }
118
119 pub async fn complete_nat_trace(&self, result: &NatTraversalResult) -> Result<(), MonitoringError> {
121 let mut active_traces = self.active_traces.write().await;
122
123 if let Some(mut trace_context) = active_traces.remove(&result.attempt_id) {
124 let result_event = TraceEvent {
126 timestamp: SystemTime::now(),
127 event_type: TraceEventType::NatTraversalCompleted,
128 span_id: trace_context.root_span.span_id.clone(),
129 attributes: self.result_to_attributes(result),
130 duration: Some(result.duration),
131 };
132
133 trace_context.events.push(result_event);
134
135 trace_context.root_span.end_time = Some(SystemTime::now());
137 trace_context.root_span.status = if result.success {
138 SpanStatus::Ok
139 } else {
140 SpanStatus::Error
141 };
142
143 self.exporter.export_trace(trace_context).await?;
145
146 debug!("Completed NAT traversal trace for attempt: {}", result.attempt_id);
147 } else {
148 warn!("No active trace found for attempt: {}", result.attempt_id);
149 }
150
151 Ok(())
152 }
153
154 pub async fn add_span(
156 &self,
157 attempt_id: &str,
158 span_name: &str,
159 parent_span_id: Option<SpanId>,
160 attributes: HashMap<String, AttributeValue>,
161 ) -> Result<SpanId, MonitoringError> {
162 let mut active_traces = self.active_traces.write().await;
163
164 if let Some(trace_context) = active_traces.get_mut(attempt_id) {
165 let span = self.span_builder.create_child_span(
166 &trace_context.trace_id,
167 parent_span_id.as_ref().unwrap_or(&trace_context.root_span.span_id),
168 span_name,
169 attributes,
170 ).await?;
171
172 let span_id = span.span_id.clone();
173 trace_context.active_spans.insert(span_id.clone(), span);
174
175 Ok(span_id)
176 } else {
177 Err(MonitoringError::TracingError(
178 format!("No active trace found for attempt: {}", attempt_id)
179 ))
180 }
181 }
182
183 pub async fn add_event(
185 &self,
186 attempt_id: &str,
187 span_id: &SpanId,
188 event_type: TraceEventType,
189 attributes: HashMap<String, AttributeValue>,
190 ) -> Result<(), MonitoringError> {
191 let mut active_traces = self.active_traces.write().await;
192
193 if let Some(trace_context) = active_traces.get_mut(attempt_id) {
194 debug!("Adding event {:?} to trace for attempt: {}", event_type, attempt_id);
195
196 let event = TraceEvent {
197 timestamp: SystemTime::now(),
198 event_type,
199 span_id: span_id.clone(),
200 attributes,
201 duration: None,
202 };
203
204 trace_context.events.push(event);
205 } else {
206 warn!("No active trace found for attempt: {}", attempt_id);
207 }
208
209 Ok(())
210 }
211
212 pub async fn complete_span(
214 &self,
215 attempt_id: &str,
216 span_id: &SpanId,
217 status: SpanStatus,
218 ) -> Result<(), MonitoringError> {
219 let mut active_traces = self.active_traces.write().await;
220
221 if let Some(trace_context) = active_traces.get_mut(attempt_id) {
222 if let Some(span) = trace_context.active_spans.get_mut(span_id) {
223 span.end_time = Some(SystemTime::now());
224 span.status = status;
225
226 debug!("Completed span {} in trace for attempt: {}", span_id, attempt_id);
227 }
228 }
229
230 Ok(())
231 }
232
233 pub async fn get_status(&self) -> String {
235 let active_traces = self.active_traces.read().await;
236 format!("Active traces: {}", active_traces.len())
237 }
238
239 async fn flush_active_traces(&self) -> Result<(), MonitoringError> {
241 let mut active_traces = self.active_traces.write().await;
242
243 for (attempt_id, mut trace_context) in active_traces.drain() {
244 trace_context.root_span.status = SpanStatus::Cancelled;
246 trace_context.root_span.end_time = Some(SystemTime::now());
247
248 if let Err(e) = self.exporter.export_trace(trace_context).await {
250 warn!("Failed to export incomplete trace for {}: {}", attempt_id, e);
251 }
252 }
253
254 Ok(())
255 }
256
257 fn result_to_attributes(&self, result: &NatTraversalResult) -> HashMap<String, AttributeValue> {
259 let mut attributes = HashMap::new();
260
261 attributes.insert("nat.success".to_string(), AttributeValue::Bool(result.success));
262 attributes.insert("nat.duration_ms".to_string(), AttributeValue::Int(result.duration.as_millis() as i64));
263
264 if let Some(error_info) = &result.error_info {
265 attributes.insert("error.category".to_string(), AttributeValue::String(format!("{:?}", error_info.error_category)));
266 attributes.insert("error.code".to_string(), AttributeValue::String(error_info.error_code.clone()));
267 attributes.insert("error.message".to_string(), AttributeValue::String(error_info.error_message.clone()));
268 }
269
270 let perf = &result.performance_metrics;
271 attributes.insert("nat.connection_time_ms".to_string(), AttributeValue::Int(perf.connection_time_ms as i64));
272 attributes.insert("nat.candidates_tried".to_string(), AttributeValue::Int(perf.candidates_tried as i64));
273 attributes.insert("nat.round_trips".to_string(), AttributeValue::Int(perf.round_trips as i64));
274
275 if let Some(conn_info) = &result.connection_info {
276 attributes.insert("connection.latency_ms".to_string(), AttributeValue::Int(conn_info.quality.latency_ms as i64));
277 attributes.insert("connection.throughput_mbps".to_string(), AttributeValue::Float(conn_info.quality.throughput_mbps as f64));
278 attributes.insert("connection.path_type".to_string(), AttributeValue::String(format!("{:?}", conn_info.path.path_type)));
279 }
280
281 attributes
282 }
283}
284
285#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
287pub struct TracingConfig {
288 pub enabled: bool,
290 pub sampling: TraceSamplingConfig,
292 pub export: TraceExportConfig,
294 pub correlation: CorrelationConfig,
296 pub resource_limits: TraceResourceLimits,
298}
299
300impl Default for TracingConfig {
301 fn default() -> Self {
302 Self {
303 enabled: true,
304 sampling: TraceSamplingConfig::default(),
305 export: TraceExportConfig::default(),
306 correlation: CorrelationConfig::default(),
307 resource_limits: TraceResourceLimits::default(),
308 }
309 }
310}
311
312#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
314pub struct TraceSamplingConfig {
315 pub nat_traversal_rate: f64,
317 pub success_rate: f64,
319 pub failure_rate: f64,
321 pub adaptive: AdaptiveTraceSamplingConfig,
323}
324
325impl Default for TraceSamplingConfig {
326 fn default() -> Self {
327 Self {
328 nat_traversal_rate: 0.1, success_rate: 0.05, failure_rate: 1.0, adaptive: AdaptiveTraceSamplingConfig::default(),
332 }
333 }
334}
335
336#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
338pub struct AdaptiveTraceSamplingConfig {
339 pub enabled: bool,
341 pub target_traces_per_second: f64,
343 pub adjustment_interval: Duration,
345}
346
347impl Default for AdaptiveTraceSamplingConfig {
348 fn default() -> Self {
349 Self {
350 enabled: true,
351 target_traces_per_second: 100.0,
352 adjustment_interval: Duration::from_secs(60),
353 }
354 }
355}
356
357#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
359pub struct TraceExportConfig {
360 pub destinations: Vec<TraceExportDestination>,
362 pub batch_size: usize,
364 pub export_interval: Duration,
366 pub export_timeout: Duration,
368}
369
370impl Default for TraceExportConfig {
371 fn default() -> Self {
372 Self {
373 destinations: vec![TraceExportDestination::Jaeger {
374 endpoint: "http://localhost:14268/api/traces".to_string(),
375 }],
376 batch_size: 100,
377 export_interval: Duration::from_secs(10),
378 export_timeout: Duration::from_secs(30),
379 }
380 }
381}
382
/// A backend that trace batches can be exported to.
///
/// The exporter dispatches each variant to its own `export_to_*` method.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum TraceExportDestination {
    /// Jaeger, addressed by `endpoint`.
    Jaeger { endpoint: String },
    /// Zipkin, addressed by `endpoint`.
    Zipkin { endpoint: String },
    /// OTLP, addressed by `endpoint`.
    OTLP { endpoint: String },
    /// Cloud Trace, identified by `project_id`.
    CloudTrace { project_id: String },
    /// X-Ray, identified by `region`.
    XRay { region: String },
}
392
393#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
395pub struct CorrelationConfig {
396 pub correlation_header: String,
398 pub cross_service: bool,
400 pub id_format: CorrelationIdFormat,
402}
403
404impl Default for CorrelationConfig {
405 fn default() -> Self {
406 Self {
407 correlation_header: "X-Correlation-ID".to_string(),
408 cross_service: true,
409 id_format: CorrelationIdFormat::UUID,
410 }
411 }
412}
413
/// Format requested for correlation ids.
///
/// NOTE(review): `CorrelationId::new` in this file always builds the id from a
/// v4 UUID and does not consult this setting.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum CorrelationIdFormat {
    /// UUID-formatted id.
    UUID,
    /// Snowflake-style id (presumably; no generator is visible here).
    Snowflake,
    /// Custom format carrying a string; its meaning is not defined in this file.
    Custom(String),
}
421
422#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
424pub struct TraceResourceLimits {
425 pub max_active_traces: usize,
427 pub max_spans_per_trace: usize,
429 pub max_events_per_trace: usize,
431 pub max_trace_duration: Duration,
433}
434
435impl Default for TraceResourceLimits {
436 fn default() -> Self {
437 Self {
438 max_active_traces: 10000,
439 max_spans_per_trace: 100,
440 max_events_per_trace: 500,
441 max_trace_duration: Duration::from_secs(300), }
443 }
444}
445
446#[derive(Debug, Clone, PartialEq, Eq, Hash)]
448#[allow(dead_code)] pub struct TraceId(String);
450
451impl TraceId {
452 fn new() -> Self {
453 Self(Uuid::new_v4().to_string())
454 }
455}
456
457impl std::fmt::Display for TraceId {
458 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
459 write!(f, "{}", self.0)
460 }
461}
462
463#[derive(Debug, Clone, PartialEq, Eq, Hash)]
465#[allow(dead_code)] pub struct SpanId(String);
467
468impl SpanId {
469 fn new() -> Self {
470 Self(Uuid::new_v4().to_string())
471 }
472}
473
474impl std::fmt::Display for SpanId {
475 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
476 write!(f, "{}", self.0)
477 }
478}
479
480#[derive(Debug, Clone)]
482#[allow(dead_code)] pub struct CorrelationId(String);
484
485impl CorrelationId {
486 fn new() -> Self {
487 Self(Uuid::new_v4().to_string())
488 }
489}
490
491impl std::fmt::Display for CorrelationId {
492 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
493 write!(f, "{}", self.0)
494 }
495}
496
/// Everything recorded for one traced NAT traversal attempt.
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct TraceContext {
    /// Id of this trace.
    trace_id: TraceId,
    /// Correlation id issued when the trace was started.
    correlation_id: CorrelationId,
    /// Root span covering the whole attempt.
    root_span: TraceSpan,
    /// Child spans added to this trace, keyed by span id.
    active_spans: HashMap<SpanId, TraceSpan>,
    /// System time at which the trace was registered.
    start_time: SystemTime,
    /// Client endpoint, copied from the attempt.
    client_info: crate::monitoring::EndpointInfo,
    /// Server endpoint, copied from the attempt.
    server_info: crate::monitoring::EndpointInfo,
    /// Bootstrap nodes, copied from the attempt.
    bootstrap_nodes: Vec<String>,
    /// Events recorded on this trace, in insertion order.
    events: Vec<TraceEvent>,
}
520
/// A single span within a trace.
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct TraceSpan {
    /// Id of this span.
    span_id: SpanId,
    /// Id of the parent span; `None` for a span created without a parent.
    parent_span_id: Option<SpanId>,
    /// Span name.
    name: String,
    /// When the span started.
    start_time: SystemTime,
    /// When the span ended; `None` while it is still open.
    end_time: Option<SystemTime>,
    /// Span status; the span builder initialises it to `SpanStatus::Ok`.
    status: SpanStatus,
    /// Typed attributes attached to the span.
    attributes: HashMap<String, AttributeValue>,
    /// String tags; created empty by the span builder.
    tags: HashMap<String, String>,
}
542
/// Outcome recorded on a span.
#[derive(Debug, Clone, PartialEq)]
pub enum SpanStatus {
    /// Initial status of every span, and the status of a successful attempt.
    Ok,
    /// Set on the root span when the traversal result reports failure.
    Error,
    /// Set on the root span of a trace flushed before it completed.
    Cancelled,
    /// Not assigned anywhere in this file; available to callers of `complete_span`.
    Timeout,
}
551
/// A timestamped event recorded on a trace.
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct TraceEvent {
    /// System time at which the event was recorded.
    timestamp: SystemTime,
    /// Kind of event.
    event_type: TraceEventType,
    /// Span the event is attached to.
    span_id: SpanId,
    /// Attributes attached to the event.
    attributes: HashMap<String, AttributeValue>,
    /// Optional duration; set for the completion event, `None` for events
    /// added through `add_event`.
    duration: Option<Duration>,
}
567
/// Kinds of events that can be recorded on a trace.
///
/// Only `NatTraversalCompleted` is emitted by the collector itself in this
/// file; the other variants are supplied by callers through `add_event`.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub enum TraceEventType {
    NatTraversalStarted,
    /// Recorded by `complete_nat_trace` when an attempt finishes.
    NatTraversalCompleted,
    CandidateDiscoveryStarted,
    CandidateDiscoveryCompleted,
    CandidateTestStarted,
    CandidateTestCompleted,
    HolePunchingStarted,
    HolePunchingCompleted,
    ConnectionEstablished,
    BootstrapNodeContacted,
    ErrorOccurred,
    /// Caller-defined event carrying a free-form string.
    Custom(String),
}
585
/// A typed attribute value attached to spans and events.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub enum AttributeValue {
    /// Text value.
    String(String),
    /// Signed 64-bit integer value.
    Int(i64),
    /// 64-bit floating point value.
    Float(f64),
    /// Boolean value.
    Bool(bool),
    /// List of nested values; skipped by `TracingUtils::add_span_attributes`.
    Array(Vec<AttributeValue>),
}
596
/// Decides which NAT traversal attempts are traced, optionally adapting its rate.
struct TraceSampler {
    /// Sampling configuration.
    config: TraceSamplingConfig,
    /// Sampling rate currently in effect; starts at `nat_traversal_rate`.
    current_rate: Arc<RwLock<f64>>,
    /// Number of attempts sampled since the last rate adjustment.
    traces_this_period: Arc<RwLock<u32>>,
    /// When the rate was last adjusted (or when the sampler was created).
    last_adjustment: Arc<RwLock<Instant>>,
}
604
605impl TraceSampler {
606 fn new(config: TraceSamplingConfig) -> Self {
607 Self {
608 current_rate: Arc::new(RwLock::new(config.nat_traversal_rate)),
609 config,
610 traces_this_period: Arc::new(RwLock::new(0)),
611 last_adjustment: Arc::new(RwLock::new(Instant::now())),
612 }
613 }
614
615 async fn should_sample_nat_trace(&self, _attempt: &NatTraversalAttempt) -> bool {
616 if self.config.adaptive.enabled {
618 self.adjust_sampling_rate().await;
619 }
620
621 let current_rate = *self.current_rate.read().await;
622 let should_sample = rand::random::<f64>() < current_rate;
623
624 if should_sample {
625 let mut traces_count = self.traces_this_period.write().await;
626 *traces_count += 1;
627 }
628
629 should_sample
630 }
631
632 async fn adjust_sampling_rate(&self) {
633 let mut last_adjustment = self.last_adjustment.write().await;
634
635 if last_adjustment.elapsed() < self.config.adaptive.adjustment_interval {
636 return;
637 }
638
639 let traces_count = {
640 let mut count = self.traces_this_period.write().await;
641 let current_count = *count;
642 *count = 0; current_count
644 };
645
646 let period_seconds = self.config.adaptive.adjustment_interval.as_secs_f64();
647 let current_traces_per_second = traces_count as f64 / period_seconds;
648 let target_traces_per_second = self.config.adaptive.target_traces_per_second;
649
650 let mut current_rate = self.current_rate.write().await;
651 let adjustment_factor = target_traces_per_second / current_traces_per_second.max(1.0);
652 *current_rate = (*current_rate * adjustment_factor).min(1.0).max(0.001);
653
654 *last_adjustment = Instant::now();
655
656 debug!("Adjusted trace sampling rate to {:.4} (current rate: {:.2} traces/sec, target: {:.2})",
657 *current_rate, current_traces_per_second, target_traces_per_second);
658 }
659}
660
/// Stateless factory for root and child spans.
struct SpanBuilder;
663
664impl SpanBuilder {
    /// Creates the (stateless) span builder.
    fn new() -> Self {
        Self
    }
668
669 async fn create_nat_traversal_span(
670 &self,
671 _trace_id: &TraceId,
672 parent_span_id: Option<SpanId>,
673 attempt: &NatTraversalAttempt,
674 ) -> Result<TraceSpan, MonitoringError> {
675 let mut attributes = HashMap::new();
676
677 attributes.insert("nat.attempt_id".to_string(), AttributeValue::String(attempt.attempt_id.clone()));
679 attributes.insert("nat.client.region".to_string(), AttributeValue::String(
680 attempt.client_info.region.as_deref().unwrap_or("unknown").to_string()
681 ));
682 attributes.insert("nat.server.region".to_string(), AttributeValue::String(
683 attempt.server_info.region.as_deref().unwrap_or("unknown").to_string()
684 ));
685 attributes.insert("nat.bootstrap_nodes".to_string(), AttributeValue::Array(
686 attempt.bootstrap_nodes.iter()
687 .map(|node| AttributeValue::String(node.clone()))
688 .collect()
689 ));
690
691 if let Some(client_nat_type) = &attempt.client_info.nat_type {
692 attributes.insert("nat.client.type".to_string(), AttributeValue::String(format!("{:?}", client_nat_type)));
693 }
694
695 if let Some(server_nat_type) = &attempt.server_info.nat_type {
696 attributes.insert("nat.server.type".to_string(), AttributeValue::String(format!("{:?}", server_nat_type)));
697 }
698
699 if let Some(rtt) = attempt.network_conditions.rtt_ms {
701 attributes.insert("network.rtt_ms".to_string(), AttributeValue::Int(rtt as i64));
702 }
703
704 if let Some(loss_rate) = attempt.network_conditions.packet_loss_rate {
705 attributes.insert("network.packet_loss_rate".to_string(), AttributeValue::Float(loss_rate as f64));
706 }
707
708 Ok(TraceSpan {
709 span_id: SpanId::new(),
710 parent_span_id,
711 name: "nat_traversal".to_string(),
712 start_time: attempt.timestamp,
713 end_time: None,
714 status: SpanStatus::Ok,
715 attributes,
716 tags: HashMap::new(),
717 })
718 }
719
720 async fn create_child_span(
721 &self,
722 _trace_id: &TraceId,
723 parent_span_id: &SpanId,
724 span_name: &str,
725 attributes: HashMap<String, AttributeValue>,
726 ) -> Result<TraceSpan, MonitoringError> {
727 Ok(TraceSpan {
728 span_id: SpanId::new(),
729 parent_span_id: Some(parent_span_id.clone()),
730 name: span_name.to_string(),
731 start_time: SystemTime::now(),
732 end_time: None,
733 status: SpanStatus::Ok,
734 attributes,
735 tags: HashMap::new(),
736 })
737 }
738}
739
/// Queues finished traces and forwards them in batches to the configured destinations.
struct TraceExporter {
    /// Export configuration (destinations and batch size).
    config: TraceExportConfig,
    /// Traces waiting to be sent with the next batch.
    pending_traces: Arc<Mutex<Vec<TraceContext>>>,
}
745
746impl TraceExporter {
747 fn new(config: TraceExportConfig) -> Self {
748 Self {
749 config,
750 pending_traces: Arc::new(Mutex::new(Vec::new())),
751 }
752 }
753
    /// Starts the exporter.
    ///
    /// The body shown here only logs a message and returns `Ok`; no
    /// background work is spawned.
    async fn start(&self) -> Result<(), MonitoringError> {
        info!("Trace exporter started");
        Ok(())
    }
759
760 async fn stop(&self) -> Result<(), MonitoringError> {
761 self.flush_pending_traces().await?;
763 info!("Trace exporter stopped");
764 Ok(())
765 }
766
767 async fn export_trace(&self, trace_context: TraceContext) -> Result<(), MonitoringError> {
768 {
770 let mut pending = self.pending_traces.lock().await;
771 pending.push(trace_context);
772
773 if pending.len() >= self.config.batch_size {
775 let traces = pending.drain(..).collect::<Vec<_>>();
776 drop(pending); self.export_batch(traces).await?;
778 }
779 }
780
781 Ok(())
782 }
783
784 async fn export_batch(&self, traces: Vec<TraceContext>) -> Result<(), MonitoringError> {
785 for destination in &self.config.destinations {
786 if let Err(e) = self.export_to_destination(destination, &traces).await {
787 warn!("Failed to export traces to {:?}: {}", destination, e);
788 }
789 }
790
791 debug!("Exported batch of {} traces", traces.len());
792 Ok(())
793 }
794
795 async fn export_to_destination(
796 &self,
797 destination: &TraceExportDestination,
798 traces: &[TraceContext],
799 ) -> Result<(), MonitoringError> {
800 match destination {
801 TraceExportDestination::Jaeger { endpoint } => {
802 self.export_to_jaeger(endpoint, traces).await
803 }
804 TraceExportDestination::Zipkin { endpoint } => {
805 self.export_to_zipkin(endpoint, traces).await
806 }
807 TraceExportDestination::OTLP { endpoint } => {
808 self.export_to_otlp(endpoint, traces).await
809 }
810 TraceExportDestination::CloudTrace { project_id } => {
811 self.export_to_cloud_trace(project_id, traces).await
812 }
813 TraceExportDestination::XRay { region } => {
814 self.export_to_xray(region, traces).await
815 }
816 }
817 }
818
    /// Exports a batch to Jaeger at `endpoint`.
    ///
    /// The body shown here only logs the batch size and endpoint, then returns `Ok`.
    async fn export_to_jaeger(&self, endpoint: &str, traces: &[TraceContext]) -> Result<(), MonitoringError> {
        debug!("Exporting {} traces to Jaeger at {}", traces.len(), endpoint);
        Ok(())
    }
824
    /// Exports a batch to Zipkin at `endpoint`.
    ///
    /// The body shown here only logs the batch size and endpoint, then returns `Ok`.
    async fn export_to_zipkin(&self, endpoint: &str, traces: &[TraceContext]) -> Result<(), MonitoringError> {
        debug!("Exporting {} traces to Zipkin at {}", traces.len(), endpoint);
        Ok(())
    }
830
    /// Exports a batch over OTLP to `endpoint`.
    ///
    /// The body shown here only logs the batch size and endpoint, then returns `Ok`.
    async fn export_to_otlp(&self, endpoint: &str, traces: &[TraceContext]) -> Result<(), MonitoringError> {
        debug!("Exporting {} traces to OTLP at {}", traces.len(), endpoint);
        Ok(())
    }
836
    /// Exports a batch to Cloud Trace for `project_id`.
    ///
    /// The body shown here only logs the batch size and project, then returns `Ok`.
    async fn export_to_cloud_trace(&self, project_id: &str, traces: &[TraceContext]) -> Result<(), MonitoringError> {
        debug!("Exporting {} traces to Cloud Trace (project: {})", traces.len(), project_id);
        Ok(())
    }
842
    /// Exports a batch to X-Ray in `region`.
    ///
    /// The body shown here only logs the batch size and region, then returns `Ok`.
    async fn export_to_xray(&self, region: &str, traces: &[TraceContext]) -> Result<(), MonitoringError> {
        debug!("Exporting {} traces to X-Ray (region: {})", traces.len(), region);
        Ok(())
    }
848
849 async fn flush_pending_traces(&self) -> Result<(), MonitoringError> {
850 let traces = {
851 let mut pending = self.pending_traces.lock().await;
852 pending.drain(..).collect::<Vec<_>>()
853 };
854
855 if !traces.is_empty() {
856 self.export_batch(traces).await?;
857 }
858
859 Ok(())
860 }
861}
862
/// Issues correlation ids and remembers the most recently issued one.
struct CorrelationManager {
    /// Last id handed out by `generate_correlation_id`; `None` until the first call.
    current_correlation: Arc<RwLock<Option<CorrelationId>>>,
}
867
868impl CorrelationManager {
869 fn new() -> Self {
870 Self {
871 current_correlation: Arc::new(RwLock::new(None)),
872 }
873 }
874
875 async fn generate_correlation_id(&self) -> CorrelationId {
876 let correlation_id = CorrelationId::new();
877
878 {
880 let mut current = self.current_correlation.write().await;
881 *current = Some(correlation_id.clone());
882 }
883
884 correlation_id
885 }
886
887 async fn get_current_correlation(&self) -> Option<CorrelationId> {
888 let current = self.current_correlation.read().await;
889 current.clone()
890 }
891}
892
/// Namespace for helpers that work with the `tracing` crate's current span.
pub struct TracingUtils;
895
896impl TracingUtils {
897 pub fn create_span(name: &'static str) -> Span {
899 tracing::info_span!("{}", name)
900 }
901
902 pub fn add_span_attributes(attributes: HashMap<String, AttributeValue>) {
904 let span = Span::current();
905 for (key, value) in attributes {
906 match value {
907 AttributeValue::String(s) => { span.record(key.as_str(), &s); }
908 AttributeValue::Int(i) => { span.record(key.as_str(), &i); }
909 AttributeValue::Float(f) => { span.record(key.as_str(), &f); }
910 AttributeValue::Bool(b) => { span.record(key.as_str(), &b); }
911 _ => {} }
913 }
914 }
915
916 pub fn record_event(event_name: &str, attributes: HashMap<String, AttributeValue>) {
918 info!("Event: {} with {} attributes", event_name, attributes.len());
920 }
921}
922
923#[cfg(test)]
924mod tests {
925 use super::*;
926
927 #[tokio::test]
928 async fn test_trace_collector_creation() {
929 let config = TracingConfig::default();
930 let collector = DistributedTraceCollector::new(config).await.unwrap();
931
932 let status = collector.get_status().await;
933 assert!(status.contains("Active traces: 0"));
934 }
935
936 #[tokio::test]
937 async fn test_trace_sampling() {
938 let config = TraceSamplingConfig {
939 nat_traversal_rate: 0.5, success_rate: 0.5,
941 failure_rate: 1.0,
942 adaptive: AdaptiveTraceSamplingConfig::default(),
943 };
944
945 let sampler = TraceSampler::new(config);
946
947 let attempt = NatTraversalAttempt {
949 attempt_id: "test".to_string(),
950 timestamp: SystemTime::now(),
951 client_info: crate::monitoring::EndpointInfo {
952 id: "client".to_string(),
953 role: crate::monitoring::EndpointRole::Client,
954 address_hash: "hash".to_string(),
955 nat_type: None,
956 region: None,
957 },
958 server_info: crate::monitoring::EndpointInfo {
959 id: "server".to_string(),
960 role: crate::monitoring::EndpointRole::Server,
961 address_hash: "hash".to_string(),
962 nat_type: None,
963 region: None,
964 },
965 nat_config: crate::nat_traversal_api::NatTraversalConfig::default(),
966 bootstrap_nodes: vec![],
967 network_conditions: crate::monitoring::NetworkConditions {
968 rtt_ms: None,
969 packet_loss_rate: None,
970 bandwidth_mbps: None,
971 congestion_level: crate::monitoring::CongestionLevel::Low,
972 },
973 };
974
975 let should_sample = sampler.should_sample_nat_trace(&attempt).await;
977 assert!(should_sample || !should_sample);
979 }
980
981 #[tokio::test]
982 async fn test_span_builder() {
983 let span_builder = SpanBuilder::new();
984 let trace_id = TraceId::new();
985
986 let attempt = NatTraversalAttempt {
988 attempt_id: "test".to_string(),
989 timestamp: SystemTime::now(),
990 client_info: crate::monitoring::EndpointInfo {
991 id: "client".to_string(),
992 role: crate::monitoring::EndpointRole::Client,
993 address_hash: "hash".to_string(),
994 nat_type: Some(crate::monitoring::NatType::FullCone),
995 region: Some("us-east".to_string()),
996 },
997 server_info: crate::monitoring::EndpointInfo {
998 id: "server".to_string(),
999 role: crate::monitoring::EndpointRole::Server,
1000 address_hash: "hash".to_string(),
1001 nat_type: Some(crate::monitoring::NatType::Symmetric),
1002 region: Some("eu-west".to_string()),
1003 },
1004 nat_config: crate::nat_traversal_api::NatTraversalConfig::default(),
1005 bootstrap_nodes: vec!["bootstrap1".to_string()],
1006 network_conditions: crate::monitoring::NetworkConditions {
1007 rtt_ms: Some(50),
1008 packet_loss_rate: Some(0.01),
1009 bandwidth_mbps: Some(100),
1010 congestion_level: crate::monitoring::CongestionLevel::Low,
1011 },
1012 };
1013
1014 let span = span_builder.create_nat_traversal_span(&trace_id, None, &attempt).await.unwrap();
1015
1016 assert_eq!(span.name, "nat_traversal");
1017 assert!(span.attributes.contains_key("nat.attempt_id"));
1018 assert!(span.attributes.contains_key("nat.client.region"));
1019 assert!(span.attributes.contains_key("nat.server.region"));
1020 }
1021}