vortex-sim 0.1.0

Simulated I/O implementations (network, storage, clock) for Vortex
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
//! Simulated network with configurable latency, drops, partitions, and reordering.
//!
//! Every message delivery is deterministic given the same seed. The network
//! supports symmetric/asymmetric partitions, jitter, duplication, and more.

use std::collections::{HashMap, HashSet, VecDeque};
use vortex_core::{DetRng, NodeId};

/// Configuration for a simulated network link between two nodes.
///
/// `SimNetwork` keys link configurations by the ordered pair `(from, to)`, so the
/// two directions of a connection can be configured independently. A link with
/// no explicit configuration uses [`LinkConfig::default`].
#[derive(Debug, Clone)]
pub struct LinkConfig {
    /// One-way base latency in simulated ticks.
    pub latency_ticks: u64,
    /// Drop probability (0.0 = never, 1.0 = always), checked once per send.
    pub drop_probability: f64,
    /// Jitter in ticks: on send, a random whole-tick offset between
    /// `-jitter_ticks` and `+jitter_ticks` is added to the latency. The resulting
    /// latency is never less than one tick. `0` disables jitter.
    pub jitter_ticks: u64,
    /// If true, messages may be reordered due to jitter.
    ///
    /// NOTE(review): `SimNetwork::send` and `SimNetwork::tick` do not read this
    /// flag; jittered messages can overtake each other regardless of its value.
    /// Confirm the intended semantics.
    pub reorder: bool,
    /// Probability that a message is delivered twice. The extra copy is scheduled
    /// a few ticks after the original.
    pub duplicate_probability: f64,
    /// Probability that a message is corrupted at delivery time (one randomly
    /// chosen bit of the payload is flipped).
    pub corrupt_probability: f64,
    /// Maximum messages delivered per tick on this link (0 = unlimited). Messages
    /// over the limit are retried on the following tick.
    pub bandwidth_limit: u64,
}

impl Default for LinkConfig {
    fn default() -> Self {
        Self {
            latency_ticks: 1,
            drop_probability: 0.0,
            jitter_ticks: 0,
            reorder: false,
            duplicate_probability: 0.0,
            corrupt_probability: 0.0,
            bandwidth_limit: 0,
        }
    }
}

/// A message in-flight in the simulated network.
///
/// Created by `SimNetwork::send` and held in the network's queue until
/// `SimNetwork::tick` reaches `deliver_at`.
#[derive(Debug, Clone)]
pub struct InFlightMessage {
    /// Source node.
    pub from: NodeId,
    /// Destination node.
    pub to: NodeId,
    /// Message payload (raw bytes; the network never interprets them).
    pub payload: Vec<u8>,
    /// Tick at which this message should be delivered. The message becomes due
    /// once the network's current tick is greater than or equal to this value.
    pub deliver_at: u64,
}

/// Simulated network that intercepts all inter-node communication.
///
/// Fully deterministic given the same seed. Supports partitions, drops,
/// latency, jitter, reordering, and message duplication.
///
/// Typical use: `send` messages, call `tick` to advance time, then `drain`
/// each node's inbox.
pub struct SimNetwork {
    /// Number of `tick` calls so far; starts at 0.
    current_tick: u64,
    /// Messages accepted by `send` that have not yet been delivered or dropped.
    in_flight: VecDeque<InFlightMessage>,
    /// Per-direction link settings, keyed by `(from, to)`. Missing entries fall
    /// back to `LinkConfig::default()`.
    link_configs: HashMap<(NodeId, NodeId), LinkConfig>,
    /// Blocked directions: a `(from, to)` entry drops traffic from `from` to `to`.
    partitions: HashSet<(NodeId, NodeId)>,
    /// Per-recipient inbox of `(sender, payload)` pairs awaiting `drain`.
    delivered: HashMap<NodeId, VecDeque<(NodeId, Vec<u8>)>>,
    /// Source of all randomness (drops, jitter, duplication, corruption).
    rng: DetRng,
    /// Count of `send` calls, including messages that were later dropped.
    total_sent: u64,
    /// Count of messages dropped by a partition or by random loss.
    total_dropped: u64,
    /// Count of messages placed into a recipient inbox.
    total_delivered: u64,
    /// Count of messages recorded as corrupted during delivery.
    total_corrupted: u64,
    /// Count of deferrals caused by a link's bandwidth limit.
    total_throttled: u64,
    /// Messages delivered per link per tick (for bandwidth limiting).
    tick_delivery_counts: HashMap<(NodeId, NodeId), u64>,
}

impl SimNetwork {
    /// Build an empty network at tick 0 whose randomness is derived from `seed`.
    ///
    /// No links are configured and no partitions are active, so every link
    /// starts out with `LinkConfig::default()` behaviour. All statistics
    /// counters start at zero.
    pub fn new(seed: u64) -> Self {
        // Derive a dedicated stream labelled "network" from the caller's seed.
        let rng = DetRng::derive(seed, "network");

        Self {
            rng,
            current_tick: 0,
            // Message state.
            in_flight: VecDeque::new(),
            delivered: HashMap::new(),
            // Topology.
            link_configs: HashMap::new(),
            partitions: HashSet::new(),
            // Bandwidth bookkeeping and statistics.
            tick_delivery_counts: HashMap::new(),
            total_sent: 0,
            total_dropped: 0,
            total_delivered: 0,
            total_corrupted: 0,
            total_throttled: 0,
        }
    }

    /// Install `config` for traffic flowing from `from` to `to`.
    ///
    /// Only this direction is affected; the reverse link keeps its own
    /// configuration. Any configuration previously set for the pair is replaced.
    pub fn set_link_config(&mut self, from: NodeId, to: NodeId, config: LinkConfig) {
        let link = (from, to);
        self.link_configs.insert(link, config);
    }

    /// Apply a copy of `config` to every directed link between two distinct
    /// nodes of `nodes`, so both directions of each pair end up identical.
    ///
    /// Existing configurations for those links are overwritten. Self-links
    /// (`a -> a`) are not configured.
    pub fn set_all_links(&mut self, nodes: &[NodeId], config: LinkConfig) {
        // Every ordered pair (src, dst) with src != dst.
        let directed_links = nodes.iter().flat_map(|&src| {
            nodes
                .iter()
                .filter(move |&&dst| dst != src)
                .map(move |&dst| (src, dst))
        });

        for link in directed_links {
            self.link_configs.insert(link, config.clone());
        }
    }

    /// Cut the connection between `a` and `b` in both directions.
    ///
    /// While the partition is active, messages between the two nodes are
    /// dropped, whichever of them is the sender.
    pub fn inject_partition(&mut self, a: NodeId, b: NodeId) {
        self.partitions.extend([(a, b), (b, a)]);
    }

    /// Restore connectivity between `a` and `b` in both directions.
    ///
    /// Directions that were not partitioned are left untouched, so calling this
    /// on a healthy pair is a no-op.
    pub fn heal_partition(&mut self, a: NodeId, b: NodeId) {
        for link in [(a, b), (b, a)] {
            self.partitions.remove(&link);
        }
    }

    /// Block traffic from `from` to `to` only.
    ///
    /// The reverse direction (`to` → `from`) is not modified by this call.
    pub fn inject_one_way_partition(&mut self, from: NodeId, to: NodeId) {
        let blocked_direction = (from, to);
        self.partitions.insert(blocked_direction);
    }

    /// Unblock traffic from `from` to `to`.
    ///
    /// A block on the reverse direction, if any, stays in place.
    pub fn heal_one_way_partition(&mut self, from: NodeId, to: NodeId) {
        let blocked_direction = (from, to);
        self.partitions.remove(&blocked_direction);
    }

    /// Heal all partitions.
    ///
    /// Removes every blocked direction, symmetric and one-way alike.
    pub fn heal_all_partitions(&mut self) {
        self.partitions.clear();
    }

    /// Submit a message into the network.
    ///
    /// The message is counted in `total_sent` and then goes through these steps:
    ///
    /// 1. If the `from → to` direction is partitioned, the message is dropped.
    /// 2. The link's `drop_probability` is rolled; on a hit the message is dropped.
    /// 3. A delivery tick is computed as `current_tick + latency_ticks`, shifted
    ///    by a random whole-tick offset in `-jitter_ticks..=jitter_ticks` when
    ///    jitter is enabled. The effective latency is never less than one tick.
    /// 4. The link's `duplicate_probability` is rolled; on a hit a second copy is
    ///    queued a few ticks after the original.
    ///
    /// Links without an explicit configuration use `LinkConfig::default()`.
    /// `LinkConfig::reorder` is not consulted here.
    ///
    /// Latency arithmetic is done in a wider integer type and saturates, so
    /// extreme settings such as `latency_ticks = u64::MAX` mean "effectively
    /// never delivered" instead of wrapping around to a one-tick latency.
    pub fn send(&mut self, from: NodeId, to: NodeId, payload: Vec<u8>) {
        self.total_sent += 1;

        // Check partition
        if self.partitions.contains(&(from, to)) {
            self.total_dropped += 1;
            return;
        }

        // Get link config
        let config = self
            .link_configs
            .get(&(from, to))
            .cloned()
            .unwrap_or_default();

        // Random drop
        if config.drop_probability > 0.0 && self.rng.next_f64() < config.drop_probability {
            self.total_dropped += 1;
            return;
        }

        // Calculate delivery time with jitter. i128 holds every intermediate
        // value (latency up to 2^64, jitter range up to 2^65) without wrapping.
        let base_latency = i128::from(config.latency_ticks);
        let jitter = if config.jitter_ticks > 0 {
            let j = i128::from(config.jitter_ticks);
            let range = 2 * j + 1;
            (self.rng.next_f64() * range as f64) as i128 - j
        } else {
            0
        };
        // The clamp makes the narrowing cast lossless: at least one tick, at
        // most u64::MAX ticks.
        let effective_latency = (base_latency + jitter).clamp(1, i128::from(u64::MAX)) as u64;
        let deliver_at = self.current_tick.saturating_add(effective_latency);

        // Check for duplication. The payload is copied only when a duplicate is
        // actually going to be queued.
        let should_duplicate = config.duplicate_probability > 0.0
            && self.rng.next_f64() < config.duplicate_probability;
        let duplicate_payload = if should_duplicate {
            Some(payload.clone())
        } else {
            None
        };

        self.in_flight.push_back(InFlightMessage {
            from,
            to,
            payload,
            deliver_at,
        });

        if let Some(payload) = duplicate_payload {
            let dup_delay = (self.rng.next_f64() * 5.0) as u64 + 1;
            self.in_flight.push_back(InFlightMessage {
                from,
                to,
                payload,
                deliver_at: deliver_at.saturating_add(dup_delay),
            });
        }
    }

    /// Advance by one tick, delivering any due messages.
    ///
    /// In-flight messages are examined in queue order. A message whose
    /// `deliver_at` is still in the future stays in flight. A due message is:
    ///
    /// 1. dropped if its `from → to` direction is partitioned *now*, even when
    ///    the partition was injected after the message was sent;
    /// 2. deferred to the next tick if the link's `bandwidth_limit` has already
    ///    been reached during this tick (each deferral bumps `total_throttled`);
    /// 3. corrupted with probability `corrupt_probability` by flipping one
    ///    randomly chosen bit of the payload;
    /// 4. appended to the recipient's inbox, to be collected with `drain`.
    ///
    /// `total_corrupted` counts only messages whose payload was actually
    /// altered: an empty payload has no bit to flip and is delivered as-is
    /// without being reported as corrupted.
    pub fn tick(&mut self) {
        self.current_tick += 1;
        self.tick_delivery_counts.clear();

        let pending = std::mem::take(&mut self.in_flight);
        let mut still_in_flight = VecDeque::new();

        for msg in pending {
            // Not due yet: keep it queued, preserving relative order.
            if msg.deliver_at > self.current_tick {
                still_in_flight.push_back(msg);
                continue;
            }

            let link = (msg.from, msg.to);

            // Check partition at delivery time
            if self.partitions.contains(&link) {
                self.total_dropped += 1;
                continue;
            }

            let config = self.link_configs.get(&link).cloned().unwrap_or_default();

            // Check bandwidth limit
            if config.bandwidth_limit > 0 {
                let count = self.tick_delivery_counts.entry(link).or_insert(0);
                if *count >= config.bandwidth_limit {
                    // Re-queue for next tick
                    still_in_flight.push_back(InFlightMessage {
                        deliver_at: self.current_tick + 1,
                        ..msg
                    });
                    self.total_throttled += 1;
                    continue;
                }
                *count += 1;
            }

            // Apply corruption. The probability roll comes first so the random
            // stream is consumed the same way whatever the payload length.
            let mut payload = msg.payload;
            if config.corrupt_probability > 0.0
                && self.rng.next_f64() < config.corrupt_probability
                && !payload.is_empty()
            {
                let idx = self.rng.next_u64_below(payload.len() as u64) as usize;
                let bit = self.rng.next_u64_below(8) as u8;
                payload[idx] ^= 1u8 << bit;
                self.total_corrupted += 1;
            }

            self.delivered
                .entry(msg.to)
                .or_default()
                .push_back((msg.from, payload));
            self.total_delivered += 1;
        }

        self.in_flight = still_in_flight;
    }

    /// Remove and return everything delivered to `node_id` so far.
    ///
    /// Messages come back in delivery order as `(sender, payload)` pairs. A node
    /// with no inbox yet, or with an empty inbox, yields an empty vector.
    pub fn drain(&mut self, node_id: NodeId) -> Vec<(NodeId, Vec<u8>)> {
        match self.delivered.get_mut(&node_id) {
            Some(inbox) => inbox.drain(..).collect(),
            None => Vec::new(),
        }
    }

    /// Current simulation tick.
    ///
    /// Starts at 0 and increases by one on every call to `tick`.
    pub fn current_tick(&self) -> u64 {
        self.current_tick
    }

    /// Number of messages currently in-flight.
    ///
    /// Counts messages accepted by `send` that `tick` has neither delivered nor
    /// dropped yet; a queued duplicate counts as its own message.
    pub fn in_flight_count(&self) -> usize {
        self.in_flight.len()
    }

    /// Stats: total messages sent.
    ///
    /// Counts every call to `send`, including messages that were dropped
    /// immediately. Duplicates generated by the network are not counted.
    pub fn total_sent(&self) -> u64 {
        self.total_sent
    }

    /// Stats: total messages dropped.
    ///
    /// Includes messages rejected by a partition at send time, messages lost to
    /// the link's drop probability, and messages discarded by `tick` because the
    /// link was partitioned at delivery time.
    pub fn total_dropped(&self) -> u64 {
        self.total_dropped
    }

    /// Stats: total messages delivered.
    ///
    /// Counts messages placed into a recipient inbox by `tick`, whether or not
    /// they have been drained. A delivered duplicate is counted separately.
    pub fn total_delivered(&self) -> u64 {
        self.total_delivered
    }

    /// Stats: total messages corrupted.
    ///
    /// Counts delivered messages that `tick` recorded as corrupted.
    pub fn total_corrupted(&self) -> u64 {
        self.total_corrupted
    }

    /// Stats: total messages throttled (delayed due to bandwidth limit).
    ///
    /// This counts deferral events, not distinct messages: a message that is
    /// pushed back on several consecutive ticks is counted once per tick.
    pub fn total_throttled(&self) -> u64 {
        self.total_throttled
    }

    /// Get all active partition pairs.
    ///
    /// Each entry is a blocked direction `(from, to)`; a symmetric partition
    /// therefore appears as two entries, `(a, b)` and `(b, a)`.
    ///
    /// The result is sorted in ascending order. The pairs are stored in a hash
    /// set whose iteration order is unspecified and can differ between runs, so
    /// sorting keeps the output reproducible for a given sequence of operations.
    pub fn partition_pairs(&self) -> Vec<(NodeId, NodeId)> {
        let mut pairs: Vec<(NodeId, NodeId)> = self.partitions.iter().copied().collect();
        pairs.sort_unstable();
        pairs
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// A default link delivers exactly one tick after the send.
    #[test]
    fn test_basic_delivery() {
        let mut network = SimNetwork::new(42);
        network.send(1, 2, b"hello".to_vec());
        // Nothing is visible to the recipient before time advances.
        assert!(network.drain(2).is_empty());

        network.tick();
        let inbox = network.drain(2);
        assert_eq!(inbox.len(), 1);
        assert_eq!(inbox[0].0, 1);
        assert_eq!(inbox[0].1, b"hello");
    }

    /// A symmetric partition drops traffic until it is healed.
    #[test]
    fn test_partition() {
        let mut network = SimNetwork::new(42);

        network.inject_partition(1, 2);
        network.send(1, 2, b"lost".to_vec());
        network.tick();
        assert!(network.drain(2).is_empty());

        network.heal_partition(1, 2);
        network.send(1, 2, b"found".to_vec());
        network.tick();
        assert_eq!(network.drain(2).len(), 1);
    }

    /// A one-way partition blocks only the configured direction.
    #[test]
    fn test_one_way_partition() {
        let mut network = SimNetwork::new(42);
        network.inject_one_way_partition(1, 2);

        network.send(1, 2, b"blocked".to_vec());
        network.send(2, 1, b"ok".to_vec());
        network.tick();

        assert!(network.drain(2).is_empty());
        assert_eq!(network.drain(1).len(), 1);
    }

    /// A message on a five-tick link shows up on the fifth tick, not earlier.
    #[test]
    fn test_latency() {
        let slow_link = LinkConfig {
            latency_ticks: 5,
            ..Default::default()
        };
        let mut network = SimNetwork::new(42);
        network.set_link_config(1, 2, slow_link);
        network.send(1, 2, b"delayed".to_vec());

        for _ in 0..4 {
            network.tick();
            assert!(network.drain(2).is_empty());
        }

        network.tick();
        assert_eq!(network.drain(2).len(), 1);
    }

    /// Two networks built from the same seed make the same drop decisions.
    #[test]
    fn test_deterministic() {
        let lossy_link = LinkConfig {
            drop_probability: 0.5,
            ..Default::default()
        };
        let mut first = SimNetwork::new(100);
        let mut second = SimNetwork::new(100);

        for network in [&mut first, &mut second] {
            network.set_link_config(1, 2, lossy_link.clone());
            for byte in 0..10u8 {
                network.send(1, 2, vec![byte]);
            }
            network.tick();
        }

        assert_eq!(first.drain(2).len(), second.drain(2).len());
    }

    /// With jitter enabled, messages sent together arrive on different ticks.
    #[test]
    fn test_jitter_varies_delivery() {
        let jittery_link = LinkConfig {
            latency_ticks: 10,
            jitter_ticks: 5,
            reorder: true,
            ..Default::default()
        };
        let mut network = SimNetwork::new(42);
        network.set_link_config(1, 2, jittery_link);

        for byte in 0..20u8 {
            network.send(1, 2, vec![byte]);
        }

        // Record, for each delivered message, the tick on which it arrived.
        let mut delivery_ticks = Vec::new();
        for tick in 1..=20 {
            network.tick();
            let arrived = network.drain(2).len();
            delivery_ticks.extend(std::iter::repeat(tick).take(arrived));
        }

        assert_eq!(delivery_ticks.len(), 20);
        let first = delivery_ticks[0];
        assert!(
            delivery_ticks.iter().any(|&t| t != first),
            "jitter should vary delivery"
        );
    }

    /// A link that always duplicates delivers two copies of a single send.
    #[test]
    fn test_duplication() {
        let duplicating_link = LinkConfig {
            duplicate_probability: 1.0,
            ..Default::default()
        };
        let mut network = SimNetwork::new(42);
        network.set_link_config(1, 2, duplicating_link);
        network.send(1, 2, b"hello".to_vec());

        // Run long enough for the delayed duplicate to arrive as well.
        for _ in 0..10 {
            network.tick();
        }

        assert_eq!(network.drain(2).len(), 2);
    }

    /// A link that always corrupts delivers an altered payload and counts it.
    #[test]
    fn test_corruption() {
        let corrupting_link = LinkConfig {
            corrupt_probability: 1.0,
            ..Default::default()
        };
        let mut network = SimNetwork::new(42);
        network.set_link_config(1, 2, corrupting_link);
        network.send(1, 2, b"hello".to_vec());
        network.tick();

        let inbox = network.drain(2);
        assert_eq!(inbox.len(), 1);
        // One bit was flipped, so the bytes can no longer match the original.
        assert_ne!(inbox[0].1, b"hello");
        assert_eq!(network.total_corrupted(), 1);
    }

    /// A two-per-tick link spreads five messages over three ticks.
    #[test]
    fn test_bandwidth_limit() {
        let capped_link = LinkConfig {
            bandwidth_limit: 2,
            ..Default::default()
        };
        let mut network = SimNetwork::new(42);
        network.set_link_config(1, 2, capped_link);

        for byte in 0..5u8 {
            network.send(1, 2, vec![byte]);
        }

        // Expected deliveries on ticks one, two and three.
        for expected in [2, 2, 1] {
            network.tick();
            assert_eq!(network.drain(2).len(), expected);
        }

        assert!(network.total_throttled() >= 3);
    }

    /// Counters reflect sends, partition drops and deliveries.
    #[test]
    fn test_stats() {
        let mut network = SimNetwork::new(42);
        network.inject_partition(1, 2);

        // Both directions between 1 and 2 are blocked; 3 → 1 is unaffected.
        network.send(1, 2, b"dropped".to_vec());
        network.send(2, 1, b"also_dropped".to_vec());
        network.send(3, 1, b"ok".to_vec());
        network.tick();

        assert_eq!(network.total_sent(), 3);
        assert_eq!(network.total_dropped(), 2);
        assert_eq!(network.total_delivered(), 1);
    }
}