jittr/
lib.rs

1use std::collections::BinaryHeap;
2use std::time::SystemTime;
3
4/// Zero latency jitter buffer for real time udp/rtp streams
5pub struct JitterBuffer<P, const S: u8>
6where
7    P: Packet,
8{
9    last: Option<JitterPacket<P>>,
10    heap: BinaryHeap<JitterPacket<P>>,
11}
12
13impl<P, const S: u8> JitterBuffer<P, S>
14where
15    P: Packet,
16{
17    /// Create a new jitter buffer
18    pub fn new() -> Self {
19        Self {
20            last: None,
21            heap: BinaryHeap::with_capacity(S as usize),
22        }
23    }
24
25    /// Push a packet onto the jitter buffer
26    ///
27    /// Hint: This may drop the packet if it is already been played
28    /// back or is already present in the buffer
29    pub fn push(&mut self, packet: P) {
30        if self.heap.len() >= S as usize {
31            while self.heap.len() >= S as usize && !self.heap.is_empty() {
32                // SAFETY: We just checked the length is greater or equal to 1
33                let dropped = self.heap.pop();
34                self.last = None;
35                #[cfg(feature = "log")]
36                log::warn!("dropping packet: {:?}", dropped.map(|p| p.sequence_number));
37            }
38        }
39
40        if let Some(ref last) = self.last {
41            if last.sequence_number >= packet.sequence_number().into() {
42                #[cfg(feature = "log")]
43                log::warn!(
44                    "discarded packet {} since newer packet was already played back",
45                    packet.sequence_number()
46                );
47
48                return;
49            }
50        }
51
52        if self
53            .heap
54            .iter()
55            .any(|p| p.sequence_number == packet.sequence_number().into())
56        {
57            #[cfg(feature = "log")]
58            log::warn!(
59                "discarded packet {} since its already buffered",
60                packet.sequence_number()
61            );
62
63            return;
64        }
65
66        if !self.heap.is_empty() {
67            // SAFETY: we checked that we have at least one packet in the heap
68            let max_seq = self.heap.iter().max().unwrap().sequence_number;
69
70            if SequenceNumber(max_seq.0.overflowing_add(S as u16).0)
71                < packet.sequence_number().into()
72            {
73                #[cfg(feature = "log")]
74                log::warn!(
75                    "unexpectedly received packet {} which is too far ahead (over {S} packets) of current playback window, clearing jitter buffer",
76                    packet.sequence_number()
77                );
78
79                self.clear();
80            }
81        }
82
83        #[cfg(feature = "log")]
84        log::debug!("pushed packet {} onto heap", packet.sequence_number());
85        self.heap.push(packet.into());
86    }
87
88    /// Pop the next packet from the jitter buffer
89    ///
90    /// Hint: This will return `None` if the next packet expected
91    /// (by sequence number) was lost. Most audio and video codecs used for
92    /// realtime streaming support inference of lost packets.
93    pub fn pop(&mut self) -> Option<P> {
94        if self.heap.is_empty() {
95            return None;
96        }
97
98        let last = match self.last {
99            Some(ref last) => last.to_owned(),
100            None => {
101                // SAFETY:
102                // we checked that the heap is not empty so at least one
103                // element must be present or the std implementation is flawed.
104                let mut packet = self.heap.pop().unwrap();
105                packet.yielded_at = Some(SystemTime::now());
106                self.last = Some(packet.clone());
107
108                #[cfg(feature = "log")]
109                log::debug!(
110                    "packet {} yielded, {} remaining",
111                    packet.sequence_number.0,
112                    self.heap.len()
113                );
114
115                return packet.into();
116            }
117        };
118
119        let next_sequence = match self.heap.peek() {
120            Some(next) => next.sequence_number,
121            None => {
122                #[cfg(feature = "log")]
123                log::error!("expected next packet to be present but heap is empty");
124
125                return None;
126            }
127        };
128
129        let packet = if next_sequence == (u16::from(last.sequence_number).wrapping_add(1)).into() {
130            match self.heap.pop() {
131                Some(packet) => packet.into(),
132                None => {
133                    #[cfg(feature = "log")]
134                    log::error!("expected packet {} to be present", next_sequence.0);
135
136                    return None;
137                }
138            }
139        } else {
140            None
141        };
142
143        self.last = Some(JitterPacket {
144            raw: packet.clone(),
145            sequence_number: packet
146                .as_ref()
147                .map(|p| p.sequence_number())
148                .unwrap_or_else(|| u16::from(last.sequence_number).wrapping_add(1))
149                .into(),
150            yielded_at: Some(SystemTime::now()),
151        });
152
153        #[cfg(feature = "log")]
154        log::debug!(
155            "packet {:?} yielded, {} remaining",
156            self.last.as_ref().map(|l| l.sequence_number),
157            self.heap.len()
158        );
159
160        packet
161    }
162
163    /// Retrieve the number of packets available for playback without packet loss.
164    ///
165    /// Hint: Use this to reduce latency once the network is in good condition.
166    /// If there are a lot of packets available for playback without packet loss
167    /// it is pointless to keep them in the buffer.
168    pub fn lossless_packets_buffered(&self) -> usize {
169        match self.last {
170            Some(ref last) => {
171                let mut last = last.sequence_number;
172                let mut count = 0;
173
174                let sequence_numbers = self.heap.clone().into_sorted_vec();
175                let sequence_numbers = sequence_numbers.iter().rev().map(|p| p.sequence_number);
176
177                #[cfg(feature = "log")]
178                log::debug!(
179                    "compute lossless packets: {:?}",
180                    sequence_numbers.clone().collect::<Vec<SequenceNumber>>()
181                );
182
183                for packet in sequence_numbers {
184                    #[cfg(feature = "log")]
185                    log::info!(
186                        "is next of: {:?} {:?} = {}",
187                        packet,
188                        last,
189                        packet.is_next_of(last)
190                    );
191
192                    if packet.is_next_of(last) {
193                        #[cfg(feature = "log")]
194                        log::debug!("{:?} is next of {:?}", packet, last);
195                        last = packet;
196                        count += 1;
197                    } else {
198                        break;
199                    }
200                }
201
202                #[cfg(feature = "log")]
203                log::debug!("computed lossless packets: {count}");
204
205                count
206            }
207            None => 0,
208        }
209    }
210
211    /// Drops all packets in the jitter buffer
212    pub fn clear(&mut self) {
213        self.last = None;
214        self.heap.clear();
215    }
216}
217
218impl<P: Packet, const S: u8> Default for JitterBuffer<P, S> {
219    fn default() -> Self {
220        Self::new()
221    }
222}
223
/// A packet which should be reordered and managed by the jitter buffer
///
/// Implementors have to be [`Clone`] and [`Unpin`].
pub trait Packet: Clone + Unpin {
    /// The 16 bit sequence number used to order this packet
    fn sequence_number(&self) -> u16;
}
228
229#[derive(Debug, Clone)]
230pub(crate) struct JitterPacket<P>
231where
232    P: Packet,
233{
234    pub(crate) raw: Option<P>,
235    pub(crate) sequence_number: SequenceNumber,
236    pub(crate) yielded_at: Option<SystemTime>,
237}
238
239impl<P> JitterPacket<P>
240where
241    P: Packet,
242{
243    fn into(self) -> Option<P> {
244        self.raw
245    }
246}
247
248impl<P> From<P> for JitterPacket<P>
249where
250    P: Packet,
251{
252    fn from(raw: P) -> Self {
253        Self {
254            sequence_number: raw.sequence_number().into(),
255            yielded_at: None,
256            raw: Some(raw),
257        }
258    }
259}
260
261impl<P> PartialEq for JitterPacket<P>
262where
263    P: Packet,
264{
265    fn eq(&self, other: &Self) -> bool {
266        self.sequence_number.eq(&other.sequence_number)
267    }
268}
269
270impl<P> Eq for JitterPacket<P> where P: Packet {}
271
272impl<P> PartialOrd for JitterPacket<P>
273where
274    P: Packet,
275{
276    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
277        Some(self.cmp(other))
278    }
279}
280
281impl<P> Ord for JitterPacket<P>
282where
283    P: Packet,
284{
285    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
286        self.sequence_number.cmp(&other.sequence_number).reverse()
287    }
288}
289
/// A wrapping sequence number type according to the RFC 3550
/// that has a window in which normal u16 comparisons are inverted
///
/// See https://www.rfc-editor.org/rfc/rfc3550#appendix-A.1 as reference
/// for wrapping sequence number handling
///
/// The window reaches `WRAPPING_WINDOW_SIZE / 2` numbers to either side of
/// the wrap around point, both bounds included.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct SequenceNumber(u16);

impl SequenceNumber {
    const WRAPPING_WINDOW_SIZE: u16 = 16;
    const WRAPPING_WINDOW_START: u16 = u16::MAX - (Self::WRAPPING_WINDOW_SIZE / 2);
    const WRAPPING_WINDOW_END: u16 = u16::MIN + (Self::WRAPPING_WINDOW_SIZE / 2);

    /// Whether going from `self` to `next` crosses the `u16` wrap around:
    /// `self` sits in the upper part of the window and `next` in the lower one
    pub fn did_wrap(&self, next: Self) -> bool {
        let before_wrap = Self::WRAPPING_WINDOW_START..=u16::MAX;
        let after_wrap = u16::MIN..=Self::WRAPPING_WINDOW_END;

        before_wrap.contains(&self.0) && after_wrap.contains(&next.0)
    }

    /// Whether `self` is the direct successor of `last`, including the step
    /// from `u16::MAX` to `u16::MIN`
    pub fn is_next_of(&self, last: SequenceNumber) -> bool {
        // A wrapping increment already covers the wrap around case: inside
        // the window only `u16::MAX` followed by `u16::MIN` can match.
        self.0 == last.0.wrapping_add(1)
    }
}

impl PartialOrd for SequenceNumber {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Ord for SequenceNumber {
    /// Plain `u16` ordering, inverted for pairs that span the wrap around
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        match (self.did_wrap(*other), other.did_wrap(*self)) {
            (true, _) => std::cmp::Ordering::Less,
            (false, true) => std::cmp::Ordering::Greater,
            (false, false) => self.0.cmp(&other.0),
        }
    }
}

impl From<u16> for SequenceNumber {
    fn from(num: u16) -> Self {
        SequenceNumber(num)
    }
}

impl From<SequenceNumber> for u16 {
    fn from(sn: SequenceNumber) -> Self {
        let SequenceNumber(raw) = sn;
        raw
    }
}
347
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal packet carrying nothing but its sequence number
    #[derive(Debug, Clone, PartialEq)]
    struct Rtp {
        seq: u16,
    }

    impl Packet for Rtp {
        #[inline]
        fn sequence_number(&self) -> u16 {
            self.seq
        }
    }

    #[test]
    fn const_capacity() {
        let jitter = JitterBuffer::<Rtp, 10>::new();
        // `BinaryHeap::with_capacity` only guarantees *at least* the requested
        // capacity, so an exact comparison would be allocator dependent.
        assert!(jitter.heap.capacity() >= 10);
    }

    #[test]
    fn send() {
        let mut jitter = JitterBuffer::<Rtp, 10>::new();
        let packet = Rtp { seq: 0 };
        jitter.push(packet.clone());
        assert_eq!(jitter.heap.peek(), Some(&packet.into()));
    }

    #[test]
    fn reorders_racing_packets() {
        let mut jitter = JitterBuffer::<Rtp, 10>::new();

        jitter.push(Rtp { seq: 0 });
        assert_eq!(jitter.pop(), Some(Rtp { seq: 0 }));

        jitter.push(Rtp { seq: 2 });
        jitter.push(Rtp { seq: 1 });

        assert_eq!(jitter.pop(), Some(Rtp { seq: 1 }));
        assert_eq!(jitter.pop(), Some(Rtp { seq: 2 }));
    }

    #[test]
    fn discards_already_played_packets() {
        let mut jitter = JitterBuffer::<Rtp, 10>::new();

        jitter.push(Rtp { seq: 0 });
        assert_eq!(jitter.pop(), Some(Rtp { seq: 0 }));

        jitter.push(Rtp { seq: 0 });
        jitter.push(Rtp { seq: 1 });

        assert_eq!(jitter.pop(), Some(Rtp { seq: 1 }));
    }

    #[test]
    fn discards_duplicated_packets() {
        let mut jitter = JitterBuffer::<Rtp, 10>::new();

        jitter.push(Rtp { seq: 0 });
        jitter.push(Rtp { seq: 0 });
        jitter.push(Rtp { seq: 0 });
        jitter.push(Rtp { seq: 0 });
        jitter.push(Rtp { seq: 0 });

        assert_eq!(jitter.pop(), Some(Rtp { seq: 0 }));
        assert_eq!(jitter.heap.len(), 0);
        assert_eq!(jitter.pop(), None);
    }

    #[test]
    fn handles_packet_loss_correctly() {
        let mut jitter = JitterBuffer::<Rtp, 10>::new();

        jitter.push(Rtp { seq: 0 });
        jitter.push(Rtp { seq: 1 });
        jitter.push(Rtp { seq: 2 });
        jitter.push(Rtp { seq: 3 });
        jitter.push(Rtp { seq: 5 });

        assert_eq!(jitter.pop(), Some(Rtp { seq: 0 }));
        assert_eq!(jitter.pop(), Some(Rtp { seq: 1 }));
        assert_eq!(jitter.pop(), Some(Rtp { seq: 2 }));
        assert_eq!(jitter.pop(), Some(Rtp { seq: 3 }));
        // Packet 4 never arrived, so one pop reports the loss.
        assert_eq!(jitter.pop(), None);
        assert_eq!(jitter.pop(), Some(Rtp { seq: 5 }));
    }

    #[test]
    fn handles_wrapping_sequence_numbers() {
        let mut jitter = JitterBuffer::<Rtp, 10>::new();

        jitter.push(Rtp { seq: u16::MAX - 2 });
        jitter.push(Rtp { seq: u16::MAX - 1 });
        jitter.push(Rtp { seq: u16::MAX });
        jitter.push(Rtp { seq: u16::MIN });
        jitter.push(Rtp { seq: u16::MIN + 1 });
        jitter.push(Rtp { seq: u16::MIN + 2 });

        assert_eq!(jitter.heap.len(), 6);
        assert_eq!(jitter.pop(), Some(Rtp { seq: u16::MAX - 2 }));
        assert_eq!(jitter.pop(), Some(Rtp { seq: u16::MAX - 1 }));
        assert_eq!(jitter.pop(), Some(Rtp { seq: u16::MAX }));
        assert_eq!(jitter.pop(), Some(Rtp { seq: u16::MIN }));
        assert_eq!(jitter.pop(), Some(Rtp { seq: u16::MIN + 1 }));
        assert_eq!(jitter.pop(), Some(Rtp { seq: u16::MIN + 2 }));
        assert_eq!(jitter.heap.len(), 0);
    }

    #[test]
    fn handles_reordering_of_wrapping_sequence_numbers() {
        let mut jitter = JitterBuffer::<Rtp, 10>::new();

        jitter.push(Rtp { seq: u16::MAX - 1 });
        jitter.push(Rtp { seq: u16::MIN });
        jitter.push(Rtp { seq: u16::MIN + 2 });
        jitter.push(Rtp { seq: u16::MAX - 2 });
        jitter.push(Rtp { seq: u16::MIN + 1 });
        jitter.push(Rtp { seq: u16::MAX });

        assert_eq!(jitter.heap.len(), 6);
        assert_eq!(jitter.pop(), Some(Rtp { seq: u16::MAX - 2 }));
        assert_eq!(jitter.pop(), Some(Rtp { seq: u16::MAX - 1 }));
        assert_eq!(jitter.pop(), Some(Rtp { seq: u16::MAX }));
        assert_eq!(jitter.pop(), Some(Rtp { seq: u16::MIN }));
        assert_eq!(jitter.pop(), Some(Rtp { seq: u16::MIN + 1 }));
        assert_eq!(jitter.pop(), Some(Rtp { seq: u16::MIN + 2 }));
        assert_eq!(jitter.heap.len(), 0);
    }

    mod sequence_numbers {
        use super::SequenceNumber as S;
        use std::cmp::Ordering::*;

        #[test]
        fn preserves_u16_ordering_for_non_wrapping_nums() {
            // Preserves normal ordering when not encountering wrapping
            // 1..-1 since otherwise the checks would wrap
            for i in 1..u16::MAX - 1 {
                assert_eq!(S(i - 1).cmp(&S(i)), Less);
                assert_eq!(S(i - 1).cmp(&S(i + 1)), Less);
                assert_eq!(S(i).cmp(&S(i - 1)), Greater);
                assert_eq!(S(i).cmp(&S(i)), Equal);
                assert_eq!(S(i).cmp(&S(i + 1)), Less);
                assert_eq!(S(i + 1).cmp(&S(i - 1)), Greater);
                assert_eq!(S(i + 1).cmp(&S(i)), Greater);
            }
        }

        #[test]
        fn inverts_ordering_if_wrapped() {
            // Inclusive ranges so both window boundaries are exercised too.
            for i in S::WRAPPING_WINDOW_START..=u16::MAX {
                for j in u16::MIN..=S::WRAPPING_WINDOW_END {
                    assert_eq!(S(i).cmp(&S(j)), Less);
                    assert_eq!(S(j).cmp(&S(i)), Greater);
                }
            }
        }

        #[test]
        fn respects_window() {
            // Numbers just past the lower window bound compare like plain u16s.
            for i in S::WRAPPING_WINDOW_START..=u16::MAX {
                for j in S::WRAPPING_WINDOW_END + 1..S::WRAPPING_WINDOW_END + 8 {
                    assert_eq!(S(i).cmp(&S(j)), Greater);
                    assert_eq!(S(j).cmp(&S(i)), Less);
                }
            }
        }
    }
}