use crate::net::{
DropReason, LatencyModel, NetConfig, NetEvent, Network, NetworkStats, NodeId, Topology,
};
use crate::time::Time;
use crate::trace::{Trace, TraceRecorder};
/// One entry of a network trace: the outcome recorded for a single network event.
///
/// `TracedNetwork::run_traced` produces one value of this type for every
/// `NetEvent` it observes, storing plain integers instead of the network's own
/// id types.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum NetTraceEvent {
    /// Recorded for a `NetEvent::Deliver`.
    Delivered {
        /// The message id, as returned by its `as_u64()`.
        message_id: u64,
        /// The message's source node, as returned by its `as_u32()`.
        src: u32,
        /// The message's destination node, as returned by its `as_u32()`.
        dst: u32,
    },
    /// Recorded for a `NetEvent::Drop`.
    Dropped {
        /// The message id, as returned by its `as_u64()`.
        message_id: u64,
        /// The message's source node, as returned by its `as_u32()`.
        src: u32,
        /// The message's destination node, as returned by its `as_u32()`.
        dst: u32,
        /// The drop reason reported by the network, converted for the trace.
        reason: NetTraceDropReason,
    },
}
/// Drop reason as stored inside a trace.
///
/// Each variant corresponds to the `DropReason` variant of the same name; the
/// `From<DropReason>` implementation in this file performs the conversion.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum NetTraceDropReason {
    /// Counterpart of `DropReason::NoRoute`.
    NoRoute,
    /// Counterpart of `DropReason::PacketLoss`.
    PacketLoss,
    /// Counterpart of `DropReason::Partitioned`.
    Partitioned,
    /// Counterpart of `DropReason::BufferOverflow`.
    BufferOverflow,
}
/// Converts the network's drop reason into its trace counterpart.
///
/// The match is exhaustive with no wildcard arm, so adding a variant to
/// `DropReason` becomes a compile error here rather than a silent mismatch.
impl From<DropReason> for NetTraceDropReason {
    fn from(reason: DropReason) -> Self {
        match reason {
            DropReason::NoRoute => Self::NoRoute,
            DropReason::PacketLoss => Self::PacketLoss,
            DropReason::Partitioned => Self::Partitioned,
            DropReason::BufferOverflow => Self::BufferOverflow,
        }
    }
}
/// A `Network` wrapper that can record a `Trace` of network events while running.
///
/// `P` is the message payload type and `L` the latency model of the wrapped
/// network.
pub struct TracedNetwork<P, L: LatencyModel> {
    /// The wrapped network; the forwarding methods on this type call into it.
    network: Network<P, L>,
    /// Seed the network was constructed with; passed to the `TraceRecorder`
    /// in `run_traced`.
    seed: u64,
}
impl<P> TracedNetwork<P, crate::net::FixedLatency> {
    /// Creates a traced network using `Network::new(topology, seed)`.
    ///
    /// The same `seed` is remembered so it can later be handed to the trace
    /// recorder.
    pub fn new(topology: Topology, seed: u64) -> Self {
        let network = Network::new(topology, seed);
        Self { network, seed }
    }
}
impl<P, L: LatencyModel> TracedNetwork<P, L> {
pub fn with_latency_model(topology: Topology, latency_model: L, seed: u64) -> Self {
TracedNetwork {
network: Network::with_latency_model(topology, latency_model, seed),
seed,
}
}
pub fn with_config(mut self, config: NetConfig) -> Self {
self.network = self.network.with_config(config);
self
}
pub fn partition(&mut self, a: NodeId, b: NodeId) {
self.network.partition(a, b);
}
pub fn set_bandwidth(&mut self, src: NodeId, dst: NodeId, bps: u64) {
self.network.set_bandwidth(src, dst, bps);
}
pub fn send_sized(
&mut self,
src: NodeId,
dst: NodeId,
payload: P,
size_bytes: u64,
) -> Option<crate::net::MessageId> {
self.network.send_sized(src, dst, payload, size_bytes)
}
pub fn heal(&mut self, a: NodeId, b: NodeId) {
self.network.heal(a, b);
}
pub fn is_partitioned(&self, a: NodeId, b: NodeId) -> bool {
self.network.is_partitioned(a, b)
}
pub fn fail_link(&mut self, a: NodeId, b: NodeId) {
self.network.fail_link(a, b);
}
pub fn heal_link(&mut self, a: NodeId, b: NodeId) {
self.network.heal_link(a, b);
}
pub fn is_link_failed(&self, a: NodeId, b: NodeId) -> bool {
self.network.is_link_failed(a, b)
}
pub fn fail_link_directed(&mut self, src: NodeId, dst: NodeId) {
self.network.fail_link_directed(src, dst);
}
pub fn heal_link_directed(&mut self, src: NodeId, dst: NodeId) {
self.network.heal_link_directed(src, dst);
}
pub fn is_link_failed_directed(&self, src: NodeId, dst: NodeId) -> bool {
self.network.is_link_failed_directed(src, dst)
}
pub fn fail_node(&mut self, node: NodeId) {
self.network.fail_node(node);
}
pub fn heal_node(&mut self, node: NodeId) {
self.network.heal_node(node);
}
pub fn is_node_failed(&self, node: NodeId) -> bool {
self.network.is_node_failed(node)
}
pub fn now(&self) -> Time {
self.network.now()
}
pub fn topology(&self) -> &Topology {
self.network.topology()
}
pub fn topology_mut(&mut self) -> &mut Topology {
self.network.topology_mut()
}
pub fn send(&mut self, src: NodeId, dst: NodeId, payload: P) -> Option<crate::net::MessageId> {
self.network.send(src, dst, payload)
}
pub fn send_at(
&mut self,
time: Time,
src: NodeId,
dst: NodeId,
payload: P,
) -> Option<crate::net::MessageId> {
self.network.send_at(time, src, dst, payload)
}
pub fn send_at_sized(
&mut self,
time: Time,
src: NodeId,
dst: NodeId,
payload: P,
size_bytes: u64,
) -> Option<crate::net::MessageId> {
self.network
.send_at_sized(time, src, dst, payload, size_bytes)
}
pub fn run_traced<F>(self, mut handler: F) -> (NetworkStats, Trace<NetTraceEvent>)
where
F: FnMut(&mut crate::net::RunContext<P, L>, &NetEvent<P>),
{
let seed = self.seed;
let mut recorder = TraceRecorder::new(seed);
let stats = self.network.run(|ctx, event| {
let trace_event = match &event {
NetEvent::Deliver(msg) => NetTraceEvent::Delivered {
message_id: msg.id.as_u64(),
src: msg.src.as_u32(),
dst: msg.dst.as_u32(),
},
NetEvent::Drop { message, reason } => NetTraceEvent::Dropped {
message_id: message.id.as_u64(),
src: message.src.as_u32(),
dst: message.dst.as_u32(),
reason: (*reason).into(),
},
};
recorder.record(ctx.now(), trace_event);
handler(ctx, &event);
});
let mut trace = recorder.finish();
trace.final_time_ns = stats.final_time.as_nanos();
(stats, trace)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Duration;
    use crate::net::TopologyBuilder;

    /// Three nodes linked as `0 - 1 - 2`, each link built with
    /// `Duration::from_millis(10)`.
    fn chain_topology() -> Topology {
        TopologyBuilder::new(3)
            .link(0u32, 1u32, Duration::from_millis(10))
            .link(1u32, 2u32, Duration::from_millis(10))
            .build()
    }

    /// A single message across the chain yields exactly one `Delivered` entry.
    #[test]
    fn traced_network_basic() {
        let mut network = TracedNetwork::new(chain_topology(), 42);
        network.send(NodeId(0), NodeId(2), "hello");

        let (stats, trace) = network.run_traced(|_, _| {});

        assert_eq!(stats.messages_delivered, 1);
        assert_eq!(trace.seed, 42);
        assert_eq!(trace.len(), 1);
        match &trace.get(0).unwrap().event {
            NetTraceEvent::Delivered { src, dst, .. } => {
                assert_eq!(*src, 0);
                assert_eq!(*dst, 2);
            }
            other => panic!("expected a delivery from 0 to 2, got {:?}", other),
        }
    }

    /// A message to a node with no link is recorded as dropped with `NoRoute`.
    #[test]
    fn traced_network_dropped() {
        // Node 2 exists but nothing links to it.
        let topology = TopologyBuilder::new(3)
            .link(0u32, 1u32, Duration::from_millis(10))
            .build();
        let mut network = TracedNetwork::new(topology, 42);
        network.send(NodeId(0), NodeId(2), "unreachable");

        let (stats, trace) = network.run_traced(|_, _| {});

        assert_eq!(stats.messages_dropped, 1);
        assert_eq!(trace.len(), 1);
        match &trace.get(0).unwrap().event {
            NetTraceEvent::Dropped { reason, .. } => {
                assert_eq!(*reason, NetTraceDropReason::NoRoute);
            }
            other => panic!("expected a NoRoute drop, got {:?}", other),
        }
    }

    /// Two runs with identical inputs and seed produce traces that compare equal.
    #[test]
    fn trace_comparison() {
        let simulate = || -> Trace<NetTraceEvent> {
            let mut network = TracedNetwork::new(chain_topology(), 42);
            network.send(NodeId(0), NodeId(1), ());
            network.send(NodeId(1), NodeId(2), ());
            let (_, trace) = network.run_traced(|_, _| {});
            trace
        };

        let first = simulate();
        let second = simulate();
        assert!(first.compare(&second).is_ok());
    }

    /// A trace survives a JSON round trip (only built with the `serde` feature).
    #[cfg(feature = "serde")]
    #[test]
    fn trace_json_export() {
        let topology = TopologyBuilder::new(2)
            .link(0u32, 1u32, Duration::from_millis(10))
            .build();
        let mut network = TracedNetwork::new(topology, 42);
        network.send(NodeId(0), NodeId(1), "test");

        let (_, trace) = network.run_traced(|_, _| {});

        let json = trace.to_json().unwrap();
        let restored: Trace<NetTraceEvent> = Trace::from_json(&json).unwrap();
        assert!(trace.compare(&restored).is_ok());
    }
}