Skip to main content

str0m_netem/
lib.rs

1//! Sans-IO network emulator inspired by Linux netem.
2//!
3//! This crate provides a network emulator that can simulate:
4//! - Latency and jitter
5//! - Packet loss (random or bursty via Gilbert-Elliot model)
6//! - Packet duplication
7//! - Packet reordering
8//! - Rate limiting
9//!
10//! # Sans-IO Pattern
11//!
12//! This implementation follows the Sans-IO pattern: packets go in with timestamps,
13//! and decisions come out (drop, delay, duplicate). The caller handles actual I/O
14//! and timing.
15//!
16//! # Example
17//!
18//! ```
19//! use std::time::{Duration, Instant};
20//! use str0m_netem::{Netem, NetemConfig, Input, Output, LossModel, RandomLoss, Probability};
21//!
22//! let config = NetemConfig::new()
23//!     .latency(Duration::from_millis(50))
24//!     .jitter(Duration::from_millis(10))
25//!     .loss(RandomLoss::new(Probability::new(0.01)))
26//!     .seed(42);
27//!
28//! let mut netem: Netem<Vec<u8>> = Netem::new(config);
29//!
30//! // Send a packet
31//! let now = Instant::now();
32//! netem.handle_input(Input::Packet(now, vec![1, 2, 3]));
33//!
34//! // Poll for output
35//! while let Some(output) = netem.poll_output() {
36//!     match output {
37//!         Output::Timeout(when) => {
38//!             // Wait until `when` and call handle_input with Input::Timeout
39//!         }
40//!         Output::Packet(data) => {
41//!             // Send the packet
42//!         }
43//!     }
44//! }
45//! ```
46
47mod config;
48mod loss;
49
50pub use config::{Bitrate, DataSize, GilbertElliot, Link};
51pub use config::{LossModel, NetemConfig, Probability, RandomLoss};
52
53use std::cmp::{Ordering, Reverse};
54use std::collections::BinaryHeap;
55use std::time::{Duration, Instant};
56
57use fastrand::Rng;
58
59use loss::LossState;
60
/// Sans-IO network emulator.
///
/// Events are fed in through `handle_input` and results are drained through
/// `poll_output`. `T` is the packet payload type.
pub struct Netem<T> {
    /// Active emulation settings.
    config: NetemConfig,
    /// Random source, seeded from `config.seed`.
    rng: Rng,
    /// State of the configured loss model.
    loss_state: LossState,

    /// Priority queue of packets, ordered by send time (earliest first).
    /// Using Reverse to make BinaryHeap a min-heap.
    queue: BinaryHeap<Reverse<QueuedPacket<T>>>,

    /// Last delay value for correlation.
    last_delay: Duration,

    /// Virtual time when the last packet would finish transmitting (for rate limiting).
    rate_virtual_time: Option<Instant>,

    /// Counter for reordering (every N packets, one gets reordered).
    reorder_counter: u32,

    /// Current time from the last input.
    current_time: Option<Instant>,

    /// Running total of packets enqueued so far; used to derive the
    /// tie-breaking index of each queued packet.
    packet_count: u64,

    /// Whether we've already returned a timeout for the next packet.
    timeout_pending: bool,

    /// Send time of the last normally queued packet (for reordering).
    last_send_at: Option<Instant>,

    /// Total bytes currently queued (for buffer overflow detection).
    queued_bytes: usize,
}
95
96impl<T: Clone + WithLen> Netem<T> {
97    /// Create a new network emulator with the given configuration.
98    pub fn new(config: NetemConfig) -> Self {
99        let rng = Rng::with_seed(config.seed);
100
101        let loss_state = LossState::new(&config.loss);
102
103        Self {
104            config,
105            rng,
106            loss_state,
107            queue: BinaryHeap::new(),
108            last_delay: Duration::ZERO,
109            rate_virtual_time: None,
110            reorder_counter: 0,
111            current_time: None,
112            packet_count: 0,
113            timeout_pending: false,
114            last_send_at: None,
115            queued_bytes: 0,
116        }
117    }
118
119    /// Handle an input event.
120    pub fn handle_input(&mut self, input: Input<T>) {
121        match input {
122            Input::Timeout(now) => {
123                self.progress_time(now);
124                self.timeout_pending = false;
125            }
126            Input::Packet(now, data) => {
127                self.progress_time(now);
128                self.process_packet(now, data);
129            }
130        }
131    }
132
133    fn progress_time(&mut self, now: Instant) {
134        if let Some(last_time) = self.current_time {
135            if now < last_time {
136                // Time does not go backwards.
137                return;
138            }
139        }
140        self.current_time = Some(now);
141    }
142
143    /// Poll for the next output event.
144    ///
145    /// Returns `None` when there are no more events to process.
146    pub fn poll_output(&mut self) -> Option<Output<T>> {
147        let now = self.current_time?;
148
149        // Check if the next packet is ready to send
150        if let Some(Reverse(packet)) = self.queue.peek() {
151            if packet.send_at <= now {
152                let Reverse(packet) = self.queue.pop().unwrap();
153                // Decrement queued bytes when packet is dequeued
154                self.queued_bytes = self.queued_bytes.saturating_sub(packet.data.len());
155                return Some(Output::Packet(packet.data));
156            }
157
158            // Need to wait for the packet
159            if !self.timeout_pending {
160                self.timeout_pending = true;
161                return Some(Output::Timeout(packet.send_at));
162            }
163        }
164
165        None
166    }
167
168    /// Returns when the next packet will be ready, if any.
169    ///
170    /// This can be used to decide which of multiple Netem instances
171    /// should be polled next.
172    pub fn poll_timeout(&self) -> Instant {
173        self.queue
174            .peek()
175            .map(|Reverse(p)| p.send_at)
176            .unwrap_or_else(not_happening)
177    }
178
179    /// Process an incoming packet.
180    fn process_packet(&mut self, now: Instant, data: T) {
181        // Check for packet loss
182        if self
183            .loss_state
184            .should_lose(&self.config.loss, &mut self.rng)
185        {
186            return; // Packet dropped
187        }
188
189        // Check for duplication (process original first, then maybe duplicate)
190        let should_duplicate = self.rng.f32() < self.config.duplicate.0;
191
192        // Process the original packet
193        self.enqueue_packet(now, data.clone());
194
195        // Duplicate if needed
196        if should_duplicate {
197            self.enqueue_packet(now, data);
198        }
199    }
200
201    /// Calculate delay and enqueue a packet.
202    fn enqueue_packet(&mut self, now: Instant, data: T) {
203        // Calculate delay with jitter
204        let delay = self.calculate_delay();
205
206        // Calculate base send time
207        let mut send_at = now + delay;
208
209        // Handle link rate limiting and buffer overflow
210        let transmission_time = if let Some(link) = self.config.link {
211            let packet_size = DataSize::from(data.len());
212            let tx_time = packet_size / link.rate;
213
214            // Apply rate limiting: packet can't be sent until previous finishes
215            if let Some(virtual_time) = self.rate_virtual_time {
216                if virtual_time > send_at {
217                    send_at = virtual_time;
218                }
219            }
220
221            // Check buffer overflow (tail drop) using actual queued bytes
222            if self.queued_bytes + data.len() > link.buffer.as_bytes_usize() {
223                // Buffer overflow - drop packet
224                return;
225            }
226
227            Some(tx_time)
228        } else {
229            None
230        };
231
232        // Determine if this packet should be reordered
233        let should_reorder = if let Some(gap) = self.config.reorder_gap {
234            self.reorder_counter += 1;
235            if self.reorder_counter >= gap {
236                self.reorder_counter = 0;
237                // Can only reorder if we have a previous packet to reorder before
238                self.last_send_at.is_some() && self.packet_count > 0
239            } else {
240                false
241            }
242        } else {
243            false
244        };
245
246        let gap = self.config.reorder_gap.unwrap_or(1) as u64;
247        let packet_index;
248
249        if should_reorder {
250            // Reordered packet: use previous packet's send_at and a lower index
251            send_at = self.last_send_at.unwrap();
252            // Index slots before the previous packet: count * gap - 1
253            // Previous packet had index = count * gap
254            packet_index = self.packet_count * gap - 1;
255            // Don't update rate_virtual_time or last_send_at
256        } else {
257            // Normal packet: use calculated send_at with gaps for index
258            packet_index = (self.packet_count + 1) * gap;
259
260            // Update rate_virtual_time for next packet
261            if let Some(tx_time) = transmission_time {
262                self.rate_virtual_time = Some(send_at + tx_time);
263            }
264
265            // Track this packet's send_at for potential future reordering
266            self.last_send_at = Some(send_at);
267        }
268
269        self.packet_count += 1;
270
271        // Track queued bytes for buffer overflow detection
272        self.queued_bytes += data.len();
273
274        let packet = QueuedPacket {
275            send_at,
276            data,
277            packet_index,
278        };
279        self.queue.push(Reverse(packet));
280
281        // Reset timeout pending since queue changed
282        self.timeout_pending = false;
283    }
284
285    /// Calculate delay with jitter and correlation.
286    fn calculate_delay(&mut self) -> Duration {
287        let base = self.config.latency;
288        let jitter = self.config.jitter;
289
290        if jitter.is_zero() {
291            return base;
292        }
293
294        // Generate correlated jitter
295        let rho = self.config.delay_correlation.0;
296        let jitter_nanos = jitter.as_nanos() as f32;
297
298        // Random value in [-1, 1]
299        let fresh_random = self.rng.f32() * 2.0 - 1.0;
300        let last_normalized = if self.last_delay >= base {
301            (self.last_delay - base).as_nanos() as f32 / jitter_nanos
302        } else {
303            -((base - self.last_delay).as_nanos() as f32 / jitter_nanos)
304        };
305
306        let jitter_factor = if rho == 0.0 {
307            fresh_random
308        } else {
309            fresh_random * (1.0 - rho) + last_normalized.clamp(-1.0, 1.0) * rho
310        };
311
312        let jitter_nanos = (jitter_factor * jitter_nanos) as i64;
313        let delay = if jitter_nanos >= 0 {
314            base + Duration::from_nanos(jitter_nanos as u64)
315        } else {
316            base.saturating_sub(Duration::from_nanos((-jitter_nanos) as u64))
317        };
318
319        self.last_delay = delay;
320        delay
321    }
322
    /// Returns the number of packets currently queued.
    ///
    /// Counts packets accepted into the queue that `poll_output` has not yet
    /// released.
    pub fn queue_len(&self) -> usize {
        self.queue.len()
    }
327
    /// Returns true if no packets are waiting in the queue.
    pub fn is_empty(&self) -> bool {
        self.queue.is_empty()
    }
332
333    /// Update the configuration without dropping queued packets.
334    ///
335    /// Resets loss state and correlation tracking, but preserves timing state
336    /// (rate limiting, reorder counter, pending timeouts) to avoid disrupting
337    /// packets already in the queue.
338    pub fn set_config(&mut self, config: NetemConfig) {
339        self.rng = Rng::with_seed(config.seed);
340        self.loss_state = LossState::new(&config.loss);
341        self.last_delay = Duration::ZERO;
342        self.last_send_at = None;
343        // Don't reset: rate_virtual_time, reorder_counter, timeout_pending, current_time
344        // These affect packets already queued
345        self.config = config;
346    }
347}
348
/// Trait for getting the length of packet data (used for rate limiting).
pub trait WithLen {
    /// Length of the payload; used as the packet size for rate limiting and
    /// buffer accounting.
    fn len(&self) -> usize;

    /// Whether the payload has zero length.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

/// Anything viewable as a byte slice reports the length of that slice.
impl<T> WithLen for T
where
    T: AsRef<[u8]>,
{
    fn len(&self) -> usize {
        let bytes: &[u8] = self.as_ref();
        bytes.len()
    }
}
363
/// Input events to the network emulator.
///
/// Each variant carries the caller's current instant, which is used to
/// advance the emulator's internal clock.
#[derive(Debug)]
pub enum Input<T> {
    /// A timeout has occurred at the given instant.
    Timeout(Instant),

    /// A packet arrived at the given instant with the given data.
    Packet(Instant, T),
}
373
/// Output events from the network emulator.
#[derive(Debug)]
pub enum Output<T> {
    /// Request a timeout at the given instant: the caller should wait until
    /// then and feed back an `Input::Timeout`.
    Timeout(Instant),

    /// A packet is ready to be sent.
    Packet(T),
}
383
/// A queued packet waiting to be sent.
///
/// Equality and ordering consider only `send_at` and `packet_index`; the
/// payload never takes part in comparisons.
#[derive(Debug)]
struct QueuedPacket<T> {
    /// When this packet should be sent.
    send_at: Instant,

    /// The packet data.
    data: T,

    /// Tie-breaker between packets with the same `send_at`: the lower index
    /// is released first.
    packet_index: u64,
}

/// An instant roughly ten years after the time of the call, used as the
/// "no packet pending" answer.
fn not_happening() -> Instant {
    Instant::now() + Duration::from_secs(3600 * 24 * 365 * 10)
}

impl<T> PartialEq for QueuedPacket<T> {
    fn eq(&self, other: &Self) -> bool {
        self.send_at == other.send_at && self.packet_index == other.packet_index
    }
}

impl<T> Eq for QueuedPacket<T> {}

impl<T> PartialOrd for QueuedPacket<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T> Ord for QueuedPacket<T> {
    /// Orders by send time first; `packet_index` only breaks ties.
    ///
    /// The send time must be the primary key: the queue's head is expected to
    /// be the earliest packet, and comparing the index first would let a
    /// packet with a later send time block earlier ones.
    fn cmp(&self, other: &Self) -> Ordering {
        self.send_at
            .cmp(&other.send_at)
            .then(self.packet_index.cmp(&other.packet_index))
    }
}
422
423#[cfg(test)]
424mod tests {
425    use super::*;
426
    /// Test helper: the current instant, used as the starting "now" of a test.
    fn instant() -> Instant {
        Instant::now()
    }
430
431    #[test]
432    fn test_passthrough() {
433        let config = NetemConfig::default();
434        let mut netem: Netem<Vec<u8>> = Netem::new(config);
435
436        let now = instant();
437        netem.handle_input(Input::Packet(now, vec![1, 2, 3]));
438
439        let output = netem.poll_output();
440        assert!(matches!(output, Some(Output::Packet(data)) if data == vec![1, 2, 3]));
441        assert!(netem.poll_output().is_none());
442    }
443
444    #[test]
445    fn test_latency() {
446        let config = NetemConfig::new()
447            .latency(Duration::from_millis(100))
448            .seed(42);
449        let mut netem: Netem<Vec<u8>> = Netem::new(config);
450
451        let now = instant();
452        netem.handle_input(Input::Packet(now, vec![1, 2, 3]));
453
454        // Should get a timeout, not the packet
455        let output = netem.poll_output();
456        assert!(matches!(output, Some(Output::Timeout(t)) if t > now));
457
458        // After the timeout, packet should be ready
459        let later = now + Duration::from_millis(100);
460        netem.handle_input(Input::Timeout(later));
461
462        let output = netem.poll_output();
463        assert!(matches!(output, Some(Output::Packet(data)) if data == vec![1, 2, 3]));
464    }
465
466    #[test]
467    fn test_total_loss() {
468        let config = NetemConfig::new()
469            .loss(RandomLoss::new(Probability::ONE))
470            .seed(42);
471        let mut netem: Netem<Vec<u8>> = Netem::new(config);
472
473        let now = instant();
474        netem.handle_input(Input::Packet(now, vec![1, 2, 3]));
475
476        assert!(netem.poll_output().is_none());
477        assert!(netem.is_empty());
478    }
479
480    #[test]
481    fn test_duplication() {
482        let config = NetemConfig::new().duplicate(Probability::ONE).seed(42);
483        let mut netem: Netem<Vec<u8>> = Netem::new(config);
484
485        let now = instant();
486        netem.handle_input(Input::Packet(now, vec![1, 2, 3]));
487
488        // Should get two packets
489        assert!(matches!(netem.poll_output(), Some(Output::Packet(_))));
490        assert!(matches!(netem.poll_output(), Some(Output::Packet(_))));
491        assert!(netem.poll_output().is_none());
492    }
493
494    #[test]
495    fn test_rate_limiting() {
496        // 8 kbps = 1000 bytes/sec, large buffer to avoid drops
497        let config = NetemConfig::new()
498            .link(Bitrate::kbps(8), DataSize::kbytes(10))
499            .seed(42);
500        let mut netem: Netem<Vec<u8>> = Netem::new(config);
501
502        let now = instant();
503
504        // Send 100 bytes
505        netem.handle_input(Input::Packet(now, vec![0; 100]));
506
507        // First packet should be immediate
508        let output = netem.poll_output();
509        assert!(matches!(output, Some(Output::Packet(_))));
510
511        // Send another 100 bytes immediately after
512        netem.handle_input(Input::Packet(now, vec![0; 100]));
513
514        // Second packet should require a timeout (rate limited)
515        let output = netem.poll_output();
516        match output {
517            Some(Output::Timeout(t)) => {
518                // Should be delayed by ~100ms (100 bytes at 1000 bytes/sec)
519                let delay = t - now;
520                assert!(delay >= Duration::from_millis(90));
521                assert!(delay <= Duration::from_millis(110));
522            }
523            _ => panic!("Expected timeout, got {:?}", output),
524        }
525    }
526
527    #[test]
528    fn test_reordering() {
529        let config = NetemConfig::new()
530            .latency(Duration::from_millis(100))
531            .reorder_gap(3) // Every 3rd packet is reordered
532            .seed(42);
533        let mut netem: Netem<Vec<u8>> = Netem::new(config);
534
535        let now = instant();
536
537        // Send 3 packets
538        netem.handle_input(Input::Packet(now, vec![1]));
539        netem.handle_input(Input::Packet(now, vec![2]));
540        netem.handle_input(Input::Packet(now, vec![3])); // This one should be reordered before packet 2
541
542        // All packets are delayed by latency, so we get a timeout first
543        let output = netem.poll_output();
544        assert!(
545            matches!(output, Some(Output::Timeout(t)) if t == now + Duration::from_millis(100))
546        );
547
548        // After the timeout, packets should be ready in reordered sequence: 1, 3, 2
549        let later = now + Duration::from_millis(100);
550        netem.handle_input(Input::Timeout(later));
551
552        // Packet 1 comes first (lowest index)
553        let output = netem.poll_output();
554        assert!(matches!(output, Some(Output::Packet(data)) if data == vec![1]));
555
556        // Packet 3 comes second (reordered before packet 2)
557        let output = netem.poll_output();
558        assert!(matches!(output, Some(Output::Packet(data)) if data == vec![3]));
559
560        // Packet 2 comes last
561        let output = netem.poll_output();
562        assert!(matches!(output, Some(Output::Packet(data)) if data == vec![2]));
563    }
564
565    #[test]
566    fn test_reordering_with_rate_limiting() {
567        // 8 kbps = 1024 bytes/sec, large buffer to avoid drops
568        let config = NetemConfig::new()
569            .link(Bitrate::kbps(8), DataSize::kbytes(10))
570            .reorder_gap(3) // Every 3rd packet is reordered
571            .seed(42);
572        let mut netem: Netem<Vec<u8>> = Netem::new(config);
573
574        let now = instant();
575
576        // Send 3 packets of 100 bytes each
577        netem.handle_input(Input::Packet(now, vec![0; 100]));
578        netem.handle_input(Input::Packet(now, vec![0; 100]));
579        netem.handle_input(Input::Packet(now, vec![0; 100])); // Reordered
580
581        // First packet should be immediate (no latency configured)
582        let output = netem.poll_output();
583        assert!(matches!(output, Some(Output::Packet(_))));
584
585        // Next output should be a timeout (rate limited)
586        // The reordered packet (3rd) shares the slot with packet 2
587        let output = netem.poll_output();
588        match output {
589            Some(Output::Timeout(t)) => {
590                // Should be delayed by ~100ms (100 bytes at 1000 bytes/sec)
591                let delay = t - now;
592                assert!(
593                    delay >= Duration::from_millis(90),
594                    "Reordered packet should respect rate limiting, got delay {:?}",
595                    delay
596                );
597            }
598            _ => panic!(
599                "Expected timeout for rate-limited reordered packet, got {:?}",
600                output
601            ),
602        }
603    }
604
605    #[test]
606    fn test_gilbert_elliot_preset() {
607        let config = NetemConfig::new()
608            .loss(LossModel::GilbertElliot(GilbertElliot::wifi()))
609            .seed(42);
610        let mut netem: Netem<Vec<u8>> = Netem::new(config);
611
612        let now = instant();
613        let mut received = 0;
614        let total = 1000;
615
616        for i in 0..total {
617            netem.handle_input(Input::Packet(now, vec![i as u8]));
618            while let Some(output) = netem.poll_output() {
619                if matches!(output, Output::Packet(_)) {
620                    received += 1;
621                }
622            }
623        }
624
625        // WiFi preset should have ~1% loss, so ~990 received
626        let loss_ratio = 1.0 - (received as f32 / total as f32);
627        assert!(
628            (0.005..=0.05).contains(&loss_ratio),
629            "Loss ratio: {}",
630            loss_ratio
631        );
632    }
633
634    #[test]
635    fn test_buffer_overflow_drops_packets() {
636        // 80 kbps = 10KB/sec, tiny 100 byte buffer
637        // This means only ~1 packet of 100 bytes can be queued
638        let config = NetemConfig::new()
639            .link(Bitrate::kbps(80), DataSize::bytes(100))
640            .seed(42);
641        let mut netem: Netem<Vec<u8>> = Netem::new(config);
642
643        let now = instant();
644
645        // Send 5 packets of 100 bytes each at once
646        // Only the first should be accepted, rest should be dropped due to buffer overflow
647        for i in 0..5 {
648            netem.handle_input(Input::Packet(now, vec![i; 100]));
649        }
650
651        // Count how many packets we actually receive
652        let mut received = 0;
653        while let Some(output) = netem.poll_output() {
654            match output {
655                Output::Packet(_) => received += 1,
656                Output::Timeout(t) => {
657                    // Advance time to release next packet
658                    netem.handle_input(Input::Timeout(t));
659                }
660            }
661        }
662
663        // With a 100 byte buffer and 100 byte packets, only 1-2 should fit
664        assert!(
665            received < 5,
666            "Expected buffer overflow to drop packets, but received all {received}"
667        );
668        assert!(
669            received >= 1,
670            "Expected at least one packet to be delivered, got {received}"
671        );
672    }
673
674    #[test]
675    fn test_congestion_causes_delay_then_loss() {
676        // 80 kbps = 10KB/sec, 500 byte buffer (~50ms worth at this rate)
677        // This allows ~5 packets of 100 bytes each to be queued
678        let config = NetemConfig::new()
679            .link(Bitrate::kbps(80), DataSize::bytes(500))
680            .seed(42);
681        let mut netem: Netem<Vec<u8>> = Netem::new(config);
682
683        let now = instant();
684
685        // Send many packets to cause congestion and buffer overflow
686        // 20 packets * 100 bytes = 2000 bytes, but buffer is only 500 bytes
687        for i in 0..20 {
688            netem.handle_input(Input::Packet(now, vec![i; 100]));
689        }
690
691        // First packet should be immediate (no queue yet)
692        let first = netem.poll_output();
693        assert!(matches!(first, Some(Output::Packet(_))));
694
695        // Next should be a timeout (queued due to rate limiting)
696        let second = netem.poll_output();
697        match second {
698            Some(Output::Timeout(t)) => {
699                // Delay should be positive (congestion causing queue buildup)
700                assert!(t > now, "Expected queuing delay");
701            }
702            _ => panic!("Expected timeout due to rate limiting"),
703        }
704
705        // Advance time to release all remaining packets
706        // Send a dummy packet at a far future time to advance current_time
707        let far_future = now + Duration::from_secs(10);
708        netem.handle_input(Input::Packet(far_future, vec![]));
709
710        // Count total packets received (including the first one we already got)
711        let mut received = 1;
712        while let Some(output) = netem.poll_output() {
713            if matches!(output, Output::Packet(_)) {
714                received += 1;
715            }
716        }
717
718        // Should have lost some packets due to buffer overflow
719        // With 500 byte buffer and 100 byte packets:
720        // - First packet sent immediately (no queue)
721        // - 5 more packets can be buffered (500 bytes)
722        // - Remaining 14 packets should be dropped
723        // Expected: ~6 packets total (plus the dummy 0-byte packet)
724        assert!(
725            received < 20,
726            "Expected buffer overflow to cause loss, but received all {received}"
727        );
728        assert!(
729            received >= 5,
730            "Expected some packets to get through, only got {received}"
731        );
732    }
733}