1mod config;
48mod loss;
49
50pub use config::{Bitrate, DataSize, GilbertElliot, Link};
51pub use config::{LossModel, NetemConfig, Probability, RandomLoss};
52
53use std::cmp::{Ordering, Reverse};
54use std::collections::BinaryHeap;
55use std::time::{Duration, Instant};
56
57use fastrand::Rng;
58
59use loss::LossState;
60
/// Network emulator state machine.
///
/// Packets are pushed in with `handle_input` and pulled out, possibly delayed,
/// dropped, duplicated or reordered, with `poll_output`.
pub struct Netem<T> {
    /// Active configuration; replaced by `set_config`.
    config: NetemConfig,
    /// Random source, seeded from `config.seed`; used for loss, duplication and jitter draws.
    rng: Rng,
    /// State of the loss model, built from `config.loss`.
    loss_state: LossState,

    /// Packets waiting for delivery. `Reverse` turns the max-heap into a min-heap
    /// over `QueuedPacket`'s `Ord` implementation.
    queue: BinaryHeap<Reverse<QueuedPacket<T>>>,

    /// Delay computed for the previous packet; feeds the jitter correlation.
    last_delay: Duration,

    /// When rate limiting is configured: `send_at + transmission time` of the last
    /// regularly scheduled packet, i.e. the earliest start for the next one.
    rate_virtual_time: Option<Instant>,

    /// Packets counted since the last reordered packet (only advanced when
    /// `config.reorder_gap` is set).
    reorder_counter: u32,

    /// Latest time seen through `handle_input`; never moves backwards.
    current_time: Option<Instant>,

    /// Number of packets enqueued so far; used to derive `packet_index`.
    packet_count: u64,

    /// Set once `poll_output` has handed out a timeout; cleared by a timeout input
    /// or when a new packet is enqueued.
    timeout_pending: bool,

    /// `send_at` of the most recent packet that was not reordered.
    last_send_at: Option<Instant>,

    /// Sum of the lengths of all queued packets; compared against the link buffer size.
    queued_bytes: usize,
}
95
96impl<T: Clone + WithLen> Netem<T> {
97 pub fn new(config: NetemConfig) -> Self {
99 let rng = Rng::with_seed(config.seed);
100
101 let loss_state = LossState::new(&config.loss);
102
103 Self {
104 config,
105 rng,
106 loss_state,
107 queue: BinaryHeap::new(),
108 last_delay: Duration::ZERO,
109 rate_virtual_time: None,
110 reorder_counter: 0,
111 current_time: None,
112 packet_count: 0,
113 timeout_pending: false,
114 last_send_at: None,
115 queued_bytes: 0,
116 }
117 }
118
119 pub fn handle_input(&mut self, input: Input<T>) {
121 match input {
122 Input::Timeout(now) => {
123 self.progress_time(now);
124 self.timeout_pending = false;
125 }
126 Input::Packet(now, data) => {
127 self.progress_time(now);
128 self.process_packet(now, data);
129 }
130 }
131 }
132
133 fn progress_time(&mut self, now: Instant) {
134 if let Some(last_time) = self.current_time {
135 if now < last_time {
136 return;
138 }
139 }
140 self.current_time = Some(now);
141 }
142
143 pub fn poll_output(&mut self) -> Option<Output<T>> {
147 let now = self.current_time?;
148
149 if let Some(Reverse(packet)) = self.queue.peek() {
151 if packet.send_at <= now {
152 let Reverse(packet) = self.queue.pop().unwrap();
153 self.queued_bytes = self.queued_bytes.saturating_sub(packet.data.len());
155 return Some(Output::Packet(packet.data));
156 }
157
158 if !self.timeout_pending {
160 self.timeout_pending = true;
161 return Some(Output::Timeout(packet.send_at));
162 }
163 }
164
165 None
166 }
167
168 pub fn poll_timeout(&self) -> Instant {
173 self.queue
174 .peek()
175 .map(|Reverse(p)| p.send_at)
176 .unwrap_or_else(not_happening)
177 }
178
179 fn process_packet(&mut self, now: Instant, data: T) {
181 if self
183 .loss_state
184 .should_lose(&self.config.loss, &mut self.rng)
185 {
186 return; }
188
189 let should_duplicate = self.rng.f32() < self.config.duplicate.0;
191
192 self.enqueue_packet(now, data.clone());
194
195 if should_duplicate {
197 self.enqueue_packet(now, data);
198 }
199 }
200
201 fn enqueue_packet(&mut self, now: Instant, data: T) {
203 let delay = self.calculate_delay();
205
206 let mut send_at = now + delay;
208
209 let transmission_time = if let Some(link) = self.config.link {
211 let packet_size = DataSize::from(data.len());
212 let tx_time = packet_size / link.rate;
213
214 if let Some(virtual_time) = self.rate_virtual_time {
216 if virtual_time > send_at {
217 send_at = virtual_time;
218 }
219 }
220
221 if self.queued_bytes + data.len() > link.buffer.as_bytes_usize() {
223 return;
225 }
226
227 Some(tx_time)
228 } else {
229 None
230 };
231
232 let should_reorder = if let Some(gap) = self.config.reorder_gap {
234 self.reorder_counter += 1;
235 if self.reorder_counter >= gap {
236 self.reorder_counter = 0;
237 self.last_send_at.is_some() && self.packet_count > 0
239 } else {
240 false
241 }
242 } else {
243 false
244 };
245
246 let gap = self.config.reorder_gap.unwrap_or(1) as u64;
247 let packet_index;
248
249 if should_reorder {
250 send_at = self.last_send_at.unwrap();
252 packet_index = self.packet_count * gap - 1;
255 } else {
257 packet_index = (self.packet_count + 1) * gap;
259
260 if let Some(tx_time) = transmission_time {
262 self.rate_virtual_time = Some(send_at + tx_time);
263 }
264
265 self.last_send_at = Some(send_at);
267 }
268
269 self.packet_count += 1;
270
271 self.queued_bytes += data.len();
273
274 let packet = QueuedPacket {
275 send_at,
276 data,
277 packet_index,
278 };
279 self.queue.push(Reverse(packet));
280
281 self.timeout_pending = false;
283 }
284
285 fn calculate_delay(&mut self) -> Duration {
287 let base = self.config.latency;
288 let jitter = self.config.jitter;
289
290 if jitter.is_zero() {
291 return base;
292 }
293
294 let rho = self.config.delay_correlation.0;
296 let jitter_nanos = jitter.as_nanos() as f32;
297
298 let fresh_random = self.rng.f32() * 2.0 - 1.0;
300 let last_normalized = if self.last_delay >= base {
301 (self.last_delay - base).as_nanos() as f32 / jitter_nanos
302 } else {
303 -((base - self.last_delay).as_nanos() as f32 / jitter_nanos)
304 };
305
306 let jitter_factor = if rho == 0.0 {
307 fresh_random
308 } else {
309 fresh_random * (1.0 - rho) + last_normalized.clamp(-1.0, 1.0) * rho
310 };
311
312 let jitter_nanos = (jitter_factor * jitter_nanos) as i64;
313 let delay = if jitter_nanos >= 0 {
314 base + Duration::from_nanos(jitter_nanos as u64)
315 } else {
316 base.saturating_sub(Duration::from_nanos((-jitter_nanos) as u64))
317 };
318
319 self.last_delay = delay;
320 delay
321 }
322
323 pub fn queue_len(&self) -> usize {
325 self.queue.len()
326 }
327
328 pub fn is_empty(&self) -> bool {
330 self.queue.is_empty()
331 }
332
333 pub fn set_config(&mut self, config: NetemConfig) {
339 self.rng = Rng::with_seed(config.seed);
340 self.loss_state = LossState::new(&config.loss);
341 self.last_delay = Duration::ZERO;
342 self.last_send_at = None;
343 self.config = config;
346 }
347}
348
/// Anything that can report its size in bytes.
///
/// The emulator uses the length for rate limiting and buffer accounting.
pub trait WithLen {
    /// Length of the value.
    fn len(&self) -> usize;

    /// Returns `true` when `len()` is zero.
    fn is_empty(&self) -> bool {
        matches!(self.len(), 0)
    }
}
357
358impl<T: AsRef<[u8]>> WithLen for T {
359 fn len(&self) -> usize {
360 self.as_ref().len()
361 }
362}
363
/// Events fed into `Netem::handle_input`.
///
/// `Clone`, `PartialEq` and `Eq` are available whenever the payload type supports
/// them, so events can be copied and compared (for example with `assert_eq!`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Input<T> {
    /// The clock reached the given instant; no new packet.
    Timeout(Instant),

    /// A packet entering the emulator at the given instant.
    Packet(Instant, T),
}
373
/// Results handed out by `Netem::poll_output`.
///
/// `Clone`, `PartialEq` and `Eq` are available whenever the payload type supports
/// them, so outputs can be copied and compared (for example with `assert_eq!`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Output<T> {
    /// Nothing is ready yet; call back with a timeout input at the given instant.
    Timeout(Instant),

    /// A packet leaving the emulator.
    Packet(T),
}
383
/// A packet sitting in the emulator queue together with its scheduling metadata.
#[derive(Debug)]
struct QueuedPacket<T> {
    /// Earliest instant at which `poll_output` hands the packet out.
    send_at: Instant,

    /// The payload; it takes no part in equality or ordering.
    data: T,

    /// Sequence value derived from the enqueue counter and the reorder gap;
    /// used together with `send_at` for equality and ordering.
    packet_index: u64,
}
396
/// Returns an instant ten (365-day) years from now, used as the "nothing scheduled"
/// answer when there is no queued packet.
fn not_happening() -> Instant {
    const TEN_YEARS: Duration = Duration::from_secs(3600 * 24 * 365 * 10);
    Instant::now() + TEN_YEARS
}
400
401impl<T> PartialEq for QueuedPacket<T> {
402 fn eq(&self, other: &Self) -> bool {
403 self.send_at == other.send_at && self.packet_index == other.packet_index
404 }
405}
406
407impl<T> Eq for QueuedPacket<T> {}
408
409impl<T> PartialOrd for QueuedPacket<T> {
410 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
411 Some(self.cmp(other))
412 }
413}
414
415impl<T> Ord for QueuedPacket<T> {
416 fn cmp(&self, other: &Self) -> Ordering {
417 self.packet_index
418 .cmp(&other.packet_index)
419 .then(self.send_at.cmp(&other.send_at))
420 }
421}
422
#[cfg(test)]
mod tests {
    use super::*;

    /// Emulator type exercised by every test: packets are plain byte vectors.
    type ByteNetem = Netem<Vec<u8>>;

    /// Pushes a packet arriving at `at` into the emulator.
    fn send(netem: &mut ByteNetem, at: Instant, payload: Vec<u8>) {
        netem.handle_input(Input::Packet(at, payload));
    }

    /// Polls until the emulator returns `None` and counts the packets handed out.
    /// Timeouts returned along the way are discarded.
    fn drain_packets(netem: &mut ByteNetem) -> usize {
        std::iter::from_fn(|| netem.poll_output())
            .filter(|output| matches!(output, Output::Packet(_)))
            .count()
    }

    /// With the default configuration a packet comes straight back out, exactly once.
    #[test]
    fn test_passthrough() {
        let mut netem = ByteNetem::new(NetemConfig::default());
        let start = Instant::now();

        send(&mut netem, start, vec![1, 2, 3]);

        let delivered = netem.poll_output();
        assert!(matches!(delivered, Some(Output::Packet(payload)) if payload == vec![1, 2, 3]));
        assert!(netem.poll_output().is_none());
    }

    /// A configured latency first yields a timeout, then the packet once time has advanced.
    #[test]
    fn test_latency() {
        let latency = Duration::from_millis(100);
        let mut netem = ByteNetem::new(NetemConfig::new().latency(latency).seed(42));
        let start = Instant::now();

        send(&mut netem, start, vec![1, 2, 3]);

        // Not ready yet: the emulator asks to be woken up later.
        let first = netem.poll_output();
        assert!(matches!(first, Some(Output::Timeout(deadline)) if deadline > start));

        netem.handle_input(Input::Timeout(start + latency));

        let second = netem.poll_output();
        assert!(matches!(second, Some(Output::Packet(payload)) if payload == vec![1, 2, 3]));
    }

    /// A loss probability of one drops everything.
    #[test]
    fn test_total_loss() {
        let lossy = NetemConfig::new()
            .loss(RandomLoss::new(Probability::ONE))
            .seed(42);
        let mut netem = ByteNetem::new(lossy);

        send(&mut netem, Instant::now(), vec![1, 2, 3]);

        assert!(netem.poll_output().is_none());
        assert!(netem.is_empty());
    }

    /// A duplication probability of one delivers every packet twice.
    #[test]
    fn test_duplication() {
        let duplicating = NetemConfig::new().duplicate(Probability::ONE).seed(42);
        let mut netem = ByteNetem::new(duplicating);

        send(&mut netem, Instant::now(), vec![1, 2, 3]);

        for _ in 0..2 {
            assert!(matches!(netem.poll_output(), Some(Output::Packet(_))));
        }
        assert!(netem.poll_output().is_none());
    }

    /// 100 bytes at 8 kbit/s occupy the link for 100 ms, delaying the following packet.
    #[test]
    fn test_rate_limiting() {
        let limited = NetemConfig::new()
            .link(Bitrate::kbps(8), DataSize::kbytes(10))
            .seed(42);
        let mut netem = ByteNetem::new(limited);
        let start = Instant::now();

        // The first packet finds an idle link.
        send(&mut netem, start, vec![0; 100]);
        let first = netem.poll_output();
        assert!(matches!(first, Some(Output::Packet(_))));

        // The second one has to wait for the first transmission to finish.
        send(&mut netem, start, vec![0; 100]);
        let second = netem.poll_output();
        if let Some(Output::Timeout(deadline)) = second {
            let delay = deadline - start;
            assert!(delay >= Duration::from_millis(90));
            assert!(delay <= Duration::from_millis(110));
        } else {
            panic!("Expected timeout, got {:?}", second);
        }
    }

    /// With a reorder gap of three, the third packet overtakes the second.
    #[test]
    fn test_reordering() {
        let latency = Duration::from_millis(100);
        let reordering = NetemConfig::new()
            .latency(latency)
            .reorder_gap(3)
            .seed(42);
        let mut netem = ByteNetem::new(reordering);
        let start = Instant::now();

        for id in 1..=3u8 {
            send(&mut netem, start, vec![id]);
        }

        let release_at = start + latency;
        let wakeup = netem.poll_output();
        assert!(matches!(wakeup, Some(Output::Timeout(t)) if t == release_at));

        netem.handle_input(Input::Timeout(release_at));

        for expected in [1u8, 3, 2] {
            let output = netem.poll_output();
            assert!(matches!(output, Some(Output::Packet(data)) if data == vec![expected]));
        }
    }

    /// A reordered packet is still held back by the rate limiter.
    #[test]
    fn test_reordering_with_rate_limiting() {
        let limited = NetemConfig::new()
            .link(Bitrate::kbps(8), DataSize::kbytes(10))
            .reorder_gap(3)
            .seed(42);
        let mut netem = ByteNetem::new(limited);
        let start = Instant::now();

        for _ in 0..3 {
            send(&mut netem, start, vec![0; 100]);
        }

        let first = netem.poll_output();
        assert!(matches!(first, Some(Output::Packet(_))));

        let second = netem.poll_output();
        if let Some(Output::Timeout(deadline)) = second {
            let delay = deadline - start;
            assert!(
                delay >= Duration::from_millis(90),
                "Reordered packet should respect rate limiting, got delay {:?}",
                delay
            );
        } else {
            panic!(
                "Expected timeout for rate-limited reordered packet, got {:?}",
                second
            );
        }
    }

    /// The wifi Gilbert-Elliot preset loses between 0.5 % and 5 % of 1000 packets.
    #[test]
    fn test_gilbert_elliot_preset() {
        let bursty = NetemConfig::new()
            .loss(LossModel::GilbertElliot(GilbertElliot::wifi()))
            .seed(42);
        let mut netem = ByteNetem::new(bursty);
        let start = Instant::now();

        let total = 1000;
        let mut received = 0;
        for i in 0..total {
            send(&mut netem, start, vec![i as u8]);
            received += drain_packets(&mut netem);
        }

        let loss_ratio = 1.0 - (received as f32 / total as f32);
        assert!(
            (0.005..=0.05).contains(&loss_ratio),
            "Loss ratio: {}",
            loss_ratio
        );
    }

    /// A 100-byte buffer cannot hold a burst of five 100-byte packets.
    #[test]
    fn test_buffer_overflow_drops_packets() {
        let tiny_buffer = NetemConfig::new()
            .link(Bitrate::kbps(80), DataSize::bytes(100))
            .seed(42);
        let mut netem = ByteNetem::new(tiny_buffer);
        let start = Instant::now();

        for fill in 0..5 {
            send(&mut netem, start, vec![fill; 100]);
        }

        // Follow every timeout so that everything still queued gets delivered.
        let mut received = 0;
        while let Some(output) = netem.poll_output() {
            if let Output::Timeout(deadline) = output {
                netem.handle_input(Input::Timeout(deadline));
            } else {
                received += 1;
            }
        }

        assert!(
            received < 5,
            "Expected buffer overflow to drop packets, but received all {received}"
        );
        assert!(
            received >= 1,
            "Expected at least one packet to be delivered, got {received}"
        );
    }

    /// A burst larger than the buffer is first delayed by the link rate, then tail-dropped.
    #[test]
    fn test_congestion_causes_delay_then_loss() {
        let congested = NetemConfig::new()
            .link(Bitrate::kbps(80), DataSize::bytes(500))
            .seed(42);
        let mut netem = ByteNetem::new(congested);
        let start = Instant::now();

        for fill in 0..20 {
            send(&mut netem, start, vec![fill; 100]);
        }

        let first = netem.poll_output();
        assert!(matches!(first, Some(Output::Packet(_))));

        let second = netem.poll_output();
        if let Some(Output::Timeout(deadline)) = second {
            assert!(deadline > start, "Expected queuing delay");
        } else {
            panic!("Expected timeout due to rate limiting");
        }

        // Jump far ahead with an empty packet so that everything still queued is due.
        let far_future = start + Duration::from_secs(10);
        send(&mut netem, far_future, vec![]);

        // One packet was already taken out above.
        let received = 1 + drain_packets(&mut netem);

        assert!(
            received < 20,
            "Expected buffer overflow to cause loss, but received all {received}"
        );
        assert!(
            received >= 5,
            "Expected some packets to get through, only got {received}"
        );
    }
}