clnrm_core/services/
chaos_engine.rs

//! Chaos Engineering Service Plugin
//!
//! Revolutionary chaos testing plugin that introduces controlled failures,
//! network partitions, and system degradation to test resilience.

6use crate::cleanroom::{HealthStatus, ServiceHandle, ServicePlugin};
7use crate::error::Result;
8use std::collections::HashMap;
9use std::sync::Arc;
10use tokio::sync::RwLock;
11use uuid::Uuid;
12
13/// Chaos engineering configuration
14#[derive(Debug, Clone)]
15pub struct ChaosConfig {
16    /// Failure injection rate (0.0 to 1.0)
17    pub failure_rate: f64,
18    /// Latency injection in milliseconds
19    pub latency_ms: u64,
20    /// Network partition probability
21    pub network_partition_rate: f64,
22    /// Memory pressure injection
23    pub memory_pressure_mb: u64,
24    /// CPU stress injection
25    pub cpu_stress_percent: u8,
26    /// Chaos scenarios to run
27    pub scenarios: Vec<ChaosScenario>,
28}
29
/// A single chaos-testing scenario the engine can execute.
#[derive(Debug, Clone)]
pub enum ChaosScenario {
    /// Inject random service failures over the given duration.
    RandomFailures {
        duration_secs: u64,
        failure_rate: f64,
    },
    /// Inject intermittent latency spikes over the given duration.
    LatencySpikes {
        duration_secs: u64,
        max_latency_ms: u64,
    },
    /// Hold a `target_mb`-megabyte allocation for the given duration.
    MemoryExhaustion { duration_secs: u64, target_mb: u64 },
    /// Burn CPU at roughly `target_percent` for the given duration.
    CpuSaturation {
        duration_secs: u64,
        target_percent: u8,
    },
    /// Simulate a network partition affecting the listed services.
    NetworkPartition {
        duration_secs: u64,
        affected_services: Vec<String>,
    },
    /// Simulate a failure that starts at one service and cascades outward.
    CascadingFailures {
        trigger_service: String,
        propagation_delay_ms: u64,
    },
}
61
62impl Default for ChaosConfig {
63    fn default() -> Self {
64        Self {
65            failure_rate: 0.1,
66            latency_ms: 100,
67            network_partition_rate: 0.05,
68            memory_pressure_mb: 100,
69            cpu_stress_percent: 50,
70            scenarios: vec![
71                ChaosScenario::RandomFailures {
72                    duration_secs: 30,
73                    failure_rate: 0.2,
74                },
75                ChaosScenario::LatencySpikes {
76                    duration_secs: 60,
77                    max_latency_ms: 500,
78                },
79            ],
80        }
81    }
82}
83
84/// Chaos engineering service plugin
85#[derive(Debug)]
86pub struct ChaosEnginePlugin {
87    name: String,
88    config: ChaosConfig,
89    active_scenarios: Arc<RwLock<Vec<String>>>,
90    metrics: Arc<RwLock<ChaosMetrics>>,
91}
92
/// Aggregated counters describing the chaos injected so far.
#[derive(Debug, Default, Clone)]
pub struct ChaosMetrics {
    /// Number of failures injected
    pub failures_injected: u64,
    /// Cumulative latency injected, in milliseconds
    pub latency_injected_ms: u64,
    /// Number of network partitions created
    pub network_partitions: u64,
    /// Names of services touched by chaos (may contain duplicates)
    pub affected_services: Vec<String>,
    /// Number of chaos scenarios executed
    pub scenarios_executed: u64,
}
107
108impl ChaosEnginePlugin {
109    /// Create a new chaos engine plugin
110    pub fn new(name: &str) -> Self {
111        Self {
112            name: name.to_string(),
113            config: ChaosConfig::default(),
114            active_scenarios: Arc::new(RwLock::new(Vec::new())),
115            metrics: Arc::new(RwLock::new(ChaosMetrics::default())),
116        }
117    }
118
119    /// Create with custom configuration
120    pub fn with_config(name: &str, config: ChaosConfig) -> Self {
121        Self {
122            name: name.to_string(),
123            config,
124            active_scenarios: Arc::new(RwLock::new(Vec::new())),
125            metrics: Arc::new(RwLock::new(ChaosMetrics::default())),
126        }
127    }
128
129    /// Set failure injection rate
130    pub fn with_failure_rate(mut self, rate: f64) -> Self {
131        self.config.failure_rate = rate.clamp(0.0, 1.0);
132        self
133    }
134
135    /// Set latency injection
136    pub fn with_latency(mut self, latency_ms: u64) -> Self {
137        self.config.latency_ms = latency_ms;
138        self
139    }
140
141    /// Add chaos scenario
142    pub fn with_scenario(mut self, scenario: ChaosScenario) -> Self {
143        self.config.scenarios.push(scenario);
144        self
145    }
146
147    /// Inject random failure
148    pub async fn inject_failure(&self, service_name: &str) -> Result<bool> {
149        let should_fail = rand::random::<f64>() < self.config.failure_rate;
150
151        if should_fail {
152            let mut metrics = self.metrics.write().await;
153            metrics.failures_injected += 1;
154            metrics.affected_services.push(service_name.to_string());
155
156            tracing::info!(
157                service = %service_name,
158                "Chaos engine injecting failure"
159            );
160            Ok(true)
161        } else {
162            Ok(false)
163        }
164    }
165
166    /// Inject latency
167    pub async fn inject_latency(&self, service_name: &str) -> Result<u64> {
168        let latency = if rand::random::<f64>() < 0.3 {
169            self.config.latency_ms + rand::random::<u64>() % 200
170        } else {
171            0
172        };
173
174        if latency > 0 {
175            let mut metrics = self.metrics.write().await;
176            metrics.latency_injected_ms += latency;
177
178            tracing::info!(
179                service = %service_name,
180                latency_ms = latency,
181                "Chaos engine injecting latency"
182            );
183
184            // Simulate latency
185            tokio::time::sleep(std::time::Duration::from_millis(latency)).await;
186        }
187
188        Ok(latency)
189    }
190
191    /// Create network partition
192    pub async fn create_network_partition(&self, services: &[String]) -> Result<()> {
193        if rand::random::<f64>() < self.config.network_partition_rate {
194            let mut metrics = self.metrics.write().await;
195            metrics.network_partitions += 1;
196            metrics.affected_services.extend(services.iter().cloned());
197
198            tracing::info!(
199                services = ?services,
200                "Chaos engine creating network partition"
201            );
202        }
203        Ok(())
204    }
205
206    /// Run chaos scenario
207    pub async fn run_scenario(&self, scenario: &ChaosScenario) -> Result<()> {
208        let scenario_id = Uuid::new_v4().to_string();
209        let mut active = self.active_scenarios.write().await;
210        active.push(scenario_id.clone());
211
212        let mut metrics = self.metrics.write().await;
213        metrics.scenarios_executed += 1;
214
215        match scenario {
216            ChaosScenario::RandomFailures {
217                duration_secs,
218                failure_rate,
219            } => {
220                tracing::info!(
221                    duration_secs,
222                    failure_rate_percent = failure_rate * 100.0,
223                    "Chaos engine running random failures scenario"
224                );
225
226                // Simulate random failures over duration
227                for _ in 0..*duration_secs {
228                    if rand::random::<f64>() < *failure_rate {
229                        metrics.failures_injected += 1;
230                    }
231                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
232                }
233            }
234            ChaosScenario::LatencySpikes {
235                duration_secs,
236                max_latency_ms,
237            } => {
238                tracing::info!(
239                    duration_secs,
240                    max_latency_ms,
241                    "Chaos engine running latency spikes scenario"
242                );
243
244                // Simulate latency spikes
245                for _ in 0..*duration_secs {
246                    if rand::random::<f64>() < 0.1 {
247                        let latency = rand::random::<u64>() % max_latency_ms;
248                        metrics.latency_injected_ms += latency;
249                        tokio::time::sleep(std::time::Duration::from_millis(latency)).await;
250                    }
251                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
252                }
253            }
254            ChaosScenario::MemoryExhaustion {
255                duration_secs,
256                target_mb,
257            } => {
258                tracing::info!(
259                    duration_secs,
260                    target_mb,
261                    "Chaos engine running memory exhaustion scenario"
262                );
263
264                // Simulate memory pressure
265                let _memory_pressure = vec![0u8; (*target_mb * 1024 * 1024) as usize];
266                tokio::time::sleep(std::time::Duration::from_secs(*duration_secs)).await;
267            }
268            ChaosScenario::CpuSaturation {
269                duration_secs,
270                target_percent,
271            } => {
272                tracing::info!(
273                    duration_secs,
274                    target_percent,
275                    "Chaos engine running CPU saturation scenario"
276                );
277
278                // Simulate CPU stress
279                let start = std::time::Instant::now();
280                while start.elapsed().as_secs() < *duration_secs {
281                    if rand::random::<u8>() < *target_percent {
282                        // Simulate CPU work
283                        let _ = (0..1000).map(|i| i * i).collect::<Vec<_>>();
284                    }
285                    tokio::time::sleep(std::time::Duration::from_millis(10)).await;
286                }
287            }
288            ChaosScenario::NetworkPartition {
289                duration_secs,
290                affected_services,
291            } => {
292                tracing::info!(
293                    duration_secs,
294                    affected_services = ?affected_services,
295                    "Chaos engine running network partition scenario"
296                );
297
298                metrics.network_partitions += 1;
299                metrics
300                    .affected_services
301                    .extend(affected_services.iter().cloned());
302                tokio::time::sleep(std::time::Duration::from_secs(*duration_secs)).await;
303            }
304            ChaosScenario::CascadingFailures {
305                trigger_service,
306                propagation_delay_ms,
307            } => {
308                tracing::info!(
309                    trigger_service = %trigger_service,
310                    propagation_delay_ms,
311                    "Chaos engine running cascading failures scenario"
312                );
313
314                // Simulate cascading failure
315                metrics.failures_injected += 1;
316                metrics.affected_services.push(trigger_service.clone());
317
318                tokio::time::sleep(std::time::Duration::from_millis(*propagation_delay_ms)).await;
319
320                // Simulate propagation to other services
321                let cascade_services = vec!["service_b".to_string(), "service_c".to_string()];
322                metrics.failures_injected += cascade_services.len() as u64;
323                metrics.affected_services.extend(cascade_services);
324            }
325        }
326
327        // Remove from active scenarios
328        active.retain(|id| id != &scenario_id);
329        Ok(())
330    }
331
332    /// Get chaos metrics
333    pub async fn get_metrics(&self) -> ChaosMetrics {
334        self.metrics.read().await.clone()
335    }
336}
337
impl ServicePlugin for ChaosEnginePlugin {
    /// Returns the plugin instance name supplied at construction.
    fn name(&self) -> &str {
        &self.name
    }

    /// Starts the chaos engine: runs every configured scenario to completion,
    /// then returns a handle whose metadata describes the engine. Because the
    /// scenarios run inline, start() blocks for the sum of all configured
    /// scenario durations.
    fn start(&self) -> Result<ServiceHandle> {
        // Bridge this synchronous trait method onto the async runtime.
        // NOTE(review): block_in_place panics when called outside a tokio
        // runtime or on a current_thread runtime — confirm callers always
        // invoke start() from a multi-threaded tokio runtime worker.
        tokio::task::block_in_place(|| {
            tokio::runtime::Handle::current().block_on(async {
                tracing::info!("Chaos engine starting");

                // Run initial chaos scenarios; a failing scenario is logged
                // and skipped rather than aborting startup.
                for scenario in &self.config.scenarios {
                    if let Err(e) = self.run_scenario(scenario).await {
                        tracing::warn!(error = %e, "Chaos scenario failed");
                    }
                }

                // Handle metadata; health_check later keys off the
                // "chaos_engine_version" entry written here.
                let mut metadata = HashMap::new();
                metadata.insert("chaos_engine_version".to_string(), "1.0.0".to_string());
                metadata.insert(
                    "failure_rate".to_string(),
                    self.config.failure_rate.to_string(),
                );
                metadata.insert("latency_ms".to_string(), self.config.latency_ms.to_string());
                metadata.insert(
                    "scenarios_count".to_string(),
                    self.config.scenarios.len().to_string(),
                );
                metadata.insert("service_type".to_string(), "chaos_engine".to_string());
                metadata.insert("status".to_string(), "running".to_string());

                Ok(ServiceHandle {
                    id: Uuid::new_v4().to_string(),
                    service_name: self.name.clone(),
                    metadata,
                })
            })
        })
    }

    /// Stops the engine by clearing the active-scenario id list.
    fn stop(&self, _handle: ServiceHandle) -> Result<()> {
        // Same sync-to-async bridge as start(); the same current_thread
        // runtime caveat applies here.
        tokio::task::block_in_place(|| {
            tokio::runtime::Handle::current().block_on(async {
                tracing::info!("Chaos engine stopping");

                // Clearing the id list marks all scenarios inactive; it does
                // not cancel a run_scenario call already sleeping elsewhere.
                let mut active = self.active_scenarios.write().await;
                active.clear();

                Ok(())
            })
        })
    }

    /// Reports Healthy when the handle carries the "chaos_engine_version"
    /// metadata key written by start(); Unknown otherwise. This checks
    /// handle provenance only — it is not a live probe of the engine.
    fn health_check(&self, handle: &ServiceHandle) -> HealthStatus {
        if handle.metadata.contains_key("chaos_engine_version") {
            HealthStatus::Healthy
        } else {
            HealthStatus::Unknown
        }
    }
}
401}