msf_rtp/utils/
reorder.rs

1//! Helpers.
2
3use std::{collections::VecDeque, ops::Deref};
4
5use lru::LruCache;
6
7use crate::rtp::{IncomingRtpPacket, OrderedRtpPacket};
8
9/// Reordering error.
10pub enum ReorderingError {
11    BufferFull(IncomingRtpPacket),
12    DuplicatePacket(IncomingRtpPacket),
13}
14
15impl ReorderingError {
16    /// Check if the reordering buffer rejected the packet because it would
17    /// exceed its capacity.
18    #[inline]
19    pub fn is_full(&self) -> bool {
20        matches!(self, Self::BufferFull(_))
21    }
22
23    /// Check if the packet was rejected because of being a duplicate.
24    #[inline]
25    pub fn is_duplicate(&self) -> bool {
26        matches!(self, Self::DuplicatePacket(_))
27    }
28}
29
30/// Reordering buffer for RTP packets.
31///
32/// The reordering buffer internally uses `u64` packet indices. These indices
33/// are estimated from packet sequence numbers according to the algorithm
34/// presented in RFC 3711, section 3.3.1.
35///
36/// This simplifies the original algorithm presented in RFC 3550.
37///
38/// Note: RFC 3711 uses 32-bit ROC. This implementation uses 64-bit indices, so
39/// the actual bit width of the ROC is 48 bits here (48-bit ROC + 16-bit
40/// sequence number gives 64-bit packet index). The 32-bit ROC value can be
41/// extracted from packet indices simply by cutting off the upper 16 bits, e.g.
42/// `(index >> 16) as u32`.
43pub struct ReorderingBuffer {
44    inner: InternalBuffer,
45}
46
47impl ReorderingBuffer {
48    /// Create a new reordering buffer with a given depth.
49    #[inline]
50    pub fn new(depth: usize) -> Self {
51        Self {
52            inner: InternalBuffer::new(depth),
53        }
54    }
55
56    /// Estimate packet index from a given sequence number.
57    pub fn estimate_index(&self, sequence_nr: u16) -> u64 {
58        self.inner.estimate_index(sequence_nr)
59    }
60
61    /// Check if a packet with a given index would be a duplicate.
62    pub fn is_duplicate(&self, index: u64) -> bool {
63        self.inner.is_duplicate(index)
64    }
65
66    /// Push a given packet into the buffer and return index of the inserted
67    /// packet.
68    ///
69    /// The method returns an error if the packet cannot be inserted into the
70    /// buffer because it is either a duplicate or the buffer would exceed its
71    /// capacity.
72    #[allow(clippy::result_large_err)]
73    pub fn push(&mut self, packet: IncomingRtpPacket) -> Result<u64, ReorderingError> {
74        self.inner
75            .push(InputPacket::new(packet, 0))
76            .map_err(ReorderingError::from)
77    }
78
79    /// Take the next packet from the buffer.
80    ///
81    /// This method will return a packet only if there is a packet in the front
82    /// slot of the buffer. In other words, a packet will be returned only if
83    /// it is an in-order packet and returning it would not skip any packets.
84    #[allow(clippy::should_implement_trait)]
85    pub fn next(&mut self) -> Option<OrderedRtpPacket> {
86        self.inner.next().map(OrderedRtpPacket::from)
87    }
88
89    /// Remove the front slot from the buffer and return the contained packet
90    /// (if any).
91    ///
92    /// The method will always advance start position of the reordering window
93    /// by one even if the front slot is empty or if the underlying buffer
94    /// itself is empty.
95    pub fn take(&mut self) -> Option<OrderedRtpPacket> {
96        self.inner.take().map(OrderedRtpPacket::from)
97    }
98
99    /// Check if the buffer is empty.
100    #[inline]
101    pub fn is_empty(&self) -> bool {
102        self.inner.is_empty()
103    }
104}
105
106/// Reordering buffer for RTP packets from multiple synchronization sources.
107///
108/// The buffer can be used the same way as `ReorderingBuffer` in cases where
109/// RTP packets with multiple SSRCs are expected. The buffer reorder packets
110/// from each SSRC independently but the global reordering depth will be
111/// still limited to make sure that no packet is delayed for too long.
112pub struct ReorderingMultiBuffer {
113    input_index_to_ssrc: VecDeque<Option<u32>>,
114    first_input_index: usize,
115    sources: LruCache<u32, InternalBuffer>,
116    output: VecDeque<OutputPacket>,
117    capacity: usize,
118    max_ssrcs: Option<usize>,
119}
120
121impl ReorderingMultiBuffer {
122    /// Create a new reordering buffer.
123    ///
124    /// # Arguments
125    /// * `depth`: maximum number of buffered packets across all SSRCs and also
126    ///   the maximum distance between sequence numbers of any pair of packets
127    ///   within a single SSRC
128    /// * `max_ssrcs`: maximum number of SSRCs to track; if the number of SSRCs
129    ///   exceeds this limit, the least recently used SSRCs will be dropped
130    ///
131    /// Using unlimited number of SSRCs (i.e. `max_ssrcs == None`) is not
132    /// recommended as the memory usage would not be limited in this case.
133    pub fn new(depth: usize, max_ssrcs: Option<usize>) -> Self {
134        Self {
135            input_index_to_ssrc: VecDeque::new(),
136            first_input_index: 0,
137            sources: LruCache::unbounded(),
138            output: VecDeque::with_capacity(depth.min(8)),
139            capacity: depth,
140            max_ssrcs,
141        }
142    }
143
144    /// Estimate packet index from a given sequence number.
145    pub fn estimate_index(&self, ssrc: u32, sequence_nr: u16) -> u64 {
146        self.sources
147            .peek(&ssrc)
148            .map(|source| source.estimate_index(sequence_nr))
149            .unwrap_or(sequence_nr as u64)
150    }
151
152    /// Check if a packet with a given index would be a duplicate.
153    pub fn is_duplicate(&self, ssrc: u32, index: u64) -> bool {
154        self.sources
155            .peek(&ssrc)
156            .map(|source| source.is_duplicate(index))
157            .unwrap_or(false)
158    }
159
160    /// Push a given packet into the buffer and return index of the inserted
161    /// packet.
162    ///
163    /// The method returns an error if the packet cannot be inserted into the
164    /// buffer because it is either a duplicate or the buffer would exceed its
165    /// capacity.
166    #[allow(clippy::result_large_err)]
167    pub fn push(&mut self, packet: IncomingRtpPacket) -> Result<u64, ReorderingError> {
168        // check if the oldest packet in the buffer is more than `capacity`
169        // packets behind
170        if self.input_index_to_ssrc.len() >= self.capacity {
171            return Err(ReorderingError::BufferFull(packet));
172        }
173
174        let ssrc = packet.ssrc();
175
176        let source = self
177            .sources
178            .get_or_insert_mut(ssrc, || InternalBuffer::new(self.capacity));
179
180        let input_index = self
181            .first_input_index
182            .wrapping_add(self.input_index_to_ssrc.len());
183
184        let output_index = source.push(InputPacket::new(packet, input_index))?;
185
186        self.input_index_to_ssrc.push_back(Some(ssrc));
187
188        while let Some(packet) = source.next() {
189            self.output.push_back(packet);
190        }
191
192        // drop the least recently used SSRCs if we exceed the maximum allowed
193        // number of SSRCs
194        if let Some(max_ssrcs) = self.max_ssrcs {
195            while self.sources.len() > max_ssrcs {
196                if let Some((_, mut source)) = self.sources.pop_lru() {
197                    while !source.is_empty() {
198                        if let Some(packet) = source.take() {
199                            self.output.push_back(packet);
200                        }
201                    }
202                }
203            }
204        }
205
206        Ok(output_index)
207    }
208
209    /// Take the next packet from the buffer.
210    ///
211    /// This method will return a packet only if it is in-order (i.e. no
212    /// packets would be skipped for the corresponding SSRC) or if the
213    /// corresponding SSRC has been dropped from the buffer.
214    #[allow(clippy::should_implement_trait)]
215    pub fn next(&mut self) -> Option<OrderedRtpPacket> {
216        let packet = self.output.pop_front()?;
217
218        self.remove_input_index(packet.input_index);
219
220        Some(packet.into())
221    }
222
223    /// Take the next packet from the buffer.
224    ///
225    /// This method will either return the next in-order packet or contents
226    /// of the first slot of a SSRC sub-buffer containing the oldest packet.
227    /// The method will always advance the buffer state. Calling this method
228    /// repeatedly will eventually drain the buffer.
229    ///
230    /// Note that this method may return `None` even if the buffer is not
231    /// empty. It only indicates a missing packet.
232    pub fn take(&mut self) -> Option<OrderedRtpPacket> {
233        let packet = if let Some(p) = self.output.pop_front() {
234            p
235        } else {
236            self.poll_oldest_source()?
237        };
238
239        self.remove_input_index(packet.input_index);
240
241        Some(packet.into())
242    }
243
244    /// Check if the underlying buffer is empty.
245    #[inline]
246    pub fn is_empty(&self) -> bool {
247        self.input_index_to_ssrc.is_empty()
248    }
249
250    /// Remove a given input index from the `input_index_to_ssrc` map.
251    ///
252    /// The method will also remove all tombstones from the front of the map in
253    /// order to keep the invariant that the front element of the map is not
254    /// `None` and it will also adjust `first_input_index` accordingly.
255    ///
256    /// Note that the time complexity of this method is still O(1) on average
257    /// because there are only two operations for each input index:
258    ///
259    /// * replacing the index with `None`
260    /// * and removing the element from the map.
261    fn remove_input_index(&mut self, input_index: usize) {
262        let offset = input_index.wrapping_sub(self.first_input_index);
263
264        self.input_index_to_ssrc[offset] = None;
265
266        // remove all tombstones from the front of the map
267        while let Some(None) = self.input_index_to_ssrc.front() {
268            self.input_index_to_ssrc.pop_front();
269
270            // ... and increment the first input index accordingly
271            self.first_input_index = self.first_input_index.wrapping_add(1);
272        }
273    }
274
275    /// Advance the SSRC buffer containing the oldest packet.
276    ///
277    /// The method will return contents of the front slot of that buffer.
278    /// Returning `None` does not indicate that the buffer is empty.
279    fn poll_oldest_source(&mut self) -> Option<OutputPacket> {
280        if let Some(ssrc) = self.input_index_to_ssrc.front()? {
281            if let Some(source) = self.sources.peek_mut(ssrc) {
282                if !source.is_empty() {
283                    let res = source.take();
284
285                    // in-order packets may follow after removing the first
286                    // slot; we need to move them to the output queue
287                    while let Some(packet) = source.next() {
288                        self.output.push_back(packet);
289                    }
290
291                    return res;
292                }
293            }
294        }
295
296        // NOTE: This should never happen. The invariant is that if
297        //   `input_index_to_ssrc` is not empty, its front element is not
298        //   `None` and the corresponding source exists and is not empty
299        //   (because it must contain the oldest packet).
300
301        panic!("inconsistent state")
302    }
303}
304
305/// Reordering buffer for RTP packets from a single synchronization source.
306struct InternalBuffer {
307    start: Option<u64>,
308    window: VecDeque<Option<OutputPacket>>,
309    capacity: usize,
310}
311
312impl InternalBuffer {
313    /// Create a new reordering buffer with a given depth.
314    #[inline]
315    fn new(depth: usize) -> Self {
316        Self {
317            start: None,
318            window: VecDeque::with_capacity(depth.min(8)),
319            capacity: depth,
320        }
321    }
322
323    /// Estimate packet index from a given sequence number.
324    fn estimate_index(&self, sequence_nr: u16) -> u64 {
325        let start_index = self.start.unwrap_or(sequence_nr as u64);
326        let last_index = start_index.wrapping_add(self.window.len() as u64);
327        let last_seq_nr = last_index as u16;
328        let last_roc = last_index & !0xffff;
329
330        let new_seq_nr = sequence_nr;
331
332        let new_roc = if last_seq_nr < 0x8000 {
333            if new_seq_nr > (last_seq_nr + 0x8000) {
334                last_roc.wrapping_sub(0x10000)
335            } else {
336                last_roc
337            }
338        } else if (last_seq_nr - 0x8000) > new_seq_nr {
339            last_roc.wrapping_add(0x10000)
340        } else {
341            last_roc
342        };
343
344        new_roc | (new_seq_nr as u64)
345    }
346
347    /// Check if a packet with a given index would be a duplicate.
348    fn is_duplicate(&self, index: u64) -> bool {
349        let start = self.start.unwrap_or(index);
350
351        let offset = index.wrapping_sub(start);
352
353        // this is `index < start` in wrapping arithmetic
354        if offset > (u64::MAX >> 1) {
355            return true;
356        }
357
358        let Ok(offset) = usize::try_from(offset) else {
359            return false;
360        };
361
362        self.window
363            .get(offset)
364            .map(|entry| entry.is_some())
365            .unwrap_or(false)
366    }
367
368    /// Push a given packet into the buffer and return index of the inserted
369    /// packet.
370    ///
371    /// The method returns an error if the packet cannot be inserted into the
372    /// buffer because it is either a duplicate or the buffer would exceed its
373    /// capacity.
374    #[allow(clippy::result_large_err)]
375    fn push(&mut self, packet: InputPacket) -> Result<u64, InternalError> {
376        let index = self.estimate_index(packet.sequence_number());
377
378        if self.start.is_none() {
379            self.start = Some(index);
380        }
381
382        let start = self.start.unwrap_or(index);
383
384        let offset = index.wrapping_sub(start);
385
386        // this is `index < start` in wrapping arithmetic
387        if offset > (u64::MAX >> 1) {
388            return Err(InternalError::DuplicatePacket(packet));
389        }
390
391        let Ok(offset) = usize::try_from(offset) else {
392            return Err(InternalError::BufferFull(packet));
393        };
394
395        if offset < self.capacity {
396            while offset >= self.window.len() {
397                self.window.push_back(None);
398            }
399
400            let entry = &mut self.window[offset];
401
402            if entry.is_some() {
403                return Err(InternalError::DuplicatePacket(packet));
404            }
405
406            *entry = Some(OutputPacket::new(packet, index));
407
408            Ok(index)
409        } else {
410            Err(InternalError::BufferFull(packet))
411        }
412    }
413
414    /// Take the next packet from the buffer.
415    ///
416    /// This method will return a packet only if there is a packet in the front
417    /// slot of the buffer. In other words, a packet will be returned only if
418    /// it is an in-order packet and returning it would not skip any packets.
419    fn next(&mut self) -> Option<OutputPacket> {
420        if let Some(entry) = self.window.front() {
421            if entry.is_some() {
422                return self.take();
423            }
424        }
425
426        None
427    }
428
429    /// Remove the front slot from the buffer and return the contained packet
430    /// (if any).
431    ///
432    /// The method will always advance start position of the reordering window
433    /// by one even if the front slot is empty or if the underlying buffer
434    /// itself is empty.
435    fn take(&mut self) -> Option<OutputPacket> {
436        if let Some(start) = self.start.as_mut() {
437            *start = start.wrapping_add(1);
438        }
439
440        self.window.pop_front()?
441    }
442
443    /// Check if the buffer is empty.
444    #[inline]
445    fn is_empty(&self) -> bool {
446        self.window.is_empty()
447    }
448}
449
450/// Helper struct.
451///
452/// It associates an input index with an incoming RTP packet.
453struct InputPacket {
454    input_index: usize,
455    packet: IncomingRtpPacket,
456}
457
458impl InputPacket {
459    /// Create a new input packet.
460    fn new(packet: IncomingRtpPacket, input_index: usize) -> Self {
461        Self {
462            input_index,
463            packet,
464        }
465    }
466}
467
468impl Deref for InputPacket {
469    type Target = IncomingRtpPacket;
470
471    fn deref(&self) -> &Self::Target {
472        &self.packet
473    }
474}
475
476/// Helper struct.
477///
478/// It associates an input index and an output index with an incoming RTP
479/// packet.
480struct OutputPacket {
481    input_index: usize,
482    output_index: u64,
483    packet: IncomingRtpPacket,
484}
485
486impl OutputPacket {
487    /// Create a new output packet.
488    fn new(packet: InputPacket, output_index: u64) -> Self {
489        Self {
490            input_index: packet.input_index,
491            output_index,
492            packet: packet.packet,
493        }
494    }
495}
496
497impl From<OutputPacket> for OrderedRtpPacket {
498    fn from(packet: OutputPacket) -> Self {
499        OrderedRtpPacket::new(packet.packet, packet.output_index)
500    }
501}
502
503/// Helper enum.
504///
505/// It has the same variants as `ReorderingError` but it carries `InputPacket`
506/// instead of `IncomingRtpPacket`.
507enum InternalError {
508    BufferFull(InputPacket),
509    DuplicatePacket(InputPacket),
510}
511
512impl From<InternalError> for ReorderingError {
513    fn from(err: InternalError) -> Self {
514        match err {
515            InternalError::BufferFull(packet) => Self::BufferFull(packet.packet),
516            InternalError::DuplicatePacket(packet) => Self::DuplicatePacket(packet.packet),
517        }
518    }
519}
520
521#[cfg(test)]
522mod tests {
523    use std::time::Instant;
524
525    use super::{ReorderingBuffer, ReorderingError, ReorderingMultiBuffer};
526
527    use crate::rtp::{IncomingRtpPacket, RtpPacket};
528
529    fn make_packet(seq: u16, ssrc: u32) -> IncomingRtpPacket {
530        let packet = RtpPacket::new().with_sequence_number(seq).with_ssrc(ssrc);
531
532        IncomingRtpPacket::new(packet, Instant::now())
533    }
534
535    #[test]
536    fn test_wrapping_index_arithmetic() {
537        let mut buffer = ReorderingBuffer::new(4);
538
539        assert!(matches!(buffer.push(make_packet(0x1000, 1)), Ok(0x1000)));
540
541        assert_eq!(buffer.estimate_index(0x0000), 0x0000_0000_0000_0000);
542        assert_eq!(buffer.estimate_index(0x2000), 0x0000_0000_0000_2000);
543        assert_eq!(buffer.estimate_index(0xf000), 0xffff_ffff_ffff_f000);
544
545        assert!(matches!(
546            buffer.push(make_packet(0xf000, 1)),
547            Err(ReorderingError::DuplicatePacket(_))
548        ));
549        assert!(matches!(
550            buffer.push(make_packet(0x2000, 1)),
551            Err(ReorderingError::BufferFull(_))
552        ));
553
554        buffer = ReorderingBuffer::new(4);
555
556        assert!(matches!(buffer.push(make_packet(0xe000, 1)), Ok(0xe000)));
557
558        assert_eq!(buffer.estimate_index(0xd000), 0x0000_0000_0000_d000);
559        assert_eq!(buffer.estimate_index(0xf000), 0x0000_0000_0000_f000);
560        assert_eq!(buffer.estimate_index(0x1000), 0x0000_0000_0001_1000);
561
562        assert!(matches!(
563            buffer.push(make_packet(0xd000, 1)),
564            Err(ReorderingError::DuplicatePacket(_))
565        ));
566        assert!(matches!(
567            buffer.push(make_packet(0x1000, 1)),
568            Err(ReorderingError::BufferFull(_))
569        ));
570
571        buffer = ReorderingBuffer::new(4);
572
573        buffer.inner.start = Some(u64::MAX);
574
575        assert!(matches!(buffer.push(make_packet(0xffff, 1)), Ok(u64::MAX)));
576        assert!(matches!(buffer.push(make_packet(0x0000, 1)), Ok(0)));
577
578        assert_eq!(buffer.estimate_index(0xf000), 0xffff_ffff_ffff_f000);
579        assert_eq!(buffer.estimate_index(0x1000), 0x0000_0000_0000_1000);
580
581        assert!(matches!(
582            buffer.push(make_packet(0xf000, 1)),
583            Err(ReorderingError::DuplicatePacket(_))
584        ));
585        assert!(matches!(
586            buffer.push(make_packet(0x1000, 1)),
587            Err(ReorderingError::BufferFull(_))
588        ));
589    }
590
591    #[test]
592    fn test_reordering_buffer() {
593        let mut buffer = ReorderingBuffer::new(5);
594
595        assert!(buffer.is_empty());
596
597        assert!(matches!(buffer.push(make_packet(2, 1)), Ok(2)));
598        assert!(matches!(
599            buffer.push(make_packet(0, 1)),
600            Err(ReorderingError::DuplicatePacket(_))
601        ));
602        assert!(matches!(
603            buffer.push(make_packet(1, 1)),
604            Err(ReorderingError::DuplicatePacket(_))
605        ));
606        assert!(matches!(buffer.push(make_packet(4, 1)), Ok(4)));
607        assert!(matches!(buffer.push(make_packet(3, 1)), Ok(3)));
608        assert!(matches!(
609            buffer.push(make_packet(3, 1)),
610            Err(ReorderingError::DuplicatePacket(_))
611        ));
612        assert!(matches!(buffer.push(make_packet(6, 1)), Ok(6)));
613        assert!(matches!(
614            buffer.push(make_packet(7, 1)),
615            Err(ReorderingError::BufferFull(_))
616        ));
617
618        assert!(!buffer.is_empty());
619
620        assert_eq!(buffer.next().unwrap().index(), 2);
621        assert_eq!(buffer.next().unwrap().index(), 3);
622        assert_eq!(buffer.next().unwrap().index(), 4);
623
624        assert!(matches!(buffer.next(), None));
625
626        assert!(!buffer.is_empty());
627
628        assert!(matches!(buffer.take(), None));
629
630        assert!(!buffer.is_empty());
631
632        assert_eq!(buffer.next().unwrap().index(), 6);
633
634        assert!(buffer.is_empty());
635    }
636
637    #[test]
638    fn test_reordering_multi_buffer() {
639        let mut buffer = ReorderingMultiBuffer::new(8, Some(2));
640
641        assert!(buffer.is_empty());
642
643        assert!(matches!(buffer.push(make_packet(2, 1)), Ok(2)));
644        assert!(matches!(
645            buffer.push(make_packet(0, 1)),
646            Err(ReorderingError::DuplicatePacket(_))
647        ));
648        assert!(matches!(
649            buffer.push(make_packet(1, 1)),
650            Err(ReorderingError::DuplicatePacket(_))
651        ));
652        assert!(matches!(buffer.push(make_packet(4, 1)), Ok(4)));
653        assert!(matches!(buffer.push(make_packet(3, 1)), Ok(3)));
654        assert!(matches!(
655            buffer.push(make_packet(3, 1)),
656            Err(ReorderingError::DuplicatePacket(_))
657        ));
658        assert!(matches!(buffer.push(make_packet(6, 1)), Ok(6)));
659        assert!(matches!(
660            buffer.push(make_packet(13, 1)),
661            Err(ReorderingError::BufferFull(_))
662        ));
663
664        assert!(matches!(buffer.push(make_packet(10, 2)), Ok(10)));
665        assert!(matches!(
666            buffer.push(make_packet(9, 2)),
667            Err(ReorderingError::DuplicatePacket(_))
668        ));
669        assert!(matches!(
670            buffer.push(make_packet(8, 2)),
671            Err(ReorderingError::DuplicatePacket(_))
672        ));
673        assert!(matches!(buffer.push(make_packet(12, 2)), Ok(12)));
674        assert!(matches!(buffer.push(make_packet(11, 2)), Ok(11)));
675        assert!(matches!(
676            buffer.push(make_packet(11, 2)),
677            Err(ReorderingError::DuplicatePacket(_))
678        ));
679        assert!(matches!(
680            buffer.push(make_packet(21, 2)),
681            Err(ReorderingError::BufferFull(_))
682        ));
683        assert!(matches!(buffer.push(make_packet(14, 2)), Ok(14)));
684        assert!(matches!(
685            buffer.push(make_packet(15, 2)),
686            Err(ReorderingError::BufferFull(_))
687        ));
688
689        assert!(!buffer.is_empty());
690
691        assert_eq!(buffer.next().unwrap().index(), 2);
692        assert_eq!(buffer.next().unwrap().index(), 3);
693        assert_eq!(buffer.next().unwrap().index(), 4);
694
695        assert_eq!(buffer.next().unwrap().index(), 10);
696        assert_eq!(buffer.next().unwrap().index(), 11);
697        assert_eq!(buffer.next().unwrap().index(), 12);
698
699        assert!(matches!(buffer.next(), None));
700
701        assert!(!buffer.is_empty());
702
703        assert!(matches!(buffer.take(), None));
704
705        assert!(!buffer.is_empty());
706
707        assert_eq!(buffer.next().unwrap().index(), 6);
708
709        assert!(matches!(buffer.next(), None));
710
711        assert!(!buffer.is_empty());
712
713        assert!(matches!(buffer.take(), None));
714
715        assert!(!buffer.is_empty());
716
717        assert_eq!(buffer.next().unwrap().index(), 14);
718
719        assert!(buffer.is_empty());
720    }
721}