1use crate::packet::RtpPacket;
2use std::collections::BTreeMap;
3
4pub struct JitterBuffer {
7 buffer: BTreeMap<u16, RtpPacket>,
9 capacity: usize,
11 next_playout_seq: Option<u16>,
13 packets_received: u64,
15 packets_dropped: u64,
17}
18
19impl JitterBuffer {
20 pub fn new(capacity: usize) -> Self {
21 Self {
22 buffer: BTreeMap::new(),
23 capacity,
24 next_playout_seq: None,
25 packets_received: 0,
26 packets_dropped: 0,
27 }
28 }
29
30 pub fn insert(&mut self, packet: RtpPacket) {
32 self.packets_received += 1;
33 let seq = packet.sequence_number;
34
35 if let Some(next_seq) = self.next_playout_seq {
37 if Self::seq_before(seq, next_seq) {
38 self.packets_dropped += 1;
39 return;
40 }
41 }
42
43 if self.buffer.len() >= self.capacity {
45 if let Some(&oldest_seq) = self.buffer.keys().next() {
47 if Self::seq_before(oldest_seq, seq) {
48 self.buffer.remove(&oldest_seq);
49 self.packets_dropped += 1;
50 } else {
51 self.packets_dropped += 1;
53 return;
54 }
55 }
56 }
57
58 self.buffer.insert(seq, packet);
59
60 if self.next_playout_seq.is_none() && self.buffer.len() >= self.min_fill_level() {
62 self.next_playout_seq = self.buffer.keys().next().copied();
63 }
64 }
65
66 pub fn pop(&mut self) -> Option<RtpPacket> {
69 let next_seq = self.next_playout_seq?;
70
71 if let Some(packet) = self.buffer.remove(&next_seq) {
72 self.next_playout_seq = Some(next_seq.wrapping_add(1));
73 Some(packet)
74 } else {
75 self.next_playout_seq = Some(next_seq.wrapping_add(1));
77 None
78 }
79 }
80
81 pub fn peek(&self) -> Option<&RtpPacket> {
83 let next_seq = self.next_playout_seq?;
84 self.buffer.get(&next_seq)
85 }
86
87 pub fn len(&self) -> usize {
89 self.buffer.len()
90 }
91
92 pub fn is_empty(&self) -> bool {
94 self.buffer.is_empty()
95 }
96
97 pub fn reset(&mut self) {
99 self.buffer.clear();
100 self.next_playout_seq = None;
101 }
102
103 pub fn packets_received(&self) -> u64 {
105 self.packets_received
106 }
107
108 pub fn packets_dropped(&self) -> u64 {
110 self.packets_dropped
111 }
112
113 fn min_fill_level(&self) -> usize {
115 (self.capacity / 4).max(1)
117 }
118
119 fn seq_before(a: u16, b: u16) -> bool {
121 let diff = a.wrapping_sub(b) as i16;
123 diff < 0
124 }
125}
126
#[cfg(test)]
mod tests {
    use super::*;

    /// Timestamp step between consecutive test packets.
    const TS_STEP: u32 = 160;

    /// Builds a test packet with the given sequence number and timestamp.
    fn make_packet(seq: u16, ts: u32) -> RtpPacket {
        RtpPacket::new(0, seq, ts, 0x12345678).with_payload(vec![0x7F; 160])
    }

    /// Inserts one packet per sequence number, in the order given, with a
    /// timestamp of `seq * TS_STEP`.
    fn insert_seqs(jb: &mut JitterBuffer, seqs: &[u16]) {
        for &seq in seqs {
            jb.insert(make_packet(seq, u32::from(seq) * TS_STEP));
        }
    }

    /// Pops once and reports the sequence number of the packet, if any.
    fn pop_seq(jb: &mut JitterBuffer) -> Option<u16> {
        jb.pop().map(|pkt| pkt.sequence_number)
    }

    #[test]
    fn test_new_jitter_buffer() {
        let jb = JitterBuffer::new(10);

        assert!(jb.is_empty());
        assert_eq!(jb.len(), 0);
        assert_eq!(jb.packets_received(), 0);
        assert_eq!(jb.packets_dropped(), 0);
    }

    #[test]
    fn test_insert_and_pop_in_order() {
        let mut jb = JitterBuffer::new(10);
        insert_seqs(&mut jb, &[0, 1, 2, 3, 4]);

        for expected in 0..5 {
            assert_eq!(pop_seq(&mut jb), Some(expected));
        }
    }

    #[test]
    fn test_insert_out_of_order() {
        let mut jb = JitterBuffer::new(10);
        insert_seqs(&mut jb, &[2, 0, 1, 4, 3]);

        // Packets come back sorted regardless of arrival order.
        for expected in 0..5 {
            assert_eq!(pop_seq(&mut jb), Some(expected));
        }
    }

    #[test]
    fn test_missing_packet() {
        let mut jb = JitterBuffer::new(10);
        insert_seqs(&mut jb, &[0, 1, 3]);

        assert_eq!(pop_seq(&mut jb), Some(0));
        assert_eq!(pop_seq(&mut jb), Some(1));
        // Sequence number 2 never arrived: its slot yields nothing.
        assert_eq!(pop_seq(&mut jb), None);
        assert_eq!(pop_seq(&mut jb), Some(3));
    }

    #[test]
    fn test_buffer_overflow() {
        let mut jb = JitterBuffer::new(3);
        insert_seqs(&mut jb, &[0, 1, 2, 3]);

        assert_eq!(jb.len(), 3);
        assert!(jb.packets_dropped() > 0);
    }

    #[test]
    fn test_late_packet_dropped() {
        let mut jb = JitterBuffer::new(10);
        insert_seqs(&mut jb, &[5, 6, 7]);

        // Play out two packets so the playout position moves past 5 and 6.
        for _ in 0..2 {
            jb.pop();
        }
        let dropped_before = jb.packets_dropped();

        jb.insert(make_packet(4, 640));

        assert_eq!(jb.packets_dropped(), dropped_before + 1);
    }

    #[test]
    fn test_reset() {
        let mut jb = JitterBuffer::new(10);
        insert_seqs(&mut jb, &[0, 1]);

        jb.reset();

        assert!(jb.is_empty());
        assert_eq!(jb.len(), 0);
        // Counters survive a reset.
        assert_eq!(jb.packets_received(), 2);
    }

    #[test]
    fn test_peek() {
        let mut jb = JitterBuffer::new(10);
        insert_seqs(&mut jb, &[0, 1]);

        // Peeking leaves the packet in place.
        assert_eq!(jb.peek().map(|pkt| pkt.sequence_number), Some(0));
        assert_eq!(jb.len(), 2);

        assert_eq!(pop_seq(&mut jb), Some(0));
        assert_eq!(jb.len(), 1);
    }

    #[test]
    fn test_seq_wrapping() {
        let mut jb = JitterBuffer::new(10);
        let arrivals: [(u16, u32); 4] = [(65534, 0), (65535, 160), (0, 320), (1, 480)];

        for &(seq, ts) in &arrivals {
            jb.insert(make_packet(seq, ts));
        }

        for &(seq, _) in &arrivals {
            assert_eq!(pop_seq(&mut jb), Some(seq));
        }
    }

    #[test]
    fn test_duplicate_packet() {
        let mut jb = JitterBuffer::new(10);
        insert_seqs(&mut jb, &[0, 0, 1]);

        // The repeated sequence number occupies a single slot.
        assert_eq!(jb.len(), 2);
    }

    #[test]
    fn test_empty_pop() {
        let mut jb = JitterBuffer::new(10);

        assert_eq!(pop_seq(&mut jb), None);
    }

    #[test]
    fn test_stats() {
        let mut jb = JitterBuffer::new(10);
        insert_seqs(&mut jb, &[0, 1, 2, 3, 4]);
        assert_eq!(jb.packets_received(), 5);

        for _ in 0..5 {
            jb.pop();
        }

        // Sequence number 0 has already been played out, so this one is late.
        jb.insert(make_packet(0, 0));

        assert_eq!(jb.packets_received(), 6);
        assert_eq!(jb.packets_dropped(), 1);
    }
}