sklears_compose/
monitoring.rs

//! Pipeline monitoring and performance profiling utilities
//!
//! This module provides comprehensive monitoring, profiling, and observability
//! tools for pipeline execution, including real-time metrics collection,
//! performance analysis, and anomaly detection.
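//!
//! # Example
//!
//! A minimal sketch of the intended workflow (illustrative only; it assumes this
//! module is exported as `sklears_compose::monitoring`, and the run, pipeline,
//! and stage names are placeholders):
//!
//! ```ignore
//! use sklears_compose::monitoring::{MonitorConfig, PipelineMonitor};
//!
//! let monitor = PipelineMonitor::new(MonitorConfig::new());
//! let mut handle = monitor.start_execution("run-001", "feature-pipeline");
//!
//! handle.start_stage("preprocessing");
//! // ... run the stage ...
//! handle.end_stage("preprocessing");
//! handle.record_memory_usage(128.0);
//! handle.finish();
//!
//! let snapshot = monitor.get_metrics_snapshot();
//! println!("collected {} metrics", snapshot.all_metrics().len());
//! ```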

use sklears_core::{traits::Estimator, types::FloatBounds};
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

/// Pipeline execution monitor with real-time metrics collection
pub struct PipelineMonitor {
    /// Monitor configuration
    config: MonitorConfig,
    /// Execution metrics storage
    metrics: Arc<Mutex<MetricsStorage>>,
    /// Active execution contexts
    active_contexts: Arc<Mutex<HashMap<String, ExecutionContext>>>,
    /// Performance baselines
    baselines: HashMap<String, PerformanceBaseline>,
}

impl PipelineMonitor {
    /// Create a new pipeline monitor
    #[must_use]
    pub fn new(config: MonitorConfig) -> Self {
        Self {
            config,
            metrics: Arc::new(Mutex::new(MetricsStorage::new())),
            active_contexts: Arc::new(Mutex::new(HashMap::new())),
            baselines: HashMap::new(),
        }
    }

    /// Start monitoring a pipeline execution
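    ///
    /// Returns an `ExecutionHandle` that records stage timings and custom metrics
    /// for this run. A hedged sketch (identifiers are placeholders):
    ///
    /// ```ignore
    /// let monitor = PipelineMonitor::new(MonitorConfig::new());
    /// let mut handle = monitor.start_execution("run-001", "training-pipeline");
    /// handle.start_stage("fit");
    /// handle.end_stage("fit");
    /// handle.finish();
    /// ```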
    #[must_use]
    pub fn start_execution(&self, execution_id: &str, pipeline_name: &str) -> ExecutionHandle {
        let context = ExecutionContext::new(execution_id, pipeline_name);

        if let Ok(mut contexts) = self.active_contexts.lock() {
            contexts.insert(execution_id.to_string(), context);
        }

        ExecutionHandle::new(
            execution_id.to_string(),
            pipeline_name.to_string(),
            self.metrics.clone(),
        )
    }

    /// Record a metric
    pub fn record_metric(&self, metric: Metric) {
        if let Ok(mut metrics) = self.metrics.lock() {
            metrics.add_metric(metric);
        }
    }

    /// Get current metrics snapshot
    #[must_use]
    pub fn get_metrics_snapshot(&self) -> MetricsSnapshot {
        if let Ok(metrics) = self.metrics.lock() {
            metrics.snapshot()
        } else {
            MetricsSnapshot::empty()
        }
    }

    /// Analyze performance trends
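    ///
    /// A brief sketch of reading the resulting analysis (names are placeholders):
    ///
    /// ```ignore
    /// let analysis = monitor.analyze_performance("training-pipeline");
    /// println!("avg execution time: {:.2}s", analysis.avg_execution_time);
    /// if let Some(p95) = analysis.execution_time_percentiles.get(&95) {
    ///     println!("p95 execution time: {p95:.2}s");
    /// }
    /// ```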
    #[must_use]
    pub fn analyze_performance(&self, pipeline_name: &str) -> PerformanceAnalysis {
        let snapshot = self.get_metrics_snapshot();
        let pipeline_metrics = snapshot.filter_by_pipeline(pipeline_name);

        PerformanceAnalysis::from_metrics(pipeline_metrics)
    }

    /// Detect anomalies in pipeline execution
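    ///
    /// Execution-time anomalies are only reported once a baseline has been set via
    /// `set_baseline`; memory anomalies are checked against `memory_threshold_mb`
    /// from the config. A sketch, assuming a mutable `monitor` (values are
    /// illustrative):
    ///
    /// ```ignore
    /// monitor.set_baseline(
    ///     "training-pipeline",
    ///     PerformanceBaseline {
    ///         avg_execution_time: 1.0,
    ///         std_dev_execution_time: 0.1,
    ///         avg_memory_usage: 256.0,
    ///         avg_throughput: 1_000.0,
    ///     },
    /// );
    /// for anomaly in monitor.detect_anomalies("training-pipeline") {
    ///     println!("{:?}: {}", anomaly.severity, anomaly.description);
    /// }
    /// ```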
    #[must_use]
    pub fn detect_anomalies(&self, pipeline_name: &str) -> Vec<Anomaly> {
        let snapshot = self.get_metrics_snapshot();
        let pipeline_metrics = snapshot.filter_by_pipeline(pipeline_name);

        let mut anomalies = Vec::new();

        // Check for execution time anomalies
        if let Some(baseline) = self.baselines.get(pipeline_name) {
            for metric in &pipeline_metrics.execution_times {
                if metric.value > baseline.avg_execution_time * 2.0 {
                    anomalies.push(Anomaly {
                        anomaly_type: AnomalyType::SlowExecution,
                        severity: AnomalySeverity::High,
                        timestamp: metric.timestamp,
                        description: format!(
                            "Execution time {:.2}s is significantly higher than baseline {:.2}s",
                            metric.value, baseline.avg_execution_time
                        ),
                        pipeline_name: pipeline_name.to_string(),
                        metric_name: metric.name.clone(),
                    });
                }
            }
        }

        // Check for memory usage anomalies
        for metric in &pipeline_metrics.memory_usage {
            if metric.value > self.config.memory_threshold_mb {
                anomalies.push(Anomaly {
                    anomaly_type: AnomalyType::HighMemoryUsage,
                    severity: AnomalySeverity::Medium,
                    timestamp: metric.timestamp,
                    description: format!(
                        "Memory usage {:.2}MB exceeds threshold {:.2}MB",
                        metric.value, self.config.memory_threshold_mb
                    ),
                    pipeline_name: pipeline_name.to_string(),
                    metric_name: metric.name.clone(),
                });
            }
        }

        anomalies
    }

    /// Set performance baseline for a pipeline
    pub fn set_baseline(&mut self, pipeline_name: &str, baseline: PerformanceBaseline) {
        self.baselines.insert(pipeline_name.to_string(), baseline);
    }

    /// Get active execution contexts
    #[must_use]
    pub fn get_active_executions(&self) -> Vec<ExecutionContext> {
        if let Ok(contexts) = self.active_contexts.lock() {
            contexts.values().cloned().collect()
        } else {
            Vec::new()
        }
    }
}

/// Monitor configuration
#[derive(Debug, Clone)]
pub struct MonitorConfig {
    /// Maximum number of metrics to retain
    pub max_metrics: usize,
    /// Sampling interval for continuous metrics
    pub sampling_interval: Duration,
    /// Memory usage threshold for anomaly detection (MB)
    pub memory_threshold_mb: f64,
    /// Execution time threshold for anomaly detection (seconds)
    pub execution_time_threshold_sec: f64,
    /// Enable detailed profiling
    pub enable_profiling: bool,
    /// Enable distributed tracing
    pub enable_tracing: bool,
}

impl MonitorConfig {
    /// Create a new monitor configuration
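    ///
    /// Defaults: 10 000 retained metrics, 1 s sampling interval, 1024 MB memory
    /// threshold, 60 s execution-time threshold, profiling enabled, tracing
    /// disabled. The builder methods below override individual fields, e.g.:
    ///
    /// ```ignore
    /// let config = MonitorConfig::new()
    ///     .max_metrics(5_000)
    ///     .sampling_interval(Duration::from_millis(500))
    ///     .memory_threshold_mb(512.0)
    ///     .enable_tracing(true);
    /// ```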
    #[must_use]
    pub fn new() -> Self {
        Self {
            max_metrics: 10000,
            sampling_interval: Duration::from_secs(1),
            memory_threshold_mb: 1024.0,        // 1GB
            execution_time_threshold_sec: 60.0, // 1 minute
            enable_profiling: true,
            enable_tracing: false,
        }
    }

    /// Set maximum metrics retention
    #[must_use]
    pub fn max_metrics(mut self, max: usize) -> Self {
        self.max_metrics = max;
        self
    }

    /// Set sampling interval
    #[must_use]
    pub fn sampling_interval(mut self, interval: Duration) -> Self {
        self.sampling_interval = interval;
        self
    }

    /// Set memory threshold
    #[must_use]
    pub fn memory_threshold_mb(mut self, threshold: f64) -> Self {
        self.memory_threshold_mb = threshold;
        self
    }

    /// Enable profiling
    #[must_use]
    pub fn enable_profiling(mut self, enable: bool) -> Self {
        self.enable_profiling = enable;
        self
    }

    /// Enable tracing
    #[must_use]
    pub fn enable_tracing(mut self, enable: bool) -> Self {
        self.enable_tracing = enable;
        self
    }
}

impl Default for MonitorConfig {
    fn default() -> Self {
        Self::new()
    }
}

/// Execution handle for tracking a specific pipeline run
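///
/// Handles are created by `PipelineMonitor::start_execution`. A stage timing is
/// recorded when a `start_stage`/`end_stage` pair completes, and `finish` records
/// the total wall-clock time of the run. A short sketch (stage names are
/// placeholders):
///
/// ```ignore
/// let mut handle = monitor.start_execution("run-001", "etl-pipeline");
/// handle.start_stage("load");
/// handle.end_stage("load");
/// handle.record_throughput(12_500.0);
/// handle.finish();
/// ```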
pub struct ExecutionHandle {
    execution_id: String,
    pipeline_name: String,
    start_time: Instant,
    metrics: Arc<Mutex<MetricsStorage>>,
    stage_timings: HashMap<String, Instant>,
}

impl ExecutionHandle {
    /// Create a new execution handle
    fn new(
        execution_id: String,
        pipeline_name: String,
        metrics: Arc<Mutex<MetricsStorage>>,
    ) -> Self {
        Self {
            execution_id,
            pipeline_name,
            start_time: Instant::now(),
            metrics,
            stage_timings: HashMap::new(),
        }
    }

    /// Start timing a pipeline stage
    pub fn start_stage(&mut self, stage_name: &str) {
        self.stage_timings
            .insert(stage_name.to_string(), Instant::now());
    }

    /// End timing a pipeline stage
    pub fn end_stage(&mut self, stage_name: &str) {
        if let Some(start_time) = self.stage_timings.remove(stage_name) {
            let duration = start_time.elapsed();

            let metric = Metric {
                name: format!("stage_duration_{stage_name}"),
                value: duration.as_secs_f64(),
                timestamp: SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .unwrap()
                    .as_secs(),
                pipeline_name: self.pipeline_name.clone(),
                stage_name: Some(stage_name.to_string()),
                execution_id: Some(self.execution_id.clone()),
                metadata: HashMap::new(),
            };

            if let Ok(mut metrics) = self.metrics.lock() {
                metrics.add_metric(metric);
            }
        }
    }

    /// Record custom metric
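    ///
    /// For example, `handle.record_metric("cache_hit_rate", 0.93)` stores a single
    /// observation tagged with this execution's ID.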
    pub fn record_metric(&self, name: &str, value: f64) {
        let metric = Metric {
            name: name.to_string(),
            value,
            timestamp: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs(),
            pipeline_name: self.pipeline_name.clone(),
            stage_name: None,
            execution_id: Some(self.execution_id.clone()),
            metadata: HashMap::new(),
        };

        if let Ok(mut metrics) = self.metrics.lock() {
            metrics.add_metric(metric);
        }
    }

    /// Record memory usage
    pub fn record_memory_usage(&self, usage_mb: f64) {
        self.record_metric("memory_usage_mb", usage_mb);
    }

    /// Record throughput
    pub fn record_throughput(&self, samples_per_sec: f64) {
        self.record_metric("throughput_samples_per_sec", samples_per_sec);
    }

    /// Finish execution and record total time
    pub fn finish(self) {
        let total_duration = self.start_time.elapsed();

        let metric = Metric {
            name: "total_execution_time".to_string(),
            value: total_duration.as_secs_f64(),
            timestamp: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs(),
            pipeline_name: self.pipeline_name.clone(),
            stage_name: None,
            execution_id: Some(self.execution_id.clone()),
            metadata: HashMap::new(),
        };

        if let Ok(mut metrics) = self.metrics.lock() {
            metrics.add_metric(metric);
        }
    }
}

/// Execution context for active pipeline runs
#[derive(Debug, Clone)]
pub struct ExecutionContext {
    /// Unique execution identifier
    pub execution_id: String,
    /// Pipeline name
    pub pipeline_name: String,
    /// Start timestamp
    pub start_time: u64,
    /// Current stage
    pub current_stage: Option<String>,
    /// Execution status
    pub status: ExecutionStatus,
}

impl ExecutionContext {
    /// Create a new execution context
    fn new(execution_id: &str, pipeline_name: &str) -> Self {
        Self {
            execution_id: execution_id.to_string(),
            pipeline_name: pipeline_name.to_string(),
            start_time: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs(),
            current_stage: None,
            status: ExecutionStatus::Running,
        }
    }
}

/// Execution status
#[derive(Debug, Clone, PartialEq)]
pub enum ExecutionStatus {
    /// Pipeline is currently running
    Running,
    /// Pipeline completed successfully
    Completed,
    /// Pipeline failed with error
    Failed,
    /// Pipeline was cancelled
    Cancelled,
}

/// Individual metric measurement
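///
/// A sketch of constructing one by hand (field values are illustrative):
///
/// ```ignore
/// let metric = Metric {
///     name: "throughput_samples_per_sec".to_string(),
///     value: 1_250.0,
///     timestamp: 1_700_000_000,
///     pipeline_name: "training-pipeline".to_string(),
///     stage_name: None,
///     execution_id: None,
///     metadata: HashMap::new(),
/// };
/// ```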
#[derive(Debug, Clone)]
pub struct Metric {
    /// Metric name
    pub name: String,
    /// Metric value
    pub value: f64,
    /// Timestamp (Unix epoch seconds)
    pub timestamp: u64,
    /// Pipeline name
    pub pipeline_name: String,
    /// Optional stage name
    pub stage_name: Option<String>,
    /// Optional execution ID
    pub execution_id: Option<String>,
    /// Additional metadata
    pub metadata: HashMap<String, String>,
}

/// Metrics storage with circular buffer
struct MetricsStorage {
    metrics: VecDeque<Metric>,
    max_size: usize,
}

impl MetricsStorage {
    /// Create new metrics storage
    fn new() -> Self {
        Self {
            metrics: VecDeque::new(),
            max_size: 10000,
        }
    }

    /// Add a metric
    fn add_metric(&mut self, metric: Metric) {
        if self.metrics.len() >= self.max_size {
            self.metrics.pop_front();
        }
        self.metrics.push_back(metric);
    }

    /// Create a snapshot of current metrics
    fn snapshot(&self) -> MetricsSnapshot {
        MetricsSnapshot {
            execution_times: self
                .metrics
                .iter()
                .filter(|m| m.name.contains("execution_time"))
                .cloned()
                .collect(),
            memory_usage: self
                .metrics
                .iter()
                .filter(|m| m.name.contains("memory_usage"))
                .cloned()
                .collect(),
            throughput: self
                .metrics
                .iter()
                .filter(|m| m.name.contains("throughput"))
                .cloned()
                .collect(),
            stage_durations: self
                .metrics
                .iter()
                .filter(|m| m.name.starts_with("stage_duration"))
                .cloned()
                .collect(),
            custom_metrics: self
                .metrics
                .iter()
                .filter(|m| {
                    !m.name.contains("execution_time")
                        && !m.name.contains("memory_usage")
                        && !m.name.contains("throughput")
                        && !m.name.starts_with("stage_duration")
                })
                .cloned()
                .collect(),
        }
    }
}

/// Snapshot of metrics at a point in time
#[derive(Debug, Clone)]
pub struct MetricsSnapshot {
    /// Execution time metrics
    pub execution_times: Vec<Metric>,
    /// Memory usage metrics
    pub memory_usage: Vec<Metric>,
    /// Throughput metrics
    pub throughput: Vec<Metric>,
    /// Stage duration metrics
    pub stage_durations: Vec<Metric>,
    /// Custom metrics
    pub custom_metrics: Vec<Metric>,
}

impl MetricsSnapshot {
    /// Create an empty snapshot
    fn empty() -> Self {
        Self {
            execution_times: Vec::new(),
            memory_usage: Vec::new(),
            throughput: Vec::new(),
            stage_durations: Vec::new(),
            custom_metrics: Vec::new(),
        }
    }

    /// Filter metrics by pipeline name
    fn filter_by_pipeline(&self, pipeline_name: &str) -> Self {
        Self {
            execution_times: self
                .execution_times
                .iter()
                .filter(|m| m.pipeline_name == pipeline_name)
                .cloned()
                .collect(),
            memory_usage: self
                .memory_usage
                .iter()
                .filter(|m| m.pipeline_name == pipeline_name)
                .cloned()
                .collect(),
            throughput: self
                .throughput
                .iter()
                .filter(|m| m.pipeline_name == pipeline_name)
                .cloned()
                .collect(),
            stage_durations: self
                .stage_durations
                .iter()
                .filter(|m| m.pipeline_name == pipeline_name)
                .cloned()
                .collect(),
            custom_metrics: self
                .custom_metrics
                .iter()
                .filter(|m| m.pipeline_name == pipeline_name)
                .cloned()
                .collect(),
        }
    }

    /// Get all metrics as a single vector
    #[must_use]
    pub fn all_metrics(&self) -> Vec<&Metric> {
        let mut all = Vec::new();
        all.extend(&self.execution_times);
        all.extend(&self.memory_usage);
        all.extend(&self.throughput);
        all.extend(&self.stage_durations);
        all.extend(&self.custom_metrics);
        all
    }
}

/// Performance analysis results
#[derive(Debug, Clone)]
pub struct PerformanceAnalysis {
    /// Average execution time
    pub avg_execution_time: f64,
    /// Execution time percentiles
    pub execution_time_percentiles: HashMap<u8, f64>,
    /// Average memory usage
    pub avg_memory_usage: f64,
    /// Peak memory usage
    pub peak_memory_usage: f64,
    /// Average throughput
    pub avg_throughput: f64,
    /// Stage performance breakdown
    pub stage_breakdown: HashMap<String, StagePerformance>,
    /// Performance trends
    pub trends: PerformanceTrends,
}

impl PerformanceAnalysis {
    /// Create performance analysis from metrics
    fn from_metrics(metrics: MetricsSnapshot) -> Self {
        let mut execution_times: Vec<f64> =
            metrics.execution_times.iter().map(|m| m.value).collect();
        execution_times.sort_by(|a, b| a.total_cmp(b));

        let avg_execution_time = if execution_times.is_empty() {
            0.0
        } else {
            execution_times.iter().sum::<f64>() / execution_times.len() as f64
        };

        let execution_time_percentiles = Self::calculate_percentiles(&execution_times);

        let memory_values: Vec<f64> = metrics.memory_usage.iter().map(|m| m.value).collect();
        let avg_memory_usage = if memory_values.is_empty() {
            0.0
        } else {
            memory_values.iter().sum::<f64>() / memory_values.len() as f64
        };
        let peak_memory_usage = memory_values.iter().fold(0.0f64, |acc, &x| acc.max(x));

        let throughput_values: Vec<f64> = metrics.throughput.iter().map(|m| m.value).collect();
        let avg_throughput = if throughput_values.is_empty() {
            0.0
        } else {
            throughput_values.iter().sum::<f64>() / throughput_values.len() as f64
        };

        let stage_breakdown = Self::analyze_stages(&metrics.stage_durations);
        let trends = Self::analyze_trends(&metrics);

        Self {
            avg_execution_time,
            execution_time_percentiles,
            avg_memory_usage,
            peak_memory_usage,
            avg_throughput,
            stage_breakdown,
            trends,
        }
    }

    /// Calculate percentiles for a sorted vector
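    ///
    /// Uses nearest-rank indexing: for `[1.0, 2.0, 3.0, 4.0, 5.0]` the 50th
    /// percentile index is `round(0.50 * 4) = 2`, yielding `3.0`, and the 95th
    /// percentile index is `round(0.95 * 4) = 4`, yielding `5.0`.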
    fn calculate_percentiles(sorted_values: &[f64]) -> HashMap<u8, f64> {
        let mut percentiles = HashMap::new();

        if sorted_values.is_empty() {
            return percentiles;
        }

        for p in &[50, 75, 90, 95, 99] {
            let index =
                ((f64::from(*p) / 100.0) * (sorted_values.len() - 1) as f64).round() as usize;
            let index = index.min(sorted_values.len() - 1);
            percentiles.insert(*p, sorted_values[index]);
        }

        percentiles
    }

    /// Analyze stage performance
    fn analyze_stages(stage_metrics: &[Metric]) -> HashMap<String, StagePerformance> {
        let mut stage_map: HashMap<String, Vec<f64>> = HashMap::new();

        for metric in stage_metrics {
            if let Some(stage_name) = &metric.stage_name {
                stage_map
                    .entry(stage_name.clone())
                    .or_default()
                    .push(metric.value);
            }
        }

        let mut stage_breakdown = HashMap::new();
        for (stage_name, times) in stage_map {
            let avg_time = times.iter().sum::<f64>() / times.len() as f64;
            let min_time = times.iter().fold(f64::INFINITY, |acc, &x| acc.min(x));
            let max_time = times.iter().fold(0.0f64, |acc, &x| acc.max(x));

            stage_breakdown.insert(
                stage_name,
                StagePerformance {
                    avg_duration: avg_time,
                    min_duration: min_time,
                    max_duration: max_time,
                    execution_count: times.len(),
                },
            );
        }

        stage_breakdown
    }

    /// Analyze performance trends
    fn analyze_trends(metrics: &MetricsSnapshot) -> PerformanceTrends {
        // Simple trend analysis - in practice this would be more sophisticated
        let recent_count = 10;

        let recent_execution_times: Vec<f64> = metrics
            .execution_times
            .iter()
            .rev()
            .take(recent_count)
            .map(|m| m.value)
            .collect();

        let execution_time_trend = if recent_execution_times.len() >= 2 {
            // `recent_execution_times` is ordered newest-first, so the tail of the
            // slice holds the older samples and the head holds the newer ones.
            let older_half = &recent_execution_times[recent_execution_times.len() / 2..];
            let newer_half = &recent_execution_times[..recent_execution_times.len() / 2];

            let older_avg = older_half.iter().sum::<f64>() / older_half.len() as f64;
            let newer_avg = newer_half.iter().sum::<f64>() / newer_half.len() as f64;

            if newer_avg > older_avg * 1.1 {
                Trend::Increasing
            } else if newer_avg < older_avg * 0.9 {
                Trend::Decreasing
            } else {
                Trend::Stable
            }
        } else {
            Trend::Unknown
        };

        PerformanceTrends {
            execution_time_trend,
            memory_usage_trend: Trend::Unknown, // Would implement similar logic
            throughput_trend: Trend::Unknown,
        }
    }
}

/// Stage performance metrics
#[derive(Debug, Clone)]
pub struct StagePerformance {
    /// Average duration
    pub avg_duration: f64,
    /// Minimum duration
    pub min_duration: f64,
    /// Maximum duration
    pub max_duration: f64,
    /// Number of executions
    pub execution_count: usize,
}

/// Performance trends
#[derive(Debug, Clone)]
pub struct PerformanceTrends {
    /// Execution time trend
    pub execution_time_trend: Trend,
    /// Memory usage trend
    pub memory_usage_trend: Trend,
    /// Throughput trend
    pub throughput_trend: Trend,
}

/// Trend direction
#[derive(Debug, Clone, PartialEq)]
pub enum Trend {
    /// Metric is increasing
    Increasing,
    /// Metric is decreasing
    Decreasing,
    /// Metric is stable
    Stable,
    /// Trend is unknown
    Unknown,
}

/// Performance baseline for anomaly detection
#[derive(Debug, Clone)]
pub struct PerformanceBaseline {
    /// Average execution time
    pub avg_execution_time: f64,
    /// Standard deviation of execution time
    pub std_dev_execution_time: f64,
    /// Average memory usage
    pub avg_memory_usage: f64,
    /// Average throughput
    pub avg_throughput: f64,
}

/// Detected anomaly
#[derive(Debug, Clone)]
pub struct Anomaly {
    /// Type of anomaly
    pub anomaly_type: AnomalyType,
    /// Severity level
    pub severity: AnomalySeverity,
    /// Timestamp when detected
    pub timestamp: u64,
    /// Human-readable description
    pub description: String,
    /// Pipeline name
    pub pipeline_name: String,
    /// Metric name
    pub metric_name: String,
}

/// Types of anomalies
#[derive(Debug, Clone, PartialEq)]
pub enum AnomalyType {
    /// Execution time is unusually slow
    SlowExecution,
    /// Memory usage is unusually high
    HighMemoryUsage,
    /// Throughput is unusually low
    LowThroughput,
    /// Error rate is unusually high
    HighErrorRate,
    /// Resource contention detected
    ResourceContention,
}

/// Anomaly severity levels
#[derive(Debug, Clone, PartialEq)]
pub enum AnomalySeverity {
    /// Low severity - informational
    Low,
    /// Medium severity - worth investigating
    Medium,
    /// High severity - requires immediate attention
    High,
    /// Critical severity - system may be failing
    Critical,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_monitor_config() {
        let config = MonitorConfig::new()
            .max_metrics(5000)
            .memory_threshold_mb(512.0)
            .enable_profiling(true);

        assert_eq!(config.max_metrics, 5000);
        assert_eq!(config.memory_threshold_mb, 512.0);
        assert!(config.enable_profiling);
    }

    #[test]
    fn test_pipeline_monitor() {
        let config = MonitorConfig::new();
        let monitor = PipelineMonitor::new(config);

        let mut handle = monitor.start_execution("test-exec-1", "test-pipeline");
        handle.start_stage("preprocessing");
        handle.end_stage("preprocessing");
        handle.record_memory_usage(256.0);
        handle.finish();

        let snapshot = monitor.get_metrics_snapshot();
        assert!(!snapshot.stage_durations.is_empty());
        assert!(!snapshot.memory_usage.is_empty());
    }

    #[test]
    fn test_execution_handle() {
        let metrics = Arc::new(Mutex::new(MetricsStorage::new()));
        let mut handle = ExecutionHandle::new(
            "test-id".to_string(),
            "test-pipeline".to_string(),
            metrics.clone(),
        );

        handle.start_stage("test-stage");
        std::thread::sleep(Duration::from_millis(10));
        handle.end_stage("test-stage");

        let metrics_lock = metrics.lock().unwrap();
        assert!(!metrics_lock.metrics.is_empty());
    }

    #[test]
    fn test_metrics_snapshot() {
        let mut snapshot = MetricsSnapshot::empty();
        let metric = Metric {
            name: "test_metric".to_string(),
            value: 42.0,
            timestamp: 123456789,
            pipeline_name: "test-pipeline".to_string(),
            stage_name: None,
            execution_id: None,
            metadata: HashMap::new(),
        };

        snapshot.custom_metrics.push(metric);
        assert_eq!(snapshot.all_metrics().len(), 1);
    }

    #[test]
    fn test_performance_analysis() {
        let mut snapshot = MetricsSnapshot::empty();

        // Add some execution time metrics
        for i in 1..=5 {
            snapshot.execution_times.push(Metric {
                name: "execution_time".to_string(),
                value: i as f64,
                timestamp: 123456789,
                pipeline_name: "test".to_string(),
                stage_name: None,
                execution_id: None,
                metadata: HashMap::new(),
            });
        }

        let analysis = PerformanceAnalysis::from_metrics(snapshot);
        assert_eq!(analysis.avg_execution_time, 3.0);
        assert!(analysis.execution_time_percentiles.contains_key(&50));
    }

    #[test]
    fn test_anomaly_detection() {
        let config = MonitorConfig::new().memory_threshold_mb(100.0);
        let mut monitor = PipelineMonitor::new(config);

        let baseline = PerformanceBaseline {
            avg_execution_time: 1.0,
            std_dev_execution_time: 0.1,
            avg_memory_usage: 50.0,
            avg_throughput: 1000.0,
        };
        monitor.set_baseline("test-pipeline", baseline);

        // Record a metric that should trigger anomaly
        monitor.record_metric(Metric {
            name: "memory_usage_mb".to_string(),
            value: 200.0, // Above threshold
            timestamp: 123456789,
            pipeline_name: "test-pipeline".to_string(),
            stage_name: None,
            execution_id: None,
            metadata: HashMap::new(),
        });

        let anomalies = monitor.detect_anomalies("test-pipeline");
        assert!(!anomalies.is_empty());
        assert_eq!(anomalies[0].anomaly_type, AnomalyType::HighMemoryUsage);
    }
}