use std::{
    collections::{HashMap, VecDeque},
    net::SocketAddr,
    sync::{
        atomic::{AtomicU64, Ordering},
        Arc,
    },
    time::{Duration, Instant, SystemTime},
};

use tokio::{
    sync::{Mutex, RwLock},
    time::interval,
};
use tracing::{debug, info, warn};

use super::{MetricsSummary, MonitoringError, NatTraversalAttempt, NatTraversalResult};
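
/// Production-grade metrics collector for NAT traversal monitoring.
///
/// A minimal usage sketch (error handling and the event types are elided; assumes a
/// Tokio runtime and the parent `monitoring` module's types):
///
/// ```ignore
/// let collector = ProductionMetricsCollector::new(MetricsConfig::default()).await?;
/// collector.start().await?;
/// // ... call record_nat_attempt / record_nat_result as traversal events occur ...
/// collector.stop().await?;
/// ```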
pub struct ProductionMetricsCollector {
    /// Collector configuration.
    config: MetricsConfig,
    /// In-memory store for counters, histograms, and raw records.
    metrics_store: Arc<MetricsStore>,
    /// Adaptive sampler that limits how many events are recorded.
    sampler: Arc<AdaptiveSampler>,
    /// Aggregates raw metrics into export-ready form.
    aggregator: Arc<MetricsAggregator>,
    /// Ships aggregated metrics to the configured destinations.
    exporter: Arc<MetricsExporter>,
    /// Protects the collector from overload.
    circuit_breaker: Arc<CircuitBreaker>,
    /// Current collector lifecycle state.
    state: Arc<RwLock<CollectorState>>,
    /// Handles of the background tasks spawned by `start`.
    tasks: Arc<Mutex<Vec<tokio::task::JoinHandle<()>>>>,
}

impl ProductionMetricsCollector {
    /// Creates a new collector from the given configuration.
    pub async fn new(config: MetricsConfig) -> Result<Self, MonitoringError> {
        let metrics_store = Arc::new(MetricsStore::new(config.storage.clone()));
        let sampler = Arc::new(AdaptiveSampler::new(config.sampling.clone()));
        let aggregator = Arc::new(MetricsAggregator::new(config.aggregation.clone()));
        let exporter = Arc::new(MetricsExporter::new(config.export.clone()));
        let circuit_breaker = Arc::new(CircuitBreaker::new(config.circuit_breaker.clone()));

        Ok(Self {
            config,
            metrics_store,
            sampler,
            aggregator,
            exporter,
            circuit_breaker,
            state: Arc::new(RwLock::new(CollectorState::new())),
            tasks: Arc::new(Mutex::new(Vec::new())),
        })
    }

    /// Starts the aggregation, export, cleanup, and health background tasks.
    pub async fn start(&self) -> Result<(), MonitoringError> {
        info!("Starting production metrics collector");

        {
            let mut state = self.state.write().await;
            state.status = CollectorStatus::Starting;
            state.start_time = Some(Instant::now());
        }

        self.start_aggregation_task().await?;
        self.start_export_task().await?;
        self.start_cleanup_task().await?;
        self.start_health_task().await?;

        {
            let mut state = self.state.write().await;
            state.status = CollectorStatus::Running;
        }

        info!("Production metrics collector started");
        Ok(())
    }

    /// Stops all background tasks and flushes any pending export data.
    pub async fn stop(&self) -> Result<(), MonitoringError> {
        info!("Stopping production metrics collector");

        {
            let mut state = self.state.write().await;
            state.status = CollectorStatus::Stopping;
        }

        let mut tasks = self.tasks.lock().await;
        for task in tasks.drain(..) {
            task.abort();
        }

        self.exporter.flush().await?;

        {
            let mut state = self.state.write().await;
            state.status = CollectorStatus::Stopped;
        }

        info!("Production metrics collector stopped");
        Ok(())
    }

    /// Records a NAT traversal attempt, subject to circuit breaking and sampling.
    pub async fn record_nat_attempt(&self, attempt: &NatTraversalAttempt) -> Result<(), MonitoringError> {
        // Drop the event silently if the circuit breaker is shedding load.
        if !self.circuit_breaker.allow_request().await {
            return Ok(());
        }

        if !self.sampler.should_sample_attempt(attempt).await {
            return Ok(());
        }

        let attempt_metric = AttemptMetric {
            attempt_id: attempt.attempt_id.clone(),
            timestamp: attempt.timestamp,
            client_region: attempt.client_info.region.clone(),
            server_region: attempt.server_info.region.clone(),
            nat_types: (
                attempt.client_info.nat_type.clone(),
                attempt.server_info.nat_type.clone(),
            ),
            network_conditions: attempt.network_conditions.clone(),
        };

        self.metrics_store.record_attempt(attempt_metric).await?;

        self.increment_counter("nat_attempts_total", &[
            ("client_region", attempt.client_info.region.as_deref().unwrap_or("unknown")),
            ("server_region", attempt.server_info.region.as_deref().unwrap_or("unknown")),
        ]).await;

        Ok(())
    }

    /// Records the result of a NAT traversal attempt.
    pub async fn record_nat_result(&self, result: &NatTraversalResult) -> Result<(), MonitoringError> {
        // Sample failures at full rate and successes at a reduced rate.
        let sample_rate = if result.success { 0.1 } else { 1.0 };
        if !self.sampler.should_sample_with_rate(sample_rate).await {
            return Ok(());
        }

        let result_metric = ResultMetric {
            attempt_id: result.attempt_id.clone(),
            success: result.success,
            duration: result.duration,
            error_category: result.error_info.as_ref().map(|e| e.error_category.clone()),
            performance: result.performance_metrics.clone(),
            connection_info: result.connection_info.clone(),
        };

        self.metrics_store.record_result(result_metric).await?;

        let status = if result.success { "success" } else { "failure" };
        self.increment_counter("nat_results_total", &[("status", status)]).await;

        self.record_histogram("nat_duration_ms", result.duration.as_millis() as f64, &[
            ("status", status),
        ]).await;

        if let Some(conn_info) = &result.connection_info {
            self.record_histogram("connection_latency_ms", conn_info.quality.latency_ms as f64, &[]).await;
            self.record_histogram("connection_throughput_mbps", conn_info.quality.throughput_mbps as f64, &[]).await;
        }

        if let Some(error_info) = &result.error_info {
            self.increment_counter("nat_errors_total", &[
                ("category", &format!("{:?}", error_info.error_category)),
                ("code", &error_info.error_code),
            ]).await;
        }

        Ok(())
    }

    /// Returns the current collector status as a string.
    pub async fn get_status(&self) -> String {
        let state = self.state.read().await;
        format!("{:?}", state.status)
    }

    /// Returns a summary of recently collected metrics.
    pub async fn get_summary(&self) -> MetricsSummary {
        self.metrics_store.get_summary().await
    }

    async fn increment_counter(&self, name: &str, labels: &[(&str, &str)]) {
        self.metrics_store.increment_counter(name, labels).await;
    }

    async fn record_histogram(&self, name: &str, value: f64, labels: &[(&str, &str)]) {
        self.metrics_store.record_histogram(name, value, labels).await;
    }

    /// Periodically aggregates raw metrics for export.
    async fn start_aggregation_task(&self) -> Result<(), MonitoringError> {
        let aggregator = self.aggregator.clone();
        let metrics_store = self.metrics_store.clone();
        let interval_duration = self.config.aggregation.interval;

        let task = tokio::spawn(async move {
            let mut interval = interval(interval_duration);

            loop {
                interval.tick().await;

                if let Err(e) = aggregator.aggregate_metrics(&metrics_store).await {
                    warn!("Metrics aggregation failed: {}", e);
                }
            }
        });

        self.tasks.lock().await.push(task);
        Ok(())
    }

    /// Periodically exports aggregated metrics to the configured destinations.
    async fn start_export_task(&self) -> Result<(), MonitoringError> {
        let exporter = self.exporter.clone();
        let aggregator = self.aggregator.clone();
        let interval_duration = self.config.export.interval;

        let task = tokio::spawn(async move {
            let mut interval = interval(interval_duration);

            loop {
                interval.tick().await;

                match aggregator.get_aggregated_metrics().await {
                    Ok(metrics) => {
                        if let Err(e) = exporter.export_metrics(metrics).await {
                            warn!("Metrics export failed: {}", e);
                        }
                    }
                    Err(e) => {
                        warn!("Failed to get aggregated metrics: {}", e);
                    }
                }
            }
        });

        self.tasks.lock().await.push(task);
        Ok(())
    }

    /// Hourly cleanup of metrics older than the configured retention period.
    async fn start_cleanup_task(&self) -> Result<(), MonitoringError> {
        let metrics_store = self.metrics_store.clone();
        let retention_period = self.config.storage.retention_period;

        let task = tokio::spawn(async move {
            let mut interval = interval(Duration::from_secs(3600));

            loop {
                interval.tick().await;

                if let Err(e) = metrics_store.cleanup_old_data(retention_period).await {
                    warn!("Metrics cleanup failed: {}", e);
                }
            }
        });

        self.tasks.lock().await.push(task);
        Ok(())
    }

    /// Periodic health check that feeds the circuit breaker.
    async fn start_health_task(&self) -> Result<(), MonitoringError> {
        let circuit_breaker = self.circuit_breaker.clone();
        let metrics_store = self.metrics_store.clone();
        let state = self.state.clone();

        let task = tokio::spawn(async move {
            let mut interval = interval(Duration::from_secs(30));

            loop {
                interval.tick().await;

                let health = metrics_store.get_health_metrics().await;
                let metrics_per_second = health.metrics_per_second;
                circuit_breaker.update_health(health).await;

                let mut collector_state = state.write().await;
                collector_state.last_health_check = Some(Instant::now());
                collector_state.metrics_collected += metrics_per_second as u64;
            }
        });

        self.tasks.lock().await.push(task);
        Ok(())
    }
}

/// Configuration for the metrics collection system.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct MetricsConfig {
    /// In-memory storage settings.
    pub storage: StorageConfig,
    /// Sampling settings.
    pub sampling: SamplingConfig,
    /// Aggregation settings.
    pub aggregation: AggregationConfig,
    /// Export settings.
    pub export: MetricsExportConfig,
    /// Circuit breaker settings.
    pub circuit_breaker: CircuitBreakerConfig,
}

impl Default for MetricsConfig {
    fn default() -> Self {
        Self {
            storage: StorageConfig::default(),
            sampling: SamplingConfig::default(),
            aggregation: AggregationConfig::default(),
            export: MetricsExportConfig::default(),
            circuit_breaker: CircuitBreakerConfig::default(),
        }
    }
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct StorageConfig {
    /// Maximum number of attempt/result records kept in memory.
    pub max_metrics: usize,
    /// How long recorded metrics are retained before cleanup.
    pub retention_period: Duration,
    /// How often buffered metrics are flushed.
    pub flush_interval: Duration,
}

impl Default for StorageConfig {
    fn default() -> Self {
        Self {
            max_metrics: 100_000,
            retention_period: Duration::from_secs(3600),
            flush_interval: Duration::from_secs(60),
        }
    }
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SamplingConfig {
    /// Base probability of recording a traversal attempt.
    pub base_attempt_rate: f64,
    /// Base probability of recording a traversal result.
    pub result_rate: f64,
    /// Adaptive sampling settings.
    pub adaptive: AdaptiveSamplingConfig,
}

impl Default for SamplingConfig {
    fn default() -> Self {
        Self {
            base_attempt_rate: 0.01,
            result_rate: 0.1,
            adaptive: AdaptiveSamplingConfig::default(),
        }
    }
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct AdaptiveSamplingConfig {
    /// Whether adaptive rate adjustment is enabled.
    pub enabled: bool,
    /// Target number of collected metrics per second.
    pub target_rate: f64,
    /// Minimum time between rate adjustments.
    pub adjustment_interval: Duration,
    /// Upper bound on the sampling rate.
    pub max_rate: f64,
    /// Lower bound on the sampling rate.
    pub min_rate: f64,
}

impl Default for AdaptiveSamplingConfig {
    fn default() -> Self {
        Self {
            enabled: true,
            target_rate: 1000.0,
            adjustment_interval: Duration::from_secs(60),
            max_rate: 1.0,
            min_rate: 0.001,
        }
    }
}
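
// A worked example of the adjustment performed by `AdaptiveSampler::adjust_sampling_rate`
// under these defaults (the observed rate is an illustrative assumption): with
// `target_rate` = 1000 metrics/s and an observed 4000 metrics/s, the adjustment factor
// is 1000 / 4000 = 0.25, so a current sampling rate of 0.01 becomes 0.01 * 0.25 = 0.0025,
// clamped into [`min_rate`, `max_rate`].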

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct AggregationConfig {
    /// How often raw metrics are aggregated.
    pub interval: Duration,
    /// Size of the aggregation window.
    pub window_size: Duration,
    /// Whether percentile summaries are computed.
    pub enable_percentiles: bool,
}

impl Default for AggregationConfig {
    fn default() -> Self {
        Self {
            interval: Duration::from_secs(10),
            window_size: Duration::from_secs(60),
            enable_percentiles: true,
        }
    }
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct MetricsExportConfig {
    /// How often aggregated metrics are exported.
    pub interval: Duration,
    /// Destinations to export metrics to.
    pub destinations: Vec<ExportDestination>,
    /// Maximum number of metrics per export batch.
    pub batch_size: usize,
    /// Timeout for a single export operation.
    pub timeout: Duration,
}

impl Default for MetricsExportConfig {
    fn default() -> Self {
        Self {
            interval: Duration::from_secs(30),
            destinations: vec![ExportDestination::Prometheus {
                endpoint: "http://localhost:9090/api/v1/write".to_string(),
            }],
            batch_size: 1000,
            timeout: Duration::from_secs(10),
        }
    }
}

/// Supported metrics export destinations.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum ExportDestination {
    Prometheus { endpoint: String },
    InfluxDB { endpoint: String, database: String },
    CloudWatch { region: String },
    DataDog { api_key: String },
    StatsD { endpoint: String },
}
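
// A minimal construction sketch for a non-default destination; the endpoint and
// database names below are illustrative assumptions, not values used by this module:
//
//     let export = MetricsExportConfig {
//         destinations: vec![ExportDestination::InfluxDB {
//             endpoint: "http://localhost:8086".to_string(),
//             database: "nat_metrics".to_string(),
//         }],
//         ..MetricsExportConfig::default()
//     };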

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct CircuitBreakerConfig {
    /// Consecutive unhealthy reports before the breaker opens.
    pub failure_threshold: u32,
    /// Consecutive healthy reports before the breaker starts to recover.
    pub success_threshold: u32,
    /// Breaker timeout (not currently used by `CircuitBreaker`).
    pub timeout: Duration,
    /// Maximum queue depth before new requests are rejected.
    pub max_queue_size: usize,
}

impl Default for CircuitBreakerConfig {
    fn default() -> Self {
        Self {
            failure_threshold: 5,
            success_threshold: 3,
            timeout: Duration::from_secs(60),
            max_queue_size: 10000,
        }
    }
}

/// In-memory storage for counters, histograms, and raw attempt/result records.
struct MetricsStore {
    counters: Arc<RwLock<HashMap<String, CounterMetric>>>,
    histograms: Arc<RwLock<HashMap<String, HistogramMetric>>>,
    attempts: Arc<Mutex<VecDeque<AttemptMetric>>>,
    results: Arc<Mutex<VecDeque<ResultMetric>>>,
    config: StorageConfig,
}

impl MetricsStore {
    fn new(config: StorageConfig) -> Self {
        Self {
            counters: Arc::new(RwLock::new(HashMap::new())),
            histograms: Arc::new(RwLock::new(HashMap::new())),
            attempts: Arc::new(Mutex::new(VecDeque::new())),
            results: Arc::new(Mutex::new(VecDeque::new())),
            config,
        }
    }

    async fn record_attempt(&self, attempt: AttemptMetric) -> Result<(), MonitoringError> {
        let mut attempts = self.attempts.lock().await;
        attempts.push_back(attempt);

        // Bound memory usage by dropping the oldest entries.
        while attempts.len() > self.config.max_metrics {
            attempts.pop_front();
        }

        Ok(())
    }

    async fn record_result(&self, result: ResultMetric) -> Result<(), MonitoringError> {
        let mut results = self.results.lock().await;
        results.push_back(result);

        while results.len() > self.config.max_metrics {
            results.pop_front();
        }

        Ok(())
    }

    async fn increment_counter(&self, name: &str, labels: &[(&str, &str)]) {
        // Keys have the form "name:k1=v1,k2=v2".
        let key = format!("{}:{}", name, labels_to_string(labels));
        let mut counters = self.counters.write().await;

        counters.entry(key)
            .or_insert_with(|| CounterMetric::new(name.to_string(), labels_to_map(labels)))
            .increment();
    }

    async fn record_histogram(&self, name: &str, value: f64, labels: &[(&str, &str)]) {
        let key = format!("{}:{}", name, labels_to_string(labels));
        let mut histograms = self.histograms.write().await;

        histograms.entry(key)
            .or_insert_with(|| HistogramMetric::new(name.to_string(), labels_to_map(labels)))
            .record(value);
    }

    async fn get_summary(&self) -> MetricsSummary {
        let results = self.results.lock().await;
        let _one_hour_ago = SystemTime::now() - Duration::from_secs(3600);

        // Stored results carry no timestamp, so every retained result is treated as
        // "recent" for the purposes of this summary.
        let recent_results: Vec<_> = results.iter()
            .filter(|r| !r.attempt_id.is_empty())
            .collect();

        let total_attempts = recent_results.len() as u64;
        let successful = recent_results.iter().filter(|r| r.success).count() as u64;
        let success_rate = if total_attempts > 0 {
            successful as f32 / total_attempts as f32
        } else {
            0.0
        };

        let avg_duration = if !recent_results.is_empty() {
            recent_results.iter()
                .map(|r| r.duration.as_millis())
                .sum::<u128>() / recent_results.len() as u128
        } else {
            0
        };

        MetricsSummary {
            nat_attempts_last_hour: total_attempts,
            success_rate_last_hour: success_rate,
            avg_connection_time_ms: avg_duration as u64,
            // Active connections are not tracked by this store.
            active_connections: 0,
            error_rate_last_hour: 1.0 - success_rate,
        }
    }

    async fn cleanup_old_data(&self, retention_period: Duration) -> Result<(), MonitoringError> {
        let cutoff = SystemTime::now() - retention_period;

        {
            let mut attempts = self.attempts.lock().await;
            while let Some(front) = attempts.front() {
                if front.timestamp < cutoff {
                    attempts.pop_front();
                } else {
                    break;
                }
            }
        }

        Ok(())
    }

    async fn get_health_metrics(&self) -> HealthMetrics {
        let attempts_count = self.attempts.lock().await.len();
        let results_count = self.results.lock().await.len();

        HealthMetrics {
            metrics_per_second: (attempts_count + results_count) as f64 / 60.0,
            // Rough estimate assuming roughly 1 KiB per stored record.
            memory_usage_mb: ((attempts_count + results_count) * 1024) as f64 / 1024.0 / 1024.0,
            queue_depth: attempts_count + results_count,
            // Error rate is not tracked at the store level.
            error_rate: 0.0,
        }
    }
}

/// Adaptive sampler that adjusts its sampling rate to hit a target metrics throughput.
struct AdaptiveSampler {
    config: SamplingConfig,
    /// Current sampling rate stored as fixed-point: rate * 1_000_000 (e.g. 0.01 -> 10_000).
    current_rate: Arc<AtomicU64>,
    last_adjustment: Arc<RwLock<Instant>>,
}

impl AdaptiveSampler {
    fn new(config: SamplingConfig) -> Self {
        let initial_rate = (config.base_attempt_rate * 1_000_000.0) as u64;

        Self {
            config,
            current_rate: Arc::new(AtomicU64::new(initial_rate)),
            last_adjustment: Arc::new(RwLock::new(Instant::now())),
        }
    }

    async fn should_sample_attempt(&self, _attempt: &NatTraversalAttempt) -> bool {
        let rate = self.current_rate.load(Ordering::Relaxed) as f64 / 1_000_000.0;
        rand::random::<f64>() < rate
    }

    async fn should_sample_with_rate(&self, rate: f64) -> bool {
        rand::random::<f64>() < rate
    }

    async fn adjust_sampling_rate(&self, current_metrics_rate: f64) {
        if !self.config.adaptive.enabled {
            return;
        }

        let mut last_adjustment = self.last_adjustment.write().await;
        if last_adjustment.elapsed() < self.config.adaptive.adjustment_interval {
            return;
        }

        let target_rate = self.config.adaptive.target_rate;
        let current_rate = self.current_rate.load(Ordering::Relaxed) as f64 / 1_000_000.0;

        // Scale the sampling rate toward the target metrics throughput.
        let adjustment_factor = target_rate / current_metrics_rate.max(1.0);
        let new_rate = (current_rate * adjustment_factor)
            .max(self.config.adaptive.min_rate)
            .min(self.config.adaptive.max_rate);

        self.current_rate.store((new_rate * 1_000_000.0) as u64, Ordering::Relaxed);
        *last_adjustment = Instant::now();

        debug!("Adjusted sampling rate from {:.4} to {:.4}", current_rate, new_rate);
    }
}

/// Simple circuit breaker driven by periodic health reports.
struct CircuitBreaker {
    config: CircuitBreakerConfig,
    state: Arc<RwLock<CircuitBreakerState>>,
    consecutive_failures: Arc<AtomicU64>,
    consecutive_successes: Arc<AtomicU64>,
    queue_size: Arc<AtomicU64>,
}

impl CircuitBreaker {
    fn new(config: CircuitBreakerConfig) -> Self {
        Self {
            config,
            state: Arc::new(RwLock::new(CircuitBreakerState::Closed)),
            consecutive_failures: Arc::new(AtomicU64::new(0)),
            consecutive_successes: Arc::new(AtomicU64::new(0)),
            queue_size: Arc::new(AtomicU64::new(0)),
        }
    }

    async fn allow_request(&self) -> bool {
        let state = self.state.read().await;

        match *state {
            CircuitBreakerState::Closed => {
                self.queue_size.load(Ordering::Relaxed) < self.config.max_queue_size as u64
            }
            CircuitBreakerState::Open => false,
            CircuitBreakerState::HalfOpen => {
                // Only allow a trickle of traffic while probing for recovery.
                self.queue_size.load(Ordering::Relaxed) < (self.config.max_queue_size / 10) as u64
            }
        }
    }

    async fn update_health(&self, health: HealthMetrics) {
        self.queue_size.store(health.queue_depth as u64, Ordering::Relaxed);

        if health.error_rate > 0.5 {
            let failures = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;
            self.consecutive_successes.store(0, Ordering::Relaxed);

            if failures >= self.config.failure_threshold as u64 {
                let mut state = self.state.write().await;
                *state = CircuitBreakerState::Open;
                warn!("Circuit breaker opened due to high error rate");
            }
        } else {
            let successes = self.consecutive_successes.fetch_add(1, Ordering::Relaxed) + 1;
            self.consecutive_failures.store(0, Ordering::Relaxed);

            let current_state = *self.state.read().await;
            if matches!(current_state, CircuitBreakerState::Open) && successes >= self.config.success_threshold as u64 {
                let mut state = self.state.write().await;
                *state = CircuitBreakerState::HalfOpen;
                info!("Circuit breaker moved to half-open state");
            } else if matches!(current_state, CircuitBreakerState::HalfOpen) && successes >= self.config.success_threshold as u64 * 2 {
                let mut state = self.state.write().await;
                *state = CircuitBreakerState::Closed;
                info!("Circuit breaker closed - system recovered");
            }
        }
    }
}

/// Breaker states: `Closed` (normal), `Open` (shedding load), `HalfOpen` (probing recovery).
#[derive(Debug, Clone, Copy)]
enum CircuitBreakerState {
    Closed,
    Open,
    HalfOpen,
}

struct MetricsAggregator {
    config: AggregationConfig,
    aggregated_data: Arc<RwLock<AggregatedData>>,
}

impl MetricsAggregator {
    fn new(config: AggregationConfig) -> Self {
        Self {
            config,
            aggregated_data: Arc::new(RwLock::new(AggregatedData::new())),
        }
    }

    async fn aggregate_metrics(&self, _metrics_store: &MetricsStore) -> Result<(), MonitoringError> {
        // Aggregation of the raw store into `aggregated_data` is not implemented yet.
        debug!("Aggregating metrics for export");
        Ok(())
    }

    async fn get_aggregated_metrics(&self) -> Result<Vec<ExportMetric>, MonitoringError> {
        let data = self.aggregated_data.read().await;
        Ok(data.to_export_metrics())
    }
}

struct MetricsExporter {
    config: MetricsExportConfig,
}

impl MetricsExporter {
    fn new(config: MetricsExportConfig) -> Self {
        Self { config }
    }

    async fn export_metrics(&self, metrics: Vec<ExportMetric>) -> Result<(), MonitoringError> {
        for destination in &self.config.destinations {
            if let Err(e) = self.export_to_destination(destination, &metrics).await {
                warn!("Failed to export to {:?}: {}", destination, e);
            }
        }
        Ok(())
    }

    async fn export_to_destination(&self, destination: &ExportDestination, metrics: &[ExportMetric]) -> Result<(), MonitoringError> {
        // Logging stubs only; the actual export wiring is not implemented here.
        match destination {
            ExportDestination::Prometheus { endpoint } => {
                debug!("Exporting {} metrics to Prometheus at {}", metrics.len(), endpoint);
            }
            ExportDestination::InfluxDB { endpoint, database } => {
                debug!("Exporting {} metrics to InfluxDB at {} (db: {})", metrics.len(), endpoint, database);
            }
            _ => {
                debug!("Export to {:?} not yet implemented", destination);
            }
        }
        Ok(())
    }

    async fn flush(&self) -> Result<(), MonitoringError> {
        debug!("Flushing remaining metrics");
        Ok(())
    }
}

/// Renders labels as "k1=v1,k2=v2" for use in metric keys.
fn labels_to_string(labels: &[(&str, &str)]) -> String {
    labels.iter()
        .map(|(k, v)| format!("{}={}", k, v))
        .collect::<Vec<_>>()
        .join(",")
}

fn labels_to_map(labels: &[(&str, &str)]) -> HashMap<String, String> {
    labels.iter()
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect()
}

/// Internal collector state shared with the background tasks.
#[derive(Debug)]
struct CollectorState {
    status: CollectorStatus,
    start_time: Option<Instant>,
    last_health_check: Option<Instant>,
    metrics_collected: u64,
    errors_encountered: u64,
}

impl CollectorState {
    fn new() -> Self {
        Self {
            status: CollectorStatus::Stopped,
            start_time: None,
            last_health_check: None,
            metrics_collected: 0,
            errors_encountered: 0,
        }
    }
}

#[derive(Debug, Clone)]
enum CollectorStatus {
    Stopped,
    Starting,
    Running,
    Stopping,
    Error,
}

/// Snapshot of collector health used to drive the circuit breaker.
#[derive(Debug, Clone)]
struct HealthMetrics {
    metrics_per_second: f64,
    memory_usage_mb: f64,
    queue_depth: usize,
    error_rate: f64,
}

/// A sampled NAT traversal attempt.
#[derive(Debug, Clone)]
struct AttemptMetric {
    attempt_id: String,
    timestamp: SystemTime,
    client_region: Option<String>,
    server_region: Option<String>,
    nat_types: (Option<crate::monitoring::NatType>, Option<crate::monitoring::NatType>),
    network_conditions: crate::monitoring::NetworkConditions,
}

/// A sampled NAT traversal result.
#[derive(Debug, Clone)]
struct ResultMetric {
    attempt_id: String,
    success: bool,
    duration: Duration,
    error_category: Option<crate::monitoring::ErrorCategory>,
    performance: crate::monitoring::PerformanceMetrics,
    connection_info: Option<crate::monitoring::ConnectionInfo>,
}

/// Monotonically increasing counter with labels.
#[derive(Debug)]
struct CounterMetric {
    name: String,
    labels: HashMap<String, String>,
    value: AtomicU64,
    last_updated: std::sync::RwLock<Instant>,
}

impl CounterMetric {
    fn new(name: String, labels: HashMap<String, String>) -> Self {
        Self {
            name,
            labels,
            value: AtomicU64::new(0),
            last_updated: std::sync::RwLock::new(Instant::now()),
        }
    }

    fn increment(&self) {
        self.value.fetch_add(1, Ordering::Relaxed);
        if let Ok(mut last_updated) = self.last_updated.write() {
            *last_updated = Instant::now();
        }
    }

    fn get_value(&self) -> u64 {
        self.value.load(Ordering::Relaxed)
    }
}

/// Histogram metric with raw samples and fixed buckets.
#[derive(Debug)]
struct HistogramMetric {
    name: String,
    labels: HashMap<String, String>,
    values: std::sync::Mutex<Vec<f64>>,
    buckets: Vec<f64>,
    last_updated: std::sync::RwLock<Instant>,
}

impl HistogramMetric {
    fn new(name: String, labels: HashMap<String, String>) -> Self {
        let buckets = vec![
            0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 25.0, 50.0, 100.0,
        ];

        Self {
            name,
            labels,
            values: std::sync::Mutex::new(Vec::new()),
            buckets,
            last_updated: std::sync::RwLock::new(Instant::now()),
        }
    }

    fn record(&self, value: f64) {
        if let Ok(mut values) = self.values.lock() {
            values.push(value);
            // Cap memory by discarding the oldest half once 10k samples accumulate.
            if values.len() > 10000 {
                values.drain(0..5000);
            }
        }
        if let Ok(mut last_updated) = self.last_updated.write() {
            *last_updated = Instant::now();
        }
    }

    fn get_percentile(&self, percentile: f64) -> Option<f64> {
        let values = self.values.lock().ok()?;
        if values.is_empty() {
            return None;
        }

        let mut sorted_values = values.clone();
        sorted_values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));

        let index = ((percentile / 100.0) * (sorted_values.len() - 1) as f64) as usize;
        Some(sorted_values[index])
    }

    fn get_bucket_counts(&self) -> Vec<(f64, u64)> {
        let values = match self.values.lock() {
            Ok(guard) => guard,
            Err(_) => return Vec::new(),
        };

        self.buckets.iter().map(|&bucket| {
            let count = values.iter().filter(|&&v| v <= bucket).count() as u64;
            (bucket, count)
        }).collect()
    }
}

/// Per-bootstrap-node coordination metrics.
#[derive(Debug, Clone)]
pub struct BootstrapNodeMetrics {
    pub address: SocketAddr,
    pub coordination_requests: u64,
    pub successful_coordinations: u64,
    pub avg_response_time_ms: f64,
    pub availability: f64,
    pub last_contact: Option<SystemTime>,
    pub error_rate: f64,
}

/// NAT traversal statistics broken down by NAT type.
#[derive(Debug, Clone)]
pub struct NatTypeMetrics {
    pub nat_type: crate::monitoring::NatType,
    pub total_attempts: u64,
    pub successful_attempts: u64,
    pub success_rate: f64,
    pub avg_connection_time_ms: f64,
    pub common_failures: Vec<(String, u64)>,
}

/// Latency, RTT, and jitter percentiles across recorded connections.
#[derive(Debug, Clone)]
pub struct LatencyMetrics {
    pub connection_latency_p50: f64,
    pub connection_latency_p95: f64,
    pub connection_latency_p99: f64,
    pub rtt_p50: f64,
    pub rtt_p95: f64,
    pub rtt_p99: f64,
    pub jitter_avg: f64,
    pub jitter_max: f64,
}
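
// The impl block below extends the collector with bootstrap, per-NAT-type, and
// connection-quality collection. A minimal call-site sketch (the address, timing,
// and quality numbers are illustrative assumptions):
//
//     collector
//         .record_bootstrap_performance("203.0.113.5:9000".parse().unwrap(), Duration::from_millis(42), true)
//         .await?;
//     collector.record_connection_quality(35, 4, 120.0, 0.01).await?;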

impl ProductionMetricsCollector {
    /// Records the outcome and response time of a bootstrap coordination request.
    pub async fn record_bootstrap_performance(
        &self,
        node_address: SocketAddr,
        response_time: Duration,
        success: bool,
    ) -> Result<(), MonitoringError> {
        let node_str = node_address.to_string();
        let status_str = if success { "success" } else { "failure" };
        let labels = &[
            ("node", node_str.as_str()),
            ("status", status_str),
        ];

        self.increment_counter("bootstrap_requests_total", labels).await;
        self.record_histogram("bootstrap_response_time_ms", response_time.as_millis() as f64, &[
            ("node", &node_address.to_string()),
        ]).await;

        if !success {
            self.increment_counter("bootstrap_errors_total", &[
                ("node", &node_address.to_string()),
            ]).await;
        }

        Ok(())
    }

    /// Records a NAT traversal result broken down by NAT type.
    pub async fn record_nat_type_result(
        &self,
        nat_type: crate::monitoring::NatType,
        success: bool,
        duration: Duration,
        error_category: Option<crate::monitoring::ErrorCategory>,
    ) -> Result<(), MonitoringError> {
        let nat_type_str = format!("{:?}", nat_type);
        let status = if success { "success" } else { "failure" };

        self.increment_counter("nat_traversal_by_type_total", &[
            ("nat_type", &nat_type_str),
            ("status", status),
        ]).await;

        self.record_histogram("nat_traversal_duration_by_type_ms", duration.as_millis() as f64, &[
            ("nat_type", &nat_type_str),
            ("status", status),
        ]).await;

        if let Some(error_cat) = error_category {
            self.increment_counter("nat_traversal_errors_by_type", &[
                ("nat_type", &nat_type_str),
                ("error_category", &format!("{:?}", error_cat)),
            ]).await;
        }

        Ok(())
    }

    /// Records connection quality samples as unlabeled histograms.
    pub async fn record_connection_quality(
        &self,
        latency_ms: u32,
        jitter_ms: u32,
        throughput_mbps: f32,
        packet_loss_rate: f32,
    ) -> Result<(), MonitoringError> {
        self.record_histogram("connection_latency_ms", latency_ms as f64, &[]).await;
        self.record_histogram("connection_jitter_ms", jitter_ms as f64, &[]).await;
        self.record_histogram("connection_throughput_mbps", throughput_mbps as f64, &[]).await;
        self.record_histogram("connection_packet_loss_rate", packet_loss_rate as f64, &[]).await;

        Ok(())
    }

    /// Builds per-node bootstrap metrics from the recorded counters and histograms.
    pub async fn get_bootstrap_metrics(&self) -> Vec<BootstrapNodeMetrics> {
        let counters = self.metrics_store.counters.read().await;
        let histograms = self.metrics_store.histograms.read().await;

        let mut node_metrics = HashMap::new();

        for (key, counter) in counters.iter() {
            if key.starts_with("bootstrap_requests_total:") {
                if let Some(node_addr) = extract_label_value(key, "node") {
                    let entry = node_metrics.entry(node_addr.clone()).or_insert_with(|| BootstrapNodeMetrics {
                        address: node_addr.parse().unwrap_or_else(|_| "0.0.0.0:0".parse().unwrap()),
                        coordination_requests: 0,
                        successful_coordinations: 0,
                        avg_response_time_ms: 0.0,
                        availability: 1.0,
                        last_contact: Some(SystemTime::now()),
                        error_rate: 0.0,
                    });

                    if key.contains("status=success") {
                        entry.successful_coordinations = counter.get_value();
                    }
                    entry.coordination_requests += counter.get_value();
                }
            }
        }

        for (key, histogram) in histograms.iter() {
            if key.starts_with("bootstrap_response_time_ms:") {
                if let Some(node_addr) = extract_label_value(key, "node") {
                    if let Some(entry) = node_metrics.get_mut(&node_addr) {
                        // Use the median response time as the reported "average".
                        entry.avg_response_time_ms = histogram.get_percentile(50.0).unwrap_or(0.0);
                    }
                }
            }
        }

        for metrics in node_metrics.values_mut() {
            if metrics.coordination_requests > 0 {
                metrics.availability = metrics.successful_coordinations as f64 / metrics.coordination_requests as f64;
                metrics.error_rate = 1.0 - metrics.availability;
            }
        }

        node_metrics.into_values().collect()
    }

    /// Builds per-NAT-type metrics from the recorded counters and histograms.
    pub async fn get_nat_type_metrics(&self) -> Vec<NatTypeMetrics> {
        let counters = self.metrics_store.counters.read().await;
        let histograms = self.metrics_store.histograms.read().await;

        let mut nat_metrics = HashMap::new();

        for (key, counter) in counters.iter() {
            if key.starts_with("nat_traversal_by_type_total:") {
                if let Some(nat_type_str) = extract_label_value(key, "nat_type") {
                    let nat_type = parse_nat_type(&nat_type_str);
                    let entry = nat_metrics.entry(nat_type_str.clone()).or_insert_with(|| NatTypeMetrics {
                        nat_type,
                        total_attempts: 0,
                        successful_attempts: 0,
                        success_rate: 0.0,
                        avg_connection_time_ms: 0.0,
                        common_failures: Vec::new(),
                    });

                    if key.contains("status=success") {
                        entry.successful_attempts = counter.get_value();
                    }
                    entry.total_attempts += counter.get_value();
                }
            }
        }

        for (key, histogram) in histograms.iter() {
            if key.starts_with("nat_traversal_duration_by_type_ms:") && key.contains("status=success") {
                if let Some(nat_type_str) = extract_label_value(key, "nat_type") {
                    if let Some(entry) = nat_metrics.get_mut(&nat_type_str) {
                        entry.avg_connection_time_ms = histogram.get_percentile(50.0).unwrap_or(0.0);
                    }
                }
            }
        }

        for metrics in nat_metrics.values_mut() {
            if metrics.total_attempts > 0 {
                metrics.success_rate = metrics.successful_attempts as f64 / metrics.total_attempts as f64;
            }
        }

        nat_metrics.into_values().collect()
    }

    /// Returns latency, RTT, and jitter percentiles from the unlabeled histograms.
    pub async fn get_latency_metrics(&self) -> LatencyMetrics {
        let histograms = self.metrics_store.histograms.read().await;

        // Unlabeled histograms are keyed as "name:" (empty label string).
        let connection_latency = histograms.get("connection_latency_ms:");
        let rtt_histogram = histograms.get("connection_rtt_ms:");
        let jitter_histogram = histograms.get("connection_jitter_ms:");

        LatencyMetrics {
            connection_latency_p50: connection_latency.and_then(|h| h.get_percentile(50.0)).unwrap_or(0.0),
            connection_latency_p95: connection_latency.and_then(|h| h.get_percentile(95.0)).unwrap_or(0.0),
            connection_latency_p99: connection_latency.and_then(|h| h.get_percentile(99.0)).unwrap_or(0.0),
            rtt_p50: rtt_histogram.and_then(|h| h.get_percentile(50.0)).unwrap_or(0.0),
            rtt_p95: rtt_histogram.and_then(|h| h.get_percentile(95.0)).unwrap_or(0.0),
            rtt_p99: rtt_histogram.and_then(|h| h.get_percentile(99.0)).unwrap_or(0.0),
            jitter_avg: jitter_histogram.and_then(|h| h.get_percentile(50.0)).unwrap_or(0.0),
            jitter_max: jitter_histogram.and_then(|h| h.get_percentile(100.0)).unwrap_or(0.0),
        }
    }
}

/// Extracts a label value from a metric key of the form "name:k1=v1,k2=v2".
fn extract_label_value(key: &str, label_name: &str) -> Option<String> {
    let label_prefix = format!("{}=", label_name);
    key.split(',')
        .find(|part| part.contains(&label_prefix))
        .and_then(|part| part.split('=').nth(1))
        .map(|s| s.to_string())
}

/// Parses a `Debug`-formatted NAT type name back into the enum, defaulting to `None`.
fn parse_nat_type(nat_type_str: &str) -> crate::monitoring::NatType {
    match nat_type_str {
        "FullCone" => crate::monitoring::NatType::FullCone,
        "RestrictedCone" => crate::monitoring::NatType::RestrictedCone,
        "PortRestrictedCone" => crate::monitoring::NatType::PortRestrictedCone,
        "Symmetric" => crate::monitoring::NatType::Symmetric,
        "CarrierGrade" => crate::monitoring::NatType::CarrierGrade,
        "DoubleNat" => crate::monitoring::NatType::DoubleNat,
        "None" => crate::monitoring::NatType::None,
        _ => crate::monitoring::NatType::None,
    }
}

/// Aggregated metrics awaiting export.
#[derive(Debug)]
struct AggregatedData {
    counters: HashMap<String, u64>,
    histograms: HashMap<String, HistogramSummary>,
    last_aggregation: Instant,
}

impl AggregatedData {
    fn new() -> Self {
        Self {
            counters: HashMap::new(),
            histograms: HashMap::new(),
            last_aggregation: Instant::now(),
        }
    }

    /// Converts aggregated data into flat export metrics (counters plus p50/p95/p99 gauges).
    fn to_export_metrics(&self) -> Vec<ExportMetric> {
        let mut metrics = Vec::new();

        for (name, value) in &self.counters {
            metrics.push(ExportMetric {
                name: name.clone(),
                metric_type: MetricType::Counter,
                value: MetricValue::Counter(*value),
                labels: HashMap::new(),
                timestamp: SystemTime::now(),
            });
        }

        for (name, summary) in &self.histograms {
            metrics.push(ExportMetric {
                name: format!("{}_p50", name),
                metric_type: MetricType::Gauge,
                value: MetricValue::Gauge(summary.p50),
                labels: HashMap::new(),
                timestamp: SystemTime::now(),
            });

            metrics.push(ExportMetric {
                name: format!("{}_p95", name),
                metric_type: MetricType::Gauge,
                value: MetricValue::Gauge(summary.p95),
                labels: HashMap::new(),
                timestamp: SystemTime::now(),
            });

            metrics.push(ExportMetric {
                name: format!("{}_p99", name),
                metric_type: MetricType::Gauge,
                value: MetricValue::Gauge(summary.p99),
                labels: HashMap::new(),
                timestamp: SystemTime::now(),
            });
        }

        metrics
    }
}

/// Percentile summary of a histogram.
#[derive(Debug, Clone)]
struct HistogramSummary {
    pub count: u64,
    pub sum: f64,
    pub p50: f64,
    pub p95: f64,
    pub p99: f64,
}

/// A single metric ready for export.
#[derive(Debug, Clone)]
pub struct ExportMetric {
    pub name: String,
    pub metric_type: MetricType,
    pub value: MetricValue,
    pub labels: HashMap<String, String>,
    pub timestamp: SystemTime,
}

#[derive(Debug, Clone)]
pub enum MetricType {
    Counter,
    Gauge,
    Histogram,
}

#[derive(Debug, Clone)]
pub enum MetricValue {
    Counter(u64),
    Gauge(f64),
    Histogram(Vec<(f64, u64)>),
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_metrics_collector_creation() {
        let config = MetricsConfig::default();
        let collector = ProductionMetricsCollector::new(config).await.unwrap();

        let status = collector.get_status().await;
        assert!(status.contains("Stopped"));
    }

    #[tokio::test]
    async fn test_adaptive_sampler() {
        let mut config = SamplingConfig::default();
        config.adaptive.adjustment_interval = Duration::from_millis(10);

        let sampler = AdaptiveSampler::new(config.clone());

        // Wait past the adjustment interval so the rate change is applied.
        tokio::time::sleep(Duration::from_millis(20)).await;

        // Observed rate is half the target, so the sampling rate should double.
        sampler.adjust_sampling_rate(500.0).await;

        let rate = sampler.current_rate.load(Ordering::Relaxed) as f64 / 1_000_000.0;
        let expected_rate = config.base_attempt_rate * 2.0;
        assert!((rate - expected_rate).abs() < 0.001);
    }

    #[tokio::test]
    async fn test_circuit_breaker() {
        let config = CircuitBreakerConfig::default();
        let breaker = CircuitBreaker::new(config);

        // A fresh breaker starts closed and accepts requests.
        assert!(breaker.allow_request().await);

        let bad_health = HealthMetrics {
            metrics_per_second: 1000.0,
            memory_usage_mb: 100.0,
            queue_depth: 100,
            // Well above the 0.5 error-rate threshold used by `update_health`.
            error_rate: 0.8,
        };

        for _ in 0..10 {
            breaker.update_health(bad_health.clone()).await;
        }

        // After repeated unhealthy reports the breaker opens and sheds load.
        assert!(!breaker.allow_request().await);
    }
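
    // Small sanity checks added for the helpers above; the key string and sample values
    // are illustrative, matching the "name:k1=v1,k2=v2" key format produced by
    // `labels_to_string` and the percentile indexing used by `HistogramMetric`.
    #[test]
    fn test_extract_label_value() {
        let key = "bootstrap_requests_total:node=203.0.113.5:9000,status=success";
        assert_eq!(extract_label_value(key, "status").as_deref(), Some("success"));
        assert_eq!(extract_label_value(key, "node").as_deref(), Some("203.0.113.5:9000"));
    }

    #[test]
    fn test_histogram_percentiles() {
        let histogram = HistogramMetric::new("latency_ms".to_string(), HashMap::new());
        for value in 1..=100 {
            histogram.record(value as f64);
        }
        // With 100 evenly spaced samples, p50 falls on 50.0 and p100 on the maximum.
        assert_eq!(histogram.get_percentile(50.0), Some(50.0));
        assert_eq!(histogram.get_percentile(100.0), Some(100.0));
    }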
}