mod config;
mod loss;
pub use config::{Bitrate, DataSize, GilbertElliot, Link};
pub use config::{LossModel, NetemConfig, Probability, RandomLoss};
use std::cmp::{Ordering, Reverse};
use std::collections::BinaryHeap;
use std::time::{Duration, Instant};
use fastrand::Rng;
use loss::LossState;
/// Packet queue that applies the impairments described by a `NetemConfig`
/// (loss, duplication, delay/jitter, link rate limiting and reordering) to
/// packets of type `T`.
///
/// It is driven by feeding `Input` values to `handle_input` and draining
/// `poll_output`; time only advances through the instants carried by inputs.
pub struct Netem<T> {
/// Active emulation settings.
config: NetemConfig,
/// Random generator seeded from `config.seed`; used for the loss decision,
/// the duplication draw and jitter.
rng: Rng,
/// State of the configured loss model, consulted once per incoming packet.
loss_state: LossState,
/// Packets waiting for delivery. Wrapped in `Reverse` so the heap yields the
/// smallest `QueuedPacket` (per its `Ord` impl) first.
queue: BinaryHeap<Reverse<QueuedPacket<T>>>,
/// Delay produced by the most recent jittered `calculate_delay` call; feeds
/// the correlation term of the next one.
last_delay: Duration,
/// On a rate-limited link: send time plus transmission time of the last
/// non-reordered packet. Later packets are not scheduled before it.
rate_virtual_time: Option<Instant>,
/// Packets enqueued since the last time the reorder gap was reached.
reorder_counter: u32,
/// Latest instant seen through `handle_input`; `None` until the first input.
current_time: Option<Instant>,
/// Number of packets pushed onto the queue so far; used to derive
/// `packet_index`.
packet_count: u64,
/// Set when `poll_output` has handed out a `Timeout`; cleared by a `Timeout`
/// input or when a new packet is enqueued.
timeout_pending: bool,
/// Send time of the most recent non-reordered packet; reordered packets
/// reuse it.
last_send_at: Option<Instant>,
/// Sum of `len()` of all payloads currently in `queue`.
queued_bytes: usize,
}
impl<T: Clone + WithLen> Netem<T> {
/// Creates an emulator with an empty queue for `config`.
///
/// The random generator is seeded with `config.seed` and the loss-model
/// state is initialised from `config.loss`; every counter starts at zero and
/// the clock is unset until the first input arrives.
pub fn new(config: NetemConfig) -> Self {
    Self {
        // These two read from `config`, so they are built before it is moved.
        rng: Rng::with_seed(config.seed),
        loss_state: LossState::new(&config.loss),
        config,
        queue: BinaryHeap::new(),
        queued_bytes: 0,
        packet_count: 0,
        reorder_counter: 0,
        last_delay: Duration::ZERO,
        last_send_at: None,
        rate_virtual_time: None,
        current_time: None,
        timeout_pending: false,
    }
}
/// Feeds one event into the emulator.
///
/// Both variants first advance the internal clock to the instant they carry.
/// A `Timeout` then clears the pending-timeout flag so that `poll_output` may
/// report a wake-up again; a `Packet` is handed to `process_packet`.
pub fn handle_input(&mut self, input: Input<T>) {
    let at = match &input {
        Input::Timeout(at) | Input::Packet(at, _) => *at,
    };
    self.progress_time(at);

    match input {
        Input::Packet(arrival, payload) => self.process_packet(arrival, payload),
        Input::Timeout(_) => self.timeout_pending = false,
    }
}
/// Moves the internal clock to `now`.
///
/// A `now` that lies before the current clock value is ignored, so the clock
/// never moves backwards.
fn progress_time(&mut self, now: Instant) {
    let goes_backwards = matches!(self.current_time, Some(previous) if now < previous);
    if !goes_backwards {
        self.current_time = Some(now);
    }
}
/// Returns the next thing the caller should act on, if any.
///
/// * `None` before any input has set the clock, or when the queue is empty.
/// * `Output::Packet` when the packet at the head of the queue has a
///   `send_at` at or before the current clock.
/// * `Output::Timeout(send_at)` of the head packet otherwise, but only once
///   until the pending-timeout flag is cleared again.
pub fn poll_output(&mut self) -> Option<Output<T>> {
    let now = self.current_time?;
    let Reverse(head) = self.queue.peek()?;
    let due_at = head.send_at;

    if due_at <= now {
        return self.queue.pop().map(|Reverse(ready)| {
            // Saturating: never underflow the byte accounting.
            self.queued_bytes = self.queued_bytes.saturating_sub(ready.data.len());
            Output::Packet(ready.data)
        });
    }

    if self.timeout_pending {
        return None;
    }
    self.timeout_pending = true;
    Some(Output::Timeout(due_at))
}
pub fn poll_timeout(&self) -> Instant {
self.queue
.peek()
.map(|Reverse(p)| p.send_at)
.unwrap_or_else(not_happening)
}
/// Runs an incoming packet through the loss and duplication stages and
/// enqueues what survives.
///
/// * If the loss model says the packet is lost, nothing is enqueued.
/// * Otherwise the packet is enqueued once, or twice when the duplication
///   draw (`rng.f32() < config.duplicate`) succeeds.
///
/// The payload is cloned only when a duplicate is actually emitted, so the
/// common non-duplicating path moves `data` straight into the queue.
fn process_packet(&mut self, now: Instant, data: T) {
    let lost = self
        .loss_state
        .should_lose(&self.config.loss, &mut self.rng);
    if lost {
        return;
    }

    // Drawn before any enqueue (which may itself draw jitter) so the order of
    // RNG draws stays: duplication first, then per-copy delay.
    let should_duplicate = self.rng.f32() < self.config.duplicate.0;

    if should_duplicate {
        self.enqueue_packet(now, data.clone());
    }
    self.enqueue_packet(now, data);
}
/// Schedules `data` for delivery, or drops it when the link buffer is full.
///
/// Steps, in order:
/// 1. Pick a delay and a tentative send time `now + delay`.
/// 2. With a configured link: compute the transmission time, push the send
///    time back to `rate_virtual_time` if that is later, and return without
///    enqueueing when `queued_bytes + len` would exceed the link buffer.
/// 3. With a configured reorder gap: count the packet; when the gap is
///    reached (and an earlier non-reordered packet exists) the packet takes
///    that packet's send time and an index just below it.
/// 4. Otherwise advance `rate_virtual_time` / `last_send_at`.
/// 5. Push onto the queue and clear the pending-timeout flag so the next
///    `poll_output` can report a fresh wake-up.
fn enqueue_packet(&mut self, now: Instant, data: T) {
    let size = data.len();
    let mut send_at = now + self.calculate_delay();

    // Rate limiting and tail drop.
    let mut tx_time = None;
    if let Some(link) = self.config.link {
        let serialization = DataSize::from(size) / link.rate;
        if let Some(link_free_at) = self.rate_virtual_time {
            send_at = send_at.max(link_free_at);
        }
        if self.queued_bytes + size > link.buffer.as_bytes_usize() {
            return;
        }
        tx_time = Some(serialization);
    }

    // `Some(previous send time)` when this packet is to be reordered.
    let reorder_onto = match self.config.reorder_gap {
        Some(gap) => {
            self.reorder_counter += 1;
            if self.reorder_counter >= gap {
                self.reorder_counter = 0;
                self.last_send_at.filter(|_| self.packet_count > 0)
            } else {
                None
            }
        }
        None => None,
    };

    // Indices are spaced `stride` apart so a reordered packet can slot in
    // directly below the previous packet's index.
    let stride = self.config.reorder_gap.unwrap_or(1) as u64;
    let packet_index = if let Some(previous_send_at) = reorder_onto {
        send_at = previous_send_at;
        self.packet_count * stride - 1
    } else {
        let index = (self.packet_count + 1) * stride;
        if let Some(serialization) = tx_time {
            self.rate_virtual_time = Some(send_at + serialization);
        }
        self.last_send_at = Some(send_at);
        index
    };

    self.packet_count += 1;
    self.queued_bytes += size;
    self.queue.push(Reverse(QueuedPacket {
        send_at,
        data,
        packet_index,
    }));
    self.timeout_pending = false;
}
/// Produces the delay for the next packet.
///
/// Without jitter this is exactly `config.latency` and no random number is
/// drawn. With jitter, a factor in `[-1, 1]` is drawn and, when
/// `delay_correlation` is non-zero, blended with the previous delay's
/// normalised offset from the base latency; the result is `latency` plus or
/// minus that fraction of `jitter`, never below zero. The returned value is
/// remembered in `last_delay`.
fn calculate_delay(&mut self) -> Duration {
    let (base, jitter) = (self.config.latency, self.config.jitter);
    if jitter.is_zero() {
        return base;
    }

    let rho = self.config.delay_correlation.0;
    let span = jitter.as_nanos() as f32;
    let fresh = self.rng.f32() * 2.0 - 1.0;

    // Signed offset of the previous delay from `base`, in units of `jitter`.
    let previous = match self.last_delay.checked_sub(base) {
        Some(above) => above.as_nanos() as f32 / span,
        None => -((base - self.last_delay).as_nanos() as f32 / span),
    };

    let factor = if rho == 0.0 {
        fresh
    } else {
        fresh * (1.0 - rho) + previous.clamp(-1.0, 1.0) * rho
    };

    let offset = (factor * span) as i64;
    let delay = if offset < 0 {
        base.saturating_sub(Duration::from_nanos((-offset) as u64))
    } else {
        base + Duration::from_nanos(offset as u64)
    };

    self.last_delay = delay;
    delay
}
pub fn queue_len(&self) -> usize {
self.queue.len()
}
/// `true` when no packet is waiting in the queue.
pub fn is_empty(&self) -> bool {
    self.queue.peek().is_none()
}
/// Replaces the active configuration.
///
/// The random generator is reseeded from `config.seed`, the loss-model state
/// is rebuilt from `config.loss`, and `last_delay` / `last_send_at` are
/// reset. The queue, byte accounting, clock, `rate_virtual_time`,
/// `reorder_counter`, `packet_count` and the pending-timeout flag are left
/// as they are.
pub fn set_config(&mut self, config: NetemConfig) {
    // Derive everything that reads from `config` before moving it in.
    let reseeded = Rng::with_seed(config.seed);
    let fresh_loss = LossState::new(&config.loss);

    self.config = config;
    self.rng = reseeded;
    self.loss_state = fresh_loss;
    self.last_send_at = None;
    self.last_delay = Duration::ZERO;
}
}
/// Something with a length; the emulator uses it as the packet size for
/// rate limiting and buffer accounting.
pub trait WithLen {
    /// Length of the value.
    fn len(&self) -> usize;

    /// `true` when `len()` is zero.
    fn is_empty(&self) -> bool {
        matches!(self.len(), 0)
    }
}
/// Every type that can be viewed as a byte slice gets `WithLen`, with the
/// length being the number of bytes in that slice.
impl<T: AsRef<[u8]>> WithLen for T {
    fn len(&self) -> usize {
        let bytes: &[u8] = self.as_ref();
        bytes.len()
    }
}
/// Events fed into `Netem::handle_input`. Each variant carries the instant
/// at which it happens; that instant advances the emulator's clock.
#[derive(Debug)]
pub enum Input<T> {
/// Time has passed; clears the pending-timeout flag so `poll_output` may
/// report a wake-up again.
Timeout(Instant),
/// A packet arriving at the given instant.
Packet(Instant, T),
}
/// Results produced by `Netem::poll_output`.
#[derive(Debug)]
pub enum Output<T> {
/// Nothing is due yet; carries the `send_at` of the packet at the head of
/// the queue.
Timeout(Instant),
/// A packet whose `send_at` is at or before the emulator's current clock.
Packet(T),
}
/// A packet waiting in the emulator's queue.
///
/// Equality and ordering look only at `send_at` and `packet_index`; `data`
/// never takes part in comparisons.
#[derive(Debug)]
struct QueuedPacket<T> {
/// Instant from which the packet may be handed out by `poll_output`.
send_at: Instant,
/// The payload.
data: T,
/// Sequence key assigned at enqueue time; a reordered packet gets a value
/// just below that of the packet it overtakes.
packet_index: u64,
}
/// An instant ten (365-day) years from now, used as the "no timeout
/// scheduled" value.
fn not_happening() -> Instant {
    const TEN_YEARS: Duration = Duration::from_secs(3600 * 24 * 365 * 10);
    Instant::now() + TEN_YEARS
}
/// Two queued packets are equal when both their send time and their packet
/// index match; the payload is not compared.
impl<T> PartialEq for QueuedPacket<T> {
    fn eq(&self, other: &Self) -> bool {
        (self.send_at, self.packet_index) == (other.send_at, other.packet_index)
    }
}
// Marker impl: `PartialEq::eq` compares `send_at` and `packet_index` field by
// field, and `Ord` (needed by the binary heap) requires `Eq`.
impl<T> Eq for QueuedPacket<T> {}
/// Partial ordering simply defers to the total order defined by `Ord`.
impl<T> PartialOrd for QueuedPacket<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}
/// Total order used by the delivery queue: earliest `send_at` first, with
/// `packet_index` as the tie-breaker.
///
/// The send time has to be the primary key because `poll_output` and
/// `poll_timeout` only inspect the `send_at` of the heap's head. If the index
/// came first, a low-index packet with a late send time would sit at the head
/// and hold back packets that are already due (and the reported wake-up time
/// would be too late). The index still decides among packets with the same
/// send time, which is what lets a reordered packet (same `send_at`, lower
/// index) overtake the packet it was reordered onto.
impl<T> Ord for QueuedPacket<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        self.send_at
            .cmp(&other.send_at)
            .then_with(|| self.packet_index.cmp(&other.packet_index))
    }
}
#[cfg(test)]
mod tests {
use super::*;
/// Reference point in time for a test; simply the current `Instant`.
fn instant() -> Instant {
Instant::now()
}
/// With the default configuration a packet comes out on the very next poll,
/// and only once.
#[test]
fn test_passthrough() {
    let mut netem: Netem<Vec<u8>> = Netem::new(NetemConfig::default());
    let start = instant();

    netem.handle_input(Input::Packet(start, vec![1, 2, 3]));

    let delivered = netem.poll_output();
    assert!(matches!(delivered, Some(Output::Packet(data)) if data == vec![1, 2, 3]));

    let nothing_left = netem.poll_output();
    assert!(nothing_left.is_none());
}
/// A packet with fixed latency is first announced via a timeout and only
/// delivered once the clock has reached that latency.
#[test]
fn test_latency() {
    let latency = Duration::from_millis(100);
    let mut netem: Netem<Vec<u8>> = Netem::new(NetemConfig::new().latency(latency).seed(42));
    let start = instant();

    netem.handle_input(Input::Packet(start, vec![1, 2, 3]));

    // Not due yet: the emulator asks to be woken up later.
    let wakeup = netem.poll_output();
    assert!(matches!(wakeup, Some(Output::Timeout(t)) if t > start));

    netem.handle_input(Input::Timeout(start + latency));

    let delivered = netem.poll_output();
    assert!(matches!(delivered, Some(Output::Packet(data)) if data == vec![1, 2, 3]));
}
/// With a loss probability of one, nothing is ever queued or delivered.
#[test]
fn test_total_loss() {
    let always_lose = RandomLoss::new(Probability::ONE);
    let mut netem: Netem<Vec<u8>> = Netem::new(NetemConfig::new().loss(always_lose).seed(42));
    let start = instant();

    netem.handle_input(Input::Packet(start, vec![1, 2, 3]));

    let polled = netem.poll_output();
    assert!(polled.is_none());
    assert!(netem.is_empty());
}
/// With a duplication probability of one, a single input yields exactly two
/// packets.
#[test]
fn test_duplication() {
    let mut netem: Netem<Vec<u8>> =
        Netem::new(NetemConfig::new().duplicate(Probability::ONE).seed(42));
    let start = instant();

    netem.handle_input(Input::Packet(start, vec![1, 2, 3]));

    for _ in 0..2 {
        assert!(matches!(netem.poll_output(), Some(Output::Packet(_))));
    }
    assert!(netem.poll_output().is_none());
}
/// On a rate-limited link the first packet leaves immediately and the second
/// has to wait for the first one's transmission time (the assertions expect
/// roughly 100 ms for 100 bytes at `Bitrate::kbps(8)`).
#[test]
fn test_rate_limiting() {
    let mut netem: Netem<Vec<u8>> = Netem::new(
        NetemConfig::new()
            .link(Bitrate::kbps(8), DataSize::kbytes(10))
            .seed(42),
    );
    let start = instant();

    netem.handle_input(Input::Packet(start, vec![0; 100]));
    let first = netem.poll_output();
    assert!(matches!(first, Some(Output::Packet(_))));

    netem.handle_input(Input::Packet(start, vec![0; 100]));
    let second = netem.poll_output();
    if let Some(Output::Timeout(t)) = second {
        let delay = t - start;
        assert!(delay >= Duration::from_millis(90));
        assert!(delay <= Duration::from_millis(110));
    } else {
        panic!("Expected timeout, got {:?}", second);
    }
}
/// With a reorder gap of three, the third packet overtakes the second:
/// delivery order is 1, 3, 2.
#[test]
fn test_reordering() {
    let latency = Duration::from_millis(100);
    let mut netem: Netem<Vec<u8>> =
        Netem::new(NetemConfig::new().latency(latency).reorder_gap(3).seed(42));
    let start = instant();

    for tag in 1..=3u8 {
        netem.handle_input(Input::Packet(start, vec![tag]));
    }

    // Nothing is due before the latency has elapsed.
    let due = start + latency;
    let wakeup = netem.poll_output();
    assert!(matches!(wakeup, Some(Output::Timeout(t)) if t == due));

    netem.handle_input(Input::Timeout(due));

    for expected in vec![1u8, 3, 2] {
        let output = netem.poll_output();
        assert!(matches!(output, Some(Output::Packet(data)) if data == vec![expected]));
    }
}
/// Reordering combined with a rate-limited link: after the first packet, the
/// next output must be a timeout at least ~one transmission time away.
#[test]
fn test_reordering_with_rate_limiting() {
    let mut netem: Netem<Vec<u8>> = Netem::new(
        NetemConfig::new()
            .link(Bitrate::kbps(8), DataSize::kbytes(10))
            .reorder_gap(3)
            .seed(42),
    );
    let start = instant();

    for _ in 0..3 {
        netem.handle_input(Input::Packet(start, vec![0; 100]));
    }

    let first = netem.poll_output();
    assert!(matches!(first, Some(Output::Packet(_))));

    let next = netem.poll_output();
    if let Some(Output::Timeout(t)) = next {
        let delay = t - start;
        assert!(
            delay >= Duration::from_millis(90),
            "Reordered packet should respect rate limiting, got delay {:?}",
            delay
        );
    } else {
        panic!(
            "Expected timeout for rate-limited reordered packet, got {:?}",
            next
        );
    }
}
/// The `GilbertElliot::wifi()` preset should lose between 0.5% and 5% of
/// 1000 packets with seed 42.
#[test]
fn test_gilbert_elliot_preset() {
    let preset = LossModel::GilbertElliot(GilbertElliot::wifi());
    let mut netem: Netem<Vec<u8>> = Netem::new(NetemConfig::new().loss(preset).seed(42));
    let start = instant();

    let total = 1000;
    let mut received = 0usize;
    for i in 0..total {
        netem.handle_input(Input::Packet(start, vec![i as u8]));
        // Drain everything that became available for this input.
        received += std::iter::from_fn(|| netem.poll_output())
            .filter(|output| matches!(output, Output::Packet(_)))
            .count();
    }

    let loss_ratio = 1.0 - (received as f32 / total as f32);
    assert!(
        (0.005..=0.05).contains(&loss_ratio),
        "Loss ratio: {}",
        loss_ratio
    );
}
/// A link buffer that holds a single 100-byte packet must drop part of a
/// five-packet burst while still delivering at least one packet.
#[test]
fn test_buffer_overflow_drops_packets() {
    let mut netem: Netem<Vec<u8>> = Netem::new(
        NetemConfig::new()
            .link(Bitrate::kbps(80), DataSize::bytes(100))
            .seed(42),
    );
    let start = instant();

    for fill in 0..5 {
        netem.handle_input(Input::Packet(start, vec![fill; 100]));
    }

    // Drain the queue, answering every timeout so the clock keeps moving.
    let mut received = 0;
    while let Some(output) = netem.poll_output() {
        if let Output::Timeout(wake_at) = output {
            netem.handle_input(Input::Timeout(wake_at));
        } else {
            received += 1;
        }
    }

    assert!(
        received < 5,
        "Expected buffer overflow to drop packets, but received all {received}"
    );
    assert!(
        received >= 1,
        "Expected at least one packet to be delivered, got {received}"
    );
}
/// A 20-packet burst into a 500-byte buffer: the first packet leaves at once,
/// the following ones are delayed by the link rate, and the excess is dropped.
#[test]
fn test_congestion_causes_delay_then_loss() {
    let mut netem: Netem<Vec<u8>> = Netem::new(
        NetemConfig::new()
            .link(Bitrate::kbps(80), DataSize::bytes(500))
            .seed(42),
    );
    let start = instant();

    for fill in 0..20 {
        netem.handle_input(Input::Packet(start, vec![fill; 100]));
    }

    let first = netem.poll_output();
    assert!(matches!(first, Some(Output::Packet(_))));

    match netem.poll_output() {
        Some(Output::Timeout(t)) => assert!(t > start, "Expected queuing delay"),
        _ => panic!("Expected timeout due to rate limiting"),
    }

    // An empty packet far in the future moves the clock past every queued
    // send time so the rest of the queue can be drained.
    let far_future = start + Duration::from_secs(10);
    netem.handle_input(Input::Packet(far_future, vec![]));

    // Starts at one for the packet already taken out above.
    let received = 1 + std::iter::from_fn(|| netem.poll_output())
        .filter(|output| matches!(output, Output::Packet(_)))
        .count();

    assert!(
        received < 20,
        "Expected buffer overflow to cause loss, but received all {received}"
    );
    assert!(
        received >= 5,
        "Expected some packets to get through, only got {received}"
    );
}
}