// torsh_distributed/training_analytics_dashboard.rs

1//! Distributed Training Analytics Dashboard
2//!
3//! This module provides a comprehensive analytics dashboard for distributed training,
4//! offering real-time insights, performance visualization, and intelligent analysis
5//! of training progress, resource utilization, and system health.
6
7// Framework infrastructure - components designed for future use
8#![allow(dead_code)]
9use crate::distributed_memory_optimization::{
10    DistributedMemoryOptimizer, MemoryOptimizationStatus,
11};
12use crate::distributed_monitoring::{ClusterSummary, DistributedMonitor, NodeMetrics};
13use crate::enhanced_fault_tolerance::{EnhancedFaultTolerance, FaultToleranceStatus};
14use crate::{TorshDistributedError, TorshResult};
15use serde::{Deserialize, Serialize};
16use std::collections::VecDeque;
17use std::sync::{Arc, Mutex, RwLock};
18use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
19
/// Training analytics data aggregated across the cluster
///
/// Top-level snapshot produced by `TrainingAnalyticsDashboard`; each field
/// groups one analytics dimension derived from the monitoring, fault
/// tolerance, and memory-optimization subsystems.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TrainingAnalytics {
    /// Training performance metrics
    pub performance: TrainingPerformanceAnalytics,
    /// Resource utilization analytics
    pub resource_utilization: ResourceUtilizationAnalytics,
    /// Communication efficiency analytics
    pub communication: CommunicationAnalytics,
    /// System health analytics
    pub system_health: SystemHealthAnalytics,
    /// Training convergence analytics
    pub convergence: ConvergenceAnalytics,
    /// Efficiency and optimization analytics
    pub efficiency: EfficiencyAnalytics,
    /// Timestamp of analytics generation (milliseconds since the UNIX epoch)
    pub timestamp_ms: u64,
}
38
/// Training performance analytics
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TrainingPerformanceAnalytics {
    /// Current epoch across all nodes
    pub current_epoch: u32,
    /// Current average loss across nodes
    pub avg_loss: f32,
    /// Loss trend (positive = increasing, negative = decreasing);
    /// relative change of recent vs. earlier losses, computed by the
    /// dashboard's trend analyzer
    pub loss_trend: f32,
    /// Training throughput (samples/second across cluster)
    pub cluster_throughput: f32,
    /// Throughput efficiency compared to theoretical maximum (0.0 to 1.0)
    pub throughput_efficiency: f32,
    /// Average batch time across nodes (milliseconds)
    pub avg_batch_time_ms: u64,
    /// Batch time variance (indicator of load balance)
    pub batch_time_variance: f32,
    /// Training stability score (0.0 to 1.0); derived from the coefficient
    /// of variation of recent losses
    pub training_stability: f32,
    /// Estimated time to completion; `None` when it cannot be estimated
    pub estimated_completion_time: Option<Duration>,
}
61
/// Resource utilization analytics
///
/// Utilization figures are percentages as reported by the monitoring
/// subsystem (presumably 0–100 — TODO confirm against `NodeMetrics`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceUtilizationAnalytics {
    /// Average CPU utilization across cluster
    pub avg_cpu_utilization: f32,
    /// Average GPU utilization across cluster
    pub avg_gpu_utilization: f32,
    /// Average memory utilization across cluster
    pub avg_memory_utilization: f32,
    /// Resource utilization balance score
    pub utilization_balance: f32,
    /// Peak CPU usage observed
    pub peak_cpu: f32,
    /// Peak GPU usage observed
    pub peak_gpu: f32,
    /// Peak memory usage observed
    pub peak_memory: f32,
    /// Resource efficiency score
    pub resource_efficiency: f32,
    /// Bottleneck identification
    pub primary_bottleneck: ResourceBottleneck,
}
82
/// Identified resource bottlenecks
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum ResourceBottleneck {
    /// Compute-bound on CPU
    CPU,
    /// Compute-bound on GPU
    GPU,
    /// Limited by available memory
    Memory,
    /// Limited by network bandwidth/latency
    Network,
    /// Limited by storage I/O
    Storage,
    /// No dominant bottleneck identified
    None,
}
93
94impl std::fmt::Display for ResourceBottleneck {
95    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96        match self {
97            ResourceBottleneck::CPU => write!(f, "CPU"),
98            ResourceBottleneck::GPU => write!(f, "GPU"),
99            ResourceBottleneck::Memory => write!(f, "Memory"),
100            ResourceBottleneck::Network => write!(f, "Network"),
101            ResourceBottleneck::Storage => write!(f, "Storage"),
102            ResourceBottleneck::None => write!(f, "None"),
103        }
104    }
105}
106
/// Communication efficiency analytics
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CommunicationAnalytics {
    /// Average communication latency (microseconds)
    pub avg_latency_us: u64,
    /// Communication bandwidth utilization
    pub bandwidth_utilization: f32,
    /// Communication efficiency score
    pub efficiency_score: f32,
    /// Failed operations rate
    pub failed_operations_rate: f32,
    /// Communication patterns analysis
    pub communication_patterns: CommunicationPatterns,
    /// Network congestion indicator (presumably 0.0 to 1.0 — TODO confirm
    /// against the producer in `generate_communication_analytics`)
    pub congestion_level: f32,
}
123
/// Communication patterns analysis
///
/// Frequencies describe how often each collective/point-to-point operation
/// occurs (units not fixed here — set by the producing monitor).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CommunicationPatterns {
    /// All-reduce operation frequency
    pub allreduce_frequency: f32,
    /// All-gather operation frequency
    pub allgather_frequency: f32,
    /// Point-to-point communication frequency
    pub p2p_frequency: f32,
    /// Gradient synchronization frequency
    pub gradient_sync_frequency: f32,
    /// Communication hotspots (node pairs with high traffic)
    pub hotspots: Vec<CommunicationHotspot>,
}
138
/// Communication hotspot identification
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CommunicationHotspot {
    /// Source node identifier
    pub source_node: String,
    /// Target node identifier
    pub target_node: String,
    /// Traffic volume (MB/s)
    pub traffic_volume: f32,
    /// Congestion score (0.0 to 1.0)
    pub congestion_score: f32,
}
151
/// System health analytics
///
/// Node counts partition the cluster by health state as reported by the
/// fault-tolerance subsystem.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemHealthAnalytics {
    /// Overall cluster health score
    pub cluster_health_score: f32,
    /// Number of healthy nodes
    pub healthy_nodes: usize,
    /// Number of degraded nodes
    pub degraded_nodes: usize,
    /// Number of critical nodes
    pub critical_nodes: usize,
    /// Number of failed nodes
    pub failed_nodes: usize,
    /// Active incidents count
    pub active_incidents: usize,
    /// System stability trend
    pub stability_trend: f32,
    /// Predicted failure probability (0.0 to 1.0)
    pub failure_probability: f32,
}
172
/// Training convergence analytics
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConvergenceAnalytics {
    /// Convergence rate (improvement per epoch)
    pub convergence_rate: f32,
    /// Convergence confidence (0.0 to 1.0)
    pub convergence_confidence: f32,
    /// Training progress percentage
    pub training_progress: f32,
    /// Loss smoothness indicator
    pub loss_smoothness: f32,
    /// Gradient norm statistics
    pub gradient_norm_stats: GradientNormStats,
    /// Learning rate effectiveness
    pub lr_effectiveness: f32,
    /// Overfitting risk score
    pub overfitting_risk: f32,
}
191
/// Gradient norm statistics
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GradientNormStats {
    /// Average gradient norm
    pub avg_norm: f32,
    /// Gradient norm variance
    pub norm_variance: f32,
    /// Gradient norm trend (positive = growing norms)
    pub norm_trend: f32,
    /// Gradient explosion risk
    pub explosion_risk: f32,
}
204
/// Training efficiency analytics
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EfficiencyAnalytics {
    /// Overall training efficiency score
    pub overall_efficiency: f32,
    /// Computational efficiency
    pub compute_efficiency: f32,
    /// Communication efficiency
    pub communication_efficiency: f32,
    /// Memory efficiency
    pub memory_efficiency: f32,
    /// Energy efficiency (performance per watt)
    pub energy_efficiency: f32,
    /// Cost efficiency (performance per dollar)
    pub cost_efficiency: f32,
    /// Optimization recommendations, highest priority first (see
    /// `RecommendationEngine::generate_recommendations`)
    pub recommendations: Vec<OptimizationRecommendation>,
}
223
/// Optimization recommendation
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OptimizationRecommendation {
    /// Recommendation category
    pub category: RecommendationCategory,
    /// Recommendation title (short, human-readable)
    pub title: String,
    /// Detailed description
    pub description: String,
    /// Expected impact (0.0 to 1.0)
    pub expected_impact: f32,
    /// Implementation difficulty (0.0 to 1.0)
    pub difficulty: f32,
    /// Priority score; higher values sort first in the generated list
    pub priority: u32,
}
240
/// Recommendation categories
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum RecommendationCategory {
    /// Improves raw training speed
    Performance,
    /// Improves resource usage per unit of work
    Efficiency,
    /// Improves fault tolerance / stability
    Reliability,
    /// Reduces monetary cost
    Cost,
    /// Improves behavior as the cluster grows
    Scalability,
}
250
251impl std::fmt::Display for RecommendationCategory {
252    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
253        match self {
254            RecommendationCategory::Performance => write!(f, "Performance"),
255            RecommendationCategory::Efficiency => write!(f, "Efficiency"),
256            RecommendationCategory::Reliability => write!(f, "Reliability"),
257            RecommendationCategory::Cost => write!(f, "Cost"),
258            RecommendationCategory::Scalability => write!(f, "Scalability"),
259        }
260    }
261}
262
/// Dashboard configuration
///
/// Sensible defaults are available via `Default` (10 s updates, 24 h
/// retention, predictions/recommendations enabled).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DashboardConfig {
    /// Analytics update interval; `update_analytics` is a no-op when called
    /// again before this much time has elapsed
    pub update_interval: Duration,
    /// Historical data retention period for the analytics history
    pub retention_period: Duration,
    /// Enable predictive analytics
    pub enable_predictions: bool,
    /// Enable optimization recommendations
    pub enable_recommendations: bool,
    /// Metrics aggregation window
    pub aggregation_window: Duration,
    /// Alert thresholds
    pub alert_thresholds: DashboardAlertThresholds,
}
279
/// Alert thresholds for dashboard
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DashboardAlertThresholds {
    /// Training efficiency threshold
    pub efficiency_threshold: f32,
    /// Resource utilization threshold
    pub utilization_threshold: f32,
    /// Communication latency threshold (microseconds)
    pub latency_threshold: u64,
    /// Convergence rate threshold
    pub convergence_threshold: f32,
}
292
293impl Default for DashboardConfig {
294    fn default() -> Self {
295        Self {
296            update_interval: Duration::from_secs(10),
297            retention_period: Duration::from_secs(24 * 3600), // 24 hours
298            enable_predictions: true,
299            enable_recommendations: true,
300            aggregation_window: Duration::from_secs(60),
301            alert_thresholds: DashboardAlertThresholds {
302                efficiency_threshold: 0.7,
303                utilization_threshold: 0.9,
304                latency_threshold: 10000,
305                convergence_threshold: 0.1,
306            },
307        }
308    }
309}
310
/// Distributed training analytics dashboard
///
/// Aggregates data from the monitoring, fault-tolerance, and memory
/// optimization subsystems into `TrainingAnalytics` snapshots. All mutable
/// state is behind `Arc<Mutex/RwLock>`, presumably so the dashboard can be
/// driven from multiple threads — TODO confirm intended sharing model.
pub struct TrainingAnalyticsDashboard {
    /// Configuration
    config: DashboardConfig,
    /// Distributed monitoring system
    monitor: Arc<DistributedMonitor>,
    /// Enhanced fault tolerance system
    fault_tolerance: Arc<EnhancedFaultTolerance>,
    /// Memory optimization system
    memory_optimizer: Arc<DistributedMemoryOptimizer>,
    /// Current analytics; `None` until the first successful update
    current_analytics: Arc<RwLock<Option<TrainingAnalytics>>>,
    /// Analytics history, pruned to `config.retention_period`
    analytics_history: Arc<Mutex<VecDeque<TrainingAnalytics>>>,
    /// Performance trend analyzer
    trend_analyzer: Arc<Mutex<TrendAnalyzer>>,
    /// Recommendation engine
    recommendation_engine: Arc<Mutex<RecommendationEngine>>,
    /// Last update time, used to throttle `update_analytics`
    last_update: Arc<Mutex<Instant>>,
}
332
/// Trend analysis system
///
/// Maintains sliding histories of loss, throughput, and resource utilization
/// samples (timestamps in milliseconds) and derives simple trend and
/// stability scores from them. Samples older than `trend_window` are pruned
/// on every update.
#[derive(Debug)]
struct TrendAnalyzer {
    /// (timestamp_ms, loss) samples for trend/stability calculations
    loss_history: VecDeque<(u64, f32)>,
    /// (timestamp_ms, samples/sec) history
    throughput_history: VecDeque<(u64, f32)>,
    /// (timestamp_ms, snapshot) history of cluster resource utilization
    resource_history: VecDeque<(u64, ResourceSnapshot)>,
    /// Sliding-window length; older samples are discarded
    trend_window: Duration,
}

/// Resource utilization snapshot (values as reported by monitoring)
#[derive(Debug, Clone)]
struct ResourceSnapshot {
    cpu: f32,
    gpu: f32,
    memory: f32,
}

impl TrendAnalyzer {
    /// Creates an analyzer whose histories cover at most `trend_window`.
    fn new(trend_window: Duration) -> Self {
        Self {
            loss_history: VecDeque::with_capacity(1000),
            throughput_history: VecDeque::with_capacity(1000),
            resource_history: VecDeque::with_capacity(1000),
            trend_window,
        }
    }

    /// Records a loss sample, then prunes entries outside the window.
    fn update_loss(&mut self, timestamp: u64, loss: f32) {
        self.loss_history.push_back((timestamp, loss));
        self.cleanup_old_data(timestamp);
    }

    /// Records a throughput sample, then prunes entries outside the window.
    fn update_throughput(&mut self, timestamp: u64, throughput: f32) {
        self.throughput_history.push_back((timestamp, throughput));
        self.cleanup_old_data(timestamp);
    }

    /// Records a resource-utilization sample, then prunes old entries.
    fn update_resources(&mut self, timestamp: u64, cpu: f32, gpu: f32, memory: f32) {
        let snapshot = ResourceSnapshot { cpu, gpu, memory };
        self.resource_history.push_back((timestamp, snapshot));
        self.cleanup_old_data(timestamp);
    }

    /// Drops every sample older than `trend_window` relative to
    /// `current_timestamp` (both in milliseconds).
    fn cleanup_old_data(&mut self, current_timestamp: u64) {
        let window_ms = self.trend_window.as_millis() as u64;
        let cutoff = current_timestamp.saturating_sub(window_ms);
        self.loss_history.retain(|&(ts, _)| ts >= cutoff);
        self.throughput_history.retain(|&(ts, _)| ts >= cutoff);
        self.resource_history.retain(|(ts, _)| *ts >= cutoff);
    }

    /// Loss trend: relative change of the latest five samples versus the
    /// five before them. Negative means the loss is decreasing (training
    /// improving). Returns 0.0 until at least ten samples are available.
    fn calculate_loss_trend(&self) -> f32 {
        if self.loss_history.len() < 10 {
            return 0.0;
        }
        // Newest-first window of the ten most recent losses.
        let window: Vec<f32> = self
            .loss_history
            .iter()
            .rev()
            .take(10)
            .map(|&(_, loss)| loss)
            .collect();
        Self::relative_change(&window)
    }

    /// Throughput trend over the last ten samples; positive means
    /// throughput is improving. Returns 0.0 until ten samples exist.
    fn calculate_throughput_trend(&self) -> f32 {
        if self.throughput_history.len() < 10 {
            return 0.0;
        }
        let window: Vec<f32> = self
            .throughput_history
            .iter()
            .rev()
            .take(10)
            .map(|&(_, tput)| tput)
            .collect();
        Self::relative_change(&window)
    }

    /// Relative change between the first five entries of `window` (the
    /// latest samples) and the rest (the earlier samples); `window` must be
    /// ordered newest-first with more than five entries. The denominator is
    /// clamped to 0.001 to avoid division by ~zero.
    fn relative_change(window: &[f32]) -> f32 {
        let late_avg = window[..5].iter().sum::<f32>() / 5.0;
        let early_avg = window[5..].iter().sum::<f32>() / (window.len() - 5) as f32;
        (late_avg - early_avg) / early_avg.max(0.001)
    }

    /// Training-stability score in [0, 1] derived from the coefficient of
    /// variation of the last 20 losses (lower variation = higher score).
    /// Returns the neutral value 0.5 until 20 samples are available.
    fn calculate_stability(&self) -> f32 {
        if self.loss_history.len() < 20 {
            return 0.5; // Neutral stability
        }
        let recent: Vec<f32> = self
            .loss_history
            .iter()
            .rev()
            .take(20)
            .map(|&(_, loss)| loss)
            .collect();
        let n = recent.len() as f32;
        let mean = recent.iter().sum::<f32>() / n;
        let variance = recent.iter().map(|&x| (x - mean).powi(2)).sum::<f32>() / n;
        // Lower coefficient of variation = higher stability.
        let cv = if mean > 0.001 { variance.sqrt() / mean } else { 1.0 };
        (1.0 - cv.min(1.0)).max(0.0)
    }
}
451
/// Intelligent recommendation engine
///
/// Keeps a bounded history of cluster performance snapshots and derives
/// optimization recommendations from them, caching results so they are
/// regenerated at most every five minutes.
#[derive(Debug)]
struct RecommendationEngine {
    /// Recent performance data for analysis (bounded to 100 entries)
    performance_history: VecDeque<PerformanceSnapshot>,
    /// Generated recommendations cache
    recommendation_cache: Vec<OptimizationRecommendation>,
    /// Last recommendation generation time (used for the 5-minute throttle)
    last_generation: Instant,
}
462
/// Performance snapshot for recommendations
#[derive(Debug, Clone)]
struct PerformanceSnapshot {
    // Capture time of this snapshot
    timestamp: Instant,
    // Cluster throughput (samples/sec)
    throughput: f32,
    // Overall efficiency score
    efficiency: f32,
    // CPU utilization
    cpu_util: f32,
    // GPU utilization
    gpu_util: f32,
    // Memory utilization
    memory_util: f32,
    // Average communication latency (microseconds)
    communication_latency: u64,
}
474
475impl RecommendationEngine {
476    fn new() -> Self {
477        Self {
478            performance_history: VecDeque::with_capacity(100),
479            recommendation_cache: Vec::new(),
480            last_generation: Instant::now(),
481        }
482    }
483
484    fn update_performance(
485        &mut self,
486        throughput: f32,
487        efficiency: f32,
488        cpu_util: f32,
489        gpu_util: f32,
490        memory_util: f32,
491        communication_latency: u64,
492    ) {
493        let snapshot = PerformanceSnapshot {
494            timestamp: Instant::now(),
495            throughput,
496            efficiency,
497            cpu_util,
498            gpu_util,
499            memory_util,
500            communication_latency,
501        };
502
503        self.performance_history.push_back(snapshot);
504        if self.performance_history.len() > 100 {
505            self.performance_history.pop_front();
506        }
507    }
508
509    fn generate_recommendations(&mut self) -> Vec<OptimizationRecommendation> {
510        // Only regenerate recommendations every 5 minutes (skip check if cache is empty for first run)
511        if !self.recommendation_cache.is_empty() && self.last_generation.elapsed().as_secs() < 300 {
512            return self.recommendation_cache.clone();
513        }
514
515        let mut recommendations = Vec::new();
516
517        if self.performance_history.len() < 10 {
518            return recommendations;
519        }
520
521        // Analyze recent performance
522        let recent_perf: Vec<&PerformanceSnapshot> =
523            self.performance_history.iter().rev().take(10).collect();
524
525        // Check for low GPU utilization
526        let avg_gpu_util =
527            recent_perf.iter().map(|p| p.gpu_util).sum::<f32>() / recent_perf.len() as f32;
528        if avg_gpu_util < 70.0 {
529            recommendations.push(OptimizationRecommendation {
530                category: RecommendationCategory::Performance,
531                title: "Increase GPU Utilization".to_string(),
532                description: format!("GPU utilization is at {:.1}%. Consider increasing batch size or adjusting data loading.", avg_gpu_util),
533                expected_impact: 0.8,
534                difficulty: 0.3,
535                priority: 4,
536            });
537        }
538
539        // Check for high memory utilization
540        let avg_memory_util =
541            recent_perf.iter().map(|p| p.memory_util).sum::<f32>() / recent_perf.len() as f32;
542        if avg_memory_util > 90.0 {
543            recommendations.push(OptimizationRecommendation {
544                category: RecommendationCategory::Efficiency,
545                title: "Optimize Memory Usage".to_string(),
546                description: format!("Memory utilization is at {:.1}%. Consider enabling gradient checkpointing or reducing batch size.", avg_memory_util),
547                expected_impact: 0.6,
548                difficulty: 0.4,
549                priority: 3,
550            });
551        }
552
553        // Check for high communication latency
554        let avg_latency = recent_perf
555            .iter()
556            .map(|p| p.communication_latency)
557            .sum::<u64>()
558            / recent_perf.len() as u64;
559        if avg_latency > 5000 {
560            recommendations.push(OptimizationRecommendation {
561                category: RecommendationCategory::Performance,
562                title: "Optimize Communication".to_string(),
563                description: format!("Communication latency is {}μs. Consider gradient compression or improving network connectivity.", avg_latency),
564                expected_impact: 0.7,
565                difficulty: 0.6,
566                priority: 3,
567            });
568        }
569
570        // Check for efficiency trends
571        let efficiency_trend = self.calculate_efficiency_trend(&recent_perf);
572        if efficiency_trend < -0.1 {
573            recommendations.push(OptimizationRecommendation {
574                category: RecommendationCategory::Efficiency,
575                title: "Address Efficiency Decline".to_string(),
576                description: "Training efficiency is declining. Review resource allocation and check for bottlenecks.".to_string(),
577                expected_impact: 0.9,
578                difficulty: 0.7,
579                priority: 5,
580            });
581        }
582
583        // Check for load balancing issues
584        let throughput_variance = self.calculate_throughput_variance(&recent_perf);
585        if throughput_variance > 0.2 {
586            recommendations.push(OptimizationRecommendation {
587                category: RecommendationCategory::Scalability,
588                title: "Improve Load Balancing".to_string(),
589                description: "High throughput variance detected. Consider redistributing workload across nodes.".to_string(),
590                expected_impact: 0.5,
591                difficulty: 0.8,
592                priority: 2,
593            });
594        }
595
596        // Sort by priority
597        recommendations.sort_by(|a, b| b.priority.cmp(&a.priority));
598
599        self.recommendation_cache = recommendations.clone();
600        self.last_generation = Instant::now();
601
602        recommendations
603    }
604
605    fn calculate_efficiency_trend(&self, recent_perf: &[&PerformanceSnapshot]) -> f32 {
606        if recent_perf.len() < 6 {
607            return 0.0;
608        }
609
610        let early_efficiency: f32 = recent_perf[3..].iter().map(|p| p.efficiency).sum::<f32>()
611            / (recent_perf.len() - 3) as f32;
612        let late_efficiency: f32 = recent_perf[..3].iter().map(|p| p.efficiency).sum::<f32>() / 3.0;
613
614        (late_efficiency - early_efficiency) / early_efficiency.max(0.001)
615    }
616
617    fn calculate_throughput_variance(&self, recent_perf: &[&PerformanceSnapshot]) -> f32 {
618        if recent_perf.len() < 5 {
619            return 0.0;
620        }
621
622        let throughputs: Vec<f32> = recent_perf.iter().map(|p| p.throughput).collect();
623        let mean = throughputs.iter().sum::<f32>() / throughputs.len() as f32;
624        let variance =
625            throughputs.iter().map(|&x| (x - mean).powi(2)).sum::<f32>() / throughputs.len() as f32;
626
627        if mean > 0.001 {
628            variance.sqrt() / mean // Coefficient of variation
629        } else {
630            0.0
631        }
632    }
633}
634
635impl TrainingAnalyticsDashboard {
636    /// Create new training analytics dashboard
637    pub fn new(
638        config: DashboardConfig,
639        monitor: Arc<DistributedMonitor>,
640        fault_tolerance: Arc<EnhancedFaultTolerance>,
641        memory_optimizer: Arc<DistributedMemoryOptimizer>,
642    ) -> Self {
643        Self {
644            config: config.clone(),
645            monitor,
646            fault_tolerance,
647            memory_optimizer,
648            current_analytics: Arc::new(RwLock::new(None)),
649            analytics_history: Arc::new(Mutex::new(VecDeque::new())),
650            trend_analyzer: Arc::new(Mutex::new(TrendAnalyzer::new(config.aggregation_window))),
651            recommendation_engine: Arc::new(Mutex::new(RecommendationEngine::new())),
652            last_update: Arc::new(Mutex::new(Instant::now())),
653        }
654    }
655
    /// Update analytics with latest data
    ///
    /// Throttled by `config.update_interval`: calls made before the interval
    /// has elapsed return `Ok(())` without doing any work. On a real update
    /// this gathers data from the monitor, fault-tolerance, and memory
    /// optimizer, regenerates the full `TrainingAnalytics` snapshot, feeds
    /// the trend analyzer and (if enabled) the recommendation engine, then
    /// publishes the snapshot and appends it to the retention-bounded
    /// history.
    ///
    /// # Errors
    /// Returns an error if a subsystem query fails or an internal lock is
    /// poisoned (poisoning is surfaced via `communication_error`).
    pub fn update_analytics(&self) -> TorshResult<()> {
        // Check if enough time has passed since last update
        {
            let last_update = self.last_update.lock().map_err(|e| {
                TorshDistributedError::communication_error(
                    "last_update",
                    format!("Lock error: {}", e),
                )
            })?;
            if last_update.elapsed() < self.config.update_interval {
                return Ok(());
            }
        }

        // Gather data from all systems
        // NOTE: cluster-summary errors are deliberately swallowed (`.ok()`)
        // so analytics can still be produced without a summary, whereas the
        // two calls below are hard failures.
        let cluster_summary = self.monitor.get_cluster_summary().ok();

        let fault_tolerance_status = self.fault_tolerance.get_status()?;
        let memory_optimization_status = self.memory_optimizer.get_optimization_status()?;

        // Generate comprehensive analytics
        let analytics = self.generate_training_analytics(
            cluster_summary,
            fault_tolerance_status,
            memory_optimization_status,
        )?;

        // Update trend analyzer
        // (lock scope kept tight: each shared structure is locked, updated,
        // and released before the next one is touched)
        {
            let mut trend_analyzer = self.trend_analyzer.lock().map_err(|e| {
                TorshDistributedError::communication_error(
                    "trend_analyzer",
                    format!("Lock error: {}", e),
                )
            })?;

            // Timestamps are milliseconds since the UNIX epoch, matching
            // `TrendAnalyzer`'s window arithmetic.
            let timestamp = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .expect("time should be after UNIX_EPOCH")
                .as_millis() as u64;
            trend_analyzer.update_loss(timestamp, analytics.performance.avg_loss);
            trend_analyzer.update_throughput(timestamp, analytics.performance.cluster_throughput);
            trend_analyzer.update_resources(
                timestamp,
                analytics.resource_utilization.avg_cpu_utilization,
                analytics.resource_utilization.avg_gpu_utilization,
                analytics.resource_utilization.avg_memory_utilization,
            );
        }

        // Update recommendation engine
        if self.config.enable_recommendations {
            let mut recommendation_engine = self.recommendation_engine.lock().map_err(|e| {
                TorshDistributedError::communication_error(
                    "recommendation_engine",
                    format!("Lock error: {}", e),
                )
            })?;

            recommendation_engine.update_performance(
                analytics.performance.cluster_throughput,
                analytics.efficiency.overall_efficiency,
                analytics.resource_utilization.avg_cpu_utilization,
                analytics.resource_utilization.avg_gpu_utilization,
                analytics.resource_utilization.avg_memory_utilization,
                analytics.communication.avg_latency_us,
            );
        }

        // Store current analytics
        {
            let mut current_analytics = self.current_analytics.write().map_err(|e| {
                TorshDistributedError::communication_error(
                    "current_analytics",
                    format!("Lock error: {}", e),
                )
            })?;
            *current_analytics = Some(analytics.clone());
        }

        // Add to history
        {
            let mut analytics_history = self.analytics_history.lock().map_err(|e| {
                TorshDistributedError::communication_error(
                    "analytics_history",
                    format!("Lock error: {}", e),
                )
            })?;
            analytics_history.push_back(analytics);

            // Cleanup old data
            // (drop every entry older than `retention_period`)
            let retention_cutoff = (SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .expect("time should be after UNIX_EPOCH")
                .as_millis() as u64)
                .saturating_sub(self.config.retention_period.as_millis() as u64);

            analytics_history.retain(|a| a.timestamp_ms >= retention_cutoff);
        }

        // Update last update time
        // (done last so a failed update does not suppress the next attempt)
        {
            let mut last_update = self.last_update.lock().map_err(|e| {
                TorshDistributedError::communication_error(
                    "last_update",
                    format!("Lock error: {}", e),
                )
            })?;
            *last_update = Instant::now();
        }

        Ok(())
    }
770
771    /// Generate comprehensive training analytics
772    fn generate_training_analytics(
773        &self,
774        cluster_summary: Option<ClusterSummary>,
775        fault_tolerance_status: FaultToleranceStatus,
776        memory_optimization_status: MemoryOptimizationStatus,
777    ) -> TorshResult<TrainingAnalytics> {
778        let timestamp_ms = SystemTime::now()
779            .duration_since(UNIX_EPOCH)
780            .expect("system time should be after UNIX_EPOCH")
781            .as_millis() as u64;
782
783        // Get current monitoring data
784        let current_metrics = self.monitor.get_current_metrics()?;
785
786        // Generate performance analytics
787        let performance =
788            self.generate_performance_analytics(&current_metrics, &cluster_summary)?;
789
790        // Generate resource utilization analytics
791        let resource_utilization =
792            self.generate_resource_analytics(&cluster_summary, &memory_optimization_status)?;
793
794        // Generate communication analytics
795        let communication = self.generate_communication_analytics(&current_metrics)?;
796
797        // Generate system health analytics
798        let system_health = self.generate_system_health_analytics(&fault_tolerance_status)?;
799
800        // Generate convergence analytics
801        let convergence = self.generate_convergence_analytics(&current_metrics)?;
802
803        // Generate efficiency analytics
804        let efficiency = self.generate_efficiency_analytics(
805            &performance,
806            &resource_utilization,
807            &communication,
808        )?;
809
810        Ok(TrainingAnalytics {
811            performance,
812            resource_utilization,
813            communication,
814            system_health,
815            convergence,
816            efficiency,
817            timestamp_ms,
818        })
819    }
820
821    /// Generate training performance analytics
822    fn generate_performance_analytics(
823        &self,
824        current_metrics: &Option<NodeMetrics>,
825        cluster_summary: &Option<ClusterSummary>,
826    ) -> TorshResult<TrainingPerformanceAnalytics> {
827        let (current_epoch, avg_loss, cluster_throughput, avg_batch_time_ms) =
828            if let Some(metrics) = current_metrics {
829                (
830                    metrics.training_metrics.epoch,
831                    metrics.training_metrics.loss,
832                    metrics.training_metrics.throughput_samples_per_sec
833                        * cluster_summary.as_ref().map(|s| s.total_nodes).unwrap_or(1) as f32,
834                    metrics.training_metrics.batch_time_ms,
835                )
836            } else {
837                (0, 0.0, 0.0, 0)
838            };
839
840        // Calculate trends using trend analyzer
841        let (loss_trend, training_stability) = {
842            let trend_analyzer = self.trend_analyzer.lock().map_err(|e| {
843                TorshDistributedError::communication_error(
844                    "trend_analyzer",
845                    format!("Lock error: {}", e),
846                )
847            })?;
848            (
849                trend_analyzer.calculate_loss_trend(),
850                trend_analyzer.calculate_stability(),
851            )
852        };
853
854        // Estimate throughput efficiency (simplified)
855        let theoretical_max_throughput =
856            cluster_summary.as_ref().map(|s| s.total_nodes).unwrap_or(1) as f32 * 100.0; // Assume 100 samples/sec per node max
857        let throughput_efficiency = if theoretical_max_throughput > 0.0 {
858            (cluster_throughput / theoretical_max_throughput).min(1.0)
859        } else {
860            0.0
861        };
862
863        // Calculate batch time variance (simplified estimate)
864        let batch_time_variance = 0.1; // Would be calculated from actual node data
865
866        // Estimate completion time (simplified)
867        let estimated_completion_time = if cluster_throughput > 0.0 && current_epoch < 100 {
868            let remaining_epochs = 100 - current_epoch;
869            let samples_per_epoch = 10000; // Assume 10K samples per epoch
870            let remaining_samples = remaining_epochs as f32 * samples_per_epoch as f32;
871            let remaining_seconds = remaining_samples / cluster_throughput;
872            Some(Duration::from_secs(remaining_seconds as u64))
873        } else {
874            None
875        };
876
877        Ok(TrainingPerformanceAnalytics {
878            current_epoch,
879            avg_loss,
880            loss_trend,
881            cluster_throughput,
882            throughput_efficiency,
883            avg_batch_time_ms,
884            batch_time_variance,
885            training_stability,
886            estimated_completion_time,
887        })
888    }
889
890    /// Generate resource utilization analytics
891    fn generate_resource_analytics(
892        &self,
893        cluster_summary: &Option<ClusterSummary>,
894        memory_status: &MemoryOptimizationStatus,
895    ) -> TorshResult<ResourceUtilizationAnalytics> {
896        let (avg_cpu_utilization, avg_gpu_utilization) = if let Some(summary) = cluster_summary {
897            (summary.avg_cpu_utilization, summary.avg_gpu_utilization)
898        } else {
899            (0.0, 0.0)
900        };
901
902        let avg_memory_utilization = memory_status.avg_memory_utilization;
903
904        // Calculate utilization balance (how evenly resources are used)
905        let utilizations = [
906            avg_cpu_utilization,
907            avg_gpu_utilization,
908            avg_memory_utilization,
909        ];
910        let max_util = utilizations.iter().fold(0.0f32, |a, &b| a.max(b));
911        let min_util = utilizations.iter().fold(100.0f32, |a, &b| a.min(b));
912        let utilization_balance = if max_util > 0.0 {
913            1.0 - (max_util - min_util) / max_util
914        } else {
915            1.0
916        };
917
918        // Simulate peak usage (would be tracked from history)
919        let peak_cpu = avg_cpu_utilization * 1.2;
920        let peak_gpu = avg_gpu_utilization * 1.15;
921        let peak_memory = avg_memory_utilization * 1.1;
922
923        // Calculate resource efficiency
924        let resource_efficiency =
925            (avg_cpu_utilization + avg_gpu_utilization + avg_memory_utilization) / 300.0;
926
927        // Identify primary bottleneck
928        let primary_bottleneck = if avg_gpu_utilization < 60.0 {
929            ResourceBottleneck::GPU
930        } else if avg_memory_utilization > 90.0 {
931            ResourceBottleneck::Memory
932        } else if avg_cpu_utilization > 95.0 {
933            ResourceBottleneck::CPU
934        } else {
935            ResourceBottleneck::None
936        };
937
938        Ok(ResourceUtilizationAnalytics {
939            avg_cpu_utilization,
940            avg_gpu_utilization,
941            avg_memory_utilization,
942            utilization_balance,
943            peak_cpu,
944            peak_gpu,
945            peak_memory,
946            resource_efficiency,
947            primary_bottleneck,
948        })
949    }
950
951    /// Generate communication analytics
952    fn generate_communication_analytics(
953        &self,
954        current_metrics: &Option<NodeMetrics>,
955    ) -> TorshResult<CommunicationAnalytics> {
956        let (avg_latency_us, bandwidth_utilization, efficiency_score, failed_operations_rate) =
957            if let Some(metrics) = current_metrics {
958                let comm = &metrics.communication_metrics;
959                (
960                    comm.avg_latency_us,
961                    comm.comm_bandwidth_mbps / 1000.0, // Assume 1Gbps max
962                    comm.efficiency_score,
963                    comm.failed_ops_count as f32 / 100.0, // Normalize
964                )
965            } else {
966                (0, 0.0, 0.0, 0.0)
967            };
968
969        // Simulate communication patterns
970        let communication_patterns = CommunicationPatterns {
971            allreduce_frequency: 10.0,
972            allgather_frequency: 5.0,
973            p2p_frequency: 2.0,
974            gradient_sync_frequency: 8.0,
975            hotspots: vec![CommunicationHotspot {
976                source_node: "node_0".to_string(),
977                target_node: "node_1".to_string(),
978                traffic_volume: 50.0,
979                congestion_score: 0.3,
980            }],
981        };
982
983        let congestion_level = if avg_latency_us > 5000 { 0.7 } else { 0.2 };
984
985        Ok(CommunicationAnalytics {
986            avg_latency_us,
987            bandwidth_utilization,
988            efficiency_score,
989            failed_operations_rate,
990            communication_patterns,
991            congestion_level,
992        })
993    }
994
995    /// Generate system health analytics
996    fn generate_system_health_analytics(
997        &self,
998        fault_tolerance_status: &FaultToleranceStatus,
999    ) -> TorshResult<SystemHealthAnalytics> {
1000        let cluster_health_score = fault_tolerance_status.system_health_score;
1001        let healthy_nodes = fault_tolerance_status.healthy_nodes;
1002        let degraded_nodes = fault_tolerance_status.excluded_nodes; // Simplified mapping
1003        let critical_nodes = 0; // Would be calculated from actual health data
1004        let failed_nodes = fault_tolerance_status
1005            .total_nodes
1006            .saturating_sub(fault_tolerance_status.healthy_nodes);
1007        let active_incidents = fault_tolerance_status.active_incidents;
1008
1009        // Calculate stability trend (simplified)
1010        let stability_trend = if cluster_health_score > 0.8 {
1011            0.1
1012        } else {
1013            -0.1
1014        };
1015
1016        // Calculate failure probability based on health score
1017        let failure_probability = (1.0 - cluster_health_score).max(0.0);
1018
1019        Ok(SystemHealthAnalytics {
1020            cluster_health_score,
1021            healthy_nodes,
1022            degraded_nodes,
1023            critical_nodes,
1024            failed_nodes,
1025            active_incidents,
1026            stability_trend,
1027            failure_probability,
1028        })
1029    }
1030
1031    /// Generate convergence analytics
1032    fn generate_convergence_analytics(
1033        &self,
1034        current_metrics: &Option<NodeMetrics>,
1035    ) -> TorshResult<ConvergenceAnalytics> {
1036        let (loss, gradient_norm) = if let Some(metrics) = current_metrics {
1037            (
1038                metrics.training_metrics.loss,
1039                metrics.training_metrics.gradient_norm,
1040            )
1041        } else {
1042            (0.0, 0.0)
1043        };
1044
1045        // Calculate convergence rate from trend analyzer
1046        let convergence_rate = {
1047            let trend_analyzer = self.trend_analyzer.lock().map_err(|e| {
1048                TorshDistributedError::communication_error(
1049                    "trend_analyzer",
1050                    format!("Lock error: {}", e),
1051                )
1052            })?;
1053            -trend_analyzer.calculate_loss_trend() // Negative loss trend = positive convergence rate
1054        };
1055
1056        // Calculate convergence confidence (simplified)
1057        let convergence_confidence = if convergence_rate > 0.0 { 0.8 } else { 0.3 };
1058
1059        // Estimate training progress (simplified)
1060        let training_progress = if loss > 0.0 {
1061            (1.0 / (loss + 1.0)).min(0.95)
1062        } else {
1063            0.0
1064        };
1065
1066        // Calculate loss smoothness (simplified)
1067        let loss_smoothness = 0.7; // Would be calculated from loss variance
1068
1069        // Gradient norm statistics
1070        let gradient_norm_stats = GradientNormStats {
1071            avg_norm: gradient_norm,
1072            norm_variance: gradient_norm * 0.1, // Simplified
1073            norm_trend: 0.0,                    // Would be calculated from history
1074            explosion_risk: if gradient_norm > 10.0 { 0.8 } else { 0.1 },
1075        };
1076
1077        // Learning rate effectiveness (simplified)
1078        let lr_effectiveness = if convergence_rate > 0.0 { 0.7 } else { 0.3 };
1079
1080        // Overfitting risk (simplified)
1081        let overfitting_risk = if training_progress > 0.8 { 0.6 } else { 0.2 };
1082
1083        Ok(ConvergenceAnalytics {
1084            convergence_rate,
1085            convergence_confidence,
1086            training_progress,
1087            loss_smoothness,
1088            gradient_norm_stats,
1089            lr_effectiveness,
1090            overfitting_risk,
1091        })
1092    }
1093
1094    /// Generate efficiency analytics
1095    fn generate_efficiency_analytics(
1096        &self,
1097        performance: &TrainingPerformanceAnalytics,
1098        resource_utilization: &ResourceUtilizationAnalytics,
1099        communication: &CommunicationAnalytics,
1100    ) -> TorshResult<EfficiencyAnalytics> {
1101        // Calculate component efficiencies
1102        let compute_efficiency = resource_utilization.resource_efficiency;
1103        let communication_efficiency = communication.efficiency_score;
1104        let memory_efficiency =
1105            1.0 - (resource_utilization.avg_memory_utilization / 100.0 - 0.8).max(0.0) * 5.0; // Penalty for high memory usage
1106
1107        // Calculate overall efficiency
1108        let overall_efficiency =
1109            (compute_efficiency + communication_efficiency + memory_efficiency) / 3.0;
1110
1111        // Estimate energy efficiency (simplified)
1112        let energy_efficiency = overall_efficiency * 0.8; // Assume some energy overhead
1113
1114        // Estimate cost efficiency (simplified)
1115        let cost_efficiency = overall_efficiency * performance.throughput_efficiency;
1116
1117        // Generate recommendations
1118        let recommendations = if self.config.enable_recommendations {
1119            let mut recommendation_engine = self.recommendation_engine.lock().map_err(|e| {
1120                TorshDistributedError::communication_error(
1121                    "recommendation_engine",
1122                    format!("Lock error: {}", e),
1123                )
1124            })?;
1125            recommendation_engine.generate_recommendations()
1126        } else {
1127            Vec::new()
1128        };
1129
1130        Ok(EfficiencyAnalytics {
1131            overall_efficiency,
1132            compute_efficiency,
1133            communication_efficiency,
1134            memory_efficiency,
1135            energy_efficiency,
1136            cost_efficiency,
1137            recommendations,
1138        })
1139    }
1140
1141    /// Get current analytics
1142    pub fn get_current_analytics(&self) -> TorshResult<Option<TrainingAnalytics>> {
1143        let current_analytics = self.current_analytics.read().map_err(|e| {
1144            TorshDistributedError::communication_error(
1145                "get_current_analytics",
1146                format!("Lock error: {}", e),
1147            )
1148        })?;
1149        Ok(current_analytics.clone())
1150    }
1151
1152    /// Get analytics history
1153    pub fn get_analytics_history(&self) -> TorshResult<Vec<TrainingAnalytics>> {
1154        let analytics_history = self.analytics_history.lock().map_err(|e| {
1155            TorshDistributedError::communication_error(
1156                "get_analytics_history",
1157                format!("Lock error: {}", e),
1158            )
1159        })?;
1160        Ok(analytics_history.iter().cloned().collect())
1161    }
1162
1163    /// Export dashboard data for external visualization
1164    pub fn export_dashboard_data(&self) -> TorshResult<DashboardExport> {
1165        let current_analytics = self.get_current_analytics()?;
1166        let analytics_history = self.get_analytics_history()?;
1167
1168        Ok(DashboardExport {
1169            current_analytics,
1170            analytics_history,
1171            config: self.config.clone(),
1172            export_timestamp_ms: SystemTime::now()
1173                .duration_since(UNIX_EPOCH)
1174                .expect("time should be after UNIX_EPOCH")
1175                .as_millis() as u64,
1176        })
1177    }
1178
1179    /// Generate training summary report
1180    pub fn generate_training_summary(&self) -> TorshResult<TrainingSummaryReport> {
1181        let current_analytics = self.get_current_analytics()?.ok_or_else(|| {
1182            TorshDistributedError::communication_error(
1183                "summary",
1184                "No analytics data available".to_string(),
1185            )
1186        })?;
1187
1188        let analytics_history = self.get_analytics_history()?;
1189
1190        // Calculate summary statistics
1191        let total_runtime = if !analytics_history.is_empty() {
1192            let start_time = analytics_history
1193                .first()
1194                .expect("analytics_history should not be empty")
1195                .timestamp_ms;
1196            let end_time = current_analytics.timestamp_ms;
1197            Duration::from_millis(end_time - start_time)
1198        } else {
1199            Duration::from_secs(0)
1200        };
1201
1202        let avg_efficiency = if !analytics_history.is_empty() {
1203            analytics_history
1204                .iter()
1205                .map(|a| a.efficiency.overall_efficiency)
1206                .sum::<f32>()
1207                / analytics_history.len() as f32
1208        } else {
1209            0.0
1210        };
1211
1212        let peak_throughput = analytics_history
1213            .iter()
1214            .map(|a| a.performance.cluster_throughput)
1215            .fold(0.0f32, |a, b| a.max(b));
1216
1217        Ok(TrainingSummaryReport {
1218            current_epoch: current_analytics.performance.current_epoch,
1219            current_loss: current_analytics.performance.avg_loss,
1220            total_runtime,
1221            avg_efficiency,
1222            peak_throughput,
1223            total_incidents: analytics_history
1224                .iter()
1225                .map(|a| a.system_health.active_incidents)
1226                .sum(),
1227            convergence_rate: current_analytics.convergence.convergence_rate,
1228            resource_utilization_summary: ResourceUtilizationSummary {
1229                avg_cpu: current_analytics.resource_utilization.avg_cpu_utilization,
1230                avg_gpu: current_analytics.resource_utilization.avg_gpu_utilization,
1231                avg_memory: current_analytics
1232                    .resource_utilization
1233                    .avg_memory_utilization,
1234                peak_cpu: current_analytics.resource_utilization.peak_cpu,
1235                peak_gpu: current_analytics.resource_utilization.peak_gpu,
1236                peak_memory: current_analytics.resource_utilization.peak_memory,
1237            },
1238            optimization_recommendations: current_analytics.efficiency.recommendations,
1239            generated_at: SystemTime::now()
1240                .duration_since(UNIX_EPOCH)
1241                .expect("time should be after UNIX_EPOCH")
1242                .as_millis() as u64,
1243        })
1244    }
1245}
1246
/// Dashboard data export
///
/// Snapshot bundle produced by `export_dashboard_data` for consumption by
/// external visualization tools.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DashboardExport {
    /// Most recent analytics snapshot, if any has been generated yet.
    pub current_analytics: Option<TrainingAnalytics>,
    /// All retained analytics snapshots, oldest first.
    pub analytics_history: Vec<TrainingAnalytics>,
    /// Dashboard configuration in effect at export time.
    pub config: DashboardConfig,
    /// Export creation time, in milliseconds since the Unix epoch.
    pub export_timestamp_ms: u64,
}
1255
/// Training summary report
///
/// Condensed report produced by `generate_training_summary`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TrainingSummaryReport {
    /// Epoch from the latest analytics snapshot.
    pub current_epoch: u32,
    /// Average loss from the latest analytics snapshot.
    pub current_loss: f32,
    /// Elapsed time between the first and the latest analytics snapshots.
    pub total_runtime: Duration,
    /// Mean overall efficiency across all recorded snapshots.
    pub avg_efficiency: f32,
    /// Highest cluster throughput observed across the history.
    pub peak_throughput: f32,
    /// Sum of active-incident counts across all recorded snapshots.
    pub total_incidents: usize,
    /// Convergence rate from the latest snapshot.
    pub convergence_rate: f32,
    /// Average and peak resource utilization from the latest snapshot.
    pub resource_utilization_summary: ResourceUtilizationSummary,
    /// Optimization recommendations from the latest snapshot.
    pub optimization_recommendations: Vec<OptimizationRecommendation>,
    /// Report creation time, in milliseconds since the Unix epoch.
    pub generated_at: u64,
}
1270
/// Resource utilization summary
///
/// Average and peak utilization percentages copied from the latest
/// `ResourceUtilizationAnalytics` snapshot.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceUtilizationSummary {
    /// Average CPU utilization (percent).
    pub avg_cpu: f32,
    /// Average GPU utilization (percent).
    pub avg_gpu: f32,
    /// Average memory utilization (percent).
    pub avg_memory: f32,
    /// Peak CPU utilization (percent).
    pub peak_cpu: f32,
    /// Peak GPU utilization (percent).
    pub peak_gpu: f32,
    /// Peak memory utilization (percent).
    pub peak_memory: f32,
}
1281
#[cfg(test)]
mod tests {
    use super::*;
    use crate::distributed_memory_optimization::{
        DistributedMemoryOptimizer, MemoryOptimizationConfig,
    };
    use crate::distributed_monitoring::{DistributedMonitor, MonitoringConfig};
    use crate::enhanced_fault_tolerance::{EnhancedFaultTolerance, FaultToleranceConfig};

    #[tokio::test]
    async fn test_dashboard_creation() -> TorshResult<()> {
        // Wire up the dashboard's dependencies with default configurations.
        let monitor = Arc::new(DistributedMonitor::new(MonitoringConfig::default(), false));
        let fault_tolerance = Arc::new(EnhancedFaultTolerance::new(
            FaultToleranceConfig::default(),
            monitor.clone(),
        ));
        let memory_optimizer = Arc::new(DistributedMemoryOptimizer::new(
            MemoryOptimizationConfig::default(),
            monitor.clone(),
        ));

        let dashboard = TrainingAnalyticsDashboard::new(
            DashboardConfig::default(),
            monitor,
            fault_tolerance,
            memory_optimizer,
        );

        // A freshly created dashboard has not generated any analytics yet.
        assert!(dashboard.get_current_analytics()?.is_none());

        Ok(())
    }

    #[tokio::test]
    async fn test_trend_analyzer() -> TorshResult<()> {
        let mut analyzer = TrendAnalyzer::new(Duration::from_secs(60));

        // Feed the analyzer a monotonically decreasing loss curve, one point
        // per simulated second.
        let base_ms = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("time should be after UNIX_EPOCH")
            .as_millis() as u64;
        for step in 0..20u64 {
            analyzer.update_loss(base_ms + step * 1000, 2.0 - step as f32 * 0.1);
        }

        // A decreasing loss must produce a negative trend.
        assert!(analyzer.calculate_loss_trend() < 0.0);

        // Stability is normalized; the exact value depends on the
        // implementation, so only check the range.
        let stability = analyzer.calculate_stability();
        assert!((0.0..=1.0).contains(&stability));

        Ok(())
    }

    #[tokio::test]
    async fn test_recommendation_engine() -> TorshResult<()> {
        let mut engine = RecommendationEngine::new();

        // Report a sustained workload whose GPUs are underutilized.
        for _ in 0..15 {
            engine.update_performance(
                100.0, // throughput
                0.6,   // efficiency
                80.0,  // CPU util
                50.0,  // GPU util (low)
                70.0,  // memory util
                2000,  // latency
            );
        }

        let recommendations = engine.generate_recommendations();
        assert!(!recommendations.is_empty());

        // Low GPU utilization should surface a GPU-related recommendation.
        assert!(recommendations.iter().any(|r| r.title.contains("GPU")));

        Ok(())
    }
}