use super::*;
use crate::adaptive::{
    AdaptiveGossipSub, AdaptiveRouter, ChurnHandler, ContentStore, ReplicationManager,
    learning::{QLearnCacheManager, ThompsonSampling},
};
use anyhow::Result;
use async_trait::async_trait;
#[cfg(feature = "metrics")]
use prometheus::{
    Counter, Encoder, Gauge, Histogram, IntCounter, IntGauge, Registry, TextEncoder,
    register_counter, register_gauge, register_histogram, register_int_counter, register_int_gauge,
};
use std::{
    collections::{HashMap, VecDeque},
    sync::Arc,
    time::{Duration, Instant},
};
use tokio::sync::{RwLock, mpsc};

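// Usage sketch (illustrative, not part of this module's API; assumes the
// adaptive components have already been constructed and wrapped in `Arc`s):
//
//     let monitoring = MonitoringSystem::new(components, MonitoringConfig::default())?;
//     monitoring.start().await;
//     let health = monitoring.get_health().await;
//     let prometheus_text = monitoring.export_metrics()?;
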
/// Central monitoring system tying together metrics collection, anomaly
/// detection, alerting, performance profiling, and debug logging.
pub struct MonitoringSystem {
    /// Prometheus metrics registry
    #[cfg(feature = "metrics")]
    registry: Arc<Registry>,

    /// Collected network metrics
    metrics: Arc<NetworkMetrics>,

    /// Statistical anomaly detector
    anomaly_detector: Arc<AnomalyDetector>,

    /// Alert rules, channels, and cooldowns
    alert_manager: Arc<AlertManager>,

    /// Sampling performance profiler
    profiler: Arc<PerformanceProfiler>,

    /// Buffered debug logger
    logger: Arc<DebugLogger>,

    /// Handles to the components being monitored
    components: Arc<MonitoredComponents>,

    /// Monitoring configuration
    config: MonitoringConfig,
}

#[derive(Debug, Clone)]
pub struct MonitoringConfig {
    /// How often metrics are collected from components
    pub collection_interval: Duration,

    /// Number of samples kept per metric for anomaly detection
    pub anomaly_window_size: usize,

    /// Minimum time between repeated firings of the same alert
    pub alert_cooldown: Duration,

    /// Fraction of profile requests that are actually sampled (0.0..=1.0)
    pub profiling_sample_rate: f64,

    /// Maximum log level that is recorded
    pub log_level: LogLevel,

    /// How often dashboard data is refreshed
    pub dashboard_interval: Duration,
}

impl Default for MonitoringConfig {
    fn default() -> Self {
        Self {
            collection_interval: Duration::from_secs(5),
            anomaly_window_size: 100,
            alert_cooldown: Duration::from_secs(300),
            profiling_sample_rate: 0.01,
            log_level: LogLevel::Info,
            dashboard_interval: Duration::from_secs(1),
        }
    }
}

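// Example override (illustrative; the field values are arbitrary):
// struct-update syntax keeps the remaining fields at their defaults.
//
//     let config = MonitoringConfig {
//         collection_interval: Duration::from_secs(1),
//         log_level: LogLevel::Debug,
//         ..MonitoringConfig::default()
//     };
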
/// Log verbosity levels, ordered so that more verbose levels compare greater.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum LogLevel {
    Error,
    Warn,
    Info,
    Debug,
    Trace,
}

/// Prometheus metrics for the network, grouped by subsystem. Every field is
/// gated on the `metrics` feature.
#[allow(dead_code)]
struct NetworkMetrics {
    // Node counts
    #[cfg(feature = "metrics")]
    connected_nodes: IntGauge,
    #[cfg(feature = "metrics")]
    active_nodes: IntGauge,
    #[cfg(feature = "metrics")]
    suspicious_nodes: IntGauge,
    #[cfg(feature = "metrics")]
    failed_nodes: IntGauge,

    // Routing
    #[cfg(feature = "metrics")]
    routing_requests: Counter,
    #[cfg(feature = "metrics")]
    routing_success: Counter,
    #[cfg(feature = "metrics")]
    routing_latency: Histogram,

    // Storage
    #[cfg(feature = "metrics")]
    stored_items: IntGauge,
    #[cfg(feature = "metrics")]
    storage_bytes: IntGauge,
    #[cfg(feature = "metrics")]
    replication_factor: Gauge,

    // Message traffic
    #[cfg(feature = "metrics")]
    messages_sent: Counter,
    #[cfg(feature = "metrics")]
    messages_received: Counter,
    #[cfg(feature = "metrics")]
    bytes_sent: Counter,
    #[cfg(feature = "metrics")]
    bytes_received: Counter,

    // Cache
    #[cfg(feature = "metrics")]
    cache_hits: Counter,
    #[cfg(feature = "metrics")]
    cache_misses: Counter,
    #[cfg(feature = "metrics")]
    cache_size: IntGauge,
    #[cfg(feature = "metrics")]
    cache_evictions: Counter,

    // Learning components
    #[cfg(feature = "metrics")]
    thompson_selections: IntCounter,
    #[cfg(feature = "metrics")]
    qlearn_updates: Counter,
    #[cfg(feature = "metrics")]
    churn_predictions: Counter,

    // Gossip
    #[cfg(feature = "metrics")]
    gossip_messages: Counter,
    #[cfg(feature = "metrics")]
    mesh_size: IntGauge,
    #[cfg(feature = "metrics")]
    topic_count: IntGauge,

    // System resources
    #[cfg(feature = "metrics")]
    cpu_usage: Gauge,
    #[cfg(feature = "metrics")]
    memory_usage: IntGauge,
    #[cfg(feature = "metrics")]
    thread_count: IntGauge,

    // Keeps the struct non-empty when metrics are disabled
    #[cfg(not(feature = "metrics"))]
    _placeholder: (),
}

/// Handles to the adaptive components the monitoring system observes.
pub struct MonitoredComponents {
    pub router: Arc<AdaptiveRouter>,
    pub churn_handler: Arc<ChurnHandler>,
    pub gossip: Arc<AdaptiveGossipSub>,
    pub storage: Arc<ContentStore>,
    pub replication: Arc<ReplicationManager>,
    pub thompson: Arc<ThompsonSampling>,
    pub cache: Arc<QLearnCacheManager>,
}

/// Detects anomalies by tracking a sliding window of values per metric.
pub struct AnomalyDetector {
    /// Per-metric sliding windows and running statistics
    history: Arc<RwLock<HashMap<String, MetricHistory>>>,

    /// Recently detected anomalies
    anomalies: Arc<RwLock<Vec<Anomaly>>>,

    /// Maximum number of samples kept per metric
    window_size: usize,
}

/// Sliding window of samples plus running statistics for one metric.
struct MetricHistory {
    /// Recent values, oldest first
    values: VecDeque<f64>,

    /// Running mean and standard deviation of the window
    mean: f64,
    std_dev: f64,

    /// When the metric was last updated
    last_update: Instant,
}

/// A detected deviation from a metric's expected behaviour.
#[derive(Debug, Clone)]
pub struct Anomaly {
    /// Name of the affected metric
    pub metric: String,

    /// Kind of anomaly detected
    pub anomaly_type: AnomalyType,

    /// Severity in the range 0.0..=1.0
    pub severity: f64,

    /// When the anomaly was detected
    pub detected_at: Instant,

    /// The observed value
    pub value: f64,

    /// The range the value was expected to fall within
    pub expected_range: (f64, f64),
}

#[derive(Debug, Clone, PartialEq)]
pub enum AnomalyType {
    /// Value outside the statistical (z-score) bounds
    Statistical,

    /// Sudden large change between consecutive samples
    Spike,

    /// Gradual movement away from the historical baseline
    Drift,

    /// Irregularity in a recurring pattern
    Pattern,
}

/// Manages alert rules, delivery channels, and per-alert cooldowns.
pub struct AlertManager {
    /// Alerts currently firing, keyed by alert id
    active_alerts: Arc<RwLock<HashMap<String, Alert>>>,

    /// Configured alerting rules
    rules: Arc<RwLock<Vec<AlertRule>>>,

    /// Delivery channels alerts are fanned out to
    channels: Arc<RwLock<Vec<Box<dyn AlertChannel>>>>,

    /// Last trigger time per alert id
    cooldowns: Arc<RwLock<HashMap<String, Instant>>>,

    /// Minimum time between repeated firings of the same alert
    cooldown_period: Duration,
}

#[derive(Debug, Clone)]
pub struct Alert {
    /// Stable identifier used for deduplication and cooldowns
    pub id: String,

    /// Human-readable alert name
    pub name: String,

    /// How serious the alert is
    pub severity: AlertSeverity,

    /// Description of what triggered the alert
    pub message: String,

    /// When the alert fired
    pub triggered_at: Instant,

    /// Metric values captured at trigger time
    pub metrics: HashMap<String, f64>,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum AlertSeverity {
    Info,
    Warning,
    Critical,
}

/// A rule that maps a metric condition to an alert.
#[derive(Debug, Clone)]
pub struct AlertRule {
    /// Rule name, also used as the alert name
    pub name: String,

    /// Condition that triggers the alert
    pub condition: AlertCondition,

    /// Severity assigned to the resulting alert
    pub severity: AlertSeverity,

    /// Message template; `{value}` is intended to be replaced with the
    /// metric value when the rule fires
    pub message_template: String,
}

#[derive(Debug, Clone)]
pub enum AlertCondition {
    /// Metric is above the threshold
    Above { metric: String, threshold: f64 },

    /// Metric is below the threshold
    Below { metric: String, threshold: f64 },

    /// Metric changed faster than the threshold allows
    RateOfChange { metric: String, threshold: f64 },

    /// The anomaly detector flagged the metric
    AnomalyDetected { metric: String },
}

/// Delivery channel for alerts (e.g. log, webhook, pager).
#[async_trait]
pub trait AlertChannel: Send + Sync {
    async fn send_alert(&self, alert: &Alert) -> Result<()>;
}

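// A minimal `AlertChannel` sketch that writes alerts to stderr. Illustrative
// only: the struct name is ours, not part of the module's existing API, and
// a real deployment would post to a webhook or pager instead.
pub struct StderrAlertChannel;

#[async_trait]
impl AlertChannel for StderrAlertChannel {
    async fn send_alert(&self, alert: &Alert) -> Result<()> {
        eprintln!("[{:?}] {}: {}", alert.severity, alert.name, alert.message);
        Ok(())
    }
}
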
/// Samples CPU and memory usage for named operations at a configurable rate.
pub struct PerformanceProfiler {
    /// Profiles currently being recorded, keyed by name
    profiles: Arc<RwLock<HashMap<String, Profile>>>,

    /// Recently completed profiles, oldest first
    completed: Arc<RwLock<VecDeque<CompletedProfile>>>,

    /// Fraction of `start_profile` calls that are actually sampled
    sample_rate: f64,
}

/// An in-progress profile.
struct Profile {
    /// Operation name
    name: String,

    /// When profiling started
    started_at: Instant,

    /// Samples collected so far
    samples: Vec<ProfileSample>,
}

/// A single resource-usage sample.
#[derive(Debug, Clone)]
struct ProfileSample {
    _timestamp: Instant,

    /// CPU usage percentage at sample time
    cpu_usage: f64,

    /// Resident memory in bytes at sample time
    memory_bytes: u64,

    _operations: HashMap<String, u64>,
}

/// Summary of a finished profile.
#[derive(Debug, Clone)]
pub struct CompletedProfile {
    /// Operation name
    pub name: String,

    /// Total wall-clock duration
    pub duration: Duration,

    /// Average CPU usage across samples
    pub avg_cpu: f64,

    /// Peak memory observed, in bytes
    pub peak_memory: u64,

    /// Operation counters recorded during the profile
    pub operations: HashMap<String, u64>,
}

/// Buffered, level-filtered logger with broadcast subscriptions.
pub struct DebugLogger {
    /// Maximum level that is recorded
    level: LogLevel,

    /// Ring buffer of recent entries
    buffer: Arc<RwLock<VecDeque<LogEntry>>>,

    /// Subscribers that receive every recorded entry
    channels: Arc<RwLock<Vec<mpsc::UnboundedSender<LogEntry>>>>,
}

/// A single log entry.
#[derive(Debug, Clone)]
pub struct LogEntry {
    pub timestamp: Instant,

    pub level: LogLevel,

    /// Component that emitted the entry
    pub component: String,

    pub message: String,

    /// Optional structured payload
    pub data: Option<serde_json::Value>,
}

impl MonitoringSystem {
    /// Create a new monitoring system for the given components.
    pub fn new(components: MonitoredComponents, config: MonitoringConfig) -> Result<Self> {
        // NOTE: the `register_*!` macros below register metrics with the
        // global default Prometheus registry; this local registry is kept
        // for attaching custom collectors.
        #[cfg(feature = "metrics")]
        let registry = Registry::new();

        #[cfg(feature = "metrics")]
        let metrics = NetworkMetrics {
            connected_nodes: register_int_gauge!(
                "p2p_connected_nodes",
                "Number of connected nodes"
            )?,
            active_nodes: register_int_gauge!("p2p_active_nodes", "Number of active nodes")?,
            suspicious_nodes: register_int_gauge!(
                "p2p_suspicious_nodes",
                "Number of suspicious nodes"
            )?,
            failed_nodes: register_int_gauge!("p2p_failed_nodes", "Number of failed nodes")?,

            routing_requests: register_counter!(
                "p2p_routing_requests_total",
                "Total routing requests"
            )?,
            routing_success: register_counter!(
                "p2p_routing_success_total",
                "Successful routing requests"
            )?,
            routing_latency: register_histogram!(
                "p2p_routing_latency_seconds",
                "Routing request latency in seconds"
            )?,

            stored_items: register_int_gauge!("p2p_stored_items", "Number of stored items")?,
            storage_bytes: register_int_gauge!("p2p_storage_bytes", "Total storage in bytes")?,
            replication_factor: register_gauge!(
                "p2p_replication_factor",
                "Average replication factor"
            )?,

            messages_sent: register_counter!("p2p_messages_sent_total", "Total messages sent")?,
            messages_received: register_counter!(
                "p2p_messages_received_total",
                "Total messages received"
            )?,
            bytes_sent: register_counter!("p2p_bytes_sent_total", "Total bytes sent")?,
            bytes_received: register_counter!("p2p_bytes_received_total", "Total bytes received")?,

            cache_hits: register_counter!("p2p_cache_hits_total", "Total cache hits")?,
            cache_misses: register_counter!("p2p_cache_misses_total", "Total cache misses")?,
            cache_size: register_int_gauge!("p2p_cache_size_bytes", "Cache size in bytes")?,
            cache_evictions: register_counter!(
                "p2p_cache_evictions_total",
                "Total cache evictions"
            )?,

            thompson_selections: register_int_counter!(
                "p2p_thompson_selections_total",
                "Thompson sampling strategy selections"
            )?,
            qlearn_updates: register_counter!("p2p_qlearn_updates_total", "Q-learning updates")?,
            churn_predictions: register_counter!(
                "p2p_churn_predictions_total",
                "Churn predictions made"
            )?,

            gossip_messages: register_counter!(
                "p2p_gossip_messages_total",
                "Total gossip messages"
            )?,
            mesh_size: register_int_gauge!("p2p_gossip_mesh_size", "Gossip mesh size")?,
            topic_count: register_int_gauge!("p2p_gossip_topics", "Number of gossip topics")?,

            cpu_usage: register_gauge!("p2p_cpu_usage_percent", "CPU usage percentage")?,
            memory_usage: register_int_gauge!("p2p_memory_usage_bytes", "Memory usage in bytes")?,
            thread_count: register_int_gauge!("p2p_thread_count", "Number of threads")?,
        };

        #[cfg(not(feature = "metrics"))]
        let metrics = NetworkMetrics { _placeholder: () };

        let anomaly_detector = Arc::new(AnomalyDetector::new(config.anomaly_window_size));
        let alert_manager = Arc::new(AlertManager::new(config.alert_cooldown));
        let profiler = Arc::new(PerformanceProfiler::new(config.profiling_sample_rate));
        let logger = Arc::new(DebugLogger::new(config.log_level));

        let monitoring = Self {
            #[cfg(feature = "metrics")]
            registry: Arc::new(registry),
            metrics: Arc::new(metrics),
            anomaly_detector,
            alert_manager,
            profiler,
            logger,
            components: Arc::new(components),
            config,
        };

        Ok(monitoring)
    }

    /// Start the background collection, anomaly-detection, and
    /// alert-processing loops.
    pub async fn start(&self) {
        let interval = self.config.collection_interval;
        let monitoring = self.clone_for_task();

        // Periodic metric collection
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(interval);

            loop {
                interval.tick().await;

                if let Err(e) = monitoring.collect_metrics().await {
                    monitoring
                        .logger
                        .error("monitoring", &format!("Metric collection error: {e}"))
                        .await;
                }
            }
        });

        self.start_anomaly_detection().await;

        self.start_alert_processing().await;
    }

    /// Pull current statistics from every monitored component and update
    /// the exported metrics and the anomaly detector.
    async fn collect_metrics(&self) -> Result<()> {
        // Churn statistics
        let churn_stats = self.components.churn_handler.get_stats().await;

        #[cfg(feature = "metrics")]
        {
            self.metrics
                .active_nodes
                .set(churn_stats.active_nodes as i64);
            self.metrics
                .suspicious_nodes
                .set(churn_stats.suspicious_nodes as i64);
            self.metrics
                .failed_nodes
                .set(churn_stats.failed_nodes as i64);
        }

        // Routing statistics. NOTE: the `inc_by` calls assume `get_stats()`
        // reports counts since the previous collection; if these are
        // cumulative totals the counters would double-count.
        let routing_stats = self.components.router.get_stats().await;

        #[cfg(feature = "metrics")]
        {
            self.metrics
                .routing_requests
                .inc_by(routing_stats.total_requests as f64);
            self.metrics
                .routing_success
                .inc_by(routing_stats.successful_requests as f64);
        }

        // Storage statistics
        let storage_stats = self.components.storage.get_stats().await;

        #[cfg(feature = "metrics")]
        {
            self.metrics
                .stored_items
                .set(storage_stats.total_items as i64);
            self.metrics
                .storage_bytes
                .set(storage_stats.total_bytes as i64);
        }

        // Gossip statistics
        let gossip_stats = self.components.gossip.get_stats().await;

        #[cfg(feature = "metrics")]
        {
            self.metrics
                .gossip_messages
                .inc_by(gossip_stats.messages_sent as f64);
            self.metrics.mesh_size.set(gossip_stats.mesh_size as i64);
            self.metrics
                .topic_count
                .set(gossip_stats.topic_count as i64);
        }

        // Cache statistics
        let cache_stats = self.components.cache.get_stats();

        #[cfg(feature = "metrics")]
        {
            self.metrics.cache_hits.inc_by(cache_stats.hits as f64);
            self.metrics.cache_misses.inc_by(cache_stats.misses as f64);
            self.metrics.cache_size.set(cache_stats.size_bytes as i64);
        }

        self.update_anomaly_detector().await?;

        Ok(())
    }

    /// Export metrics in the Prometheus text exposition format.
    pub fn export_metrics(&self) -> Result<String> {
        #[cfg(feature = "metrics")]
        {
            let encoder = TextEncoder::new();
            // The `register_*!` macros in `new()` register with the global
            // default registry, so gather from there rather than from the
            // (empty) local registry.
            let metric_families = prometheus::gather();
            let mut buffer = Vec::new();
            encoder.encode(&metric_families, &mut buffer)?;
            String::from_utf8(buffer).map_err(|e| anyhow::anyhow!("UTF-8 error: {}", e))
        }

        #[cfg(not(feature = "metrics"))]
        {
            Ok("# Metrics disabled\n".to_string())
        }
    }

    /// Compute an overall health snapshot of the network.
    pub async fn get_health(&self) -> NetworkHealth {
        let churn_stats = self.components.churn_handler.get_stats().await;
        let routing_stats = self.components.router.get_stats().await;
        let storage_stats = self.components.storage.get_stats().await;

        let health_score =
            self.calculate_health_score(&churn_stats, &routing_stats, &storage_stats);

        NetworkHealth {
            score: health_score,
            status: if health_score > 0.8 {
                HealthStatus::Healthy
            } else if health_score > 0.5 {
                HealthStatus::Degraded
            } else {
                HealthStatus::Critical
            },
            active_nodes: churn_stats.active_nodes,
            churn_rate: churn_stats.churn_rate,
            routing_success_rate: routing_stats.success_rate(),
            storage_utilization: storage_stats.utilization(),
            active_alerts: self.alert_manager.get_active_alerts().await.len(),
        }
    }

    /// Combine churn, routing, and storage statistics into a single score in
    /// the range 0.0..=1.0, starting from 1.0 and applying penalty factors.
    fn calculate_health_score(
        &self,
        churn_stats: &crate::adaptive::churn::ChurnStats,
        routing_stats: &crate::adaptive::routing::RoutingStats,
        storage_stats: &crate::adaptive::storage::StorageStats,
    ) -> f64 {
        let mut score = 1.0;

        // Penalize high churn
        if churn_stats.churn_rate > 0.3 {
            score *= 0.7;
        } else if churn_stats.churn_rate > 0.1 {
            score *= 0.9;
        }

        // Penalize poor routing success
        let routing_success = routing_stats.success_rate();
        if routing_success < 0.9 {
            score *= routing_success;
        }

        // Penalize storage pressure
        let storage_util = storage_stats.utilization();
        if storage_util > 0.9 {
            score *= 0.8;
        }

        score
    }

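    // Worked example of the scoring above (values are hypothetical): with a
    // churn rate of 0.2 (x0.9), routing success of 0.85 (x0.85), and storage
    // utilization of 0.95 (x0.8), the score is 1.0 * 0.9 * 0.85 * 0.8 = 0.612,
    // which `get_health` reports as `HealthStatus::Degraded`.
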
    /// Periodically surface detected anomalies as warnings and, for severe
    /// ones, alerts.
    async fn start_anomaly_detection(&self) {
        let detector = self.anomaly_detector.clone();
        let alert_manager = self.alert_manager.clone();
        let logger = self.logger.clone();

        tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_secs(10));

            loop {
                interval.tick().await;

                let anomalies = detector.get_recent_anomalies().await;
                for anomaly in anomalies {
                    logger
                        .warn(
                            "anomaly_detector",
                            &format!("Anomaly detected: {anomaly:?}"),
                        )
                        .await;

                    // Escalate severe anomalies to the alert manager; the
                    // per-id cooldown prevents repeated firings.
                    if anomaly.severity > 0.7 {
                        let alert = Alert {
                            id: format!("anomaly_{}", anomaly.metric),
                            name: format!("{} Anomaly", anomaly.metric),
                            severity: AlertSeverity::Warning,
                            message: format!(
                                "Anomaly detected in {}: value {} outside expected range {:?}",
                                anomaly.metric, anomaly.value, anomaly.expected_range
                            ),
                            triggered_at: Instant::now(),
                            metrics: HashMap::from([(anomaly.metric.clone(), anomaly.value)]),
                        };

                        let _ = alert_manager.trigger_alert(alert).await;
                    }
                }
            }
        });
    }

    /// Install the default alert rules and start periodic rule evaluation.
    async fn start_alert_processing(&self) {
        let alert_manager = self.alert_manager.clone();

        // Default rules covering churn, routing, and storage
        let rules = vec![
            AlertRule {
                name: "High Churn Rate".to_string(),
                condition: AlertCondition::Above {
                    metric: "churn_rate".to_string(),
                    threshold: 0.5,
                },
                severity: AlertSeverity::Critical,
                message_template: "Churn rate is critically high: {value}".to_string(),
            },
            AlertRule {
                name: "Low Routing Success".to_string(),
                condition: AlertCondition::Below {
                    metric: "routing_success_rate".to_string(),
                    threshold: 0.8,
                },
                severity: AlertSeverity::Warning,
                message_template: "Routing success rate is low: {value}".to_string(),
            },
            AlertRule {
                name: "Storage Near Capacity".to_string(),
                condition: AlertCondition::Above {
                    metric: "storage_utilization".to_string(),
                    threshold: 0.9,
                },
                severity: AlertSeverity::Warning,
                message_template: "Storage utilization is high: {value}".to_string(),
            },
        ];

        for rule in rules {
            let _ = alert_manager.add_rule(rule).await;
        }

        // Periodic rule evaluation
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_secs(30));

            loop {
                interval.tick().await;
                let _ = alert_manager.evaluate_rules().await;
            }
        });
    }

    /// Feed the latest component statistics into the anomaly detector.
    async fn update_anomaly_detector(&self) -> Result<()> {
        let churn_stats = self.components.churn_handler.get_stats().await;
        self.anomaly_detector
            .update_metric("churn_rate", churn_stats.churn_rate)
            .await;

        let routing_stats = self.components.router.get_stats().await;
        self.anomaly_detector
            .update_metric("routing_success_rate", routing_stats.success_rate())
            .await;

        let storage_stats = self.components.storage.get_stats().await;
        self.anomaly_detector
            .update_metric("storage_utilization", storage_stats.utilization())
            .await;

        Ok(())
    }

    /// Assemble a snapshot of everything a dashboard needs.
    pub async fn get_dashboard_data(&self) -> DashboardData {
        DashboardData {
            health: self.get_health().await,
            metrics: self.get_current_metrics().await,
            recent_alerts: self.alert_manager.get_recent_alerts(10).await,
            anomalies: self.anomaly_detector.get_recent_anomalies().await,
            performance: self.profiler.get_current_profile().await,
        }
    }

    /// Collect the key metrics as a flat name/value map.
    async fn get_current_metrics(&self) -> HashMap<String, f64> {
        let mut metrics = HashMap::new();

        let churn_stats = self.components.churn_handler.get_stats().await;
        metrics.insert("active_nodes".to_string(), churn_stats.active_nodes as f64);
        metrics.insert("churn_rate".to_string(), churn_stats.churn_rate);

        let routing_stats = self.components.router.get_stats().await;
        metrics.insert(
            "routing_success_rate".to_string(),
            routing_stats.success_rate(),
        );

        let storage_stats = self.components.storage.get_stats().await;
        metrics.insert(
            "storage_items".to_string(),
            storage_stats.total_items as f64,
        );
        metrics.insert(
            "storage_bytes".to_string(),
            storage_stats.total_bytes as f64,
        );

        metrics
    }

    /// Cheap clone for background tasks; all state is behind `Arc`s.
    fn clone_for_task(&self) -> Self {
        Self {
            #[cfg(feature = "metrics")]
            registry: self.registry.clone(),
            metrics: self.metrics.clone(),
            anomaly_detector: self.anomaly_detector.clone(),
            alert_manager: self.alert_manager.clone(),
            profiler: self.profiler.clone(),
            logger: self.logger.clone(),
            components: self.components.clone(),
            config: self.config.clone(),
        }
    }
}

impl AnomalyDetector {
    pub fn new(window_size: usize) -> Self {
        Self {
            history: Arc::new(RwLock::new(HashMap::new())),
            anomalies: Arc::new(RwLock::new(Vec::new())),
            window_size,
        }
    }

    /// Record a new sample for `metric`, refresh its statistics, and run
    /// anomaly detection once enough samples have accumulated.
    pub async fn update_metric(&self, metric: &str, value: f64) {
        let mut history = self.history.write().await;

        let metric_history = history
            .entry(metric.to_string())
            .or_insert_with(|| MetricHistory {
                values: VecDeque::new(),
                mean: 0.0,
                std_dev: 0.0,
                last_update: Instant::now(),
            });

        // Maintain the sliding window
        metric_history.values.push_back(value);
        if metric_history.values.len() > self.window_size {
            metric_history.values.pop_front();
        }

        // Statistics are only meaningful with enough samples
        if metric_history.values.len() >= 10 {
            let sum: f64 = metric_history.values.iter().sum();
            metric_history.mean = sum / metric_history.values.len() as f64;

            let variance: f64 = metric_history
                .values
                .iter()
                .map(|v| (v - metric_history.mean).powi(2))
                .sum::<f64>()
                / metric_history.values.len() as f64;
            metric_history.std_dev = variance.sqrt();

            if let Some(anomaly) = self.detect_anomaly(metric, value, metric_history) {
                let mut anomalies = self.anomalies.write().await;
                anomalies.push(anomaly);

                // Bound the anomaly buffer
                if anomalies.len() > 1000 {
                    anomalies.drain(0..100);
                }
            }
        }

        metric_history.last_update = Instant::now();
    }

    fn detect_anomaly(&self, metric: &str, value: f64, history: &MetricHistory) -> Option<Anomaly> {
        // Z-score test: flag values more than three standard deviations from
        // the mean. The denominator is clamped so a zero-variance history
        // cannot produce NaN.
        let z_score = (value - history.mean).abs() / history.std_dev.max(f64::EPSILON);
        if z_score > 3.0 {
            return Some(Anomaly {
                metric: metric.to_string(),
                anomaly_type: AnomalyType::Statistical,
                severity: (z_score - 3.0).min(1.0),
                detected_at: Instant::now(),
                value,
                expected_range: (
                    history.mean - 3.0 * history.std_dev,
                    history.mean + 3.0 * history.std_dev,
                ),
            });
        }

        // Spike test: flag a large relative jump from the previous sample
        if history.values.len() >= 2 {
            let prev_value = history.values[history.values.len() - 2];
            let change_rate = (value - prev_value).abs() / prev_value.abs().max(1.0);

            if change_rate > 0.5 {
                return Some(Anomaly {
                    metric: metric.to_string(),
                    anomaly_type: AnomalyType::Spike,
                    severity: change_rate.min(1.0),
                    detected_at: Instant::now(),
                    value,
                    expected_range: (prev_value * 0.5, prev_value * 1.5),
                });
            }
        }

        None
    }

    /// Anomalies detected within the last five minutes.
    pub async fn get_recent_anomalies(&self) -> Vec<Anomaly> {
        let anomalies = self.anomalies.read().await;
        let cutoff = Instant::now() - Duration::from_secs(300);

        anomalies
            .iter()
            .filter(|a| a.detected_at > cutoff)
            .cloned()
            .collect()
    }
}

impl AlertManager {
    pub fn new(cooldown_period: Duration) -> Self {
        Self {
            active_alerts: Arc::new(RwLock::new(HashMap::new())),
            rules: Arc::new(RwLock::new(Vec::new())),
            channels: Arc::new(RwLock::new(Vec::new())),
            cooldowns: Arc::new(RwLock::new(HashMap::new())),
            cooldown_period,
        }
    }

    pub async fn add_rule(&self, rule: AlertRule) -> Result<()> {
        let mut rules = self.rules.write().await;
        rules.push(rule);
        Ok(())
    }

    pub async fn add_channel(&self, channel: Box<dyn AlertChannel>) {
        let mut channels = self.channels.write().await;
        channels.push(channel);
    }

    /// Record an alert and fan it out to all channels, unless the same alert
    /// id already fired within the cooldown period.
    pub async fn trigger_alert(&self, alert: Alert) -> Result<()> {
        let mut cooldowns = self.cooldowns.write().await;
        if let Some(last_trigger) = cooldowns.get(&alert.id)
            && last_trigger.elapsed() < self.cooldown_period
        {
            return Ok(());
        }

        let mut active_alerts = self.active_alerts.write().await;
        active_alerts.insert(alert.id.clone(), alert.clone());
        cooldowns.insert(alert.id.clone(), Instant::now());

        // Delivery failures on individual channels are ignored
        let channels = self.channels.read().await;
        for channel in channels.iter() {
            let _ = channel.send_alert(&alert).await;
        }

        Ok(())
    }

    pub async fn evaluate_rules(&self) -> Result<()> {
        let rules = self.rules.read().await.clone();

        for _rule in rules {
            // Placeholder: conditions would be evaluated against current
            // metric values here.
        }

        Ok(())
    }

    pub async fn get_active_alerts(&self) -> Vec<Alert> {
        self.active_alerts.read().await.values().cloned().collect()
    }

    /// The most recent `count` alerts, newest first.
    pub async fn get_recent_alerts(&self, count: usize) -> Vec<Alert> {
        let mut alerts: Vec<_> = self.active_alerts.read().await.values().cloned().collect();
        alerts.sort_by_key(|a| std::cmp::Reverse(a.triggered_at));
        alerts.truncate(count);
        alerts
    }
}

impl PerformanceProfiler {
    pub fn new(sample_rate: f64) -> Self {
        Self {
            profiles: Arc::new(RwLock::new(HashMap::new())),
            completed: Arc::new(RwLock::new(VecDeque::new())),
            sample_rate,
        }
    }

    /// Begin profiling `name`; only a `sample_rate` fraction of calls
    /// actually start a profile.
    pub async fn start_profile(&self, name: String) {
        if rand::random::<f64>() > self.sample_rate {
            return;
        }

        let mut profiles = self.profiles.write().await;
        profiles.insert(
            name.clone(),
            Profile {
                name,
                started_at: Instant::now(),
                samples: Vec::new(),
            },
        );
    }

    /// Record a resource-usage sample for an in-progress profile.
    pub async fn record_sample(&self, profile_name: &str) {
        let mut profiles = self.profiles.write().await;

        if let Some(profile) = profiles.get_mut(profile_name) {
            profile.samples.push(ProfileSample {
                _timestamp: Instant::now(),
                cpu_usage: Self::get_cpu_usage(),
                memory_bytes: Self::get_memory_usage(),
                _operations: HashMap::new(),
            });
        }
    }

    /// Finish a profile and move its summary into the completed ring buffer.
    pub async fn end_profile(&self, name: &str) {
        let mut profiles = self.profiles.write().await;

        if let Some(profile) = profiles.remove(name) {
            let duration = profile.started_at.elapsed();

            let avg_cpu = profile.samples.iter().map(|s| s.cpu_usage).sum::<f64>()
                / profile.samples.len().max(1) as f64;

            let peak_memory = profile
                .samples
                .iter()
                .map(|s| s.memory_bytes)
                .max()
                .unwrap_or(0);

            let completed_profile = CompletedProfile {
                name: profile.name,
                duration,
                avg_cpu,
                peak_memory,
                operations: HashMap::new(),
            };

            let mut completed = self.completed.write().await;
            completed.push_back(completed_profile);

            // Keep only the most recent 100 profiles
            if completed.len() > 100 {
                completed.pop_front();
            }
        }
    }

    pub async fn get_current_profile(&self) -> Option<ProfileData> {
        Some(ProfileData {
            cpu_usage: Self::get_cpu_usage(),
            memory_bytes: Self::get_memory_usage(),
            thread_count: Self::get_thread_count(),
            active_profiles: self.profiles.read().await.len(),
        })
    }

    fn get_cpu_usage() -> f64 {
        // Placeholder: a real implementation would query the OS
        rand::random::<f64>() * 100.0
    }

    fn get_memory_usage() -> u64 {
        // Placeholder: 512 MiB
        1024 * 1024 * 512
    }

    fn get_thread_count() -> usize {
        // Placeholder: a real implementation would query the runtime
        8
    }
}

impl DebugLogger {
    pub fn new(level: LogLevel) -> Self {
        Self {
            level,
            buffer: Arc::new(RwLock::new(VecDeque::new())),
            channels: Arc::new(RwLock::new(Vec::new())),
        }
    }

    /// Record a log entry if `level` is within the configured verbosity.
    pub async fn log(&self, level: LogLevel, component: &str, message: &str) {
        if level > self.level {
            return;
        }

        let entry = LogEntry {
            timestamp: Instant::now(),
            level,
            component: component.to_string(),
            message: message.to_string(),
            data: None,
        };

        // Append to the ring buffer, capped at 10,000 entries
        let mut buffer = self.buffer.write().await;
        buffer.push_back(entry.clone());

        if buffer.len() > 10000 {
            buffer.pop_front();
        }

        // Broadcast to subscribers; closed receivers are ignored
        let channels = self.channels.read().await;
        for channel in channels.iter() {
            let _ = channel.send(entry.clone());
        }
    }

    pub async fn error(&self, component: &str, message: &str) {
        self.log(LogLevel::Error, component, message).await;
    }

    pub async fn warn(&self, component: &str, message: &str) {
        self.log(LogLevel::Warn, component, message).await;
    }

    pub async fn info(&self, component: &str, message: &str) {
        self.log(LogLevel::Info, component, message).await;
    }

    pub async fn debug(&self, component: &str, message: &str) {
        self.log(LogLevel::Debug, component, message).await;
    }

    pub async fn trace(&self, component: &str, message: &str) {
        self.log(LogLevel::Trace, component, message).await;
    }

    /// Subscribe to a live feed of log entries.
    pub async fn subscribe(&self) -> mpsc::UnboundedReceiver<LogEntry> {
        let (tx, rx) = mpsc::unbounded_channel();
        let mut channels = self.channels.write().await;
        channels.push(tx);
        rx
    }

    /// The most recent `count` entries, newest first.
    pub async fn get_recent_logs(&self, count: usize) -> Vec<LogEntry> {
        let buffer = self.buffer.read().await;
        buffer.iter().rev().take(count).cloned().collect()
    }
}

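// Sketch of draining a subscription into a background task (illustrative;
// assumes the `tracing` crate is available, which this module does not
// itself require):
//
//     let mut rx = logger.subscribe().await;
//     tokio::spawn(async move {
//         while let Some(entry) = rx.recv().await {
//             tracing::info!(component = %entry.component, "{}", entry.message);
//         }
//     });
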
/// Summary of overall network health.
#[derive(Debug, Clone)]
pub struct NetworkHealth {
    /// Composite health score in the range 0.0..=1.0
    pub score: f64,

    /// Coarse status derived from the score
    pub status: HealthStatus,

    /// Number of currently active nodes
    pub active_nodes: u64,

    /// Observed churn rate
    pub churn_rate: f64,

    /// Fraction of routing requests that succeed
    pub routing_success_rate: f64,

    /// Fraction of storage capacity in use
    pub storage_utilization: f64,

    /// Number of alerts currently firing
    pub active_alerts: usize,
}

#[derive(Debug, Clone, PartialEq)]
pub enum HealthStatus {
    Healthy,
    Degraded,
    Critical,
}

/// Everything a dashboard needs for one refresh.
#[derive(Debug, Clone)]
pub struct DashboardData {
    pub health: NetworkHealth,

    pub metrics: HashMap<String, f64>,

    pub recent_alerts: Vec<Alert>,

    pub anomalies: Vec<Anomaly>,

    pub performance: Option<ProfileData>,
}

/// Point-in-time resource usage snapshot.
#[derive(Debug, Clone)]
pub struct ProfileData {
    pub cpu_usage: f64,

    pub memory_bytes: u64,

    pub thread_count: usize,

    pub active_profiles: usize,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_anomaly_detection() {
        let detector = AnomalyDetector::new(100);

        // Build a stable baseline of values between 50 and 59
        for i in 0..50 {
            detector
                .update_metric("test_metric", 50.0 + (i as f64 % 10.0))
                .await;
        }

        // Inject an obvious outlier
        detector.update_metric("test_metric", 200.0).await;

        let anomalies = detector.get_recent_anomalies().await;
        assert!(!anomalies.is_empty());
        assert_eq!(anomalies[0].anomaly_type, AnomalyType::Statistical);
    }

    #[tokio::test]
    async fn test_alert_cooldown() {
        let alert_manager = AlertManager::new(Duration::from_secs(60));

        let alert = Alert {
            id: "test_alert".to_string(),
            name: "Test Alert".to_string(),
            severity: AlertSeverity::Warning,
            message: "Test message".to_string(),
            triggered_at: Instant::now(),
            metrics: HashMap::new(),
        };

        // First trigger registers the alert
        alert_manager.trigger_alert(alert.clone()).await.unwrap();
        assert_eq!(alert_manager.get_active_alerts().await.len(), 1);

        // Second trigger is swallowed by the cooldown
        alert_manager.trigger_alert(alert).await.unwrap();
        assert_eq!(alert_manager.get_active_alerts().await.len(), 1);
    }

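    // Sketch of exercising channel fan-out with a test-local channel that
    // records alert ids into shared state (`RecordingChannel` is ours, not
    // part of the module's API).
    struct RecordingChannel(Arc<RwLock<Vec<String>>>);

    #[async_trait]
    impl AlertChannel for RecordingChannel {
        async fn send_alert(&self, alert: &Alert) -> Result<()> {
            self.0.write().await.push(alert.id.clone());
            Ok(())
        }
    }

    #[tokio::test]
    async fn test_alert_channel_fanout() {
        let alert_manager = AlertManager::new(Duration::from_secs(60));
        let seen = Arc::new(RwLock::new(Vec::new()));
        alert_manager
            .add_channel(Box::new(RecordingChannel(seen.clone())))
            .await;

        let alert = Alert {
            id: "chan_alert".to_string(),
            name: "Channel Alert".to_string(),
            severity: AlertSeverity::Info,
            message: "fan-out test".to_string(),
            triggered_at: Instant::now(),
            metrics: HashMap::new(),
        };
        alert_manager.trigger_alert(alert).await.unwrap();

        let seen = seen.read().await;
        assert_eq!(seen.len(), 1);
        assert_eq!(seen[0], "chan_alert");
    }
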
    #[tokio::test]
    async fn test_performance_profiling() {
        // Sample rate of 1.0 so the profile is always recorded
        let profiler = PerformanceProfiler::new(1.0);

        profiler.start_profile("test_operation".to_string()).await;

        for _ in 0..5 {
            profiler.record_sample("test_operation").await;
            tokio::time::sleep(Duration::from_millis(10)).await;
        }

        profiler.end_profile("test_operation").await;

        let completed = profiler.completed.read().await;
        assert_eq!(completed.len(), 1);
        assert_eq!(completed[0].name, "test_operation");
    }

    #[tokio::test]
    async fn test_debug_logging() {
        let logger = DebugLogger::new(LogLevel::Debug);

        let mut rx = logger.subscribe().await;

        logger.error("test", "Error message").await;
        logger.warn("test", "Warning message").await;
        logger.info("test", "Info message").await;
        logger.debug("test", "Debug message").await;
        logger.trace("test", "Trace message").await;

        // Trace is more verbose than the configured Debug level, so only
        // four entries should come through.
        let mut count = 0;
        while let Ok(entry) = rx.try_recv() {
            count += 1;
            assert!(entry.level <= LogLevel::Debug);
        }
        assert_eq!(count, 4);
    }
}