1use std::sync::Arc;
10use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
11use std::time::Duration;
12
/// Probabilities and value ranges that control fault injection.
///
/// Each `*_pct` field is the per-call chance, in percent, that the matching
/// fault is injected; `0` turns that fault off.
#[derive(Debug, Clone)]
pub struct ChaosConfig {
    /// Percent chance that an I/O operation is reported as failed.
    pub io_failure_pct: u8,
    /// Percent chance that a network operation is delayed.
    pub network_delay_pct: u8,
    /// `(min, max)` bounds for an injected network delay.
    pub network_delay_range: (Duration, Duration),
    /// Percent chance that a clock skew is produced.
    pub clock_skew_pct: u8,
    /// `(min, max)` bounds, in milliseconds, for an injected clock skew.
    pub clock_skew_range_ms: (i64, i64),
    /// Percent chance that an allocation is reported as failed.
    pub alloc_failure_pct: u8,
    /// Master switch: when `false`, no fault is injected at all.
    pub enabled: bool,
}

impl Default for ChaosConfig {
    /// Returns a configuration with chaos switched off and every probability at zero.
    fn default() -> Self {
        let delay_bounds = (Duration::from_millis(10), Duration::from_millis(100));
        let skew_bounds_ms = (-100, 100);

        ChaosConfig {
            enabled: false,
            io_failure_pct: 0,
            alloc_failure_pct: 0,
            network_delay_pct: 0,
            network_delay_range: delay_bounds,
            clock_skew_pct: 0,
            clock_skew_range_ms: skew_bounds_ms,
        }
    }
}

impl ChaosConfig {
    /// Enabled preset with low probabilities and small ranges:
    /// 1% I/O failures, 5% network delays of 1-10 ms, 2% clock skew within
    /// -10..10 ms, and no allocation failures.
    #[must_use]
    pub fn mild() -> Self {
        let delay_bounds = (Duration::from_millis(1), Duration::from_millis(10));
        let skew_bounds_ms = (-10, 10);

        ChaosConfig {
            enabled: true,
            io_failure_pct: 1,
            alloc_failure_pct: 0,
            network_delay_pct: 5,
            network_delay_range: delay_bounds,
            clock_skew_pct: 2,
            clock_skew_range_ms: skew_bounds_ms,
        }
    }

    /// Enabled preset with high probabilities and large ranges:
    /// 5% I/O failures, 20% network delays of 50-500 ms, 10% clock skew within
    /// -500..500 ms, and 1% allocation failures.
    #[must_use]
    pub fn aggressive() -> Self {
        let delay_bounds = (Duration::from_millis(50), Duration::from_millis(500));
        let skew_bounds_ms = (-500, 500);

        ChaosConfig {
            enabled: true,
            io_failure_pct: 5,
            alloc_failure_pct: 1,
            network_delay_pct: 20,
            network_delay_range: delay_bounds,
            clock_skew_pct: 10,
            clock_skew_range_ms: skew_bounds_ms,
        }
    }
}
75
76pub struct ChaosInjector {
78 config: ChaosConfig,
79 injection_count: AtomicU32,
80 paused: AtomicBool,
81 seed: AtomicU32,
82}
83
84impl ChaosInjector {
85 #[must_use]
86 pub fn new(config: ChaosConfig) -> Self {
87 Self {
88 config,
89 injection_count: AtomicU32::new(0),
90 paused: AtomicBool::new(false),
91 seed: AtomicU32::new(12345),
92 }
93 }
94
95 #[must_use]
97 pub fn disabled() -> Self {
98 Self::new(ChaosConfig::default())
99 }
100
101 fn next_rand(&self) -> u32 {
103 let mut seed = self.seed.load(Ordering::Relaxed);
104 seed = seed.wrapping_mul(1_103_515_245).wrapping_add(12345);
105 self.seed.store(seed, Ordering::Relaxed);
106 (seed >> 16) & 0x7fff
107 }
108
109 fn should_inject(&self, probability_pct: u8) -> bool {
110 if !self.config.enabled || self.paused.load(Ordering::Relaxed) {
111 return false;
112 }
113 if probability_pct == 0 {
114 return false;
115 }
116 let rand = self.next_rand() % 100;
117 rand < u32::from(probability_pct)
118 }
119
120 pub fn should_fail_io(&self) -> bool {
122 let result = self.should_inject(self.config.io_failure_pct);
123 if result {
124 self.injection_count.fetch_add(1, Ordering::Relaxed);
125 }
126 result
127 }
128
129 pub fn should_delay_network(&self) -> Option<Duration> {
131 if !self.should_inject(self.config.network_delay_pct) {
132 return None;
133 }
134
135 self.injection_count.fetch_add(1, Ordering::Relaxed);
136
137 let (min, max) = self.config.network_delay_range;
138 #[allow(clippy::cast_possible_truncation)]
139 let min_ms = min.as_millis().min(u128::from(u32::MAX)) as u32;
140 #[allow(clippy::cast_possible_truncation)]
141 let max_ms = max.as_millis().min(u128::from(u32::MAX)) as u32;
142 let range = max_ms.saturating_sub(min_ms);
143 let delay_ms = if range > 0 {
144 min_ms + (self.next_rand() % range)
145 } else {
146 min_ms
147 };
148
149 Some(Duration::from_millis(u64::from(delay_ms)))
150 }
151
152 pub fn get_clock_skew_ms(&self) -> Option<i64> {
154 if !self.should_inject(self.config.clock_skew_pct) {
155 return None;
156 }
157
158 self.injection_count.fetch_add(1, Ordering::Relaxed);
159
160 let (min, max) = self.config.clock_skew_range_ms;
161 #[allow(clippy::cast_possible_truncation)]
162 let range = (max - min).unsigned_abs() as u32;
163 let skew = if range > 0 {
164 min + i64::from(self.next_rand() % range)
165 } else {
166 min
167 };
168
169 Some(skew)
170 }
171
172 pub fn should_fail_alloc(&self) -> bool {
174 let result = self.should_inject(self.config.alloc_failure_pct);
175 if result {
176 self.injection_count.fetch_add(1, Ordering::Relaxed);
177 }
178 result
179 }
180
181 pub fn pause(&self) {
183 self.paused.store(true, Ordering::Relaxed);
184 }
185
186 pub fn resume(&self) {
188 self.paused.store(false, Ordering::Relaxed);
189 }
190
191 pub fn injection_count(&self) -> u32 {
193 self.injection_count.load(Ordering::Relaxed)
194 }
195
196 pub fn reset_count(&self) {
198 self.injection_count.store(0, Ordering::Relaxed);
199 }
200
201 pub fn is_enabled(&self) -> bool {
203 self.config.enabled && !self.paused.load(Ordering::Relaxed)
204 }
205}
206
207pub type SharedChaosInjector = Arc<ChaosInjector>;
209
210#[must_use]
212pub fn shared_injector(config: ChaosConfig) -> SharedChaosInjector {
213 Arc::new(ChaosInjector::new(config))
214}
215
/// Identifiers for the kinds of fault that can be scheduled.
///
/// Each variant has a fixed snake_case label, available through
/// [`FaultType::as_str`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FaultType {
    DiskWriteFailure,
    DiskReadFailure,
    NetworkPartition,
    NetworkLatency,
    LeaderCrash,
    FollowerCrash,
    ClockJumpForward,
    ClockJumpBackward,
    MemoryPressure,
    CpuStall,
}

impl FaultType {
    /// Returns the snake_case label of this fault type.
    #[must_use]
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::DiskWriteFailure => "disk_write_failure",
            Self::DiskReadFailure => "disk_read_failure",
            Self::NetworkPartition => "network_partition",
            Self::NetworkLatency => "network_latency",
            Self::LeaderCrash => "leader_crash",
            Self::FollowerCrash => "follower_crash",
            Self::ClockJumpForward => "clock_jump_forward",
            Self::ClockJumpBackward => "clock_jump_backward",
            Self::MemoryPressure => "memory_pressure",
            Self::CpuStall => "cpu_stall",
        }
    }
}
258
259#[derive(Debug, Clone)]
261pub struct ChaosScenario {
262 pub name: String,
263 pub faults: Vec<(FaultType, Duration)>,
264 pub duration: Duration,
265}
266
267impl ChaosScenario {
268 pub fn new(name: impl Into<String>) -> Self {
270 Self {
271 name: name.into(),
272 faults: Vec::new(),
273 duration: Duration::from_secs(60),
274 }
275 }
276
277 #[must_use]
279 pub fn with_fault(mut self, fault: FaultType, at: Duration) -> Self {
280 self.faults.push((fault, at));
281 self
282 }
283
284 #[must_use]
286 pub fn with_duration(mut self, duration: Duration) -> Self {
287 self.duration = duration;
288 self
289 }
290
291 #[must_use]
293 pub fn leader_election() -> Self {
294 Self::new("leader_election")
295 .with_fault(FaultType::LeaderCrash, Duration::from_secs(5))
296 .with_duration(Duration::from_secs(30))
297 }
298
299 #[must_use]
301 pub fn network_partition() -> Self {
302 Self::new("network_partition")
303 .with_fault(FaultType::NetworkPartition, Duration::from_secs(2))
304 .with_duration(Duration::from_secs(30))
305 }
306
307 #[must_use]
309 pub fn disk_failure() -> Self {
310 Self::new("disk_failure")
311 .with_fault(FaultType::DiskWriteFailure, Duration::from_secs(3))
312 .with_fault(FaultType::DiskReadFailure, Duration::from_secs(10))
313 .with_duration(Duration::from_secs(30))
314 }
315
316 #[must_use]
318 pub fn clock_skew() -> Self {
319 Self::new("clock_skew")
320 .with_fault(FaultType::ClockJumpForward, Duration::from_secs(5))
321 .with_fault(FaultType::ClockJumpBackward, Duration::from_secs(15))
322 .with_duration(Duration::from_secs(30))
323 }
324}
325
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration leaves chaos switched off.
    #[test]
    fn test_chaos_config_default_disabled() {
        let ChaosConfig {
            enabled,
            io_failure_pct,
            ..
        } = ChaosConfig::default();

        assert!(!enabled);
        assert_eq!(io_failure_pct, 0);
    }

    /// A disabled injector never reports a fault.
    #[test]
    fn test_chaos_injector_disabled() {
        let injector = ChaosInjector::disabled();

        for _ in 0..100 {
            assert!(!injector.should_fail_io());
            assert!(injector.should_delay_network().is_none());
        }
    }

    /// The aggressive preset injects some I/O failures, but well under 20%.
    #[test]
    fn test_chaos_injector_aggressive() {
        let injector = ChaosInjector::new(ChaosConfig::aggressive());

        let failures = (0..1000).filter(|_| injector.should_fail_io()).count();

        assert!(failures > 0);
        assert!(failures < 200);
    }

    /// Pausing suppresses injection and resuming restores it.
    #[test]
    fn test_chaos_injector_pause_resume() {
        let injector = ChaosInjector::new(ChaosConfig {
            enabled: true,
            io_failure_pct: 100,
            ..Default::default()
        });

        // With a 100% probability every call injects while not paused.
        assert!(injector.should_fail_io());

        injector.pause();
        assert!(!injector.should_fail_io());

        injector.resume();
        assert!(injector.should_fail_io());
    }

    /// The leader-election preset schedules exactly one leader crash.
    #[test]
    fn test_chaos_scenario() {
        let ChaosScenario { name, faults, .. } = ChaosScenario::leader_election();

        assert_eq!(name, "leader_election");
        assert_eq!(faults.len(), 1);
        assert_eq!(faults[0].0, FaultType::LeaderCrash);
    }

    /// Fault types map to their snake_case labels.
    #[test]
    fn test_fault_type_as_str() {
        let expected = [
            (FaultType::DiskWriteFailure, "disk_write_failure"),
            (FaultType::NetworkPartition, "network_partition"),
        ];

        for (fault, label) in expected.iter() {
            assert_eq!(fault.as_str(), *label);
        }
    }
}