use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

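/// Runtime monitor for pipeline executions: collects metrics, tracks active
/// executions, and checks recorded values against per-pipeline baselines.
///
/// The snippet below is an illustrative usage sketch based only on the API in
/// this module (surrounding crate setup is assumed, so it is not compiled as a
/// doctest):
///
/// ```ignore
/// let monitor = PipelineMonitor::new(MonitorConfig::new());
/// let mut handle = monitor.start_execution("exec-1", "training-pipeline");
/// handle.start_stage("fit");
/// // ... run the stage ...
/// handle.end_stage("fit");
/// handle.record_memory_usage(512.0);
/// handle.finish();
///
/// let analysis = monitor.analyze_performance("training-pipeline");
/// println!("avg execution time: {:.2}s", analysis.avg_execution_time);
/// ```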
pub struct PipelineMonitor {
    /// Monitor configuration.
    config: MonitorConfig,
    /// Shared storage for recorded metrics.
    metrics: Arc<Mutex<MetricsStorage>>,
    /// Executions currently in flight, keyed by execution id.
    active_contexts: Arc<Mutex<HashMap<String, ExecutionContext>>>,
    /// Per-pipeline performance baselines used by anomaly detection.
    baselines: HashMap<String, PerformanceBaseline>,
}

impl PipelineMonitor {
    /// Creates a monitor with the given configuration.
    #[must_use]
    pub fn new(config: MonitorConfig) -> Self {
        let metrics = Arc::new(Mutex::new(MetricsStorage::new(config.max_metrics)));
        Self {
            config,
            metrics,
            active_contexts: Arc::new(Mutex::new(HashMap::new())),
            baselines: HashMap::new(),
        }
    }

    /// Registers a new execution and returns a handle for recording its metrics.
    #[must_use]
    pub fn start_execution(&self, execution_id: &str, pipeline_name: &str) -> ExecutionHandle {
        let context = ExecutionContext::new(execution_id, pipeline_name);

        if let Ok(mut contexts) = self.active_contexts.lock() {
            contexts.insert(execution_id.to_string(), context);
        }

        ExecutionHandle::new(
            execution_id.to_string(),
            pipeline_name.to_string(),
            self.metrics.clone(),
        )
    }

    /// Records a single metric.
    pub fn record_metric(&self, metric: Metric) {
        if let Ok(mut metrics) = self.metrics.lock() {
            metrics.add_metric(metric);
        }
    }

    /// Returns a snapshot of all recorded metrics (empty if the lock is poisoned).
    #[must_use]
    pub fn get_metrics_snapshot(&self) -> MetricsSnapshot {
        if let Ok(metrics) = self.metrics.lock() {
            metrics.snapshot()
        } else {
            MetricsSnapshot::empty()
        }
    }

    /// Analyzes the recorded metrics for the given pipeline.
    #[must_use]
    pub fn analyze_performance(&self, pipeline_name: &str) -> PerformanceAnalysis {
        let snapshot = self.get_metrics_snapshot();
        let pipeline_metrics = snapshot.filter_by_pipeline(pipeline_name);

        PerformanceAnalysis::from_metrics(pipeline_metrics)
    }

    /// Detects anomalies for the given pipeline by comparing recorded metrics
    /// against the registered baseline and the configured thresholds.
    #[must_use]
    pub fn detect_anomalies(&self, pipeline_name: &str) -> Vec<Anomaly> {
        let snapshot = self.get_metrics_snapshot();
        let pipeline_metrics = snapshot.filter_by_pipeline(pipeline_name);

        let mut anomalies = Vec::new();

        // Execution times more than twice the baseline average are flagged.
        if let Some(baseline) = self.baselines.get(pipeline_name) {
            for metric in &pipeline_metrics.execution_times {
                if metric.value > baseline.avg_execution_time * 2.0 {
                    anomalies.push(Anomaly {
                        anomaly_type: AnomalyType::SlowExecution,
                        severity: AnomalySeverity::High,
                        timestamp: metric.timestamp,
                        description: format!(
                            "Execution time {:.2}s is significantly higher than baseline {:.2}s",
                            metric.value, baseline.avg_execution_time
                        ),
                        pipeline_name: pipeline_name.to_string(),
                        metric_name: metric.name.clone(),
                    });
                }
            }
        }

        // Memory usage above the configured threshold is flagged.
        for metric in &pipeline_metrics.memory_usage {
            if metric.value > self.config.memory_threshold_mb {
                anomalies.push(Anomaly {
                    anomaly_type: AnomalyType::HighMemoryUsage,
                    severity: AnomalySeverity::Medium,
                    timestamp: metric.timestamp,
                    description: format!(
                        "Memory usage {:.2}MB exceeds threshold {:.2}MB",
                        metric.value, self.config.memory_threshold_mb
                    ),
                    pipeline_name: pipeline_name.to_string(),
                    metric_name: metric.name.clone(),
                });
            }
        }

        anomalies
    }

    /// Sets the performance baseline used when detecting anomalies for a pipeline.
    pub fn set_baseline(&mut self, pipeline_name: &str, baseline: PerformanceBaseline) {
        self.baselines.insert(pipeline_name.to_string(), baseline);
    }

    /// Returns the execution contexts that are currently active.
    #[must_use]
    pub fn get_active_executions(&self) -> Vec<ExecutionContext> {
        if let Ok(contexts) = self.active_contexts.lock() {
            contexts.values().cloned().collect()
        } else {
            Vec::new()
        }
    }
}

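/// Configuration options for [`PipelineMonitor`].
///
/// An illustrative builder-style sketch using only the methods defined below
/// (not compiled as a doctest):
///
/// ```ignore
/// let config = MonitorConfig::new()
///     .max_metrics(5_000)
///     .sampling_interval(Duration::from_millis(500))
///     .memory_threshold_mb(512.0)
///     .enable_profiling(true)
///     .enable_tracing(false);
/// let monitor = PipelineMonitor::new(config);
/// ```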
#[derive(Debug, Clone)]
pub struct MonitorConfig {
    /// Maximum number of metrics retained in memory.
    pub max_metrics: usize,
    /// Interval between metric samples.
    pub sampling_interval: Duration,
    /// Memory usage threshold (in MB) above which an anomaly is reported.
    pub memory_threshold_mb: f64,
    /// Execution time threshold in seconds.
    pub execution_time_threshold_sec: f64,
    /// Whether profiling is enabled.
    pub enable_profiling: bool,
    /// Whether tracing is enabled.
    pub enable_tracing: bool,
}

impl MonitorConfig {
    /// Creates a configuration with default values.
    #[must_use]
    pub fn new() -> Self {
        Self {
            max_metrics: 10000,
            sampling_interval: Duration::from_secs(1),
            memory_threshold_mb: 1024.0,
            execution_time_threshold_sec: 60.0,
            enable_profiling: true,
            enable_tracing: false,
        }
    }

    /// Sets the maximum number of metrics to retain.
    #[must_use]
    pub fn max_metrics(mut self, max: usize) -> Self {
        self.max_metrics = max;
        self
    }

    /// Sets the sampling interval.
    #[must_use]
    pub fn sampling_interval(mut self, interval: Duration) -> Self {
        self.sampling_interval = interval;
        self
    }

    /// Sets the memory usage threshold in MB.
    #[must_use]
    pub fn memory_threshold_mb(mut self, threshold: f64) -> Self {
        self.memory_threshold_mb = threshold;
        self
    }

    /// Enables or disables profiling.
    #[must_use]
    pub fn enable_profiling(mut self, enable: bool) -> Self {
        self.enable_profiling = enable;
        self
    }

    /// Enables or disables tracing.
    #[must_use]
    pub fn enable_tracing(mut self, enable: bool) -> Self {
        self.enable_tracing = enable;
        self
    }
}

impl Default for MonitorConfig {
    fn default() -> Self {
        Self::new()
    }
}

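/// Handle for recording metrics during a single pipeline execution.
///
/// A minimal timing sketch using only the methods defined below (illustrative;
/// not compiled as a doctest):
///
/// ```ignore
/// let monitor = PipelineMonitor::new(MonitorConfig::new());
/// let mut handle = monitor.start_execution("exec-42", "etl-pipeline");
/// handle.start_stage("transform");
/// // ... do the work ...
/// handle.end_stage("transform"); // records "stage_duration_transform"
/// handle.record_throughput(1_250.0);
/// handle.finish(); // records "total_execution_time"
/// ```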
pub struct ExecutionHandle {
    /// Identifier of the execution being monitored.
    execution_id: String,
    /// Name of the pipeline the execution belongs to.
    pipeline_name: String,
    /// Time at which the execution started.
    start_time: Instant,
    /// Shared metric storage.
    metrics: Arc<Mutex<MetricsStorage>>,
    /// Start times of stages that are currently running.
    stage_timings: HashMap<String, Instant>,
}

impl ExecutionHandle {
    fn new(
        execution_id: String,
        pipeline_name: String,
        metrics: Arc<Mutex<MetricsStorage>>,
    ) -> Self {
        Self {
            execution_id,
            pipeline_name,
            start_time: Instant::now(),
            metrics,
            stage_timings: HashMap::new(),
        }
    }

    /// Marks the start of a named stage.
    pub fn start_stage(&mut self, stage_name: &str) {
        self.stage_timings
            .insert(stage_name.to_string(), Instant::now());
    }

    /// Marks the end of a named stage and records its duration.
    pub fn end_stage(&mut self, stage_name: &str) {
        if let Some(start_time) = self.stage_timings.remove(stage_name) {
            let duration = start_time.elapsed();

            let metric = Metric {
                name: format!("stage_duration_{stage_name}"),
                value: duration.as_secs_f64(),
                timestamp: SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .unwrap()
                    .as_secs(),
                pipeline_name: self.pipeline_name.clone(),
                stage_name: Some(stage_name.to_string()),
                execution_id: Some(self.execution_id.clone()),
                metadata: HashMap::new(),
            };

            if let Ok(mut metrics) = self.metrics.lock() {
                metrics.add_metric(metric);
            }
        }
    }

    /// Records an arbitrary named metric for this execution.
    pub fn record_metric(&self, name: &str, value: f64) {
        let metric = Metric {
            name: name.to_string(),
            value,
            timestamp: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs(),
            pipeline_name: self.pipeline_name.clone(),
            stage_name: None,
            execution_id: Some(self.execution_id.clone()),
            metadata: HashMap::new(),
        };

        if let Ok(mut metrics) = self.metrics.lock() {
            metrics.add_metric(metric);
        }
    }

    /// Records current memory usage in megabytes.
    pub fn record_memory_usage(&self, usage_mb: f64) {
        self.record_metric("memory_usage_mb", usage_mb);
    }

    /// Records throughput in samples per second.
    pub fn record_throughput(&self, samples_per_sec: f64) {
        self.record_metric("throughput_samples_per_sec", samples_per_sec);
    }

    /// Finishes the execution and records its total duration.
    pub fn finish(self) {
        let total_duration = self.start_time.elapsed();

        let metric = Metric {
            name: "total_execution_time".to_string(),
            value: total_duration.as_secs_f64(),
            timestamp: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs(),
            pipeline_name: self.pipeline_name.clone(),
            stage_name: None,
            execution_id: Some(self.execution_id.clone()),
            metadata: HashMap::new(),
        };

        if let Ok(mut metrics) = self.metrics.lock() {
            metrics.add_metric(metric);
        }
    }
}

/// Metadata about a single pipeline execution.
#[derive(Debug, Clone)]
pub struct ExecutionContext {
    /// Unique identifier of the execution.
    pub execution_id: String,
    /// Name of the pipeline being executed.
    pub pipeline_name: String,
    /// Start time as seconds since the Unix epoch.
    pub start_time: u64,
    /// Stage currently being executed, if any.
    pub current_stage: Option<String>,
    /// Current execution status.
    pub status: ExecutionStatus,
}

impl ExecutionContext {
    fn new(execution_id: &str, pipeline_name: &str) -> Self {
        Self {
            execution_id: execution_id.to_string(),
            pipeline_name: pipeline_name.to_string(),
            start_time: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs(),
            current_stage: None,
            status: ExecutionStatus::Running,
        }
    }
}

/// Status of a pipeline execution.
#[derive(Debug, Clone, PartialEq)]
pub enum ExecutionStatus {
    /// The execution is still running.
    Running,
    /// The execution finished successfully.
    Completed,
    /// The execution failed.
    Failed,
    /// The execution was cancelled.
    Cancelled,
}

/// A single recorded measurement.
#[derive(Debug, Clone)]
pub struct Metric {
    /// Metric name.
    pub name: String,
    /// Recorded value.
    pub value: f64,
    /// Timestamp as seconds since the Unix epoch.
    pub timestamp: u64,
    /// Pipeline the metric belongs to.
    pub pipeline_name: String,
    /// Stage the metric belongs to, if any.
    pub stage_name: Option<String>,
    /// Execution the metric belongs to, if any.
    pub execution_id: Option<String>,
    /// Additional key-value metadata.
    pub metadata: HashMap<String, String>,
}

/// Bounded in-memory metric store: once `max_size` is reached, the oldest
/// metric is dropped for each new one added.
struct MetricsStorage {
    metrics: VecDeque<Metric>,
    max_size: usize,
}

impl MetricsStorage {
    fn new(max_size: usize) -> Self {
        Self {
            metrics: VecDeque::new(),
            max_size,
        }
    }

    fn add_metric(&mut self, metric: Metric) {
        // Evict the oldest metric once the cap is reached.
        if self.metrics.len() >= self.max_size {
            self.metrics.pop_front();
        }
        self.metrics.push_back(metric);
    }

    fn snapshot(&self) -> MetricsSnapshot {
        MetricsSnapshot {
            execution_times: self
                .metrics
                .iter()
                .filter(|m| m.name.contains("execution_time"))
                .cloned()
                .collect(),
            memory_usage: self
                .metrics
                .iter()
                .filter(|m| m.name.contains("memory_usage"))
                .cloned()
                .collect(),
            throughput: self
                .metrics
                .iter()
                .filter(|m| m.name.contains("throughput"))
                .cloned()
                .collect(),
            stage_durations: self
                .metrics
                .iter()
                .filter(|m| m.name.starts_with("stage_duration"))
                .cloned()
                .collect(),
            custom_metrics: self
                .metrics
                .iter()
                .filter(|m| {
                    !m.name.contains("execution_time")
                        && !m.name.contains("memory_usage")
                        && !m.name.contains("throughput")
                        && !m.name.starts_with("stage_duration")
                })
                .cloned()
                .collect(),
        }
    }
}

/// Point-in-time view of recorded metrics, grouped by kind.
#[derive(Debug, Clone)]
pub struct MetricsSnapshot {
    /// Execution time metrics.
    pub execution_times: Vec<Metric>,
    /// Memory usage metrics.
    pub memory_usage: Vec<Metric>,
    /// Throughput metrics.
    pub throughput: Vec<Metric>,
    /// Per-stage duration metrics.
    pub stage_durations: Vec<Metric>,
    /// Metrics that do not fall into any of the categories above.
    pub custom_metrics: Vec<Metric>,
}

impl MetricsSnapshot {
    fn empty() -> Self {
        Self {
            execution_times: Vec::new(),
            memory_usage: Vec::new(),
            throughput: Vec::new(),
            stage_durations: Vec::new(),
            custom_metrics: Vec::new(),
        }
    }

    fn filter_by_pipeline(&self, pipeline_name: &str) -> Self {
        Self {
            execution_times: self
                .execution_times
                .iter()
                .filter(|m| m.pipeline_name == pipeline_name)
                .cloned()
                .collect(),
            memory_usage: self
                .memory_usage
                .iter()
                .filter(|m| m.pipeline_name == pipeline_name)
                .cloned()
                .collect(),
            throughput: self
                .throughput
                .iter()
                .filter(|m| m.pipeline_name == pipeline_name)
                .cloned()
                .collect(),
            stage_durations: self
                .stage_durations
                .iter()
                .filter(|m| m.pipeline_name == pipeline_name)
                .cloned()
                .collect(),
            custom_metrics: self
                .custom_metrics
                .iter()
                .filter(|m| m.pipeline_name == pipeline_name)
                .cloned()
                .collect(),
        }
    }

    /// Returns references to every metric in the snapshot.
    #[must_use]
    pub fn all_metrics(&self) -> Vec<&Metric> {
        let mut all = Vec::new();
        all.extend(&self.execution_times);
        all.extend(&self.memory_usage);
        all.extend(&self.throughput);
        all.extend(&self.stage_durations);
        all.extend(&self.custom_metrics);
        all
    }
}

/// Summary statistics computed from a pipeline's metrics.
#[derive(Debug, Clone)]
pub struct PerformanceAnalysis {
    /// Mean execution time in seconds.
    pub avg_execution_time: f64,
    /// Execution time percentiles, keyed by percentile (e.g. 50, 95, 99).
    pub execution_time_percentiles: HashMap<u8, f64>,
    /// Mean memory usage in MB.
    pub avg_memory_usage: f64,
    /// Peak memory usage in MB.
    pub peak_memory_usage: f64,
    /// Mean throughput in samples per second.
    pub avg_throughput: f64,
    /// Per-stage performance breakdown.
    pub stage_breakdown: HashMap<String, StagePerformance>,
    /// Detected performance trends.
    pub trends: PerformanceTrends,
}

impl PerformanceAnalysis {
    fn from_metrics(metrics: MetricsSnapshot) -> Self {
        let mut execution_times: Vec<f64> =
            metrics.execution_times.iter().map(|m| m.value).collect();
        execution_times.sort_by(|a, b| a.partial_cmp(b).unwrap());

        let avg_execution_time = if execution_times.is_empty() {
            0.0
        } else {
            execution_times.iter().sum::<f64>() / execution_times.len() as f64
        };

        let execution_time_percentiles = Self::calculate_percentiles(&execution_times);

        let memory_values: Vec<f64> = metrics.memory_usage.iter().map(|m| m.value).collect();
        let avg_memory_usage = if memory_values.is_empty() {
            0.0
        } else {
            memory_values.iter().sum::<f64>() / memory_values.len() as f64
        };
        let peak_memory_usage = memory_values.iter().fold(0.0f64, |acc, &x| acc.max(x));

        let throughput_values: Vec<f64> = metrics.throughput.iter().map(|m| m.value).collect();
        let avg_throughput = if throughput_values.is_empty() {
            0.0
        } else {
            throughput_values.iter().sum::<f64>() / throughput_values.len() as f64
        };

        let stage_breakdown = Self::analyze_stages(&metrics.stage_durations);
        let trends = Self::analyze_trends(&metrics);

        Self {
            avg_execution_time,
            execution_time_percentiles,
            avg_memory_usage,
            peak_memory_usage,
            avg_throughput,
            stage_breakdown,
            trends,
        }
    }

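    /// Nearest-rank percentile lookup over pre-sorted values: the index is
    /// `round((p / 100) * (len - 1))`, clamped to the last element.
    ///
    /// Worked example for 5 sorted values `[1.0, 2.0, 3.0, 4.0, 5.0]`:
    /// p50 -> round(0.50 * 4) = 2 -> 3.0, and p90 -> round(0.90 * 4) = 4 -> 5.0.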
    fn calculate_percentiles(sorted_values: &[f64]) -> HashMap<u8, f64> {
        let mut percentiles = HashMap::new();

        if sorted_values.is_empty() {
            return percentiles;
        }

        for p in &[50, 75, 90, 95, 99] {
            let index =
                ((f64::from(*p) / 100.0) * (sorted_values.len() - 1) as f64).round() as usize;
            let index = index.min(sorted_values.len() - 1);
            percentiles.insert(*p, sorted_values[index]);
        }

        percentiles
    }

    /// Aggregates per-stage duration statistics from stage duration metrics.
    fn analyze_stages(stage_metrics: &[Metric]) -> HashMap<String, StagePerformance> {
        let mut stage_map: HashMap<String, Vec<f64>> = HashMap::new();

        for metric in stage_metrics {
            if let Some(stage_name) = &metric.stage_name {
                stage_map
                    .entry(stage_name.clone())
                    .or_default()
                    .push(metric.value);
            }
        }

        let mut stage_breakdown = HashMap::new();
        for (stage_name, times) in stage_map {
            let avg_time = times.iter().sum::<f64>() / times.len() as f64;
            let min_time = times.iter().fold(f64::INFINITY, |acc, &x| acc.min(x));
            let max_time = times.iter().fold(0.0f64, |acc, &x| acc.max(x));

            stage_breakdown.insert(
                stage_name,
                StagePerformance {
                    avg_duration: avg_time,
                    min_duration: min_time,
                    max_duration: max_time,
                    execution_count: times.len(),
                },
            );
        }

        stage_breakdown
    }

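    /// Derives coarse trends from the most recent execution-time metrics.
    ///
    /// The last `recent_count` samples are split into an older and a newer
    /// half (note that `recent_execution_times` is in reverse chronological
    /// order, so `first_half` holds the chronologically older values); the
    /// trend is `Increasing` when the newer average exceeds the older one by
    /// more than 10%, `Decreasing` when it is more than 10% lower, and
    /// `Stable` otherwise. Memory and throughput trends are not yet derived.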
    fn analyze_trends(metrics: &MetricsSnapshot) -> PerformanceTrends {
        let recent_count = 10;

        let recent_execution_times: Vec<f64> = metrics
            .execution_times
            .iter()
            .rev()
            .take(recent_count)
            .map(|m| m.value)
            .collect();

        let execution_time_trend = if recent_execution_times.len() >= 2 {
            let first_half = &recent_execution_times[recent_execution_times.len() / 2..];
            let second_half = &recent_execution_times[..recent_execution_times.len() / 2];

            let first_avg = first_half.iter().sum::<f64>() / first_half.len() as f64;
            let second_avg = second_half.iter().sum::<f64>() / second_half.len() as f64;

            if second_avg > first_avg * 1.1 {
                Trend::Increasing
            } else if second_avg < first_avg * 0.9 {
                Trend::Decreasing
            } else {
                Trend::Stable
            }
        } else {
            Trend::Unknown
        };

        PerformanceTrends {
            execution_time_trend,
            memory_usage_trend: Trend::Unknown,
            throughput_trend: Trend::Unknown,
        }
    }
}

/// Aggregated duration statistics for a single stage.
#[derive(Debug, Clone)]
pub struct StagePerformance {
    /// Average stage duration in seconds.
    pub avg_duration: f64,
    /// Minimum stage duration in seconds.
    pub min_duration: f64,
    /// Maximum stage duration in seconds.
    pub max_duration: f64,
    /// Number of recorded executions of the stage.
    pub execution_count: usize,
}

/// Coarse trends derived from recent metrics.
#[derive(Debug, Clone)]
pub struct PerformanceTrends {
    /// Trend of execution times.
    pub execution_time_trend: Trend,
    /// Trend of memory usage.
    pub memory_usage_trend: Trend,
    /// Trend of throughput.
    pub throughput_trend: Trend,
}

/// Direction of change in a metric over time.
#[derive(Debug, Clone, PartialEq)]
pub enum Trend {
    /// Values are trending upward.
    Increasing,
    /// Values are trending downward.
    Decreasing,
    /// Values are roughly flat.
    Stable,
    /// Not enough data to determine a trend.
    Unknown,
}

/// Expected performance characteristics for a pipeline, used as the reference
/// point for anomaly detection.
#[derive(Debug, Clone)]
pub struct PerformanceBaseline {
    /// Expected average execution time in seconds.
    pub avg_execution_time: f64,
    /// Standard deviation of execution time.
    pub std_dev_execution_time: f64,
    /// Expected average memory usage in MB.
    pub avg_memory_usage: f64,
    /// Expected average throughput in samples per second.
    pub avg_throughput: f64,
}

/// A detected performance anomaly.
#[derive(Debug, Clone)]
pub struct Anomaly {
    /// Kind of anomaly detected.
    pub anomaly_type: AnomalyType,
    /// Severity of the anomaly.
    pub severity: AnomalySeverity,
    /// Timestamp of the offending metric (seconds since the Unix epoch).
    pub timestamp: u64,
    /// Human-readable description.
    pub description: String,
    /// Pipeline the anomaly was detected in.
    pub pipeline_name: String,
    /// Name of the metric that triggered the anomaly.
    pub metric_name: String,
}

/// Kinds of anomalies that can be detected.
#[derive(Debug, Clone, PartialEq)]
pub enum AnomalyType {
    /// Execution time well above the baseline.
    SlowExecution,
    /// Memory usage above the configured threshold.
    HighMemoryUsage,
    /// Throughput below expectations.
    LowThroughput,
    /// Error rate above expectations.
    HighErrorRate,
    /// Contention for shared resources.
    ResourceContention,
}

/// Severity level of a detected anomaly.
#[derive(Debug, Clone, PartialEq)]
pub enum AnomalySeverity {
    Low,
    Medium,
    High,
    Critical,
}

#[allow(non_snake_case)]
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_monitor_config() {
        let config = MonitorConfig::new()
            .max_metrics(5000)
            .memory_threshold_mb(512.0)
            .enable_profiling(true);

        assert_eq!(config.max_metrics, 5000);
        assert_eq!(config.memory_threshold_mb, 512.0);
        assert!(config.enable_profiling);
    }

    #[test]
    fn test_pipeline_monitor() {
        let config = MonitorConfig::new();
        let monitor = PipelineMonitor::new(config);

        let mut handle = monitor.start_execution("test-exec-1", "test-pipeline");
        handle.start_stage("preprocessing");
        handle.end_stage("preprocessing");
        handle.record_memory_usage(256.0);
        handle.finish();

        let snapshot = monitor.get_metrics_snapshot();
        assert!(!snapshot.stage_durations.is_empty());
        assert!(!snapshot.memory_usage.is_empty());
    }

    #[test]
    fn test_execution_handle() {
        let metrics = Arc::new(Mutex::new(MetricsStorage::new(100)));
        let mut handle = ExecutionHandle::new(
            "test-id".to_string(),
            "test-pipeline".to_string(),
            metrics.clone(),
        );

        handle.start_stage("test-stage");
        std::thread::sleep(Duration::from_millis(10));
        handle.end_stage("test-stage");

        let metrics_lock = metrics.lock().unwrap();
        assert!(!metrics_lock.metrics.is_empty());
    }

    #[test]
    fn test_metrics_snapshot() {
        let mut snapshot = MetricsSnapshot::empty();
        let metric = Metric {
            name: "test_metric".to_string(),
            value: 42.0,
            timestamp: 123456789,
            pipeline_name: "test-pipeline".to_string(),
            stage_name: None,
            execution_id: None,
            metadata: HashMap::new(),
        };

        snapshot.custom_metrics.push(metric);
        assert_eq!(snapshot.all_metrics().len(), 1);
    }

    #[test]
    fn test_performance_analysis() {
        let mut snapshot = MetricsSnapshot::empty();

        for i in 1..=5 {
            snapshot.execution_times.push(Metric {
                name: "execution_time".to_string(),
                value: i as f64,
                timestamp: 123456789,
                pipeline_name: "test".to_string(),
                stage_name: None,
                execution_id: None,
                metadata: HashMap::new(),
            });
        }

        let analysis = PerformanceAnalysis::from_metrics(snapshot);
        assert_eq!(analysis.avg_execution_time, 3.0);
        assert!(analysis.execution_time_percentiles.contains_key(&50));
    }

    #[test]
    fn test_anomaly_detection() {
        let config = MonitorConfig::new().memory_threshold_mb(100.0);
        let mut monitor = PipelineMonitor::new(config);

        let baseline = PerformanceBaseline {
            avg_execution_time: 1.0,
            std_dev_execution_time: 0.1,
            avg_memory_usage: 50.0,
            avg_throughput: 1000.0,
        };
        monitor.set_baseline("test-pipeline", baseline);

        monitor.record_metric(Metric {
            name: "memory_usage_mb".to_string(),
            value: 200.0,
            timestamp: 123456789,
            pipeline_name: "test-pipeline".to_string(),
            stage_name: None,
            execution_id: None,
            metadata: HashMap::new(),
        });

        let anomalies = monitor.detect_anomalies("test-pipeline");
        assert!(!anomalies.is_empty());
        assert_eq!(anomalies[0].anomaly_type, AnomalyType::HighMemoryUsage);
    }
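    // Illustrative usage sketch: exercises the slow-execution branch of
    // `detect_anomalies`, which flags execution times above twice the
    // baseline average; companion to the memory check above.
    #[test]
    fn test_slow_execution_anomaly_sketch() {
        let mut monitor = PipelineMonitor::new(MonitorConfig::new());
        monitor.set_baseline(
            "slow-pipeline",
            PerformanceBaseline {
                avg_execution_time: 1.0,
                std_dev_execution_time: 0.1,
                avg_memory_usage: 50.0,
                avg_throughput: 1000.0,
            },
        );

        // 5.0s is well above the 2x-baseline cutoff of 2.0s.
        monitor.record_metric(Metric {
            name: "total_execution_time".to_string(),
            value: 5.0,
            timestamp: 123456789,
            pipeline_name: "slow-pipeline".to_string(),
            stage_name: None,
            execution_id: None,
            metadata: HashMap::new(),
        });

        let anomalies = monitor.detect_anomalies("slow-pipeline");
        assert_eq!(anomalies.len(), 1);
        assert_eq!(anomalies[0].anomaly_type, AnomalyType::SlowExecution);
        assert_eq!(anomalies[0].severity, AnomalySeverity::High);
    }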
}