// mockforge_chaos/analytics.rs

1//! Chaos analytics and metrics aggregation
2
3use crate::scenario_recorder::{ChaosEvent, ChaosEventType};
4use chrono::{DateTime, Duration, Utc};
5use parking_lot::RwLock;
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use std::sync::Arc;
9
/// Type alias for the shared, lock-protected bucket map.
///
/// Keyed by `(bucket start timestamp, bucket size)` so the same instant can
/// hold one bucket per [`TimeBucket`] granularity.
type BucketMap = Arc<RwLock<HashMap<(DateTime<Utc>, TimeBucket), MetricsBucket>>>;
12
/// Time bucket granularity for aggregated metrics.
///
/// Used both as part of the bucket-map key and to round raw event
/// timestamps down to a bucket boundary.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum TimeBucket {
    /// 1-minute buckets
    Minute,
    /// 5-minute buckets
    FiveMinutes,
    /// 1-hour buckets
    Hour,
    /// 1-day buckets
    Day,
}
25
26impl TimeBucket {
27    /// Get duration for this bucket
28    pub fn duration(&self) -> Duration {
29        match self {
30            TimeBucket::Minute => Duration::minutes(1),
31            TimeBucket::FiveMinutes => Duration::minutes(5),
32            TimeBucket::Hour => Duration::hours(1),
33            TimeBucket::Day => Duration::days(1),
34        }
35    }
36
37    /// Round timestamp to bucket boundary
38    pub fn round_timestamp(&self, timestamp: DateTime<Utc>) -> DateTime<Utc> {
39        let duration_secs = self.duration().num_seconds();
40        let timestamp_secs = timestamp.timestamp();
41        let rounded_secs = (timestamp_secs / duration_secs) * duration_secs;
42
43        DateTime::from_timestamp(rounded_secs, 0).unwrap_or(timestamp)
44    }
45}
46
/// Aggregated chaos metrics for a single time bucket.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricsBucket {
    /// Bucket timestamp (rounded down to the bucket boundary on creation)
    pub timestamp: DateTime<Utc>,
    /// Bucket size
    pub bucket: TimeBucket,
    /// Total events in this bucket
    pub total_events: usize,
    /// Events by type (keyed by the event variant name)
    pub events_by_type: HashMap<String, usize>,
    /// Average latency (ms) over latency-injection events
    pub avg_latency_ms: f64,
    /// Max latency (ms); 0 until a latency event is recorded
    pub max_latency_ms: u64,
    /// Min latency (ms); `u64::MAX` sentinel until a latency event is recorded
    pub min_latency_ms: u64,
    /// Total faults injected
    pub total_faults: usize,
    /// Faults by type
    pub faults_by_type: HashMap<String, usize>,
    /// Rate limit violations
    pub rate_limit_violations: usize,
    /// Traffic shaping events
    pub traffic_shaping_events: usize,
    /// Protocol events, keyed by protocol name
    pub protocol_events: HashMap<String, usize>,
    /// Affected endpoints and how many events touched each
    pub affected_endpoints: HashMap<String, usize>,
}
77
78impl MetricsBucket {
79    /// Create a new empty metrics bucket
80    pub fn new(timestamp: DateTime<Utc>, bucket: TimeBucket) -> Self {
81        Self {
82            timestamp: bucket.round_timestamp(timestamp),
83            bucket,
84            total_events: 0,
85            events_by_type: HashMap::new(),
86            avg_latency_ms: 0.0,
87            max_latency_ms: 0,
88            min_latency_ms: u64::MAX,
89            total_faults: 0,
90            faults_by_type: HashMap::new(),
91            rate_limit_violations: 0,
92            traffic_shaping_events: 0,
93            protocol_events: HashMap::new(),
94            affected_endpoints: HashMap::new(),
95        }
96    }
97
98    /// Add an event to this bucket
99    pub fn add_event(&mut self, event: &ChaosEvent) {
100        self.total_events += 1;
101
102        // Count by event type
103        let event_type_name = Self::event_type_name(&event.event_type);
104        *self.events_by_type.entry(event_type_name).or_insert(0) += 1;
105
106        // Process specific event types
107        match &event.event_type {
108            ChaosEventType::LatencyInjection { delay_ms, endpoint } => {
109                // Update latency stats
110                self.update_latency_stats(*delay_ms);
111
112                // Track affected endpoint
113                if let Some(ep) = endpoint {
114                    *self.affected_endpoints.entry(ep.clone()).or_insert(0) += 1;
115                }
116            }
117            ChaosEventType::FaultInjection {
118                fault_type,
119                endpoint,
120            } => {
121                self.total_faults += 1;
122                *self.faults_by_type.entry(fault_type.clone()).or_insert(0) += 1;
123
124                if let Some(ep) = endpoint {
125                    *self.affected_endpoints.entry(ep.clone()).or_insert(0) += 1;
126                }
127            }
128            ChaosEventType::RateLimitExceeded { endpoint, .. } => {
129                self.rate_limit_violations += 1;
130
131                if let Some(ep) = endpoint {
132                    *self.affected_endpoints.entry(ep.clone()).or_insert(0) += 1;
133                }
134            }
135            ChaosEventType::TrafficShaping { .. } => {
136                self.traffic_shaping_events += 1;
137            }
138            ChaosEventType::ProtocolEvent { protocol, .. } => {
139                *self.protocol_events.entry(protocol.clone()).or_insert(0) += 1;
140            }
141            ChaosEventType::ScenarioTransition { .. } => {
142                // Just counted in total_events
143            }
144        }
145    }
146
147    /// Update latency statistics
148    fn update_latency_stats(&mut self, delay_ms: u64) {
149        // Update min/max
150        self.max_latency_ms = self.max_latency_ms.max(delay_ms);
151        self.min_latency_ms = self.min_latency_ms.min(delay_ms);
152
153        // Update average (incremental)
154        let n = self.events_by_type.get("LatencyInjection").copied().unwrap_or(1);
155        self.avg_latency_ms = ((self.avg_latency_ms * (n - 1) as f64) + delay_ms as f64) / n as f64;
156    }
157
158    /// Get event type name as string
159    fn event_type_name(event_type: &ChaosEventType) -> String {
160        match event_type {
161            ChaosEventType::LatencyInjection { .. } => "LatencyInjection".to_string(),
162            ChaosEventType::FaultInjection { .. } => "FaultInjection".to_string(),
163            ChaosEventType::RateLimitExceeded { .. } => "RateLimitExceeded".to_string(),
164            ChaosEventType::TrafficShaping { .. } => "TrafficShaping".to_string(),
165            ChaosEventType::ProtocolEvent { .. } => "ProtocolEvent".to_string(),
166            ChaosEventType::ScenarioTransition { .. } => "ScenarioTransition".to_string(),
167        }
168    }
169}
170
/// Chaos impact analysis derived from a set of metrics buckets.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChaosImpact {
    /// Analysis period start (earliest bucket timestamp)
    pub start_time: DateTime<Utc>,
    /// Analysis period end (latest bucket timestamp)
    pub end_time: DateTime<Utc>,

    /// Total chaos events
    pub total_events: usize,

    /// Impact severity (0.0 - 1.0)
    /// Average events per bucket divided by 100, capped at 1.0
    pub severity_score: f64,

    /// Most affected endpoints (top 10, most-hit first)
    pub top_affected_endpoints: Vec<(String, usize)>,

    /// Chaos distribution by event-type name
    pub event_distribution: HashMap<String, usize>,

    /// Average system degradation percentage (severity_score * 100)
    pub avg_degradation_percent: f64,

    /// Timestamp of the busiest bucket, if any events were recorded
    pub peak_chaos_time: Option<DateTime<Utc>>,
    /// Event count of the busiest bucket
    pub peak_chaos_events: usize,
}
198
199impl ChaosImpact {
200    /// Calculate impact from metrics buckets
201    pub fn from_buckets(buckets: &[MetricsBucket]) -> Self {
202        if buckets.is_empty() {
203            return Self::empty();
204        }
205
206        let start_time = buckets.first().unwrap().timestamp;
207        let end_time = buckets.last().unwrap().timestamp;
208
209        let mut total_events = 0;
210        let mut endpoint_counts: HashMap<String, usize> = HashMap::new();
211        let mut event_distribution: HashMap<String, usize> = HashMap::new();
212        let mut peak_chaos_events = 0;
213        let mut peak_chaos_time = None;
214
215        for bucket in buckets {
216            total_events += bucket.total_events;
217
218            // Track peak
219            if bucket.total_events > peak_chaos_events {
220                peak_chaos_events = bucket.total_events;
221                peak_chaos_time = Some(bucket.timestamp);
222            }
223
224            // Aggregate endpoint counts
225            for (endpoint, count) in &bucket.affected_endpoints {
226                *endpoint_counts.entry(endpoint.clone()).or_insert(0) += count;
227            }
228
229            // Aggregate event distribution
230            for (event_type, count) in &bucket.events_by_type {
231                *event_distribution.entry(event_type.clone()).or_insert(0) += count;
232            }
233        }
234
235        // Calculate severity score (0.0 - 1.0)
236        let avg_events_per_bucket = total_events as f64 / buckets.len() as f64;
237        let severity_score = (avg_events_per_bucket / 100.0).min(1.0); // Normalize to 0-1
238
239        // Get top affected endpoints
240        let mut top_affected: Vec<_> = endpoint_counts.into_iter().collect();
241        top_affected.sort_by(|a, b| b.1.cmp(&a.1));
242        top_affected.truncate(10); // Top 10
243
244        // Calculate degradation (simplified: based on latency and faults)
245        let avg_degradation_percent = severity_score * 100.0;
246
247        Self {
248            start_time,
249            end_time,
250            total_events,
251            severity_score,
252            top_affected_endpoints: top_affected,
253            event_distribution,
254            avg_degradation_percent,
255            peak_chaos_time,
256            peak_chaos_events,
257        }
258    }
259
260    /// Create empty impact analysis
261    fn empty() -> Self {
262        Self {
263            start_time: Utc::now(),
264            end_time: Utc::now(),
265            total_events: 0,
266            severity_score: 0.0,
267            top_affected_endpoints: vec![],
268            event_distribution: HashMap::new(),
269            avg_degradation_percent: 0.0,
270            peak_chaos_time: None,
271            peak_chaos_events: 0,
272        }
273    }
274}
275
/// Chaos analytics engine.
///
/// Aggregates [`ChaosEvent`]s into time buckets behind a shared `RwLock`,
/// evicting the oldest buckets once `max_buckets` is exceeded.
pub struct ChaosAnalytics {
    /// Metrics buckets keyed by (bucket start timestamp, bucket size)
    buckets: BucketMap,
    /// Maximum buckets to retain before old ones are evicted
    max_buckets: usize,
}
283
284impl ChaosAnalytics {
285    /// Create a new analytics engine
286    pub fn new() -> Self {
287        Self {
288            buckets: Arc::new(RwLock::new(HashMap::new())),
289            max_buckets: 1440, // 24 hours of minute buckets
290        }
291    }
292
293    /// Set maximum buckets to retain
294    pub fn with_max_buckets(mut self, max: usize) -> Self {
295        self.max_buckets = max;
296        self
297    }
298
299    /// Record an event
300    pub fn record_event(&self, event: &ChaosEvent, bucket_size: TimeBucket) {
301        let bucket_timestamp = bucket_size.round_timestamp(event.timestamp);
302        let key = (bucket_timestamp, bucket_size);
303
304        let mut buckets = self.buckets.write();
305
306        // Get or create bucket
307        let bucket = buckets
308            .entry(key)
309            .or_insert_with(|| MetricsBucket::new(bucket_timestamp, bucket_size));
310
311        bucket.add_event(event);
312
313        // Cleanup old buckets if needed
314        if buckets.len() > self.max_buckets {
315            self.cleanup_old_buckets(&mut buckets);
316        }
317    }
318
319    /// Get metrics for a time range
320    pub fn get_metrics(
321        &self,
322        start: DateTime<Utc>,
323        end: DateTime<Utc>,
324        bucket_size: TimeBucket,
325    ) -> Vec<MetricsBucket> {
326        let buckets = self.buckets.read();
327
328        let mut result: Vec<_> = buckets
329            .iter()
330            .filter(|((timestamp, size), _)| {
331                *size == bucket_size && *timestamp >= start && *timestamp <= end
332            })
333            .map(|(_, bucket)| bucket.clone())
334            .collect();
335
336        result.sort_by_key(|b| b.timestamp);
337        result
338    }
339
340    /// Get chaos impact analysis
341    pub fn get_impact_analysis(
342        &self,
343        start: DateTime<Utc>,
344        end: DateTime<Utc>,
345        bucket_size: TimeBucket,
346    ) -> ChaosImpact {
347        let buckets = self.get_metrics(start, end, bucket_size);
348        ChaosImpact::from_buckets(&buckets)
349    }
350
351    /// Get current metrics (last N minutes)
352    pub fn get_current_metrics(&self, minutes: i64, bucket_size: TimeBucket) -> Vec<MetricsBucket> {
353        let end = Utc::now();
354        let start = end - Duration::minutes(minutes);
355        self.get_metrics(start, end, bucket_size)
356    }
357
358    /// Cleanup old buckets
359    fn cleanup_old_buckets(
360        &self,
361        buckets: &mut HashMap<(DateTime<Utc>, TimeBucket), MetricsBucket>,
362    ) {
363        if buckets.len() <= self.max_buckets {
364            return;
365        }
366
367        // Find oldest buckets and remove them
368        let mut timestamps: Vec<_> = buckets.keys().map(|(ts, _)| *ts).collect();
369        timestamps.sort();
370
371        let keep_from = timestamps.len().saturating_sub(self.max_buckets);
372        let remove_before = timestamps.get(keep_from).copied().unwrap_or(Utc::now());
373
374        buckets.retain(|(ts, _), _| *ts >= remove_before);
375    }
376
377    /// Clear all analytics data
378    pub fn clear(&self) {
379        let mut buckets = self.buckets.write();
380        buckets.clear();
381    }
382}
383
384impl Default for ChaosAnalytics {
385    fn default() -> Self {
386        Self::new()
387    }
388}
389
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Timelike;
    use std::collections::HashMap;

    // Rounding a timestamp to a minute bucket truncates the seconds to
    // the start of that minute.
    #[test]
    fn test_time_bucket_rounding() {
        let timestamp = DateTime::parse_from_rfc3339("2025-10-07T12:34:56Z")
            .unwrap()
            .with_timezone(&Utc);

        let minute_bucket = TimeBucket::Minute;
        let rounded = minute_bucket.round_timestamp(timestamp);

        // Should round down to 12:34:00
        assert_eq!(rounded.minute(), 34);
        assert_eq!(rounded.second(), 0);
    }

    // A fresh bucket has zero events and the sentinel latency extremes
    // set in `MetricsBucket::new` (min at u64::MAX, max at 0).
    #[test]
    fn test_metrics_bucket_creation() {
        let timestamp = Utc::now();
        let bucket = MetricsBucket::new(timestamp, TimeBucket::Minute);

        assert_eq!(bucket.total_events, 0);
        assert_eq!(bucket.min_latency_ms, u64::MAX);
        assert_eq!(bucket.max_latency_ms, 0);
    }

    // A single latency-injection event sets avg/min/max to its delay and
    // records the affected endpoint.
    #[test]
    fn test_add_event_to_bucket() {
        let mut bucket = MetricsBucket::new(Utc::now(), TimeBucket::Minute);

        let event = ChaosEvent {
            timestamp: Utc::now(),
            event_type: ChaosEventType::LatencyInjection {
                delay_ms: 100,
                endpoint: Some("/api/test".to_string()),
            },
            metadata: HashMap::new(),
        };

        bucket.add_event(&event);

        assert_eq!(bucket.total_events, 1);
        assert_eq!(bucket.avg_latency_ms, 100.0);
        assert_eq!(bucket.max_latency_ms, 100);
        assert_eq!(bucket.min_latency_ms, 100);
        assert_eq!(bucket.affected_endpoints.get("/api/test"), Some(&1));
    }

    // Recording through the engine creates exactly one minute bucket that
    // is visible via the trailing-window query.
    #[test]
    fn test_analytics_record_event() {
        let analytics = ChaosAnalytics::new();

        let event = ChaosEvent {
            timestamp: Utc::now(),
            event_type: ChaosEventType::LatencyInjection {
                delay_ms: 100,
                endpoint: None,
            },
            metadata: HashMap::new(),
        };

        analytics.record_event(&event, TimeBucket::Minute);

        let metrics = analytics.get_current_metrics(1, TimeBucket::Minute);
        assert_eq!(metrics.len(), 1);
        assert_eq!(metrics[0].total_events, 1);
    }

    // An empty bucket slice yields the zeroed `ChaosImpact::empty` value.
    #[test]
    fn test_chaos_impact_empty() {
        let impact = ChaosImpact::from_buckets(&[]);
        assert_eq!(impact.total_events, 0);
        assert_eq!(impact.severity_score, 0.0);
    }
}