sklears_compose/distributed_tracing.rs

//! Distributed tracing system for pipeline execution monitoring
//!
//! This module provides comprehensive distributed tracing capabilities for
//! monitoring pipeline execution across different processes, threads, and
//! potentially different machines. It implements a custom tracing protocol
//! that can track execution flow, dependencies, and performance metrics.
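//!
//! # Example
//!
//! A minimal usage sketch (illustrative only; the operation and service names
//! are placeholders):
//!
//! ```ignore
//! let tracer = DistributedTracer::new(TracingConfig::new("pipeline-service"));
//! let root = tracer.start_trace("fit_pipeline", "pipeline-service");
//! root.set_tag("pipeline.steps", "3");
//! root.log_event("pipeline started", LogLevel::Info);
//! root.finish();
//! ```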

use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use uuid::Uuid;

/// Distributed tracing system for monitoring pipeline execution across services
pub struct DistributedTracer {
    /// Tracer configuration
    config: TracingConfig,
    /// Active spans storage
    spans: Arc<RwLock<HashMap<String, TraceSpan>>>,
    /// Completed traces storage
    traces: Arc<Mutex<VecDeque<Trace>>>,
    /// Span relationships (parent-child)
    relationships: Arc<RwLock<HashMap<String, Vec<String>>>>,
    /// Service registry for multi-service tracing
    services: Arc<RwLock<HashMap<String, ServiceInfo>>>,
    /// Trace exporters for external systems
    exporters: Vec<Box<dyn TraceExporter>>,
}

impl DistributedTracer {
    /// Create a new distributed tracer
    #[must_use]
    pub fn new(config: TracingConfig) -> Self {
        Self {
            config,
            spans: Arc::new(RwLock::new(HashMap::new())),
            traces: Arc::new(Mutex::new(VecDeque::new())),
            relationships: Arc::new(RwLock::new(HashMap::new())),
            services: Arc::new(RwLock::new(HashMap::new())),
            exporters: Vec::new(),
        }
    }

    /// Add trace exporter
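    ///
    /// # Example
    ///
    /// Sketch of wiring up the console exporter defined below and draining
    /// completed traces to it (the flow shown is illustrative):
    ///
    /// ```ignore
    /// let mut tracer = DistributedTracer::new(TracingConfig::default());
    /// tracer.add_exporter(Box::new(ConsoleTraceExporter::new(false)));
    /// // ... record spans ...
    /// tracer.export_traces();
    /// ```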
    pub fn add_exporter(&mut self, exporter: Box<dyn TraceExporter>) {
        self.exporters.push(exporter);
    }

    /// Register a service
    pub fn register_service(&self, service_id: &str, service_info: ServiceInfo) {
        if let Ok(mut services) = self.services.write() {
            services.insert(service_id.to_string(), service_info);
        }
    }

    /// Start a new root trace
    #[must_use]
    pub fn start_trace(&self, operation_name: &str, service_id: &str) -> TraceHandle {
        let trace_id = Uuid::new_v4().to_string();
        let span_id = Uuid::new_v4().to_string();

        let span = TraceSpan {
            trace_id: trace_id.clone(),
            span_id: span_id.clone(),
            parent_span_id: None,
            operation_name: operation_name.to_string(),
            service_id: service_id.to_string(),
            start_time: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_nanos() as u64,
            end_time: None,
            duration_ns: None,
            status: SpanStatus::Active,
            tags: HashMap::new(),
            logs: Vec::new(),
            baggage: HashMap::new(),
        };

        if let Ok(mut spans) = self.spans.write() {
            spans.insert(span_id.clone(), span);
        }

        TraceHandle::new(
            trace_id,
            span_id,
            self.spans.clone(),
            self.relationships.clone(),
            self.traces.clone(),
            !self.exporters.is_empty(),
        )
    }

    /// Start a child span
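    ///
    /// # Example
    ///
    /// Sketch of a parent/child relationship, given a `tracer` as above
    /// (operation names are placeholders):
    ///
    /// ```ignore
    /// let parent = tracer.start_trace("fit", "compose-service");
    /// let child = tracer.start_child_span(&parent, "fit_step:scaler", "compose-service");
    /// child.finish();
    /// parent.finish();
    /// ```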
    pub fn start_child_span(
        &self,
        parent_handle: &TraceHandle,
        operation_name: &str,
        service_id: &str,
    ) -> TraceHandle {
        let span_id = Uuid::new_v4().to_string();

        let span = TraceSpan {
            trace_id: parent_handle.trace_id.clone(),
            span_id: span_id.clone(),
            parent_span_id: Some(parent_handle.span_id.clone()),
            operation_name: operation_name.to_string(),
            service_id: service_id.to_string(),
            start_time: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_nanos() as u64,
            end_time: None,
            duration_ns: None,
            status: SpanStatus::Active,
            tags: HashMap::new(),
            logs: Vec::new(),
            baggage: HashMap::new(),
        };

        if let Ok(mut spans) = self.spans.write() {
            spans.insert(span_id.clone(), span);
        }

        // Record parent-child relationship
        if let Ok(mut relationships) = self.relationships.write() {
            relationships
                .entry(parent_handle.span_id.clone())
                .or_default()
                .push(span_id.clone());
        }

        TraceHandle::new(
            parent_handle.trace_id.clone(),
            span_id,
            self.spans.clone(),
            self.relationships.clone(),
            self.traces.clone(),
            !self.exporters.is_empty(),
        )
    }

    /// Get a completed trace by ID
    #[must_use]
    pub fn get_trace(&self, trace_id: &str) -> Option<Trace> {
        if let Ok(traces) = self.traces.lock() {
            traces.iter().find(|t| t.trace_id == trace_id).cloned()
        } else {
            None
        }
    }

    /// Get all active spans
    #[must_use]
    pub fn get_active_spans(&self) -> Vec<TraceSpan> {
        if let Ok(spans) = self.spans.read() {
            spans
                .values()
                .filter(|s| s.status == SpanStatus::Active)
                .cloned()
                .collect()
        } else {
            Vec::new()
        }
    }

    /// Analyze trace performance
    #[must_use]
    pub fn analyze_trace_performance(&self, trace_id: &str) -> Option<TraceAnalysis> {
        self.get_trace(trace_id)
            .map(|trace| TraceAnalysis::from_trace(&trace))
    }

    /// Export traces to configured exporters
    pub fn export_traces(&mut self) {
        if let Ok(mut traces) = self.traces.lock() {
            let traces_to_export: Vec<Trace> = traces.drain(..).collect();

            for exporter in &mut self.exporters {
                for trace in &traces_to_export {
                    let _ = exporter.export_trace(trace);
                }
            }
        }
    }

    /// Get trace statistics
    #[must_use]
    pub fn get_trace_statistics(&self) -> TraceStatistics {
        let mut stats = TraceStatistics::default();

        if let Ok(traces) = self.traces.lock() {
            stats.total_traces = traces.len();

            let mut total_duration = 0u64;
            let mut service_counts = HashMap::new();
            let mut operation_counts = HashMap::new();

            for trace in traces.iter() {
                if let Some(duration) = trace.total_duration_ns {
                    total_duration += duration;
                }

                for span in &trace.spans {
                    *service_counts.entry(span.service_id.clone()).or_insert(0) += 1;
                    *operation_counts
                        .entry(span.operation_name.clone())
                        .or_insert(0) += 1;
                }
            }

            if stats.total_traces > 0 {
                stats.average_duration_ns = total_duration / stats.total_traces as u64;
            }

            stats.service_counts = service_counts;
            stats.operation_counts = operation_counts;
        }

        if let Ok(spans) = self.spans.read() {
            stats.active_spans = spans.len();
        }

        stats
    }

    /// Clean up old completed traces
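    ///
    /// # Example
    ///
    /// Illustrative call that keeps roughly the last hour of traces:
    ///
    /// ```ignore
    /// tracer.cleanup_old_traces(Duration::from_secs(3600));
    /// ```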
    pub fn cleanup_old_traces(&self, retention_period: Duration) {
        // Saturate so an overly long retention period cannot underflow the cutoff.
        let cutoff = (SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_nanos() as u64)
            .saturating_sub(retention_period.as_nanos() as u64);

        if let Ok(mut traces) = self.traces.lock() {
            traces.retain(|trace| trace.spans.iter().any(|span| span.start_time > cutoff));
        }
    }
}

/// Tracing configuration
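///
/// # Example
///
/// Builder-style configuration sketch (values are illustrative):
///
/// ```ignore
/// let config = TracingConfig::new("compose-service")
///     .max_traces(500)
///     .sampling_rate(0.25)
///     .export_interval(Duration::from_secs(60));
/// ```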
#[derive(Debug, Clone)]
pub struct TracingConfig {
    /// Service name for this tracer instance
    pub service_name: String,
    /// Maximum number of traces to retain
    pub max_traces: usize,
    /// Maximum number of spans per trace
    pub max_spans_per_trace: usize,
    /// Sampling rate (0.0 to 1.0)
    pub sampling_rate: f64,
    /// Enable baggage propagation
    pub enable_baggage: bool,
    /// Auto-export interval
    pub export_interval: Duration,
    /// Trace retention period
    pub retention_period: Duration,
}

impl TracingConfig {
    /// Create new tracing configuration
    #[must_use]
    pub fn new(service_name: &str) -> Self {
        Self {
            service_name: service_name.to_string(),
            max_traces: 1000,
            max_spans_per_trace: 100,
            sampling_rate: 1.0,
            enable_baggage: true,
            export_interval: Duration::from_secs(30),
            retention_period: Duration::from_secs(3600), // 1 hour
        }
    }

    /// Set maximum traces
    #[must_use]
    pub fn max_traces(mut self, max: usize) -> Self {
        self.max_traces = max;
        self
    }

    /// Set sampling rate
    #[must_use]
    pub fn sampling_rate(mut self, rate: f64) -> Self {
        self.sampling_rate = rate.clamp(0.0, 1.0);
        self
    }

    /// Set export interval
    #[must_use]
    pub fn export_interval(mut self, interval: Duration) -> Self {
        self.export_interval = interval;
        self
    }
}

impl Default for TracingConfig {
    fn default() -> Self {
        Self::new("default-service")
    }
}

/// Handle for managing a trace span
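///
/// # Example
///
/// Sketch of typical span bookkeeping on a handle returned by
/// `DistributedTracer::start_trace` (tag keys, baggage values, and the error
/// text are placeholders):
///
/// ```ignore
/// let handle = tracer.start_trace("transform", "compose-service");
/// handle.set_tag("rows", "10000");
/// handle.set_baggage("request_id", "req-42");
/// if failed {
///     handle.record_error("transform failed");
/// }
/// handle.finish();
/// ```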
pub struct TraceHandle {
    /// Trace ID
    pub trace_id: String,
    /// Span ID
    pub span_id: String,
    /// Reference to spans storage
    spans: Arc<RwLock<HashMap<String, TraceSpan>>>,
    /// Reference to relationships storage
    relationships: Arc<RwLock<HashMap<String, Vec<String>>>>,
    /// Reference to traces storage
    traces: Arc<Mutex<VecDeque<Trace>>>,
    /// Whether to export traces
    should_export: bool,
}

impl TraceHandle {
    /// Create new trace handle
    fn new(
        trace_id: String,
        span_id: String,
        spans: Arc<RwLock<HashMap<String, TraceSpan>>>,
        relationships: Arc<RwLock<HashMap<String, Vec<String>>>>,
        traces: Arc<Mutex<VecDeque<Trace>>>,
        should_export: bool,
    ) -> Self {
        Self {
            trace_id,
            span_id,
            spans,
            relationships,
            traces,
            should_export,
        }
    }

    /// Add a tag to the span
    pub fn set_tag(&self, key: &str, value: &str) {
        if let Ok(mut spans) = self.spans.write() {
            if let Some(span) = spans.get_mut(&self.span_id) {
                span.tags.insert(key.to_string(), value.to_string());
            }
        }
    }

    /// Add baggage (cross-process data)
    pub fn set_baggage(&self, key: &str, value: &str) {
        if let Ok(mut spans) = self.spans.write() {
            if let Some(span) = spans.get_mut(&self.span_id) {
                span.baggage.insert(key.to_string(), value.to_string());
            }
        }
    }

    /// Get baggage value
    #[must_use]
    pub fn get_baggage(&self, key: &str) -> Option<String> {
        if let Ok(spans) = self.spans.read() {
            spans.get(&self.span_id)?.baggage.get(key).cloned()
        } else {
            None
        }
    }

    /// Log an event
    pub fn log_event(&self, message: &str, level: LogLevel) {
        let log_entry = LogEntry {
            timestamp: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_nanos() as u64,
            level,
            message: message.to_string(),
            fields: HashMap::new(),
        };

        if let Ok(mut spans) = self.spans.write() {
            if let Some(span) = spans.get_mut(&self.span_id) {
                span.logs.push(log_entry);
            }
        }
    }

    /// Set span status
    pub fn set_status(&self, status: SpanStatus) {
        if let Ok(mut spans) = self.spans.write() {
            if let Some(span) = spans.get_mut(&self.span_id) {
                span.status = status;
            }
        }
    }

    /// Record an error
    pub fn record_error(&self, error: &str) {
        self.set_tag("error", "true");
        self.set_tag("error.message", error);
        self.log_event(&format!("Error: {error}"), LogLevel::Error);
        self.set_status(SpanStatus::Error);
    }

    /// Finish the span
    pub fn finish(self) {
        let end_time = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_nanos() as u64;

        if let Ok(mut spans) = self.spans.write() {
            if let Some(span) = spans.get_mut(&self.span_id) {
                span.end_time = Some(end_time);
                span.duration_ns = Some(end_time - span.start_time);
                if span.status == SpanStatus::Active {
                    span.status = SpanStatus::Completed;
                }
            }
        }

        // Check if this completes a trace
        self.check_trace_completion();
    }

    /// Check if trace is complete and move to completed traces
    fn check_trace_completion(&self) {
        let mut all_spans_complete = true;
        let mut trace_spans = Vec::new();

        if let Ok(spans) = self.spans.read() {
            for span in spans.values() {
                if span.trace_id == self.trace_id {
                    if span.status == SpanStatus::Active {
                        all_spans_complete = false;
                    }
                    trace_spans.push(span.clone());
                }
            }
        }

        if all_spans_complete && !trace_spans.is_empty() {
            let mut total_duration = 0u64;
            let mut root_span = None;

            for span in &trace_spans {
                if let Some(duration) = span.duration_ns {
                    total_duration = total_duration.max(duration);
                }
                if span.parent_span_id.is_none() {
                    root_span = Some(span.clone());
                }
            }

            let trace = Trace {
                trace_id: self.trace_id.clone(),
                spans: trace_spans,
                root_span,
                total_duration_ns: if total_duration > 0 {
                    Some(total_duration)
                } else {
                    None
                },
                service_count: self.count_unique_services(),
                start_time: self.get_trace_start_time(),
                end_time: self.get_trace_end_time(),
            };

            if let Ok(mut traces) = self.traces.lock() {
                traces.push_back(trace);

                // Remove completed spans from active spans
                if let Ok(mut spans) = self.spans.write() {
                    spans.retain(|_, span| span.trace_id != self.trace_id);
                }
            }
        }
    }

    /// Count unique services in the trace
    fn count_unique_services(&self) -> usize {
        if let Ok(spans) = self.spans.read() {
            let services: std::collections::HashSet<_> = spans
                .values()
                .filter(|span| span.trace_id == self.trace_id)
                .map(|span| &span.service_id)
                .collect();
            services.len()
        } else {
            0
        }
    }

    /// Get trace start time
    fn get_trace_start_time(&self) -> u64 {
        if let Ok(spans) = self.spans.read() {
            spans
                .values()
                .filter(|span| span.trace_id == self.trace_id)
                .map(|span| span.start_time)
                .min()
                .unwrap_or(0)
        } else {
            0
        }
    }

    /// Get trace end time
    fn get_trace_end_time(&self) -> Option<u64> {
        if let Ok(spans) = self.spans.read() {
            spans
                .values()
                .filter(|span| span.trace_id == self.trace_id)
                .filter_map(|span| span.end_time)
                .max()
        } else {
            None
        }
    }
}

/// Individual span in a distributed trace
#[derive(Debug, Clone)]
pub struct TraceSpan {
    /// Unique trace identifier
    pub trace_id: String,
    /// Unique span identifier
    pub span_id: String,
    /// Parent span ID (None for root spans)
    pub parent_span_id: Option<String>,
    /// Operation name
    pub operation_name: String,
    /// Service identifier
    pub service_id: String,
    /// Start timestamp (nanoseconds since epoch)
    pub start_time: u64,
    /// End timestamp (nanoseconds since epoch)
    pub end_time: Option<u64>,
    /// Duration in nanoseconds
    pub duration_ns: Option<u64>,
    /// Span status
    pub status: SpanStatus,
    /// Key-value tags
    pub tags: HashMap<String, String>,
    /// Log entries
    pub logs: Vec<LogEntry>,
    /// Cross-process baggage
    pub baggage: HashMap<String, String>,
}

/// Span status
#[derive(Debug, Clone, PartialEq)]
pub enum SpanStatus {
    /// Span is currently active
    Active,
    /// Span completed successfully
    Completed,
    /// Span completed with error
    Error,
    /// Span was cancelled
    Cancelled,
}

/// Log entry within a span
#[derive(Debug, Clone)]
pub struct LogEntry {
    /// Timestamp (nanoseconds since epoch)
    pub timestamp: u64,
    /// Log level
    pub level: LogLevel,
    /// Log message
    pub message: String,
    /// Additional fields
    pub fields: HashMap<String, String>,
}

/// Log levels
#[derive(Debug, Clone, PartialEq)]
pub enum LogLevel {
    /// Debug information
    Debug,
    /// Informational messages
    Info,
    /// Warning messages
    Warn,
    /// Error messages
    Error,
}

/// Complete trace containing all spans
#[derive(Debug, Clone)]
pub struct Trace {
    /// Unique trace identifier
    pub trace_id: String,
    /// All spans in the trace
    pub spans: Vec<TraceSpan>,
    /// Root span (entry point)
    pub root_span: Option<TraceSpan>,
    /// Total trace duration
    pub total_duration_ns: Option<u64>,
    /// Number of unique services
    pub service_count: usize,
    /// Trace start time
    pub start_time: u64,
    /// Trace end time
    pub end_time: Option<u64>,
}

/// Service information for multi-service tracing
#[derive(Debug, Clone)]
pub struct ServiceInfo {
    /// Service name
    pub name: String,
    /// Service version
    pub version: String,
    /// Service endpoint/address
    pub address: String,
    /// Service metadata
    pub metadata: HashMap<String, String>,
}

/// Trace analysis results
#[derive(Debug, Clone)]
pub struct TraceAnalysis {
    /// Trace duration
    pub total_duration_ns: u64,
    /// Critical path (longest sequential chain)
    pub critical_path: Vec<String>,
    /// Service breakdown
    pub service_breakdown: HashMap<String, ServiceAnalysis>,
    /// Bottlenecks identified
    pub bottlenecks: Vec<Bottleneck>,
    /// Parallelism analysis
    pub parallelism_factor: f64,
}

impl TraceAnalysis {
    /// Create analysis from trace
    fn from_trace(trace: &Trace) -> Self {
        let total_duration_ns = trace.total_duration_ns.unwrap_or(0);
        let critical_path = Self::find_critical_path(trace);
        let service_breakdown = Self::analyze_services(trace);
        let bottlenecks = Self::identify_bottlenecks(trace);
        let parallelism_factor = Self::calculate_parallelism(trace);

        Self {
            total_duration_ns,
            critical_path,
            service_breakdown,
            bottlenecks,
            parallelism_factor,
        }
    }

    /// Find the critical path (longest sequential execution chain)
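    ///
    /// The current version only returns the root operation. A fuller version
    /// could walk the parent/child structure and keep the longest-duration
    /// chain; a rough sketch (not used here), assuming spans have been grouped
    /// into a `children` map keyed by parent span ID:
    ///
    /// ```ignore
    /// fn longest_chain(
    ///     span: &TraceSpan,
    ///     children: &HashMap<String, Vec<TraceSpan>>,
    /// ) -> (u64, Vec<String>) {
    ///     let own = span.duration_ns.unwrap_or(0);
    ///     let best_child = children
    ///         .get(&span.span_id)
    ///         .into_iter()
    ///         .flatten()
    ///         .map(|child| longest_chain(child, children))
    ///         .max_by_key(|(duration, _)| *duration);
    ///     match best_child {
    ///         Some((duration, mut path)) => {
    ///             path.insert(0, span.operation_name.clone());
    ///             (own + duration, path)
    ///         }
    ///         None => (own, vec![span.operation_name.clone()]),
    ///     }
    /// }
    /// ```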
    fn find_critical_path(trace: &Trace) -> Vec<String> {
        // Simplified critical path analysis
        let mut path = Vec::new();

        if let Some(root) = &trace.root_span {
            path.push(root.operation_name.clone());
            // In a real implementation, we'd build a dependency graph
            // and find the longest path through it
        }

        path
    }

    /// Analyze service performance
    fn analyze_services(trace: &Trace) -> HashMap<String, ServiceAnalysis> {
        let mut analysis = HashMap::new();

        for span in &trace.spans {
            let entry =
                analysis
                    .entry(span.service_id.clone())
                    .or_insert_with(|| ServiceAnalysis {
                        total_duration_ns: 0,
                        span_count: 0,
                        error_count: 0,
                        operations: HashMap::new(),
                    });

            entry.span_count += 1;
            if let Some(duration) = span.duration_ns {
                entry.total_duration_ns += duration;
            }
            if span.status == SpanStatus::Error {
                entry.error_count += 1;
            }

            *entry
                .operations
                .entry(span.operation_name.clone())
                .or_insert(0) += 1;
        }

        analysis
    }

    /// Identify performance bottlenecks
    fn identify_bottlenecks(trace: &Trace) -> Vec<Bottleneck> {
        let mut bottlenecks = Vec::new();

        // Find spans that take unusually long
        let durations: Vec<u64> = trace.spans.iter().filter_map(|s| s.duration_ns).collect();

        if let Some(max_duration) = durations.iter().max() {
            let avg_duration = durations.iter().sum::<u64>() / durations.len() as u64;

            for span in &trace.spans {
                if let Some(duration) = span.duration_ns {
                    if duration > avg_duration * 3 {
                        bottlenecks.push(Bottleneck {
                            span_id: span.span_id.clone(),
                            operation_name: span.operation_name.clone(),
                            service_id: span.service_id.clone(),
                            duration_ns: duration,
                            severity: if duration == *max_duration {
                                BottleneckSeverity::Critical
                            } else {
                                BottleneckSeverity::High
                            },
                        });
                    }
                }
            }
        }

        bottlenecks
    }

    /// Calculate parallelism factor
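    ///
    /// Computed as the sum of all span durations divided by the trace's
    /// `total_duration_ns`; values above 1.0 indicate overlapping spans.
    /// For example:
    ///
    /// ```ignore
    /// // Spans of 40ms, 40ms, and 20ms in a trace whose total duration is 50ms:
    /// // parallelism_factor = (40 + 40 + 20) / 50 = 2.0
    /// ```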
    fn calculate_parallelism(trace: &Trace) -> f64 {
        if trace.spans.is_empty() {
            return 1.0;
        }

        let total_work: u64 = trace.spans.iter().filter_map(|s| s.duration_ns).sum();

        let total_duration = trace.total_duration_ns.unwrap_or(1);

        if total_duration == 0 {
            1.0
        } else {
            total_work as f64 / total_duration as f64
        }
    }
}

/// Service analysis within a trace
#[derive(Debug, Clone)]
pub struct ServiceAnalysis {
    /// Total time spent in this service
    pub total_duration_ns: u64,
    /// Number of spans for this service
    pub span_count: usize,
    /// Number of errors in this service
    pub error_count: usize,
    /// Operations performed by this service
    pub operations: HashMap<String, usize>,
}

/// Performance bottleneck identification
#[derive(Debug, Clone)]
pub struct Bottleneck {
    /// Span ID of the bottleneck
    pub span_id: String,
    /// Operation name
    pub operation_name: String,
    /// Service ID
    pub service_id: String,
    /// Duration that makes it a bottleneck
    pub duration_ns: u64,
    /// Severity level
    pub severity: BottleneckSeverity,
}

/// Bottleneck severity
#[derive(Debug, Clone, PartialEq)]
pub enum BottleneckSeverity {
    /// Minor bottleneck
    Low,
    /// Moderate bottleneck
    Medium,
    /// Significant bottleneck
    High,
    /// Critical bottleneck requiring immediate attention
    Critical,
}

/// Trace statistics
#[derive(Debug, Clone, Default)]
pub struct TraceStatistics {
    /// Total number of completed traces
    pub total_traces: usize,
    /// Number of currently active spans
    pub active_spans: usize,
    /// Average trace duration
    pub average_duration_ns: u64,
    /// Count of spans per service
    pub service_counts: HashMap<String, usize>,
    /// Count of spans per operation
    pub operation_counts: HashMap<String, usize>,
}

/// Trait for exporting traces to external systems
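///
/// # Example
///
/// Sketch of a custom exporter that only counts traces (illustrative; not part
/// of this module):
///
/// ```ignore
/// struct CountingExporter {
///     exported: usize,
/// }
///
/// impl TraceExporter for CountingExporter {
///     fn export_trace(&mut self, trace: &Trace) -> Result<(), Box<dyn std::error::Error>> {
///         self.exported += 1;
///         println!("exported trace {} ({} spans)", trace.trace_id, trace.spans.len());
///         Ok(())
///     }
///
///     fn flush(&mut self) -> Result<(), Box<dyn std::error::Error>> {
///         Ok(())
///     }
/// }
/// ```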
pub trait TraceExporter: Send + Sync {
    /// Export a single trace
    fn export_trace(&mut self, trace: &Trace) -> Result<(), Box<dyn std::error::Error>>;

    /// Flush any pending exports
    fn flush(&mut self) -> Result<(), Box<dyn std::error::Error>>;
}

/// Console trace exporter for debugging
pub struct ConsoleTraceExporter {
    /// Whether to include detailed span information
    pub verbose: bool,
}

impl ConsoleTraceExporter {
    /// Create new console exporter
    #[must_use]
    pub fn new(verbose: bool) -> Self {
        Self { verbose }
    }
}

impl TraceExporter for ConsoleTraceExporter {
    fn export_trace(&mut self, trace: &Trace) -> Result<(), Box<dyn std::error::Error>> {
        println!("=== Trace {} ===", trace.trace_id);
        println!("Duration: {}ns", trace.total_duration_ns.unwrap_or(0));
        println!("Services: {}", trace.service_count);
        println!("Spans: {}", trace.spans.len());

        if self.verbose {
            for span in &trace.spans {
                println!(
                    "  {} [{}] {}ms in {}",
                    span.operation_name,
                    &span.span_id[..8],
                    span.duration_ns.unwrap_or(0) / 1_000_000,
                    span.service_id
                );

                for (key, value) in &span.tags {
                    println!("    {key}: {value}");
                }
            }
        }

        Ok(())
    }

    fn flush(&mut self) -> Result<(), Box<dyn std::error::Error>> {
        Ok(())
    }
}

/// JSON file trace exporter
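///
/// The current implementation is a stub. A fuller version could serialize each
/// trace with `serde_json` and append it to `file_path`; a rough sketch,
/// assuming `Trace` also derived `serde::Serialize` (it does not here) and that
/// `serde_json` is available:
///
/// ```ignore
/// use std::io::Write;
///
/// let json = serde_json::to_string(trace)?;
/// let mut file = std::fs::OpenOptions::new()
///     .create(true)
///     .append(true)
///     .open(&self.file_path)?;
/// writeln!(file, "{json}")?;
/// ```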
pub struct JsonFileTraceExporter {
    /// File path for export
    pub file_path: String,
}

impl JsonFileTraceExporter {
    /// Create new JSON file exporter
    #[must_use]
    pub fn new(file_path: &str) -> Self {
        Self {
            file_path: file_path.to_string(),
        }
    }
}

impl TraceExporter for JsonFileTraceExporter {
    fn export_trace(&mut self, trace: &Trace) -> Result<(), Box<dyn std::error::Error>> {
        // In a real implementation, we'd serialize the trace to JSON
        // and append it to the file
        println!(
            "Would export trace {} to {}",
            trace.trace_id, self.file_path
        );
        Ok(())
    }

    fn flush(&mut self) -> Result<(), Box<dyn std::error::Error>> {
        Ok(())
    }
}

#[allow(non_snake_case)]
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::Duration;

    #[test]
    fn test_distributed_tracer_creation() {
        let config = TracingConfig::new("test-service");
        let tracer = DistributedTracer::new(config);

        let stats = tracer.get_trace_statistics();
        assert_eq!(stats.total_traces, 0);
        assert_eq!(stats.active_spans, 0);
    }

    #[test]
    fn test_trace_creation_and_completion() {
        let config = TracingConfig::new("test-service");
        let tracer = DistributedTracer::new(config);

        let handle = tracer.start_trace("test-operation", "test-service");
        handle.set_tag("version", "1.0");
        handle.log_event("Starting operation", LogLevel::Info);

        thread::sleep(Duration::from_millis(10));
        handle.finish();

        thread::sleep(Duration::from_millis(10));
        let stats = tracer.get_trace_statistics();
        assert_eq!(stats.total_traces, 1);
    }

    #[test]
    fn test_child_span_creation() {
        let config = TracingConfig::new("test-service");
        let tracer = DistributedTracer::new(config);

        let parent_handle = tracer.start_trace("parent-operation", "test-service");
        let child_handle =
            tracer.start_child_span(&parent_handle, "child-operation", "test-service");

        child_handle.set_tag("child", "true");
        child_handle.finish();
        parent_handle.finish();

        thread::sleep(Duration::from_millis(10));
        let stats = tracer.get_trace_statistics();
        assert_eq!(stats.total_traces, 1);
    }

    #[test]
    fn test_baggage_propagation() {
        let config = TracingConfig::new("test-service");
        let tracer = DistributedTracer::new(config);

        let handle = tracer.start_trace("test-operation", "test-service");
        handle.set_baggage("user_id", "123");
        handle.set_baggage("session_id", "abc");

        assert_eq!(handle.get_baggage("user_id"), Some("123".to_string()));
        assert_eq!(handle.get_baggage("session_id"), Some("abc".to_string()));
        assert_eq!(handle.get_baggage("nonexistent"), None);

        handle.finish();
    }

    #[test]
    fn test_error_recording() {
        let config = TracingConfig::new("test-service");
        let tracer = DistributedTracer::new(config);

        let handle = tracer.start_trace("failing-operation", "test-service");
        handle.record_error("Something went wrong");
        let trace_id = handle.trace_id.clone();
        handle.finish();

        thread::sleep(Duration::from_millis(10));

        if let Some(trace) = tracer.get_trace(&trace_id) {
            let span = &trace.spans[0];
            assert_eq!(span.status, SpanStatus::Error);
            assert_eq!(span.tags.get("error"), Some(&"true".to_string()));
            assert!(span.tags.contains_key("error.message"));
        }
    }

    #[test]
    fn test_trace_analysis() {
        let config = TracingConfig::new("test-service");
        let tracer = DistributedTracer::new(config);

        let handle = tracer.start_trace("complex-operation", "test-service");
        thread::sleep(Duration::from_millis(50));
        let trace_id = handle.trace_id.clone();
        handle.finish();

        thread::sleep(Duration::from_millis(10));

        if let Some(analysis) = tracer.analyze_trace_performance(&trace_id) {
            assert!(analysis.total_duration_ns > 0);
            assert!(!analysis.critical_path.is_empty());
        }
    }

    #[test]
    fn test_service_registration() {
        let config = TracingConfig::new("test-service");
        let tracer = DistributedTracer::new(config);

        let service_info = ServiceInfo {
            name: "test-service".to_string(),
            version: "1.0.0".to_string(),
            address: "localhost:8080".to_string(),
            metadata: HashMap::new(),
        };

        tracer.register_service("test-service", service_info);

        // Service registration is stored internally
        assert_eq!(tracer.get_active_spans().len(), 0);
    }

    #[test]
    fn test_console_trace_exporter() {
        let mut exporter = ConsoleTraceExporter::new(true);

        let trace = Trace {
            trace_id: "test-trace".to_string(),
            spans: vec![],
            root_span: None,
            total_duration_ns: Some(1000000),
            service_count: 1,
            start_time: 123456789,
            end_time: Some(123456790),
        };

        assert!(exporter.export_trace(&trace).is_ok());
        assert!(exporter.flush().is_ok());
    }

    #[test]
    fn test_tracing_config() {
        let config = TracingConfig::new("my-service")
            .max_traces(500)
            .sampling_rate(0.8)
            .export_interval(Duration::from_secs(60));

        assert_eq!(config.service_name, "my-service");
        assert_eq!(config.max_traces, 500);
        assert_eq!(config.sampling_rate, 0.8);
        assert_eq!(config.export_interval, Duration::from_secs(60));
    }
}