1use crate::error::{Result, TransformError};
8use scirs2_core::ndarray::{Array2, ArrayView1, ArrayView2};
9use scirs2_core::validation::check_not_empty;
10use std::collections::{HashMap, VecDeque};
11use std::time::{Duration, SystemTime, UNIX_EPOCH};
12
13#[cfg(feature = "monitoring")]
14use prometheus::{Counter, Gauge, Histogram, HistogramOpts, Registry};
15
16#[derive(Debug, Clone, PartialEq)]
18pub enum DriftMethod {
19 KolmogorovSmirnov,
21 ChiSquare,
23 PopulationStabilityIndex,
25 MaximumMeanDiscrepancy,
27 WassersteinDistance,
29}
30
31#[derive(Debug, Clone)]
33pub struct DriftDetectionResult {
34 pub feature_name: String,
36 pub method: DriftMethod,
38 pub statistic: f64,
40 pub p_value: Option<f64>,
42 pub is_drift_detected: bool,
44 pub severity: f64,
46 pub timestamp: u64,
48}
49
50#[derive(Debug, Clone)]
52pub struct PerformanceMetrics {
53 pub processing_time_ms: f64,
55 pub memory_usage_mb: f64,
57 pub error_rate: f64,
59 pub throughput: f64,
61 pub data_quality_score: f64,
63 pub timestamp: u64,
65}
66
67#[derive(Debug, Clone)]
69pub struct AlertConfig {
70 pub drift_threshold: f64,
72 pub performance_threshold: f64,
74 pub error_rate_threshold: f64,
76 pub memory_threshold_mb: f64,
78 pub cooldown_seconds: u64,
80}
81
82impl Default for AlertConfig {
83 fn default() -> Self {
84 AlertConfig {
85 drift_threshold: 0.05,
86 performance_threshold: 2.0, error_rate_threshold: 0.05, memory_threshold_mb: 1000.0, cooldown_seconds: 300, }
91 }
92}
93
94#[derive(Debug, Clone)]
96pub enum AlertType {
97 DataDrift {
99 feature: String,
101 severity: f64,
103 },
104 PerformanceDegradation {
106 metric: String,
108 value: f64,
110 },
111 HighErrorRate {
113 rate: f64,
115 },
116 MemoryExhaustion {
118 usage_mb: f64,
120 },
121 DataQualityIssue {
123 score: f64,
125 },
126}
127
128pub struct TransformationMonitor {
130 reference_data: Option<Array2<f64>>,
132 feature_names: Vec<String>,
134 drift_methods: HashMap<String, DriftMethod>,
136 performance_history: VecDeque<PerformanceMetrics>,
138 drift_history: VecDeque<DriftDetectionResult>,
140 alert_config: AlertConfig,
142 last_alert_times: HashMap<String, u64>,
144 baseline_metrics: Option<PerformanceMetrics>,
146 #[cfg(feature = "monitoring")]
148 metrics_registry: Registry,
149 #[cfg(feature = "monitoring")]
151 prometheus_metrics: PrometheusMetrics,
152}
153
154#[cfg(feature = "monitoring")]
155struct PrometheusMetrics {
156 drift_detections: Counter,
157 processing_time: Histogram,
158 memory_usage: Gauge,
159 error_rate: Gauge,
160 throughput: Gauge,
161 data_quality: Gauge,
162}
163
164impl TransformationMonitor {
165 pub fn new() -> Result<Self> {
167 #[cfg(feature = "monitoring")]
168 let metrics_registry = Registry::new();
169
170 #[cfg(feature = "monitoring")]
171 let prometheus_metrics = PrometheusMetrics {
172 drift_detections: Counter::new(
173 "transform_drift_detections_total",
174 "Total number of drift detections",
175 )
176 .map_err(|e| {
177 TransformError::ComputationError(format!("Failed to create counter: {}", e))
178 })?,
179 processing_time: Histogram::with_opts(HistogramOpts::new(
180 "transform_processing_time_seconds",
181 "Processing time in seconds",
182 ))
183 .map_err(|e| {
184 TransformError::ComputationError(format!("Failed to create histogram: {}", e))
185 })?,
186 memory_usage: Gauge::new("transform_memory_usage_mb", "Memory usage in MB").map_err(
187 |e| TransformError::ComputationError(format!("Failed to create gauge: {}", e)),
188 )?,
189 error_rate: Gauge::new("transform_error_rate", "Error rate").map_err(|e| {
190 TransformError::ComputationError(format!("Failed to create gauge: {}", e))
191 })?,
192 throughput: Gauge::new(
193 "transform_throughput_samples_per_second",
194 "Throughput in samples per second",
195 )
196 .map_err(|e| {
197 TransformError::ComputationError(format!("Failed to create gauge: {}", e))
198 })?,
199 data_quality: Gauge::new("transform_data_quality_score", "Data quality score")
200 .map_err(|e| {
201 TransformError::ComputationError(format!("Failed to create gauge: {}", e))
202 })?,
203 };
204
205 #[cfg(feature = "monitoring")]
206 {
207 metrics_registry
208 .register(Box::new(prometheus_metrics.drift_detections.clone()))
209 .map_err(|e| {
210 TransformError::ComputationError(format!("Failed to register counter: {}", e))
211 })?;
212 metrics_registry
213 .register(Box::new(prometheus_metrics.processing_time.clone()))
214 .map_err(|e| {
215 TransformError::ComputationError(format!("Failed to register histogram: {}", e))
216 })?;
217 metrics_registry
218 .register(Box::new(prometheus_metrics.memory_usage.clone()))
219 .map_err(|e| {
220 TransformError::ComputationError(format!("Failed to register gauge: {}", e))
221 })?;
222 metrics_registry
223 .register(Box::new(prometheus_metrics.error_rate.clone()))
224 .map_err(|e| {
225 TransformError::ComputationError(format!("Failed to register gauge: {}", e))
226 })?;
227 metrics_registry
228 .register(Box::new(prometheus_metrics.throughput.clone()))
229 .map_err(|e| {
230 TransformError::ComputationError(format!("Failed to register gauge: {}", e))
231 })?;
232 metrics_registry
233 .register(Box::new(prometheus_metrics.data_quality.clone()))
234 .map_err(|e| {
235 TransformError::ComputationError(format!("Failed to register gauge: {}", e))
236 })?;
237 }
238
239 Ok(TransformationMonitor {
240 reference_data: None,
241 feature_names: Vec::new(),
242 drift_methods: HashMap::new(),
243 performance_history: VecDeque::with_capacity(1000),
244 drift_history: VecDeque::with_capacity(1000),
245 alert_config: AlertConfig::default(),
246 last_alert_times: HashMap::new(),
247 baseline_metrics: None,
248 #[cfg(feature = "monitoring")]
249 metrics_registry,
250 #[cfg(feature = "monitoring")]
251 prometheus_metrics,
252 })
253 }
254
255 pub fn set_reference_data(
257 &mut self,
258 data: Array2<f64>,
259 feature_names: Option<Vec<String>>,
260 ) -> Result<()> {
261 self.reference_data = Some(data.clone());
262
263 if let Some(names) = feature_names {
264 if names.len() != data.ncols() {
265 return Err(TransformError::InvalidInput(
266 "Number of feature names must match number of columns".to_string(),
267 ));
268 }
269 self.feature_names = names;
270 } else {
271 self.feature_names = (0..data.ncols())
272 .map(|i| format!("feature_{}", i))
273 .collect();
274 }
275
276 for feature_name in &self.feature_names {
278 self.drift_methods
279 .insert(feature_name.clone(), DriftMethod::KolmogorovSmirnov);
280 }
281
282 Ok(())
283 }
284
285 pub fn set_drift_method(&mut self, featurename: &str, method: DriftMethod) -> Result<()> {
287 if !self.feature_names.contains(&featurename.to_string()) {
288 return Err(TransformError::InvalidInput(format!(
289 "Unknown feature name: {}",
290 featurename
291 )));
292 }
293
294 self.drift_methods.insert(featurename.to_string(), method);
295 Ok(())
296 }
297
298 pub fn set_alert_config(&mut self, config: AlertConfig) {
300 self.alert_config = config;
301 }
302
303 pub fn set_baseline_metrics(&mut self, metrics: PerformanceMetrics) {
305 self.baseline_metrics = Some(metrics);
306 }
307
308 pub fn detect_drift(
310 &mut self,
311 new_data: &ArrayView2<f64>,
312 ) -> Result<Vec<DriftDetectionResult>> {
313 let reference_data = self
314 .reference_data
315 .as_ref()
316 .ok_or_else(|| TransformError::InvalidInput("Reference data not set".to_string()))?;
317
318 if new_data.ncols() != reference_data.ncols() {
319 return Err(TransformError::InvalidInput(
320 "New data must have same number of features as reference data".to_string(),
321 ));
322 }
323
324 let mut results = Vec::new();
325 let timestamp = current_timestamp();
326
327 for (i, feature_name) in self.feature_names.iter().enumerate() {
328 let method = self
329 .drift_methods
330 .get(feature_name)
331 .unwrap_or(&DriftMethod::KolmogorovSmirnov);
332
333 let reference_feature = reference_data.column(i);
334 let new_feature = new_data.column(i);
335
336 let result = self.detect_feature_drift(
337 &reference_feature,
338 &new_feature,
339 feature_name,
340 method,
341 timestamp,
342 )?;
343
344 results.push(result.clone());
345 self.drift_history.push_back(result);
346
347 if self.drift_history.len() > 1000 {
349 self.drift_history.pop_front();
350 }
351 }
352
353 #[cfg(feature = "monitoring")]
355 {
356 let drift_count = results.iter().filter(|r| r.is_drift_detected).count();
357 self.prometheus_metrics
358 .drift_detections
359 .inc_by(drift_count as f64);
360 }
361
362 Ok(results)
363 }
364
365 pub fn record_metrics(&mut self, metrics: PerformanceMetrics) -> Result<Vec<AlertType>> {
367 self.performance_history.push_back(metrics.clone());
368
369 if self.performance_history.len() > 1000 {
371 self.performance_history.pop_front();
372 }
373
374 #[cfg(feature = "monitoring")]
376 {
377 self.prometheus_metrics
378 .processing_time
379 .observe(metrics.processing_time_ms / 1000.0);
380 self.prometheus_metrics
381 .memory_usage
382 .set(metrics.memory_usage_mb);
383 self.prometheus_metrics.error_rate.set(metrics.error_rate);
384 self.prometheus_metrics.throughput.set(metrics.throughput);
385 self.prometheus_metrics
386 .data_quality
387 .set(metrics.data_quality_score);
388 }
389
390 self.check_performance_alerts(&metrics)
392 }
393
394 pub fn get_drift_summary(&self, lookbackhours: u64) -> Result<HashMap<String, f64>> {
396 let cutoff_time = current_timestamp() - (lookbackhours * 3600);
397 let mut summary = HashMap::new();
398
399 for feature_name in &self.feature_names {
400 let recent_detections: Vec<_> = self
401 .drift_history
402 .iter()
403 .filter(|r| r.timestamp >= cutoff_time && r.feature_name == *feature_name)
404 .collect();
405
406 let drift_rate = if recent_detections.is_empty() {
407 0.0
408 } else {
409 recent_detections
410 .iter()
411 .filter(|r| r.is_drift_detected)
412 .count() as f64
413 / recent_detections.len() as f64
414 };
415
416 summary.insert(feature_name.clone(), drift_rate);
417 }
418
419 Ok(summary)
420 }
421
422 pub fn get_performance_trends(&self, lookbackhours: u64) -> Result<HashMap<String, f64>> {
424 let cutoff_time = current_timestamp() - (lookbackhours * 3600);
425 let recent_metrics: Vec<_> = self
426 .performance_history
427 .iter()
428 .filter(|m| m.timestamp >= cutoff_time)
429 .collect();
430
431 if recent_metrics.is_empty() {
432 return Ok(HashMap::new());
433 }
434
435 let mut trends = HashMap::new();
436
437 if recent_metrics.len() >= 2 {
439 let first = recent_metrics.first().expect("Operation failed");
440 let last = recent_metrics.last().expect("Operation failed");
441
442 trends.insert(
443 "processing_time_trend".to_string(),
444 (last.processing_time_ms - first.processing_time_ms) / first.processing_time_ms,
445 );
446 trends.insert(
447 "memory_usage_trend".to_string(),
448 (last.memory_usage_mb - first.memory_usage_mb) / first.memory_usage_mb,
449 );
450 trends.insert(
451 "error_rate_trend".to_string(),
452 last.error_rate - first.error_rate,
453 );
454 trends.insert(
455 "throughput_trend".to_string(),
456 (last.throughput - first.throughput) / first.throughput,
457 );
458 }
459
460 Ok(trends)
461 }
462
463 fn detect_feature_drift(
464 &self,
465 reference: &ArrayView1<f64>,
466 new_data: &ArrayView1<f64>,
467 feature_name: &str,
468 method: &DriftMethod,
469 timestamp: u64,
470 ) -> Result<DriftDetectionResult> {
471 check_not_empty(reference, "reference")?;
472 check_not_empty(new_data, "new_data")?;
473
474 for &val in reference.iter() {
476 if !val.is_finite() {
477 return Err(crate::error::TransformError::DataValidationError(
478 "Reference data contains non-finite values".to_string(),
479 ));
480 }
481 }
482
483 for &val in new_data.iter() {
485 if !val.is_finite() {
486 return Err(crate::error::TransformError::DataValidationError(
487 "New data contains non-finite values".to_string(),
488 ));
489 }
490 }
491
492 let (statistic, p_value, is_drift) = match method {
493 DriftMethod::KolmogorovSmirnov => {
494 let (stat, p_val) = self.kolmogorov_smirnov_test(reference, new_data)?;
495 (stat, Some(p_val), p_val < self.alert_config.drift_threshold)
496 }
497 DriftMethod::ChiSquare => {
498 let (stat, p_val) = self.chi_square_test(reference, new_data)?;
499 (stat, Some(p_val), p_val < self.alert_config.drift_threshold)
500 }
501 DriftMethod::PopulationStabilityIndex => {
502 let psi = self.population_stability_index(reference, new_data)?;
503 (psi, None, psi > 0.1) }
505 DriftMethod::MaximumMeanDiscrepancy => {
506 let mmd = self.maximum_mean_discrepancy(reference, new_data)?;
507 (mmd, None, mmd > self.alert_config.drift_threshold)
508 }
509 DriftMethod::WassersteinDistance => {
510 let distance = self.wasserstein_distance(reference, new_data)?;
511 (distance, None, distance > self.alert_config.drift_threshold)
512 }
513 };
514
515 let severity = if let Some(p_val) = p_value {
516 1.0 - p_val } else {
518 statistic.min(1.0) };
520
521 Ok(DriftDetectionResult {
522 feature_name: feature_name.to_string(),
523 method: method.clone(),
524 statistic,
525 p_value,
526 is_drift_detected: is_drift,
527 severity,
528 timestamp,
529 })
530 }
531
532 fn kolmogorov_smirnov_test(
533 &self,
534 x: &ArrayView1<f64>,
535 y: &ArrayView1<f64>,
536 ) -> Result<(f64, f64)> {
537 let mut x_sorted = x.to_vec();
538 let mut y_sorted = y.to_vec();
539 x_sorted.sort_by(|a, b| a.partial_cmp(b).expect("Operation failed"));
540 y_sorted.sort_by(|a, b| a.partial_cmp(b).expect("Operation failed"));
541
542 let n1 = x_sorted.len() as f64;
543 let n2 = y_sorted.len() as f64;
544
545 let mut combined: Vec<(f64, i32)> = Vec::new();
547 for val in &x_sorted {
548 combined.push((*val, 1)); }
550 for val in &y_sorted {
551 combined.push((*val, 2)); }
553 combined.sort_by(|a, b| a.0.partial_cmp(&b.0).expect("Operation failed"));
554
555 let mut cdf1 = 0.0;
556 let mut cdf2 = 0.0;
557 let mut max_diff: f64 = 0.0;
558
559 for (_, sample_id) in combined {
560 if sample_id == 1 {
561 cdf1 += 1.0 / n1;
562 } else {
563 cdf2 += 1.0 / n2;
564 }
565 max_diff = max_diff.max((cdf1 - cdf2).abs());
566 }
567
568 let statistic = max_diff;
569
570 let effective_n = (n1 * n2) / (n1 + n2);
572 let lambda = statistic * effective_n.sqrt();
573
574 let p_value = if lambda < 0.27 {
576 1.0
577 } else if lambda < 1.0 {
578 2.0 * (-2.0 * lambda * lambda).exp()
579 } else {
580 let mut sum = 0.0;
582 for k in 1..=10 {
583 let k_f = k as f64;
584 sum += (-1.0_f64).powi(k - 1) * (-2.0 * k_f * k_f * lambda * lambda).exp();
585 }
586 2.0 * sum
587 };
588
589 Ok((statistic, p_value.clamp(0.0, 1.0)))
590 }
591
592 fn population_stability_index(
593 &self,
594 reference: &ArrayView1<f64>,
595 new_data: &ArrayView1<f64>,
596 ) -> Result<f64> {
597 let mut ref_sorted = reference.to_vec();
599 ref_sorted.sort_by(|a, b| a.partial_cmp(b).expect("Operation failed"));
600
601 let n_bins = 10;
602 let mut bins = Vec::new();
603 for i in 0..=n_bins {
604 let percentile = (i as f64) / (n_bins as f64);
605 let index = ((ref_sorted.len() - 1) as f64 * percentile) as usize;
606 bins.push(ref_sorted[index]);
607 }
608
609 let ref_freq = self.calculate_bin_frequencies(reference, &bins);
611 let new_freq = self.calculate_bin_frequencies(new_data, &bins);
612
613 let mut psi = 0.0;
615 for i in 0..n_bins {
616 let ref_pct = ref_freq[i];
617 let new_pct = new_freq[i];
618
619 if ref_pct > 0.0 && new_pct > 0.0 {
620 psi += (new_pct - ref_pct) * (new_pct / ref_pct).ln();
621 }
622 }
623
624 Ok(psi)
625 }
626
627 fn calculate_bin_frequencies(&self, data: &ArrayView1<f64>, bins: &[f64]) -> Vec<f64> {
628 if bins.len() < 2 {
629 return vec![];
630 }
631
632 let mut frequencies = vec![0; bins.len() - 1];
633
634 for &value in data.iter() {
635 if !value.is_finite() {
636 continue;
637 }
638
639 let mut placed = false;
641 for i in 0..bins.len() - 1 {
642 if i == bins.len() - 2 {
643 if value >= bins[i] && value <= bins[i + 1] {
645 frequencies[i] += 1;
646 placed = true;
647 break;
648 }
649 } else if value >= bins[i] && value < bins[i + 1] {
650 frequencies[i] += 1;
651 placed = true;
652 break;
653 }
654 }
655
656 if !placed {
658 if value < bins[0] {
659 frequencies[0] += 1;
660 } else if value > bins[bins.len() - 1] {
661 let last_idx = frequencies.len() - 1;
662 frequencies[last_idx] += 1;
663 }
664 }
665 }
666
667 let total = data.iter().filter(|&&v| v.is_finite()).count() as f64;
668 if total == 0.0 {
669 vec![0.0; frequencies.len()]
670 } else {
671 frequencies.iter().map(|&f| f as f64 / total).collect()
672 }
673 }
674
675 fn wasserstein_distance(&self, x: &ArrayView1<f64>, y: &ArrayView1<f64>) -> Result<f64> {
676 let mut x_sorted: Vec<f64> = x.iter().filter(|&&v| v.is_finite()).copied().collect();
678 let mut y_sorted: Vec<f64> = y.iter().filter(|&&v| v.is_finite()).copied().collect();
679
680 if x_sorted.is_empty() || y_sorted.is_empty() {
681 return Ok(0.0);
682 }
683
684 x_sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
685 y_sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
686
687 let n1 = x_sorted.len();
688 let n2 = y_sorted.len();
689 let max_len = n1.max(n2);
690
691 let mut distance = 0.0;
692 for i in 0..max_len {
693 let x_val = if i < n1 {
694 x_sorted[i]
695 } else {
696 x_sorted[n1 - 1]
697 };
698 let y_val = if i < n2 {
699 y_sorted[i]
700 } else {
701 y_sorted[n2 - 1]
702 };
703 distance += (x_val - y_val).abs();
704 }
705
706 Ok(distance / max_len as f64)
707 }
708
709 fn chi_square_test(
711 &self,
712 reference: &ArrayView1<f64>,
713 new_data: &ArrayView1<f64>,
714 ) -> Result<(f64, f64)> {
715 let n_bins = 10;
717
718 let mut combined_data: Vec<f64> = reference
720 .iter()
721 .chain(new_data.iter())
722 .filter(|&&v| v.is_finite())
723 .copied()
724 .collect();
725
726 if combined_data.len() < n_bins {
727 return Ok((0.0, 1.0)); }
729
730 combined_data.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
731
732 let mut bins = Vec::new();
734 for i in 0..=n_bins {
735 let percentile = i as f64 / n_bins as f64;
736 let index = ((combined_data.len() - 1) as f64 * percentile) as usize;
737 bins.push(combined_data[index]);
738 }
739
740 bins.dedup_by(|a, b| (*a - *b).abs() < f64::EPSILON);
742
743 if bins.len() < 2 {
744 return Ok((0.0, 1.0));
745 }
746
747 let ref_freq = self.calculate_bin_frequencies(reference, &bins);
748 let new_freq = self.calculate_bin_frequencies(new_data, &bins);
749
750 let ref_total = reference.iter().filter(|&&v| v.is_finite()).count() as f64;
751 let new_total = new_data.iter().filter(|&&v| v.is_finite()).count() as f64;
752
753 if ref_total == 0.0 || new_total == 0.0 {
754 return Ok((0.0, 1.0));
755 }
756
757 let mut chi_square = 0.0;
759 let mut degrees_of_freedom = 0;
760
761 for i in 0..ref_freq.len() {
762 let observed_ref = ref_freq[i] * ref_total;
763 let observed_new = new_freq[i] * new_total;
764
765 let total_in_bin = observed_ref + observed_new;
767 let expected_ref_null = total_in_bin * ref_total / (ref_total + new_total);
768 let expected_new_null = total_in_bin * new_total / (ref_total + new_total);
769
770 if expected_ref_null > 5.0 && expected_new_null > 5.0 {
771 chi_square += (observed_ref - expected_ref_null).powi(2) / expected_ref_null;
772 chi_square += (observed_new - expected_new_null).powi(2) / expected_new_null;
773 degrees_of_freedom += 1;
774 }
775 }
776
777 let p_value = if degrees_of_freedom > 0 {
779 self.chi_square_cdf_complement(chi_square, degrees_of_freedom as f64)
780 } else {
781 1.0
782 };
783
784 Ok((chi_square, p_value))
785 }
786
787 fn maximum_mean_discrepancy(&self, x: &ArrayView1<f64>, y: &ArrayView1<f64>) -> Result<f64> {
789 let x_clean: Vec<f64> = x.iter().filter(|&&v| v.is_finite()).copied().collect();
790 let y_clean: Vec<f64> = y.iter().filter(|&&v| v.is_finite()).copied().collect();
791
792 if x_clean.is_empty() || y_clean.is_empty() {
793 return Ok(0.0);
794 }
795
796 let n = x_clean.len();
797 let m = y_clean.len();
798
799 let all_data: Vec<f64> = x_clean.iter().chain(y_clean.iter()).copied().collect();
801 let mut sorted_data = all_data;
802 sorted_data.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
803
804 let median = sorted_data[sorted_data.len() / 2];
806 let mad: f64 =
807 sorted_data.iter().map(|&x| (x - median).abs()).sum::<f64>() / sorted_data.len() as f64;
808 let bandwidth = mad.max(1.0); let mut kxx = 0.0;
812 let mut kyy = 0.0;
813 let mut kxy = 0.0;
814
815 if n > 1 {
817 for i in 0..n {
818 for j in (i + 1)..n {
819 kxx += self.rbf_kernel(x_clean[i], x_clean[j], bandwidth);
820 }
821 }
822 kxx = 2.0 * kxx / (n * (n - 1)) as f64;
823 }
824
825 if m > 1 {
827 for i in 0..m {
828 for j in (i + 1)..m {
829 kyy += self.rbf_kernel(y_clean[i], y_clean[j], bandwidth);
830 }
831 }
832 kyy = 2.0 * kyy / (m * (m - 1)) as f64;
833 }
834
835 for i in 0..n {
837 for j in 0..m {
838 kxy += self.rbf_kernel(x_clean[i], y_clean[j], bandwidth);
839 }
840 }
841 kxy /= (n * m) as f64;
842
843 let mmd_squared = kxx + kyy - 2.0 * kxy;
844 Ok(mmd_squared.max(0.0).sqrt()) }
846
847 fn rbf_kernel(&self, x: f64, y: f64, bandwidth: f64) -> f64 {
849 let diff = x - y;
850 (-diff * diff / (2.0 * bandwidth * bandwidth)).exp()
851 }
852
853 fn chi_square_cdf_complement(&self, x: f64, df: f64) -> f64 {
855 if x <= 0.0 {
856 return 1.0;
857 }
858 if df <= 0.0 {
859 return 0.0;
860 }
861
862 if df >= 30.0 {
864 let h = 2.0 / (9.0 * df);
865 let z = ((x / df).powf(1.0 / 3.0) - (1.0 - h)) / h.sqrt();
866 return 0.5 * (1.0 - self.erf(z / 2.0_f64.sqrt()));
867 }
868
869 let alpha = df / 2.0;
872 let x_half = x / 2.0;
873
874 if x_half < alpha + 1.0 {
876 let mut term = x_half.powf(alpha) * (-x_half).exp();
878 let mut sum = term;
879
880 for k in 1..=50 {
881 term *= x_half / (alpha + k as f64);
882 sum += term;
883 if term / sum < 1e-10 {
884 break;
885 }
886 }
887
888 let gamma_cdf = sum / self.gamma(alpha);
889 1.0 - gamma_cdf.min(1.0)
890 } else {
891 let a = alpha;
893 let b = x_half + 1.0 - a;
894 let c = 1e30;
895 let mut d = 1.0 / b;
896 let mut h = d;
897
898 for i in 1..=100 {
899 let an = -i as f64 * (i as f64 - a);
900 let b = b + 2.0;
901 d = an * d + b;
902 if d.abs() < 1e-30 {
903 d = 1e-30;
904 }
905 let mut c = b + an / c;
906 if c.abs() < 1e-30 {
907 c = 1e-30;
908 }
909 d = 1.0 / d;
910 let del = d * c;
911 h *= del;
912 if (del - 1.0).abs() < 1e-10 {
913 break;
914 }
915 }
916
917 let gamma_cf = (-x_half).exp() * x_half.powf(a) * h / self.gamma(a);
918 gamma_cf.clamp(0.0, 1.0)
919 }
920 }
921
922 fn erf(&self, x: f64) -> f64 {
924 let a1 = 0.254829592;
926 let a2 = -0.284496736;
927 let a3 = 1.421413741;
928 let a4 = -1.453152027;
929 let a5 = 1.061405429;
930 let p = 0.3275911;
931
932 let sign = if x >= 0.0 { 1.0 } else { -1.0 };
933 let x = x.abs();
934
935 let t = 1.0 / (1.0 + p * x);
936 let y = 1.0 - (((((a5 * t + a4) * t) + a3) * t + a2) * t + a1) * t * (-x * x).exp();
937
938 sign * y
939 }
940
941 fn gamma(&self, z: f64) -> f64 {
943 if z < 0.5 {
944 std::f64::consts::PI / ((std::f64::consts::PI * z).sin() * self.gamma(1.0 - z))
946 } else {
947 let g = 7.0;
949 let c = [
950 0.99999999999980993,
951 676.5203681218851,
952 -1259.1392167224028,
953 771.32342877765313,
954 -176.61502916214059,
955 12.507343278686905,
956 -0.13857109526572012,
957 9.9843695780195716e-6,
958 1.5056327351493116e-7,
959 ];
960
961 let z = z - 1.0;
962 let mut x = c[0];
963 for i in 1..c.len() {
964 x += c[i] / (z + i as f64);
965 }
966
967 let t = z + g + 0.5;
968 (2.0 * std::f64::consts::PI).sqrt() * t.powf(z + 0.5) * (-t).exp() * x
969 }
970 }
971
972 fn check_performance_alerts(&mut self, metrics: &PerformanceMetrics) -> Result<Vec<AlertType>> {
973 let mut alerts = Vec::new();
974 let current_time = current_timestamp();
975
976 let cooldown_key = "performance";
978 if let Some(&last_alert_time) = self.last_alert_times.get(cooldown_key) {
979 if current_time - last_alert_time < self.alert_config.cooldown_seconds {
980 return Ok(alerts);
981 }
982 }
983
984 if let Some(ref baseline) = self.baseline_metrics {
986 let degradation_ratio = metrics.processing_time_ms / baseline.processing_time_ms;
987 if degradation_ratio > self.alert_config.performance_threshold {
988 alerts.push(AlertType::PerformanceDegradation {
989 metric: "processing_time".to_string(),
990 value: degradation_ratio,
991 });
992 }
993 }
994
995 if metrics.error_rate > self.alert_config.error_rate_threshold {
997 alerts.push(AlertType::HighErrorRate {
998 rate: metrics.error_rate,
999 });
1000 }
1001
1002 if metrics.memory_usage_mb > self.alert_config.memory_threshold_mb {
1004 alerts.push(AlertType::MemoryExhaustion {
1005 usage_mb: metrics.memory_usage_mb,
1006 });
1007 }
1008
1009 if metrics.data_quality_score < 0.8 {
1011 alerts.push(AlertType::DataQualityIssue {
1012 score: metrics.data_quality_score,
1013 });
1014 }
1015
1016 if !alerts.is_empty() {
1017 self.last_alert_times
1018 .insert(cooldown_key.to_string(), current_time);
1019 }
1020
1021 Ok(alerts)
1022 }
1023
1024 #[cfg(feature = "monitoring")]
1026 pub fn export_prometheus_metrics(&self) -> Result<String> {
1027 use prometheus::Encoder;
1028 let encoder = prometheus::TextEncoder::new();
1029 let metric_families = self.metrics_registry.gather();
1030 encoder.encode_to_string(&metric_families).map_err(|e| {
1031 TransformError::ComputationError(format!("Failed to encode metrics: {}", e))
1032 })
1033 }
1034}
1035
1036#[allow(dead_code)]
1037fn current_timestamp() -> u64 {
1038 SystemTime::now()
1039 .duration_since(UNIX_EPOCH)
1040 .unwrap_or_else(|_| Duration::from_secs(0))
1041 .as_secs()
1042}
1043
1044#[cfg(feature = "monitoring")]
1046pub struct AdvancedAnomalyDetector {
1047 statistical_detectors: HashMap<String, StatisticalDetector>,
1049 ml_detectors: HashMap<String, MLAnomalyDetector>,
1051 time_series_detectors: HashMap<String, TimeSeriesAnomalyDetector>,
1053 ensemble_detector: Option<EnsembleAnomalyDetector>,
1055 anomaly_history: VecDeque<AnomalyRecord>,
1057 thresholds: AnomalyThresholds,
1059}
1060
1061#[cfg(feature = "monitoring")]
1063#[derive(Debug, Clone)]
1064pub struct StatisticalDetector {
1065 z_score_threshold: f64,
1067 iqr_multiplier: f64,
1069 modified_z_threshold: f64,
1071 data_window: VecDeque<f64>,
1073 max_window_size: usize,
1075}
1076
1077#[cfg(feature = "monitoring")]
1079pub struct MLAnomalyDetector {
1080 isolation_forest_config: IsolationForestConfig,
1082 svm_config: OneClassSVMConfig,
1084 lof_config: LOFConfig,
1086 training_data: VecDeque<Vec<f64>>,
1088 model_trained: bool,
1090}
1091
1092#[cfg(feature = "monitoring")]
1094pub struct TimeSeriesAnomalyDetector {
1095 arima_config: ARIMAConfig,
1097 seasonal_config: SeasonalConfig,
1099 change_point_config: ChangePointConfig,
1101 time_series_data: VecDeque<TimeSeriesPoint>,
1103 forecast_model: Option<ForecastModel>,
1105}
1106
1107#[cfg(feature = "monitoring")]
1109pub struct EnsembleAnomalyDetector {
1110 detector_weights: HashMap<String, f64>,
1112 voting_threshold: f64,
1114 confidence_threshold: f64,
1116}
1117
1118#[cfg(feature = "monitoring")]
1119impl EnsembleAnomalyDetector {
1120 pub fn new(
1122 detector_weights: HashMap<String, f64>,
1123 voting_threshold: f64,
1124 confidence_threshold: f64,
1125 ) -> Self {
1126 EnsembleAnomalyDetector {
1127 detector_weights,
1128 voting_threshold,
1129 confidence_threshold,
1130 }
1131 }
1132
1133 pub fn detect_ensemble_anomalies(
1135 &self,
1136 metrics: &HashMap<String, f64>,
1137 _timestamp: u64,
1138 ) -> Result<Vec<AnomalyRecord>> {
1139 Ok(vec![])
1145 }
1146}
1147
1148#[cfg(feature = "monitoring")]
1150#[derive(Debug, Clone)]
1151pub struct AnomalyRecord {
1152 pub timestamp: u64,
1154 pub metric_name: String,
1156 pub value: f64,
1158 pub anomaly_score: f64,
1160 pub detection_method: String,
1162 pub severity: AnomalySeverity,
1164 pub context: HashMap<String, String>,
1166}
1167
1168#[cfg(feature = "monitoring")]
1170#[derive(Debug, Clone, PartialEq)]
1171pub enum AnomalySeverity {
1172 Low,
1174 Medium,
1176 High,
1178 Critical,
1180}
1181
1182#[cfg(feature = "monitoring")]
1184#[derive(Debug, Clone)]
1185pub struct AnomalyThresholds {
1186 pub low_threshold: f64,
1188 pub medium_threshold: f64,
1190 pub high_threshold: f64,
1192 pub critical_threshold: f64,
1194}
1195
1196impl Default for AnomalyThresholds {
1197 fn default() -> Self {
1198 AnomalyThresholds {
1199 low_threshold: 2.0, medium_threshold: 2.5, high_threshold: 3.0, critical_threshold: 4.0, }
1204 }
1205}
1206
1207#[cfg(feature = "monitoring")]
1209#[derive(Debug, Clone)]
1210pub struct TimeSeriesPoint {
1211 pub timestamp: u64,
1213 pub value: f64,
1215 pub metadata: HashMap<String, String>,
1217}
1218
1219#[cfg(feature = "monitoring")]
1221#[derive(Debug, Clone)]
1222pub struct IsolationForestConfig {
1223 pub n_trees: usize,
1225 pub contamination: f64,
1227 pub max_samples: usize,
1229}
1230
1231#[cfg(feature = "monitoring")]
1233#[derive(Debug, Clone)]
1234pub struct OneClassSVMConfig {
1235 pub nu: f64,
1237 pub gamma: f64,
1239 pub kernel: String,
1241}
1242
1243#[cfg(feature = "monitoring")]
1245#[derive(Debug, Clone)]
1246pub struct LOFConfig {
1247 pub n_neighbors: usize,
1249 pub contamination: f64,
1251}
1252
1253#[cfg(feature = "monitoring")]
1255#[derive(Debug, Clone)]
1256pub struct ARIMAConfig {
1257 pub p: usize,
1259 pub d: usize,
1261 pub q: usize,
1263}
1264
1265#[cfg(feature = "monitoring")]
1267#[derive(Debug, Clone)]
1268pub struct SeasonalConfig {
1269 pub seasonal_period: usize,
1271 pub trend_component: bool,
1273 pub seasonal_component: bool,
1275}
1276
1277#[cfg(feature = "monitoring")]
1279#[derive(Debug, Clone)]
1280pub struct ChangePointConfig {
1281 pub window_size: usize,
1283 pub significance_level: f64,
1285}
1286
1287#[cfg(feature = "monitoring")]
1289#[derive(Debug, Clone)]
1290pub struct ForecastModel {
1291 pub coefficients: Vec<f64>,
1293 pub forecast_horizon: usize,
1295 pub confidence_interval: f64,
1297}
1298
1299#[cfg(feature = "monitoring")]
1300impl AdvancedAnomalyDetector {
1301 pub fn new() -> Self {
1303 AdvancedAnomalyDetector {
1304 statistical_detectors: HashMap::new(),
1305 ml_detectors: HashMap::new(),
1306 time_series_detectors: HashMap::new(),
1307 ensemble_detector: None,
1308 anomaly_history: VecDeque::with_capacity(10000),
1309 thresholds: AnomalyThresholds::default(),
1310 }
1311 }
1312
1313 pub fn add_statistical_detector(&mut self, metricname: String, detector: StatisticalDetector) {
1315 self.statistical_detectors.insert(metricname, detector);
1316 }
1317
1318 pub fn add_ml_detector(&mut self, metricname: String, detector: MLAnomalyDetector) {
1320 self.ml_detectors.insert(metricname, detector);
1321 }
1322
1323 pub fn add_time_series_detector(
1325 &mut self,
1326 metric_name: String,
1327 detector: TimeSeriesAnomalyDetector,
1328 ) {
1329 self.time_series_detectors.insert(metric_name, detector);
1330 }
1331
1332 pub fn configure_ensemble(&mut self, detector: EnsembleAnomalyDetector) {
1334 self.ensemble_detector = Some(detector);
1335 }
1336
1337 pub fn detect_anomalies(
1339 &mut self,
1340 metrics: &HashMap<String, f64>,
1341 ) -> Result<Vec<AnomalyRecord>> {
1342 let mut anomalies = Vec::new();
1343 let timestamp = current_timestamp();
1344
1345 for (metric_name, &value) in metrics {
1346 if let Some(detector) = self.statistical_detectors.get_mut(metric_name) {
1348 if let Some(anomaly) = detector.detect_anomaly(value, metric_name, timestamp)? {
1349 anomalies.push(anomaly);
1350 }
1351 }
1352
1353 if let Some(detector) = self.ml_detectors.get_mut(metric_name) {
1355 if let Some(anomaly) = detector.detect_anomaly(value, metric_name, timestamp)? {
1356 anomalies.push(anomaly);
1357 }
1358 }
1359
1360 if let Some(detector) = self.time_series_detectors.get_mut(metric_name) {
1362 if let Some(anomaly) = detector.detect_anomaly(value, metric_name, timestamp)? {
1363 anomalies.push(anomaly);
1364 }
1365 }
1366 }
1367
1368 if let Some(ref ensemble) = self.ensemble_detector {
1370 let ensemble_anomalies = ensemble.detect_ensemble_anomalies(metrics, timestamp)?;
1371 anomalies.extend(ensemble_anomalies);
1372 }
1373
1374 for anomaly in &anomalies {
1376 self.anomaly_history.push_back(anomaly.clone());
1377 if self.anomaly_history.len() > 10000 {
1378 self.anomaly_history.pop_front();
1379 }
1380 }
1381
1382 Ok(anomalies)
1383 }
1384
1385 pub fn get_anomaly_insights(&self, lookbackhours: u64) -> AnomalyInsights {
1387 let cutoff_time = current_timestamp() - (lookbackhours * 3600);
1388 let recent_anomalies: Vec<_> = self
1389 .anomaly_history
1390 .iter()
1391 .filter(|a| a.timestamp >= cutoff_time)
1392 .collect();
1393
1394 let total_anomalies = recent_anomalies.len();
1395 let critical_anomalies = recent_anomalies
1396 .iter()
1397 .filter(|a| a.severity == AnomalySeverity::Critical)
1398 .count();
1399
1400 let mut metric_frequencies = HashMap::new();
1402 for anomaly in &recent_anomalies {
1403 *metric_frequencies
1404 .entry(anomaly.metric_name.clone())
1405 .or_insert(0) += 1;
1406 }
1407
1408 let mut method_frequencies = HashMap::new();
1410 for anomaly in &recent_anomalies {
1411 *method_frequencies
1412 .entry(anomaly.detection_method.clone())
1413 .or_insert(0) += 1;
1414 }
1415
1416 let trending_metrics = self.identify_trending_anomalies(&recent_anomalies);
1418
1419 let most_anomalous_metric = metric_frequencies
1420 .iter()
1421 .max_by_key(|(_, &count)| count)
1422 .map(|(metric_, _)| metric_.clone());
1423
1424 AnomalyInsights {
1425 total_anomalies,
1426 critical_anomalies,
1427 anomaly_rate: total_anomalies as f64 / lookbackhours as f64,
1428 metric_frequencies,
1429 method_frequencies,
1430 trending_metrics,
1431 most_anomalous_metric,
1432 }
1433 }
1434
1435 fn identify_trending_anomalies(&self, anomalies: &[&AnomalyRecord]) -> Vec<String> {
1437 let mut recent_counts = HashMap::new();
1439 let current_time = current_timestamp();
1440 let recent_threshold = 3600; for anomaly in anomalies {
1443 if current_time - anomaly.timestamp <= recent_threshold {
1444 *recent_counts
1445 .entry(anomaly.metric_name.clone())
1446 .or_insert(0) += 1;
1447 }
1448 }
1449
1450 recent_counts
1451 .into_iter()
1452 .filter(|(_, count)| *count >= 3) .map(|(metric_, _)| metric_)
1454 .collect()
1455 }
1456
1457 pub fn update_detector_configurations(&mut self, feedback: AnomalyFeedback) -> Result<()> {
1459 match feedback.feedback_type {
1460 FeedbackType::FalsePositive => {
1461 self.adjust_thresholds_for_detector(&feedback.detection_method, 0.1)?;
1463 }
1464 FeedbackType::FalseNegative => {
1465 self.adjust_thresholds_for_detector(&feedback.detection_method, -0.1)?;
1467 }
1468 FeedbackType::ConfirmedAnomaly => {
1469 }
1471 }
1472 Ok(())
1473 }
1474
1475 fn adjust_thresholds_for_detector(
1476 &mut self,
1477 detection_method: &str,
1478 adjustment: f64,
1479 ) -> Result<()> {
1480 match detection_method {
1482 "statistical" => {
1483 for detector in self.statistical_detectors.values_mut() {
1484 detector.z_score_threshold += adjustment;
1485 detector.z_score_threshold = detector.z_score_threshold.clamp(1.5, 5.0);
1486 }
1487 }
1488 "ml" => {
1489 for detector in self.ml_detectors.values_mut() {
1491 detector.isolation_forest_config.contamination += adjustment * 0.01;
1492 detector.isolation_forest_config.contamination = detector
1493 .isolation_forest_config
1494 .contamination
1495 .max(0.01)
1496 .min(0.5);
1497 }
1498 }
1499 _ => {}
1500 }
1501 Ok(())
1502 }
1503}
1504
1505#[cfg(feature = "monitoring")]
1506impl StatisticalDetector {
1507 pub fn new(z_score_threshold: f64, iqr_multiplier: f64, max_window_size: usize) -> Self {
1509 StatisticalDetector {
1510 z_score_threshold,
1511 iqr_multiplier,
1512 modified_z_threshold: z_score_threshold * 0.6745, data_window: VecDeque::with_capacity(max_window_size),
1514 max_window_size,
1515 }
1516 }
1517
1518 pub fn detect_anomaly(
1520 &mut self,
1521 value: f64,
1522 metric_name: &str,
1523 timestamp: u64,
1524 ) -> Result<Option<AnomalyRecord>> {
1525 self.data_window.push_back(value);
1527 if self.data_window.len() > self.max_window_size {
1528 self.data_window.pop_front();
1529 }
1530
1531 if self.data_window.len() < 10 {
1533 return Ok(None);
1534 }
1535
1536 let values: Vec<f64> = self.data_window.iter().copied().collect();
1537
1538 let mean = values.iter().sum::<f64>() / values.len() as f64;
1540 let variance = values.iter().map(|x| (x - mean).powi(2)).sum::<f64>() / values.len() as f64;
1541 let std_dev = variance.sqrt();
1542
1543 if std_dev > 0.0 {
1544 let z_score = (value - mean) / std_dev;
1545
1546 if z_score.abs() > self.z_score_threshold {
1547 let severity = if z_score.abs() > 4.0 {
1548 AnomalySeverity::Critical
1549 } else if z_score.abs() > 3.0 {
1550 AnomalySeverity::High
1551 } else if z_score.abs() > 2.5 {
1552 AnomalySeverity::Medium
1553 } else {
1554 AnomalySeverity::Low
1555 };
1556
1557 return Ok(Some(AnomalyRecord {
1558 timestamp,
1559 metric_name: metric_name.to_string(),
1560 value,
1561 anomaly_score: z_score.abs(),
1562 detection_method: "statistical_zscore".to_string(),
1563 severity,
1564 context: [
1565 ("mean".to_string(), mean.to_string()),
1566 ("std_dev".to_string(), std_dev.to_string()),
1567 ("z_score".to_string(), z_score.to_string()),
1568 ]
1569 .iter()
1570 .cloned()
1571 .collect(),
1572 }));
1573 }
1574 }
1575
1576 let mut sorted_values = values.clone();
1578 sorted_values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
1579
1580 let q1_idx = sorted_values.len() / 4;
1581 let q3_idx = (3 * sorted_values.len()) / 4;
1582 let q1 = sorted_values[q1_idx];
1583 let q3 = sorted_values[q3_idx];
1584 let iqr = q3 - q1;
1585
1586 if iqr > 0.0 {
1587 let lower_bound = q1 - self.iqr_multiplier * iqr;
1588 let upper_bound = q3 + self.iqr_multiplier * iqr;
1589
1590 if value < lower_bound || value > upper_bound {
1591 let distance_from_bounds = if value < lower_bound {
1592 lower_bound - value
1593 } else {
1594 value - upper_bound
1595 };
1596
1597 let severity = if distance_from_bounds > 3.0 * iqr {
1598 AnomalySeverity::Critical
1599 } else if distance_from_bounds > 2.0 * iqr {
1600 AnomalySeverity::High
1601 } else if distance_from_bounds > 1.5 * iqr {
1602 AnomalySeverity::Medium
1603 } else {
1604 AnomalySeverity::Low
1605 };
1606
1607 return Ok(Some(AnomalyRecord {
1608 timestamp,
1609 metric_name: metric_name.to_string(),
1610 value,
1611 anomaly_score: distance_from_bounds / iqr,
1612 detection_method: "statistical_iqr".to_string(),
1613 severity,
1614 context: [
1615 ("q1".to_string(), q1.to_string()),
1616 ("q3".to_string(), q3.to_string()),
1617 ("iqr".to_string(), iqr.to_string()),
1618 (
1619 "distance_from_bounds".to_string(),
1620 distance_from_bounds.to_string(),
1621 ),
1622 ]
1623 .iter()
1624 .cloned()
1625 .collect(),
1626 }));
1627 }
1628 }
1629
1630 Ok(None)
1631 }
1632}
1633
1634#[cfg(feature = "monitoring")]
1635impl MLAnomalyDetector {
1636 pub fn new() -> Self {
1638 MLAnomalyDetector {
1639 isolation_forest_config: IsolationForestConfig {
1640 n_trees: 100,
1641 contamination: 0.1,
1642 max_samples: 256,
1643 },
1644 svm_config: OneClassSVMConfig {
1645 nu: 0.1,
1646 gamma: 0.1,
1647 kernel: "rbf".to_string(),
1648 },
1649 lof_config: LOFConfig {
1650 n_neighbors: 20,
1651 contamination: 0.1,
1652 },
1653 training_data: VecDeque::with_capacity(1000),
1654 model_trained: false,
1655 }
1656 }
1657
1658 pub fn detect_anomaly(
1660 &mut self,
1661 value: f64,
1662 metric_name: &str,
1663 timestamp: u64,
1664 ) -> Result<Option<AnomalyRecord>> {
1665 self.training_data.push_back(vec![value]);
1667 if self.training_data.len() > 1000 {
1668 self.training_data.pop_front();
1669 }
1670
1671 if self.training_data.len() < 50 {
1673 return Ok(None);
1674 }
1675
1676 let anomaly_score = self.simplified_isolation_forest_score(value)?;
1678
1679 if anomaly_score > 0.6 {
1680 let severity = if anomaly_score > 0.9 {
1682 AnomalySeverity::Critical
1683 } else if anomaly_score > 0.8 {
1684 AnomalySeverity::High
1685 } else if anomaly_score > 0.7 {
1686 AnomalySeverity::Medium
1687 } else {
1688 AnomalySeverity::Low
1689 };
1690
1691 return Ok(Some(AnomalyRecord {
1692 timestamp,
1693 metric_name: metric_name.to_string(),
1694 value,
1695 anomaly_score,
1696 detection_method: "ml_isolation_forest".to_string(),
1697 severity,
1698 context: [
1699 ("isolation_score".to_string(), anomaly_score.to_string()),
1700 (
1701 "training_samples".to_string(),
1702 self.training_data.len().to_string(),
1703 ),
1704 ]
1705 .iter()
1706 .cloned()
1707 .collect(),
1708 }));
1709 }
1710
1711 Ok(None)
1712 }
1713
1714 fn simplified_isolation_forest_score(&self, value: f64) -> Result<f64> {
1716 let data: Vec<f64> = self.training_data.iter().map(|v| v[0]).collect();
1717
1718 let mut sorted_data = data.clone();
1720 sorted_data.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
1721
1722 let position = sorted_data
1723 .iter()
1724 .position(|&x| x >= value)
1725 .unwrap_or(sorted_data.len());
1726 let percentile = position as f64 / sorted_data.len() as f64;
1727
1728 let distance_from_median = (percentile - 0.5).abs() * 2.0;
1730
1731 Ok(distance_from_median)
1732 }
1733}
1734
1735#[cfg(feature = "monitoring")]
1736impl TimeSeriesAnomalyDetector {
1737 pub fn new() -> Self {
1739 TimeSeriesAnomalyDetector {
1740 arima_config: ARIMAConfig { p: 1, d: 1, q: 1 },
1741 seasonal_config: SeasonalConfig {
1742 seasonal_period: 24, trend_component: true,
1744 seasonal_component: true,
1745 },
1746 change_point_config: ChangePointConfig {
1747 window_size: 50,
1748 significance_level: 0.05,
1749 },
1750 time_series_data: VecDeque::with_capacity(1000),
1751 forecast_model: None,
1752 }
1753 }
1754
1755 pub fn detect_anomaly(
1757 &mut self,
1758 value: f64,
1759 metric_name: &str,
1760 timestamp: u64,
1761 ) -> Result<Option<AnomalyRecord>> {
1762 self.time_series_data.push_back(TimeSeriesPoint {
1764 timestamp,
1765 value,
1766 metadata: HashMap::new(),
1767 });
1768
1769 if self.time_series_data.len() > 1000 {
1770 self.time_series_data.pop_front();
1771 }
1772
1773 if self.time_series_data.len() < 50 {
1775 return Ok(None);
1776 }
1777
1778 let anomaly_score = self.detect_change_point(value)?;
1780
1781 if anomaly_score > 2.0 {
1782 let severity = if anomaly_score > 5.0 {
1784 AnomalySeverity::Critical
1785 } else if anomaly_score > 4.0 {
1786 AnomalySeverity::High
1787 } else if anomaly_score > 3.0 {
1788 AnomalySeverity::Medium
1789 } else {
1790 AnomalySeverity::Low
1791 };
1792
1793 return Ok(Some(AnomalyRecord {
1794 timestamp,
1795 metric_name: metric_name.to_string(),
1796 value,
1797 anomaly_score,
1798 detection_method: "time_series_change_point".to_string(),
1799 severity,
1800 context: [
1801 ("change_point_score".to_string(), anomaly_score.to_string()),
1802 (
1803 "window_size".to_string(),
1804 self.change_point_config.window_size.to_string(),
1805 ),
1806 ]
1807 .iter()
1808 .cloned()
1809 .collect(),
1810 }));
1811 }
1812
1813 Ok(None)
1814 }
1815
1816 fn detect_change_point(&self, current_value: f64) -> Result<f64> {
1818 let window_size = self
1819 .change_point_config
1820 .window_size
1821 .min(self.time_series_data.len());
1822 if window_size < 10 {
1823 return Ok(0.0);
1824 }
1825
1826 let recent_data: Vec<f64> = self
1827 .time_series_data
1828 .iter()
1829 .rev()
1830 .take(window_size)
1831 .map(|p| p.value)
1832 .collect();
1833
1834 let half_window = window_size / 2;
1835 let first_half: Vec<f64> = recent_data.iter().take(half_window).copied().collect();
1836 let second_half: Vec<f64> = recent_data.iter().skip(half_window).copied().collect();
1837
1838 if first_half.is_empty() || second_half.is_empty() {
1839 return Ok(0.0);
1840 }
1841
1842 let mean1 = first_half.iter().sum::<f64>() / first_half.len() as f64;
1843 let mean2 = second_half.iter().sum::<f64>() / second_half.len() as f64;
1844
1845 let var1 =
1846 first_half.iter().map(|x| (x - mean1).powi(2)).sum::<f64>() / first_half.len() as f64;
1847 let var2 =
1848 second_half.iter().map(|x| (x - mean2).powi(2)).sum::<f64>() / second_half.len() as f64;
1849
1850 let pooled_std = ((var1 + var2) / 2.0).sqrt();
1851
1852 if pooled_std > 0.0 {
1853 let t_statistic =
1854 (mean2 - mean1).abs() / (pooled_std * (2.0_f64 / window_size as f64).sqrt());
1855 Ok(t_statistic)
1856 } else {
1857 Ok(0.0)
1858 }
1859 }
1860}
1861
1862#[cfg(feature = "monitoring")]
1864#[derive(Debug, Clone)]
1865pub struct AnomalyInsights {
1866 pub total_anomalies: usize,
1868 pub critical_anomalies: usize,
1870 pub anomaly_rate: f64,
1872 pub metric_frequencies: HashMap<String, usize>,
1874 pub method_frequencies: HashMap<String, usize>,
1876 pub trending_metrics: Vec<String>,
1878 pub most_anomalous_metric: Option<String>,
1880}
1881
1882#[cfg(feature = "monitoring")]
1884#[derive(Debug, Clone)]
1885pub struct AnomalyFeedback {
1886 pub anomaly_id: String,
1888 pub feedback_type: FeedbackType,
1890 pub detection_method: String,
1892 pub metric_name: String,
1894 pub timestamp: u64,
1896}
1897
1898#[cfg(feature = "monitoring")]
1900#[derive(Debug, Clone)]
1901pub enum FeedbackType {
1902 FalsePositive,
1904 FalseNegative,
1906 ConfirmedAnomaly,
1908}
1909
1910#[cfg(not(feature = "monitoring"))]
1912pub struct AdvancedAnomalyDetector;
1913
1914#[cfg(not(feature = "monitoring"))]
1915pub struct AnomalyInsights;