saorsa_core/adaptive/
monitoring.rs

1// Copyright 2024 Saorsa Labs Limited
2//
3// This software is dual-licensed under:
4// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
5// - Commercial License
6//
7// For AGPL-3.0 license, see LICENSE-AGPL-3.0
8// For commercial licensing, contact: saorsalabs@gmail.com
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under these licenses is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
14//! Monitoring and metrics system for the adaptive P2P network
15//!
16//! This module provides comprehensive monitoring capabilities:
17//! - Prometheus metrics export for external monitoring
18//! - Real-time anomaly detection using statistical analysis
19//! - Network health dashboards with key performance indicators
20//! - Alert system for critical conditions
21//! - Performance profiling for bottleneck detection
22//! - Debug logging with configurable levels
23
24use super::*;
25use crate::adaptive::{
26    AdaptiveGossipSub, AdaptiveRouter, ChurnHandler, ContentStore, ReplicationManager,
27    learning::{QLearnCacheManager, ThompsonSampling},
28};
29use anyhow::Result;
30#[cfg(feature = "metrics")]
31use prometheus::{
32    Counter, Encoder, Gauge, Histogram, IntCounter, IntGauge, Registry, TextEncoder,
33    register_counter, register_gauge, register_histogram, register_int_counter, register_int_gauge,
34};
35use std::{
36    collections::{HashMap, VecDeque},
37    sync::Arc,
38    time::{Duration, Instant},
39};
40use tokio::sync::{RwLock, mpsc};
41
42/// Monitoring system for the adaptive P2P network
43pub struct MonitoringSystem {
44    #[cfg(feature = "metrics")]
45    /// Prometheus registry for metrics
46    registry: Arc<Registry>,
47
48    /// Core metrics
49    metrics: Arc<NetworkMetrics>,
50
51    /// Anomaly detector
52    anomaly_detector: Arc<AnomalyDetector>,
53
54    /// Alert manager
55    alert_manager: Arc<AlertManager>,
56
57    /// Performance profiler
58    profiler: Arc<PerformanceProfiler>,
59
60    /// Debug logger
61    logger: Arc<DebugLogger>,
62
63    /// Network components to monitor
64    components: Arc<MonitoredComponents>,
65
66    /// Configuration
67    config: MonitoringConfig,
68}
69
70/// Configuration for monitoring system
71#[derive(Debug, Clone)]
72pub struct MonitoringConfig {
73    /// Metrics collection interval
74    pub collection_interval: Duration,
75
76    /// Anomaly detection window size
77    pub anomaly_window_size: usize,
78
79    /// Alert cooldown period
80    pub alert_cooldown: Duration,
81
82    /// Performance sampling rate (0.0-1.0)
83    pub profiling_sample_rate: f64,
84
85    /// Debug log level
86    pub log_level: LogLevel,
87
88    /// Dashboard update interval
89    pub dashboard_interval: Duration,
90}
91
92impl Default for MonitoringConfig {
93    fn default() -> Self {
94        Self {
95            collection_interval: Duration::from_secs(5),
96            anomaly_window_size: 100,
97            alert_cooldown: Duration::from_secs(300),
98            profiling_sample_rate: 0.01,
99            log_level: LogLevel::Info,
100            dashboard_interval: Duration::from_secs(1),
101        }
102    }
103}
104
105/// Log levels for debug logging
106#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
107pub enum LogLevel {
108    Error,
109    Warn,
110    Info,
111    Debug,
112    Trace,
113}
114
115/// Core network metrics exposed via Prometheus
116#[allow(dead_code)]
117struct NetworkMetrics {
118    #[cfg(feature = "metrics")]
119    // Node metrics
120    connected_nodes: IntGauge,
121    #[cfg(feature = "metrics")]
122    active_nodes: IntGauge,
123    #[cfg(feature = "metrics")]
124    suspicious_nodes: IntGauge,
125    #[cfg(feature = "metrics")]
126    failed_nodes: IntGauge,
127
128    #[cfg(feature = "metrics")]
129    // Routing metrics
130    routing_requests: Counter,
131    #[cfg(feature = "metrics")]
132    routing_success: Counter,
133    #[cfg(feature = "metrics")]
134    routing_latency: Histogram,
135
136    #[cfg(feature = "metrics")]
137    // Storage metrics
138    stored_items: IntGauge,
139    #[cfg(feature = "metrics")]
140    storage_bytes: IntGauge,
141    #[cfg(feature = "metrics")]
142    replication_factor: Gauge,
143
144    #[cfg(feature = "metrics")]
145    // Network traffic metrics
146    messages_sent: Counter,
147    #[cfg(feature = "metrics")]
148    messages_received: Counter,
149    #[cfg(feature = "metrics")]
150    bytes_sent: Counter,
151    #[cfg(feature = "metrics")]
152    bytes_received: Counter,
153
154    #[cfg(feature = "metrics")]
155    // Cache metrics
156    cache_hits: Counter,
157    #[cfg(feature = "metrics")]
158    cache_misses: Counter,
159    #[cfg(feature = "metrics")]
160    cache_size: IntGauge,
161    #[cfg(feature = "metrics")]
162    cache_evictions: Counter,
163
164    #[cfg(feature = "metrics")]
165    // Learning metrics
166    thompson_selections: IntCounter,
167    #[cfg(feature = "metrics")]
168    qlearn_updates: Counter,
169    #[cfg(feature = "metrics")]
170    churn_predictions: Counter,
171
172    #[cfg(feature = "metrics")]
173    // Gossip metrics
174    gossip_messages: Counter,
175    #[cfg(feature = "metrics")]
176    mesh_size: IntGauge,
177    #[cfg(feature = "metrics")]
178    topic_count: IntGauge,
179
180    #[cfg(feature = "metrics")]
181    // Performance metrics
182    cpu_usage: Gauge,
183    #[cfg(feature = "metrics")]
184    memory_usage: IntGauge,
185    #[cfg(feature = "metrics")]
186    thread_count: IntGauge,
187
188    #[cfg(not(feature = "metrics"))]
189    // Placeholder for when metrics are disabled
190    _placeholder: (),
191}
192
193/// Components being monitored
194pub struct MonitoredComponents {
195    pub router: Arc<AdaptiveRouter>,
196    pub churn_handler: Arc<ChurnHandler>,
197    pub gossip: Arc<AdaptiveGossipSub>,
198    pub storage: Arc<ContentStore>,
199    pub replication: Arc<ReplicationManager>,
200    pub thompson: Arc<ThompsonSampling>,
201    pub cache: Arc<QLearnCacheManager>,
202}
203
204/// Anomaly detection system
205pub struct AnomalyDetector {
206    /// Historical data for each metric
207    history: Arc<RwLock<HashMap<String, MetricHistory>>>,
208
209    /// Detected anomalies
210    anomalies: Arc<RwLock<Vec<Anomaly>>>,
211
212    /// Configuration
213    window_size: usize,
214}
215
216/// Historical data for a metric
217struct MetricHistory {
218    /// Sliding window of values
219    values: VecDeque<f64>,
220
221    /// Running statistics
222    mean: f64,
223    std_dev: f64,
224
225    /// Last update time
226    last_update: Instant,
227}
228
229/// Detected anomaly
230#[derive(Debug, Clone)]
231pub struct Anomaly {
232    /// Metric name
233    pub metric: String,
234
235    /// Anomaly type
236    pub anomaly_type: AnomalyType,
237
238    /// Severity (0.0-1.0)
239    pub severity: f64,
240
241    /// Detection time
242    pub detected_at: Instant,
243
244    /// Current value
245    pub value: f64,
246
247    /// Expected range
248    pub expected_range: (f64, f64),
249}
250
251/// Types of anomalies
252#[derive(Debug, Clone, PartialEq)]
253pub enum AnomalyType {
254    /// Value outside statistical bounds
255    Statistical,
256
257    /// Sudden spike in value
258    Spike,
259
260    /// Gradual drift from normal
261    Drift,
262
263    /// Unusual pattern
264    Pattern,
265}
266
267/// Alert management system
268pub struct AlertManager {
269    /// Active alerts
270    active_alerts: Arc<RwLock<HashMap<String, Alert>>>,
271
272    /// Alert rules
273    rules: Arc<RwLock<Vec<AlertRule>>>,
274
275    /// Alert channels
276    channels: Arc<RwLock<Vec<Box<dyn AlertChannel>>>>,
277
278    /// Cooldown tracking
279    cooldowns: Arc<RwLock<HashMap<String, Instant>>>,
280
281    /// Configuration
282    cooldown_period: Duration,
283}
284
285/// Alert definition
286#[derive(Debug, Clone)]
287pub struct Alert {
288    /// Alert ID
289    pub id: String,
290
291    /// Alert name
292    pub name: String,
293
294    /// Severity level
295    pub severity: AlertSeverity,
296
297    /// Alert message
298    pub message: String,
299
300    /// Triggered at
301    pub triggered_at: Instant,
302
303    /// Associated metrics
304    pub metrics: HashMap<String, f64>,
305}
306
307/// Alert severity levels
308#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
309pub enum AlertSeverity {
310    Info,
311    Warning,
312    Critical,
313}
314
315/// Alert rule definition
316#[derive(Debug, Clone)]
317pub struct AlertRule {
318    /// Rule name
319    pub name: String,
320
321    /// Condition to check
322    pub condition: AlertCondition,
323
324    /// Severity if triggered
325    pub severity: AlertSeverity,
326
327    /// Message template
328    pub message_template: String,
329}
330
331/// Alert conditions
332#[derive(Debug, Clone)]
333pub enum AlertCondition {
334    /// Metric above threshold
335    Above { metric: String, threshold: f64 },
336
337    /// Metric below threshold
338    Below { metric: String, threshold: f64 },
339
340    /// Metric rate of change
341    RateOfChange { metric: String, threshold: f64 },
342
343    /// Anomaly detected
344    AnomalyDetected { metric: String },
345}
346
347/// Alert channel trait
348#[async_trait]
349pub trait AlertChannel: Send + Sync {
350    /// Send an alert
351    async fn send_alert(&self, alert: &Alert) -> Result<()>;
352}
353
354/// Performance profiler
355pub struct PerformanceProfiler {
356    /// Active profiles
357    profiles: Arc<RwLock<HashMap<String, Profile>>>,
358
359    /// Completed profiles
360    completed: Arc<RwLock<VecDeque<CompletedProfile>>>,
361
362    /// Sampling rate
363    sample_rate: f64,
364}
365
366/// Active performance profile
367struct Profile {
368    /// Profile name
369    name: String,
370
371    /// Start time
372    started_at: Instant,
373
374    /// Samples collected
375    samples: Vec<ProfileSample>,
376}
377
378/// Profile sample
379#[derive(Debug, Clone)]
380struct ProfileSample {
381    /// Timestamp
382    _timestamp: Instant,
383
384    /// CPU usage
385    cpu_usage: f64,
386
387    /// Memory usage
388    memory_bytes: u64,
389
390    /// Active operations
391    _operations: HashMap<String, u64>,
392}
393
394/// Completed profile
395#[derive(Debug, Clone)]
396pub struct CompletedProfile {
397    /// Profile name
398    pub name: String,
399
400    /// Duration
401    pub duration: Duration,
402
403    /// Average CPU usage
404    pub avg_cpu: f64,
405
406    /// Peak memory usage
407    pub peak_memory: u64,
408
409    /// Operation counts
410    pub operations: HashMap<String, u64>,
411}
412
413/// Debug logger
414pub struct DebugLogger {
415    /// Log level
416    level: LogLevel,
417
418    /// Log buffer
419    buffer: Arc<RwLock<VecDeque<LogEntry>>>,
420
421    /// Log channels
422    channels: Arc<RwLock<Vec<mpsc::UnboundedSender<LogEntry>>>>,
423}
424
425/// Log entry
426#[derive(Debug, Clone)]
427pub struct LogEntry {
428    /// Timestamp
429    pub timestamp: Instant,
430
431    /// Log level
432    pub level: LogLevel,
433
434    /// Component
435    pub component: String,
436
437    /// Message
438    pub message: String,
439
440    /// Associated data
441    pub data: Option<serde_json::Value>,
442}
443
444impl MonitoringSystem {
445    /// Create a new monitoring system
446    pub fn new(components: MonitoredComponents, config: MonitoringConfig) -> Result<Self> {
447        #[cfg(feature = "metrics")]
448        let registry = Registry::new();
449
450        // Initialize metrics
451        #[cfg(feature = "metrics")]
452        let metrics = NetworkMetrics {
453            // Node metrics
454            connected_nodes: register_int_gauge!(
455                "p2p_connected_nodes",
456                "Number of connected nodes"
457            )?,
458            active_nodes: register_int_gauge!("p2p_active_nodes", "Number of active nodes")?,
459            suspicious_nodes: register_int_gauge!(
460                "p2p_suspicious_nodes",
461                "Number of suspicious nodes"
462            )?,
463            failed_nodes: register_int_gauge!("p2p_failed_nodes", "Number of failed nodes")?,
464
465            // Routing metrics
466            routing_requests: register_counter!(
467                "p2p_routing_requests_total",
468                "Total routing requests"
469            )?,
470            routing_success: register_counter!(
471                "p2p_routing_success_total",
472                "Successful routing requests"
473            )?,
474            routing_latency: register_histogram!(
475                "p2p_routing_latency_seconds",
476                "Routing request latency in seconds"
477            )?,
478
479            // Storage metrics
480            stored_items: register_int_gauge!("p2p_stored_items", "Number of stored items")?,
481            storage_bytes: register_int_gauge!("p2p_storage_bytes", "Total storage in bytes")?,
482            replication_factor: register_gauge!(
483                "p2p_replication_factor",
484                "Average replication factor"
485            )?,
486
487            // Network traffic metrics
488            messages_sent: register_counter!("p2p_messages_sent_total", "Total messages sent")?,
489            messages_received: register_counter!(
490                "p2p_messages_received_total",
491                "Total messages received"
492            )?,
493            bytes_sent: register_counter!("p2p_bytes_sent_total", "Total bytes sent")?,
494            bytes_received: register_counter!("p2p_bytes_received_total", "Total bytes received")?,
495
496            // Cache metrics
497            cache_hits: register_counter!("p2p_cache_hits_total", "Total cache hits")?,
498            cache_misses: register_counter!("p2p_cache_misses_total", "Total cache misses")?,
499            cache_size: register_int_gauge!("p2p_cache_size_bytes", "Cache size in bytes")?,
500            cache_evictions: register_counter!(
501                "p2p_cache_evictions_total",
502                "Total cache evictions"
503            )?,
504
505            // Learning metrics
506            thompson_selections: register_int_counter!(
507                "p2p_thompson_selections_total",
508                "Thompson sampling strategy selections"
509            )?,
510            qlearn_updates: register_counter!("p2p_qlearn_updates_total", "Q-learning updates")?,
511            churn_predictions: register_counter!(
512                "p2p_churn_predictions_total",
513                "Churn predictions made"
514            )?,
515
516            // Gossip metrics
517            gossip_messages: register_counter!(
518                "p2p_gossip_messages_total",
519                "Total gossip messages"
520            )?,
521            mesh_size: register_int_gauge!("p2p_gossip_mesh_size", "Gossip mesh size")?,
522            topic_count: register_int_gauge!("p2p_gossip_topics", "Number of gossip topics")?,
523
524            // Performance metrics
525            cpu_usage: register_gauge!("p2p_cpu_usage_percent", "CPU usage percentage")?,
526            memory_usage: register_int_gauge!("p2p_memory_usage_bytes", "Memory usage in bytes")?,
527            thread_count: register_int_gauge!("p2p_thread_count", "Number of threads")?,
528        };
529
530        #[cfg(not(feature = "metrics"))]
531        let metrics = NetworkMetrics { _placeholder: () };
532
533        let anomaly_detector = Arc::new(AnomalyDetector::new(config.anomaly_window_size));
534        let alert_manager = Arc::new(AlertManager::new(config.alert_cooldown));
535        let profiler = Arc::new(PerformanceProfiler::new(config.profiling_sample_rate));
536        let logger = Arc::new(DebugLogger::new(config.log_level));
537
538        // Set up default alert rules
539        let monitoring = Self {
540            #[cfg(feature = "metrics")]
541            registry: Arc::new(registry),
542            metrics: Arc::new(metrics),
543            anomaly_detector,
544            alert_manager,
545            profiler,
546            logger,
547            components: Arc::new(components),
548            config,
549        };
550
551        Ok(monitoring)
552    }
553
554    /// Start monitoring
555    pub async fn start(&self) {
556        let interval = self.config.collection_interval;
557        let monitoring = self.clone_for_task();
558
559        tokio::spawn(async move {
560            let mut interval = tokio::time::interval(interval);
561
562            loop {
563                interval.tick().await;
564
565                if let Err(e) = monitoring.collect_metrics().await {
566                    // Log error but continue monitoring
567                    monitoring
568                        .logger
569                        .error("monitoring", &format!("Metric collection error: {e}"))
570                        .await;
571                }
572            }
573        });
574
575        // Start anomaly detection
576        self.start_anomaly_detection().await;
577
578        // Start alert processing
579        self.start_alert_processing().await;
580    }
581
582    /// Collect metrics from all components
583    async fn collect_metrics(&self) -> Result<()> {
584        // Collect churn statistics
585        let churn_stats = self.components.churn_handler.get_stats().await;
586
587        #[cfg(feature = "metrics")]
588        {
589            self.metrics
590                .active_nodes
591                .set(churn_stats.active_nodes as i64);
592            self.metrics
593                .suspicious_nodes
594                .set(churn_stats.suspicious_nodes as i64);
595            self.metrics
596                .failed_nodes
597                .set(churn_stats.failed_nodes as i64);
598        }
599
600        // Collect routing statistics
601        let routing_stats = self.components.router.get_stats().await;
602
603        #[cfg(feature = "metrics")]
604        {
605            self.metrics
606                .routing_requests
607                .inc_by(routing_stats.total_requests as f64);
608            self.metrics
609                .routing_success
610                .inc_by(routing_stats.successful_requests as f64);
611        }
612
613        // Collect storage statistics
614        let storage_stats = self.components.storage.get_stats().await;
615
616        #[cfg(feature = "metrics")]
617        {
618            self.metrics
619                .stored_items
620                .set(storage_stats.total_items as i64);
621            self.metrics
622                .storage_bytes
623                .set(storage_stats.total_bytes as i64);
624        }
625
626        // Collect gossip statistics
627        let gossip_stats = self.components.gossip.get_stats().await;
628
629        #[cfg(feature = "metrics")]
630        {
631            self.metrics
632                .gossip_messages
633                .inc_by(gossip_stats.messages_sent as f64);
634            self.metrics.mesh_size.set(gossip_stats.mesh_size as i64);
635            self.metrics
636                .topic_count
637                .set(gossip_stats.topic_count as i64);
638        }
639
640        // Collect cache statistics
641        let cache_stats = self.components.cache.get_stats();
642
643        #[cfg(feature = "metrics")]
644        {
645            self.metrics.cache_hits.inc_by(cache_stats.hits as f64);
646            self.metrics.cache_misses.inc_by(cache_stats.misses as f64);
647            self.metrics.cache_size.set(cache_stats.size_bytes as i64);
648        }
649
650        // Update anomaly detector
651        self.update_anomaly_detector().await?;
652
653        Ok(())
654    }
655
656    /// Export metrics in Prometheus format
657    pub fn export_metrics(&self) -> Result<String> {
658        #[cfg(feature = "metrics")]
659        {
660            let encoder = TextEncoder::new();
661            let metric_families = self.registry.gather();
662            let mut buffer = Vec::new();
663            encoder.encode(&metric_families, &mut buffer)?;
664            String::from_utf8(buffer).map_err(|e| anyhow::anyhow!("UTF-8 error: {}", e))
665        }
666
667        #[cfg(not(feature = "metrics"))]
668        {
669            Ok("# Metrics disabled\n".to_string())
670        }
671    }
672
673    /// Get current network health
674    pub async fn get_health(&self) -> NetworkHealth {
675        let churn_stats = self.components.churn_handler.get_stats().await;
676        let routing_stats = self.components.router.get_stats().await;
677        let storage_stats = self.components.storage.get_stats().await;
678
679        let health_score =
680            self.calculate_health_score(&churn_stats, &routing_stats, &storage_stats);
681
682        NetworkHealth {
683            score: health_score,
684            status: if health_score > 0.8 {
685                HealthStatus::Healthy
686            } else if health_score > 0.5 {
687                HealthStatus::Degraded
688            } else {
689                HealthStatus::Critical
690            },
691            active_nodes: churn_stats.active_nodes,
692            churn_rate: churn_stats.churn_rate,
693            routing_success_rate: routing_stats.success_rate(),
694            storage_utilization: storage_stats.utilization(),
695            active_alerts: self.alert_manager.get_active_alerts().await.len(),
696        }
697    }
698
699    /// Calculate overall health score
700    fn calculate_health_score(
701        &self,
702        churn_stats: &crate::adaptive::churn::ChurnStats,
703        routing_stats: &crate::adaptive::routing::RoutingStats,
704        storage_stats: &crate::adaptive::storage::StorageStats,
705    ) -> f64 {
706        let mut score = 1.0;
707
708        // Penalize high churn
709        if churn_stats.churn_rate > 0.3 {
710            score *= 0.7;
711        } else if churn_stats.churn_rate > 0.1 {
712            score *= 0.9;
713        }
714
715        // Penalize low routing success
716        let routing_success = routing_stats.success_rate();
717        if routing_success < 0.9 {
718            score *= routing_success;
719        }
720
721        // Penalize high storage utilization
722        let storage_util = storage_stats.utilization();
723        if storage_util > 0.9 {
724            score *= 0.8;
725        }
726
727        score
728    }
729
730    /// Start anomaly detection
731    async fn start_anomaly_detection(&self) {
732        let detector = self.anomaly_detector.clone();
733        let alert_manager = self.alert_manager.clone();
734        let logger = self.logger.clone();
735
736        tokio::spawn(async move {
737            let mut interval = tokio::time::interval(Duration::from_secs(10));
738
739            loop {
740                interval.tick().await;
741
742                let anomalies = detector.get_recent_anomalies().await;
743                for anomaly in anomalies {
744                    // Log anomaly
745                    logger
746                        .warn(
747                            "anomaly_detector",
748                            &format!("Anomaly detected: {anomaly:?}"),
749                        )
750                        .await;
751
752                    // Create alert if severe
753                    if anomaly.severity > 0.7 {
754                        let alert = Alert {
755                            id: format!("anomaly_{}", anomaly.metric),
756                            name: format!("{} Anomaly", anomaly.metric),
757                            severity: AlertSeverity::Warning,
758                            message: format!(
759                                "Anomaly detected in {}: value {} outside expected range {:?}",
760                                anomaly.metric, anomaly.value, anomaly.expected_range
761                            ),
762                            triggered_at: Instant::now(),
763                            metrics: HashMap::from([(anomaly.metric.clone(), anomaly.value)]),
764                        };
765
766                        let _ = alert_manager.trigger_alert(alert).await;
767                    }
768                }
769            }
770        });
771    }
772
773    /// Start alert processing
774    async fn start_alert_processing(&self) {
775        let alert_manager = self.alert_manager.clone();
776
777        // Add default alert rules
778        let rules = vec![
779            AlertRule {
780                name: "High Churn Rate".to_string(),
781                condition: AlertCondition::Above {
782                    metric: "churn_rate".to_string(),
783                    threshold: 0.5,
784                },
785                severity: AlertSeverity::Critical,
786                message_template: "Churn rate is critically high: {value}".to_string(),
787            },
788            AlertRule {
789                name: "Low Routing Success".to_string(),
790                condition: AlertCondition::Below {
791                    metric: "routing_success_rate".to_string(),
792                    threshold: 0.8,
793                },
794                severity: AlertSeverity::Warning,
795                message_template: "Routing success rate is low: {value}".to_string(),
796            },
797            AlertRule {
798                name: "Storage Near Capacity".to_string(),
799                condition: AlertCondition::Above {
800                    metric: "storage_utilization".to_string(),
801                    threshold: 0.9,
802                },
803                severity: AlertSeverity::Warning,
804                message_template: "Storage utilization is high: {value}".to_string(),
805            },
806        ];
807
808        for rule in rules {
809            let _ = alert_manager.add_rule(rule).await;
810        }
811
812        // Start rule evaluation
813        tokio::spawn(async move {
814            let mut interval = tokio::time::interval(Duration::from_secs(30));
815
816            loop {
817                interval.tick().await;
818                let _ = alert_manager.evaluate_rules().await;
819            }
820        });
821    }
822
823    /// Update anomaly detector with current metrics
824    async fn update_anomaly_detector(&self) -> Result<()> {
825        // Update with key metrics
826        let churn_stats = self.components.churn_handler.get_stats().await;
827        self.anomaly_detector
828            .update_metric("churn_rate", churn_stats.churn_rate)
829            .await;
830
831        let routing_stats = self.components.router.get_stats().await;
832        self.anomaly_detector
833            .update_metric("routing_success_rate", routing_stats.success_rate())
834            .await;
835
836        let storage_stats = self.components.storage.get_stats().await;
837        self.anomaly_detector
838            .update_metric("storage_utilization", storage_stats.utilization())
839            .await;
840
841        Ok(())
842    }
843
844    /// Get dashboard data
845    pub async fn get_dashboard_data(&self) -> DashboardData {
846        DashboardData {
847            health: self.get_health().await,
848            metrics: self.get_current_metrics().await,
849            recent_alerts: self.alert_manager.get_recent_alerts(10).await,
850            anomalies: self.anomaly_detector.get_recent_anomalies().await,
851            performance: self.profiler.get_current_profile().await,
852        }
853    }
854
855    /// Get current metric values
856    async fn get_current_metrics(&self) -> HashMap<String, f64> {
857        let mut metrics = HashMap::new();
858
859        // Collect current values
860        let churn_stats = self.components.churn_handler.get_stats().await;
861        metrics.insert("active_nodes".to_string(), churn_stats.active_nodes as f64);
862        metrics.insert("churn_rate".to_string(), churn_stats.churn_rate);
863
864        let routing_stats = self.components.router.get_stats().await;
865        metrics.insert(
866            "routing_success_rate".to_string(),
867            routing_stats.success_rate(),
868        );
869
870        let storage_stats = self.components.storage.get_stats().await;
871        metrics.insert(
872            "storage_items".to_string(),
873            storage_stats.total_items as f64,
874        );
875        metrics.insert(
876            "storage_bytes".to_string(),
877            storage_stats.total_bytes as f64,
878        );
879
880        metrics
881    }
882
883    /// Clone for spawning tasks
884    fn clone_for_task(&self) -> Self {
885        Self {
886            #[cfg(feature = "metrics")]
887            registry: self.registry.clone(),
888            metrics: self.metrics.clone(),
889            anomaly_detector: self.anomaly_detector.clone(),
890            alert_manager: self.alert_manager.clone(),
891            profiler: self.profiler.clone(),
892            logger: self.logger.clone(),
893            components: self.components.clone(),
894            config: self.config.clone(),
895        }
896    }
897}
898
899impl AnomalyDetector {
900    /// Create a new anomaly detector
901    pub fn new(window_size: usize) -> Self {
902        Self {
903            history: Arc::new(RwLock::new(HashMap::new())),
904            anomalies: Arc::new(RwLock::new(Vec::new())),
905            window_size,
906        }
907    }
908
909    /// Update a metric value
910    pub async fn update_metric(&self, metric: &str, value: f64) {
911        let mut history = self.history.write().await;
912
913        let metric_history = history
914            .entry(metric.to_string())
915            .or_insert_with(|| MetricHistory {
916                values: VecDeque::new(),
917                mean: 0.0,
918                std_dev: 0.0,
919                last_update: Instant::now(),
920            });
921
922        // Add value to sliding window
923        metric_history.values.push_back(value);
924        if metric_history.values.len() > self.window_size {
925            metric_history.values.pop_front();
926        }
927
928        // Update statistics
929        if metric_history.values.len() >= 10 {
930            let sum: f64 = metric_history.values.iter().sum();
931            metric_history.mean = sum / metric_history.values.len() as f64;
932
933            let variance: f64 = metric_history
934                .values
935                .iter()
936                .map(|v| (v - metric_history.mean).powi(2))
937                .sum::<f64>()
938                / metric_history.values.len() as f64;
939            metric_history.std_dev = variance.sqrt();
940
941            // Check for anomalies
942            if let Some(anomaly) = self.detect_anomaly(metric, value, metric_history) {
943                let mut anomalies = self.anomalies.write().await;
944                anomalies.push(anomaly);
945
946                // Keep only recent anomalies
947                if anomalies.len() > 1000 {
948                    anomalies.drain(0..100);
949                }
950            }
951        }
952
953        metric_history.last_update = Instant::now();
954    }
955
956    /// Detect anomaly in metric
957    fn detect_anomaly(&self, metric: &str, value: f64, history: &MetricHistory) -> Option<Anomaly> {
958        // Statistical anomaly detection (3-sigma rule)
959        let z_score = (value - history.mean).abs() / history.std_dev;
960        if z_score > 3.0 {
961            return Some(Anomaly {
962                metric: metric.to_string(),
963                anomaly_type: AnomalyType::Statistical,
964                severity: (z_score - 3.0).min(1.0),
965                detected_at: Instant::now(),
966                value,
967                expected_range: (
968                    history.mean - 3.0 * history.std_dev,
969                    history.mean + 3.0 * history.std_dev,
970                ),
971            });
972        }
973
974        // Spike detection
975        if history.values.len() >= 2 {
976            let prev_value = history.values[history.values.len() - 2];
977            let change_rate = (value - prev_value).abs() / prev_value.abs().max(1.0);
978
979            if change_rate > 0.5 {
980                return Some(Anomaly {
981                    metric: metric.to_string(),
982                    anomaly_type: AnomalyType::Spike,
983                    severity: change_rate.min(1.0),
984                    detected_at: Instant::now(),
985                    value,
986                    expected_range: (prev_value * 0.5, prev_value * 1.5),
987                });
988            }
989        }
990
991        None
992    }
993
994    /// Get recent anomalies
995    pub async fn get_recent_anomalies(&self) -> Vec<Anomaly> {
996        let anomalies = self.anomalies.read().await;
997        let cutoff = Instant::now() - Duration::from_secs(300);
998
999        anomalies
1000            .iter()
1001            .filter(|a| a.detected_at > cutoff)
1002            .cloned()
1003            .collect()
1004    }
1005}
1006
1007impl AlertManager {
1008    /// Create a new alert manager
1009    pub fn new(cooldown_period: Duration) -> Self {
1010        Self {
1011            active_alerts: Arc::new(RwLock::new(HashMap::new())),
1012            rules: Arc::new(RwLock::new(Vec::new())),
1013            channels: Arc::new(RwLock::new(Vec::new())),
1014            cooldowns: Arc::new(RwLock::new(HashMap::new())),
1015            cooldown_period,
1016        }
1017    }
1018
1019    /// Add an alert rule
1020    pub async fn add_rule(&self, rule: AlertRule) -> Result<()> {
1021        let mut rules = self.rules.write().await;
1022        rules.push(rule);
1023        Ok(())
1024    }
1025
1026    /// Add an alert channel
1027    pub async fn add_channel(&self, channel: Box<dyn AlertChannel>) {
1028        let mut channels = self.channels.write().await;
1029        channels.push(channel);
1030    }
1031
1032    /// Trigger an alert
1033    pub async fn trigger_alert(&self, alert: Alert) -> Result<()> {
1034        // Check cooldown
1035        let mut cooldowns = self.cooldowns.write().await;
1036        if let Some(last_trigger) = cooldowns.get(&alert.id)
1037            && last_trigger.elapsed() < self.cooldown_period
1038        {
1039            return Ok(()); // Skip due to cooldown
1040        }
1041
1042        // Record alert
1043        let mut active_alerts = self.active_alerts.write().await;
1044        active_alerts.insert(alert.id.clone(), alert.clone());
1045        cooldowns.insert(alert.id.clone(), Instant::now());
1046
1047        // Send to all channels
1048        let channels = self.channels.read().await;
1049        for channel in channels.iter() {
1050            let _ = channel.send_alert(&alert).await;
1051        }
1052
1053        Ok(())
1054    }
1055
1056    /// Evaluate all rules
1057    pub async fn evaluate_rules(&self) -> Result<()> {
1058        let rules = self.rules.read().await.clone();
1059
1060        for _rule in rules {
1061            // Evaluate condition
1062            // This would check actual metric values
1063            // For now, this is a placeholder
1064        }
1065
1066        Ok(())
1067    }
1068
1069    /// Get active alerts
1070    pub async fn get_active_alerts(&self) -> Vec<Alert> {
1071        self.active_alerts.read().await.values().cloned().collect()
1072    }
1073
1074    /// Get recent alerts
1075    pub async fn get_recent_alerts(&self, count: usize) -> Vec<Alert> {
1076        let mut alerts: Vec<_> = self.active_alerts.read().await.values().cloned().collect();
1077        alerts.sort_by_key(|a| std::cmp::Reverse(a.triggered_at));
1078        alerts.truncate(count);
1079        alerts
1080    }
1081}
1082
1083impl PerformanceProfiler {
1084    /// Create a new performance profiler
1085    pub fn new(sample_rate: f64) -> Self {
1086        Self {
1087            profiles: Arc::new(RwLock::new(HashMap::new())),
1088            completed: Arc::new(RwLock::new(VecDeque::new())),
1089            sample_rate,
1090        }
1091    }
1092
1093    /// Start a profile
1094    pub async fn start_profile(&self, name: String) {
1095        if rand::random::<f64>() > self.sample_rate {
1096            return; // Skip based on sampling rate
1097        }
1098
1099        let mut profiles = self.profiles.write().await;
1100        profiles.insert(
1101            name.clone(),
1102            Profile {
1103                name,
1104                started_at: Instant::now(),
1105                samples: Vec::new(),
1106            },
1107        );
1108    }
1109
1110    /// Record a sample
1111    pub async fn record_sample(&self, profile_name: &str) {
1112        let mut profiles = self.profiles.write().await;
1113
1114        if let Some(profile) = profiles.get_mut(profile_name) {
1115            profile.samples.push(ProfileSample {
1116                _timestamp: Instant::now(),
1117                cpu_usage: Self::get_cpu_usage(),
1118                memory_bytes: Self::get_memory_usage(),
1119                _operations: HashMap::new(), // Would track actual operations
1120            });
1121        }
1122    }
1123
1124    /// End a profile
1125    pub async fn end_profile(&self, name: &str) {
1126        let mut profiles = self.profiles.write().await;
1127
1128        if let Some(profile) = profiles.remove(name) {
1129            let duration = profile.started_at.elapsed();
1130
1131            let avg_cpu = profile.samples.iter().map(|s| s.cpu_usage).sum::<f64>()
1132                / profile.samples.len().max(1) as f64;
1133
1134            let peak_memory = profile
1135                .samples
1136                .iter()
1137                .map(|s| s.memory_bytes)
1138                .max()
1139                .unwrap_or(0);
1140
1141            let completed_profile = CompletedProfile {
1142                name: profile.name,
1143                duration,
1144                avg_cpu,
1145                peak_memory,
1146                operations: HashMap::new(), // Aggregate operations
1147            };
1148
1149            let mut completed = self.completed.write().await;
1150            completed.push_back(completed_profile);
1151
1152            // Keep only recent profiles
1153            if completed.len() > 100 {
1154                completed.pop_front();
1155            }
1156        }
1157    }
1158
1159    /// Get current profile data
1160    pub async fn get_current_profile(&self) -> Option<ProfileData> {
1161        Some(ProfileData {
1162            cpu_usage: Self::get_cpu_usage(),
1163            memory_bytes: Self::get_memory_usage(),
1164            thread_count: Self::get_thread_count(),
1165            active_profiles: self.profiles.read().await.len(),
1166        })
1167    }
1168
1169    /// Get CPU usage (placeholder implementation)
1170    fn get_cpu_usage() -> f64 {
1171        // In real implementation, would use platform-specific APIs
1172        rand::random::<f64>() * 100.0
1173    }
1174
1175    /// Get memory usage (placeholder implementation)
1176    fn get_memory_usage() -> u64 {
1177        // In real implementation, would use platform-specific APIs
1178        1024 * 1024 * 512 // 512MB placeholder
1179    }
1180
1181    /// Get thread count (placeholder implementation)
1182    fn get_thread_count() -> usize {
1183        // In real implementation, would use platform-specific APIs
1184        8
1185    }
1186}
1187
1188impl DebugLogger {
1189    /// Create a new debug logger
1190    pub fn new(level: LogLevel) -> Self {
1191        Self {
1192            level,
1193            buffer: Arc::new(RwLock::new(VecDeque::new())),
1194            channels: Arc::new(RwLock::new(Vec::new())),
1195        }
1196    }
1197
1198    /// Log a message
1199    pub async fn log(&self, level: LogLevel, component: &str, message: &str) {
1200        if level > self.level {
1201            return; // Skip lower priority logs
1202        }
1203
1204        let entry = LogEntry {
1205            timestamp: Instant::now(),
1206            level,
1207            component: component.to_string(),
1208            message: message.to_string(),
1209            data: None,
1210        };
1211
1212        // Add to buffer
1213        let mut buffer = self.buffer.write().await;
1214        buffer.push_back(entry.clone());
1215
1216        // Keep buffer size limited
1217        if buffer.len() > 10000 {
1218            buffer.pop_front();
1219        }
1220
1221        // Send to channels
1222        let channels = self.channels.read().await;
1223        for channel in channels.iter() {
1224            let _ = channel.send(entry.clone());
1225        }
1226    }
1227
1228    /// Log error
1229    pub async fn error(&self, component: &str, message: &str) {
1230        self.log(LogLevel::Error, component, message).await;
1231    }
1232
1233    /// Log warning
1234    pub async fn warn(&self, component: &str, message: &str) {
1235        self.log(LogLevel::Warn, component, message).await;
1236    }
1237
1238    /// Log info
1239    pub async fn info(&self, component: &str, message: &str) {
1240        self.log(LogLevel::Info, component, message).await;
1241    }
1242
1243    /// Log debug
1244    pub async fn debug(&self, component: &str, message: &str) {
1245        self.log(LogLevel::Debug, component, message).await;
1246    }
1247
1248    /// Log trace
1249    pub async fn trace(&self, component: &str, message: &str) {
1250        self.log(LogLevel::Trace, component, message).await;
1251    }
1252
1253    /// Subscribe to log stream
1254    pub async fn subscribe(&self) -> mpsc::UnboundedReceiver<LogEntry> {
1255        let (tx, rx) = mpsc::unbounded_channel();
1256        let mut channels = self.channels.write().await;
1257        channels.push(tx);
1258        rx
1259    }
1260
1261    /// Get recent logs
1262    pub async fn get_recent_logs(&self, count: usize) -> Vec<LogEntry> {
1263        let buffer = self.buffer.read().await;
1264        buffer.iter().rev().take(count).cloned().collect()
1265    }
1266}
1267
1268/// Network health status
1269#[derive(Debug, Clone)]
1270pub struct NetworkHealth {
1271    /// Overall health score (0.0-1.0)
1272    pub score: f64,
1273
1274    /// Health status
1275    pub status: HealthStatus,
1276
1277    /// Number of active nodes
1278    pub active_nodes: u64,
1279
1280    /// Current churn rate
1281    pub churn_rate: f64,
1282
1283    /// Routing success rate
1284    pub routing_success_rate: f64,
1285
1286    /// Storage utilization
1287    pub storage_utilization: f64,
1288
1289    /// Number of active alerts
1290    pub active_alerts: usize,
1291}
1292
1293/// Health status levels
1294#[derive(Debug, Clone, PartialEq)]
1295pub enum HealthStatus {
1296    Healthy,
1297    Degraded,
1298    Critical,
1299}
1300
1301/// Dashboard data structure
1302#[derive(Debug, Clone)]
1303pub struct DashboardData {
1304    /// Current health
1305    pub health: NetworkHealth,
1306
1307    /// Current metric values
1308    pub metrics: HashMap<String, f64>,
1309
1310    /// Recent alerts
1311    pub recent_alerts: Vec<Alert>,
1312
1313    /// Recent anomalies
1314    pub anomalies: Vec<Anomaly>,
1315
1316    /// Performance data
1317    pub performance: Option<ProfileData>,
1318}
1319
1320/// Current profile data
1321#[derive(Debug, Clone)]
1322pub struct ProfileData {
1323    /// CPU usage percentage
1324    pub cpu_usage: f64,
1325
1326    /// Memory usage in bytes
1327    pub memory_bytes: u64,
1328
1329    /// Number of threads
1330    pub thread_count: usize,
1331
1332    /// Active profiles
1333    pub active_profiles: usize,
1334}
1335
1336#[cfg(test)]
1337mod tests {
1338    use super::*;
1339
1340    #[tokio::test]
1341    async fn test_anomaly_detection() {
1342        let detector = AnomalyDetector::new(100);
1343
1344        // Add normal values
1345        for i in 0..50 {
1346            detector
1347                .update_metric("test_metric", 50.0 + (i as f64 % 10.0))
1348                .await;
1349        }
1350
1351        // Add anomalous value
1352        detector.update_metric("test_metric", 200.0).await;
1353
1354        // Check anomalies detected
1355        let anomalies = detector.get_recent_anomalies().await;
1356        assert!(!anomalies.is_empty());
1357        assert_eq!(anomalies[0].anomaly_type, AnomalyType::Statistical);
1358    }
1359
1360    #[tokio::test]
1361    async fn test_alert_cooldown() {
1362        let alert_manager = AlertManager::new(Duration::from_secs(60));
1363
1364        let alert = Alert {
1365            id: "test_alert".to_string(),
1366            name: "Test Alert".to_string(),
1367            severity: AlertSeverity::Warning,
1368            message: "Test message".to_string(),
1369            triggered_at: Instant::now(),
1370            metrics: HashMap::new(),
1371        };
1372
1373        // First alert should trigger
1374        alert_manager.trigger_alert(alert.clone()).await.unwrap();
1375        assert_eq!(alert_manager.get_active_alerts().await.len(), 1);
1376
1377        // Second alert should be skipped due to cooldown
1378        alert_manager.trigger_alert(alert).await.unwrap();
1379        assert_eq!(alert_manager.get_active_alerts().await.len(), 1);
1380    }
1381
1382    #[tokio::test]
1383    async fn test_performance_profiling() {
1384        let profiler = PerformanceProfiler::new(1.0); // 100% sampling
1385
1386        profiler.start_profile("test_operation".to_string()).await;
1387
1388        // Record some samples
1389        for _ in 0..5 {
1390            profiler.record_sample("test_operation").await;
1391            tokio::time::sleep(Duration::from_millis(10)).await;
1392        }
1393
1394        profiler.end_profile("test_operation").await;
1395
1396        // Check profile was completed
1397        let completed = profiler.completed.read().await;
1398        assert_eq!(completed.len(), 1);
1399        assert_eq!(completed[0].name, "test_operation");
1400    }
1401
1402    #[tokio::test]
1403    async fn test_debug_logging() {
1404        let logger = DebugLogger::new(LogLevel::Debug);
1405
1406        // Subscribe to logs
1407        let mut rx = logger.subscribe().await;
1408
1409        // Log messages
1410        logger.error("test", "Error message").await;
1411        logger.warn("test", "Warning message").await;
1412        logger.info("test", "Info message").await;
1413        logger.debug("test", "Debug message").await;
1414        logger.trace("test", "Trace message").await; // Should be filtered
1415
1416        // Check received logs
1417        let mut count = 0;
1418        while let Ok(entry) = rx.try_recv() {
1419            count += 1;
1420            assert!(entry.level <= LogLevel::Debug);
1421        }
1422        assert_eq!(count, 4); // Trace should be filtered out
1423    }
1424}