1use anyhow::Result;
7use scirs2_core::random::*; use serde::{Deserialize, Serialize};
9use std::collections::{HashMap, VecDeque};
10use std::sync::{Arc, Mutex};
11use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
12use tokio::sync::broadcast;
13use tokio::time::interval;
14use tokio_stream::wrappers::BroadcastStream;
15use uuid::Uuid;
16
/// Configuration for the realtime dashboard server and its data pipeline.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DashboardConfig {
    /// TCP port the WebSocket endpoint listens on.
    pub websocket_port: u16,
    /// Interval between periodic metric collection ticks, in milliseconds.
    pub update_frequency_ms: u64,
    /// Maximum number of data points retained per metric category (ring-buffer cap).
    pub max_data_points: usize,
    /// Collect GPU utilization/memory metrics in the background task.
    pub enable_gpu_monitoring: bool,
    /// Collect process memory-usage metrics in the background task.
    pub enable_memory_profiling: bool,
    /// Collect network metrics (currently unused by the visible collection code).
    pub enable_network_monitoring: bool,
    /// Emit alerts when metric values cross `alert_thresholds`.
    pub enable_performance_alerts: bool,
    /// Threshold values used by alert checks.
    pub alert_thresholds: AlertThresholds,
}
37
impl Default for DashboardConfig {
    /// Sensible defaults: port 8080, 100 ms updates, 1000 points per category,
    /// GPU/memory monitoring and alerts on, network monitoring off.
    fn default() -> Self {
        Self {
            websocket_port: 8080,
            update_frequency_ms: 100,
            max_data_points: 1000,
            enable_gpu_monitoring: true,
            enable_memory_profiling: true,
            enable_network_monitoring: false,
            enable_performance_alerts: true,
            alert_thresholds: AlertThresholds::default(),
        }
    }
}
52
/// Threshold values that trigger dashboard alerts when exceeded.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertThresholds {
    /// Memory usage alert threshold (percent).
    pub memory_threshold: f64,
    /// GPU utilization alert threshold (percent).
    pub gpu_utilization_threshold: f64,
    /// Temperature threshold (not checked by the visible alert code).
    pub temperature_threshold: f64,
    /// Training-loss spike threshold (absolute value).
    pub loss_spike_threshold: f64,
    /// Gradient-norm threshold (not checked by the visible alert code).
    pub gradient_norm_threshold: f64,
}
67
impl Default for AlertThresholds {
    /// Defaults: 90% memory, 95% GPU, 80° temperature, loss spike 2.0, grad norm 10.0.
    fn default() -> Self {
        Self {
            memory_threshold: 90.0,
            gpu_utilization_threshold: 95.0,
            temperature_threshold: 80.0,
            loss_spike_threshold: 2.0,
            gradient_norm_threshold: 10.0,
        }
    }
}
79
/// A single timestamped metric sample.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricDataPoint {
    /// Milliseconds since the Unix epoch (as produced by `add_metric`).
    pub timestamp: u64,
    /// Measured value.
    pub value: f64,
    /// Human-readable metric name, e.g. "GPU Utilization".
    pub label: String,
    /// Category bucket the point is stored under.
    pub category: MetricCategory,
}
88
/// Buckets used to group metric samples; also used as HashMap keys.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum MetricCategory {
    Training,
    Memory,
    GPU,
    Network,
    Performance,
    /// User-defined category identified by its name.
    Custom(String),
}
99
/// An alert raised when a metric crosses a configured threshold.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DashboardAlert {
    /// Random UUID assigned at creation.
    pub id: String,
    /// Milliseconds since the Unix epoch.
    pub timestamp: u64,
    pub severity: AlertSeverity,
    pub category: MetricCategory,
    /// Short headline, e.g. "High Memory Usage".
    pub title: String,
    /// Detailed human-readable description.
    pub message: String,
    /// Observed metric value, when the alert is value-driven.
    pub value: Option<f64>,
    /// Threshold that was breached, when applicable.
    pub threshold: Option<f64>,
}
112
/// Severity level attached to a [`DashboardAlert`], ordered least to most severe.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum AlertSeverity {
    Info,
    Warning,
    Error,
    Critical,
}
121
/// Messages broadcast to WebSocket subscribers.
///
/// Serialized with an internal `"type"` tag; the `Generic` variant is
/// `#[serde(untagged)]` and therefore acts as a free-form fallback whose
/// payload schema is carried in `message_type` + `data`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum WebSocketMessage {
    /// One or more fresh metric samples.
    MetricUpdate {
        data: Vec<MetricDataPoint>,
    },
    /// A newly created alert.
    Alert {
        alert: DashboardAlert,
    },
    /// The dashboard configuration changed.
    ConfigUpdate {
        config: DashboardConfig,
    },
    /// Session identity and uptime (seconds).
    SessionInfo {
        session_id: String,
        uptime: u64,
    },
    /// A replay of stored samples for one category.
    HistoricalData {
        category: MetricCategory,
        data: Vec<MetricDataPoint>,
    },
    /// Periodic server-side statistics snapshot.
    SystemStats {
        stats: SystemStats,
    },
    /// Free-form message (e.g. theme updates); serialized without a tag.
    #[serde(untagged)]
    Generic {
        message_type: String,
        data: serde_json::Value,
        timestamp: u64,
        session_id: String,
    },
}
154
/// One detected anomaly in a metric series.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AnomalyDetection {
    /// Timestamp of the anomalous sample (ms since Unix epoch).
    pub timestamp: u64,
    /// The anomalous value (or window average for trend anomalies).
    pub value: f64,
    /// (low, high) band the value was expected to fall in.
    pub expected_range: (f64, f64),
    pub anomaly_type: AnomalyType,
    /// Heuristic confidence; not guaranteed to be clamped to [0, 1].
    pub confidence_score: f64,
    pub category: MetricCategory,
    /// Human-readable explanation of what was detected.
    pub description: String,
}
166
/// Classification of a detected anomaly.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
pub enum AnomalyType {
    /// Single value far above the mean.
    Spike,
    /// Single value far below the mean.
    Drop,
    /// Recent window average well above the earlier window.
    GradualIncrease,
    /// Recent window average well below the earlier window.
    GradualDecrease,
    /// Reserved; not produced by the visible detection code.
    Outlier,
}
176
/// Aggregated visualization payload produced by `generate_advanced_visualizations`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DashboardVisualizationData {
    /// Per-category intensity summary over the 10 most recent points.
    pub heatmap_data: HashMap<MetricCategory, HeatmapData>,
    /// Full time series per category.
    pub time_series_data: HashMap<MetricCategory, Vec<TimeSeriesPoint>>,
    /// Pearson correlation matrix; row/column order follows `heatmap_data` keys.
    pub correlation_matrix: Vec<Vec<f64>>,
    /// Value histogram per category.
    pub performance_distribution: HashMap<MetricCategory, HistogramData>,
    /// Seconds since the Unix epoch at generation time.
    pub generated_at: u64,
    pub session_id: String,
}
187
/// Intensity summary for one category's recent samples.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HeatmapData {
    /// Average of the most recent samples.
    pub intensity: f64,
    /// Intensity squashed into [0, 1] via x / (x + 1).
    pub normalized_intensity: f64,
    /// Number of samples the summary was computed over.
    pub data_points: usize,
    /// Seconds since the Unix epoch when computed.
    pub timestamp: u64,
}
196
/// A labelled (timestamp, value) pair for time-series charts.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeSeriesPoint {
    pub timestamp: u64,
    pub value: f64,
    pub label: String,
}
204
/// A value histogram: equal-width bins plus the tallest bin's count.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HistogramData {
    pub bins: Vec<HistogramBin>,
    /// Largest frequency across `bins` (useful for chart scaling).
    pub max_frequency: usize,
}
211
/// One histogram bucket covering [range_start, range_end).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HistogramBin {
    pub range_start: f64,
    pub range_end: f64,
    /// Number of samples that fell into this bucket.
    pub frequency: usize,
}
219
/// Output of the linear-regression trend forecast in `predict_performance_trends`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformancePrediction {
    pub category: MetricCategory,
    /// Value extrapolated `prediction_horizon_hours` into the future.
    pub predicted_value: f64,
    /// (low, high) 95% confidence band around the prediction.
    pub confidence_interval: (f64, f64),
    pub trend_direction: TrendDirection,
    /// |slope| relative to the series mean.
    pub trend_strength: f64,
    /// How far ahead the forecast reaches, in hours.
    pub prediction_horizon_hours: u64,
    /// Heuristic fit quality in [0, 1].
    pub model_accuracy: f64,
    /// Seconds since the Unix epoch when the forecast was made.
    pub generated_at: u64,
    /// Human-readable advice derived from the trend.
    pub recommendations: Vec<String>,
}
233
/// Direction of a fitted metric trend (slope sign with a small dead zone).
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
pub enum TrendDirection {
    Increasing,
    Decreasing,
    Stable,
}
241
/// Visual theme broadcast to dashboard clients (colors as CSS hex strings).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DashboardTheme {
    pub name: String,
    pub primary_color: String,
    pub secondary_color: String,
    pub background_color: String,
    pub text_color: String,
    pub accent_color: String,
    /// Palette cycled through by chart series.
    pub chart_colors: Vec<String>,
    pub dark_mode: bool,
    /// CSS font-family value.
    pub font_family: String,
    /// Corner radius in pixels.
    pub border_radius: u8,
}
256
impl Default for DashboardTheme {
    /// Light theme based on the Tailwind palette with the Inter font.
    fn default() -> Self {
        Self {
            name: "Default".to_string(),
            primary_color: "#3b82f6".to_string(),
            secondary_color: "#64748b".to_string(),
            background_color: "#ffffff".to_string(),
            text_color: "#1f2937".to_string(),
            accent_color: "#10b981".to_string(),
            chart_colors: vec![
                "#3b82f6".to_string(),
                "#ef4444".to_string(),
                "#10b981".to_string(),
                "#f59e0b".to_string(),
                "#8b5cf6".to_string(),
            ],
            dark_mode: false,
            font_family: "Inter, sans-serif".to_string(),
            border_radius: 8,
        }
    }
}
279
/// Supported export encodings for `export_dashboard_data`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ExportFormat {
    JSON,
    CSV,
    /// NOTE: currently serialized as JSON (see `export_dashboard_data`).
    MessagePack,
}
287
/// Server-side statistics snapshot broadcast periodically.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemStats {
    /// Seconds since the dashboard was created.
    pub uptime: u64,
    /// Alerts currently retained in history (capped).
    pub total_alerts: usize,
    /// Count of `subscribe` calls (never decremented — see `subscribe`).
    pub active_connections: usize,
    /// Total samples accepted since startup.
    pub data_points_collected: usize,
    /// Estimated memory used by stored samples, in MB.
    pub memory_usage_mb: f64,
    /// Estimated CPU usage, percent.
    pub cpu_usage_percent: f64,
}
298
/// Realtime metrics dashboard: stores bounded per-category sample history,
/// raises threshold alerts, and broadcasts updates to WebSocket subscribers.
///
/// All shared state is behind `std::sync::Mutex`; guards are held only for
/// short, await-free critical sections.
#[derive(Debug)]
pub struct RealtimeDashboard {
    config: Arc<Mutex<DashboardConfig>>,
    // Random UUID identifying this dashboard instance.
    session_id: String,
    // Creation time; used to report uptime.
    start_time: Instant,
    // Ring buffers of samples, capped at `config.max_data_points` per category.
    metric_data: Arc<Mutex<HashMap<MetricCategory, VecDeque<MetricDataPoint>>>>,
    // Most recent alerts (capped at 100 in `create_alert`).
    alert_history: Arc<Mutex<VecDeque<DashboardAlert>>>,
    // Fan-out channel; send errors mean "no subscribers" and are ignored.
    websocket_sender: broadcast::Sender<WebSocketMessage>,
    // Incremented on every `subscribe`; never decremented.
    active_connections: Arc<Mutex<usize>>,
    total_data_points: Arc<Mutex<usize>>,
    // Cooperative shutdown flag polled by the background tasks.
    is_running: Arc<Mutex<bool>>,
}
312
313impl RealtimeDashboard {
314 pub fn new(config: DashboardConfig) -> Self {
316 let (websocket_sender, _) = broadcast::channel(1000);
317
318 Self {
319 config: Arc::new(Mutex::new(config)),
320 session_id: Uuid::new_v4().to_string(),
321 start_time: Instant::now(),
322 metric_data: Arc::new(Mutex::new(HashMap::new())),
323 alert_history: Arc::new(Mutex::new(VecDeque::new())),
324 websocket_sender,
325 active_connections: Arc::new(Mutex::new(0)),
326 total_data_points: Arc::new(Mutex::new(0)),
327 is_running: Arc::new(Mutex::new(false)),
328 }
329 }
330
    /// Starts the background tasks (data collection, stats updates, alert
    /// monitoring). Idempotent: returns `Ok(())` without re-spawning if the
    /// dashboard is already running.
    ///
    /// # Errors
    /// Fails if the running-state mutex is poisoned.
    pub async fn start(&self) -> Result<()> {
        {
            // Scope the guard so no lock is held across the awaits below.
            let mut running = self
                .is_running
                .lock()
                .map_err(|_| anyhow::anyhow!("Failed to acquire running state lock"))?;
            if *running {
                return Ok(());
            }
            *running = true;
        }

        self.start_data_collection().await?;

        self.start_system_stats_updates().await?;

        self.start_alert_monitoring().await?;

        Ok(())
    }
355
356 pub fn stop(&self) {
358 if let Ok(mut running) = self.is_running.lock() {
359 *running = false;
360 }
361 }
362
    /// Records one metric sample: stores it (evicting the oldest points beyond
    /// `max_data_points`), bumps the global counter, broadcasts a
    /// `MetricUpdate`, and runs threshold alert checks.
    ///
    /// # Errors
    /// Fails if the system clock is before the Unix epoch or a mutex is poisoned.
    pub fn add_metric(&self, category: MetricCategory, label: String, value: f64) -> Result<()> {
        let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis() as u64;

        let data_point = MetricDataPoint {
            timestamp,
            value,
            label,
            category: category.clone(),
        };

        {
            // Keep the metric-data lock scoped: it must be released before
            // check_for_alerts / the broadcast below.
            let mut data = self
                .metric_data
                .lock()
                .map_err(|_| anyhow::anyhow!("Failed to acquire metric data lock"))?;
            let category_data = data.entry(category.clone()).or_insert_with(VecDeque::new);

            category_data.push_back(data_point.clone());

            // Enforce the per-category ring-buffer cap from the config.
            let max_points = self
                .config
                .lock()
                .map_err(|_| anyhow::anyhow!("Failed to acquire config lock"))?
                .max_data_points;
            while category_data.len() > max_points {
                category_data.pop_front();
            }
        }

        {
            // Counter update is best-effort; a poisoned lock is not fatal here.
            if let Ok(mut total) = self.total_data_points.lock() {
                *total += 1;
            }
        }

        let message = WebSocketMessage::MetricUpdate {
            data: vec![data_point],
        };

        // Send fails only when there are no subscribers — intentionally ignored.
        let _ = self.websocket_sender.send(message);

        self.check_for_alerts(&category, value);

        Ok(())
    }
413
414 pub fn add_metrics(&self, metrics: Vec<(MetricCategory, String, f64)>) -> Result<()> {
416 let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis() as u64;
417
418 let mut data_points = Vec::new();
419
420 for (category, label, value) in metrics {
422 let data_point = MetricDataPoint {
423 timestamp,
424 value,
425 label,
426 category: category.clone(),
427 };
428
429 {
431 let mut data = self.metric_data.lock().unwrap();
432 let category_data = data.entry(category.clone()).or_default();
433 category_data.push_back(data_point.clone());
434
435 let max_points = self.config.lock().unwrap().max_data_points;
436 while category_data.len() > max_points {
437 category_data.pop_front();
438 }
439 }
440
441 data_points.push(data_point);
442
443 self.check_for_alerts(&category, value);
445 }
446
447 {
449 let mut total = self.total_data_points.lock().unwrap();
450 *total += data_points.len();
451 }
452
453 let message = WebSocketMessage::MetricUpdate { data: data_points };
455 let _ = self.websocket_sender.send(message);
456
457 Ok(())
458 }
459
460 pub fn create_alert(
462 &self,
463 severity: AlertSeverity,
464 category: MetricCategory,
465 title: String,
466 message: String,
467 value: Option<f64>,
468 threshold: Option<f64>,
469 ) -> Result<()> {
470 let alert = DashboardAlert {
471 id: Uuid::new_v4().to_string(),
472 timestamp: SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis() as u64,
473 severity,
474 category,
475 title,
476 message,
477 value,
478 threshold,
479 };
480
481 {
483 let mut history = self.alert_history.lock().unwrap();
484 history.push_back(alert.clone());
485
486 while history.len() > 100 {
488 history.pop_front();
489 }
490 }
491
492 let message = WebSocketMessage::Alert { alert };
494 let _ = self.websocket_sender.send(message);
495
496 Ok(())
497 }
498
499 pub fn get_historical_data(&self, category: &MetricCategory) -> Vec<MetricDataPoint> {
501 let data = self.metric_data.lock().unwrap();
502 data.get(category)
503 .map(|deque| deque.iter().cloned().collect())
504 .unwrap_or_default()
505 }
506
    /// Builds a point-in-time [`SystemStats`] snapshot from the internal
    /// counters and the memory/CPU estimators.
    pub fn get_system_stats(&self) -> SystemStats {
        let uptime = self.start_time.elapsed().as_secs();
        let total_alerts = self.alert_history.lock().unwrap().len();
        let active_connections = *self.active_connections.lock().unwrap();
        let data_points_collected = *self.total_data_points.lock().unwrap();

        // Both estimators are heuristics (see their definitions below), not
        // real OS measurements.
        let memory_usage_mb = self.estimate_memory_usage();
        let cpu_usage_percent = self.estimate_cpu_usage();

        SystemStats {
            uptime,
            total_alerts,
            active_connections,
            data_points_collected,
            memory_usage_mb,
            cpu_usage_percent,
        }
    }
527
    /// Registers a new subscriber and returns a stream of broadcast messages.
    ///
    /// NOTE(review): `active_connections` is incremented here but never
    /// decremented when the returned stream is dropped, so the counter only
    /// grows. Fixing this needs a drop-guard wrapper (interface change) —
    /// confirm whether the counter is meant to be "total subscriptions".
    pub fn subscribe(&self) -> BroadcastStream<WebSocketMessage> {
        {
            let mut connections = self.active_connections.lock().unwrap();
            *connections += 1;
        }

        BroadcastStream::new(self.websocket_sender.subscribe())
    }
538
539 pub fn update_config(&self, new_config: DashboardConfig) -> Result<()> {
541 {
542 let mut config = self.config.lock().unwrap();
543 *config = new_config.clone();
544 }
545
546 let message = WebSocketMessage::ConfigUpdate { config: new_config };
548 let _ = self.websocket_sender.send(message);
549
550 Ok(())
551 }
552
553 pub fn get_config(&self) -> DashboardConfig {
555 self.config.lock().unwrap().clone()
556 }
557
    /// Spawns the periodic metric-collection task. The task ticks at
    /// `update_frequency_ms` (read once at startup) and broadcasts collected
    /// system metrics until `is_running` is cleared.
    async fn start_data_collection(&self) -> Result<()> {
        let config = self.config.clone();
        let _metric_data = self.metric_data.clone();
        let websocket_sender = self.websocket_sender.clone();
        let is_running = self.is_running.clone();

        tokio::spawn(async move {
            // Frequency is sampled once here; later config changes do not
            // retune the interval. The lock guard is dropped before any await.
            let mut interval = interval(Duration::from_millis(
                config.lock().unwrap().update_frequency_ms,
            ));

            // Guard from the condition is released before the tick await.
            while *is_running.lock().unwrap() {
                interval.tick().await;

                if let Ok(metrics) = Self::collect_system_metrics(&config).await {
                    let message = WebSocketMessage::MetricUpdate { data: metrics };
                    let _ = websocket_sender.send(message);
                }
            }
        });

        Ok(())
    }
583
    /// Spawns a task that broadcasts a [`SystemStats`] snapshot every 5 seconds
    /// until `is_running` is cleared.
    ///
    /// NOTE(review): memory/CPU fields are hard-coded to 0.0 here because the
    /// estimators are instance methods not captured by the task — confirm
    /// whether clients rely on these fields from this message.
    async fn start_system_stats_updates(&self) -> Result<()> {
        let websocket_sender = self.websocket_sender.clone();
        let start_time = self.start_time;
        let alert_history = self.alert_history.clone();
        let active_connections = self.active_connections.clone();
        let total_data_points = self.total_data_points.clone();
        let is_running = self.is_running.clone();

        tokio::spawn(async move {
            let mut interval = interval(Duration::from_secs(5));
            // Each lock guard below is dropped before the next await.
            while *is_running.lock().unwrap() {
                interval.tick().await;

                let stats = SystemStats {
                    uptime: start_time.elapsed().as_secs(),
                    total_alerts: alert_history.lock().unwrap().len(),
                    active_connections: *active_connections.lock().unwrap(),
                    data_points_collected: *total_data_points.lock().unwrap(),
                    memory_usage_mb: 0.0,
                    cpu_usage_percent: 0.0,
                };

                let message = WebSocketMessage::SystemStats { stats };
                let _ = websocket_sender.send(message);
            }
        });

        Ok(())
    }
615
    /// Spawns a 1 Hz task that runs `check_threshold_breaches` (currently a
    /// stub) until `is_running` is cleared.
    async fn start_alert_monitoring(&self) -> Result<()> {
        let config = self.config.clone();
        let metric_data = self.metric_data.clone();
        let is_running = self.is_running.clone();

        tokio::spawn(async move {
            let mut interval = interval(Duration::from_secs(1));

            // Guard from the condition is released before the tick await.
            while *is_running.lock().unwrap() {
                interval.tick().await;

                Self::check_threshold_breaches(&config, &metric_data).await;
            }
        });

        Ok(())
    }
635
    /// Samples the enabled system metrics (memory and/or GPU, per config) and
    /// returns them as data points sharing one timestamp.
    ///
    /// The config guard is held for the whole body, which is safe: the function
    /// contains no awaits after the lock is taken.
    ///
    /// # Errors
    /// Fails if the system clock is before the Unix epoch.
    async fn collect_system_metrics(
        config: &Arc<Mutex<DashboardConfig>>,
    ) -> Result<Vec<MetricDataPoint>> {
        let mut metrics = Vec::new();
        let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis() as u64;

        let cfg = config.lock().unwrap();

        if cfg.enable_memory_profiling {
            // Simulated reading — see get_memory_usage.
            let memory_usage = Self::get_memory_usage();
            metrics.push(MetricDataPoint {
                timestamp,
                value: memory_usage,
                label: "Memory Usage".to_string(),
                category: MetricCategory::Memory,
            });
        }

        if cfg.enable_gpu_monitoring {
            // Simulated readings — see get_gpu_utilization / get_gpu_memory_usage.
            let gpu_utilization = Self::get_gpu_utilization();
            metrics.push(MetricDataPoint {
                timestamp,
                value: gpu_utilization,
                label: "GPU Utilization".to_string(),
                category: MetricCategory::GPU,
            });

            let gpu_memory = Self::get_gpu_memory_usage();
            metrics.push(MetricDataPoint {
                timestamp,
                value: gpu_memory,
                label: "GPU Memory".to_string(),
                category: MetricCategory::GPU,
            });
        }

        Ok(metrics)
    }
677
678 fn check_for_alerts(&self, category: &MetricCategory, value: f64) {
680 let config = self.config.lock().unwrap();
681 let thresholds = &config.alert_thresholds;
682
683 match category {
684 MetricCategory::Memory => {
685 if value > thresholds.memory_threshold {
686 let _ = self.create_alert(
687 AlertSeverity::Warning,
688 category.clone(),
689 "High Memory Usage".to_string(),
690 format!(
691 "Memory usage is {:.1}% (threshold: {:.1}%)",
692 value, thresholds.memory_threshold
693 ),
694 Some(value),
695 Some(thresholds.memory_threshold),
696 );
697 }
698 },
699 MetricCategory::GPU => {
700 if value > thresholds.gpu_utilization_threshold {
701 let _ = self.create_alert(
702 AlertSeverity::Warning,
703 category.clone(),
704 "High GPU Utilization".to_string(),
705 format!(
706 "GPU utilization is {:.1}% (threshold: {:.1}%)",
707 value, thresholds.gpu_utilization_threshold
708 ),
709 Some(value),
710 Some(thresholds.gpu_utilization_threshold),
711 );
712 }
713 },
714 MetricCategory::Training => {
715 if value > thresholds.loss_spike_threshold {
716 let _ = self.create_alert(
717 AlertSeverity::Error,
718 category.clone(),
719 "Training Loss Spike".to_string(),
720 format!(
721 "Loss spike detected: {:.4} (threshold: {:.4})",
722 value, thresholds.loss_spike_threshold
723 ),
724 Some(value),
725 Some(thresholds.loss_spike_threshold),
726 );
727 }
728 },
729 _ => {},
730 }
731 }
732
    /// Periodic threshold sweep invoked by the alert-monitoring task.
    ///
    /// NOTE(review): currently a stub — it acquires both locks and does
    /// nothing. Per-sample alerting happens in `check_for_alerts` instead.
    async fn check_threshold_breaches(
        config: &Arc<Mutex<DashboardConfig>>,
        metric_data: &Arc<Mutex<HashMap<MetricCategory, VecDeque<MetricDataPoint>>>>,
    ) {
        let _config = config.lock().unwrap();
        let _data = metric_data.lock().unwrap();

    }
744
745 fn get_memory_usage() -> f64 {
747 50.0 + (thread_rng().random::<f64>() * 40.0)
749 }
750
751 fn get_gpu_utilization() -> f64 {
753 30.0 + (thread_rng().random::<f64>() * 60.0)
755 }
756
757 fn get_gpu_memory_usage() -> f64 {
759 40.0 + (thread_rng().random::<f64>() * 50.0)
761 }
762
763 fn estimate_memory_usage(&self) -> f64 {
765 let data = self.metric_data.lock().unwrap();
766 let mut total_points = 0;
767
768 for deque in data.values() {
769 total_points += deque.len();
770 }
771
772 (total_points * 100) as f64 / (1024.0 * 1024.0)
774 }
775
776 fn estimate_cpu_usage(&self) -> f64 {
778 5.0 + (thread_rng().random::<f64>() * 10.0)
780 }
781
782 pub async fn detect_metric_anomalies(
784 &self,
785 category: &MetricCategory,
786 ) -> Result<Vec<AnomalyDetection>> {
787 let data = self.get_historical_data(category);
788 let mut anomalies = Vec::new();
789
790 if data.len() < 10 {
791 return Ok(anomalies); }
793
794 let values: Vec<f64> = data.iter().map(|d| d.value).collect();
796 let mean = values.iter().sum::<f64>() / values.len() as f64;
797 let variance = values.iter().map(|v| (v - mean).powi(2)).sum::<f64>() / values.len() as f64;
798 let std_dev = variance.sqrt();
799
800 let z_threshold = 2.0; for point in data.iter() {
803 let z_score = (point.value - mean).abs() / std_dev;
804 if z_score > z_threshold {
805 let anomaly_type =
806 if point.value > mean { AnomalyType::Spike } else { AnomalyType::Drop };
807
808 anomalies.push(AnomalyDetection {
809 timestamp: point.timestamp,
810 value: point.value,
811 expected_range: (mean - std_dev, mean + std_dev),
812 anomaly_type,
813 confidence_score: (z_score - z_threshold) / z_threshold,
814 category: category.clone(),
815 description: format!(
816 "Detected {} in {} metrics: value {} (Z-score: {:.2})",
817 match anomaly_type {
818 AnomalyType::Spike => "spike",
819 AnomalyType::Drop => "drop",
820 _ => "anomaly",
821 },
822 match category {
823 MetricCategory::Training => "training",
824 MetricCategory::Memory => "memory",
825 MetricCategory::GPU => "GPU",
826 MetricCategory::Network => "network",
827 MetricCategory::Performance => "performance",
828 MetricCategory::Custom(name) => name,
829 },
830 point.value,
831 z_score
832 ),
833 });
834 }
835 }
836
837 if data.len() >= 20 {
839 let recent_window = &data[data.len() - 10..];
840 let earlier_window = &data[data.len() - 20..data.len() - 10];
841
842 let recent_avg =
843 recent_window.iter().map(|d| d.value).sum::<f64>() / recent_window.len() as f64;
844 let earlier_avg =
845 earlier_window.iter().map(|d| d.value).sum::<f64>() / earlier_window.len() as f64;
846
847 let trend_change = (recent_avg - earlier_avg) / earlier_avg;
848
849 if trend_change.abs() > 0.3 {
850 anomalies.push(AnomalyDetection {
852 timestamp: recent_window.last().unwrap().timestamp,
853 value: recent_avg,
854 expected_range: (earlier_avg * 0.9, earlier_avg * 1.1),
855 anomaly_type: if trend_change > 0.0 {
856 AnomalyType::GradualIncrease
857 } else {
858 AnomalyType::GradualDecrease
859 },
860 confidence_score: trend_change.abs(),
861 category: category.clone(),
862 description: format!(
863 "Detected gradual {} trend: {:.1}% change over recent measurements",
864 if trend_change > 0.0 { "increase" } else { "decrease" },
865 trend_change.abs() * 100.0
866 ),
867 });
868 }
869 }
870
871 Ok(anomalies)
872 }
873
874 pub fn generate_advanced_visualizations(&self) -> Result<DashboardVisualizationData> {
876 let mut heatmap_data = HashMap::new();
877 let mut time_series_data = HashMap::new();
878 let mut correlation_matrix = Vec::new();
879 let mut performance_distribution = HashMap::new();
880
881 for (category, data) in self.metric_data.lock().unwrap().iter() {
883 if data.len() >= 10 {
884 let recent_data: Vec<f64> = data.iter().rev().take(10).map(|d| d.value).collect();
885 let avg_value = recent_data.iter().sum::<f64>() / recent_data.len() as f64;
886
887 heatmap_data.insert(
888 category.clone(),
889 HeatmapData {
890 intensity: avg_value,
891 normalized_intensity: (avg_value / (avg_value + 1.0)).min(1.0), data_points: recent_data.len(),
893 timestamp: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(),
894 },
895 );
896
897 let time_series: Vec<TimeSeriesPoint> = data
899 .iter()
900 .map(|d| TimeSeriesPoint {
901 timestamp: d.timestamp,
902 value: d.value,
903 label: d.label.clone(),
904 })
905 .collect();
906
907 time_series_data.insert(category.clone(), time_series);
908
909 let values: Vec<f64> = data.iter().map(|d| d.value).collect();
911 let histogram = self.create_histogram(&values, 10);
912 performance_distribution.insert(category.clone(), histogram);
913 }
914 }
915
916 let categories: Vec<&MetricCategory> = heatmap_data.keys().collect();
918 for (i, cat1) in categories.iter().enumerate() {
919 let mut row = Vec::new();
920 for (j, cat2) in categories.iter().enumerate() {
921 if i == j {
922 row.push(1.0); } else {
924 let corr = self.calculate_correlation_coefficient(cat1, cat2);
926 row.push(corr);
927 }
928 }
929 correlation_matrix.push(row);
930 }
931
932 Ok(DashboardVisualizationData {
933 heatmap_data,
934 time_series_data,
935 correlation_matrix,
936 performance_distribution,
937 generated_at: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(),
938 session_id: self.session_id.clone(),
939 })
940 }
941
942 pub async fn predict_performance_trends(
944 &self,
945 category: &MetricCategory,
946 hours_ahead: u64,
947 ) -> Result<PerformancePrediction> {
948 let data = self.get_historical_data(category);
949
950 if data.len() < 20 {
951 return Err(anyhow::anyhow!(
952 "Insufficient data for prediction (need at least 20 points)"
953 ));
954 }
955
956 let values: Vec<f64> = data.iter().map(|d| d.value).collect();
957 let timestamps: Vec<u64> = data.iter().map(|d| d.timestamp).collect();
958
959 let n = values.len() as f64;
961 let sum_x = timestamps.iter().sum::<u64>() as f64;
962 let sum_y = values.iter().sum::<f64>();
963 let sum_xy = timestamps.iter().zip(&values).map(|(x, y)| *x as f64 * y).sum::<f64>();
964 let sum_x2 = timestamps.iter().map(|x| (*x as f64).powi(2)).sum::<f64>();
965
966 let slope = (n * sum_xy - sum_x * sum_y) / (n * sum_x2 - sum_x.powi(2));
967 let intercept = (sum_y - slope * sum_x) / n;
968
969 let current_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
971 let prediction_time = current_time + (hours_ahead * 3600);
972 let predicted_value = slope * prediction_time as f64 + intercept;
973
974 let mean = sum_y / n;
976 let variance = values.iter().map(|v| (v - mean).powi(2)).sum::<f64>() / n;
977 let std_error = (variance / n).sqrt();
978 let confidence_interval = std_error * 1.96; let trend_strength = slope.abs() / mean.abs();
982 let trend_direction = if slope > 0.01 {
983 TrendDirection::Increasing
984 } else if slope < -0.01 {
985 TrendDirection::Decreasing
986 } else {
987 TrendDirection::Stable
988 };
989
990 Ok(PerformancePrediction {
991 category: category.clone(),
992 predicted_value,
993 confidence_interval: (
994 predicted_value - confidence_interval,
995 predicted_value + confidence_interval,
996 ),
997 trend_direction,
998 trend_strength,
999 prediction_horizon_hours: hours_ahead,
1000 model_accuracy: 1.0 - (std_error / mean.abs()).min(1.0), generated_at: current_time,
1002 recommendations: self.generate_performance_recommendations(
1003 &trend_direction,
1004 trend_strength,
1005 predicted_value,
1006 ),
1007 })
1008 }
1009
1010 pub fn apply_dashboard_theme(&self, theme: DashboardTheme) -> Result<()> {
1012 let theme_message = WebSocketMessage::Generic {
1014 message_type: "theme_update".to_string(),
1015 data: serde_json::to_value(&theme)?,
1016 timestamp: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(),
1017 session_id: self.session_id.clone(),
1018 };
1019
1020 if self.websocket_sender.send(theme_message).is_err() {
1021 }
1023
1024 Ok(())
1025 }
1026
    /// Exports stored metric data, optionally filtered to a `[start, end]`
    /// timestamp range, encoded per `format`.
    ///
    /// NOTE(review): the `MessagePack` arm currently emits compact JSON, not
    /// actual MessagePack — confirm whether consumers expect real msgpack
    /// before relying on this format.
    ///
    /// # Errors
    /// Fails if JSON serialization of the data fails.
    pub async fn export_dashboard_data(
        &self,
        format: ExportFormat,
        time_range: Option<(u64, u64)>,
    ) -> Result<Vec<u8>> {
        let data = if let Some((start, end)) = time_range {
            self.get_filtered_data(start, end)
        } else {
            self.get_all_data()
        };

        match format {
            ExportFormat::JSON => {
                let json_data = serde_json::to_string_pretty(&data)?;
                Ok(json_data.into_bytes())
            },
            ExportFormat::CSV => {
                // Header plus one row per point; category uses Debug formatting.
                let mut csv_data = String::from("timestamp,category,label,value\n");
                for (category, points) in data {
                    for point in points {
                        csv_data.push_str(&format!(
                            "{},{:?},{},{}\n",
                            point.timestamp, category, point.label, point.value
                        ));
                    }
                }
                Ok(csv_data.into_bytes())
            },
            ExportFormat::MessagePack => {
                // Fallback: compact JSON stand-in for MessagePack.
                let json_data = serde_json::to_string(&data)?;
                Ok(json_data.into_bytes())
            },
        }
    }
1064
1065 fn create_histogram(&self, values: &[f64], bins: usize) -> HistogramData {
1068 if values.is_empty() {
1069 return HistogramData {
1070 bins: Vec::new(),
1071 max_frequency: 0,
1072 };
1073 }
1074
1075 let min_val = values.iter().fold(f64::INFINITY, |a, &b| a.min(b));
1076 let max_val = values.iter().fold(f64::NEG_INFINITY, |a, &b| a.max(b));
1077 let bin_width = (max_val - min_val) / bins as f64;
1078
1079 let mut histogram_bins = vec![0; bins];
1080
1081 for &value in values {
1082 let bin_idx = ((value - min_val) / bin_width).floor() as usize;
1083 let bin_idx = bin_idx.min(bins - 1); histogram_bins[bin_idx] += 1;
1085 }
1086
1087 let max_frequency = *histogram_bins.iter().max().unwrap_or(&0);
1088
1089 let bins_data: Vec<HistogramBin> = histogram_bins
1090 .into_iter()
1091 .enumerate()
1092 .map(|(i, count)| HistogramBin {
1093 range_start: min_val + i as f64 * bin_width,
1094 range_end: min_val + (i + 1) as f64 * bin_width,
1095 frequency: count,
1096 })
1097 .collect();
1098
1099 HistogramData {
1100 bins: bins_data,
1101 max_frequency,
1102 }
1103 }
1104
    /// Computes the Pearson correlation between the two categories' series
    /// over their most recent (up to 50) samples, paired by recency.
    ///
    /// Returns 0.0 when either series is missing, too short (< 2 points), or
    /// has zero variance. Acquires the metric-data lock — callers must not
    /// hold it (std mutexes are not reentrant).
    fn calculate_correlation_coefficient(
        &self,
        cat1: &MetricCategory,
        cat2: &MetricCategory,
    ) -> f64 {
        let data = self.metric_data.lock().unwrap();

        let data1 = match data.get(cat1) {
            Some(d) => d,
            None => return 0.0,
        };

        let data2 = match data.get(cat2) {
            Some(d) => d,
            None => return 0.0,
        };

        if data1.len() < 2 || data2.len() < 2 {
            return 0.0;
        }

        // Align both series on their newest `min_len` points, capped at 50.
        let min_len = data1.len().min(data2.len()).min(50);
        let values1: Vec<f64> = data1.iter().rev().take(min_len).map(|d| d.value).collect();
        let values2: Vec<f64> = data2.iter().rev().take(min_len).map(|d| d.value).collect();

        let n = values1.len() as f64;
        let mean1 = values1.iter().sum::<f64>() / n;
        let mean2 = values2.iter().sum::<f64>() / n;

        let covariance = values1
            .iter()
            .zip(&values2)
            .map(|(v1, v2)| (v1 - mean1) * (v2 - mean2))
            .sum::<f64>()
            / n;

        let std1 = (values1.iter().map(|v| (v - mean1).powi(2)).sum::<f64>() / n).sqrt();
        let std2 = (values2.iter().map(|v| (v - mean2).powi(2)).sum::<f64>() / n).sqrt();

        // Zero variance in either series makes the coefficient undefined.
        if std1 == 0.0 || std2 == 0.0 {
            0.0
        } else {
            covariance / (std1 * std2)
        }
    }
1152
1153 fn generate_performance_recommendations(
1154 &self,
1155 trend: &TrendDirection,
1156 strength: f64,
1157 predicted_value: f64,
1158 ) -> Vec<String> {
1159 let mut recommendations = Vec::new();
1160
1161 match trend {
1162 TrendDirection::Increasing => {
1163 if strength > 0.1 {
1164 recommendations.push(
1165 "Monitor for potential resource exhaustion due to increasing trend"
1166 .to_string(),
1167 );
1168 recommendations.push("Consider scaling resources proactively".to_string());
1169 }
1170 if predicted_value > 90.0 {
1171 recommendations.push(
1172 "Critical threshold approaching - immediate action recommended".to_string(),
1173 );
1174 }
1175 },
1176 TrendDirection::Decreasing => {
1177 if strength > 0.05 {
1178 recommendations
1179 .push("Investigate potential performance degradation".to_string());
1180 recommendations.push("Check for resource leaks or inefficiencies".to_string());
1181 }
1182 },
1183 TrendDirection::Stable => {
1184 recommendations
1185 .push("Performance trend is stable - continue monitoring".to_string());
1186 },
1187 }
1188
1189 if recommendations.is_empty() {
1190 recommendations.push("No specific recommendations at this time".to_string());
1191 }
1192
1193 recommendations
1194 }
1195
1196 fn get_filtered_data(
1197 &self,
1198 start: u64,
1199 end: u64,
1200 ) -> HashMap<MetricCategory, VecDeque<MetricDataPoint>> {
1201 let data = self.metric_data.lock().unwrap();
1202 let mut filtered_data = HashMap::new();
1203
1204 for (category, points) in data.iter() {
1205 let filtered_points: VecDeque<MetricDataPoint> = points
1206 .iter()
1207 .filter(|p| p.timestamp >= start && p.timestamp <= end)
1208 .cloned()
1209 .collect();
1210
1211 if !filtered_points.is_empty() {
1212 filtered_data.insert(category.clone(), filtered_points);
1213 }
1214 }
1215
1216 filtered_data
1217 }
1218
1219 fn get_all_data(&self) -> HashMap<MetricCategory, VecDeque<MetricDataPoint>> {
1220 self.metric_data.lock().unwrap().clone()
1221 }
1222}
1223
/// Fluent builder for [`RealtimeDashboard`], starting from
/// `DashboardConfig::default()`.
#[derive(Debug, Default)]
pub struct DashboardBuilder {
    config: DashboardConfig,
}
1229
impl DashboardBuilder {
    /// Starts a builder with the default configuration.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the WebSocket listen port.
    pub fn port(mut self, port: u16) -> Self {
        self.config.websocket_port = port;
        self
    }

    /// Sets the collection tick interval in milliseconds.
    pub fn update_frequency(mut self, frequency_ms: u64) -> Self {
        self.config.update_frequency_ms = frequency_ms;
        self
    }

    /// Sets the per-category retained data-point cap.
    pub fn max_data_points(mut self, max_points: usize) -> Self {
        self.config.max_data_points = max_points;
        self
    }

    /// Enables or disables GPU metric collection.
    pub fn gpu_monitoring(mut self, enabled: bool) -> Self {
        self.config.enable_gpu_monitoring = enabled;
        self
    }

    /// Enables or disables memory metric collection.
    pub fn memory_profiling(mut self, enabled: bool) -> Self {
        self.config.enable_memory_profiling = enabled;
        self
    }

    /// Replaces the alert threshold set.
    pub fn alert_thresholds(mut self, thresholds: AlertThresholds) -> Self {
        self.config.alert_thresholds = thresholds;
        self
    }

    /// Consumes the builder and constructs the dashboard (not yet started).
    pub fn build(self) -> RealtimeDashboard {
        RealtimeDashboard::new(self.config)
    }
}
1277
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    use std::time::Duration;

    // Builder overrides must land in the dashboard's config.
    #[tokio::test]
    async fn test_dashboard_creation() {
        let dashboard = DashboardBuilder::new()
            .port(8081)
            .update_frequency(50)
            .max_data_points(500)
            .build();

        let config = dashboard.get_config();
        assert_eq!(config.websocket_port, 8081);
        assert_eq!(config.update_frequency_ms, 50);
        assert_eq!(config.max_data_points, 500);
    }

    // A single metric is stored and retrievable with its label and value intact.
    #[tokio::test]
    async fn test_metric_addition() {
        let dashboard = DashboardBuilder::new().build();

        let result = dashboard.add_metric(MetricCategory::Training, "loss".to_string(), 0.5);
        assert!(result.is_ok());

        let history = dashboard.get_historical_data(&MetricCategory::Training);
        assert_eq!(history.len(), 1);
        assert_eq!(history[0].value, 0.5);
        assert_eq!(history[0].label, "loss");
    }

    // Batch insertion routes each metric into its own category bucket.
    #[tokio::test]
    async fn test_batch_metrics() {
        let dashboard = DashboardBuilder::new().build();

        let batch = vec![
            (MetricCategory::Training, "loss".to_string(), 0.5),
            (MetricCategory::Training, "accuracy".to_string(), 0.9),
            (MetricCategory::GPU, "utilization".to_string(), 75.0),
        ];
        assert!(dashboard.add_metrics(batch).is_ok());

        assert_eq!(
            dashboard.get_historical_data(&MetricCategory::Training).len(),
            2
        );
        assert_eq!(dashboard.get_historical_data(&MetricCategory::GPU).len(), 1);
    }

    // Manually created alerts are recorded in the alert history.
    #[tokio::test]
    async fn test_alert_creation() {
        let dashboard = DashboardBuilder::new().build();

        let result = dashboard.create_alert(
            AlertSeverity::Warning,
            MetricCategory::Memory,
            "High Memory".to_string(),
            "Memory usage is high".to_string(),
            Some(95.0),
            Some(90.0),
        );
        assert!(result.is_ok());

        let alerts = dashboard.alert_history.lock().unwrap();
        assert_eq!(alerts.len(), 1);
        assert_eq!(alerts[0].title, "High Memory");
    }

    // End-to-end: a subscriber receives a metric added after the dashboard starts.
    #[tokio::test]
    async fn test_websocket_subscription() {
        let dashboard = DashboardBuilder::new().build();

        // Subscribe before starting so no broadcast can be missed.
        let mut updates = dashboard.subscribe();

        let shared = Arc::new(dashboard);
        let runner = shared.clone();
        tokio::spawn(async move {
            let _ = runner.start().await;
        });

        let _ = shared.add_metric(MetricCategory::Training, "test_metric".to_string(), 42.0);

        // Bound the wait so a broken broadcast loop fails fast instead of hanging.
        let received = tokio::time::timeout(Duration::from_millis(100), updates.next()).await;

        shared.stop();

        assert!(received.is_ok());
        if let Ok(Some(Ok(message))) = received {
            match message {
                WebSocketMessage::MetricUpdate { data } => {
                    assert!(!data.is_empty());
                    assert_eq!(data[0].value, 42.0);
                    assert_eq!(data[0].label, "test_metric");
                },
                _ => panic!("Expected MetricUpdate message"),
            }
        }
    }

    // System stats reflect both collected data points and raised alerts.
    #[tokio::test]
    async fn test_system_stats() {
        let dashboard = DashboardBuilder::new().build();

        let _ = dashboard.add_metric(MetricCategory::Training, "loss".to_string(), 0.5);
        let _ = dashboard.create_alert(
            AlertSeverity::Info,
            MetricCategory::Training,
            "Test Alert".to_string(),
            "Test message".to_string(),
            None,
            None,
        );

        let stats = dashboard.get_system_stats();
        assert_eq!(stats.data_points_collected, 1);
        assert_eq!(stats.total_alerts, 1);
    }

    // Oldest points are evicted once the per-category cap is exceeded.
    #[tokio::test]
    async fn test_data_point_limit() {
        let dashboard = DashboardBuilder::new().max_data_points(2).build();

        let _ = dashboard.add_metric(MetricCategory::Training, "metric1".to_string(), 1.0);
        let _ = dashboard.add_metric(MetricCategory::Training, "metric2".to_string(), 2.0);
        let _ = dashboard.add_metric(MetricCategory::Training, "metric3".to_string(), 3.0);

        let data = dashboard.get_historical_data(&MetricCategory::Training);
        assert_eq!(data.len(), 2);
        assert_eq!(data[0].value, 2.0);
        assert_eq!(data[1].value, 3.0);
    }
}