Skip to main content

trustformers_debug/
realtime_dashboard.rs

1//! Real-Time Debugging Dashboard
2//!
3//! This module provides a modern, real-time debugging dashboard with WebSocket support,
4//! interactive visualizations, and live data streaming for comprehensive neural network monitoring.
5
6use anyhow::Result;
7use scirs2_core::random::*; // SciRS2 Integration Policy
8use serde::{Deserialize, Serialize};
9use std::collections::{HashMap, VecDeque};
10use std::sync::{Arc, Mutex};
11use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
12use tokio::sync::broadcast;
13use tokio::time::interval;
14use tokio_stream::wrappers::BroadcastStream;
15use uuid::Uuid;
16
17/// Configuration for the real-time dashboard
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct DashboardConfig {
20    /// Port for WebSocket server
21    pub websocket_port: u16,
22    /// Update frequency in milliseconds
23    pub update_frequency_ms: u64,
24    /// Maximum number of data points to keep in memory
25    pub max_data_points: usize,
26    /// Enable GPU monitoring
27    pub enable_gpu_monitoring: bool,
28    /// Enable memory profiling
29    pub enable_memory_profiling: bool,
30    /// Enable network traffic monitoring
31    pub enable_network_monitoring: bool,
32    /// Enable performance alerts
33    pub enable_performance_alerts: bool,
34    /// Alert thresholds
35    pub alert_thresholds: AlertThresholds,
36}
37
38impl Default for DashboardConfig {
39    fn default() -> Self {
40        Self {
41            websocket_port: 8080,
42            update_frequency_ms: 100,
43            max_data_points: 1000,
44            enable_gpu_monitoring: true,
45            enable_memory_profiling: true,
46            enable_network_monitoring: false,
47            enable_performance_alerts: true,
48            alert_thresholds: AlertThresholds::default(),
49        }
50    }
51}
52
53/// Alert threshold configuration
54#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct AlertThresholds {
56    /// Memory usage threshold (percentage)
57    pub memory_threshold: f64,
58    /// GPU utilization threshold (percentage)
59    pub gpu_utilization_threshold: f64,
60    /// Temperature threshold (Celsius)
61    pub temperature_threshold: f64,
62    /// Loss spike threshold
63    pub loss_spike_threshold: f64,
64    /// Gradient norm threshold
65    pub gradient_norm_threshold: f64,
66}
67
68impl Default for AlertThresholds {
69    fn default() -> Self {
70        Self {
71            memory_threshold: 90.0,
72            gpu_utilization_threshold: 95.0,
73            temperature_threshold: 80.0,
74            loss_spike_threshold: 2.0,
75            gradient_norm_threshold: 10.0,
76        }
77    }
78}
79
80/// Real-time metric data point
81#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct MetricDataPoint {
83    pub timestamp: u64,
84    pub value: f64,
85    pub label: String,
86    pub category: MetricCategory,
87}
88
89/// Categories of metrics for organization
90#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
91pub enum MetricCategory {
92    Training,
93    Memory,
94    GPU,
95    Network,
96    Performance,
97    Custom(String),
98}
99
100/// Dashboard alert
101#[derive(Debug, Clone, Serialize, Deserialize)]
102pub struct DashboardAlert {
103    pub id: String,
104    pub timestamp: u64,
105    pub severity: AlertSeverity,
106    pub category: MetricCategory,
107    pub title: String,
108    pub message: String,
109    pub value: Option<f64>,
110    pub threshold: Option<f64>,
111}
112
113/// Alert severity levels
114#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
115pub enum AlertSeverity {
116    Info,
117    Warning,
118    Error,
119    Critical,
120}
121
122/// WebSocket message types
123#[derive(Debug, Clone, Serialize, Deserialize)]
124#[serde(tag = "type")]
125pub enum WebSocketMessage {
126    MetricUpdate {
127        data: Vec<MetricDataPoint>,
128    },
129    Alert {
130        alert: DashboardAlert,
131    },
132    ConfigUpdate {
133        config: DashboardConfig,
134    },
135    SessionInfo {
136        session_id: String,
137        uptime: u64,
138    },
139    HistoricalData {
140        category: MetricCategory,
141        data: Vec<MetricDataPoint>,
142    },
143    SystemStats {
144        stats: SystemStats,
145    },
146    #[serde(untagged)]
147    Generic {
148        message_type: String,
149        data: serde_json::Value,
150        timestamp: u64,
151        session_id: String,
152    },
153}
154
155/// Anomaly detection result
156#[derive(Debug, Clone, Serialize, Deserialize)]
157pub struct AnomalyDetection {
158    pub timestamp: u64,
159    pub value: f64,
160    pub expected_range: (f64, f64),
161    pub anomaly_type: AnomalyType,
162    pub confidence_score: f64,
163    pub category: MetricCategory,
164    pub description: String,
165}
166
167/// Types of anomalies that can be detected
168#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
169pub enum AnomalyType {
170    Spike,
171    Drop,
172    GradualIncrease,
173    GradualDecrease,
174    Outlier,
175}
176
177/// Advanced dashboard visualization data
178#[derive(Debug, Clone, Serialize, Deserialize)]
179pub struct DashboardVisualizationData {
180    pub heatmap_data: HashMap<MetricCategory, HeatmapData>,
181    pub time_series_data: HashMap<MetricCategory, Vec<TimeSeriesPoint>>,
182    pub correlation_matrix: Vec<Vec<f64>>,
183    pub performance_distribution: HashMap<MetricCategory, HistogramData>,
184    pub generated_at: u64,
185    pub session_id: String,
186}
187
188/// Heatmap data for metric visualization
189#[derive(Debug, Clone, Serialize, Deserialize)]
190pub struct HeatmapData {
191    pub intensity: f64,
192    pub normalized_intensity: f64,
193    pub data_points: usize,
194    pub timestamp: u64,
195}
196
197/// Time series data point for trend visualization
198#[derive(Debug, Clone, Serialize, Deserialize)]
199pub struct TimeSeriesPoint {
200    pub timestamp: u64,
201    pub value: f64,
202    pub label: String,
203}
204
205/// Histogram data for performance distribution analysis
206#[derive(Debug, Clone, Serialize, Deserialize)]
207pub struct HistogramData {
208    pub bins: Vec<HistogramBin>,
209    pub max_frequency: usize,
210}
211
212/// Individual histogram bin
213#[derive(Debug, Clone, Serialize, Deserialize)]
214pub struct HistogramBin {
215    pub range_start: f64,
216    pub range_end: f64,
217    pub frequency: usize,
218}
219
220/// Performance prediction result
221#[derive(Debug, Clone, Serialize, Deserialize)]
222pub struct PerformancePrediction {
223    pub category: MetricCategory,
224    pub predicted_value: f64,
225    pub confidence_interval: (f64, f64),
226    pub trend_direction: TrendDirection,
227    pub trend_strength: f64,
228    pub prediction_horizon_hours: u64,
229    pub model_accuracy: f64,
230    pub generated_at: u64,
231    pub recommendations: Vec<String>,
232}
233
234/// Trend direction for predictions
235#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
236pub enum TrendDirection {
237    Increasing,
238    Decreasing,
239    Stable,
240}
241
242/// Dashboard theme configuration
243#[derive(Debug, Clone, Serialize, Deserialize)]
244pub struct DashboardTheme {
245    pub name: String,
246    pub primary_color: String,
247    pub secondary_color: String,
248    pub background_color: String,
249    pub text_color: String,
250    pub accent_color: String,
251    pub chart_colors: Vec<String>,
252    pub dark_mode: bool,
253    pub font_family: String,
254    pub border_radius: u8,
255}
256
257impl Default for DashboardTheme {
258    fn default() -> Self {
259        Self {
260            name: "Default".to_string(),
261            primary_color: "#3b82f6".to_string(),
262            secondary_color: "#64748b".to_string(),
263            background_color: "#ffffff".to_string(),
264            text_color: "#1f2937".to_string(),
265            accent_color: "#10b981".to_string(),
266            chart_colors: vec![
267                "#3b82f6".to_string(),
268                "#ef4444".to_string(),
269                "#10b981".to_string(),
270                "#f59e0b".to_string(),
271                "#8b5cf6".to_string(),
272            ],
273            dark_mode: false,
274            font_family: "Inter, sans-serif".to_string(),
275            border_radius: 8,
276        }
277    }
278}
279
280/// Export format options
281#[derive(Debug, Clone, Serialize, Deserialize)]
282pub enum ExportFormat {
283    JSON,
284    CSV,
285    MessagePack,
286}
287
288/// System statistics
289#[derive(Debug, Clone, Serialize, Deserialize)]
290pub struct SystemStats {
291    pub uptime: u64,
292    pub total_alerts: usize,
293    pub active_connections: usize,
294    pub data_points_collected: usize,
295    pub memory_usage_mb: f64,
296    pub cpu_usage_percent: f64,
297}
298
299/// Real-time dashboard server
300#[derive(Debug)]
301pub struct RealtimeDashboard {
302    config: Arc<Mutex<DashboardConfig>>,
303    session_id: String,
304    start_time: Instant,
305    metric_data: Arc<Mutex<HashMap<MetricCategory, VecDeque<MetricDataPoint>>>>,
306    alert_history: Arc<Mutex<VecDeque<DashboardAlert>>>,
307    websocket_sender: broadcast::Sender<WebSocketMessage>,
308    active_connections: Arc<Mutex<usize>>,
309    total_data_points: Arc<Mutex<usize>>,
310    is_running: Arc<Mutex<bool>>,
311}
312
313impl RealtimeDashboard {
314    /// Create new real-time dashboard
315    pub fn new(config: DashboardConfig) -> Self {
316        let (websocket_sender, _) = broadcast::channel(1000);
317
318        Self {
319            config: Arc::new(Mutex::new(config)),
320            session_id: Uuid::new_v4().to_string(),
321            start_time: Instant::now(),
322            metric_data: Arc::new(Mutex::new(HashMap::new())),
323            alert_history: Arc::new(Mutex::new(VecDeque::new())),
324            websocket_sender,
325            active_connections: Arc::new(Mutex::new(0)),
326            total_data_points: Arc::new(Mutex::new(0)),
327            is_running: Arc::new(Mutex::new(false)),
328        }
329    }
330
331    /// Start the dashboard server
332    pub async fn start(&self) -> Result<()> {
333        {
334            let mut running = self
335                .is_running
336                .lock()
337                .map_err(|_| anyhow::anyhow!("Failed to acquire running state lock"))?;
338            if *running {
339                return Ok(());
340            }
341            *running = true;
342        }
343
344        // Start periodic data collection
345        self.start_data_collection().await?;
346
347        // Start periodic system stats updates
348        self.start_system_stats_updates().await?;
349
350        // Start alert monitoring
351        self.start_alert_monitoring().await?;
352
353        Ok(())
354    }
355
356    /// Stop the dashboard server
357    pub fn stop(&self) {
358        if let Ok(mut running) = self.is_running.lock() {
359            *running = false;
360        }
361    }
362
363    /// Add a metric data point
364    pub fn add_metric(&self, category: MetricCategory, label: String, value: f64) -> Result<()> {
365        let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis() as u64;
366
367        let data_point = MetricDataPoint {
368            timestamp,
369            value,
370            label,
371            category: category.clone(),
372        };
373
374        // Add to metric data with size limit
375        {
376            let mut data = self
377                .metric_data
378                .lock()
379                .map_err(|_| anyhow::anyhow!("Failed to acquire metric data lock"))?;
380            let category_data = data.entry(category.clone()).or_insert_with(VecDeque::new);
381
382            category_data.push_back(data_point.clone());
383
384            let max_points = self
385                .config
386                .lock()
387                .map_err(|_| anyhow::anyhow!("Failed to acquire config lock"))?
388                .max_data_points;
389            while category_data.len() > max_points {
390                category_data.pop_front();
391            }
392        }
393
394        // Increment total data points counter
395        {
396            if let Ok(mut total) = self.total_data_points.lock() {
397                *total += 1;
398            }
399        }
400
401        // Broadcast update to WebSocket clients
402        let message = WebSocketMessage::MetricUpdate {
403            data: vec![data_point],
404        };
405
406        let _ = self.websocket_sender.send(message);
407
408        // Check for alerts
409        self.check_for_alerts(&category, value);
410
411        Ok(())
412    }
413
414    /// Add multiple metrics at once
415    pub fn add_metrics(&self, metrics: Vec<(MetricCategory, String, f64)>) -> Result<()> {
416        let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis() as u64;
417
418        let mut data_points = Vec::new();
419
420        // Process all metrics
421        for (category, label, value) in metrics {
422            let data_point = MetricDataPoint {
423                timestamp,
424                value,
425                label,
426                category: category.clone(),
427            };
428
429            // Add to metric data
430            {
431                let mut data = self.metric_data.lock().unwrap();
432                let category_data = data.entry(category.clone()).or_default();
433                category_data.push_back(data_point.clone());
434
435                let max_points = self.config.lock().unwrap().max_data_points;
436                while category_data.len() > max_points {
437                    category_data.pop_front();
438                }
439            }
440
441            data_points.push(data_point);
442
443            // Check for alerts
444            self.check_for_alerts(&category, value);
445        }
446
447        // Update total counter
448        {
449            let mut total = self.total_data_points.lock().unwrap();
450            *total += data_points.len();
451        }
452
453        // Broadcast batch update
454        let message = WebSocketMessage::MetricUpdate { data: data_points };
455        let _ = self.websocket_sender.send(message);
456
457        Ok(())
458    }
459
460    /// Create an alert
461    pub fn create_alert(
462        &self,
463        severity: AlertSeverity,
464        category: MetricCategory,
465        title: String,
466        message: String,
467        value: Option<f64>,
468        threshold: Option<f64>,
469    ) -> Result<()> {
470        let alert = DashboardAlert {
471            id: Uuid::new_v4().to_string(),
472            timestamp: SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis() as u64,
473            severity,
474            category,
475            title,
476            message,
477            value,
478            threshold,
479        };
480
481        // Add to alert history
482        {
483            let mut history = self.alert_history.lock().unwrap();
484            history.push_back(alert.clone());
485
486            // Keep only last 100 alerts
487            while history.len() > 100 {
488                history.pop_front();
489            }
490        }
491
492        // Broadcast alert
493        let message = WebSocketMessage::Alert { alert };
494        let _ = self.websocket_sender.send(message);
495
496        Ok(())
497    }
498
499    /// Get historical data for a category
500    pub fn get_historical_data(&self, category: &MetricCategory) -> Vec<MetricDataPoint> {
501        let data = self.metric_data.lock().unwrap();
502        data.get(category)
503            .map(|deque| deque.iter().cloned().collect())
504            .unwrap_or_default()
505    }
506
507    /// Get current system stats
508    pub fn get_system_stats(&self) -> SystemStats {
509        let uptime = self.start_time.elapsed().as_secs();
510        let total_alerts = self.alert_history.lock().unwrap().len();
511        let active_connections = *self.active_connections.lock().unwrap();
512        let data_points_collected = *self.total_data_points.lock().unwrap();
513
514        // Simple memory and CPU usage estimation
515        let memory_usage_mb = self.estimate_memory_usage();
516        let cpu_usage_percent = self.estimate_cpu_usage();
517
518        SystemStats {
519            uptime,
520            total_alerts,
521            active_connections,
522            data_points_collected,
523            memory_usage_mb,
524            cpu_usage_percent,
525        }
526    }
527
528    /// Subscribe to WebSocket messages
529    pub fn subscribe(&self) -> BroadcastStream<WebSocketMessage> {
530        // Increment connection counter
531        {
532            let mut connections = self.active_connections.lock().unwrap();
533            *connections += 1;
534        }
535
536        BroadcastStream::new(self.websocket_sender.subscribe())
537    }
538
539    /// Update dashboard configuration
540    pub fn update_config(&self, new_config: DashboardConfig) -> Result<()> {
541        {
542            let mut config = self.config.lock().unwrap();
543            *config = new_config.clone();
544        }
545
546        // Broadcast configuration update
547        let message = WebSocketMessage::ConfigUpdate { config: new_config };
548        let _ = self.websocket_sender.send(message);
549
550        Ok(())
551    }
552
553    /// Get current configuration
554    pub fn get_config(&self) -> DashboardConfig {
555        self.config.lock().unwrap().clone()
556    }
557
558    /// Start periodic data collection
559    async fn start_data_collection(&self) -> Result<()> {
560        let config = self.config.clone();
561        let _metric_data = self.metric_data.clone();
562        let websocket_sender = self.websocket_sender.clone();
563        let is_running = self.is_running.clone();
564
565        tokio::spawn(async move {
566            let mut interval = interval(Duration::from_millis(
567                config.lock().unwrap().update_frequency_ms,
568            ));
569
570            while *is_running.lock().unwrap() {
571                interval.tick().await;
572
573                // Collect system metrics periodically
574                if let Ok(metrics) = Self::collect_system_metrics(&config).await {
575                    let message = WebSocketMessage::MetricUpdate { data: metrics };
576                    let _ = websocket_sender.send(message);
577                }
578            }
579        });
580
581        Ok(())
582    }
583
584    /// Start system stats updates
585    async fn start_system_stats_updates(&self) -> Result<()> {
586        let websocket_sender = self.websocket_sender.clone();
587        let start_time = self.start_time;
588        let alert_history = self.alert_history.clone();
589        let active_connections = self.active_connections.clone();
590        let total_data_points = self.total_data_points.clone();
591        let is_running = self.is_running.clone();
592
593        tokio::spawn(async move {
594            let mut interval = interval(Duration::from_secs(5)); // Update every 5 seconds
595
596            while *is_running.lock().unwrap() {
597                interval.tick().await;
598
599                let stats = SystemStats {
600                    uptime: start_time.elapsed().as_secs(),
601                    total_alerts: alert_history.lock().unwrap().len(),
602                    active_connections: *active_connections.lock().unwrap(),
603                    data_points_collected: *total_data_points.lock().unwrap(),
604                    memory_usage_mb: 0.0,   // Placeholder
605                    cpu_usage_percent: 0.0, // Placeholder
606                };
607
608                let message = WebSocketMessage::SystemStats { stats };
609                let _ = websocket_sender.send(message);
610            }
611        });
612
613        Ok(())
614    }
615
616    /// Start alert monitoring
617    async fn start_alert_monitoring(&self) -> Result<()> {
618        let config = self.config.clone();
619        let metric_data = self.metric_data.clone();
620        let is_running = self.is_running.clone();
621
622        tokio::spawn(async move {
623            let mut interval = interval(Duration::from_secs(1));
624
625            while *is_running.lock().unwrap() {
626                interval.tick().await;
627
628                // Monitor for threshold breaches and create alerts
629                Self::check_threshold_breaches(&config, &metric_data).await;
630            }
631        });
632
633        Ok(())
634    }
635
636    /// Collect system metrics
637    async fn collect_system_metrics(
638        config: &Arc<Mutex<DashboardConfig>>,
639    ) -> Result<Vec<MetricDataPoint>> {
640        let mut metrics = Vec::new();
641        let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis() as u64;
642
643        let cfg = config.lock().unwrap();
644
645        if cfg.enable_memory_profiling {
646            // Simulate memory metrics
647            let memory_usage = Self::get_memory_usage();
648            metrics.push(MetricDataPoint {
649                timestamp,
650                value: memory_usage,
651                label: "Memory Usage".to_string(),
652                category: MetricCategory::Memory,
653            });
654        }
655
656        if cfg.enable_gpu_monitoring {
657            // Simulate GPU metrics
658            let gpu_utilization = Self::get_gpu_utilization();
659            metrics.push(MetricDataPoint {
660                timestamp,
661                value: gpu_utilization,
662                label: "GPU Utilization".to_string(),
663                category: MetricCategory::GPU,
664            });
665
666            let gpu_memory = Self::get_gpu_memory_usage();
667            metrics.push(MetricDataPoint {
668                timestamp,
669                value: gpu_memory,
670                label: "GPU Memory".to_string(),
671                category: MetricCategory::GPU,
672            });
673        }
674
675        Ok(metrics)
676    }
677
678    /// Check for alerts based on new metric value
679    fn check_for_alerts(&self, category: &MetricCategory, value: f64) {
680        let config = self.config.lock().unwrap();
681        let thresholds = &config.alert_thresholds;
682
683        match category {
684            MetricCategory::Memory => {
685                if value > thresholds.memory_threshold {
686                    let _ = self.create_alert(
687                        AlertSeverity::Warning,
688                        category.clone(),
689                        "High Memory Usage".to_string(),
690                        format!(
691                            "Memory usage is {:.1}% (threshold: {:.1}%)",
692                            value, thresholds.memory_threshold
693                        ),
694                        Some(value),
695                        Some(thresholds.memory_threshold),
696                    );
697                }
698            },
699            MetricCategory::GPU => {
700                if value > thresholds.gpu_utilization_threshold {
701                    let _ = self.create_alert(
702                        AlertSeverity::Warning,
703                        category.clone(),
704                        "High GPU Utilization".to_string(),
705                        format!(
706                            "GPU utilization is {:.1}% (threshold: {:.1}%)",
707                            value, thresholds.gpu_utilization_threshold
708                        ),
709                        Some(value),
710                        Some(thresholds.gpu_utilization_threshold),
711                    );
712                }
713            },
714            MetricCategory::Training => {
715                if value > thresholds.loss_spike_threshold {
716                    let _ = self.create_alert(
717                        AlertSeverity::Error,
718                        category.clone(),
719                        "Training Loss Spike".to_string(),
720                        format!(
721                            "Loss spike detected: {:.4} (threshold: {:.4})",
722                            value, thresholds.loss_spike_threshold
723                        ),
724                        Some(value),
725                        Some(thresholds.loss_spike_threshold),
726                    );
727                }
728            },
729            _ => {},
730        }
731    }
732
733    /// Check for threshold breaches across all metrics
734    async fn check_threshold_breaches(
735        config: &Arc<Mutex<DashboardConfig>>,
736        metric_data: &Arc<Mutex<HashMap<MetricCategory, VecDeque<MetricDataPoint>>>>,
737    ) {
738        let _config = config.lock().unwrap();
739        let _data = metric_data.lock().unwrap();
740
741        // Implementation would check for patterns, sustained threshold breaches, etc.
742        // This is a placeholder for more complex alert logic
743    }
744
745    /// Simulate getting memory usage
746    fn get_memory_usage() -> f64 {
747        // Placeholder - in real implementation would use system APIs
748        50.0 + (thread_rng().random::<f64>() * 40.0)
749    }
750
751    /// Simulate getting GPU utilization
752    fn get_gpu_utilization() -> f64 {
753        // Placeholder - in real implementation would use NVIDIA ML, ROCm, etc.
754        30.0 + (thread_rng().random::<f64>() * 60.0)
755    }
756
757    /// Simulate getting GPU memory usage
758    fn get_gpu_memory_usage() -> f64 {
759        // Placeholder - in real implementation would use GPU APIs
760        40.0 + (thread_rng().random::<f64>() * 50.0)
761    }
762
763    /// Estimate memory usage of dashboard
764    fn estimate_memory_usage(&self) -> f64 {
765        let data = self.metric_data.lock().unwrap();
766        let mut total_points = 0;
767
768        for deque in data.values() {
769            total_points += deque.len();
770        }
771
772        // Rough estimate: ~100 bytes per data point
773        (total_points * 100) as f64 / (1024.0 * 1024.0)
774    }
775
776    /// Estimate CPU usage
777    fn estimate_cpu_usage(&self) -> f64 {
778        // Simple placeholder - in real implementation would use system APIs
779        5.0 + (thread_rng().random::<f64>() * 10.0)
780    }
781
782    /// AI-powered anomaly detection for metric patterns
783    pub async fn detect_metric_anomalies(
784        &self,
785        category: &MetricCategory,
786    ) -> Result<Vec<AnomalyDetection>> {
787        let data = self.get_historical_data(category);
788        let mut anomalies = Vec::new();
789
790        if data.len() < 10 {
791            return Ok(anomalies); // Need sufficient data for anomaly detection
792        }
793
794        // Calculate statistical thresholds
795        let values: Vec<f64> = data.iter().map(|d| d.value).collect();
796        let mean = values.iter().sum::<f64>() / values.len() as f64;
797        let variance = values.iter().map(|v| (v - mean).powi(2)).sum::<f64>() / values.len() as f64;
798        let std_dev = variance.sqrt();
799
800        // Z-score based anomaly detection
801        let z_threshold = 2.0; // 2 standard deviations
802        for point in data.iter() {
803            let z_score = (point.value - mean).abs() / std_dev;
804            if z_score > z_threshold {
805                let anomaly_type =
806                    if point.value > mean { AnomalyType::Spike } else { AnomalyType::Drop };
807
808                anomalies.push(AnomalyDetection {
809                    timestamp: point.timestamp,
810                    value: point.value,
811                    expected_range: (mean - std_dev, mean + std_dev),
812                    anomaly_type,
813                    confidence_score: (z_score - z_threshold) / z_threshold,
814                    category: category.clone(),
815                    description: format!(
816                        "Detected {} in {} metrics: value {} (Z-score: {:.2})",
817                        match anomaly_type {
818                            AnomalyType::Spike => "spike",
819                            AnomalyType::Drop => "drop",
820                            _ => "anomaly",
821                        },
822                        match category {
823                            MetricCategory::Training => "training",
824                            MetricCategory::Memory => "memory",
825                            MetricCategory::GPU => "GPU",
826                            MetricCategory::Network => "network",
827                            MetricCategory::Performance => "performance",
828                            MetricCategory::Custom(name) => name,
829                        },
830                        point.value,
831                        z_score
832                    ),
833                });
834            }
835        }
836
837        // Advanced pattern detection - look for gradual trends
838        if data.len() >= 20 {
839            let recent_window = &data[data.len() - 10..];
840            let earlier_window = &data[data.len() - 20..data.len() - 10];
841
842            let recent_avg =
843                recent_window.iter().map(|d| d.value).sum::<f64>() / recent_window.len() as f64;
844            let earlier_avg =
845                earlier_window.iter().map(|d| d.value).sum::<f64>() / earlier_window.len() as f64;
846
847            let trend_change = (recent_avg - earlier_avg) / earlier_avg;
848
849            if trend_change.abs() > 0.3 {
850                // 30% change
851                anomalies.push(AnomalyDetection {
852                    timestamp: recent_window.last().unwrap().timestamp,
853                    value: recent_avg,
854                    expected_range: (earlier_avg * 0.9, earlier_avg * 1.1),
855                    anomaly_type: if trend_change > 0.0 {
856                        AnomalyType::GradualIncrease
857                    } else {
858                        AnomalyType::GradualDecrease
859                    },
860                    confidence_score: trend_change.abs(),
861                    category: category.clone(),
862                    description: format!(
863                        "Detected gradual {} trend: {:.1}% change over recent measurements",
864                        if trend_change > 0.0 { "increase" } else { "decrease" },
865                        trend_change.abs() * 100.0
866                    ),
867                });
868            }
869        }
870
871        Ok(anomalies)
872    }
873
874    /// Generate advanced visualization data for modern dashboard components
875    pub fn generate_advanced_visualizations(&self) -> Result<DashboardVisualizationData> {
876        let mut heatmap_data = HashMap::new();
877        let mut time_series_data = HashMap::new();
878        let mut correlation_matrix = Vec::new();
879        let mut performance_distribution = HashMap::new();
880
881        // Generate heatmap data for different metric categories
882        for (category, data) in self.metric_data.lock().unwrap().iter() {
883            if data.len() >= 10 {
884                let recent_data: Vec<f64> = data.iter().rev().take(10).map(|d| d.value).collect();
885                let avg_value = recent_data.iter().sum::<f64>() / recent_data.len() as f64;
886
887                heatmap_data.insert(
888                    category.clone(),
889                    HeatmapData {
890                        intensity: avg_value,
891                        normalized_intensity: (avg_value / (avg_value + 1.0)).min(1.0), // Normalize to 0-1
892                        data_points: recent_data.len(),
893                        timestamp: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(),
894                    },
895                );
896
897                // Time series data for trend visualization
898                let time_series: Vec<TimeSeriesPoint> = data
899                    .iter()
900                    .map(|d| TimeSeriesPoint {
901                        timestamp: d.timestamp,
902                        value: d.value,
903                        label: d.label.clone(),
904                    })
905                    .collect();
906
907                time_series_data.insert(category.clone(), time_series);
908
909                // Performance distribution data
910                let values: Vec<f64> = data.iter().map(|d| d.value).collect();
911                let histogram = self.create_histogram(&values, 10);
912                performance_distribution.insert(category.clone(), histogram);
913            }
914        }
915
916        // Generate correlation matrix for different metrics
917        let categories: Vec<&MetricCategory> = heatmap_data.keys().collect();
918        for (i, cat1) in categories.iter().enumerate() {
919            let mut row = Vec::new();
920            for (j, cat2) in categories.iter().enumerate() {
921                if i == j {
922                    row.push(1.0); // Perfect correlation with itself
923                } else {
924                    // Calculate correlation coefficient (simplified)
925                    let corr = self.calculate_correlation_coefficient(cat1, cat2);
926                    row.push(corr);
927                }
928            }
929            correlation_matrix.push(row);
930        }
931
932        Ok(DashboardVisualizationData {
933            heatmap_data,
934            time_series_data,
935            correlation_matrix,
936            performance_distribution,
937            generated_at: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(),
938            session_id: self.session_id.clone(),
939        })
940    }
941
942    /// AI-powered performance prediction based on historical trends
943    pub async fn predict_performance_trends(
944        &self,
945        category: &MetricCategory,
946        hours_ahead: u64,
947    ) -> Result<PerformancePrediction> {
948        let data = self.get_historical_data(category);
949
950        if data.len() < 20 {
951            return Err(anyhow::anyhow!(
952                "Insufficient data for prediction (need at least 20 points)"
953            ));
954        }
955
956        let values: Vec<f64> = data.iter().map(|d| d.value).collect();
957        let timestamps: Vec<u64> = data.iter().map(|d| d.timestamp).collect();
958
959        // Simple linear regression for trend prediction
960        let n = values.len() as f64;
961        let sum_x = timestamps.iter().sum::<u64>() as f64;
962        let sum_y = values.iter().sum::<f64>();
963        let sum_xy = timestamps.iter().zip(&values).map(|(x, y)| *x as f64 * y).sum::<f64>();
964        let sum_x2 = timestamps.iter().map(|x| (*x as f64).powi(2)).sum::<f64>();
965
966        let slope = (n * sum_xy - sum_x * sum_y) / (n * sum_x2 - sum_x.powi(2));
967        let intercept = (sum_y - slope * sum_x) / n;
968
969        // Generate predictions
970        let current_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
971        let prediction_time = current_time + (hours_ahead * 3600);
972        let predicted_value = slope * prediction_time as f64 + intercept;
973
974        // Calculate confidence intervals (simplified)
975        let mean = sum_y / n;
976        let variance = values.iter().map(|v| (v - mean).powi(2)).sum::<f64>() / n;
977        let std_error = (variance / n).sqrt();
978        let confidence_interval = std_error * 1.96; // 95% confidence
979
980        // Analyze trend direction and strength
981        let trend_strength = slope.abs() / mean.abs();
982        let trend_direction = if slope > 0.01 {
983            TrendDirection::Increasing
984        } else if slope < -0.01 {
985            TrendDirection::Decreasing
986        } else {
987            TrendDirection::Stable
988        };
989
990        Ok(PerformancePrediction {
991            category: category.clone(),
992            predicted_value,
993            confidence_interval: (
994                predicted_value - confidence_interval,
995                predicted_value + confidence_interval,
996            ),
997            trend_direction,
998            trend_strength,
999            prediction_horizon_hours: hours_ahead,
1000            model_accuracy: 1.0 - (std_error / mean.abs()).min(1.0), // Simplified accuracy
1001            generated_at: current_time,
1002            recommendations: self.generate_performance_recommendations(
1003                &trend_direction,
1004                trend_strength,
1005                predicted_value,
1006            ),
1007        })
1008    }
1009
1010    /// Advanced dashboard theme and customization support
1011    pub fn apply_dashboard_theme(&self, theme: DashboardTheme) -> Result<()> {
1012        // This would typically update UI styling, but we'll store theme preferences
1013        let theme_message = WebSocketMessage::Generic {
1014            message_type: "theme_update".to_string(),
1015            data: serde_json::to_value(&theme)?,
1016            timestamp: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(),
1017            session_id: self.session_id.clone(),
1018        };
1019
1020        if self.websocket_sender.send(theme_message).is_err() {
1021            // No active subscribers, but that's okay
1022        }
1023
1024        Ok(())
1025    }
1026
1027    /// Export dashboard data in various formats
1028    pub async fn export_dashboard_data(
1029        &self,
1030        format: ExportFormat,
1031        time_range: Option<(u64, u64)>,
1032    ) -> Result<Vec<u8>> {
1033        let data = if let Some((start, end)) = time_range {
1034            self.get_filtered_data(start, end)
1035        } else {
1036            self.get_all_data()
1037        };
1038
1039        match format {
1040            ExportFormat::JSON => {
1041                let json_data = serde_json::to_string_pretty(&data)?;
1042                Ok(json_data.into_bytes())
1043            },
1044            ExportFormat::CSV => {
1045                let mut csv_data = String::from("timestamp,category,label,value\n");
1046                for (category, points) in data {
1047                    for point in points {
1048                        csv_data.push_str(&format!(
1049                            "{},{:?},{},{}\n",
1050                            point.timestamp, category, point.label, point.value
1051                        ));
1052                    }
1053                }
1054                Ok(csv_data.into_bytes())
1055            },
1056            ExportFormat::MessagePack => {
1057                // Would use rmp_serde for MessagePack serialization
1058                // For now, return JSON as fallback
1059                let json_data = serde_json::to_string(&data)?;
1060                Ok(json_data.into_bytes())
1061            },
1062        }
1063    }
1064
1065    // Helper methods for advanced features
1066
1067    fn create_histogram(&self, values: &[f64], bins: usize) -> HistogramData {
1068        if values.is_empty() {
1069            return HistogramData {
1070                bins: Vec::new(),
1071                max_frequency: 0,
1072            };
1073        }
1074
1075        let min_val = values.iter().fold(f64::INFINITY, |a, &b| a.min(b));
1076        let max_val = values.iter().fold(f64::NEG_INFINITY, |a, &b| a.max(b));
1077        let bin_width = (max_val - min_val) / bins as f64;
1078
1079        let mut histogram_bins = vec![0; bins];
1080
1081        for &value in values {
1082            let bin_idx = ((value - min_val) / bin_width).floor() as usize;
1083            let bin_idx = bin_idx.min(bins - 1); // Ensure we don't go out of bounds
1084            histogram_bins[bin_idx] += 1;
1085        }
1086
1087        let max_frequency = *histogram_bins.iter().max().unwrap_or(&0);
1088
1089        let bins_data: Vec<HistogramBin> = histogram_bins
1090            .into_iter()
1091            .enumerate()
1092            .map(|(i, count)| HistogramBin {
1093                range_start: min_val + i as f64 * bin_width,
1094                range_end: min_val + (i + 1) as f64 * bin_width,
1095                frequency: count,
1096            })
1097            .collect();
1098
1099        HistogramData {
1100            bins: bins_data,
1101            max_frequency,
1102        }
1103    }
1104
1105    fn calculate_correlation_coefficient(
1106        &self,
1107        cat1: &MetricCategory,
1108        cat2: &MetricCategory,
1109    ) -> f64 {
1110        let data = self.metric_data.lock().unwrap();
1111
1112        let data1 = match data.get(cat1) {
1113            Some(d) => d,
1114            None => return 0.0,
1115        };
1116
1117        let data2 = match data.get(cat2) {
1118            Some(d) => d,
1119            None => return 0.0,
1120        };
1121
1122        if data1.len() < 2 || data2.len() < 2 {
1123            return 0.0;
1124        }
1125
1126        // Take the minimum length to align the datasets
1127        let min_len = data1.len().min(data2.len()).min(50); // Use at most 50 points for performance
1128        let values1: Vec<f64> = data1.iter().rev().take(min_len).map(|d| d.value).collect();
1129        let values2: Vec<f64> = data2.iter().rev().take(min_len).map(|d| d.value).collect();
1130
1131        // Calculate Pearson correlation coefficient
1132        let n = values1.len() as f64;
1133        let mean1 = values1.iter().sum::<f64>() / n;
1134        let mean2 = values2.iter().sum::<f64>() / n;
1135
1136        let covariance = values1
1137            .iter()
1138            .zip(&values2)
1139            .map(|(v1, v2)| (v1 - mean1) * (v2 - mean2))
1140            .sum::<f64>()
1141            / n;
1142
1143        let std1 = (values1.iter().map(|v| (v - mean1).powi(2)).sum::<f64>() / n).sqrt();
1144        let std2 = (values2.iter().map(|v| (v - mean2).powi(2)).sum::<f64>() / n).sqrt();
1145
1146        if std1 == 0.0 || std2 == 0.0 {
1147            0.0
1148        } else {
1149            covariance / (std1 * std2)
1150        }
1151    }
1152
1153    fn generate_performance_recommendations(
1154        &self,
1155        trend: &TrendDirection,
1156        strength: f64,
1157        predicted_value: f64,
1158    ) -> Vec<String> {
1159        let mut recommendations = Vec::new();
1160
1161        match trend {
1162            TrendDirection::Increasing => {
1163                if strength > 0.1 {
1164                    recommendations.push(
1165                        "Monitor for potential resource exhaustion due to increasing trend"
1166                            .to_string(),
1167                    );
1168                    recommendations.push("Consider scaling resources proactively".to_string());
1169                }
1170                if predicted_value > 90.0 {
1171                    recommendations.push(
1172                        "Critical threshold approaching - immediate action recommended".to_string(),
1173                    );
1174                }
1175            },
1176            TrendDirection::Decreasing => {
1177                if strength > 0.05 {
1178                    recommendations
1179                        .push("Investigate potential performance degradation".to_string());
1180                    recommendations.push("Check for resource leaks or inefficiencies".to_string());
1181                }
1182            },
1183            TrendDirection::Stable => {
1184                recommendations
1185                    .push("Performance trend is stable - continue monitoring".to_string());
1186            },
1187        }
1188
1189        if recommendations.is_empty() {
1190            recommendations.push("No specific recommendations at this time".to_string());
1191        }
1192
1193        recommendations
1194    }
1195
1196    fn get_filtered_data(
1197        &self,
1198        start: u64,
1199        end: u64,
1200    ) -> HashMap<MetricCategory, VecDeque<MetricDataPoint>> {
1201        let data = self.metric_data.lock().unwrap();
1202        let mut filtered_data = HashMap::new();
1203
1204        for (category, points) in data.iter() {
1205            let filtered_points: VecDeque<MetricDataPoint> = points
1206                .iter()
1207                .filter(|p| p.timestamp >= start && p.timestamp <= end)
1208                .cloned()
1209                .collect();
1210
1211            if !filtered_points.is_empty() {
1212                filtered_data.insert(category.clone(), filtered_points);
1213            }
1214        }
1215
1216        filtered_data
1217    }
1218
1219    fn get_all_data(&self) -> HashMap<MetricCategory, VecDeque<MetricDataPoint>> {
1220        self.metric_data.lock().unwrap().clone()
1221    }
1222}
1223
1224/// Dashboard builder for easier configuration
1225#[derive(Debug, Default)]
1226pub struct DashboardBuilder {
1227    config: DashboardConfig,
1228}
1229
1230impl DashboardBuilder {
1231    /// Create new dashboard builder
1232    pub fn new() -> Self {
1233        Self::default()
1234    }
1235
1236    /// Set WebSocket port
1237    pub fn port(mut self, port: u16) -> Self {
1238        self.config.websocket_port = port;
1239        self
1240    }
1241
1242    /// Set update frequency
1243    pub fn update_frequency(mut self, frequency_ms: u64) -> Self {
1244        self.config.update_frequency_ms = frequency_ms;
1245        self
1246    }
1247
1248    /// Set maximum data points
1249    pub fn max_data_points(mut self, max_points: usize) -> Self {
1250        self.config.max_data_points = max_points;
1251        self
1252    }
1253
1254    /// Enable/disable GPU monitoring
1255    pub fn gpu_monitoring(mut self, enabled: bool) -> Self {
1256        self.config.enable_gpu_monitoring = enabled;
1257        self
1258    }
1259
1260    /// Enable/disable memory profiling
1261    pub fn memory_profiling(mut self, enabled: bool) -> Self {
1262        self.config.enable_memory_profiling = enabled;
1263        self
1264    }
1265
1266    /// Set alert thresholds
1267    pub fn alert_thresholds(mut self, thresholds: AlertThresholds) -> Self {
1268        self.config.alert_thresholds = thresholds;
1269        self
1270    }
1271
1272    /// Build the dashboard
1273    pub fn build(self) -> RealtimeDashboard {
1274        RealtimeDashboard::new(self.config)
1275    }
1276}
1277
1278#[cfg(test)]
1279mod tests {
1280    use super::*;
1281    use futures::StreamExt;
1282    use std::time::Duration;
1283
1284    #[tokio::test]
1285    async fn test_dashboard_creation() {
1286        let dashboard = DashboardBuilder::new()
1287            .port(8081)
1288            .update_frequency(50)
1289            .max_data_points(500)
1290            .build();
1291
1292        assert_eq!(dashboard.get_config().websocket_port, 8081);
1293        assert_eq!(dashboard.get_config().update_frequency_ms, 50);
1294        assert_eq!(dashboard.get_config().max_data_points, 500);
1295    }
1296
1297    #[tokio::test]
1298    async fn test_metric_addition() {
1299        let dashboard = DashboardBuilder::new().build();
1300
1301        let result = dashboard.add_metric(MetricCategory::Training, "loss".to_string(), 0.5);
1302
1303        assert!(result.is_ok());
1304
1305        let historical_data = dashboard.get_historical_data(&MetricCategory::Training);
1306        assert_eq!(historical_data.len(), 1);
1307        assert_eq!(historical_data[0].value, 0.5);
1308        assert_eq!(historical_data[0].label, "loss");
1309    }
1310
1311    #[tokio::test]
1312    async fn test_batch_metrics() {
1313        let dashboard = DashboardBuilder::new().build();
1314
1315        let metrics = vec![
1316            (MetricCategory::Training, "loss".to_string(), 0.5),
1317            (MetricCategory::Training, "accuracy".to_string(), 0.9),
1318            (MetricCategory::GPU, "utilization".to_string(), 75.0),
1319        ];
1320
1321        let result = dashboard.add_metrics(metrics);
1322        assert!(result.is_ok());
1323
1324        let training_data = dashboard.get_historical_data(&MetricCategory::Training);
1325        assert_eq!(training_data.len(), 2);
1326
1327        let gpu_data = dashboard.get_historical_data(&MetricCategory::GPU);
1328        assert_eq!(gpu_data.len(), 1);
1329    }
1330
1331    #[tokio::test]
1332    async fn test_alert_creation() {
1333        let dashboard = DashboardBuilder::new().build();
1334
1335        let result = dashboard.create_alert(
1336            AlertSeverity::Warning,
1337            MetricCategory::Memory,
1338            "High Memory".to_string(),
1339            "Memory usage is high".to_string(),
1340            Some(95.0),
1341            Some(90.0),
1342        );
1343
1344        assert!(result.is_ok());
1345
1346        let history = dashboard.alert_history.lock().unwrap();
1347        assert_eq!(history.len(), 1);
1348        assert_eq!(history[0].title, "High Memory");
1349    }
1350
1351    #[tokio::test]
1352    async fn test_websocket_subscription() {
1353        let dashboard = DashboardBuilder::new().build();
1354
1355        let mut stream = dashboard.subscribe();
1356
1357        // Start the dashboard
1358        let dashboard_clone = Arc::new(dashboard);
1359        let dashboard_for_task = dashboard_clone.clone();
1360
1361        tokio::spawn(async move {
1362            let _ = dashboard_for_task.start().await;
1363        });
1364
1365        // Add a metric to trigger a message
1366        let _ =
1367            dashboard_clone.add_metric(MetricCategory::Training, "test_metric".to_string(), 42.0);
1368
1369        // Try to receive a message (with timeout)
1370        let message_result = tokio::time::timeout(Duration::from_millis(100), stream.next()).await;
1371
1372        dashboard_clone.stop();
1373
1374        // Check if we received a message
1375        assert!(message_result.is_ok());
1376        if let Ok(Some(Ok(message))) = message_result {
1377            match message {
1378                WebSocketMessage::MetricUpdate { data } => {
1379                    assert!(!data.is_empty());
1380                    assert_eq!(data[0].value, 42.0);
1381                    assert_eq!(data[0].label, "test_metric");
1382                },
1383                _ => panic!("Expected MetricUpdate message"),
1384            }
1385        }
1386    }
1387
1388    #[tokio::test]
1389    async fn test_system_stats() {
1390        let dashboard = DashboardBuilder::new().build();
1391
1392        // Add some data
1393        let _ = dashboard.add_metric(MetricCategory::Training, "loss".to_string(), 0.5);
1394        let _ = dashboard.create_alert(
1395            AlertSeverity::Info,
1396            MetricCategory::Training,
1397            "Test Alert".to_string(),
1398            "Test message".to_string(),
1399            None,
1400            None,
1401        );
1402
1403        let stats = dashboard.get_system_stats();
1404
1405        assert_eq!(stats.data_points_collected, 1);
1406        assert_eq!(stats.total_alerts, 1);
1407        // uptime is a Duration which is always >= 0
1408    }
1409
1410    #[tokio::test]
1411    async fn test_data_point_limit() {
1412        let dashboard = DashboardBuilder::new().max_data_points(2).build();
1413
1414        // Add 3 data points
1415        let _ = dashboard.add_metric(MetricCategory::Training, "metric1".to_string(), 1.0);
1416        let _ = dashboard.add_metric(MetricCategory::Training, "metric2".to_string(), 2.0);
1417        let _ = dashboard.add_metric(MetricCategory::Training, "metric3".to_string(), 3.0);
1418
1419        let data = dashboard.get_historical_data(&MetricCategory::Training);
1420
1421        // Should only keep the last 2 data points
1422        assert_eq!(data.len(), 2);
1423        assert_eq!(data[0].value, 2.0); // First of the remaining two
1424        assert_eq!(data[1].value, 3.0); // Last added
1425    }
1426}