rtp_parse/rtcp/
rtcp_fb_nack.rs

1use parsely_rs::*;
2use std::collections::BTreeSet;
3
4use super::{
5    rtcp_fb_header::RtcpFbHeader, rtcp_fb_packet::RtcpFbTlPacket, rtcp_header::RtcpHeader,
6};
7
/// An RTCP feedback NACK packet: the RTCP header, the feedback header, and the set of sequence
/// numbers being reported as missing.
///
/// On the wire the missing sequence numbers are carried as a series of 4-byte blocks, each with
/// the layout shown below. This struct stores the decoded union of all blocks rather than the raw
/// PID/BLP pairs.
///
/// https://datatracker.ietf.org/doc/html/rfc4585#section-6.2.1
/// ```text
///  0                   1                   2                   3
///  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/// |            PID                |             BLP               |
/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/// ```
#[derive(Debug, PartialEq)]
pub struct RtcpFbNackPacket {
    /// The RTCP header that precedes the feedback header.
    pub header: RtcpHeader,
    /// The feedback header that precedes the NACK blocks.
    pub fb_header: RtcpFbHeader,
    /// Every sequence number reported as missing, kept sorted and de-duplicated.
    pub missing_seq_nums: BTreeSet<u16>,
}
20
// TODO: somewhere in here we need to enforce a maximum number of sequence numbers that can be
// included in a nack packet (or provide a way for a nack to fail to serialize when it holds more
// sequence numbers than fit in the buffer)
24
25impl RtcpFbNackPacket {
26    pub const FMT: u5 = u5::new(1);
27
28    pub fn add_missing_seq_num(&mut self, missing_seq_num: u16) {
29        self.missing_seq_nums.insert(missing_seq_num);
30    }
31
32    pub fn payload_length_bytes(&self) -> u16 {
33        // TODO: not ideal to have to do this chunking just to calculate the size, maybe we can at
34        // least cache the result and re-use it if no more packets are added?
35        let num_chunks = self.missing_seq_nums.chunk_by_max_difference(16).len() as u16;
36        num_chunks * 4
37    }
38}
39
40impl Default for RtcpFbNackPacket {
41    fn default() -> Self {
42        Self {
43            header: RtcpHeader::default()
44                .packet_type(RtcpFbTlPacket::PT)
45                .report_count(RtcpFbNackPacket::FMT),
46            fb_header: Default::default(),
47            missing_seq_nums: Default::default(),
48        }
49    }
50}
51
52impl StateSync for RtcpFbNackPacket {
53    type SyncCtx = ();
54    fn sync(&mut self, _sync_ctx: ()) -> ParselyResult<()> {
55        // Add 8 for the size of the fb header
56        self.header
57            .sync((self.payload_length_bytes() + 8, RtcpFbNackPacket::FMT))
58    }
59}
60
61impl<B: BitBuf> ParselyRead<B> for RtcpFbNackPacket {
62    type Ctx = (RtcpHeader, RtcpFbHeader);
63    fn read<T: ByteOrder>(buf: &mut B, (header, fb_header): Self::Ctx) -> ParselyResult<Self> {
64        let mut missing_seq_nums = BTreeSet::new();
65        let mut nack_block_num = 1;
66        while buf.remaining_bytes() >= NackBlock::SIZE_BYTES {
67            let mut nack_block = NackBlock::read::<T>(buf, ())
68                .with_context(|| format!("Nack block {nack_block_num}"))?;
69            missing_seq_nums.append(&mut nack_block.missing_seq_nums);
70            nack_block_num += 1;
71        }
72        Ok(RtcpFbNackPacket {
73            header,
74            fb_header,
75            missing_seq_nums,
76        })
77    }
78}
79
80impl<B: BitBufMut> ParselyWrite<B> for RtcpFbNackPacket {
81    type Ctx = ();
82    fn write<T: ByteOrder>(&self, buf: &mut B, _ctx: Self::Ctx) -> ParselyResult<()> {
83        self.header.write::<T>(buf, ()).context("header")?;
84        self.fb_header.write::<T>(buf, ()).context("fb header")?;
85        for (i, chunk) in self
86            .missing_seq_nums
87            .chunk_by_max_difference(16)
88            .into_iter()
89            .enumerate()
90        {
91            if buf.remaining_mut_bytes() < NackBlock::SIZE_BYTES {
92                bail!("Not enough room to write nack block {i}");
93            }
94            let nack_block = NackBlock {
95                missing_seq_nums: chunk,
96            };
97            nack_block
98                .write::<T>(buf, ())
99                .with_context(|| format!("Writing nack block {i}"))?;
100        }
101
102        Ok(())
103    }
104}
105
/// The sequence numbers carried by a single 4-byte NACK block (one packet id plus a 16-bit
/// bitmask of the packets that follow it).
#[derive(Debug, Default, PartialEq)]
pub struct NackBlock {
    /// Sequence numbers reported as missing by this block, kept sorted and de-duplicated.
    missing_seq_nums: BTreeSet<u16>,
}

impl NackBlock {
    /// Serialized size of one block: two 16-bit fields.
    pub const SIZE_BYTES: usize = 2 * std::mem::size_of::<u16>();

    /// Mark `missing_seq_num` as missing. Adding a number that is already present has no effect.
    pub fn add_missing_seq_num(&mut self, missing_seq_num: u16) {
        let seq_nums = &mut self.missing_seq_nums;
        seq_nums.insert(missing_seq_num);
    }
}
118
119impl<B: BitBuf> ParselyRead<B> for NackBlock {
120    type Ctx = ();
121    fn read<T: ByteOrder>(buf: &mut B, _ctx: Self::Ctx) -> ParselyResult<Self> {
122        let packet_id = buf.get_u16::<NetworkOrder>().context("packet id")?;
123        let blp = buf.get_u16::<NetworkOrder>().context("blp")?;
124
125        let mut missing_seq_nums = BTreeSet::new();
126        missing_seq_nums.insert(packet_id);
127        for shift_amount in 0..16 {
128            if (blp >> shift_amount) & 0x1 == 1 {
129                missing_seq_nums.insert(packet_id + shift_amount + 1);
130            }
131        }
132        Ok(NackBlock { missing_seq_nums })
133    }
134}
135
// NOTE(review): presumably generates a `StateSync` impl for `NackBlock` whose `sync` has nothing
// to update — confirm against the `parsely_rs` macro definition.
impl_stateless_sync!(NackBlock);
137
138impl<B: BitBufMut> ParselyWrite<B> for NackBlock {
139    type Ctx = ();
140    fn write<T: ByteOrder>(&self, buf: &mut B, _ctx: Self::Ctx) -> ParselyResult<()> {
141        let packet_id = self.missing_seq_nums.first().ok_or(anyhow!(
142            "NackBlock must contain at least one sequence number"
143        ))?;
144        buf.put_u16::<T>(*packet_id).context("packet id")?;
145        let mut blp = 0u16;
146        // Skip past the first one since it was used for the packet id
147        for missing_seq_num in self.missing_seq_nums.iter().skip(1) {
148            let delta = missing_seq_num - packet_id;
149            if delta > 16 {
150                bail!("NACK cannot contain sequence number spread larger than 16");
151            }
152            let mask = 1u16 << (delta - 1);
153            blp |= mask;
154        }
155        buf.put_u16::<T>(blp).context("blp")?;
156        Ok(())
157    }
158}
159
/// Split an ordered collection into runs whose values all lie within a maximum distance of the
/// run's smallest value.
trait ChunkByMaxDifference<T> {
    fn chunk_by_max_difference(&self, max_diff: T) -> Vec<BTreeSet<T>>;
}

impl ChunkByMaxDifference<u16> for BTreeSet<u16> {
    /// Return a Vec of BTreeSets, in ascending order, where no value in a set exceeds that set's
    /// smallest value by more than the given `max_diff`. An empty set yields an empty Vec.
    fn chunk_by_max_difference(&self, max_diff: u16) -> Vec<BTreeSet<u16>> {
        let mut values = self.iter().copied();
        let Some(mut chunk_start) = values.next() else {
            return Vec::new();
        };

        let mut finished_chunks = Vec::new();
        let mut open_chunk = BTreeSet::from([chunk_start]);
        // Values arrive in ascending order, so `value - chunk_start` can never underflow
        for value in values {
            if value - chunk_start <= max_diff {
                open_chunk.insert(value);
                continue;
            }
            // `value` is too far from the start of the open chunk: close it and begin a new one
            finished_chunks.push(std::mem::replace(
                &mut open_chunk,
                BTreeSet::from([value]),
            ));
            chunk_start = value;
        }
        finished_chunks.push(open_chunk);

        finished_chunks
    }
}
186
#[cfg(test)]
mod test {
    use crate::rtcp::rtcp_fb_packet::RtcpFbTlPacket;

    use super::*;

    /// Build a default NACK packet with every entry of `seq_nums` marked as missing.
    fn nack_packet_missing(seq_nums: &[u16]) -> RtcpFbNackPacket {
        let mut rtcp_fb_nack = RtcpFbNackPacket::default();
        for &seq_num in seq_nums {
            rtcp_fb_nack.add_missing_seq_num(seq_num);
        }
        rtcp_fb_nack
    }

    #[test]
    fn test_read_nack_block() {
        // Packet id 10, BLP 0xA8A1: missing seq nums 10, 11, 16, 18, 22, 24, 26
        let mut bits = Bits::from_static_bytes(&[0x00, 0x0A, 0xA8, 0xA1]);
        let nack_block = NackBlock::read::<NetworkOrder>(&mut bits, ()).unwrap();
        assert_eq!(
            nack_block.missing_seq_nums,
            BTreeSet::from([10, 11, 16, 18, 22, 24, 26]),
        );
    }

    #[test]
    fn test_put_nack_block() {
        let mut nack_block = NackBlock::default();
        for seq_num in [10, 11, 16, 18, 22, 24, 26] {
            nack_block.add_missing_seq_num(seq_num);
        }

        let mut bits_mut = BitsMut::new();
        nack_block.write::<NetworkOrder>(&mut bits_mut, ()).unwrap();

        // Whatever was written must decode back to the same block
        let mut bits = bits_mut.freeze();
        let read_nack_block = NackBlock::read::<NetworkOrder>(&mut bits, ()).unwrap();
        assert_eq!(read_nack_block, nack_block);
    }

    #[test]
    fn test_read_nack_packet() {
        let rtcp_header = RtcpHeader {
            report_count: RtcpFbNackPacket::FMT,
            packet_type: RtcpFbTlPacket::PT,
            // Two nack blocks; this is the value `sync` produces for a two-block packet (see
            // `test_sync_multiple_blocks`)
            length_field: 4,
            ..Default::default()
        };
        let rtcp_fb_header = RtcpFbHeader::default()
            .media_source_ssrc(42)
            .sender_ssrc(24);
        #[rustfmt::skip]
        let nack_payload = vec![
            // packet id 10
            0x00, 0x0A,
            // Missing seq nums 10, 11, 16, 18, 22, 24, 26
            0xA8, 0xA1,
            // packet id 40
            0x00, 0x28,
            // Missing seq nums 40, 42, 48, 51, 54
            0x24, 0x82
        ];
        let mut bits = Bits::from_owner_bytes(nack_payload);
        let nack_packet =
            RtcpFbNackPacket::read::<NetworkOrder>(&mut bits, (rtcp_header, rtcp_fb_header))
                .unwrap();
        assert_eq!(
            nack_packet.missing_seq_nums,
            BTreeSet::from_iter([10, 11, 16, 18, 22, 24, 26, 40, 42, 48, 51, 54])
        );
    }

    #[test]
    fn test_default() {
        let rtcp_fb_nack = RtcpFbNackPacket::default();
        assert_eq!(rtcp_fb_nack.header.packet_type, RtcpFbTlPacket::PT);
        assert_eq!(rtcp_fb_nack.header.report_count, RtcpFbNackPacket::FMT);
        assert_eq!(rtcp_fb_nack.header.length_field, 0);
    }

    #[test]
    fn test_sync() {
        let mut rtcp_fb_nack = nack_packet_missing(&[10, 12, 13, 17, 21, 23]);
        rtcp_fb_nack.sync(()).unwrap();
        // Above missing packets should fit in a single block
        assert_eq!(rtcp_fb_nack.header.length_field, 3);
    }

    #[test]
    fn test_sync_multiple_blocks() {
        // 44 is more than 16 past 10, so it needs a second block
        let mut rtcp_fb_nack = nack_packet_missing(&[10, 12, 13, 17, 21, 23, 44]);
        rtcp_fb_nack.sync(()).unwrap();
        assert_eq!(rtcp_fb_nack.header.length_field, 4);
    }

    #[test]
    fn test_put_rtcp_fb_nack() {
        let mut rtcp_fb_nack = nack_packet_missing(&[10, 12, 13, 17, 21, 23, 44]);
        rtcp_fb_nack.sync(()).unwrap();

        let mut bits_mut = BitsMut::new();
        rtcp_fb_nack
            .write::<NetworkOrder>(&mut bits_mut, ())
            .unwrap();

        // Parse the two headers first, then hand them to the nack reader as its context
        let mut bits = bits_mut.freeze();
        let header = RtcpHeader::read::<NetworkOrder>(&mut bits, ()).unwrap();
        let fb_header = RtcpFbHeader::read::<NetworkOrder>(&mut bits, ()).unwrap();
        let read_rtcp_fb_nack =
            RtcpFbNackPacket::read::<NetworkOrder>(&mut bits, (header, fb_header)).unwrap();
        assert_eq!(read_rtcp_fb_nack, rtcp_fb_nack);
    }
}
316}