1use std::collections::BinaryHeap;
2use std::time::SystemTime;
3
4pub struct JitterBuffer<P, const S: u8>
6where
7 P: Packet,
8{
9 last: Option<JitterPacket<P>>,
10 heap: BinaryHeap<JitterPacket<P>>,
11}
12
13impl<P, const S: u8> JitterBuffer<P, S>
14where
15 P: Packet,
16{
17 pub fn new() -> Self {
19 Self {
20 last: None,
21 heap: BinaryHeap::with_capacity(S as usize),
22 }
23 }
24
25 pub fn push(&mut self, packet: P) {
30 if self.heap.len() >= S as usize {
31 while self.heap.len() >= S as usize && !self.heap.is_empty() {
32 let dropped = self.heap.pop();
34 self.last = None;
35 #[cfg(feature = "log")]
36 log::warn!("dropping packet: {:?}", dropped.map(|p| p.sequence_number));
37 }
38 }
39
40 if let Some(ref last) = self.last {
41 if last.sequence_number >= packet.sequence_number().into() {
42 #[cfg(feature = "log")]
43 log::warn!(
44 "discarded packet {} since newer packet was already played back",
45 packet.sequence_number()
46 );
47
48 return;
49 }
50 }
51
52 if self
53 .heap
54 .iter()
55 .any(|p| p.sequence_number == packet.sequence_number().into())
56 {
57 #[cfg(feature = "log")]
58 log::warn!(
59 "discarded packet {} since its already buffered",
60 packet.sequence_number()
61 );
62
63 return;
64 }
65
66 if !self.heap.is_empty() {
67 let max_seq = self.heap.iter().max().unwrap().sequence_number;
69
70 if SequenceNumber(max_seq.0.overflowing_add(S as u16).0)
71 < packet.sequence_number().into()
72 {
73 #[cfg(feature = "log")]
74 log::warn!(
75 "unexpectedly received packet {} which is too far ahead (over {S} packets) of current playback window, clearing jitter buffer",
76 packet.sequence_number()
77 );
78
79 self.clear();
80 }
81 }
82
83 #[cfg(feature = "log")]
84 log::debug!("pushed packet {} onto heap", packet.sequence_number());
85 self.heap.push(packet.into());
86 }
87
88 pub fn pop(&mut self) -> Option<P> {
94 if self.heap.is_empty() {
95 return None;
96 }
97
98 let last = match self.last {
99 Some(ref last) => last.to_owned(),
100 None => {
101 let mut packet = self.heap.pop().unwrap();
105 packet.yielded_at = Some(SystemTime::now());
106 self.last = Some(packet.clone());
107
108 #[cfg(feature = "log")]
109 log::debug!(
110 "packet {} yielded, {} remaining",
111 packet.sequence_number.0,
112 self.heap.len()
113 );
114
115 return packet.into();
116 }
117 };
118
119 let next_sequence = match self.heap.peek() {
120 Some(next) => next.sequence_number,
121 None => {
122 #[cfg(feature = "log")]
123 log::error!("expected next packet to be present but heap is empty");
124
125 return None;
126 }
127 };
128
129 let packet = if next_sequence == (u16::from(last.sequence_number).wrapping_add(1)).into() {
130 match self.heap.pop() {
131 Some(packet) => packet.into(),
132 None => {
133 #[cfg(feature = "log")]
134 log::error!("expected packet {} to be present", next_sequence.0);
135
136 return None;
137 }
138 }
139 } else {
140 None
141 };
142
143 self.last = Some(JitterPacket {
144 raw: packet.clone(),
145 sequence_number: packet
146 .as_ref()
147 .map(|p| p.sequence_number())
148 .unwrap_or_else(|| u16::from(last.sequence_number).wrapping_add(1))
149 .into(),
150 yielded_at: Some(SystemTime::now()),
151 });
152
153 #[cfg(feature = "log")]
154 log::debug!(
155 "packet {:?} yielded, {} remaining",
156 self.last.as_ref().map(|l| l.sequence_number),
157 self.heap.len()
158 );
159
160 packet
161 }
162
163 pub fn lossless_packets_buffered(&self) -> usize {
169 match self.last {
170 Some(ref last) => {
171 let mut last = last.sequence_number;
172 let mut count = 0;
173
174 let sequence_numbers = self.heap.clone().into_sorted_vec();
175 let sequence_numbers = sequence_numbers.iter().rev().map(|p| p.sequence_number);
176
177 #[cfg(feature = "log")]
178 log::debug!(
179 "compute lossless packets: {:?}",
180 sequence_numbers.clone().collect::<Vec<SequenceNumber>>()
181 );
182
183 for packet in sequence_numbers {
184 #[cfg(feature = "log")]
185 log::info!(
186 "is next of: {:?} {:?} = {}",
187 packet,
188 last,
189 packet.is_next_of(last)
190 );
191
192 if packet.is_next_of(last) {
193 #[cfg(feature = "log")]
194 log::debug!("{:?} is next of {:?}", packet, last);
195 last = packet;
196 count += 1;
197 } else {
198 break;
199 }
200 }
201
202 #[cfg(feature = "log")]
203 log::debug!("computed lossless packets: {count}");
204
205 count
206 }
207 None => 0,
208 }
209 }
210
211 pub fn clear(&mut self) {
213 self.last = None;
214 self.heap.clear();
215 }
216}
217
218impl<P: Packet, const S: u8> Default for JitterBuffer<P, S> {
219 fn default() -> Self {
220 Self::new()
221 }
222}
223
224pub trait Packet: Unpin + Clone {
226 fn sequence_number(&self) -> u16;
227}
228
229#[derive(Debug, Clone)]
230pub(crate) struct JitterPacket<P>
231where
232 P: Packet,
233{
234 pub(crate) raw: Option<P>,
235 pub(crate) sequence_number: SequenceNumber,
236 pub(crate) yielded_at: Option<SystemTime>,
237}
238
239impl<P> JitterPacket<P>
240where
241 P: Packet,
242{
243 fn into(self) -> Option<P> {
244 self.raw
245 }
246}
247
248impl<P> From<P> for JitterPacket<P>
249where
250 P: Packet,
251{
252 fn from(raw: P) -> Self {
253 Self {
254 sequence_number: raw.sequence_number().into(),
255 yielded_at: None,
256 raw: Some(raw),
257 }
258 }
259}
260
261impl<P> PartialEq for JitterPacket<P>
262where
263 P: Packet,
264{
265 fn eq(&self, other: &Self) -> bool {
266 self.sequence_number.eq(&other.sequence_number)
267 }
268}
269
270impl<P> Eq for JitterPacket<P> where P: Packet {}
271
272impl<P> PartialOrd for JitterPacket<P>
273where
274 P: Packet,
275{
276 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
277 Some(self.cmp(other))
278 }
279}
280
281impl<P> Ord for JitterPacket<P>
282where
283 P: Packet,
284{
285 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
286 self.sequence_number.cmp(&other.sequence_number).reverse()
287 }
288}
289
290#[derive(Debug, Clone, Copy, PartialEq, Eq)]
298struct SequenceNumber(u16);
299
300impl SequenceNumber {
301 const WRAPPING_WINDOW_SIZE: u16 = 16;
302 const WRAPPING_WINDOW_START: u16 = u16::MAX - (Self::WRAPPING_WINDOW_SIZE / 2);
303 const WRAPPING_WINDOW_END: u16 = u16::MIN + (Self::WRAPPING_WINDOW_SIZE / 2);
304
305 pub fn did_wrap(&self, next: Self) -> bool {
306 self.0 >= Self::WRAPPING_WINDOW_START && next.0 <= Self::WRAPPING_WINDOW_END
307 }
308
309 pub fn is_next_of(&self, last: SequenceNumber) -> bool {
310 if last.did_wrap(*self) {
311 return last.0 == u16::MAX && self.0 == u16::MIN;
312 }
313
314 last.0.wrapping_add(1) == self.0
315 }
316}
317
318impl PartialOrd for SequenceNumber {
319 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
320 Some(self.cmp(other))
321 }
322}
323
324impl Ord for SequenceNumber {
325 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
326 if self.did_wrap(*other) {
327 return std::cmp::Ordering::Less;
328 } else if other.did_wrap(*self) {
329 return std::cmp::Ordering::Greater;
330 }
331
332 self.0.cmp(&other.0)
333 }
334}
335
336impl From<u16> for SequenceNumber {
337 fn from(num: u16) -> Self {
338 Self(num)
339 }
340}
341
342impl From<SequenceNumber> for u16 {
343 fn from(sn: SequenceNumber) -> Self {
344 sn.0
345 }
346}
347
348#[cfg(test)]
349mod tests {
350 use super::*;
351
352 #[derive(Debug, Clone, PartialEq)]
353 struct Rtp {
354 seq: u16,
355 }
356
357 impl Packet for Rtp {
358 #[inline]
359 fn sequence_number(&self) -> u16 {
360 self.seq
361 }
362 }
363
364 #[test]
365 fn const_capacity() {
366 let jitter = JitterBuffer::<Rtp, 10>::new();
367 assert_eq!(jitter.heap.capacity(), 10);
368 }
369
370 #[test]
371 fn send() {
372 let mut jitter = JitterBuffer::<Rtp, 10>::new();
373 let packet = Rtp { seq: 0 };
374 jitter.push(packet.clone());
375 assert_eq!(jitter.heap.peek(), Some(&packet.into()));
376 }
377
378 #[test]
379 fn reorders_racing_packets() {
380 let mut jitter = JitterBuffer::<Rtp, 10>::new();
381
382 jitter.push(Rtp { seq: 0 });
383 assert_eq!(jitter.pop(), Some(Rtp { seq: 0 }));
384
385 jitter.push(Rtp { seq: 2 });
386 jitter.push(Rtp { seq: 1 });
387
388 assert_eq!(jitter.pop(), Some(Rtp { seq: 1 }));
389 assert_eq!(jitter.pop(), Some(Rtp { seq: 2 }));
390 }
391
392 #[test]
393 fn discards_already_played_packets() {
394 let mut jitter = JitterBuffer::<Rtp, 10>::new();
395
396 jitter.push(Rtp { seq: 0 });
397 assert_eq!(jitter.pop(), Some(Rtp { seq: 0 }));
398
399 jitter.push(Rtp { seq: 0 });
400 jitter.push(Rtp { seq: 1 });
401
402 assert_eq!(jitter.pop(), Some(Rtp { seq: 1 }));
403 }
404
405 #[test]
406 fn discards_duplicated_packets() {
407 let mut jitter = JitterBuffer::<Rtp, 10>::new();
408
409 jitter.push(Rtp { seq: 0 });
410 jitter.push(Rtp { seq: 0 });
411 jitter.push(Rtp { seq: 0 });
412 jitter.push(Rtp { seq: 0 });
413 jitter.push(Rtp { seq: 0 });
414
415 assert_eq!(jitter.pop(), Some(Rtp { seq: 0 }));
416 assert_eq!(jitter.heap.len(), 0);
417 assert_eq!(jitter.pop(), None);
418 }
419
420 #[test]
421 fn handles_packet_loss_correctly() {
422 let mut jitter = JitterBuffer::<Rtp, 10>::new();
423
424 jitter.push(Rtp { seq: 0 });
425 jitter.push(Rtp { seq: 1 });
426 jitter.push(Rtp { seq: 2 });
427 jitter.push(Rtp { seq: 3 });
428 jitter.push(Rtp { seq: 5 });
429
430 assert_eq!(jitter.pop(), Some(Rtp { seq: 0 }));
431 assert_eq!(jitter.pop(), Some(Rtp { seq: 1 }));
432 assert_eq!(jitter.pop(), Some(Rtp { seq: 2 }));
433 assert_eq!(jitter.pop(), Some(Rtp { seq: 3 }));
434 assert_eq!(jitter.pop(), None);
435 assert_eq!(jitter.pop(), Some(Rtp { seq: 5 }));
436 }
437
438 #[test]
439 fn handles_wrapping_sequence_numbers() {
440 let mut jitter = JitterBuffer::<Rtp, 10>::new();
441
442 jitter.push(Rtp { seq: u16::MAX - 2 });
443 jitter.push(Rtp { seq: u16::MAX - 1 });
444 jitter.push(Rtp { seq: u16::MAX });
445 jitter.push(Rtp { seq: u16::MIN });
446 jitter.push(Rtp { seq: u16::MIN + 1 });
447 jitter.push(Rtp { seq: u16::MIN + 2 });
448
449 assert_eq!(jitter.heap.len(), 6);
450 assert_eq!(jitter.pop(), Some(Rtp { seq: u16::MAX - 2 }));
451 assert_eq!(jitter.pop(), Some(Rtp { seq: u16::MAX - 1 }));
452 assert_eq!(jitter.pop(), Some(Rtp { seq: u16::MAX }));
453 assert_eq!(jitter.pop(), Some(Rtp { seq: u16::MIN }));
454 assert_eq!(jitter.pop(), Some(Rtp { seq: u16::MIN + 1 }));
455 assert_eq!(jitter.pop(), Some(Rtp { seq: u16::MIN + 2 }));
456 assert_eq!(jitter.heap.len(), 0);
457 }
458
459 #[test]
460 fn handles_reordering_of_wrapping_sequence_numbers() {
461 let mut jitter = JitterBuffer::<Rtp, 10>::new();
462
463 jitter.push(Rtp { seq: u16::MAX - 1 });
464 jitter.push(Rtp { seq: u16::MIN });
465 jitter.push(Rtp { seq: u16::MIN + 2 });
466 jitter.push(Rtp { seq: u16::MAX - 2 });
467 jitter.push(Rtp { seq: u16::MIN + 1 });
468 jitter.push(Rtp { seq: u16::MAX });
469
470 assert_eq!(jitter.heap.len(), 6);
471 assert_eq!(jitter.pop(), Some(Rtp { seq: u16::MAX - 2 }));
472 assert_eq!(jitter.pop(), Some(Rtp { seq: u16::MAX - 1 }));
473 assert_eq!(jitter.pop(), Some(Rtp { seq: u16::MAX }));
474 assert_eq!(jitter.pop(), Some(Rtp { seq: u16::MIN }));
475 assert_eq!(jitter.pop(), Some(Rtp { seq: u16::MIN + 1 }));
476 assert_eq!(jitter.pop(), Some(Rtp { seq: u16::MIN + 2 }));
477 assert_eq!(jitter.heap.len(), 0);
478 }
479
480 mod sequence_numbers {
481 use super::SequenceNumber as S;
482 use std::cmp::Ordering::*;
483
484 #[test]
485 fn preserves_u16_ordering_for_non_wrapping_nums() {
486 for i in 1..u16::MAX - 1 {
489 assert_eq!(S(i - 1).cmp(&S(i)), Less);
490 assert_eq!(S(i - 1).cmp(&S(i + 1)), Less);
491 assert_eq!(S(i).cmp(&S(i - 1)), Greater);
492 assert_eq!(S(i).cmp(&S(i)), Equal);
493 assert_eq!(S(i).cmp(&S(i + 1)), Less);
494 assert_eq!(S(i + 1).cmp(&S(i - 1)), Greater);
495 assert_eq!(S(i + 1).cmp(&S(i)), Greater);
496 }
497 }
498
499 #[test]
500 fn inverts_ordering_if_wrapped() {
501 for i in S::WRAPPING_WINDOW_START..u16::MAX {
502 for j in u16::MIN..S::WRAPPING_WINDOW_END {
503 assert_eq!(S(i).cmp(&S(j)), Less);
504 assert_eq!(S(j).cmp(&S(i)), Greater);
505 }
506 }
507 }
508
509 #[test]
510 fn respects_window() {
511 for i in S::WRAPPING_WINDOW_START..u16::MAX {
512 for j in S::WRAPPING_WINDOW_END + 1..S::WRAPPING_WINDOW_END + 8 {
513 assert_eq!(S(i).cmp(&S(j)), Greater);
514 assert_eq!(S(j).cmp(&S(i)), Less);
515 }
516 }
517 }
518 }
519}