1use bytes::Bytes;
31
/// Four-byte Annex B start code (`00 00 00 01`) written in front of every
/// NAL unit that is appended to an access unit.
const ANNEXB_START: [u8; 4] = [0, 0, 0, 1];
34
/// Fields decoded from the header of a single RTP packet.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RtpHeader {
    /// Low seven bits of the second header byte.
    pub payload_type: u8,
    /// Top bit of the second header byte.
    pub marker: bool,
    /// Big-endian 16-bit value stored in header bytes 2..4.
    pub sequence: u16,
    /// Big-endian 32-bit value stored in header bytes 4..8.
    pub timestamp: u32,
    /// Big-endian 32-bit value stored in header bytes 8..12.
    pub ssrc: u32,
    /// Index of the first byte after the fixed header, the CSRC list and the
    /// header extension (when the extension bit is set).
    pub payload_offset: usize,
}

impl RtpHeader {
    /// Decodes the RTP header at the start of `buf`.
    ///
    /// Returns `None` when `buf` holds fewer than 12 bytes, when the version
    /// field is not 2, or when the CSRC list / header extension announced by
    /// the first byte does not fit inside `buf`. A packet whose payload is
    /// empty (`payload_offset == buf.len()`) is accepted.
    ///
    /// The padding bit is not interpreted: `payload_offset` only marks where
    /// the payload starts.
    pub fn parse(buf: &[u8]) -> Option<RtpHeader> {
        // Destructure the twelve fixed bytes in one go; shorter input bails out.
        let &[flags, m_pt, s0, s1, t0, t1, t2, t3, c0, c1, c2, c3, ..] = buf else {
            return None;
        };
        if flags >> 6 != 2 {
            return None;
        }

        // Each CSRC entry announced in the low nibble occupies four bytes.
        let mut payload_offset = 12 + usize::from(flags & 0x0F) * 4;

        if flags & 0x10 != 0 {
            // The extension header is four bytes; its last two bytes give the
            // extension length in 32-bit words.
            let length_field = buf.get(payload_offset + 2..payload_offset + 4)?;
            let words = usize::from(u16::from_be_bytes([length_field[0], length_field[1]]));
            payload_offset += 4 + words * 4;
        }

        (payload_offset <= buf.len()).then_some(RtpHeader {
            payload_type: m_pt & 0x7F,
            marker: m_pt & 0x80 != 0,
            sequence: u16::from_be_bytes([s0, s1]),
            timestamp: u32::from_be_bytes([t0, t1, t2, t3]),
            ssrc: u32::from_be_bytes([c0, c1, c2, c3]),
            payload_offset,
        })
    }
}
94
/// Reasons a depacketizer can reject an RTP payload.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`] so it can be
/// propagated with `?` into `Box<dyn Error>` and similar error containers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum DepacketizeError {
    /// The payload ended before all the bytes it announces were available.
    Truncated,
    /// A fragment arrived without its start fragment, or after a gap in the
    /// sequence numbers.
    OutOfOrder,
    /// The payload uses a packet type, or the depacketizer is configured with
    /// a parameter, that is not handled; the offending value is attached.
    Unsupported(u8),
}

impl std::fmt::Display for DepacketizeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Truncated => f.write_str("payload is truncated"),
            Self::OutOfOrder => f.write_str("packet arrived out of order"),
            Self::Unsupported(code) => {
                write!(f, "unsupported payload type or parameter: {code}")
            }
        }
    }
}

impl std::error::Error for DepacketizeError {}
107
108#[derive(Debug, Clone, Copy, Default)]
117pub struct AacDepacketizer {
118 size_length: u8,
120 index_length: u8,
122}
123
124impl AacDepacketizer {
125 pub fn new() -> Self {
128 Self {
129 size_length: 13,
130 index_length: 3,
131 }
132 }
133
134 pub fn with_lengths(size_length: u8, index_length: u8) -> Self {
136 Self {
137 size_length,
138 index_length,
139 }
140 }
141
142 pub fn push(&self, payload: &[u8]) -> Result<Vec<Bytes>, DepacketizeError> {
144 if payload.len() < 2 {
145 return Err(DepacketizeError::Truncated);
146 }
147 if self.size_length == 0 || self.size_length > 16 {
150 return Err(DepacketizeError::Unsupported(self.size_length));
151 }
152 let header_bits = u16::from_be_bytes([payload[0], payload[1]]) as usize;
153 let au_header_bits = self.size_length as usize + self.index_length as usize;
154 if au_header_bits == 0 {
155 return Err(DepacketizeError::Unsupported(0));
156 }
157 let header_bytes = header_bits.div_ceil(8);
158 let au_count = header_bits / au_header_bits;
159 let headers = payload
160 .get(2..2 + header_bytes)
161 .ok_or(DepacketizeError::Truncated)?;
162 let mut data_off = 2 + header_bytes;
163 let mut out = Vec::with_capacity(au_count);
164 for i in 0..au_count {
165 let bit = i * au_header_bits;
168 let byte = bit / 8;
169 let hdr = headers
170 .get(byte..byte + 2)
171 .ok_or(DepacketizeError::Truncated)?;
172 let size = (u16::from_be_bytes([hdr[0], hdr[1]]) >> (16 - self.size_length)) as usize;
173 let end = data_off + size;
174 let au = payload
175 .get(data_off..end)
176 .ok_or(DepacketizeError::Truncated)?;
177 out.push(Bytes::copy_from_slice(au));
178 data_off = end;
179 }
180 Ok(out)
181 }
182}
183
184#[derive(Debug, Default)]
192pub struct H264Depacketizer {
193 au: Vec<u8>,
195 fua: Vec<u8>,
197 in_fragment: bool,
199 fua_header: u8,
201 current_ts: Option<u32>,
203 last_seq: Option<u16>,
205}
206
207impl H264Depacketizer {
208 pub fn new() -> Self {
210 Self::default()
211 }
212
213 fn append_nal(&mut self, nal: &[u8]) {
215 self.au.extend_from_slice(&ANNEXB_START);
216 self.au.extend_from_slice(nal);
217 }
218
219 fn pending_is_keyframe(&self) -> bool {
221 let mut i = 0;
223 while i + 4 < self.au.len() {
224 if self.au[i..i + 4] == ANNEXB_START {
225 let nal_type = self.au[i + 4] & 0x1F;
226 if nal_type == 5 {
227 return true;
228 }
229 }
230 i += 1;
231 }
232 false
233 }
234
235 fn take_au(&mut self) -> Option<AccessUnit> {
237 if self.au.is_empty() {
238 return None;
239 }
240 let keyframe = self.pending_is_keyframe();
241 let timestamp = self.current_ts.unwrap_or(0);
242 let data = Bytes::from(std::mem::take(&mut self.au));
243 self.current_ts = None;
244 Some(AccessUnit {
245 data,
246 timestamp,
247 keyframe,
248 })
249 }
250
251 pub fn push(
254 &mut self,
255 payload: &[u8],
256 marker: bool,
257 timestamp: u32,
258 sequence: u16,
259 ) -> Result<Option<AccessUnit>, DepacketizeError> {
260 if payload.is_empty() {
261 return Err(DepacketizeError::Truncated);
262 }
263
264 let mut completed = None;
267 if let Some(ts) = self.current_ts {
268 if ts != timestamp && !self.in_fragment {
269 completed = self.take_au();
270 }
271 }
272 self.current_ts = Some(timestamp);
273
274 let nal_type = payload[0] & 0x1F;
275 match nal_type {
276 1..=23 => {
277 self.append_nal(payload);
279 }
280 24 => {
281 let mut i = 1;
283 while i + 2 <= payload.len() {
284 let size = u16::from_be_bytes([payload[i], payload[i + 1]]) as usize;
285 i += 2;
286 if i + size > payload.len() {
287 return Err(DepacketizeError::Truncated);
288 }
289 self.append_nal(&payload[i..i + size]);
290 i += size;
291 }
292 }
293 28 => {
294 if payload.len() < 2 {
296 return Err(DepacketizeError::Truncated);
297 }
298 let fu_header = payload[1];
299 let start = fu_header & 0x80 != 0;
300 let end = fu_header & 0x40 != 0;
301 let frag_type = fu_header & 0x1F;
302
303 if start {
304 self.fua_header = (payload[0] & 0xE0) | frag_type;
307 self.fua.clear();
308 self.fua.push(self.fua_header);
309 self.in_fragment = true;
310 } else if !self.in_fragment {
311 return Err(DepacketizeError::OutOfOrder);
313 } else if self.seq_gap(sequence) {
314 self.in_fragment = false;
315 self.fua.clear();
316 return Err(DepacketizeError::OutOfOrder);
317 }
318 self.fua.extend_from_slice(&payload[2..]);
319
320 if end && self.in_fragment {
321 let nal = std::mem::take(&mut self.fua);
322 self.append_nal(&nal);
323 self.in_fragment = false;
324 }
325 }
326 other => return Err(DepacketizeError::Unsupported(other)),
327 }
328
329 self.last_seq = Some(sequence);
330
331 if completed.is_some() {
332 return Ok(completed);
333 }
334 if marker {
335 return Ok(self.take_au());
336 }
337 Ok(None)
338 }
339
340 fn seq_gap(&self, sequence: u16) -> bool {
342 match self.last_seq {
343 Some(prev) => sequence.wrapping_sub(prev) != 1,
344 None => false,
345 }
346 }
347}
348
/// One reassembled access unit handed out by the H.264 depacketizer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AccessUnit {
    /// The NAL units of this access unit, each preceded by the
    /// `00 00 00 01` start code.
    pub data: Bytes,
    /// RTP timestamp recorded for the packets that formed this unit
    /// (0 if none was recorded).
    pub timestamp: u32,
    /// `true` when the unit contains a NAL unit of type 5.
    pub keyframe: bool,
}
359
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an RTP packet with a 12-byte header (version 2, payload type
    /// 96, SSRC 1, no CSRC entries, no extension) followed by `payload`.
    fn rtp(seq: u16, ts: u32, marker: bool, payload: &[u8]) -> Vec<u8> {
        let second_byte: u8 = if marker { 0x80 | 96 } else { 96 };
        let mut packet = Vec::with_capacity(12 + payload.len());
        packet.push(0x80);
        packet.push(second_byte);
        packet.extend_from_slice(&seq.to_be_bytes());
        packet.extend_from_slice(&ts.to_be_bytes());
        packet.extend_from_slice(&[0, 0, 0, 1]);
        packet.extend_from_slice(payload);
        packet
    }

    #[test]
    fn parses_fixed_header_and_payload_offset() {
        let packet = rtp(7, 9000, true, &[0x65, 0xAA]);
        let header = RtpHeader::parse(&packet).expect("well-formed packet must parse");

        assert_eq!(
            (header.sequence, header.timestamp, header.marker),
            (7, 9000, true)
        );
        assert_eq!((header.payload_type, header.payload_offset), (96, 12));
        assert_eq!(packet[header.payload_offset..], [0x65, 0xAA]);
    }

    #[test]
    fn rejects_wrong_version_and_short_buffers() {
        let wrong_version = [0x00u8; 12];
        let too_short = [0x80u8; 4];

        assert_eq!(RtpHeader::parse(&wrong_version), None);
        assert_eq!(RtpHeader::parse(&too_short), None);
    }

    #[test]
    fn honors_csrc_count_in_payload_offset() {
        // Header only, then flag two CSRC entries and append them by hand.
        let mut packet = rtp(1, 0, false, &[]);
        packet[0] = 0x82;
        packet.extend_from_slice(&[0xDE, 0xAD, 0xBE, 0xEF, 0, 0, 0, 0]);
        packet.push(0x41);

        let header = RtpHeader::parse(&packet).expect("packet with CSRC list must parse");
        assert_eq!(header.payload_offset, 20);
    }

    #[test]
    fn aac_hbr_splits_two_access_units() {
        let payload = [
            0x00u8, 0x20, // 32 header bits
            0x00, 0x18, // size 3, index 0
            0x00, 0x10, // size 2, index 0
            0xA1, 0xA2, 0xA3, // first access unit
            0xB1, 0xB2, // second access unit
        ];

        let units = AacDepacketizer::new().push(&payload).unwrap();
        assert_eq!(units.len(), 2);
        assert_eq!(units[0][..], [0xA1, 0xA2, 0xA3]);
        assert_eq!(units[1][..], [0xB1, 0xB2]);
    }

    #[test]
    fn aac_hbr_single_au() {
        let payload = [
            0x00u8, 0x10, // 16 header bits
            0x00, 0x20, // size 4, index 0
            1, 2, 3, 4,
        ];

        let units = AacDepacketizer::new().push(&payload).unwrap();
        assert_eq!(units.len(), 1);
        assert_eq!(units[0][..], [1, 2, 3, 4]);
    }

    #[test]
    fn aac_truncated_payload_errors() {
        let depacketizer = AacDepacketizer::new();

        // Not even the two-byte header-length field is present.
        let too_short = depacketizer.push(&[0x00]);
        assert!(matches!(too_short, Err(DepacketizeError::Truncated)));

        // The header announces eight bytes but only two follow.
        let missing_data = depacketizer.push(&[0x00, 0x10, 0x00, 0x40, 1, 2]);
        assert!(matches!(missing_data, Err(DepacketizeError::Truncated)));
    }

    #[test]
    fn single_nal_packet_emits_annexb_on_marker() {
        let mut depacketizer = H264Depacketizer::new();
        let unit = depacketizer
            .push(&[0x41, 0x9A, 0xBC], true, 3000, 1)
            .unwrap()
            .unwrap();

        assert_eq!(unit.data[..], [0, 0, 0, 1, 0x41, 0x9A, 0xBC]);
        assert!(!unit.keyframe);
        assert_eq!(unit.timestamp, 3000);
    }

    #[test]
    fn idr_single_nal_is_flagged_keyframe() {
        let mut depacketizer = H264Depacketizer::new();
        let unit = depacketizer.push(&[0x65, 0x01], true, 0, 1).unwrap().unwrap();
        assert!(unit.keyframe);
    }

    #[test]
    fn stap_a_splits_aggregated_nals() {
        let mut depacketizer = H264Depacketizer::new();
        let aggregate = [24, 0, 2, 0xAA, 0xBB, 0, 3, 0xCC, 0xDD, 0xEE];

        let unit = depacketizer.push(&aggregate, true, 0, 1).unwrap().unwrap();
        assert_eq!(
            unit.data[..],
            [0, 0, 0, 1, 0xAA, 0xBB, 0, 0, 0, 1, 0xCC, 0xDD, 0xEE]
        );
    }

    #[test]
    fn fu_a_reassembles_fragmented_nal() {
        let mut depacketizer = H264Depacketizer::new();

        let after_start = depacketizer
            .push(&[0x7C, 0x85, 0x11, 0x22], false, 0, 1)
            .unwrap();
        assert!(after_start.is_none());

        let after_middle = depacketizer.push(&[0x7C, 0x05, 0x33], false, 0, 2).unwrap();
        assert!(after_middle.is_none());

        let unit = depacketizer
            .push(&[0x7C, 0x45, 0x44], true, 0, 3)
            .unwrap()
            .unwrap();
        assert_eq!(unit.data[..], [0, 0, 0, 1, 0x65, 0x11, 0x22, 0x33, 0x44]);
        assert!(unit.keyframe);
    }

    #[test]
    fn fu_a_sequence_gap_reports_out_of_order() {
        let mut depacketizer = H264Depacketizer::new();
        depacketizer.push(&[0x7C, 0x85, 0x11], false, 0, 1).unwrap();

        let after_gap = depacketizer.push(&[0x7C, 0x05, 0x22], false, 0, 5);
        assert!(matches!(after_gap, Err(DepacketizeError::OutOfOrder)));
    }

    #[test]
    fn timestamp_change_flushes_previous_au_without_marker() {
        let mut depacketizer = H264Depacketizer::new();

        let first = depacketizer.push(&[0x41, 0x01], false, 1000, 1).unwrap();
        assert!(first.is_none());

        let flushed = depacketizer
            .push(&[0x41, 0x02], false, 2000, 2)
            .unwrap()
            .unwrap();
        assert_eq!(flushed.timestamp, 1000);
        assert_eq!(flushed.data[..], [0, 0, 0, 1, 0x41, 0x01]);
    }
}