1use std::collections::{HashMap, HashSet, VecDeque};
7use vortex_core::{DetRng, NodeId};
8
/// Behaviour of a single directed link `(from, to)` in the simulated network.
///
/// Links without an explicit configuration use [`LinkConfig::default`].
#[derive(Debug, Clone, PartialEq)]
pub struct LinkConfig {
    /// Base delivery delay in ticks. The effective delay is clamped to at least one tick.
    pub latency_ticks: u64,
    /// Probability that a message is dropped when it is sent. `0.0` disables dropping.
    pub drop_probability: f64,
    /// Maximum number of ticks by which the delay may deviate (in either direction) from
    /// `latency_ticks`. `0` disables jitter.
    pub jitter_ticks: u64,
    /// Whether messages on this link may be reordered.
    ///
    /// NOTE(review): nothing in this file reads this flag; confirm where (or whether) it is
    /// honoured.
    pub reorder: bool,
    /// Probability that a second copy of a sent message is queued shortly after the original.
    /// `0.0` disables duplication.
    pub duplicate_probability: f64,
    /// Probability that a single bit of the payload is flipped at delivery time.
    /// `0.0` disables corruption.
    pub corrupt_probability: f64,
    /// Maximum number of messages delivered over this link per tick. `0` means unlimited.
    pub bandwidth_limit: u64,
}
27
28impl Default for LinkConfig {
29 fn default() -> Self {
30 Self {
31 latency_ticks: 1,
32 drop_probability: 0.0,
33 jitter_ticks: 0,
34 reorder: false,
35 duplicate_probability: 0.0,
36 corrupt_probability: 0.0,
37 bandwidth_limit: 0,
38 }
39 }
40}
41
42#[derive(Debug, Clone)]
44pub struct InFlightMessage {
45 pub from: NodeId,
47 pub to: NodeId,
49 pub payload: Vec<u8>,
51 pub deliver_at: u64,
53}
54
55pub struct SimNetwork {
60 current_tick: u64,
61 in_flight: VecDeque<InFlightMessage>,
62 link_configs: HashMap<(NodeId, NodeId), LinkConfig>,
63 partitions: HashSet<(NodeId, NodeId)>,
64 delivered: HashMap<NodeId, VecDeque<(NodeId, Vec<u8>)>>,
65 rng: DetRng,
66 total_sent: u64,
67 total_dropped: u64,
68 total_delivered: u64,
69 total_corrupted: u64,
70 total_throttled: u64,
71 tick_delivery_counts: HashMap<(NodeId, NodeId), u64>,
73}
74
75impl SimNetwork {
76 pub fn new(seed: u64) -> Self {
78 Self {
79 current_tick: 0,
80 in_flight: VecDeque::new(),
81 link_configs: HashMap::new(),
82 partitions: HashSet::new(),
83 delivered: HashMap::new(),
84 rng: DetRng::derive(seed, "network"),
85 total_sent: 0,
86 total_dropped: 0,
87 total_delivered: 0,
88 total_corrupted: 0,
89 total_throttled: 0,
90 tick_delivery_counts: HashMap::new(),
91 }
92 }
93
94 pub fn set_link_config(&mut self, from: NodeId, to: NodeId, config: LinkConfig) {
96 self.link_configs.insert((from, to), config);
97 }
98
99 pub fn set_all_links(&mut self, nodes: &[NodeId], config: LinkConfig) {
101 for &a in nodes {
102 for &b in nodes {
103 if a != b {
104 self.link_configs.insert((a, b), config.clone());
105 }
106 }
107 }
108 }
109
110 pub fn inject_partition(&mut self, a: NodeId, b: NodeId) {
112 self.partitions.insert((a, b));
113 self.partitions.insert((b, a));
114 }
115
116 pub fn heal_partition(&mut self, a: NodeId, b: NodeId) {
118 self.partitions.remove(&(a, b));
119 self.partitions.remove(&(b, a));
120 }
121
122 pub fn inject_one_way_partition(&mut self, from: NodeId, to: NodeId) {
124 self.partitions.insert((from, to));
125 }
126
127 pub fn heal_one_way_partition(&mut self, from: NodeId, to: NodeId) {
129 self.partitions.remove(&(from, to));
130 }
131
132 pub fn heal_all_partitions(&mut self) {
134 self.partitions.clear();
135 }
136
137 pub fn send(&mut self, from: NodeId, to: NodeId, payload: Vec<u8>) {
139 self.total_sent += 1;
140
141 if self.partitions.contains(&(from, to)) {
143 self.total_dropped += 1;
144 return;
145 }
146
147 let config = self
149 .link_configs
150 .get(&(from, to))
151 .cloned()
152 .unwrap_or_default();
153
154 if config.drop_probability > 0.0 && self.rng.next_f64() < config.drop_probability {
156 self.total_dropped += 1;
157 return;
158 }
159
160 let base_latency = config.latency_ticks as i64;
162 let jitter = if config.jitter_ticks > 0 {
163 let j = config.jitter_ticks as i64;
164 let range = 2 * j + 1;
165 (self.rng.next_f64() * range as f64) as i64 - j
166 } else {
167 0
168 };
169 let effective_latency = (base_latency + jitter).max(1) as u64;
170 let deliver_at = self.current_tick + effective_latency;
171
172 let should_duplicate = config.duplicate_probability > 0.0
174 && self.rng.next_f64() < config.duplicate_probability;
175
176 self.in_flight.push_back(InFlightMessage {
177 from,
178 to,
179 payload: payload.clone(),
180 deliver_at,
181 });
182
183 if should_duplicate {
184 let dup_delay = (self.rng.next_f64() * 5.0) as u64 + 1;
185 self.in_flight.push_back(InFlightMessage {
186 from,
187 to,
188 payload,
189 deliver_at: deliver_at + dup_delay,
190 });
191 }
192 }
193
194 pub fn tick(&mut self) {
196 self.current_tick += 1;
197 self.tick_delivery_counts.clear();
198
199 let mut still_in_flight = VecDeque::new();
200 while let Some(msg) = self.in_flight.pop_front() {
201 if msg.deliver_at <= self.current_tick {
202 if self.partitions.contains(&(msg.from, msg.to)) {
204 self.total_dropped += 1;
205 continue;
206 }
207
208 let config = self
210 .link_configs
211 .get(&(msg.from, msg.to))
212 .cloned()
213 .unwrap_or_default();
214 if config.bandwidth_limit > 0 {
215 let count = self
216 .tick_delivery_counts
217 .entry((msg.from, msg.to))
218 .or_insert(0);
219 if *count >= config.bandwidth_limit {
220 still_in_flight.push_back(InFlightMessage {
222 deliver_at: self.current_tick + 1,
223 ..msg
224 });
225 self.total_throttled += 1;
226 continue;
227 }
228 *count += 1;
229 }
230
231 let payload = if config.corrupt_probability > 0.0
233 && self.rng.next_f64() < config.corrupt_probability
234 {
235 let mut corrupted = msg.payload;
236 if !corrupted.is_empty() {
237 let idx = self.rng.next_u64_below(corrupted.len() as u64) as usize;
238 corrupted[idx] ^= 1u8 << (self.rng.next_u64_below(8) as u8);
239 }
240 self.total_corrupted += 1;
241 corrupted
242 } else {
243 msg.payload
244 };
245
246 self.delivered
247 .entry(msg.to)
248 .or_default()
249 .push_back((msg.from, payload));
250 self.total_delivered += 1;
251 } else {
252 still_in_flight.push_back(msg);
253 }
254 }
255 self.in_flight = still_in_flight;
256 }
257
258 pub fn drain(&mut self, node_id: NodeId) -> Vec<(NodeId, Vec<u8>)> {
260 self.delivered
261 .get_mut(&node_id)
262 .map(|q| q.drain(..).collect())
263 .unwrap_or_default()
264 }
265
266 pub fn current_tick(&self) -> u64 {
268 self.current_tick
269 }
270
271 pub fn in_flight_count(&self) -> usize {
273 self.in_flight.len()
274 }
275
276 pub fn total_sent(&self) -> u64 {
278 self.total_sent
279 }
280
281 pub fn total_dropped(&self) -> u64 {
283 self.total_dropped
284 }
285
286 pub fn total_delivered(&self) -> u64 {
288 self.total_delivered
289 }
290
291 pub fn total_corrupted(&self) -> u64 {
293 self.total_corrupted
294 }
295
296 pub fn total_throttled(&self) -> u64 {
298 self.total_throttled
299 }
300
301 pub fn partition_pairs(&self) -> Vec<(NodeId, NodeId)> {
303 self.partitions.iter().copied().collect()
304 }
305}
306
#[cfg(test)]
mod tests {
    use super::*;

    /// A message on a default link is not visible before the first tick and arrives on it.
    #[test]
    fn test_basic_delivery() {
        let mut net = SimNetwork::new(42);
        net.send(1, 2, b"hello".to_vec());
        assert!(net.drain(2).is_empty());

        net.tick();
        let msgs = net.drain(2);
        assert_eq!(msgs.len(), 1);
        assert_eq!(msgs[0].0, 1);
        assert_eq!(msgs[0].1, b"hello");
    }

    /// A two-way partition drops traffic until it is healed.
    #[test]
    fn test_partition() {
        let mut net = SimNetwork::new(42);
        net.inject_partition(1, 2);
        net.send(1, 2, b"lost".to_vec());
        net.tick();
        assert!(net.drain(2).is_empty());

        net.heal_partition(1, 2);
        net.send(1, 2, b"found".to_vec());
        net.tick();
        assert_eq!(net.drain(2).len(), 1);
    }

    /// A one-way partition blocks only the configured direction.
    #[test]
    fn test_one_way_partition() {
        let mut net = SimNetwork::new(42);
        net.inject_one_way_partition(1, 2);
        net.send(1, 2, b"blocked".to_vec());
        net.send(2, 1, b"ok".to_vec());
        net.tick();
        assert!(net.drain(2).is_empty());
        assert_eq!(net.drain(1).len(), 1);
    }

    /// A message with latency 5 arrives on the fifth tick and not before.
    #[test]
    fn test_latency() {
        let mut net = SimNetwork::new(42);
        net.set_link_config(
            1,
            2,
            LinkConfig {
                latency_ticks: 5,
                ..Default::default()
            },
        );
        net.send(1, 2, b"delayed".to_vec());
        for _ in 0..4 {
            net.tick();
            assert!(net.drain(2).is_empty());
        }
        net.tick();
        assert_eq!(net.drain(2).len(), 1);
    }

    /// Two networks built from the same seed make identical drop decisions.
    #[test]
    fn test_deterministic() {
        let mut net1 = SimNetwork::new(100);
        let mut net2 = SimNetwork::new(100);
        let config = LinkConfig {
            drop_probability: 0.5,
            ..Default::default()
        };
        for net in [&mut net1, &mut net2] {
            net.set_link_config(1, 2, config.clone());
            for i in 0..10 {
                net.send(1, 2, vec![i]);
            }
            net.tick();
        }

        // Compare the messages themselves, not just how many arrived: equal counts alone
        // would not catch two runs that dropped different messages.
        let delivered1 = net1.drain(2);
        let delivered2 = net2.drain(2);
        assert_eq!(delivered1, delivered2);
        assert_eq!(net1.total_dropped(), net2.total_dropped());
        assert_eq!(net1.total_delivered(), net2.total_delivered());
    }

    /// With jitter enabled, a burst of messages does not all arrive on the same tick.
    #[test]
    fn test_jitter_varies_delivery() {
        let mut net = SimNetwork::new(42);
        net.set_link_config(
            1,
            2,
            LinkConfig {
                latency_ticks: 10,
                jitter_ticks: 5,
                reorder: true,
                ..Default::default()
            },
        );
        for i in 0..20u8 {
            net.send(1, 2, vec![i]);
        }

        let mut delivery_ticks = Vec::new();
        for tick in 1..=20 {
            net.tick();
            let count = net.drain(2).len();
            for _ in 0..count {
                delivery_ticks.push(tick);
            }
        }

        assert_eq!(delivery_ticks.len(), 20);
        let first = delivery_ticks[0];
        assert!(
            delivery_ticks.iter().any(|&t| t != first),
            "jitter should vary delivery"
        );
    }

    /// With duplication probability 1.0 every message is delivered twice.
    #[test]
    fn test_duplication() {
        let mut net = SimNetwork::new(42);
        net.set_link_config(
            1,
            2,
            LinkConfig {
                duplicate_probability: 1.0,
                ..Default::default()
            },
        );
        net.send(1, 2, b"hello".to_vec());
        for _ in 0..10 {
            net.tick();
        }
        assert_eq!(net.drain(2).len(), 2);
    }

    /// With corruption probability 1.0 the delivered payload differs from what was sent.
    #[test]
    fn test_corruption() {
        let mut net = SimNetwork::new(42);
        net.set_link_config(
            1,
            2,
            LinkConfig {
                corrupt_probability: 1.0,
                ..Default::default()
            },
        );
        net.send(1, 2, b"hello".to_vec());
        net.tick();

        let msgs = net.drain(2);
        assert_eq!(msgs.len(), 1);
        assert_ne!(msgs[0].1, b"hello");
        assert_eq!(net.total_corrupted(), 1);
    }

    /// A bandwidth limit of 2 spreads five messages over three ticks (2 + 2 + 1).
    #[test]
    fn test_bandwidth_limit() {
        let mut net = SimNetwork::new(42);
        net.set_link_config(
            1,
            2,
            LinkConfig {
                bandwidth_limit: 2,
                ..Default::default()
            },
        );
        for i in 0..5u8 {
            net.send(1, 2, vec![i]);
        }

        net.tick();
        assert_eq!(net.drain(2).len(), 2);
        net.tick();
        assert_eq!(net.drain(2).len(), 2);
        net.tick();
        assert_eq!(net.drain(2).len(), 1);
        assert!(net.total_throttled() >= 3);
    }

    /// Counters reflect sent, dropped and delivered messages.
    #[test]
    fn test_stats() {
        let mut net = SimNetwork::new(42);
        net.inject_partition(1, 2);
        net.send(1, 2, b"dropped".to_vec());
        net.send(2, 1, b"also_dropped".to_vec());
        net.send(3, 1, b"ok".to_vec());
        net.tick();

        assert_eq!(net.total_sent(), 3);
        assert_eq!(net.total_dropped(), 2);
        assert_eq!(net.total_delivered(), 1);
    }
}