Skip to main content

rtp_core/
jitter.rs

1use crate::packet::RtpPacket;
2use std::collections::BTreeMap;
3
4/// A simple fixed-size jitter buffer that reorders incoming RTP packets
5/// by sequence number and provides them in order.
6pub struct JitterBuffer {
7    /// Buffer indexed by sequence number
8    buffer: BTreeMap<u16, RtpPacket>,
9    /// Maximum number of packets to buffer
10    capacity: usize,
11    /// Next expected sequence number to playout
12    next_playout_seq: Option<u16>,
13    /// Total packets received
14    packets_received: u64,
15    /// Total packets dropped (buffer overflow or too late)
16    packets_dropped: u64,
17}
18
19impl JitterBuffer {
20    pub fn new(capacity: usize) -> Self {
21        Self {
22            buffer: BTreeMap::new(),
23            capacity,
24            next_playout_seq: None,
25            packets_received: 0,
26            packets_dropped: 0,
27        }
28    }
29
30    /// Insert a packet into the jitter buffer
31    pub fn insert(&mut self, packet: RtpPacket) {
32        self.packets_received += 1;
33        let seq = packet.sequence_number;
34
35        // If we have a playout sequence and this packet is too old, drop it
36        if let Some(next_seq) = self.next_playout_seq {
37            if Self::seq_before(seq, next_seq) {
38                self.packets_dropped += 1;
39                return;
40            }
41        }
42
43        // If buffer is full, drop the oldest packet or reject
44        if self.buffer.len() >= self.capacity {
45            // Remove the oldest packet
46            if let Some(&oldest_seq) = self.buffer.keys().next() {
47                if Self::seq_before(oldest_seq, seq) {
48                    self.buffer.remove(&oldest_seq);
49                    self.packets_dropped += 1;
50                } else {
51                    // New packet is older than everything in buffer, drop it
52                    self.packets_dropped += 1;
53                    return;
54                }
55            }
56        }
57
58        self.buffer.insert(seq, packet);
59
60        // Initialize playout sequence if not set
61        if self.next_playout_seq.is_none() && self.buffer.len() >= self.min_fill_level() {
62            self.next_playout_seq = self.buffer.keys().next().copied();
63        }
64    }
65
66    /// Get the next packet in sequence order for playout.
67    /// Returns None if no packet is ready.
68    pub fn pop(&mut self) -> Option<RtpPacket> {
69        let next_seq = self.next_playout_seq?;
70
71        if let Some(packet) = self.buffer.remove(&next_seq) {
72            self.next_playout_seq = Some(next_seq.wrapping_add(1));
73            Some(packet)
74        } else {
75            // Packet is missing (lost); advance the playout pointer
76            self.next_playout_seq = Some(next_seq.wrapping_add(1));
77            None
78        }
79    }
80
81    /// Peek at the next packet without removing it
82    pub fn peek(&self) -> Option<&RtpPacket> {
83        let next_seq = self.next_playout_seq?;
84        self.buffer.get(&next_seq)
85    }
86
87    /// Get the number of packets currently buffered
88    pub fn len(&self) -> usize {
89        self.buffer.len()
90    }
91
92    /// Check if the buffer is empty
93    pub fn is_empty(&self) -> bool {
94        self.buffer.is_empty()
95    }
96
97    /// Reset the buffer
98    pub fn reset(&mut self) {
99        self.buffer.clear();
100        self.next_playout_seq = None;
101    }
102
103    /// Total packets received
104    pub fn packets_received(&self) -> u64 {
105        self.packets_received
106    }
107
108    /// Total packets dropped
109    pub fn packets_dropped(&self) -> u64 {
110        self.packets_dropped
111    }
112
113    /// Minimum fill level before starting playout
114    fn min_fill_level(&self) -> usize {
115        // Start playing when buffer is at least 25% full, min 1
116        (self.capacity / 4).max(1)
117    }
118
119    /// Check if sequence a comes before sequence b (with wrapping)
120    fn seq_before(a: u16, b: u16) -> bool {
121        // Using signed comparison for wrapping sequence numbers
122        let diff = a.wrapping_sub(b) as i16;
123        diff < 0
124    }
125}
126
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a test packet with the given sequence number and timestamp and
    /// a 160-byte payload of `0x7F`.
    fn make_packet(seq: u16, ts: u32) -> RtpPacket {
        let payload = vec![0x7F; 160];
        RtpPacket::new(0, seq, ts, 0x12345678).with_payload(payload)
    }

    /// Inserts one packet per `(sequence number, timestamp)` pair, in order.
    fn insert_all(jb: &mut JitterBuffer, packets: &[(u16, u32)]) {
        for &(seq, ts) in packets {
            jb.insert(make_packet(seq, ts));
        }
    }

    /// Pops once and reports the sequence number of the packet, if any.
    fn pop_seq(jb: &mut JitterBuffer) -> Option<u16> {
        jb.pop().map(|pkt| pkt.sequence_number)
    }

    #[test]
    fn test_new_jitter_buffer() {
        let jb = JitterBuffer::new(10);

        // A fresh buffer holds nothing and has clean counters.
        assert!(jb.is_empty());
        assert_eq!(jb.len(), 0);
        assert_eq!(jb.packets_received(), 0);
        assert_eq!(jb.packets_dropped(), 0);
    }

    #[test]
    fn test_insert_and_pop_in_order() {
        let mut jb = JitterBuffer::new(10);

        for seq in 0u16..5 {
            jb.insert(make_packet(seq, u32::from(seq) * 160));
        }

        // Packets come back in the order they went in.
        for expected in 0u16..5 {
            assert_eq!(pop_seq(&mut jb), Some(expected));
        }
    }

    #[test]
    fn test_insert_out_of_order() {
        let mut jb = JitterBuffer::new(10);

        // Arrival order is shuffled.
        insert_all(&mut jb, &[(2, 320), (0, 0), (1, 160), (4, 640), (3, 480)]);

        // Playout order is by sequence number.
        for expected in 0u16..=4 {
            assert_eq!(pop_seq(&mut jb), Some(expected));
        }
    }

    #[test]
    fn test_missing_packet() {
        let mut jb = JitterBuffer::new(10);

        // Sequence number 2 never arrives.
        insert_all(&mut jb, &[(0, 0), (1, 160), (3, 480)]);

        assert_eq!(pop_seq(&mut jb), Some(0));
        assert_eq!(pop_seq(&mut jb), Some(1));
        // The gap yields nothing ...
        assert!(jb.pop().is_none());
        // ... and the next pop moves on to 3.
        assert_eq!(pop_seq(&mut jb), Some(3));
    }

    #[test]
    fn test_buffer_overflow() {
        let mut jb = JitterBuffer::new(3);

        // Fill to capacity, then push one more packet.
        insert_all(&mut jb, &[(0, 0), (1, 160), (2, 320)]);
        jb.insert(make_packet(3, 480));

        assert_eq!(jb.len(), 3);
        assert!(jb.packets_dropped() > 0);
    }

    #[test]
    fn test_late_packet_dropped() {
        let mut jb = JitterBuffer::new(10);

        insert_all(&mut jb, &[(5, 800), (6, 960), (7, 1120)]);

        // Play out 5 and 6.
        let _ = jb.pop();
        let _ = jb.pop();

        // Sequence number 4 is behind the playout position.
        let dropped_before = jb.packets_dropped();
        jb.insert(make_packet(4, 640));
        assert_eq!(jb.packets_dropped(), dropped_before + 1);
    }

    #[test]
    fn test_reset() {
        let mut jb = JitterBuffer::new(10);

        insert_all(&mut jb, &[(0, 0), (1, 160)]);
        jb.reset();

        assert!(jb.is_empty());
        assert_eq!(jb.len(), 0);
        // Resetting keeps the statistics.
        assert_eq!(jb.packets_received(), 2);
    }

    #[test]
    fn test_peek() {
        let mut jb = JitterBuffer::new(10);

        insert_all(&mut jb, &[(0, 0), (1, 160)]);

        let peeked_seq = jb.peek().unwrap().sequence_number;
        assert_eq!(peeked_seq, 0);
        // Peeking leaves the packet in place.
        assert_eq!(jb.len(), 2);

        assert_eq!(jb.pop().unwrap().sequence_number, 0);
        assert_eq!(jb.len(), 1);
    }

    #[test]
    fn test_seq_wrapping() {
        let mut jb = JitterBuffer::new(10);

        // Sequence numbers roll over from u16::MAX to 0.
        let packets = [(65534, 0), (65535, 160), (0, 320), (1, 480)];
        insert_all(&mut jb, &packets);

        for &(expected, _) in &packets {
            assert_eq!(pop_seq(&mut jb), Some(expected));
        }
    }

    #[test]
    fn test_duplicate_packet() {
        let mut jb = JitterBuffer::new(10);

        // Sequence number 0 arrives twice.
        insert_all(&mut jb, &[(0, 0), (0, 0), (1, 160)]);

        // The duplicate replaces the stored entry, so only two remain.
        assert_eq!(jb.len(), 2);
    }

    #[test]
    fn test_empty_pop() {
        let mut jb = JitterBuffer::new(10);
        assert!(jb.pop().is_none());
    }

    #[test]
    fn test_stats() {
        let mut jb = JitterBuffer::new(10);

        for seq in 0u16..5 {
            jb.insert(make_packet(seq, u32::from(seq) * 160));
        }
        assert_eq!(jb.packets_received(), 5);

        // Drain everything that was inserted.
        (0..5).for_each(|_| {
            let _ = jb.pop();
        });

        // A packet behind the playout position is received but dropped.
        jb.insert(make_packet(0, 0));
        assert_eq!(jb.packets_received(), 6);
        assert_eq!(jb.packets_dropped(), 1);
    }
}