mod latency;
mod node;
mod topology;
mod traced;
pub use latency::{
FixedLatency, LatencyModel, OverheadPlusJitter, PercentageJitter, SpikyLatency, UniformJitter,
};
pub use node::{MessageId, NodeId};
pub use topology::{Link, Route, Topology, TopologyBuilder};
pub use traced::{NetTraceDropReason, NetTraceEvent, TracedNetwork};
/// Admission policy applied to a buffered link when a sized message arrives.
///
/// The default is [`DropPolicy::TailDrop`].
#[derive(Debug, Clone, Copy, PartialEq, Default)]
pub enum DropPolicy {
    /// No early drops: a message is rejected only by the buffer-capacity check.
    #[default]
    TailDrop,
    /// Random Early Detection: the drop probability ramps linearly from zero
    /// at `min_bytes` of backlog up to `max_prob` just below `max_bytes`, and
    /// every arrival is dropped once the backlog reaches `max_bytes`.
    RandomEarlyDetection {
        /// Backlog (in bytes) below which nothing is dropped early.
        min_bytes: u64,
        /// Backlog (in bytes) at or above which every arrival is dropped.
        max_bytes: u64,
        /// Drop probability approached at the top of the ramp.
        max_prob: f64,
    },
}

impl DropPolicy {
    /// Builds a validated [`DropPolicy::RandomEarlyDetection`] policy.
    ///
    /// `buffer_bytes` is used only to validate the thresholds; it is not
    /// stored in the returned policy.
    ///
    /// # Panics
    ///
    /// Panics if `min_bytes > max_bytes`, if `max_bytes > buffer_bytes`, or if
    /// `max_prob` is not in `(0.0, 1.0]` (NaN is rejected as well).
    pub fn red(min_bytes: u64, max_bytes: u64, buffer_bytes: u64, max_prob: f64) -> Self {
        // The thresholds must be ordered: min <= max <= buffer.
        assert!(
            min_bytes <= max_bytes,
            "DropPolicy::red: min_bytes ({min_bytes}) must be <= max_bytes ({max_bytes})"
        );
        assert!(
            max_bytes <= buffer_bytes,
            "DropPolicy::red: max_bytes ({max_bytes}) must be <= buffer_bytes ({buffer_bytes})"
        );
        // Both comparisons are false for NaN, so NaN fails this assertion.
        assert!(
            0.0 < max_prob && max_prob <= 1.0,
            "DropPolicy::red: max_prob ({max_prob}) must be in (0.0, 1.0]"
        );
        Self::RandomEarlyDetection {
            min_bytes,
            max_bytes,
            max_prob,
        }
    }
}
use crate::rng::SimRng;
use crate::sim::Simulation;
use crate::time::{Duration, Time};
/// A payload travelling from one node to another, tagged with an identifier.
#[derive(Debug, Clone)]
pub struct Message<P> {
    /// Identifier of this message.
    pub id: MessageId,
    /// Node the message was sent from.
    pub src: NodeId,
    /// Node the message is addressed to.
    pub dst: NodeId,
    /// Caller-defined content carried by the message.
    pub payload: P,
}

impl<P> Message<P> {
    /// Assembles a message from its parts; no validation is performed.
    pub fn new(id: MessageId, src: NodeId, dst: NodeId, payload: P) -> Self {
        Self {
            id,
            src,
            dst,
            payload,
        }
    }
}
/// Event carried by the underlying simulation queue and handed to the
/// handler passed to `Network::run`.
#[derive(Debug, Clone)]
pub enum NetEvent<P> {
/// The message reached its destination.
Deliver(Message<P>),
/// The message was discarded; `reason` says why.
Drop {
/// The message that was discarded.
message: Message<P>,
/// Why the message was discarded.
reason: DropReason,
},
}
/// Why the network discarded a message instead of delivering it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DropReason {
/// The topology returned no route from the source to the destination.
NoRoute,
/// The random packet-loss roll (`NetConfig::packet_loss`) hit.
PacketLoss,
/// The source/destination pair is currently in the partition set.
Partitioned,
/// A bandwidth-limited link on the path rejected the message, either
/// because its buffer would overflow or because its drop policy chose to
/// drop it.
BufferOverflow,
}
/// Tunable behaviour of a `Network`.
///
/// The derived `Default` yields `packet_loss: 0.0` and
/// `drop_in_flight_on_failure: false`, i.e. a lossless network that leaves
/// already-scheduled deliveries alone when a failure is injected.
#[derive(Debug, Clone, Default)]
pub struct NetConfig {
    /// Probability that a send is dropped with `DropReason::PacketLoss`.
    /// The roll is skipped entirely when this is not greater than `0.0`.
    pub packet_loss: f64,
    /// When `true`, injecting a partition or a link/node failure rewrites
    /// queued deliveries that are now partitioned or unroutable into drops.
    pub drop_in_flight_on_failure: bool,
}
/// A simulated message-passing network layered on a discrete-event
/// `Simulation`, parameterised by the payload type `P` and a latency model
/// `L` (defaulting to `FixedLatency`).
#[derive(Debug)]
pub struct Network<P, L: LatencyModel = FixedLatency> {
// Event queue holding pending `NetEvent`s.
sim: Simulation<NetEvent<P>>,
// Nodes, links and routing.
topology: Topology,
// Turns a route's total latency into the latency actually applied.
latency_model: L,
// Randomness source for packet loss, the latency model and RED drops.
rng: SimRng,
// Next id to hand out; also the count of ids allocated so far.
next_message_id: u64,
config: NetConfig,
// Partitioned pairs, stored in the normalised order produced by `partition_key`.
partitions: std::collections::HashSet<(NodeId, NodeId)>,
// Per-(src, dst) endpoint-pair bandwidth caps; the key is directional.
bandwidths: std::collections::HashMap<(NodeId, NodeId), BandwidthState>,
// Per-hop (from, to) time at which that link finishes its queued transmissions.
link_busy: std::collections::HashMap<(NodeId, NodeId), Time>,
}
/// Bandwidth cap and serialisation state for one (src, dst) endpoint pair.
#[derive(Debug, Clone, Copy)]
struct BandwidthState {
// Rate passed to `transmission_duration` together with a size in bytes.
bps: u64,
// Time at which the most recently queued sized send finishes transmitting.
link_free_at: Time,
}
/// Time needed to push `size_bytes` through a pipe running at `bps`.
///
/// The result is `size_bytes * 1e9 / bps` nanoseconds, rounded up, so `bps`
/// is effectively a rate in bytes per second. A zero rate or a zero size
/// yields `Duration::ZERO`.
///
/// The multiplication is carried out in `u128` so large payloads (more than
/// roughly 18.4 GB) no longer saturate the intermediate product and produce
/// a too-short duration; only the final nanosecond count saturates at
/// `u64::MAX`.
fn transmission_duration(bps: u64, size_bytes: u64) -> Duration {
    if bps == 0 || size_bytes == 0 {
        return Duration::ZERO;
    }
    // u64 * 1e9 always fits in u128, so this product cannot overflow.
    let nanos = (u128::from(size_bytes) * 1_000_000_000u128).div_ceil(u128::from(bps));
    Duration::from_nanos(u64::try_from(nanos).unwrap_or(u64::MAX))
}
/// Rewrites the simulation queue after a failure has been injected.
///
/// Every queued `Deliver` whose endpoints are now partitioned, or for which
/// the topology no longer has a route, becomes a `Drop` scheduled at `now`.
/// The partition check takes precedence over the route check. All other
/// events keep their original time.
fn sweep_failure_drops<P>(
    sim: &mut Simulation<NetEvent<P>>,
    topology: &mut Topology,
    partitions: &std::collections::HashSet<(NodeId, NodeId)>,
    now: Time,
) {
    sim.rewrite_queue(|scheduled| {
        let original_time = scheduled.time;
        match scheduled.event {
            NetEvent::Deliver(msg) => {
                // Only consult the topology when the pair is not partitioned.
                let doomed = if partitions.contains(&partition_key(msg.src, msg.dst)) {
                    Some(DropReason::Partitioned)
                } else if topology.route(msg.src, msg.dst).is_none() {
                    Some(DropReason::NoRoute)
                } else {
                    None
                };
                Some(match doomed {
                    Some(reason) => (
                        now,
                        NetEvent::Drop {
                            message: msg,
                            reason,
                        },
                    ),
                    None => (original_time, NetEvent::Deliver(msg)),
                })
            }
            other => Some((original_time, other)),
        }
    });
}
/// Normalises an unordered node pair so the node with the smaller `as_u32()`
/// value comes first, making `(a, b)` and `(b, a)` map to the same key.
fn partition_key(a: NodeId, b: NodeId) -> (NodeId, NodeId) {
    if b.as_u32() < a.as_u32() {
        (b, a)
    } else {
        (a, b)
    }
}
/// Estimates how many bytes are still queued on a link at time `now`.
///
/// The backlog is the time remaining until `busy_until`, converted back to
/// bytes at rate `bps` (rounded down). Returns `0` when the link has no rate
/// or is already idle, and saturates at `u64::MAX`.
fn pending_bytes_at(bps: u64, busy_until: Time, now: Time) -> u64 {
    if bps == 0 || busy_until <= now {
        return 0;
    }
    const NANOS_PER_SEC: u128 = 1_000_000_000;
    let backlog_ns = busy_until.as_nanos().saturating_sub(now.as_nanos());
    // Widen before multiplying so the product cannot overflow.
    let backlog_bytes = (backlog_ns as u128 * u128::from(bps)) / NANOS_PER_SEC;
    u64::try_from(backlog_bytes).unwrap_or(u64::MAX)
}
/// Result of walking a message along its path through bandwidth-limited links.
#[derive(Debug, Clone, Copy)]
enum PerLinkOutcome {
/// The message was accepted; the payload is the extra delay contributed by
/// transmission and queueing (link latencies excluded).
Admitted(Duration),
/// A link on the path rejected the message.
Overflow,
}
/// Decides whether `policy` drops an arrival that finds `pending` bytes of
/// backlog on the link.
///
/// `TailDrop` never drops here. For RED, a backlog at or above `max_bytes`
/// always drops, a backlog below `min_bytes` never does, and in between the
/// drop probability scales linearly up to `max_prob`. The RNG is consulted
/// only on that ramp.
fn policy_wants_drop(policy: DropPolicy, pending: u64, rng: &mut SimRng) -> bool {
    match policy {
        DropPolicy::TailDrop => false,
        DropPolicy::RandomEarlyDetection {
            min_bytes: floor,
            max_bytes: ceiling,
            max_prob,
        } => {
            if pending >= ceiling {
                return true;
            }
            // The equality test also protects the division below from a zero span.
            if pending < floor || ceiling == floor {
                return false;
            }
            let fraction = (pending - floor) as f64 / (ceiling - floor) as f64;
            rng.bool(max_prob * fraction)
        }
    }
}
/// Walks a sized message hop by hop from `src` to `dst`, charging
/// transmission and queueing time on every bandwidth-limited link.
///
/// For each hop that has a bandwidth, the message departs once it has arrived
/// and the link is free, occupies the link for its transmission time, and
/// then travels the link latency. Hops that also have a buffer first consult
/// the drop policy and then the buffer-capacity check; either may reject the
/// message.
///
/// Link reservations are recorded in `link_busy` only when the whole path
/// admits the message, so a rejected message consumes no capacity. If routing
/// stops yielding a next hop mid-walk, the function returns
/// `Admitted(Duration::ZERO)` without recording reservations.
///
/// Returns the elapsed walk time minus the accumulated link latencies, i.e.
/// only the delay added by bandwidth.
fn walk_per_link_bandwidth(
    topology: &mut Topology,
    link_busy: &mut std::collections::HashMap<(NodeId, NodeId), Time>,
    rng: &mut SimRng,
    src: NodeId,
    dst: NodeId,
    size_bytes: u64,
    arrival_at_src_now: Time,
) -> PerLinkOutcome {
    let mut node = src;
    let mut clock = arrival_at_src_now;
    let mut propagation_total = Duration::ZERO;
    // Reservations are staged here and committed only on full admission.
    let mut reservations: Vec<((NodeId, NodeId), Time)> = Vec::new();

    while node != dst {
        let next = match topology.route(node, dst) {
            Some(route) if route.hop_count > 0 => route.next_hop,
            _ => return PerLinkOutcome::Admitted(Duration::ZERO),
        };
        let hop = (node, next);
        let link = topology.link_between(node, next).copied();
        let propagation = link.map(|l| l.latency).unwrap_or(Duration::ZERO);

        match link.and_then(|l| l.bandwidth) {
            Some(bps) => {
                let busy_until = link_busy.get(&hop).copied().unwrap_or(Time::ZERO);
                if let Some(capacity) = link.and_then(|l| l.buffer_bytes) {
                    let policy = link.map(|l| l.drop_policy).unwrap_or_default();
                    let queued = pending_bytes_at(bps, busy_until, clock);
                    // The policy is asked first; the capacity check runs only
                    // if the policy did not already drop the message.
                    if policy_wants_drop(policy, queued, rng)
                        || queued.saturating_add(size_bytes) > capacity
                    {
                        return PerLinkOutcome::Overflow;
                    }
                }
                let free_at = clock.max(busy_until) + transmission_duration(bps, size_bytes);
                reservations.push((hop, free_at));
                clock = free_at + propagation;
            }
            None => clock += propagation,
        }

        propagation_total += propagation;
        node = next;
    }

    link_busy.extend(reservations);
    let elapsed = clock.saturating_duration_since(arrival_at_src_now);
    PerLinkOutcome::Admitted(elapsed.saturating_sub(propagation_total))
}
impl<P> Network<P, FixedLatency> {
    /// Creates a network over `topology` that uses `FixedLatency`, with its
    /// RNG seeded from `seed`, default configuration, and no partitions or
    /// bandwidth caps.
    pub fn new(topology: Topology, seed: u64) -> Self {
        Self::with_latency_model(topology, FixedLatency, seed)
    }
}
impl<P, L: LatencyModel> Network<P, L> {
pub fn with_latency_model(topology: Topology, latency_model: L, seed: u64) -> Self {
Network {
sim: Simulation::new(),
topology,
latency_model,
rng: SimRng::new(seed),
next_message_id: 0,
config: NetConfig::default(),
partitions: std::collections::HashSet::new(),
bandwidths: std::collections::HashMap::new(),
link_busy: std::collections::HashMap::new(),
}
}
/// Sets or clears the bandwidth cap for sends from `src` to `dst`.
///
/// A `bps` of `0` removes the cap together with its serialisation state.
/// Otherwise the rate is updated in place, keeping the existing
/// `link_free_at`, or a fresh entry is created that is free from
/// `Time::ZERO`. The cap is directional: `(dst, src)` is unaffected.
pub fn set_bandwidth(&mut self, src: NodeId, dst: NodeId, bps: u64) {
    let pair = (src, dst);
    if bps == 0 {
        self.bandwidths.remove(&pair);
        return;
    }
    match self.bandwidths.get_mut(&pair) {
        Some(state) => state.bps = bps,
        None => {
            self.bandwidths.insert(
                pair,
                BandwidthState {
                    bps,
                    link_free_at: Time::ZERO,
                },
            );
        }
    }
}
/// Returns the bandwidth cap configured for sends from `src` to `dst`, or
/// `None` when that pair has no cap.
pub fn bandwidth(&self, src: NodeId, dst: NodeId) -> Option<u64> {
    let state = self.bandwidths.get(&(src, dst))?;
    Some(state.bps)
}
/// Partitions `a` and `b` from each other (order-insensitive), then sweeps
/// in-flight messages if `drop_in_flight_on_failure` is enabled.
pub fn partition(&mut self, a: NodeId, b: NodeId) {
self.partitions.insert(partition_key(a, b));
self.maybe_sweep_in_flight();
}
/// Removes the partition between `a` and `b` (order-insensitive). Events
/// already in the queue are not touched.
pub fn heal(&mut self, a: NodeId, b: NodeId) {
self.partitions.remove(&partition_key(a, b));
}
/// Converts queued deliveries that are now partitioned or unroutable into
/// drops at the current simulation time, but only when
/// `drop_in_flight_on_failure` is enabled.
fn maybe_sweep_in_flight(&mut self) {
    if !self.config.drop_in_flight_on_failure {
        return;
    }
    let now = self.sim.now();
    sweep_failure_drops(&mut self.sim, &mut self.topology, &self.partitions, now);
}
/// Returns `true` if `a` and `b` are currently partitioned, in either order.
pub fn is_partitioned(&self, a: NodeId, b: NodeId) -> bool {
self.partitions.contains(&partition_key(a, b))
}
/// Marks the link between `a` and `b` as failed via `Topology::fail_link`,
/// then sweeps in-flight messages if `drop_in_flight_on_failure` is enabled.
pub fn fail_link(&mut self, a: NodeId, b: NodeId) {
self.topology.fail_link(a, b);
self.maybe_sweep_in_flight();
}
/// Restores the link between `a` and `b` via `Topology::heal_link`. Events
/// already in the queue are not touched.
pub fn heal_link(&mut self, a: NodeId, b: NodeId) {
self.topology.heal_link(a, b);
}
/// Reports whether the topology considers the link between `a` and `b`
/// failed (delegates to `Topology::is_link_failed`).
pub fn is_link_failed(&self, a: NodeId, b: NodeId) -> bool {
self.topology.is_link_failed(a, b)
}
/// Marks the `src` -> `dst` direction of a link as failed via
/// `Topology::fail_link_directed`, then sweeps in-flight messages if
/// `drop_in_flight_on_failure` is enabled.
pub fn fail_link_directed(&mut self, src: NodeId, dst: NodeId) {
self.topology.fail_link_directed(src, dst);
self.maybe_sweep_in_flight();
}
/// Restores the `src` -> `dst` direction of a link via
/// `Topology::heal_link_directed`. Events already in the queue are not touched.
pub fn heal_link_directed(&mut self, src: NodeId, dst: NodeId) {
self.topology.heal_link_directed(src, dst);
}
/// Reports whether the topology considers the `src` -> `dst` direction
/// failed (delegates to `Topology::is_link_failed_directed`).
pub fn is_link_failed_directed(&self, src: NodeId, dst: NodeId) -> bool {
self.topology.is_link_failed_directed(src, dst)
}
/// Marks `node` as failed via `Topology::fail_node`, then sweeps in-flight
/// messages if `drop_in_flight_on_failure` is enabled.
pub fn fail_node(&mut self, node: NodeId) {
self.topology.fail_node(node);
self.maybe_sweep_in_flight();
}
/// Restores `node` via `Topology::heal_node`. Events already in the queue
/// are not touched.
pub fn heal_node(&mut self, node: NodeId) {
self.topology.heal_node(node);
}
/// Reports whether the topology considers `node` failed (delegates to
/// `Topology::is_node_failed`).
pub fn is_node_failed(&self, node: NodeId) -> bool {
self.topology.is_node_failed(node)
}
/// Builder-style setter: replaces the whole configuration and returns the
/// network.
pub fn with_config(mut self, config: NetConfig) -> Self {
self.config = config;
self
}
/// Current time of the underlying simulation.
#[inline]
pub fn now(&self) -> Time {
self.sim.now()
}
/// Shared access to the network's topology.
#[inline]
pub fn topology(&self) -> &Topology {
&self.topology
}
/// Mutable access to the network's topology. Changes made through this
/// reference do not trigger an in-flight sweep.
#[inline]
pub fn topology_mut(&mut self) -> &mut Topology {
&mut self.topology
}
/// Sends `payload` from `src` to `dst` at the current simulation time with
/// size 0, so bandwidth caps and link buffers are not applied.
///
/// Returns the id assigned to the message; an id is returned even when the
/// message ends up scheduled as a drop.
pub fn send(&mut self, src: NodeId, dst: NodeId, payload: P) -> Option<MessageId> {
self.enqueue_send(self.sim.now(), src, dst, payload, 0)
}
/// Sends `payload` from `src` to `dst` at the current simulation time,
/// accounting for `size_bytes` against pair and per-link bandwidth.
///
/// Returns the id assigned to the message; an id is returned even when the
/// message ends up scheduled as a drop.
pub fn send_sized(
&mut self,
src: NodeId,
dst: NodeId,
payload: P,
size_bytes: u64,
) -> Option<MessageId> {
self.enqueue_send(self.sim.now(), src, dst, payload, size_bytes)
}
/// Like `send`, but uses `time` as the send time instead of the current
/// simulation time. The routing, partition and loss decisions are still made
/// immediately, when this method is called.
pub fn send_at(
&mut self,
time: Time,
src: NodeId,
dst: NodeId,
payload: P,
) -> Option<MessageId> {
self.enqueue_send(time, src, dst, payload, 0)
}
/// Like `send_sized`, but uses `time` as the send time instead of the
/// current simulation time. The routing, partition, loss and bandwidth
/// decisions are still made immediately, when this method is called.
pub fn send_at_sized(
&mut self,
time: Time,
src: NodeId,
dst: NodeId,
payload: P,
size_bytes: u64,
) -> Option<MessageId> {
self.enqueue_send(time, src, dst, payload, size_bytes)
}
fn enqueue_send(
&mut self,
base_time: Time,
src: NodeId,
dst: NodeId,
payload: P,
size_bytes: u64,
) -> Option<MessageId> {
let id = MessageId::new(self.next_message_id);
self.next_message_id += 1;
let message = Message::new(id, src, dst, payload);
if self.partitions.contains(&partition_key(src, dst)) {
self.sim.schedule(
base_time,
NetEvent::Drop {
message,
reason: DropReason::Partitioned,
},
);
return Some(id);
}
if self.config.packet_loss > 0.0 && self.rng.bool(self.config.packet_loss) {
self.sim.schedule(
base_time,
NetEvent::Drop {
message,
reason: DropReason::PacketLoss,
},
);
return Some(id);
}
let route = match self.topology.route(src, dst) {
Some(r) => r,
None => {
self.sim.schedule(
base_time,
NetEvent::Drop {
message,
reason: DropReason::NoRoute,
},
);
return Some(id);
}
};
let latency = self
.latency_model
.compute(route.total_latency, &mut self.rng);
let (departure_time, pair_tx) = match self.bandwidths.get_mut(&(src, dst)) {
Some(state) if size_bytes > 0 => {
let earliest = base_time.max(state.link_free_at);
let tx = transmission_duration(state.bps, size_bytes);
let free_at = earliest + tx;
state.link_free_at = free_at;
(earliest, tx)
}
_ => (base_time, Duration::ZERO),
};
let link_bandwidth_delay = if size_bytes > 0 {
match self.apply_per_link_bandwidth(src, dst, size_bytes, departure_time + pair_tx) {
PerLinkOutcome::Admitted(d) => d,
PerLinkOutcome::Overflow => {
self.sim.schedule(
base_time,
NetEvent::Drop {
message,
reason: DropReason::BufferOverflow,
},
);
return Some(id);
}
}
} else {
Duration::ZERO
};
let delivery_time = departure_time + pair_tx + latency + link_bandwidth_delay;
self.sim.schedule(delivery_time, NetEvent::Deliver(message));
Some(id)
}
/// Runs `walk_per_link_bandwidth` against this network's topology, per-link
/// busy table and RNG, and returns its outcome unchanged.
fn apply_per_link_bandwidth(
&mut self,
src: NodeId,
dst: NodeId,
size_bytes: u64,
arrival_at_src_now: Time,
) -> PerLinkOutcome {
walk_per_link_bandwidth(
&mut self.topology,
&mut self.link_busy,
&mut self.rng,
src,
dst,
size_bytes,
arrival_at_src_now,
)
}
/// Schedules `event` directly on the underlying simulation at `time`,
/// bypassing routing, loss and bandwidth handling. Returns whatever
/// `Simulation::schedule` returns.
pub fn schedule(&mut self, time: Time, event: NetEvent<P>) -> u64 {
self.sim.schedule(time, event)
}
/// Schedules `event` directly on the underlying simulation after `delay`,
/// bypassing routing, loss and bandwidth handling. Returns whatever
/// `Simulation::schedule_in` returns.
pub fn schedule_in(&mut self, delay: Duration, event: NetEvent<P>) -> u64 {
self.sim.schedule_in(delay, event)
}
/// Number of message ids allocated so far. Every send allocates one, so
/// messages that were dropped are counted too.
pub fn messages_sent(&self) -> u64 {
self.next_message_id
}
/// Consumes the network and runs the simulation, calling `handler` with a
/// `RunContext` and each event as it is processed.
///
/// After each handler call, a failure sweep is performed if the handler
/// injected a failure and `drop_in_flight_on_failure` is enabled, and then
/// the sends queued on the context are turned into events using the same
/// decision order as the pre-run send methods (partition, packet loss,
/// route, pair bandwidth, per-link bandwidth) with the current simulation
/// time as send time.
///
/// Returns the aggregated `NetworkStats` for the run.
pub fn run<F>(self, handler: F) -> NetworkStats
where
F: FnMut(&mut RunContext<P, L>, NetEvent<P>),
{
// Split the network so the simulation and the handler-visible state can
// be borrowed independently inside the event closure.
let Network {
sim,
topology,
latency_model,
rng,
next_message_id,
config,
partitions,
bandwidths,
link_busy,
} = self;
let mut ctx = RunContext {
now: Time::ZERO,
topology,
latency_model,
rng,
next_message_id,
config,
partitions,
bandwidths,
link_busy,
delivered: 0,
dropped: 0,
pending_sends: Vec::new(),
needs_failure_sweep: false,
};
let mut handler = handler;
let stats = sim.run(|sim, event| {
ctx.now = sim.now();
// Count the event before the handler takes ownership of it.
match &event {
NetEvent::Deliver(_) => ctx.delivered += 1,
NetEvent::Drop { .. } => ctx.dropped += 1,
}
handler(&mut ctx, event);
// The sweep flag is cleared either way; the sweep itself only runs when
// the configuration asks for it.
if ctx.needs_failure_sweep && ctx.config.drop_in_flight_on_failure {
ctx.needs_failure_sweep = false;
let now = sim.now();
sweep_failure_drops(sim, &mut ctx.topology, &ctx.partitions, now);
} else {
ctx.needs_failure_sweep = false;
}
// Flush the sends the handler queued, in the order they were queued.
for (src, dst, payload, size_bytes) in ctx.pending_sends.drain(..) {
let id = MessageId::new(ctx.next_message_id);
ctx.next_message_id += 1;
let message = Message::new(id, src, dst, payload);
if ctx.partitions.contains(&partition_key(src, dst)) {
sim.schedule(
sim.now(),
NetEvent::Drop {
message,
reason: DropReason::Partitioned,
},
);
continue;
}
if ctx.config.packet_loss > 0.0 && ctx.rng.bool(ctx.config.packet_loss) {
sim.schedule(
sim.now(),
NetEvent::Drop {
message,
reason: DropReason::PacketLoss,
},
);
continue;
}
let route = match ctx.topology.route(src, dst) {
Some(r) => r,
None => {
sim.schedule(
sim.now(),
NetEvent::Drop {
message,
reason: DropReason::NoRoute,
},
);
continue;
}
};
let latency = ctx.latency_model.compute(route.total_latency, &mut ctx.rng);
let base_time = sim.now();
// Endpoint-pair cap: only sized sends wait for and occupy the pair's pipe.
let (departure_time, pair_tx) = match ctx.bandwidths.get_mut(&(src, dst)) {
Some(state) if size_bytes > 0 => {
let earliest = base_time.max(state.link_free_at);
let tx = transmission_duration(state.bps, size_bytes);
let free_at = earliest + tx;
state.link_free_at = free_at;
(earliest, tx)
}
_ => (base_time, Duration::ZERO),
};
// Per-link caps along the routed path; unsized sends skip the walk.
let link_bandwidth_delay = if size_bytes > 0 {
match walk_per_link_bandwidth(
&mut ctx.topology,
&mut ctx.link_busy,
&mut ctx.rng,
src,
dst,
size_bytes,
departure_time + pair_tx,
) {
PerLinkOutcome::Admitted(d) => d,
PerLinkOutcome::Overflow => {
sim.schedule(
base_time,
NetEvent::Drop {
message,
reason: DropReason::BufferOverflow,
},
);
continue;
}
}
} else {
Duration::ZERO
};
let delivery_time = departure_time + pair_tx + latency + link_bandwidth_delay;
sim.schedule(delivery_time, NetEvent::Deliver(message));
}
});
NetworkStats {
events_processed: stats.events_processed,
final_time: stats.final_time,
messages_sent: ctx.next_message_id,
messages_delivered: ctx.delivered,
messages_dropped: ctx.dropped,
}
}
}
/// Handler-facing view of the network while `Network::run` is executing.
///
/// It owns everything the network owned except the simulation queue itself.
#[derive(Debug)]
pub struct RunContext<P, L: LatencyModel> {
// Simulation time of the event currently being handled.
now: Time,
topology: Topology,
latency_model: L,
rng: SimRng,
// Next id to hand out; also the count of ids allocated so far.
next_message_id: u64,
config: NetConfig,
// Partitioned pairs, stored in the normalised order produced by `partition_key`.
partitions: std::collections::HashSet<(NodeId, NodeId)>,
// Per-(src, dst) endpoint-pair bandwidth caps; the key is directional.
bandwidths: std::collections::HashMap<(NodeId, NodeId), BandwidthState>,
// Per-hop (from, to) time at which that link finishes its queued transmissions.
link_busy: std::collections::HashMap<(NodeId, NodeId), Time>,
// Number of `Deliver` events processed so far.
delivered: u64,
// Number of `Drop` events processed so far.
dropped: u64,
// Sends queued by the handler as (src, dst, payload, size_bytes); flushed
// after the handler returns.
pending_sends: Vec<(NodeId, NodeId, P, u64)>,
// Set when the handler injects a partition or failure; consumed after the
// handler returns.
needs_failure_sweep: bool,
}
impl<P, L: LatencyModel> RunContext<P, L> {
    /// Simulation time of the event currently being handled.
    pub fn now(&self) -> Time {
        self.now
    }

    /// Queues an unsized send (size 0) from `src` to `dst`. The send is
    /// processed after the handler returns.
    pub fn send(&mut self, src: NodeId, dst: NodeId, payload: P) {
        self.send_sized(src, dst, payload, 0);
    }

    /// Queues a send of `size_bytes` from `src` to `dst`. The send is
    /// processed after the handler returns.
    pub fn send_sized(&mut self, src: NodeId, dst: NodeId, payload: P, size_bytes: u64) {
        let queued = (src, dst, payload, size_bytes);
        self.pending_sends.push(queued);
    }

    /// Sets or clears the bandwidth cap for sends from `src` to `dst`.
    ///
    /// A `bps` of `0` removes the cap together with its serialisation state.
    /// Otherwise the rate is updated in place, keeping the existing
    /// `link_free_at`, or a fresh entry is created that is free from
    /// `Time::ZERO`.
    pub fn set_bandwidth(&mut self, src: NodeId, dst: NodeId, bps: u64) {
        let pair = (src, dst);
        if bps == 0 {
            self.bandwidths.remove(&pair);
            return;
        }
        match self.bandwidths.get_mut(&pair) {
            Some(state) => state.bps = bps,
            None => {
                self.bandwidths.insert(
                    pair,
                    BandwidthState {
                        bps,
                        link_free_at: Time::ZERO,
                    },
                );
            }
        }
    }

    /// Returns the bandwidth cap configured for sends from `src` to `dst`,
    /// or `None` when that pair has no cap.
    pub fn bandwidth(&self, src: NodeId, dst: NodeId) -> Option<u64> {
        let state = self.bandwidths.get(&(src, dst))?;
        Some(state.bps)
    }

    /// Partitions `a` and `b` from each other (order-insensitive) and
    /// requests an in-flight sweep once the handler returns.
    pub fn partition(&mut self, a: NodeId, b: NodeId) {
        let pair = partition_key(a, b);
        self.partitions.insert(pair);
        self.needs_failure_sweep = true;
    }

    /// Removes the partition between `a` and `b` (order-insensitive).
    pub fn heal(&mut self, a: NodeId, b: NodeId) {
        let pair = partition_key(a, b);
        self.partitions.remove(&pair);
    }

    /// Returns `true` if `a` and `b` are currently partitioned, in either order.
    pub fn is_partitioned(&self, a: NodeId, b: NodeId) -> bool {
        let pair = partition_key(a, b);
        self.partitions.contains(&pair)
    }

    /// Marks the link between `a` and `b` as failed in the topology and
    /// requests an in-flight sweep once the handler returns.
    pub fn fail_link(&mut self, a: NodeId, b: NodeId) {
        self.topology.fail_link(a, b);
        self.needs_failure_sweep = true;
    }

    /// Restores the link between `a` and `b` in the topology.
    pub fn heal_link(&mut self, a: NodeId, b: NodeId) {
        self.topology.heal_link(a, b);
    }

    /// Reports whether the topology considers the link between `a` and `b` failed.
    pub fn is_link_failed(&self, a: NodeId, b: NodeId) -> bool {
        self.topology.is_link_failed(a, b)
    }

    /// Marks the `src` -> `dst` direction as failed in the topology and
    /// requests an in-flight sweep once the handler returns.
    pub fn fail_link_directed(&mut self, src: NodeId, dst: NodeId) {
        self.topology.fail_link_directed(src, dst);
        self.needs_failure_sweep = true;
    }

    /// Restores the `src` -> `dst` direction in the topology.
    pub fn heal_link_directed(&mut self, src: NodeId, dst: NodeId) {
        self.topology.heal_link_directed(src, dst);
    }

    /// Reports whether the topology considers the `src` -> `dst` direction failed.
    pub fn is_link_failed_directed(&self, src: NodeId, dst: NodeId) -> bool {
        self.topology.is_link_failed_directed(src, dst)
    }

    /// Marks `node` as failed in the topology and requests an in-flight
    /// sweep once the handler returns.
    pub fn fail_node(&mut self, node: NodeId) {
        self.topology.fail_node(node);
        self.needs_failure_sweep = true;
    }

    /// Restores `node` in the topology.
    pub fn heal_node(&mut self, node: NodeId) {
        self.topology.heal_node(node);
    }

    /// Reports whether the topology considers `node` failed.
    pub fn is_node_failed(&self, node: NodeId) -> bool {
        self.topology.is_node_failed(node)
    }

    /// Shared access to the topology.
    pub fn topology(&self) -> &Topology {
        &self.topology
    }

    /// Mutable access to the topology. Changes made through this reference
    /// do not request an in-flight sweep.
    pub fn topology_mut(&mut self) -> &mut Topology {
        &mut self.topology
    }

    /// Mutable access to the network's RNG. Draws made by the handler advance
    /// the same RNG the network uses for its own decisions.
    pub fn rng(&mut self) -> &mut SimRng {
        &mut self.rng
    }
}
/// Summary returned by `Network::run`.
#[derive(Debug, Clone, Default)]
pub struct NetworkStats {
/// Number of events processed, as reported by the underlying simulation.
pub events_processed: u64,
/// Final time reported by the underlying simulation.
pub final_time: Time,
/// Number of message ids allocated, before and during the run; dropped
/// messages are included.
pub messages_sent: u64,
/// Number of `Deliver` events processed during the run.
pub messages_delivered: u64,
/// Number of `Drop` events processed during the run.
pub messages_dropped: u64,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn basic_message_delivery() {
let topo = TopologyBuilder::new(3)
.link(0u32, 1u32, Duration::from_millis(10))
.link(1u32, 2u32, Duration::from_millis(20))
.build();
let mut net = Network::new(topo, 42);
net.send(NodeId(0), NodeId(2), "hello");
let mut delivered = Vec::new();
let stats = net.run(|_ctx, event| {
if let NetEvent::Deliver(msg) = event {
delivered.push((msg.src, msg.dst, msg.payload));
}
});
assert_eq!(stats.messages_delivered, 1);
assert_eq!(delivered.len(), 1);
assert_eq!(delivered[0], (NodeId(0), NodeId(2), "hello"));
assert_eq!(stats.final_time, Time::from_millis(30));
}
#[test]
fn no_route_drops_message() {
let topo = TopologyBuilder::new(3)
.link(0u32, 1u32, Duration::from_millis(10))
.build();
let mut net = Network::new(topo, 42);
net.send(NodeId(0), NodeId(2), "unreachable");
let mut dropped = Vec::new();
let stats = net.run(|_ctx, event| {
if let NetEvent::Drop { message, reason } = event {
dropped.push((message.dst, reason));
}
});
assert_eq!(stats.messages_dropped, 1);
assert_eq!(dropped[0], (NodeId(2), DropReason::NoRoute));
}
#[test]
fn jitter_model() {
let topo = TopologyBuilder::new(2)
.link(0u32, 1u32, Duration::from_millis(100))
.build();
let latency_model = UniformJitter::new(Duration::from_millis(10));
let mut net = Network::with_latency_model(topo, latency_model, 42);
for _ in 0..10 {
net.send(NodeId(0), NodeId(1), ());
}
net.run(|_ctx, event| {
if let NetEvent::Deliver(_) = event {
}
});
}
#[test]
fn packet_loss() {
let topo = TopologyBuilder::new(2)
.link(0u32, 1u32, Duration::from_millis(10))
.build();
let config = NetConfig {
packet_loss: 0.5,
..Default::default()
};
let mut net = Network::new(topo, 42).with_config(config);
for _ in 0..100 {
net.send(NodeId(0), NodeId(1), ());
}
let stats = net.run(|_ctx, _event| {});
assert!(stats.messages_delivered > 0);
assert!(stats.messages_dropped > 0);
assert_eq!(stats.messages_delivered + stats.messages_dropped, 100);
}
#[test]
fn reply_in_handler() {
let topo = TopologyBuilder::new(2)
.link(0u32, 1u32, Duration::from_millis(10))
.build();
let mut net = Network::new(topo, 42);
net.send(NodeId(0), NodeId(1), "ping");
let mut messages = Vec::new();
let stats = net.run(|ctx, event| {
if let NetEvent::Deliver(msg) = event {
messages.push(msg.payload);
if msg.payload == "ping" {
ctx.send(msg.dst, msg.src, "pong");
}
}
});
assert_eq!(stats.messages_delivered, 2);
assert_eq!(messages, vec!["ping", "pong"]);
}
#[test]
fn transmission_duration_helper() {
assert_eq!(
transmission_duration(1_000_000, 1_000_000),
Duration::from_secs(1)
);
let d = transmission_duration(3, 1);
assert_eq!(d.as_nanos(), 333_333_334);
assert_eq!(transmission_duration(0, 100), Duration::ZERO);
assert_eq!(transmission_duration(100, 0), Duration::ZERO);
}
#[test]
fn unsized_send_ignores_bandwidth() {
let topo = TopologyBuilder::new(2)
.link(0u32, 1u32, Duration::from_millis(10))
.build();
let mut net: Network<u32> = Network::new(topo, 42);
net.set_bandwidth(NodeId(0), NodeId(1), 1);
net.send(NodeId(0), NodeId(1), 0);
net.send(NodeId(0), NodeId(1), 0);
let stats = net.run(|_ctx, _event| {});
assert_eq!(stats.messages_delivered, 2);
assert_eq!(stats.final_time, Time::from_millis(10));
}
#[test]
fn send_sized_delays_by_transmission_time() {
let topo = TopologyBuilder::new(2)
.link(0u32, 1u32, Duration::from_millis(10))
.build();
let mut net: Network<u32> = Network::new(topo, 42);
net.set_bandwidth(NodeId(0), NodeId(1), 1_000_000);
assert_eq!(net.bandwidth(NodeId(0), NodeId(1)), Some(1_000_000));
net.send_sized(NodeId(0), NodeId(1), 0, 1_000);
let stats = net.run(|_ctx, _event| {});
assert_eq!(stats.messages_delivered, 1);
assert_eq!(stats.final_time, Time::from_millis(11));
}
#[test]
fn bandwidth_serializes_concurrent_sends() {
let topo = TopologyBuilder::new(2)
.link(0u32, 1u32, Duration::from_millis(0))
.build();
let mut net: Network<u32> = Network::new(topo, 42);
net.set_bandwidth(NodeId(0), NodeId(1), 1_000_000);
net.send_sized(NodeId(0), NodeId(1), 0, 1_000_000);
net.send_sized(NodeId(0), NodeId(1), 1, 1_000_000);
net.send_sized(NodeId(0), NodeId(1), 2, 1_000_000);
let mut arrivals = Vec::new();
net.run(|ctx, event| {
if let NetEvent::Deliver(msg) = event {
arrivals.push((msg.payload, ctx.now()));
}
});
arrivals.sort_by_key(|(_, t)| *t);
assert_eq!(arrivals.len(), 3);
assert_eq!(arrivals[0], (0, Time::from_secs(1)));
assert_eq!(arrivals[1], (1, Time::from_secs(2)));
assert_eq!(arrivals[2], (2, Time::from_secs(3)));
}
#[test]
fn set_bandwidth_zero_clears_cap() {
let topo = TopologyBuilder::new(2)
.link(0u32, 1u32, Duration::from_millis(0))
.build();
let mut net: Network<u32> = Network::new(topo, 42);
net.set_bandwidth(NodeId(0), NodeId(1), 1_000_000);
assert!(net.bandwidth(NodeId(0), NodeId(1)).is_some());
net.set_bandwidth(NodeId(0), NodeId(1), 0);
assert!(net.bandwidth(NodeId(0), NodeId(1)).is_none());
net.send_sized(NodeId(0), NodeId(1), 0, 1_000_000);
let stats = net.run(|_ctx, _event| {});
assert_eq!(stats.messages_delivered, 1);
assert_eq!(stats.final_time, Time::ZERO);
}
#[test]
fn per_link_bandwidth_on_direct_edge() {
let topo = TopologyBuilder::new(2)
.link_with_capacity(0u32, 1u32, Duration::from_millis(10), 1_000_000)
.build();
let mut net: Network<u32> = Network::new(topo, 42);
net.send_sized(NodeId(0), NodeId(1), 0, 1_000);
let stats = net.run(|_ctx, _event| {});
assert_eq!(stats.messages_delivered, 1);
assert_eq!(stats.final_time, Time::from_millis(11));
}
#[test]
fn per_link_bandwidth_bottleneck_middle_of_path() {
let topo = TopologyBuilder::new(3)
.link(0u32, 1u32, Duration::from_millis(1))
.link_with_capacity(1u32, 2u32, Duration::from_millis(1), 1_000_000)
.build();
let mut net: Network<u32> = Network::new(topo, 42);
net.send_sized(NodeId(0), NodeId(2), 0, 1_000_000);
let stats = net.run(|_ctx, _event| {});
assert_eq!(stats.messages_delivered, 1);
assert_eq!(stats.final_time, Time::from_millis(1002));
}
#[test]
fn per_link_bandwidth_shared_across_flows() {
let topo = TopologyBuilder::new(4)
.link(0u32, 1u32, Duration::from_millis(1))
.link(3u32, 1u32, Duration::from_millis(1))
.link_with_capacity(1u32, 2u32, Duration::from_millis(0), 1_000_000)
.build();
let mut net: Network<u32> = Network::new(topo, 42);
net.send_sized(NodeId(0), NodeId(2), 0, 500_000);
net.send_sized(NodeId(3), NodeId(2), 1, 500_000);
let mut arrivals = Vec::new();
net.run(|ctx, event| {
if let NetEvent::Deliver(msg) = event {
arrivals.push((msg.payload, ctx.now()));
}
});
arrivals.sort_by_key(|(_, t)| *t);
assert_eq!(arrivals.len(), 2);
assert_eq!(arrivals[0].1, Time::from_millis(501));
assert_eq!(arrivals[1].1, Time::from_millis(1001));
}
#[test]
fn per_link_and_pair_bandwidth_stack() {
let topo = TopologyBuilder::new(2)
.link_with_capacity(0u32, 1u32, Duration::from_millis(0), 1_000_000)
.build();
let mut net: Network<u32> = Network::new(topo, 42);
net.set_bandwidth(NodeId(0), NodeId(1), 2_000_000);
net.send_sized(NodeId(0), NodeId(1), 0, 1_000_000);
let stats = net.run(|_ctx, _event| {});
assert_eq!(stats.messages_delivered, 1);
assert_eq!(stats.final_time, Time::from_millis(1500));
}
#[test]
fn bandwidth_is_directional() {
let topo = TopologyBuilder::new(2)
.link(0u32, 1u32, Duration::from_millis(0))
.build();
let mut net: Network<u32> = Network::new(topo, 42);
net.set_bandwidth(NodeId(0), NodeId(1), 1_000_000);
net.send_sized(NodeId(0), NodeId(1), 0, 1_000_000); net.send_sized(NodeId(1), NodeId(0), 1, 1_000_000);
let mut arrivals = Vec::new();
net.run(|ctx, event| {
if let NetEvent::Deliver(msg) = event {
arrivals.push((msg.payload, ctx.now()));
}
});
arrivals.sort_by_key(|(p, _)| *p);
assert_eq!(arrivals[0], (0, Time::from_secs(1)));
assert_eq!(arrivals[1], (1, Time::ZERO));
}
#[test]
fn unbounded_buffer_matches_prior_behavior() {
let topo = TopologyBuilder::new(2)
.link_with_capacity(0u32, 1u32, Duration::from_millis(0), 1_000_000)
.build();
let mut net: Network<u32> = Network::new(topo, 42);
for i in 0..5 {
net.send_sized(NodeId(0), NodeId(1), i, 1_000_000);
}
let stats = net.run(|_ctx, _event| {});
assert_eq!(stats.messages_delivered, 5);
assert_eq!(stats.messages_dropped, 0);
}
#[test]
fn buffer_overflow_drops_excess_sends() {
let topo = TopologyBuilder::new(2)
.link_with_capacity_and_buffer(
0u32,
1u32,
Duration::from_millis(0),
1_000_000,
2_000_000,
)
.build();
let mut net: Network<u32> = Network::new(topo, 42);
let a = net.send_sized(NodeId(0), NodeId(1), 0, 1_000_000);
let b = net.send_sized(NodeId(0), NodeId(1), 1, 1_000_000);
let c = net.send_sized(NodeId(0), NodeId(1), 2, 1_000_000);
assert!(a.is_some() && b.is_some() && c.is_some());
let mut delivered = Vec::new();
let mut dropped = Vec::new();
let stats = net.run(|_ctx, event| match event {
NetEvent::Deliver(msg) => delivered.push(msg.payload),
NetEvent::Drop { message, reason } => dropped.push((message.payload, reason)),
});
assert_eq!(stats.messages_delivered, 2);
assert_eq!(stats.messages_dropped, 1);
assert_eq!(delivered, vec![0, 1]);
assert_eq!(dropped, vec![(2, DropReason::BufferOverflow)]);
}
#[test]
fn buffer_drains_over_time() {
let topo = TopologyBuilder::new(2)
.link_with_capacity_and_buffer(
0u32,
1u32,
Duration::from_millis(0),
1_000_000,
1_500_000,
)
.build();
let mut net: Network<u32> = Network::new(topo, 42);
for i in 0..4 {
net.send_at_sized(
Time::from_secs(2 * i),
NodeId(0),
NodeId(1),
i as u32,
1_000_000,
);
}
let stats = net.run(|_ctx, _event| {});
assert_eq!(stats.messages_delivered, 4);
assert_eq!(stats.messages_dropped, 0);
}
#[test]
fn overflow_on_shared_bottleneck_does_not_consume_upstream_capacity() {
let topo = TopologyBuilder::new(4)
.link(0u32, 1u32, Duration::from_millis(1))
.link(3u32, 1u32, Duration::from_millis(1))
.link_with_capacity_and_buffer(
1u32,
2u32,
Duration::from_millis(0),
1_000_000,
2_000_000,
)
.build();
let mut net: Network<u32> = Network::new(topo, 42);
net.send_sized(NodeId(0), NodeId(2), 0, 1_000_000);
net.send_sized(NodeId(3), NodeId(2), 1, 1_000_000);
net.send_sized(NodeId(0), NodeId(2), 2, 1_000_000);
let mut delivered = Vec::new();
let mut dropped = Vec::new();
net.run(|_ctx, event| match event {
NetEvent::Deliver(msg) => delivered.push(msg.payload),
NetEvent::Drop { message, reason } => dropped.push((message.payload, reason)),
});
assert_eq!(delivered.len(), 2);
assert_eq!(dropped.len(), 1);
assert_eq!(dropped[0].1, DropReason::BufferOverflow);
}
#[test]
fn red_never_drops_below_min_threshold() {
let topo = TopologyBuilder::new(2)
.link_with_policy(
0u32,
1u32,
Duration::from_millis(0),
1_000_000,
10_000_000,
DropPolicy::red(5_000_000, 8_000_000, 10_000_000, 1.0),
)
.build();
let mut net: Network<u32> = Network::new(topo, 42);
for i in 0..3 {
net.send_sized(NodeId(0), NodeId(1), i, 1_000_000);
}
let stats = net.run(|_ctx, _event| {});
assert_eq!(stats.messages_delivered, 3);
assert_eq!(stats.messages_dropped, 0);
}
#[test]
fn red_hard_drops_above_max_threshold() {
let topo = TopologyBuilder::new(2)
.link_with_policy(
0u32,
1u32,
Duration::from_millis(0),
1_000_000,
10_000_000,
DropPolicy::red(2_000_000, 3_000_000, 10_000_000, 1.0),
)
.build();
let mut net: Network<u32> = Network::new(topo, 42);
for i in 0..5 {
net.send_sized(NodeId(0), NodeId(1), i, 1_000_000);
}
let mut delivered = Vec::new();
let mut dropped = Vec::new();
let stats = net.run(|_ctx, event| match event {
NetEvent::Deliver(msg) => delivered.push(msg.payload),
NetEvent::Drop { message, reason } => dropped.push((message.payload, reason)),
});
assert_eq!(stats.messages_delivered, 3);
assert_eq!(stats.messages_dropped, 2);
assert_eq!(delivered, vec![0, 1, 2]);
assert!(
dropped
.iter()
.all(|(_, r)| *r == DropReason::BufferOverflow)
);
}
#[test]
fn red_ramp_drops_are_deterministic_and_nontrivial() {
fn run() -> (u64, u64) {
let topo = TopologyBuilder::new(2)
.link_with_policy(
0u32,
1u32,
Duration::from_millis(0),
1_000_000,
10_000_000,
DropPolicy::red(0, 5_000_000, 10_000_000, 1.0),
)
.build();
let mut net: Network<u32> = Network::new(topo, 0xC0FFEE);
for i in 0..20 {
net.send_sized(NodeId(0), NodeId(1), i, 1_000_000);
}
let stats = net.run(|_ctx, _event| {});
(stats.messages_delivered, stats.messages_dropped)
}
let (d1, x1) = run();
let (d2, x2) = run();
assert_eq!((d1, x1), (d2, x2), "RED admission must be deterministic");
assert_eq!(d1 + x1, 20);
assert!(d1 > 0, "expected some deliveries, got {d1}");
assert!(x1 > 0, "expected some RED drops, got {x1}");
}
#[test]
fn red_drops_more_than_tail_under_sustained_load() {
fn run_with_policy(red: bool) -> u64 {
let topo = if red {
TopologyBuilder::new(2).link_with_policy(
0u32,
1u32,
Duration::from_millis(0),
1_000_000,
10_000_000,
DropPolicy::red(2_000_000, 8_000_000, 10_000_000, 1.0),
)
} else {
TopologyBuilder::new(2).link_with_capacity_and_buffer(
0u32,
1u32,
Duration::from_millis(0),
1_000_000,
10_000_000,
)
}
.build();
let mut net: Network<u32> = Network::new(topo, 7);
for i in 0..30 {
net.send_sized(NodeId(0), NodeId(1), i, 1_000_000);
}
let stats = net.run(|_ctx, _event| {});
stats.messages_dropped
}
let red_drops = run_with_policy(true);
let tail_drops = run_with_policy(false);
assert!(
red_drops > tail_drops,
"expected RED ({red_drops}) to drop strictly more than TailDrop ({tail_drops})"
);
}
#[test]
fn partition_drops_messages() {
let topo = TopologyBuilder::new(3)
.full_mesh(Duration::from_millis(10))
.build();
let mut net = Network::new(topo, 42);
net.partition(NodeId(0), NodeId(1));
assert!(net.is_partitioned(NodeId(0), NodeId(1)));
assert!(net.is_partitioned(NodeId(1), NodeId(0)));
assert!(!net.is_partitioned(NodeId(0), NodeId(2)));
net.send(NodeId(0), NodeId(1), "nope");
net.send(NodeId(1), NodeId(0), "also nope");
net.send(NodeId(0), NodeId(2), "this goes through");
let mut dropped_pairs = Vec::new();
let mut delivered_pairs = Vec::new();
let stats = net.run(|_ctx, event| match event {
NetEvent::Deliver(msg) => delivered_pairs.push((msg.src, msg.dst)),
NetEvent::Drop { message, reason } => {
dropped_pairs.push((message.src, message.dst, reason));
}
});
assert_eq!(stats.messages_delivered, 1);
assert_eq!(stats.messages_dropped, 2);
assert!(
dropped_pairs
.iter()
.all(|(.., r)| *r == DropReason::Partitioned)
);
assert_eq!(delivered_pairs, vec![(NodeId(0), NodeId(2))]);
}
#[test]
fn heal_restores_delivery() {
let topo = TopologyBuilder::new(2)
.link(0u32, 1u32, Duration::from_millis(5))
.build();
let mut net = Network::new(topo, 42);
net.partition(NodeId(0), NodeId(1));
net.send(NodeId(0), NodeId(1), "blocked");
net.heal(NodeId(0), NodeId(1));
net.send(NodeId(0), NodeId(1), "through");
let stats = net.run(|_ctx, _event| {});
assert_eq!(stats.messages_delivered, 1);
assert_eq!(stats.messages_dropped, 1);
}
/// An asymmetric link applies a different latency per direction: the 0 -> 1
/// message sent at t=0 arrives at 5 ms, and the 1 -> 0 message sent at
/// t=100 ms arrives at 120 ms.
#[test]
fn asymmetric_link_latency() {
    let topo = TopologyBuilder::new(2)
        .link_asymmetric(
            0u32,
            1u32,
            Duration::from_millis(5),
            Duration::from_millis(20),
        )
        .build();
    let mut network = Network::new(topo, 42);

    network.send(NodeId(0), NodeId(1), "fast");
    network.send_at(Time::from_millis(100), NodeId(1), NodeId(0), "slow");

    let mut seen = Vec::new();
    network.run(|ctx, event| match event {
        NetEvent::Deliver(msg) => seen.push((msg.payload, ctx.now())),
        NetEvent::Drop { .. } => {}
    });

    // Order by arrival time so the comparison does not depend on callback order.
    seen.sort_by(|lhs, rhs| lhs.1.cmp(&rhs.1));
    assert_eq!(
        seen,
        vec![
            ("fast", Time::from_millis(5)),
            ("slow", Time::from_millis(120)),
        ]
    );
}
/// With a `SpikyLatency` model wrapped around `FixedLatency`, at least one of
/// twenty deliveries over a 10 ms link must arrive later than 10 ms.
#[test]
fn spiky_latency_sometimes_adds_extra() {
    use crate::net::SpikyLatency;

    let topo = TopologyBuilder::new(2)
        .link(0u32, 1u32, Duration::from_millis(10))
        .build();
    // NOTE(review): presumably `1.0` is the spike probability and 50 ms the
    // spike size; confirm against `SpikyLatency::new`.
    let spiky = SpikyLatency::new(FixedLatency, 1.0, Duration::from_millis(50));
    let mut network = Network::with_latency_model(topo, spiky, 42);

    (0..20).for_each(|_| {
        network.send(NodeId(0), NodeId(1), ());
    });

    // Track the latest delivery time observed during the run.
    let mut latest = Time::ZERO;
    network.run(|ctx, event| {
        let is_delivery = matches!(event, NetEvent::Deliver(_));
        if is_delivery && ctx.now() > latest {
            latest = ctx.now();
        }
    });

    assert!(
        latest > Time::from_millis(10),
        "expected at least one spike beyond base latency, got max_time = {}",
        latest
    );
}
/// In a 0 - 1 - 2 chain, failing the 0-1 link leaves node 0 with no path to
/// node 2, so the send is dropped with `DropReason::NoRoute`.
#[test]
fn fail_link_drops_with_no_route_when_no_alternate() {
    let chain = TopologyBuilder::new(3)
        .link(0u32, 1u32, Duration::from_millis(10))
        .link(1u32, 2u32, Duration::from_millis(10))
        .build();
    let mut network = Network::new(chain, 42);

    network.fail_link(NodeId(0), NodeId(1));
    network.send(NodeId(0), NodeId(2), "stuck");

    let mut reasons = Vec::new();
    let summary = network.run(|_ctx, event| match event {
        NetEvent::Drop { reason, .. } => reasons.push(reason),
        NetEvent::Deliver(_) => {}
    });

    assert_eq!(summary.messages_delivered, 0);
    assert_eq!(summary.messages_dropped, 1);
    assert_eq!(reasons, vec![DropReason::NoRoute]);
}
/// With the 0-1 link failed, a send from node 0 to node 2 is still delivered,
/// arriving at 100 ms (the latency of the direct 0-2 link).
#[test]
fn fail_link_reroutes_via_alternate_path() {
    let triangle = TopologyBuilder::new(3)
        .link(0u32, 1u32, Duration::from_millis(10))
        .link(1u32, 2u32, Duration::from_millis(10))
        .link(0u32, 2u32, Duration::from_millis(100))
        .build();
    let mut network = Network::new(triangle, 42);

    network.fail_link(NodeId(0), NodeId(1));
    network.send(NodeId(0), NodeId(2), "long way");

    let mut delivered_at = Vec::new();
    let summary = network.run(|ctx, event| match event {
        NetEvent::Deliver(_) => delivered_at.push(ctx.now()),
        NetEvent::Drop { .. } => {}
    });

    assert_eq!(summary.messages_delivered, 1);
    assert_eq!(delivered_at, vec![Time::from_millis(100)]);
}
/// A directed link failure from node 0 to node 1 drops 0 -> 1 traffic with
/// `DropReason::NoRoute` while 1 -> 0 traffic is still delivered.
#[test]
fn fail_link_directed_blocks_only_that_direction() {
    let pair = TopologyBuilder::new(2)
        .link(0u32, 1u32, Duration::from_millis(10))
        .build();
    let mut network = Network::new(pair, 42);

    network.fail_link_directed(NodeId(0), NodeId(1));
    network.send(NodeId(0), NodeId(1), "blocked");
    network.send(NodeId(1), NodeId(0), "ok");

    let mut arrived = Vec::new();
    let mut lost = Vec::new();
    network.run(|_ctx, event| match event {
        NetEvent::Drop { message, reason } => lost.push((message.payload, reason)),
        NetEvent::Deliver(msg) => arrived.push(msg.payload),
    });

    assert_eq!(arrived, vec!["ok"]);
    assert_eq!(lost, vec![("blocked", DropReason::NoRoute)]);
}
/// Links can be failed and healed from inside the run callback.
///
/// Self-addressed trigger messages (payloads 100/200/300) arrive at node 0 at
/// 0, 20 and 40 ms. Each trigger sends a probe (1/2/3) to node 1; the second
/// fails the 0-1 link first and the third heals it first. The probes must end
/// up delivered, dropped with `NoRoute`, and delivered, in that order.
#[test]
fn run_context_can_fail_and_heal_mid_run() {
    let topo = TopologyBuilder::new(3)
        .link(0u32, 1u32, Duration::from_millis(10))
        .link(0u32, 2u32, Duration::from_millis(10))
        .build();
    let mut network: Network<u32> = Network::new(topo, 42);

    network.send_at(Time::from_millis(0), NodeId(0), NodeId(0), 100);
    network.send_at(Time::from_millis(20), NodeId(0), NodeId(0), 200);
    network.send_at(Time::from_millis(40), NodeId(0), NodeId(0), 300);

    let mut outcomes = Vec::new();
    network.run(|ctx, event| match event {
        NetEvent::Drop { message, reason } => {
            // Only `NoRoute` drops are recorded; other reasons are ignored.
            if reason == DropReason::NoRoute {
                outcomes.push((message.payload, "no_route"));
            }
        }
        NetEvent::Deliver(msg) => {
            if msg.dst == NodeId(1) {
                outcomes.push((msg.payload, "delivered"));
            } else if msg.dst == NodeId(0) {
                // Trigger message: adjust the link state if required, then
                // send the matching probe towards node 1.
                let probe = match msg.payload {
                    100 => 1,
                    200 => {
                        ctx.fail_link(NodeId(0), NodeId(1));
                        2
                    }
                    300 => {
                        ctx.heal_link(NodeId(0), NodeId(1));
                        3
                    }
                    _ => return,
                };
                ctx.send(NodeId(0), NodeId(1), probe);
            }
        }
    });

    assert_eq!(
        outcomes,
        vec![(1, "delivered"), (2, "no_route"), (3, "delivered")]
    );
}
/// A link failure triggered at 5 ms must not affect a message that was sent
/// over that link at t=0: it still arrives at 10 ms.
#[test]
fn fail_link_does_not_affect_in_flight_messages() {
    let pair = TopologyBuilder::new(2)
        .link(0u32, 1u32, Duration::from_millis(10))
        .build();
    let mut network: Network<&'static str> = Network::new(pair, 42);

    network.send(NodeId(0), NodeId(1), "in_flight");
    // Self-addressed "tick" used only to fail the link part-way through.
    network.send_at(Time::from_millis(5), NodeId(1), NodeId(1), "tick");

    let mut delivered = Vec::new();
    network.run(|ctx, event| {
        let NetEvent::Deliver(msg) = event else {
            return;
        };
        let label = msg.payload;
        if label == "tick" {
            ctx.fail_link(NodeId(0), NodeId(1));
        }
        delivered.push((label, ctx.now()));
    });

    let survived = delivered
        .iter()
        .any(|entry| *entry == ("in_flight", Time::from_millis(10)));
    assert!(
        survived,
        "in-flight message must survive mid-flight fail; got {delivered:?}"
    );
}
/// In a 0 - 1 - 2 chain, failing the middle node leaves node 0 with no path
/// to node 2, so the send is dropped with `DropReason::NoRoute`.
#[test]
fn fail_node_drops_with_no_route_when_no_alternate() {
    let chain = TopologyBuilder::new(3)
        .link(0u32, 1u32, Duration::from_millis(10))
        .link(1u32, 2u32, Duration::from_millis(10))
        .build();
    let mut network = Network::new(chain, 42);

    network.fail_node(NodeId(1));
    assert!(network.is_node_failed(NodeId(1)));

    network.send(NodeId(0), NodeId(2), "stuck");

    let mut reasons = Vec::new();
    let summary = network.run(|_ctx, event| match event {
        NetEvent::Drop { reason, .. } => reasons.push(reason),
        NetEvent::Deliver(_) => {}
    });

    assert_eq!(summary.messages_delivered, 0);
    assert_eq!(reasons, vec![DropReason::NoRoute]);
}
/// With node 1 failed, a send from node 0 to node 3 is still delivered,
/// arriving at 100 ms (two 50 ms hops through node 2).
#[test]
fn fail_node_reroutes_via_alternate_path() {
    // Two routes from 0 to 3: through node 1 (5 ms hops) or node 2 (50 ms hops).
    let diamond = TopologyBuilder::new(4)
        .link(0u32, 1u32, Duration::from_millis(5))
        .link(1u32, 3u32, Duration::from_millis(5))
        .link(0u32, 2u32, Duration::from_millis(50))
        .link(2u32, 3u32, Duration::from_millis(50))
        .build();
    let mut network = Network::new(diamond, 42);

    network.fail_node(NodeId(1));
    network.send(NodeId(0), NodeId(3), "long way");

    let mut delivered_at = Vec::new();
    let summary = network.run(|ctx, event| match event {
        NetEvent::Deliver(_) => delivered_at.push(ctx.now()),
        NetEvent::Drop { .. } => {}
    });

    assert_eq!(summary.messages_delivered, 1);
    assert_eq!(delivered_at, vec![Time::from_millis(100)]);
}
/// Messages sent to a failed node and messages sent from it are both dropped
/// with `DropReason::NoRoute`.
#[test]
fn fail_node_blocks_sends_targeting_failed_node() {
    let pair = TopologyBuilder::new(2)
        .link(0u32, 1u32, Duration::from_millis(10))
        .build();
    let mut network = Network::new(pair, 42);

    network.fail_node(NodeId(1));
    network.send(NodeId(0), NodeId(1), "to_dead_node");
    network.send(NodeId(1), NodeId(0), "from_dead_node");

    let mut casualties = Vec::new();
    let summary = network.run(|_ctx, event| match event {
        NetEvent::Drop { message, reason } => casualties.push((message.payload, reason)),
        NetEvent::Deliver(_) => {}
    });

    assert_eq!(summary.messages_delivered, 0);
    assert_eq!(summary.messages_dropped, 2);
    // No drop may carry a reason other than `NoRoute`.
    assert!(
        !casualties
            .iter()
            .any(|(_, why)| *why != DropReason::NoRoute)
    );
}
/// Nodes can be failed and healed from inside the run callback.
///
/// Self-addressed trigger messages (payloads 100/200/300) arrive at node 0 at
/// 0, 200 and 400 ms. Each trigger sends a probe (1/2/3) to node 3; the second
/// fails node 1 first and the third heals it first. Expected probe arrivals:
/// 10 ms, 300 ms and 410 ms.
#[test]
fn run_context_can_fail_and_heal_node_mid_run() {
    // Two routes from 0 to 3: through node 1 (5 ms hops) or node 2 (50 ms hops).
    let diamond = TopologyBuilder::new(4)
        .link(0u32, 1u32, Duration::from_millis(5))
        .link(1u32, 3u32, Duration::from_millis(5))
        .link(0u32, 2u32, Duration::from_millis(50))
        .link(2u32, 3u32, Duration::from_millis(50))
        .build();
    let mut network: Network<u32> = Network::new(diamond, 42);

    network.send_at(Time::from_millis(0), NodeId(0), NodeId(0), 100);
    network.send_at(Time::from_millis(200), NodeId(0), NodeId(0), 200);
    network.send_at(Time::from_millis(400), NodeId(0), NodeId(0), 300);

    let mut probe_arrivals = Vec::new();
    network.run(|ctx, event| {
        // Drops are not relevant to this scenario.
        let NetEvent::Deliver(msg) = event else {
            return;
        };
        if msg.dst == NodeId(3) {
            probe_arrivals.push((msg.payload, ctx.now()));
            return;
        }
        if msg.dst != NodeId(0) {
            return;
        }
        // Trigger message: adjust node 1's state if required, then send the
        // matching probe towards node 3.
        let probe = match msg.payload {
            100 => 1,
            200 => {
                ctx.fail_node(NodeId(1));
                2
            }
            300 => {
                ctx.heal_node(NodeId(1));
                3
            }
            _ => return,
        };
        ctx.send(NodeId(0), NodeId(3), probe);
    });

    assert_eq!(
        probe_arrivals,
        vec![
            (1, Time::from_millis(10)),
            (2, Time::from_millis(300)),
            (3, Time::from_millis(410)),
        ]
    );
}
fn drop_in_flight_config() -> NetConfig {
NetConfig {
drop_in_flight_on_failure: true,
..Default::default()
}
}
/// On a network built without `drop_in_flight_config()`, a partition created
/// after a send leaves the already-sent message to be delivered.
#[test]
fn drop_in_flight_default_off_preserves_in_flight_messages() {
    let pair = TopologyBuilder::new(2)
        .link(0u32, 1u32, Duration::from_millis(10))
        .build();
    let mut network = Network::new(pair, 42);

    // Send first, partition second: the message was sent before the partition.
    network.send(NodeId(0), NodeId(1), "in_flight");
    network.partition(NodeId(0), NodeId(1));

    let summary = network.run(|_ctx, _event| {});
    assert_eq!(
        (summary.messages_delivered, summary.messages_dropped),
        (1, 0)
    );
}
/// With `drop_in_flight_on_failure` enabled, partitioning after a send turns
/// the pending delivery into a `Partitioned` drop observed at time zero.
#[test]
fn drop_in_flight_partition_rewrites_pending_delivery() {
    let pair = TopologyBuilder::new(2)
        .link(0u32, 1u32, Duration::from_millis(10))
        .build();
    let mut network = Network::new(pair, 42).with_config(drop_in_flight_config());

    network.send(NodeId(0), NodeId(1), "in_flight");
    network.partition(NodeId(0), NodeId(1));

    let mut casualties = Vec::new();
    let summary = network.run(|ctx, event| match event {
        NetEvent::Drop { message, reason } => {
            casualties.push((message.payload, reason, ctx.now()));
        }
        NetEvent::Deliver(_) => {}
    });

    assert_eq!(summary.messages_delivered, 0);
    assert_eq!(summary.messages_dropped, 1);
    assert_eq!(
        casualties,
        vec![("in_flight", DropReason::Partitioned, Time::ZERO)]
    );
}
/// With `drop_in_flight_on_failure` enabled and no alternate path, failing
/// the 0-1 link after a 0 -> 2 send drops that message with `NoRoute`.
#[test]
fn drop_in_flight_link_failure_drops_when_no_alternate() {
    let chain = TopologyBuilder::new(3)
        .link(0u32, 1u32, Duration::from_millis(10))
        .link(1u32, 2u32, Duration::from_millis(10))
        .build();
    let mut network = Network::new(chain, 42).with_config(drop_in_flight_config());

    network.send(NodeId(0), NodeId(2), "doomed");
    network.fail_link(NodeId(0), NodeId(1));

    let mut casualties = Vec::new();
    let summary = network.run(|_ctx, event| match event {
        NetEvent::Drop { message, reason } => casualties.push((message.payload, reason)),
        NetEvent::Deliver(_) => {}
    });

    assert_eq!(summary.messages_delivered, 0);
    assert_eq!(casualties, vec![("doomed", DropReason::NoRoute)]);
}
/// With `drop_in_flight_on_failure` enabled, failing the 0-1 link after a
/// 0 -> 2 send does not drop the message when a direct 0-2 link also exists.
#[test]
fn drop_in_flight_link_failure_preserves_when_alternate_exists() {
    let triangle = TopologyBuilder::new(3)
        .link(0u32, 1u32, Duration::from_millis(10))
        .link(1u32, 2u32, Duration::from_millis(10))
        .link(0u32, 2u32, Duration::from_millis(100))
        .build();
    let mut network = Network::new(triangle, 42).with_config(drop_in_flight_config());

    network.send(NodeId(0), NodeId(2), "still_ok");
    network.fail_link(NodeId(0), NodeId(1));

    let summary = network.run(|_ctx, _event| {});
    assert_eq!(
        (summary.messages_delivered, summary.messages_dropped),
        (1, 0)
    );
}
/// With `drop_in_flight_on_failure` enabled, failing the destination node
/// after a send drops the message with `NoRoute`.
#[test]
fn drop_in_flight_node_failure_drops_when_node_is_dst() {
    let pair = TopologyBuilder::new(2)
        .link(0u32, 1u32, Duration::from_millis(10))
        .build();
    let mut network = Network::new(pair, 42).with_config(drop_in_flight_config());

    network.send(NodeId(0), NodeId(1), "to_dead");
    network.fail_node(NodeId(1));

    let mut casualties = Vec::new();
    let summary = network.run(|_ctx, event| match event {
        NetEvent::Drop { message, reason } => casualties.push((message.payload, reason)),
        NetEvent::Deliver(_) => {}
    });

    assert_eq!(summary.messages_delivered, 0);
    assert_eq!(casualties, vec![("to_dead", DropReason::NoRoute)]);
}
/// With `drop_in_flight_on_failure` enabled, failing the link from inside the
/// run callback at 5 ms drops the message sent at t=0 at that same time,
/// with reason `NoRoute`.
#[test]
fn drop_in_flight_works_mid_run_via_run_context() {
    let pair = TopologyBuilder::new(2)
        .link(0u32, 1u32, Duration::from_millis(10))
        .build();
    let mut network: Network<&'static str> =
        Network::new(pair, 42).with_config(drop_in_flight_config());

    network.send(NodeId(0), NodeId(1), "doomed");
    // Self-addressed "tick" used only to fail the link part-way through.
    network.send_at(Time::from_millis(5), NodeId(1), NodeId(1), "tick");

    let mut timeline = Vec::new();
    network.run(|ctx, event| match event {
        NetEvent::Drop { message, reason } => {
            timeline.push((message.payload, ctx.now()));
            assert_eq!(reason, DropReason::NoRoute);
        }
        NetEvent::Deliver(msg) => {
            // Deliveries other than the tick are ignored.
            if msg.payload == "tick" {
                ctx.fail_link(NodeId(0), NodeId(1));
                timeline.push(("tick_at", ctx.now()));
            }
        }
    });

    assert_eq!(
        timeline,
        vec![
            ("tick_at", Time::from_millis(5)),
            ("doomed", Time::from_millis(5))
        ]
    );
}
/// Two simulations built from the same topology, latency model and seed must
/// produce identical delivery traces (time and destination) and the same
/// final time.
#[test]
fn deterministic_simulation() {
    /// Runs one simulation on a 4-node ring with a `UniformJitter` latency
    /// model and seed 42, returning every delivery as
    /// `(delivery time, destination)` together with the run statistics.
    fn run_sim() -> (Vec<(Time, NodeId)>, NetworkStats) {
        let topo = TopologyBuilder::new(4)
            .ring(Duration::from_millis(10))
            .build();
        let latency_model = UniformJitter::new(Duration::from_millis(2));
        let mut net = Network::with_latency_model(topo, latency_model, 42);
        net.send(NodeId(0), NodeId(2), ());
        net.send(NodeId(1), NodeId(3), ());
        net.send(NodeId(2), NodeId(0), ());
        let mut deliveries = Vec::new();
        let stats = net.run(|ctx, event| {
            if let NetEvent::Deliver(msg) = event {
                // Record the actual delivery time. A constant here would make
                // the trace comparison blind to timing differences between
                // the two runs, which is what this test exists to catch.
                deliveries.push((ctx.now(), msg.dst));
            }
        });
        (deliveries, stats)
    }
    let (d1, s1) = run_sim();
    let (d2, s2) = run_sim();
    assert_eq!(d1, d2);
    assert_eq!(s1.final_time, s2.final_time);
}
}