1#![allow(dead_code)]
9use crate::distributed_memory_optimization::{
10 DistributedMemoryOptimizer, MemoryOptimizationStatus,
11};
12use crate::distributed_monitoring::{ClusterSummary, DistributedMonitor, NodeMetrics};
13use crate::enhanced_fault_tolerance::{EnhancedFaultTolerance, FaultToleranceStatus};
14use crate::{TorshDistributedError, TorshResult};
15use serde::{Deserialize, Serialize};
16use std::collections::VecDeque;
17use std::sync::{Arc, Mutex, RwLock};
18use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
19
/// One complete analytics snapshot for a distributed training run,
/// grouped into per-domain sub-reports.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TrainingAnalytics {
    /// Loss, throughput, and timing metrics.
    pub performance: TrainingPerformanceAnalytics,
    /// CPU / GPU / memory utilization metrics.
    pub resource_utilization: ResourceUtilizationAnalytics,
    /// Collective-communication latency and efficiency metrics.
    pub communication: CommunicationAnalytics,
    /// Node health and incident counts.
    pub system_health: SystemHealthAnalytics,
    /// Convergence-rate and gradient statistics.
    pub convergence: ConvergenceAnalytics,
    /// Derived efficiency scores and optimization recommendations.
    pub efficiency: EfficiencyAnalytics,
    /// Snapshot creation time, in milliseconds since the UNIX epoch.
    pub timestamp_ms: u64,
}
38
/// Training-speed and loss metrics derived from the latest node metrics.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TrainingPerformanceAnalytics {
    /// Epoch reported by the most recent node metrics (0 if unavailable).
    pub current_epoch: u32,
    /// Most recently reported loss value.
    pub avg_loss: f32,
    /// Relative change of recent losses; negative means loss is decreasing.
    pub loss_trend: f32,
    /// Per-node throughput scaled by the cluster node count (samples/sec).
    pub cluster_throughput: f32,
    /// Ratio of actual to theoretical maximum throughput, clamped to [0, 1].
    pub throughput_efficiency: f32,
    /// Most recently reported batch processing time, in milliseconds.
    pub avg_batch_time_ms: u64,
    /// Batch-time variance; currently a fixed placeholder value.
    pub batch_time_variance: f32,
    /// Stability score in [0, 1] from the coefficient of variation of recent losses.
    pub training_stability: f32,
    /// Estimated time to finish training, when throughput data permits an estimate.
    pub estimated_completion_time: Option<Duration>,
}
61
/// Cluster-wide resource utilization, estimated peaks, and the dominant bottleneck.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceUtilizationAnalytics {
    /// Average CPU utilization across the cluster (percent).
    pub avg_cpu_utilization: f32,
    /// Average GPU utilization across the cluster (percent).
    pub avg_gpu_utilization: f32,
    /// Average memory utilization (percent), from the memory optimizer's status.
    pub avg_memory_utilization: f32,
    /// 1.0 when CPU/GPU/memory utilizations are equal; approaches 0.0 as they diverge.
    pub utilization_balance: f32,
    /// Estimated peak CPU utilization (average scaled by a fixed headroom factor).
    pub peak_cpu: f32,
    /// Estimated peak GPU utilization (average scaled by a fixed headroom factor).
    pub peak_gpu: f32,
    /// Estimated peak memory utilization (average scaled by a fixed headroom factor).
    pub peak_memory: f32,
    /// Mean of the three utilization percentages, normalized to [0, 1].
    pub resource_efficiency: f32,
    /// Heuristically classified dominant resource bottleneck.
    pub primary_bottleneck: ResourceBottleneck,
}
82
/// Resource classified as the primary limiter of training throughput.
/// `Display` renders the plain variant name (e.g. "CPU").
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum ResourceBottleneck {
    CPU,
    GPU,
    Memory,
    Network,
    Storage,
    /// No single resource is currently limiting throughput.
    None,
}
93
94impl std::fmt::Display for ResourceBottleneck {
95 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96 match self {
97 ResourceBottleneck::CPU => write!(f, "CPU"),
98 ResourceBottleneck::GPU => write!(f, "GPU"),
99 ResourceBottleneck::Memory => write!(f, "Memory"),
100 ResourceBottleneck::Network => write!(f, "Network"),
101 ResourceBottleneck::Storage => write!(f, "Storage"),
102 ResourceBottleneck::None => write!(f, "None"),
103 }
104 }
105}
106
/// Collective-communication performance metrics.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CommunicationAnalytics {
    /// Average operation latency, in microseconds.
    pub avg_latency_us: u64,
    /// Fraction of available bandwidth in use (derived from reported Mbps).
    pub bandwidth_utilization: f32,
    /// Communication efficiency score reported by the node metrics.
    pub efficiency_score: f32,
    /// Rate of failed communication operations (failed count scaled by a fixed window).
    pub failed_operations_rate: f32,
    /// Breakdown of collective-operation frequencies and traffic hotspots.
    pub communication_patterns: CommunicationPatterns,
    /// Coarse congestion indicator derived from latency (higher means more congested).
    pub congestion_level: f32,
}
123
/// Frequencies of collective operations plus detected traffic hotspots.
/// NOTE(review): currently populated with fixed placeholder values — see
/// `generate_communication_analytics`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CommunicationPatterns {
    /// All-reduce operations per unit time.
    pub allreduce_frequency: f32,
    /// All-gather operations per unit time.
    pub allgather_frequency: f32,
    /// Point-to-point transfers per unit time.
    pub p2p_frequency: f32,
    /// Gradient synchronizations per unit time.
    pub gradient_sync_frequency: f32,
    /// Node pairs with notably heavy or congested traffic.
    pub hotspots: Vec<CommunicationHotspot>,
}
138
/// A node pair carrying notably heavy or congested traffic.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CommunicationHotspot {
    /// Identifier of the sending node.
    pub source_node: String,
    /// Identifier of the receiving node.
    pub target_node: String,
    /// Traffic volume over this link.
    pub traffic_volume: f32,
    /// Congestion severity score for this link.
    pub congestion_score: f32,
}
151
/// Cluster health summary derived from the fault-tolerance subsystem.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemHealthAnalytics {
    /// Overall health score reported by the fault-tolerance layer.
    pub cluster_health_score: f32,
    /// Nodes currently considered healthy.
    pub healthy_nodes: usize,
    /// Degraded nodes (approximated by the excluded-node count).
    pub degraded_nodes: usize,
    /// Nodes in a critical state (not currently tracked; always 0).
    pub critical_nodes: usize,
    /// Nodes assumed failed (total minus healthy).
    pub failed_nodes: usize,
    /// Incidents currently open in the fault-tolerance layer.
    pub active_incidents: usize,
    /// Coarse direction of health over time (positive = improving).
    pub stability_trend: f32,
    /// Estimated probability of failure, derived from the health score.
    pub failure_probability: f32,
}
172
/// Convergence indicators derived from loss history and gradient norms.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConvergenceAnalytics {
    /// Negated loss trend: positive when the loss is decreasing.
    pub convergence_rate: f32,
    /// Confidence in the convergence estimate (coarse heuristic).
    pub convergence_confidence: f32,
    /// Rough progress estimate in [0, 0.95], derived from the loss magnitude.
    pub training_progress: f32,
    /// Smoothness of the loss curve; currently a fixed placeholder value.
    pub loss_smoothness: f32,
    /// Statistics over recent gradient norms.
    pub gradient_norm_stats: GradientNormStats,
    /// Heuristic effectiveness of the current learning rate.
    pub lr_effectiveness: f32,
    /// Heuristic risk of overfitting, based on estimated progress.
    pub overfitting_risk: f32,
}
191
/// Summary statistics over recent gradient norms.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GradientNormStats {
    /// Average gradient norm (currently the latest reported norm).
    pub avg_norm: f32,
    /// Variance of the gradient norm (currently estimated from the average).
    pub norm_variance: f32,
    /// Direction of change of the norm over time (currently a placeholder).
    pub norm_trend: f32,
    /// Risk of gradient explosion; high when the norm exceeds a fixed threshold.
    pub explosion_risk: f32,
}
204
/// Composite efficiency scores plus actionable optimization recommendations.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EfficiencyAnalytics {
    /// Mean of compute, communication, and memory efficiency.
    pub overall_efficiency: f32,
    /// Compute efficiency (mirrors `resource_efficiency`).
    pub compute_efficiency: f32,
    /// Communication efficiency score.
    pub communication_efficiency: f32,
    /// Memory efficiency; penalized sharply above 80% utilization.
    pub memory_efficiency: f32,
    /// Energy efficiency (currently derived from overall efficiency).
    pub energy_efficiency: f32,
    /// Cost efficiency (overall efficiency weighted by throughput efficiency).
    pub cost_efficiency: f32,
    /// Prioritized optimization suggestions, highest priority first.
    pub recommendations: Vec<OptimizationRecommendation>,
}
223
/// A single actionable optimization suggestion for operators.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OptimizationRecommendation {
    /// High-level area the recommendation targets.
    pub category: RecommendationCategory,
    /// Short human-readable title.
    pub title: String,
    /// Detailed explanation, including the observed values that triggered it.
    pub description: String,
    /// Estimated benefit if applied, in [0, 1].
    pub expected_impact: f32,
    /// Estimated implementation effort, in [0, 1].
    pub difficulty: f32,
    /// Ordering priority; higher values are surfaced first.
    pub priority: u32,
}
240
/// High-level area an optimization recommendation targets.
/// `Display` renders the plain variant name.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum RecommendationCategory {
    Performance,
    Efficiency,
    Reliability,
    Cost,
    Scalability,
}
250
251impl std::fmt::Display for RecommendationCategory {
252 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
253 match self {
254 RecommendationCategory::Performance => write!(f, "Performance"),
255 RecommendationCategory::Efficiency => write!(f, "Efficiency"),
256 RecommendationCategory::Reliability => write!(f, "Reliability"),
257 RecommendationCategory::Cost => write!(f, "Cost"),
258 RecommendationCategory::Scalability => write!(f, "Scalability"),
259 }
260 }
261}
262
/// Tunable behavior of the analytics dashboard.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DashboardConfig {
    /// Minimum interval between analytics refreshes; earlier calls are no-ops.
    pub update_interval: Duration,
    /// How long analytics snapshots are kept in history before being pruned.
    pub retention_period: Duration,
    /// Whether predictive analytics are enabled.
    pub enable_predictions: bool,
    /// Whether the recommendation engine is fed and queried.
    pub enable_recommendations: bool,
    /// Sliding window over which trends are computed.
    pub aggregation_window: Duration,
    /// Thresholds at which dashboard alerts fire.
    pub alert_thresholds: DashboardAlertThresholds,
}
279
/// Threshold values that trigger dashboard alerts.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DashboardAlertThresholds {
    /// Alert when overall efficiency drops below this fraction.
    pub efficiency_threshold: f32,
    /// Alert when resource utilization exceeds this fraction.
    pub utilization_threshold: f32,
    /// Alert when communication latency exceeds this value (microseconds).
    pub latency_threshold: u64,
    /// Alert when the convergence rate falls below this value.
    pub convergence_threshold: f32,
}
292
293impl Default for DashboardConfig {
294 fn default() -> Self {
295 Self {
296 update_interval: Duration::from_secs(10),
297 retention_period: Duration::from_secs(24 * 3600), enable_predictions: true,
299 enable_recommendations: true,
300 aggregation_window: Duration::from_secs(60),
301 alert_thresholds: DashboardAlertThresholds {
302 efficiency_threshold: 0.7,
303 utilization_threshold: 0.9,
304 latency_threshold: 10000,
305 convergence_threshold: 0.1,
306 },
307 }
308 }
309}
310
/// Central analytics hub: aggregates monitoring, fault-tolerance, and
/// memory-optimization state into [`TrainingAnalytics`] snapshots.
/// All shared state is behind locks, so `&self` methods may be called
/// from multiple threads.
pub struct TrainingAnalyticsDashboard {
    // Dashboard behavior settings (update cadence, retention, thresholds).
    config: DashboardConfig,
    // Source of node metrics and cluster summaries.
    monitor: Arc<DistributedMonitor>,
    // Source of node-health and incident status.
    fault_tolerance: Arc<EnhancedFaultTolerance>,
    // Source of memory-utilization status.
    memory_optimizer: Arc<DistributedMemoryOptimizer>,
    // Most recent snapshot; None until the first successful update.
    current_analytics: Arc<RwLock<Option<TrainingAnalytics>>>,
    // Past snapshots, pruned past the retention period.
    analytics_history: Arc<Mutex<VecDeque<TrainingAnalytics>>>,
    // Sliding-window trend computation over loss/throughput/resources.
    trend_analyzer: Arc<Mutex<TrendAnalyzer>>,
    // Generates cached optimization recommendations.
    recommendation_engine: Arc<Mutex<RecommendationEngine>>,
    // Timestamp of the last refresh, used to throttle updates.
    last_update: Arc<Mutex<Instant>>,
}
332
/// Sliding-window history of training signals used to derive short-term
/// trends and a stability score.
#[derive(Debug)]
struct TrendAnalyzer {
    /// `(timestamp_ms, loss)` samples, oldest first.
    loss_history: VecDeque<(u64, f32)>,
    /// `(timestamp_ms, throughput)` samples, oldest first.
    throughput_history: VecDeque<(u64, f32)>,
    /// `(timestamp_ms, utilization snapshot)` samples, oldest first.
    resource_history: VecDeque<(u64, ResourceSnapshot)>,
    /// Samples older than this window (relative to the newest update) are dropped.
    trend_window: Duration,
}

/// Point-in-time utilization values recorded alongside a timestamp.
#[derive(Debug, Clone)]
struct ResourceSnapshot {
    cpu: f32,
    gpu: f32,
    memory: f32,
}

impl TrendAnalyzer {
    /// Creates an analyzer that retains samples within `trend_window`.
    fn new(trend_window: Duration) -> Self {
        Self {
            loss_history: VecDeque::with_capacity(1000),
            throughput_history: VecDeque::with_capacity(1000),
            resource_history: VecDeque::with_capacity(1000),
            trend_window,
        }
    }

    /// Records a loss sample and prunes data outside the window.
    fn update_loss(&mut self, timestamp: u64, loss: f32) {
        self.loss_history.push_back((timestamp, loss));
        self.cleanup_old_data(timestamp);
    }

    /// Records a throughput sample and prunes data outside the window.
    fn update_throughput(&mut self, timestamp: u64, throughput: f32) {
        self.throughput_history.push_back((timestamp, throughput));
        self.cleanup_old_data(timestamp);
    }

    /// Records a resource-utilization sample and prunes data outside the window.
    fn update_resources(&mut self, timestamp: u64, cpu: f32, gpu: f32, memory: f32) {
        self.resource_history
            .push_back((timestamp, ResourceSnapshot { cpu, gpu, memory }));
        self.cleanup_old_data(timestamp);
    }

    /// Drops all samples older than `trend_window` relative to `current_timestamp`.
    fn cleanup_old_data(&mut self, current_timestamp: u64) {
        let cutoff = current_timestamp.saturating_sub(self.trend_window.as_millis() as u64);

        self.loss_history.retain(|(ts, _)| *ts >= cutoff);
        self.throughput_history.retain(|(ts, _)| *ts >= cutoff);
        self.resource_history.retain(|(ts, _)| *ts >= cutoff);
    }

    /// Relative change between the 5 most recent samples and the 5 preceding
    /// them: `(late_avg - early_avg) / early_avg`. Returns 0.0 (neutral) until
    /// at least 10 samples are available. Shared by the loss and throughput
    /// trend calculations, which previously duplicated this logic.
    fn recent_relative_change(history: &VecDeque<(u64, f32)>) -> f32 {
        if history.len() < 10 {
            return 0.0;
        }

        // Newest-first view of the last 10 samples.
        let recent: Vec<f32> = history.iter().rev().take(10).map(|(_, v)| *v).collect();
        let late_avg = recent[..5].iter().sum::<f32>() / 5.0;
        let early_avg = recent[5..].iter().sum::<f32>() / (recent.len() - 5) as f32;

        // Guard against division by ~zero with a small floor on the baseline.
        (late_avg - early_avg) / early_avg.max(0.001)
    }

    /// Loss trend over the last 10 samples; negative means loss is falling.
    fn calculate_loss_trend(&self) -> f32 {
        Self::recent_relative_change(&self.loss_history)
    }

    /// Throughput trend over the last 10 samples; positive means speeding up.
    fn calculate_throughput_trend(&self) -> f32 {
        Self::recent_relative_change(&self.throughput_history)
    }

    /// Stability score in [0, 1] from the coefficient of variation of the
    /// last 20 losses (1.0 = perfectly stable). Returns a neutral 0.5 until
    /// enough samples are available.
    fn calculate_stability(&self) -> f32 {
        if self.loss_history.len() < 20 {
            return 0.5;
        }

        let recent_losses: Vec<f32> = self
            .loss_history
            .iter()
            .rev()
            .take(20)
            .map(|(_, loss)| *loss)
            .collect();
        let mean = recent_losses.iter().sum::<f32>() / recent_losses.len() as f32;
        let variance = recent_losses
            .iter()
            .map(|&x| (x - mean).powi(2))
            .sum::<f32>()
            / recent_losses.len() as f32;
        let std_dev = variance.sqrt();

        // Coefficient of variation; treat a near-zero mean as fully unstable.
        let cv = if mean > 0.001 { std_dev / mean } else { 1.0 };
        (1.0 - cv.min(1.0)).max(0.0)
    }
}
451
/// Generates prioritized optimization recommendations from recent
/// performance samples, with a time-based result cache.
#[derive(Debug)]
struct RecommendationEngine {
    // Bounded ring of recent performance samples.
    performance_history: VecDeque<PerformanceSnapshot>,
    // Last generated recommendations, reused while fresh.
    recommendation_cache: Vec<OptimizationRecommendation>,
    // When the cache was last regenerated.
    last_generation: Instant,
}
462
/// One performance sample fed to the recommendation engine.
#[derive(Debug, Clone)]
struct PerformanceSnapshot {
    // When the sample was recorded.
    timestamp: Instant,
    // Cluster throughput (samples/sec).
    throughput: f32,
    // Overall efficiency score at sample time.
    efficiency: f32,
    // CPU utilization (percent).
    cpu_util: f32,
    // GPU utilization (percent).
    gpu_util: f32,
    // Memory utilization (percent).
    memory_util: f32,
    // Communication latency (microseconds).
    communication_latency: u64,
}
474
475impl RecommendationEngine {
476 fn new() -> Self {
477 Self {
478 performance_history: VecDeque::with_capacity(100),
479 recommendation_cache: Vec::new(),
480 last_generation: Instant::now(),
481 }
482 }
483
484 fn update_performance(
485 &mut self,
486 throughput: f32,
487 efficiency: f32,
488 cpu_util: f32,
489 gpu_util: f32,
490 memory_util: f32,
491 communication_latency: u64,
492 ) {
493 let snapshot = PerformanceSnapshot {
494 timestamp: Instant::now(),
495 throughput,
496 efficiency,
497 cpu_util,
498 gpu_util,
499 memory_util,
500 communication_latency,
501 };
502
503 self.performance_history.push_back(snapshot);
504 if self.performance_history.len() > 100 {
505 self.performance_history.pop_front();
506 }
507 }
508
509 fn generate_recommendations(&mut self) -> Vec<OptimizationRecommendation> {
510 if !self.recommendation_cache.is_empty() && self.last_generation.elapsed().as_secs() < 300 {
512 return self.recommendation_cache.clone();
513 }
514
515 let mut recommendations = Vec::new();
516
517 if self.performance_history.len() < 10 {
518 return recommendations;
519 }
520
521 let recent_perf: Vec<&PerformanceSnapshot> =
523 self.performance_history.iter().rev().take(10).collect();
524
525 let avg_gpu_util =
527 recent_perf.iter().map(|p| p.gpu_util).sum::<f32>() / recent_perf.len() as f32;
528 if avg_gpu_util < 70.0 {
529 recommendations.push(OptimizationRecommendation {
530 category: RecommendationCategory::Performance,
531 title: "Increase GPU Utilization".to_string(),
532 description: format!("GPU utilization is at {:.1}%. Consider increasing batch size or adjusting data loading.", avg_gpu_util),
533 expected_impact: 0.8,
534 difficulty: 0.3,
535 priority: 4,
536 });
537 }
538
539 let avg_memory_util =
541 recent_perf.iter().map(|p| p.memory_util).sum::<f32>() / recent_perf.len() as f32;
542 if avg_memory_util > 90.0 {
543 recommendations.push(OptimizationRecommendation {
544 category: RecommendationCategory::Efficiency,
545 title: "Optimize Memory Usage".to_string(),
546 description: format!("Memory utilization is at {:.1}%. Consider enabling gradient checkpointing or reducing batch size.", avg_memory_util),
547 expected_impact: 0.6,
548 difficulty: 0.4,
549 priority: 3,
550 });
551 }
552
553 let avg_latency = recent_perf
555 .iter()
556 .map(|p| p.communication_latency)
557 .sum::<u64>()
558 / recent_perf.len() as u64;
559 if avg_latency > 5000 {
560 recommendations.push(OptimizationRecommendation {
561 category: RecommendationCategory::Performance,
562 title: "Optimize Communication".to_string(),
563 description: format!("Communication latency is {}μs. Consider gradient compression or improving network connectivity.", avg_latency),
564 expected_impact: 0.7,
565 difficulty: 0.6,
566 priority: 3,
567 });
568 }
569
570 let efficiency_trend = self.calculate_efficiency_trend(&recent_perf);
572 if efficiency_trend < -0.1 {
573 recommendations.push(OptimizationRecommendation {
574 category: RecommendationCategory::Efficiency,
575 title: "Address Efficiency Decline".to_string(),
576 description: "Training efficiency is declining. Review resource allocation and check for bottlenecks.".to_string(),
577 expected_impact: 0.9,
578 difficulty: 0.7,
579 priority: 5,
580 });
581 }
582
583 let throughput_variance = self.calculate_throughput_variance(&recent_perf);
585 if throughput_variance > 0.2 {
586 recommendations.push(OptimizationRecommendation {
587 category: RecommendationCategory::Scalability,
588 title: "Improve Load Balancing".to_string(),
589 description: "High throughput variance detected. Consider redistributing workload across nodes.".to_string(),
590 expected_impact: 0.5,
591 difficulty: 0.8,
592 priority: 2,
593 });
594 }
595
596 recommendations.sort_by(|a, b| b.priority.cmp(&a.priority));
598
599 self.recommendation_cache = recommendations.clone();
600 self.last_generation = Instant::now();
601
602 recommendations
603 }
604
605 fn calculate_efficiency_trend(&self, recent_perf: &[&PerformanceSnapshot]) -> f32 {
606 if recent_perf.len() < 6 {
607 return 0.0;
608 }
609
610 let early_efficiency: f32 = recent_perf[3..].iter().map(|p| p.efficiency).sum::<f32>()
611 / (recent_perf.len() - 3) as f32;
612 let late_efficiency: f32 = recent_perf[..3].iter().map(|p| p.efficiency).sum::<f32>() / 3.0;
613
614 (late_efficiency - early_efficiency) / early_efficiency.max(0.001)
615 }
616
617 fn calculate_throughput_variance(&self, recent_perf: &[&PerformanceSnapshot]) -> f32 {
618 if recent_perf.len() < 5 {
619 return 0.0;
620 }
621
622 let throughputs: Vec<f32> = recent_perf.iter().map(|p| p.throughput).collect();
623 let mean = throughputs.iter().sum::<f32>() / throughputs.len() as f32;
624 let variance =
625 throughputs.iter().map(|&x| (x - mean).powi(2)).sum::<f32>() / throughputs.len() as f32;
626
627 if mean > 0.001 {
628 variance.sqrt() / mean } else {
630 0.0
631 }
632 }
633}
634
635impl TrainingAnalyticsDashboard {
636 pub fn new(
638 config: DashboardConfig,
639 monitor: Arc<DistributedMonitor>,
640 fault_tolerance: Arc<EnhancedFaultTolerance>,
641 memory_optimizer: Arc<DistributedMemoryOptimizer>,
642 ) -> Self {
643 Self {
644 config: config.clone(),
645 monitor,
646 fault_tolerance,
647 memory_optimizer,
648 current_analytics: Arc::new(RwLock::new(None)),
649 analytics_history: Arc::new(Mutex::new(VecDeque::new())),
650 trend_analyzer: Arc::new(Mutex::new(TrendAnalyzer::new(config.aggregation_window))),
651 recommendation_engine: Arc::new(Mutex::new(RecommendationEngine::new())),
652 last_update: Arc::new(Mutex::new(Instant::now())),
653 }
654 }
655
656 pub fn update_analytics(&self) -> TorshResult<()> {
658 {
660 let last_update = self.last_update.lock().map_err(|e| {
661 TorshDistributedError::communication_error(
662 "last_update",
663 format!("Lock error: {}", e),
664 )
665 })?;
666 if last_update.elapsed() < self.config.update_interval {
667 return Ok(());
668 }
669 }
670
671 let cluster_summary = self.monitor.get_cluster_summary().ok();
673
674 let fault_tolerance_status = self.fault_tolerance.get_status()?;
675 let memory_optimization_status = self.memory_optimizer.get_optimization_status()?;
676
677 let analytics = self.generate_training_analytics(
679 cluster_summary,
680 fault_tolerance_status,
681 memory_optimization_status,
682 )?;
683
684 {
686 let mut trend_analyzer = self.trend_analyzer.lock().map_err(|e| {
687 TorshDistributedError::communication_error(
688 "trend_analyzer",
689 format!("Lock error: {}", e),
690 )
691 })?;
692
693 let timestamp = SystemTime::now()
694 .duration_since(UNIX_EPOCH)
695 .expect("time should be after UNIX_EPOCH")
696 .as_millis() as u64;
697 trend_analyzer.update_loss(timestamp, analytics.performance.avg_loss);
698 trend_analyzer.update_throughput(timestamp, analytics.performance.cluster_throughput);
699 trend_analyzer.update_resources(
700 timestamp,
701 analytics.resource_utilization.avg_cpu_utilization,
702 analytics.resource_utilization.avg_gpu_utilization,
703 analytics.resource_utilization.avg_memory_utilization,
704 );
705 }
706
707 if self.config.enable_recommendations {
709 let mut recommendation_engine = self.recommendation_engine.lock().map_err(|e| {
710 TorshDistributedError::communication_error(
711 "recommendation_engine",
712 format!("Lock error: {}", e),
713 )
714 })?;
715
716 recommendation_engine.update_performance(
717 analytics.performance.cluster_throughput,
718 analytics.efficiency.overall_efficiency,
719 analytics.resource_utilization.avg_cpu_utilization,
720 analytics.resource_utilization.avg_gpu_utilization,
721 analytics.resource_utilization.avg_memory_utilization,
722 analytics.communication.avg_latency_us,
723 );
724 }
725
726 {
728 let mut current_analytics = self.current_analytics.write().map_err(|e| {
729 TorshDistributedError::communication_error(
730 "current_analytics",
731 format!("Lock error: {}", e),
732 )
733 })?;
734 *current_analytics = Some(analytics.clone());
735 }
736
737 {
739 let mut analytics_history = self.analytics_history.lock().map_err(|e| {
740 TorshDistributedError::communication_error(
741 "analytics_history",
742 format!("Lock error: {}", e),
743 )
744 })?;
745 analytics_history.push_back(analytics);
746
747 let retention_cutoff = (SystemTime::now()
749 .duration_since(UNIX_EPOCH)
750 .expect("time should be after UNIX_EPOCH")
751 .as_millis() as u64)
752 .saturating_sub(self.config.retention_period.as_millis() as u64);
753
754 analytics_history.retain(|a| a.timestamp_ms >= retention_cutoff);
755 }
756
757 {
759 let mut last_update = self.last_update.lock().map_err(|e| {
760 TorshDistributedError::communication_error(
761 "last_update",
762 format!("Lock error: {}", e),
763 )
764 })?;
765 *last_update = Instant::now();
766 }
767
768 Ok(())
769 }
770
771 fn generate_training_analytics(
773 &self,
774 cluster_summary: Option<ClusterSummary>,
775 fault_tolerance_status: FaultToleranceStatus,
776 memory_optimization_status: MemoryOptimizationStatus,
777 ) -> TorshResult<TrainingAnalytics> {
778 let timestamp_ms = SystemTime::now()
779 .duration_since(UNIX_EPOCH)
780 .expect("system time should be after UNIX_EPOCH")
781 .as_millis() as u64;
782
783 let current_metrics = self.monitor.get_current_metrics()?;
785
786 let performance =
788 self.generate_performance_analytics(¤t_metrics, &cluster_summary)?;
789
790 let resource_utilization =
792 self.generate_resource_analytics(&cluster_summary, &memory_optimization_status)?;
793
794 let communication = self.generate_communication_analytics(¤t_metrics)?;
796
797 let system_health = self.generate_system_health_analytics(&fault_tolerance_status)?;
799
800 let convergence = self.generate_convergence_analytics(¤t_metrics)?;
802
803 let efficiency = self.generate_efficiency_analytics(
805 &performance,
806 &resource_utilization,
807 &communication,
808 )?;
809
810 Ok(TrainingAnalytics {
811 performance,
812 resource_utilization,
813 communication,
814 system_health,
815 convergence,
816 efficiency,
817 timestamp_ms,
818 })
819 }
820
821 fn generate_performance_analytics(
823 &self,
824 current_metrics: &Option<NodeMetrics>,
825 cluster_summary: &Option<ClusterSummary>,
826 ) -> TorshResult<TrainingPerformanceAnalytics> {
827 let (current_epoch, avg_loss, cluster_throughput, avg_batch_time_ms) =
828 if let Some(metrics) = current_metrics {
829 (
830 metrics.training_metrics.epoch,
831 metrics.training_metrics.loss,
832 metrics.training_metrics.throughput_samples_per_sec
833 * cluster_summary.as_ref().map(|s| s.total_nodes).unwrap_or(1) as f32,
834 metrics.training_metrics.batch_time_ms,
835 )
836 } else {
837 (0, 0.0, 0.0, 0)
838 };
839
840 let (loss_trend, training_stability) = {
842 let trend_analyzer = self.trend_analyzer.lock().map_err(|e| {
843 TorshDistributedError::communication_error(
844 "trend_analyzer",
845 format!("Lock error: {}", e),
846 )
847 })?;
848 (
849 trend_analyzer.calculate_loss_trend(),
850 trend_analyzer.calculate_stability(),
851 )
852 };
853
854 let theoretical_max_throughput =
856 cluster_summary.as_ref().map(|s| s.total_nodes).unwrap_or(1) as f32 * 100.0; let throughput_efficiency = if theoretical_max_throughput > 0.0 {
858 (cluster_throughput / theoretical_max_throughput).min(1.0)
859 } else {
860 0.0
861 };
862
863 let batch_time_variance = 0.1; let estimated_completion_time = if cluster_throughput > 0.0 && current_epoch < 100 {
868 let remaining_epochs = 100 - current_epoch;
869 let samples_per_epoch = 10000; let remaining_samples = remaining_epochs as f32 * samples_per_epoch as f32;
871 let remaining_seconds = remaining_samples / cluster_throughput;
872 Some(Duration::from_secs(remaining_seconds as u64))
873 } else {
874 None
875 };
876
877 Ok(TrainingPerformanceAnalytics {
878 current_epoch,
879 avg_loss,
880 loss_trend,
881 cluster_throughput,
882 throughput_efficiency,
883 avg_batch_time_ms,
884 batch_time_variance,
885 training_stability,
886 estimated_completion_time,
887 })
888 }
889
890 fn generate_resource_analytics(
892 &self,
893 cluster_summary: &Option<ClusterSummary>,
894 memory_status: &MemoryOptimizationStatus,
895 ) -> TorshResult<ResourceUtilizationAnalytics> {
896 let (avg_cpu_utilization, avg_gpu_utilization) = if let Some(summary) = cluster_summary {
897 (summary.avg_cpu_utilization, summary.avg_gpu_utilization)
898 } else {
899 (0.0, 0.0)
900 };
901
902 let avg_memory_utilization = memory_status.avg_memory_utilization;
903
904 let utilizations = [
906 avg_cpu_utilization,
907 avg_gpu_utilization,
908 avg_memory_utilization,
909 ];
910 let max_util = utilizations.iter().fold(0.0f32, |a, &b| a.max(b));
911 let min_util = utilizations.iter().fold(100.0f32, |a, &b| a.min(b));
912 let utilization_balance = if max_util > 0.0 {
913 1.0 - (max_util - min_util) / max_util
914 } else {
915 1.0
916 };
917
918 let peak_cpu = avg_cpu_utilization * 1.2;
920 let peak_gpu = avg_gpu_utilization * 1.15;
921 let peak_memory = avg_memory_utilization * 1.1;
922
923 let resource_efficiency =
925 (avg_cpu_utilization + avg_gpu_utilization + avg_memory_utilization) / 300.0;
926
927 let primary_bottleneck = if avg_gpu_utilization < 60.0 {
929 ResourceBottleneck::GPU
930 } else if avg_memory_utilization > 90.0 {
931 ResourceBottleneck::Memory
932 } else if avg_cpu_utilization > 95.0 {
933 ResourceBottleneck::CPU
934 } else {
935 ResourceBottleneck::None
936 };
937
938 Ok(ResourceUtilizationAnalytics {
939 avg_cpu_utilization,
940 avg_gpu_utilization,
941 avg_memory_utilization,
942 utilization_balance,
943 peak_cpu,
944 peak_gpu,
945 peak_memory,
946 resource_efficiency,
947 primary_bottleneck,
948 })
949 }
950
951 fn generate_communication_analytics(
953 &self,
954 current_metrics: &Option<NodeMetrics>,
955 ) -> TorshResult<CommunicationAnalytics> {
956 let (avg_latency_us, bandwidth_utilization, efficiency_score, failed_operations_rate) =
957 if let Some(metrics) = current_metrics {
958 let comm = &metrics.communication_metrics;
959 (
960 comm.avg_latency_us,
961 comm.comm_bandwidth_mbps / 1000.0, comm.efficiency_score,
963 comm.failed_ops_count as f32 / 100.0, )
965 } else {
966 (0, 0.0, 0.0, 0.0)
967 };
968
969 let communication_patterns = CommunicationPatterns {
971 allreduce_frequency: 10.0,
972 allgather_frequency: 5.0,
973 p2p_frequency: 2.0,
974 gradient_sync_frequency: 8.0,
975 hotspots: vec![CommunicationHotspot {
976 source_node: "node_0".to_string(),
977 target_node: "node_1".to_string(),
978 traffic_volume: 50.0,
979 congestion_score: 0.3,
980 }],
981 };
982
983 let congestion_level = if avg_latency_us > 5000 { 0.7 } else { 0.2 };
984
985 Ok(CommunicationAnalytics {
986 avg_latency_us,
987 bandwidth_utilization,
988 efficiency_score,
989 failed_operations_rate,
990 communication_patterns,
991 congestion_level,
992 })
993 }
994
995 fn generate_system_health_analytics(
997 &self,
998 fault_tolerance_status: &FaultToleranceStatus,
999 ) -> TorshResult<SystemHealthAnalytics> {
1000 let cluster_health_score = fault_tolerance_status.system_health_score;
1001 let healthy_nodes = fault_tolerance_status.healthy_nodes;
1002 let degraded_nodes = fault_tolerance_status.excluded_nodes; let critical_nodes = 0; let failed_nodes = fault_tolerance_status
1005 .total_nodes
1006 .saturating_sub(fault_tolerance_status.healthy_nodes);
1007 let active_incidents = fault_tolerance_status.active_incidents;
1008
1009 let stability_trend = if cluster_health_score > 0.8 {
1011 0.1
1012 } else {
1013 -0.1
1014 };
1015
1016 let failure_probability = (1.0 - cluster_health_score).max(0.0);
1018
1019 Ok(SystemHealthAnalytics {
1020 cluster_health_score,
1021 healthy_nodes,
1022 degraded_nodes,
1023 critical_nodes,
1024 failed_nodes,
1025 active_incidents,
1026 stability_trend,
1027 failure_probability,
1028 })
1029 }
1030
1031 fn generate_convergence_analytics(
1033 &self,
1034 current_metrics: &Option<NodeMetrics>,
1035 ) -> TorshResult<ConvergenceAnalytics> {
1036 let (loss, gradient_norm) = if let Some(metrics) = current_metrics {
1037 (
1038 metrics.training_metrics.loss,
1039 metrics.training_metrics.gradient_norm,
1040 )
1041 } else {
1042 (0.0, 0.0)
1043 };
1044
1045 let convergence_rate = {
1047 let trend_analyzer = self.trend_analyzer.lock().map_err(|e| {
1048 TorshDistributedError::communication_error(
1049 "trend_analyzer",
1050 format!("Lock error: {}", e),
1051 )
1052 })?;
1053 -trend_analyzer.calculate_loss_trend() };
1055
1056 let convergence_confidence = if convergence_rate > 0.0 { 0.8 } else { 0.3 };
1058
1059 let training_progress = if loss > 0.0 {
1061 (1.0 / (loss + 1.0)).min(0.95)
1062 } else {
1063 0.0
1064 };
1065
1066 let loss_smoothness = 0.7; let gradient_norm_stats = GradientNormStats {
1071 avg_norm: gradient_norm,
1072 norm_variance: gradient_norm * 0.1, norm_trend: 0.0, explosion_risk: if gradient_norm > 10.0 { 0.8 } else { 0.1 },
1075 };
1076
1077 let lr_effectiveness = if convergence_rate > 0.0 { 0.7 } else { 0.3 };
1079
1080 let overfitting_risk = if training_progress > 0.8 { 0.6 } else { 0.2 };
1082
1083 Ok(ConvergenceAnalytics {
1084 convergence_rate,
1085 convergence_confidence,
1086 training_progress,
1087 loss_smoothness,
1088 gradient_norm_stats,
1089 lr_effectiveness,
1090 overfitting_risk,
1091 })
1092 }
1093
1094 fn generate_efficiency_analytics(
1096 &self,
1097 performance: &TrainingPerformanceAnalytics,
1098 resource_utilization: &ResourceUtilizationAnalytics,
1099 communication: &CommunicationAnalytics,
1100 ) -> TorshResult<EfficiencyAnalytics> {
1101 let compute_efficiency = resource_utilization.resource_efficiency;
1103 let communication_efficiency = communication.efficiency_score;
1104 let memory_efficiency =
1105 1.0 - (resource_utilization.avg_memory_utilization / 100.0 - 0.8).max(0.0) * 5.0; let overall_efficiency =
1109 (compute_efficiency + communication_efficiency + memory_efficiency) / 3.0;
1110
1111 let energy_efficiency = overall_efficiency * 0.8; let cost_efficiency = overall_efficiency * performance.throughput_efficiency;
1116
1117 let recommendations = if self.config.enable_recommendations {
1119 let mut recommendation_engine = self.recommendation_engine.lock().map_err(|e| {
1120 TorshDistributedError::communication_error(
1121 "recommendation_engine",
1122 format!("Lock error: {}", e),
1123 )
1124 })?;
1125 recommendation_engine.generate_recommendations()
1126 } else {
1127 Vec::new()
1128 };
1129
1130 Ok(EfficiencyAnalytics {
1131 overall_efficiency,
1132 compute_efficiency,
1133 communication_efficiency,
1134 memory_efficiency,
1135 energy_efficiency,
1136 cost_efficiency,
1137 recommendations,
1138 })
1139 }
1140
1141 pub fn get_current_analytics(&self) -> TorshResult<Option<TrainingAnalytics>> {
1143 let current_analytics = self.current_analytics.read().map_err(|e| {
1144 TorshDistributedError::communication_error(
1145 "get_current_analytics",
1146 format!("Lock error: {}", e),
1147 )
1148 })?;
1149 Ok(current_analytics.clone())
1150 }
1151
1152 pub fn get_analytics_history(&self) -> TorshResult<Vec<TrainingAnalytics>> {
1154 let analytics_history = self.analytics_history.lock().map_err(|e| {
1155 TorshDistributedError::communication_error(
1156 "get_analytics_history",
1157 format!("Lock error: {}", e),
1158 )
1159 })?;
1160 Ok(analytics_history.iter().cloned().collect())
1161 }
1162
1163 pub fn export_dashboard_data(&self) -> TorshResult<DashboardExport> {
1165 let current_analytics = self.get_current_analytics()?;
1166 let analytics_history = self.get_analytics_history()?;
1167
1168 Ok(DashboardExport {
1169 current_analytics,
1170 analytics_history,
1171 config: self.config.clone(),
1172 export_timestamp_ms: SystemTime::now()
1173 .duration_since(UNIX_EPOCH)
1174 .expect("time should be after UNIX_EPOCH")
1175 .as_millis() as u64,
1176 })
1177 }
1178
1179 pub fn generate_training_summary(&self) -> TorshResult<TrainingSummaryReport> {
1181 let current_analytics = self.get_current_analytics()?.ok_or_else(|| {
1182 TorshDistributedError::communication_error(
1183 "summary",
1184 "No analytics data available".to_string(),
1185 )
1186 })?;
1187
1188 let analytics_history = self.get_analytics_history()?;
1189
1190 let total_runtime = if !analytics_history.is_empty() {
1192 let start_time = analytics_history
1193 .first()
1194 .expect("analytics_history should not be empty")
1195 .timestamp_ms;
1196 let end_time = current_analytics.timestamp_ms;
1197 Duration::from_millis(end_time - start_time)
1198 } else {
1199 Duration::from_secs(0)
1200 };
1201
1202 let avg_efficiency = if !analytics_history.is_empty() {
1203 analytics_history
1204 .iter()
1205 .map(|a| a.efficiency.overall_efficiency)
1206 .sum::<f32>()
1207 / analytics_history.len() as f32
1208 } else {
1209 0.0
1210 };
1211
1212 let peak_throughput = analytics_history
1213 .iter()
1214 .map(|a| a.performance.cluster_throughput)
1215 .fold(0.0f32, |a, b| a.max(b));
1216
1217 Ok(TrainingSummaryReport {
1218 current_epoch: current_analytics.performance.current_epoch,
1219 current_loss: current_analytics.performance.avg_loss,
1220 total_runtime,
1221 avg_efficiency,
1222 peak_throughput,
1223 total_incidents: analytics_history
1224 .iter()
1225 .map(|a| a.system_health.active_incidents)
1226 .sum(),
1227 convergence_rate: current_analytics.convergence.convergence_rate,
1228 resource_utilization_summary: ResourceUtilizationSummary {
1229 avg_cpu: current_analytics.resource_utilization.avg_cpu_utilization,
1230 avg_gpu: current_analytics.resource_utilization.avg_gpu_utilization,
1231 avg_memory: current_analytics
1232 .resource_utilization
1233 .avg_memory_utilization,
1234 peak_cpu: current_analytics.resource_utilization.peak_cpu,
1235 peak_gpu: current_analytics.resource_utilization.peak_gpu,
1236 peak_memory: current_analytics.resource_utilization.peak_memory,
1237 },
1238 optimization_recommendations: current_analytics.efficiency.recommendations,
1239 generated_at: SystemTime::now()
1240 .duration_since(UNIX_EPOCH)
1241 .expect("time should be after UNIX_EPOCH")
1242 .as_millis() as u64,
1243 })
1244 }
1245}
1246
/// Serializable bundle of the dashboard's full state for external consumers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DashboardExport {
    /// Latest snapshot, or `None` before the first update.
    pub current_analytics: Option<TrainingAnalytics>,
    /// All retained snapshots, oldest first.
    pub analytics_history: Vec<TrainingAnalytics>,
    /// Dashboard configuration at export time.
    pub config: DashboardConfig,
    /// Export creation time, in milliseconds since the UNIX epoch.
    pub export_timestamp_ms: u64,
}
1255
/// Human-oriented summary of the training run so far.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TrainingSummaryReport {
    /// Epoch at the time of the latest snapshot.
    pub current_epoch: u32,
    /// Loss at the time of the latest snapshot.
    pub current_loss: f32,
    /// Span from the oldest retained snapshot to the latest.
    pub total_runtime: Duration,
    /// Mean overall efficiency across the retained history.
    pub avg_efficiency: f32,
    /// Maximum cluster throughput observed in the retained history.
    pub peak_throughput: f32,
    /// Sum of open-incident counts across snapshots.
    pub total_incidents: usize,
    /// Convergence rate from the latest snapshot.
    pub convergence_rate: f32,
    /// Resource utilization figures from the latest snapshot.
    pub resource_utilization_summary: ResourceUtilizationSummary,
    /// Recommendations attached to the latest snapshot.
    pub optimization_recommendations: Vec<OptimizationRecommendation>,
    /// Report creation time, in milliseconds since the UNIX epoch.
    pub generated_at: u64,
}
1270
/// Compact resource-utilization figures for the summary report (all percent).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceUtilizationSummary {
    /// Average CPU utilization.
    pub avg_cpu: f32,
    /// Average GPU utilization.
    pub avg_gpu: f32,
    /// Average memory utilization.
    pub avg_memory: f32,
    /// Estimated peak CPU utilization.
    pub peak_cpu: f32,
    /// Estimated peak GPU utilization.
    pub peak_gpu: f32,
    /// Estimated peak memory utilization.
    pub peak_memory: f32,
}
1281
#[cfg(test)]
mod tests {
    use super::*;
    use crate::distributed_memory_optimization::{
        DistributedMemoryOptimizer, MemoryOptimizationConfig,
    };
    use crate::distributed_monitoring::{DistributedMonitor, MonitoringConfig};
    use crate::enhanced_fault_tolerance::{EnhancedFaultTolerance, FaultToleranceConfig};

    /// A freshly constructed dashboard must report no analytics until the
    /// first update runs.
    #[tokio::test]
    async fn test_dashboard_creation() -> TorshResult<()> {
        let monitor = Arc::new(DistributedMonitor::new(MonitoringConfig::default(), false));
        let fault_tolerance = Arc::new(EnhancedFaultTolerance::new(
            FaultToleranceConfig::default(),
            monitor.clone(),
        ));
        let memory_optimizer = Arc::new(DistributedMemoryOptimizer::new(
            MemoryOptimizationConfig::default(),
            monitor.clone(),
        ));

        let dashboard = TrainingAnalyticsDashboard::new(
            DashboardConfig::default(),
            monitor,
            fault_tolerance,
            memory_optimizer,
        );

        let current_analytics = dashboard.get_current_analytics()?;
        assert!(current_analytics.is_none());

        Ok(())
    }

    /// Steadily decreasing losses must produce a negative trend and a
    /// stability score inside [0, 1].
    #[tokio::test]
    async fn test_trend_analyzer() -> TorshResult<()> {
        let mut analyzer = TrendAnalyzer::new(Duration::from_secs(60));

        let base = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("time should be after UNIX_EPOCH")
            .as_millis() as u64;
        for i in 0..20 {
            analyzer.update_loss(base + i * 1000, 2.0 - i as f32 * 0.1);
        }

        assert!(analyzer.calculate_loss_trend() < 0.0);

        let stability = analyzer.calculate_stability();
        assert!((0.0..=1.0).contains(&stability));

        Ok(())
    }

    /// Sustained low GPU utilization in the sampled history must trigger a
    /// GPU-related recommendation.
    #[tokio::test]
    async fn test_recommendation_engine() -> TorshResult<()> {
        let mut engine = RecommendationEngine::new();

        for _ in 0..15 {
            engine.update_performance(100.0, 0.6, 80.0, 50.0, 70.0, 2000);
        }

        let recommendations = engine.generate_recommendations();
        assert!(!recommendations.is_empty());
        assert!(recommendations.iter().any(|r| r.title.contains("GPU")));

        Ok(())
    }
}