1use serde::{Deserialize, Serialize};
2use std::collections::{HashMap, VecDeque};
3use std::sync::Arc;
4use tokio::sync::RwLock;
5
6use crate::reinforcement_learning::{RemediationAction, SystemState};
7
/// A single timestamped observation of a metric.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataPoint {
    /// UTC time at which the sample was recorded.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Observed metric value.
    pub value: f64,
}
14
/// Kinds of metrics tracked by the predictive remediation engine.
///
/// Used as a `HashMap` key (hence `Hash + Eq`).
#[derive(Debug, Clone, Serialize, Deserialize, Hash, Eq, PartialEq)]
pub enum MetricType {
    /// Rate of failed requests/operations.
    ErrorRate,
    /// Response latency level.
    Latency,
    /// CPU utilization.
    CpuUsage,
    /// Memory utilization.
    MemoryUsage,
    /// Incoming request volume.
    RequestRate,
    /// Count of active failures.
    FailureCount,
}
25
/// Bounded, ordered history of observations for a single metric.
#[derive(Debug, Clone)]
pub struct TimeSeries {
    /// Metric this series tracks.
    pub metric: MetricType,
    /// Points in arrival order; oldest at the front.
    pub data: VecDeque<DataPoint>,
    /// Maximum number of retained points before the oldest is evicted.
    pub max_size: usize,
}
33
34impl TimeSeries {
35 pub fn new(metric: MetricType, max_size: usize) -> Self {
36 Self {
37 metric,
38 data: VecDeque::with_capacity(max_size),
39 max_size,
40 }
41 }
42
43 pub fn add(&mut self, point: DataPoint) {
44 if self.data.len() >= self.max_size {
45 self.data.pop_front();
46 }
47 self.data.push_back(point);
48 }
49
50 pub fn values(&self) -> Vec<f64> {
51 self.data.iter().map(|p| p.value).collect()
52 }
53
54 pub fn moving_average(&self, window: usize) -> Vec<f64> {
56 let values = self.values();
57 if values.len() < window {
58 return vec![];
59 }
60
61 let mut averages = Vec::new();
62 for i in 0..=(values.len() - window) {
63 let sum: f64 = values[i..i + window].iter().sum();
64 averages.push(sum / window as f64);
65 }
66 averages
67 }
68
69 pub fn exponential_moving_average(&self, alpha: f64) -> Vec<f64> {
71 let values = self.values();
72 if values.is_empty() {
73 return vec![];
74 }
75
76 let mut ema = Vec::new();
77 ema.push(values[0]);
78
79 for i in 1..values.len() {
80 let e = alpha * values[i] + (1.0 - alpha) * ema[i - 1];
81 ema.push(e);
82 }
83
84 ema
85 }
86
87 pub fn linear_trend(&self) -> Option<(f64, f64)> {
89 let values = self.values();
90 let n = values.len();
91
92 if n < 2 {
93 return None;
94 }
95
96 let x: Vec<f64> = (0..n).map(|i| i as f64).collect();
97 let y = values;
98
99 let x_mean = x.iter().sum::<f64>() / n as f64;
100 let y_mean = y.iter().sum::<f64>() / n as f64;
101
102 let mut numerator = 0.0;
103 let mut denominator = 0.0;
104
105 for i in 0..n {
106 numerator += (x[i] - x_mean) * (y[i] - y_mean);
107 denominator += (x[i] - x_mean).powi(2);
108 }
109
110 if denominator == 0.0 {
111 return None;
112 }
113
114 let slope = numerator / denominator;
115 let intercept = y_mean - slope * x_mean;
116
117 Some((slope, intercept))
118 }
119
120 pub fn predict_linear(&self, steps: usize) -> Vec<f64> {
122 if let Some((slope, intercept)) = self.linear_trend() {
123 let current_x = self.data.len() as f64;
124 (0..steps).map(|i| slope * (current_x + i as f64) + intercept).collect()
125 } else {
126 vec![]
127 }
128 }
129}
130
/// Statistical outlier detector over a [`TimeSeries`].
#[derive(Debug, Clone)]
pub struct AnomalyDetector {
    /// Z-score cutoff above which a point is flagged as anomalous.
    threshold_multiplier: f64,
}
136
137impl AnomalyDetector {
138 pub fn new(threshold_multiplier: f64) -> Self {
139 Self {
140 threshold_multiplier,
141 }
142 }
143
144 pub fn detect_zscore(&self, series: &TimeSeries) -> Vec<(usize, f64)> {
146 let values = series.values();
147 if values.len() < 2 {
148 return vec![];
149 }
150
151 let mean = values.iter().sum::<f64>() / values.len() as f64;
152 let variance = values.iter().map(|v| (v - mean).powi(2)).sum::<f64>() / values.len() as f64;
153 let std_dev = variance.sqrt();
154
155 if std_dev == 0.0 {
156 return vec![];
157 }
158
159 let mut anomalies = Vec::new();
160 for (i, value) in values.iter().enumerate() {
161 let z_score = (value - mean).abs() / std_dev;
162 if z_score > self.threshold_multiplier {
163 anomalies.push((i, *value));
164 }
165 }
166
167 anomalies
168 }
169
170 pub fn detect_iqr(&self, series: &TimeSeries) -> Vec<(usize, f64)> {
172 let mut values = series.values();
173 if values.len() < 4 {
174 return vec![];
175 }
176
177 values.sort_by(|a, b| a.partial_cmp(b).unwrap());
178
179 let q1_idx = values.len() / 4;
180 let q3_idx = (values.len() * 3) / 4;
181
182 let q1 = values[q1_idx];
183 let q3 = values[q3_idx];
184 let iqr = q3 - q1;
185
186 let lower_bound = q1 - 1.5 * iqr;
187 let upper_bound = q3 + 1.5 * iqr;
188
189 let original_values = series.values();
190 let mut anomalies = Vec::new();
191
192 for (i, value) in original_values.iter().enumerate() {
193 if *value < lower_bound || *value > upper_bound {
194 anomalies.push((i, *value));
195 }
196 }
197
198 anomalies
199 }
200}
201
/// Forecast that a metric will breach its failure threshold.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FailurePrediction {
    /// Metric the prediction applies to.
    pub metric: MetricType,
    /// Most recently observed value.
    pub current_value: f64,
    /// Extrapolated value (at the threshold crossing, when one is found).
    pub predicted_value: f64,
    /// Estimated time until the threshold is breached; `None` when no
    /// breach was found within the prediction horizon.
    pub time_to_failure: Option<std::time::Duration>,
    /// Heuristic confidence in [0, 1].
    pub confidence: f64,
    /// Threshold the metric is compared against.
    pub threshold: f64,
    /// Suggested actions to avert the predicted failure.
    pub recommended_actions: Vec<RemediationAction>,
}
213
214pub struct PredictiveRemediationEngine {
216 time_series: Arc<RwLock<HashMap<MetricType, TimeSeries>>>,
217 anomaly_detector: AnomalyDetector,
218 prediction_horizon: usize, thresholds: HashMap<MetricType, f64>,
220}
221
222impl PredictiveRemediationEngine {
223 pub fn new(prediction_horizon: usize) -> Self {
224 let mut thresholds = HashMap::new();
225 thresholds.insert(MetricType::ErrorRate, 50.0);
226 thresholds.insert(MetricType::Latency, 80.0);
227 thresholds.insert(MetricType::CpuUsage, 85.0);
228 thresholds.insert(MetricType::MemoryUsage, 90.0);
229 thresholds.insert(MetricType::FailureCount, 5.0);
230
231 Self {
232 time_series: Arc::new(RwLock::new(HashMap::new())),
233 anomaly_detector: AnomalyDetector::new(3.0),
234 prediction_horizon,
235 thresholds,
236 }
237 }
238
239 pub async fn record(&self, metric: MetricType, value: f64) {
241 let mut series_map = self.time_series.write().await;
242
243 series_map
244 .entry(metric.clone())
245 .or_insert_with(|| TimeSeries::new(metric, 1000))
246 .add(DataPoint {
247 timestamp: chrono::Utc::now(),
248 value,
249 });
250 }
251
252 pub async fn predict_failures(&self) -> Vec<FailurePrediction> {
254 let series_map = self.time_series.read().await;
255 let mut predictions = Vec::new();
256
257 for (metric, series) in series_map.iter() {
258 if let Some(prediction) = self.predict_failure_for_metric(metric, series).await {
259 predictions.push(prediction);
260 }
261 }
262
263 predictions
264 }
265
266 async fn predict_failure_for_metric(
268 &self,
269 metric: &MetricType,
270 series: &TimeSeries,
271 ) -> Option<FailurePrediction> {
272 if series.data.is_empty() {
273 return None;
274 }
275
276 let current_value = series.data.back()?.value;
277 let threshold = *self.thresholds.get(metric)?;
278
279 let predictions = series.predict_linear(self.prediction_horizon);
281 if predictions.is_empty() {
282 return None;
283 }
284
285 let mut time_to_failure = None;
287 let mut predicted_value = current_value;
288
289 for (i, pred) in predictions.iter().enumerate() {
290 if *pred > threshold && time_to_failure.is_none() {
291 time_to_failure = Some(std::time::Duration::from_secs((i as u64 + 1) * 60));
293 predicted_value = *pred;
294 break;
295 }
296 }
297
298 let confidence = if let Some((slope, _)) = series.linear_trend() {
300 (slope.abs() * 10.0).min(1.0)
301 } else {
302 0.0
303 };
304
305 let recommended_actions = self.recommend_actions(metric, predicted_value, threshold);
307
308 Some(FailurePrediction {
309 metric: metric.clone(),
310 current_value,
311 predicted_value,
312 time_to_failure,
313 confidence,
314 threshold,
315 recommended_actions,
316 })
317 }
318
319 fn recommend_actions(
321 &self,
322 metric: &MetricType,
323 predicted_value: f64,
324 threshold: f64,
325 ) -> Vec<RemediationAction> {
326 if predicted_value <= threshold {
327 return vec![];
328 }
329
330 match metric {
331 MetricType::ErrorRate => vec![
332 RemediationAction::EnableCircuitBreaker,
333 RemediationAction::RestartService,
334 ],
335 MetricType::Latency => {
336 vec![RemediationAction::ClearCache, RemediationAction::ScaleUp(2)]
337 }
338 MetricType::CpuUsage | MetricType::MemoryUsage => {
339 vec![
340 RemediationAction::ScaleUp(2),
341 RemediationAction::RestrictTraffic,
342 ]
343 }
344 MetricType::FailureCount => vec![
345 RemediationAction::RollbackDeployment,
346 RemediationAction::RestartService,
347 ],
348 MetricType::RequestRate => vec![
349 RemediationAction::ScaleUp(4),
350 RemediationAction::RestrictTraffic,
351 ],
352 }
353 }
354
355 pub async fn detect_anomalies(&self) -> HashMap<MetricType, Vec<(usize, f64)>> {
357 let series_map = self.time_series.read().await;
358 let mut anomalies = HashMap::new();
359
360 for (metric, series) in series_map.iter() {
361 let detected = self.anomaly_detector.detect_zscore(series);
362 if !detected.is_empty() {
363 anomalies.insert(metric.clone(), detected);
364 }
365 }
366
367 anomalies
368 }
369
370 pub async fn get_system_state(&self) -> SystemState {
372 let series_map = self.time_series.read().await;
373
374 let error_rate = series_map
375 .get(&MetricType::ErrorRate)
376 .and_then(|s| s.data.back())
377 .map(|p| p.value as u8)
378 .unwrap_or(0);
379
380 let latency_level = series_map
381 .get(&MetricType::Latency)
382 .and_then(|s| s.data.back())
383 .map(|p| p.value as u8)
384 .unwrap_or(0);
385
386 let cpu_usage = series_map
387 .get(&MetricType::CpuUsage)
388 .and_then(|s| s.data.back())
389 .map(|p| p.value as u8)
390 .unwrap_or(0);
391
392 let memory_usage = series_map
393 .get(&MetricType::MemoryUsage)
394 .and_then(|s| s.data.back())
395 .map(|p| p.value as u8)
396 .unwrap_or(0);
397
398 let active_failures = series_map
399 .get(&MetricType::FailureCount)
400 .and_then(|s| s.data.back())
401 .map(|p| p.value as u8)
402 .unwrap_or(0);
403
404 let service_health = if error_rate > 80 || active_failures > 5 {
405 "critical".to_string()
406 } else if error_rate > 50 || latency_level > 70 {
407 "degraded".to_string()
408 } else {
409 "healthy".to_string()
410 };
411
412 SystemState {
413 error_rate,
414 latency_level,
415 cpu_usage,
416 memory_usage,
417 active_failures,
418 service_health,
419 }
420 }
421
422 pub async fn proactive_remediate(&self) -> Vec<RemediationAction> {
424 let predictions = self.predict_failures().await;
425 let mut actions = Vec::new();
426
427 for prediction in predictions {
428 if let Some(ttf) = prediction.time_to_failure {
430 if ttf.as_secs() < 300 && prediction.confidence > 0.6 {
431 actions.extend(prediction.recommended_actions);
432 }
433 }
434 }
435
436 actions.sort_by_key(|a| format!("{:?}", a));
438 actions.dedup();
439
440 actions
441 }
442}
443
/// Classifies metric trends using the series stored in a
/// [`PredictiveRemediationEngine`].
pub struct TrendAnalyzer {
    /// Engine whose recorded time series are analyzed.
    engine: Arc<PredictiveRemediationEngine>,
}
448
449impl TrendAnalyzer {
450 pub fn new(engine: Arc<PredictiveRemediationEngine>) -> Self {
451 Self { engine }
452 }
453
454 pub async fn analyze_trends(&self) -> TrendReport {
456 let series_map = self.engine.time_series.read().await;
457 let mut trends = HashMap::new();
458
459 for (metric, series) in series_map.iter() {
460 if let Some((slope, _)) = series.linear_trend() {
461 let direction = if slope > 0.1 {
462 TrendDirection::Increasing
463 } else if slope < -0.1 {
464 TrendDirection::Decreasing
465 } else {
466 TrendDirection::Stable
467 };
468
469 trends.insert(
470 metric.clone(),
471 MetricTrend {
472 direction,
473 slope,
474 confidence: (slope.abs() * 10.0).min(1.0),
475 },
476 );
477 }
478 }
479
480 TrendReport { trends }
481 }
482}
483
/// Per-metric trend classification produced by [`TrendAnalyzer`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TrendReport {
    /// Trend for each metric that had a fittable linear trend.
    pub trends: HashMap<MetricType, MetricTrend>,
}
488
/// Direction and strength of a single metric's linear trend.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricTrend {
    /// Coarse classification of the slope.
    pub direction: TrendDirection,
    /// Fitted least-squares slope.
    pub slope: f64,
    /// Heuristic confidence in [0, 1], derived from the slope magnitude.
    pub confidence: f64,
}
495
/// Coarse direction of a metric's trend over time.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TrendDirection {
    /// Slope above the noise band (> 0.1).
    Increasing,
    /// Slope below the noise band (< -0.1).
    Decreasing,
    /// Slope within the noise band.
    Stable,
}
502
#[cfg(test)]
mod tests {
    use super::*;

    // No awaits here, so a plain #[test] suffices — the original used
    // #[tokio::test], spinning up an async runtime for nothing.
    #[test]
    fn test_time_series() {
        let mut series = TimeSeries::new(MetricType::ErrorRate, 100);

        for i in 0..50 {
            series.add(DataPoint {
                timestamp: chrono::Utc::now(),
                value: i as f64,
            });
        }

        let values = series.values();
        assert_eq!(values.len(), 50);

        // Strictly increasing values guarantee a fittable trend.
        let trend = series.linear_trend();
        assert!(trend.is_some());
    }

    // Genuinely async: engine methods await internal locks.
    #[tokio::test]
    async fn test_prediction() {
        let engine = PredictiveRemediationEngine::new(10);

        // Rising error rate crosses the 50.0 threshold within the horizon.
        for i in 0..20 {
            engine.record(MetricType::ErrorRate, (i * 5) as f64).await;
        }

        let predictions = engine.predict_failures().await;
        assert!(!predictions.is_empty());
    }

    // Also synchronous: anomaly detection needs no runtime.
    #[test]
    fn test_anomaly_detection() {
        let mut series = TimeSeries::new(MetricType::CpuUsage, 100);

        for _ in 0..50 {
            series.add(DataPoint {
                timestamp: chrono::Utc::now(),
                value: 50.0,
            });
        }

        // One extreme outlier against a flat baseline.
        series.add(DataPoint {
            timestamp: chrono::Utc::now(),
            value: 200.0,
        });

        let detector = AnomalyDetector::new(3.0);
        let anomalies = detector.detect_zscore(&series);

        assert!(!anomalies.is_empty());
    }
}