1use std::{collections::HashSet, net::SocketAddr, ops::Range, time::Duration};
10
11use crate::config::GlobalRng;
12
13use super::rng::SimulationRng;
14
/// A network partition separating two groups of addresses.
///
/// While active, traffic between a member of `side_a` and a member of
/// `side_b` is blocked in both directions. Timestamps are plain `u64`
/// values; [`Partition::with_duration`] adds a duration's nanosecond count
/// to `start_time`, so they are treated as nanoseconds.
#[derive(Debug, Clone)]
pub struct Partition {
    /// Addresses on one side of the split.
    pub side_a: HashSet<SocketAddr>,
    /// Addresses on the other side of the split.
    pub side_b: HashSet<SocketAddr>,
    /// First instant at which the partition blocks traffic.
    pub start_time: u64,
    /// First instant at which the partition no longer blocks traffic;
    /// `None` means it never heals.
    pub heal_time: Option<u64>,
}

impl Partition {
    /// Creates a partition between `side_a` and `side_b` that starts at
    /// time `0` and never heals.
    pub fn new(side_a: HashSet<SocketAddr>, side_b: HashSet<SocketAddr>) -> Self {
        Self {
            side_a,
            side_b,
            start_time: 0,
            heal_time: None,
        }
    }

    /// Makes the partition active from `start_time` until
    /// `start_time + duration` (exclusive), with `duration` counted in
    /// nanoseconds.
    ///
    /// The heal time saturates at `u64::MAX` instead of overflowing or
    /// truncating when the duration (or the sum) does not fit in a `u64`.
    pub fn with_duration(mut self, start_time: u64, duration: Duration) -> Self {
        // `as_nanos` returns a `u128`; clamp before narrowing so the cast is lossless.
        let nanos = duration.as_nanos().min(u128::from(u64::MAX)) as u64;
        self.start_time = start_time;
        self.heal_time = Some(start_time.saturating_add(nanos));
        self
    }

    /// Makes the partition active from `start_time` onwards with no heal time.
    pub fn permanent(mut self, start_time: u64) -> Self {
        self.start_time = start_time;
        self.heal_time = None;
        self
    }

    /// Returns `true` when a message from `from` to `to` is blocked by this
    /// partition at `current_time`.
    ///
    /// The partition only blocks while `start_time <= current_time` and it
    /// has not healed, and only when the two addresses sit on opposite
    /// sides. Addresses that belong to neither side are never blocked.
    pub fn blocks(&self, from: &SocketAddr, to: &SocketAddr, current_time: u64) -> bool {
        let not_started = current_time < self.start_time;
        if not_started || self.is_healed(current_time) {
            return false;
        }

        let a_to_b = self.side_a.contains(from) && self.side_b.contains(to);
        let b_to_a = self.side_b.contains(from) && self.side_a.contains(to);
        a_to_b || b_to_a
    }

    /// Returns `true` when the partition has a heal time and `current_time`
    /// has reached it. A partition without a heal time is never healed.
    pub fn is_healed(&self, current_time: u64) -> bool {
        matches!(self.heal_time, Some(heal_time) if current_time >= heal_time)
    }
}
79
80#[derive(Debug, Clone)]
82pub struct FaultConfig {
83 pub message_loss_rate: f64,
85 pub latency_range: Option<Range<Duration>>,
87 pub partitions: Vec<Partition>,
89 pub crashed_nodes: HashSet<SocketAddr>,
91 pub node_crash_rate: f64,
93}
94
95impl Default for FaultConfig {
96 fn default() -> Self {
97 Self {
98 message_loss_rate: 0.0,
99 latency_range: None,
100 partitions: Vec::new(),
101 crashed_nodes: HashSet::new(),
102 node_crash_rate: 0.0,
103 }
104 }
105}
106
107impl FaultConfig {
108 pub fn new() -> Self {
110 Self::default()
111 }
112
113 pub fn builder() -> FaultConfigBuilder {
115 FaultConfigBuilder::new()
116 }
117
118 pub fn should_drop_message(&self, rng: &SimulationRng) -> bool {
120 self.message_loss_rate > 0.0 && rng.gen_bool(self.message_loss_rate)
121 }
122
123 pub fn should_drop_message_random(&self) -> bool {
129 self.message_loss_rate > 0.0 && GlobalRng::random_bool(self.message_loss_rate)
130 }
131
132 pub fn generate_latency(&self, rng: &SimulationRng) -> Option<Duration> {
136 self.latency_range
137 .as_ref()
138 .map(|range| rng.gen_duration(range.clone()))
139 }
140
141 pub fn base_latency(&self) -> Duration {
143 self.latency_range
144 .as_ref()
145 .map(|r| r.start)
146 .unwrap_or(Duration::ZERO)
147 }
148
149 pub fn is_partitioned(&self, from: &SocketAddr, to: &SocketAddr, current_time: u64) -> bool {
151 self.partitions
152 .iter()
153 .any(|p| p.blocks(from, to, current_time))
154 }
155
156 pub fn is_crashed(&self, addr: &SocketAddr) -> bool {
158 self.crashed_nodes.contains(addr)
159 }
160
161 pub fn can_deliver(
163 &self,
164 from: &SocketAddr,
165 to: &SocketAddr,
166 current_time: u64,
167 rng: &SimulationRng,
168 ) -> bool {
169 if self.is_crashed(from) || self.is_crashed(to) {
171 return false;
172 }
173
174 if self.is_partitioned(from, to, current_time) {
176 return false;
177 }
178
179 if self.should_drop_message(rng) {
181 return false;
182 }
183
184 true
185 }
186
187 pub fn add_partition(&mut self, partition: Partition) {
189 self.partitions.push(partition);
190 }
191
192 pub fn crash_node(&mut self, addr: SocketAddr) {
194 self.crashed_nodes.insert(addr);
195 }
196
197 pub fn recover_node(&mut self, addr: &SocketAddr) {
199 self.crashed_nodes.remove(addr);
200 }
201
202 pub fn cleanup_healed_partitions(&mut self, current_time: u64) {
204 self.partitions.retain(|p| !p.is_healed(current_time));
205 }
206
207 pub fn maybe_crash_node(&mut self, addr: SocketAddr, rng: &SimulationRng) -> bool {
209 if self.node_crash_rate > 0.0 && rng.gen_bool(self.node_crash_rate) {
210 self.crashed_nodes.insert(addr);
211 true
212 } else {
213 false
214 }
215 }
216}
217
218#[derive(Debug, Default)]
220pub struct FaultConfigBuilder {
221 config: FaultConfig,
222}
223
224impl FaultConfigBuilder {
225 pub fn new() -> Self {
227 Self::default()
228 }
229
230 pub fn message_loss_rate(mut self, rate: f64) -> Self {
232 self.config.message_loss_rate = rate.clamp(0.0, 1.0);
233 self
234 }
235
236 pub fn latency_range(mut self, range: Range<Duration>) -> Self {
238 self.config.latency_range = Some(range);
239 self
240 }
241
242 pub fn fixed_latency(mut self, latency: Duration) -> Self {
244 let epsilon = Duration::from_nanos(1);
245 self.config.latency_range = Some(latency..latency + epsilon);
246 self
247 }
248
249 pub fn partition(mut self, partition: Partition) -> Self {
251 self.config.partitions.push(partition);
252 self
253 }
254
255 pub fn crashed_node(mut self, addr: SocketAddr) -> Self {
257 self.config.crashed_nodes.insert(addr);
258 self
259 }
260
261 pub fn node_crash_rate(mut self, rate: f64) -> Self {
263 self.config.node_crash_rate = rate.clamp(0.0, 1.0);
264 self
265 }
266
267 pub fn build(self) -> FaultConfig {
269 self.config
270 }
271}
272
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr};

    /// Loopback address on the given port.
    fn addr(port: u16) -> SocketAddr {
        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port)
    }

    /// Builds one side of a partition from a list of ports.
    fn side(ports: &[u16]) -> HashSet<SocketAddr> {
        ports.iter().map(|&port| addr(port)).collect()
    }

    #[test]
    fn test_no_faults_by_default() {
        let rng = SimulationRng::new(42);
        let config = FaultConfig::new();

        assert!(config.can_deliver(&addr(1000), &addr(2000), 0, &rng));
    }

    #[test]
    fn test_message_loss() {
        // A loss rate of 1.0 must drop every message.
        let config = FaultConfig::builder().message_loss_rate(1.0).build();
        let rng = SimulationRng::new(42);

        assert!((0..100).all(|_| config.should_drop_message(&rng)));
    }

    #[test]
    fn test_message_loss_zero() {
        // A loss rate of 0.0 must never drop a message.
        let config = FaultConfig::builder().message_loss_rate(0.0).build();
        let rng = SimulationRng::new(42);

        assert!((0..100).all(|_| !config.should_drop_message(&rng)));
    }

    #[test]
    fn test_latency_generation() {
        let lower = Duration::from_millis(10);
        let upper = Duration::from_millis(100);
        let config = FaultConfig::builder().latency_range(lower..upper).build();
        let rng = SimulationRng::new(42);

        for _ in 0..100 {
            let latency = config.generate_latency(&rng).unwrap();
            assert!(latency >= lower);
            assert!(latency < upper);
        }
    }

    #[test]
    fn test_partition_blocks() {
        let partition = Partition::new(side(&[1000, 1001]), side(&[2000, 2001])).permanent(0);

        // Traffic across the split is blocked in both directions.
        assert!(partition.blocks(&addr(1000), &addr(2000), 100));
        assert!(partition.blocks(&addr(2000), &addr(1000), 100));

        // Traffic within one side is not blocked.
        assert!(!partition.blocks(&addr(1000), &addr(1001), 100));
        assert!(!partition.blocks(&addr(2000), &addr(2001), 100));

        // An address on neither side is not affected.
        assert!(!partition.blocks(&addr(3000), &addr(1000), 100));
    }

    #[test]
    fn test_partition_with_duration() {
        let partition = Partition::new(side(&[1000]), side(&[2000]))
            .with_duration(100, Duration::from_nanos(500));
        let (from, to) = (addr(1000), addr(2000));

        // Before the partition starts.
        assert!(!partition.blocks(&from, &to, 50));

        // While the partition is active.
        assert!(partition.blocks(&from, &to, 200));

        // After the partition has healed.
        assert!(!partition.blocks(&from, &to, 700));
    }

    #[test]
    fn test_crashed_node() {
        let config = FaultConfig::builder().crashed_node(addr(1000)).build();
        let rng = SimulationRng::new(42);

        // A crashed node cannot send.
        assert!(!config.can_deliver(&addr(1000), &addr(2000), 0, &rng));

        // A crashed node cannot receive.
        assert!(!config.can_deliver(&addr(2000), &addr(1000), 0, &rng));

        // Unrelated nodes still communicate.
        assert!(config.can_deliver(&addr(2000), &addr(3000), 0, &rng));
    }

    #[test]
    fn test_crash_recovery() {
        let node = addr(1000);
        let mut config = FaultConfig::new();

        config.crash_node(node);
        assert!(config.is_crashed(&node));

        config.recover_node(&node);
        assert!(!config.is_crashed(&node));
    }

    #[test]
    fn test_partition_cleanup() {
        let mut config = FaultConfig::new();

        // One partition that heals at t = 100 and one that never heals.
        config.add_partition(
            Partition::new(side(&[1000]), side(&[2000]))
                .with_duration(0, Duration::from_nanos(100)),
        );
        config.add_partition(Partition::new(side(&[1000]), side(&[2000])).permanent(0));
        assert_eq!(config.partitions.len(), 2);

        config.cleanup_healed_partitions(200);

        // Only the permanent partition is left.
        assert_eq!(config.partitions.len(), 1);
    }

    #[test]
    fn test_builder() {
        let config = FaultConfig::builder()
            .message_loss_rate(0.1)
            .latency_range(Duration::from_millis(10)..Duration::from_millis(50))
            .partition(Partition::new(side(&[1000]), side(&[2000])))
            .crashed_node(addr(3000))
            .node_crash_rate(0.01)
            .build();

        assert_eq!(config.message_loss_rate, 0.1);
        assert!(config.latency_range.is_some());
        assert_eq!(config.partitions.len(), 1);
        assert!(config.crashed_nodes.contains(&addr(3000)));
        assert_eq!(config.node_crash_rate, 0.01);
    }
}