msf_rtp/
utils.rs

1//! Helpers.
2
3use std::collections::VecDeque;
4
5use crate::rtp::RtpPacket;
6
7/// Reordering error.
8pub enum ReorderingError {
9    BufferFull(RtpPacket),
10    DuplicatePacket(RtpPacket),
11}
12
13impl ReorderingError {
14    /// Check if the reordering buffer rejected the packet because it would
15    /// exceed its capacity.
16    #[inline]
17    pub fn is_full(&self) -> bool {
18        matches!(self, Self::BufferFull(_))
19    }
20
21    /// Check if the packet was rejected because of being a duplicate.
22    #[inline]
23    pub fn is_duplicate(&self) -> bool {
24        matches!(self, Self::DuplicatePacket(_))
25    }
26}
27
28/// Reordering buffer for RTP packets.
29///
30/// The reordering buffer internally uses `u64` packet indices. These indices
31/// are estimated from packet sequence numbers according to the algorithm
32/// presented in RFC 3711, section 3.3.1.
33///
34/// This simplifies the original algorithm presented in RFC 3550.
35///
36/// Note: RFC 3711 uses 32-bit ROC. This implementation uses 64-bit indices, so
37/// the actual bit width of the ROC is 48 bits here (48-bit ROC + 16-bit
38/// sequence number gives 64-bit packet index). The 32-bit ROC value can be
39/// extracted from packet indices simply by cutting off the upper 16 bits, e.g.
40/// `(index >> 16) as u32`.
41pub struct ReorderingBuffer {
42    start: Option<u64>,
43    window: VecDeque<Option<RtpPacket>>,
44    capacity: usize,
45}
46
47impl ReorderingBuffer {
48    /// Create a new reordering buffer with a given depth.
49    #[inline]
50    pub fn new(depth: usize) -> Self {
51        Self {
52            start: None,
53            window: VecDeque::with_capacity(depth.min(32)),
54            capacity: depth,
55        }
56    }
57
58    /// Estimate packet index from a given sequence number.
59    pub fn estimate_index(&self, sequence_nr: u16) -> u64 {
60        let start_index = self.start.unwrap_or(sequence_nr as u64);
61        let last_index = start_index + (self.window.len() as u64);
62        let last_seq_nr = (last_index & 0xffff) as u16;
63        let last_roc = last_index >> 16;
64
65        let new_seq_nr = sequence_nr;
66
67        let new_roc = if new_seq_nr > last_seq_nr {
68            if (new_seq_nr - last_seq_nr) < 0x8000 {
69                last_roc
70            } else {
71                last_roc.wrapping_sub(1)
72            }
73        } else if (last_seq_nr - new_seq_nr) < 0x8000 {
74            last_roc
75        } else {
76            last_roc.wrapping_add(1)
77        };
78
79        (new_roc << 16) | (new_seq_nr as u64)
80    }
81
82    /// Check if a packet with a given index would be a duplicate.
83    pub fn is_duplicate(&self, index: u64) -> bool {
84        let start = self.start.unwrap_or(index);
85
86        if index < start {
87            return true;
88        }
89
90        let offset = index - start;
91
92        if offset > (usize::MAX as u64) {
93            return false;
94        }
95
96        let offset = offset as usize;
97
98        self.window
99            .get(offset)
100            .map(|entry| entry.is_some())
101            .unwrap_or(false)
102    }
103
104    /// Push a given packet into the buffer and return index of the inserted
105    /// packet.
106    ///
107    /// The method returns an error if the packet cannot be inserted into the
108    /// buffer because it is either a duplicate or the buffer would exceed its
109    /// capacity.
110    pub fn push(&mut self, packet: RtpPacket) -> Result<u64, ReorderingError> {
111        let index = self.estimate_index(packet.sequence_number());
112
113        if self.start.is_none() {
114            self.start = Some(index);
115        }
116
117        let start = self.start.unwrap();
118
119        if index < start {
120            return Err(ReorderingError::DuplicatePacket(packet));
121        }
122
123        let offset = index - start;
124
125        if offset > (usize::MAX as u64) {
126            return Err(ReorderingError::BufferFull(packet));
127        }
128
129        let offset = offset as usize;
130
131        if offset < self.capacity {
132            while offset >= self.window.len() {
133                self.window.push_back(None);
134            }
135
136            let entry = &mut self.window[offset];
137
138            if entry.is_some() {
139                return Err(ReorderingError::DuplicatePacket(packet));
140            }
141
142            *entry = Some(packet);
143
144            Ok(index)
145        } else {
146            Err(ReorderingError::BufferFull(packet))
147        }
148    }
149
150    /// Take the next packet from the buffer.
151    ///
152    /// This method will return a packet only if there is a packet in the front
153    /// slot of the buffer. In other words, the index of the returned packet
154    /// will always be equal to the start index.
155    #[allow(clippy::should_implement_trait)]
156    pub fn next(&mut self) -> Option<RtpPacket> {
157        if let Some(entry) = self.window.front() {
158            if entry.is_some() {
159                return self.take();
160            }
161        }
162
163        None
164    }
165
166    /// Remove the front packet in the buffer and advance the window start
167    /// position by one.
168    ///
169    /// The method will always advance the window start position even if the
170    /// front slot is empty or if the underlying buffer itself is empty.
171    pub fn take(&mut self) -> Option<RtpPacket> {
172        if let Some(start) = self.start.as_mut() {
173            *start += 1;
174        }
175
176        self.window.pop_front()?
177    }
178
179    /// Check if the underlying buffer is empty.
180    #[inline]
181    pub fn is_empty(&self) -> bool {
182        self.window.is_empty()
183    }
184}