mockforge_chaos/advanced_analytics.rs

//! Advanced analytics engine for chaos engineering
//!
//! Provides predictive analytics, anomaly detection, and intelligent insights.

use crate::{
    analytics::{ChaosAnalytics, MetricsBucket, TimeBucket},
    scenario_recorder::ChaosEvent,
};
use chrono::{DateTime, Duration, Utc};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;

/// Anomaly detected in chaos patterns
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Anomaly {
    /// Anomaly ID
    pub id: String,
    /// Detection time
    pub detected_at: DateTime<Utc>,
    /// Anomaly type
    pub anomaly_type: AnomalyType,
    /// Severity (0.0 - 1.0)
    pub severity: f64,
    /// Description
    pub description: String,
    /// Affected metrics
    pub affected_metrics: Vec<String>,
    /// Suggested actions
    pub suggested_actions: Vec<String>,
}

/// Types of anomalies
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum AnomalyType {
    /// Sudden spike in events
    EventSpike,
    /// Unusual latency patterns
    LatencyAnomaly,
    /// High error rate
    HighErrorRate,
    /// Resource exhaustion pattern
    ResourceExhaustion,
    /// Cascading failures
    CascadingFailure,
    /// Unexpected quiet period
    UnexpectedQuiet,
}

/// Predictive insight
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PredictiveInsight {
    /// Insight ID
    pub id: String,
    /// Generated at
    pub generated_at: DateTime<Utc>,
    /// Predicted metric
    pub metric: String,
    /// Predicted value
    pub predicted_value: f64,
    /// Confidence (0.0 - 1.0)
    pub confidence: f64,
    /// Time horizon
    pub time_horizon_minutes: i64,
    /// Recommendation
    pub recommendation: String,
}

/// Trend analysis
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TrendAnalysis {
    /// Metric name
    pub metric: String,
    /// Analysis period start
    pub start_time: DateTime<Utc>,
    /// Analysis period end
    pub end_time: DateTime<Utc>,
    /// Trend direction
    pub trend: TrendDirection,
    /// Rate of change
    pub rate_of_change: f64,
    /// Statistical confidence
    pub confidence: f64,
    /// Data points
    pub data_points: Vec<DataPoint>,
}

/// Trend direction
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum TrendDirection {
    Increasing,
    Decreasing,
    Stable,
    Volatile,
}

/// Data point for trend analysis
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataPoint {
    pub timestamp: DateTime<Utc>,
    pub value: f64,
}

/// Correlation analysis between metrics
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CorrelationAnalysis {
    /// Metric A
    pub metric_a: String,
    /// Metric B
    pub metric_b: String,
    /// Correlation coefficient (-1.0 to 1.0)
    pub correlation: f64,
    /// Statistical significance
    pub p_value: f64,
    /// Interpretation
    pub interpretation: String,
}

/// System health score
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthScore {
    /// Overall score (0.0 - 100.0)
    pub overall_score: f64,
    /// Component scores
    pub components: HashMap<String, f64>,
    /// Factors affecting score
    pub factors: Vec<HealthFactor>,
    /// Calculated at
    pub calculated_at: DateTime<Utc>,
}

/// Factor affecting health score
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthFactor {
    pub name: String,
    pub impact: f64, // Positive or negative impact on score
    pub description: String,
}

/// Advanced analytics engine
pub struct AdvancedAnalyticsEngine {
    /// Base analytics
    base_analytics: Arc<ChaosAnalytics>,
    /// Detected anomalies
    anomalies: Arc<RwLock<Vec<Anomaly>>>,
    /// Historical events for pattern learning
    event_history: Arc<RwLock<VecDeque<ChaosEvent>>>,
    /// Maximum events to retain
    max_history_size: usize,
    /// Anomaly detection threshold
    anomaly_threshold: f64,
}

impl AdvancedAnalyticsEngine {
    /// Create a new advanced analytics engine
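    ///
    /// Defaults to a history cap of 10,000 events and an anomaly threshold of
    /// 0.7; both can be overridden with the builder methods below. A minimal
    /// construction sketch (marked `ignore` because the exact import paths are
    /// assumed here, not confirmed):
    ///
    /// ```ignore
    /// use std::sync::Arc;
    ///
    /// let engine = AdvancedAnalyticsEngine::new(Arc::new(ChaosAnalytics::new()))
    ///     .with_max_history(5_000)
    ///     .with_anomaly_threshold(0.8);
    /// ```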
    pub fn new(base_analytics: Arc<ChaosAnalytics>) -> Self {
        Self {
            base_analytics,
            anomalies: Arc::new(RwLock::new(Vec::new())),
            event_history: Arc::new(RwLock::new(VecDeque::new())),
            max_history_size: 10000,
            anomaly_threshold: 0.7,
        }
    }

    /// Set maximum history size
    pub fn with_max_history(mut self, size: usize) -> Self {
        self.max_history_size = size;
        self
    }

    /// Set anomaly detection threshold
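    ///
    /// The spike and latency detectors clamp severity to at most 1.0, so a
    /// threshold above 1.0 effectively disables them, while 0.0 records every
    /// candidate anomaly.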
    pub fn with_anomaly_threshold(mut self, threshold: f64) -> Self {
        self.anomaly_threshold = threshold;
        self
    }

    /// Record and analyze an event
    pub fn record_event(&self, event: ChaosEvent) {
        // Add to base analytics
        self.base_analytics.record_event(&event, TimeBucket::Minute);

        // Add to history (the event can be moved here; the borrow above is done)
        {
            let mut history = self.event_history.write();
            history.push_back(event);

            // Trim history if needed
            while history.len() > self.max_history_size {
                history.pop_front();
            }
        }

        // Check for anomalies
        self.detect_anomalies();
    }

    /// Detect anomalies in recent data
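    ///
    /// Compares the last 5 minutes of per-minute buckets against a baseline
    /// window from 30 to 10 minutes ago; if either window is empty, detection
    /// is skipped until enough data accumulates.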
    pub fn detect_anomalies(&self) {
        let now = Utc::now();
        let recent_start = now - Duration::minutes(5);

        let recent_metrics = self.base_analytics.get_metrics(recent_start, now, TimeBucket::Minute);

        if recent_metrics.is_empty() {
            return;
        }

        // Calculate baseline from older data
        let baseline_start = now - Duration::minutes(30);
        let baseline_end = now - Duration::minutes(10);
        let baseline_metrics =
            self.base_analytics
                .get_metrics(baseline_start, baseline_end, TimeBucket::Minute);

        if baseline_metrics.is_empty() {
            return;
        }

        // Detect event spikes
        self.detect_event_spike(&recent_metrics, &baseline_metrics);

        // Detect latency anomalies
        self.detect_latency_anomaly(&recent_metrics, &baseline_metrics);

        // Detect high error rates
        self.detect_high_error_rate(&recent_metrics);
    }

    /// Detect event spikes
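    ///
    /// Flags an anomaly when the recent average event count per bucket exceeds
    /// twice the baseline average. Note that with the 2.0x gate, the severity
    /// expression below always saturates at 1.0.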
    fn detect_event_spike(&self, recent: &[MetricsBucket], baseline: &[MetricsBucket]) {
        let recent_avg =
            recent.iter().map(|b| b.total_events).sum::<usize>() as f64 / recent.len() as f64;
        let baseline_avg =
            baseline.iter().map(|b| b.total_events).sum::<usize>() as f64 / baseline.len() as f64;

        if baseline_avg > 0.0 {
            let spike_ratio = recent_avg / baseline_avg;

            if spike_ratio > 2.0 {
                let severity = (spike_ratio - 1.0).min(1.0);

                if severity >= self.anomaly_threshold {
                    let anomaly = Anomaly {
                        id: format!("event_spike_{}", Utc::now().timestamp()),
                        detected_at: Utc::now(),
                        anomaly_type: AnomalyType::EventSpike,
                        severity,
                        description: format!(
                            "Event rate spiked {:.1}x above baseline",
                            spike_ratio
                        ),
                        affected_metrics: vec!["total_events".to_string()],
                        suggested_actions: vec![
                            "Review recent configuration changes".to_string(),
                            "Check orchestration step frequency".to_string(),
                        ],
                    };

                    let mut anomalies = self.anomalies.write();
                    anomalies.push(anomaly);
                }
            }
        }
    }

    /// Detect latency anomalies
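    ///
    /// Flags an anomaly when recent average latency drifts outside 50% - 150%
    /// of the baseline average, in either direction; severity is the absolute
    /// relative change, capped at 1.0.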
    fn detect_latency_anomaly(&self, recent: &[MetricsBucket], baseline: &[MetricsBucket]) {
        let recent_avg = recent.iter().map(|b| b.avg_latency_ms).sum::<f64>() / recent.len() as f64;
        let baseline_avg =
            baseline.iter().map(|b| b.avg_latency_ms).sum::<f64>() / baseline.len() as f64;

        if baseline_avg > 0.0 {
            let latency_ratio = recent_avg / baseline_avg;

            if !(0.5..=1.5).contains(&latency_ratio) {
                let severity = ((latency_ratio - 1.0).abs()).min(1.0);

                if severity >= self.anomaly_threshold {
                    let anomaly = Anomaly {
                        id: format!("latency_anomaly_{}", Utc::now().timestamp()),
                        detected_at: Utc::now(),
                        anomaly_type: AnomalyType::LatencyAnomaly,
                        severity,
                        description: format!(
                            "Latency changed {:.1}x from baseline ({:.1}ms vs {:.1}ms)",
                            latency_ratio, recent_avg, baseline_avg
                        ),
                        affected_metrics: vec!["avg_latency_ms".to_string()],
                        suggested_actions: vec![
                            "Review latency injection settings".to_string(),
                            "Check network conditions".to_string(),
                        ],
                    };

                    let mut anomalies = self.anomalies.write();
                    anomalies.push(anomaly);
                }
            }
        }
    }

    /// Detect high error rates
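    ///
    /// Flags an anomaly when faults make up more than half of all recent
    /// events; the error rate itself is used as the severity.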
    fn detect_high_error_rate(&self, recent: &[MetricsBucket]) {
        let total_events: usize = recent.iter().map(|b| b.total_events).sum();
        let total_faults: usize = recent.iter().map(|b| b.total_faults).sum();

        if total_events > 0 {
            let error_rate = total_faults as f64 / total_events as f64;

            if error_rate > 0.5 {
                let severity = error_rate;

                if severity >= self.anomaly_threshold {
                    let anomaly = Anomaly {
                        id: format!("high_error_rate_{}", Utc::now().timestamp()),
                        detected_at: Utc::now(),
                        anomaly_type: AnomalyType::HighErrorRate,
                        severity,
                        description: format!("Error rate at {:.1}%", error_rate * 100.0),
                        affected_metrics: vec![
                            "total_faults".to_string(),
                            "total_events".to_string(),
                        ],
                        suggested_actions: vec![
                            "Review fault injection settings".to_string(),
                            "Check system resilience".to_string(),
                        ],
                    };

                    let mut anomalies = self.anomalies.write();
                    anomalies.push(anomaly);
                }
            }
        }
    }

    /// Get recent anomalies
    pub fn get_anomalies(&self, since: DateTime<Utc>) -> Vec<Anomaly> {
        let anomalies = self.anomalies.read();
        anomalies.iter().filter(|a| a.detected_at >= since).cloned().collect()
    }

    /// Perform trend analysis on a metric
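    ///
    /// Recognized `metric_name` values are `"total_events"`, `"avg_latency_ms"`,
    /// `"total_faults"`, and `"rate_limit_violations"`; any other name yields
    /// all-zero data points rather than an error.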
    pub fn analyze_trend(
        &self,
        metric_name: &str,
        start: DateTime<Utc>,
        end: DateTime<Utc>,
    ) -> TrendAnalysis {
        let buckets = self.base_analytics.get_metrics(start, end, TimeBucket::FiveMinutes);

        let data_points: Vec<DataPoint> = buckets
            .iter()
            .map(|b| {
                let value = match metric_name {
                    "total_events" => b.total_events as f64,
                    "avg_latency_ms" => b.avg_latency_ms,
                    "total_faults" => b.total_faults as f64,
                    "rate_limit_violations" => b.rate_limit_violations as f64,
                    _ => 0.0,
                };

                DataPoint {
                    timestamp: b.timestamp,
                    value,
                }
            })
            .collect();

        // Estimate the trend via a split-half average comparison (see calculate_trend)
        let (trend, rate) = self.calculate_trend(&data_points);

        TrendAnalysis {
            metric: metric_name.to_string(),
            start_time: start,
            end_time: end,
            trend,
            rate_of_change: rate,
            confidence: 0.85, // Simplified - in production, derive this statistically
            data_points,
        }
    }

    /// Calculate trend direction and rate
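    ///
    /// Uses a split-half comparison rather than a regression: the series is cut
    /// in two and the rate is the relative change between the half averages.
    /// For example, halves averaging 10.0 and 20.0 give a rate of
    /// (20.0 - 10.0) / 10.0 = 1.0, classified as `Increasing`.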
    fn calculate_trend(&self, data_points: &[DataPoint]) -> (TrendDirection, f64) {
        if data_points.len() < 2 {
            return (TrendDirection::Stable, 0.0);
        }

        // Simple moving average comparison
        let first_half: Vec<f64> =
            data_points[..data_points.len() / 2].iter().map(|p| p.value).collect();
        let second_half: Vec<f64> =
            data_points[data_points.len() / 2..].iter().map(|p| p.value).collect();

        let first_avg: f64 = first_half.iter().sum::<f64>() / first_half.len() as f64;
        let second_avg: f64 = second_half.iter().sum::<f64>() / second_half.len() as f64;

        let rate = if first_avg > 0.0 {
            (second_avg - first_avg) / first_avg
        } else {
            0.0
        };

        // Classify by rate bands: |rate| > 0.1 is a directional trend,
        // |rate| < 0.05 is stable, and the 0.05..=0.1 band in between is
        // treated as volatile.
        let trend = if rate > 0.1 {
            TrendDirection::Increasing
        } else if rate < -0.1 {
            TrendDirection::Decreasing
        } else if rate.abs() < 0.05 {
            TrendDirection::Stable
        } else {
            TrendDirection::Volatile
        };

        (trend, rate)
    }

    /// Generate predictive insights
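    ///
    /// Currently emits a single insight when the one-hour `total_events` trend
    /// is increasing; the predicted value is a scaled rate of change, not an
    /// absolute event count.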
    pub fn generate_insights(&self) -> Vec<PredictiveInsight> {
        let mut insights = Vec::new();

        // Analyze recent trends
        let now = Utc::now();
        let lookback = now - Duration::hours(1);

        let trend = self.analyze_trend("total_events", lookback, now);

        // Predict future event rate
        if trend.trend == TrendDirection::Increasing {
            insights.push(PredictiveInsight {
                id: format!("prediction_{}", Utc::now().timestamp()),
                generated_at: Utc::now(),
                metric: "total_events".to_string(),
                predicted_value: trend.rate_of_change * 1.2, // Simplified: extrapolates the rate, not an absolute count
                confidence: trend.confidence,
                time_horizon_minutes: 30,
                recommendation: "Event rate is increasing. Consider scaling resources or adjusting chaos parameters.".to_string(),
            });
        }

        insights
    }

    /// Calculate system health score
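    ///
    /// Averages the component scores over a 15-minute window: `chaos_impact`
    /// is `(1.0 - severity_score) * 100`, and `anomaly_score` loses 10 points
    /// per recent anomaly (floored at 0). For example, three recent anomalies
    /// give an anomaly_score of (1.0 - 0.3) * 100 = 70.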
    pub fn calculate_health_score(&self) -> HealthScore {
        let now = Utc::now();
        let lookback = now - Duration::minutes(15);

        let impact = self.base_analytics.get_impact_analysis(lookback, now, TimeBucket::Minute);

        let mut components = HashMap::new();
        let mut factors = Vec::new();

        // Calculate component scores
        let event_score = (1.0 - impact.severity_score) * 100.0;
        components.insert("chaos_impact".to_string(), event_score);

        if impact.severity_score > 0.5 {
            factors.push(HealthFactor {
                name: "High chaos severity".to_string(),
                impact: -20.0,
                description: "System under significant chaos load".to_string(),
            });
        }

        // Check for recent anomalies
        let recent_anomalies = self.get_anomalies(lookback);
        let anomaly_score = (1.0 - (recent_anomalies.len() as f64 * 0.1)).max(0.0) * 100.0;
        components.insert("anomaly_score".to_string(), anomaly_score);

        if !recent_anomalies.is_empty() {
            factors.push(HealthFactor {
                name: "Anomalies detected".to_string(),
                impact: -(recent_anomalies.len() as f64 * 5.0),
                description: format!("{} anomalies detected", recent_anomalies.len()),
            });
        }

        // Calculate overall score
        let overall_score = components.values().sum::<f64>() / components.len() as f64;

        HealthScore {
            overall_score,
            components,
            factors,
            calculated_at: Utc::now(),
        }
    }

    /// Clear all analytics data
    pub fn clear(&self) {
        self.base_analytics.clear();
        self.anomalies.write().clear();
        self.event_history.write().clear();
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_analytics_engine_creation() {
        let base = Arc::new(ChaosAnalytics::new());
        let engine = AdvancedAnalyticsEngine::new(base);

        assert_eq!(engine.max_history_size, 10000);
        assert_eq!(engine.anomaly_threshold, 0.7);
    }

    #[test]
    fn test_trend_direction() {
        let base = Arc::new(ChaosAnalytics::new());
        let engine = AdvancedAnalyticsEngine::new(base);

        let data_points = vec![
            DataPoint {
                timestamp: Utc::now(),
                value: 10.0,
            },
            DataPoint {
                timestamp: Utc::now(),
                value: 20.0,
            },
        ];

        let (trend, rate) = engine.calculate_trend(&data_points);
        assert_eq!(trend, TrendDirection::Increasing);
        assert!(rate > 0.0);
    }
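    // Additional test sketches covering behavior visible in this file: builder
    // overrides, the degenerate short-series case, and the empty-anomaly
    // baseline. They assume nothing beyond the `ChaosAnalytics::new()`
    // constructor already used above.
    #[test]
    fn test_builder_overrides() {
        let base = Arc::new(ChaosAnalytics::new());
        let engine = AdvancedAnalyticsEngine::new(base)
            .with_max_history(500)
            .with_anomaly_threshold(0.9);

        assert_eq!(engine.max_history_size, 500);
        assert_eq!(engine.anomaly_threshold, 0.9);
    }

    #[test]
    fn test_short_series_is_stable() {
        let base = Arc::new(ChaosAnalytics::new());
        let engine = AdvancedAnalyticsEngine::new(base);

        // Fewer than two points cannot define a direction
        let data_points = vec![DataPoint { timestamp: Utc::now(), value: 42.0 }];
        let (trend, rate) = engine.calculate_trend(&data_points);

        assert_eq!(trend, TrendDirection::Stable);
        assert_eq!(rate, 0.0);
    }

    #[test]
    fn test_no_anomalies_on_fresh_engine() {
        let base = Arc::new(ChaosAnalytics::new());
        let engine = AdvancedAnalyticsEngine::new(base);

        // Nothing recorded yet, so nothing should be flagged
        let anomalies = engine.get_anomalies(Utc::now() - Duration::hours(1));
        assert!(anomalies.is_empty());
    }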
}