1use super::*;
25use crate::adaptive::{
26 AdaptiveGossipSub, AdaptiveRouter, ChurnHandler, ContentStore, ReplicationManager,
27 learning::{QLearnCacheManager, ThompsonSampling},
28};
29use anyhow::Result;
30#[cfg(feature = "metrics")]
31use prometheus::{
32 Counter, Encoder, Gauge, Histogram, IntCounter, IntGauge, Registry, TextEncoder,
33 register_counter, register_gauge, register_histogram, register_int_counter, register_int_gauge,
34};
35use std::{
36 collections::{HashMap, VecDeque},
37 sync::Arc,
38 time::{Duration, Instant},
39};
40use tokio::sync::{RwLock, mpsc};
41
/// Central observability hub for the adaptive P2P network: polls component
/// stats into Prometheus metrics, runs anomaly detection, raises alerts,
/// samples performance profiles, and buffers debug logs.
pub struct MonitoringSystem {
    /// Prometheus registry gathered by `export_metrics` (metrics feature only).
    #[cfg(feature = "metrics")]
    registry: Arc<Registry>,

    /// All Prometheus collectors (placeholder struct when metrics are disabled).
    metrics: Arc<NetworkMetrics>,

    /// Sliding-window statistical anomaly detection over metric streams.
    anomaly_detector: Arc<AnomalyDetector>,

    /// Rule-based alerting with per-alert cooldown and channel fan-out.
    alert_manager: Arc<AlertManager>,

    /// Sampling performance profiler.
    profiler: Arc<PerformanceProfiler>,

    /// In-memory, level-filtered debug log buffer.
    logger: Arc<DebugLogger>,

    /// Handles to the subsystems being monitored.
    components: Arc<MonitoredComponents>,

    /// Intervals, thresholds and sampling configuration.
    config: MonitoringConfig,
}
69
/// Tuning knobs for [`MonitoringSystem`].
#[derive(Debug, Clone)]
pub struct MonitoringConfig {
    /// How often component stats are polled into metrics.
    pub collection_interval: Duration,

    /// Sliding-window length (in samples) used for anomaly statistics.
    pub anomaly_window_size: usize,

    /// Minimum time before an alert with the same id may fire again.
    pub alert_cooldown: Duration,

    /// Fraction (0.0–1.0) of started profiles that are actually sampled.
    pub profiling_sample_rate: f64,

    /// Minimum level retained by the debug logger.
    pub log_level: LogLevel,

    /// Refresh period intended for dashboard consumers.
    pub dashboard_interval: Duration,
}
91
92impl Default for MonitoringConfig {
93 fn default() -> Self {
94 Self {
95 collection_interval: Duration::from_secs(5),
96 anomaly_window_size: 100,
97 alert_cooldown: Duration::from_secs(300),
98 profiling_sample_rate: 0.01,
99 log_level: LogLevel::Info,
100 dashboard_interval: Duration::from_secs(1),
101 }
102 }
103}
104
/// Log verbosity levels, most severe first; the derived `Ord` follows
/// declaration order, so `Error < Warn < ... < Trace`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum LogLevel {
    Error,
    Warn,
    Info,
    Debug,
    Trace,
}
114
/// Prometheus collectors for every monitored dimension of the network.
///
/// Each field exists only when the `metrics` feature is enabled; otherwise a
/// unit placeholder keeps the struct constructible.
#[allow(dead_code)]
pub(crate) struct NetworkMetrics {
    // --- node population gauges ---
    #[cfg(feature = "metrics")]
    connected_nodes: IntGauge,
    #[cfg(feature = "metrics")]
    active_nodes: IntGauge,
    #[cfg(feature = "metrics")]
    suspicious_nodes: IntGauge,
    #[cfg(feature = "metrics")]
    failed_nodes: IntGauge,

    // --- routing counters / latency ---
    #[cfg(feature = "metrics")]
    routing_requests: Counter,
    #[cfg(feature = "metrics")]
    routing_success: Counter,
    #[cfg(feature = "metrics")]
    routing_latency: Histogram,

    // --- storage and replication ---
    #[cfg(feature = "metrics")]
    stored_items: IntGauge,
    #[cfg(feature = "metrics")]
    storage_bytes: IntGauge,
    #[cfg(feature = "metrics")]
    replication_factor: Gauge,

    // --- network traffic totals ---
    #[cfg(feature = "metrics")]
    messages_sent: Counter,
    #[cfg(feature = "metrics")]
    messages_received: Counter,
    #[cfg(feature = "metrics")]
    bytes_sent: Counter,
    #[cfg(feature = "metrics")]
    bytes_received: Counter,

    // --- cache behavior ---
    #[cfg(feature = "metrics")]
    cache_hits: Counter,
    #[cfg(feature = "metrics")]
    cache_misses: Counter,
    #[cfg(feature = "metrics")]
    cache_size: IntGauge,
    #[cfg(feature = "metrics")]
    cache_evictions: Counter,

    // --- adaptive/learning subsystems ---
    #[cfg(feature = "metrics")]
    thompson_selections: IntCounter,
    #[cfg(feature = "metrics")]
    qlearn_updates: Counter,
    #[cfg(feature = "metrics")]
    churn_predictions: Counter,

    // --- gossip layer ---
    #[cfg(feature = "metrics")]
    gossip_messages: Counter,
    #[cfg(feature = "metrics")]
    mesh_size: IntGauge,
    #[cfg(feature = "metrics")]
    topic_count: IntGauge,

    // --- process resources ---
    #[cfg(feature = "metrics")]
    cpu_usage: Gauge,
    #[cfg(feature = "metrics")]
    memory_usage: IntGauge,
    #[cfg(feature = "metrics")]
    thread_count: IntGauge,

    // Keeps the struct non-empty / constructible without the feature.
    #[cfg(not(feature = "metrics"))]
    _placeholder: (),
}
192
/// Shared handles to every subsystem the monitoring system polls for stats.
pub struct MonitoredComponents {
    /// Adaptive routing layer (routing request/success stats).
    pub router: Arc<AdaptiveRouter>,
    /// Node churn tracking (active/suspicious/failed node counts).
    pub churn_handler: Arc<ChurnHandler>,
    /// Gossip pub/sub layer (message, mesh and topic stats).
    pub gossip: Arc<AdaptiveGossipSub>,
    /// Content storage (item and byte totals).
    pub storage: Arc<ContentStore>,
    /// Replication manager.
    pub replication: Arc<ReplicationManager>,
    /// Thompson-sampling strategy selector.
    pub thompson: Arc<ThompsonSampling>,
    /// Q-learning cache manager (hit/miss/size stats).
    pub cache: Arc<QLearnCacheManager>,
}
203
/// Sliding-window statistical anomaly detection over named metric streams.
pub struct AnomalyDetector {
    /// Per-metric rolling sample history, keyed by metric name.
    history: Arc<RwLock<HashMap<String, MetricHistory>>>,

    /// Detected anomalies; bounded (oldest entries dropped past ~1000).
    anomalies: Arc<RwLock<Vec<Anomaly>>>,

    /// Maximum number of samples retained per metric.
    window_size: usize,
}
215
/// Rolling sample window plus cached statistics for a single metric.
struct MetricHistory {
    /// Most recent samples, oldest first.
    values: VecDeque<f64>,

    /// Cached mean of `values` (recomputed on update once >= 10 samples).
    mean: f64,
    /// Cached population standard deviation of `values`.
    std_dev: f64,

    /// When this metric last received a sample.
    last_update: Instant,
}
228
/// A single detected deviation of a metric from its expected behavior.
#[derive(Debug, Clone)]
pub struct Anomaly {
    /// Name of the metric that deviated.
    pub metric: String,

    /// Which detector classified the deviation.
    pub anomaly_type: AnomalyType,

    /// Normalized severity, clamped to [0.0, 1.0].
    pub severity: f64,

    /// When the anomaly was observed.
    pub detected_at: Instant,

    /// The offending sample value.
    pub value: f64,

    /// `(low, high)` band the value was expected to stay within.
    pub expected_range: (f64, f64),
}
250
/// Classification of how a metric deviated from its baseline.
#[derive(Debug, Clone, PartialEq)]
pub enum AnomalyType {
    /// Z-score beyond 3 standard deviations of the rolling window.
    Statistical,

    /// Sudden large change relative to the previous sample.
    Spike,

    /// Gradual departure from the baseline.
    /// NOTE(review): not yet emitted by `detect_anomaly` — reserved.
    Drift,

    /// Irregular recurring pattern.
    /// NOTE(review): not yet emitted by `detect_anomaly` — reserved.
    Pattern,
}
266
/// Rule-driven alerting with per-alert-id cooldown and channel fan-out.
pub struct AlertManager {
    /// Currently active alerts, keyed by alert id.
    active_alerts: Arc<RwLock<HashMap<String, Alert>>>,

    /// Registered evaluation rules.
    rules: Arc<RwLock<Vec<AlertRule>>>,

    /// Delivery channels; each triggered alert is sent to all of them.
    channels: Arc<RwLock<Vec<Box<dyn AlertChannel>>>>,

    /// Last trigger time per alert id, used to enforce the cooldown.
    cooldowns: Arc<RwLock<HashMap<String, Instant>>>,

    /// Minimum time between re-triggers of the same alert id.
    cooldown_period: Duration,
}
284
/// A triggered alert instance delivered to [`AlertChannel`]s.
#[derive(Debug, Clone)]
pub struct Alert {
    /// Stable identifier; also the cooldown/deduplication key.
    pub id: String,

    /// Human-readable alert name.
    pub name: String,

    /// Urgency classification.
    pub severity: AlertSeverity,

    /// Human-readable description of what fired.
    pub message: String,

    /// When the alert was triggered.
    pub triggered_at: Instant,

    /// Metric values that were relevant when the alert fired.
    pub metrics: HashMap<String, f64>,
}
306
/// Alert urgency; the derived `Ord` ranks `Info < Warning < Critical`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum AlertSeverity {
    Info,
    Warning,
    Critical,
}
314
/// Declarative rule: when `condition` holds, raise an alert of `severity`.
#[derive(Debug, Clone)]
pub struct AlertRule {
    /// Rule (and resulting alert) name.
    pub name: String,

    /// Predicate evaluated against collected metrics.
    pub condition: AlertCondition,

    /// Severity assigned to alerts produced by this rule.
    pub severity: AlertSeverity,

    /// Message template; `{value}` is the placeholder for the metric value.
    pub message_template: String,
}
330
/// Predicate forms an [`AlertRule`] can apply to a named metric.
#[derive(Debug, Clone)]
pub enum AlertCondition {
    /// Fire when the metric rises above `threshold`.
    Above { metric: String, threshold: f64 },

    /// Fire when the metric falls below `threshold`.
    Below { metric: String, threshold: f64 },

    /// Fire when the metric's rate of change exceeds `threshold`.
    RateOfChange { metric: String, threshold: f64 },

    /// Fire when the anomaly detector reports on this metric.
    AnomalyDetected { metric: String },
}
346
/// Delivery sink for triggered alerts (e.g. log, webhook, chat).
///
/// Implementations should be best-effort: the alert manager ignores send
/// failures so one broken channel cannot block the others.
#[async_trait]
pub trait AlertChannel: Send + Sync {
    /// Deliver `alert` to this channel's destination.
    async fn send_alert(&self, alert: &Alert) -> Result<()>;
}
353
/// Sampling profiler: tracks a subset of named operations and aggregates
/// CPU/memory samples into completed profiles.
pub struct PerformanceProfiler {
    /// Profiles currently in progress, keyed by name.
    profiles: Arc<RwLock<HashMap<String, Profile>>>,

    /// Finished profiles; bounded to the most recent 100.
    completed: Arc<RwLock<VecDeque<CompletedProfile>>>,

    /// Probability (0.0–1.0) that a started profile is actually tracked.
    sample_rate: f64,
}
365
/// An in-progress profiling session.
struct Profile {
    /// Operation name; also the key in the active-profile map.
    name: String,

    /// Session start time; duration is measured from here at `end_profile`.
    started_at: Instant,

    /// Resource snapshots accumulated via `record_sample`.
    samples: Vec<ProfileSample>,
}
377
/// One point-in-time resource snapshot within an active profile.
#[derive(Debug, Clone)]
struct ProfileSample {
    /// When the sample was taken (not yet used in aggregation).
    _timestamp: Instant,

    /// CPU usage at sample time (placeholder source — see `get_cpu_usage`).
    cpu_usage: f64,

    /// Memory usage at sample time (placeholder source — see `get_memory_usage`).
    memory_bytes: u64,

    /// Per-operation counters (not yet populated).
    _operations: HashMap<String, u64>,
}
393
/// Aggregated result of a finished profiling session.
#[derive(Debug, Clone)]
pub struct CompletedProfile {
    /// Operation name of the session.
    pub name: String,

    /// Wall-clock time from start to end of the session.
    pub duration: Duration,

    /// Mean CPU usage over all samples (0.0 when no samples were recorded).
    pub avg_cpu: f64,

    /// Maximum memory reading across samples (0 when no samples).
    pub peak_memory: u64,

    /// Per-operation counters (currently always empty — see `end_profile`).
    pub operations: HashMap<String, u64>,
}
412
/// Buffered, level-filtered debug logger with broadcast to subscribers.
pub struct DebugLogger {
    /// Minimum level retained.
    level: LogLevel,

    /// In-memory buffer of recent entries.
    buffer: Arc<RwLock<VecDeque<LogEntry>>>,

    /// Subscribers that receive a copy of each accepted entry.
    channels: Arc<RwLock<Vec<mpsc::UnboundedSender<LogEntry>>>>,
}
424
/// A single structured log record.
#[derive(Debug, Clone)]
pub struct LogEntry {
    /// When the entry was recorded.
    pub timestamp: Instant,

    /// Severity of the entry.
    pub level: LogLevel,

    /// Component that emitted the entry (e.g. "monitoring").
    pub component: String,

    /// Human-readable message.
    pub message: String,

    /// Optional structured payload.
    pub data: Option<serde_json::Value>,
}
443
444impl MonitoringSystem {
445 pub fn new(components: MonitoredComponents, config: MonitoringConfig) -> Result<Self> {
447 Self::new_with_registry(components, config, None)
448 }
449
450 pub fn new_with_registry(
452 components: MonitoredComponents,
453 config: MonitoringConfig,
454 #[cfg(feature = "metrics")] custom_registry: Option<Registry>,
455 #[cfg(not(feature = "metrics"))] _custom_registry: Option<()>,
456 ) -> Result<Self> {
457 #[cfg(feature = "metrics")]
459 let is_test = custom_registry.is_some();
460 #[cfg(feature = "metrics")]
461 let metric_prefix = if is_test {
462 format!("p2p_test_{}_", std::process::id())
463 } else {
464 "p2p_".to_string()
465 };
466
467 #[cfg(feature = "metrics")]
468 let registry = custom_registry.unwrap_or_default();
469
470 #[cfg(feature = "metrics")]
472 let metrics = if is_test {
473 use prometheus::{Counter, Gauge, Histogram, HistogramOpts, IntCounter, IntGauge};
475
476 let connected_nodes = IntGauge::new(
478 format!("{}connected_nodes", metric_prefix),
479 "Number of connected nodes",
480 )?;
481 registry.register(Box::new(connected_nodes.clone()))?;
482
483 let active_nodes = IntGauge::new(
484 format!("{}active_nodes", metric_prefix),
485 "Number of active nodes",
486 )?;
487 registry.register(Box::new(active_nodes.clone()))?;
488
489 let suspicious_nodes = IntGauge::new(
490 format!("{}suspicious_nodes", metric_prefix),
491 "Number of suspicious nodes",
492 )?;
493 registry.register(Box::new(suspicious_nodes.clone()))?;
494
495 let failed_nodes = IntGauge::new(
496 format!("{}failed_nodes", metric_prefix),
497 "Number of failed nodes",
498 )?;
499 registry.register(Box::new(failed_nodes.clone()))?;
500
501 let routing_requests = Counter::new(
503 format!("{}routing_requests_total", metric_prefix),
504 "Total routing requests",
505 )?;
506 registry.register(Box::new(routing_requests.clone()))?;
507
508 let routing_success = Counter::new(
509 format!("{}routing_success_total", metric_prefix),
510 "Successful routing requests",
511 )?;
512 registry.register(Box::new(routing_success.clone()))?;
513
514 let routing_latency = Histogram::with_opts(HistogramOpts::new(
515 format!("{}routing_latency_seconds", metric_prefix),
516 "Routing request latency in seconds",
517 ))?;
518 registry.register(Box::new(routing_latency.clone()))?;
519
520 let stored_items = IntGauge::new(
522 format!("{}stored_items", metric_prefix),
523 "Number of stored items",
524 )?;
525 registry.register(Box::new(stored_items.clone()))?;
526
527 let storage_bytes = IntGauge::new(
528 format!("{}storage_bytes", metric_prefix),
529 "Total storage in bytes",
530 )?;
531 registry.register(Box::new(storage_bytes.clone()))?;
532
533 let replication_factor = Gauge::new(
534 format!("{}replication_factor", metric_prefix),
535 "Average replication factor",
536 )?;
537 registry.register(Box::new(replication_factor.clone()))?;
538
539 let messages_sent = Counter::new(
541 format!("{}messages_sent_total", metric_prefix),
542 "Total messages sent",
543 )?;
544 registry.register(Box::new(messages_sent.clone()))?;
545
546 let messages_received = Counter::new(
547 format!("{}messages_received_total", metric_prefix),
548 "Total messages received",
549 )?;
550 registry.register(Box::new(messages_received.clone()))?;
551
552 let bytes_sent = Counter::new(
553 format!("{}bytes_sent_total", metric_prefix),
554 "Total bytes sent",
555 )?;
556 registry.register(Box::new(bytes_sent.clone()))?;
557
558 let bytes_received = Counter::new(
559 format!("{}bytes_received_total", metric_prefix),
560 "Total bytes received",
561 )?;
562 registry.register(Box::new(bytes_received.clone()))?;
563
564 let cache_hits = Counter::new(
566 format!("{}cache_hits_total", metric_prefix),
567 "Total cache hits",
568 )?;
569 registry.register(Box::new(cache_hits.clone()))?;
570
571 let cache_misses = Counter::new(
572 format!("{}cache_misses_total", metric_prefix),
573 "Total cache misses",
574 )?;
575 registry.register(Box::new(cache_misses.clone()))?;
576
577 let cache_size = IntGauge::new(
578 format!("{}cache_size_bytes", metric_prefix),
579 "Cache size in bytes",
580 )?;
581 registry.register(Box::new(cache_size.clone()))?;
582
583 let cache_evictions = Counter::new(
584 format!("{}cache_evictions_total", metric_prefix),
585 "Total cache evictions",
586 )?;
587 registry.register(Box::new(cache_evictions.clone()))?;
588
589 let thompson_selections = IntCounter::new(
591 format!("{}thompson_selections_total", metric_prefix),
592 "Thompson sampling strategy selections",
593 )?;
594 registry.register(Box::new(thompson_selections.clone()))?;
595
596 let qlearn_updates = Counter::new(
597 format!("{}qlearn_updates_total", metric_prefix),
598 "Q-learning updates",
599 )?;
600 registry.register(Box::new(qlearn_updates.clone()))?;
601
602 let churn_predictions = Counter::new(
603 format!("{}churn_predictions_total", metric_prefix),
604 "Churn predictions made",
605 )?;
606 registry.register(Box::new(churn_predictions.clone()))?;
607
608 let gossip_messages = Counter::new(
610 format!("{}gossip_messages_total", metric_prefix),
611 "Total gossip messages",
612 )?;
613 registry.register(Box::new(gossip_messages.clone()))?;
614
615 let mesh_size = IntGauge::new(
616 format!("{}gossip_mesh_size", metric_prefix),
617 "Gossip mesh size",
618 )?;
619 registry.register(Box::new(mesh_size.clone()))?;
620
621 let topic_count = IntGauge::new(
622 format!("{}gossip_topics", metric_prefix),
623 "Number of gossip topics",
624 )?;
625 registry.register(Box::new(topic_count.clone()))?;
626
627 let cpu_usage = Gauge::new(
629 format!("{}cpu_usage_percent", metric_prefix),
630 "CPU usage percentage",
631 )?;
632 registry.register(Box::new(cpu_usage.clone()))?;
633
634 let memory_usage = IntGauge::new(
635 format!("{}memory_usage_bytes", metric_prefix),
636 "Memory usage in bytes",
637 )?;
638 registry.register(Box::new(memory_usage.clone()))?;
639
640 let thread_count = IntGauge::new(
641 format!("{}thread_count", metric_prefix),
642 "Number of threads",
643 )?;
644 registry.register(Box::new(thread_count.clone()))?;
645
646 NetworkMetrics {
647 connected_nodes,
648 active_nodes,
649 suspicious_nodes,
650 failed_nodes,
651 routing_requests,
652 routing_success,
653 routing_latency,
654 stored_items,
655 storage_bytes,
656 replication_factor,
657 messages_sent,
658 messages_received,
659 bytes_sent,
660 bytes_received,
661 cache_hits,
662 cache_misses,
663 cache_size,
664 cache_evictions,
665 thompson_selections,
666 qlearn_updates,
667 churn_predictions,
668 gossip_messages,
669 mesh_size,
670 topic_count,
671 cpu_usage,
672 memory_usage,
673 thread_count,
674 }
675 } else {
676 NetworkMetrics {
678 connected_nodes: register_int_gauge!(
680 &format!("{}connected_nodes", metric_prefix),
681 "Number of connected nodes"
682 )?,
683 active_nodes: register_int_gauge!(
684 &format!("{}active_nodes", metric_prefix),
685 "Number of active nodes"
686 )?,
687 suspicious_nodes: register_int_gauge!(
688 &format!("{}suspicious_nodes", metric_prefix),
689 "Number of suspicious nodes"
690 )?,
691 failed_nodes: register_int_gauge!(
692 &format!("{}failed_nodes", metric_prefix),
693 "Number of failed nodes"
694 )?,
695
696 routing_requests: register_counter!(
698 &format!("{}routing_requests_total", metric_prefix),
699 "Total routing requests"
700 )?,
701 routing_success: register_counter!(
702 &format!("{}routing_success_total", metric_prefix),
703 "Successful routing requests"
704 )?,
705 routing_latency: register_histogram!(
706 &format!("{}routing_latency_seconds", metric_prefix),
707 "Routing request latency in seconds"
708 )?,
709
710 stored_items: register_int_gauge!(
712 &format!("{}stored_items", metric_prefix),
713 "Number of stored items"
714 )?,
715 storage_bytes: register_int_gauge!(
716 &format!("{}storage_bytes", metric_prefix),
717 "Total storage in bytes"
718 )?,
719 replication_factor: register_gauge!(
720 &format!("{}replication_factor", metric_prefix),
721 "Average replication factor"
722 )?,
723
724 messages_sent: register_counter!(
726 &format!("{}messages_sent_total", metric_prefix),
727 "Total messages sent"
728 )?,
729 messages_received: register_counter!(
730 &format!("{}messages_received_total", metric_prefix),
731 "Total messages received"
732 )?,
733 bytes_sent: register_counter!(
734 &format!("{}bytes_sent_total", metric_prefix),
735 "Total bytes sent"
736 )?,
737 bytes_received: register_counter!(
738 &format!("{}bytes_received_total", metric_prefix),
739 "Total bytes received"
740 )?,
741
742 cache_hits: register_counter!(
744 &format!("{}cache_hits_total", metric_prefix),
745 "Total cache hits"
746 )?,
747 cache_misses: register_counter!(
748 &format!("{}cache_misses_total", metric_prefix),
749 "Total cache misses"
750 )?,
751 cache_size: register_int_gauge!(
752 &format!("{}cache_size_bytes", metric_prefix),
753 "Cache size in bytes"
754 )?,
755 cache_evictions: register_counter!(
756 &format!("{}cache_evictions_total", metric_prefix),
757 "Total cache evictions"
758 )?,
759
760 thompson_selections: register_int_counter!(
762 &format!("{}thompson_selections_total", metric_prefix),
763 "Thompson sampling strategy selections"
764 )?,
765 qlearn_updates: register_counter!(
766 &format!("{}qlearn_updates_total", metric_prefix),
767 "Q-learning updates"
768 )?,
769 churn_predictions: register_counter!(
770 &format!("{}churn_predictions_total", metric_prefix),
771 "Churn predictions made"
772 )?,
773
774 gossip_messages: register_counter!(
776 &format!("{}gossip_messages_total", metric_prefix),
777 "Total gossip messages"
778 )?,
779 mesh_size: register_int_gauge!(
780 &format!("{}gossip_mesh_size", metric_prefix),
781 "Gossip mesh size"
782 )?,
783 topic_count: register_int_gauge!(
784 &format!("{}gossip_topics", metric_prefix),
785 "Number of gossip topics"
786 )?,
787
788 cpu_usage: register_gauge!(
790 &format!("{}cpu_usage_percent", metric_prefix),
791 "CPU usage percentage"
792 )?,
793 memory_usage: register_int_gauge!(
794 &format!("{}memory_usage_bytes", metric_prefix),
795 "Memory usage in bytes"
796 )?,
797 thread_count: register_int_gauge!(
798 &format!("{}thread_count", metric_prefix),
799 "Number of threads"
800 )?,
801 }
802 };
803
804 #[cfg(not(feature = "metrics"))]
805 let metrics = NetworkMetrics { _placeholder: () };
806
807 let anomaly_detector = Arc::new(AnomalyDetector::new(config.anomaly_window_size));
808 let alert_manager = Arc::new(AlertManager::new(config.alert_cooldown));
809 let profiler = Arc::new(PerformanceProfiler::new(config.profiling_sample_rate));
810 let logger = Arc::new(DebugLogger::new(config.log_level));
811
812 let monitoring = Self {
814 #[cfg(feature = "metrics")]
815 registry: Arc::new(registry),
816 metrics: Arc::new(metrics),
817 anomaly_detector,
818 alert_manager,
819 profiler,
820 logger,
821 components: Arc::new(components),
822 config,
823 };
824
825 Ok(monitoring)
826 }
827
828 pub async fn start(&self) {
830 let interval = self.config.collection_interval;
831 let monitoring = self.clone_for_task();
832
833 tokio::spawn(async move {
834 let mut interval = tokio::time::interval(interval);
835
836 loop {
837 interval.tick().await;
838
839 if let Err(e) = monitoring.collect_metrics().await {
840 monitoring
842 .logger
843 .error("monitoring", &format!("Metric collection error: {e}"))
844 .await;
845 }
846 }
847 });
848
849 self.start_anomaly_detection().await;
851
852 self.start_alert_processing().await;
854 }
855
856 #[allow(unused_variables)]
858 async fn collect_metrics(&self) -> Result<()> {
859 let churn_stats = self.components.churn_handler.get_stats().await;
861
862 #[cfg(feature = "metrics")]
863 {
864 self.metrics
865 .active_nodes
866 .set(churn_stats.active_nodes as i64);
867 self.metrics
868 .suspicious_nodes
869 .set(churn_stats.suspicious_nodes as i64);
870 self.metrics
871 .failed_nodes
872 .set(churn_stats.failed_nodes as i64);
873 }
874
875 let routing_stats = self.components.router.get_stats().await;
877
878 #[cfg(feature = "metrics")]
879 {
880 self.metrics
881 .routing_requests
882 .inc_by(routing_stats.total_requests as f64);
883 self.metrics
884 .routing_success
885 .inc_by(routing_stats.successful_requests as f64);
886 }
887
888 let storage_stats = self.components.storage.get_stats().await;
890
891 #[cfg(feature = "metrics")]
892 {
893 self.metrics
894 .stored_items
895 .set(storage_stats.total_items as i64);
896 self.metrics
897 .storage_bytes
898 .set(storage_stats.total_bytes as i64);
899 }
900
901 let gossip_stats = self.components.gossip.get_stats().await;
903
904 #[cfg(feature = "metrics")]
905 {
906 self.metrics
907 .gossip_messages
908 .inc_by(gossip_stats.messages_sent as f64);
909 self.metrics.mesh_size.set(gossip_stats.mesh_size as i64);
910 self.metrics
911 .topic_count
912 .set(gossip_stats.topic_count as i64);
913 }
914
915 let cache_stats = self.components.cache.get_stats();
917
918 #[cfg(feature = "metrics")]
919 {
920 self.metrics.cache_hits.inc_by(cache_stats.hits as f64);
921 self.metrics.cache_misses.inc_by(cache_stats.misses as f64);
922 self.metrics.cache_size.set(cache_stats.size_bytes as i64);
923 }
924
925 self.update_anomaly_detector().await?;
927
928 Ok(())
929 }
930
931 pub fn export_metrics(&self) -> Result<String> {
933 #[cfg(feature = "metrics")]
934 {
935 let encoder = TextEncoder::new();
936 let metric_families = self.registry.gather();
937 let mut buffer = Vec::new();
938 encoder.encode(&metric_families, &mut buffer)?;
939 String::from_utf8(buffer).map_err(|e| anyhow::anyhow!("UTF-8 error: {}", e))
940 }
941
942 #[cfg(not(feature = "metrics"))]
943 {
944 Ok("# Metrics disabled\n".to_string())
945 }
946 }
947
948 pub async fn get_health(&self) -> NetworkHealth {
950 let churn_stats = self.components.churn_handler.get_stats().await;
951 let routing_stats = self.components.router.get_stats().await;
952 let storage_stats = self.components.storage.get_stats().await;
953
954 let health_score =
955 self.calculate_health_score(&churn_stats, &routing_stats, &storage_stats);
956
957 NetworkHealth {
958 score: health_score,
959 status: if health_score > 0.8 {
960 HealthStatus::Healthy
961 } else if health_score > 0.5 {
962 HealthStatus::Degraded
963 } else {
964 HealthStatus::Critical
965 },
966 active_nodes: churn_stats.active_nodes,
967 churn_rate: churn_stats.churn_rate,
968 routing_success_rate: routing_stats.success_rate(),
969 storage_utilization: storage_stats.utilization(),
970 active_alerts: self.alert_manager.get_active_alerts().await.len(),
971 }
972 }
973
974 fn calculate_health_score(
976 &self,
977 churn_stats: &crate::adaptive::churn::ChurnStats,
978 routing_stats: &crate::adaptive::routing::RoutingStats,
979 storage_stats: &crate::adaptive::storage::StorageStats,
980 ) -> f64 {
981 let mut score = 1.0;
982
983 if churn_stats.churn_rate > 0.3 {
985 score *= 0.7;
986 } else if churn_stats.churn_rate > 0.1 {
987 score *= 0.9;
988 }
989
990 let routing_success = routing_stats.success_rate();
992 if routing_success < 0.9 {
993 score *= routing_success;
994 }
995
996 let storage_util = storage_stats.utilization();
998 if storage_util > 0.9 {
999 score *= 0.8;
1000 }
1001
1002 score
1003 }
1004
1005 async fn start_anomaly_detection(&self) {
1007 let detector = self.anomaly_detector.clone();
1008 let alert_manager = self.alert_manager.clone();
1009 let logger = self.logger.clone();
1010
1011 tokio::spawn(async move {
1012 let mut interval = tokio::time::interval(Duration::from_secs(10));
1013
1014 loop {
1015 interval.tick().await;
1016
1017 let anomalies = detector.get_recent_anomalies().await;
1018 for anomaly in anomalies {
1019 logger
1021 .warn(
1022 "anomaly_detector",
1023 &format!("Anomaly detected: {anomaly:?}"),
1024 )
1025 .await;
1026
1027 if anomaly.severity > 0.7 {
1029 let alert = Alert {
1030 id: format!("anomaly_{}", anomaly.metric),
1031 name: format!("{} Anomaly", anomaly.metric),
1032 severity: AlertSeverity::Warning,
1033 message: format!(
1034 "Anomaly detected in {}: value {} outside expected range {:?}",
1035 anomaly.metric, anomaly.value, anomaly.expected_range
1036 ),
1037 triggered_at: Instant::now(),
1038 metrics: HashMap::from([(anomaly.metric.clone(), anomaly.value)]),
1039 };
1040
1041 let _ = alert_manager.trigger_alert(alert).await;
1042 }
1043 }
1044 }
1045 });
1046 }
1047
1048 async fn start_alert_processing(&self) {
1050 let alert_manager = self.alert_manager.clone();
1051
1052 let rules = vec![
1054 AlertRule {
1055 name: "High Churn Rate".to_string(),
1056 condition: AlertCondition::Above {
1057 metric: "churn_rate".to_string(),
1058 threshold: 0.5,
1059 },
1060 severity: AlertSeverity::Critical,
1061 message_template: "Churn rate is critically high: {value}".to_string(),
1062 },
1063 AlertRule {
1064 name: "Low Routing Success".to_string(),
1065 condition: AlertCondition::Below {
1066 metric: "routing_success_rate".to_string(),
1067 threshold: 0.8,
1068 },
1069 severity: AlertSeverity::Warning,
1070 message_template: "Routing success rate is low: {value}".to_string(),
1071 },
1072 AlertRule {
1073 name: "Storage Near Capacity".to_string(),
1074 condition: AlertCondition::Above {
1075 metric: "storage_utilization".to_string(),
1076 threshold: 0.9,
1077 },
1078 severity: AlertSeverity::Warning,
1079 message_template: "Storage utilization is high: {value}".to_string(),
1080 },
1081 ];
1082
1083 for rule in rules {
1084 let _ = alert_manager.add_rule(rule).await;
1085 }
1086
1087 tokio::spawn(async move {
1089 let mut interval = tokio::time::interval(Duration::from_secs(30));
1090
1091 loop {
1092 interval.tick().await;
1093 let _ = alert_manager.evaluate_rules().await;
1094 }
1095 });
1096 }
1097
1098 async fn update_anomaly_detector(&self) -> Result<()> {
1100 let churn_stats = self.components.churn_handler.get_stats().await;
1102 self.anomaly_detector
1103 .update_metric("churn_rate", churn_stats.churn_rate)
1104 .await;
1105
1106 let routing_stats = self.components.router.get_stats().await;
1107 self.anomaly_detector
1108 .update_metric("routing_success_rate", routing_stats.success_rate())
1109 .await;
1110
1111 let storage_stats = self.components.storage.get_stats().await;
1112 self.anomaly_detector
1113 .update_metric("storage_utilization", storage_stats.utilization())
1114 .await;
1115
1116 Ok(())
1117 }
1118
1119 pub async fn get_dashboard_data(&self) -> DashboardData {
1121 DashboardData {
1122 health: self.get_health().await,
1123 metrics: self.get_current_metrics().await,
1124 recent_alerts: self.alert_manager.get_recent_alerts(10).await,
1125 anomalies: self.anomaly_detector.get_recent_anomalies().await,
1126 performance: self.profiler.get_current_profile().await,
1127 }
1128 }
1129
1130 async fn get_current_metrics(&self) -> HashMap<String, f64> {
1132 let mut metrics = HashMap::new();
1133
1134 let churn_stats = self.components.churn_handler.get_stats().await;
1136 metrics.insert("active_nodes".to_string(), churn_stats.active_nodes as f64);
1137 metrics.insert("churn_rate".to_string(), churn_stats.churn_rate);
1138
1139 let routing_stats = self.components.router.get_stats().await;
1140 metrics.insert(
1141 "routing_success_rate".to_string(),
1142 routing_stats.success_rate(),
1143 );
1144
1145 let storage_stats = self.components.storage.get_stats().await;
1146 metrics.insert(
1147 "storage_items".to_string(),
1148 storage_stats.total_items as f64,
1149 );
1150 metrics.insert(
1151 "storage_bytes".to_string(),
1152 storage_stats.total_bytes as f64,
1153 );
1154
1155 metrics
1156 }
1157
1158 fn clone_for_task(&self) -> Self {
1160 Self {
1161 #[cfg(feature = "metrics")]
1162 registry: self.registry.clone(),
1163 metrics: self.metrics.clone(),
1164 anomaly_detector: self.anomaly_detector.clone(),
1165 alert_manager: self.alert_manager.clone(),
1166 profiler: self.profiler.clone(),
1167 logger: self.logger.clone(),
1168 components: self.components.clone(),
1169 config: self.config.clone(),
1170 }
1171 }
1172}
1173
1174impl AnomalyDetector {
1175 pub fn new(window_size: usize) -> Self {
1177 Self {
1178 history: Arc::new(RwLock::new(HashMap::new())),
1179 anomalies: Arc::new(RwLock::new(Vec::new())),
1180 window_size,
1181 }
1182 }
1183
1184 pub async fn update_metric(&self, metric: &str, value: f64) {
1186 let mut history = self.history.write().await;
1187
1188 let metric_history = history
1189 .entry(metric.to_string())
1190 .or_insert_with(|| MetricHistory {
1191 values: VecDeque::new(),
1192 mean: 0.0,
1193 std_dev: 0.0,
1194 last_update: Instant::now(),
1195 });
1196
1197 metric_history.values.push_back(value);
1199 if metric_history.values.len() > self.window_size {
1200 metric_history.values.pop_front();
1201 }
1202
1203 if metric_history.values.len() >= 10 {
1205 let sum: f64 = metric_history.values.iter().sum();
1206 metric_history.mean = sum / metric_history.values.len() as f64;
1207
1208 let variance: f64 = metric_history
1209 .values
1210 .iter()
1211 .map(|v| (v - metric_history.mean).powi(2))
1212 .sum::<f64>()
1213 / metric_history.values.len() as f64;
1214 metric_history.std_dev = variance.sqrt();
1215
1216 if let Some(anomaly) = self.detect_anomaly(metric, value, metric_history) {
1218 let mut anomalies = self.anomalies.write().await;
1219 anomalies.push(anomaly);
1220
1221 if anomalies.len() > 1000 {
1223 anomalies.drain(0..100);
1224 }
1225 }
1226 }
1227
1228 metric_history.last_update = Instant::now();
1229 }
1230
1231 fn detect_anomaly(&self, metric: &str, value: f64, history: &MetricHistory) -> Option<Anomaly> {
1233 let z_score = (value - history.mean).abs() / history.std_dev;
1235 if z_score > 3.0 {
1236 return Some(Anomaly {
1237 metric: metric.to_string(),
1238 anomaly_type: AnomalyType::Statistical,
1239 severity: (z_score - 3.0).min(1.0),
1240 detected_at: Instant::now(),
1241 value,
1242 expected_range: (
1243 history.mean - 3.0 * history.std_dev,
1244 history.mean + 3.0 * history.std_dev,
1245 ),
1246 });
1247 }
1248
1249 if history.values.len() >= 2 {
1251 let prev_value = history.values[history.values.len() - 2];
1252 let change_rate = (value - prev_value).abs() / prev_value.abs().max(1.0);
1253
1254 if change_rate > 0.5 {
1255 return Some(Anomaly {
1256 metric: metric.to_string(),
1257 anomaly_type: AnomalyType::Spike,
1258 severity: change_rate.min(1.0),
1259 detected_at: Instant::now(),
1260 value,
1261 expected_range: (prev_value * 0.5, prev_value * 1.5),
1262 });
1263 }
1264 }
1265
1266 None
1267 }
1268
1269 pub async fn get_recent_anomalies(&self) -> Vec<Anomaly> {
1271 let anomalies = self.anomalies.read().await;
1272 let cutoff = Instant::now() - Duration::from_secs(300);
1273
1274 anomalies
1275 .iter()
1276 .filter(|a| a.detected_at > cutoff)
1277 .cloned()
1278 .collect()
1279 }
1280}
1281
1282impl AlertManager {
1283 pub fn new(cooldown_period: Duration) -> Self {
1285 Self {
1286 active_alerts: Arc::new(RwLock::new(HashMap::new())),
1287 rules: Arc::new(RwLock::new(Vec::new())),
1288 channels: Arc::new(RwLock::new(Vec::new())),
1289 cooldowns: Arc::new(RwLock::new(HashMap::new())),
1290 cooldown_period,
1291 }
1292 }
1293
1294 pub async fn add_rule(&self, rule: AlertRule) -> Result<()> {
1296 let mut rules = self.rules.write().await;
1297 rules.push(rule);
1298 Ok(())
1299 }
1300
1301 pub async fn add_channel(&self, channel: Box<dyn AlertChannel>) {
1303 let mut channels = self.channels.write().await;
1304 channels.push(channel);
1305 }
1306
1307 pub async fn trigger_alert(&self, alert: Alert) -> Result<()> {
1309 let mut cooldowns = self.cooldowns.write().await;
1311 if let Some(last_trigger) = cooldowns.get(&alert.id)
1312 && last_trigger.elapsed() < self.cooldown_period
1313 {
1314 return Ok(()); }
1316
1317 let mut active_alerts = self.active_alerts.write().await;
1319 active_alerts.insert(alert.id.clone(), alert.clone());
1320 cooldowns.insert(alert.id.clone(), Instant::now());
1321
1322 let channels = self.channels.read().await;
1324 for channel in channels.iter() {
1325 let _ = channel.send_alert(&alert).await;
1326 }
1327
1328 Ok(())
1329 }
1330
1331 pub async fn evaluate_rules(&self) -> Result<()> {
1333 let rules = self.rules.read().await.clone();
1334
1335 for _rule in rules {
1336 }
1340
1341 Ok(())
1342 }
1343
1344 pub async fn get_active_alerts(&self) -> Vec<Alert> {
1346 self.active_alerts.read().await.values().cloned().collect()
1347 }
1348
1349 pub async fn get_recent_alerts(&self, count: usize) -> Vec<Alert> {
1351 let mut alerts: Vec<_> = self.active_alerts.read().await.values().cloned().collect();
1352 alerts.sort_by_key(|a| std::cmp::Reverse(a.triggered_at));
1353 alerts.truncate(count);
1354 alerts
1355 }
1356}
1357
1358impl PerformanceProfiler {
1359 pub fn new(sample_rate: f64) -> Self {
1361 Self {
1362 profiles: Arc::new(RwLock::new(HashMap::new())),
1363 completed: Arc::new(RwLock::new(VecDeque::new())),
1364 sample_rate,
1365 }
1366 }
1367
1368 pub async fn start_profile(&self, name: String) {
1370 if rand::random::<f64>() > self.sample_rate {
1371 return; }
1373
1374 let mut profiles = self.profiles.write().await;
1375 profiles.insert(
1376 name.clone(),
1377 Profile {
1378 name,
1379 started_at: Instant::now(),
1380 samples: Vec::new(),
1381 },
1382 );
1383 }
1384
1385 pub async fn record_sample(&self, profile_name: &str) {
1387 let mut profiles = self.profiles.write().await;
1388
1389 if let Some(profile) = profiles.get_mut(profile_name) {
1390 profile.samples.push(ProfileSample {
1391 _timestamp: Instant::now(),
1392 cpu_usage: Self::get_cpu_usage(),
1393 memory_bytes: Self::get_memory_usage(),
1394 _operations: HashMap::new(), });
1396 }
1397 }
1398
1399 pub async fn end_profile(&self, name: &str) {
1401 let mut profiles = self.profiles.write().await;
1402
1403 if let Some(profile) = profiles.remove(name) {
1404 let duration = profile.started_at.elapsed();
1405
1406 let avg_cpu = profile.samples.iter().map(|s| s.cpu_usage).sum::<f64>()
1407 / profile.samples.len().max(1) as f64;
1408
1409 let peak_memory = profile
1410 .samples
1411 .iter()
1412 .map(|s| s.memory_bytes)
1413 .max()
1414 .unwrap_or(0);
1415
1416 let completed_profile = CompletedProfile {
1417 name: profile.name,
1418 duration,
1419 avg_cpu,
1420 peak_memory,
1421 operations: HashMap::new(), };
1423
1424 let mut completed = self.completed.write().await;
1425 completed.push_back(completed_profile);
1426
1427 if completed.len() > 100 {
1429 completed.pop_front();
1430 }
1431 }
1432 }
1433
1434 pub async fn get_current_profile(&self) -> Option<ProfileData> {
1436 Some(ProfileData {
1437 cpu_usage: Self::get_cpu_usage(),
1438 memory_bytes: Self::get_memory_usage(),
1439 thread_count: Self::get_thread_count(),
1440 active_profiles: self.profiles.read().await.len(),
1441 })
1442 }
1443
1444 fn get_cpu_usage() -> f64 {
1446 rand::random::<f64>() * 100.0
1448 }
1449
1450 fn get_memory_usage() -> u64 {
1452 1024 * 1024 * 512 }
1455
1456 fn get_thread_count() -> usize {
1458 8
1460 }
1461}
1462
1463impl DebugLogger {
1464 pub fn new(level: LogLevel) -> Self {
1466 Self {
1467 level,
1468 buffer: Arc::new(RwLock::new(VecDeque::new())),
1469 channels: Arc::new(RwLock::new(Vec::new())),
1470 }
1471 }
1472
1473 pub async fn log(&self, level: LogLevel, component: &str, message: &str) {
1475 if level > self.level {
1476 return; }
1478
1479 let entry = LogEntry {
1480 timestamp: Instant::now(),
1481 level,
1482 component: component.to_string(),
1483 message: message.to_string(),
1484 data: None,
1485 };
1486
1487 let mut buffer = self.buffer.write().await;
1489 buffer.push_back(entry.clone());
1490
1491 if buffer.len() > 10000 {
1493 buffer.pop_front();
1494 }
1495
1496 let channels = self.channels.read().await;
1498 for channel in channels.iter() {
1499 let _ = channel.send(entry.clone());
1500 }
1501 }
1502
1503 pub async fn error(&self, component: &str, message: &str) {
1505 self.log(LogLevel::Error, component, message).await;
1506 }
1507
1508 pub async fn warn(&self, component: &str, message: &str) {
1510 self.log(LogLevel::Warn, component, message).await;
1511 }
1512
1513 pub async fn info(&self, component: &str, message: &str) {
1515 self.log(LogLevel::Info, component, message).await;
1516 }
1517
1518 pub async fn debug(&self, component: &str, message: &str) {
1520 self.log(LogLevel::Debug, component, message).await;
1521 }
1522
1523 pub async fn trace(&self, component: &str, message: &str) {
1525 self.log(LogLevel::Trace, component, message).await;
1526 }
1527
1528 pub async fn subscribe(&self) -> mpsc::UnboundedReceiver<LogEntry> {
1530 let (tx, rx) = mpsc::unbounded_channel();
1531 let mut channels = self.channels.write().await;
1532 channels.push(tx);
1533 rx
1534 }
1535
1536 pub async fn get_recent_logs(&self, count: usize) -> Vec<LogEntry> {
1538 let buffer = self.buffer.read().await;
1539 buffer.iter().rev().take(count).cloned().collect()
1540 }
1541}
1542
/// Aggregate health snapshot of the network, combining node, routing,
/// storage, and alert signals into a single report.
#[derive(Debug, Clone)]
pub struct NetworkHealth {
    // Composite health score for the network.
    pub score: f64,

    // Coarse classification (Healthy/Degraded/Critical) derived alongside
    // the score.
    pub status: HealthStatus,

    // Number of nodes currently considered active.
    pub active_nodes: u64,

    // Observed node churn rate.
    pub churn_rate: f64,

    // Fraction of routing operations that succeed.
    pub routing_success_rate: f64,

    // Fraction of storage capacity in use.
    pub storage_utilization: f64,

    // Count of alerts currently active in the alert manager.
    pub active_alerts: usize,
}
1567
/// Coarse classification of overall network health, from best to worst.
#[derive(Debug, Clone, PartialEq)]
pub enum HealthStatus {
    Healthy,
    Degraded,
    Critical,
}
1575
/// Everything a monitoring dashboard needs for one refresh cycle.
#[derive(Debug, Clone)]
pub struct DashboardData {
    // Current aggregate health report.
    pub health: NetworkHealth,

    // Flat metric-name -> value map of current readings.
    pub metrics: HashMap<String, f64>,

    // Most recently triggered alerts.
    pub recent_alerts: Vec<Alert>,

    // Recently detected anomalies.
    pub anomalies: Vec<Anomaly>,

    // Performance snapshot, if profiling data is available.
    pub performance: Option<ProfileData>,
}
1594
/// Point-in-time process performance snapshot produced by
/// `PerformanceProfiler::get_current_profile`.
#[derive(Debug, Clone)]
pub struct ProfileData {
    // CPU usage percentage (currently a placeholder value upstream).
    pub cpu_usage: f64,

    // Resident memory in bytes.
    pub memory_bytes: u64,

    // Number of threads in the process.
    pub thread_count: usize,

    // How many profiles are currently being recorded.
    pub active_profiles: usize,
}
1610
#[cfg(test)]
mod tests {
    use super::*;

    /// A stable baseline followed by an extreme outlier must raise a
    /// statistical anomaly.
    #[tokio::test]
    async fn test_anomaly_detection() {
        let detector = AnomalyDetector::new(100);

        // Build a baseline oscillating between 50 and 59.
        for i in 0..50 {
            let sample = 50.0 + (i % 10) as f64;
            detector.update_metric("test_metric", sample).await;
        }

        // A value far outside the baseline should trip the detector.
        detector.update_metric("test_metric", 200.0).await;

        let anomalies = detector.get_recent_anomalies().await;
        assert!(!anomalies.is_empty());
        assert_eq!(anomalies[0].anomaly_type, AnomalyType::Statistical);
    }

    /// Re-triggering the same alert id inside the cooldown window must be
    /// suppressed without error.
    #[tokio::test]
    async fn test_alert_cooldown() {
        let manager = AlertManager::new(Duration::from_secs(60));

        let alert = Alert {
            id: "test_alert".to_string(),
            name: "Test Alert".to_string(),
            severity: AlertSeverity::Warning,
            message: "Test message".to_string(),
            triggered_at: Instant::now(),
            metrics: HashMap::new(),
        };

        // First trigger registers the alert as active.
        manager.trigger_alert(alert.clone()).await.unwrap();
        assert_eq!(manager.get_active_alerts().await.len(), 1);

        // Second trigger falls inside the cooldown: active set unchanged.
        manager.trigger_alert(alert).await.unwrap();
        assert_eq!(manager.get_active_alerts().await.len(), 1);
    }

    /// A completed profile ends up in the bounded history.
    #[tokio::test]
    async fn test_performance_profiling() {
        // sample_rate = 1.0 guarantees the profile is never sampled out.
        let profiler = PerformanceProfiler::new(1.0);
        profiler.start_profile("test_operation".to_string()).await;

        for _ in 0..5 {
            profiler.record_sample("test_operation").await;
            tokio::time::sleep(Duration::from_millis(10)).await;
        }

        profiler.end_profile("test_operation").await;

        let history = profiler.completed.read().await;
        assert_eq!(history.len(), 1);
        assert_eq!(history[0].name, "test_operation");
    }

    /// Subscribers receive exactly the entries at or above the configured
    /// level; Trace is filtered out at the Debug level.
    #[tokio::test]
    async fn test_debug_logging() {
        let logger = DebugLogger::new(LogLevel::Debug);
        let mut rx = logger.subscribe().await;

        logger.error("test", "Error message").await;
        logger.warn("test", "Warning message").await;
        logger.info("test", "Info message").await;
        logger.debug("test", "Debug message").await;
        // More verbose than Debug — must be dropped.
        logger.trace("test", "Trace message").await;

        let mut received = 0;
        while let Ok(entry) = rx.try_recv() {
            received += 1;
            assert!(entry.level <= LogLevel::Debug);
        }
        assert_eq!(received, 4);
    }
}