use crate::{
    analytics::{ChaosAnalytics, MetricsBucket, TimeBucket},
    scenario_recorder::ChaosEvent,
};
use chrono::{DateTime, Duration, Utc};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;

/// A detected anomaly in chaos testing metrics.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Anomaly {
    /// Unique identifier for this anomaly.
    pub id: String,
    /// When the anomaly was detected.
    pub detected_at: DateTime<Utc>,
    /// The kind of anomaly observed.
    pub anomaly_type: AnomalyType,
    /// Severity from 0.0 to 1.0, where higher is more severe.
    pub severity: f64,
    /// Human-readable description.
    pub description: String,
    /// Names of the affected metrics.
    pub affected_metrics: Vec<String>,
    /// Suggested remediation steps.
    pub suggested_actions: Vec<String>,
}

/// Categories of anomalies the engine can detect.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum AnomalyType {
    /// Event rate spiked well above the baseline.
    EventSpike,
    /// Latency deviated significantly from the baseline.
    LatencyAnomaly,
    /// Fault-to-event ratio exceeded acceptable levels.
    HighErrorRate,
    /// Resources are close to exhaustion.
    ResourceExhaustion,
    /// Failures are propagating across components.
    CascadingFailure,
    /// Activity dropped unexpectedly low.
    UnexpectedQuiet,
}

/// A forward-looking insight derived from trend analysis.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PredictiveInsight {
    /// Unique identifier for this insight.
    pub id: String,
    /// When the insight was generated.
    pub generated_at: DateTime<Utc>,
    /// The metric the prediction applies to.
    pub metric: String,
    /// The predicted value of the metric.
    pub predicted_value: f64,
    /// Confidence in the prediction (0.0 to 1.0).
    pub confidence: f64,
    /// How far ahead the prediction looks, in minutes.
    pub time_horizon_minutes: i64,
    /// Suggested action based on the prediction.
    pub recommendation: String,
}

/// The direction and rate of change of a metric over a time window.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TrendAnalysis {
    /// The metric analyzed.
    pub metric: String,
    /// Start of the analysis window.
    pub start_time: DateTime<Utc>,
    /// End of the analysis window.
    pub end_time: DateTime<Utc>,
    /// Overall direction of the trend.
    pub trend: TrendDirection,
    /// Relative change between the first and second half of the window.
    pub rate_of_change: f64,
    /// Confidence in the trend (0.0 to 1.0).
    pub confidence: f64,
    /// The underlying data points.
    pub data_points: Vec<DataPoint>,
}

/// Overall direction of a metric's trend.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum TrendDirection {
    Increasing,
    Decreasing,
    Stable,
    Volatile,
}

/// A single timestamped metric value.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataPoint {
    pub timestamp: DateTime<Utc>,
    pub value: f64,
}

/// Statistical correlation between two metrics.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CorrelationAnalysis {
    /// First metric in the pair.
    pub metric_a: String,
    /// Second metric in the pair.
    pub metric_b: String,
    /// Correlation coefficient (-1.0 to 1.0).
    pub correlation: f64,
    /// Statistical significance of the correlation.
    pub p_value: f64,
    /// Human-readable interpretation of the result.
    pub interpretation: String,
}

/// Aggregate system health derived from analytics components.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthScore {
    /// Overall score from 0.0 (unhealthy) to 100.0 (healthy).
    pub overall_score: f64,
    /// Per-component scores contributing to the overall score.
    pub components: HashMap<String, f64>,
    /// Factors that influenced the score.
    pub factors: Vec<HealthFactor>,
    /// When the score was calculated.
    pub calculated_at: DateTime<Utc>,
}

/// A single factor contributing to the health score.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthFactor {
    pub name: String,
    pub impact: f64,
    pub description: String,
}

/// Detects anomalies, analyzes trends, and scores system health on top of
/// a shared [`ChaosAnalytics`] instance.
pub struct AdvancedAnalyticsEngine {
    /// The underlying analytics collector.
    base_analytics: Arc<ChaosAnalytics>,
    /// Anomalies detected so far.
    anomalies: Arc<RwLock<Vec<Anomaly>>>,
    /// Bounded history of recorded events.
    event_history: Arc<RwLock<VecDeque<ChaosEvent>>>,
    /// Maximum number of events retained in the history.
    max_history_size: usize,
    /// Minimum severity (0.0 to 1.0) required to report an anomaly.
    anomaly_threshold: f64,
}

impl AdvancedAnalyticsEngine {
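    /// Creates an engine with default limits: a 10,000-event history and a
    /// 0.7 anomaly-reporting threshold.
    ///
    /// A minimal construction sketch, using the builder methods defined
    /// below:
    ///
    /// ```ignore
    /// let base = Arc::new(ChaosAnalytics::new());
    /// let engine = AdvancedAnalyticsEngine::new(base)
    ///     .with_max_history(5_000)
    ///     .with_anomaly_threshold(0.8);
    /// ```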
    pub fn new(base_analytics: Arc<ChaosAnalytics>) -> Self {
        Self {
            base_analytics,
            anomalies: Arc::new(RwLock::new(Vec::new())),
            event_history: Arc::new(RwLock::new(VecDeque::new())),
            max_history_size: 10000,
            anomaly_threshold: 0.7,
        }
    }

    /// Sets the maximum number of events retained in the history.
    pub fn with_max_history(mut self, size: usize) -> Self {
        self.max_history_size = size;
        self
    }

    /// Sets the minimum severity required to report an anomaly.
    pub fn with_anomaly_threshold(mut self, threshold: f64) -> Self {
        self.anomaly_threshold = threshold;
        self
    }

    /// Records a chaos event and runs anomaly detection.
    pub fn record_event(&self, event: ChaosEvent) {
        self.base_analytics.record_event(&event, TimeBucket::Minute);

        {
            let mut history = self.event_history.write();
            history.push_back(event);

            // Keep the history bounded.
            while history.len() > self.max_history_size {
                history.pop_front();
            }
        }

        self.detect_anomalies();
    }

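    /// Scans recent metrics for anomalies.
    ///
    /// Compares the last 5 minutes of metrics against a baseline window of
    /// 10 to 30 minutes ago; detection is skipped when either window has no
    /// data.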
    pub fn detect_anomalies(&self) {
        let now = Utc::now();
        let recent_start = now - Duration::minutes(5);

        let recent_metrics = self.base_analytics.get_metrics(recent_start, now, TimeBucket::Minute);

        if recent_metrics.is_empty() {
            return;
        }

        let baseline_start = now - Duration::minutes(30);
        let baseline_end = now - Duration::minutes(10);
        let baseline_metrics =
            self.base_analytics
                .get_metrics(baseline_start, baseline_end, TimeBucket::Minute);

        if baseline_metrics.is_empty() {
            return;
        }

        self.detect_event_spike(&recent_metrics, &baseline_metrics);
        self.detect_latency_anomaly(&recent_metrics, &baseline_metrics);
        self.detect_high_error_rate(&recent_metrics);
    }

    /// Flags an anomaly when the recent event rate is more than twice the
    /// baseline rate.
    fn detect_event_spike(&self, recent: &[MetricsBucket], baseline: &[MetricsBucket]) {
        let recent_avg =
            recent.iter().map(|b| b.total_events).sum::<usize>() as f64 / recent.len() as f64;
        let baseline_avg =
            baseline.iter().map(|b| b.total_events).sum::<usize>() as f64 / baseline.len() as f64;

        if baseline_avg > 0.0 {
            let spike_ratio = recent_avg / baseline_avg;

            if spike_ratio > 2.0 {
                // Ratios above 2.0 always saturate the severity at 1.0.
                let severity = (spike_ratio - 1.0).min(1.0);

                if severity >= self.anomaly_threshold {
                    let anomaly = Anomaly {
                        id: format!("event_spike_{}", Utc::now().timestamp()),
                        detected_at: Utc::now(),
                        anomaly_type: AnomalyType::EventSpike,
                        severity,
                        description: format!(
                            "Event rate spiked {:.1}x above baseline",
                            spike_ratio
                        ),
                        affected_metrics: vec!["total_events".to_string()],
                        suggested_actions: vec![
                            "Review recent configuration changes".to_string(),
                            "Check orchestration step frequency".to_string(),
                        ],
                    };

                    let mut anomalies = self.anomalies.write();
                    anomalies.push(anomaly);
                }
            }
        }
    }

    /// Flags an anomaly when average latency deviates more than 50% from
    /// the baseline in either direction.
    fn detect_latency_anomaly(&self, recent: &[MetricsBucket], baseline: &[MetricsBucket]) {
        let recent_avg = recent.iter().map(|b| b.avg_latency_ms).sum::<f64>() / recent.len() as f64;
        let baseline_avg =
            baseline.iter().map(|b| b.avg_latency_ms).sum::<f64>() / baseline.len() as f64;

        if baseline_avg > 0.0 {
            let latency_ratio = recent_avg / baseline_avg;

            if !(0.5..=1.5).contains(&latency_ratio) {
                let severity = ((latency_ratio - 1.0).abs()).min(1.0);

                if severity >= self.anomaly_threshold {
                    let anomaly = Anomaly {
                        id: format!("latency_anomaly_{}", Utc::now().timestamp()),
                        detected_at: Utc::now(),
                        anomaly_type: AnomalyType::LatencyAnomaly,
                        severity,
                        description: format!(
                            "Latency changed {:.1}x from baseline ({:.1}ms vs {:.1}ms)",
                            latency_ratio, recent_avg, baseline_avg
                        ),
                        affected_metrics: vec!["avg_latency_ms".to_string()],
                        suggested_actions: vec![
                            "Review latency injection settings".to_string(),
                            "Check network conditions".to_string(),
                        ],
                    };

                    let mut anomalies = self.anomalies.write();
                    anomalies.push(anomaly);
                }
            }
        }
    }

    /// Flags an anomaly when more than half of recent events were faults.
    fn detect_high_error_rate(&self, recent: &[MetricsBucket]) {
        let total_events: usize = recent.iter().map(|b| b.total_events).sum();
        let total_faults: usize = recent.iter().map(|b| b.total_faults).sum();

        if total_events > 0 {
            let error_rate = total_faults as f64 / total_events as f64;

            if error_rate > 0.5 {
                let severity = error_rate;

                if severity >= self.anomaly_threshold {
                    let anomaly = Anomaly {
                        id: format!("high_error_rate_{}", Utc::now().timestamp()),
                        detected_at: Utc::now(),
                        anomaly_type: AnomalyType::HighErrorRate,
                        severity,
                        description: format!("Error rate at {:.1}%", error_rate * 100.0),
                        affected_metrics: vec![
                            "total_faults".to_string(),
                            "total_events".to_string(),
                        ],
                        suggested_actions: vec![
                            "Review fault injection settings".to_string(),
                            "Check system resilience".to_string(),
                        ],
                    };

                    let mut anomalies = self.anomalies.write();
                    anomalies.push(anomaly);
                }
            }
        }
    }

    /// Returns all anomalies detected at or after `since`.
    pub fn get_anomalies(&self, since: DateTime<Utc>) -> Vec<Anomaly> {
        let anomalies = self.anomalies.read();
        anomalies.iter().filter(|a| a.detected_at >= since).cloned().collect()
    }

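    /// Analyzes the trend of a named metric over a time window, sampled in
    /// five-minute buckets. Supported metric names are "total_events",
    /// "avg_latency_ms", "total_faults", and "rate_limit_violations";
    /// unknown names yield all-zero data points.
    ///
    /// A usage sketch:
    ///
    /// ```ignore
    /// let now = Utc::now();
    /// let analysis = engine.analyze_trend("total_events", now - Duration::hours(1), now);
    /// println!("{:?} at {:.1}% change", analysis.trend, analysis.rate_of_change * 100.0);
    /// ```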
    pub fn analyze_trend(
        &self,
        metric_name: &str,
        start: DateTime<Utc>,
        end: DateTime<Utc>,
    ) -> TrendAnalysis {
        let buckets = self.base_analytics.get_metrics(start, end, TimeBucket::FiveMinutes);

        let data_points: Vec<DataPoint> = buckets
            .iter()
            .map(|b| {
                let value = match metric_name {
                    "total_events" => b.total_events as f64,
                    "avg_latency_ms" => b.avg_latency_ms,
                    "total_faults" => b.total_faults as f64,
                    "rate_limit_violations" => b.rate_limit_violations as f64,
                    _ => 0.0,
                };

                DataPoint {
                    timestamp: b.timestamp,
                    value,
                }
            })
            .collect();

        let (trend, rate) = self.calculate_trend(&data_points);

        TrendAnalysis {
            metric: metric_name.to_string(),
            start_time: start,
            end_time: end,
            trend,
            rate_of_change: rate,
            // Fixed heuristic confidence; not derived from the data.
            confidence: 0.85,
            data_points,
        }
    }

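    /// Classifies a trend by comparing the mean of the first half of the
    /// data points against the mean of the second half, returning the
    /// direction and the relative rate of change.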
    fn calculate_trend(&self, data_points: &[DataPoint]) -> (TrendDirection, f64) {
        if data_points.len() < 2 {
            return (TrendDirection::Stable, 0.0);
        }

        let first_half: Vec<f64> =
            data_points[..data_points.len() / 2].iter().map(|p| p.value).collect();
        let second_half: Vec<f64> =
            data_points[data_points.len() / 2..].iter().map(|p| p.value).collect();

        let first_avg: f64 = first_half.iter().sum::<f64>() / first_half.len() as f64;
        let second_avg: f64 = second_half.iter().sum::<f64>() / second_half.len() as f64;

        let rate = if first_avg > 0.0 {
            (second_avg - first_avg) / first_avg
        } else {
            0.0
        };

        // More than +/-10% change is a clear trend; under 5% is stable; the
        // 5-10% band in between is treated as volatile.
        let trend = if rate > 0.1 {
            TrendDirection::Increasing
        } else if rate < -0.1 {
            TrendDirection::Decreasing
        } else if rate.abs() < 0.05 {
            TrendDirection::Stable
        } else {
            TrendDirection::Volatile
        };

        (trend, rate)
    }

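    /// Generates predictive insights from the last hour of data. Currently
    /// this produces a single insight when the event rate is trending
    /// upward.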
    pub fn generate_insights(&self) -> Vec<PredictiveInsight> {
        let mut insights = Vec::new();

        let now = Utc::now();
        let lookback = now - Duration::hours(1);

        let trend = self.analyze_trend("total_events", lookback, now);

        if trend.trend == TrendDirection::Increasing {
            insights.push(PredictiveInsight {
                id: format!("prediction_{}", Utc::now().timestamp()),
                generated_at: Utc::now(),
                metric: "total_events".to_string(),
                // Heuristic projection: 20% beyond the observed rate of change.
                predicted_value: trend.rate_of_change * 1.2,
                confidence: trend.confidence,
                time_horizon_minutes: 30,
                recommendation: "Event rate is increasing. Consider scaling resources or adjusting chaos parameters.".to_string(),
            });
        }

        insights
    }

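    /// Calculates a 0-100 health score from the last 15 minutes of data.
    /// The score averages two components: chaos impact (the inverse of the
    /// base analytics severity score) and an anomaly score that deducts 10
    /// points per recent anomaly.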
    pub fn calculate_health_score(&self) -> HealthScore {
        let now = Utc::now();
        let lookback = now - Duration::minutes(15);

        let impact = self.base_analytics.get_impact_analysis(lookback, now, TimeBucket::Minute);

        let mut components = HashMap::new();
        let mut factors = Vec::new();

        // Component 1: inverse of the chaos severity score.
        let event_score = (1.0 - impact.severity_score) * 100.0;
        components.insert("chaos_impact".to_string(), event_score);

        if impact.severity_score > 0.5 {
            factors.push(HealthFactor {
                name: "High chaos severity".to_string(),
                impact: -20.0,
                description: "System under significant chaos load".to_string(),
            });
        }

        // Component 2: deduct 10 points per anomaly in the window.
        let recent_anomalies = self.get_anomalies(lookback);
        let anomaly_score = (1.0 - (recent_anomalies.len() as f64 * 0.1)).max(0.0) * 100.0;
        components.insert("anomaly_score".to_string(), anomaly_score);

        if !recent_anomalies.is_empty() {
            factors.push(HealthFactor {
                name: "Anomalies detected".to_string(),
                impact: -(recent_anomalies.len() as f64 * 5.0),
                description: format!("{} anomalies detected", recent_anomalies.len()),
            });
        }

        let overall_score = components.values().sum::<f64>() / components.len() as f64;

        HealthScore {
            overall_score,
            components,
            factors,
            calculated_at: Utc::now(),
        }
    }

    /// Clears all recorded analytics, anomalies, and event history.
    pub fn clear(&self) {
        self.base_analytics.clear();
        let mut anomalies = self.anomalies.write();
        anomalies.clear();
        let mut history = self.event_history.write();
        history.clear();
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_analytics_engine_creation() {
        let base = Arc::new(ChaosAnalytics::new());
        let engine = AdvancedAnalyticsEngine::new(base);

        assert_eq!(engine.max_history_size, 10000);
        assert_eq!(engine.anomaly_threshold, 0.7);
    }

    #[test]
    fn test_trend_direction() {
        let base = Arc::new(ChaosAnalytics::new());
        let engine = AdvancedAnalyticsEngine::new(base);

        let data_points = vec![
            DataPoint {
                timestamp: Utc::now(),
                value: 10.0,
            },
            DataPoint {
                timestamp: Utc::now(),
                value: 20.0,
            },
        ];

        let (trend, rate) = engine.calculate_trend(&data_points);
        assert_eq!(trend, TrendDirection::Increasing);
        assert!(rate > 0.0);
    }
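
    // Additional coverage sketches for the builder methods and the trend
    // classifier; these exercise only APIs defined in this module.
    #[test]
    fn test_builder_configuration() {
        let base = Arc::new(ChaosAnalytics::new());
        let engine = AdvancedAnalyticsEngine::new(base)
            .with_max_history(500)
            .with_anomaly_threshold(0.9);

        assert_eq!(engine.max_history_size, 500);
        assert_eq!(engine.anomaly_threshold, 0.9);
    }

    #[test]
    fn test_trend_with_insufficient_data_is_stable() {
        let base = Arc::new(ChaosAnalytics::new());
        let engine = AdvancedAnalyticsEngine::new(base);

        // Fewer than two points cannot establish a trend.
        let data_points = vec![DataPoint {
            timestamp: Utc::now(),
            value: 42.0,
        }];

        let (trend, rate) = engine.calculate_trend(&data_points);
        assert_eq!(trend, TrendDirection::Stable);
        assert_eq!(rate, 0.0);
    }

    #[test]
    fn test_trend_decreasing() {
        let base = Arc::new(ChaosAnalytics::new());
        let engine = AdvancedAnalyticsEngine::new(base);

        let data_points = vec![
            DataPoint {
                timestamp: Utc::now(),
                value: 20.0,
            },
            DataPoint {
                timestamp: Utc::now(),
                value: 10.0,
            },
        ];

        // Rate is (10 - 20) / 20 = -0.5, below the -0.1 threshold.
        let (trend, rate) = engine.calculate_trend(&data_points);
        assert_eq!(trend, TrendDirection::Decreasing);
        assert!(rate < 0.0);
    }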
}