1use crate::cleanroom::{HealthStatus, ServiceHandle, ServicePlugin};
7use crate::error::Result;
8use std::collections::HashMap;
9use std::sync::Arc;
10use tokio::sync::RwLock;
11use uuid::Uuid;
12
/// Tunable parameters controlling how aggressively the chaos engine
/// disturbs services, plus the scenarios executed on `start`.
#[derive(Debug, Clone)]
pub struct ChaosConfig {
    /// Probability in [0.0, 1.0] that `inject_failure` reports a failure.
    pub failure_rate: f64,
    /// Base latency (ms) added by `inject_latency` when a spike fires.
    pub latency_ms: u64,
    /// Probability in [0.0, 1.0] that `create_network_partition` records one.
    pub network_partition_rate: f64,
    /// Allocation size (MiB) for memory-pressure scenarios.
    /// NOTE(review): not read by the visible code — `MemoryExhaustion`
    /// carries its own `target_mb`; confirm this field is used elsewhere.
    pub memory_pressure_mb: u64,
    /// Target CPU busy percentage for saturation scenarios.
    /// NOTE(review): not read by the visible code — `CpuSaturation`
    /// carries its own `target_percent`; confirm this field is used elsewhere.
    pub cpu_stress_percent: u8,
    /// Scenarios run sequentially when the plugin starts.
    pub scenarios: Vec<ChaosScenario>,
}
29
/// A single chaos experiment executed by `ChaosEnginePlugin::run_scenario`.
/// All durations are wall-clock seconds; each variant sleeps for roughly
/// its `duration_secs` while mutating the shared `ChaosMetrics`.
#[derive(Debug, Clone)]
pub enum ChaosScenario {
    /// Once per second, inject a failure with probability `failure_rate`.
    RandomFailures {
        duration_secs: u64,
        failure_rate: f64,
    },
    /// Once per second, with ~10% probability, sleep a random latency
    /// below `max_latency_ms`.
    LatencySpikes {
        duration_secs: u64,
        max_latency_ms: u64,
    },
    /// Hold a `target_mb` MiB allocation alive for `duration_secs`.
    MemoryExhaustion { duration_secs: u64, target_mb: u64 },
    /// Burn CPU in short bursts, aiming for `target_percent` busy time.
    CpuSaturation {
        duration_secs: u64,
        target_percent: u8,
    },
    /// Record a simulated partition affecting `affected_services`, then
    /// wait out `duration_secs`.
    NetworkPartition {
        duration_secs: u64,
        affected_services: Vec<String>,
    },
    /// Fail `trigger_service`, wait `propagation_delay_ms`, then fail the
    /// hard-coded downstream services.
    CascadingFailures {
        trigger_service: String,
        propagation_delay_ms: u64,
    },
}
61
62impl Default for ChaosConfig {
63 fn default() -> Self {
64 Self {
65 failure_rate: 0.1,
66 latency_ms: 100,
67 network_partition_rate: 0.05,
68 memory_pressure_mb: 100,
69 cpu_stress_percent: 50,
70 scenarios: vec![
71 ChaosScenario::RandomFailures {
72 duration_secs: 30,
73 failure_rate: 0.2,
74 },
75 ChaosScenario::LatencySpikes {
76 duration_secs: 60,
77 max_latency_ms: 500,
78 },
79 ],
80 }
81 }
82}
83
/// Service plugin that injects failures, latency, and resource pressure
/// for resilience testing. Injection state is shared behind `Arc<RwLock<…>>`
/// so the async helpers can run concurrently.
#[derive(Debug)]
pub struct ChaosEnginePlugin {
    /// Plugin name reported via `ServicePlugin::name`.
    name: String,
    /// Chaos parameters and the scenarios executed on `start`.
    config: ChaosConfig,
    /// UUIDs of scenarios currently executing in `run_scenario`.
    active_scenarios: Arc<RwLock<Vec<String>>>,
    /// Counters accumulated by the `inject_*` helpers and scenarios.
    metrics: Arc<RwLock<ChaosMetrics>>,
}
92
/// Running totals of the chaos injected so far; snapshot via
/// `ChaosEnginePlugin::get_metrics`.
#[derive(Debug, Default, Clone)]
pub struct ChaosMetrics {
    /// Number of simulated failures injected.
    pub failures_injected: u64,
    /// Total artificial latency slept, in milliseconds.
    pub latency_injected_ms: u64,
    /// Number of simulated network partitions recorded.
    pub network_partitions: u64,
    /// Names of services touched by any injection (may contain duplicates).
    pub affected_services: Vec<String>,
    /// Number of scenarios started by `run_scenario`.
    pub scenarios_executed: u64,
}
107
108impl ChaosEnginePlugin {
109 pub fn new(name: &str) -> Self {
111 Self {
112 name: name.to_string(),
113 config: ChaosConfig::default(),
114 active_scenarios: Arc::new(RwLock::new(Vec::new())),
115 metrics: Arc::new(RwLock::new(ChaosMetrics::default())),
116 }
117 }
118
119 pub fn with_config(name: &str, config: ChaosConfig) -> Self {
121 Self {
122 name: name.to_string(),
123 config,
124 active_scenarios: Arc::new(RwLock::new(Vec::new())),
125 metrics: Arc::new(RwLock::new(ChaosMetrics::default())),
126 }
127 }
128
129 pub fn with_failure_rate(mut self, rate: f64) -> Self {
131 self.config.failure_rate = rate.clamp(0.0, 1.0);
132 self
133 }
134
135 pub fn with_latency(mut self, latency_ms: u64) -> Self {
137 self.config.latency_ms = latency_ms;
138 self
139 }
140
141 pub fn with_scenario(mut self, scenario: ChaosScenario) -> Self {
143 self.config.scenarios.push(scenario);
144 self
145 }
146
147 pub async fn inject_failure(&self, service_name: &str) -> Result<bool> {
149 let should_fail = rand::random::<f64>() < self.config.failure_rate;
150
151 if should_fail {
152 let mut metrics = self.metrics.write().await;
153 metrics.failures_injected += 1;
154 metrics.affected_services.push(service_name.to_string());
155
156 tracing::info!(
157 service = %service_name,
158 "Chaos engine injecting failure"
159 );
160 Ok(true)
161 } else {
162 Ok(false)
163 }
164 }
165
166 pub async fn inject_latency(&self, service_name: &str) -> Result<u64> {
168 let latency = if rand::random::<f64>() < 0.3 {
169 self.config.latency_ms + rand::random::<u64>() % 200
170 } else {
171 0
172 };
173
174 if latency > 0 {
175 let mut metrics = self.metrics.write().await;
176 metrics.latency_injected_ms += latency;
177
178 tracing::info!(
179 service = %service_name,
180 latency_ms = latency,
181 "Chaos engine injecting latency"
182 );
183
184 tokio::time::sleep(std::time::Duration::from_millis(latency)).await;
186 }
187
188 Ok(latency)
189 }
190
191 pub async fn create_network_partition(&self, services: &[String]) -> Result<()> {
193 if rand::random::<f64>() < self.config.network_partition_rate {
194 let mut metrics = self.metrics.write().await;
195 metrics.network_partitions += 1;
196 metrics.affected_services.extend(services.iter().cloned());
197
198 tracing::info!(
199 services = ?services,
200 "Chaos engine creating network partition"
201 );
202 }
203 Ok(())
204 }
205
206 pub async fn run_scenario(&self, scenario: &ChaosScenario) -> Result<()> {
208 let scenario_id = Uuid::new_v4().to_string();
209 let mut active = self.active_scenarios.write().await;
210 active.push(scenario_id.clone());
211
212 let mut metrics = self.metrics.write().await;
213 metrics.scenarios_executed += 1;
214
215 match scenario {
216 ChaosScenario::RandomFailures {
217 duration_secs,
218 failure_rate,
219 } => {
220 tracing::info!(
221 duration_secs,
222 failure_rate_percent = failure_rate * 100.0,
223 "Chaos engine running random failures scenario"
224 );
225
226 for _ in 0..*duration_secs {
228 if rand::random::<f64>() < *failure_rate {
229 metrics.failures_injected += 1;
230 }
231 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
232 }
233 }
234 ChaosScenario::LatencySpikes {
235 duration_secs,
236 max_latency_ms,
237 } => {
238 tracing::info!(
239 duration_secs,
240 max_latency_ms,
241 "Chaos engine running latency spikes scenario"
242 );
243
244 for _ in 0..*duration_secs {
246 if rand::random::<f64>() < 0.1 {
247 let latency = rand::random::<u64>() % max_latency_ms;
248 metrics.latency_injected_ms += latency;
249 tokio::time::sleep(std::time::Duration::from_millis(latency)).await;
250 }
251 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
252 }
253 }
254 ChaosScenario::MemoryExhaustion {
255 duration_secs,
256 target_mb,
257 } => {
258 tracing::info!(
259 duration_secs,
260 target_mb,
261 "Chaos engine running memory exhaustion scenario"
262 );
263
264 let _memory_pressure = vec![0u8; (*target_mb * 1024 * 1024) as usize];
266 tokio::time::sleep(std::time::Duration::from_secs(*duration_secs)).await;
267 }
268 ChaosScenario::CpuSaturation {
269 duration_secs,
270 target_percent,
271 } => {
272 tracing::info!(
273 duration_secs,
274 target_percent,
275 "Chaos engine running CPU saturation scenario"
276 );
277
278 let start = std::time::Instant::now();
280 while start.elapsed().as_secs() < *duration_secs {
281 if rand::random::<u8>() < *target_percent {
282 let _ = (0..1000).map(|i| i * i).collect::<Vec<_>>();
284 }
285 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
286 }
287 }
288 ChaosScenario::NetworkPartition {
289 duration_secs,
290 affected_services,
291 } => {
292 tracing::info!(
293 duration_secs,
294 affected_services = ?affected_services,
295 "Chaos engine running network partition scenario"
296 );
297
298 metrics.network_partitions += 1;
299 metrics
300 .affected_services
301 .extend(affected_services.iter().cloned());
302 tokio::time::sleep(std::time::Duration::from_secs(*duration_secs)).await;
303 }
304 ChaosScenario::CascadingFailures {
305 trigger_service,
306 propagation_delay_ms,
307 } => {
308 tracing::info!(
309 trigger_service = %trigger_service,
310 propagation_delay_ms,
311 "Chaos engine running cascading failures scenario"
312 );
313
314 metrics.failures_injected += 1;
316 metrics.affected_services.push(trigger_service.clone());
317
318 tokio::time::sleep(std::time::Duration::from_millis(*propagation_delay_ms)).await;
319
320 let cascade_services = vec!["service_b".to_string(), "service_c".to_string()];
322 metrics.failures_injected += cascade_services.len() as u64;
323 metrics.affected_services.extend(cascade_services);
324 }
325 }
326
327 active.retain(|id| id != &scenario_id);
329 Ok(())
330 }
331
332 pub async fn get_metrics(&self) -> ChaosMetrics {
334 self.metrics.read().await.clone()
335 }
336}
337
impl ServicePlugin for ChaosEnginePlugin {
    /// Returns the plugin's configured name.
    fn name(&self) -> &str {
        &self.name
    }

    /// Runs every configured scenario to completion, then returns a handle
    /// whose metadata describes the engine's configuration.
    ///
    /// NOTE(review): this blocks `start` for the sum of all scenario
    /// durations (90 s with the default config) — confirm callers expect a
    /// synchronous start rather than a background spawn.
    /// NOTE(review): `block_in_place` panics on a current-thread tokio
    /// runtime; a multi-threaded runtime appears to be assumed — verify.
    fn start(&self) -> Result<ServiceHandle> {
        // Bridge the sync `ServicePlugin` interface onto the async engine.
        tokio::task::block_in_place(|| {
            tokio::runtime::Handle::current().block_on(async {
                tracing::info!("Chaos engine starting");

                // Scenario failures are logged but do not abort startup.
                for scenario in &self.config.scenarios {
                    if let Err(e) = self.run_scenario(scenario).await {
                        tracing::warn!(error = %e, "Chaos scenario failed");
                    }
                }

                // Metadata doubles as the health-check marker (see
                // `health_check`, which looks for "chaos_engine_version").
                let mut metadata = HashMap::new();
                metadata.insert("chaos_engine_version".to_string(), "1.0.0".to_string());
                metadata.insert(
                    "failure_rate".to_string(),
                    self.config.failure_rate.to_string(),
                );
                metadata.insert("latency_ms".to_string(), self.config.latency_ms.to_string());
                metadata.insert(
                    "scenarios_count".to_string(),
                    self.config.scenarios.len().to_string(),
                );
                metadata.insert("service_type".to_string(), "chaos_engine".to_string());
                metadata.insert("status".to_string(), "running".to_string());

                Ok(ServiceHandle {
                    id: Uuid::new_v4().to_string(),
                    service_name: self.name.clone(),
                    metadata,
                })
            })
        })
    }

    /// Clears the active-scenario list. Does not cancel scenarios already
    /// running inside `run_scenario`; they finish on their own.
    fn stop(&self, _handle: ServiceHandle) -> Result<()> {
        tokio::task::block_in_place(|| {
            tokio::runtime::Handle::current().block_on(async {
                tracing::info!("Chaos engine stopping");

                let mut active = self.active_scenarios.write().await;
                active.clear();

                Ok(())
            })
        })
    }

    /// Healthy iff the handle carries the version marker written by `start`;
    /// any other handle is reported as `Unknown`.
    fn health_check(&self, handle: &ServiceHandle) -> HealthStatus {
        if handle.metadata.contains_key("chaos_engine_version") {
            HealthStatus::Healthy
        } else {
            HealthStatus::Unknown
        }
    }
}