scirs2_cluster/distributed/
monitoring.rs

1//! Performance monitoring and metrics collection for distributed clustering
2//!
3//! This module provides comprehensive monitoring capabilities including
4//! performance metrics, resource usage tracking, and system health analysis.
5
6use std::collections::{HashMap, VecDeque};
7use std::sync::{Arc, Mutex};
8use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
9
10use crate::error::{ClusteringError, Result};
11
12/// Performance monitoring coordinator
13#[derive(Debug)]
14pub struct PerformanceMonitor {
15    pub metrics_history: Arc<Mutex<VecDeque<PerformanceMetrics>>>,
16    pub resource_usage: Arc<Mutex<VecDeque<ResourceUsage>>>,
17    pub worker_metrics: HashMap<usize, WorkerMetrics>,
18    pub config: MonitoringConfig,
19    pub alert_thresholds: AlertThresholds,
20    pub start_time: Instant,
21}
22
/// Configuration for performance monitoring
///
/// Only `max_history_size`, `enable_resource_monitoring` and
/// `alert_on_anomalies` are read by the code visible in this part of the
/// file; the remaining flags are presumably consumed elsewhere (confirm
/// before relying on them).
#[derive(Debug, Clone)]
pub struct MonitoringConfig {
    /// Requests detailed monitoring.
    pub enable_detailed_monitoring: bool,
    /// Interval between metric collections, in milliseconds.
    pub metrics_collection_interval_ms: u64,
    /// Maximum number of samples kept in each monitor-level history.
    pub max_history_size: usize,
    /// When `false`, `record_resource_usage` discards its input.
    pub enable_resource_monitoring: bool,
    /// Requests network monitoring.
    pub enable_network_monitoring: bool,
    /// Requests predictive analytics.
    pub enable_predictive_analytics: bool,
    /// Requests metric export.
    pub export_metrics: bool,
    /// When `false`, `check_alerts` returns no alerts at all.
    pub alert_on_anomalies: bool,
}

impl Default for MonitoringConfig {
    /// Detailed, resource and alert monitoring on; network monitoring,
    /// predictive analytics and export off; a 1000 ms collection interval
    /// and a 1000-sample history.
    fn default() -> Self {
        MonitoringConfig {
            // Features that are on by default.
            enable_detailed_monitoring: true,
            enable_resource_monitoring: true,
            alert_on_anomalies: true,
            // Features that are opt-in.
            enable_network_monitoring: false,
            enable_predictive_analytics: false,
            export_metrics: false,
            // Sampling cadence and retention.
            metrics_collection_interval_ms: 1_000,
            max_history_size: 1_000,
        }
    }
}
50
/// Alert thresholds for monitoring
///
/// `check_alerts` compares the most recent samples against these limits.
/// `max_sync_overhead_ms` and `min_throughput_threshold` are not consulted
/// by the checks visible in this part of the file.
#[derive(Debug, Clone)]
pub struct AlertThresholds {
    /// Upper limit for `PerformanceMetrics::total_computation_time_ms`.
    pub max_convergence_time_ms: u64,
    /// Lower limit for `PerformanceMetrics::worker_efficiency`.
    pub min_worker_efficiency: f64,
    /// Upper limit for `ResourceUsage::memory_utilization` (a fraction;
    /// alert messages show it multiplied by 100 as a percentage).
    pub max_memory_utilization: f64,
    /// Upper limit for `ResourceUsage::cpu_utilization` (a fraction;
    /// alert messages show it multiplied by 100 as a percentage).
    pub max_cpu_utilization: f64,
    /// Upper limit for `PerformanceMetrics::message_latency_ms`.
    pub max_message_latency_ms: f64,
    /// Upper limit for synchronization overhead, in milliseconds.
    pub max_sync_overhead_ms: f64,
    /// Lower limit for throughput (unit not established by this file).
    pub min_throughput_threshold: f64,
}

impl Default for AlertThresholds {
    /// Thresholds used by `PerformanceMonitor::new`.
    fn default() -> Self {
        AlertThresholds {
            // Time-based limits (milliseconds).
            max_convergence_time_ms: 300_000, // 5 minutes
            max_message_latency_ms: 1000.0,
            max_sync_overhead_ms: 5000.0,
            // Utilization limits (fractions).
            max_memory_utilization: 0.9,
            max_cpu_utilization: 0.95,
            // Lower bounds.
            min_worker_efficiency: 0.6,
            min_throughput_threshold: 10.0,
        }
    }
}
76
/// System performance metrics
///
/// One system-wide sample, as stored in `PerformanceMonitor::metrics_history`.
/// All values are supplied by the caller that builds the sample.
#[derive(Debug, Clone)]
pub struct PerformanceMetrics {
    /// Wall-clock time associated with the sample.
    pub timestamp: SystemTime,
    /// Iteration the sample belongs to (presumably the clustering
    /// iteration index; confirm with the producer).
    pub iteration: usize,
    /// Global inertia reported for this sample.
    pub global_inertia: f64,
    /// Convergence rate reported for this sample.
    pub convergence_rate: f64,
    /// Worker efficiency; alerts fire when it drops below
    /// `AlertThresholds::min_worker_efficiency`.
    pub worker_efficiency: f64,
    /// Message latency in milliseconds.
    pub message_latency_ms: f64,
    /// Synchronization overhead in milliseconds.
    pub sync_overhead_ms: f64,
    /// Total computation time in milliseconds; compared against
    /// `AlertThresholds::max_convergence_time_ms`.
    pub total_computation_time_ms: u64,
    /// Memory pressure score.
    pub memory_pressure_score: f64,
    /// Load balance score; bottleneck analysis scores imbalance as
    /// `1.0 - load_balance_score`, so higher means better balanced.
    pub load_balance_score: f64,
    /// Network utilization.
    pub network_utilization: f64,
}
92
/// Resource usage metrics
///
/// One resource sample, as stored in `PerformanceMonitor::resource_usage`.
#[derive(Debug, Clone)]
pub struct ResourceUsage {
    /// Wall-clock time associated with the sample.
    pub timestamp: SystemTime,
    /// CPU utilization as a fraction (alert messages render it as a
    /// percentage by multiplying by 100).
    pub cpu_utilization: f64,
    /// Memory utilization as a fraction (rendered the same way).
    pub memory_utilization: f64,
    /// Network throughput in Mbps; the analysis code divides it by 1000
    /// to obtain a utilization-like score.
    pub network_throughput_mbps: f64,
    /// Disk I/O rate (unit not established by this file).
    pub disk_io_rate: f64,
    /// Number of workers currently active.
    pub active_workers: usize,
    /// Number of failed workers; any non-zero value raises a critical alert.
    pub failed_workers: usize,
    /// Depth of the work queue.
    pub queue_depth: usize,
    /// Cache hit ratio.
    pub cache_hit_ratio: f64,
}
106
/// Worker-specific metrics
///
/// Rolling per-worker statistics maintained by
/// `PerformanceMonitor::update_worker_metrics`.
#[derive(Debug, Clone)]
pub struct WorkerMetrics {
    /// Identifier of the worker these statistics belong to.
    pub worker_id: usize,
    /// Recent CPU usage samples, oldest at the front.
    pub cpu_usage_history: VecDeque<f64>,
    /// Recent memory usage samples, oldest at the front.
    pub memory_usage_history: VecDeque<f64>,
    /// Recent throughput samples, oldest at the front.
    pub throughput_history: VecDeque<f64>,
    /// Recent latency samples, oldest at the front.
    pub latency_history: VecDeque<f64>,
    /// Number of errors attributed to the worker; feeds the health score.
    pub error_count: usize,
    /// Wall-clock time of registration or of the latest metrics update.
    pub last_update: SystemTime,
    /// Health score in `[0.0, 1.0]`; starts at `1.0` on registration and is
    /// recomputed on every metrics update.
    pub health_score: f64,
}
119
120/// Performance alert
121#[derive(Debug, Clone)]
122pub struct PerformanceAlert {
123    pub alert_type: AlertType,
124    pub severity: AlertSeverity,
125    pub message: String,
126    pub timestamp: SystemTime,
127    pub worker_id: Option<usize>,
128    pub metric_value: f64,
129    pub threshold: f64,
130}
131
/// Types of performance alerts
///
/// Derives `PartialEq`/`Eq` so callers can filter, compare and assert on
/// alert kinds directly (`alert.alert_type == AlertType::HighLatency`).
///
/// The comments on the variants describe how `check_alerts` uses them in the
/// visible part of this file; variants without such a note are not raised
/// there.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AlertType {
    /// Latest CPU utilization exceeded `max_cpu_utilization`.
    HighCpuUsage,
    /// Latest memory utilization exceeded `max_memory_utilization`.
    HighMemoryUsage,
    /// Latest message latency exceeded `max_message_latency_ms`.
    HighLatency,
    /// Raised when worker efficiency falls below `min_worker_efficiency`.
    LowThroughput,
    /// At least one worker is reported as failed.
    WorkerFailure,
    /// Total computation time exceeded `max_convergence_time_ms`.
    ConvergenceTimeout,
    /// Work is unevenly distributed across workers.
    LoadImbalance,
    /// The network is congested.
    NetworkCongestion,
    /// A resource is exhausted.
    ResourceExhaustion,
    /// Raised when a worker's health score is low.
    AnomalyDetected,
}
146
/// Alert severity levels
///
/// The derived ordering follows declaration order, so
/// `Critical < Warning < Info`: sorting ascending puts the most urgent
/// alerts first. Do not reorder the variants without checking callers that
/// sort or compare severities.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum AlertSeverity {
    /// Requires immediate attention.
    Critical,
    /// Should be investigated but is not urgent.
    Warning,
    /// Informational only.
    Info,
}
154
155/// System efficiency analysis
156#[derive(Debug, Clone)]
157pub struct EfficiencyAnalysis {
158    pub overall_efficiency: f64,
159    pub bottleneck_analysis: BottleneckAnalysis,
160    pub resource_utilization: HashMap<String, f64>,
161    pub performance_trends: PerformanceTrends,
162    pub optimization_recommendations: Vec<String>,
163}
164
165/// Bottleneck analysis results
166#[derive(Debug, Clone)]
167pub struct BottleneckAnalysis {
168    pub primary_bottleneck: BottleneckType,
169    pub bottleneck_severity: f64,
170    pub affected_workers: Vec<usize>,
171    pub estimated_impact: f64,
172}
173
/// Types of system bottlenecks
///
/// Hashable and comparable so the variants can serve as map keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum BottleneckType {
    /// CPU utilization is the limiting factor.
    Cpu,
    /// Memory utilization is the limiting factor.
    Memory,
    /// Network throughput is the limiting factor.
    Network,
    /// Disk I/O is the limiting factor (not scored by the analysis visible
    /// in this part of the file).
    Disk,
    /// Synchronization overhead is the limiting factor.
    Synchronization,
    /// Uneven work distribution is the limiting factor.
    LoadImbalance,
    /// Message latency is the limiting factor.
    MessagePassing,
    /// No bottleneck was identified.
    None,
}
186
187/// Performance trend analysis
188#[derive(Debug, Clone)]
189pub struct PerformanceTrends {
190    pub throughput_trend: TrendDirection,
191    pub latency_trend: TrendDirection,
192    pub efficiency_trend: TrendDirection,
193    pub resource_trend: TrendDirection,
194    pub trend_confidence: f64,
195}
196
/// Trend direction indicators
///
/// Derives `PartialEq`/`Eq` so trend results can be compared and asserted on
/// directly (`trends.latency_trend == TrendDirection::Degrading`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TrendDirection {
    /// The metric is moving in the desirable direction.
    Improving,
    /// The metric changed by less than the analysis tolerance.
    Stable,
    /// The metric is moving in the undesirable direction.
    Degrading,
    /// Too little history is available to determine a direction.
    Unknown,
}
205
206impl PerformanceMonitor {
207    /// Create new performance monitor
208    pub fn new(config: MonitoringConfig) -> Self {
209        Self {
210            metrics_history: Arc::new(Mutex::new(VecDeque::new())),
211            resource_usage: Arc::new(Mutex::new(VecDeque::new())),
212            worker_metrics: HashMap::new(),
213            config,
214            alert_thresholds: AlertThresholds::default(),
215            start_time: Instant::now(),
216        }
217    }
218
219    /// Register worker for monitoring
220    pub fn register_worker(&mut self, workerid: usize) {
221        let worker_metrics = WorkerMetrics {
222            worker_id: workerid,
223            cpu_usage_history: VecDeque::new(),
224            memory_usage_history: VecDeque::new(),
225            throughput_history: VecDeque::new(),
226            latency_history: VecDeque::new(),
227            error_count: 0,
228            last_update: SystemTime::now(),
229            health_score: 1.0,
230        };
231
232        self.worker_metrics.insert(workerid, worker_metrics);
233    }
234
235    /// Record performance metrics
236    pub fn record_performance_metrics(&self, metrics: PerformanceMetrics) -> Result<()> {
237        let mut history = self.metrics_history.lock().map_err(|_| {
238            ClusteringError::InvalidInput("Failed to acquire metrics lock".to_string())
239        })?;
240
241        history.push_back(metrics);
242
243        // Maintain history size limit
244        while history.len() > self.config.max_history_size {
245            history.pop_front();
246        }
247
248        Ok(())
249    }
250
251    /// Record resource usage metrics
252    pub fn record_resource_usage(&self, usage: ResourceUsage) -> Result<()> {
253        if !self.config.enable_resource_monitoring {
254            return Ok(());
255        }
256
257        let mut usage_history = self.resource_usage.lock().map_err(|_| {
258            ClusteringError::InvalidInput("Failed to acquire resource usage lock".to_string())
259        })?;
260
261        usage_history.push_back(usage);
262
263        // Maintain history size limit
264        while usage_history.len() > self.config.max_history_size {
265            usage_history.pop_front();
266        }
267
268        Ok(())
269    }
270
271    /// Update worker metrics
272    pub fn update_worker_metrics(
273        &mut self,
274        worker_id: usize,
275        cpu_usage: f64,
276        memory_usage: f64,
277        throughput: f64,
278        latency: f64,
279    ) -> Result<()> {
280        if let Some(metrics) = self.worker_metrics.get_mut(&worker_id) {
281            metrics.cpu_usage_history.push_back(cpu_usage);
282            metrics.memory_usage_history.push_back(memory_usage);
283            metrics.throughput_history.push_back(throughput);
284            metrics.latency_history.push_back(latency);
285            metrics.last_update = SystemTime::now();
286
287            // Maintain history size
288            let max_size = 100;
289            if metrics.cpu_usage_history.len() > max_size {
290                metrics.cpu_usage_history.pop_front();
291            }
292            if metrics.memory_usage_history.len() > max_size {
293                metrics.memory_usage_history.pop_front();
294            }
295            if metrics.throughput_history.len() > max_size {
296                metrics.throughput_history.pop_front();
297            }
298            if metrics.latency_history.len() > max_size {
299                metrics.latency_history.pop_front();
300            }
301        }
302
303        // Update health score after all metrics updates
304        if let Some(metrics) = self.worker_metrics.get(&worker_id) {
305            let health_score = self.calculate_worker_health_score(metrics);
306            if let Some(metrics_mut) = self.worker_metrics.get_mut(&worker_id) {
307                metrics_mut.health_score = health_score;
308            }
309        }
310
311        Ok(())
312    }
313
314    /// Calculate worker health score
315    fn calculate_worker_health_score(&self, metrics: &WorkerMetrics) -> f64 {
316        let mut score = 1.0;
317
318        // CPU usage component
319        if !metrics.cpu_usage_history.is_empty() {
320            let avg_cpu = metrics.cpu_usage_history.iter().sum::<f64>()
321                / metrics.cpu_usage_history.len() as f64;
322            score *= (1.0 - (avg_cpu - 0.8).max(0.0) * 2.0).max(0.0);
323        }
324
325        // Memory usage component
326        if !metrics.memory_usage_history.is_empty() {
327            let avg_memory = metrics.memory_usage_history.iter().sum::<f64>()
328                / metrics.memory_usage_history.len() as f64;
329            score *= (1.0 - (avg_memory - 0.85).max(0.0) * 3.0).max(0.0);
330        }
331
332        // Latency component
333        if !metrics.latency_history.is_empty() {
334            let avg_latency =
335                metrics.latency_history.iter().sum::<f64>() / metrics.latency_history.len() as f64;
336            let latency_penalty = (avg_latency / 1000.0).min(1.0) * 0.3;
337            score *= (1.0 - latency_penalty).max(0.0);
338        }
339
340        // Error rate component
341        let time_window_hours = 1.0; // Consider last hour
342        let error_rate = metrics.error_count as f64 / time_window_hours;
343        let error_penalty = (error_rate / 10.0).min(0.5); // Max 50% penalty for errors
344        score *= (1.0 - error_penalty).max(0.0);
345
346        score.max(0.0).min(1.0)
347    }
348
349    /// Check for performance alerts
350    pub fn check_alerts(&self) -> Result<Vec<PerformanceAlert>> {
351        if !self.config.alert_on_anomalies {
352            return Ok(Vec::new());
353        }
354
355        let mut alerts = Vec::new();
356
357        // Check latest metrics against thresholds
358        let metrics_history = self.metrics_history.lock().map_err(|_| {
359            ClusteringError::InvalidInput("Failed to acquire metrics lock".to_string())
360        })?;
361
362        if let Some(latest_metrics) = metrics_history.back() {
363            // Check convergence time
364            if latest_metrics.total_computation_time_ms
365                > self.alert_thresholds.max_convergence_time_ms
366            {
367                alerts.push(PerformanceAlert {
368                    alert_type: AlertType::ConvergenceTimeout,
369                    severity: AlertSeverity::Warning,
370                    message: format!(
371                        "Convergence taking longer than expected: {}ms > {}ms",
372                        latest_metrics.total_computation_time_ms,
373                        self.alert_thresholds.max_convergence_time_ms
374                    ),
375                    timestamp: SystemTime::now(),
376                    worker_id: None,
377                    metric_value: latest_metrics.total_computation_time_ms as f64,
378                    threshold: self.alert_thresholds.max_convergence_time_ms as f64,
379                });
380            }
381
382            // Check worker efficiency
383            if latest_metrics.worker_efficiency < self.alert_thresholds.min_worker_efficiency {
384                alerts.push(PerformanceAlert {
385                    alert_type: AlertType::LowThroughput,
386                    severity: AlertSeverity::Warning,
387                    message: format!(
388                        "Worker efficiency below threshold: {:.2} < {:.2}",
389                        latest_metrics.worker_efficiency,
390                        self.alert_thresholds.min_worker_efficiency
391                    ),
392                    timestamp: SystemTime::now(),
393                    worker_id: None,
394                    metric_value: latest_metrics.worker_efficiency,
395                    threshold: self.alert_thresholds.min_worker_efficiency,
396                });
397            }
398
399            // Check message latency
400            if latest_metrics.message_latency_ms > self.alert_thresholds.max_message_latency_ms {
401                alerts.push(PerformanceAlert {
402                    alert_type: AlertType::HighLatency,
403                    severity: AlertSeverity::Warning,
404                    message: format!(
405                        "High message latency detected: {:.2}ms > {:.2}ms",
406                        latest_metrics.message_latency_ms,
407                        self.alert_thresholds.max_message_latency_ms
408                    ),
409                    timestamp: SystemTime::now(),
410                    worker_id: None,
411                    metric_value: latest_metrics.message_latency_ms,
412                    threshold: self.alert_thresholds.max_message_latency_ms,
413                });
414            }
415        }
416
417        // Check resource usage
418        let resource_usage = self.resource_usage.lock().map_err(|_| {
419            ClusteringError::InvalidInput("Failed to acquire resource usage lock".to_string())
420        })?;
421
422        if let Some(latest_usage) = resource_usage.back() {
423            // Check CPU utilization
424            if latest_usage.cpu_utilization > self.alert_thresholds.max_cpu_utilization {
425                alerts.push(PerformanceAlert {
426                    alert_type: AlertType::HighCpuUsage,
427                    severity: AlertSeverity::Critical,
428                    message: format!(
429                        "High CPU utilization: {:.1}% > {:.1}%",
430                        latest_usage.cpu_utilization * 100.0,
431                        self.alert_thresholds.max_cpu_utilization * 100.0
432                    ),
433                    timestamp: SystemTime::now(),
434                    worker_id: None,
435                    metric_value: latest_usage.cpu_utilization,
436                    threshold: self.alert_thresholds.max_cpu_utilization,
437                });
438            }
439
440            // Check memory utilization
441            if latest_usage.memory_utilization > self.alert_thresholds.max_memory_utilization {
442                alerts.push(PerformanceAlert {
443                    alert_type: AlertType::HighMemoryUsage,
444                    severity: AlertSeverity::Critical,
445                    message: format!(
446                        "High memory utilization: {:.1}% > {:.1}%",
447                        latest_usage.memory_utilization * 100.0,
448                        self.alert_thresholds.max_memory_utilization * 100.0
449                    ),
450                    timestamp: SystemTime::now(),
451                    worker_id: None,
452                    metric_value: latest_usage.memory_utilization,
453                    threshold: self.alert_thresholds.max_memory_utilization,
454                });
455            }
456
457            // Check for failed workers
458            if latest_usage.failed_workers > 0 {
459                alerts.push(PerformanceAlert {
460                    alert_type: AlertType::WorkerFailure,
461                    severity: AlertSeverity::Critical,
462                    message: format!("{} worker(s) have failed", latest_usage.failed_workers),
463                    timestamp: SystemTime::now(),
464                    worker_id: None,
465                    metric_value: latest_usage.failed_workers as f64,
466                    threshold: 0.0,
467                });
468            }
469        }
470
471        // Check individual worker metrics
472        for (worker_id, metrics) in &self.worker_metrics {
473            if metrics.health_score < 0.5 {
474                alerts.push(PerformanceAlert {
475                    alert_type: AlertType::AnomalyDetected,
476                    severity: AlertSeverity::Warning,
477                    message: format!(
478                        "Worker {} health score is low: {:.2}",
479                        worker_id, metrics.health_score
480                    ),
481                    timestamp: SystemTime::now(),
482                    worker_id: Some(*worker_id),
483                    metric_value: metrics.health_score,
484                    threshold: 0.5,
485                });
486            }
487        }
488
489        Ok(alerts)
490    }
491
492    /// Perform comprehensive system analysis
493    pub fn analyze_system_efficiency(&self) -> Result<EfficiencyAnalysis> {
494        let metrics_history = self.metrics_history.lock().map_err(|_| {
495            ClusteringError::InvalidInput("Failed to acquire metrics lock".to_string())
496        })?;
497
498        let resource_usage = self.resource_usage.lock().map_err(|_| {
499            ClusteringError::InvalidInput("Failed to acquire resource usage lock".to_string())
500        })?;
501
502        // Calculate overall efficiency
503        let overall_efficiency = if !metrics_history.is_empty() {
504            let recent_metrics: Vec<_> = metrics_history.iter().rev().take(10).collect();
505            let avg_efficiency = recent_metrics
506                .iter()
507                .map(|m| m.worker_efficiency)
508                .sum::<f64>()
509                / recent_metrics.len() as f64;
510            avg_efficiency
511        } else {
512            0.0
513        };
514
515        // Perform bottleneck analysis
516        let bottleneck_analysis = self.analyze_bottlenecks(&metrics_history, &resource_usage);
517
518        // Calculate resource utilization
519        let mut resource_utilization = HashMap::new();
520        if let Some(latest_usage) = resource_usage.back() {
521            resource_utilization.insert("cpu".to_string(), latest_usage.cpu_utilization);
522            resource_utilization.insert("memory".to_string(), latest_usage.memory_utilization);
523            resource_utilization.insert(
524                "network".to_string(),
525                latest_usage.network_throughput_mbps / 1000.0,
526            );
527            resource_utilization.insert("disk".to_string(), latest_usage.disk_io_rate);
528        }
529
530        // Analyze performance trends
531        let performance_trends = self.analyze_trends(&metrics_history);
532
533        // Generate optimization recommendations
534        let optimization_recommendations = self.generate_recommendations(
535            &bottleneck_analysis,
536            &performance_trends,
537            overall_efficiency,
538        );
539
540        Ok(EfficiencyAnalysis {
541            overall_efficiency,
542            bottleneck_analysis,
543            resource_utilization,
544            performance_trends,
545            optimization_recommendations,
546        })
547    }
548
549    /// Analyze system bottlenecks
550    fn analyze_bottlenecks(
551        &self,
552        metrics_history: &VecDeque<PerformanceMetrics>,
553        resource_usage: &VecDeque<ResourceUsage>,
554    ) -> BottleneckAnalysis {
555        let mut bottleneck_scores = HashMap::new();
556        bottleneck_scores.insert(BottleneckType::Cpu, 0.0);
557        bottleneck_scores.insert(BottleneckType::Memory, 0.0);
558        bottleneck_scores.insert(BottleneckType::Network, 0.0);
559        bottleneck_scores.insert(BottleneckType::Synchronization, 0.0);
560        bottleneck_scores.insert(BottleneckType::LoadImbalance, 0.0);
561        bottleneck_scores.insert(BottleneckType::MessagePassing, 0.0);
562
563        // Analyze resource usage patterns
564        if !resource_usage.is_empty() {
565            let recent_usage: Vec<_> = resource_usage.iter().rev().take(10).collect();
566
567            let avg_cpu = recent_usage.iter().map(|u| u.cpu_utilization).sum::<f64>()
568                / recent_usage.len() as f64;
569            let avg_memory = recent_usage
570                .iter()
571                .map(|u| u.memory_utilization)
572                .sum::<f64>()
573                / recent_usage.len() as f64;
574            let avg_network = recent_usage
575                .iter()
576                .map(|u| u.network_throughput_mbps)
577                .sum::<f64>()
578                / recent_usage.len() as f64;
579
580            bottleneck_scores.insert(BottleneckType::Cpu, avg_cpu);
581            bottleneck_scores.insert(BottleneckType::Memory, avg_memory);
582            bottleneck_scores.insert(BottleneckType::Network, avg_network / 1000.0);
583            // Normalize
584        }
585
586        // Analyze performance metrics patterns
587        if !metrics_history.is_empty() {
588            let recent_metrics: Vec<_> = metrics_history.iter().rev().take(10).collect();
589
590            let avg_sync_overhead = recent_metrics
591                .iter()
592                .map(|m| m.sync_overhead_ms)
593                .sum::<f64>()
594                / recent_metrics.len() as f64;
595            let avg_message_latency = recent_metrics
596                .iter()
597                .map(|m| m.message_latency_ms)
598                .sum::<f64>()
599                / recent_metrics.len() as f64;
600            let avg_load_balance = recent_metrics
601                .iter()
602                .map(|m| m.load_balance_score)
603                .sum::<f64>()
604                / recent_metrics.len() as f64;
605
606            bottleneck_scores.insert(BottleneckType::Synchronization, avg_sync_overhead / 1000.0); // Normalize
607            bottleneck_scores.insert(BottleneckType::MessagePassing, avg_message_latency / 1000.0); // Normalize
608            bottleneck_scores.insert(BottleneckType::LoadImbalance, 1.0 - avg_load_balance);
609            // Invert score
610        }
611
612        // Find primary bottleneck
613        let (primary_bottleneck, bottleneck_severity) = bottleneck_scores
614            .iter()
615            .max_by(|a, b| a.1.partial_cmp(b.1).unwrap_or(std::cmp::Ordering::Equal))
616            .map(|(bottleneck, &severity)| (bottleneck.clone(), severity))
617            .unwrap_or((BottleneckType::None, 0.0));
618
619        // Identify affected workers (simplified)
620        let affected_workers: Vec<usize> = self
621            .worker_metrics
622            .iter()
623            .filter(|(_, metrics)| metrics.health_score < 0.7)
624            .map(|(&id, _)| id)
625            .collect();
626
627        let estimated_impact = bottleneck_severity * 0.5; // Simplified impact calculation
628
629        BottleneckAnalysis {
630            primary_bottleneck,
631            bottleneck_severity,
632            affected_workers,
633            estimated_impact,
634        }
635    }
636
637    /// Analyze performance trends
638    fn analyze_trends(&self, metricshistory: &VecDeque<PerformanceMetrics>) -> PerformanceTrends {
639        if metricshistory.len() < 5 {
640            return PerformanceTrends {
641                throughput_trend: TrendDirection::Unknown,
642                latency_trend: TrendDirection::Unknown,
643                efficiency_trend: TrendDirection::Unknown,
644                resource_trend: TrendDirection::Unknown,
645                trend_confidence: 0.0,
646            };
647        }
648
649        let recent_metrics: Vec<_> = metricshistory.iter().rev().take(10).collect();
650        let older_metrics: Vec<_> = metricshistory.iter().rev().skip(5).take(10).collect();
651
652        // Calculate trend for worker efficiency
653        let recent_efficiency = recent_metrics
654            .iter()
655            .map(|m| m.worker_efficiency)
656            .sum::<f64>()
657            / recent_metrics.len() as f64;
658        let older_efficiency = if !older_metrics.is_empty() {
659            older_metrics
660                .iter()
661                .map(|m| m.worker_efficiency)
662                .sum::<f64>()
663                / older_metrics.len() as f64
664        } else {
665            recent_efficiency
666        };
667
668        let efficiency_trend = if (recent_efficiency - older_efficiency).abs() < 0.05 {
669            TrendDirection::Stable
670        } else if recent_efficiency > older_efficiency {
671            TrendDirection::Improving
672        } else {
673            TrendDirection::Degrading
674        };
675
676        // Calculate trend for message latency
677        let recent_latency = recent_metrics
678            .iter()
679            .map(|m| m.message_latency_ms)
680            .sum::<f64>()
681            / recent_metrics.len() as f64;
682        let older_latency = if !older_metrics.is_empty() {
683            older_metrics
684                .iter()
685                .map(|m| m.message_latency_ms)
686                .sum::<f64>()
687                / older_metrics.len() as f64
688        } else {
689            recent_latency
690        };
691
692        let latency_trend = if (recent_latency - older_latency).abs() < 50.0 {
693            TrendDirection::Stable
694        } else if recent_latency < older_latency {
695            TrendDirection::Improving
696        } else {
697            TrendDirection::Degrading
698        };
699
700        // Simplified trends for other metrics
701        let throughput_trend = efficiency_trend;
702        let resource_trend = TrendDirection::Stable;
703
704        let trend_confidence = if recent_metrics.len() >= 10 { 0.8 } else { 0.4 };
705
706        PerformanceTrends {
707            throughput_trend,
708            latency_trend,
709            efficiency_trend,
710            resource_trend,
711            trend_confidence,
712        }
713    }
714
715    /// Generate optimization recommendations
716    fn generate_recommendations(
717        &self,
718        bottleneck_analysis: &BottleneckAnalysis,
719        performance_trends: &PerformanceTrends,
720        overall_efficiency: f64,
721    ) -> Vec<String> {
722        let mut recommendations = Vec::new();
723
724        // Bottleneck-based recommendations
725        match bottleneck_analysis.primary_bottleneck {
726            BottleneckType::Cpu => {
727                recommendations.push(
728                    "Consider adding more CPU cores or reducing computational load per worker"
729                        .to_string(),
730                );
731                recommendations
732                    .push("Optimize algorithms to reduce CPU-intensive operations".to_string());
733            }
734            BottleneckType::Memory => {
735                recommendations.push(
736                    "Increase memory allocation or implement more efficient memory management"
737                        .to_string(),
738                );
739                recommendations
740                    .push("Consider data compression or streaming techniques".to_string());
741            }
742            BottleneckType::Network => {
743                recommendations.push("Optimize network communication patterns".to_string());
744                recommendations.push("Consider message batching or compression".to_string());
745            }
746            BottleneckType::Synchronization => {
747                recommendations.push(
748                    "Reduce synchronization frequency or implement asynchronous patterns"
749                        .to_string(),
750                );
751                recommendations
752                    .push("Consider lockless data structures where possible".to_string());
753            }
754            BottleneckType::LoadImbalance => {
755                recommendations.push("Implement dynamic load balancing".to_string());
756                recommendations.push("Review data partitioning strategy".to_string());
757            }
758            BottleneckType::MessagePassing => {
759                recommendations.push("Optimize message passing protocols".to_string());
760                recommendations.push("Reduce message size or frequency".to_string());
761            }
762            _ => {}
763        }
764
765        // Trend-based recommendations
766        match performance_trends.efficiency_trend {
767            TrendDirection::Degrading => {
768                recommendations
769                    .push("Performance is degrading - investigate recent changes".to_string());
770                recommendations
771                    .push("Consider scaling up resources or optimizing algorithms".to_string());
772            }
773            TrendDirection::Stable => {
774                if overall_efficiency < 0.7 {
775                    recommendations.push(
776                        "Performance is stable but suboptimal - consider optimization".to_string(),
777                    );
778                }
779            }
780            _ => {}
781        }
782
783        // Overall efficiency recommendations
784        if overall_efficiency < 0.5 {
785            recommendations.push(
786                "Overall efficiency is very low - comprehensive system review needed".to_string(),
787            );
788        } else if overall_efficiency < 0.7 {
789            recommendations
790                .push("Moderate efficiency - targeted optimizations recommended".to_string());
791        }
792
793        // Worker-specific recommendations
794        let unhealthy_workers = self
795            .worker_metrics
796            .iter()
797            .filter(|(_, metrics)| metrics.health_score < 0.6)
798            .count();
799
800        if unhealthy_workers > 0 {
801            recommendations.push(format!(
802                "{} workers are performing poorly - investigate individual worker issues",
803                unhealthy_workers
804            ));
805        }
806
807        if recommendations.is_empty() {
808            recommendations
809                .push("System performance is optimal - no immediate action required".to_string());
810        }
811
812        recommendations
813    }
814
815    /// Generate monitoring report
816    pub fn generate_report(&self) -> MonitoringReport {
817        let mut report = MonitoringReport::default();
818
819        // Calculate averages from recent history
820        let metrics_history = self.metrics_history.lock().unwrap();
821        let resource_usage = self.resource_usage.lock().unwrap();
822
823        if !metrics_history.is_empty() {
824            let recent_metrics: Vec<_> = metrics_history.iter().rev().take(10).collect();
825
826            report.avg_convergence_rate = recent_metrics
827                .iter()
828                .map(|m| m.convergence_rate)
829                .sum::<f64>()
830                / recent_metrics.len() as f64;
831
832            report.avg_worker_efficiency = recent_metrics
833                .iter()
834                .map(|m| m.worker_efficiency)
835                .sum::<f64>()
836                / recent_metrics.len() as f64;
837
838            report.avg_sync_overhead = recent_metrics
839                .iter()
840                .map(|m| m.sync_overhead_ms)
841                .sum::<f64>()
842                / recent_metrics.len() as f64;
843        }
844
845        if !resource_usage.is_empty() {
846            let recent_usage: Vec<_> = resource_usage.iter().rev().take(10).collect();
847
848            report.avg_cpu_utilization =
849                recent_usage.iter().map(|r| r.cpu_utilization).sum::<f64>()
850                    / recent_usage.len() as f64;
851
852            report.avg_memory_utilization = recent_usage
853                .iter()
854                .map(|r| r.memory_utilization)
855                .sum::<f64>()
856                / recent_usage.len() as f64;
857
858            report.peak_network_throughput = recent_usage
859                .iter()
860                .map(|r| r.network_throughput_mbps)
861                .fold(0.0, f64::max);
862        }
863
864        // Calculate efficiency scores
865        report.overall_efficiency_score = self.calculate_efficiency_score();
866        report.recommendations = self.generate_optimization_recommendations();
867
868        report
869    }
870
871    /// Calculate overall system efficiency score
872    fn calculate_efficiency_score(&self) -> f64 {
873        let metrics_history = self.metrics_history.lock().unwrap();
874        let resource_usage = self.resource_usage.lock().unwrap();
875
876        if metrics_history.is_empty() || resource_usage.is_empty() {
877            return 0.0;
878        }
879
880        // Weighted efficiency calculation
881        let convergence_score = metrics_history
882            .iter()
883            .map(|m| m.convergence_rate.min(1.0))
884            .sum::<f64>()
885            / metrics_history.len() as f64;
886
887        let worker_score = metrics_history
888            .iter()
889            .map(|m| m.worker_efficiency)
890            .sum::<f64>()
891            / metrics_history.len() as f64;
892
893        let resource_score = 1.0
894            - (resource_usage
895                .iter()
896                .map(|r| r.memory_utilization.max(r.cpu_utilization))
897                .sum::<f64>()
898                / resource_usage.len() as f64);
899
900        // Weighted average: 40% convergence, 40% worker efficiency, 20% resource usage
901        convergence_score * 0.4 + worker_score * 0.4 + resource_score * 0.2
902    }
903
904    /// Generate optimization recommendations
905    fn generate_optimization_recommendations(&self) -> Vec<String> {
906        let mut recommendations = Vec::new();
907        let metrics_history = self.metrics_history.lock().unwrap();
908        let resource_usage = self.resource_usage.lock().unwrap();
909
910        if let Some(latest_metrics) = metrics_history.back() {
911            if latest_metrics.worker_efficiency < 0.7 {
912                recommendations
913                    .push("Consider load rebalancing - worker efficiency is low".to_string());
914            }
915
916            if latest_metrics.sync_overhead_ms > 1000.0 {
917                recommendations.push(
918                    "High synchronization overhead - consider reducing coordination frequency"
919                        .to_string(),
920                );
921            }
922
923            if latest_metrics.message_latency_ms > 500.0 {
924                recommendations
925                    .push("High message latency - check network configuration".to_string());
926            }
927        }
928
929        if let Some(latest_resources) = resource_usage.back() {
930            if latest_resources.memory_utilization > 0.8 {
931                recommendations.push(
932                    "High memory usage - consider increasing workers or reducing batch size"
933                        .to_string(),
934                );
935            }
936
937            if latest_resources.failed_workers > 0 {
938                recommendations.push(
939                    "Worker failures detected - check fault tolerance configuration".to_string(),
940                );
941            }
942
943            if latest_resources.queue_depth > 100 {
944                recommendations.push(
945                    "High message queue depth - consider increasing processing capacity"
946                        .to_string(),
947                );
948            }
949        }
950
951        if recommendations.is_empty() {
952            recommendations.push("System performance is optimal".to_string());
953        }
954
955        recommendations
956    }
957
958    /// Export metrics for external analysis
959    pub fn export_metrics_csv(&self, filepath: &str) -> Result<()> {
960        use std::fs::File;
961        use std::io::Write;
962
963        let mut file = File::create(filepath)
964            .map_err(|e| ClusteringError::InvalidInput(format!("Failed to create file: {}", e)))?;
965
966        // Write CSV header
967        writeln!(file, "timestamp,iteration,global_inertia,convergence_rate,worker_efficiency,message_latency_ms,sync_overhead_ms,memory_pressure")
968            .map_err(|e| ClusteringError::InvalidInput(format!("Failed to write header: {}", e)))?;
969
970        // Write metrics data
971        let metrics_history = self.metrics_history.lock().unwrap();
972        for metrics in metrics_history.iter() {
973            writeln!(
974                file,
975                "{:?},{},{},{},{},{},{},{}",
976                metrics.timestamp,
977                metrics.iteration,
978                metrics.global_inertia,
979                metrics.convergence_rate,
980                metrics.worker_efficiency,
981                metrics.message_latency_ms,
982                metrics.sync_overhead_ms,
983                metrics.memory_pressure_score
984            )
985            .map_err(|e| ClusteringError::InvalidInput(format!("Failed to write data: {}", e)))?;
986        }
987
988        Ok(())
989    }
990
991    /// Get current worker metrics
992    pub fn get_worker_metrics(&self) -> &HashMap<usize, WorkerMetrics> {
993        &self.worker_metrics
994    }
995
996    /// Get monitoring configuration
997    pub fn get_config(&self) -> &MonitoringConfig {
998        &self.config
999    }
1000
1001    /// Get system uptime
1002    pub fn get_uptime(&self) -> Duration {
1003        self.start_time.elapsed()
1004    }
1005}
1006
/// Comprehensive monitoring report
///
/// Plain-data summary produced by `PerformanceMonitor::generate_report`.
/// The `Default` value (all zeros, no recommendations) is what a field keeps
/// when the history backing it is empty.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct MonitoringReport {
    /// Mean `convergence_rate` of the most recent performance samples.
    pub avg_convergence_rate: f64,
    /// Mean `worker_efficiency` of the most recent performance samples.
    pub avg_worker_efficiency: f64,
    /// Mean `sync_overhead_ms` of the most recent performance samples.
    pub avg_sync_overhead: f64,
    /// Mean `cpu_utilization` of the most recent resource samples.
    pub avg_cpu_utilization: f64,
    /// Mean `memory_utilization` of the most recent resource samples.
    pub avg_memory_utilization: f64,
    /// Largest `network_throughput_mbps` among the most recent resource
    /// samples (never below 0.0).
    pub peak_network_throughput: f64,
    /// Weighted overall score computed from the recorded histories.
    pub overall_efficiency_score: f64,
    /// Human-readable optimization suggestions.
    pub recommendations: Vec<String>,
}
1019
#[cfg(test)]
mod tests {
    use super::*;

    /// Monitor built from the default configuration.
    fn default_monitor() -> PerformanceMonitor {
        PerformanceMonitor::new(MonitoringConfig::default())
    }

    /// Baseline performance sample shared by the recording and alert tests.
    fn nominal_metrics() -> PerformanceMetrics {
        PerformanceMetrics {
            timestamp: SystemTime::now(),
            iteration: 1,
            global_inertia: 100.0,
            convergence_rate: 0.8,
            worker_efficiency: 0.9,
            message_latency_ms: 50.0,
            sync_overhead_ms: 100.0,
            total_computation_time_ms: 5000,
            memory_pressure_score: 0.6,
            load_balance_score: 0.8,
            network_utilization: 0.5,
        }
    }

    #[test]
    fn test_performance_monitor_creation() {
        let monitor = default_monitor();

        assert!(monitor.worker_metrics.is_empty());
        assert!(monitor.metrics_history.lock().unwrap().is_empty());
    }

    #[test]
    fn test_worker_registration() {
        let mut monitor = default_monitor();

        monitor.register_worker(1);

        let registered = monitor
            .worker_metrics
            .get(&1)
            .expect("worker 1 should be present after registration");
        assert_eq!(registered.worker_id, 1);
    }

    #[test]
    fn test_performance_metrics_recording() {
        let monitor = default_monitor();

        let outcome = monitor.record_performance_metrics(nominal_metrics());

        assert!(outcome.is_ok());
        assert_eq!(monitor.metrics_history.lock().unwrap().len(), 1);
    }

    #[test]
    fn test_worker_health_score_calculation() {
        let monitor = default_monitor();

        let mut worker = WorkerMetrics {
            worker_id: 1,
            cpu_usage_history: vec![0.5, 0.6, 0.4].into(),
            memory_usage_history: vec![0.3, 0.4, 0.2].into(),
            throughput_history: VecDeque::new(),
            latency_history: vec![100.0, 150.0, 80.0].into(),
            error_count: 0,
            last_update: SystemTime::now(),
            health_score: 0.0,
        };

        let baseline_score = monitor.calculate_worker_health_score(&worker);
        assert!(baseline_score > 0.5);
        assert!(baseline_score <= 1.0);

        // Same worker, now with high CPU and memory usage.
        worker.cpu_usage_history = vec![0.95, 0.98, 0.92].into();
        worker.memory_usage_history = vec![0.9, 0.95, 0.88].into();

        let loaded_score = monitor.calculate_worker_health_score(&worker);
        assert!(loaded_score < baseline_score);
    }

    #[test]
    fn test_alert_generation() {
        let monitor = default_monitor();

        // Override the baseline with values that should trigger alerts.
        let troubled = PerformanceMetrics {
            convergence_rate: 0.1,             // Low convergence
            worker_efficiency: 0.3,            // Low efficiency
            message_latency_ms: 2000.0,        // High latency
            total_computation_time_ms: 400000, // Long computation time
            ..nominal_metrics()
        };

        monitor.record_performance_metrics(troubled).unwrap();

        let alerts = monitor.check_alerts().unwrap();
        assert!(!alerts.is_empty());

        // Both expected alert kinds must be present.
        assert!(alerts
            .iter()
            .any(|alert| matches!(&alert.alert_type, AlertType::ConvergenceTimeout)));
        assert!(alerts
            .iter()
            .any(|alert| matches!(&alert.alert_type, AlertType::LowThroughput)));
    }
}