saorsa_core/adaptive/
monitoring.rs

// Copyright 2024 Saorsa Labs Limited
//
// This software is dual-licensed under:
// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
// - Commercial License
//
// For AGPL-3.0 license, see LICENSE-AGPL-3.0
// For commercial licensing, contact: saorsalabs@gmail.com
//
// Unless required by applicable law or agreed to in writing, software
// distributed under these licenses is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

//! Monitoring and metrics system for the adaptive P2P network
//!
//! This module provides comprehensive monitoring capabilities:
//! - Prometheus metrics export for external monitoring
//! - Real-time anomaly detection using statistical analysis
//! - Network health dashboards with key performance indicators
//! - Alert system for critical conditions
//! - Performance profiling for bottleneck detection
//! - Debug logging with configurable levels
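//!
//! # Example
//!
//! An illustrative sketch of wiring the system up; it assumes the monitored
//! components (router, churn handler, gossip, storage, and so on) have
//! already been constructed elsewhere:
//!
//! ```ignore
//! let components = MonitoredComponents {
//!     router, churn_handler, gossip, storage, replication, thompson, cache,
//! };
//! let monitoring = MonitoringSystem::new(components, MonitoringConfig::default())?;
//! monitoring.start().await;
//!
//! // Scrape-ready Prometheus exposition text:
//! let text = monitoring.export_metrics()?;
//! ```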

use super::*;
use crate::adaptive::{
    AdaptiveGossipSub, AdaptiveRouter, ChurnHandler, ContentStore, ReplicationManager,
    learning::{QLearnCacheManager, ThompsonSampling},
};
use anyhow::Result;
#[cfg(feature = "metrics")]
use prometheus::{
    Counter, Encoder, Gauge, Histogram, IntCounter, IntGauge, Registry, TextEncoder,
    register_counter, register_gauge, register_histogram, register_int_counter, register_int_gauge,
};
use std::{
    collections::{HashMap, VecDeque},
    sync::Arc,
    time::{Duration, Instant},
};
use tokio::sync::{RwLock, mpsc};

/// Monitoring system for the adaptive P2P network
pub struct MonitoringSystem {
    #[cfg(feature = "metrics")]
    /// Prometheus registry for metrics
    registry: Arc<Registry>,

    /// Core metrics
    metrics: Arc<NetworkMetrics>,

    /// Anomaly detector
    anomaly_detector: Arc<AnomalyDetector>,

    /// Alert manager
    alert_manager: Arc<AlertManager>,

    /// Performance profiler
    profiler: Arc<PerformanceProfiler>,

    /// Debug logger
    logger: Arc<DebugLogger>,

    /// Network components to monitor
    components: Arc<MonitoredComponents>,

    /// Configuration
    config: MonitoringConfig,
}

/// Configuration for monitoring system
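///
/// # Example
///
/// An illustrative override of selected fields; the remaining fields come
/// from [`Default`]:
///
/// ```ignore
/// let config = MonitoringConfig {
///     collection_interval: Duration::from_secs(1),
///     log_level: LogLevel::Debug,
///     ..Default::default()
/// };
/// ```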
#[derive(Debug, Clone)]
pub struct MonitoringConfig {
    /// Metrics collection interval
    pub collection_interval: Duration,

    /// Anomaly detection window size
    pub anomaly_window_size: usize,

    /// Alert cooldown period
    pub alert_cooldown: Duration,

    /// Performance sampling rate (0.0-1.0)
    pub profiling_sample_rate: f64,

    /// Debug log level
    pub log_level: LogLevel,

    /// Dashboard update interval
    pub dashboard_interval: Duration,
}

impl Default for MonitoringConfig {
    fn default() -> Self {
        Self {
            collection_interval: Duration::from_secs(5),
            anomaly_window_size: 100,
            alert_cooldown: Duration::from_secs(300),
            profiling_sample_rate: 0.01,
            log_level: LogLevel::Info,
            dashboard_interval: Duration::from_secs(1),
        }
    }
}

/// Log levels for debug logging
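///
/// Variants are ordered from most severe (`Error`) to most verbose (`Trace`);
/// the derived `Ord` follows declaration order, which is what
/// [`DebugLogger::log`] relies on when filtering entries.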
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum LogLevel {
    Error,
    Warn,
    Info,
    Debug,
    Trace,
}

/// Core network metrics exposed via Prometheus
#[allow(dead_code)]
pub(crate) struct NetworkMetrics {
    #[cfg(feature = "metrics")]
    // Node metrics
    connected_nodes: IntGauge,
    #[cfg(feature = "metrics")]
    active_nodes: IntGauge,
    #[cfg(feature = "metrics")]
    suspicious_nodes: IntGauge,
    #[cfg(feature = "metrics")]
    failed_nodes: IntGauge,

    #[cfg(feature = "metrics")]
    // Routing metrics
    routing_requests: Counter,
    #[cfg(feature = "metrics")]
    routing_success: Counter,
    #[cfg(feature = "metrics")]
    routing_latency: Histogram,

    #[cfg(feature = "metrics")]
    // Storage metrics
    stored_items: IntGauge,
    #[cfg(feature = "metrics")]
    storage_bytes: IntGauge,
    #[cfg(feature = "metrics")]
    replication_factor: Gauge,

    #[cfg(feature = "metrics")]
    // Network traffic metrics
    messages_sent: Counter,
    #[cfg(feature = "metrics")]
    messages_received: Counter,
    #[cfg(feature = "metrics")]
    bytes_sent: Counter,
    #[cfg(feature = "metrics")]
    bytes_received: Counter,

    #[cfg(feature = "metrics")]
    // Cache metrics
    cache_hits: Counter,
    #[cfg(feature = "metrics")]
    cache_misses: Counter,
    #[cfg(feature = "metrics")]
    cache_size: IntGauge,
    #[cfg(feature = "metrics")]
    cache_evictions: Counter,

    #[cfg(feature = "metrics")]
    // Learning metrics
    thompson_selections: IntCounter,
    #[cfg(feature = "metrics")]
    qlearn_updates: Counter,
    #[cfg(feature = "metrics")]
    churn_predictions: Counter,

    #[cfg(feature = "metrics")]
    // Gossip metrics
    gossip_messages: Counter,
    #[cfg(feature = "metrics")]
    mesh_size: IntGauge,
    #[cfg(feature = "metrics")]
    topic_count: IntGauge,

    #[cfg(feature = "metrics")]
    // Performance metrics
    cpu_usage: Gauge,
    #[cfg(feature = "metrics")]
    memory_usage: IntGauge,
    #[cfg(feature = "metrics")]
    thread_count: IntGauge,

    #[cfg(not(feature = "metrics"))]
    // Placeholder for when metrics are disabled
    _placeholder: (),
}

/// Components being monitored
pub struct MonitoredComponents {
    pub router: Arc<AdaptiveRouter>,
    pub churn_handler: Arc<ChurnHandler>,
    pub gossip: Arc<AdaptiveGossipSub>,
    pub storage: Arc<ContentStore>,
    pub replication: Arc<ReplicationManager>,
    pub thompson: Arc<ThompsonSampling>,
    pub cache: Arc<QLearnCacheManager>,
}

/// Anomaly detection system
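///
/// # Example
///
/// A minimal sketch: feed a sliding window of samples and read back any
/// anomalies flagged in the last five minutes:
///
/// ```ignore
/// let detector = AnomalyDetector::new(100);
/// for sample in samples {
///     detector.update_metric("churn_rate", sample).await;
/// }
/// let anomalies = detector.get_recent_anomalies().await;
/// ```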
pub struct AnomalyDetector {
    /// Historical data for each metric
    history: Arc<RwLock<HashMap<String, MetricHistory>>>,

    /// Detected anomalies
    anomalies: Arc<RwLock<Vec<Anomaly>>>,

    /// Configuration
    window_size: usize,
}

/// Historical data for a metric
struct MetricHistory {
    /// Sliding window of values
    values: VecDeque<f64>,

    /// Running statistics
    mean: f64,
    std_dev: f64,

    /// Last update time
    last_update: Instant,
}

/// Detected anomaly
#[derive(Debug, Clone)]
pub struct Anomaly {
    /// Metric name
    pub metric: String,

    /// Anomaly type
    pub anomaly_type: AnomalyType,

    /// Severity (0.0-1.0)
    pub severity: f64,

    /// Detection time
    pub detected_at: Instant,

    /// Current value
    pub value: f64,

    /// Expected range
    pub expected_range: (f64, f64),
}

/// Types of anomalies
#[derive(Debug, Clone, PartialEq)]
pub enum AnomalyType {
    /// Value outside statistical bounds
    Statistical,

    /// Sudden spike in value
    Spike,

    /// Gradual drift from normal
    Drift,

    /// Unusual pattern
    Pattern,
}

/// Alert management system
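///
/// # Example
///
/// An illustrative rule that fires when churn exceeds 50%:
///
/// ```ignore
/// let manager = AlertManager::new(Duration::from_secs(300));
/// manager.add_rule(AlertRule {
///     name: "High Churn Rate".to_string(),
///     condition: AlertCondition::Above {
///         metric: "churn_rate".to_string(),
///         threshold: 0.5,
///     },
///     severity: AlertSeverity::Critical,
///     message_template: "Churn rate is critically high: {value}".to_string(),
/// }).await?;
/// ```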
pub struct AlertManager {
    /// Active alerts
    active_alerts: Arc<RwLock<HashMap<String, Alert>>>,

    /// Alert rules
    rules: Arc<RwLock<Vec<AlertRule>>>,

    /// Alert channels
    channels: Arc<RwLock<Vec<Box<dyn AlertChannel>>>>,

    /// Cooldown tracking
    cooldowns: Arc<RwLock<HashMap<String, Instant>>>,

    /// Configuration
    cooldown_period: Duration,
}

/// Alert definition
#[derive(Debug, Clone)]
pub struct Alert {
    /// Alert ID
    pub id: String,

    /// Alert name
    pub name: String,

    /// Severity level
    pub severity: AlertSeverity,

    /// Alert message
    pub message: String,

    /// Triggered at
    pub triggered_at: Instant,

    /// Associated metrics
    pub metrics: HashMap<String, f64>,
}

/// Alert severity levels
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum AlertSeverity {
    Info,
    Warning,
    Critical,
}

/// Alert rule definition
#[derive(Debug, Clone)]
pub struct AlertRule {
    /// Rule name
    pub name: String,

    /// Condition to check
    pub condition: AlertCondition,

    /// Severity if triggered
    pub severity: AlertSeverity,

    /// Message template
    pub message_template: String,
}

/// Alert conditions
#[derive(Debug, Clone)]
pub enum AlertCondition {
    /// Metric above threshold
    Above { metric: String, threshold: f64 },

    /// Metric below threshold
    Below { metric: String, threshold: f64 },

    /// Metric rate of change
    RateOfChange { metric: String, threshold: f64 },

    /// Anomaly detected
    AnomalyDetected { metric: String },
}

/// Alert channel trait
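///
/// # Example
///
/// A minimal channel that writes alerts to stdout (sketch):
///
/// ```ignore
/// struct StdoutChannel;
///
/// #[async_trait]
/// impl AlertChannel for StdoutChannel {
///     async fn send_alert(&self, alert: &Alert) -> Result<()> {
///         println!("[{:?}] {}: {}", alert.severity, alert.name, alert.message);
///         Ok(())
///     }
/// }
/// ```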
#[async_trait]
pub trait AlertChannel: Send + Sync {
    /// Send an alert
    async fn send_alert(&self, alert: &Alert) -> Result<()>;
}

/// Performance profiler
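///
/// # Example
///
/// A sketch of the start/sample/end lifecycle, with a 100% sampling rate so
/// the profile is never skipped:
///
/// ```ignore
/// let profiler = PerformanceProfiler::new(1.0);
/// profiler.start_profile("lookup".to_string()).await;
/// profiler.record_sample("lookup").await;
/// profiler.end_profile("lookup").await;
/// ```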
pub struct PerformanceProfiler {
    /// Active profiles
    profiles: Arc<RwLock<HashMap<String, Profile>>>,

    /// Completed profiles
    completed: Arc<RwLock<VecDeque<CompletedProfile>>>,

    /// Sampling rate
    sample_rate: f64,
}

/// Active performance profile
struct Profile {
    /// Profile name
    name: String,

    /// Start time
    started_at: Instant,

    /// Samples collected
    samples: Vec<ProfileSample>,
}

/// Profile sample
#[derive(Debug, Clone)]
struct ProfileSample {
    /// Timestamp
    _timestamp: Instant,

    /// CPU usage
    cpu_usage: f64,

    /// Memory usage
    memory_bytes: u64,

    /// Active operations
    _operations: HashMap<String, u64>,
}

/// Completed profile
#[derive(Debug, Clone)]
pub struct CompletedProfile {
    /// Profile name
    pub name: String,

    /// Duration
    pub duration: Duration,

    /// Average CPU usage
    pub avg_cpu: f64,

    /// Peak memory usage
    pub peak_memory: u64,

    /// Operation counts
    pub operations: HashMap<String, u64>,
}

/// Debug logger
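///
/// # Example
///
/// A sketch of subscribing to the log stream:
///
/// ```ignore
/// let logger = DebugLogger::new(LogLevel::Info);
/// let mut rx = logger.subscribe().await;
/// logger.info("router", "route table refreshed").await;
/// if let Some(entry) = rx.recv().await {
///     println!("{}: {}", entry.component, entry.message);
/// }
/// ```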
pub struct DebugLogger {
    /// Log level
    level: LogLevel,

    /// Log buffer
    buffer: Arc<RwLock<VecDeque<LogEntry>>>,

    /// Log channels
    channels: Arc<RwLock<Vec<mpsc::UnboundedSender<LogEntry>>>>,
}

/// Log entry
#[derive(Debug, Clone)]
pub struct LogEntry {
    /// Timestamp
    pub timestamp: Instant,

    /// Log level
    pub level: LogLevel,

    /// Component
    pub component: String,

    /// Message
    pub message: String,

    /// Associated data
    pub data: Option<serde_json::Value>,
}

impl MonitoringSystem {
    /// Create a new monitoring system
    pub fn new(components: MonitoredComponents, config: MonitoringConfig) -> Result<Self> {
        Self::new_with_registry(components, config, None)
    }

    /// Create a new monitoring system with a custom registry (for testing)
    pub fn new_with_registry(
        components: MonitoredComponents,
        config: MonitoringConfig,
        #[cfg(feature = "metrics")] custom_registry: Option<Registry>,
        #[cfg(not(feature = "metrics"))] _custom_registry: Option<()>,
    ) -> Result<Self> {
        // Generate unique metric names for tests to avoid conflicts
        #[cfg(feature = "metrics")]
        let is_test = custom_registry.is_some();
        #[cfg(feature = "metrics")]
        let metric_prefix = if is_test {
            format!("p2p_test_{}_", std::process::id())
        } else {
            "p2p_".to_string()
        };

        // Use the custom registry when provided (tests). In production, export
        // from the global default registry, which is where the `register_*!`
        // macros below record their metrics; a fresh empty registry would
        // gather nothing.
        #[cfg(feature = "metrics")]
        let registry = custom_registry.unwrap_or_else(|| prometheus::default_registry().clone());

        // Initialize metrics - use custom registry if provided
        #[cfg(feature = "metrics")]
        let metrics = if is_test {
            // For tests, register metrics with the custom registry
            use prometheus::HistogramOpts;

            // Node metrics
            let connected_nodes = IntGauge::new(
                format!("{}connected_nodes", metric_prefix),
                "Number of connected nodes",
            )?;
            registry.register(Box::new(connected_nodes.clone()))?;

            let active_nodes = IntGauge::new(
                format!("{}active_nodes", metric_prefix),
                "Number of active nodes",
            )?;
            registry.register(Box::new(active_nodes.clone()))?;

            let suspicious_nodes = IntGauge::new(
                format!("{}suspicious_nodes", metric_prefix),
                "Number of suspicious nodes",
            )?;
            registry.register(Box::new(suspicious_nodes.clone()))?;

            let failed_nodes = IntGauge::new(
                format!("{}failed_nodes", metric_prefix),
                "Number of failed nodes",
            )?;
            registry.register(Box::new(failed_nodes.clone()))?;

            // Routing metrics
            let routing_requests = Counter::new(
                format!("{}routing_requests_total", metric_prefix),
                "Total routing requests",
            )?;
            registry.register(Box::new(routing_requests.clone()))?;

            let routing_success = Counter::new(
                format!("{}routing_success_total", metric_prefix),
                "Successful routing requests",
            )?;
            registry.register(Box::new(routing_success.clone()))?;

            let routing_latency = Histogram::with_opts(HistogramOpts::new(
                format!("{}routing_latency_seconds", metric_prefix),
                "Routing request latency in seconds",
            ))?;
            registry.register(Box::new(routing_latency.clone()))?;

            // Storage metrics
            let stored_items = IntGauge::new(
                format!("{}stored_items", metric_prefix),
                "Number of stored items",
            )?;
            registry.register(Box::new(stored_items.clone()))?;

            let storage_bytes = IntGauge::new(
                format!("{}storage_bytes", metric_prefix),
                "Total storage in bytes",
            )?;
            registry.register(Box::new(storage_bytes.clone()))?;

            let replication_factor = Gauge::new(
                format!("{}replication_factor", metric_prefix),
                "Average replication factor",
            )?;
            registry.register(Box::new(replication_factor.clone()))?;

            // Network traffic metrics
            let messages_sent = Counter::new(
                format!("{}messages_sent_total", metric_prefix),
                "Total messages sent",
            )?;
            registry.register(Box::new(messages_sent.clone()))?;

            let messages_received = Counter::new(
                format!("{}messages_received_total", metric_prefix),
                "Total messages received",
            )?;
            registry.register(Box::new(messages_received.clone()))?;

            let bytes_sent = Counter::new(
                format!("{}bytes_sent_total", metric_prefix),
                "Total bytes sent",
            )?;
            registry.register(Box::new(bytes_sent.clone()))?;

            let bytes_received = Counter::new(
                format!("{}bytes_received_total", metric_prefix),
                "Total bytes received",
            )?;
            registry.register(Box::new(bytes_received.clone()))?;

            // Cache metrics
            let cache_hits = Counter::new(
                format!("{}cache_hits_total", metric_prefix),
                "Total cache hits",
            )?;
            registry.register(Box::new(cache_hits.clone()))?;

            let cache_misses = Counter::new(
                format!("{}cache_misses_total", metric_prefix),
                "Total cache misses",
            )?;
            registry.register(Box::new(cache_misses.clone()))?;

            let cache_size = IntGauge::new(
                format!("{}cache_size_bytes", metric_prefix),
                "Cache size in bytes",
            )?;
            registry.register(Box::new(cache_size.clone()))?;

            let cache_evictions = Counter::new(
                format!("{}cache_evictions_total", metric_prefix),
                "Total cache evictions",
            )?;
            registry.register(Box::new(cache_evictions.clone()))?;

            // Learning metrics
            let thompson_selections = IntCounter::new(
                format!("{}thompson_selections_total", metric_prefix),
                "Thompson sampling strategy selections",
            )?;
            registry.register(Box::new(thompson_selections.clone()))?;

            let qlearn_updates = Counter::new(
                format!("{}qlearn_updates_total", metric_prefix),
                "Q-learning updates",
            )?;
            registry.register(Box::new(qlearn_updates.clone()))?;

            let churn_predictions = Counter::new(
                format!("{}churn_predictions_total", metric_prefix),
                "Churn predictions made",
            )?;
            registry.register(Box::new(churn_predictions.clone()))?;

            // Gossip metrics
            let gossip_messages = Counter::new(
                format!("{}gossip_messages_total", metric_prefix),
                "Total gossip messages",
            )?;
            registry.register(Box::new(gossip_messages.clone()))?;

            let mesh_size = IntGauge::new(
                format!("{}gossip_mesh_size", metric_prefix),
                "Gossip mesh size",
            )?;
            registry.register(Box::new(mesh_size.clone()))?;

            let topic_count = IntGauge::new(
                format!("{}gossip_topics", metric_prefix),
                "Number of gossip topics",
            )?;
            registry.register(Box::new(topic_count.clone()))?;

            // Performance metrics
            let cpu_usage = Gauge::new(
                format!("{}cpu_usage_percent", metric_prefix),
                "CPU usage percentage",
            )?;
            registry.register(Box::new(cpu_usage.clone()))?;

            let memory_usage = IntGauge::new(
                format!("{}memory_usage_bytes", metric_prefix),
                "Memory usage in bytes",
            )?;
            registry.register(Box::new(memory_usage.clone()))?;

            let thread_count = IntGauge::new(
                format!("{}thread_count", metric_prefix),
                "Number of threads",
            )?;
            registry.register(Box::new(thread_count.clone()))?;

            NetworkMetrics {
                connected_nodes,
                active_nodes,
                suspicious_nodes,
                failed_nodes,
                routing_requests,
                routing_success,
                routing_latency,
                stored_items,
                storage_bytes,
                replication_factor,
                messages_sent,
                messages_received,
                bytes_sent,
                bytes_received,
                cache_hits,
                cache_misses,
                cache_size,
                cache_evictions,
                thompson_selections,
                qlearn_updates,
                churn_predictions,
                gossip_messages,
                mesh_size,
                topic_count,
                cpu_usage,
                memory_usage,
                thread_count,
            }
        } else {
            // For production, use the global registry macros
            NetworkMetrics {
                // Node metrics
                connected_nodes: register_int_gauge!(
                    &format!("{}connected_nodes", metric_prefix),
                    "Number of connected nodes"
                )?,
                active_nodes: register_int_gauge!(
                    &format!("{}active_nodes", metric_prefix),
                    "Number of active nodes"
                )?,
                suspicious_nodes: register_int_gauge!(
                    &format!("{}suspicious_nodes", metric_prefix),
                    "Number of suspicious nodes"
                )?,
                failed_nodes: register_int_gauge!(
                    &format!("{}failed_nodes", metric_prefix),
                    "Number of failed nodes"
                )?,

                // Routing metrics
                routing_requests: register_counter!(
                    &format!("{}routing_requests_total", metric_prefix),
                    "Total routing requests"
                )?,
                routing_success: register_counter!(
                    &format!("{}routing_success_total", metric_prefix),
                    "Successful routing requests"
                )?,
                routing_latency: register_histogram!(
                    &format!("{}routing_latency_seconds", metric_prefix),
                    "Routing request latency in seconds"
                )?,

                // Storage metrics
                stored_items: register_int_gauge!(
                    &format!("{}stored_items", metric_prefix),
                    "Number of stored items"
                )?,
                storage_bytes: register_int_gauge!(
                    &format!("{}storage_bytes", metric_prefix),
                    "Total storage in bytes"
                )?,
                replication_factor: register_gauge!(
                    &format!("{}replication_factor", metric_prefix),
                    "Average replication factor"
                )?,

                // Network traffic metrics
                messages_sent: register_counter!(
                    &format!("{}messages_sent_total", metric_prefix),
                    "Total messages sent"
                )?,
                messages_received: register_counter!(
                    &format!("{}messages_received_total", metric_prefix),
                    "Total messages received"
                )?,
                bytes_sent: register_counter!(
                    &format!("{}bytes_sent_total", metric_prefix),
                    "Total bytes sent"
                )?,
                bytes_received: register_counter!(
                    &format!("{}bytes_received_total", metric_prefix),
                    "Total bytes received"
                )?,

                // Cache metrics
                cache_hits: register_counter!(
                    &format!("{}cache_hits_total", metric_prefix),
                    "Total cache hits"
                )?,
                cache_misses: register_counter!(
                    &format!("{}cache_misses_total", metric_prefix),
                    "Total cache misses"
                )?,
                cache_size: register_int_gauge!(
                    &format!("{}cache_size_bytes", metric_prefix),
                    "Cache size in bytes"
                )?,
                cache_evictions: register_counter!(
                    &format!("{}cache_evictions_total", metric_prefix),
                    "Total cache evictions"
                )?,

                // Learning metrics
                thompson_selections: register_int_counter!(
                    &format!("{}thompson_selections_total", metric_prefix),
                    "Thompson sampling strategy selections"
                )?,
                qlearn_updates: register_counter!(
                    &format!("{}qlearn_updates_total", metric_prefix),
                    "Q-learning updates"
                )?,
                churn_predictions: register_counter!(
                    &format!("{}churn_predictions_total", metric_prefix),
                    "Churn predictions made"
                )?,

                // Gossip metrics
                gossip_messages: register_counter!(
                    &format!("{}gossip_messages_total", metric_prefix),
                    "Total gossip messages"
                )?,
                mesh_size: register_int_gauge!(
                    &format!("{}gossip_mesh_size", metric_prefix),
                    "Gossip mesh size"
                )?,
                topic_count: register_int_gauge!(
                    &format!("{}gossip_topics", metric_prefix),
                    "Number of gossip topics"
                )?,

                // Performance metrics
                cpu_usage: register_gauge!(
                    &format!("{}cpu_usage_percent", metric_prefix),
                    "CPU usage percentage"
                )?,
                memory_usage: register_int_gauge!(
                    &format!("{}memory_usage_bytes", metric_prefix),
                    "Memory usage in bytes"
                )?,
                thread_count: register_int_gauge!(
                    &format!("{}thread_count", metric_prefix),
                    "Number of threads"
                )?,
            }
        };

        #[cfg(not(feature = "metrics"))]
        let metrics = NetworkMetrics { _placeholder: () };

        let anomaly_detector = Arc::new(AnomalyDetector::new(config.anomaly_window_size));
        let alert_manager = Arc::new(AlertManager::new(config.alert_cooldown));
        let profiler = Arc::new(PerformanceProfiler::new(config.profiling_sample_rate));
        let logger = Arc::new(DebugLogger::new(config.log_level));

        // Assemble the system; default alert rules are installed later by
        // `start_alert_processing`.
        let monitoring = Self {
            #[cfg(feature = "metrics")]
            registry: Arc::new(registry),
            metrics: Arc::new(metrics),
            anomaly_detector,
            alert_manager,
            profiler,
            logger,
            components: Arc::new(components),
            config,
        };

        Ok(monitoring)
    }

    /// Start monitoring
    pub async fn start(&self) {
        let interval = self.config.collection_interval;
        let monitoring = self.clone_for_task();

        tokio::spawn(async move {
            let mut interval = tokio::time::interval(interval);

            loop {
                interval.tick().await;

                if let Err(e) = monitoring.collect_metrics().await {
                    // Log error but continue monitoring
                    monitoring
                        .logger
                        .error("monitoring", &format!("Metric collection error: {e}"))
                        .await;
                }
            }
        });

        // Start anomaly detection
        self.start_anomaly_detection().await;

        // Start alert processing
        self.start_alert_processing().await;
    }

    /// Collect metrics from all components
    #[allow(unused_variables)]
    async fn collect_metrics(&self) -> Result<()> {
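        // NOTE: the counter updates below add each component's reported totals
        // on every collection pass, which assumes `get_stats()` returns
        // per-interval deltas; cumulative totals would be double-counted.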
        // Collect churn statistics
        let churn_stats = self.components.churn_handler.get_stats().await;

        #[cfg(feature = "metrics")]
        {
            self.metrics
                .active_nodes
                .set(churn_stats.active_nodes as i64);
            self.metrics
                .suspicious_nodes
                .set(churn_stats.suspicious_nodes as i64);
            self.metrics
                .failed_nodes
                .set(churn_stats.failed_nodes as i64);
        }

        // Collect routing statistics
        let routing_stats = self.components.router.get_stats().await;

        #[cfg(feature = "metrics")]
        {
            self.metrics
                .routing_requests
                .inc_by(routing_stats.total_requests as f64);
            self.metrics
                .routing_success
                .inc_by(routing_stats.successful_requests as f64);
        }

        // Collect storage statistics
        let storage_stats = self.components.storage.get_stats().await;

        #[cfg(feature = "metrics")]
        {
            self.metrics
                .stored_items
                .set(storage_stats.total_items as i64);
            self.metrics
                .storage_bytes
                .set(storage_stats.total_bytes as i64);
        }

        // Collect gossip statistics
        let gossip_stats = self.components.gossip.get_stats().await;

        #[cfg(feature = "metrics")]
        {
            self.metrics
                .gossip_messages
                .inc_by(gossip_stats.messages_sent as f64);
            self.metrics.mesh_size.set(gossip_stats.mesh_size as i64);
            self.metrics
                .topic_count
                .set(gossip_stats.topic_count as i64);
        }

        // Collect cache statistics
        let cache_stats = self.components.cache.get_stats();

        #[cfg(feature = "metrics")]
        {
            self.metrics.cache_hits.inc_by(cache_stats.hits as f64);
            self.metrics.cache_misses.inc_by(cache_stats.misses as f64);
            self.metrics.cache_size.set(cache_stats.size_bytes as i64);
        }

        // Update anomaly detector
        self.update_anomaly_detector().await?;

        Ok(())
    }

    /// Export metrics in Prometheus format
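    ///
    /// Returns the Prometheus text exposition format, e.g. (illustrative):
    ///
    /// ```text
    /// # HELP p2p_connected_nodes Number of connected nodes
    /// # TYPE p2p_connected_nodes gauge
    /// p2p_connected_nodes 42
    /// ```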
    pub fn export_metrics(&self) -> Result<String> {
        #[cfg(feature = "metrics")]
        {
            let encoder = TextEncoder::new();
            let metric_families = self.registry.gather();
            let mut buffer = Vec::new();
            encoder.encode(&metric_families, &mut buffer)?;
            String::from_utf8(buffer).map_err(|e| anyhow::anyhow!("UTF-8 error: {}", e))
        }

        #[cfg(not(feature = "metrics"))]
        {
            Ok("# Metrics disabled\n".to_string())
        }
    }

    /// Get current network health
    pub async fn get_health(&self) -> NetworkHealth {
        let churn_stats = self.components.churn_handler.get_stats().await;
        let routing_stats = self.components.router.get_stats().await;
        let storage_stats = self.components.storage.get_stats().await;

        let health_score =
            self.calculate_health_score(&churn_stats, &routing_stats, &storage_stats);

        NetworkHealth {
            score: health_score,
            status: if health_score > 0.8 {
                HealthStatus::Healthy
            } else if health_score > 0.5 {
                HealthStatus::Degraded
            } else {
                HealthStatus::Critical
            },
            active_nodes: churn_stats.active_nodes,
            churn_rate: churn_stats.churn_rate,
            routing_success_rate: routing_stats.success_rate(),
            storage_utilization: storage_stats.utilization(),
            active_alerts: self.alert_manager.get_active_alerts().await.len(),
        }
    }

    /// Calculate overall health score
    fn calculate_health_score(
        &self,
        churn_stats: &crate::adaptive::churn::ChurnStats,
        routing_stats: &crate::adaptive::routing::RoutingStats,
        storage_stats: &crate::adaptive::storage::StorageStats,
    ) -> f64 {
        let mut score = 1.0;
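        // Penalties compose multiplicatively: e.g. churn at 0.35 (x0.7) and
        // routing success at 0.85 (x0.85) give 0.7 * 0.85 = 0.595 (Degraded).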

        // Penalize high churn
        if churn_stats.churn_rate > 0.3 {
            score *= 0.7;
        } else if churn_stats.churn_rate > 0.1 {
            score *= 0.9;
        }

        // Penalize low routing success
        let routing_success = routing_stats.success_rate();
        if routing_success < 0.9 {
            score *= routing_success;
        }

        // Penalize high storage utilization
        let storage_util = storage_stats.utilization();
        if storage_util > 0.9 {
            score *= 0.8;
        }

        score
    }

    /// Start anomaly detection
    async fn start_anomaly_detection(&self) {
        let detector = self.anomaly_detector.clone();
        let alert_manager = self.alert_manager.clone();
        let logger = self.logger.clone();

        tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_secs(10));

            loop {
                interval.tick().await;

                let anomalies = detector.get_recent_anomalies().await;
                for anomaly in anomalies {
                    // Log anomaly
                    logger
                        .warn(
                            "anomaly_detector",
                            &format!("Anomaly detected: {anomaly:?}"),
                        )
                        .await;

                    // Create alert if severe
                    if anomaly.severity > 0.7 {
                        let alert = Alert {
                            id: format!("anomaly_{}", anomaly.metric),
                            name: format!("{} Anomaly", anomaly.metric),
                            severity: AlertSeverity::Warning,
                            message: format!(
                                "Anomaly detected in {}: value {} outside expected range {:?}",
                                anomaly.metric, anomaly.value, anomaly.expected_range
                            ),
                            triggered_at: Instant::now(),
                            metrics: HashMap::from([(anomaly.metric.clone(), anomaly.value)]),
                        };

                        let _ = alert_manager.trigger_alert(alert).await;
                    }
                }
            }
        });
    }

    /// Start alert processing
    async fn start_alert_processing(&self) {
        let alert_manager = self.alert_manager.clone();

        // Add default alert rules
        let rules = vec![
            AlertRule {
                name: "High Churn Rate".to_string(),
                condition: AlertCondition::Above {
                    metric: "churn_rate".to_string(),
                    threshold: 0.5,
                },
                severity: AlertSeverity::Critical,
                message_template: "Churn rate is critically high: {value}".to_string(),
            },
            AlertRule {
                name: "Low Routing Success".to_string(),
                condition: AlertCondition::Below {
                    metric: "routing_success_rate".to_string(),
                    threshold: 0.8,
                },
                severity: AlertSeverity::Warning,
                message_template: "Routing success rate is low: {value}".to_string(),
            },
            AlertRule {
                name: "Storage Near Capacity".to_string(),
                condition: AlertCondition::Above {
                    metric: "storage_utilization".to_string(),
                    threshold: 0.9,
                },
                severity: AlertSeverity::Warning,
                message_template: "Storage utilization is high: {value}".to_string(),
            },
        ];

        for rule in rules {
            let _ = alert_manager.add_rule(rule).await;
        }

        // Start rule evaluation
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_secs(30));

            loop {
                interval.tick().await;
                let _ = alert_manager.evaluate_rules().await;
            }
        });
    }

    /// Update anomaly detector with current metrics
    async fn update_anomaly_detector(&self) -> Result<()> {
        // Update with key metrics
        let churn_stats = self.components.churn_handler.get_stats().await;
        self.anomaly_detector
            .update_metric("churn_rate", churn_stats.churn_rate)
            .await;

        let routing_stats = self.components.router.get_stats().await;
        self.anomaly_detector
            .update_metric("routing_success_rate", routing_stats.success_rate())
            .await;

        let storage_stats = self.components.storage.get_stats().await;
        self.anomaly_detector
            .update_metric("storage_utilization", storage_stats.utilization())
            .await;

        Ok(())
    }

    /// Get dashboard data
    pub async fn get_dashboard_data(&self) -> DashboardData {
        DashboardData {
            health: self.get_health().await,
            metrics: self.get_current_metrics().await,
            recent_alerts: self.alert_manager.get_recent_alerts(10).await,
            anomalies: self.anomaly_detector.get_recent_anomalies().await,
            performance: self.profiler.get_current_profile().await,
        }
    }

    /// Get current metric values
    async fn get_current_metrics(&self) -> HashMap<String, f64> {
        let mut metrics = HashMap::new();

        // Collect current values
        let churn_stats = self.components.churn_handler.get_stats().await;
        metrics.insert("active_nodes".to_string(), churn_stats.active_nodes as f64);
        metrics.insert("churn_rate".to_string(), churn_stats.churn_rate);

        let routing_stats = self.components.router.get_stats().await;
        metrics.insert(
            "routing_success_rate".to_string(),
            routing_stats.success_rate(),
        );

        let storage_stats = self.components.storage.get_stats().await;
        metrics.insert(
            "storage_items".to_string(),
            storage_stats.total_items as f64,
        );
        metrics.insert(
            "storage_bytes".to_string(),
            storage_stats.total_bytes as f64,
        );

        metrics
    }

    /// Clone for spawning tasks
    fn clone_for_task(&self) -> Self {
        Self {
            #[cfg(feature = "metrics")]
            registry: self.registry.clone(),
            metrics: self.metrics.clone(),
            anomaly_detector: self.anomaly_detector.clone(),
            alert_manager: self.alert_manager.clone(),
            profiler: self.profiler.clone(),
            logger: self.logger.clone(),
            components: self.components.clone(),
            config: self.config.clone(),
        }
    }
}

impl AnomalyDetector {
    /// Create a new anomaly detector
    pub fn new(window_size: usize) -> Self {
        Self {
            history: Arc::new(RwLock::new(HashMap::new())),
            anomalies: Arc::new(RwLock::new(Vec::new())),
            window_size,
        }
    }

    /// Update a metric value
    pub async fn update_metric(&self, metric: &str, value: f64) {
        let mut history = self.history.write().await;

        let metric_history = history
            .entry(metric.to_string())
            .or_insert_with(|| MetricHistory {
                values: VecDeque::new(),
                mean: 0.0,
                std_dev: 0.0,
                last_update: Instant::now(),
            });

        // Add value to sliding window
        metric_history.values.push_back(value);
        if metric_history.values.len() > self.window_size {
            metric_history.values.pop_front();
        }

        // Update statistics
        if metric_history.values.len() >= 10 {
            let sum: f64 = metric_history.values.iter().sum();
            metric_history.mean = sum / metric_history.values.len() as f64;

            let variance: f64 = metric_history
                .values
                .iter()
                .map(|v| (v - metric_history.mean).powi(2))
                .sum::<f64>()
                / metric_history.values.len() as f64;
            metric_history.std_dev = variance.sqrt();

            // Check for anomalies
            if let Some(anomaly) = self.detect_anomaly(metric, value, metric_history) {
                let mut anomalies = self.anomalies.write().await;
                anomalies.push(anomaly);

                // Keep only recent anomalies
                if anomalies.len() > 1000 {
                    anomalies.drain(0..100);
                }
            }
        }

        metric_history.last_update = Instant::now();
    }
    /// Detect anomaly in metric
    fn detect_anomaly(&self, metric: &str, value: f64, history: &MetricHistory) -> Option<Anomaly> {
        // Statistical anomaly detection (3-sigma rule). Guard against a
        // (near-)zero standard deviation, which would make the z-score
        // infinite or NaN.
        if history.std_dev > f64::EPSILON {
            let z_score = (value - history.mean).abs() / history.std_dev;
            if z_score > 3.0 {
                return Some(Anomaly {
                    metric: metric.to_string(),
                    anomaly_type: AnomalyType::Statistical,
                    severity: (z_score - 3.0).min(1.0),
                    detected_at: Instant::now(),
                    value,
                    expected_range: (
                        history.mean - 3.0 * history.std_dev,
                        history.mean + 3.0 * history.std_dev,
                    ),
                });
            }
        }

        // Spike detection
        if history.values.len() >= 2 {
            let prev_value = history.values[history.values.len() - 2];
            let change_rate = (value - prev_value).abs() / prev_value.abs().max(1.0);

            if change_rate > 0.5 {
                return Some(Anomaly {
                    metric: metric.to_string(),
                    anomaly_type: AnomalyType::Spike,
                    severity: change_rate.min(1.0),
                    detected_at: Instant::now(),
                    value,
                    expected_range: (prev_value * 0.5, prev_value * 1.5),
                });
            }
        }

        None
    }

    /// Get recent anomalies
    pub async fn get_recent_anomalies(&self) -> Vec<Anomaly> {
        let anomalies = self.anomalies.read().await;
        let cutoff = Instant::now() - Duration::from_secs(300);

        anomalies
            .iter()
            .filter(|a| a.detected_at > cutoff)
            .cloned()
            .collect()
    }
}

impl AlertManager {
    /// Create a new alert manager
    pub fn new(cooldown_period: Duration) -> Self {
        Self {
            active_alerts: Arc::new(RwLock::new(HashMap::new())),
            rules: Arc::new(RwLock::new(Vec::new())),
            channels: Arc::new(RwLock::new(Vec::new())),
            cooldowns: Arc::new(RwLock::new(HashMap::new())),
            cooldown_period,
        }
    }

    /// Add an alert rule
    pub async fn add_rule(&self, rule: AlertRule) -> Result<()> {
        let mut rules = self.rules.write().await;
        rules.push(rule);
        Ok(())
    }

    /// Add an alert channel
    pub async fn add_channel(&self, channel: Box<dyn AlertChannel>) {
        let mut channels = self.channels.write().await;
        channels.push(channel);
    }

    /// Trigger an alert
    pub async fn trigger_alert(&self, alert: Alert) -> Result<()> {
        // Check cooldown
        let mut cooldowns = self.cooldowns.write().await;
        if let Some(last_trigger) = cooldowns.get(&alert.id)
            && last_trigger.elapsed() < self.cooldown_period
        {
            return Ok(()); // Skip due to cooldown
        }

        // Record alert
        let mut active_alerts = self.active_alerts.write().await;
        active_alerts.insert(alert.id.clone(), alert.clone());
        cooldowns.insert(alert.id.clone(), Instant::now());

        // Send to all channels
        let channels = self.channels.read().await;
        for channel in channels.iter() {
            let _ = channel.send_alert(&alert).await;
        }

        Ok(())
    }

    /// Evaluate all rules
    pub async fn evaluate_rules(&self) -> Result<()> {
        let rules = self.rules.read().await.clone();

        for _rule in rules {
            // Evaluate condition
            // This would check actual metric values
            // For now, this is a placeholder
        }

        Ok(())
    }

    /// Get active alerts
    pub async fn get_active_alerts(&self) -> Vec<Alert> {
        self.active_alerts.read().await.values().cloned().collect()
    }

    /// Get recent alerts
    pub async fn get_recent_alerts(&self, count: usize) -> Vec<Alert> {
        let mut alerts: Vec<_> = self.active_alerts.read().await.values().cloned().collect();
        alerts.sort_by_key(|a| std::cmp::Reverse(a.triggered_at));
        alerts.truncate(count);
        alerts
    }
}

impl PerformanceProfiler {
    /// Create a new performance profiler
    pub fn new(sample_rate: f64) -> Self {
        Self {
            profiles: Arc::new(RwLock::new(HashMap::new())),
            completed: Arc::new(RwLock::new(VecDeque::new())),
            sample_rate,
        }
    }

    /// Start a profile
    pub async fn start_profile(&self, name: String) {
        if rand::random::<f64>() > self.sample_rate {
            return; // Skip based on sampling rate
        }

        let mut profiles = self.profiles.write().await;
        profiles.insert(
            name.clone(),
            Profile {
                name,
                started_at: Instant::now(),
                samples: Vec::new(),
            },
        );
    }

    /// Record a sample
    pub async fn record_sample(&self, profile_name: &str) {
        let mut profiles = self.profiles.write().await;

        if let Some(profile) = profiles.get_mut(profile_name) {
            profile.samples.push(ProfileSample {
                _timestamp: Instant::now(),
                cpu_usage: Self::get_cpu_usage(),
                memory_bytes: Self::get_memory_usage(),
                _operations: HashMap::new(), // Would track actual operations
            });
        }
    }

    /// End a profile
    pub async fn end_profile(&self, name: &str) {
        let mut profiles = self.profiles.write().await;

        if let Some(profile) = profiles.remove(name) {
            let duration = profile.started_at.elapsed();

            let avg_cpu = profile.samples.iter().map(|s| s.cpu_usage).sum::<f64>()
                / profile.samples.len().max(1) as f64;

            let peak_memory = profile
                .samples
                .iter()
                .map(|s| s.memory_bytes)
                .max()
                .unwrap_or(0);

            let completed_profile = CompletedProfile {
                name: profile.name,
                duration,
                avg_cpu,
                peak_memory,
                operations: HashMap::new(), // Aggregate operations
            };

            let mut completed = self.completed.write().await;
            completed.push_back(completed_profile);

            // Keep only recent profiles
            if completed.len() > 100 {
                completed.pop_front();
            }
        }
    }

    /// Get current profile data
    pub async fn get_current_profile(&self) -> Option<ProfileData> {
        Some(ProfileData {
            cpu_usage: Self::get_cpu_usage(),
            memory_bytes: Self::get_memory_usage(),
            thread_count: Self::get_thread_count(),
            active_profiles: self.profiles.read().await.len(),
        })
    }

    /// Get CPU usage (placeholder implementation)
    fn get_cpu_usage() -> f64 {
        // In real implementation, would use platform-specific APIs
        rand::random::<f64>() * 100.0
    }

    /// Get memory usage (placeholder implementation)
    fn get_memory_usage() -> u64 {
        // In real implementation, would use platform-specific APIs
        1024 * 1024 * 512 // 512MB placeholder
    }

    /// Get thread count (placeholder implementation)
    fn get_thread_count() -> usize {
        // In real implementation, would use platform-specific APIs
        8
    }
}

impl DebugLogger {
    /// Create a new debug logger
    pub fn new(level: LogLevel) -> Self {
        Self {
            level,
            buffer: Arc::new(RwLock::new(VecDeque::new())),
            channels: Arc::new(RwLock::new(Vec::new())),
        }
    }

    /// Log a message
    pub async fn log(&self, level: LogLevel, component: &str, message: &str) {
        if level > self.level {
            return; // Skip lower priority logs
        }

        let entry = LogEntry {
            timestamp: Instant::now(),
            level,
            component: component.to_string(),
            message: message.to_string(),
            data: None,
        };

        // Add to buffer
        let mut buffer = self.buffer.write().await;
        buffer.push_back(entry.clone());

        // Keep buffer size limited
        if buffer.len() > 10000 {
            buffer.pop_front();
        }

        // Send to channels
        let channels = self.channels.read().await;
        for channel in channels.iter() {
            let _ = channel.send(entry.clone());
        }
    }

    /// Log error
    pub async fn error(&self, component: &str, message: &str) {
        self.log(LogLevel::Error, component, message).await;
    }

    /// Log warning
    pub async fn warn(&self, component: &str, message: &str) {
        self.log(LogLevel::Warn, component, message).await;
    }

    /// Log info
    pub async fn info(&self, component: &str, message: &str) {
        self.log(LogLevel::Info, component, message).await;
    }

    /// Log debug
    pub async fn debug(&self, component: &str, message: &str) {
        self.log(LogLevel::Debug, component, message).await;
    }

    /// Log trace
    pub async fn trace(&self, component: &str, message: &str) {
        self.log(LogLevel::Trace, component, message).await;
    }

    /// Subscribe to log stream
    pub async fn subscribe(&self) -> mpsc::UnboundedReceiver<LogEntry> {
        let (tx, rx) = mpsc::unbounded_channel();
        let mut channels = self.channels.write().await;
        channels.push(tx);
        rx
    }

    /// Get recent logs
    pub async fn get_recent_logs(&self, count: usize) -> Vec<LogEntry> {
        let buffer = self.buffer.read().await;
        buffer.iter().rev().take(count).cloned().collect()
    }
}

/// Network health status
#[derive(Debug, Clone)]
pub struct NetworkHealth {
    /// Overall health score (0.0-1.0)
    pub score: f64,

    /// Health status
    pub status: HealthStatus,

    /// Number of active nodes
    pub active_nodes: u64,

    /// Current churn rate
    pub churn_rate: f64,

    /// Routing success rate
    pub routing_success_rate: f64,

    /// Storage utilization
    pub storage_utilization: f64,

    /// Number of active alerts
    pub active_alerts: usize,
}

/// Health status levels
#[derive(Debug, Clone, PartialEq)]
pub enum HealthStatus {
    Healthy,
    Degraded,
    Critical,
}

/// Dashboard data structure
#[derive(Debug, Clone)]
pub struct DashboardData {
    /// Current health
    pub health: NetworkHealth,

    /// Current metric values
    pub metrics: HashMap<String, f64>,

    /// Recent alerts
    pub recent_alerts: Vec<Alert>,

    /// Recent anomalies
    pub anomalies: Vec<Anomaly>,

    /// Performance data
    pub performance: Option<ProfileData>,
}

/// Current profile data
#[derive(Debug, Clone)]
pub struct ProfileData {
    /// CPU usage percentage
    pub cpu_usage: f64,

    /// Memory usage in bytes
    pub memory_bytes: u64,

    /// Number of threads
    pub thread_count: usize,

    /// Active profiles
    pub active_profiles: usize,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_anomaly_detection() {
        let detector = AnomalyDetector::new(100);

        // Add normal values
        for i in 0..50 {
            detector
                .update_metric("test_metric", 50.0 + (i as f64 % 10.0))
                .await;
        }

        // Add anomalous value
        detector.update_metric("test_metric", 200.0).await;

        // Check anomalies detected
        let anomalies = detector.get_recent_anomalies().await;
        assert!(!anomalies.is_empty());
        assert_eq!(anomalies[0].anomaly_type, AnomalyType::Statistical);
    }

    #[tokio::test]
    async fn test_alert_cooldown() {
        let alert_manager = AlertManager::new(Duration::from_secs(60));

        let alert = Alert {
            id: "test_alert".to_string(),
            name: "Test Alert".to_string(),
            severity: AlertSeverity::Warning,
            message: "Test message".to_string(),
            triggered_at: Instant::now(),
            metrics: HashMap::new(),
        };

        // First alert should trigger
        alert_manager.trigger_alert(alert.clone()).await.unwrap();
        assert_eq!(alert_manager.get_active_alerts().await.len(), 1);

        // Second alert should be skipped due to cooldown
        alert_manager.trigger_alert(alert).await.unwrap();
        assert_eq!(alert_manager.get_active_alerts().await.len(), 1);
    }

    #[tokio::test]
    async fn test_performance_profiling() {
        let profiler = PerformanceProfiler::new(1.0); // 100% sampling

        profiler.start_profile("test_operation".to_string()).await;

        // Record some samples
        for _ in 0..5 {
            profiler.record_sample("test_operation").await;
            tokio::time::sleep(Duration::from_millis(10)).await;
        }

        profiler.end_profile("test_operation").await;

        // Check profile was completed
        let completed = profiler.completed.read().await;
        assert_eq!(completed.len(), 1);
        assert_eq!(completed[0].name, "test_operation");
    }

    #[tokio::test]
    async fn test_debug_logging() {
        let logger = DebugLogger::new(LogLevel::Debug);

        // Subscribe to logs
        let mut rx = logger.subscribe().await;

        // Log messages
        logger.error("test", "Error message").await;
        logger.warn("test", "Warning message").await;
        logger.info("test", "Info message").await;
        logger.debug("test", "Debug message").await;
        logger.trace("test", "Trace message").await; // Should be filtered

        // Check received logs
        let mut count = 0;
        while let Ok(entry) = rx.try_recv() {
            count += 1;
            assert!(entry.level <= LogLevel::Debug);
        }
        assert_eq!(count, 4); // Trace should be filtered out
    }
}