1use anyhow::Result;
7use scirs2_core::random::*; use serde::{Deserialize, Serialize};
9use std::collections::{HashMap, VecDeque};
10use std::sync::{Arc, Mutex};
11use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
12use tokio::sync::broadcast;
13use tokio::time::interval;
14use tokio_stream::wrappers::BroadcastStream;
15use uuid::Uuid;
16
/// Tunable settings for the realtime dashboard server and its monitors.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DashboardConfig {
    /// TCP port the WebSocket server listens on.
    pub websocket_port: u16,
    /// Interval between background metric broadcasts, in milliseconds.
    pub update_frequency_ms: u64,
    /// Rolling-window cap on stored points per metric category.
    pub max_data_points: usize,
    /// Sample GPU utilization/memory in the collection task when true.
    pub enable_gpu_monitoring: bool,
    /// Sample memory usage in the collection task when true.
    pub enable_memory_profiling: bool,
    /// Network monitoring flag; NOTE(review): not read anywhere visible here.
    pub enable_network_monitoring: bool,
    /// Alerting flag; NOTE(review): not consulted by check_for_alerts here.
    pub enable_performance_alerts: bool,
    /// Threshold values used by the alerting checks.
    pub alert_thresholds: AlertThresholds,
}
37
38impl Default for DashboardConfig {
39 fn default() -> Self {
40 Self {
41 websocket_port: 8080,
42 update_frequency_ms: 100,
43 max_data_points: 1000,
44 enable_gpu_monitoring: true,
45 enable_memory_profiling: true,
46 enable_network_monitoring: false,
47 enable_performance_alerts: true,
48 alert_thresholds: AlertThresholds::default(),
49 }
50 }
51}
52
/// Limit values (mostly percentages) that trigger dashboard alerts.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertThresholds {
    /// Memory usage percent above which a warning alert fires.
    pub memory_threshold: f64,
    /// GPU utilization percent above which a warning alert fires.
    pub gpu_utilization_threshold: f64,
    /// Temperature limit; NOTE(review): not checked anywhere visible here.
    pub temperature_threshold: f64,
    /// Training-loss value above which a loss-spike error alert fires.
    pub loss_spike_threshold: f64,
    /// Gradient-norm limit; NOTE(review): not checked anywhere visible here.
    pub gradient_norm_threshold: f64,
}
67
68impl Default for AlertThresholds {
69 fn default() -> Self {
70 Self {
71 memory_threshold: 90.0,
72 gpu_utilization_threshold: 95.0,
73 temperature_threshold: 80.0,
74 loss_spike_threshold: 2.0,
75 gradient_norm_threshold: 10.0,
76 }
77 }
78}
79
/// One timestamped metric sample.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricDataPoint {
    /// Milliseconds since UNIX_EPOCH (see add_metric / add_metrics).
    pub timestamp: u64,
    /// Sampled value.
    pub value: f64,
    /// Human-readable name of the metric (e.g. "Memory Usage").
    pub label: String,
    /// Category this point is bucketed under.
    pub category: MetricCategory,
}
88
/// Bucket a metric belongs to; used as the key for stored histories.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum MetricCategory {
    Training,
    Memory,
    GPU,
    Network,
    Performance,
    /// User-defined category identified by name.
    Custom(String),
}
99
/// Alert raised when a metric breaches a configured threshold.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DashboardAlert {
    /// Random UUID identifying this alert.
    pub id: String,
    /// Milliseconds since UNIX_EPOCH at creation time.
    pub timestamp: u64,
    pub severity: AlertSeverity,
    pub category: MetricCategory,
    /// Short headline (e.g. "High Memory Usage").
    pub title: String,
    /// Full human-readable description.
    pub message: String,
    /// The observed value that triggered the alert, if applicable.
    pub value: Option<f64>,
    /// The threshold that was breached, if applicable.
    pub threshold: Option<f64>,
}
112
/// Severity levels for dashboard alerts, in increasing order of urgency.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum AlertSeverity {
    Info,
    Warning,
    Error,
    Critical,
}
121
/// Messages broadcast to WebSocket subscribers.
///
/// Serialized as internally tagged JSON (`"type": "<Variant>"`); the
/// `Generic` variant is untagged so arbitrary payloads (e.g. theme updates)
/// can be sent without a dedicated variant.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum WebSocketMessage {
    /// A batch of freshly recorded metric points.
    MetricUpdate {
        data: Vec<MetricDataPoint>,
    },
    /// A newly raised alert.
    Alert {
        alert: DashboardAlert,
    },
    /// The full configuration after an update.
    ConfigUpdate {
        config: DashboardConfig,
    },
    /// Session identity and uptime in seconds.
    SessionInfo {
        session_id: String,
        uptime: u64,
    },
    /// Replay of stored history for one category.
    HistoricalData {
        category: MetricCategory,
        data: Vec<MetricDataPoint>,
    },
    /// Periodic system statistics snapshot.
    SystemStats {
        stats: SystemStats,
    },
    /// Escape hatch for ad-hoc payloads; serialized without the tag.
    #[serde(untagged)]
    Generic {
        message_type: String,
        data: serde_json::Value,
        timestamp: u64,
        session_id: String,
    },
}
154
/// A statistical anomaly found in a metric history (see
/// `detect_metric_anomalies`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AnomalyDetection {
    /// Timestamp of the anomalous point (or window end for trends).
    pub timestamp: u64,
    /// The anomalous value (or window average for trends).
    pub value: f64,
    /// (low, high) band the value was expected to fall within.
    pub expected_range: (f64, f64),
    pub anomaly_type: AnomalyType,
    /// Detector-specific confidence score.
    pub confidence_score: f64,
    pub category: MetricCategory,
    /// Human-readable explanation of what was detected.
    pub description: String,
}
166
/// Kinds of anomalies the detector can report.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
pub enum AnomalyType {
    /// Single point far above the mean.
    Spike,
    /// Single point far below the mean.
    Drop,
    /// Sustained upward shift between recent windows.
    GradualIncrease,
    /// Sustained downward shift between recent windows.
    GradualDecrease,
    /// Reserved; NOTE(review): never produced by the visible detector.
    Outlier,
}
176
/// Aggregate visualization payload built by
/// `generate_advanced_visualizations`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DashboardVisualizationData {
    /// Per-category heat intensity derived from the 10 most recent points.
    pub heatmap_data: HashMap<MetricCategory, HeatmapData>,
    /// Full stored series per category.
    pub time_series_data: HashMap<MetricCategory, Vec<TimeSeriesPoint>>,
    /// Pairwise Pearson correlations between categories (diagonal = 1.0).
    pub correlation_matrix: Vec<Vec<f64>>,
    /// Value histograms per category.
    pub performance_distribution: HashMap<MetricCategory, HistogramData>,
    /// Seconds since UNIX_EPOCH when this payload was produced.
    pub generated_at: u64,
    pub session_id: String,
}
187
/// Heatmap cell for one metric category.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HeatmapData {
    /// Average of the most recent points.
    pub intensity: f64,
    /// Intensity squashed into [0, 1) via x / (x + 1).
    pub normalized_intensity: f64,
    /// Number of points the intensity was averaged over.
    pub data_points: usize,
    /// Seconds since UNIX_EPOCH when the cell was computed.
    pub timestamp: u64,
}
196
/// Flattened copy of a metric point for chart rendering.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeSeriesPoint {
    pub timestamp: u64,
    pub value: f64,
    pub label: String,
}
204
/// Histogram over a category's values (see `create_histogram`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HistogramData {
    pub bins: Vec<HistogramBin>,
    /// Largest single-bin count; useful for chart axis scaling.
    pub max_frequency: usize,
}
211
/// One histogram bucket covering [range_start, range_end).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HistogramBin {
    pub range_start: f64,
    pub range_end: f64,
    /// Number of values that fell into this bucket.
    pub frequency: usize,
}
219
/// Linear-regression forecast produced by `predict_performance_trends`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformancePrediction {
    pub category: MetricCategory,
    /// Extrapolated value at the prediction horizon.
    pub predicted_value: f64,
    /// 95% band around the predicted value.
    pub confidence_interval: (f64, f64),
    pub trend_direction: TrendDirection,
    /// |slope| relative to the mean value.
    pub trend_strength: f64,
    /// How far ahead the forecast looks, in hours.
    pub prediction_horizon_hours: u64,
    /// Rough fit-quality score in [0, 1].
    pub model_accuracy: f64,
    /// Seconds since UNIX_EPOCH when the forecast was made.
    pub generated_at: u64,
    /// Human-readable suggestions derived from the trend.
    pub recommendations: Vec<String>,
}
233
/// Direction of a fitted metric trend.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
pub enum TrendDirection {
    Increasing,
    Decreasing,
    Stable,
}
241
/// Visual theming sent to dashboard clients (colors as CSS hex strings).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DashboardTheme {
    pub name: String,
    pub primary_color: String,
    pub secondary_color: String,
    pub background_color: String,
    pub text_color: String,
    pub accent_color: String,
    /// Palette cycled through for chart series.
    pub chart_colors: Vec<String>,
    pub dark_mode: bool,
    /// CSS font-family value.
    pub font_family: String,
    /// Corner radius in pixels.
    pub border_radius: u8,
}
256
257impl Default for DashboardTheme {
258 fn default() -> Self {
259 Self {
260 name: "Default".to_string(),
261 primary_color: "#3b82f6".to_string(),
262 secondary_color: "#64748b".to_string(),
263 background_color: "#ffffff".to_string(),
264 text_color: "#1f2937".to_string(),
265 accent_color: "#10b981".to_string(),
266 chart_colors: vec![
267 "#3b82f6".to_string(),
268 "#ef4444".to_string(),
269 "#10b981".to_string(),
270 "#f59e0b".to_string(),
271 "#8b5cf6".to_string(),
272 ],
273 dark_mode: false,
274 font_family: "Inter, sans-serif".to_string(),
275 border_radius: 8,
276 }
277 }
278}
279
/// Supported output encodings for `export_dashboard_data`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ExportFormat {
    JSON,
    CSV,
    /// NOTE(review): currently emitted as JSON bytes, not real MessagePack.
    MessagePack,
}
287
/// Snapshot of dashboard-process statistics, broadcast periodically.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemStats {
    /// Seconds since the dashboard was constructed.
    pub uptime: u64,
    /// Number of alerts currently retained in history.
    pub total_alerts: usize,
    /// Number of subscribe() calls so far.
    pub active_connections: usize,
    /// Total metric points ever recorded.
    pub data_points_collected: usize,
    /// Estimated MiB used by stored points (0.0 in the background task).
    pub memory_usage_mb: f64,
    /// Simulated CPU usage percent (0.0 in the background task).
    pub cpu_usage_percent: f64,
}
298
/// Realtime metrics dashboard: records metric points, evaluates alert
/// thresholds, and broadcasts updates to WebSocket subscribers.
///
/// Shared state lives behind `Arc<Mutex<..>>` so the tokio tasks spawned by
/// `start` can access it independently of the owning handle.
#[derive(Debug)]
pub struct RealtimeDashboard {
    // Live configuration; replaced wholesale by update_config.
    config: Arc<Mutex<DashboardConfig>>,
    // Random UUID identifying this instance in outgoing messages.
    session_id: String,
    // Basis for uptime reporting.
    start_time: Instant,
    // Per-category rolling windows of recent points (capped by config).
    metric_data: Arc<Mutex<HashMap<MetricCategory, VecDeque<MetricDataPoint>>>>,
    // Bounded alert history (most recent 100 kept).
    alert_history: Arc<Mutex<VecDeque<DashboardAlert>>>,
    // Fan-out channel feeding every subscribed WebSocket stream.
    websocket_sender: broadcast::Sender<WebSocketMessage>,
    // Incremented by subscribe(); never decremented here.
    active_connections: Arc<Mutex<usize>>,
    // Total points ever recorded via add_metric / add_metrics.
    total_data_points: Arc<Mutex<usize>>,
    // Polled by background tasks; cleared by stop().
    is_running: Arc<Mutex<bool>>,
}
312
313impl RealtimeDashboard {
314 pub fn new(config: DashboardConfig) -> Self {
316 let (websocket_sender, _) = broadcast::channel(1000);
317
318 Self {
319 config: Arc::new(Mutex::new(config)),
320 session_id: Uuid::new_v4().to_string(),
321 start_time: Instant::now(),
322 metric_data: Arc::new(Mutex::new(HashMap::new())),
323 alert_history: Arc::new(Mutex::new(VecDeque::new())),
324 websocket_sender,
325 active_connections: Arc::new(Mutex::new(0)),
326 total_data_points: Arc::new(Mutex::new(0)),
327 is_running: Arc::new(Mutex::new(false)),
328 }
329 }
330
    /// Starts the background collection, stats, and alert-monitoring tasks.
    ///
    /// Idempotent: if already running, returns `Ok(())` without spawning
    /// additional tasks.
    ///
    /// # Errors
    /// Returns an error if the running-state lock is poisoned.
    pub async fn start(&self) -> Result<()> {
        {
            // Scope the guard so it is released before any await below.
            let mut running = self
                .is_running
                .lock()
                .map_err(|_| anyhow::anyhow!("Failed to acquire running state lock"))?;
            if *running {
                return Ok(());
            }
            *running = true;
        }

        // Each call spawns a detached tokio task that loops until stop().
        self.start_data_collection().await?;

        self.start_system_stats_updates().await?;

        self.start_alert_monitoring().await?;

        Ok(())
    }
355
356 pub fn stop(&self) {
358 if let Ok(mut running) = self.is_running.lock() {
359 *running = false;
360 }
361 }
362
363 pub fn add_metric(&self, category: MetricCategory, label: String, value: f64) -> Result<()> {
365 let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis() as u64;
366
367 let data_point = MetricDataPoint {
368 timestamp,
369 value,
370 label,
371 category: category.clone(),
372 };
373
374 {
376 let mut data = self
377 .metric_data
378 .lock()
379 .map_err(|_| anyhow::anyhow!("Failed to acquire metric data lock"))?;
380 let category_data = data.entry(category.clone()).or_insert_with(VecDeque::new);
381
382 category_data.push_back(data_point.clone());
383
384 let max_points = self
385 .config
386 .lock()
387 .map_err(|_| anyhow::anyhow!("Failed to acquire config lock"))?
388 .max_data_points;
389 while category_data.len() > max_points {
390 category_data.pop_front();
391 }
392 }
393
394 {
396 if let Ok(mut total) = self.total_data_points.lock() {
397 *total += 1;
398 }
399 }
400
401 let message = WebSocketMessage::MetricUpdate {
403 data: vec![data_point],
404 };
405
406 let _ = self.websocket_sender.send(message);
407
408 self.check_for_alerts(&category, value);
410
411 Ok(())
412 }
413
414 pub fn add_metrics(&self, metrics: Vec<(MetricCategory, String, f64)>) -> Result<()> {
416 let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis() as u64;
417
418 let mut data_points = Vec::new();
419
420 for (category, label, value) in metrics {
422 let data_point = MetricDataPoint {
423 timestamp,
424 value,
425 label,
426 category: category.clone(),
427 };
428
429 {
431 let mut data = self.metric_data.lock().expect("lock should not be poisoned");
432 let category_data = data.entry(category.clone()).or_default();
433 category_data.push_back(data_point.clone());
434
435 let max_points =
436 self.config.lock().expect("lock should not be poisoned").max_data_points;
437 while category_data.len() > max_points {
438 category_data.pop_front();
439 }
440 }
441
442 data_points.push(data_point);
443
444 self.check_for_alerts(&category, value);
446 }
447
448 {
450 let mut total = self.total_data_points.lock().expect("lock should not be poisoned");
451 *total += data_points.len();
452 }
453
454 let message = WebSocketMessage::MetricUpdate { data: data_points };
456 let _ = self.websocket_sender.send(message);
457
458 Ok(())
459 }
460
461 pub fn create_alert(
463 &self,
464 severity: AlertSeverity,
465 category: MetricCategory,
466 title: String,
467 message: String,
468 value: Option<f64>,
469 threshold: Option<f64>,
470 ) -> Result<()> {
471 let alert = DashboardAlert {
472 id: Uuid::new_v4().to_string(),
473 timestamp: SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis() as u64,
474 severity,
475 category,
476 title,
477 message,
478 value,
479 threshold,
480 };
481
482 {
484 let mut history = self.alert_history.lock().expect("lock should not be poisoned");
485 history.push_back(alert.clone());
486
487 while history.len() > 100 {
489 history.pop_front();
490 }
491 }
492
493 let message = WebSocketMessage::Alert { alert };
495 let _ = self.websocket_sender.send(message);
496
497 Ok(())
498 }
499
500 pub fn get_historical_data(&self, category: &MetricCategory) -> Vec<MetricDataPoint> {
502 let data = self.metric_data.lock().expect("lock should not be poisoned");
503 data.get(category)
504 .map(|deque| deque.iter().cloned().collect())
505 .unwrap_or_default()
506 }
507
508 pub fn get_system_stats(&self) -> SystemStats {
510 let uptime = self.start_time.elapsed().as_secs();
511 let total_alerts = self.alert_history.lock().expect("lock should not be poisoned").len();
512 let active_connections =
513 *self.active_connections.lock().expect("lock should not be poisoned");
514 let data_points_collected =
515 *self.total_data_points.lock().expect("lock should not be poisoned");
516
517 let memory_usage_mb = self.estimate_memory_usage();
519 let cpu_usage_percent = self.estimate_cpu_usage();
520
521 SystemStats {
522 uptime,
523 total_alerts,
524 active_connections,
525 data_points_collected,
526 memory_usage_mb,
527 cpu_usage_percent,
528 }
529 }
530
531 pub fn subscribe(&self) -> BroadcastStream<WebSocketMessage> {
533 {
535 let mut connections =
536 self.active_connections.lock().expect("lock should not be poisoned");
537 *connections += 1;
538 }
539
540 BroadcastStream::new(self.websocket_sender.subscribe())
541 }
542
543 pub fn update_config(&self, new_config: DashboardConfig) -> Result<()> {
545 {
546 let mut config = self.config.lock().expect("lock should not be poisoned");
547 *config = new_config.clone();
548 }
549
550 let message = WebSocketMessage::ConfigUpdate { config: new_config };
552 let _ = self.websocket_sender.send(message);
553
554 Ok(())
555 }
556
557 pub fn get_config(&self) -> DashboardConfig {
559 self.config.lock().expect("lock should not be poisoned").clone()
560 }
561
    /// Spawns a task that periodically samples system metrics and broadcasts
    /// them to subscribers until `stop()` clears the running flag.
    ///
    /// NOTE(review): the update frequency is read once at spawn time, so a
    /// later `update_config` does not change the tick rate — confirm intent.
    async fn start_data_collection(&self) -> Result<()> {
        let config = self.config.clone();
        let _metric_data = self.metric_data.clone();
        let websocket_sender = self.websocket_sender.clone();
        let is_running = self.is_running.clone();

        tokio::spawn(async move {
            let mut interval = interval(Duration::from_millis(
                config.lock().expect("lock should not be poisoned").update_frequency_ms,
            ));

            // The guard taken in the loop condition is a temporary and is
            // dropped before the await, so no std Mutex is held across it.
            while *is_running.lock().expect("lock should not be poisoned") {
                interval.tick().await;

                if let Ok(metrics) = Self::collect_system_metrics(&config).await {
                    let message = WebSocketMessage::MetricUpdate { data: metrics };
                    // Send fails only when there are no subscribers; ignored.
                    let _ = websocket_sender.send(message);
                }
            }
        });

        Ok(())
    }
587
    /// Spawns a task that broadcasts a `SystemStats` snapshot every 5 seconds
    /// until `stop()` clears the running flag.
    ///
    /// NOTE(review): memory/cpu figures are hard-coded to 0.0 here (the
    /// `estimate_*` helpers require `&self`, unavailable in the spawned
    /// task), unlike `get_system_stats` — confirm whether real estimates
    /// should be plumbed through.
    async fn start_system_stats_updates(&self) -> Result<()> {
        let websocket_sender = self.websocket_sender.clone();
        let start_time = self.start_time;
        let alert_history = self.alert_history.clone();
        let active_connections = self.active_connections.clone();
        let total_data_points = self.total_data_points.clone();
        let is_running = self.is_running.clone();

        tokio::spawn(async move {
            let mut interval = interval(Duration::from_secs(5));
            // Guard in the condition is dropped before the await.
            while *is_running.lock().expect("lock should not be poisoned") {
                interval.tick().await;

                let stats = SystemStats {
                    uptime: start_time.elapsed().as_secs(),
                    total_alerts: alert_history.lock().expect("lock should not be poisoned").len(),
                    active_connections: *active_connections
                        .lock()
                        .expect("lock should not be poisoned"),
                    data_points_collected: *total_data_points
                        .lock()
                        .expect("lock should not be poisoned"),
                    memory_usage_mb: 0.0,
                    cpu_usage_percent: 0.0,
                };

                let message = WebSocketMessage::SystemStats { stats };
                let _ = websocket_sender.send(message);
            }
        });

        Ok(())
    }
623
624 async fn start_alert_monitoring(&self) -> Result<()> {
626 let config = self.config.clone();
627 let metric_data = self.metric_data.clone();
628 let is_running = self.is_running.clone();
629
630 tokio::spawn(async move {
631 let mut interval = interval(Duration::from_secs(1));
632
633 while *is_running.lock().expect("lock should not be poisoned") {
634 interval.tick().await;
635
636 Self::check_threshold_breaches(&config, &metric_data).await;
638 }
639 });
640
641 Ok(())
642 }
643
644 async fn collect_system_metrics(
646 config: &Arc<Mutex<DashboardConfig>>,
647 ) -> Result<Vec<MetricDataPoint>> {
648 let mut metrics = Vec::new();
649 let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis() as u64;
650
651 let cfg = config.lock().expect("lock should not be poisoned");
652
653 if cfg.enable_memory_profiling {
654 let memory_usage = Self::get_memory_usage();
656 metrics.push(MetricDataPoint {
657 timestamp,
658 value: memory_usage,
659 label: "Memory Usage".to_string(),
660 category: MetricCategory::Memory,
661 });
662 }
663
664 if cfg.enable_gpu_monitoring {
665 let gpu_utilization = Self::get_gpu_utilization();
667 metrics.push(MetricDataPoint {
668 timestamp,
669 value: gpu_utilization,
670 label: "GPU Utilization".to_string(),
671 category: MetricCategory::GPU,
672 });
673
674 let gpu_memory = Self::get_gpu_memory_usage();
675 metrics.push(MetricDataPoint {
676 timestamp,
677 value: gpu_memory,
678 label: "GPU Memory".to_string(),
679 category: MetricCategory::GPU,
680 });
681 }
682
683 Ok(metrics)
684 }
685
686 fn check_for_alerts(&self, category: &MetricCategory, value: f64) {
688 let config = self.config.lock().expect("lock should not be poisoned");
689 let thresholds = &config.alert_thresholds;
690
691 match category {
692 MetricCategory::Memory if value > thresholds.memory_threshold => {
693 let _ = self.create_alert(
694 AlertSeverity::Warning,
695 category.clone(),
696 "High Memory Usage".to_string(),
697 format!(
698 "Memory usage is {:.1}% (threshold: {:.1}%)",
699 value, thresholds.memory_threshold
700 ),
701 Some(value),
702 Some(thresholds.memory_threshold),
703 );
704 },
705 MetricCategory::GPU if value > thresholds.gpu_utilization_threshold => {
706 let _ = self.create_alert(
707 AlertSeverity::Warning,
708 category.clone(),
709 "High GPU Utilization".to_string(),
710 format!(
711 "GPU utilization is {:.1}% (threshold: {:.1}%)",
712 value, thresholds.gpu_utilization_threshold
713 ),
714 Some(value),
715 Some(thresholds.gpu_utilization_threshold),
716 );
717 },
718 MetricCategory::Training if value > thresholds.loss_spike_threshold => {
719 let _ = self.create_alert(
720 AlertSeverity::Error,
721 category.clone(),
722 "Training Loss Spike".to_string(),
723 format!(
724 "Loss spike detected: {:.4} (threshold: {:.4})",
725 value, thresholds.loss_spike_threshold
726 ),
727 Some(value),
728 Some(thresholds.loss_spike_threshold),
729 );
730 },
731 _ => {},
732 }
733 }
734
    /// Placeholder for periodic threshold evaluation over stored metrics.
    ///
    /// Currently only acquires (and immediately drops) both locks; no breach
    /// detection is implemented yet.  Lock ordering here is config first,
    /// then metric_data — any caller taking both must use the same order.
    async fn check_threshold_breaches(
        config: &Arc<Mutex<DashboardConfig>>,
        metric_data: &Arc<Mutex<HashMap<MetricCategory, VecDeque<MetricDataPoint>>>>,
    ) {
        let _config = config.lock().expect("lock should not be poisoned");
        let _data = metric_data.lock().expect("lock should not be poisoned");
    }
746
747 fn get_memory_usage() -> f64 {
749 50.0 + (thread_rng().random::<f64>() * 40.0)
751 }
752
753 fn get_gpu_utilization() -> f64 {
755 30.0 + (thread_rng().random::<f64>() * 60.0)
757 }
758
759 fn get_gpu_memory_usage() -> f64 {
761 40.0 + (thread_rng().random::<f64>() * 50.0)
763 }
764
765 fn estimate_memory_usage(&self) -> f64 {
767 let data = self.metric_data.lock().expect("lock should not be poisoned");
768 let mut total_points = 0;
769
770 for deque in data.values() {
771 total_points += deque.len();
772 }
773
774 (total_points * 100) as f64 / (1024.0 * 1024.0)
776 }
777
778 fn estimate_cpu_usage(&self) -> f64 {
780 5.0 + (thread_rng().random::<f64>() * 10.0)
782 }
783
784 pub async fn detect_metric_anomalies(
786 &self,
787 category: &MetricCategory,
788 ) -> Result<Vec<AnomalyDetection>> {
789 let data = self.get_historical_data(category);
790 let mut anomalies = Vec::new();
791
792 if data.len() < 10 {
793 return Ok(anomalies); }
795
796 let values: Vec<f64> = data.iter().map(|d| d.value).collect();
798 let mean = values.iter().sum::<f64>() / values.len() as f64;
799 let variance = values.iter().map(|v| (v - mean).powi(2)).sum::<f64>() / values.len() as f64;
800 let std_dev = variance.sqrt();
801
802 let z_threshold = 2.0; for point in data.iter() {
805 let z_score = (point.value - mean).abs() / std_dev;
806 if z_score > z_threshold {
807 let anomaly_type =
808 if point.value > mean { AnomalyType::Spike } else { AnomalyType::Drop };
809
810 anomalies.push(AnomalyDetection {
811 timestamp: point.timestamp,
812 value: point.value,
813 expected_range: (mean - std_dev, mean + std_dev),
814 anomaly_type,
815 confidence_score: (z_score - z_threshold) / z_threshold,
816 category: category.clone(),
817 description: format!(
818 "Detected {} in {} metrics: value {} (Z-score: {:.2})",
819 match anomaly_type {
820 AnomalyType::Spike => "spike",
821 AnomalyType::Drop => "drop",
822 _ => "anomaly",
823 },
824 match category {
825 MetricCategory::Training => "training",
826 MetricCategory::Memory => "memory",
827 MetricCategory::GPU => "GPU",
828 MetricCategory::Network => "network",
829 MetricCategory::Performance => "performance",
830 MetricCategory::Custom(name) => name,
831 },
832 point.value,
833 z_score
834 ),
835 });
836 }
837 }
838
839 if data.len() >= 20 {
841 let recent_window = &data[data.len() - 10..];
842 let earlier_window = &data[data.len() - 20..data.len() - 10];
843
844 let recent_avg =
845 recent_window.iter().map(|d| d.value).sum::<f64>() / recent_window.len() as f64;
846 let earlier_avg =
847 earlier_window.iter().map(|d| d.value).sum::<f64>() / earlier_window.len() as f64;
848
849 let trend_change = (recent_avg - earlier_avg) / earlier_avg;
850
851 if trend_change.abs() > 0.3 {
852 anomalies.push(AnomalyDetection {
854 timestamp: recent_window
855 .last()
856 .expect("recent_window has at least 10 elements")
857 .timestamp,
858 value: recent_avg,
859 expected_range: (earlier_avg * 0.9, earlier_avg * 1.1),
860 anomaly_type: if trend_change > 0.0 {
861 AnomalyType::GradualIncrease
862 } else {
863 AnomalyType::GradualDecrease
864 },
865 confidence_score: trend_change.abs(),
866 category: category.clone(),
867 description: format!(
868 "Detected gradual {} trend: {:.1}% change over recent measurements",
869 if trend_change > 0.0 { "increase" } else { "decrease" },
870 trend_change.abs() * 100.0
871 ),
872 });
873 }
874 }
875
876 Ok(anomalies)
877 }
878
    /// Builds aggregate visualization payloads (heatmap, time series,
    /// correlation matrix, histograms) from the stored metric data.
    /// Categories with fewer than 10 points are skipped entirely.
    pub fn generate_advanced_visualizations(&self) -> Result<DashboardVisualizationData> {
        let mut heatmap_data = HashMap::new();
        let mut time_series_data = HashMap::new();
        let mut correlation_matrix = Vec::new();
        let mut performance_distribution = HashMap::new();

        // The metric_data guard created in the `for` expression lives for the
        // whole loop and is dropped before calculate_correlation_coefficient
        // re-locks the same mutex below.
        for (category, data) in self.metric_data.lock().expect("lock should not be poisoned").iter()
        {
            if data.len() >= 10 {
                // Average of the 10 most recent points drives the heat cell.
                let recent_data: Vec<f64> = data.iter().rev().take(10).map(|d| d.value).collect();
                let avg_value = recent_data.iter().sum::<f64>() / recent_data.len() as f64;

                heatmap_data.insert(
                    category.clone(),
                    HeatmapData {
                        intensity: avg_value,
                        // x / (x + 1) squashes the average towards [0, 1).
                        normalized_intensity: (avg_value / (avg_value + 1.0)).min(1.0),
                        data_points: recent_data.len(),
                        timestamp: SystemTime::now()
                            .duration_since(UNIX_EPOCH)
                            .expect("System time should be after UNIX_EPOCH")
                            .as_secs(),
                    },
                );

                let time_series: Vec<TimeSeriesPoint> = data
                    .iter()
                    .map(|d| TimeSeriesPoint {
                        timestamp: d.timestamp,
                        value: d.value,
                        label: d.label.clone(),
                    })
                    .collect();

                time_series_data.insert(category.clone(), time_series);

                let values: Vec<f64> = data.iter().map(|d| d.value).collect();
                let histogram = self.create_histogram(&values, 10);
                performance_distribution.insert(category.clone(), histogram);
            }
        }

        // Pairwise Pearson correlations; the diagonal is 1 by definition.
        let categories: Vec<&MetricCategory> = heatmap_data.keys().collect();
        for (i, cat1) in categories.iter().enumerate() {
            let mut row = Vec::new();
            for (j, cat2) in categories.iter().enumerate() {
                if i == j {
                    row.push(1.0);
                } else {
                    let corr = self.calculate_correlation_coefficient(cat1, cat2);
                    row.push(corr);
                }
            }
            correlation_matrix.push(row);
        }

        Ok(DashboardVisualizationData {
            heatmap_data,
            time_series_data,
            correlation_matrix,
            performance_distribution,
            generated_at: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .expect("System time should be after UNIX_EPOCH")
                .as_secs(),
            session_id: self.session_id.clone(),
        })
    }
953
954 pub async fn predict_performance_trends(
956 &self,
957 category: &MetricCategory,
958 hours_ahead: u64,
959 ) -> Result<PerformancePrediction> {
960 let data = self.get_historical_data(category);
961
962 if data.len() < 20 {
963 return Err(anyhow::anyhow!(
964 "Insufficient data for prediction (need at least 20 points)"
965 ));
966 }
967
968 let values: Vec<f64> = data.iter().map(|d| d.value).collect();
969 let timestamps: Vec<u64> = data.iter().map(|d| d.timestamp).collect();
970
971 let n = values.len() as f64;
973 let sum_x = timestamps.iter().sum::<u64>() as f64;
974 let sum_y = values.iter().sum::<f64>();
975 let sum_xy = timestamps.iter().zip(&values).map(|(x, y)| *x as f64 * y).sum::<f64>();
976 let sum_x2 = timestamps.iter().map(|x| (*x as f64).powi(2)).sum::<f64>();
977
978 let slope = (n * sum_xy - sum_x * sum_y) / (n * sum_x2 - sum_x.powi(2));
979 let intercept = (sum_y - slope * sum_x) / n;
980
981 let current_time = SystemTime::now()
983 .duration_since(UNIX_EPOCH)
984 .expect("System time should be after UNIX_EPOCH")
985 .as_secs();
986 let prediction_time = current_time + (hours_ahead * 3600);
987 let predicted_value = slope * prediction_time as f64 + intercept;
988
989 let mean = sum_y / n;
991 let variance = values.iter().map(|v| (v - mean).powi(2)).sum::<f64>() / n;
992 let std_error = (variance / n).sqrt();
993 let confidence_interval = std_error * 1.96; let trend_strength = slope.abs() / mean.abs();
997 let trend_direction = if slope > 0.01 {
998 TrendDirection::Increasing
999 } else if slope < -0.01 {
1000 TrendDirection::Decreasing
1001 } else {
1002 TrendDirection::Stable
1003 };
1004
1005 Ok(PerformancePrediction {
1006 category: category.clone(),
1007 predicted_value,
1008 confidence_interval: (
1009 predicted_value - confidence_interval,
1010 predicted_value + confidence_interval,
1011 ),
1012 trend_direction,
1013 trend_strength,
1014 prediction_horizon_hours: hours_ahead,
1015 model_accuracy: 1.0 - (std_error / mean.abs()).min(1.0), generated_at: current_time,
1017 recommendations: self.generate_performance_recommendations(
1018 &trend_direction,
1019 trend_strength,
1020 predicted_value,
1021 ),
1022 })
1023 }
1024
1025 pub fn apply_dashboard_theme(&self, theme: DashboardTheme) -> Result<()> {
1027 let theme_message = WebSocketMessage::Generic {
1029 message_type: "theme_update".to_string(),
1030 data: serde_json::to_value(&theme)?,
1031 timestamp: SystemTime::now()
1032 .duration_since(UNIX_EPOCH)
1033 .expect("System time should be after UNIX_EPOCH")
1034 .as_secs(),
1035 session_id: self.session_id.clone(),
1036 };
1037
1038 if self.websocket_sender.send(theme_message).is_err() {
1039 }
1041
1042 Ok(())
1043 }
1044
1045 pub async fn export_dashboard_data(
1047 &self,
1048 format: ExportFormat,
1049 time_range: Option<(u64, u64)>,
1050 ) -> Result<Vec<u8>> {
1051 let data = if let Some((start, end)) = time_range {
1052 self.get_filtered_data(start, end)
1053 } else {
1054 self.get_all_data()
1055 };
1056
1057 match format {
1058 ExportFormat::JSON => {
1059 let json_data = serde_json::to_string_pretty(&data)?;
1060 Ok(json_data.into_bytes())
1061 },
1062 ExportFormat::CSV => {
1063 let mut csv_data = String::from("timestamp,category,label,value\n");
1064 for (category, points) in data {
1065 for point in points {
1066 csv_data.push_str(&format!(
1067 "{},{:?},{},{}\n",
1068 point.timestamp, category, point.label, point.value
1069 ));
1070 }
1071 }
1072 Ok(csv_data.into_bytes())
1073 },
1074 ExportFormat::MessagePack => {
1075 let json_data = serde_json::to_string(&data)?;
1078 Ok(json_data.into_bytes())
1079 },
1080 }
1081 }
1082
1083 fn create_histogram(&self, values: &[f64], bins: usize) -> HistogramData {
1086 if values.is_empty() {
1087 return HistogramData {
1088 bins: Vec::new(),
1089 max_frequency: 0,
1090 };
1091 }
1092
1093 let min_val = values.iter().fold(f64::INFINITY, |a, &b| a.min(b));
1094 let max_val = values.iter().fold(f64::NEG_INFINITY, |a, &b| a.max(b));
1095 let bin_width = (max_val - min_val) / bins as f64;
1096
1097 let mut histogram_bins = vec![0; bins];
1098
1099 for &value in values {
1100 let bin_idx = ((value - min_val) / bin_width).floor() as usize;
1101 let bin_idx = bin_idx.min(bins - 1); histogram_bins[bin_idx] += 1;
1103 }
1104
1105 let max_frequency = *histogram_bins.iter().max().unwrap_or(&0);
1106
1107 let bins_data: Vec<HistogramBin> = histogram_bins
1108 .into_iter()
1109 .enumerate()
1110 .map(|(i, count)| HistogramBin {
1111 range_start: min_val + i as f64 * bin_width,
1112 range_end: min_val + (i + 1) as f64 * bin_width,
1113 frequency: count,
1114 })
1115 .collect();
1116
1117 HistogramData {
1118 bins: bins_data,
1119 max_frequency,
1120 }
1121 }
1122
1123 fn calculate_correlation_coefficient(
1124 &self,
1125 cat1: &MetricCategory,
1126 cat2: &MetricCategory,
1127 ) -> f64 {
1128 let data = self.metric_data.lock().expect("lock should not be poisoned");
1129
1130 let data1 = match data.get(cat1) {
1131 Some(d) => d,
1132 None => return 0.0,
1133 };
1134
1135 let data2 = match data.get(cat2) {
1136 Some(d) => d,
1137 None => return 0.0,
1138 };
1139
1140 if data1.len() < 2 || data2.len() < 2 {
1141 return 0.0;
1142 }
1143
1144 let min_len = data1.len().min(data2.len()).min(50); let values1: Vec<f64> = data1.iter().rev().take(min_len).map(|d| d.value).collect();
1147 let values2: Vec<f64> = data2.iter().rev().take(min_len).map(|d| d.value).collect();
1148
1149 let n = values1.len() as f64;
1151 let mean1 = values1.iter().sum::<f64>() / n;
1152 let mean2 = values2.iter().sum::<f64>() / n;
1153
1154 let covariance = values1
1155 .iter()
1156 .zip(&values2)
1157 .map(|(v1, v2)| (v1 - mean1) * (v2 - mean2))
1158 .sum::<f64>()
1159 / n;
1160
1161 let std1 = (values1.iter().map(|v| (v - mean1).powi(2)).sum::<f64>() / n).sqrt();
1162 let std2 = (values2.iter().map(|v| (v - mean2).powi(2)).sum::<f64>() / n).sqrt();
1163
1164 if std1 == 0.0 || std2 == 0.0 {
1165 0.0
1166 } else {
1167 covariance / (std1 * std2)
1168 }
1169 }
1170
1171 fn generate_performance_recommendations(
1172 &self,
1173 trend: &TrendDirection,
1174 strength: f64,
1175 predicted_value: f64,
1176 ) -> Vec<String> {
1177 let mut recommendations = Vec::new();
1178
1179 match trend {
1180 TrendDirection::Increasing => {
1181 if strength > 0.1 {
1182 recommendations.push(
1183 "Monitor for potential resource exhaustion due to increasing trend"
1184 .to_string(),
1185 );
1186 recommendations.push("Consider scaling resources proactively".to_string());
1187 }
1188 if predicted_value > 90.0 {
1189 recommendations.push(
1190 "Critical threshold approaching - immediate action recommended".to_string(),
1191 );
1192 }
1193 },
1194 TrendDirection::Decreasing => {
1195 if strength > 0.05 {
1196 recommendations
1197 .push("Investigate potential performance degradation".to_string());
1198 recommendations.push("Check for resource leaks or inefficiencies".to_string());
1199 }
1200 },
1201 TrendDirection::Stable => {
1202 recommendations
1203 .push("Performance trend is stable - continue monitoring".to_string());
1204 },
1205 }
1206
1207 if recommendations.is_empty() {
1208 recommendations.push("No specific recommendations at this time".to_string());
1209 }
1210
1211 recommendations
1212 }
1213
1214 fn get_filtered_data(
1215 &self,
1216 start: u64,
1217 end: u64,
1218 ) -> HashMap<MetricCategory, VecDeque<MetricDataPoint>> {
1219 let data = self.metric_data.lock().expect("lock should not be poisoned");
1220 let mut filtered_data = HashMap::new();
1221
1222 for (category, points) in data.iter() {
1223 let filtered_points: VecDeque<MetricDataPoint> = points
1224 .iter()
1225 .filter(|p| p.timestamp >= start && p.timestamp <= end)
1226 .cloned()
1227 .collect();
1228
1229 if !filtered_points.is_empty() {
1230 filtered_data.insert(category.clone(), filtered_points);
1231 }
1232 }
1233
1234 filtered_data
1235 }
1236
1237 fn get_all_data(&self) -> HashMap<MetricCategory, VecDeque<MetricDataPoint>> {
1238 self.metric_data.lock().expect("lock should not be poisoned").clone()
1239 }
1240}
1241
/// Fluent builder for [`RealtimeDashboard`]; starts from
/// `DashboardConfig::default()`.
#[derive(Debug, Default)]
pub struct DashboardBuilder {
    config: DashboardConfig,
}
1247
1248impl DashboardBuilder {
1249 pub fn new() -> Self {
1251 Self::default()
1252 }
1253
1254 pub fn port(mut self, port: u16) -> Self {
1256 self.config.websocket_port = port;
1257 self
1258 }
1259
1260 pub fn update_frequency(mut self, frequency_ms: u64) -> Self {
1262 self.config.update_frequency_ms = frequency_ms;
1263 self
1264 }
1265
1266 pub fn max_data_points(mut self, max_points: usize) -> Self {
1268 self.config.max_data_points = max_points;
1269 self
1270 }
1271
1272 pub fn gpu_monitoring(mut self, enabled: bool) -> Self {
1274 self.config.enable_gpu_monitoring = enabled;
1275 self
1276 }
1277
1278 pub fn memory_profiling(mut self, enabled: bool) -> Self {
1280 self.config.enable_memory_profiling = enabled;
1281 self
1282 }
1283
1284 pub fn alert_thresholds(mut self, thresholds: AlertThresholds) -> Self {
1286 self.config.alert_thresholds = thresholds;
1287 self
1288 }
1289
1290 pub fn build(self) -> RealtimeDashboard {
1292 RealtimeDashboard::new(self.config)
1293 }
1294}
1295
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    use std::time::Duration;

    // Builder overrides must flow through into the dashboard's config.
    #[tokio::test]
    async fn test_dashboard_creation() {
        let dashboard = DashboardBuilder::new()
            .port(8081)
            .update_frequency(50)
            .max_data_points(500)
            .build();

        assert_eq!(dashboard.get_config().websocket_port, 8081);
        assert_eq!(dashboard.get_config().update_frequency_ms, 50);
        assert_eq!(dashboard.get_config().max_data_points, 500);
    }

    // A single added metric is retrievable with its value and label intact.
    #[tokio::test]
    async fn test_metric_addition() {
        let dashboard = DashboardBuilder::new().build();

        let result = dashboard.add_metric(MetricCategory::Training, "loss".to_string(), 0.5);

        assert!(result.is_ok());

        let historical_data = dashboard.get_historical_data(&MetricCategory::Training);
        assert_eq!(historical_data.len(), 1);
        assert_eq!(historical_data[0].value, 0.5);
        assert_eq!(historical_data[0].label, "loss");
    }

    // Batch insertion routes each metric into its own category bucket.
    #[tokio::test]
    async fn test_batch_metrics() {
        let dashboard = DashboardBuilder::new().build();

        let metrics = vec![
            (MetricCategory::Training, "loss".to_string(), 0.5),
            (MetricCategory::Training, "accuracy".to_string(), 0.9),
            (MetricCategory::GPU, "utilization".to_string(), 75.0),
        ];

        let result = dashboard.add_metrics(metrics);
        assert!(result.is_ok());

        // Two Training entries, one GPU entry.
        let training_data = dashboard.get_historical_data(&MetricCategory::Training);
        assert_eq!(training_data.len(), 2);

        let gpu_data = dashboard.get_historical_data(&MetricCategory::GPU);
        assert_eq!(gpu_data.len(), 1);
    }

    // create_alert records the alert in the dashboard's alert history.
    #[tokio::test]
    async fn test_alert_creation() {
        let dashboard = DashboardBuilder::new().build();

        let result = dashboard.create_alert(
            AlertSeverity::Warning,
            MetricCategory::Memory,
            "High Memory".to_string(),
            "Memory usage is high".to_string(),
            Some(95.0),
            Some(90.0),
        );

        assert!(result.is_ok());

        // Inspect alert_history directly; test module has access to the
        // private field via `use super::*`.
        let history = dashboard.alert_history.lock().expect("lock should not be poisoned");
        assert_eq!(history.len(), 1);
        assert_eq!(history[0].title, "High Memory");
    }

    // End-to-end: a subscribed stream receives a MetricUpdate for an added
    // metric once the dashboard's broadcast loop is running.
    #[tokio::test]
    async fn test_websocket_subscription() {
        let dashboard = DashboardBuilder::new().build();

        let mut stream = dashboard.subscribe();

        // Share the dashboard between this test body and the spawned
        // broadcast task.
        let dashboard_clone = Arc::new(dashboard);
        let dashboard_for_task = dashboard_clone.clone();

        tokio::spawn(async move {
            let _ = dashboard_for_task.start().await;
        });

        let _ =
            dashboard_clone.add_metric(MetricCategory::Training, "test_metric".to_string(), 42.0);

        // NOTE(review): assumes the broadcast loop emits within 100 ms of the
        // metric being added — could be flaky on a heavily loaded runner.
        let message_result = tokio::time::timeout(Duration::from_millis(100), stream.next()).await;

        dashboard_clone.stop();

        // Outer Ok means the timeout did not elapse. If the stream yielded
        // None or a lagged/recv error, the `if let` below silently skips the
        // payload assertions.
        assert!(message_result.is_ok());
        if let Ok(Some(Ok(message))) = message_result {
            match message {
                WebSocketMessage::MetricUpdate { data } => {
                    assert!(!data.is_empty());
                    assert_eq!(data[0].value, 42.0);
                    assert_eq!(data[0].label, "test_metric");
                },
                _ => panic!("Expected MetricUpdate message"),
            }
        }
    }

    // System stats counters reflect one metric and one alert.
    #[tokio::test]
    async fn test_system_stats() {
        let dashboard = DashboardBuilder::new().build();

        let _ = dashboard.add_metric(MetricCategory::Training, "loss".to_string(), 0.5);
        let _ = dashboard.create_alert(
            AlertSeverity::Info,
            MetricCategory::Training,
            "Test Alert".to_string(),
            "Test message".to_string(),
            None,
            None,
        );

        let stats = dashboard.get_system_stats();

        assert_eq!(stats.data_points_collected, 1);
        assert_eq!(stats.total_alerts, 1);
    }

    // With max_data_points = 2, inserting a third point evicts the oldest
    // (FIFO), leaving the two most recent values.
    #[tokio::test]
    async fn test_data_point_limit() {
        let dashboard = DashboardBuilder::new().max_data_points(2).build();

        let _ = dashboard.add_metric(MetricCategory::Training, "metric1".to_string(), 1.0);
        let _ = dashboard.add_metric(MetricCategory::Training, "metric2".to_string(), 2.0);
        let _ = dashboard.add_metric(MetricCategory::Training, "metric3".to_string(), 3.0);

        let data = dashboard.get_historical_data(&MetricCategory::Training);

        assert_eq!(data.len(), 2);
        assert_eq!(data[0].value, 2.0); assert_eq!(data[1].value, 3.0); }
}