Skip to main content

scirs2_transform/
monitoring.rs

1//! Production monitoring with drift detection and model degradation alerts
2//!
3//! This module provides comprehensive monitoring capabilities for transformation
4//! pipelines in production environments, including data drift detection,
5//! performance monitoring, and automated alerting.
6
7use crate::error::{Result, TransformError};
8use scirs2_core::ndarray::{Array2, ArrayView1, ArrayView2};
9use scirs2_core::validation::check_not_empty;
10use std::collections::{HashMap, VecDeque};
11use std::time::{Duration, SystemTime, UNIX_EPOCH};
12
13#[cfg(feature = "monitoring")]
14use prometheus::{Counter, Gauge, Histogram, HistogramOpts, Registry};
15
/// Drift detection methods
///
/// Selects which statistical test compares a feature's reference
/// distribution against incoming data.
// Fieldless enum: deriving `Copy`, `Eq`, and `Hash` is free and lets the
// method be passed by value and used as a map/set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DriftMethod {
    /// Kolmogorov-Smirnov test for continuous features
    KolmogorovSmirnov,
    /// Chi-square test for categorical features
    ChiSquare,
    /// Population Stability Index (PSI)
    PopulationStabilityIndex,
    /// Maximum Mean Discrepancy (MMD)
    MaximumMeanDiscrepancy,
    /// Wasserstein distance
    WassersteinDistance,
}
30
/// Data drift detection result
///
/// One record per (feature, check); results are accumulated in the
/// monitor's bounded `drift_history` buffer.
#[derive(Debug, Clone)]
pub struct DriftDetectionResult {
    /// Feature name or index
    pub feature_name: String,
    /// Drift detection method used
    pub method: DriftMethod,
    /// Test statistic value
    pub statistic: f64,
    /// P-value (if applicable; distance/index-based methods report `None`)
    pub p_value: Option<f64>,
    /// Whether drift is detected
    pub is_drift_detected: bool,
    /// Severity level (0.0 = no drift, 1.0 = severe drift)
    pub severity: f64,
    /// Timestamp of detection (produced by `current_timestamp()`;
    /// presumably seconds since the Unix epoch — see `UNIX_EPOCH` import)
    pub timestamp: u64,
}
49
/// Performance degradation metrics
///
/// A single snapshot of pipeline health, recorded via
/// `TransformationMonitor::record_metrics` and compared against the
/// configured baseline and alert thresholds.
#[derive(Debug, Clone)]
pub struct PerformanceMetrics {
    /// Processing time in milliseconds
    pub processing_time_ms: f64,
    /// Memory usage in MB
    pub memory_usage_mb: f64,
    /// Error rate (0.0 to 1.0)
    pub error_rate: f64,
    /// Throughput (samples per second)
    pub throughput: f64,
    /// Data quality score (0.0 to 1.0)
    pub data_quality_score: f64,
    /// Timestamp (presumably seconds since the Unix epoch — matches
    /// comparisons against `current_timestamp()` elsewhere in this module)
    pub timestamp: u64,
}
66
/// Alert configuration
///
/// Thresholds and throttling used when checking drift results and
/// performance metrics; see the `Default` impl for production defaults.
#[derive(Debug, Clone)]
pub struct AlertConfig {
    /// Drift detection threshold (compared against p-values or distance
    /// statistics depending on the drift method)
    pub drift_threshold: f64,
    /// Performance degradation threshold (multiplier relative to baseline)
    pub performance_threshold: f64,
    /// Error rate threshold (0.0 to 1.0)
    pub error_rate_threshold: f64,
    /// Memory usage threshold in MB
    pub memory_threshold_mb: f64,
    /// Alert cooldown period in seconds (suppresses repeat alerts)
    pub cooldown_seconds: u64,
}
81
82impl Default for AlertConfig {
83    fn default() -> Self {
84        AlertConfig {
85            drift_threshold: 0.05,
86            performance_threshold: 2.0,  // 2x baseline
87            error_rate_threshold: 0.05,  // 5%
88            memory_threshold_mb: 1000.0, // 1GB
89            cooldown_seconds: 300,       // 5 minutes
90        }
91    }
92}
93
/// Alert types
///
/// Emitted by the monitor's alert checks (e.g. returned from
/// `record_metrics`); each variant carries the value that tripped it.
#[derive(Debug, Clone)]
pub enum AlertType {
    /// Statistical drift detected in feature distribution
    DataDrift {
        /// Name of the drifting feature
        feature: String,
        /// Severity score of the drift (0.0 to 1.0)
        severity: f64,
    },
    /// Performance degradation detected in metrics
    PerformanceDegradation {
        /// Name of the degraded metric
        metric: String,
        /// Current degraded value
        value: f64,
    },
    /// Error rate exceeds acceptable threshold
    HighErrorRate {
        /// Current error rate (0.0 to 1.0)
        rate: f64,
    },
    /// Memory usage approaching limits
    MemoryExhaustion {
        /// Current memory usage in megabytes
        usage_mb: f64,
    },
    /// Data quality below acceptable standards
    DataQualityIssue {
        /// Quality score (0.0 to 1.0, lower is worse)
        score: f64,
    },
}
127
/// Production monitoring system
///
/// Central state for drift detection and performance monitoring of a
/// transformation pipeline: the reference distribution, per-feature drift
/// configuration, bounded histories, alerting state, and (behind the
/// `monitoring` feature) Prometheus instruments.
pub struct TransformationMonitor {
    /// Reference data for drift detection (`None` until `set_reference_data`)
    reference_data: Option<Array2<f64>>,
    /// Feature names, one per reference-data column
    feature_names: Vec<String>,
    /// Drift detection methods per feature (default: Kolmogorov-Smirnov)
    drift_methods: HashMap<String, DriftMethod>,
    /// Historical performance metrics (bounded ring, newest at the back)
    performance_history: VecDeque<PerformanceMetrics>,
    /// Historical drift results (bounded ring, newest at the back)
    drift_history: VecDeque<DriftDetectionResult>,
    /// Alert configuration
    alert_config: AlertConfig,
    /// Last alert timestamps (for cooldown throttling)
    last_alert_times: HashMap<String, u64>,
    /// Baseline performance metrics to judge degradation against
    baseline_metrics: Option<PerformanceMetrics>,
    /// Prometheus metrics registry
    #[cfg(feature = "monitoring")]
    metrics_registry: Registry,
    /// Prometheus counters and gauges
    #[cfg(feature = "monitoring")]
    prometheus_metrics: PrometheusMetrics,
}
153
/// Handles to the Prometheus instruments registered by the monitor
/// (compiled only with the `monitoring` feature).
#[cfg(feature = "monitoring")]
struct PrometheusMetrics {
    /// Running count of detected drifts across all features
    drift_detections: Counter,
    /// Distribution of batch processing times, in seconds
    processing_time: Histogram,
    /// Most recent memory usage reading (MB)
    memory_usage: Gauge,
    /// Most recent error rate (0.0 to 1.0)
    error_rate: Gauge,
    /// Most recent throughput (samples per second)
    throughput: Gauge,
    /// Most recent data quality score (0.0 to 1.0)
    data_quality: Gauge,
}
163
164impl TransformationMonitor {
    /// Create a new transformation monitor
    ///
    /// Builds an empty monitor with the default `AlertConfig` and bounded
    /// history buffers (1000 entries each). With the `monitoring` feature
    /// enabled, also creates a fresh Prometheus registry and registers all
    /// instruments on it.
    ///
    /// # Errors
    ///
    /// With the `monitoring` feature enabled, returns
    /// `TransformError::ComputationError` if creating or registering any
    /// Prometheus metric fails; without the feature this cannot fail.
    pub fn new() -> Result<Self> {
        #[cfg(feature = "monitoring")]
        let metrics_registry = Registry::new();

        // Build every instrument up front so any failure aborts construction
        // before partial state is handed out.
        #[cfg(feature = "monitoring")]
        let prometheus_metrics = PrometheusMetrics {
            drift_detections: Counter::new(
                "transform_drift_detections_total",
                "Total number of drift detections",
            )
            .map_err(|e| {
                TransformError::ComputationError(format!("Failed to create counter: {}", e))
            })?,
            processing_time: Histogram::with_opts(HistogramOpts::new(
                "transform_processing_time_seconds",
                "Processing time in seconds",
            ))
            .map_err(|e| {
                TransformError::ComputationError(format!("Failed to create histogram: {}", e))
            })?,
            memory_usage: Gauge::new("transform_memory_usage_mb", "Memory usage in MB").map_err(
                |e| TransformError::ComputationError(format!("Failed to create gauge: {}", e)),
            )?,
            error_rate: Gauge::new("transform_error_rate", "Error rate").map_err(|e| {
                TransformError::ComputationError(format!("Failed to create gauge: {}", e))
            })?,
            throughput: Gauge::new(
                "transform_throughput_samples_per_second",
                "Throughput in samples per second",
            )
            .map_err(|e| {
                TransformError::ComputationError(format!("Failed to create gauge: {}", e))
            })?,
            data_quality: Gauge::new("transform_data_quality_score", "Data quality score")
                .map_err(|e| {
                    TransformError::ComputationError(format!("Failed to create gauge: {}", e))
                })?,
        };

        // Register a clone of each instrument; the monitor keeps its own
        // handles in `prometheus_metrics` for recording.
        #[cfg(feature = "monitoring")]
        {
            metrics_registry
                .register(Box::new(prometheus_metrics.drift_detections.clone()))
                .map_err(|e| {
                    TransformError::ComputationError(format!("Failed to register counter: {}", e))
                })?;
            metrics_registry
                .register(Box::new(prometheus_metrics.processing_time.clone()))
                .map_err(|e| {
                    TransformError::ComputationError(format!("Failed to register histogram: {}", e))
                })?;
            metrics_registry
                .register(Box::new(prometheus_metrics.memory_usage.clone()))
                .map_err(|e| {
                    TransformError::ComputationError(format!("Failed to register gauge: {}", e))
                })?;
            metrics_registry
                .register(Box::new(prometheus_metrics.error_rate.clone()))
                .map_err(|e| {
                    TransformError::ComputationError(format!("Failed to register gauge: {}", e))
                })?;
            metrics_registry
                .register(Box::new(prometheus_metrics.throughput.clone()))
                .map_err(|e| {
                    TransformError::ComputationError(format!("Failed to register gauge: {}", e))
                })?;
            metrics_registry
                .register(Box::new(prometheus_metrics.data_quality.clone()))
                .map_err(|e| {
                    TransformError::ComputationError(format!("Failed to register gauge: {}", e))
                })?;
        }

        Ok(TransformationMonitor {
            reference_data: None,
            feature_names: Vec::new(),
            drift_methods: HashMap::new(),
            // Capacity matches the 1000-entry trim applied when recording.
            performance_history: VecDeque::with_capacity(1000),
            drift_history: VecDeque::with_capacity(1000),
            alert_config: AlertConfig::default(),
            last_alert_times: HashMap::new(),
            baseline_metrics: None,
            #[cfg(feature = "monitoring")]
            metrics_registry,
            #[cfg(feature = "monitoring")]
            prometheus_metrics,
        })
    }
254
255    /// Set reference data for drift detection
256    pub fn set_reference_data(
257        &mut self,
258        data: Array2<f64>,
259        feature_names: Option<Vec<String>>,
260    ) -> Result<()> {
261        self.reference_data = Some(data.clone());
262
263        if let Some(names) = feature_names {
264            if names.len() != data.ncols() {
265                return Err(TransformError::InvalidInput(
266                    "Number of feature names must match number of columns".to_string(),
267                ));
268            }
269            self.feature_names = names;
270        } else {
271            self.feature_names = (0..data.ncols())
272                .map(|i| format!("feature_{}", i))
273                .collect();
274        }
275
276        // Set default drift detection methods
277        for feature_name in &self.feature_names {
278            self.drift_methods
279                .insert(feature_name.clone(), DriftMethod::KolmogorovSmirnov);
280        }
281
282        Ok(())
283    }
284
285    /// Configure drift detection method for a specific feature
286    pub fn set_drift_method(&mut self, featurename: &str, method: DriftMethod) -> Result<()> {
287        if !self.feature_names.contains(&featurename.to_string()) {
288            return Err(TransformError::InvalidInput(format!(
289                "Unknown feature name: {}",
290                featurename
291            )));
292        }
293
294        self.drift_methods.insert(featurename.to_string(), method);
295        Ok(())
296    }
297
    /// Set alert configuration
    ///
    /// Replaces the thresholds and cooldown used by subsequent alert checks.
    pub fn set_alert_config(&mut self, config: AlertConfig) {
        self.alert_config = config;
    }
302
    /// Set baseline performance metrics
    ///
    /// The baseline is the reference point for degradation checks (see
    /// `AlertConfig::performance_threshold`, a multiplier of the baseline).
    pub fn set_baseline_metrics(&mut self, metrics: PerformanceMetrics) {
        self.baseline_metrics = Some(metrics);
    }
307
    /// Detect data drift in new data
    ///
    /// Compares each column of `new_data` against the corresponding column
    /// of the reference data using that feature's configured method
    /// (Kolmogorov-Smirnov when none was set), appends every result to the
    /// bounded drift history, and returns the per-feature results.
    ///
    /// # Errors
    ///
    /// Returns `TransformError::InvalidInput` when no reference data has
    /// been set or when the column counts differ; propagates any
    /// validation or computation errors from the per-feature tests.
    pub fn detect_drift(
        &mut self,
        new_data: &ArrayView2<f64>,
    ) -> Result<Vec<DriftDetectionResult>> {
        let reference_data = self
            .reference_data
            .as_ref()
            .ok_or_else(|| TransformError::InvalidInput("Reference data not set".to_string()))?;

        if new_data.ncols() != reference_data.ncols() {
            return Err(TransformError::InvalidInput(
                "New data must have same number of features as reference data".to_string(),
            ));
        }

        let mut results = Vec::new();
        // One timestamp for the whole batch so all per-feature results agree.
        let timestamp = current_timestamp();

        for (i, feature_name) in self.feature_names.iter().enumerate() {
            // Fall back to KS for features without an explicit method.
            let method = self
                .drift_methods
                .get(feature_name)
                .unwrap_or(&DriftMethod::KolmogorovSmirnov);

            let reference_feature = reference_data.column(i);
            let new_feature = new_data.column(i);

            let result = self.detect_feature_drift(
                &reference_feature,
                &new_feature,
                feature_name,
                method,
                timestamp,
            )?;

            // Clone into the returned vec; the original goes into history.
            results.push(result.clone());
            self.drift_history.push_back(result);

            // Keep only recent history (bounded at 1000 entries).
            if self.drift_history.len() > 1000 {
                self.drift_history.pop_front();
            }
        }

        // Update Prometheus metrics with the number of flagged features.
        #[cfg(feature = "monitoring")]
        {
            let drift_count = results.iter().filter(|r| r.is_drift_detected).count();
            self.prometheus_metrics
                .drift_detections
                .inc_by(drift_count as f64);
        }

        Ok(results)
    }
364
    /// Record performance metrics
    ///
    /// Appends the snapshot to the bounded history, mirrors the values into
    /// the Prometheus gauges/histogram when the `monitoring` feature is
    /// enabled, and returns any alerts triggered by the new snapshot.
    ///
    /// # Errors
    ///
    /// Propagates errors from the alert-checking step.
    pub fn record_metrics(&mut self, metrics: PerformanceMetrics) -> Result<Vec<AlertType>> {
        self.performance_history.push_back(metrics.clone());

        // Keep only recent history (bounded at 1000 entries).
        if self.performance_history.len() > 1000 {
            self.performance_history.pop_front();
        }

        // Update Prometheus metrics.
        #[cfg(feature = "monitoring")]
        {
            // Histogram is in seconds; the snapshot stores milliseconds.
            self.prometheus_metrics
                .processing_time
                .observe(metrics.processing_time_ms / 1000.0);
            self.prometheus_metrics
                .memory_usage
                .set(metrics.memory_usage_mb);
            self.prometheus_metrics.error_rate.set(metrics.error_rate);
            self.prometheus_metrics.throughput.set(metrics.throughput);
            self.prometheus_metrics
                .data_quality
                .set(metrics.data_quality_score);
        }

        // Check for alerts against thresholds/baseline.
        self.check_performance_alerts(&metrics)
    }
393
394    /// Get drift detection summary
395    pub fn get_drift_summary(&self, lookbackhours: u64) -> Result<HashMap<String, f64>> {
396        let cutoff_time = current_timestamp() - (lookbackhours * 3600);
397        let mut summary = HashMap::new();
398
399        for feature_name in &self.feature_names {
400            let recent_detections: Vec<_> = self
401                .drift_history
402                .iter()
403                .filter(|r| r.timestamp >= cutoff_time && r.feature_name == *feature_name)
404                .collect();
405
406            let drift_rate = if recent_detections.is_empty() {
407                0.0
408            } else {
409                recent_detections
410                    .iter()
411                    .filter(|r| r.is_drift_detected)
412                    .count() as f64
413                    / recent_detections.len() as f64
414            };
415
416            summary.insert(feature_name.clone(), drift_rate);
417        }
418
419        Ok(summary)
420    }
421
422    /// Get performance trends
423    pub fn get_performance_trends(&self, lookbackhours: u64) -> Result<HashMap<String, f64>> {
424        let cutoff_time = current_timestamp() - (lookbackhours * 3600);
425        let recent_metrics: Vec<_> = self
426            .performance_history
427            .iter()
428            .filter(|m| m.timestamp >= cutoff_time)
429            .collect();
430
431        if recent_metrics.is_empty() {
432            return Ok(HashMap::new());
433        }
434
435        let mut trends = HashMap::new();
436
437        // Calculate trends (change from first to last measurement)
438        if recent_metrics.len() >= 2 {
439            let first = recent_metrics.first().expect("Operation failed");
440            let last = recent_metrics.last().expect("Operation failed");
441
442            trends.insert(
443                "processing_time_trend".to_string(),
444                (last.processing_time_ms - first.processing_time_ms) / first.processing_time_ms,
445            );
446            trends.insert(
447                "memory_usage_trend".to_string(),
448                (last.memory_usage_mb - first.memory_usage_mb) / first.memory_usage_mb,
449            );
450            trends.insert(
451                "error_rate_trend".to_string(),
452                last.error_rate - first.error_rate,
453            );
454            trends.insert(
455                "throughput_trend".to_string(),
456                (last.throughput - first.throughput) / first.throughput,
457            );
458        }
459
460        Ok(trends)
461    }
462
463    fn detect_feature_drift(
464        &self,
465        reference: &ArrayView1<f64>,
466        new_data: &ArrayView1<f64>,
467        feature_name: &str,
468        method: &DriftMethod,
469        timestamp: u64,
470    ) -> Result<DriftDetectionResult> {
471        check_not_empty(reference, "reference")?;
472        check_not_empty(new_data, "new_data")?;
473
474        // Check finite values in reference
475        for &val in reference.iter() {
476            if !val.is_finite() {
477                return Err(crate::error::TransformError::DataValidationError(
478                    "Reference data contains non-finite values".to_string(),
479                ));
480            }
481        }
482
483        // Check finite values in new_data
484        for &val in new_data.iter() {
485            if !val.is_finite() {
486                return Err(crate::error::TransformError::DataValidationError(
487                    "New data contains non-finite values".to_string(),
488                ));
489            }
490        }
491
492        let (statistic, p_value, is_drift) = match method {
493            DriftMethod::KolmogorovSmirnov => {
494                let (stat, p_val) = self.kolmogorov_smirnov_test(reference, new_data)?;
495                (stat, Some(p_val), p_val < self.alert_config.drift_threshold)
496            }
497            DriftMethod::ChiSquare => {
498                let (stat, p_val) = self.chi_square_test(reference, new_data)?;
499                (stat, Some(p_val), p_val < self.alert_config.drift_threshold)
500            }
501            DriftMethod::PopulationStabilityIndex => {
502                let psi = self.population_stability_index(reference, new_data)?;
503                (psi, None, psi > 0.1) // PSI > 0.1 indicates drift
504            }
505            DriftMethod::MaximumMeanDiscrepancy => {
506                let mmd = self.maximum_mean_discrepancy(reference, new_data)?;
507                (mmd, None, mmd > self.alert_config.drift_threshold)
508            }
509            DriftMethod::WassersteinDistance => {
510                let distance = self.wasserstein_distance(reference, new_data)?;
511                (distance, None, distance > self.alert_config.drift_threshold)
512            }
513        };
514
515        let severity = if let Some(p_val) = p_value {
516            1.0 - p_val // Lower p-value = higher severity
517        } else {
518            statistic.min(1.0) // Cap at 1.0
519        };
520
521        Ok(DriftDetectionResult {
522            feature_name: feature_name.to_string(),
523            method: method.clone(),
524            statistic,
525            p_value,
526            is_drift_detected: is_drift,
527            severity,
528            timestamp,
529        })
530    }
531
532    fn kolmogorov_smirnov_test(
533        &self,
534        x: &ArrayView1<f64>,
535        y: &ArrayView1<f64>,
536    ) -> Result<(f64, f64)> {
537        let mut x_sorted = x.to_vec();
538        let mut y_sorted = y.to_vec();
539        x_sorted.sort_by(|a, b| a.partial_cmp(b).expect("Operation failed"));
540        y_sorted.sort_by(|a, b| a.partial_cmp(b).expect("Operation failed"));
541
542        let n1 = x_sorted.len() as f64;
543        let n2 = y_sorted.len() as f64;
544
545        // Create combined sorted array for precise CDF calculation
546        let mut combined: Vec<(f64, i32)> = Vec::new();
547        for val in &x_sorted {
548            combined.push((*val, 1)); // Mark as from first sample
549        }
550        for val in &y_sorted {
551            combined.push((*val, 2)); // Mark as from second sample
552        }
553        combined.sort_by(|a, b| a.0.partial_cmp(&b.0).expect("Operation failed"));
554
555        let mut cdf1 = 0.0;
556        let mut cdf2 = 0.0;
557        let mut max_diff: f64 = 0.0;
558
559        for (_, sample_id) in combined {
560            if sample_id == 1 {
561                cdf1 += 1.0 / n1;
562            } else {
563                cdf2 += 1.0 / n2;
564            }
565            max_diff = max_diff.max((cdf1 - cdf2).abs());
566        }
567
568        let statistic = max_diff;
569
570        // More accurate p-value calculation using the asymptotic distribution
571        let effective_n = (n1 * n2) / (n1 + n2);
572        let lambda = statistic * effective_n.sqrt();
573
574        // Kolmogorov distribution approximation for p-value
575        let p_value = if lambda < 0.27 {
576            1.0
577        } else if lambda < 1.0 {
578            2.0 * (-2.0 * lambda * lambda).exp()
579        } else {
580            // Series expansion for large lambda
581            let mut sum = 0.0;
582            for k in 1..=10 {
583                let k_f = k as f64;
584                sum += (-1.0_f64).powi(k - 1) * (-2.0 * k_f * k_f * lambda * lambda).exp();
585            }
586            2.0 * sum
587        };
588
589        Ok((statistic, p_value.clamp(0.0, 1.0)))
590    }
591
592    fn population_stability_index(
593        &self,
594        reference: &ArrayView1<f64>,
595        new_data: &ArrayView1<f64>,
596    ) -> Result<f64> {
597        // Create bins based on reference _data
598        let mut ref_sorted = reference.to_vec();
599        ref_sorted.sort_by(|a, b| a.partial_cmp(b).expect("Operation failed"));
600
601        let n_bins = 10;
602        let mut bins = Vec::new();
603        for i in 0..=n_bins {
604            let percentile = (i as f64) / (n_bins as f64);
605            let index = ((ref_sorted.len() - 1) as f64 * percentile) as usize;
606            bins.push(ref_sorted[index]);
607        }
608
609        // Calculate frequencies
610        let ref_freq = self.calculate_bin_frequencies(reference, &bins);
611        let new_freq = self.calculate_bin_frequencies(new_data, &bins);
612
613        // Calculate PSI
614        let mut psi = 0.0;
615        for i in 0..n_bins {
616            let ref_pct = ref_freq[i];
617            let new_pct = new_freq[i];
618
619            if ref_pct > 0.0 && new_pct > 0.0 {
620                psi += (new_pct - ref_pct) * (new_pct / ref_pct).ln();
621            }
622        }
623
624        Ok(psi)
625    }
626
627    fn calculate_bin_frequencies(&self, data: &ArrayView1<f64>, bins: &[f64]) -> Vec<f64> {
628        if bins.len() < 2 {
629            return vec![];
630        }
631
632        let mut frequencies = vec![0; bins.len() - 1];
633
634        for &value in data.iter() {
635            if !value.is_finite() {
636                continue;
637            }
638
639            // Find appropriate bin for this value
640            let mut placed = false;
641            for i in 0..bins.len() - 1 {
642                if i == bins.len() - 2 {
643                    // Last bin includes upper bound
644                    if value >= bins[i] && value <= bins[i + 1] {
645                        frequencies[i] += 1;
646                        placed = true;
647                        break;
648                    }
649                } else if value >= bins[i] && value < bins[i + 1] {
650                    frequencies[i] += 1;
651                    placed = true;
652                    break;
653                }
654            }
655
656            // Handle values outside the range
657            if !placed {
658                if value < bins[0] {
659                    frequencies[0] += 1;
660                } else if value > bins[bins.len() - 1] {
661                    let last_idx = frequencies.len() - 1;
662                    frequencies[last_idx] += 1;
663                }
664            }
665        }
666
667        let total = data.iter().filter(|&&v| v.is_finite()).count() as f64;
668        if total == 0.0 {
669            vec![0.0; frequencies.len()]
670        } else {
671            frequencies.iter().map(|&f| f as f64 / total).collect()
672        }
673    }
674
675    fn wasserstein_distance(&self, x: &ArrayView1<f64>, y: &ArrayView1<f64>) -> Result<f64> {
676        // Simplified 1D Wasserstein distance (Earth Mover's Distance)
677        let mut x_sorted: Vec<f64> = x.iter().filter(|&&v| v.is_finite()).copied().collect();
678        let mut y_sorted: Vec<f64> = y.iter().filter(|&&v| v.is_finite()).copied().collect();
679
680        if x_sorted.is_empty() || y_sorted.is_empty() {
681            return Ok(0.0);
682        }
683
684        x_sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
685        y_sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
686
687        let n1 = x_sorted.len();
688        let n2 = y_sorted.len();
689        let max_len = n1.max(n2);
690
691        let mut distance = 0.0;
692        for i in 0..max_len {
693            let x_val = if i < n1 {
694                x_sorted[i]
695            } else {
696                x_sorted[n1 - 1]
697            };
698            let y_val = if i < n2 {
699                y_sorted[i]
700            } else {
701                y_sorted[n2 - 1]
702            };
703            distance += (x_val - y_val).abs();
704        }
705
706        Ok(distance / max_len as f64)
707    }
708
709    /// Chi-square test for categorical data drift detection
710    fn chi_square_test(
711        &self,
712        reference: &ArrayView1<f64>,
713        new_data: &ArrayView1<f64>,
714    ) -> Result<(f64, f64)> {
715        // For continuous data, we'll bin it first and then apply chi-square test
716        let n_bins = 10;
717
718        // Combine _data to determine common bins
719        let mut combined_data: Vec<f64> = reference
720            .iter()
721            .chain(new_data.iter())
722            .filter(|&&v| v.is_finite())
723            .copied()
724            .collect();
725
726        if combined_data.len() < n_bins {
727            return Ok((0.0, 1.0)); // Not enough _data for meaningful test
728        }
729
730        combined_data.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
731
732        // Create bins based on quantiles
733        let mut bins = Vec::new();
734        for i in 0..=n_bins {
735            let percentile = i as f64 / n_bins as f64;
736            let index = ((combined_data.len() - 1) as f64 * percentile) as usize;
737            bins.push(combined_data[index]);
738        }
739
740        // Remove duplicate bin edges
741        bins.dedup_by(|a, b| (*a - *b).abs() < f64::EPSILON);
742
743        if bins.len() < 2 {
744            return Ok((0.0, 1.0));
745        }
746
747        let ref_freq = self.calculate_bin_frequencies(reference, &bins);
748        let new_freq = self.calculate_bin_frequencies(new_data, &bins);
749
750        let ref_total = reference.iter().filter(|&&v| v.is_finite()).count() as f64;
751        let new_total = new_data.iter().filter(|&&v| v.is_finite()).count() as f64;
752
753        if ref_total == 0.0 || new_total == 0.0 {
754            return Ok((0.0, 1.0));
755        }
756
757        // Calculate chi-square statistic
758        let mut chi_square = 0.0;
759        let mut degrees_of_freedom = 0;
760
761        for i in 0..ref_freq.len() {
762            let observed_ref = ref_freq[i] * ref_total;
763            let observed_new = new_freq[i] * new_total;
764
765            // Calculate expected frequencies under null hypothesis
766            let total_in_bin = observed_ref + observed_new;
767            let expected_ref_null = total_in_bin * ref_total / (ref_total + new_total);
768            let expected_new_null = total_in_bin * new_total / (ref_total + new_total);
769
770            if expected_ref_null > 5.0 && expected_new_null > 5.0 {
771                chi_square += (observed_ref - expected_ref_null).powi(2) / expected_ref_null;
772                chi_square += (observed_new - expected_new_null).powi(2) / expected_new_null;
773                degrees_of_freedom += 1;
774            }
775        }
776
777        // Approximate p-value using chi-square distribution
778        let p_value = if degrees_of_freedom > 0 {
779            self.chi_square_cdf_complement(chi_square, degrees_of_freedom as f64)
780        } else {
781            1.0
782        };
783
784        Ok((chi_square, p_value))
785    }
786
    /// Maximum Mean Discrepancy (MMD) test for distribution comparison
    ///
    /// Estimates MMD^2 = E[k(X,X')] + E[k(Y,Y')] - 2*E[k(X,Y)] with an RBF
    /// kernel over all pairs and returns sqrt(max(MMD^2, 0)). Cost is
    /// O(n^2 + m^2 + n*m) kernel evaluations, i.e. quadratic in sample size.
    fn maximum_mean_discrepancy(&self, x: &ArrayView1<f64>, y: &ArrayView1<f64>) -> Result<f64> {
        // Drop NaN/inf values before any pairwise computation.
        let x_clean: Vec<f64> = x.iter().filter(|&&v| v.is_finite()).copied().collect();
        let y_clean: Vec<f64> = y.iter().filter(|&&v| v.is_finite()).copied().collect();

        if x_clean.is_empty() || y_clean.is_empty() {
            return Ok(0.0);
        }

        let n = x_clean.len();
        let m = y_clean.len();

        // Adaptive bandwidth derived from the pooled sample.
        let all_data: Vec<f64> = x_clean.iter().chain(y_clean.iter()).copied().collect();
        let mut sorted_data = all_data;
        sorted_data.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));

        // Bandwidth = MEAN absolute deviation about the pooled median
        // (despite the `mad` name this averages the deviations rather than
        // taking their median), floored at 1.0 below.
        let median = sorted_data[sorted_data.len() / 2];
        let mad: f64 =
            sorted_data.iter().map(|&x| (x - median).abs()).sum::<f64>() / sorted_data.len() as f64;
        let bandwidth = mad.max(1.0); // Ensure reasonable bandwidth

        // Calculate MMD^2 = E[k(X,X')] + E[k(Y,Y')] - 2*E[k(X,Y)]
        let mut kxx = 0.0;
        let mut kyy = 0.0;
        let mut kxy = 0.0;

        // E[k(X,X')] over distinct pairs (i < j); doubled to cover both
        // orderings, then normalized by n*(n-1).
        if n > 1 {
            for i in 0..n {
                for j in (i + 1)..n {
                    kxx += self.rbf_kernel(x_clean[i], x_clean[j], bandwidth);
                }
            }
            kxx = 2.0 * kxx / (n * (n - 1)) as f64;
        }

        // E[k(Y,Y')] over distinct pairs, same scheme as above.
        if m > 1 {
            for i in 0..m {
                for j in (i + 1)..m {
                    kyy += self.rbf_kernel(y_clean[i], y_clean[j], bandwidth);
                }
            }
            kyy = 2.0 * kyy / (m * (m - 1)) as f64;
        }

        // E[k(X,Y)] over the full cross-product of the two samples.
        for i in 0..n {
            for j in 0..m {
                kxy += self.rbf_kernel(x_clean[i], y_clean[j], bandwidth);
            }
        }
        kxy /= (n * m) as f64;

        let mmd_squared = kxx + kyy - 2.0 * kxy;
        Ok(mmd_squared.max(0.0).sqrt()) // Take square root and ensure non-negative
    }
846
847    /// RBF (Gaussian) kernel function
848    fn rbf_kernel(&self, x: f64, y: f64, bandwidth: f64) -> f64 {
849        let diff = x - y;
850        (-diff * diff / (2.0 * bandwidth * bandwidth)).exp()
851    }
852
853    /// Complement of chi-square CDF using improved approximations
854    fn chi_square_cdf_complement(&self, x: f64, df: f64) -> f64 {
855        if x <= 0.0 {
856            return 1.0;
857        }
858        if df <= 0.0 {
859            return 0.0;
860        }
861
862        // For large df, use Wilson-Hilferty transformation (normal approximation)
863        if df >= 30.0 {
864            let h = 2.0 / (9.0 * df);
865            let z = ((x / df).powf(1.0 / 3.0) - (1.0 - h)) / h.sqrt();
866            return 0.5 * (1.0 - self.erf(z / 2.0_f64.sqrt()));
867        }
868
869        // For moderate df, use incomplete gamma function approximation
870        // P(X > x) = 1 - P(X <= x) = 1 - gamma_cdf(x/2, df/2)
871        let alpha = df / 2.0;
872        let x_half = x / 2.0;
873
874        // Use series expansion for gamma CDF
875        if x_half < alpha + 1.0 {
876            // Use series when x is relatively small compared to alpha
877            let mut term = x_half.powf(alpha) * (-x_half).exp();
878            let mut sum = term;
879
880            for k in 1..=50 {
881                term *= x_half / (alpha + k as f64);
882                sum += term;
883                if term / sum < 1e-10 {
884                    break;
885                }
886            }
887
888            let gamma_cdf = sum / self.gamma(alpha);
889            1.0 - gamma_cdf.min(1.0)
890        } else {
891            // Use continued fraction when x is large
892            let a = alpha;
893            let b = x_half + 1.0 - a;
894            let c = 1e30;
895            let mut d = 1.0 / b;
896            let mut h = d;
897
898            for i in 1..=100 {
899                let an = -i as f64 * (i as f64 - a);
900                let b = b + 2.0;
901                d = an * d + b;
902                if d.abs() < 1e-30 {
903                    d = 1e-30;
904                }
905                let mut c = b + an / c;
906                if c.abs() < 1e-30 {
907                    c = 1e-30;
908                }
909                d = 1.0 / d;
910                let del = d * c;
911                h *= del;
912                if (del - 1.0).abs() < 1e-10 {
913                    break;
914                }
915            }
916
917            let gamma_cf = (-x_half).exp() * x_half.powf(a) * h / self.gamma(a);
918            gamma_cf.clamp(0.0, 1.0)
919        }
920    }
921
922    /// Error function approximation
923    fn erf(&self, x: f64) -> f64 {
924        // Abramowitz and Stegun approximation
925        let a1 = 0.254829592;
926        let a2 = -0.284496736;
927        let a3 = 1.421413741;
928        let a4 = -1.453152027;
929        let a5 = 1.061405429;
930        let p = 0.3275911;
931
932        let sign = if x >= 0.0 { 1.0 } else { -1.0 };
933        let x = x.abs();
934
935        let t = 1.0 / (1.0 + p * x);
936        let y = 1.0 - (((((a5 * t + a4) * t) + a3) * t + a2) * t + a1) * t * (-x * x).exp();
937
938        sign * y
939    }
940
941    /// Gamma function using Lanczos approximation
942    fn gamma(&self, z: f64) -> f64 {
943        if z < 0.5 {
944            // Use reflection formula: Γ(z)Γ(1-z) = π/sin(πz)
945            std::f64::consts::PI / ((std::f64::consts::PI * z).sin() * self.gamma(1.0 - z))
946        } else {
947            // Lanczos approximation coefficients
948            let g = 7.0;
949            let c = [
950                0.99999999999980993,
951                676.5203681218851,
952                -1259.1392167224028,
953                771.32342877765313,
954                -176.61502916214059,
955                12.507343278686905,
956                -0.13857109526572012,
957                9.9843695780195716e-6,
958                1.5056327351493116e-7,
959            ];
960
961            let z = z - 1.0;
962            let mut x = c[0];
963            for i in 1..c.len() {
964                x += c[i] / (z + i as f64);
965            }
966
967            let t = z + g + 0.5;
968            (2.0 * std::f64::consts::PI).sqrt() * t.powf(z + 0.5) * (-t).exp() * x
969        }
970    }
971
972    fn check_performance_alerts(&mut self, metrics: &PerformanceMetrics) -> Result<Vec<AlertType>> {
973        let mut alerts = Vec::new();
974        let current_time = current_timestamp();
975
976        // Check if we're in cooldown period
977        let cooldown_key = "performance";
978        if let Some(&last_alert_time) = self.last_alert_times.get(cooldown_key) {
979            if current_time - last_alert_time < self.alert_config.cooldown_seconds {
980                return Ok(alerts);
981            }
982        }
983
984        // Check performance degradation
985        if let Some(ref baseline) = self.baseline_metrics {
986            let degradation_ratio = metrics.processing_time_ms / baseline.processing_time_ms;
987            if degradation_ratio > self.alert_config.performance_threshold {
988                alerts.push(AlertType::PerformanceDegradation {
989                    metric: "processing_time".to_string(),
990                    value: degradation_ratio,
991                });
992            }
993        }
994
995        // Check error rate
996        if metrics.error_rate > self.alert_config.error_rate_threshold {
997            alerts.push(AlertType::HighErrorRate {
998                rate: metrics.error_rate,
999            });
1000        }
1001
1002        // Check memory usage
1003        if metrics.memory_usage_mb > self.alert_config.memory_threshold_mb {
1004            alerts.push(AlertType::MemoryExhaustion {
1005                usage_mb: metrics.memory_usage_mb,
1006            });
1007        }
1008
1009        // Check data quality
1010        if metrics.data_quality_score < 0.8 {
1011            alerts.push(AlertType::DataQualityIssue {
1012                score: metrics.data_quality_score,
1013            });
1014        }
1015
1016        if !alerts.is_empty() {
1017            self.last_alert_times
1018                .insert(cooldown_key.to_string(), current_time);
1019        }
1020
1021        Ok(alerts)
1022    }
1023
1024    /// Export metrics in Prometheus format
1025    #[cfg(feature = "monitoring")]
1026    pub fn export_prometheus_metrics(&self) -> Result<String> {
1027        use prometheus::Encoder;
1028        let encoder = prometheus::TextEncoder::new();
1029        let metric_families = self.metrics_registry.gather();
1030        encoder.encode_to_string(&metric_families).map_err(|e| {
1031            TransformError::ComputationError(format!("Failed to encode metrics: {}", e))
1032        })
1033    }
1034}
1035
/// Current wall-clock time as whole seconds since the Unix epoch,
/// falling back to 0 if the system clock is before the epoch.
#[allow(dead_code)]
fn current_timestamp() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_secs())
        .unwrap_or(0)
}
1043
/// Advanced anomaly detection system
///
/// Aggregates per-metric statistical, machine-learning, and time-series
/// detectors, optionally combined by an ensemble detector, and keeps a
/// bounded history of detected anomalies for trend analysis.
#[cfg(feature = "monitoring")]
pub struct AdvancedAnomalyDetector {
    /// Statistical anomaly detectors, keyed by metric name
    statistical_detectors: HashMap<String, StatisticalDetector>,
    /// Machine learning anomaly detectors, keyed by metric name
    ml_detectors: HashMap<String, MLAnomalyDetector>,
    /// Time series anomaly detectors, keyed by metric name
    time_series_detectors: HashMap<String, TimeSeriesAnomalyDetector>,
    /// Optional ensemble detector combining individual detector outputs
    ensemble_detector: Option<EnsembleAnomalyDetector>,
    /// Bounded history of detected anomalies (oldest evicted first)
    anomaly_history: VecDeque<AnomalyRecord>,
    /// Severity thresholds used when classifying anomalies
    thresholds: AnomalyThresholds,
}
1060
/// Statistical anomaly detector using multiple statistical methods
///
/// Maintains a sliding window of recent observations and flags values that
/// deviate from the window via z-score and IQR fence tests.
#[cfg(feature = "monitoring")]
#[derive(Debug, Clone)]
pub struct StatisticalDetector {
    /// Z-score threshold above which a value is flagged
    z_score_threshold: f64,
    /// Multiplier applied to the IQR when computing outlier fences
    iqr_multiplier: f64,
    /// Modified Z-score threshold (derived as z_score_threshold * 0.6745)
    // NOTE(review): not consulted by detect_anomaly in this file — confirm intended use
    modified_z_threshold: f64,
    /// Sliding window of recent observations
    data_window: VecDeque<f64>,
    /// Maximum number of observations kept in the window
    max_window_size: usize,
}
1076
/// Machine learning anomaly detector
///
/// Holds configurations for several ML outlier methods; scoring currently
/// uses a simplified isolation-forest proxy over the collected samples.
#[cfg(feature = "monitoring")]
pub struct MLAnomalyDetector {
    /// Isolation forest parameters
    isolation_forest_config: IsolationForestConfig,
    /// One-class SVM parameters
    // NOTE(review): not used by the current scoring path — presumably reserved for a full implementation
    svm_config: OneClassSVMConfig,
    /// Local outlier factor parameters
    // NOTE(review): not used by the current scoring path — presumably reserved for a full implementation
    lof_config: LOFConfig,
    /// Sliding window of training samples (one feature vector per observation)
    training_data: VecDeque<Vec<f64>>,
    /// Whether the underlying models have been trained
    model_trained: bool,
}
1091
/// Time series anomaly detector
///
/// Buffers timestamped observations and holds configurations for ARIMA,
/// seasonal decomposition, and change-point based detection.
#[cfg(feature = "monitoring")]
pub struct TimeSeriesAnomalyDetector {
    /// ARIMA parameters
    arima_config: ARIMAConfig,
    /// Seasonal decomposition parameters
    seasonal_config: SeasonalConfig,
    /// Change point detection parameters
    change_point_config: ChangePointConfig,
    /// Sliding window of historical time series observations
    time_series_data: VecDeque<TimeSeriesPoint>,
    /// Fitted forecast model, if one has been trained
    forecast_model: Option<ForecastModel>,
}
1106
/// Ensemble anomaly detector combining multiple methods
///
/// Intended to combine individual detector outputs via weighted voting;
/// detection is currently a placeholder (see detect_ensemble_anomalies).
#[cfg(feature = "monitoring")]
pub struct EnsembleAnomalyDetector {
    /// Weight assigned to each named detector's vote
    detector_weights: HashMap<String, f64>,
    /// Voting threshold
    // NOTE(review): unused until the ensemble voting logic is implemented
    voting_threshold: f64,
    /// Confidence threshold
    // NOTE(review): unused until the ensemble voting logic is implemented
    confidence_threshold: f64,
}
1117
#[cfg(feature = "monitoring")]
impl EnsembleAnomalyDetector {
    /// Create a new ensemble anomaly detector with the given per-detector
    /// weights and decision thresholds.
    pub fn new(
        detector_weights: HashMap<String, f64>,
        voting_threshold: f64,
        confidence_threshold: f64,
    ) -> Self {
        Self {
            detector_weights,
            voting_threshold,
            confidence_threshold,
        }
    }

    /// Detect ensemble anomalies by combining multiple detector results.
    ///
    /// NOTE(review): placeholder — a full implementation would combine the
    /// individual detectors' outputs via voting or weighted averaging.
    pub fn detect_ensemble_anomalies(
        &self,
        metrics: &HashMap<String, f64>,
        _timestamp: u64,
    ) -> Result<Vec<AnomalyRecord>> {
        let _ = metrics; // unused until the ensemble logic is implemented
        Ok(Vec::new())
    }
}
1147
/// Anomaly record for historical analysis
#[cfg(feature = "monitoring")]
#[derive(Debug, Clone)]
pub struct AnomalyRecord {
    /// Unix timestamp in seconds (as produced by current_timestamp())
    pub timestamp: u64,
    /// Name of the metric the anomaly was observed on
    pub metric_name: String,
    /// The observed metric value that triggered the anomaly
    pub value: f64,
    /// Anomaly score; the scale depends on the detection method
    /// (e.g. |z-score|, IQR distance ratio, or a 0..1 isolation score)
    pub anomaly_score: f64,
    /// Identifier of the method that fired (e.g. "statistical_zscore")
    pub detection_method: String,
    /// Severity level assigned from the score
    pub severity: AnomalySeverity,
    /// Method-specific diagnostic key/value pairs (e.g. mean, std_dev)
    pub context: HashMap<String, String>,
}
1167
/// Anomaly severity levels, ordered from least to most severe
#[cfg(feature = "monitoring")]
#[derive(Debug, Clone, PartialEq)]
pub enum AnomalySeverity {
    /// Low severity - informational anomaly
    Low,
    /// Medium severity - notable deviation
    Medium,
    /// High severity - significant anomaly requiring attention
    High,
    /// Critical severity - severe anomaly requiring immediate action
    Critical,
}
1181
/// Anomaly detection thresholds
///
/// Per the Default impl these are expressed in standard deviations (sigma).
#[cfg(feature = "monitoring")]
#[derive(Debug, Clone)]
pub struct AnomalyThresholds {
    /// Low severity threshold
    pub low_threshold: f64,
    /// Medium severity threshold
    pub medium_threshold: f64,
    /// High severity threshold
    pub high_threshold: f64,
    /// Critical severity threshold
    pub critical_threshold: f64,
}
1195
// The struct is gated on the "monitoring" feature, so this impl must be too;
// without the attribute the crate fails to compile when the feature is off.
#[cfg(feature = "monitoring")]
impl Default for AnomalyThresholds {
    /// Sigma-based default thresholds for severity classification.
    fn default() -> Self {
        AnomalyThresholds {
            low_threshold: 2.0,      // 2 sigma
            medium_threshold: 2.5,   // 2.5 sigma
            high_threshold: 3.0,     // 3 sigma
            critical_threshold: 4.0, // 4 sigma
        }
    }
}
1206
/// Time series data point
#[cfg(feature = "monitoring")]
#[derive(Debug, Clone)]
pub struct TimeSeriesPoint {
    /// Unix timestamp in seconds (populated from current_timestamp(), which
    /// returns seconds — the previous "milliseconds" doc was incorrect)
    pub timestamp: u64,
    /// Numeric value at this timestamp
    pub value: f64,
    /// Additional metadata key-value pairs
    pub metadata: HashMap<String, String>,
}
1218
/// Configuration for isolation-forest anomaly detection
#[cfg(feature = "monitoring")]
#[derive(Debug, Clone)]
pub struct IsolationForestConfig {
    /// Number of isolation trees in the forest
    pub n_trees: usize,
    /// Expected proportion of outliers (0.0 to 0.5)
    pub contamination: f64,
    /// Maximum number of samples to use per tree
    pub max_samples: usize,
}
1230
/// Configuration for One-Class SVM anomaly detection
#[cfg(feature = "monitoring")]
#[derive(Debug, Clone)]
pub struct OneClassSVMConfig {
    /// Upper bound on the fraction of training errors (0 < nu <= 1)
    pub nu: f64,
    /// Kernel coefficient for the RBF kernel
    pub gamma: f64,
    /// Kernel type (e.g. "rbf", "linear", "poly")
    pub kernel: String,
}
1242
/// Configuration for Local Outlier Factor (LOF) detection
#[cfg(feature = "monitoring")]
#[derive(Debug, Clone)]
pub struct LOFConfig {
    /// Number of neighbors to use for LOF computation
    pub n_neighbors: usize,
    /// Expected proportion of outliers in the dataset
    pub contamination: f64,
}
1252
/// Configuration for an ARIMA(p, d, q) model
#[cfg(feature = "monitoring")]
#[derive(Debug, Clone)]
pub struct ARIMAConfig {
    /// Order of the autoregressive (AR) component
    pub p: usize,
    /// Degree of differencing (integration order)
    pub d: usize,
    /// Order of the moving average (MA) component
    pub q: usize,
}
1264
/// Configuration for seasonal time series decomposition
#[cfg(feature = "monitoring")]
#[derive(Debug, Clone)]
pub struct SeasonalConfig {
    /// Length of the seasonal cycle in samples
    /// (e.g. 24 for hourly data with daily seasonality)
    pub seasonal_period: usize,
    /// Whether to include a trend component in the decomposition
    pub trend_component: bool,
    /// Whether to include a seasonal component in the decomposition
    pub seasonal_component: bool,
}
1276
/// Configuration for change point detection
#[cfg(feature = "monitoring")]
#[derive(Debug, Clone)]
pub struct ChangePointConfig {
    /// Size of the sliding window used for detection
    pub window_size: usize,
    /// Statistical significance level threshold (e.g. 0.05)
    pub significance_level: f64,
}
1286
/// Time series forecasting model configuration
#[cfg(feature = "monitoring")]
#[derive(Debug, Clone)]
pub struct ForecastModel {
    /// Model coefficients used for prediction
    pub coefficients: Vec<f64>,
    /// Number of time steps to forecast ahead
    pub forecast_horizon: usize,
    /// Confidence interval for predictions (e.g. 0.95 for 95%)
    pub confidence_interval: f64,
}
1298
#[cfg(feature = "monitoring")]
impl AdvancedAnomalyDetector {
    /// Maximum number of anomaly records retained in `anomaly_history`.
    const MAX_HISTORY: usize = 10_000;

    /// Create a new advanced anomaly detector with no registered detectors
    /// and default severity thresholds.
    pub fn new() -> Self {
        AdvancedAnomalyDetector {
            statistical_detectors: HashMap::new(),
            ml_detectors: HashMap::new(),
            time_series_detectors: HashMap::new(),
            ensemble_detector: None,
            anomaly_history: VecDeque::with_capacity(Self::MAX_HISTORY),
            thresholds: AnomalyThresholds::default(),
        }
    }

    /// Add a statistical detector for a metric
    pub fn add_statistical_detector(&mut self, metric_name: String, detector: StatisticalDetector) {
        self.statistical_detectors.insert(metric_name, detector);
    }

    /// Add a machine learning detector for a metric
    pub fn add_ml_detector(&mut self, metric_name: String, detector: MLAnomalyDetector) {
        self.ml_detectors.insert(metric_name, detector);
    }

    /// Add a time series detector for a metric
    pub fn add_time_series_detector(
        &mut self,
        metric_name: String,
        detector: TimeSeriesAnomalyDetector,
    ) {
        self.time_series_detectors.insert(metric_name, detector);
    }

    /// Configure ensemble detector
    pub fn configure_ensemble(&mut self, detector: EnsembleAnomalyDetector) {
        self.ensemble_detector = Some(detector);
    }

    /// Detect anomalies in new data.
    ///
    /// Runs every registered detector for each metric, then the ensemble
    /// detector (if configured), records all findings in the bounded history,
    /// and returns them.
    pub fn detect_anomalies(
        &mut self,
        metrics: &HashMap<String, f64>,
    ) -> Result<Vec<AnomalyRecord>> {
        let mut anomalies = Vec::new();
        let timestamp = current_timestamp();

        for (metric_name, &value) in metrics {
            // Statistical detection
            if let Some(detector) = self.statistical_detectors.get_mut(metric_name) {
                if let Some(anomaly) = detector.detect_anomaly(value, metric_name, timestamp)? {
                    anomalies.push(anomaly);
                }
            }

            // ML detection
            if let Some(detector) = self.ml_detectors.get_mut(metric_name) {
                if let Some(anomaly) = detector.detect_anomaly(value, metric_name, timestamp)? {
                    anomalies.push(anomaly);
                }
            }

            // Time series detection
            if let Some(detector) = self.time_series_detectors.get_mut(metric_name) {
                if let Some(anomaly) = detector.detect_anomaly(value, metric_name, timestamp)? {
                    anomalies.push(anomaly);
                }
            }
        }

        // Ensemble detection
        if let Some(ref ensemble) = self.ensemble_detector {
            anomalies.extend(ensemble.detect_ensemble_anomalies(metrics, timestamp)?);
        }

        // Record findings, evicting the oldest entries beyond the cap.
        for anomaly in &anomalies {
            self.anomaly_history.push_back(anomaly.clone());
            if self.anomaly_history.len() > Self::MAX_HISTORY {
                self.anomaly_history.pop_front();
            }
        }

        Ok(anomalies)
    }

    /// Get anomaly patterns and insights over the last `lookback_hours` hours.
    pub fn get_anomaly_insights(&self, lookback_hours: u64) -> AnomalyInsights {
        // Saturating arithmetic avoids u64 underflow/overflow panics for
        // very large lookback windows (the plain `-`/`*` would panic in
        // debug builds).
        let cutoff_time = current_timestamp().saturating_sub(lookback_hours.saturating_mul(3600));
        let recent_anomalies: Vec<_> = self
            .anomaly_history
            .iter()
            .filter(|a| a.timestamp >= cutoff_time)
            .collect();

        let total_anomalies = recent_anomalies.len();
        let critical_anomalies = recent_anomalies
            .iter()
            .filter(|a| a.severity == AnomalySeverity::Critical)
            .count();

        // Anomaly frequency by metric
        let mut metric_frequencies = HashMap::new();
        for anomaly in &recent_anomalies {
            *metric_frequencies
                .entry(anomaly.metric_name.clone())
                .or_insert(0) += 1;
        }

        // Anomaly frequency by detection method
        let mut method_frequencies = HashMap::new();
        for anomaly in &recent_anomalies {
            *method_frequencies
                .entry(anomaly.detection_method.clone())
                .or_insert(0) += 1;
        }

        // Identify trending anomalies
        let trending_metrics = self.identify_trending_anomalies(&recent_anomalies);

        let most_anomalous_metric = metric_frequencies
            .iter()
            .max_by_key(|(_, &count)| count)
            .map(|(metric, _)| metric.clone());

        AnomalyInsights {
            total_anomalies,
            critical_anomalies,
            // max(1) guards against a zero-hour lookback producing inf/NaN.
            anomaly_rate: total_anomalies as f64 / lookback_hours.max(1) as f64,
            metric_frequencies,
            method_frequencies,
            trending_metrics,
            most_anomalous_metric,
        }
    }

    /// Identify metrics with at least 3 anomalies within the last hour.
    fn identify_trending_anomalies(&self, anomalies: &[&AnomalyRecord]) -> Vec<String> {
        let mut recent_counts = HashMap::new();
        let current_time = current_timestamp();
        let recent_threshold = 3600; // 1 hour

        for anomaly in anomalies {
            // saturating_sub treats (clock-skewed) future timestamps as recent
            // instead of panicking on u64 underflow.
            if current_time.saturating_sub(anomaly.timestamp) <= recent_threshold {
                *recent_counts
                    .entry(anomaly.metric_name.clone())
                    .or_insert(0) += 1;
            }
        }

        recent_counts
            .into_iter()
            .filter(|(_, count)| *count >= 3) // At least 3 anomalies in recent period
            .map(|(metric, _)| metric)
            .collect()
    }

    /// Update detector configurations based on analyst feedback.
    pub fn update_detector_configurations(&mut self, feedback: AnomalyFeedback) -> Result<()> {
        match feedback.feedback_type {
            FeedbackType::FalsePositive => {
                // Loosen: raise thresholds for the detector that fired.
                self.adjust_thresholds_for_detector(&feedback.detection_method, 0.1)?;
            }
            FeedbackType::FalseNegative => {
                // Tighten: lower thresholds so similar cases are caught.
                self.adjust_thresholds_for_detector(&feedback.detection_method, -0.1)?;
            }
            FeedbackType::ConfirmedAnomaly => {
                // No adjustment needed, but can be used for retraining.
            }
        }
        Ok(())
    }

    /// Shift detection thresholds for the given method family by
    /// `adjustment`, clamped to sane bounds.
    fn adjust_thresholds_for_detector(
        &mut self,
        detection_method: &str,
        adjustment: f64,
    ) -> Result<()> {
        match detection_method {
            "statistical" => {
                for detector in self.statistical_detectors.values_mut() {
                    detector.z_score_threshold =
                        (detector.z_score_threshold + adjustment).clamp(1.5, 5.0);
                }
            }
            "ml" => {
                for detector in self.ml_detectors.values_mut() {
                    detector.isolation_forest_config.contamination =
                        (detector.isolation_forest_config.contamination + adjustment * 0.01)
                            .clamp(0.01, 0.5);
                }
            }
            _ => {}
        }
        Ok(())
    }
}

#[cfg(feature = "monitoring")]
impl Default for AdvancedAnomalyDetector {
    fn default() -> Self {
        Self::new()
    }
}
1504
#[cfg(feature = "monitoring")]
impl StatisticalDetector {
    /// Create a new statistical detector with the given z-score threshold,
    /// IQR fence multiplier, and observation-window capacity.
    pub fn new(z_score_threshold: f64, iqr_multiplier: f64, max_window_size: usize) -> Self {
        Self {
            z_score_threshold,
            iqr_multiplier,
            // 0.6745 ≈ Φ⁻¹(0.75), the conventional modified-z scaling factor
            modified_z_threshold: z_score_threshold * 0.6745,
            data_window: VecDeque::with_capacity(max_window_size),
            max_window_size,
        }
    }

    /// Detect anomaly using statistical methods: a z-score test first, then
    /// an IQR fence test if the z-score test does not fire.
    pub fn detect_anomaly(
        &mut self,
        value: f64,
        metric_name: &str,
        timestamp: u64,
    ) -> Result<Option<AnomalyRecord>> {
        // Slide the observation window forward.
        self.data_window.push_back(value);
        if self.data_window.len() > self.max_window_size {
            self.data_window.pop_front();
        }

        // Too little history for meaningful statistics.
        if self.data_window.len() < 10 {
            return Ok(None);
        }

        let samples: Vec<f64> = self.data_window.iter().copied().collect();
        let n = samples.len() as f64;

        // --- Z-score test (window mean and population std dev) ---
        let mean = samples.iter().sum::<f64>() / n;
        let variance = samples.iter().map(|v| (v - mean).powi(2)).sum::<f64>() / n;
        let std_dev = variance.sqrt();

        if std_dev > 0.0 {
            let z = (value - mean) / std_dev;

            if z.abs() > self.z_score_threshold {
                let severity = match z.abs() {
                    s if s > 4.0 => AnomalySeverity::Critical,
                    s if s > 3.0 => AnomalySeverity::High,
                    s if s > 2.5 => AnomalySeverity::Medium,
                    _ => AnomalySeverity::Low,
                };

                let mut context = HashMap::new();
                context.insert("mean".to_string(), mean.to_string());
                context.insert("std_dev".to_string(), std_dev.to_string());
                context.insert("z_score".to_string(), z.to_string());

                return Ok(Some(AnomalyRecord {
                    timestamp,
                    metric_name: metric_name.to_string(),
                    value,
                    anomaly_score: z.abs(),
                    detection_method: "statistical_zscore".to_string(),
                    severity,
                    context,
                }));
            }
        }

        // --- IQR fence test ---
        let mut sorted = samples;
        sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));

        let q1 = sorted[sorted.len() / 4];
        let q3 = sorted[(3 * sorted.len()) / 4];
        let iqr = q3 - q1;

        if iqr > 0.0 {
            let lower_bound = q1 - self.iqr_multiplier * iqr;
            let upper_bound = q3 + self.iqr_multiplier * iqr;

            if value < lower_bound || value > upper_bound {
                let distance_from_bounds = if value < lower_bound {
                    lower_bound - value
                } else {
                    value - upper_bound
                };

                let severity = match distance_from_bounds {
                    d if d > 3.0 * iqr => AnomalySeverity::Critical,
                    d if d > 2.0 * iqr => AnomalySeverity::High,
                    d if d > 1.5 * iqr => AnomalySeverity::Medium,
                    _ => AnomalySeverity::Low,
                };

                let mut context = HashMap::new();
                context.insert("q1".to_string(), q1.to_string());
                context.insert("q3".to_string(), q3.to_string());
                context.insert("iqr".to_string(), iqr.to_string());
                context.insert(
                    "distance_from_bounds".to_string(),
                    distance_from_bounds.to_string(),
                );

                return Ok(Some(AnomalyRecord {
                    timestamp,
                    metric_name: metric_name.to_string(),
                    value,
                    anomaly_score: distance_from_bounds / iqr,
                    detection_method: "statistical_iqr".to_string(),
                    severity,
                    context,
                }));
            }
        }

        Ok(None)
    }
}
1633
#[cfg(feature = "monitoring")]
impl MLAnomalyDetector {
    /// Maximum number of training samples retained (sliding window).
    const MAX_TRAINING_SAMPLES: usize = 1000;
    /// Minimum number of samples required before scoring starts.
    const MIN_TRAINING_SAMPLES: usize = 50;

    /// Create a new ML anomaly detector with default model configurations.
    pub fn new() -> Self {
        MLAnomalyDetector {
            isolation_forest_config: IsolationForestConfig {
                n_trees: 100,
                contamination: 0.1,
                max_samples: 256,
            },
            svm_config: OneClassSVMConfig {
                nu: 0.1,
                gamma: 0.1,
                kernel: "rbf".to_string(),
            },
            lof_config: LOFConfig {
                n_neighbors: 20,
                contamination: 0.1,
            },
            training_data: VecDeque::with_capacity(Self::MAX_TRAINING_SAMPLES),
            model_trained: false,
        }
    }

    /// Detect anomaly using ML methods.
    ///
    /// Records the value as a training sample, then scores it with a
    /// simplified isolation-forest proxy once enough data is available.
    pub fn detect_anomaly(
        &mut self,
        value: f64,
        metric_name: &str,
        timestamp: u64,
    ) -> Result<Option<AnomalyRecord>> {
        // Add to the bounded sliding window of training samples.
        self.training_data.push_back(vec![value]);
        if self.training_data.len() > Self::MAX_TRAINING_SAMPLES {
            self.training_data.pop_front();
        }

        // Need sufficient data for a meaningful score.
        if self.training_data.len() < Self::MIN_TRAINING_SAMPLES {
            return Ok(None);
        }

        let anomaly_score = self.simplified_isolation_forest_score(value)?;

        // 0.6 is the detection threshold; higher scores map to higher severity.
        if anomaly_score > 0.6 {
            let severity = if anomaly_score > 0.9 {
                AnomalySeverity::Critical
            } else if anomaly_score > 0.8 {
                AnomalySeverity::High
            } else if anomaly_score > 0.7 {
                AnomalySeverity::Medium
            } else {
                AnomalySeverity::Low
            };

            return Ok(Some(AnomalyRecord {
                timestamp,
                metric_name: metric_name.to_string(),
                value,
                anomaly_score,
                detection_method: "ml_isolation_forest".to_string(),
                severity,
                context: [
                    ("isolation_score".to_string(), anomaly_score.to_string()),
                    (
                        "training_samples".to_string(),
                        self.training_data.len().to_string(),
                    ),
                ]
                .iter()
                .cloned()
                .collect(),
            }));
        }

        Ok(None)
    }

    /// Simplified isolation-forest-style score: the value's percentile-rank
    /// distance from the median, scaled to [0, 1].
    fn simplified_isolation_forest_score(&self, value: f64) -> Result<f64> {
        // Collect and sort once; the previous version built a Vec and then
        // cloned it again just to sort — the clone was unnecessary.
        let mut sorted_data: Vec<f64> = self.training_data.iter().map(|v| v[0]).collect();
        sorted_data.sort_unstable_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));

        let position = sorted_data
            .iter()
            .position(|&x| x >= value)
            .unwrap_or(sorted_data.len());
        let percentile = position as f64 / sorted_data.len() as f64;

        // Anomaly score based on distance from the median.
        Ok((percentile - 0.5).abs() * 2.0)
    }
}

#[cfg(feature = "monitoring")]
impl Default for MLAnomalyDetector {
    fn default() -> Self {
        Self::new()
    }
}
1734
#[cfg(feature = "monitoring")]
impl TimeSeriesAnomalyDetector {
    /// Create a new time series anomaly detector with default settings:
    /// ARIMA(1, 1, 1), a 24-sample seasonal period with trend and seasonal
    /// components enabled, and change-point detection over a 50-sample
    /// window at a 5% significance level.
    pub fn new() -> Self {
        TimeSeriesAnomalyDetector {
            arima_config: ARIMAConfig { p: 1, d: 1, q: 1 },
            seasonal_config: SeasonalConfig {
                seasonal_period: 24, // 24 hours
                trend_component: true,
                seasonal_component: true,
            },
            change_point_config: ChangePointConfig {
                window_size: 50,
                significance_level: 0.05,
            },
            time_series_data: VecDeque::with_capacity(1000),
            forecast_model: None,
        }
    }

    /// Detect an anomaly in `value` using time series change-point analysis.
    ///
    /// The observation is appended to a bounded sliding window (at most 1000
    /// points). Once at least 50 observations have accumulated, a two-sample
    /// t-statistic is computed over the most recent window; statistics above
    /// 2.0 are reported as anomalies, with severity escalating at 3.0, 4.0,
    /// and 5.0.
    ///
    /// Returns `Ok(None)` when there is insufficient history or no anomaly
    /// is detected.
    pub fn detect_anomaly(
        &mut self,
        value: f64,
        metric_name: &str,
        timestamp: u64,
    ) -> Result<Option<AnomalyRecord>> {
        // Record the observation in the sliding window.
        self.time_series_data.push_back(TimeSeriesPoint {
            timestamp,
            value,
            metadata: HashMap::new(),
        });

        // Bound the window to the most recent 1000 observations.
        if self.time_series_data.len() > 1000 {
            self.time_series_data.pop_front();
        }

        // Need sufficient history before the statistical test is meaningful.
        if self.time_series_data.len() < 50 {
            return Ok(None);
        }

        // Change-point score over the recent window; the new observation is
        // already included because it was pushed above.
        let anomaly_score = self.detect_change_point(value)?;

        if anomaly_score > 2.0 {
            // Map the t-statistic onto severity buckets.
            let severity = if anomaly_score > 5.0 {
                AnomalySeverity::Critical
            } else if anomaly_score > 4.0 {
                AnomalySeverity::High
            } else if anomaly_score > 3.0 {
                AnomalySeverity::Medium
            } else {
                AnomalySeverity::Low
            };

            return Ok(Some(AnomalyRecord {
                timestamp,
                metric_name: metric_name.to_string(),
                value,
                anomaly_score,
                detection_method: "time_series_change_point".to_string(),
                severity,
                // Build the map directly from the pairs instead of the
                // iter().cloned().collect() round-trip.
                context: HashMap::from([
                    ("change_point_score".to_string(), anomaly_score.to_string()),
                    (
                        "window_size".to_string(),
                        self.change_point_config.window_size.to_string(),
                    ),
                ]),
            }));
        }

        Ok(None)
    }

    /// Simple change point detection via a two-sample t-statistic.
    ///
    /// Splits the most recent `window_size` observations into two halves and
    /// computes |mean difference| / pooled standard error. Returns 0.0 when
    /// fewer than 10 observations are available or the signal is constant
    /// (zero pooled variance).
    ///
    /// The current value is intentionally unused (hence the `_` prefix): the
    /// caller has already appended it to `time_series_data`, so it is part
    /// of the analyzed window.
    fn detect_change_point(&self, _current_value: f64) -> Result<f64> {
        let window_size = self
            .change_point_config
            .window_size
            .min(self.time_series_data.len());
        if window_size < 10 {
            return Ok(0.0);
        }

        // Most recent `window_size` observations, newest first.
        let recent_data: Vec<f64> = self
            .time_series_data
            .iter()
            .rev()
            .take(window_size)
            .map(|p| p.value)
            .collect();

        // Compare the newer half of the window against the older half.
        let half_window = window_size / 2;
        let first_half: Vec<f64> = recent_data.iter().take(half_window).copied().collect();
        let second_half: Vec<f64> = recent_data.iter().skip(half_window).copied().collect();

        if first_half.is_empty() || second_half.is_empty() {
            return Ok(0.0);
        }

        let mean1 = first_half.iter().sum::<f64>() / first_half.len() as f64;
        let mean2 = second_half.iter().sum::<f64>() / second_half.len() as f64;

        // Population variance of each half.
        let var1 =
            first_half.iter().map(|x| (x - mean1).powi(2)).sum::<f64>() / first_half.len() as f64;
        let var2 =
            second_half.iter().map(|x| (x - mean2).powi(2)).sum::<f64>() / second_half.len() as f64;

        let pooled_std = ((var1 + var2) / 2.0).sqrt();

        if pooled_std > 0.0 {
            // t-like statistic for the difference of the half-window means.
            let t_statistic =
                (mean2 - mean1).abs() / (pooled_std * (2.0_f64 / window_size as f64).sqrt());
            Ok(t_statistic)
        } else {
            // Constant signal: no detectable change point.
            Ok(0.0)
        }
    }
}
1861
/// Anomaly insights summary
///
/// Aggregated view over the anomaly history, suitable for dashboards and
/// alert triage. Only available with the `monitoring` feature.
#[cfg(feature = "monitoring")]
#[derive(Debug, Clone)]
pub struct AnomalyInsights {
    /// Total number of detected anomalies
    pub total_anomalies: usize,
    /// Number of critical severity anomalies
    pub critical_anomalies: usize,
    /// Rate of anomalies relative to total data points (0.0 to 1.0)
    pub anomaly_rate: f64,
    /// Frequency count of anomalies per metric name
    pub metric_frequencies: HashMap<String, usize>,
    /// Frequency count of anomalies per detection method
    pub method_frequencies: HashMap<String, usize>,
    /// Metrics with increasing anomaly trends
    pub trending_metrics: Vec<String>,
    /// Metric with the highest anomaly count, if any anomalies exist
    pub most_anomalous_metric: Option<String>,
}
1881
/// Feedback for anomaly detection tuning
///
/// Operator-supplied verdict on a previously reported anomaly, used to
/// assess and tune detector accuracy. Only available with the `monitoring`
/// feature.
#[cfg(feature = "monitoring")]
#[derive(Debug, Clone)]
pub struct AnomalyFeedback {
    /// Unique identifier for the anomaly
    pub anomaly_id: String,
    /// Type of feedback (false positive, false negative, confirmed)
    pub feedback_type: FeedbackType,
    /// Detection method that identified this anomaly
    pub detection_method: String,
    /// Name of the metric that triggered detection
    pub metric_name: String,
    /// Unix timestamp when feedback was provided
    pub timestamp: u64,
}
1897
/// Type of feedback for anomaly detection accuracy
///
/// Fieldless verdict enum; derives `Copy`, `PartialEq`, and `Eq` so
/// feedback records can be cheaply passed by value and compared or counted
/// (e.g. tallying false positives) without pattern matching.
#[cfg(feature = "monitoring")]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FeedbackType {
    /// Detection was incorrect (false alarm)
    FalsePositive,
    /// Anomaly was missed by detection
    FalseNegative,
    /// Detection correctly identified an anomaly
    ConfirmedAnomaly,
}
1909
// Stub implementations when monitoring feature is not enabled.
// These zero-sized placeholders keep the public type names available so
// downstream code can reference them without the `monitoring` feature.

/// Stand-in for the anomaly detector when `monitoring` is disabled.
#[cfg(not(feature = "monitoring"))]
pub struct AdvancedAnomalyDetector;

/// Stand-in for the anomaly insights summary when `monitoring` is disabled.
#[cfg(not(feature = "monitoring"))]
pub struct AnomalyInsights;