Skip to main content

rtp_engine/
jitter.rs

1//! Jitter buffer implementations for RTP media.
2//!
3//! Jitter buffers smooth out variable network delay (jitter) by buffering
4//! packets before playout. This module provides two strategies:
5//!
6//! - **Fixed**: Constant delay, simple and predictable
7//! - **Adaptive**: Dynamically adjusts delay based on observed jitter
8//!
9//! # Example
10//!
11//! ```
12//! use rtp_engine::jitter::{JitterBuffer, JitterConfig, JitterMode};
13//!
14//! // Create an adaptive jitter buffer
15//! let config = JitterConfig {
16//!     mode: JitterMode::Adaptive { target_ms: 60, min_ms: 20, max_ms: 200 },
17//!     clock_rate: 8000,
18//!     max_packets: 50,
19//! };
20//! let mut jitter = JitterBuffer::new(config);
21//!
22//! // Push received RTP packets (seq, timestamp, payload)
23//! jitter.push(0, 0, vec![0u8; 160]);
24//! jitter.push(1, 160, vec![0u8; 160]);
25//!
26//! // Pop packets for playout (returns None if not ready yet)
27//! // In real usage, wait for the jitter delay before popping
28//! ```
29
30use std::collections::BTreeMap;
31use std::time::{Duration, Instant};
32
/// Playout-delay strategy for a jitter buffer.
///
/// Every delay carried by the variants is expressed in milliseconds.
/// The type is a small value type made only of integers, so it is `Copy`
/// and supports full equality and hashing.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum JitterMode {
    /// Hold packets for a constant delay before playout.
    Fixed {
        /// Delay in milliseconds before playout.
        delay_ms: u32,
    },
    /// Adjust the playout delay to the observed network jitter.
    Adaptive {
        /// Target delay in milliseconds (the starting point).
        target_ms: u32,
        /// Minimum delay in milliseconds.
        min_ms: u32,
        /// Maximum delay in milliseconds.
        max_ms: u32,
    },
}
51
52impl Default for JitterMode {
53    fn default() -> Self {
54        Self::Adaptive {
55            target_ms: 60,
56            min_ms: 20,
57            max_ms: 200,
58        }
59    }
60}
61
62/// Configuration for the jitter buffer.
63#[derive(Debug, Clone)]
64pub struct JitterConfig {
65    /// Operating mode (fixed or adaptive).
66    pub mode: JitterMode,
67    /// RTP clock rate in Hz (e.g., 8000 for G.711).
68    pub clock_rate: u32,
69    /// Maximum number of packets to buffer.
70    pub max_packets: usize,
71}
72
73impl Default for JitterConfig {
74    fn default() -> Self {
75        Self {
76            mode: JitterMode::default(),
77            clock_rate: 8000,
78            max_packets: 50,
79        }
80    }
81}
82
/// A packet handed out by the jitter buffer for playout.
///
/// Packets compare equal when every field, including the receive instant,
/// is equal.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BufferedPacket {
    /// RTP sequence number.
    pub seq: u16,
    /// RTP timestamp.
    pub timestamp: u32,
    /// Packet payload (encoded audio).
    pub payload: Vec<u8>,
    /// When this packet was received.
    pub received_at: Instant,
    /// Whether this is a synthesized packet (for loss concealment).
    pub synthesized: bool,
}
97
/// Counters and gauges reported by the jitter buffer.
///
/// `Default` yields all-zero statistics. Only `PartialEq` is derived because
/// the jitter gauge is a floating-point value.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct JitterStats {
    /// Total packets received.
    pub packets_received: u64,
    /// Packets dropped (buffer full or too late).
    pub packets_dropped: u64,
    /// Packets lost (gaps in sequence).
    pub packets_lost: u64,
    /// Packets played out.
    pub packets_played: u64,
    /// Current buffer depth in packets.
    pub buffer_depth: usize,
    /// Current delay in milliseconds.
    pub current_delay_ms: u32,
    /// Observed jitter in milliseconds.
    pub observed_jitter_ms: f64,
}
116
117/// Jitter buffer for RTP packet reordering and playout scheduling.
118pub struct JitterBuffer {
119    config: JitterConfig,
120    /// Packets indexed by extended sequence number.
121    packets: BTreeMap<u32, BufferedPacket>,
122    /// Current playout sequence (extended).
123    playout_seq: Option<u32>,
124    /// Sequence number cycles (for extended seq).
125    seq_cycles: u16,
126    /// Last received sequence number.
127    last_seq: Option<u16>,
128    /// Current adaptive delay in ms.
129    current_delay_ms: u32,
130    /// Jitter estimate (RFC 3550 style, in timestamp units).
131    jitter_estimate: f64,
132    /// Last transit time for jitter calculation.
133    last_transit: Option<i64>,
134    /// Statistics.
135    stats: JitterStats,
136    /// First packet timestamp (for relative timing).
137    base_timestamp: Option<u32>,
138    /// When the first packet was received.
139    base_time: Option<Instant>,
140    /// Whether we've started playout.
141    playing: bool,
142}
143
144impl JitterBuffer {
145    /// Create a new jitter buffer with the given configuration.
146    pub fn new(config: JitterConfig) -> Self {
147        let initial_delay = match config.mode {
148            JitterMode::Fixed { delay_ms } => delay_ms,
149            JitterMode::Adaptive { target_ms, .. } => target_ms,
150        };
151
152        Self {
153            config,
154            packets: BTreeMap::new(),
155            playout_seq: None,
156            seq_cycles: 0,
157            last_seq: None,
158            current_delay_ms: initial_delay,
159            jitter_estimate: 0.0,
160            last_transit: None,
161            stats: JitterStats::default(),
162            base_timestamp: None,
163            base_time: None,
164            playing: false,
165        }
166    }
167
168    /// Push a received RTP packet into the buffer.
169    ///
170    /// Returns `true` if the packet was buffered, `false` if dropped.
171    pub fn push(&mut self, seq: u16, timestamp: u32, payload: Vec<u8>) -> bool {
172        let now = Instant::now();
173        self.stats.packets_received += 1;
174
175        // Initialize base references on first packet
176        if self.base_timestamp.is_none() {
177            self.base_timestamp = Some(timestamp);
178            self.base_time = Some(now);
179            self.last_seq = Some(seq);
180        }
181
182        // Calculate extended sequence number
183        let extended_seq = self.extend_seq(seq);
184
185        // Update jitter estimate
186        self.update_jitter(timestamp, now);
187
188        // Check if packet is too late (already played)
189        if let Some(playout) = self.playout_seq
190            && extended_seq < playout
191        {
192            self.stats.packets_dropped += 1;
193            return false;
194        }
195
196        // Check buffer capacity
197        if self.packets.len() >= self.config.max_packets {
198            // Drop oldest packet
199            if let Some(&oldest_seq) = self.packets.keys().next() {
200                self.packets.remove(&oldest_seq);
201                self.stats.packets_dropped += 1;
202            }
203        }
204
205        // Buffer the packet
206        self.packets.insert(
207            extended_seq,
208            BufferedPacket {
209                seq,
210                timestamp,
211                payload,
212                received_at: now,
213                synthesized: false,
214            },
215        );
216
217        self.last_seq = Some(seq);
218        true
219    }
220
221    /// Pop the next packet for playout, if ready.
222    ///
223    /// Returns `None` if no packet is ready (still buffering or waiting for delay).
224    pub fn pop(&mut self) -> Option<BufferedPacket> {
225        let now = Instant::now();
226
227        // Determine which sequence to play
228        let target_seq = if let Some(seq) = self.playout_seq {
229            seq
230        } else {
231            // Not yet playing - check if we should start
232            if !self.should_start_playout(now) {
233                return None;
234            }
235            // Start with the lowest buffered sequence
236            let first_seq = *self.packets.keys().next()?;
237            self.playout_seq = Some(first_seq);
238            self.playing = true;
239            first_seq
240        };
241
242        // Try to get the target packet
243        let packet = if let Some(pkt) = self.packets.remove(&target_seq) {
244            Some(pkt)
245        } else {
246            // Packet missing - loss concealment
247            self.stats.packets_lost += 1;
248            // Return a synthesized empty packet for PLC
249            Some(BufferedPacket {
250                seq: (target_seq & 0xFFFF) as u16,
251                timestamp: self.estimate_timestamp(target_seq),
252                payload: Vec::new(), // Empty = signal for PLC
253                received_at: now,
254                synthesized: true,
255            })
256        };
257
258        self.stats.packets_played += 1;
259
260        // Advance playout sequence
261        self.playout_seq = Some(target_seq.wrapping_add(1));
262
263        // Update statistics
264        self.stats.buffer_depth = self.packets.len();
265        self.stats.current_delay_ms = self.current_delay_ms;
266        self.stats.observed_jitter_ms = self.jitter_ms();
267
268        // Adapt delay if in adaptive mode
269        if matches!(self.config.mode, JitterMode::Adaptive { .. }) {
270            self.adapt_delay();
271        }
272
273        packet
274    }
275
276    /// Get current statistics.
277    pub fn stats(&self) -> JitterStats {
278        let mut stats = self.stats.clone();
279        stats.buffer_depth = self.packets.len();
280        stats.current_delay_ms = self.current_delay_ms;
281        stats.observed_jitter_ms = self.jitter_ms();
282        stats
283    }
284
285    /// Get current delay in milliseconds.
286    pub fn delay_ms(&self) -> u32 {
287        self.current_delay_ms
288    }
289
290    /// Get observed jitter in milliseconds.
291    pub fn jitter_ms(&self) -> f64 {
292        // Convert from timestamp units to ms
293        (self.jitter_estimate / self.config.clock_rate as f64) * 1000.0
294    }
295
296    /// Reset the jitter buffer.
297    pub fn reset(&mut self) {
298        self.packets.clear();
299        self.playout_seq = None;
300        self.seq_cycles = 0;
301        self.last_seq = None;
302        self.jitter_estimate = 0.0;
303        self.last_transit = None;
304        self.base_timestamp = None;
305        self.base_time = None;
306        self.playing = false;
307        self.stats = JitterStats::default();
308
309        // Reset delay to initial
310        self.current_delay_ms = match self.config.mode {
311            JitterMode::Fixed { delay_ms } => delay_ms,
312            JitterMode::Adaptive { target_ms, .. } => target_ms,
313        };
314    }
315
316    /// Flush all buffered packets.
317    pub fn flush(&mut self) -> Vec<BufferedPacket> {
318        let packets: Vec<_> = self.packets.values().cloned().collect();
319        self.packets.clear();
320        packets
321    }
322
323    /// Check if the buffer is empty.
324    pub fn is_empty(&self) -> bool {
325        self.packets.is_empty()
326    }
327
328    /// Get the number of buffered packets.
329    pub fn len(&self) -> usize {
330        self.packets.len()
331    }
332
333    // --- Internal methods ---
334
335    fn extend_seq(&mut self, seq: u16) -> u32 {
336        if let Some(last) = self.last_seq {
337            // Detect rollover
338            if seq < last && (last.wrapping_sub(seq)) > 0x8000 {
339                self.seq_cycles = self.seq_cycles.wrapping_add(1);
340            } else if seq > last && (seq.wrapping_sub(last)) > 0x8000 {
341                self.seq_cycles = self.seq_cycles.wrapping_sub(1);
342            }
343        }
344        ((self.seq_cycles as u32) << 16) | (seq as u32)
345    }
346
347    fn update_jitter(&mut self, timestamp: u32, now: Instant) {
348        let base_ts = match self.base_timestamp {
349            Some(ts) => ts,
350            None => return,
351        };
352        let base_time = match self.base_time {
353            Some(t) => t,
354            None => return,
355        };
356
357        // Calculate transit time in timestamp units
358        let arrival_ts = now.duration_since(base_time).as_micros() as i64
359            * self.config.clock_rate as i64
360            / 1_000_000;
361        let send_ts = timestamp.wrapping_sub(base_ts) as i64;
362        let transit = arrival_ts - send_ts;
363
364        if let Some(last_transit) = self.last_transit {
365            // RFC 3550 jitter calculation
366            let d = (transit - last_transit).abs() as f64;
367            self.jitter_estimate += (d - self.jitter_estimate) / 16.0;
368        }
369
370        self.last_transit = Some(transit);
371    }
372
373    fn should_start_playout(&self, now: Instant) -> bool {
374        // Need at least one packet
375        if self.packets.is_empty() {
376            return false;
377        }
378
379        // Check if we've waited long enough
380        if let Some(base_time) = self.base_time {
381            let elapsed = now.duration_since(base_time);
382            let delay = Duration::from_millis(self.current_delay_ms as u64);
383            return elapsed >= delay;
384        }
385
386        false
387    }
388
389    fn adapt_delay(&mut self) {
390        let JitterMode::Adaptive {
391            min_ms,
392            max_ms,
393            target_ms: _,
394        } = self.config.mode
395        else {
396            return;
397        };
398
399        // Target delay = 2 * observed jitter (with bounds)
400        let jitter_ms = self.jitter_ms();
401        let target = (jitter_ms * 2.0) as u32;
402        let target = target.clamp(min_ms, max_ms);
403
404        // Smooth adjustment (don't change too fast)
405        if target > self.current_delay_ms {
406            // Increase quickly when jitter rises
407            self.current_delay_ms = self
408                .current_delay_ms
409                .saturating_add(((target - self.current_delay_ms) / 4).max(1));
410        } else if target < self.current_delay_ms {
411            // Decrease slowly when jitter falls
412            self.current_delay_ms = self
413                .current_delay_ms
414                .saturating_sub(((self.current_delay_ms - target) / 8).max(1));
415        }
416
417        self.current_delay_ms = self.current_delay_ms.clamp(min_ms, max_ms);
418    }
419
420    fn estimate_timestamp(&self, extended_seq: u32) -> u32 {
421        // Estimate timestamp based on sequence and samples per packet
422        // Assuming 20ms frames at the clock rate
423        let samples_per_frame = self.config.clock_rate / 50; // 20ms
424        let base = self.base_timestamp.unwrap_or(0);
425        let seq_offset = extended_seq.wrapping_sub(self.playout_seq.unwrap_or(extended_seq));
426        base.wrapping_add(seq_offset * samples_per_frame)
427    }
428}
429
430impl std::fmt::Debug for JitterBuffer {
431    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
432        f.debug_struct("JitterBuffer")
433            .field("mode", &self.config.mode)
434            .field("buffered", &self.packets.len())
435            .field("delay_ms", &self.current_delay_ms)
436            .field("jitter_ms", &self.jitter_ms())
437            .field("playing", &self.playing)
438            .finish()
439    }
440}
441
#[cfg(test)]
mod tests {
    //! Unit tests for the jitter buffer.
    //!
    //! Playout is gated on wall-clock time since the first packet, so most
    //! tests sleep slightly longer than the configured delay before popping.

    use super::*;
    use std::thread::sleep;

    /// Fixed-delay configuration at an 8 kHz clock.
    fn fixed_config(delay_ms: u32, max_packets: usize) -> JitterConfig {
        JitterConfig {
            mode: JitterMode::Fixed { delay_ms },
            clock_rate: 8000,
            max_packets,
        }
    }

    /// Packets pushed in order come back out in order with their payloads.
    #[test]
    fn test_fixed_jitter_buffer_basic() {
        let mut jitter = JitterBuffer::new(fixed_config(20, 10));

        assert!(jitter.push(0, 0, vec![1, 2, 3]));
        assert!(jitter.push(1, 160, vec![4, 5, 6]));
        assert!(jitter.push(2, 320, vec![7, 8, 9]));
        assert_eq!(jitter.len(), 3);

        // Wait out the playout delay.
        sleep(Duration::from_millis(25));

        let p1 = jitter.pop().unwrap();
        assert_eq!(p1.seq, 0);
        assert_eq!(p1.payload, vec![1, 2, 3]);

        let p2 = jitter.pop().unwrap();
        assert_eq!(p2.seq, 1);

        let p3 = jitter.pop().unwrap();
        assert_eq!(p3.seq, 2);
    }

    /// Packets pushed out of order are played in sequence order.
    #[test]
    fn test_packet_reordering() {
        let mut jitter = JitterBuffer::new(fixed_config(10, 10));

        jitter.push(2, 320, vec![3]);
        jitter.push(0, 0, vec![1]);
        jitter.push(1, 160, vec![2]);

        sleep(Duration::from_millis(15));

        for expected in 0..3u16 {
            assert_eq!(jitter.pop().unwrap().seq, expected);
        }
    }

    /// A gap in the sequence yields an empty synthesized packet and is
    /// counted as a loss.
    #[test]
    fn test_packet_loss_detection() {
        let mut jitter = JitterBuffer::new(fixed_config(10, 10));

        // Sequence 1 never arrives.
        jitter.push(0, 0, vec![1]);
        jitter.push(2, 320, vec![3]);

        sleep(Duration::from_millis(15));

        let p1 = jitter.pop().unwrap();
        assert_eq!(p1.seq, 0);
        assert!(!p1.synthesized);

        let p2 = jitter.pop().unwrap();
        assert_eq!(p2.seq, 1);
        assert!(p2.synthesized);
        assert!(p2.payload.is_empty());

        let p3 = jitter.pop().unwrap();
        assert_eq!(p3.seq, 2);
        assert!(!p3.synthesized);

        assert_eq!(jitter.stats().packets_lost, 1);
    }

    /// A packet whose position was already played is rejected and counted.
    #[test]
    fn test_late_packet_dropped() {
        let mut jitter = JitterBuffer::new(fixed_config(5, 10));

        jitter.push(0, 0, vec![1]);
        jitter.push(1, 160, vec![2]);

        sleep(Duration::from_millis(10));

        // Plays sequence 0.
        assert_eq!(jitter.pop().unwrap().seq, 0);

        // Sequence 0 again is now too late.
        assert!(!jitter.push(0, 0, vec![1]));
        assert_eq!(jitter.stats().packets_dropped, 1);
    }

    /// The adaptive delay starts at the target and stays within its bounds.
    #[test]
    fn test_adaptive_jitter_buffer() {
        let config = JitterConfig {
            mode: JitterMode::Adaptive {
                target_ms: 40,
                min_ms: 20,
                max_ms: 200,
            },
            clock_rate: 8000,
            max_packets: 20,
        };
        let mut jitter = JitterBuffer::new(config);

        assert_eq!(jitter.delay_ms(), 40);

        // 20 ms of media every 5 ms of wall clock, so transit times drift.
        for i in 0..10u16 {
            jitter.push(i, u32::from(i) * 160, vec![i as u8]);
            sleep(Duration::from_millis(5));
        }

        sleep(Duration::from_millis(50));

        for _ in 0..5 {
            jitter.pop();
        }

        // Whichever way it adapted, the delay must respect the bounds.
        assert!((20..=200).contains(&jitter.delay_ms()));
    }

    /// Ordering is preserved across the 16-bit sequence number wrap.
    #[test]
    fn test_sequence_rollover_in_jitter_buffer() {
        let mut jitter = JitterBuffer::new(fixed_config(5, 10));

        jitter.push(65534, 0, vec![1]);
        jitter.push(65535, 160, vec![2]);
        jitter.push(0, 320, vec![3]); // Rollover
        jitter.push(1, 480, vec![4]);

        sleep(Duration::from_millis(10));

        for expected in [65534u16, 65535, 0, 1] {
            assert_eq!(jitter.pop().unwrap().seq, expected);
        }
    }

    /// Exceeding `max_packets` evicts one packet and counts it as dropped.
    #[test]
    fn test_buffer_overflow() {
        let mut jitter = JitterBuffer::new(fixed_config(100, 3));

        jitter.push(0, 0, vec![1]);
        jitter.push(1, 160, vec![2]);
        jitter.push(2, 320, vec![3]);
        jitter.push(3, 480, vec![4]); // Should drop oldest

        assert_eq!(jitter.len(), 3);
        assert_eq!(jitter.stats().packets_dropped, 1);
    }

    /// `reset` discards packets and statistics and restores the initial delay.
    #[test]
    fn test_reset() {
        let mut jitter = JitterBuffer::new(JitterConfig::default());

        jitter.push(0, 0, vec![1]);
        jitter.push(1, 160, vec![2]);

        jitter.reset();

        assert!(jitter.is_empty());
        assert_eq!(jitter.len(), 0);
        assert_eq!(jitter.stats().packets_received, 0);
        assert_eq!(jitter.delay_ms(), 60);
    }

    /// `flush` hands back every buffered packet in order and empties the buffer.
    #[test]
    fn test_flush() {
        let mut jitter = JitterBuffer::new(JitterConfig::default());

        jitter.push(0, 0, vec![1]);
        jitter.push(1, 160, vec![2]);
        jitter.push(2, 320, vec![3]);

        let flushed = jitter.flush();
        let seqs: Vec<u16> = flushed.iter().map(|p| p.seq).collect();
        assert_eq!(seqs, vec![0, 1, 2]);
        assert!(jitter.is_empty());
    }

    /// Uneven inter-arrival times produce a strictly positive jitter estimate.
    #[test]
    fn test_jitter_calculation() {
        let mut jitter = JitterBuffer::new(fixed_config(10, 20));

        // Media advances 20 ms per packet; arrivals are 20, 25, and 15 ms apart.
        jitter.push(0, 0, vec![1]);
        sleep(Duration::from_millis(20));
        jitter.push(1, 160, vec![2]);
        sleep(Duration::from_millis(25));
        jitter.push(2, 320, vec![3]);
        sleep(Duration::from_millis(15));
        jitter.push(3, 480, vec![4]);

        // The 25 ms gap alone shifts transit by at least 5 ms, and `sleep`
        // never returns early, so the estimate cannot still be zero.
        assert!(jitter.jitter_ms() > 0.0);
    }

    /// Received, played, and lost counters reflect a stream with one gap.
    #[test]
    fn test_stats() {
        let mut jitter = JitterBuffer::new(fixed_config(5, 10));

        jitter.push(0, 0, vec![1]);
        jitter.push(1, 160, vec![2]);
        // Sequence 2 is skipped.
        jitter.push(3, 480, vec![4]);

        sleep(Duration::from_millis(10));

        jitter.pop(); // seq 0
        jitter.pop(); // seq 1
        jitter.pop(); // seq 2 (synthesized)
        jitter.pop(); // seq 3

        let stats = jitter.stats();
        assert_eq!(stats.packets_received, 3);
        assert_eq!(stats.packets_played, 4);
        assert_eq!(stats.packets_lost, 1);
    }
}