freenet/simulation/
fault.rs

1//! Fault injection configuration for simulation testing.
2//!
3//! This module provides configuration for injecting various faults:
4//! - Message loss (random drops)
5//! - Latency variation
6//! - Network partitions
7//! - Node crashes
8
9use std::{collections::HashSet, net::SocketAddr, ops::Range, time::Duration};
10
11use crate::config::GlobalRng;
12
13use super::rng::SimulationRng;
14
/// A network partition between sets of peers.
///
/// While the partition is active, messages between a peer in `side_a` and a
/// peer in `side_b` are blocked in both directions. Peers listed on neither
/// side are unaffected. All times are virtual nanoseconds.
#[derive(Debug, Clone)]
pub struct Partition {
    /// One side of the partition
    pub side_a: HashSet<SocketAddr>,
    /// Other side of the partition
    pub side_b: HashSet<SocketAddr>,
    /// When the partition started (virtual nanos)
    pub start_time: u64,
    /// When the partition heals (virtual nanos), None for permanent
    pub heal_time: Option<u64>,
}

impl Partition {
    /// Creates a new partition between two sets of peers.
    ///
    /// The partition starts at time 0 and never heals until configured
    /// otherwise via [`Partition::with_duration`] or [`Partition::permanent`].
    pub fn new(side_a: HashSet<SocketAddr>, side_b: HashSet<SocketAddr>) -> Self {
        Self {
            side_a,
            side_b,
            start_time: 0,
            heal_time: None,
        }
    }

    /// Creates a partition that heals after the given duration.
    ///
    /// The heal time is `start_time + duration` in virtual nanoseconds. If
    /// that sum does not fit in a `u64`, the heal time saturates at
    /// `u64::MAX` instead of overflowing or being truncated.
    pub fn with_duration(mut self, start_time: u64, duration: Duration) -> Self {
        // `as_nanos` yields a u128; clamp before narrowing so that very long
        // durations are not silently truncated by the cast.
        let duration_nanos = duration.as_nanos().min(u128::from(u64::MAX)) as u64;
        self.start_time = start_time;
        self.heal_time = Some(start_time.saturating_add(duration_nanos));
        self
    }

    /// Creates a permanent partition.
    ///
    /// Clears any previously configured heal time.
    pub fn permanent(mut self, start_time: u64) -> Self {
        self.start_time = start_time;
        self.heal_time = None;
        self
    }

    /// Checks if this partition blocks communication between two peers at the given time.
    ///
    /// The partition is active for `start_time <= current_time < heal_time`
    /// (or indefinitely when there is no heal time). While active, it blocks
    /// a message only when `from` and `to` are on opposite sides.
    pub fn blocks(&self, from: &SocketAddr, to: &SocketAddr, current_time: u64) -> bool {
        let started = current_time >= self.start_time;
        if !started || self.is_healed(current_time) {
            return false;
        }

        // Blocking is symmetric: check both directions across the split.
        let a_to_b = self.side_a.contains(from) && self.side_b.contains(to);
        let b_to_a = self.side_b.contains(from) && self.side_a.contains(to);
        a_to_b || b_to_a
    }

    /// Returns true if this partition has healed by the given time.
    ///
    /// A permanent partition (no heal time) never reports as healed.
    pub fn is_healed(&self, current_time: u64) -> bool {
        matches!(self.heal_time, Some(heal_time) if current_time >= heal_time)
    }
}
79
80/// Configuration for fault injection during simulation.
81#[derive(Debug, Clone)]
82pub struct FaultConfig {
83    /// Probability of dropping a message (0.0 to 1.0)
84    pub message_loss_rate: f64,
85    /// Range of latency to add to message delivery
86    pub latency_range: Option<Range<Duration>>,
87    /// Active network partitions
88    pub partitions: Vec<Partition>,
89    /// Set of crashed nodes (no messages sent or received)
90    pub crashed_nodes: HashSet<SocketAddr>,
91    /// Probability of a node crashing per time step
92    pub node_crash_rate: f64,
93}
94
95impl Default for FaultConfig {
96    fn default() -> Self {
97        Self {
98            message_loss_rate: 0.0,
99            latency_range: None,
100            partitions: Vec::new(),
101            crashed_nodes: HashSet::new(),
102            node_crash_rate: 0.0,
103        }
104    }
105}
106
107impl FaultConfig {
108    /// Creates a new empty fault configuration (no faults).
109    pub fn new() -> Self {
110        Self::default()
111    }
112
113    /// Creates a builder for fluent configuration.
114    pub fn builder() -> FaultConfigBuilder {
115        FaultConfigBuilder::new()
116    }
117
118    /// Returns true if a message should be dropped.
119    pub fn should_drop_message(&self, rng: &SimulationRng) -> bool {
120        self.message_loss_rate > 0.0 && rng.gen_bool(self.message_loss_rate)
121    }
122
123    /// Returns true if a message should be dropped, using thread-local RNG.
124    ///
125    /// This is useful for the in-memory transport bridge where we don't
126    /// have access to the simulation's deterministic RNG.
127    /// Note: Uses GlobalRng which is deterministic when a seed is set.
128    pub fn should_drop_message_random(&self) -> bool {
129        self.message_loss_rate > 0.0 && GlobalRng::random_bool(self.message_loss_rate)
130    }
131
132    /// Generates a latency duration for a message.
133    ///
134    /// Returns None if no latency range is configured.
135    pub fn generate_latency(&self, rng: &SimulationRng) -> Option<Duration> {
136        self.latency_range
137            .as_ref()
138            .map(|range| rng.gen_duration(range.clone()))
139    }
140
141    /// Returns the base latency (minimum of range) or zero.
142    pub fn base_latency(&self) -> Duration {
143        self.latency_range
144            .as_ref()
145            .map(|r| r.start)
146            .unwrap_or(Duration::ZERO)
147    }
148
149    /// Checks if communication between two peers is blocked by a partition.
150    pub fn is_partitioned(&self, from: &SocketAddr, to: &SocketAddr, current_time: u64) -> bool {
151        self.partitions
152            .iter()
153            .any(|p| p.blocks(from, to, current_time))
154    }
155
156    /// Checks if a node has crashed.
157    pub fn is_crashed(&self, addr: &SocketAddr) -> bool {
158        self.crashed_nodes.contains(addr)
159    }
160
161    /// Checks if a message can be delivered (not dropped, not partitioned, nodes not crashed).
162    pub fn can_deliver(
163        &self,
164        from: &SocketAddr,
165        to: &SocketAddr,
166        current_time: u64,
167        rng: &SimulationRng,
168    ) -> bool {
169        // Check for crashed nodes
170        if self.is_crashed(from) || self.is_crashed(to) {
171            return false;
172        }
173
174        // Check for partitions
175        if self.is_partitioned(from, to, current_time) {
176            return false;
177        }
178
179        // Check for random drop
180        if self.should_drop_message(rng) {
181            return false;
182        }
183
184        true
185    }
186
187    /// Adds a partition to the configuration.
188    pub fn add_partition(&mut self, partition: Partition) {
189        self.partitions.push(partition);
190    }
191
192    /// Crashes a node.
193    pub fn crash_node(&mut self, addr: SocketAddr) {
194        self.crashed_nodes.insert(addr);
195    }
196
197    /// Recovers a crashed node.
198    pub fn recover_node(&mut self, addr: &SocketAddr) {
199        self.crashed_nodes.remove(addr);
200    }
201
202    /// Removes healed partitions.
203    pub fn cleanup_healed_partitions(&mut self, current_time: u64) {
204        self.partitions.retain(|p| !p.is_healed(current_time));
205    }
206
207    /// Potentially crashes a node based on crash rate.
208    pub fn maybe_crash_node(&mut self, addr: SocketAddr, rng: &SimulationRng) -> bool {
209        if self.node_crash_rate > 0.0 && rng.gen_bool(self.node_crash_rate) {
210            self.crashed_nodes.insert(addr);
211            true
212        } else {
213            false
214        }
215    }
216}
217
218/// Builder for creating FaultConfig with fluent API.
219#[derive(Debug, Default)]
220pub struct FaultConfigBuilder {
221    config: FaultConfig,
222}
223
224impl FaultConfigBuilder {
225    /// Creates a new builder.
226    pub fn new() -> Self {
227        Self::default()
228    }
229
230    /// Sets the message loss rate (0.0 to 1.0).
231    pub fn message_loss_rate(mut self, rate: f64) -> Self {
232        self.config.message_loss_rate = rate.clamp(0.0, 1.0);
233        self
234    }
235
236    /// Sets the latency range for message delivery.
237    pub fn latency_range(mut self, range: Range<Duration>) -> Self {
238        self.config.latency_range = Some(range);
239        self
240    }
241
242    /// Sets fixed latency for message delivery.
243    pub fn fixed_latency(mut self, latency: Duration) -> Self {
244        let epsilon = Duration::from_nanos(1);
245        self.config.latency_range = Some(latency..latency + epsilon);
246        self
247    }
248
249    /// Adds a network partition.
250    pub fn partition(mut self, partition: Partition) -> Self {
251        self.config.partitions.push(partition);
252        self
253    }
254
255    /// Marks a node as crashed.
256    pub fn crashed_node(mut self, addr: SocketAddr) -> Self {
257        self.config.crashed_nodes.insert(addr);
258        self
259    }
260
261    /// Sets the node crash rate per step.
262    pub fn node_crash_rate(mut self, rate: f64) -> Self {
263        self.config.node_crash_rate = rate.clamp(0.0, 1.0);
264        self
265    }
266
267    /// Builds the FaultConfig.
268    pub fn build(self) -> FaultConfig {
269        self.config
270    }
271}
272
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::Ipv4Addr;

    /// Loopback socket address on the given port.
    fn addr(port: u16) -> SocketAddr {
        (Ipv4Addr::LOCALHOST, port).into()
    }

    /// Set of loopback addresses, one per port.
    fn peers(ports: &[u16]) -> HashSet<SocketAddr> {
        ports.iter().copied().map(addr).collect()
    }

    #[test]
    fn test_no_faults_by_default() {
        let rng = SimulationRng::new(42);
        let config = FaultConfig::new();

        let delivered = config.can_deliver(&addr(1000), &addr(2000), 0, &rng);
        assert!(delivered);
    }

    #[test]
    fn test_message_loss() {
        let rng = SimulationRng::new(42);
        // 100% loss
        let config = FaultConfig::builder().message_loss_rate(1.0).build();

        // Every one of the 100 samples must be a drop.
        assert!((0..100).all(|_| config.should_drop_message(&rng)));
    }

    #[test]
    fn test_message_loss_zero() {
        let rng = SimulationRng::new(42);
        let config = FaultConfig::builder().message_loss_rate(0.0).build();

        // None of the 100 samples may be a drop.
        assert!(!(0..100).any(|_| config.should_drop_message(&rng)));
    }

    #[test]
    fn test_latency_generation() {
        let bounds = Duration::from_millis(10)..Duration::from_millis(100);
        let rng = SimulationRng::new(42);
        let config = FaultConfig::builder()
            .latency_range(bounds.clone())
            .build();

        for _ in 0..100 {
            let latency = config.generate_latency(&rng).unwrap();
            // Half-open range: lower bound inclusive, upper bound exclusive.
            assert!(bounds.contains(&latency));
        }
    }

    #[test]
    fn test_partition_blocks() {
        let partition = Partition::new(peers(&[1000, 1001]), peers(&[2000, 2001])).permanent(0);
        let now = 100;

        // Traffic across the split is blocked in both directions.
        assert!(partition.blocks(&addr(1000), &addr(2000), now));
        assert!(partition.blocks(&addr(2000), &addr(1000), now));

        // Traffic within one side passes.
        assert!(!partition.blocks(&addr(1000), &addr(1001), now));
        assert!(!partition.blocks(&addr(2000), &addr(2001), now));

        // A peer outside the partition is unaffected.
        assert!(!partition.blocks(&addr(3000), &addr(1000), now));
    }

    #[test]
    fn test_partition_with_duration() {
        let (from, to) = (addr(1000), addr(2000));
        let partition = Partition::new(peers(&[1000]), peers(&[2000]))
            .with_duration(100, Duration::from_nanos(500));

        // Not yet started.
        assert!(!partition.blocks(&from, &to, 50));

        // Active.
        assert!(partition.blocks(&from, &to, 200));

        // Healed.
        assert!(!partition.blocks(&from, &to, 700));
    }

    #[test]
    fn test_crashed_node() {
        let rng = SimulationRng::new(42);
        let crashed = addr(1000);
        let config = FaultConfig::builder().crashed_node(crashed).build();

        // A crashed node can neither send...
        assert!(!config.can_deliver(&crashed, &addr(2000), 0, &rng));

        // ...nor receive.
        assert!(!config.can_deliver(&addr(2000), &crashed, 0, &rng));

        // Healthy nodes still talk to each other.
        assert!(config.can_deliver(&addr(2000), &addr(3000), 0, &rng));
    }

    #[test]
    fn test_crash_recovery() {
        let node = addr(1000);
        let mut config = FaultConfig::new();

        config.crash_node(node);
        assert!(config.is_crashed(&node));

        config.recover_node(&node);
        assert!(!config.is_crashed(&node));
    }

    #[test]
    fn test_partition_cleanup() {
        let side_a = peers(&[1000]);
        let side_b = peers(&[2000]);
        let mut config = FaultConfig::new();

        let temporary = Partition::new(side_a.clone(), side_b.clone())
            .with_duration(0, Duration::from_nanos(100));
        config.add_partition(temporary);

        let permanent = Partition::new(side_a, side_b).permanent(0);
        config.add_partition(permanent);

        assert_eq!(config.partitions.len(), 2);

        config.cleanup_healed_partitions(200);

        // Only the permanent partition survives the cleanup.
        assert_eq!(config.partitions.len(), 1);
    }

    #[test]
    fn test_builder() {
        let split = Partition::new(peers(&[1000]), peers(&[2000]));
        let latency = Duration::from_millis(10)..Duration::from_millis(50);

        let config = FaultConfig::builder()
            .message_loss_rate(0.1)
            .latency_range(latency)
            .partition(split)
            .crashed_node(addr(3000))
            .node_crash_rate(0.01)
            .build();

        assert_eq!(config.message_loss_rate, 0.1);
        assert!(config.latency_range.is_some());
        assert_eq!(config.partitions.len(), 1);
        assert!(config.crashed_nodes.contains(&addr(3000)));
        assert_eq!(config.node_crash_rate, 0.01);
    }
}