Skip to main content

vortex_sim/
network.rs

1//! Simulated network with configurable latency, drops, partitions, and reordering.
2//!
3//! Every message delivery is deterministic given the same seed. The network
4//! supports symmetric/asymmetric partitions, jitter, duplication, and more.
5
6use std::collections::{HashMap, HashSet, VecDeque};
7use vortex_core::{DetRng, NodeId};
8
/// Configuration for a simulated network link between two nodes.
///
/// Links are directed: the configuration stored for `(a, b)` governs only
/// messages sent from `a` to `b`.
#[derive(Debug, Clone, PartialEq)]
pub struct LinkConfig {
    /// One-way base latency in simulated ticks (effective latency is never below 1).
    pub latency_ticks: u64,
    /// Probability that a message is dropped when sent (0.0 = never, 1.0 = always).
    pub drop_probability: f64,
    /// Jitter in ticks: each message's latency is offset by a random integer in
    /// `[-jitter_ticks, +jitter_ticks]`, then clamped to at least one tick.
    pub jitter_ticks: u64,
    /// If `true`, jitter may let a later message overtake an earlier one on this
    /// link. If `false`, the link is intended to deliver in send order (FIFO).
    pub reorder: bool,
    /// Probability that a message is delivered twice (the copy arrives later).
    pub duplicate_probability: f64,
    /// Probability that a delivered message is corrupted by flipping a single
    /// random bit in one random byte of its payload.
    pub corrupt_probability: f64,
    /// Maximum messages delivered per tick on this link (0 = unlimited); due
    /// messages beyond the limit are deferred to the next tick.
    pub bandwidth_limit: u64,
}
27
28impl Default for LinkConfig {
29    fn default() -> Self {
30        Self {
31            latency_ticks: 1,
32            drop_probability: 0.0,
33            jitter_ticks: 0,
34            reorder: false,
35            duplicate_probability: 0.0,
36            corrupt_probability: 0.0,
37            bandwidth_limit: 0,
38        }
39    }
40}
41
42/// A message in-flight in the simulated network.
43#[derive(Debug, Clone)]
44pub struct InFlightMessage {
45    /// Source node.
46    pub from: NodeId,
47    /// Destination node.
48    pub to: NodeId,
49    /// Message payload.
50    pub payload: Vec<u8>,
51    /// Tick at which this message should be delivered.
52    pub deliver_at: u64,
53}
54
55/// Simulated network that intercepts all inter-node communication.
56///
57/// Fully deterministic given the same seed. Supports partitions, drops,
58/// latency, jitter, reordering, and message duplication.
59pub struct SimNetwork {
60    current_tick: u64,
61    in_flight: VecDeque<InFlightMessage>,
62    link_configs: HashMap<(NodeId, NodeId), LinkConfig>,
63    partitions: HashSet<(NodeId, NodeId)>,
64    delivered: HashMap<NodeId, VecDeque<(NodeId, Vec<u8>)>>,
65    rng: DetRng,
66    total_sent: u64,
67    total_dropped: u64,
68    total_delivered: u64,
69    total_corrupted: u64,
70    total_throttled: u64,
71    /// Messages delivered per link per tick (for bandwidth limiting).
72    tick_delivery_counts: HashMap<(NodeId, NodeId), u64>,
73}
74
75impl SimNetwork {
76    /// Create a new simulated network with the given random seed.
77    pub fn new(seed: u64) -> Self {
78        Self {
79            current_tick: 0,
80            in_flight: VecDeque::new(),
81            link_configs: HashMap::new(),
82            partitions: HashSet::new(),
83            delivered: HashMap::new(),
84            rng: DetRng::derive(seed, "network"),
85            total_sent: 0,
86            total_dropped: 0,
87            total_delivered: 0,
88            total_corrupted: 0,
89            total_throttled: 0,
90            tick_delivery_counts: HashMap::new(),
91        }
92    }
93
94    /// Set the link configuration between two nodes.
95    pub fn set_link_config(&mut self, from: NodeId, to: NodeId, config: LinkConfig) {
96        self.link_configs.insert((from, to), config);
97    }
98
99    /// Configure all links between the given nodes symmetrically.
100    pub fn set_all_links(&mut self, nodes: &[NodeId], config: LinkConfig) {
101        for &a in nodes {
102            for &b in nodes {
103                if a != b {
104                    self.link_configs.insert((a, b), config.clone());
105                }
106            }
107        }
108    }
109
110    /// Inject a symmetric partition: all messages between a and b are dropped.
111    pub fn inject_partition(&mut self, a: NodeId, b: NodeId) {
112        self.partitions.insert((a, b));
113        self.partitions.insert((b, a));
114    }
115
116    /// Heal a symmetric partition.
117    pub fn heal_partition(&mut self, a: NodeId, b: NodeId) {
118        self.partitions.remove(&(a, b));
119        self.partitions.remove(&(b, a));
120    }
121
122    /// Inject a one-way partition: from→to dropped, to→from delivered.
123    pub fn inject_one_way_partition(&mut self, from: NodeId, to: NodeId) {
124        self.partitions.insert((from, to));
125    }
126
127    /// Heal a one-way partition.
128    pub fn heal_one_way_partition(&mut self, from: NodeId, to: NodeId) {
129        self.partitions.remove(&(from, to));
130    }
131
132    /// Heal all partitions.
133    pub fn heal_all_partitions(&mut self) {
134        self.partitions.clear();
135    }
136
137    /// Submit a message into the network.
138    pub fn send(&mut self, from: NodeId, to: NodeId, payload: Vec<u8>) {
139        self.total_sent += 1;
140
141        // Check partition
142        if self.partitions.contains(&(from, to)) {
143            self.total_dropped += 1;
144            return;
145        }
146
147        // Get link config
148        let config = self
149            .link_configs
150            .get(&(from, to))
151            .cloned()
152            .unwrap_or_default();
153
154        // Random drop
155        if config.drop_probability > 0.0 && self.rng.next_f64() < config.drop_probability {
156            self.total_dropped += 1;
157            return;
158        }
159
160        // Calculate delivery time with jitter
161        let base_latency = config.latency_ticks as i64;
162        let jitter = if config.jitter_ticks > 0 {
163            let j = config.jitter_ticks as i64;
164            let range = 2 * j + 1;
165            (self.rng.next_f64() * range as f64) as i64 - j
166        } else {
167            0
168        };
169        let effective_latency = (base_latency + jitter).max(1) as u64;
170        let deliver_at = self.current_tick + effective_latency;
171
172        // Check for duplication
173        let should_duplicate = config.duplicate_probability > 0.0
174            && self.rng.next_f64() < config.duplicate_probability;
175
176        self.in_flight.push_back(InFlightMessage {
177            from,
178            to,
179            payload: payload.clone(),
180            deliver_at,
181        });
182
183        if should_duplicate {
184            let dup_delay = (self.rng.next_f64() * 5.0) as u64 + 1;
185            self.in_flight.push_back(InFlightMessage {
186                from,
187                to,
188                payload,
189                deliver_at: deliver_at + dup_delay,
190            });
191        }
192    }
193
194    /// Advance by one tick, delivering any due messages.
195    pub fn tick(&mut self) {
196        self.current_tick += 1;
197        self.tick_delivery_counts.clear();
198
199        let mut still_in_flight = VecDeque::new();
200        while let Some(msg) = self.in_flight.pop_front() {
201            if msg.deliver_at <= self.current_tick {
202                // Check partition at delivery time
203                if self.partitions.contains(&(msg.from, msg.to)) {
204                    self.total_dropped += 1;
205                    continue;
206                }
207
208                // Check bandwidth limit
209                let config = self
210                    .link_configs
211                    .get(&(msg.from, msg.to))
212                    .cloned()
213                    .unwrap_or_default();
214                if config.bandwidth_limit > 0 {
215                    let count = self
216                        .tick_delivery_counts
217                        .entry((msg.from, msg.to))
218                        .or_insert(0);
219                    if *count >= config.bandwidth_limit {
220                        // Re-queue for next tick
221                        still_in_flight.push_back(InFlightMessage {
222                            deliver_at: self.current_tick + 1,
223                            ..msg
224                        });
225                        self.total_throttled += 1;
226                        continue;
227                    }
228                    *count += 1;
229                }
230
231                // Apply corruption
232                let payload = if config.corrupt_probability > 0.0
233                    && self.rng.next_f64() < config.corrupt_probability
234                {
235                    let mut corrupted = msg.payload;
236                    if !corrupted.is_empty() {
237                        let idx = self.rng.next_u64_below(corrupted.len() as u64) as usize;
238                        corrupted[idx] ^= 1u8 << (self.rng.next_u64_below(8) as u8);
239                    }
240                    self.total_corrupted += 1;
241                    corrupted
242                } else {
243                    msg.payload
244                };
245
246                self.delivered
247                    .entry(msg.to)
248                    .or_default()
249                    .push_back((msg.from, payload));
250                self.total_delivered += 1;
251            } else {
252                still_in_flight.push_back(msg);
253            }
254        }
255        self.in_flight = still_in_flight;
256    }
257
258    /// Drain delivered messages for a specific node.
259    pub fn drain(&mut self, node_id: NodeId) -> Vec<(NodeId, Vec<u8>)> {
260        self.delivered
261            .get_mut(&node_id)
262            .map(|q| q.drain(..).collect())
263            .unwrap_or_default()
264    }
265
266    /// Current simulation tick.
267    pub fn current_tick(&self) -> u64 {
268        self.current_tick
269    }
270
271    /// Number of messages currently in-flight.
272    pub fn in_flight_count(&self) -> usize {
273        self.in_flight.len()
274    }
275
276    /// Stats: total messages sent.
277    pub fn total_sent(&self) -> u64 {
278        self.total_sent
279    }
280
281    /// Stats: total messages dropped.
282    pub fn total_dropped(&self) -> u64 {
283        self.total_dropped
284    }
285
286    /// Stats: total messages delivered.
287    pub fn total_delivered(&self) -> u64 {
288        self.total_delivered
289    }
290
291    /// Stats: total messages corrupted.
292    pub fn total_corrupted(&self) -> u64 {
293        self.total_corrupted
294    }
295
296    /// Stats: total messages throttled (delayed due to bandwidth limit).
297    pub fn total_throttled(&self) -> u64 {
298        self.total_throttled
299    }
300
301    /// Get all active partition pairs.
302    pub fn partition_pairs(&self) -> Vec<(NodeId, NodeId)> {
303        self.partitions.iter().copied().collect()
304    }
305}
306
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_basic_delivery() {
        let mut net = SimNetwork::new(42);
        net.send(1, 2, b"hello".to_vec());
        assert!(net.drain(2).is_empty()); // Not yet delivered
        net.tick();
        let msgs = net.drain(2);
        assert_eq!(msgs.len(), 1);
        assert_eq!(msgs[0].0, 1);
        assert_eq!(msgs[0].1, b"hello");
    }

    #[test]
    fn test_partition() {
        let mut net = SimNetwork::new(42);
        net.inject_partition(1, 2);
        net.send(1, 2, b"lost".to_vec());
        net.tick();
        assert!(net.drain(2).is_empty());

        net.heal_partition(1, 2);
        net.send(1, 2, b"found".to_vec());
        net.tick();
        assert_eq!(net.drain(2).len(), 1);
    }

    #[test]
    fn test_one_way_partition() {
        let mut net = SimNetwork::new(42);
        net.inject_one_way_partition(1, 2);
        net.send(1, 2, b"blocked".to_vec());
        net.send(2, 1, b"ok".to_vec());
        net.tick();
        assert!(net.drain(2).is_empty());
        assert_eq!(net.drain(1).len(), 1);
    }

    #[test]
    fn test_latency() {
        let mut net = SimNetwork::new(42);
        net.set_link_config(
            1,
            2,
            LinkConfig {
                latency_ticks: 5,
                ..Default::default()
            },
        );
        net.send(1, 2, b"delayed".to_vec());
        for _ in 0..4 {
            net.tick();
            assert!(net.drain(2).is_empty());
        }
        net.tick();
        assert_eq!(net.drain(2).len(), 1);
    }

    /// Two networks with the same seed and the same operations must produce
    /// byte-identical deliveries and identical stats, not merely equal counts.
    #[test]
    fn test_deterministic() {
        let config = LinkConfig {
            drop_probability: 0.5,
            jitter_ticks: 3,
            reorder: true,
            duplicate_probability: 0.2,
            corrupt_probability: 0.2,
            ..Default::default()
        };
        let run = |seed: u64| {
            let mut net = SimNetwork::new(seed);
            net.set_link_config(1, 2, config.clone());
            for i in 0..50u8 {
                net.send(1, 2, vec![i, i.wrapping_mul(7)]);
            }
            let mut received = Vec::new();
            for _ in 0..20 {
                net.tick();
                received.push(net.drain(2));
            }
            (
                received,
                net.total_dropped(),
                net.total_delivered(),
                net.total_corrupted(),
            )
        };
        assert_eq!(run(100), run(100));
    }

    #[test]
    fn test_jitter_varies_delivery() {
        let mut net = SimNetwork::new(42);
        net.set_link_config(
            1,
            2,
            LinkConfig {
                latency_ticks: 10,
                jitter_ticks: 5,
                reorder: true,
                ..Default::default()
            },
        );
        for i in 0..20u8 {
            net.send(1, 2, vec![i]);
        }
        let mut delivery_ticks = Vec::new();
        for tick in 1..=20 {
            net.tick();
            let count = net.drain(2).len();
            for _ in 0..count {
                delivery_ticks.push(tick);
            }
        }
        assert_eq!(delivery_ticks.len(), 20);
        let first = delivery_ticks[0];
        assert!(
            delivery_ticks.iter().any(|&t| t != first),
            "jitter should vary delivery"
        );
    }

    #[test]
    fn test_duplication() {
        let mut net = SimNetwork::new(42);
        net.set_link_config(
            1,
            2,
            LinkConfig {
                duplicate_probability: 1.0,
                ..Default::default()
            },
        );
        net.send(1, 2, b"hello".to_vec());
        for _ in 0..10 {
            net.tick();
        }
        assert_eq!(net.drain(2).len(), 2);
    }

    #[test]
    fn test_corruption() {
        let mut net = SimNetwork::new(42);
        net.set_link_config(
            1,
            2,
            LinkConfig {
                corrupt_probability: 1.0,
                ..Default::default()
            },
        );
        net.send(1, 2, b"hello".to_vec());
        net.tick();
        let msgs = net.drain(2);
        assert_eq!(msgs.len(), 1);
        // Message should be corrupted (bit flipped)
        assert_ne!(msgs[0].1, b"hello");
        assert_eq!(net.total_corrupted(), 1);
    }

    #[test]
    fn test_bandwidth_limit() {
        let mut net = SimNetwork::new(42);
        net.set_link_config(
            1,
            2,
            LinkConfig {
                bandwidth_limit: 2,
                ..Default::default()
            },
        );
        // Send 5 messages
        for i in 0..5u8 {
            net.send(1, 2, vec![i]);
        }
        // First tick: only 2 delivered, 3 deferred
        net.tick();
        assert_eq!(net.drain(2).len(), 2);
        // Second tick: 2 more, 1 deferred again
        net.tick();
        assert_eq!(net.drain(2).len(), 2);
        // Third tick: last 1
        net.tick();
        assert_eq!(net.drain(2).len(), 1);
        // Throttling is counted per deferral: 3 on tick 1 + 1 on tick 2.
        assert_eq!(net.total_throttled(), 4);
    }

    #[test]
    fn test_stats() {
        let mut net = SimNetwork::new(42);
        net.inject_partition(1, 2);
        net.send(1, 2, b"dropped".to_vec()); // dropped (1→2 partitioned)
        net.send(2, 1, b"also_dropped".to_vec()); // dropped (symmetric: 2→1 also partitioned)
        net.send(3, 1, b"ok".to_vec()); // delivered
        net.tick();
        assert_eq!(net.total_sent(), 3);
        assert_eq!(net.total_dropped(), 2);
        assert_eq!(net.total_delivered(), 1);
    }
}