use crate::scenario_recorder::{ChaosEvent, ChaosEventType};
use chrono::{DateTime, Duration, Utc};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;

/// Shared map from (bucket start, bucket size) to aggregated metrics.
type BucketMap = Arc<RwLock<HashMap<(DateTime<Utc>, TimeBucket), MetricsBucket>>>;

/// Granularity of a metrics bucket.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum TimeBucket {
    Minute,
    FiveMinutes,
    Hour,
    Day,
}

impl TimeBucket {
    /// Length of one bucket at this granularity.
    pub fn duration(&self) -> Duration {
        match self {
            TimeBucket::Minute => Duration::minutes(1),
            TimeBucket::FiveMinutes => Duration::minutes(5),
            TimeBucket::Hour => Duration::hours(1),
            TimeBucket::Day => Duration::days(1),
        }
    }

    /// Floor `timestamp` to the start of its bucket, e.g. 12:34:56 with a
    /// one-minute bucket becomes 12:34:00.
    pub fn round_timestamp(&self, timestamp: DateTime<Utc>) -> DateTime<Utc> {
        let duration_secs = self.duration().num_seconds();
        let timestamp_secs = timestamp.timestamp();
        let rounded_secs = (timestamp_secs / duration_secs) * duration_secs;

        DateTime::from_timestamp(rounded_secs, 0).unwrap_or(timestamp)
    }
}

/// Aggregated chaos metrics for one time bucket.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricsBucket {
    pub timestamp: DateTime<Utc>,
    pub bucket: TimeBucket,
    pub total_events: usize,
    pub events_by_type: HashMap<String, usize>,
    pub avg_latency_ms: f64,
    pub max_latency_ms: u64,
    /// Starts at `u64::MAX` so the first observed latency becomes the minimum.
    pub min_latency_ms: u64,
    pub total_faults: usize,
    pub faults_by_type: HashMap<String, usize>,
    pub rate_limit_violations: usize,
    pub traffic_shaping_events: usize,
    pub protocol_events: HashMap<String, usize>,
    pub affected_endpoints: HashMap<String, usize>,
}

impl MetricsBucket {
    pub fn new(timestamp: DateTime<Utc>, bucket: TimeBucket) -> Self {
        Self {
            timestamp: bucket.round_timestamp(timestamp),
            bucket,
            total_events: 0,
            events_by_type: HashMap::new(),
            avg_latency_ms: 0.0,
            max_latency_ms: 0,
            min_latency_ms: u64::MAX,
            total_faults: 0,
            faults_by_type: HashMap::new(),
            rate_limit_violations: 0,
            traffic_shaping_events: 0,
            protocol_events: HashMap::new(),
            affected_endpoints: HashMap::new(),
        }
    }

    /// Fold a single chaos event into this bucket's aggregates.
    pub fn add_event(&mut self, event: &ChaosEvent) {
        self.total_events += 1;

        let event_type_name = Self::event_type_name(&event.event_type);
        *self.events_by_type.entry(event_type_name).or_insert(0) += 1;

        match &event.event_type {
            ChaosEventType::LatencyInjection { delay_ms, endpoint } => {
                self.update_latency_stats(*delay_ms);

                if let Some(ep) = endpoint {
                    *self.affected_endpoints.entry(ep.clone()).or_insert(0) += 1;
                }
            }
            ChaosEventType::FaultInjection {
                fault_type,
                endpoint,
            } => {
                self.total_faults += 1;
                *self.faults_by_type.entry(fault_type.clone()).or_insert(0) += 1;

                if let Some(ep) = endpoint {
                    *self.affected_endpoints.entry(ep.clone()).or_insert(0) += 1;
                }
            }
            ChaosEventType::RateLimitExceeded { endpoint, .. } => {
                self.rate_limit_violations += 1;

                if let Some(ep) = endpoint {
                    *self.affected_endpoints.entry(ep.clone()).or_insert(0) += 1;
                }
            }
            ChaosEventType::TrafficShaping { .. } => {
                self.traffic_shaping_events += 1;
            }
            ChaosEventType::ProtocolEvent { protocol, .. } => {
                *self.protocol_events.entry(protocol.clone()).or_insert(0) += 1;
            }
            ChaosEventType::ScenarioTransition { .. } => {
                // Counted in `total_events` and `events_by_type` only.
            }
        }
    }

    /// Update min/max and the running mean: `avg_n = (avg_{n-1}*(n-1) + x) / n`,
    /// where `n` counts latency events including this one (`add_event` has
    /// already incremented it, so `n >= 1`).
    fn update_latency_stats(&mut self, delay_ms: u64) {
        self.max_latency_ms = self.max_latency_ms.max(delay_ms);
        self.min_latency_ms = self.min_latency_ms.min(delay_ms);

        let n = self.events_by_type.get("LatencyInjection").copied().unwrap_or(1).max(1);
        self.avg_latency_ms =
            ((self.avg_latency_ms * (n - 1) as f64) + delay_ms as f64) / n as f64;
    }

    fn event_type_name(event_type: &ChaosEventType) -> String {
        match event_type {
            ChaosEventType::LatencyInjection { .. } => "LatencyInjection".to_string(),
            ChaosEventType::FaultInjection { .. } => "FaultInjection".to_string(),
            ChaosEventType::RateLimitExceeded { .. } => "RateLimitExceeded".to_string(),
            ChaosEventType::TrafficShaping { .. } => "TrafficShaping".to_string(),
            ChaosEventType::ProtocolEvent { .. } => "ProtocolEvent".to_string(),
            ChaosEventType::ScenarioTransition { .. } => "ScenarioTransition".to_string(),
        }
    }
}

/// Summary of chaos impact over an analysis window.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChaosImpact {
    pub start_time: DateTime<Utc>,
    pub end_time: DateTime<Utc>,

    pub total_events: usize,

    /// Heuristic severity in `[0.0, 1.0]`, scaled from average events per bucket.
    pub severity_score: f64,

    pub top_affected_endpoints: Vec<(String, usize)>,

    pub event_distribution: HashMap<String, usize>,

    pub avg_degradation_percent: f64,

    pub peak_chaos_time: Option<DateTime<Utc>>,
    pub peak_chaos_events: usize,
}

198
199impl ChaosImpact {
200 pub fn from_buckets(buckets: &[MetricsBucket]) -> Self {
202 if buckets.is_empty() {
203 return Self::empty();
204 }
205
206 let start_time = buckets.first().unwrap().timestamp;
207 let end_time = buckets.last().unwrap().timestamp;
208
209 let mut total_events = 0;
210 let mut endpoint_counts: HashMap<String, usize> = HashMap::new();
211 let mut event_distribution: HashMap<String, usize> = HashMap::new();
212 let mut peak_chaos_events = 0;
213 let mut peak_chaos_time = None;
214
215 for bucket in buckets {
216 total_events += bucket.total_events;
217
218 if bucket.total_events > peak_chaos_events {
220 peak_chaos_events = bucket.total_events;
221 peak_chaos_time = Some(bucket.timestamp);
222 }
223
224 for (endpoint, count) in &bucket.affected_endpoints {
226 *endpoint_counts.entry(endpoint.clone()).or_insert(0) += count;
227 }
228
229 for (event_type, count) in &bucket.events_by_type {
231 *event_distribution.entry(event_type.clone()).or_insert(0) += count;
232 }
233 }
234
235 let avg_events_per_bucket = total_events as f64 / buckets.len() as f64;
237 let severity_score = (avg_events_per_bucket / 100.0).min(1.0); let mut top_affected: Vec<_> = endpoint_counts.into_iter().collect();
241 top_affected.sort_by(|a, b| b.1.cmp(&a.1));
242 top_affected.truncate(10); let avg_degradation_percent = severity_score * 100.0;
246
247 Self {
248 start_time,
249 end_time,
250 total_events,
251 severity_score,
252 top_affected_endpoints: top_affected,
253 event_distribution,
254 avg_degradation_percent,
255 peak_chaos_time,
256 peak_chaos_events,
257 }
258 }
259
260 fn empty() -> Self {
262 Self {
263 start_time: Utc::now(),
264 end_time: Utc::now(),
265 total_events: 0,
266 severity_score: 0.0,
267 top_affected_endpoints: vec![],
268 event_distribution: HashMap::new(),
269 avg_degradation_percent: 0.0,
270 peak_chaos_time: None,
271 peak_chaos_events: 0,
272 }
273 }
274}
275
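/// In-memory, time-bucketed aggregation of chaos events.
///
/// A minimal usage sketch (assumes a `ChaosEvent` built elsewhere):
///
/// ```ignore
/// let analytics = ChaosAnalytics::new().with_max_buckets(2880);
/// analytics.record_event(&event, TimeBucket::Minute);
/// let last_hour = analytics.get_current_metrics(60, TimeBucket::Minute);
/// ```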
pub struct ChaosAnalytics {
    /// Buckets keyed by (bucket start, bucket size).
    buckets: BucketMap,
    /// Cap on retained buckets before the oldest are evicted.
    max_buckets: usize,
}
283
284impl ChaosAnalytics {
285 pub fn new() -> Self {
287 Self {
288 buckets: Arc::new(RwLock::new(HashMap::new())),
289 max_buckets: 1440, }
291 }
292
293 pub fn with_max_buckets(mut self, max: usize) -> Self {
295 self.max_buckets = max;
296 self
297 }
298
299 pub fn record_event(&self, event: &ChaosEvent, bucket_size: TimeBucket) {
301 let bucket_timestamp = bucket_size.round_timestamp(event.timestamp);
302 let key = (bucket_timestamp, bucket_size);
303
304 let mut buckets = self.buckets.write();
305
306 let bucket = buckets
308 .entry(key)
309 .or_insert_with(|| MetricsBucket::new(bucket_timestamp, bucket_size));
310
311 bucket.add_event(event);
312
313 if buckets.len() > self.max_buckets {
315 self.cleanup_old_buckets(&mut buckets);
316 }
317 }

    /// Return buckets of `bucket_size` whose start falls within `[start, end]`,
    /// sorted by timestamp.
    pub fn get_metrics(
        &self,
        start: DateTime<Utc>,
        end: DateTime<Utc>,
        bucket_size: TimeBucket,
    ) -> Vec<MetricsBucket> {
        let buckets = self.buckets.read();

        let mut result: Vec<_> = buckets
            .iter()
            .filter(|((timestamp, size), _)| {
                *size == bucket_size && *timestamp >= start && *timestamp <= end
            })
            .map(|(_, bucket)| bucket.clone())
            .collect();

        result.sort_by_key(|b| b.timestamp);
        result
    }

    pub fn get_impact_analysis(
        &self,
        start: DateTime<Utc>,
        end: DateTime<Utc>,
        bucket_size: TimeBucket,
    ) -> ChaosImpact {
        let buckets = self.get_metrics(start, end, bucket_size);
        ChaosImpact::from_buckets(&buckets)
    }

    /// Convenience wrapper over `get_metrics` for the trailing `minutes`.
    pub fn get_current_metrics(&self, minutes: i64, bucket_size: TimeBucket) -> Vec<MetricsBucket> {
        let end = Utc::now();
        let start = end - Duration::minutes(minutes);
        self.get_metrics(start, end, bucket_size)
    }

    /// Evict the oldest buckets by timestamp (across all bucket sizes) until
    /// roughly `max_buckets` remain.
    fn cleanup_old_buckets(
        &self,
        buckets: &mut HashMap<(DateTime<Utc>, TimeBucket), MetricsBucket>,
    ) {
        if buckets.len() <= self.max_buckets {
            return;
        }

        let mut timestamps: Vec<_> = buckets.keys().map(|(ts, _)| *ts).collect();
        timestamps.sort();

        let keep_from = timestamps.len().saturating_sub(self.max_buckets);
        let remove_before = timestamps.get(keep_from).copied().unwrap_or_else(Utc::now);

        buckets.retain(|(ts, _), _| *ts >= remove_before);
    }

    pub fn clear(&self) {
        let mut buckets = self.buckets.write();
        buckets.clear();
    }
}

impl Default for ChaosAnalytics {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Timelike;
    use std::collections::HashMap;

    #[test]
    fn test_time_bucket_rounding() {
        let timestamp = DateTime::parse_from_rfc3339("2025-10-07T12:34:56Z")
            .unwrap()
            .with_timezone(&Utc);

        let minute_bucket = TimeBucket::Minute;
        let rounded = minute_bucket.round_timestamp(timestamp);

        assert_eq!(rounded.minute(), 34);
        assert_eq!(rounded.second(), 0);
    }

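    // A companion check at the five-minute granularity; the expected floor
    // value (12:30:00) follows from the epoch-second division in
    // `round_timestamp`.
    #[test]
    fn test_five_minute_bucket_rounding() {
        let timestamp = DateTime::parse_from_rfc3339("2025-10-07T12:34:56Z")
            .unwrap()
            .with_timezone(&Utc);

        let rounded = TimeBucket::FiveMinutes.round_timestamp(timestamp);

        assert_eq!(rounded.minute(), 30);
        assert_eq!(rounded.second(), 0);
    }
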
    #[test]
    fn test_metrics_bucket_creation() {
        let timestamp = Utc::now();
        let bucket = MetricsBucket::new(timestamp, TimeBucket::Minute);

        assert_eq!(bucket.total_events, 0);
        assert_eq!(bucket.min_latency_ms, u64::MAX);
        assert_eq!(bucket.max_latency_ms, 0);
    }

    #[test]
    fn test_add_event_to_bucket() {
        let mut bucket = MetricsBucket::new(Utc::now(), TimeBucket::Minute);

        let event = ChaosEvent {
            timestamp: Utc::now(),
            event_type: ChaosEventType::LatencyInjection {
                delay_ms: 100,
                endpoint: Some("/api/test".to_string()),
            },
            metadata: HashMap::new(),
        };

        bucket.add_event(&event);

        assert_eq!(bucket.total_events, 1);
        assert_eq!(bucket.avg_latency_ms, 100.0);
        assert_eq!(bucket.max_latency_ms, 100);
        assert_eq!(bucket.min_latency_ms, 100);
        assert_eq!(bucket.affected_endpoints.get("/api/test"), Some(&1));
    }

    #[test]
    fn test_analytics_record_event() {
        let analytics = ChaosAnalytics::new();

        let event = ChaosEvent {
            timestamp: Utc::now(),
            event_type: ChaosEventType::LatencyInjection {
                delay_ms: 100,
                endpoint: None,
            },
            metadata: HashMap::new(),
        };

        analytics.record_event(&event, TimeBucket::Minute);

        let metrics = analytics.get_current_metrics(1, TimeBucket::Minute);
        assert_eq!(metrics.len(), 1);
        assert_eq!(metrics[0].total_events, 1);
    }

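    // A minimal eviction sketch: with `max_buckets` capped at 2, recording
    // events in three distinct minutes should leave only the two newest
    // buckets behind.
    #[test]
    fn test_bucket_eviction() {
        let analytics = ChaosAnalytics::new().with_max_buckets(2);
        let now = Utc::now();

        for minutes_ago in [3, 2, 1] {
            let event = ChaosEvent {
                timestamp: now - Duration::minutes(minutes_ago),
                event_type: ChaosEventType::LatencyInjection {
                    delay_ms: 50,
                    endpoint: None,
                },
                metadata: HashMap::new(),
            };
            analytics.record_event(&event, TimeBucket::Minute);
        }

        let metrics = analytics.get_current_metrics(10, TimeBucket::Minute);
        assert_eq!(metrics.len(), 2);
    }
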
    #[test]
    fn test_chaos_impact_empty() {
        let impact = ChaosImpact::from_buckets(&[]);
        assert_eq!(impact.total_events, 0);
        assert_eq!(impact.severity_score, 0.0);
    }
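
    // A non-empty counterpart built from hand-assembled buckets; the two-bucket
    // shape and counts here are illustrative assumptions.
    #[test]
    fn test_chaos_impact_from_buckets() {
        let now = Utc::now();
        let mut quiet = MetricsBucket::new(now - Duration::minutes(1), TimeBucket::Minute);
        let mut busy = MetricsBucket::new(now, TimeBucket::Minute);

        let event = ChaosEvent {
            timestamp: now,
            event_type: ChaosEventType::LatencyInjection {
                delay_ms: 100,
                endpoint: Some("/api/test".to_string()),
            },
            metadata: HashMap::new(),
        };

        quiet.add_event(&event);
        busy.add_event(&event);
        busy.add_event(&event);

        let impact = ChaosImpact::from_buckets(&[quiet, busy]);
        assert_eq!(impact.total_events, 3);
        assert_eq!(impact.peak_chaos_events, 2);
        assert_eq!(
            impact.top_affected_endpoints[0],
            ("/api/test".to_string(), 3)
        );
    }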
}