1use parsely_rs::*;
2use std::collections::BTreeSet;
3
4use super::{
5 rtcp_fb_header::RtcpFbHeader, rtcp_fb_packet::RtcpFbTlPacket, rtcp_header::RtcpHeader,
6};
7
/// An RTCP feedback NACK packet: the RTCP header, the feedback header, and the set of
/// sequence numbers being reported as missing.
#[derive(Debug, PartialEq)]
pub struct RtcpFbNackPacket {
    /// The RTCP header that precedes the feedback header on the wire.
    pub header: RtcpHeader,
    /// The feedback header written directly after `header`.
    pub fb_header: RtcpFbHeader,
    /// Every sequence number reported as missing, kept sorted and de-duplicated by the set.
    pub missing_seq_nums: BTreeSet<u16>,
}
20
21impl RtcpFbNackPacket {
26 pub const FMT: u5 = u5::new(1);
27
28 pub fn add_missing_seq_num(&mut self, missing_seq_num: u16) {
29 self.missing_seq_nums.insert(missing_seq_num);
30 }
31
32 pub fn payload_length_bytes(&self) -> u16 {
33 let num_chunks = self.missing_seq_nums.chunk_by_max_difference(16).len() as u16;
36 num_chunks * 4
37 }
38}
39
40impl Default for RtcpFbNackPacket {
41 fn default() -> Self {
42 Self {
43 header: RtcpHeader::default()
44 .packet_type(RtcpFbTlPacket::PT)
45 .report_count(RtcpFbNackPacket::FMT),
46 fb_header: Default::default(),
47 missing_seq_nums: Default::default(),
48 }
49 }
50}
51
52impl StateSync for RtcpFbNackPacket {
53 type SyncCtx = ();
54 fn sync(&mut self, _sync_ctx: ()) -> ParselyResult<()> {
55 self.header
57 .sync((self.payload_length_bytes() + 8, RtcpFbNackPacket::FMT))
58 }
59}
60
61impl<B: BitBuf> ParselyRead<B> for RtcpFbNackPacket {
62 type Ctx = (RtcpHeader, RtcpFbHeader);
63 fn read<T: ByteOrder>(buf: &mut B, (header, fb_header): Self::Ctx) -> ParselyResult<Self> {
64 let mut missing_seq_nums = BTreeSet::new();
65 let mut nack_block_num = 1;
66 while buf.remaining_bytes() >= NackBlock::SIZE_BYTES {
67 let mut nack_block = NackBlock::read::<T>(buf, ())
68 .with_context(|| format!("Nack block {nack_block_num}"))?;
69 missing_seq_nums.append(&mut nack_block.missing_seq_nums);
70 nack_block_num += 1;
71 }
72 Ok(RtcpFbNackPacket {
73 header,
74 fb_header,
75 missing_seq_nums,
76 })
77 }
78}
79
80impl<B: BitBufMut> ParselyWrite<B> for RtcpFbNackPacket {
81 type Ctx = ();
82 fn write<T: ByteOrder>(&self, buf: &mut B, _ctx: Self::Ctx) -> ParselyResult<()> {
83 self.header.write::<T>(buf, ()).context("header")?;
84 self.fb_header.write::<T>(buf, ()).context("fb header")?;
85 for (i, chunk) in self
86 .missing_seq_nums
87 .chunk_by_max_difference(16)
88 .into_iter()
89 .enumerate()
90 {
91 if buf.remaining_mut_bytes() < NackBlock::SIZE_BYTES {
92 bail!("Not enough room to write nack block {i}");
93 }
94 let nack_block = NackBlock {
95 missing_seq_nums: chunk,
96 };
97 nack_block
98 .write::<T>(buf, ())
99 .with_context(|| format!("Writing nack block {i}"))?;
100 }
101
102 Ok(())
103 }
104}
105
/// One NACK block: the group of missing sequence numbers that is serialized as a 16-bit
/// packet id followed by a 16-bit bitmask.
#[derive(Debug, Default, PartialEq)]
pub struct NackBlock {
    /// Sequence numbers covered by this block, sorted and de-duplicated by the set.
    missing_seq_nums: BTreeSet<u16>,
}

impl NackBlock {
    /// Size of one serialized block in bytes (two 16-bit fields).
    pub const SIZE_BYTES: usize = 4;

    /// Records `missing_seq_num` in this block. Adding a value that is already present
    /// has no effect.
    pub fn add_missing_seq_num(&mut self, missing_seq_num: u16) {
        let _ = self.missing_seq_nums.insert(missing_seq_num);
    }
}
118
119impl<B: BitBuf> ParselyRead<B> for NackBlock {
120 type Ctx = ();
121 fn read<T: ByteOrder>(buf: &mut B, _ctx: Self::Ctx) -> ParselyResult<Self> {
122 let packet_id = buf.get_u16::<NetworkOrder>().context("packet id")?;
123 let blp = buf.get_u16::<NetworkOrder>().context("blp")?;
124
125 let mut missing_seq_nums = BTreeSet::new();
126 missing_seq_nums.insert(packet_id);
127 for shift_amount in 0..16 {
128 if (blp >> shift_amount) & 0x1 == 1 {
129 missing_seq_nums.insert(packet_id + shift_amount + 1);
130 }
131 }
132 Ok(NackBlock { missing_seq_nums })
133 }
134}
135
// NOTE(review): presumably this macro gives `NackBlock` a sync implementation that has no
// state to update — confirm against the parsely_rs macro definition.
impl_stateless_sync!(NackBlock);
137
138impl<B: BitBufMut> ParselyWrite<B> for NackBlock {
139 type Ctx = ();
140 fn write<T: ByteOrder>(&self, buf: &mut B, _ctx: Self::Ctx) -> ParselyResult<()> {
141 let packet_id = self.missing_seq_nums.first().ok_or(anyhow!(
142 "NackBlock must contain at least one sequence number"
143 ))?;
144 buf.put_u16::<T>(*packet_id).context("packet id")?;
145 let mut blp = 0u16;
146 for missing_seq_num in self.missing_seq_nums.iter().skip(1) {
148 let delta = missing_seq_num - packet_id;
149 if delta > 16 {
150 bail!("NACK cannot contain sequence number spread larger than 16");
151 }
152 let mask = 1u16 << (delta - 1);
153 blp |= mask;
154 }
155 buf.put_u16::<T>(blp).context("blp")?;
156 Ok(())
157 }
158}
159
/// Splits an ordered collection into consecutive groups whose values all lie within a
/// maximum distance of the group's first value.
trait ChunkByMaxDifference<T> {
    /// Returns the groups in ascending order. Within each group no value exceeds the
    /// group's first value by more than `max_diff`.
    fn chunk_by_max_difference(&self, max_diff: T) -> Vec<BTreeSet<T>>;
}

impl ChunkByMaxDifference<u16> for BTreeSet<u16> {
    /// Walks the set in ascending order and starts a new group whenever a value lies more
    /// than `max_diff` above the first value of the current group. An empty set yields no
    /// groups.
    fn chunk_by_max_difference(&self, max_diff: u16) -> Vec<BTreeSet<u16>> {
        let mut chunks: Vec<BTreeSet<u16>> = Vec::new();
        let mut values = self.iter().copied();
        let Some(mut chunk_start) = values.next() else {
            return chunks;
        };
        let mut current = BTreeSet::from([chunk_start]);
        for value in values {
            // Ascending iteration guarantees `value >= chunk_start`, so this cannot underflow.
            if value - chunk_start <= max_diff {
                current.insert(value);
            } else {
                // Close the finished group and open a new one that starts at `value`.
                chunks.push(std::mem::replace(&mut current, BTreeSet::from([value])));
                chunk_start = value;
            }
        }
        chunks.push(current);

        chunks
    }
}
186
#[cfg(test)]
mod test {
    use crate::rtcp::rtcp_fb_packet::RtcpFbTlPacket;

    use super::*;

    #[test]
    fn test_read_nack_block() {
        // Packet id 10, followed by blp 0xA8A1 = 0b1010_1000_1010_0001.
        // Set bits 0, 5, 7, 11, 13 and 15 map to 11, 16, 18, 22, 24 and 26.
        let mut bits = Bits::from_static_bytes(&[0x00, 0x0A, 0xA8, 0xA1]);
        let nack_block = NackBlock::read::<NetworkOrder>(&mut bits, ()).unwrap();
        assert_eq!(
            nack_block.missing_seq_nums,
            BTreeSet::from([10, 11, 16, 18, 22, 24, 26]),
        );
    }

    #[test]
    fn test_put_nack_block() {
        let mut nack_block = NackBlock::default();
        // Same sequence numbers as `test_read_nack_block`; all are within 16 of the first.
        nack_block.add_missing_seq_num(10);
        nack_block.add_missing_seq_num(11);
        nack_block.add_missing_seq_num(16);
        nack_block.add_missing_seq_num(18);
        nack_block.add_missing_seq_num(22);
        nack_block.add_missing_seq_num(24);
        nack_block.add_missing_seq_num(26);

        let mut bits_mut = BitsMut::new();
        nack_block.write::<NetworkOrder>(&mut bits_mut, ()).unwrap();

        // Reading back what was written must reproduce the original block.
        let mut bits = bits_mut.freeze();
        let read_nack_block = NackBlock::read::<NetworkOrder>(&mut bits, ()).unwrap();
        assert_eq!(read_nack_block, nack_block);
    }

    #[test]
    fn test_read_nack_packet() {
        let rtcp_header = RtcpHeader {
            report_count: RtcpFbNackPacket::FMT,
            packet_type: RtcpFbTlPacket::PT,
            // Two NACK blocks: the same value `sync` produces in `test_sync_multiple_blocks`.
            length_field: 4,
            ..Default::default()
        };
        let rtcp_fb_header = RtcpFbHeader::default()
            .media_source_ssrc(42)
            .sender_ssrc(24);
        #[rustfmt::skip]
        let nack_payload = vec![
            // Block 1: packet id 10
            0x00, 0x0A,
            // Block 1: blp 0xA8A1 -> 11, 16, 18, 22, 24, 26
            0xA8, 0xA1,
            // Block 2: packet id 40
            0x00, 0x28,
            // Block 2: blp 0x2482 (bits 1, 7, 10, 13) -> 42, 48, 51, 54
            0x24, 0x82
        ];
        let mut bits = Bits::from_owner_bytes(nack_payload);
        let nack_packet =
            RtcpFbNackPacket::read::<NetworkOrder>(&mut bits, (rtcp_header, rtcp_fb_header))
                .unwrap();
        assert_eq!(
            nack_packet.missing_seq_nums,
            BTreeSet::from_iter([10, 11, 16, 18, 22, 24, 26, 40, 42, 48, 51, 54])
        );
    }

    #[test]
    fn test_default() {
        let rtcp_fb_nack = RtcpFbNackPacket::default();
        assert_eq!(rtcp_fb_nack.header.packet_type, RtcpFbTlPacket::PT);
        assert_eq!(rtcp_fb_nack.header.report_count, RtcpFbNackPacket::FMT);
        assert_eq!(rtcp_fb_nack.header.length_field, 0);
    }

    #[test]
    fn test_sync() {
        let mut rtcp_fb_nack = RtcpFbNackPacket::default();
        // All values are within 16 of 10, so they fit in a single NACK block.
        rtcp_fb_nack.add_missing_seq_num(10);
        rtcp_fb_nack.add_missing_seq_num(12);
        rtcp_fb_nack.add_missing_seq_num(13);
        rtcp_fb_nack.add_missing_seq_num(17);
        rtcp_fb_nack.add_missing_seq_num(21);
        rtcp_fb_nack.add_missing_seq_num(23);
        rtcp_fb_nack.sync(()).unwrap();
        assert_eq!(rtcp_fb_nack.header.length_field, 3);
    }

    #[test]
    fn test_sync_multiple_blocks() {
        let mut rtcp_fb_nack = RtcpFbNackPacket::default();
        rtcp_fb_nack.add_missing_seq_num(10);
        rtcp_fb_nack.add_missing_seq_num(12);
        rtcp_fb_nack.add_missing_seq_num(13);
        rtcp_fb_nack.add_missing_seq_num(17);
        rtcp_fb_nack.add_missing_seq_num(21);
        rtcp_fb_nack.add_missing_seq_num(23);
        // 44 is more than 16 past 10, so it needs a second NACK block.
        rtcp_fb_nack.add_missing_seq_num(44);
        rtcp_fb_nack.sync(()).unwrap();
        assert_eq!(rtcp_fb_nack.header.length_field, 4);
    }

    #[test]
    fn test_put_rtcp_fb_nack() {
        let mut rtcp_fb_nack = RtcpFbNackPacket::default();
        rtcp_fb_nack.add_missing_seq_num(10);
        rtcp_fb_nack.add_missing_seq_num(12);
        rtcp_fb_nack.add_missing_seq_num(13);
        rtcp_fb_nack.add_missing_seq_num(17);
        rtcp_fb_nack.add_missing_seq_num(21);
        rtcp_fb_nack.add_missing_seq_num(23);
        rtcp_fb_nack.add_missing_seq_num(44);
        rtcp_fb_nack.sync(()).unwrap();

        let mut bits_mut = BitsMut::new();

        rtcp_fb_nack
            .write::<NetworkOrder>(&mut bits_mut, ())
            .unwrap();
        // The packet reader expects both headers to have been consumed already, so read
        // them first and pass them in as context.
        let mut bits = bits_mut.freeze();
        let header = RtcpHeader::read::<NetworkOrder>(&mut bits, ()).unwrap();
        let fb_header = RtcpFbHeader::read::<NetworkOrder>(&mut bits, ()).unwrap();
        let read_rtcp_fb_nack =
            RtcpFbNackPacket::read::<NetworkOrder>(&mut bits, (header, fb_header)).unwrap();
        assert_eq!(read_rtcp_fb_nack, rtcp_fb_nack);
    }
}