use std::collections::VecDeque;
pub struct SimMessage {
pub from_shard: u16,
pub to_shard: u16,
pub payload: Vec<u8>,
pub deliver_at_ns: u64,
}
pub struct SimNetwork {
in_flight: VecDeque<SimMessage>,
min_delay_ns: u64,
max_delay_ns: u64,
drop_rate: f64,
partition: Option<(u16, u16)>,
delivered: Vec<SimMessage>,
}
impl SimNetwork {
pub fn new(min_delay_ns: u64, max_delay_ns: u64) -> Self {
assert!(
min_delay_ns <= max_delay_ns,
"min_delay_ns must be <= max_delay_ns"
);
Self {
in_flight: VecDeque::new(),
min_delay_ns,
max_delay_ns,
drop_rate: 0.0,
partition: None,
delivered: Vec::new(),
}
}
pub fn send(
&mut self,
from: u16,
to: u16,
payload: Vec<u8>,
now_ns: u64,
rng: &mut impl FnMut() -> u64,
) {
if self.is_partitioned(from, to) {
return;
}
let drop_threshold = (self.drop_rate * u64::MAX as f64) as u64;
if drop_threshold > 0 && rng() < drop_threshold {
return;
}
let delay = if self.min_delay_ns == self.max_delay_ns {
self.min_delay_ns
} else {
let range = self.max_delay_ns - self.min_delay_ns;
self.min_delay_ns + rng() % range
};
self.in_flight.push_back(SimMessage {
from_shard: from,
to_shard: to,
payload,
deliver_at_ns: now_ns + delay,
});
}
pub fn deliver_until(&mut self, now_ns: u64) -> Vec<SimMessage> {
self.delivered.clear();
let mut remaining = VecDeque::new();
while let Some(msg) = self.in_flight.pop_front() {
if msg.deliver_at_ns <= now_ns {
self.delivered.push(msg);
} else {
remaining.push_back(msg);
}
}
self.in_flight = remaining;
std::mem::take(&mut self.delivered)
}
pub fn set_drop_rate(&mut self, rate: f64) {
assert!(
(0.0..=1.0).contains(&rate),
"drop rate must be between 0.0 and 1.0"
);
self.drop_rate = rate;
}
pub fn partition(&mut self, shard_a: u16, shard_b: u16) {
self.partition = Some((shard_a, shard_b));
}
pub fn heal_partition(&mut self) {
self.partition = None;
}
pub fn in_flight_count(&self) -> usize {
self.in_flight.len()
}
fn is_partitioned(&self, from: u16, to: u16) -> bool {
match self.partition {
Some((a, b)) => (from == a && to == b) || (from == b && to == a),
None => false,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
fn fixed_rng(value: u64) -> impl FnMut() -> u64 {
move || value
}
#[test]
fn send_and_deliver() {
let mut net = SimNetwork::new(100, 200);
net.send(0, 1, vec![1, 2, 3], 1000, &mut fixed_rng(50));
let delivered = net.deliver_until(1000);
assert!(delivered.is_empty());
let delivered = net.deliver_until(1200);
assert_eq!(delivered.len(), 1);
assert_eq!(delivered[0].from_shard, 0);
assert_eq!(delivered[0].to_shard, 1);
assert_eq!(delivered[0].payload, vec![1, 2, 3]);
}
#[test]
fn partition_drops_messages() {
let mut net = SimNetwork::new(0, 0);
net.partition(0, 1);
net.send(0, 1, vec![1], 0, &mut fixed_rng(0));
net.send(1, 0, vec![2], 0, &mut fixed_rng(0));
net.send(0, 2, vec![3], 0, &mut fixed_rng(0));
let delivered = net.deliver_until(u64::MAX);
assert_eq!(delivered.len(), 1);
assert_eq!(delivered[0].payload, vec![3]);
}
#[test]
fn heal_partition_restores() {
let mut net = SimNetwork::new(0, 0);
net.partition(0, 1);
net.send(0, 1, vec![1], 0, &mut fixed_rng(0));
assert_eq!(net.in_flight_count(), 0);
net.heal_partition();
net.send(0, 1, vec![2], 0, &mut fixed_rng(0));
assert_eq!(net.in_flight_count(), 1);
}
#[test]
fn drop_rate_drops_messages() {
let mut net = SimNetwork::new(0, 0);
net.set_drop_rate(1.0);
net.send(0, 1, vec![1], 0, &mut fixed_rng(0));
assert_eq!(net.in_flight_count(), 0);
}
#[test]
fn zero_drop_rate_delivers_all() {
let mut net = SimNetwork::new(0, 0);
net.set_drop_rate(0.0);
for i in 0..10u8 {
net.send(0, 1, vec![i], 0, &mut fixed_rng(u64::MAX));
}
assert_eq!(net.in_flight_count(), 10);
}
#[test]
fn delivery_ordering() {
let mut net = SimNetwork::new(0, 1000);
let delays = [100u64, 500, 300];
let mut idx = 0;
let mut delay_rng = || {
let val = delays[idx];
idx += 1;
val
};
net.send(0, 1, vec![1], 0, &mut delay_rng);
net.send(0, 1, vec![2], 0, &mut delay_rng);
net.send(0, 1, vec![3], 0, &mut delay_rng);
assert_eq!(net.in_flight_count(), 3);
let msgs = net.deliver_until(150);
assert_eq!(msgs.len(), 1);
assert_eq!(msgs[0].payload, vec![1]);
let msgs = net.deliver_until(400);
assert_eq!(msgs.len(), 1);
assert_eq!(msgs[0].payload, vec![3]);
let msgs = net.deliver_until(600);
assert_eq!(msgs.len(), 1);
assert_eq!(msgs[0].payload, vec![2]);
}
#[test]
fn in_flight_count_tracks() {
let mut net = SimNetwork::new(100, 100);
assert_eq!(net.in_flight_count(), 0);
net.send(0, 1, vec![1], 0, &mut fixed_rng(0));
net.send(0, 1, vec![2], 0, &mut fixed_rng(0));
assert_eq!(net.in_flight_count(), 2);
net.deliver_until(100);
assert_eq!(net.in_flight_count(), 0);
}
}