ant_quic/monitoring/metrics.rs

//! Production Metrics Collection
//!
//! This module implements high-performance metrics collection for NAT traversal
//! operations with intelligent sampling, aggregation, and export capabilities.
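//!
//! A minimal usage sketch (assumes a Tokio runtime; `attempt` stands in for a
//! `NatTraversalAttempt` built elsewhere, so the snippet is illustrative only):
//!
//! ```ignore
//! let collector = ProductionMetricsCollector::new(MetricsConfig::default()).await?;
//! collector.start().await?;
//! collector.record_nat_attempt(&attempt).await?;
//! let summary = collector.get_summary().await;
//! collector.stop().await?;
//! ```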

use std::{
    collections::{HashMap, VecDeque},
    sync::{
        atomic::{AtomicU64, Ordering},
        Arc,
    },
    time::{Duration, Instant, SystemTime},
    net::SocketAddr,
};

use tokio::{
    sync::{RwLock, Mutex},
    time::interval,
};
use tracing::{debug, info, warn};

use super::{
    MonitoringError, NatTraversalAttempt, NatTraversalResult, MetricsSummary,
};

/// Production metrics collector with intelligent sampling
pub struct ProductionMetricsCollector {
    /// Metrics configuration
    config: MetricsConfig,
    /// Core metrics storage
    metrics_store: Arc<MetricsStore>,
    /// Sampling controller
    sampler: Arc<AdaptiveSampler>,
    /// Aggregation engine
    aggregator: Arc<MetricsAggregator>,
    /// Export manager
    exporter: Arc<MetricsExporter>,
    /// Circuit breaker for overload protection
    circuit_breaker: Arc<CircuitBreaker>,
    /// Collector state
    state: Arc<RwLock<CollectorState>>,
    /// Background tasks
    tasks: Arc<Mutex<Vec<tokio::task::JoinHandle<()>>>>,
}

impl ProductionMetricsCollector {
    /// Create new production metrics collector
    pub async fn new(config: MetricsConfig) -> Result<Self, MonitoringError> {
        let metrics_store = Arc::new(MetricsStore::new(config.storage.clone()));
        let sampler = Arc::new(AdaptiveSampler::new(config.sampling.clone()));
        let aggregator = Arc::new(MetricsAggregator::new(config.aggregation.clone()));
        let exporter = Arc::new(MetricsExporter::new(config.export.clone()));
        let circuit_breaker = Arc::new(CircuitBreaker::new(config.circuit_breaker.clone()));

        Ok(Self {
            config,
            metrics_store,
            sampler,
            aggregator,
            exporter,
            circuit_breaker,
            state: Arc::new(RwLock::new(CollectorState::new())),
            tasks: Arc::new(Mutex::new(Vec::new())),
        })
    }

    /// Start metrics collection
    pub async fn start(&self) -> Result<(), MonitoringError> {
        info!("Starting production metrics collector");

        // Update state
        {
            let mut state = self.state.write().await;
            state.status = CollectorStatus::Starting;
            state.start_time = Some(Instant::now());
        }

        // Start background tasks
        self.start_aggregation_task().await?;
        self.start_export_task().await?;
        self.start_cleanup_task().await?;
        self.start_health_task().await?;

        // Update state to running
        {
            let mut state = self.state.write().await;
            state.status = CollectorStatus::Running;
        }

        info!("Production metrics collector started");
        Ok(())
    }

    /// Stop metrics collection
    pub async fn stop(&self) -> Result<(), MonitoringError> {
        info!("Stopping production metrics collector");

        // Update state
        {
            let mut state = self.state.write().await;
            state.status = CollectorStatus::Stopping;
        }

        // Stop background tasks
        let mut tasks = self.tasks.lock().await;
        for task in tasks.drain(..) {
            task.abort();
        }

        // Final export
        self.exporter.flush().await?;

        // Update state
        {
            let mut state = self.state.write().await;
            state.status = CollectorStatus::Stopped;
        }

        info!("Production metrics collector stopped");
        Ok(())
    }

    /// Record NAT traversal attempt
    pub async fn record_nat_attempt(&self, attempt: &NatTraversalAttempt) -> Result<(), MonitoringError> {
        // Check circuit breaker
        if !self.circuit_breaker.allow_request().await {
            return Ok(()); // Fail fast during overload
        }

        // Check sampling decision
        if !self.sampler.should_sample_attempt(attempt).await {
            return Ok(());
        }

        // Record attempt metrics
        let attempt_metric = AttemptMetric {
            attempt_id: attempt.attempt_id.clone(),
            timestamp: attempt.timestamp,
            client_region: attempt.client_info.region.clone(),
            server_region: attempt.server_info.region.clone(),
            nat_types: (
                attempt.client_info.nat_type.clone(),
                attempt.server_info.nat_type.clone(),
            ),
            network_conditions: attempt.network_conditions.clone(),
        };

        self.metrics_store.record_attempt(attempt_metric).await?;

        // Update counters
        self.increment_counter("nat_attempts_total", &[
            ("client_region", attempt.client_info.region.as_deref().unwrap_or("unknown")),
            ("server_region", attempt.server_info.region.as_deref().unwrap_or("unknown")),
        ]).await;

        Ok(())
    }

    /// Record NAT traversal result
    pub async fn record_nat_result(&self, result: &NatTraversalResult) -> Result<(), MonitoringError> {
        // Results carry more signal than attempts, so they are sampled more heavily:
        // every failure and 10% of successes (rates hardcoded here rather than read
        // from SamplingConfig::result_rate).
        let sample_rate = if result.success { 0.1 } else { 1.0 };
        if !self.sampler.should_sample_with_rate(sample_rate).await {
            return Ok(());
        }

        // Record result metrics
        let result_metric = ResultMetric {
            attempt_id: result.attempt_id.clone(),
            success: result.success,
            duration: result.duration,
            error_category: result.error_info.as_ref().map(|e| e.error_category.clone()),
            performance: result.performance_metrics.clone(),
            connection_info: result.connection_info.clone(),
        };

        self.metrics_store.record_result(result_metric).await?;

        // Update counters and histograms
        let status = if result.success { "success" } else { "failure" };
        self.increment_counter("nat_results_total", &[("status", status)]).await;

        self.record_histogram("nat_duration_ms", result.duration.as_millis() as f64, &[
            ("status", status),
        ]).await;

        if let Some(conn_info) = &result.connection_info {
            self.record_histogram("connection_latency_ms", conn_info.quality.latency_ms as f64, &[]).await;
            self.record_histogram("connection_throughput_mbps", conn_info.quality.throughput_mbps as f64, &[]).await;
        }

        // Record error metrics
        if let Some(error_info) = &result.error_info {
            self.increment_counter("nat_errors_total", &[
                ("category", &format!("{:?}", error_info.error_category)),
                ("code", &error_info.error_code),
            ]).await;
        }

        Ok(())
    }

    /// Get collector status
    pub async fn get_status(&self) -> String {
        let state = self.state.read().await;
        format!("{:?}", state.status)
    }

    /// Get metrics summary
    pub async fn get_summary(&self) -> MetricsSummary {
        self.metrics_store.get_summary().await
    }

    /// Increment counter metric
    async fn increment_counter(&self, name: &str, labels: &[(&str, &str)]) {
        self.metrics_store.increment_counter(name, labels).await;
    }

    /// Record histogram value
    async fn record_histogram(&self, name: &str, value: f64, labels: &[(&str, &str)]) {
        self.metrics_store.record_histogram(name, value, labels).await;
    }

    /// Start aggregation background task
    async fn start_aggregation_task(&self) -> Result<(), MonitoringError> {
        let aggregator = self.aggregator.clone();
        let metrics_store = self.metrics_store.clone();
        let interval_duration = self.config.aggregation.interval;

        let task = tokio::spawn(async move {
            let mut interval = interval(interval_duration);

            loop {
                interval.tick().await;

                if let Err(e) = aggregator.aggregate_metrics(&metrics_store).await {
                    warn!("Metrics aggregation failed: {}", e);
                }
            }
        });

        self.tasks.lock().await.push(task);
        Ok(())
    }

    /// Start export background task
    async fn start_export_task(&self) -> Result<(), MonitoringError> {
        let exporter = self.exporter.clone();
        let aggregator = self.aggregator.clone();
        let interval_duration = self.config.export.interval;

        let task = tokio::spawn(async move {
            let mut interval = interval(interval_duration);

            loop {
                interval.tick().await;

                match aggregator.get_aggregated_metrics().await {
                    Ok(metrics) => {
                        if let Err(e) = exporter.export_metrics(metrics).await {
                            warn!("Metrics export failed: {}", e);
                        }
                    }
                    Err(e) => {
                        warn!("Failed to get aggregated metrics: {}", e);
                    }
                }
            }
        });

        self.tasks.lock().await.push(task);
        Ok(())
    }

    /// Start cleanup background task
    async fn start_cleanup_task(&self) -> Result<(), MonitoringError> {
        let metrics_store = self.metrics_store.clone();
        let retention_period = self.config.storage.retention_period;

        let task = tokio::spawn(async move {
            let mut interval = interval(Duration::from_secs(3600)); // Cleanup hourly

            loop {
                interval.tick().await;

                if let Err(e) = metrics_store.cleanup_old_data(retention_period).await {
                    warn!("Metrics cleanup failed: {}", e);
                }
            }
        });

        self.tasks.lock().await.push(task);
        Ok(())
    }

    /// Start health monitoring task
    async fn start_health_task(&self) -> Result<(), MonitoringError> {
        let circuit_breaker = self.circuit_breaker.clone();
        let metrics_store = self.metrics_store.clone();
        let state = self.state.clone();

        let task = tokio::spawn(async move {
            let mut interval = interval(Duration::from_secs(30)); // Health check every 30s

            loop {
                interval.tick().await;

                // Check system health
                let health = metrics_store.get_health_metrics().await;
                let metrics_per_second = health.metrics_per_second;
                circuit_breaker.update_health(health).await;

                // Update collector state (metrics_collected is only an approximation:
                // the instantaneous per-second rate is accumulated once per health check)
                let mut collector_state = state.write().await;
                collector_state.last_health_check = Some(Instant::now());
                collector_state.metrics_collected += metrics_per_second as u64;
            }
        });

        self.tasks.lock().await.push(task);
        Ok(())
    }
}

/// Metrics configuration
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct MetricsConfig {
    /// Storage configuration
    pub storage: StorageConfig,
    /// Sampling configuration
    pub sampling: SamplingConfig,
    /// Aggregation configuration
    pub aggregation: AggregationConfig,
    /// Export configuration
    pub export: MetricsExportConfig,
    /// Circuit breaker configuration
    pub circuit_breaker: CircuitBreakerConfig,
}

impl Default for MetricsConfig {
    fn default() -> Self {
        Self {
            storage: StorageConfig::default(),
            sampling: SamplingConfig::default(),
            aggregation: AggregationConfig::default(),
            export: MetricsExportConfig::default(),
            circuit_breaker: CircuitBreakerConfig::default(),
        }
    }
}

/// Storage configuration
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct StorageConfig {
    /// Maximum metrics to store in memory
    pub max_metrics: usize,
    /// Data retention period
    pub retention_period: Duration,
    /// Flush interval to disk
    pub flush_interval: Duration,
}

impl Default for StorageConfig {
    fn default() -> Self {
        Self {
            max_metrics: 100_000,
            retention_period: Duration::from_secs(3600), // 1 hour
            flush_interval: Duration::from_secs(60),     // 1 minute
        }
    }
}

/// Sampling configuration
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SamplingConfig {
    /// Base sampling rate for attempts
    pub base_attempt_rate: f64,
    /// Sampling rate for results
    pub result_rate: f64,
    /// Adaptive sampling settings
    pub adaptive: AdaptiveSamplingConfig,
}

impl Default for SamplingConfig {
    fn default() -> Self {
        Self {
            base_attempt_rate: 0.01, // 1% of attempts
            result_rate: 0.1,        // 10% of results
            adaptive: AdaptiveSamplingConfig::default(),
        }
    }
}

/// Adaptive sampling configuration
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct AdaptiveSamplingConfig {
    /// Enable adaptive sampling
    pub enabled: bool,
    /// Target metrics per second
    pub target_rate: f64,
    /// Adjustment interval
    pub adjustment_interval: Duration,
    /// Maximum sampling rate
    pub max_rate: f64,
    /// Minimum sampling rate
    pub min_rate: f64,
}

impl Default for AdaptiveSamplingConfig {
    fn default() -> Self {
        Self {
            enabled: true,
            target_rate: 1000.0, // 1000 metrics/sec
            adjustment_interval: Duration::from_secs(60),
            max_rate: 1.0,
            min_rate: 0.001,
        }
    }
}

/// Aggregation configuration
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct AggregationConfig {
    /// Aggregation interval
    pub interval: Duration,
    /// Aggregation window size
    pub window_size: Duration,
    /// Enable percentile calculations
    pub enable_percentiles: bool,
}

impl Default for AggregationConfig {
    fn default() -> Self {
        Self {
            interval: Duration::from_secs(10),        // Aggregate every 10s
            window_size: Duration::from_secs(60),     // 60s window
            enable_percentiles: true,
        }
    }
}

/// Export configuration
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct MetricsExportConfig {
    /// Export interval
    pub interval: Duration,
    /// Export destinations
    pub destinations: Vec<ExportDestination>,
    /// Batch size for export
    pub batch_size: usize,
    /// Export timeout
    pub timeout: Duration,
}

impl Default for MetricsExportConfig {
    fn default() -> Self {
        Self {
            interval: Duration::from_secs(30),
            destinations: vec![ExportDestination::Prometheus {
                endpoint: "http://localhost:9090/api/v1/write".to_string(),
            }],
            batch_size: 1000,
            timeout: Duration::from_secs(10),
        }
    }
}

/// Export destinations
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum ExportDestination {
    Prometheus { endpoint: String },
    InfluxDB { endpoint: String, database: String },
    CloudWatch { region: String },
    DataDog { api_key: String },
    StatsD { endpoint: String },
}
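
// A small sketch of a non-default export configuration. It targets the Prometheus
// remote-write default above plus an InfluxDB instance; the endpoint URLs and the
// database name are illustrative placeholders, not values defined by this crate.
#[allow(dead_code)]
fn example_export_config() -> MetricsExportConfig {
    MetricsExportConfig {
        interval: Duration::from_secs(15),
        destinations: vec![
            ExportDestination::Prometheus {
                endpoint: "http://localhost:9090/api/v1/write".to_string(),
            },
            ExportDestination::InfluxDB {
                endpoint: "http://localhost:8086".to_string(),
                database: "nat_metrics".to_string(),
            },
        ],
        batch_size: 500,
        timeout: Duration::from_secs(5),
    }
}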

/// Circuit breaker configuration
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct CircuitBreakerConfig {
    /// Failure threshold
    pub failure_threshold: u32,
    /// Success threshold for recovery
    pub success_threshold: u32,
    /// Timeout duration
    pub timeout: Duration,
    /// Maximum queue size
    pub max_queue_size: usize,
}

impl Default for CircuitBreakerConfig {
    fn default() -> Self {
        Self {
            failure_threshold: 5,
            success_threshold: 3,
            timeout: Duration::from_secs(60),
            max_queue_size: 10000,
        }
    }
}

/// Metrics store for high-performance storage
struct MetricsStore {
    /// Counter metrics
    counters: Arc<RwLock<HashMap<String, CounterMetric>>>,
    /// Histogram metrics
    histograms: Arc<RwLock<HashMap<String, HistogramMetric>>>,
    /// Attempt metrics
    attempts: Arc<Mutex<VecDeque<AttemptMetric>>>,
    /// Result metrics
    results: Arc<Mutex<VecDeque<ResultMetric>>>,
    /// Storage configuration
    config: StorageConfig,
}

impl MetricsStore {
    fn new(config: StorageConfig) -> Self {
        Self {
            counters: Arc::new(RwLock::new(HashMap::new())),
            histograms: Arc::new(RwLock::new(HashMap::new())),
            attempts: Arc::new(Mutex::new(VecDeque::new())),
            results: Arc::new(Mutex::new(VecDeque::new())),
            config,
        }
    }

    async fn record_attempt(&self, attempt: AttemptMetric) -> Result<(), MonitoringError> {
        let mut attempts = self.attempts.lock().await;
        attempts.push_back(attempt);

        // Enforce size limit
        while attempts.len() > self.config.max_metrics {
            attempts.pop_front();
        }

        Ok(())
    }

    async fn record_result(&self, result: ResultMetric) -> Result<(), MonitoringError> {
        let mut results = self.results.lock().await;
        results.push_back(result);

        // Enforce size limit
        while results.len() > self.config.max_metrics {
            results.pop_front();
        }

        Ok(())
    }

    async fn increment_counter(&self, name: &str, labels: &[(&str, &str)]) {
        let key = format!("{}:{}", name, labels_to_string(labels));
        let mut counters = self.counters.write().await;

        counters.entry(key)
            .or_insert_with(|| CounterMetric::new(name.to_string(), labels_to_map(labels)))
            .increment();
    }

    async fn record_histogram(&self, name: &str, value: f64, labels: &[(&str, &str)]) {
        let key = format!("{}:{}", name, labels_to_string(labels));
        let mut histograms = self.histograms.write().await;

        histograms.entry(key)
            .or_insert_with(|| HistogramMetric::new(name.to_string(), labels_to_map(labels)))
            .record(value);
    }

    async fn get_summary(&self) -> MetricsSummary {
        let results = self.results.lock().await;
        // NOTE: ResultMetric carries no timestamp, so a true one-hour window cannot
        // be applied here; all currently retained results are used as an approximation.
        let recent_results: Vec<_> = results.iter().collect();

        let total_attempts = recent_results.len() as u64;
        let successful = recent_results.iter().filter(|r| r.success).count() as u64;
        let success_rate = if total_attempts > 0 {
            successful as f32 / total_attempts as f32
        } else {
            0.0
        };

        let avg_duration = if !recent_results.is_empty() {
            recent_results.iter()
                .map(|r| r.duration.as_millis())
                .sum::<u128>() / recent_results.len() as u128
        } else {
            0
        };

        MetricsSummary {
            nat_attempts_last_hour: total_attempts,
            success_rate_last_hour: success_rate,
            avg_connection_time_ms: avg_duration as u64,
            active_connections: 0, // Would be tracked separately
            error_rate_last_hour: if total_attempts > 0 { 1.0 - success_rate } else { 0.0 },
        }
    }

    async fn cleanup_old_data(&self, retention_period: Duration) -> Result<(), MonitoringError> {
        let cutoff = SystemTime::now() - retention_period;

        // Cleanup attempts
        {
            let mut attempts = self.attempts.lock().await;
            while let Some(front) = attempts.front() {
                if front.timestamp < cutoff {
                    attempts.pop_front();
                } else {
                    break;
                }
            }
        }

        // Cleanup results would be similar
        // In practice, would also cleanup counters and histograms

        Ok(())
    }

    async fn get_health_metrics(&self) -> HealthMetrics {
        let attempts_count = self.attempts.lock().await.len();
        let results_count = self.results.lock().await.len();

        HealthMetrics {
            // Rough throughput estimate: total retained metrics spread over a minute
            metrics_per_second: (attempts_count + results_count) as f64 / 60.0,
            // Assumes roughly 1 KiB of memory per retained metric
            memory_usage_mb: ((attempts_count + results_count) * 1024) as f64 / 1024.0 / 1024.0,
            queue_depth: attempts_count + results_count,
            error_rate: 0.0, // Would be calculated from actual errors
        }
    }
}

/// Adaptive sampler for intelligent sampling decisions
struct AdaptiveSampler {
    config: SamplingConfig,
    current_rate: Arc<AtomicU64>, // Stored as u64 for atomic operations
    last_adjustment: Arc<RwLock<Instant>>,
}

impl AdaptiveSampler {
    fn new(config: SamplingConfig) -> Self {
        let initial_rate = (config.base_attempt_rate * 1_000_000.0) as u64; // Store as millionths

        Self {
            config,
            current_rate: Arc::new(AtomicU64::new(initial_rate)),
            last_adjustment: Arc::new(RwLock::new(Instant::now())),
        }
    }

    async fn should_sample_attempt(&self, _attempt: &NatTraversalAttempt) -> bool {
        let rate = self.current_rate.load(Ordering::Relaxed) as f64 / 1_000_000.0;
        rand::random::<f64>() < rate
    }

    async fn should_sample_with_rate(&self, rate: f64) -> bool {
        rand::random::<f64>() < rate
    }

    async fn adjust_sampling_rate(&self, current_metrics_rate: f64) {
        if !self.config.adaptive.enabled {
            return;
        }

        let mut last_adjustment = self.last_adjustment.write().await;
        if last_adjustment.elapsed() < self.config.adaptive.adjustment_interval {
            return;
        }

        let target_rate = self.config.adaptive.target_rate;
        let current_rate = self.current_rate.load(Ordering::Relaxed) as f64 / 1_000_000.0;

        let adjustment_factor = target_rate / current_metrics_rate.max(1.0);
        let new_rate = (current_rate * adjustment_factor)
            .max(self.config.adaptive.min_rate)
            .min(self.config.adaptive.max_rate);

        self.current_rate.store((new_rate * 1_000_000.0) as u64, Ordering::Relaxed);
        *last_adjustment = Instant::now();

        debug!("Adjusted sampling rate from {:.4} to {:.4}", current_rate, new_rate);
    }
}

/// Circuit breaker for overload protection
struct CircuitBreaker {
    config: CircuitBreakerConfig,
    state: Arc<RwLock<CircuitBreakerState>>,
    consecutive_failures: Arc<AtomicU64>,
    consecutive_successes: Arc<AtomicU64>,
    queue_size: Arc<AtomicU64>,
}

impl CircuitBreaker {
    fn new(config: CircuitBreakerConfig) -> Self {
        Self {
            config,
            state: Arc::new(RwLock::new(CircuitBreakerState::Closed)),
            consecutive_failures: Arc::new(AtomicU64::new(0)),
            consecutive_successes: Arc::new(AtomicU64::new(0)),
            queue_size: Arc::new(AtomicU64::new(0)),
        }
    }

    async fn allow_request(&self) -> bool {
        let state = self.state.read().await;

        match *state {
            CircuitBreakerState::Closed => {
                // Check queue size
                self.queue_size.load(Ordering::Relaxed) < self.config.max_queue_size as u64
            }
            CircuitBreakerState::Open => false,
            CircuitBreakerState::HalfOpen => {
                // Allow limited requests to test recovery
                self.queue_size.load(Ordering::Relaxed) < (self.config.max_queue_size / 10) as u64
            }
        }
    }

    async fn update_health(&self, health: HealthMetrics) {
        // Update queue size
        self.queue_size.store(health.queue_depth as u64, Ordering::Relaxed);

        // Check if we should change circuit breaker state
        if health.error_rate > 0.5 {
            // High error rate
            let failures = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;
            self.consecutive_successes.store(0, Ordering::Relaxed);

            if failures >= self.config.failure_threshold as u64 {
                let mut state = self.state.write().await;
                *state = CircuitBreakerState::Open;
                warn!("Circuit breaker opened due to high error rate");
            }
        } else {
            // Low error rate
            let successes = self.consecutive_successes.fetch_add(1, Ordering::Relaxed) + 1;
            self.consecutive_failures.store(0, Ordering::Relaxed);

            let current_state = *self.state.read().await;
            if matches!(current_state, CircuitBreakerState::Open) && successes >= self.config.success_threshold as u64 {
                let mut state = self.state.write().await;
                *state = CircuitBreakerState::HalfOpen;
                info!("Circuit breaker moved to half-open state");
            } else if matches!(current_state, CircuitBreakerState::HalfOpen) && successes >= self.config.success_threshold as u64 * 2 {
                let mut state = self.state.write().await;
                *state = CircuitBreakerState::Closed;
                info!("Circuit breaker closed - system recovered");
            }
        }
    }
}

/// Circuit breaker states
#[derive(Debug, Clone, Copy)]
enum CircuitBreakerState {
    Closed,
    Open,
    HalfOpen,
}

/// Metrics aggregator for time-based aggregation
struct MetricsAggregator {
    config: AggregationConfig,
    aggregated_data: Arc<RwLock<AggregatedData>>,
}

impl MetricsAggregator {
    fn new(config: AggregationConfig) -> Self {
        Self {
            config,
            aggregated_data: Arc::new(RwLock::new(AggregatedData::new())),
        }
    }

    async fn aggregate_metrics(&self, _metrics_store: &MetricsStore) -> Result<(), MonitoringError> {
        // Aggregate metrics from the store
        // This would calculate time-based windows, percentiles, etc.
        debug!("Aggregating metrics for export");
        Ok(())
    }

    async fn get_aggregated_metrics(&self) -> Result<Vec<ExportMetric>, MonitoringError> {
        let data = self.aggregated_data.read().await;
        Ok(data.to_export_metrics())
    }
}

/// Metrics exporter for various destinations
struct MetricsExporter {
    config: MetricsExportConfig,
}

impl MetricsExporter {
    fn new(config: MetricsExportConfig) -> Self {
        Self { config }
    }

    async fn export_metrics(&self, metrics: Vec<ExportMetric>) -> Result<(), MonitoringError> {
        for destination in &self.config.destinations {
            if let Err(e) = self.export_to_destination(destination, &metrics).await {
                warn!("Failed to export to {:?}: {}", destination, e);
            }
        }
        Ok(())
    }

    async fn export_to_destination(&self, destination: &ExportDestination, metrics: &[ExportMetric]) -> Result<(), MonitoringError> {
        match destination {
            ExportDestination::Prometheus { endpoint } => {
                debug!("Exporting {} metrics to Prometheus at {}", metrics.len(), endpoint);
                // Would implement actual Prometheus export
            }
            ExportDestination::InfluxDB { endpoint, database } => {
                debug!("Exporting {} metrics to InfluxDB at {} (db: {})", metrics.len(), endpoint, database);
                // Would implement actual InfluxDB export
            }
            _ => {
                debug!("Export to {:?} not yet implemented", destination);
            }
        }
        Ok(())
    }

    async fn flush(&self) -> Result<(), MonitoringError> {
        debug!("Flushing remaining metrics");
        Ok(())
    }
}
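
// Sketch: rendering a single ExportMetric in the Prometheus text exposition format
// ("name{label=\"value\"} 1234"). The exporter above only logs for now, so this helper
// is illustrative and is not wired into export_to_destination.
#[allow(dead_code)]
fn to_prometheus_line(metric: &ExportMetric) -> String {
    let labels = metric
        .labels
        .iter()
        .map(|(k, v)| format!("{}=\"{}\"", k, v))
        .collect::<Vec<_>>()
        .join(",");
    let value = match &metric.value {
        MetricValue::Counter(v) => v.to_string(),
        MetricValue::Gauge(v) => v.to_string(),
        // Histogram buckets need one exposition line per bucket; omitted in this sketch
        MetricValue::Histogram(_) => return String::new(),
    };
    if labels.is_empty() {
        format!("{} {}", metric.name, value)
    } else {
        format!("{}{{{}}} {}", metric.name, labels, value)
    }
}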

// Helper functions and data structures

fn labels_to_string(labels: &[(&str, &str)]) -> String {
    labels.iter()
        .map(|(k, v)| format!("{}={}", k, v))
        .collect::<Vec<_>>()
        .join(",")
}

fn labels_to_map(labels: &[(&str, &str)]) -> HashMap<String, String> {
    labels.iter()
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect()
}

/// Collector state
#[derive(Debug)]
struct CollectorState {
    status: CollectorStatus,
    start_time: Option<Instant>,
    last_health_check: Option<Instant>,
    metrics_collected: u64,
    errors_encountered: u64,
}

impl CollectorState {
    fn new() -> Self {
        Self {
            status: CollectorStatus::Stopped,
            start_time: None,
            last_health_check: None,
            metrics_collected: 0,
            errors_encountered: 0,
        }
    }
}

/// Collector status
#[derive(Debug, Clone)]
enum CollectorStatus {
    Stopped,
    Starting,
    Running,
    Stopping,
    Error,
}

/// Health metrics
#[derive(Debug, Clone)]
struct HealthMetrics {
    metrics_per_second: f64,
    memory_usage_mb: f64,
    queue_depth: usize,
    error_rate: f64,
}

/// Attempt metric
#[derive(Debug, Clone)]
struct AttemptMetric {
    attempt_id: String,
    timestamp: SystemTime,
    client_region: Option<String>,
    server_region: Option<String>,
    nat_types: (Option<crate::monitoring::NatType>, Option<crate::monitoring::NatType>),
    network_conditions: crate::monitoring::NetworkConditions,
}

/// Result metric
#[derive(Debug, Clone)]
struct ResultMetric {
    attempt_id: String,
    success: bool,
    duration: Duration,
    error_category: Option<crate::monitoring::ErrorCategory>,
    performance: crate::monitoring::PerformanceMetrics,
    connection_info: Option<crate::monitoring::ConnectionInfo>,
}

/// Counter metric for tracking counts
#[derive(Debug)]
struct CounterMetric {
    name: String,
    labels: HashMap<String, String>,
    value: AtomicU64,
    last_updated: std::sync::RwLock<Instant>,
}

impl CounterMetric {
    fn new(name: String, labels: HashMap<String, String>) -> Self {
        Self {
            name,
            labels,
            value: AtomicU64::new(0),
            last_updated: std::sync::RwLock::new(Instant::now()),
        }
    }

    fn increment(&self) {
        self.value.fetch_add(1, Ordering::Relaxed);
        if let Ok(mut last_updated) = self.last_updated.write() {
            *last_updated = Instant::now();
        }
    }

    fn get_value(&self) -> u64 {
        self.value.load(Ordering::Relaxed)
    }
}

/// Histogram metric for tracking distributions
#[derive(Debug)]
struct HistogramMetric {
    name: String,
    labels: HashMap<String, String>,
    values: std::sync::Mutex<Vec<f64>>,
    buckets: Vec<f64>,
    last_updated: std::sync::RwLock<Instant>,
}

impl HistogramMetric {
    fn new(name: String, labels: HashMap<String, String>) -> Self {
        // Standard histogram buckets for latency/duration metrics
        let buckets = vec![
            0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 25.0, 50.0, 100.0
        ];

        Self {
            name,
            labels,
            values: std::sync::Mutex::new(Vec::new()),
            buckets,
            last_updated: std::sync::RwLock::new(Instant::now()),
        }
    }

    fn record(&self, value: f64) {
        if let Ok(mut values) = self.values.lock() {
            values.push(value);
            // Keep only recent values to prevent unbounded growth
            if values.len() > 10000 {
                values.drain(0..5000);
            }
        }
        if let Ok(mut last_updated) = self.last_updated.write() {
            *last_updated = Instant::now();
        }
    }

    fn get_percentile(&self, percentile: f64) -> Option<f64> {
        let values = self.values.lock().ok()?;
        if values.is_empty() {
            return None;
        }

        let mut sorted_values = values.clone();
        sorted_values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));

        let index = ((percentile / 100.0) * (sorted_values.len() - 1) as f64) as usize;
        Some(sorted_values[index])
    }

    fn get_bucket_counts(&self) -> Vec<(f64, u64)> {
        let values = match self.values.lock() {
            Ok(guard) => guard,
            Err(_) => return Vec::new(),
        };

        self.buckets.iter().map(|&bucket| {
            let count = values.iter().filter(|&&v| v <= bucket).count() as u64;
            (bucket, count)
        }).collect()
    }
}

/// Bootstrap node performance metrics
#[derive(Debug, Clone)]
pub struct BootstrapNodeMetrics {
    /// Node address
    pub address: SocketAddr,
    /// Total coordination requests handled
    pub coordination_requests: u64,
    /// Successful coordinations
    pub successful_coordinations: u64,
    /// Average response time
    pub avg_response_time_ms: f64,
    /// Current availability (0.0 to 1.0)
    pub availability: f64,
    /// Last successful contact
    pub last_contact: Option<SystemTime>,
    /// Error rate in last hour
    pub error_rate: f64,
}

/// NAT type success rate metrics
#[derive(Debug, Clone)]
pub struct NatTypeMetrics {
    /// NAT type
    pub nat_type: crate::monitoring::NatType,
    /// Total attempts for this NAT type
    pub total_attempts: u64,
    /// Successful attempts
    pub successful_attempts: u64,
    /// Success rate (0.0 to 1.0)
    pub success_rate: f64,
    /// Average connection time for successful attempts
    pub avg_connection_time_ms: f64,
    /// Most common failure reasons
    pub common_failures: Vec<(String, u64)>,
}

/// Connection latency and RTT metrics
#[derive(Debug, Clone)]
pub struct LatencyMetrics {
    /// Connection establishment latency percentiles
    pub connection_latency_p50: f64,
    pub connection_latency_p95: f64,
    pub connection_latency_p99: f64,
    /// Round-trip time percentiles
    pub rtt_p50: f64,
    pub rtt_p95: f64,
    pub rtt_p99: f64,
    /// Jitter measurements
    pub jitter_avg: f64,
    pub jitter_max: f64,
}

/// Comprehensive metrics collection implementation
impl ProductionMetricsCollector {
    /// Record bootstrap node performance
    pub async fn record_bootstrap_performance(
        &self,
        node_address: SocketAddr,
        response_time: Duration,
        success: bool,
    ) -> Result<(), MonitoringError> {
        // Update bootstrap node specific metrics
        let node_str = node_address.to_string();
        let status_str = if success { "success" } else { "failure" };
        let labels = &[
            ("node", node_str.as_str()),
            ("status", status_str),
        ];

        self.increment_counter("bootstrap_requests_total", labels).await;
        self.record_histogram("bootstrap_response_time_ms", response_time.as_millis() as f64, &[
            ("node", &node_address.to_string()),
        ]).await;

        if !success {
            self.increment_counter("bootstrap_errors_total", &[
                ("node", &node_address.to_string()),
            ]).await;
        }

        Ok(())
    }

    /// Record NAT type specific metrics
    pub async fn record_nat_type_result(
        &self,
        nat_type: crate::monitoring::NatType,
        success: bool,
        duration: Duration,
        error_category: Option<crate::monitoring::ErrorCategory>,
    ) -> Result<(), MonitoringError> {
        let nat_type_str = format!("{:?}", nat_type);
        let status = if success { "success" } else { "failure" };

        // Record NAT type specific success/failure
        self.increment_counter("nat_traversal_by_type_total", &[
            ("nat_type", &nat_type_str),
            ("status", status),
        ]).await;

        // Record duration by NAT type
        self.record_histogram("nat_traversal_duration_by_type_ms", duration.as_millis() as f64, &[
            ("nat_type", &nat_type_str),
            ("status", status),
        ]).await;

        // Record error categories for failures
        if let Some(error_cat) = error_category {
            self.increment_counter("nat_traversal_errors_by_type", &[
                ("nat_type", &nat_type_str),
                ("error_category", &format!("{:?}", error_cat)),
            ]).await;
        }

        Ok(())
    }

    /// Record connection quality metrics
    pub async fn record_connection_quality(
        &self,
        latency_ms: u32,
        jitter_ms: u32,
        throughput_mbps: f32,
        packet_loss_rate: f32,
    ) -> Result<(), MonitoringError> {
        // Record latency metrics
        self.record_histogram("connection_latency_ms", latency_ms as f64, &[]).await;
        self.record_histogram("connection_jitter_ms", jitter_ms as f64, &[]).await;
        self.record_histogram("connection_throughput_mbps", throughput_mbps as f64, &[]).await;
        self.record_histogram("connection_packet_loss_rate", packet_loss_rate as f64, &[]).await;

        Ok(())
    }

    /// Get bootstrap node metrics
    pub async fn get_bootstrap_metrics(&self) -> Vec<BootstrapNodeMetrics> {
        let counters = self.metrics_store.counters.read().await;
        let histograms = self.metrics_store.histograms.read().await;

        let mut node_metrics = HashMap::new();

        // Collect bootstrap node data from counters and histograms
        for (key, counter) in counters.iter() {
            if key.starts_with("bootstrap_requests_total:") {
                if let Some(node_addr) = extract_label_value(key, "node") {
                    let entry = node_metrics.entry(node_addr.clone()).or_insert_with(|| BootstrapNodeMetrics {
                        address: node_addr.parse().unwrap_or_else(|_| "0.0.0.0:0".parse().unwrap()),
                        coordination_requests: 0,
                        successful_coordinations: 0,
                        avg_response_time_ms: 0.0,
                        availability: 1.0,
                        last_contact: Some(SystemTime::now()),
                        error_rate: 0.0,
                    });

                    if key.contains("status=success") {
                        entry.successful_coordinations = counter.get_value();
                    }
                    entry.coordination_requests += counter.get_value();
                }
            }
        }

        // Add response time data from histograms
        for (key, histogram) in histograms.iter() {
            if key.starts_with("bootstrap_response_time_ms:") {
                if let Some(node_addr) = extract_label_value(key, "node") {
                    if let Some(entry) = node_metrics.get_mut(&node_addr) {
                        entry.avg_response_time_ms = histogram.get_percentile(50.0).unwrap_or(0.0);
                    }
                }
            }
        }

        // Calculate availability and error rates
        for metrics in node_metrics.values_mut() {
            if metrics.coordination_requests > 0 {
                metrics.availability = metrics.successful_coordinations as f64 / metrics.coordination_requests as f64;
                metrics.error_rate = 1.0 - metrics.availability;
            }
        }

        node_metrics.into_values().collect()
    }

    /// Get NAT type success rate metrics
    pub async fn get_nat_type_metrics(&self) -> Vec<NatTypeMetrics> {
        let counters = self.metrics_store.counters.read().await;
        let histograms = self.metrics_store.histograms.read().await;

        let mut nat_metrics = HashMap::new();

        // Collect NAT type data from counters
        for (key, counter) in counters.iter() {
            if key.starts_with("nat_traversal_by_type_total:") {
                if let Some(nat_type_str) = extract_label_value(key, "nat_type") {
                    let nat_type = parse_nat_type(&nat_type_str);
                    let entry = nat_metrics.entry(nat_type_str.clone()).or_insert_with(|| NatTypeMetrics {
                        nat_type,
                        total_attempts: 0,
                        successful_attempts: 0,
                        success_rate: 0.0,
                        avg_connection_time_ms: 0.0,
                        common_failures: Vec::new(),
                    });

                    if key.contains("status=success") {
                        entry.successful_attempts = counter.get_value();
                    }
                    entry.total_attempts += counter.get_value();
                }
            }
        }

        // Add duration data from histograms
        for (key, histogram) in histograms.iter() {
            if key.starts_with("nat_traversal_duration_by_type_ms:") && key.contains("status=success") {
                if let Some(nat_type_str) = extract_label_value(key, "nat_type") {
                    if let Some(entry) = nat_metrics.get_mut(&nat_type_str) {
                        entry.avg_connection_time_ms = histogram.get_percentile(50.0).unwrap_or(0.0);
                    }
                }
            }
        }

        // Calculate success rates
        for metrics in nat_metrics.values_mut() {
            if metrics.total_attempts > 0 {
                metrics.success_rate = metrics.successful_attempts as f64 / metrics.total_attempts as f64;
            }
        }

        nat_metrics.into_values().collect()
    }

    /// Get latency and RTT metrics
    pub async fn get_latency_metrics(&self) -> LatencyMetrics {
        let histograms = self.metrics_store.histograms.read().await;

        // Histogram keys are "<name>:<labels>"; metrics recorded without labels
        // therefore end with a trailing ':'.
        let connection_latency = histograms.get("connection_latency_ms:");
        let rtt_histogram = histograms.get("connection_rtt_ms:");
        let jitter_histogram = histograms.get("connection_jitter_ms:");

        LatencyMetrics {
            connection_latency_p50: connection_latency.and_then(|h| h.get_percentile(50.0)).unwrap_or(0.0),
            connection_latency_p95: connection_latency.and_then(|h| h.get_percentile(95.0)).unwrap_or(0.0),
            connection_latency_p99: connection_latency.and_then(|h| h.get_percentile(99.0)).unwrap_or(0.0),
            rtt_p50: rtt_histogram.and_then(|h| h.get_percentile(50.0)).unwrap_or(0.0),
            rtt_p95: rtt_histogram.and_then(|h| h.get_percentile(95.0)).unwrap_or(0.0),
            rtt_p99: rtt_histogram.and_then(|h| h.get_percentile(99.0)).unwrap_or(0.0),
            jitter_avg: jitter_histogram.and_then(|h| h.get_percentile(50.0)).unwrap_or(0.0), // median as a proxy for the average
            jitter_max: jitter_histogram.and_then(|h| h.get_percentile(100.0)).unwrap_or(0.0), // p100 is the observed maximum
        }
    }
}

// Helper functions
fn extract_label_value(key: &str, label_name: &str) -> Option<String> {
    let label_prefix = format!("{}=", label_name);
    key.split(',')
        .find(|part| part.contains(&label_prefix))
        .and_then(|part| part.split('=').nth(1))
        .map(|s| s.to_string())
}

fn parse_nat_type(nat_type_str: &str) -> crate::monitoring::NatType {
    match nat_type_str {
        "FullCone" => crate::monitoring::NatType::FullCone,
        "RestrictedCone" => crate::monitoring::NatType::RestrictedCone,
        "PortRestrictedCone" => crate::monitoring::NatType::PortRestrictedCone,
        "Symmetric" => crate::monitoring::NatType::Symmetric,
        "CarrierGrade" => crate::monitoring::NatType::CarrierGrade,
        "DoubleNat" => crate::monitoring::NatType::DoubleNat,
        "None" => crate::monitoring::NatType::None,
        _ => crate::monitoring::NatType::None,
    }
}

/// Aggregated data for export
#[derive(Debug)]
struct AggregatedData {
    /// Aggregated counters
    counters: HashMap<String, u64>,
    /// Aggregated histograms with percentiles
    histograms: HashMap<String, HistogramSummary>,
    /// Last aggregation time
    last_aggregation: Instant,
}

impl AggregatedData {
    fn new() -> Self {
        Self {
            counters: HashMap::new(),
            histograms: HashMap::new(),
            last_aggregation: Instant::now(),
        }
    }

    fn to_export_metrics(&self) -> Vec<ExportMetric> {
        let mut metrics = Vec::new();

        // Export counters
        for (name, value) in &self.counters {
            metrics.push(ExportMetric {
                name: name.clone(),
                metric_type: MetricType::Counter,
                value: MetricValue::Counter(*value),
                labels: HashMap::new(),
                timestamp: SystemTime::now(),
            });
        }

        // Export histogram summaries
        for (name, summary) in &self.histograms {
            metrics.push(ExportMetric {
                name: format!("{}_p50", name),
                metric_type: MetricType::Gauge,
                value: MetricValue::Gauge(summary.p50),
                labels: HashMap::new(),
                timestamp: SystemTime::now(),
            });

            metrics.push(ExportMetric {
                name: format!("{}_p95", name),
                metric_type: MetricType::Gauge,
                value: MetricValue::Gauge(summary.p95),
                labels: HashMap::new(),
                timestamp: SystemTime::now(),
            });

            metrics.push(ExportMetric {
                name: format!("{}_p99", name),
                metric_type: MetricType::Gauge,
                value: MetricValue::Gauge(summary.p99),
                labels: HashMap::new(),
                timestamp: SystemTime::now(),
            });
        }

        metrics
    }
}

/// Histogram summary for aggregation
#[derive(Debug, Clone)]
struct HistogramSummary {
    pub count: u64,
    pub sum: f64,
    pub p50: f64,
    pub p95: f64,
    pub p99: f64,
}

/// Export metric format
#[derive(Debug, Clone)]
pub struct ExportMetric {
    pub name: String,
    pub metric_type: MetricType,
    pub value: MetricValue,
    pub labels: HashMap<String, String>,
    pub timestamp: SystemTime,
}

/// Metric types for export
#[derive(Debug, Clone)]
pub enum MetricType {
    Counter,
    Gauge,
    Histogram,
}

/// Metric values for export
#[derive(Debug, Clone)]
pub enum MetricValue {
    Counter(u64),
    Gauge(f64),
    Histogram(Vec<(f64, u64)>), // bucket, count pairs
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_metrics_collector_creation() {
        let config = MetricsConfig::default();
        let collector = ProductionMetricsCollector::new(config).await.unwrap();

        let status = collector.get_status().await;
        assert!(status.contains("Stopped"));
    }

    #[tokio::test]
    async fn test_adaptive_sampler() {
        let mut config = SamplingConfig::default();
        // Set a shorter adjustment interval for testing
        config.adaptive.adjustment_interval = Duration::from_millis(10);
        let sampler = AdaptiveSampler::new(config.clone());

        // Wait for the adjustment interval to allow first adjustment
        tokio::time::sleep(Duration::from_millis(20)).await;

        // Test rate adjustment with half the target rate
        sampler.adjust_sampling_rate(500.0).await; // Half of target rate (1000)

        // Rate should increase to compensate (double the base rate)
        let rate = sampler.current_rate.load(Ordering::Relaxed) as f64 / 1_000_000.0;
        let expected_rate = config.base_attempt_rate * 2.0; // Should double
        assert!((rate - expected_rate).abs() < 0.001); // Allow small floating point error
    }
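
    // Sketch test for the truncating nearest-rank percentile used by
    // HistogramMetric::get_percentile, exercised on a known 1..=100 distribution.
    #[test]
    fn test_histogram_percentiles() {
        let histogram = HistogramMetric::new("test_ms".to_string(), HashMap::new());
        for v in 1..=100 {
            histogram.record(v as f64);
        }
        assert_eq!(histogram.get_percentile(50.0), Some(50.0));
        assert_eq!(histogram.get_percentile(99.0), Some(99.0));
        assert_eq!(histogram.get_percentile(100.0), Some(100.0));
    }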

    #[tokio::test]
    async fn test_circuit_breaker() {
        let config = CircuitBreakerConfig::default();
        let breaker = CircuitBreaker::new(config);

        // Initially should allow requests
        assert!(breaker.allow_request().await);

        // Simulate high error rate
        let bad_health = HealthMetrics {
            metrics_per_second: 1000.0,
            memory_usage_mb: 100.0,
            queue_depth: 100,
            error_rate: 0.8, // High error rate
        };

        // Update health multiple times to trip circuit breaker
        for _ in 0..10 {
            breaker.update_health(bad_health.clone()).await;
        }

        // With the default failure_threshold of 5, ten consecutive unhealthy updates
        // should have opened the circuit, so further requests are denied.
        assert!(!breaker.allow_request().await);
    }
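
    // Sketch: counter/histogram keys use the "<name>:<k=v,...>" format produced by
    // labels_to_string; check that extract_label_value reads values back out of it.
    #[test]
    fn test_label_key_roundtrip() {
        let labels = [("node", "10.0.0.1:9000"), ("status", "success")];
        let key = format!("bootstrap_requests_total:{}", labels_to_string(&labels));
        assert_eq!(extract_label_value(&key, "status").as_deref(), Some("success"));
        assert_eq!(extract_label_value(&key, "node").as_deref(), Some("10.0.0.1:9000"));
    }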
}