Skip to main content

trustformers_debug/
realtime_dashboard.rs

1//! Real-Time Debugging Dashboard
2//!
3//! This module provides a modern, real-time debugging dashboard with WebSocket support,
4//! interactive visualizations, and live data streaming for comprehensive neural network monitoring.
5
6use anyhow::Result;
7use scirs2_core::random::*; // SciRS2 Integration Policy
8use serde::{Deserialize, Serialize};
9use std::collections::{HashMap, VecDeque};
10use std::sync::{Arc, Mutex};
11use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
12use tokio::sync::broadcast;
13use tokio::time::interval;
14use tokio_stream::wrappers::BroadcastStream;
15use uuid::Uuid;
16
17/// Configuration for the real-time dashboard
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct DashboardConfig {
20    /// Port for WebSocket server
21    pub websocket_port: u16,
22    /// Update frequency in milliseconds
23    pub update_frequency_ms: u64,
24    /// Maximum number of data points to keep in memory
25    pub max_data_points: usize,
26    /// Enable GPU monitoring
27    pub enable_gpu_monitoring: bool,
28    /// Enable memory profiling
29    pub enable_memory_profiling: bool,
30    /// Enable network traffic monitoring
31    pub enable_network_monitoring: bool,
32    /// Enable performance alerts
33    pub enable_performance_alerts: bool,
34    /// Alert thresholds
35    pub alert_thresholds: AlertThresholds,
36}
37
38impl Default for DashboardConfig {
39    fn default() -> Self {
40        Self {
41            websocket_port: 8080,
42            update_frequency_ms: 100,
43            max_data_points: 1000,
44            enable_gpu_monitoring: true,
45            enable_memory_profiling: true,
46            enable_network_monitoring: false,
47            enable_performance_alerts: true,
48            alert_thresholds: AlertThresholds::default(),
49        }
50    }
51}
52
53/// Alert threshold configuration
54#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct AlertThresholds {
56    /// Memory usage threshold (percentage)
57    pub memory_threshold: f64,
58    /// GPU utilization threshold (percentage)
59    pub gpu_utilization_threshold: f64,
60    /// Temperature threshold (Celsius)
61    pub temperature_threshold: f64,
62    /// Loss spike threshold
63    pub loss_spike_threshold: f64,
64    /// Gradient norm threshold
65    pub gradient_norm_threshold: f64,
66}
67
68impl Default for AlertThresholds {
69    fn default() -> Self {
70        Self {
71            memory_threshold: 90.0,
72            gpu_utilization_threshold: 95.0,
73            temperature_threshold: 80.0,
74            loss_spike_threshold: 2.0,
75            gradient_norm_threshold: 10.0,
76        }
77    }
78}
79
80/// Real-time metric data point
81#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct MetricDataPoint {
83    pub timestamp: u64,
84    pub value: f64,
85    pub label: String,
86    pub category: MetricCategory,
87}
88
89/// Categories of metrics for organization
90#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
91pub enum MetricCategory {
92    Training,
93    Memory,
94    GPU,
95    Network,
96    Performance,
97    Custom(String),
98}
99
100/// Dashboard alert
101#[derive(Debug, Clone, Serialize, Deserialize)]
102pub struct DashboardAlert {
103    pub id: String,
104    pub timestamp: u64,
105    pub severity: AlertSeverity,
106    pub category: MetricCategory,
107    pub title: String,
108    pub message: String,
109    pub value: Option<f64>,
110    pub threshold: Option<f64>,
111}
112
113/// Alert severity levels
114#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
115pub enum AlertSeverity {
116    Info,
117    Warning,
118    Error,
119    Critical,
120}
121
122/// WebSocket message types
123#[derive(Debug, Clone, Serialize, Deserialize)]
124#[serde(tag = "type")]
125pub enum WebSocketMessage {
126    MetricUpdate {
127        data: Vec<MetricDataPoint>,
128    },
129    Alert {
130        alert: DashboardAlert,
131    },
132    ConfigUpdate {
133        config: DashboardConfig,
134    },
135    SessionInfo {
136        session_id: String,
137        uptime: u64,
138    },
139    HistoricalData {
140        category: MetricCategory,
141        data: Vec<MetricDataPoint>,
142    },
143    SystemStats {
144        stats: SystemStats,
145    },
146    #[serde(untagged)]
147    Generic {
148        message_type: String,
149        data: serde_json::Value,
150        timestamp: u64,
151        session_id: String,
152    },
153}
154
155/// Anomaly detection result
156#[derive(Debug, Clone, Serialize, Deserialize)]
157pub struct AnomalyDetection {
158    pub timestamp: u64,
159    pub value: f64,
160    pub expected_range: (f64, f64),
161    pub anomaly_type: AnomalyType,
162    pub confidence_score: f64,
163    pub category: MetricCategory,
164    pub description: String,
165}
166
167/// Types of anomalies that can be detected
168#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
169pub enum AnomalyType {
170    Spike,
171    Drop,
172    GradualIncrease,
173    GradualDecrease,
174    Outlier,
175}
176
177/// Advanced dashboard visualization data
178#[derive(Debug, Clone, Serialize, Deserialize)]
179pub struct DashboardVisualizationData {
180    pub heatmap_data: HashMap<MetricCategory, HeatmapData>,
181    pub time_series_data: HashMap<MetricCategory, Vec<TimeSeriesPoint>>,
182    pub correlation_matrix: Vec<Vec<f64>>,
183    pub performance_distribution: HashMap<MetricCategory, HistogramData>,
184    pub generated_at: u64,
185    pub session_id: String,
186}
187
188/// Heatmap data for metric visualization
189#[derive(Debug, Clone, Serialize, Deserialize)]
190pub struct HeatmapData {
191    pub intensity: f64,
192    pub normalized_intensity: f64,
193    pub data_points: usize,
194    pub timestamp: u64,
195}
196
197/// Time series data point for trend visualization
198#[derive(Debug, Clone, Serialize, Deserialize)]
199pub struct TimeSeriesPoint {
200    pub timestamp: u64,
201    pub value: f64,
202    pub label: String,
203}
204
205/// Histogram data for performance distribution analysis
206#[derive(Debug, Clone, Serialize, Deserialize)]
207pub struct HistogramData {
208    pub bins: Vec<HistogramBin>,
209    pub max_frequency: usize,
210}
211
212/// Individual histogram bin
213#[derive(Debug, Clone, Serialize, Deserialize)]
214pub struct HistogramBin {
215    pub range_start: f64,
216    pub range_end: f64,
217    pub frequency: usize,
218}
219
220/// Performance prediction result
221#[derive(Debug, Clone, Serialize, Deserialize)]
222pub struct PerformancePrediction {
223    pub category: MetricCategory,
224    pub predicted_value: f64,
225    pub confidence_interval: (f64, f64),
226    pub trend_direction: TrendDirection,
227    pub trend_strength: f64,
228    pub prediction_horizon_hours: u64,
229    pub model_accuracy: f64,
230    pub generated_at: u64,
231    pub recommendations: Vec<String>,
232}
233
234/// Trend direction for predictions
235#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
236pub enum TrendDirection {
237    Increasing,
238    Decreasing,
239    Stable,
240}
241
242/// Dashboard theme configuration
243#[derive(Debug, Clone, Serialize, Deserialize)]
244pub struct DashboardTheme {
245    pub name: String,
246    pub primary_color: String,
247    pub secondary_color: String,
248    pub background_color: String,
249    pub text_color: String,
250    pub accent_color: String,
251    pub chart_colors: Vec<String>,
252    pub dark_mode: bool,
253    pub font_family: String,
254    pub border_radius: u8,
255}
256
257impl Default for DashboardTheme {
258    fn default() -> Self {
259        Self {
260            name: "Default".to_string(),
261            primary_color: "#3b82f6".to_string(),
262            secondary_color: "#64748b".to_string(),
263            background_color: "#ffffff".to_string(),
264            text_color: "#1f2937".to_string(),
265            accent_color: "#10b981".to_string(),
266            chart_colors: vec![
267                "#3b82f6".to_string(),
268                "#ef4444".to_string(),
269                "#10b981".to_string(),
270                "#f59e0b".to_string(),
271                "#8b5cf6".to_string(),
272            ],
273            dark_mode: false,
274            font_family: "Inter, sans-serif".to_string(),
275            border_radius: 8,
276        }
277    }
278}
279
280/// Export format options
281#[derive(Debug, Clone, Serialize, Deserialize)]
282pub enum ExportFormat {
283    JSON,
284    CSV,
285    MessagePack,
286}
287
288/// System statistics
289#[derive(Debug, Clone, Serialize, Deserialize)]
290pub struct SystemStats {
291    pub uptime: u64,
292    pub total_alerts: usize,
293    pub active_connections: usize,
294    pub data_points_collected: usize,
295    pub memory_usage_mb: f64,
296    pub cpu_usage_percent: f64,
297}
298
299/// Real-time dashboard server
300#[derive(Debug)]
301pub struct RealtimeDashboard {
302    config: Arc<Mutex<DashboardConfig>>,
303    session_id: String,
304    start_time: Instant,
305    metric_data: Arc<Mutex<HashMap<MetricCategory, VecDeque<MetricDataPoint>>>>,
306    alert_history: Arc<Mutex<VecDeque<DashboardAlert>>>,
307    websocket_sender: broadcast::Sender<WebSocketMessage>,
308    active_connections: Arc<Mutex<usize>>,
309    total_data_points: Arc<Mutex<usize>>,
310    is_running: Arc<Mutex<bool>>,
311}
312
313impl RealtimeDashboard {
314    /// Create new real-time dashboard
315    pub fn new(config: DashboardConfig) -> Self {
316        let (websocket_sender, _) = broadcast::channel(1000);
317
318        Self {
319            config: Arc::new(Mutex::new(config)),
320            session_id: Uuid::new_v4().to_string(),
321            start_time: Instant::now(),
322            metric_data: Arc::new(Mutex::new(HashMap::new())),
323            alert_history: Arc::new(Mutex::new(VecDeque::new())),
324            websocket_sender,
325            active_connections: Arc::new(Mutex::new(0)),
326            total_data_points: Arc::new(Mutex::new(0)),
327            is_running: Arc::new(Mutex::new(false)),
328        }
329    }
330
331    /// Start the dashboard server
332    pub async fn start(&self) -> Result<()> {
333        {
334            let mut running = self
335                .is_running
336                .lock()
337                .map_err(|_| anyhow::anyhow!("Failed to acquire running state lock"))?;
338            if *running {
339                return Ok(());
340            }
341            *running = true;
342        }
343
344        // Start periodic data collection
345        self.start_data_collection().await?;
346
347        // Start periodic system stats updates
348        self.start_system_stats_updates().await?;
349
350        // Start alert monitoring
351        self.start_alert_monitoring().await?;
352
353        Ok(())
354    }
355
356    /// Stop the dashboard server
357    pub fn stop(&self) {
358        if let Ok(mut running) = self.is_running.lock() {
359            *running = false;
360        }
361    }
362
363    /// Add a metric data point
364    pub fn add_metric(&self, category: MetricCategory, label: String, value: f64) -> Result<()> {
365        let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis() as u64;
366
367        let data_point = MetricDataPoint {
368            timestamp,
369            value,
370            label,
371            category: category.clone(),
372        };
373
374        // Add to metric data with size limit
375        {
376            let mut data = self
377                .metric_data
378                .lock()
379                .map_err(|_| anyhow::anyhow!("Failed to acquire metric data lock"))?;
380            let category_data = data.entry(category.clone()).or_insert_with(VecDeque::new);
381
382            category_data.push_back(data_point.clone());
383
384            let max_points = self
385                .config
386                .lock()
387                .map_err(|_| anyhow::anyhow!("Failed to acquire config lock"))?
388                .max_data_points;
389            while category_data.len() > max_points {
390                category_data.pop_front();
391            }
392        }
393
394        // Increment total data points counter
395        {
396            if let Ok(mut total) = self.total_data_points.lock() {
397                *total += 1;
398            }
399        }
400
401        // Broadcast update to WebSocket clients
402        let message = WebSocketMessage::MetricUpdate {
403            data: vec![data_point],
404        };
405
406        let _ = self.websocket_sender.send(message);
407
408        // Check for alerts
409        self.check_for_alerts(&category, value);
410
411        Ok(())
412    }
413
414    /// Add multiple metrics at once
415    pub fn add_metrics(&self, metrics: Vec<(MetricCategory, String, f64)>) -> Result<()> {
416        let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis() as u64;
417
418        let mut data_points = Vec::new();
419
420        // Process all metrics
421        for (category, label, value) in metrics {
422            let data_point = MetricDataPoint {
423                timestamp,
424                value,
425                label,
426                category: category.clone(),
427            };
428
429            // Add to metric data
430            {
431                let mut data = self.metric_data.lock().expect("lock should not be poisoned");
432                let category_data = data.entry(category.clone()).or_default();
433                category_data.push_back(data_point.clone());
434
435                let max_points =
436                    self.config.lock().expect("lock should not be poisoned").max_data_points;
437                while category_data.len() > max_points {
438                    category_data.pop_front();
439                }
440            }
441
442            data_points.push(data_point);
443
444            // Check for alerts
445            self.check_for_alerts(&category, value);
446        }
447
448        // Update total counter
449        {
450            let mut total = self.total_data_points.lock().expect("lock should not be poisoned");
451            *total += data_points.len();
452        }
453
454        // Broadcast batch update
455        let message = WebSocketMessage::MetricUpdate { data: data_points };
456        let _ = self.websocket_sender.send(message);
457
458        Ok(())
459    }
460
461    /// Create an alert
462    pub fn create_alert(
463        &self,
464        severity: AlertSeverity,
465        category: MetricCategory,
466        title: String,
467        message: String,
468        value: Option<f64>,
469        threshold: Option<f64>,
470    ) -> Result<()> {
471        let alert = DashboardAlert {
472            id: Uuid::new_v4().to_string(),
473            timestamp: SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis() as u64,
474            severity,
475            category,
476            title,
477            message,
478            value,
479            threshold,
480        };
481
482        // Add to alert history
483        {
484            let mut history = self.alert_history.lock().expect("lock should not be poisoned");
485            history.push_back(alert.clone());
486
487            // Keep only last 100 alerts
488            while history.len() > 100 {
489                history.pop_front();
490            }
491        }
492
493        // Broadcast alert
494        let message = WebSocketMessage::Alert { alert };
495        let _ = self.websocket_sender.send(message);
496
497        Ok(())
498    }
499
500    /// Get historical data for a category
501    pub fn get_historical_data(&self, category: &MetricCategory) -> Vec<MetricDataPoint> {
502        let data = self.metric_data.lock().expect("lock should not be poisoned");
503        data.get(category)
504            .map(|deque| deque.iter().cloned().collect())
505            .unwrap_or_default()
506    }
507
508    /// Get current system stats
509    pub fn get_system_stats(&self) -> SystemStats {
510        let uptime = self.start_time.elapsed().as_secs();
511        let total_alerts = self.alert_history.lock().expect("lock should not be poisoned").len();
512        let active_connections =
513            *self.active_connections.lock().expect("lock should not be poisoned");
514        let data_points_collected =
515            *self.total_data_points.lock().expect("lock should not be poisoned");
516
517        // Simple memory and CPU usage estimation
518        let memory_usage_mb = self.estimate_memory_usage();
519        let cpu_usage_percent = self.estimate_cpu_usage();
520
521        SystemStats {
522            uptime,
523            total_alerts,
524            active_connections,
525            data_points_collected,
526            memory_usage_mb,
527            cpu_usage_percent,
528        }
529    }
530
531    /// Subscribe to WebSocket messages
532    pub fn subscribe(&self) -> BroadcastStream<WebSocketMessage> {
533        // Increment connection counter
534        {
535            let mut connections =
536                self.active_connections.lock().expect("lock should not be poisoned");
537            *connections += 1;
538        }
539
540        BroadcastStream::new(self.websocket_sender.subscribe())
541    }
542
543    /// Update dashboard configuration
544    pub fn update_config(&self, new_config: DashboardConfig) -> Result<()> {
545        {
546            let mut config = self.config.lock().expect("lock should not be poisoned");
547            *config = new_config.clone();
548        }
549
550        // Broadcast configuration update
551        let message = WebSocketMessage::ConfigUpdate { config: new_config };
552        let _ = self.websocket_sender.send(message);
553
554        Ok(())
555    }
556
557    /// Get current configuration
558    pub fn get_config(&self) -> DashboardConfig {
559        self.config.lock().expect("lock should not be poisoned").clone()
560    }
561
562    /// Start periodic data collection
563    async fn start_data_collection(&self) -> Result<()> {
564        let config = self.config.clone();
565        let _metric_data = self.metric_data.clone();
566        let websocket_sender = self.websocket_sender.clone();
567        let is_running = self.is_running.clone();
568
569        tokio::spawn(async move {
570            let mut interval = interval(Duration::from_millis(
571                config.lock().expect("lock should not be poisoned").update_frequency_ms,
572            ));
573
574            while *is_running.lock().expect("lock should not be poisoned") {
575                interval.tick().await;
576
577                // Collect system metrics periodically
578                if let Ok(metrics) = Self::collect_system_metrics(&config).await {
579                    let message = WebSocketMessage::MetricUpdate { data: metrics };
580                    let _ = websocket_sender.send(message);
581                }
582            }
583        });
584
585        Ok(())
586    }
587
588    /// Start system stats updates
589    async fn start_system_stats_updates(&self) -> Result<()> {
590        let websocket_sender = self.websocket_sender.clone();
591        let start_time = self.start_time;
592        let alert_history = self.alert_history.clone();
593        let active_connections = self.active_connections.clone();
594        let total_data_points = self.total_data_points.clone();
595        let is_running = self.is_running.clone();
596
597        tokio::spawn(async move {
598            let mut interval = interval(Duration::from_secs(5)); // Update every 5 seconds
599
600            while *is_running.lock().expect("lock should not be poisoned") {
601                interval.tick().await;
602
603                let stats = SystemStats {
604                    uptime: start_time.elapsed().as_secs(),
605                    total_alerts: alert_history.lock().expect("lock should not be poisoned").len(),
606                    active_connections: *active_connections
607                        .lock()
608                        .expect("lock should not be poisoned"),
609                    data_points_collected: *total_data_points
610                        .lock()
611                        .expect("lock should not be poisoned"),
612                    memory_usage_mb: 0.0,   // Placeholder
613                    cpu_usage_percent: 0.0, // Placeholder
614                };
615
616                let message = WebSocketMessage::SystemStats { stats };
617                let _ = websocket_sender.send(message);
618            }
619        });
620
621        Ok(())
622    }
623
624    /// Start alert monitoring
625    async fn start_alert_monitoring(&self) -> Result<()> {
626        let config = self.config.clone();
627        let metric_data = self.metric_data.clone();
628        let is_running = self.is_running.clone();
629
630        tokio::spawn(async move {
631            let mut interval = interval(Duration::from_secs(1));
632
633            while *is_running.lock().expect("lock should not be poisoned") {
634                interval.tick().await;
635
636                // Monitor for threshold breaches and create alerts
637                Self::check_threshold_breaches(&config, &metric_data).await;
638            }
639        });
640
641        Ok(())
642    }
643
644    /// Collect system metrics
645    async fn collect_system_metrics(
646        config: &Arc<Mutex<DashboardConfig>>,
647    ) -> Result<Vec<MetricDataPoint>> {
648        let mut metrics = Vec::new();
649        let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis() as u64;
650
651        let cfg = config.lock().expect("lock should not be poisoned");
652
653        if cfg.enable_memory_profiling {
654            // Simulate memory metrics
655            let memory_usage = Self::get_memory_usage();
656            metrics.push(MetricDataPoint {
657                timestamp,
658                value: memory_usage,
659                label: "Memory Usage".to_string(),
660                category: MetricCategory::Memory,
661            });
662        }
663
664        if cfg.enable_gpu_monitoring {
665            // Simulate GPU metrics
666            let gpu_utilization = Self::get_gpu_utilization();
667            metrics.push(MetricDataPoint {
668                timestamp,
669                value: gpu_utilization,
670                label: "GPU Utilization".to_string(),
671                category: MetricCategory::GPU,
672            });
673
674            let gpu_memory = Self::get_gpu_memory_usage();
675            metrics.push(MetricDataPoint {
676                timestamp,
677                value: gpu_memory,
678                label: "GPU Memory".to_string(),
679                category: MetricCategory::GPU,
680            });
681        }
682
683        Ok(metrics)
684    }
685
686    /// Check for alerts based on new metric value
687    fn check_for_alerts(&self, category: &MetricCategory, value: f64) {
688        let config = self.config.lock().expect("lock should not be poisoned");
689        let thresholds = &config.alert_thresholds;
690
691        match category {
692            MetricCategory::Memory if value > thresholds.memory_threshold => {
693                let _ = self.create_alert(
694                    AlertSeverity::Warning,
695                    category.clone(),
696                    "High Memory Usage".to_string(),
697                    format!(
698                        "Memory usage is {:.1}% (threshold: {:.1}%)",
699                        value, thresholds.memory_threshold
700                    ),
701                    Some(value),
702                    Some(thresholds.memory_threshold),
703                );
704            },
705            MetricCategory::GPU if value > thresholds.gpu_utilization_threshold => {
706                let _ = self.create_alert(
707                    AlertSeverity::Warning,
708                    category.clone(),
709                    "High GPU Utilization".to_string(),
710                    format!(
711                        "GPU utilization is {:.1}% (threshold: {:.1}%)",
712                        value, thresholds.gpu_utilization_threshold
713                    ),
714                    Some(value),
715                    Some(thresholds.gpu_utilization_threshold),
716                );
717            },
718            MetricCategory::Training if value > thresholds.loss_spike_threshold => {
719                let _ = self.create_alert(
720                    AlertSeverity::Error,
721                    category.clone(),
722                    "Training Loss Spike".to_string(),
723                    format!(
724                        "Loss spike detected: {:.4} (threshold: {:.4})",
725                        value, thresholds.loss_spike_threshold
726                    ),
727                    Some(value),
728                    Some(thresholds.loss_spike_threshold),
729                );
730            },
731            _ => {},
732        }
733    }
734
735    /// Check for threshold breaches across all metrics
736    async fn check_threshold_breaches(
737        config: &Arc<Mutex<DashboardConfig>>,
738        metric_data: &Arc<Mutex<HashMap<MetricCategory, VecDeque<MetricDataPoint>>>>,
739    ) {
740        let _config = config.lock().expect("lock should not be poisoned");
741        let _data = metric_data.lock().expect("lock should not be poisoned");
742
743        // Implementation would check for patterns, sustained threshold breaches, etc.
744        // This is a placeholder for more complex alert logic
745    }
746
747    /// Simulate getting memory usage
748    fn get_memory_usage() -> f64 {
749        // Placeholder - in real implementation would use system APIs
750        50.0 + (thread_rng().random::<f64>() * 40.0)
751    }
752
753    /// Simulate getting GPU utilization
754    fn get_gpu_utilization() -> f64 {
755        // Placeholder - in real implementation would use NVIDIA ML, ROCm, etc.
756        30.0 + (thread_rng().random::<f64>() * 60.0)
757    }
758
759    /// Simulate getting GPU memory usage
760    fn get_gpu_memory_usage() -> f64 {
761        // Placeholder - in real implementation would use GPU APIs
762        40.0 + (thread_rng().random::<f64>() * 50.0)
763    }
764
765    /// Estimate memory usage of dashboard
766    fn estimate_memory_usage(&self) -> f64 {
767        let data = self.metric_data.lock().expect("lock should not be poisoned");
768        let mut total_points = 0;
769
770        for deque in data.values() {
771            total_points += deque.len();
772        }
773
774        // Rough estimate: ~100 bytes per data point
775        (total_points * 100) as f64 / (1024.0 * 1024.0)
776    }
777
778    /// Estimate CPU usage
779    fn estimate_cpu_usage(&self) -> f64 {
780        // Simple placeholder - in real implementation would use system APIs
781        5.0 + (thread_rng().random::<f64>() * 10.0)
782    }
783
784    /// AI-powered anomaly detection for metric patterns
785    pub async fn detect_metric_anomalies(
786        &self,
787        category: &MetricCategory,
788    ) -> Result<Vec<AnomalyDetection>> {
789        let data = self.get_historical_data(category);
790        let mut anomalies = Vec::new();
791
792        if data.len() < 10 {
793            return Ok(anomalies); // Need sufficient data for anomaly detection
794        }
795
796        // Calculate statistical thresholds
797        let values: Vec<f64> = data.iter().map(|d| d.value).collect();
798        let mean = values.iter().sum::<f64>() / values.len() as f64;
799        let variance = values.iter().map(|v| (v - mean).powi(2)).sum::<f64>() / values.len() as f64;
800        let std_dev = variance.sqrt();
801
802        // Z-score based anomaly detection
803        let z_threshold = 2.0; // 2 standard deviations
804        for point in data.iter() {
805            let z_score = (point.value - mean).abs() / std_dev;
806            if z_score > z_threshold {
807                let anomaly_type =
808                    if point.value > mean { AnomalyType::Spike } else { AnomalyType::Drop };
809
810                anomalies.push(AnomalyDetection {
811                    timestamp: point.timestamp,
812                    value: point.value,
813                    expected_range: (mean - std_dev, mean + std_dev),
814                    anomaly_type,
815                    confidence_score: (z_score - z_threshold) / z_threshold,
816                    category: category.clone(),
817                    description: format!(
818                        "Detected {} in {} metrics: value {} (Z-score: {:.2})",
819                        match anomaly_type {
820                            AnomalyType::Spike => "spike",
821                            AnomalyType::Drop => "drop",
822                            _ => "anomaly",
823                        },
824                        match category {
825                            MetricCategory::Training => "training",
826                            MetricCategory::Memory => "memory",
827                            MetricCategory::GPU => "GPU",
828                            MetricCategory::Network => "network",
829                            MetricCategory::Performance => "performance",
830                            MetricCategory::Custom(name) => name,
831                        },
832                        point.value,
833                        z_score
834                    ),
835                });
836            }
837        }
838
839        // Advanced pattern detection - look for gradual trends
840        if data.len() >= 20 {
841            let recent_window = &data[data.len() - 10..];
842            let earlier_window = &data[data.len() - 20..data.len() - 10];
843
844            let recent_avg =
845                recent_window.iter().map(|d| d.value).sum::<f64>() / recent_window.len() as f64;
846            let earlier_avg =
847                earlier_window.iter().map(|d| d.value).sum::<f64>() / earlier_window.len() as f64;
848
849            let trend_change = (recent_avg - earlier_avg) / earlier_avg;
850
851            if trend_change.abs() > 0.3 {
852                // 30% change
853                anomalies.push(AnomalyDetection {
854                    timestamp: recent_window
855                        .last()
856                        .expect("recent_window has at least 10 elements")
857                        .timestamp,
858                    value: recent_avg,
859                    expected_range: (earlier_avg * 0.9, earlier_avg * 1.1),
860                    anomaly_type: if trend_change > 0.0 {
861                        AnomalyType::GradualIncrease
862                    } else {
863                        AnomalyType::GradualDecrease
864                    },
865                    confidence_score: trend_change.abs(),
866                    category: category.clone(),
867                    description: format!(
868                        "Detected gradual {} trend: {:.1}% change over recent measurements",
869                        if trend_change > 0.0 { "increase" } else { "decrease" },
870                        trend_change.abs() * 100.0
871                    ),
872                });
873            }
874        }
875
876        Ok(anomalies)
877    }
878
879    /// Generate advanced visualization data for modern dashboard components
880    pub fn generate_advanced_visualizations(&self) -> Result<DashboardVisualizationData> {
881        let mut heatmap_data = HashMap::new();
882        let mut time_series_data = HashMap::new();
883        let mut correlation_matrix = Vec::new();
884        let mut performance_distribution = HashMap::new();
885
886        // Generate heatmap data for different metric categories
887        for (category, data) in self.metric_data.lock().expect("lock should not be poisoned").iter()
888        {
889            if data.len() >= 10 {
890                let recent_data: Vec<f64> = data.iter().rev().take(10).map(|d| d.value).collect();
891                let avg_value = recent_data.iter().sum::<f64>() / recent_data.len() as f64;
892
893                heatmap_data.insert(
894                    category.clone(),
895                    HeatmapData {
896                        intensity: avg_value,
897                        normalized_intensity: (avg_value / (avg_value + 1.0)).min(1.0), // Normalize to 0-1
898                        data_points: recent_data.len(),
899                        timestamp: SystemTime::now()
900                            .duration_since(UNIX_EPOCH)
901                            .expect("System time should be after UNIX_EPOCH")
902                            .as_secs(),
903                    },
904                );
905
906                // Time series data for trend visualization
907                let time_series: Vec<TimeSeriesPoint> = data
908                    .iter()
909                    .map(|d| TimeSeriesPoint {
910                        timestamp: d.timestamp,
911                        value: d.value,
912                        label: d.label.clone(),
913                    })
914                    .collect();
915
916                time_series_data.insert(category.clone(), time_series);
917
918                // Performance distribution data
919                let values: Vec<f64> = data.iter().map(|d| d.value).collect();
920                let histogram = self.create_histogram(&values, 10);
921                performance_distribution.insert(category.clone(), histogram);
922            }
923        }
924
925        // Generate correlation matrix for different metrics
926        let categories: Vec<&MetricCategory> = heatmap_data.keys().collect();
927        for (i, cat1) in categories.iter().enumerate() {
928            let mut row = Vec::new();
929            for (j, cat2) in categories.iter().enumerate() {
930                if i == j {
931                    row.push(1.0); // Perfect correlation with itself
932                } else {
933                    // Calculate correlation coefficient (simplified)
934                    let corr = self.calculate_correlation_coefficient(cat1, cat2);
935                    row.push(corr);
936                }
937            }
938            correlation_matrix.push(row);
939        }
940
941        Ok(DashboardVisualizationData {
942            heatmap_data,
943            time_series_data,
944            correlation_matrix,
945            performance_distribution,
946            generated_at: SystemTime::now()
947                .duration_since(UNIX_EPOCH)
948                .expect("System time should be after UNIX_EPOCH")
949                .as_secs(),
950            session_id: self.session_id.clone(),
951        })
952    }
953
954    /// AI-powered performance prediction based on historical trends
955    pub async fn predict_performance_trends(
956        &self,
957        category: &MetricCategory,
958        hours_ahead: u64,
959    ) -> Result<PerformancePrediction> {
960        let data = self.get_historical_data(category);
961
962        if data.len() < 20 {
963            return Err(anyhow::anyhow!(
964                "Insufficient data for prediction (need at least 20 points)"
965            ));
966        }
967
968        let values: Vec<f64> = data.iter().map(|d| d.value).collect();
969        let timestamps: Vec<u64> = data.iter().map(|d| d.timestamp).collect();
970
971        // Simple linear regression for trend prediction
972        let n = values.len() as f64;
973        let sum_x = timestamps.iter().sum::<u64>() as f64;
974        let sum_y = values.iter().sum::<f64>();
975        let sum_xy = timestamps.iter().zip(&values).map(|(x, y)| *x as f64 * y).sum::<f64>();
976        let sum_x2 = timestamps.iter().map(|x| (*x as f64).powi(2)).sum::<f64>();
977
978        let slope = (n * sum_xy - sum_x * sum_y) / (n * sum_x2 - sum_x.powi(2));
979        let intercept = (sum_y - slope * sum_x) / n;
980
981        // Generate predictions
982        let current_time = SystemTime::now()
983            .duration_since(UNIX_EPOCH)
984            .expect("System time should be after UNIX_EPOCH")
985            .as_secs();
986        let prediction_time = current_time + (hours_ahead * 3600);
987        let predicted_value = slope * prediction_time as f64 + intercept;
988
989        // Calculate confidence intervals (simplified)
990        let mean = sum_y / n;
991        let variance = values.iter().map(|v| (v - mean).powi(2)).sum::<f64>() / n;
992        let std_error = (variance / n).sqrt();
993        let confidence_interval = std_error * 1.96; // 95% confidence
994
995        // Analyze trend direction and strength
996        let trend_strength = slope.abs() / mean.abs();
997        let trend_direction = if slope > 0.01 {
998            TrendDirection::Increasing
999        } else if slope < -0.01 {
1000            TrendDirection::Decreasing
1001        } else {
1002            TrendDirection::Stable
1003        };
1004
1005        Ok(PerformancePrediction {
1006            category: category.clone(),
1007            predicted_value,
1008            confidence_interval: (
1009                predicted_value - confidence_interval,
1010                predicted_value + confidence_interval,
1011            ),
1012            trend_direction,
1013            trend_strength,
1014            prediction_horizon_hours: hours_ahead,
1015            model_accuracy: 1.0 - (std_error / mean.abs()).min(1.0), // Simplified accuracy
1016            generated_at: current_time,
1017            recommendations: self.generate_performance_recommendations(
1018                &trend_direction,
1019                trend_strength,
1020                predicted_value,
1021            ),
1022        })
1023    }
1024
1025    /// Advanced dashboard theme and customization support
1026    pub fn apply_dashboard_theme(&self, theme: DashboardTheme) -> Result<()> {
1027        // This would typically update UI styling, but we'll store theme preferences
1028        let theme_message = WebSocketMessage::Generic {
1029            message_type: "theme_update".to_string(),
1030            data: serde_json::to_value(&theme)?,
1031            timestamp: SystemTime::now()
1032                .duration_since(UNIX_EPOCH)
1033                .expect("System time should be after UNIX_EPOCH")
1034                .as_secs(),
1035            session_id: self.session_id.clone(),
1036        };
1037
1038        if self.websocket_sender.send(theme_message).is_err() {
1039            // No active subscribers, but that's okay
1040        }
1041
1042        Ok(())
1043    }
1044
1045    /// Export dashboard data in various formats
1046    pub async fn export_dashboard_data(
1047        &self,
1048        format: ExportFormat,
1049        time_range: Option<(u64, u64)>,
1050    ) -> Result<Vec<u8>> {
1051        let data = if let Some((start, end)) = time_range {
1052            self.get_filtered_data(start, end)
1053        } else {
1054            self.get_all_data()
1055        };
1056
1057        match format {
1058            ExportFormat::JSON => {
1059                let json_data = serde_json::to_string_pretty(&data)?;
1060                Ok(json_data.into_bytes())
1061            },
1062            ExportFormat::CSV => {
1063                let mut csv_data = String::from("timestamp,category,label,value\n");
1064                for (category, points) in data {
1065                    for point in points {
1066                        csv_data.push_str(&format!(
1067                            "{},{:?},{},{}\n",
1068                            point.timestamp, category, point.label, point.value
1069                        ));
1070                    }
1071                }
1072                Ok(csv_data.into_bytes())
1073            },
1074            ExportFormat::MessagePack => {
1075                // Would use rmp_serde for MessagePack serialization
1076                // For now, return JSON as fallback
1077                let json_data = serde_json::to_string(&data)?;
1078                Ok(json_data.into_bytes())
1079            },
1080        }
1081    }
1082
1083    // Helper methods for advanced features
1084
1085    fn create_histogram(&self, values: &[f64], bins: usize) -> HistogramData {
1086        if values.is_empty() {
1087            return HistogramData {
1088                bins: Vec::new(),
1089                max_frequency: 0,
1090            };
1091        }
1092
1093        let min_val = values.iter().fold(f64::INFINITY, |a, &b| a.min(b));
1094        let max_val = values.iter().fold(f64::NEG_INFINITY, |a, &b| a.max(b));
1095        let bin_width = (max_val - min_val) / bins as f64;
1096
1097        let mut histogram_bins = vec![0; bins];
1098
1099        for &value in values {
1100            let bin_idx = ((value - min_val) / bin_width).floor() as usize;
1101            let bin_idx = bin_idx.min(bins - 1); // Ensure we don't go out of bounds
1102            histogram_bins[bin_idx] += 1;
1103        }
1104
1105        let max_frequency = *histogram_bins.iter().max().unwrap_or(&0);
1106
1107        let bins_data: Vec<HistogramBin> = histogram_bins
1108            .into_iter()
1109            .enumerate()
1110            .map(|(i, count)| HistogramBin {
1111                range_start: min_val + i as f64 * bin_width,
1112                range_end: min_val + (i + 1) as f64 * bin_width,
1113                frequency: count,
1114            })
1115            .collect();
1116
1117        HistogramData {
1118            bins: bins_data,
1119            max_frequency,
1120        }
1121    }
1122
1123    fn calculate_correlation_coefficient(
1124        &self,
1125        cat1: &MetricCategory,
1126        cat2: &MetricCategory,
1127    ) -> f64 {
1128        let data = self.metric_data.lock().expect("lock should not be poisoned");
1129
1130        let data1 = match data.get(cat1) {
1131            Some(d) => d,
1132            None => return 0.0,
1133        };
1134
1135        let data2 = match data.get(cat2) {
1136            Some(d) => d,
1137            None => return 0.0,
1138        };
1139
1140        if data1.len() < 2 || data2.len() < 2 {
1141            return 0.0;
1142        }
1143
1144        // Take the minimum length to align the datasets
1145        let min_len = data1.len().min(data2.len()).min(50); // Use at most 50 points for performance
1146        let values1: Vec<f64> = data1.iter().rev().take(min_len).map(|d| d.value).collect();
1147        let values2: Vec<f64> = data2.iter().rev().take(min_len).map(|d| d.value).collect();
1148
1149        // Calculate Pearson correlation coefficient
1150        let n = values1.len() as f64;
1151        let mean1 = values1.iter().sum::<f64>() / n;
1152        let mean2 = values2.iter().sum::<f64>() / n;
1153
1154        let covariance = values1
1155            .iter()
1156            .zip(&values2)
1157            .map(|(v1, v2)| (v1 - mean1) * (v2 - mean2))
1158            .sum::<f64>()
1159            / n;
1160
1161        let std1 = (values1.iter().map(|v| (v - mean1).powi(2)).sum::<f64>() / n).sqrt();
1162        let std2 = (values2.iter().map(|v| (v - mean2).powi(2)).sum::<f64>() / n).sqrt();
1163
1164        if std1 == 0.0 || std2 == 0.0 {
1165            0.0
1166        } else {
1167            covariance / (std1 * std2)
1168        }
1169    }
1170
1171    fn generate_performance_recommendations(
1172        &self,
1173        trend: &TrendDirection,
1174        strength: f64,
1175        predicted_value: f64,
1176    ) -> Vec<String> {
1177        let mut recommendations = Vec::new();
1178
1179        match trend {
1180            TrendDirection::Increasing => {
1181                if strength > 0.1 {
1182                    recommendations.push(
1183                        "Monitor for potential resource exhaustion due to increasing trend"
1184                            .to_string(),
1185                    );
1186                    recommendations.push("Consider scaling resources proactively".to_string());
1187                }
1188                if predicted_value > 90.0 {
1189                    recommendations.push(
1190                        "Critical threshold approaching - immediate action recommended".to_string(),
1191                    );
1192                }
1193            },
1194            TrendDirection::Decreasing => {
1195                if strength > 0.05 {
1196                    recommendations
1197                        .push("Investigate potential performance degradation".to_string());
1198                    recommendations.push("Check for resource leaks or inefficiencies".to_string());
1199                }
1200            },
1201            TrendDirection::Stable => {
1202                recommendations
1203                    .push("Performance trend is stable - continue monitoring".to_string());
1204            },
1205        }
1206
1207        if recommendations.is_empty() {
1208            recommendations.push("No specific recommendations at this time".to_string());
1209        }
1210
1211        recommendations
1212    }
1213
1214    fn get_filtered_data(
1215        &self,
1216        start: u64,
1217        end: u64,
1218    ) -> HashMap<MetricCategory, VecDeque<MetricDataPoint>> {
1219        let data = self.metric_data.lock().expect("lock should not be poisoned");
1220        let mut filtered_data = HashMap::new();
1221
1222        for (category, points) in data.iter() {
1223            let filtered_points: VecDeque<MetricDataPoint> = points
1224                .iter()
1225                .filter(|p| p.timestamp >= start && p.timestamp <= end)
1226                .cloned()
1227                .collect();
1228
1229            if !filtered_points.is_empty() {
1230                filtered_data.insert(category.clone(), filtered_points);
1231            }
1232        }
1233
1234        filtered_data
1235    }
1236
1237    fn get_all_data(&self) -> HashMap<MetricCategory, VecDeque<MetricDataPoint>> {
1238        self.metric_data.lock().expect("lock should not be poisoned").clone()
1239    }
1240}
1241
1242/// Dashboard builder for easier configuration
1243#[derive(Debug, Default)]
1244pub struct DashboardBuilder {
1245    config: DashboardConfig,
1246}
1247
1248impl DashboardBuilder {
1249    /// Create new dashboard builder
1250    pub fn new() -> Self {
1251        Self::default()
1252    }
1253
1254    /// Set WebSocket port
1255    pub fn port(mut self, port: u16) -> Self {
1256        self.config.websocket_port = port;
1257        self
1258    }
1259
1260    /// Set update frequency
1261    pub fn update_frequency(mut self, frequency_ms: u64) -> Self {
1262        self.config.update_frequency_ms = frequency_ms;
1263        self
1264    }
1265
1266    /// Set maximum data points
1267    pub fn max_data_points(mut self, max_points: usize) -> Self {
1268        self.config.max_data_points = max_points;
1269        self
1270    }
1271
1272    /// Enable/disable GPU monitoring
1273    pub fn gpu_monitoring(mut self, enabled: bool) -> Self {
1274        self.config.enable_gpu_monitoring = enabled;
1275        self
1276    }
1277
1278    /// Enable/disable memory profiling
1279    pub fn memory_profiling(mut self, enabled: bool) -> Self {
1280        self.config.enable_memory_profiling = enabled;
1281        self
1282    }
1283
1284    /// Set alert thresholds
1285    pub fn alert_thresholds(mut self, thresholds: AlertThresholds) -> Self {
1286        self.config.alert_thresholds = thresholds;
1287        self
1288    }
1289
1290    /// Build the dashboard
1291    pub fn build(self) -> RealtimeDashboard {
1292        RealtimeDashboard::new(self.config)
1293    }
1294}
1295
1296#[cfg(test)]
1297mod tests {
1298    use super::*;
1299    use futures::StreamExt;
1300    use std::time::Duration;
1301
1302    #[tokio::test]
1303    async fn test_dashboard_creation() {
1304        let dashboard = DashboardBuilder::new()
1305            .port(8081)
1306            .update_frequency(50)
1307            .max_data_points(500)
1308            .build();
1309
1310        assert_eq!(dashboard.get_config().websocket_port, 8081);
1311        assert_eq!(dashboard.get_config().update_frequency_ms, 50);
1312        assert_eq!(dashboard.get_config().max_data_points, 500);
1313    }
1314
1315    #[tokio::test]
1316    async fn test_metric_addition() {
1317        let dashboard = DashboardBuilder::new().build();
1318
1319        let result = dashboard.add_metric(MetricCategory::Training, "loss".to_string(), 0.5);
1320
1321        assert!(result.is_ok());
1322
1323        let historical_data = dashboard.get_historical_data(&MetricCategory::Training);
1324        assert_eq!(historical_data.len(), 1);
1325        assert_eq!(historical_data[0].value, 0.5);
1326        assert_eq!(historical_data[0].label, "loss");
1327    }
1328
1329    #[tokio::test]
1330    async fn test_batch_metrics() {
1331        let dashboard = DashboardBuilder::new().build();
1332
1333        let metrics = vec![
1334            (MetricCategory::Training, "loss".to_string(), 0.5),
1335            (MetricCategory::Training, "accuracy".to_string(), 0.9),
1336            (MetricCategory::GPU, "utilization".to_string(), 75.0),
1337        ];
1338
1339        let result = dashboard.add_metrics(metrics);
1340        assert!(result.is_ok());
1341
1342        let training_data = dashboard.get_historical_data(&MetricCategory::Training);
1343        assert_eq!(training_data.len(), 2);
1344
1345        let gpu_data = dashboard.get_historical_data(&MetricCategory::GPU);
1346        assert_eq!(gpu_data.len(), 1);
1347    }
1348
1349    #[tokio::test]
1350    async fn test_alert_creation() {
1351        let dashboard = DashboardBuilder::new().build();
1352
1353        let result = dashboard.create_alert(
1354            AlertSeverity::Warning,
1355            MetricCategory::Memory,
1356            "High Memory".to_string(),
1357            "Memory usage is high".to_string(),
1358            Some(95.0),
1359            Some(90.0),
1360        );
1361
1362        assert!(result.is_ok());
1363
1364        let history = dashboard.alert_history.lock().expect("lock should not be poisoned");
1365        assert_eq!(history.len(), 1);
1366        assert_eq!(history[0].title, "High Memory");
1367    }
1368
1369    #[tokio::test]
1370    async fn test_websocket_subscription() {
1371        let dashboard = DashboardBuilder::new().build();
1372
1373        let mut stream = dashboard.subscribe();
1374
1375        // Start the dashboard
1376        let dashboard_clone = Arc::new(dashboard);
1377        let dashboard_for_task = dashboard_clone.clone();
1378
1379        tokio::spawn(async move {
1380            let _ = dashboard_for_task.start().await;
1381        });
1382
1383        // Add a metric to trigger a message
1384        let _ =
1385            dashboard_clone.add_metric(MetricCategory::Training, "test_metric".to_string(), 42.0);
1386
1387        // Try to receive a message (with timeout)
1388        let message_result = tokio::time::timeout(Duration::from_millis(100), stream.next()).await;
1389
1390        dashboard_clone.stop();
1391
1392        // Check if we received a message
1393        assert!(message_result.is_ok());
1394        if let Ok(Some(Ok(message))) = message_result {
1395            match message {
1396                WebSocketMessage::MetricUpdate { data } => {
1397                    assert!(!data.is_empty());
1398                    assert_eq!(data[0].value, 42.0);
1399                    assert_eq!(data[0].label, "test_metric");
1400                },
1401                _ => panic!("Expected MetricUpdate message"),
1402            }
1403        }
1404    }
1405
1406    #[tokio::test]
1407    async fn test_system_stats() {
1408        let dashboard = DashboardBuilder::new().build();
1409
1410        // Add some data
1411        let _ = dashboard.add_metric(MetricCategory::Training, "loss".to_string(), 0.5);
1412        let _ = dashboard.create_alert(
1413            AlertSeverity::Info,
1414            MetricCategory::Training,
1415            "Test Alert".to_string(),
1416            "Test message".to_string(),
1417            None,
1418            None,
1419        );
1420
1421        let stats = dashboard.get_system_stats();
1422
1423        assert_eq!(stats.data_points_collected, 1);
1424        assert_eq!(stats.total_alerts, 1);
1425        // uptime is a Duration which is always >= 0
1426    }
1427
1428    #[tokio::test]
1429    async fn test_data_point_limit() {
1430        let dashboard = DashboardBuilder::new().max_data_points(2).build();
1431
1432        // Add 3 data points
1433        let _ = dashboard.add_metric(MetricCategory::Training, "metric1".to_string(), 1.0);
1434        let _ = dashboard.add_metric(MetricCategory::Training, "metric2".to_string(), 2.0);
1435        let _ = dashboard.add_metric(MetricCategory::Training, "metric3".to_string(), 3.0);
1436
1437        let data = dashboard.get_historical_data(&MetricCategory::Training);
1438
1439        // Should only keep the last 2 data points
1440        assert_eq!(data.len(), 2);
1441        assert_eq!(data[0].value, 2.0); // First of the remaining two
1442        assert_eq!(data[1].value, 3.0); // Last added
1443    }
1444}