use std::{
collections::{HashMap, HashSet},
fmt::{Debug, Display},
io,
net::IpAddr,
sync::{
Arc,
atomic::{AtomicUsize, Ordering},
},
};
use rtnetlink::{LinkBridge, LinkUnspec, LinkVeth};
use tokio::{
sync::{
mpsc,
oneshot::{self},
},
task::JoinHandle,
};
use tracing::Instrument as _;
use crate::tc::requests::configure_htb_class;
use crate::tc::requests::configure_tbf;
use crate::tc::requests::{configure_flower_filter, configure_netem, install_htb_root};
use crate::{
dynch::DynFuture,
ip::{IpAddrExt as _, Subnet},
namespace::{self, NetworkNamespace},
tc::impairment::LinkImpairment,
wrappers,
};
/// Identifier of a peer in the simulated network.
pub type PeerId = usize;

/// Process-wide counter holding the ID that the next peer will be given. Starts at 1.
static PEER_ID_NEXT: AtomicUsize = AtomicUsize::new(1);

/// Returns the ID that the next peer will be given.
///
/// This only reads the counter; it never advances it.
#[inline]
pub fn next_peer_id() -> PeerId {
    PEER_ID_NEXT.load(Ordering::Relaxed)
}
/// Prefix of every network namespace name built by this module (hub and peers).
pub const NAMESPACE_PREFIX: &str = "lem";
/// Prefix of the veth device names derived from a peer ID.
pub const LINK_PREFIX: &str = "msg-veth";
/// Naming and addressing helpers derived from a peer identifier.
pub trait PeerIdExt: Display + Copy {
    /// Returns the IP address used for this peer's veth device inside `subnet`.
    fn veth_address(self, subnet: Subnet) -> IpAddr;

    /// Returns the name of the peer-side veth device: [`LINK_PREFIX`] followed by the ID.
    fn veth_name(self) -> String {
        format!("{LINK_PREFIX}{self}")
    }

    /// Returns the name of the other end of the veth pair: the veth name with a `-br` suffix.
    fn veth_br_name(self) -> String {
        let peer_side = self.veth_name();
        format!("{}-br", peer_side)
    }
}
impl PeerIdExt for PeerId {
    /// Offsets the subnet's network address by the peer ID.
    ///
    /// The addition saturates instead of wrapping if it would overflow.
    fn veth_address(self, subnet: Subnet) -> IpAddr {
        let network_bits = subnet.network_address.to_bits();
        let offset = self as u128;
        IpAddr::from_bits(network_bits.saturating_add(offset))
    }
}
/// A directed link between two peers, stored as `Link(source, destination)`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Link(pub PeerId, pub PeerId);

impl Link {
    /// Builds a link going from `source` to `destination`.
    #[inline]
    pub fn new(source: impl Into<PeerId>, destination: impl Into<PeerId>) -> Self {
        let from = source.into();
        let to = destination.into();
        Self(from, to)
    }

    /// The peer the link starts at.
    #[inline]
    pub fn source(&self) -> PeerId {
        let Self(from, _) = *self;
        from
    }

    /// The peer the link points to.
    #[inline]
    pub fn destination(&self) -> PeerId {
        let Self(_, to) = *self;
        to
    }
}

/// Formats the link as `(source → destination)`.
impl Display for Link {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(from, to) = self;
        write!(f, "({} → {})", from, to)
    }
}
/// Traffic-control bookkeeping kept per source peer.
#[derive(Debug, Default)]
struct PeerTcState {
    /// Whether the HTB root has already been installed for this peer.
    htb_installed: bool,
    /// Destinations for which an impairment has already been configured from this peer.
    configured_destinations: HashSet<PeerId>,
}

impl PeerTcState {
    /// Returns `true` if an impairment towards `dest` was recorded earlier.
    fn has_impairment_to(&self, dest: PeerId) -> bool {
        let Self { configured_destinations, .. } = self;
        configured_destinations.contains(&dest)
    }

    /// Records that an impairment towards `dest` is now configured.
    fn mark_configured(&mut self, dest: PeerId) {
        // `insert` reports whether the value was new; that information is not needed here.
        let _newly_added = self.configured_destinations.insert(dest);
    }
}
/// Peers of a network, keyed by their ID.
pub type PeerMap = HashMap<PeerId, Peer<PeerContext>>;
/// Traffic-control bookkeeping, keyed by the source peer's ID.
type TcStateMap = HashMap<PeerId, PeerTcState>;
/// A peer together with the network namespace it lives in.
///
/// `Ctx` is the context type carried by the namespace.
#[derive(Debug)]
pub struct Peer<Ctx = ()> {
    /// ID of the peer.
    pub id: PeerId,
    /// Namespace owned by the peer.
    pub namespace: NetworkNamespace<Ctx>,
}

impl Peer {
    /// Bundles an ID and a namespace into a [`Peer`].
    pub fn new<Ctx>(id: PeerId, namespace: NetworkNamespace<Ctx>) -> Peer<Ctx> {
        Peer::<Ctx> { id, namespace }
    }
}
/// One-shot, sendable constructor for a tokio runtime.
pub(crate) type RuntimeFactory = Box<dyn FnOnce() -> tokio::runtime::Runtime + Send>;

/// Returns a factory building a multi-threaded tokio runtime with all drivers enabled.
///
/// Worker threads are named `peer-<id>-w<n>`. `<id>` is the value of [`next_peer_id`] at the
/// time this function is called, and `<n>` comes from a single static counter shared by every
/// runtime built through this function.
///
/// # Panics
///
/// The returned factory panics if the runtime cannot be built.
pub fn default_runtime_factory() -> RuntimeFactory {
    let peer_id = next_peer_id();

    let build_runtime = move || {
        static WORKER_ID: AtomicUsize = AtomicUsize::new(0);

        let worker_name = move || {
            let id = WORKER_ID.fetch_add(1, Ordering::Relaxed);
            format!("peer-{peer_id}-w{id}")
        };

        let mut builder = tokio::runtime::Builder::new_multi_thread();
        builder.enable_all().thread_name_fn(worker_name);
        builder.build().expect("to create runtime")
    };

    Box::new(build_runtime)
}
/// Context handed to tasks that run inside the hub namespace.
#[derive(Debug)]
pub struct CommonContext {
/// rtnetlink handle used by tasks submitted to the hub namespace.
handle: rtnetlink::Handle,
/// Join handle of the spawned rtnetlink connection task; stored but never read in this file.
_connection_task: tokio::task::JoinHandle<()>,
}
/// Context handed to tasks that run inside a peer's namespace.
#[derive(Debug)]
pub struct PeerContext {
/// rtnetlink handle used by tasks submitted to the peer's namespace.
pub handle: rtnetlink::Handle,
/// Join handle of the spawned rtnetlink connection task; stored but never read in this file.
_connection_task: tokio::task::JoinHandle<()>,
/// Subnet of the network the peer belongs to.
pub subnet: Subnet,
/// ID of the peer owning this context.
pub peer_id: PeerId,
}
/// Options used when adding a peer to a [`Network`].
pub struct PeerOptions {
    /// Builds the tokio runtime passed to the peer's namespace.
    runtime_factory: RuntimeFactory,
}

impl Default for PeerOptions {
    /// Uses [`default_runtime_factory`] to build the peer's runtime.
    fn default() -> Self {
        let runtime_factory = default_runtime_factory();
        PeerOptions { runtime_factory }
    }
}

impl PeerOptions {
    /// Creates options that build the peer's runtime with the given factory.
    pub fn with_runtime(
        runtime_factory: impl FnOnce() -> tokio::runtime::Runtime + Send + 'static,
    ) -> Self {
        let runtime_factory: RuntimeFactory = Box::new(runtime_factory);
        PeerOptions { runtime_factory }
    }
}
/// Errors produced by this module.
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// An I/O operation failed (for example, opening the rtnetlink socket).
#[error("io error: {0}")]
Io(#[from] io::Error),
/// No peer with the given ID is registered in the network.
#[error("peer not found: {0}")]
PeerNotFound(PeerId),
/// NOTE(review): not constructed anywhere in the visible code — confirm where it is raised.
#[error("too many peers")]
TooManyPeers,
/// An rtnetlink request failed.
#[error("rtnetlink error: {0}")]
RtNetlink(#[from] rtnetlink::Error),
/// A network namespace operation failed.
#[error("network namespace error: {0}")]
Namespace(#[from] namespace::Error),
/// Awaiting a tokio task failed.
#[error("tokio join error: {0}")]
Join(#[from] tokio::task::JoinError),
/// Opaque payload of a failed thread join; presumably the panic payload — confirm at the
/// construction site, which is not visible here.
#[error("thread join error: {0:?}")]
Thread(Box<dyn std::any::Any + Send + 'static>),
/// A task executed inside a namespace reported an error.
#[error("task in namespace failed: {0}")]
NamespaceTask(#[from] namespace::TaskError<rtnetlink::Error>),
/// Sending on an mpsc channel failed.
#[error("failed to send task: {0}")]
SendError(#[from] mpsc::error::SendError<()>),
/// The oneshot channel carrying a task result was closed before a value arrived.
#[error("failed to receive task result: {0}")]
RecvError(#[from] oneshot::error::RecvError),
}
/// Result type whose error is this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
/// A simulated network: a hub namespace holding a bridge, plus one namespace per peer.
#[derive(Debug)]
pub struct Network {
/// Process ID truncated to 16 bits; embedded in the namespace names.
sim_id: u16,
/// Peers added so far, keyed by ID.
peers: PeerMap,
/// Traffic-control bookkeeping per source peer.
tc_state: TcStateMap,
/// Subnet that peer addresses are derived from.
subnet: Subnet,
/// Namespace that hosts the bridge every peer is attached to.
network_hub_namespace: NetworkNamespace<CommonContext>,
/// rtnetlink handle opened by `Network::new` in the caller's context; used to create links and
/// move them into namespaces.
rtnetlink_handle: rtnetlink::Handle,
/// Join handle of the task driving `rtnetlink_handle`'s connection; stored but never read here.
_rtnetlink_socket_task: JoinHandle<()>,
}
impl Network {
const BRIDGE_NAME: &str = "linkem-br0";
/// Creates an empty network for `subnet`.
///
/// Opens an rtnetlink connection, creates the hub namespace and adds the bridge
/// [`Self::BRIDGE_NAME`], brought up, directly into that namespace.
///
/// # Errors
///
/// Fails if the rtnetlink socket cannot be opened, the hub namespace cannot be created, or
/// the bridge cannot be added.
pub async fn new(subnet: Subnet) -> Result<Self> {
    // Only the low 16 bits of the process ID are kept; they end up in the namespace names.
    let sim_id = std::process::id() as u16;

    let (connection, rtnetlink_handle, _) = rtnetlink::new_connection()?;
    let socket_task = tokio::spawn(connection);

    // Context factory passed to the hub namespace: it opens its own rtnetlink connection.
    let hub_context = move || {
        let (handle, _connection_task) = rtnetlink::new_connection()
            .map(|(connection, handle, _)| (handle, tokio::task::spawn(connection)))
            .unwrap();
        CommonContext { handle, _connection_task }
    };

    let hub_name = Self::hub_namespace_name(sim_id);
    let network_hub_namespace =
        NetworkNamespace::new(hub_name, default_runtime_factory(), hub_context).await?;
    let hub_fd = network_hub_namespace.fd();

    let network = Self {
        sim_id,
        peers: PeerMap::default(),
        tc_state: TcStateMap::default(),
        subnet,
        network_hub_namespace,
        rtnetlink_handle,
        _rtnetlink_socket_task: socket_task,
    };

    let bridge = LinkBridge::new(Self::BRIDGE_NAME).up().setns_by_fd(hub_fd).build();
    network.rtnetlink_handle.link().add(bridge).execute().await?;

    Ok(network)
}
/// Builds the hub namespace name: the namespace prefix, `sim_id` as at least four lowercase hex
/// digits, and the `hub` suffix, joined by dashes.
fn hub_namespace_name(sim_id: u16) -> String {
format!("{NAMESPACE_PREFIX}-{sim_id:04x}-hub")
}
/// Builds a peer's namespace name: the namespace prefix, this network's `sim_id` as at least four
/// lowercase hex digits, and the decimal peer ID, joined by dashes.
fn peer_namespace_name(&self, peer_id: PeerId) -> String {
format!("{NAMESPACE_PREFIX}-{:04x}-{peer_id}", self.sim_id)
}
/// Adds a peer to the network and returns its ID.
///
/// Steps, in order:
/// 1. reserve a fresh peer ID and derive the namespace and veth names from it;
/// 2. create the peer's namespace with the runtime factory from `options`;
/// 3. create the veth pair, then move one end into the peer namespace and the other into
///    the hub namespace;
/// 4. inside the peer namespace: bring the veth up, assign the peer's address, bring `lo` up;
/// 5. inside the hub namespace: attach the other end to the bridge and bring it up;
/// 6. register the peer and its (empty) traffic-control state.
///
/// # Errors
///
/// Returns the first error hit by any step. The reserved ID is not handed out again after
/// a failure.
///
/// # Panics
///
/// The tasks submitted to the namespaces `expect` the veth device and the bridge to exist.
/// How such a panic surfaces to the caller depends on the namespace task runner, which is
/// not visible from here.
pub async fn add_peer_with_options(&mut self, options: PeerOptions) -> Result<PeerId> {
    // Reserve the ID atomically up front. A `load` here paired with a `store` at the end
    // would let two concurrent callers pick the same ID (and thus the same device names),
    // or let a slow caller roll the counter back.
    let peer_id = PEER_ID_NEXT.fetch_add(1, Ordering::Relaxed);
    let namespace_name = self.peer_namespace_name(peer_id);
    let veth_name = Arc::new(peer_id.veth_name());
    let veth_br_name = Arc::new(peer_id.veth_br_name());

    // NOTE(review): this guard stays entered across the `.await`s below; the tracing docs
    // recommend `Instrument` for async code — consider restructuring.
    let _span =
        tracing::debug_span!("add_peer", ?peer_id, %namespace_name, %veth_name, %veth_br_name)
            .entered();

    let subnet = self.subnet;
    let make_ctx = move || {
        let (handle, _connection_task) = rtnetlink::new_connection()
            .map(|(connection, handle, _)| (handle, tokio::task::spawn(connection)))
            .expect("to create rtnetlink socket");
        PeerContext { handle, _connection_task, subnet, peer_id }
    };
    let network_namespace =
        NetworkNamespace::new(namespace_name.clone(), options.runtime_factory, make_ctx)
            .await?;

    // Create the veth pair where `self.rtnetlink_handle` lives, then hand one end to each
    // namespace.
    self.rtnetlink_handle
        .link()
        .add(LinkVeth::new(&veth_name, &veth_br_name).build())
        .execute()
        .await
        .inspect_err(|e| tracing::debug!(?e, "failed to add link"))?;
    self.rtnetlink_handle
        .link()
        .set(LinkUnspec::new_with_name(&veth_name).setns_by_fd(network_namespace.fd()).build())
        .execute()
        .await
        .inspect_err(|e| tracing::debug!(?e, "failed to set device in namespace"))?;
    self.rtnetlink_handle
        .link()
        .set(
            LinkUnspec::new_with_name(&veth_br_name)
                .setns_by_fd(self.network_hub_namespace.fd())
                .build(),
        )
        .execute()
        .await
        .inspect_err(|e| tracing::debug!(?e, "failed to set link end in hub namespace"))?;

    // Peer side: bring the device up, assign its address, and bring loopback up.
    network_namespace
        .task_sender
        .submit(|ctx: &mut PeerContext| {
            Box::pin(async move {
                let address = ctx.peer_id.veth_address(ctx.subnet);
                let mask = ctx.subnet.netmask;
                tracing::debug!(?address, ?mask, dev = ?veth_name, "adding address to device");
                let index = wrappers::if_nametoindex(&veth_name).expect("to find device").get();
                ctx.handle
                    .link()
                    .set(LinkUnspec::new_with_name(&veth_name).up().build())
                    .execute()
                    .await?;
                ctx.handle.address().add(index, address, mask).execute().await?;
                ctx.handle
                    .link()
                    .set(LinkUnspec::new_with_name("lo").up().build())
                    .execute()
                    .await
            })
        })
        .await?
        .receive()
        .await??;

    // Hub side: attach the other end to the bridge and bring it up.
    self.network_hub_namespace
        .task_sender
        .submit(|ctx| {
            Box::pin(async move {
                let index =
                    wrappers::if_nametoindex(Self::BRIDGE_NAME).expect("to find bridge").get();
                ctx.handle
                    .link()
                    .set(LinkUnspec::new_with_name(&veth_br_name).controller(index).build())
                    .execute()
                    .await?;
                ctx.handle
                    .link()
                    .set(LinkUnspec::new_with_name(&veth_br_name).up().build())
                    .execute()
                    .await
            })
        })
        .await?
        .receive()
        .await??;

    let peer = Peer::new(peer_id, network_namespace);
    self.peers.insert(peer_id, peer);
    self.tc_state.insert(peer_id, PeerTcState::default());
    Ok(peer_id)
}
/// Adds a peer using [`PeerOptions::default`] and returns its ID.
pub async fn add_peer(&mut self) -> Result<PeerId> {
    let options = PeerOptions::default();
    self.add_peer_with_options(options).await
}
pub async fn run_in_namespace<T, F>(
&self,
peer_id: PeerId,
fut: F,
) -> Result<impl Future<Output = std::result::Result<T, oneshot::error::RecvError>>>
where
T: Send + 'static,
F: for<'a> FnOnce(&'a mut PeerContext) -> DynFuture<'a, T> + Send + 'static,
{
let Some(peer) = self.peers.get(&peer_id) else {
return Err(Error::PeerNotFound(peer_id));
};
let rx = peer.namespace.task_sender.submit(fut).await?.receive();
Ok(rx)
}
pub async fn apply_impairment(&mut self, link: Link, impairment: LinkImpairment) -> Result<()> {
let (src_peer, dst_peer) =
match self.peers.get_disjoint_mut([&link.source(), &link.destination()]) {
[Some(p1), Some(p2)] => (p1, p2),
[None, Some(_)] => return Err(Error::PeerNotFound(link.source())),
[Some(_), None] => return Err(Error::PeerNotFound(link.destination())),
[None, None] => return Err(Error::PeerNotFound(link.source())),
};
let tc_state = self.tc_state.entry(link.source()).or_default();
let is_replacement = tc_state.has_impairment_to(link.destination());
let dst_peer_id = dst_peer.id;
let htb_already_installed = tc_state.htb_installed;
let subnet = self.subnet;
src_peer
.namespace
.task_sender
.submit(move |ctx: &mut PeerContext| {
let span = tracing::debug_span!(
"apply_impairment",
link = %link,
?impairment,
)
.entered();
Box::pin(
async move {
let if_index = wrappers::if_nametoindex(&ctx.peer_id.veth_name())
.expect("to find dev")
.get() as i32;
if !htb_already_installed {
install_htb_root(&mut ctx.handle, if_index).await?;
}
configure_htb_class(&mut ctx.handle, if_index, dst_peer_id, is_replacement)
.await?;
let netem_parent = configure_tbf(
&mut ctx.handle,
if_index,
dst_peer_id,
&impairment,
is_replacement,
)
.await?;
configure_netem(
&mut ctx.handle,
if_index,
dst_peer_id,
netem_parent,
&impairment,
is_replacement,
)
.await?;
if !is_replacement {
let dst_ip = dst_peer_id.veth_address(subnet);
configure_flower_filter(&mut ctx.handle, if_index, dst_peer_id, dst_ip)
.await?;
}
tracing::debug!(is_replacement, "impairment configuration complete");
Ok::<_, rtnetlink::Error>(())
}
.instrument(span.clone()),
)
})
.await?
.receive()
.await??;
let tc_state = self.tc_state.get_mut(&link.source()).unwrap();
tc_state.htb_installed = true;
tc_state.mark_configured(link.destination());
Ok(())
}
}
#[cfg(test)]
mod linkem_network {
use std::{
net::{Ipv4Addr, SocketAddr},
time::{Duration, Instant},
};
use futures::StreamExt;
use msg_socket::{RepSocket, ReqSocket};
use msg_transport::tcp::Tcp;
use crate::{
ip::Subnet,
network::{Link, Network, PeerIdExt},
tc::impairment::LinkImpairment,
};
/// Creating a network must leave the hub namespace file under `/run/netns`.
#[tokio::test(flavor = "multi_thread")]
async fn create_network_works() {
    let _ = tracing_subscriber::fmt::try_init();

    let subnet = Subnet::new(Ipv4Addr::new(11, 0, 0, 0).into(), 16);
    let network = Network::new(subnet).await.unwrap();

    let path = format!("/run/netns/{}", Network::hub_namespace_name(network.sim_id));
    let exists = std::fs::exists(&path).unwrap();
    assert!(exists, "netns file doesn't exists at path {path}");
}
/// Adding two peers in a row must succeed.
#[tokio::test(flavor = "multi_thread")]
async fn add_peer_works() {
    let _ = tracing_subscriber::fmt::try_init();

    let subnet = Subnet::new(Ipv4Addr::new(12, 0, 0, 0).into(), 16);
    let mut network = Network::new(subnet).await.unwrap();

    for _ in 0..2 {
        let _peer_id = network.add_peer().await.unwrap();
    }
}
/// A request sent from peer 1 must be echoed back by a reply socket bound in peer 2.
#[tokio::test(flavor = "multi_thread")]
async fn simulate_reqrep_works() {
    let _ = tracing_subscriber::fmt::try_init();

    let subnet = Subnet::new(Ipv4Addr::new(13, 0, 0, 0).into(), 16);
    let mut network = Network::new(subnet).await.unwrap();
    let peer_1 = network.add_peer().await.unwrap();
    let peer_2 = network.add_peer().await.unwrap();

    let server_addr = SocketAddr::new(peer_2.veth_address(subnet), 12345);

    // Echo server: answers a single request with the message it received.
    let server = network
        .run_in_namespace(peer_2, move |_| {
            Box::pin(async move {
                let mut rep_socket = RepSocket::new(Tcp::default());
                rep_socket.bind(server_addr).await.unwrap();
                let Some(request) = rep_socket.next().await else { return };
                let msg = request.msg().clone();
                request.respond(msg).unwrap();
            })
        })
        .await
        .unwrap();

    let client = network
        .run_in_namespace(peer_1, move |_| {
            Box::pin(async move {
                let mut req_socket = ReqSocket::new(Tcp::default());
                req_socket.connect_sync(server_addr);
                let response = req_socket.request("hello".into()).await.unwrap();
                assert_eq!(response, "hello");
            })
        })
        .await
        .unwrap();

    tokio::try_join!(server, client).unwrap();
}
/// Applying an impairment with loss, jitter, latency and duplication must succeed.
#[tokio::test(flavor = "multi_thread")]
async fn apply_impairment_works() {
    let _ = tracing_subscriber::fmt::try_init();

    let subnet = Subnet::new(Ipv4Addr::new(12, 0, 0, 0).into(), 16);
    let mut network = Network::new(subnet).await.unwrap();
    let peer_1 = network.add_peer().await.unwrap();
    let peer_2 = network.add_peer().await.unwrap();
    let _peer_3 = network.add_peer().await.unwrap();

    let link = Link::new(peer_1, peer_2);
    let impairment = LinkImpairment {
        latency: 1_000_000,
        jitter: 100_000,
        loss: 50.0,
        duplicate: 50.0,
        ..Default::default()
    };

    network.apply_impairment(link, impairment).await.unwrap();
}
/// With the configured latency on peer 1 → peer 2, the request must not reach the server
/// within half of that latency, yet must still be answered afterwards.
#[tokio::test(flavor = "multi_thread")]
async fn simulate_reqrep_netem_delay_works() {
    let _ = tracing_subscriber::fmt::try_init();

    let subnet = Subnet::new(Ipv4Addr::new(14, 0, 0, 0).into(), 16);
    let mut network = Network::new(subnet).await.unwrap();
    let peer_1 = network.add_peer().await.unwrap();
    let peer_2 = network.add_peer().await.unwrap();

    let sec_in_us = 1_000_000;
    let delayed = LinkImpairment { latency: sec_in_us, ..Default::default() };
    network.apply_impairment(Link::new(peer_1, peer_2), delayed).await.unwrap();

    let server_addr = SocketAddr::new(peer_2.veth_address(subnet), 12345);

    let server = network
        .run_in_namespace(peer_2, move |_ctx| {
            Box::pin(async move {
                let mut rep_socket = RepSocket::new(Tcp::default());
                rep_socket.bind(server_addr).await.unwrap();

                // Nothing may arrive within half the configured latency: the timeout has to
                // fire first.
                let half_latency = Duration::from_micros((sec_in_us / 2).into());
                tokio::time::timeout(half_latency, async {
                    let Some(request) = rep_socket.next().await else { return };
                    let msg = request.msg().clone();
                    request.respond(msg).unwrap();
                })
                .await
                .unwrap_err();

                // Afterwards the delayed request is served normally.
                let Some(request) = rep_socket.next().await else { return };
                let msg = request.msg().clone();
                request.respond(msg).unwrap();
            })
        })
        .await
        .unwrap();

    let client = network
        .run_in_namespace(peer_1, move |_ctx| {
            Box::pin(async move {
                let mut req_socket = ReqSocket::new(Tcp::default());
                req_socket.connect_sync(server_addr);
                req_socket.request("hello".into()).await.unwrap();
            })
        })
        .await
        .unwrap();

    tokio::try_join!(server, client).unwrap();
}
#[tokio::test(flavor = "multi_thread")]
async fn per_destination_impairments_work() {
let _ = tracing_subscriber::fmt::try_init();
let subnet = Subnet::new(Ipv4Addr::new(15, 0, 0, 0).into(), 16);
let mut network = Network::new(subnet).await.unwrap();
let peer_1 = network.add_peer().await.unwrap();
let peer_2 = network.add_peer().await.unwrap();
let peer_3 = network.add_peer().await.unwrap();
let fast_latency_us = 100_000;
network
.apply_impairment(
Link::new(peer_1, peer_2),
LinkImpairment { latency: fast_latency_us, ..Default::default() },
)
.await
.unwrap();
let slow_latency_us = 500_000;
network
.apply_impairment(
Link::new(peer_1, peer_3),
LinkImpairment { latency: slow_latency_us, ..Default::default() },
)
.await
.unwrap();
let address_2 = peer_2.veth_address(subnet);
let address_3 = peer_3.veth_address(subnet);
let port = 12345;
let server_2 = network
.run_in_namespace(peer_2, move |_| {
Box::pin(async move {
let mut rep_socket = RepSocket::new(Tcp::default());
rep_socket.bind(SocketAddr::new(address_2, port)).await.unwrap();
if let Some(request) = rep_socket.next().await {
request.respond("peer2".into()).unwrap();
}
})
})
.await
.unwrap();
let server_3 = network
.run_in_namespace(peer_3, move |_| {
Box::pin(async move {
let mut rep_socket = RepSocket::new(Tcp::default());
rep_socket.bind(SocketAddr::new(address_3, port)).await.unwrap();
if let Some(request) = rep_socket.next().await {
request.respond("peer3".into()).unwrap();
}
})
})
.await
.unwrap();
let client = network
.run_in_namespace(peer_1, move |_| {
Box::pin(async move {
let mut req_socket_2 = ReqSocket::new(Tcp::default());
let mut req_socket_3 = ReqSocket::new(Tcp::default());
req_socket_2.connect_sync(SocketAddr::new(address_2, port));
req_socket_3.connect_sync(SocketAddr::new(address_3, port));
tokio::time::sleep(std::time::Duration::from_millis(1500)).await;
let start = Instant::now();
let resp = req_socket_2.request("ping".into()).await.unwrap();
let rtt_2 = start.elapsed();
assert_eq!(resp.as_ref(), b"peer2");
let start = Instant::now();
let resp = req_socket_3.request("ping".into()).await.unwrap();
let rtt_3 = start.elapsed();
assert_eq!(resp.as_ref(), b"peer3");
tracing::info!(?rtt_2, ?rtt_3, "measured RTTs");
assert!(
rtt_3 > rtt_2 * 2,
"RTT to peer 3 ({:?}) should be at least 2x RTT to peer 2 ({:?})",
rtt_3,
rtt_2
);
})
})
.await
.unwrap();
tokio::try_join!(server_2, server_3, client).unwrap();
}
/// With the link limited to 1 Mbit/s, sending 125 000 bytes must take longer than 500 ms.
#[tokio::test(flavor = "multi_thread")]
async fn bandwidth_limiting_works() {
    let _ = tracing_subscriber::fmt::try_init();

    let subnet = Subnet::new(Ipv4Addr::new(16, 0, 0, 0).into(), 16);
    let mut network = Network::new(subnet).await.unwrap();
    let peer_1 = network.add_peer().await.unwrap();
    let peer_2 = network.add_peer().await.unwrap();

    let bandwidth_mbit = 1.0;
    let limited = LinkImpairment { bandwidth_mbit_s: Some(bandwidth_mbit), ..Default::default() };
    network.apply_impairment(Link::new(peer_1, peer_2), limited).await.unwrap();

    let server_addr = SocketAddr::new(peer_2.veth_address(subnet), 12346);
    let data_size = 125_000;

    let server = network
        .run_in_namespace(peer_2, move |_| {
            Box::pin(async move {
                let mut rep_socket = RepSocket::new(Tcp::default());
                rep_socket.bind(server_addr).await.unwrap();
                let Some(request) = rep_socket.next().await else { return };
                request.respond("ok".into()).unwrap();
            })
        })
        .await
        .unwrap();

    let client = network
        .run_in_namespace(peer_1, move |_| {
            Box::pin(async move {
                let mut req_socket = ReqSocket::new(Tcp::default());
                req_socket.connect_sync(server_addr);

                let payload = vec![0u8; data_size];
                let start = Instant::now();
                let _resp = req_socket.request(payload.into()).await.unwrap();
                let elapsed = start.elapsed();

                tracing::info!(?elapsed, data_size, "transfer completed");
                assert!(
                    elapsed > Duration::from_millis(500),
                    "Transfer of {} bytes took only {:?}, expected > 500ms at 1 Mbit/s",
                    data_size,
                    elapsed
                );
            })
        })
        .await
        .unwrap();

    tokio::try_join!(server, client).unwrap();
}
/// Latency and a bandwidth limit can be combined on one link: with 100 ms of latency applied
/// on peer 1 → peer 2, a ping-pong must take longer than 150 ms.
#[tokio::test(flavor = "multi_thread")]
async fn bandwidth_and_latency_combined() {
    let _ = tracing_subscriber::fmt::try_init();

    let subnet = Subnet::new(Ipv4Addr::new(17, 0, 0, 0).into(), 16);
    let mut network = Network::new(subnet).await.unwrap();
    let peer_1 = network.add_peer().await.unwrap();
    let peer_2 = network.add_peer().await.unwrap();

    let latency_us = 100_000;
    let bandwidth_mbit = 10.0;
    let combined = LinkImpairment {
        latency: latency_us,
        bandwidth_mbit_s: Some(bandwidth_mbit),
        ..Default::default()
    };
    network.apply_impairment(Link::new(peer_1, peer_2), combined).await.unwrap();

    let server_addr = SocketAddr::new(peer_2.veth_address(subnet), 12347);

    let server = network
        .run_in_namespace(peer_2, move |_| {
            Box::pin(async move {
                let mut rep_socket = RepSocket::new(Tcp::default());
                rep_socket.bind(server_addr).await.unwrap();
                let Some(request) = rep_socket.next().await else { return };
                request.respond("pong".into()).unwrap();
            })
        })
        .await
        .unwrap();

    let client = network
        .run_in_namespace(peer_1, move |_| {
            Box::pin(async move {
                let mut req_socket = ReqSocket::new(Tcp::default());
                req_socket.connect_sync(server_addr);

                let start = Instant::now();
                let resp = req_socket.request("ping".into()).await.unwrap();
                let elapsed = start.elapsed();

                assert_eq!(resp.as_ref(), b"pong");
                tracing::info!(?elapsed, "ping-pong completed");
                assert!(
                    elapsed > Duration::from_millis(150),
                    "RTT {:?} should be > 150ms with 100ms one-way latency",
                    elapsed
                );
            })
        })
        .await
        .unwrap();

    tokio::try_join!(server, client).unwrap();
}
/// Re-applying an impairment to an already configured link must succeed, including when the
/// replacement adds a bandwidth limit.
#[tokio::test(flavor = "multi_thread")]
async fn impairment_replacement_works() {
    let _ = tracing_subscriber::fmt::try_init();

    let subnet = Subnet::new(Ipv4Addr::new(18, 0, 0, 0).into(), 16);
    let mut network = Network::new(subnet).await.unwrap();
    let peer_1 = network.add_peer().await.unwrap();
    let peer_2 = network.add_peer().await.unwrap();
    let link = Link::new(peer_1, peer_2);

    // Initial configuration.
    let impairment_1 = LinkImpairment { latency: 100_000, ..Default::default() };
    network.apply_impairment(link, impairment_1).await.unwrap();

    // Replacement with a different latency.
    let impairment_2 = LinkImpairment { latency: 200_000, ..Default::default() };
    network.apply_impairment(link, impairment_2).await.expect("Replacement should succeed");

    // Replacement that additionally limits the bandwidth.
    let impairment_3 =
        LinkImpairment { latency: 50_000, bandwidth_mbit_s: Some(10.0), ..Default::default() };
    network
        .apply_impairment(link, impairment_3)
        .await
        .expect("Replacement with bandwidth should succeed");
}
/// Each direction between two peers can carry its own impairment.
#[tokio::test(flavor = "multi_thread")]
async fn bidirectional_impairments() {
    let _ = tracing_subscriber::fmt::try_init();

    let subnet = Subnet::new(Ipv4Addr::new(19, 0, 0, 0).into(), 16);
    let mut network = Network::new(subnet).await.unwrap();
    let peer_1 = network.add_peer().await.unwrap();
    let peer_2 = network.add_peer().await.unwrap();

    let forward = Link::new(peer_1, peer_2);
    let impairment_1_to_2 = LinkImpairment { latency: 50_000, ..Default::default() };

    let backward = Link::new(peer_2, peer_1);
    let impairment_2_to_1 = LinkImpairment { latency: 200_000, loss: 5.0, ..Default::default() };

    network.apply_impairment(forward, impairment_1_to_2).await.unwrap();
    network.apply_impairment(backward, impairment_2_to_1).await.unwrap();
}
/// A bandwidth limit with an explicit burst size must be accepted.
#[tokio::test(flavor = "multi_thread")]
async fn custom_burst_size() {
    let _ = tracing_subscriber::fmt::try_init();

    let subnet = Subnet::new(Ipv4Addr::new(20, 0, 0, 0).into(), 16);
    let mut network = Network::new(subnet).await.unwrap();
    let peer_1 = network.add_peer().await.unwrap();
    let peer_2 = network.add_peer().await.unwrap();

    let with_burst = LinkImpairment {
        bandwidth_mbit_s: Some(10.0),
        burst_kib: Some(64),
        ..Default::default()
    };
    network.apply_impairment(Link::new(peer_1, peer_2), with_burst).await.unwrap();
}
/// Once an impairment with `duplicate > 0` is in place on a peer, configuring a second
/// destination on that peer is expected to fail.
///
/// Ignored by default: the outcome depends on the kernel version.
#[tokio::test(flavor = "multi_thread")]
#[ignore = "flaky, depends on kernel version"]
async fn netem_duplicate_prevents_additional_netem_qdiscs() {
    let _ = tracing_subscriber::fmt::try_init();

    let subnet = Subnet::new(Ipv4Addr::new(24, 0, 0, 0).into(), 16);
    let mut network = Network::new(subnet).await.unwrap();
    let peer_1 = network.add_peer().await.unwrap();
    let peer_2 = network.add_peer().await.unwrap();
    let peer_3 = network.add_peer().await.unwrap();

    // First destination: impairment with duplication enabled.
    let with_dup = LinkImpairment { latency: 20_000, duplicate: 0.02, ..Default::default() };
    network.apply_impairment(Link::new(peer_1, peer_2), with_dup).await.unwrap();

    // Second destination from the same source, without duplication.
    let no_dup = LinkImpairment { latency: 10_000, ..Default::default() };
    let result = network.apply_impairment(Link::new(peer_1, peer_3), no_dup).await;

    assert!(
        result.is_err(),
        "Expected failure: kernel prevents additional netem qdiscs when one has duplicate > 0"
    );
}
/// With 100% duplication on peer 1 → peer 2, a single UDP datagram must be received twice.
#[tokio::test(flavor = "multi_thread")]
async fn packet_duplication_works() {
    let _ = tracing_subscriber::fmt::try_init();

    let subnet = Subnet::new(Ipv4Addr::new(21, 0, 0, 0).into(), 16);
    let mut network = Network::new(subnet).await.unwrap();
    let peer_1 = network.add_peer().await.unwrap();
    let peer_2 = network.add_peer().await.unwrap();

    let duplicate_all = LinkImpairment { duplicate: 100.0, ..Default::default() };
    network.apply_impairment(Link::new(peer_1, peer_2), duplicate_all).await.unwrap();

    let receiver_addr = SocketAddr::new(peer_2.veth_address(subnet), 9999);

    // Receiver: counts datagrams until none arrives for 500 ms.
    let receiver = network
        .run_in_namespace(peer_2, move |_| {
            Box::pin(async move {
                let sock = tokio::net::UdpSocket::bind(receiver_addr).await.unwrap();
                let mut received_count = 0;
                let mut buf = [0u8; 64];

                while let Ok(outcome) =
                    tokio::time::timeout(Duration::from_millis(500), sock.recv(&mut buf)).await
                {
                    let n = outcome.unwrap_or_else(|e| panic!("recv error: {}", e));
                    let msg = &buf[..n];
                    tracing::info!(?msg, received_count, "received packet");
                    assert_eq!(msg, b"ping", "unexpected message content");
                    received_count += 1;
                }

                received_count
            })
        })
        .await
        .unwrap();

    // Short pause before the sender is started.
    tokio::time::sleep(Duration::from_millis(50)).await;

    let sender = network
        .run_in_namespace(peer_1, move |_| {
            Box::pin(async move {
                let sock = tokio::net::UdpSocket::bind("0.0.0.0:0").await.unwrap();
                sock.send_to(b"ping", receiver_addr).await.unwrap();
                tracing::info!("sent single packet");
            })
        })
        .await
        .unwrap();

    sender.await.unwrap();
    let received_count = receiver.await.unwrap();

    assert_eq!(
        received_count, 2,
        "Expected 2 packets (original + duplicate) but received {}",
        received_count
    );
}
}