use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

use crate::error::{ClusteringError, Result};

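/// Central performance monitor for a distributed clustering run: it collects
/// per-iteration performance metrics, resource-usage samples, and per-worker
/// statistics, and evaluates them against configurable alert thresholds.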
#[derive(Debug)]
pub struct PerformanceMonitor {
    pub metrics_history: Arc<Mutex<VecDeque<PerformanceMetrics>>>,
    pub resource_usage: Arc<Mutex<VecDeque<ResourceUsage>>>,
    pub worker_metrics: HashMap<usize, WorkerMetrics>,
    pub config: MonitoringConfig,
    pub alert_thresholds: AlertThresholds,
    pub start_time: Instant,
}

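/// Configuration controlling what the monitor records and how much history it keeps.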
#[derive(Debug, Clone)]
pub struct MonitoringConfig {
    pub enable_detailed_monitoring: bool,
    pub metrics_collection_interval_ms: u64,
    pub max_history_size: usize,
    pub enable_resource_monitoring: bool,
    pub enable_network_monitoring: bool,
    pub enable_predictive_analytics: bool,
    pub export_metrics: bool,
    pub alert_on_anomalies: bool,
}

impl Default for MonitoringConfig {
    fn default() -> Self {
        Self {
            enable_detailed_monitoring: true,
            metrics_collection_interval_ms: 1000,
            max_history_size: 1000,
            enable_resource_monitoring: true,
            enable_network_monitoring: false,
            enable_predictive_analytics: false,
            export_metrics: false,
            alert_on_anomalies: true,
        }
    }
}

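/// Threshold values that `check_alerts` compares the latest metrics and
/// resource-usage samples against when raising alerts.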
#[derive(Debug, Clone)]
pub struct AlertThresholds {
    pub max_convergence_time_ms: u64,
    pub min_worker_efficiency: f64,
    pub max_memory_utilization: f64,
    pub max_cpu_utilization: f64,
    pub max_message_latency_ms: f64,
    pub max_sync_overhead_ms: f64,
    pub min_throughput_threshold: f64,
}

impl Default for AlertThresholds {
    fn default() -> Self {
        Self {
            max_convergence_time_ms: 300000,
            min_worker_efficiency: 0.6,
            max_memory_utilization: 0.9,
            max_cpu_utilization: 0.95,
            max_message_latency_ms: 1000.0,
            max_sync_overhead_ms: 5000.0,
            min_throughput_threshold: 10.0,
        }
    }
}

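/// A single snapshot of algorithm-level performance for one iteration.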
#[derive(Debug, Clone)]
pub struct PerformanceMetrics {
    pub timestamp: SystemTime,
    pub iteration: usize,
    pub global_inertia: f64,
    pub convergence_rate: f64,
    pub worker_efficiency: f64,
    pub message_latency_ms: f64,
    pub sync_overhead_ms: f64,
    pub total_computation_time_ms: u64,
    pub memory_pressure_score: f64,
    pub load_balance_score: f64,
    pub network_utilization: f64,
}

#[derive(Debug, Clone)]
pub struct ResourceUsage {
    pub timestamp: SystemTime,
    pub cpu_utilization: f64,
    pub memory_utilization: f64,
    pub network_throughput_mbps: f64,
    pub disk_io_rate: f64,
    pub active_workers: usize,
    pub failed_workers: usize,
    pub queue_depth: usize,
    pub cache_hit_ratio: f64,
}

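/// Rolling per-worker statistics from which a health score in `[0.0, 1.0]` is derived.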
#[derive(Debug, Clone)]
pub struct WorkerMetrics {
    pub worker_id: usize,
    pub cpu_usage_history: VecDeque<f64>,
    pub memory_usage_history: VecDeque<f64>,
    pub throughput_history: VecDeque<f64>,
    pub latency_history: VecDeque<f64>,
    pub error_count: usize,
    pub last_update: SystemTime,
    pub health_score: f64,
}

#[derive(Debug, Clone)]
pub struct PerformanceAlert {
    pub alert_type: AlertType,
    pub severity: AlertSeverity,
    pub message: String,
    pub timestamp: SystemTime,
    pub worker_id: Option<usize>,
    pub metric_value: f64,
    pub threshold: f64,
}

#[derive(Debug, Clone)]
pub enum AlertType {
    HighCpuUsage,
    HighMemoryUsage,
    HighLatency,
    LowThroughput,
    WorkerFailure,
    ConvergenceTimeout,
    LoadImbalance,
    NetworkCongestion,
    ResourceExhaustion,
    AnomalyDetected,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum AlertSeverity {
    Critical,
    Warning,
    Info,
}

#[derive(Debug, Clone)]
pub struct EfficiencyAnalysis {
    pub overall_efficiency: f64,
    pub bottleneck_analysis: BottleneckAnalysis,
    pub resource_utilization: HashMap<String, f64>,
    pub performance_trends: PerformanceTrends,
    pub optimization_recommendations: Vec<String>,
}

#[derive(Debug, Clone)]
pub struct BottleneckAnalysis {
    pub primary_bottleneck: BottleneckType,
    pub bottleneck_severity: f64,
    pub affected_workers: Vec<usize>,
    pub estimated_impact: f64,
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum BottleneckType {
    Cpu,
    Memory,
    Network,
    Disk,
    Synchronization,
    LoadImbalance,
    MessagePassing,
    None,
}

#[derive(Debug, Clone)]
pub struct PerformanceTrends {
    pub throughput_trend: TrendDirection,
    pub latency_trend: TrendDirection,
    pub efficiency_trend: TrendDirection,
    pub resource_trend: TrendDirection,
    pub trend_confidence: f64,
}

#[derive(Debug, Clone, Copy)]
pub enum TrendDirection {
    Improving,
    Stable,
    Degrading,
    Unknown,
}

impl PerformanceMonitor {
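    /// Creates a monitor with the given configuration and default alert thresholds.
    ///
    /// # Example
    ///
    /// A minimal usage sketch (marked `ignore` since the exact import path
    /// depends on how the crate exposes this module):
    ///
    /// ```ignore
    /// let mut monitor = PerformanceMonitor::new(MonitoringConfig::default());
    /// monitor.register_worker(0);
    /// monitor.update_worker_metrics(0, 0.55, 0.40, 120.0, 35.0)?;
    /// let alerts = monitor.check_alerts()?;
    /// assert!(alerts.is_empty());
    /// ```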
    pub fn new(config: MonitoringConfig) -> Self {
        Self {
            metrics_history: Arc::new(Mutex::new(VecDeque::new())),
            resource_usage: Arc::new(Mutex::new(VecDeque::new())),
            worker_metrics: HashMap::new(),
            config,
            alert_thresholds: AlertThresholds::default(),
            start_time: Instant::now(),
        }
    }

    /// Registers a worker so its metrics and health score can be tracked.
    pub fn register_worker(&mut self, worker_id: usize) {
        let worker_metrics = WorkerMetrics {
            worker_id,
            cpu_usage_history: VecDeque::new(),
            memory_usage_history: VecDeque::new(),
            throughput_history: VecDeque::new(),
            latency_history: VecDeque::new(),
            error_count: 0,
            last_update: SystemTime::now(),
            health_score: 1.0,
        };

        self.worker_metrics.insert(worker_id, worker_metrics);
    }

    pub fn record_performance_metrics(&self, metrics: PerformanceMetrics) -> Result<()> {
        let mut history = self.metrics_history.lock().map_err(|_| {
            ClusteringError::InvalidInput("Failed to acquire metrics lock".to_string())
        })?;

        history.push_back(metrics);

        while history.len() > self.config.max_history_size {
            history.pop_front();
        }

        Ok(())
    }

    pub fn record_resource_usage(&self, usage: ResourceUsage) -> Result<()> {
        if !self.config.enable_resource_monitoring {
            return Ok(());
        }

        let mut usage_history = self.resource_usage.lock().map_err(|_| {
            ClusteringError::InvalidInput("Failed to acquire resource usage lock".to_string())
        })?;

        usage_history.push_back(usage);

        while usage_history.len() > self.config.max_history_size {
            usage_history.pop_front();
        }

        Ok(())
    }

    pub fn update_worker_metrics(
        &mut self,
        worker_id: usize,
        cpu_usage: f64,
        memory_usage: f64,
        throughput: f64,
        latency: f64,
    ) -> Result<()> {
        if let Some(metrics) = self.worker_metrics.get_mut(&worker_id) {
            metrics.cpu_usage_history.push_back(cpu_usage);
            metrics.memory_usage_history.push_back(memory_usage);
            metrics.throughput_history.push_back(throughput);
            metrics.latency_history.push_back(latency);
            metrics.last_update = SystemTime::now();

            // Keep the per-worker histories bounded.
            let max_size = 100;
            if metrics.cpu_usage_history.len() > max_size {
                metrics.cpu_usage_history.pop_front();
            }
            if metrics.memory_usage_history.len() > max_size {
                metrics.memory_usage_history.pop_front();
            }
            if metrics.throughput_history.len() > max_size {
                metrics.throughput_history.pop_front();
            }
            if metrics.latency_history.len() > max_size {
                metrics.latency_history.pop_front();
            }
        }

        // Recompute the health score from the updated histories.
        if let Some(metrics) = self.worker_metrics.get(&worker_id) {
            let health_score = self.calculate_worker_health_score(metrics);
            if let Some(metrics_mut) = self.worker_metrics.get_mut(&worker_id) {
                metrics_mut.health_score = health_score;
            }
        }

        Ok(())
    }

    fn calculate_worker_health_score(&self, metrics: &WorkerMetrics) -> f64 {
        let mut score = 1.0;

        // Penalize sustained CPU usage above 80%.
        if !metrics.cpu_usage_history.is_empty() {
            let avg_cpu = metrics.cpu_usage_history.iter().sum::<f64>()
                / metrics.cpu_usage_history.len() as f64;
            score *= (1.0 - (avg_cpu - 0.8).max(0.0) * 2.0).max(0.0);
        }

        // Penalize memory usage above 85%.
        if !metrics.memory_usage_history.is_empty() {
            let avg_memory = metrics.memory_usage_history.iter().sum::<f64>()
                / metrics.memory_usage_history.len() as f64;
            score *= (1.0 - (avg_memory - 0.85).max(0.0) * 3.0).max(0.0);
        }

        // Penalize high average latency, capped at a 30% reduction.
        if !metrics.latency_history.is_empty() {
            let avg_latency =
                metrics.latency_history.iter().sum::<f64>() / metrics.latency_history.len() as f64;
            let latency_penalty = (avg_latency / 1000.0).min(1.0) * 0.3;
            score *= (1.0 - latency_penalty).max(0.0);
        }

        // Penalize accumulated errors, capped at a 50% reduction.
        let time_window_hours = 1.0;
        let error_rate = metrics.error_count as f64 / time_window_hours;
        let error_penalty = (error_rate / 10.0).min(0.5);
        score *= (1.0 - error_penalty).max(0.0);

        score.clamp(0.0, 1.0)
    }

    pub fn check_alerts(&self) -> Result<Vec<PerformanceAlert>> {
        if !self.config.alert_on_anomalies {
            return Ok(Vec::new());
        }

        let mut alerts = Vec::new();

        let metrics_history = self.metrics_history.lock().map_err(|_| {
            ClusteringError::InvalidInput("Failed to acquire metrics lock".to_string())
        })?;

        if let Some(latest_metrics) = metrics_history.back() {
            if latest_metrics.total_computation_time_ms
                > self.alert_thresholds.max_convergence_time_ms
            {
                alerts.push(PerformanceAlert {
                    alert_type: AlertType::ConvergenceTimeout,
                    severity: AlertSeverity::Warning,
                    message: format!(
                        "Convergence taking longer than expected: {}ms > {}ms",
                        latest_metrics.total_computation_time_ms,
                        self.alert_thresholds.max_convergence_time_ms
                    ),
                    timestamp: SystemTime::now(),
                    worker_id: None,
                    metric_value: latest_metrics.total_computation_time_ms as f64,
                    threshold: self.alert_thresholds.max_convergence_time_ms as f64,
                });
            }

            if latest_metrics.worker_efficiency < self.alert_thresholds.min_worker_efficiency {
                alerts.push(PerformanceAlert {
                    alert_type: AlertType::LowThroughput,
                    severity: AlertSeverity::Warning,
                    message: format!(
                        "Worker efficiency below threshold: {:.2} < {:.2}",
                        latest_metrics.worker_efficiency,
                        self.alert_thresholds.min_worker_efficiency
                    ),
                    timestamp: SystemTime::now(),
                    worker_id: None,
                    metric_value: latest_metrics.worker_efficiency,
                    threshold: self.alert_thresholds.min_worker_efficiency,
                });
            }

            if latest_metrics.message_latency_ms > self.alert_thresholds.max_message_latency_ms {
                alerts.push(PerformanceAlert {
                    alert_type: AlertType::HighLatency,
                    severity: AlertSeverity::Warning,
                    message: format!(
                        "High message latency detected: {:.2}ms > {:.2}ms",
                        latest_metrics.message_latency_ms,
                        self.alert_thresholds.max_message_latency_ms
                    ),
                    timestamp: SystemTime::now(),
                    worker_id: None,
                    metric_value: latest_metrics.message_latency_ms,
                    threshold: self.alert_thresholds.max_message_latency_ms,
                });
            }
        }

        let resource_usage = self.resource_usage.lock().map_err(|_| {
            ClusteringError::InvalidInput("Failed to acquire resource usage lock".to_string())
        })?;

        if let Some(latest_usage) = resource_usage.back() {
            if latest_usage.cpu_utilization > self.alert_thresholds.max_cpu_utilization {
                alerts.push(PerformanceAlert {
                    alert_type: AlertType::HighCpuUsage,
                    severity: AlertSeverity::Critical,
                    message: format!(
                        "High CPU utilization: {:.1}% > {:.1}%",
                        latest_usage.cpu_utilization * 100.0,
                        self.alert_thresholds.max_cpu_utilization * 100.0
                    ),
                    timestamp: SystemTime::now(),
                    worker_id: None,
                    metric_value: latest_usage.cpu_utilization,
                    threshold: self.alert_thresholds.max_cpu_utilization,
                });
            }

            if latest_usage.memory_utilization > self.alert_thresholds.max_memory_utilization {
                alerts.push(PerformanceAlert {
                    alert_type: AlertType::HighMemoryUsage,
                    severity: AlertSeverity::Critical,
                    message: format!(
                        "High memory utilization: {:.1}% > {:.1}%",
                        latest_usage.memory_utilization * 100.0,
                        self.alert_thresholds.max_memory_utilization * 100.0
                    ),
                    timestamp: SystemTime::now(),
                    worker_id: None,
                    metric_value: latest_usage.memory_utilization,
                    threshold: self.alert_thresholds.max_memory_utilization,
                });
            }

            if latest_usage.failed_workers > 0 {
                alerts.push(PerformanceAlert {
                    alert_type: AlertType::WorkerFailure,
                    severity: AlertSeverity::Critical,
                    message: format!("{} worker(s) have failed", latest_usage.failed_workers),
                    timestamp: SystemTime::now(),
                    worker_id: None,
                    metric_value: latest_usage.failed_workers as f64,
                    threshold: 0.0,
                });
            }
        }

        for (worker_id, metrics) in &self.worker_metrics {
            if metrics.health_score < 0.5 {
                alerts.push(PerformanceAlert {
                    alert_type: AlertType::AnomalyDetected,
                    severity: AlertSeverity::Warning,
                    message: format!(
                        "Worker {} health score is low: {:.2}",
                        worker_id, metrics.health_score
                    ),
                    timestamp: SystemTime::now(),
                    worker_id: Some(*worker_id),
                    metric_value: metrics.health_score,
                    threshold: 0.5,
                });
            }
        }

        Ok(alerts)
    }

    pub fn analyze_system_efficiency(&self) -> Result<EfficiencyAnalysis> {
        let metrics_history = self.metrics_history.lock().map_err(|_| {
            ClusteringError::InvalidInput("Failed to acquire metrics lock".to_string())
        })?;

        let resource_usage = self.resource_usage.lock().map_err(|_| {
            ClusteringError::InvalidInput("Failed to acquire resource usage lock".to_string())
        })?;

        let overall_efficiency = if !metrics_history.is_empty() {
            let recent_metrics: Vec<_> = metrics_history.iter().rev().take(10).collect();
            recent_metrics
                .iter()
                .map(|m| m.worker_efficiency)
                .sum::<f64>()
                / recent_metrics.len() as f64
        } else {
            0.0
        };

        let bottleneck_analysis = self.analyze_bottlenecks(&metrics_history, &resource_usage);

        let mut resource_utilization = HashMap::new();
        if let Some(latest_usage) = resource_usage.back() {
            resource_utilization.insert("cpu".to_string(), latest_usage.cpu_utilization);
            resource_utilization.insert("memory".to_string(), latest_usage.memory_utilization);
            resource_utilization.insert(
                "network".to_string(),
                latest_usage.network_throughput_mbps / 1000.0,
            );
            resource_utilization.insert("disk".to_string(), latest_usage.disk_io_rate);
        }

        let performance_trends = self.analyze_trends(&metrics_history);

        let optimization_recommendations = self.generate_recommendations(
            &bottleneck_analysis,
            &performance_trends,
            overall_efficiency,
        );

        Ok(EfficiencyAnalysis {
            overall_efficiency,
            bottleneck_analysis,
            resource_utilization,
            performance_trends,
            optimization_recommendations,
        })
    }

    fn analyze_bottlenecks(
        &self,
        metrics_history: &VecDeque<PerformanceMetrics>,
        resource_usage: &VecDeque<ResourceUsage>,
    ) -> BottleneckAnalysis {
        let mut bottleneck_scores = HashMap::new();
        bottleneck_scores.insert(BottleneckType::Cpu, 0.0);
        bottleneck_scores.insert(BottleneckType::Memory, 0.0);
        bottleneck_scores.insert(BottleneckType::Network, 0.0);
        bottleneck_scores.insert(BottleneckType::Synchronization, 0.0);
        bottleneck_scores.insert(BottleneckType::LoadImbalance, 0.0);
        bottleneck_scores.insert(BottleneckType::MessagePassing, 0.0);

        if !resource_usage.is_empty() {
            let recent_usage: Vec<_> = resource_usage.iter().rev().take(10).collect();

            let avg_cpu = recent_usage.iter().map(|u| u.cpu_utilization).sum::<f64>()
                / recent_usage.len() as f64;
            let avg_memory = recent_usage
                .iter()
                .map(|u| u.memory_utilization)
                .sum::<f64>()
                / recent_usage.len() as f64;
            let avg_network = recent_usage
                .iter()
                .map(|u| u.network_throughput_mbps)
                .sum::<f64>()
                / recent_usage.len() as f64;

            bottleneck_scores.insert(BottleneckType::Cpu, avg_cpu);
            bottleneck_scores.insert(BottleneckType::Memory, avg_memory);
            bottleneck_scores.insert(BottleneckType::Network, avg_network / 1000.0);
        }

        if !metrics_history.is_empty() {
            let recent_metrics: Vec<_> = metrics_history.iter().rev().take(10).collect();

            let avg_sync_overhead = recent_metrics
                .iter()
                .map(|m| m.sync_overhead_ms)
                .sum::<f64>()
                / recent_metrics.len() as f64;
            let avg_message_latency = recent_metrics
                .iter()
                .map(|m| m.message_latency_ms)
                .sum::<f64>()
                / recent_metrics.len() as f64;
            let avg_load_balance = recent_metrics
                .iter()
                .map(|m| m.load_balance_score)
                .sum::<f64>()
                / recent_metrics.len() as f64;

            bottleneck_scores.insert(BottleneckType::Synchronization, avg_sync_overhead / 1000.0);
            bottleneck_scores.insert(BottleneckType::MessagePassing, avg_message_latency / 1000.0);
            bottleneck_scores.insert(BottleneckType::LoadImbalance, 1.0 - avg_load_balance);
        }

        let (primary_bottleneck, bottleneck_severity) = bottleneck_scores
            .iter()
            .max_by(|a, b| a.1.partial_cmp(b.1).unwrap_or(std::cmp::Ordering::Equal))
            .map(|(bottleneck, &severity)| (bottleneck.clone(), severity))
            .unwrap_or((BottleneckType::None, 0.0));

        let affected_workers: Vec<usize> = self
            .worker_metrics
            .iter()
            .filter(|(_, metrics)| metrics.health_score < 0.7)
            .map(|(&id, _)| id)
            .collect();

        // Convert the dominant severity into a rough impact estimate.
        let estimated_impact = bottleneck_severity * 0.5;

        BottleneckAnalysis {
            primary_bottleneck,
            bottleneck_severity,
            affected_workers,
            estimated_impact,
        }
    }

    fn analyze_trends(&self, metrics_history: &VecDeque<PerformanceMetrics>) -> PerformanceTrends {
        if metrics_history.len() < 5 {
            return PerformanceTrends {
                throughput_trend: TrendDirection::Unknown,
                latency_trend: TrendDirection::Unknown,
                efficiency_trend: TrendDirection::Unknown,
                resource_trend: TrendDirection::Unknown,
                trend_confidence: 0.0,
            };
        }

        let recent_metrics: Vec<_> = metrics_history.iter().rev().take(10).collect();
        let older_metrics: Vec<_> = metrics_history.iter().rev().skip(5).take(10).collect();

        let recent_efficiency = recent_metrics
            .iter()
            .map(|m| m.worker_efficiency)
            .sum::<f64>()
            / recent_metrics.len() as f64;
        let older_efficiency = if !older_metrics.is_empty() {
            older_metrics
                .iter()
                .map(|m| m.worker_efficiency)
                .sum::<f64>()
                / older_metrics.len() as f64
        } else {
            recent_efficiency
        };

        let efficiency_trend = if (recent_efficiency - older_efficiency).abs() < 0.05 {
            TrendDirection::Stable
        } else if recent_efficiency > older_efficiency {
            TrendDirection::Improving
        } else {
            TrendDirection::Degrading
        };

        let recent_latency = recent_metrics
            .iter()
            .map(|m| m.message_latency_ms)
            .sum::<f64>()
            / recent_metrics.len() as f64;
        let older_latency = if !older_metrics.is_empty() {
            older_metrics
                .iter()
                .map(|m| m.message_latency_ms)
                .sum::<f64>()
                / older_metrics.len() as f64
        } else {
            recent_latency
        };

        let latency_trend = if (recent_latency - older_latency).abs() < 50.0 {
            TrendDirection::Stable
        } else if recent_latency < older_latency {
            TrendDirection::Improving
        } else {
            TrendDirection::Degrading
        };

        let throughput_trend = efficiency_trend;
        let resource_trend = TrendDirection::Stable;

        let trend_confidence = if recent_metrics.len() >= 10 { 0.8 } else { 0.4 };

        PerformanceTrends {
            throughput_trend,
            latency_trend,
            efficiency_trend,
            resource_trend,
            trend_confidence,
        }
    }

    fn generate_recommendations(
        &self,
        bottleneck_analysis: &BottleneckAnalysis,
        performance_trends: &PerformanceTrends,
        overall_efficiency: f64,
    ) -> Vec<String> {
        let mut recommendations = Vec::new();

        match bottleneck_analysis.primary_bottleneck {
            BottleneckType::Cpu => {
                recommendations.push(
                    "Consider adding more CPU cores or reducing computational load per worker"
                        .to_string(),
                );
                recommendations
                    .push("Optimize algorithms to reduce CPU-intensive operations".to_string());
            }
            BottleneckType::Memory => {
                recommendations.push(
                    "Increase memory allocation or implement more efficient memory management"
                        .to_string(),
                );
                recommendations
                    .push("Consider data compression or streaming techniques".to_string());
            }
            BottleneckType::Network => {
                recommendations.push("Optimize network communication patterns".to_string());
                recommendations.push("Consider message batching or compression".to_string());
            }
            BottleneckType::Synchronization => {
                recommendations.push(
                    "Reduce synchronization frequency or implement asynchronous patterns"
                        .to_string(),
                );
                recommendations
                    .push("Consider lockless data structures where possible".to_string());
            }
            BottleneckType::LoadImbalance => {
                recommendations.push("Implement dynamic load balancing".to_string());
                recommendations.push("Review data partitioning strategy".to_string());
            }
            BottleneckType::MessagePassing => {
                recommendations.push("Optimize message passing protocols".to_string());
                recommendations.push("Reduce message size or frequency".to_string());
            }
            _ => {}
        }

        match performance_trends.efficiency_trend {
            TrendDirection::Degrading => {
                recommendations
                    .push("Performance is degrading - investigate recent changes".to_string());
                recommendations
                    .push("Consider scaling up resources or optimizing algorithms".to_string());
            }
            TrendDirection::Stable => {
                if overall_efficiency < 0.7 {
                    recommendations.push(
                        "Performance is stable but suboptimal - consider optimization".to_string(),
                    );
                }
            }
            _ => {}
        }

        if overall_efficiency < 0.5 {
            recommendations.push(
                "Overall efficiency is very low - comprehensive system review needed".to_string(),
            );
        } else if overall_efficiency < 0.7 {
            recommendations
                .push("Moderate efficiency - targeted optimizations recommended".to_string());
        }

        let unhealthy_workers = self
            .worker_metrics
            .iter()
            .filter(|(_, metrics)| metrics.health_score < 0.6)
            .count();

        if unhealthy_workers > 0 {
            recommendations.push(format!(
                "{} workers are performing poorly - investigate individual worker issues",
                unhealthy_workers
            ));
        }

        if recommendations.is_empty() {
            recommendations
                .push("System performance is optimal - no immediate action required".to_string());
        }

        recommendations
    }

    pub fn generate_report(&self) -> MonitoringReport {
        let mut report = MonitoringReport::default();

        let metrics_history = self.metrics_history.lock().unwrap();
        let resource_usage = self.resource_usage.lock().unwrap();

        if !metrics_history.is_empty() {
            let recent_metrics: Vec<_> = metrics_history.iter().rev().take(10).collect();

            report.avg_convergence_rate = recent_metrics
                .iter()
                .map(|m| m.convergence_rate)
                .sum::<f64>()
                / recent_metrics.len() as f64;

            report.avg_worker_efficiency = recent_metrics
                .iter()
                .map(|m| m.worker_efficiency)
                .sum::<f64>()
                / recent_metrics.len() as f64;

            report.avg_sync_overhead = recent_metrics
                .iter()
                .map(|m| m.sync_overhead_ms)
                .sum::<f64>()
                / recent_metrics.len() as f64;
        }

        if !resource_usage.is_empty() {
            let recent_usage: Vec<_> = resource_usage.iter().rev().take(10).collect();

            report.avg_cpu_utilization =
                recent_usage.iter().map(|r| r.cpu_utilization).sum::<f64>()
                    / recent_usage.len() as f64;

            report.avg_memory_utilization = recent_usage
                .iter()
                .map(|r| r.memory_utilization)
                .sum::<f64>()
                / recent_usage.len() as f64;

            report.peak_network_throughput = recent_usage
                .iter()
                .map(|r| r.network_throughput_mbps)
                .fold(0.0, f64::max);
        }

        report.overall_efficiency_score = self.calculate_efficiency_score();
        report.recommendations = self.generate_optimization_recommendations();

        report
    }

    fn calculate_efficiency_score(&self) -> f64 {
        let metrics_history = self.metrics_history.lock().unwrap();
        let resource_usage = self.resource_usage.lock().unwrap();

        if metrics_history.is_empty() || resource_usage.is_empty() {
            return 0.0;
        }

        let convergence_score = metrics_history
            .iter()
            .map(|m| m.convergence_rate.min(1.0))
            .sum::<f64>()
            / metrics_history.len() as f64;

        let worker_score = metrics_history
            .iter()
            .map(|m| m.worker_efficiency)
            .sum::<f64>()
            / metrics_history.len() as f64;

        let resource_score = 1.0
            - (resource_usage
                .iter()
                .map(|r| r.memory_utilization.max(r.cpu_utilization))
                .sum::<f64>()
                / resource_usage.len() as f64);

        // Weighted blend: convergence 40%, worker efficiency 40%, resource headroom 20%.
        convergence_score * 0.4 + worker_score * 0.4 + resource_score * 0.2
    }

    fn generate_optimization_recommendations(&self) -> Vec<String> {
        let mut recommendations = Vec::new();
        let metrics_history = self.metrics_history.lock().unwrap();
        let resource_usage = self.resource_usage.lock().unwrap();

        if let Some(latest_metrics) = metrics_history.back() {
            if latest_metrics.worker_efficiency < 0.7 {
                recommendations
                    .push("Consider load rebalancing - worker efficiency is low".to_string());
            }

            if latest_metrics.sync_overhead_ms > 1000.0 {
                recommendations.push(
                    "High synchronization overhead - consider reducing coordination frequency"
                        .to_string(),
                );
            }

            if latest_metrics.message_latency_ms > 500.0 {
                recommendations
                    .push("High message latency - check network configuration".to_string());
            }
        }

        if let Some(latest_resources) = resource_usage.back() {
            if latest_resources.memory_utilization > 0.8 {
                recommendations.push(
                    "High memory usage - consider increasing workers or reducing batch size"
                        .to_string(),
                );
            }

            if latest_resources.failed_workers > 0 {
                recommendations.push(
                    "Worker failures detected - check fault tolerance configuration".to_string(),
                );
            }

            if latest_resources.queue_depth > 100 {
                recommendations.push(
                    "High message queue depth - consider increasing processing capacity"
                        .to_string(),
                );
            }
        }

        if recommendations.is_empty() {
            recommendations.push("System performance is optimal".to_string());
        }

        recommendations
    }

    pub fn export_metrics_csv(&self, filepath: &str) -> Result<()> {
        use std::fs::File;
        use std::io::Write;

        let mut file = File::create(filepath)
            .map_err(|e| ClusteringError::InvalidInput(format!("Failed to create file: {}", e)))?;

        writeln!(file, "timestamp,iteration,global_inertia,convergence_rate,worker_efficiency,message_latency_ms,sync_overhead_ms,memory_pressure")
            .map_err(|e| ClusteringError::InvalidInput(format!("Failed to write header: {}", e)))?;

        let metrics_history = self.metrics_history.lock().unwrap();
        for metrics in metrics_history.iter() {
            writeln!(
                file,
                "{:?},{},{},{},{},{},{},{}",
                metrics.timestamp,
                metrics.iteration,
                metrics.global_inertia,
                metrics.convergence_rate,
                metrics.worker_efficiency,
                metrics.message_latency_ms,
                metrics.sync_overhead_ms,
                metrics.memory_pressure_score
            )
            .map_err(|e| ClusteringError::InvalidInput(format!("Failed to write data: {}", e)))?;
        }

        Ok(())
    }

    pub fn get_worker_metrics(&self) -> &HashMap<usize, WorkerMetrics> {
        &self.worker_metrics
    }

    pub fn get_config(&self) -> &MonitoringConfig {
        &self.config
    }

    pub fn get_uptime(&self) -> Duration {
        self.start_time.elapsed()
    }
}
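/// Summary report of recent performance and resource usage, produced by
/// `PerformanceMonitor::generate_report`.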
#[derive(Debug, Default)]
pub struct MonitoringReport {
    pub avg_convergence_rate: f64,
    pub avg_worker_efficiency: f64,
    pub avg_sync_overhead: f64,
    pub avg_cpu_utilization: f64,
    pub avg_memory_utilization: f64,
    pub peak_network_throughput: f64,
    pub overall_efficiency_score: f64,
    pub recommendations: Vec<String>,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_performance_monitor_creation() {
        let config = MonitoringConfig::default();
        let monitor = PerformanceMonitor::new(config);

        assert!(monitor.worker_metrics.is_empty());
        assert!(monitor.metrics_history.lock().unwrap().is_empty());
    }

    #[test]
    fn test_worker_registration() {
        let config = MonitoringConfig::default();
        let mut monitor = PerformanceMonitor::new(config);

        monitor.register_worker(1);
        assert!(monitor.worker_metrics.contains_key(&1));
        assert_eq!(monitor.worker_metrics[&1].worker_id, 1);
    }

    #[test]
    fn test_performance_metrics_recording() {
        let config = MonitoringConfig::default();
        let monitor = PerformanceMonitor::new(config);

        let metrics = PerformanceMetrics {
            timestamp: SystemTime::now(),
            iteration: 1,
            global_inertia: 100.0,
            convergence_rate: 0.8,
            worker_efficiency: 0.9,
            message_latency_ms: 50.0,
            sync_overhead_ms: 100.0,
            total_computation_time_ms: 5000,
            memory_pressure_score: 0.6,
            load_balance_score: 0.8,
            network_utilization: 0.5,
        };

        let result = monitor.record_performance_metrics(metrics);
        assert!(result.is_ok());
        assert_eq!(monitor.metrics_history.lock().unwrap().len(), 1);
    }

    #[test]
    fn test_worker_health_score_calculation() {
        let config = MonitoringConfig::default();
        let monitor = PerformanceMonitor::new(config);

        let mut metrics = WorkerMetrics {
            worker_id: 1,
            cpu_usage_history: VecDeque::from(vec![0.5, 0.6, 0.4]),
            memory_usage_history: VecDeque::from(vec![0.3, 0.4, 0.2]),
            throughput_history: VecDeque::new(),
            latency_history: VecDeque::from(vec![100.0, 150.0, 80.0]),
            error_count: 0,
            last_update: SystemTime::now(),
            health_score: 0.0,
        };

        let score = monitor.calculate_worker_health_score(&metrics);
        assert!(score > 0.5 && score <= 1.0);

        metrics.cpu_usage_history = VecDeque::from(vec![0.95, 0.98, 0.92]);
        metrics.memory_usage_history = VecDeque::from(vec![0.9, 0.95, 0.88]);

        let degraded_score = monitor.calculate_worker_health_score(&metrics);
        assert!(degraded_score < score);
    }
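
    // Illustrative sketch: exercises worker registration, metric updates, and
    // the efficiency-analysis path; the numeric values are arbitrary sample
    // readings chosen for this example.
    #[test]
    fn test_worker_update_and_efficiency_analysis() {
        let config = MonitoringConfig::default();
        let mut monitor = PerformanceMonitor::new(config);

        monitor.register_worker(3);
        monitor
            .update_worker_metrics(3, 0.55, 0.40, 150.0, 40.0)
            .unwrap();

        // Moderate usage should leave the worker healthy.
        assert!(monitor.worker_metrics[&3].health_score > 0.5);

        // With no recorded performance metrics the overall efficiency is 0.0,
        // but the analysis should still succeed and produce recommendations.
        let analysis = monitor.analyze_system_efficiency().unwrap();
        assert!(analysis.overall_efficiency >= 0.0 && analysis.overall_efficiency <= 1.0);
        assert!(!analysis.optimization_recommendations.is_empty());
    }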

    #[test]
    fn test_alert_generation() {
        let config = MonitoringConfig::default();
        let monitor = PerformanceMonitor::new(config);

        let metrics = PerformanceMetrics {
            timestamp: SystemTime::now(),
            iteration: 1,
            global_inertia: 100.0,
            convergence_rate: 0.1,
            worker_efficiency: 0.3,     // below min_worker_efficiency (0.6)
            message_latency_ms: 2000.0, // above max_message_latency_ms (1000.0)
            sync_overhead_ms: 100.0,
            total_computation_time_ms: 400000, // above max_convergence_time_ms (300000)
            memory_pressure_score: 0.6,
            load_balance_score: 0.8,
            network_utilization: 0.5,
        };

        monitor.record_performance_metrics(metrics).unwrap();

        let alerts = monitor.check_alerts().unwrap();
        assert!(!alerts.is_empty());

        let alert_types: Vec<_> = alerts.iter().map(|a| &a.alert_type).collect();
        assert!(alert_types
            .iter()
            .any(|t| matches!(t, AlertType::ConvergenceTimeout)));
        assert!(alert_types
            .iter()
            .any(|t| matches!(t, AlertType::LowThroughput)));
    }
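
    // Illustrative sketch: records a resource-usage sample that exceeds the
    // default CPU and memory thresholds and checks that the corresponding
    // critical alerts fire; the sample values are arbitrary.
    #[test]
    fn test_resource_usage_alerts() {
        let config = MonitoringConfig::default();
        let monitor = PerformanceMonitor::new(config);

        let usage = ResourceUsage {
            timestamp: SystemTime::now(),
            cpu_utilization: 0.99,    // above max_cpu_utilization (0.95)
            memory_utilization: 0.95, // above max_memory_utilization (0.9)
            network_throughput_mbps: 100.0,
            disk_io_rate: 10.0,
            active_workers: 4,
            failed_workers: 1,
            queue_depth: 5,
            cache_hit_ratio: 0.9,
        };

        monitor.record_resource_usage(usage).unwrap();

        let alerts = monitor.check_alerts().unwrap();
        assert!(alerts
            .iter()
            .any(|a| matches!(a.alert_type, AlertType::HighCpuUsage)));
        assert!(alerts
            .iter()
            .any(|a| matches!(a.alert_type, AlertType::HighMemoryUsage)));
        assert!(alerts
            .iter()
            .any(|a| matches!(a.alert_type, AlertType::WorkerFailure)));
    }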
}