#![allow(dead_code)]
use crate::distributed_memory_optimization::{
DistributedMemoryOptimizer, MemoryOptimizationStatus,
};
use crate::distributed_monitoring::{ClusterSummary, DistributedMonitor, NodeMetrics};
use crate::enhanced_fault_tolerance::{EnhancedFaultTolerance, FaultToleranceStatus};
use crate::{TorshDistributedError, TorshResult};
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
/// One complete dashboard snapshot aggregating every analytics domain.
///
/// Produced by `TrainingAnalyticsDashboard::generate_training_analytics` and
/// retained in the dashboard's history for trend/summary reporting.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TrainingAnalytics {
/// Loss/throughput/epoch figures for the cluster.
pub performance: TrainingPerformanceAnalytics,
/// CPU/GPU/memory utilization and bottleneck classification.
pub resource_utilization: ResourceUtilizationAnalytics,
/// Latency, bandwidth, and traffic-pattern figures.
pub communication: CommunicationAnalytics,
/// Node health counts and failure outlook.
pub system_health: SystemHealthAnalytics,
/// Loss-trend-derived convergence statistics.
pub convergence: ConvergenceAnalytics,
/// Combined efficiency scores and tuning recommendations.
pub efficiency: EfficiencyAnalytics,
/// Snapshot creation time, milliseconds since the UNIX epoch.
pub timestamp_ms: u64,
}
/// Cluster-wide training-performance figures for one snapshot.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TrainingPerformanceAnalytics {
pub current_epoch: u32,
pub avg_loss: f32,
/// Relative change of recent vs. earlier loss samples (negative = falling).
pub loss_trend: f32,
/// Per-node throughput scaled by the node count (samples/sec).
pub cluster_throughput: f32,
/// Throughput relative to a theoretical maximum, in [0, 1].
pub throughput_efficiency: f32,
pub avg_batch_time_ms: u64,
/// Currently a fixed placeholder value (see the generator).
pub batch_time_variance: f32,
/// Stability in [0, 1] from the variability of recent losses.
pub training_stability: f32,
/// Heuristic ETA; `None` when throughput is zero or training looks done.
pub estimated_completion_time: Option<Duration>,
}
/// Aggregated resource-utilization figures (percent values, 0–100 scale).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceUtilizationAnalytics {
pub avg_cpu_utilization: f32,
pub avg_gpu_utilization: f32,
pub avg_memory_utilization: f32,
/// 1.0 when all resources are equally loaded, lower when skewed.
pub utilization_balance: f32,
/// Peaks are estimated from the averages with fixed headroom factors.
pub peak_cpu: f32,
pub peak_gpu: f32,
pub peak_memory: f32,
/// Mean of the three utilizations normalized to [0, 1].
pub resource_efficiency: f32,
/// Heuristic classification of the most constrained resource.
pub primary_bottleneck: ResourceBottleneck,
}
/// The resource class currently limiting training, if any.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum ResourceBottleneck {
CPU,
GPU,
Memory,
Network,
Storage,
/// No clear bottleneck detected.
None,
}
impl std::fmt::Display for ResourceBottleneck {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ResourceBottleneck::CPU => write!(f, "CPU"),
ResourceBottleneck::GPU => write!(f, "GPU"),
ResourceBottleneck::Memory => write!(f, "Memory"),
ResourceBottleneck::Network => write!(f, "Network"),
ResourceBottleneck::Storage => write!(f, "Storage"),
ResourceBottleneck::None => write!(f, "None"),
}
}
}
/// Collective-communication figures for one snapshot.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CommunicationAnalytics {
pub avg_latency_us: u64,
/// Fraction of available bandwidth in use (derived from Mbps / 1000).
pub bandwidth_utilization: f32,
pub efficiency_score: f32,
/// Failed operation count normalized to a rate (see the generator).
pub failed_operations_rate: f32,
pub communication_patterns: CommunicationPatterns,
/// Coarse congestion estimate in [0, 1], keyed off latency.
pub congestion_level: f32,
}
/// Per-operation communication frequencies and detected traffic hotspots.
///
/// NOTE(review): the current generator fills this with hard-coded placeholder
/// values rather than measured data.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CommunicationPatterns {
pub allreduce_frequency: f32,
pub allgather_frequency: f32,
pub p2p_frequency: f32,
pub gradient_sync_frequency: f32,
pub hotspots: Vec<CommunicationHotspot>,
}
/// A node pair with unusually heavy or congested traffic.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CommunicationHotspot {
pub source_node: String,
pub target_node: String,
pub traffic_volume: f32,
/// Congestion severity in [0, 1].
pub congestion_score: f32,
}
/// Node-health breakdown derived from the fault-tolerance subsystem.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemHealthAnalytics {
/// Overall health in [0, 1] as reported by fault tolerance.
pub cluster_health_score: f32,
pub healthy_nodes: usize,
/// Currently populated from the excluded-node count.
pub degraded_nodes: usize,
/// Not yet classified separately; generator always sets 0.
pub critical_nodes: usize,
/// total_nodes - healthy_nodes (saturating).
pub failed_nodes: usize,
pub active_incidents: usize,
/// Positive = improving, negative = degrading (coarse two-level estimate).
pub stability_trend: f32,
/// 1 - health score, floored at 0.
pub failure_probability: f32,
}
/// Convergence statistics derived from the loss trend and gradient norm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConvergenceAnalytics {
/// Negated loss trend: positive when the loss is falling.
pub convergence_rate: f32,
pub convergence_confidence: f32,
/// Heuristic progress in [0, 0.95] from the current loss value.
pub training_progress: f32,
/// Currently a fixed placeholder value (see the generator).
pub loss_smoothness: f32,
pub gradient_norm_stats: GradientNormStats,
pub lr_effectiveness: f32,
pub overfitting_risk: f32,
}
/// Summary statistics of the gradient norm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GradientNormStats {
pub avg_norm: f32,
/// Currently estimated as a fixed fraction of the norm, not measured.
pub norm_variance: f32,
/// Not tracked yet; generator always sets 0.0.
pub norm_trend: f32,
/// Elevated when the norm exceeds the explosion threshold.
pub explosion_risk: f32,
}
/// Per-domain efficiency scores (each nominally in [0, 1]) plus
/// automatically generated optimization recommendations.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EfficiencyAnalytics {
/// Mean of compute, communication, and memory efficiency.
pub overall_efficiency: f32,
pub compute_efficiency: f32,
pub communication_efficiency: f32,
pub memory_efficiency: f32,
/// Derived from overall efficiency by a fixed factor (placeholder).
pub energy_efficiency: f32,
/// Overall efficiency weighted by throughput efficiency.
pub cost_efficiency: f32,
pub recommendations: Vec<OptimizationRecommendation>,
}
/// One actionable tuning suggestion emitted by the recommendation engine.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OptimizationRecommendation {
pub category: RecommendationCategory,
pub title: String,
pub description: String,
/// Expected benefit in [0, 1].
pub expected_impact: f32,
/// Implementation effort in [0, 1].
pub difficulty: f32,
/// Higher = more urgent; recommendations are sorted descending by this.
pub priority: u32,
}
/// Broad classification of an optimization recommendation.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum RecommendationCategory {
Performance,
Efficiency,
Reliability,
Cost,
Scalability,
}
impl std::fmt::Display for RecommendationCategory {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
RecommendationCategory::Performance => write!(f, "Performance"),
RecommendationCategory::Efficiency => write!(f, "Efficiency"),
RecommendationCategory::Reliability => write!(f, "Reliability"),
RecommendationCategory::Cost => write!(f, "Cost"),
RecommendationCategory::Scalability => write!(f, "Scalability"),
}
}
}
/// Tunable behavior of the analytics dashboard (see `Default` for values).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DashboardConfig {
/// Minimum time between `update_analytics` refreshes; calls arriving
/// sooner are silently skipped.
pub update_interval: Duration,
/// How long snapshots are kept in the history before being pruned.
pub retention_period: Duration,
pub enable_predictions: bool,
/// When false, no recommendation engine work is done and snapshots carry
/// an empty recommendation list.
pub enable_recommendations: bool,
/// Window used by the trend analyzer to age out samples.
pub aggregation_window: Duration,
pub alert_thresholds: DashboardAlertThresholds,
}
/// Alerting thresholds for dashboard metrics.
///
/// NOTE(review): these are stored in the config but not consulted anywhere
/// in this module yet — presumably used by an external alerting component.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DashboardAlertThresholds {
pub efficiency_threshold: f32,
pub utilization_threshold: f32,
/// Latency threshold; unit presumably microseconds — TODO confirm.
pub latency_threshold: u64,
pub convergence_threshold: f32,
}
impl Default for DashboardConfig {
    /// Defaults: 10 s refresh throttle, 24 h retention, 60 s trend window,
    /// predictions and recommendations enabled.
    fn default() -> Self {
        let alert_thresholds = DashboardAlertThresholds {
            efficiency_threshold: 0.7,
            utilization_threshold: 0.9,
            latency_threshold: 10000,
            convergence_threshold: 0.1,
        };
        Self {
            update_interval: Duration::from_secs(10),
            retention_period: Duration::from_secs(24 * 60 * 60),
            enable_predictions: true,
            enable_recommendations: true,
            aggregation_window: Duration::from_secs(60),
            alert_thresholds,
        }
    }
}
/// Central analytics aggregator: pulls data from the monitor, fault
/// tolerance, and memory optimizer, and maintains current + historical
/// `TrainingAnalytics` snapshots.
///
/// All mutable state is behind `Arc<Mutex|RwLock>` so the dashboard can be
/// shared across threads.
pub struct TrainingAnalyticsDashboard {
config: DashboardConfig,
// Data sources.
monitor: Arc<DistributedMonitor>,
fault_tolerance: Arc<EnhancedFaultTolerance>,
memory_optimizer: Arc<DistributedMemoryOptimizer>,
// Latest snapshot; RwLock because reads dominate writes.
current_analytics: Arc<RwLock<Option<TrainingAnalytics>>>,
// Snapshots within the retention period, oldest first.
analytics_history: Arc<Mutex<VecDeque<TrainingAnalytics>>>,
trend_analyzer: Arc<Mutex<TrendAnalyzer>>,
recommendation_engine: Arc<Mutex<RecommendationEngine>>,
// Throttles update_analytics to config.update_interval.
last_update: Arc<Mutex<Instant>>,
}
/// Sliding-window store of (timestamp_ms, value) samples used to compute
/// loss/throughput trends and training stability.
#[derive(Debug)]
struct TrendAnalyzer {
// Loss and throughput series share the same (ms timestamp, value) shape.
loss_history: VecDeque<(u64, f32)>, throughput_history: VecDeque<(u64, f32)>,
resource_history: VecDeque<(u64, ResourceSnapshot)>,
// Samples older than this window (relative to the newest) are dropped.
trend_window: Duration,
}
/// One point-in-time resource-utilization sample (percent values).
#[derive(Debug, Clone)]
struct ResourceSnapshot {
cpu: f32,
gpu: f32,
memory: f32,
}
impl TrendAnalyzer {
    /// Creates an analyzer whose histories are pruned to `trend_window`.
    fn new(trend_window: Duration) -> Self {
        Self {
            loss_history: VecDeque::with_capacity(1000),
            throughput_history: VecDeque::with_capacity(1000),
            resource_history: VecDeque::with_capacity(1000),
            trend_window,
        }
    }

    /// Records one loss sample and prunes expired entries.
    fn update_loss(&mut self, timestamp: u64, loss: f32) {
        self.loss_history.push_back((timestamp, loss));
        self.cleanup_old_data(timestamp);
    }

    /// Records one throughput sample and prunes expired entries.
    fn update_throughput(&mut self, timestamp: u64, throughput: f32) {
        self.throughput_history.push_back((timestamp, throughput));
        self.cleanup_old_data(timestamp);
    }

    /// Records one resource-utilization sample and prunes expired entries.
    fn update_resources(&mut self, timestamp: u64, cpu: f32, gpu: f32, memory: f32) {
        let snapshot = ResourceSnapshot { cpu, gpu, memory };
        self.resource_history.push_back((timestamp, snapshot));
        self.cleanup_old_data(timestamp);
    }

    /// Drops every sample older than `trend_window` relative to
    /// `current_timestamp` (all timestamps in milliseconds).
    fn cleanup_old_data(&mut self, current_timestamp: u64) {
        let cutoff = current_timestamp.saturating_sub(self.trend_window.as_millis() as u64);
        self.loss_history.retain(|&(ts, _)| ts >= cutoff);
        self.throughput_history.retain(|&(ts, _)| ts >= cutoff);
        self.resource_history.retain(|entry| entry.0 >= cutoff);
    }

    /// Relative change of the five newest losses versus the five before
    /// them; 0.0 until at least ten samples exist. Negative = improving.
    fn calculate_loss_trend(&self) -> f32 {
        Self::recent_relative_change(&self.loss_history)
    }

    /// Same windowed comparison as the loss trend, applied to throughput.
    fn calculate_throughput_trend(&self) -> f32 {
        Self::recent_relative_change(&self.throughput_history)
    }

    /// Shared trend helper over a (timestamp, value) series:
    /// (mean of newest 5 - mean of previous 5) / max(previous mean, 0.001).
    fn recent_relative_change(series: &VecDeque<(u64, f32)>) -> f32 {
        if series.len() < 10 {
            return 0.0;
        }
        // Newest-first view of the ten most recent values.
        let newest: Vec<f32> = series.iter().rev().take(10).map(|&(_, v)| v).collect();
        let late_avg = newest[..5].iter().sum::<f32>() / 5.0;
        let early_avg = newest[5..].iter().sum::<f32>() / (newest.len() - 5) as f32;
        (late_avg - early_avg) / early_avg.max(0.001)
    }

    /// Stability in [0, 1] from the coefficient of variation of the 20
    /// newest losses; returns a neutral 0.5 until enough samples exist.
    fn calculate_stability(&self) -> f32 {
        if self.loss_history.len() < 20 {
            return 0.5;
        }
        let window: Vec<f32> = self
            .loss_history
            .iter()
            .rev()
            .take(20)
            .map(|&(_, loss)| loss)
            .collect();
        let n = window.len() as f32;
        let mean = window.iter().sum::<f32>() / n;
        let variance = window.iter().map(|&x| (x - mean).powi(2)).sum::<f32>() / n;
        let cv = if mean > 0.001 { variance.sqrt() / mean } else { 1.0 };
        (1.0 - cv.min(1.0)).max(0.0)
    }
}
/// Produces optimization recommendations from a rolling window of
/// performance samples, with a time-based cache to avoid rework.
#[derive(Debug)]
struct RecommendationEngine {
// At most the 100 newest samples are kept.
performance_history: VecDeque<PerformanceSnapshot>,
// Last generated recommendation set; reused while fresh.
recommendation_cache: Vec<OptimizationRecommendation>,
// When the cache was last (re)built.
last_generation: Instant,
}
/// One performance sample fed to the recommendation engine.
#[derive(Debug, Clone)]
struct PerformanceSnapshot {
timestamp: Instant,
throughput: f32,
efficiency: f32,
// Utilizations on a 0–100 percent scale.
cpu_util: f32,
gpu_util: f32,
memory_util: f32,
// Latency unit presumably microseconds (matches avg_latency_us) — confirm.
communication_latency: u64,
}
impl RecommendationEngine {
    /// Starts with an empty history and no cached recommendations.
    fn new() -> Self {
        Self {
            performance_history: VecDeque::with_capacity(100),
            recommendation_cache: Vec::new(),
            last_generation: Instant::now(),
        }
    }

    /// Appends one performance sample, keeping at most the 100 newest.
    fn update_performance(
        &mut self,
        throughput: f32,
        efficiency: f32,
        cpu_util: f32,
        gpu_util: f32,
        memory_util: f32,
        communication_latency: u64,
    ) {
        self.performance_history.push_back(PerformanceSnapshot {
            timestamp: Instant::now(),
            throughput,
            efficiency,
            cpu_util,
            gpu_util,
            memory_util,
            communication_latency,
        });
        while self.performance_history.len() > 100 {
            self.performance_history.pop_front();
        }
    }

    /// Produces prioritized tuning suggestions from the ten most recent
    /// samples. Results are cached for five minutes; fewer than ten samples
    /// yield an empty list (without touching the cache).
    fn generate_recommendations(&mut self) -> Vec<OptimizationRecommendation> {
        let cache_is_fresh = self.last_generation.elapsed().as_secs() < 300;
        if cache_is_fresh && !self.recommendation_cache.is_empty() {
            return self.recommendation_cache.clone();
        }
        if self.performance_history.len() < 10 {
            return Vec::new();
        }
        // Newest-first window of the ten most recent samples.
        let window: Vec<&PerformanceSnapshot> =
            self.performance_history.iter().rev().take(10).collect();
        let sample_count = window.len() as f32;
        let mut recs = Vec::new();

        // Under-utilized GPUs usually mean the input pipeline or batch size
        // is the limiter.
        let avg_gpu_util = window.iter().map(|p| p.gpu_util).sum::<f32>() / sample_count;
        if avg_gpu_util < 70.0 {
            recs.push(OptimizationRecommendation {
                category: RecommendationCategory::Performance,
                title: "Increase GPU Utilization".to_string(),
                description: format!("GPU utilization is at {:.1}%. Consider increasing batch size or adjusting data loading.", avg_gpu_util),
                expected_impact: 0.8,
                difficulty: 0.3,
                priority: 4,
            });
        }

        let avg_memory_util = window.iter().map(|p| p.memory_util).sum::<f32>() / sample_count;
        if avg_memory_util > 90.0 {
            recs.push(OptimizationRecommendation {
                category: RecommendationCategory::Efficiency,
                title: "Optimize Memory Usage".to_string(),
                description: format!("Memory utilization is at {:.1}%. Consider enabling gradient checkpointing or reducing batch size.", avg_memory_util),
                expected_impact: 0.6,
                difficulty: 0.4,
                priority: 3,
            });
        }

        // Integer division is fine here: only the >5000 comparison matters.
        let avg_latency =
            window.iter().map(|p| p.communication_latency).sum::<u64>() / window.len() as u64;
        if avg_latency > 5000 {
            recs.push(OptimizationRecommendation {
                category: RecommendationCategory::Performance,
                title: "Optimize Communication".to_string(),
                description: format!("Communication latency is {}μs. Consider gradient compression or improving network connectivity.", avg_latency),
                expected_impact: 0.7,
                difficulty: 0.6,
                priority: 3,
            });
        }

        if self.calculate_efficiency_trend(&window) < -0.1 {
            recs.push(OptimizationRecommendation {
                category: RecommendationCategory::Efficiency,
                title: "Address Efficiency Decline".to_string(),
                description: "Training efficiency is declining. Review resource allocation and check for bottlenecks.".to_string(),
                expected_impact: 0.9,
                difficulty: 0.7,
                priority: 5,
            });
        }

        if self.calculate_throughput_variance(&window) > 0.2 {
            recs.push(OptimizationRecommendation {
                category: RecommendationCategory::Scalability,
                title: "Improve Load Balancing".to_string(),
                description: "High throughput variance detected. Consider redistributing workload across nodes.".to_string(),
                expected_impact: 0.5,
                difficulty: 0.8,
                priority: 2,
            });
        }

        // Highest priority first; stable sort keeps insertion order on ties.
        recs.sort_by_key(|r| std::cmp::Reverse(r.priority));
        self.recommendation_cache = recs.clone();
        self.last_generation = Instant::now();
        recs
    }

    /// Relative change of the newest three samples' mean efficiency versus
    /// the older remainder of the window (negative = declining).
    /// `recent_perf` is newest-first; windows shorter than six return 0.0.
    fn calculate_efficiency_trend(&self, recent_perf: &[&PerformanceSnapshot]) -> f32 {
        if recent_perf.len() < 6 {
            return 0.0;
        }
        let (newest, older) = recent_perf.split_at(3);
        let late: f32 = newest.iter().map(|p| p.efficiency).sum::<f32>() / 3.0;
        let early: f32 = older.iter().map(|p| p.efficiency).sum::<f32>() / older.len() as f32;
        (late - early) / early.max(0.001)
    }

    /// Coefficient of variation of throughput across the window; 0.0 when
    /// the window is too small or the mean is (near) zero.
    fn calculate_throughput_variance(&self, recent_perf: &[&PerformanceSnapshot]) -> f32 {
        if recent_perf.len() < 5 {
            return 0.0;
        }
        let n = recent_perf.len() as f32;
        let mean = recent_perf.iter().map(|p| p.throughput).sum::<f32>() / n;
        if mean <= 0.001 {
            return 0.0;
        }
        let variance = recent_perf
            .iter()
            .map(|p| (p.throughput - mean).powi(2))
            .sum::<f32>()
            / n;
        variance.sqrt() / mean
    }
}
impl TrainingAnalyticsDashboard {
/// Builds a dashboard wired to the given monitor, fault-tolerance, and
/// memory-optimization subsystems. No analytics exist until the first
/// successful `update_analytics` call.
///
/// Fix: the original cloned the entire `DashboardConfig` solely to read
/// `aggregation_window` after moving it into the struct; reading the field
/// first removes the clone.
pub fn new(
    config: DashboardConfig,
    monitor: Arc<DistributedMonitor>,
    fault_tolerance: Arc<EnhancedFaultTolerance>,
    memory_optimizer: Arc<DistributedMemoryOptimizer>,
) -> Self {
    // Copy the window out before `config` is moved below.
    let aggregation_window = config.aggregation_window;
    Self {
        config,
        monitor,
        fault_tolerance,
        memory_optimizer,
        current_analytics: Arc::new(RwLock::new(None)),
        analytics_history: Arc::new(Mutex::new(VecDeque::new())),
        trend_analyzer: Arc::new(Mutex::new(TrendAnalyzer::new(aggregation_window))),
        recommendation_engine: Arc::new(Mutex::new(RecommendationEngine::new())),
        last_update: Arc::new(Mutex::new(Instant::now())),
    }
}
/// Refreshes the dashboard: collects status from all subsystems, builds a
/// new `TrainingAnalytics` snapshot, feeds the trend/recommendation state,
/// publishes the snapshot, and prunes expired history entries.
///
/// Calls arriving sooner than `config.update_interval` after the previous
/// refresh return `Ok(())` without doing any work. Locks are taken one at
/// a time in short scopes (throttle check, trend, recommendations, publish,
/// history, timestamp) so no two locks are ever held together.
pub fn update_analytics(&self) -> TorshResult<()> {
// Throttle: skip the refresh entirely if the previous one is too recent.
{
let last_update = self.last_update.lock().map_err(|e| {
TorshDistributedError::communication_error(
"last_update",
format!("Lock error: {}", e),
)
})?;
if last_update.elapsed() < self.config.update_interval {
return Ok(());
}
}
// Gather inputs. A failed cluster summary is tolerated (treated as None);
// fault-tolerance and memory-optimizer errors abort the refresh.
let cluster_summary = self.monitor.get_cluster_summary().ok();
let fault_tolerance_status = self.fault_tolerance.get_status()?;
let memory_optimization_status = self.memory_optimizer.get_optimization_status()?;
let analytics = self.generate_training_analytics(
cluster_summary,
fault_tolerance_status,
memory_optimization_status,
)?;
// Feed the trend analyzer with the freshly computed figures.
{
let mut trend_analyzer = self.trend_analyzer.lock().map_err(|e| {
TorshDistributedError::communication_error(
"trend_analyzer",
format!("Lock error: {}", e),
)
})?;
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("time should be after UNIX_EPOCH")
.as_millis() as u64;
trend_analyzer.update_loss(timestamp, analytics.performance.avg_loss);
trend_analyzer.update_throughput(timestamp, analytics.performance.cluster_throughput);
trend_analyzer.update_resources(
timestamp,
analytics.resource_utilization.avg_cpu_utilization,
analytics.resource_utilization.avg_gpu_utilization,
analytics.resource_utilization.avg_memory_utilization,
);
}
// Feed the recommendation engine (only when recommendations are enabled).
if self.config.enable_recommendations {
let mut recommendation_engine = self.recommendation_engine.lock().map_err(|e| {
TorshDistributedError::communication_error(
"recommendation_engine",
format!("Lock error: {}", e),
)
})?;
recommendation_engine.update_performance(
analytics.performance.cluster_throughput,
analytics.efficiency.overall_efficiency,
analytics.resource_utilization.avg_cpu_utilization,
analytics.resource_utilization.avg_gpu_utilization,
analytics.resource_utilization.avg_memory_utilization,
analytics.communication.avg_latency_us,
);
}
// Publish the snapshot as the current analytics.
{
let mut current_analytics = self.current_analytics.write().map_err(|e| {
TorshDistributedError::communication_error(
"current_analytics",
format!("Lock error: {}", e),
)
})?;
*current_analytics = Some(analytics.clone());
}
// Append to history and drop snapshots older than the retention period.
{
let mut analytics_history = self.analytics_history.lock().map_err(|e| {
TorshDistributedError::communication_error(
"analytics_history",
format!("Lock error: {}", e),
)
})?;
analytics_history.push_back(analytics);
let retention_cutoff = (SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("time should be after UNIX_EPOCH")
.as_millis() as u64)
.saturating_sub(self.config.retention_period.as_millis() as u64);
analytics_history.retain(|a| a.timestamp_ms >= retention_cutoff);
}
// Stamp the refresh time last, so a failure above allows an early retry.
{
let mut last_update = self.last_update.lock().map_err(|e| {
TorshDistributedError::communication_error(
"last_update",
format!("Lock error: {}", e),
)
})?;
*last_update = Instant::now();
}
Ok(())
}
/// Assembles one `TrainingAnalytics` snapshot from the collected subsystem
/// status, generating each analytics domain in turn.
///
/// Fix: the original contained HTML-entity mojibake — `¤t_metrics`
/// (i.e. `&curren;` + `t_metrics`, a mangled `&current_metrics`) — in four
/// call sites, which is not valid Rust. The borrows are restored here.
fn generate_training_analytics(
    &self,
    cluster_summary: Option<ClusterSummary>,
    fault_tolerance_status: FaultToleranceStatus,
    memory_optimization_status: MemoryOptimizationStatus,
) -> TorshResult<TrainingAnalytics> {
    // Single timestamp so every sub-analytics in this snapshot agrees.
    let timestamp_ms = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system time should be after UNIX_EPOCH")
        .as_millis() as u64;
    let current_metrics = self.monitor.get_current_metrics()?;
    let performance =
        self.generate_performance_analytics(&current_metrics, &cluster_summary)?;
    let resource_utilization =
        self.generate_resource_analytics(&cluster_summary, &memory_optimization_status)?;
    let communication = self.generate_communication_analytics(&current_metrics)?;
    let system_health = self.generate_system_health_analytics(&fault_tolerance_status)?;
    let convergence = self.generate_convergence_analytics(&current_metrics)?;
    // Efficiency is derived last because it consumes the other domains.
    let efficiency = self.generate_efficiency_analytics(
        &performance,
        &resource_utilization,
        &communication,
    )?;
    Ok(TrainingAnalytics {
        performance,
        resource_utilization,
        communication,
        system_health,
        convergence,
        efficiency,
        timestamp_ms,
    })
}
/// Builds the training-performance slice of a snapshot. Missing node
/// metrics produce all-zero figures.
fn generate_performance_analytics(
    &self,
    current_metrics: &Option<NodeMetrics>,
    cluster_summary: &Option<ClusterSummary>,
) -> TorshResult<TrainingPerformanceAnalytics> {
    // Without a cluster summary, assume a single node.
    let node_count = cluster_summary.as_ref().map(|s| s.total_nodes).unwrap_or(1);
    let (current_epoch, avg_loss, cluster_throughput, avg_batch_time_ms) = match current_metrics {
        Some(metrics) => (
            metrics.training_metrics.epoch,
            metrics.training_metrics.loss,
            // Scale the per-node rate up to the whole cluster.
            metrics.training_metrics.throughput_samples_per_sec * node_count as f32,
            metrics.training_metrics.batch_time_ms,
        ),
        None => (0, 0.0, 0.0, 0),
    };
    let (loss_trend, training_stability) = {
        let analyzer = self.trend_analyzer.lock().map_err(|e| {
            TorshDistributedError::communication_error(
                "trend_analyzer",
                format!("Lock error: {}", e),
            )
        })?;
        (analyzer.calculate_loss_trend(), analyzer.calculate_stability())
    };
    // Assumed ceiling of 100 samples/sec per node — presumably a
    // placeholder; TODO confirm against real hardware limits.
    let theoretical_max_throughput = node_count as f32 * 100.0;
    let throughput_efficiency = if theoretical_max_throughput > 0.0 {
        (cluster_throughput / theoretical_max_throughput).min(1.0)
    } else {
        0.0
    };
    let batch_time_variance = 0.1; // fixed placeholder value
    // Naive ETA assuming a 100-epoch run at 10k samples per epoch.
    let estimated_completion_time = if cluster_throughput > 0.0 && current_epoch < 100 {
        let remaining_epochs = 100 - current_epoch;
        let remaining_samples = remaining_epochs as f32 * 10000 as f32;
        let remaining_seconds = remaining_samples / cluster_throughput;
        Some(Duration::from_secs(remaining_seconds as u64))
    } else {
        None
    };
    Ok(TrainingPerformanceAnalytics {
        current_epoch,
        avg_loss,
        loss_trend,
        cluster_throughput,
        throughput_efficiency,
        avg_batch_time_ms,
        batch_time_variance,
        training_stability,
        estimated_completion_time,
    })
}
/// Builds the resource-utilization slice of a snapshot from the cluster
/// summary (CPU/GPU) and the memory-optimizer status (memory).
fn generate_resource_analytics(
    &self,
    cluster_summary: &Option<ClusterSummary>,
    memory_status: &MemoryOptimizationStatus,
) -> TorshResult<ResourceUtilizationAnalytics> {
    let (avg_cpu_utilization, avg_gpu_utilization) = match cluster_summary {
        Some(summary) => (summary.avg_cpu_utilization, summary.avg_gpu_utilization),
        None => (0.0, 0.0),
    };
    let avg_memory_utilization = memory_status.avg_memory_utilization;
    let utilizations = [
        avg_cpu_utilization,
        avg_gpu_utilization,
        avg_memory_utilization,
    ];
    let max_util = utilizations.iter().copied().fold(0.0f32, f32::max);
    let min_util = utilizations.iter().copied().fold(100.0f32, f32::min);
    // 1.0 = perfectly balanced load, approaching 0.0 as the spread grows.
    let utilization_balance = if max_util > 0.0 {
        1.0 - (max_util - min_util) / max_util
    } else {
        1.0
    };
    // Peaks are estimated from the averages with fixed headroom factors —
    // presumably stand-ins until real peak tracking exists.
    let peak_cpu = avg_cpu_utilization * 1.2;
    let peak_gpu = avg_gpu_utilization * 1.15;
    let peak_memory = avg_memory_utilization * 1.1;
    // Mean of three percent values, normalized to [0, 1].
    let resource_efficiency =
        (avg_cpu_utilization + avg_gpu_utilization + avg_memory_utilization) / 300.0;
    // Heuristic classification; GPU under-utilization is checked first
    // because an idle GPU usually dominates training cost.
    let primary_bottleneck = if avg_gpu_utilization < 60.0 {
        ResourceBottleneck::GPU
    } else if avg_memory_utilization > 90.0 {
        ResourceBottleneck::Memory
    } else if avg_cpu_utilization > 95.0 {
        ResourceBottleneck::CPU
    } else {
        ResourceBottleneck::None
    };
    Ok(ResourceUtilizationAnalytics {
        avg_cpu_utilization,
        avg_gpu_utilization,
        avg_memory_utilization,
        utilization_balance,
        peak_cpu,
        peak_gpu,
        peak_memory,
        resource_efficiency,
        primary_bottleneck,
    })
}
/// Builds the communication slice of a snapshot. The traffic patterns and
/// hotspot list are currently hard-coded placeholders.
fn generate_communication_analytics(
    &self,
    current_metrics: &Option<NodeMetrics>,
) -> TorshResult<CommunicationAnalytics> {
    let (avg_latency_us, bandwidth_utilization, efficiency_score, failed_operations_rate) =
        match current_metrics {
            Some(metrics) => {
                let comm = &metrics.communication_metrics;
                (
                    comm.avg_latency_us,
                    // Mbps divided by a fixed 1000 — presumably assumes a
                    // 1 Gbps link capacity; TODO confirm.
                    comm.comm_bandwidth_mbps / 1000.0,
                    comm.efficiency_score,
                    // Failure count normalized by an assumed 100 operations.
                    comm.failed_ops_count as f32 / 100.0,
                )
            }
            None => (0, 0.0, 0.0, 0.0),
        };
    // Placeholder pattern data until real traffic accounting is wired in.
    let communication_patterns = CommunicationPatterns {
        allreduce_frequency: 10.0,
        allgather_frequency: 5.0,
        p2p_frequency: 2.0,
        gradient_sync_frequency: 8.0,
        hotspots: vec![CommunicationHotspot {
            source_node: "node_0".to_string(),
            target_node: "node_1".to_string(),
            traffic_volume: 50.0,
            congestion_score: 0.3,
        }],
    };
    // Coarse two-level congestion estimate keyed off latency.
    let congestion_level = if avg_latency_us > 5000 { 0.7 } else { 0.2 };
    Ok(CommunicationAnalytics {
        avg_latency_us,
        bandwidth_utilization,
        efficiency_score,
        failed_operations_rate,
        communication_patterns,
        congestion_level,
    })
}
/// Derives the cluster-health slice of a snapshot from the
/// fault-tolerance status.
fn generate_system_health_analytics(
    &self,
    fault_tolerance_status: &FaultToleranceStatus,
) -> TorshResult<SystemHealthAnalytics> {
    let status = fault_tolerance_status;
    let cluster_health_score = status.system_health_score;
    // Excluded nodes are reported as "degraded"; there is no separate
    // "critical" classification yet, so that count is fixed at zero.
    let degraded_nodes = status.excluded_nodes;
    let critical_nodes = 0;
    let failed_nodes = status.total_nodes.saturating_sub(status.healthy_nodes);
    // Coarse two-level trend: improving above 0.8 health, degrading below.
    let stability_trend = if cluster_health_score > 0.8 { 0.1 } else { -0.1 };
    let failure_probability = (1.0 - cluster_health_score).max(0.0);
    Ok(SystemHealthAnalytics {
        cluster_health_score,
        healthy_nodes: status.healthy_nodes,
        degraded_nodes,
        critical_nodes,
        failed_nodes,
        active_incidents: status.active_incidents,
        stability_trend,
        failure_probability,
    })
}
/// Builds the convergence slice of a snapshot. Several values are fixed
/// heuristics until richer statistics are tracked.
fn generate_convergence_analytics(
    &self,
    current_metrics: &Option<NodeMetrics>,
) -> TorshResult<ConvergenceAnalytics> {
    let (loss, gradient_norm) = match current_metrics {
        Some(metrics) => (
            metrics.training_metrics.loss,
            metrics.training_metrics.gradient_norm,
        ),
        None => (0.0, 0.0),
    };
    // A falling loss trend (negative) corresponds to positive convergence.
    let convergence_rate = {
        let analyzer = self.trend_analyzer.lock().map_err(|e| {
            TorshDistributedError::communication_error(
                "trend_analyzer",
                format!("Lock error: {}", e),
            )
        })?;
        -analyzer.calculate_loss_trend()
    };
    let is_converging = convergence_rate > 0.0;
    let convergence_confidence = if is_converging { 0.8 } else { 0.3 };
    // Maps loss to (0, 0.95]: lower loss reads as more progress.
    let training_progress = if loss > 0.0 {
        (1.0 / (loss + 1.0)).min(0.95)
    } else {
        0.0
    };
    let gradient_norm_stats = GradientNormStats {
        avg_norm: gradient_norm,
        norm_variance: gradient_norm * 0.1, // assumed 10% spread, not measured
        norm_trend: 0.0,                    // not tracked yet
        explosion_risk: if gradient_norm > 10.0 { 0.8 } else { 0.1 },
    };
    Ok(ConvergenceAnalytics {
        convergence_rate,
        convergence_confidence,
        training_progress,
        loss_smoothness: 0.7, // fixed placeholder
        gradient_norm_stats,
        lr_effectiveness: if is_converging { 0.7 } else { 0.3 },
        overfitting_risk: if training_progress > 0.8 { 0.6 } else { 0.2 },
    })
}
fn generate_efficiency_analytics(
&self,
performance: &TrainingPerformanceAnalytics,
resource_utilization: &ResourceUtilizationAnalytics,
communication: &CommunicationAnalytics,
) -> TorshResult<EfficiencyAnalytics> {
let compute_efficiency = resource_utilization.resource_efficiency;
let communication_efficiency = communication.efficiency_score;
let memory_efficiency =
1.0 - (resource_utilization.avg_memory_utilization / 100.0 - 0.8).max(0.0) * 5.0;
let overall_efficiency =
(compute_efficiency + communication_efficiency + memory_efficiency) / 3.0;
let energy_efficiency = overall_efficiency * 0.8;
let cost_efficiency = overall_efficiency * performance.throughput_efficiency;
let recommendations = if self.config.enable_recommendations {
let mut recommendation_engine = self.recommendation_engine.lock().map_err(|e| {
TorshDistributedError::communication_error(
"recommendation_engine",
format!("Lock error: {}", e),
)
})?;
recommendation_engine.generate_recommendations()
} else {
Vec::new()
};
Ok(EfficiencyAnalytics {
overall_efficiency,
compute_efficiency,
communication_efficiency,
memory_efficiency,
energy_efficiency,
cost_efficiency,
recommendations,
})
}
/// Returns a clone of the most recent analytics snapshot, or `None` if no
/// update has completed yet.
pub fn get_current_analytics(&self) -> TorshResult<Option<TrainingAnalytics>> {
    let guard = self.current_analytics.read().map_err(|e| {
        TorshDistributedError::communication_error(
            "get_current_analytics",
            format!("Lock error: {}", e),
        )
    })?;
    Ok(guard.clone())
}
/// Returns all retained snapshots, oldest first.
pub fn get_analytics_history(&self) -> TorshResult<Vec<TrainingAnalytics>> {
    let guard = self.analytics_history.lock().map_err(|e| {
        TorshDistributedError::communication_error(
            "get_analytics_history",
            format!("Lock error: {}", e),
        )
    })?;
    // VecDeque -> Vec preserves front-to-back (chronological) order.
    Ok(Vec::from(guard.clone()))
}
/// Bundles the current snapshot, the full history, and the active config
/// into an export structure stamped with the export time.
pub fn export_dashboard_data(&self) -> TorshResult<DashboardExport> {
    let export_timestamp_ms = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("time should be after UNIX_EPOCH")
        .as_millis() as u64;
    Ok(DashboardExport {
        current_analytics: self.get_current_analytics()?,
        analytics_history: self.get_analytics_history()?,
        config: self.config.clone(),
        export_timestamp_ms,
    })
}
/// Produces a summary report from the current snapshot plus the retained
/// history. Errors if no snapshot has been generated yet.
///
/// Fix: the runtime computation now uses `saturating_sub`. If the current
/// snapshot carried an earlier timestamp than the first history entry
/// (e.g. after a wall-clock adjustment), the old `end_time - start_time`
/// would panic on u64 underflow in debug builds (and wrap in release).
pub fn generate_training_summary(&self) -> TorshResult<TrainingSummaryReport> {
    let current_analytics = self.get_current_analytics()?.ok_or_else(|| {
        TorshDistributedError::communication_error(
            "summary",
            "No analytics data available".to_string(),
        )
    })?;
    let analytics_history = self.get_analytics_history()?;
    // Runtime = span from the oldest retained snapshot to the current one.
    let total_runtime = analytics_history
        .first()
        .map(|first| {
            Duration::from_millis(
                current_analytics
                    .timestamp_ms
                    .saturating_sub(first.timestamp_ms),
            )
        })
        .unwrap_or_default();
    let avg_efficiency = if analytics_history.is_empty() {
        0.0
    } else {
        analytics_history
            .iter()
            .map(|a| a.efficiency.overall_efficiency)
            .sum::<f32>()
            / analytics_history.len() as f32
    };
    let peak_throughput = analytics_history
        .iter()
        .map(|a| a.performance.cluster_throughput)
        .fold(0.0f32, f32::max);
    Ok(TrainingSummaryReport {
        current_epoch: current_analytics.performance.current_epoch,
        current_loss: current_analytics.performance.avg_loss,
        total_runtime,
        avg_efficiency,
        peak_throughput,
        // NOTE(review): this sums the *currently active* incident count of
        // every snapshot, so a long-lived incident is counted once per
        // snapshot it spans — confirm this is the intended metric.
        total_incidents: analytics_history
            .iter()
            .map(|a| a.system_health.active_incidents)
            .sum(),
        convergence_rate: current_analytics.convergence.convergence_rate,
        resource_utilization_summary: ResourceUtilizationSummary {
            avg_cpu: current_analytics.resource_utilization.avg_cpu_utilization,
            avg_gpu: current_analytics.resource_utilization.avg_gpu_utilization,
            avg_memory: current_analytics
                .resource_utilization
                .avg_memory_utilization,
            peak_cpu: current_analytics.resource_utilization.peak_cpu,
            peak_gpu: current_analytics.resource_utilization.peak_gpu,
            peak_memory: current_analytics.resource_utilization.peak_memory,
        },
        optimization_recommendations: current_analytics.efficiency.recommendations,
        generated_at: SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("time should be after UNIX_EPOCH")
            .as_millis() as u64,
    })
}
}
/// Serializable bundle of all dashboard state for external consumption.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DashboardExport {
pub current_analytics: Option<TrainingAnalytics>,
/// Retained snapshots, oldest first.
pub analytics_history: Vec<TrainingAnalytics>,
pub config: DashboardConfig,
/// Export creation time, milliseconds since the UNIX epoch.
pub export_timestamp_ms: u64,
}
/// Condensed training report built from current + historical analytics.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TrainingSummaryReport {
pub current_epoch: u32,
pub current_loss: f32,
/// Span from the oldest retained snapshot to the current one.
pub total_runtime: Duration,
pub avg_efficiency: f32,
pub peak_throughput: f32,
/// Sum of per-snapshot active-incident counts (may count a long-lived
/// incident more than once).
pub total_incidents: usize,
pub convergence_rate: f32,
pub resource_utilization_summary: ResourceUtilizationSummary,
pub optimization_recommendations: Vec<OptimizationRecommendation>,
/// Report creation time, milliseconds since the UNIX epoch.
pub generated_at: u64,
}
/// Average and (estimated) peak utilization figures for the summary report.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceUtilizationSummary {
pub avg_cpu: f32,
pub avg_gpu: f32,
pub avg_memory: f32,
pub peak_cpu: f32,
pub peak_gpu: f32,
pub peak_memory: f32,
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::distributed_memory_optimization::{
        DistributedMemoryOptimizer, MemoryOptimizationConfig,
    };
    use crate::distributed_monitoring::{DistributedMonitor, MonitoringConfig};
    use crate::enhanced_fault_tolerance::{EnhancedFaultTolerance, FaultToleranceConfig};

    /// A freshly built dashboard has no analytics until `update_analytics`
    /// has run at least once.
    #[tokio::test]
    async fn test_dashboard_creation() -> TorshResult<()> {
        let monitor = Arc::new(DistributedMonitor::new(MonitoringConfig::default(), false));
        let fault_tolerance = Arc::new(EnhancedFaultTolerance::new(
            FaultToleranceConfig::default(),
            monitor.clone(),
        ));
        let memory_optimizer = Arc::new(DistributedMemoryOptimizer::new(
            MemoryOptimizationConfig::default(),
            monitor.clone(),
        ));
        let dashboard = TrainingAnalyticsDashboard::new(
            DashboardConfig::default(),
            monitor,
            fault_tolerance,
            memory_optimizer,
        );
        assert!(dashboard.get_current_analytics()?.is_none());
        Ok(())
    }

    /// Strictly decreasing losses must produce a negative trend and a
    /// stability score inside [0, 1].
    #[tokio::test]
    async fn test_trend_analyzer() -> TorshResult<()> {
        let mut analyzer = TrendAnalyzer::new(Duration::from_secs(60));
        for i in 0..20 {
            let timestamp = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .expect("time should be after UNIX_EPOCH")
                .as_millis() as u64
                + i * 1000;
            // Losses fall linearly: 2.0, 1.9, 1.8, ...
            analyzer.update_loss(timestamp, 2.0 - i as f32 * 0.1);
        }
        assert!(analyzer.calculate_loss_trend() < 0.0);
        let stability = analyzer.calculate_stability();
        assert!((0.0..=1.0).contains(&stability));
        Ok(())
    }

    /// Sustained low GPU utilization (50%) must yield at least one
    /// GPU-related recommendation once enough history exists.
    #[tokio::test]
    async fn test_recommendation_engine() -> TorshResult<()> {
        let mut engine = RecommendationEngine::new();
        for _ in 0..15 {
            engine.update_performance(100.0, 0.6, 80.0, 50.0, 70.0, 2000);
        }
        let recommendations = engine.generate_recommendations();
        assert!(!recommendations.is_empty());
        assert!(recommendations.iter().any(|r| r.title.contains("GPU")));
        Ok(())
    }
}