mockforge_chaos/
predictive_remediation.rs

1use serde::{Deserialize, Serialize};
2use std::collections::{HashMap, VecDeque};
3use std::sync::Arc;
4use tokio::sync::RwLock;
5
6use crate::reinforcement_learning::{RemediationAction, SystemState};
7
/// A single timestamped sample in a metric time series.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataPoint {
    /// When the sample was recorded; `PredictiveRemediationEngine::record`
    /// fills this with `chrono::Utc::now()`.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// The observed metric value.
    pub value: f64,
}
14
/// Kind of metric tracked by the predictive engine.
///
/// Used as the key for per-metric time series and failure thresholds.
/// NOTE(review): units are not declared here; the default thresholds in
/// `PredictiveRemediationEngine::new` suggest 0–100 style scales for the
/// rate/usage metrics — confirm with producers of these values.
#[derive(Debug, Clone, Serialize, Deserialize, Hash, Eq, PartialEq)]
pub enum MetricType {
    ErrorRate,
    Latency,
    CpuUsage,
    MemoryUsage,
    RequestRate,
    FailureCount,
}
25
/// Bounded, chronologically ordered history of samples for a single metric.
///
/// When `data` already holds `max_size` points, adding another evicts the
/// oldest one first.
#[derive(Debug, Clone)]
pub struct TimeSeries {
    /// Metric these samples belong to.
    pub metric: MetricType,
    /// Samples, oldest at the front and newest at the back.
    pub data: VecDeque<DataPoint>,
    /// Maximum number of samples retained.
    pub max_size: usize,
}
33
34impl TimeSeries {
35    pub fn new(metric: MetricType, max_size: usize) -> Self {
36        Self {
37            metric,
38            data: VecDeque::with_capacity(max_size),
39            max_size,
40        }
41    }
42
43    pub fn add(&mut self, point: DataPoint) {
44        if self.data.len() >= self.max_size {
45            self.data.pop_front();
46        }
47        self.data.push_back(point);
48    }
49
50    pub fn values(&self) -> Vec<f64> {
51        self.data.iter().map(|p| p.value).collect()
52    }
53
54    /// Calculate moving average
55    pub fn moving_average(&self, window: usize) -> Vec<f64> {
56        let values = self.values();
57        if values.len() < window {
58            return vec![];
59        }
60
61        let mut averages = Vec::new();
62        for i in 0..=(values.len() - window) {
63            let sum: f64 = values[i..i + window].iter().sum();
64            averages.push(sum / window as f64);
65        }
66        averages
67    }
68
69    /// Calculate exponential moving average
70    pub fn exponential_moving_average(&self, alpha: f64) -> Vec<f64> {
71        let values = self.values();
72        if values.is_empty() {
73            return vec![];
74        }
75
76        let mut ema = Vec::new();
77        ema.push(values[0]);
78
79        for i in 1..values.len() {
80            let e = alpha * values[i] + (1.0 - alpha) * ema[i - 1];
81            ema.push(e);
82        }
83
84        ema
85    }
86
87    /// Simple linear regression for trend
88    pub fn linear_trend(&self) -> Option<(f64, f64)> {
89        let values = self.values();
90        let n = values.len();
91
92        if n < 2 {
93            return None;
94        }
95
96        let x: Vec<f64> = (0..n).map(|i| i as f64).collect();
97        let y = values;
98
99        let x_mean = x.iter().sum::<f64>() / n as f64;
100        let y_mean = y.iter().sum::<f64>() / n as f64;
101
102        let mut numerator = 0.0;
103        let mut denominator = 0.0;
104
105        for i in 0..n {
106            numerator += (x[i] - x_mean) * (y[i] - y_mean);
107            denominator += (x[i] - x_mean).powi(2);
108        }
109
110        if denominator == 0.0 {
111            return None;
112        }
113
114        let slope = numerator / denominator;
115        let intercept = y_mean - slope * x_mean;
116
117        Some((slope, intercept))
118    }
119
120    /// Predict next N values using linear trend
121    pub fn predict_linear(&self, steps: usize) -> Vec<f64> {
122        if let Some((slope, intercept)) = self.linear_trend() {
123            let current_x = self.data.len() as f64;
124            (0..steps).map(|i| slope * (current_x + i as f64) + intercept).collect()
125        } else {
126            vec![]
127        }
128    }
129}
130
/// Anomaly detection using statistical methods.
#[derive(Debug, Clone)]
pub struct AnomalyDetector {
    // Z-score above which `detect_zscore` flags a value (number of standard
    // deviations from the mean). `detect_iqr` does not read this; it uses
    // fixed 1.5 * IQR fences.
    threshold_multiplier: f64,
}
136
137impl AnomalyDetector {
138    pub fn new(threshold_multiplier: f64) -> Self {
139        Self {
140            threshold_multiplier,
141        }
142    }
143
144    /// Detect anomalies using z-score
145    pub fn detect_zscore(&self, series: &TimeSeries) -> Vec<(usize, f64)> {
146        let values = series.values();
147        if values.len() < 2 {
148            return vec![];
149        }
150
151        let mean = values.iter().sum::<f64>() / values.len() as f64;
152        let variance = values.iter().map(|v| (v - mean).powi(2)).sum::<f64>() / values.len() as f64;
153        let std_dev = variance.sqrt();
154
155        if std_dev == 0.0 {
156            return vec![];
157        }
158
159        let mut anomalies = Vec::new();
160        for (i, value) in values.iter().enumerate() {
161            let z_score = (value - mean).abs() / std_dev;
162            if z_score > self.threshold_multiplier {
163                anomalies.push((i, *value));
164            }
165        }
166
167        anomalies
168    }
169
170    /// Detect anomalies using IQR method
171    pub fn detect_iqr(&self, series: &TimeSeries) -> Vec<(usize, f64)> {
172        let mut values = series.values();
173        if values.len() < 4 {
174            return vec![];
175        }
176
177        values.sort_by(|a, b| a.partial_cmp(b).unwrap());
178
179        let q1_idx = values.len() / 4;
180        let q3_idx = (values.len() * 3) / 4;
181
182        let q1 = values[q1_idx];
183        let q3 = values[q3_idx];
184        let iqr = q3 - q1;
185
186        let lower_bound = q1 - 1.5 * iqr;
187        let upper_bound = q3 + 1.5 * iqr;
188
189        let original_values = series.values();
190        let mut anomalies = Vec::new();
191
192        for (i, value) in original_values.iter().enumerate() {
193            if *value < lower_bound || *value > upper_bound {
194                anomalies.push((i, *value));
195            }
196        }
197
198        anomalies
199    }
200}
201
/// Forecast of whether, and when, a metric will cross its failure threshold.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FailurePrediction {
    /// Metric this prediction refers to.
    pub metric: MetricType,
    /// Most recently recorded value of the metric.
    pub current_value: f64,
    /// First forecast value above `threshold` within the prediction horizon;
    /// equal to `current_value` when no crossing is forecast.
    pub predicted_value: f64,
    /// Estimated time until the threshold is first exceeded, assuming one
    /// minute per forecast step; `None` if no crossing is forecast.
    pub time_to_failure: Option<std::time::Duration>,
    /// Heuristic confidence derived only from the trend slope as
    /// `min(|slope| * 10, 1)`; it does not measure goodness of fit.
    pub confidence: f64,
    /// Failure threshold configured for `metric`.
    pub threshold: f64,
    /// Suggested remediation actions; empty when `predicted_value` does not
    /// exceed `threshold`.
    pub recommended_actions: Vec<RemediationAction>,
}
213
/// Predictive remediation engine.
///
/// Keeps a bounded time series per metric, extrapolates each series with a
/// linear trend, and recommends remediation actions when the forecast crosses
/// that metric's failure threshold.
pub struct PredictiveRemediationEngine {
    /// Per-metric history behind a tokio `RwLock`, so `&self` methods can both
    /// record and read it.
    time_series: Arc<RwLock<HashMap<MetricType, TimeSeries>>>,
    /// Detector used by `detect_anomalies`.
    anomaly_detector: AnomalyDetector,
    prediction_horizon: usize, // Number of steps to predict ahead
    /// Failure threshold per metric; metrics without an entry are never predicted.
    thresholds: HashMap<MetricType, f64>,
}
221
222impl PredictiveRemediationEngine {
223    pub fn new(prediction_horizon: usize) -> Self {
224        let mut thresholds = HashMap::new();
225        thresholds.insert(MetricType::ErrorRate, 50.0);
226        thresholds.insert(MetricType::Latency, 80.0);
227        thresholds.insert(MetricType::CpuUsage, 85.0);
228        thresholds.insert(MetricType::MemoryUsage, 90.0);
229        thresholds.insert(MetricType::FailureCount, 5.0);
230
231        Self {
232            time_series: Arc::new(RwLock::new(HashMap::new())),
233            anomaly_detector: AnomalyDetector::new(3.0),
234            prediction_horizon,
235            thresholds,
236        }
237    }
238
239    /// Record a metric value
240    pub async fn record(&self, metric: MetricType, value: f64) {
241        let mut series_map = self.time_series.write().await;
242
243        series_map
244            .entry(metric.clone())
245            .or_insert_with(|| TimeSeries::new(metric, 1000))
246            .add(DataPoint {
247                timestamp: chrono::Utc::now(),
248                value,
249            });
250    }
251
252    /// Predict failures for all metrics
253    pub async fn predict_failures(&self) -> Vec<FailurePrediction> {
254        let series_map = self.time_series.read().await;
255        let mut predictions = Vec::new();
256
257        for (metric, series) in series_map.iter() {
258            if let Some(prediction) = self.predict_failure_for_metric(metric, series).await {
259                predictions.push(prediction);
260            }
261        }
262
263        predictions
264    }
265
266    /// Predict failure for a specific metric
267    async fn predict_failure_for_metric(
268        &self,
269        metric: &MetricType,
270        series: &TimeSeries,
271    ) -> Option<FailurePrediction> {
272        if series.data.is_empty() {
273            return None;
274        }
275
276        let current_value = series.data.back()?.value;
277        let threshold = *self.thresholds.get(metric)?;
278
279        // Predict future values
280        let predictions = series.predict_linear(self.prediction_horizon);
281        if predictions.is_empty() {
282            return None;
283        }
284
285        // Find when threshold will be crossed
286        let mut time_to_failure = None;
287        let mut predicted_value = current_value;
288
289        for (i, pred) in predictions.iter().enumerate() {
290            if *pred > threshold && time_to_failure.is_none() {
291                // Assume 1 minute per step
292                time_to_failure = Some(std::time::Duration::from_secs((i as u64 + 1) * 60));
293                predicted_value = *pred;
294                break;
295            }
296        }
297
298        // Calculate confidence based on trend strength
299        let confidence = if let Some((slope, _)) = series.linear_trend() {
300            (slope.abs() * 10.0).min(1.0)
301        } else {
302            0.0
303        };
304
305        // Recommend actions based on metric type
306        let recommended_actions = self.recommend_actions(metric, predicted_value, threshold);
307
308        Some(FailurePrediction {
309            metric: metric.clone(),
310            current_value,
311            predicted_value,
312            time_to_failure,
313            confidence,
314            threshold,
315            recommended_actions,
316        })
317    }
318
319    /// Recommend remediation actions
320    fn recommend_actions(
321        &self,
322        metric: &MetricType,
323        predicted_value: f64,
324        threshold: f64,
325    ) -> Vec<RemediationAction> {
326        if predicted_value <= threshold {
327            return vec![];
328        }
329
330        match metric {
331            MetricType::ErrorRate => vec![
332                RemediationAction::EnableCircuitBreaker,
333                RemediationAction::RestartService,
334            ],
335            MetricType::Latency => {
336                vec![RemediationAction::ClearCache, RemediationAction::ScaleUp(2)]
337            }
338            MetricType::CpuUsage | MetricType::MemoryUsage => {
339                vec![
340                    RemediationAction::ScaleUp(2),
341                    RemediationAction::RestrictTraffic,
342                ]
343            }
344            MetricType::FailureCount => vec![
345                RemediationAction::RollbackDeployment,
346                RemediationAction::RestartService,
347            ],
348            MetricType::RequestRate => vec![
349                RemediationAction::ScaleUp(4),
350                RemediationAction::RestrictTraffic,
351            ],
352        }
353    }
354
355    /// Detect anomalies in metrics
356    pub async fn detect_anomalies(&self) -> HashMap<MetricType, Vec<(usize, f64)>> {
357        let series_map = self.time_series.read().await;
358        let mut anomalies = HashMap::new();
359
360        for (metric, series) in series_map.iter() {
361            let detected = self.anomaly_detector.detect_zscore(series);
362            if !detected.is_empty() {
363                anomalies.insert(metric.clone(), detected);
364            }
365        }
366
367        anomalies
368    }
369
370    /// Get current system state
371    pub async fn get_system_state(&self) -> SystemState {
372        let series_map = self.time_series.read().await;
373
374        let error_rate = series_map
375            .get(&MetricType::ErrorRate)
376            .and_then(|s| s.data.back())
377            .map(|p| p.value as u8)
378            .unwrap_or(0);
379
380        let latency_level = series_map
381            .get(&MetricType::Latency)
382            .and_then(|s| s.data.back())
383            .map(|p| p.value as u8)
384            .unwrap_or(0);
385
386        let cpu_usage = series_map
387            .get(&MetricType::CpuUsage)
388            .and_then(|s| s.data.back())
389            .map(|p| p.value as u8)
390            .unwrap_or(0);
391
392        let memory_usage = series_map
393            .get(&MetricType::MemoryUsage)
394            .and_then(|s| s.data.back())
395            .map(|p| p.value as u8)
396            .unwrap_or(0);
397
398        let active_failures = series_map
399            .get(&MetricType::FailureCount)
400            .and_then(|s| s.data.back())
401            .map(|p| p.value as u8)
402            .unwrap_or(0);
403
404        let service_health = if error_rate > 80 || active_failures > 5 {
405            "critical".to_string()
406        } else if error_rate > 50 || latency_level > 70 {
407            "degraded".to_string()
408        } else {
409            "healthy".to_string()
410        };
411
412        SystemState {
413            error_rate,
414            latency_level,
415            cpu_usage,
416            memory_usage,
417            active_failures,
418            service_health,
419        }
420    }
421
422    /// Proactive remediation: apply actions before failure
423    pub async fn proactive_remediate(&self) -> Vec<RemediationAction> {
424        let predictions = self.predict_failures().await;
425        let mut actions = Vec::new();
426
427        for prediction in predictions {
428            // Only remediate if failure is imminent (< 5 minutes) and confidence is high
429            if let Some(ttf) = prediction.time_to_failure {
430                if ttf.as_secs() < 300 && prediction.confidence > 0.6 {
431                    actions.extend(prediction.recommended_actions);
432                }
433            }
434        }
435
436        // Deduplicate actions
437        actions.sort_by_key(|a| format!("{:?}", a));
438        actions.dedup();
439
440        actions
441    }
442}
443
/// Trend analyzer for long-term patterns.
///
/// Reads the per-metric time series held by a shared
/// `PredictiveRemediationEngine` and summarizes each series' linear trend.
pub struct TrendAnalyzer {
    engine: Arc<PredictiveRemediationEngine>,
}
448
449impl TrendAnalyzer {
450    pub fn new(engine: Arc<PredictiveRemediationEngine>) -> Self {
451        Self { engine }
452    }
453
454    /// Analyze trends across all metrics
455    pub async fn analyze_trends(&self) -> TrendReport {
456        let series_map = self.engine.time_series.read().await;
457        let mut trends = HashMap::new();
458
459        for (metric, series) in series_map.iter() {
460            if let Some((slope, _)) = series.linear_trend() {
461                let direction = if slope > 0.1 {
462                    TrendDirection::Increasing
463                } else if slope < -0.1 {
464                    TrendDirection::Decreasing
465                } else {
466                    TrendDirection::Stable
467                };
468
469                trends.insert(
470                    metric.clone(),
471                    MetricTrend {
472                        direction,
473                        slope,
474                        confidence: (slope.abs() * 10.0).min(1.0),
475                    },
476                );
477            }
478        }
479
480        TrendReport { trends }
481    }
482}
483
/// Output of `TrendAnalyzer::analyze_trends`: the fitted trend of each metric.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TrendReport {
    /// Trend per metric; metrics without a computable trend are absent.
    pub trends: HashMap<MetricType, MetricTrend>,
}
488
/// Linear trend summary for a single metric.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricTrend {
    /// Classification of `slope`.
    pub direction: TrendDirection,
    /// Least-squares slope, in metric units per recorded sample.
    pub slope: f64,
    /// Heuristic confidence computed by the analyzer as `min(|slope| * 10, 1)`.
    pub confidence: f64,
}
495
496#[derive(Debug, Clone, Serialize, Deserialize)]
497pub enum TrendDirection {
498    Increasing,
499    Decreasing,
500    Stable,
501}
502
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a series for `metric` holding `values` in order (capacity 100),
    /// each stamped with the current time.
    fn series_from(metric: MetricType, values: &[f64]) -> TimeSeries {
        let mut series = TimeSeries::new(metric, 100);
        for &value in values {
            series.add(DataPoint {
                timestamp: chrono::Utc::now(),
                value,
            });
        }
        series
    }

    #[test]
    fn test_time_series() {
        let values: Vec<f64> = (0..50).map(|i| i as f64).collect();
        let series = series_from(MetricType::ErrorRate, &values);

        assert_eq!(series.values(), values);

        // y = x exactly, so the fitted line has slope 1 and intercept 0.
        let (slope, intercept) = series.linear_trend().expect("50 points define a trend");
        assert!((slope - 1.0).abs() < 1e-9, "slope = {}", slope);
        assert!(intercept.abs() < 1e-9, "intercept = {}", intercept);
    }

    #[test]
    fn test_time_series_evicts_oldest() {
        let mut series = TimeSeries::new(MetricType::Latency, 3);
        for i in 0..5 {
            series.add(DataPoint {
                timestamp: chrono::Utc::now(),
                value: i as f64,
            });
        }
        assert_eq!(series.values(), vec![2.0, 3.0, 4.0]);
    }

    #[test]
    fn test_moving_averages() {
        let series = series_from(MetricType::Latency, &[1.0, 2.0, 3.0, 4.0]);
        assert_eq!(series.moving_average(2), vec![1.5, 2.5, 3.5]);
        assert!(series.moving_average(5).is_empty());

        let series = series_from(MetricType::Latency, &[0.0, 2.0, 4.0]);
        assert_eq!(series.exponential_moving_average(0.5), vec![0.0, 1.0, 2.5]);
    }

    #[test]
    fn test_predict_linear() {
        let series = series_from(MetricType::CpuUsage, &[0.0, 1.0, 2.0, 3.0]);
        // Forecast starts one step after the newest sample.
        assert_eq!(series.predict_linear(2), vec![4.0, 5.0]);
        assert!(series_from(MetricType::CpuUsage, &[1.0]).predict_linear(3).is_empty());
    }

    #[tokio::test]
    async fn test_prediction() {
        let engine = PredictiveRemediationEngine::new(10);

        // Simulate increasing error rate: 0, 5, ..., 95.
        for i in 0..20 {
            engine.record(MetricType::ErrorRate, (i * 5) as f64).await;
        }

        let predictions = engine.predict_failures().await;
        assert_eq!(predictions.len(), 1);

        let prediction = &predictions[0];
        assert_eq!(prediction.metric, MetricType::ErrorRate);
        assert_eq!(prediction.current_value, 95.0);
        // Slope is exactly 5 per step, so the very next step (100) exceeds 50.
        assert_eq!(prediction.predicted_value, 100.0);
        assert_eq!(
            prediction.time_to_failure,
            Some(std::time::Duration::from_secs(60))
        );
        assert!(!prediction.recommended_actions.is_empty());

        // Imminent and high-confidence, so proactive remediation acts on it.
        assert!(!engine.proactive_remediate().await.is_empty());
    }

    #[tokio::test]
    async fn test_request_rate_has_no_default_threshold() {
        let engine = PredictiveRemediationEngine::new(10);
        for i in 0..20 {
            engine
                .record(MetricType::RequestRate, i as f64 * 100.0)
                .await;
        }
        assert!(engine.predict_failures().await.is_empty());
    }

    #[test]
    fn test_anomaly_detection() {
        // 50 normal values followed by one spike at index 50.
        let mut values = vec![50.0; 50];
        values.push(200.0);
        let series = series_from(MetricType::CpuUsage, &values);

        let detector = AnomalyDetector::new(3.0);
        assert_eq!(detector.detect_zscore(&series), vec![(50, 200.0)]);
        assert_eq!(detector.detect_iqr(&series), vec![(50, 200.0)]);
    }
}