1use bytes::Bytes;
31
32const ANNEXB_START: [u8; 4] = [0, 0, 0, 1];
34
35#[derive(Debug, Clone, Copy, PartialEq, Eq)]
37pub struct RtpHeader {
38 pub payload_type: u8,
40 pub marker: bool,
42 pub sequence: u16,
44 pub timestamp: u32,
46 pub ssrc: u32,
48 pub payload_offset: usize,
50}
51
52impl RtpHeader {
53 pub fn parse(buf: &[u8]) -> Option<RtpHeader> {
57 use super::byteops::ByteReader;
58 let mut r = ByteReader::new(buf);
59 let b0 = r.u8()?;
60 if b0 >> 6 != 2 {
61 return None; }
63 let has_extension = b0 & 0x10 != 0;
64 let csrc_count = (b0 & 0x0F) as usize;
65 let b1 = r.u8()?;
66 let marker = b1 & 0x80 != 0;
67 let payload_type = b1 & 0x7F;
68 let sequence = r.u16_be()?;
69 let timestamp = r.u32_be()?;
70 let ssrc = r.u32_be()?;
71 r.skip(csrc_count * 4)?; if has_extension {
74 r.skip(2)?;
76 let ext_words = r.u16_be()? as usize;
77 r.skip(ext_words * 4)?;
78 }
79 Some(RtpHeader {
80 payload_type,
81 marker,
82 sequence,
83 timestamp,
84 ssrc,
85 payload_offset: r.position(),
86 })
87 }
88}
89
90#[derive(Debug, Clone, Copy, PartialEq, Eq)]
92#[non_exhaustive]
93pub enum DepacketizeError {
94 Truncated,
96 OutOfOrder,
99 Unsupported(u8),
101}
102
103#[derive(Debug, Clone, Copy, Default)]
112pub struct AacDepacketizer {
113 size_length: u8,
115 index_length: u8,
117}
118
119impl AacDepacketizer {
120 pub fn new() -> Self {
123 Self {
124 size_length: 13,
125 index_length: 3,
126 }
127 }
128
129 pub fn with_lengths(size_length: u8, index_length: u8) -> Self {
131 Self {
132 size_length,
133 index_length,
134 }
135 }
136
137 pub fn push(&self, payload: &[u8]) -> Result<Vec<Bytes>, DepacketizeError> {
139 if payload.len() < 2 {
140 return Err(DepacketizeError::Truncated);
141 }
142 if self.size_length == 0 || self.size_length > 16 {
145 return Err(DepacketizeError::Unsupported(self.size_length));
146 }
147 let header_bits = u16::from_be_bytes([payload[0], payload[1]]) as usize;
148 let au_header_bits = self.size_length as usize + self.index_length as usize;
149 if au_header_bits == 0 {
150 return Err(DepacketizeError::Unsupported(0));
151 }
152 let header_bytes = header_bits.div_ceil(8);
153 let au_count = header_bits / au_header_bits;
154 let headers = payload
155 .get(2..2 + header_bytes)
156 .ok_or(DepacketizeError::Truncated)?;
157 let mut data_off = 2 + header_bytes;
158 let mut out = Vec::with_capacity(au_count);
159 for i in 0..au_count {
160 let bit = i * au_header_bits;
163 let byte = bit / 8;
164 let hdr = headers
165 .get(byte..byte + 2)
166 .ok_or(DepacketizeError::Truncated)?;
167 let size = (u16::from_be_bytes([hdr[0], hdr[1]]) >> (16 - self.size_length)) as usize;
168 let end = data_off + size;
169 let au = payload
170 .get(data_off..end)
171 .ok_or(DepacketizeError::Truncated)?;
172 out.push(Bytes::copy_from_slice(au));
173 data_off = end;
174 }
175 Ok(out)
176 }
177}
178
179#[derive(Debug, Clone)]
186pub struct RtpPacketizer {
187 payload_type: u8,
188 ssrc: u32,
189 sequence: u16,
190 max_payload: usize,
192}
193
194impl RtpPacketizer {
195 pub fn new(payload_type: u8, ssrc: u32, mtu: usize) -> Self {
198 Self {
199 payload_type,
200 ssrc,
201 sequence: 0,
202 max_payload: mtu.saturating_sub(12).max(1),
203 }
204 }
205
206 fn header(&mut self, marker: bool, timestamp: u32, out: &mut Vec<u8>) {
208 out.push(0x80); out.push(if marker { 0x80 } else { 0 } | (self.payload_type & 0x7F));
210 out.extend_from_slice(&self.sequence.to_be_bytes());
211 out.extend_from_slice(×tamp.to_be_bytes());
212 out.extend_from_slice(&self.ssrc.to_be_bytes());
213 self.sequence = self.sequence.wrapping_add(1);
214 }
215
216 pub fn packetize(&mut self, access_unit: &[u8], timestamp: u32) -> Vec<Vec<u8>> {
218 let nals: Vec<&[u8]> = crate::codec::h264::iter_nals_annexb(access_unit)
220 .filter(|n| !n.is_empty())
221 .collect();
222 let mut packets = Vec::new();
223 for (i, nal) in nals.iter().enumerate() {
224 let last_nal = i + 1 == nals.len();
225 if nal.len() <= self.max_payload {
226 let mut pkt = Vec::with_capacity(12 + nal.len());
228 self.header(last_nal, timestamp, &mut pkt);
229 pkt.extend_from_slice(nal);
230 packets.push(pkt);
231 } else {
232 self.fragment_fua(nal, timestamp, last_nal, &mut packets);
233 }
234 }
235 packets
236 }
237
238 fn fragment_fua(&mut self, nal: &[u8], timestamp: u32, last_nal: bool, out: &mut Vec<Vec<u8>>) {
240 let nal_header = nal[0];
241 let fu_indicator = (nal_header & 0xE0) | 28; let nal_type = nal_header & 0x1F;
243 let body = &nal[1..];
244 let chunk = self.max_payload.saturating_sub(2).max(1);
246 let n_chunks = body.len().div_ceil(chunk);
247 for (idx, part) in body.chunks(chunk).enumerate() {
248 let start = idx == 0;
249 let end = idx + 1 == n_chunks;
250 let mut fu_header = nal_type;
251 if start {
252 fu_header |= 0x80;
253 }
254 if end {
255 fu_header |= 0x40;
256 }
257 let mut pkt = Vec::with_capacity(12 + 2 + part.len());
258 self.header(last_nal && end, timestamp, &mut pkt);
260 pkt.push(fu_indicator);
261 pkt.push(fu_header);
262 pkt.extend_from_slice(part);
263 out.push(pkt);
264 }
265 }
266}
267
268#[derive(Debug, Default)]
276pub struct H264Depacketizer {
277 au: Vec<u8>,
279 fua: Vec<u8>,
281 in_fragment: bool,
283 fua_header: u8,
285 current_ts: Option<u32>,
287 last_seq: Option<u16>,
289}
290
291impl H264Depacketizer {
292 pub fn new() -> Self {
294 Self::default()
295 }
296
297 fn append_nal(&mut self, nal: &[u8]) {
299 self.au.extend_from_slice(&ANNEXB_START);
300 self.au.extend_from_slice(nal);
301 }
302
303 fn pending_is_keyframe(&self) -> bool {
305 let mut i = 0;
307 while i + 4 < self.au.len() {
308 if self.au[i..i + 4] == ANNEXB_START {
309 let nal_type = self.au[i + 4] & 0x1F;
310 if nal_type == 5 {
311 return true;
312 }
313 }
314 i += 1;
315 }
316 false
317 }
318
319 fn take_au(&mut self) -> Option<AccessUnit> {
321 if self.au.is_empty() {
322 return None;
323 }
324 let keyframe = self.pending_is_keyframe();
325 let timestamp = self.current_ts.unwrap_or(0);
326 let data = Bytes::from(std::mem::take(&mut self.au));
327 self.current_ts = None;
328 Some(AccessUnit {
329 data,
330 timestamp,
331 keyframe,
332 })
333 }
334
335 pub fn push(
338 &mut self,
339 payload: &[u8],
340 marker: bool,
341 timestamp: u32,
342 sequence: u16,
343 ) -> Result<Option<AccessUnit>, DepacketizeError> {
344 if payload.is_empty() {
345 return Err(DepacketizeError::Truncated);
346 }
347
348 let mut completed = None;
351 if let Some(ts) = self.current_ts {
352 if ts != timestamp && !self.in_fragment {
353 completed = self.take_au();
354 }
355 }
356 self.current_ts = Some(timestamp);
357
358 let nal_type = payload[0] & 0x1F;
359 match nal_type {
360 1..=23 => {
361 self.append_nal(payload);
363 }
364 24 => {
365 let mut i = 1;
367 while i + 2 <= payload.len() {
368 let size = u16::from_be_bytes([payload[i], payload[i + 1]]) as usize;
369 i += 2;
370 if i + size > payload.len() {
371 return Err(DepacketizeError::Truncated);
372 }
373 self.append_nal(&payload[i..i + size]);
374 i += size;
375 }
376 }
377 28 => {
378 if payload.len() < 2 {
380 return Err(DepacketizeError::Truncated);
381 }
382 let fu_header = payload[1];
383 let start = fu_header & 0x80 != 0;
384 let end = fu_header & 0x40 != 0;
385 let frag_type = fu_header & 0x1F;
386
387 if start {
388 self.fua_header = (payload[0] & 0xE0) | frag_type;
391 self.fua.clear();
392 self.fua.push(self.fua_header);
393 self.in_fragment = true;
394 } else if !self.in_fragment {
395 return Err(DepacketizeError::OutOfOrder);
397 } else if self.seq_gap(sequence) {
398 self.in_fragment = false;
399 self.fua.clear();
400 return Err(DepacketizeError::OutOfOrder);
401 }
402 self.fua.extend_from_slice(&payload[2..]);
403
404 if end && self.in_fragment {
405 let nal = std::mem::take(&mut self.fua);
406 self.append_nal(&nal);
407 self.in_fragment = false;
408 }
409 }
410 other => return Err(DepacketizeError::Unsupported(other)),
411 }
412
413 self.last_seq = Some(sequence);
414
415 if completed.is_some() {
416 return Ok(completed);
417 }
418 if marker {
419 return Ok(self.take_au());
420 }
421 Ok(None)
422 }
423
424 fn seq_gap(&self, sequence: u16) -> bool {
426 match self.last_seq {
427 Some(prev) => sequence.wrapping_sub(prev) != 1,
428 None => false,
429 }
430 }
431}
432
433#[derive(Debug, Clone, PartialEq, Eq)]
435pub struct AccessUnit {
436 pub data: Bytes,
438 pub timestamp: u32,
440 pub keyframe: bool,
442}
443
444#[cfg(test)]
445mod tests {
446 use super::*;
447
448 fn rtp(seq: u16, ts: u32, marker: bool, payload: &[u8]) -> Vec<u8> {
450 let mut p = vec![0x80, if marker { 0x80 | 96 } else { 96 }];
451 p.extend_from_slice(&seq.to_be_bytes());
452 p.extend_from_slice(&ts.to_be_bytes());
453 p.extend_from_slice(&[0, 0, 0, 1]); p.extend_from_slice(payload);
455 p
456 }
457
458 #[test]
459 fn parses_fixed_header_and_payload_offset() {
460 let pkt = rtp(7, 9000, true, &[0x65, 0xAA]);
461 let h = RtpHeader::parse(&pkt).unwrap();
462 assert_eq!(h.sequence, 7);
463 assert_eq!(h.timestamp, 9000);
464 assert!(h.marker);
465 assert_eq!(h.payload_type, 96);
466 assert_eq!(h.payload_offset, 12);
467 assert_eq!(&pkt[h.payload_offset..], &[0x65, 0xAA]);
468 }
469
470 #[test]
471 fn rejects_wrong_version_and_short_buffers() {
472 assert!(RtpHeader::parse(&[0x00; 12]).is_none()); assert!(RtpHeader::parse(&[0x80; 4]).is_none()); }
475
476 #[test]
477 fn honors_csrc_count_in_payload_offset() {
478 let mut pkt = rtp(1, 0, false, &[0x41]);
479 pkt[0] = 0x82; let mut with_csrc = pkt[..12].to_vec();
481 with_csrc.extend_from_slice(&[0xDE, 0xAD, 0xBE, 0xEF, 0, 0, 0, 0]); with_csrc.push(0x41);
483 let h = RtpHeader::parse(&with_csrc).unwrap();
484 assert_eq!(h.payload_offset, 20);
485 }
486
487 #[test]
488 fn aac_hbr_splits_two_access_units() {
489 let mut p = Vec::new();
492 p.extend_from_slice(&32u16.to_be_bytes()); p.extend_from_slice(&((3u16) << 3).to_be_bytes()); p.extend_from_slice(&((2u16) << 3).to_be_bytes()); p.extend_from_slice(&[0xA1, 0xA2, 0xA3]); p.extend_from_slice(&[0xB1, 0xB2]); let aus = AacDepacketizer::new().push(&p).unwrap();
498 assert_eq!(aus.len(), 2);
499 assert_eq!(&aus[0][..], &[0xA1, 0xA2, 0xA3]);
500 assert_eq!(&aus[1][..], &[0xB1, 0xB2]);
501 }
502
503 #[test]
504 fn aac_hbr_single_au() {
505 let mut p = Vec::new();
506 p.extend_from_slice(&16u16.to_be_bytes()); p.extend_from_slice(&((4u16) << 3).to_be_bytes()); p.extend_from_slice(&[1, 2, 3, 4]);
509 let aus = AacDepacketizer::new().push(&p).unwrap();
510 assert_eq!(aus.len(), 1);
511 assert_eq!(&aus[0][..], &[1, 2, 3, 4]);
512 }
513
514 #[test]
515 fn aac_truncated_payload_errors() {
516 assert_eq!(
517 AacDepacketizer::new().push(&[0x00]),
518 Err(DepacketizeError::Truncated)
519 );
520 let mut p = 16u16.to_be_bytes().to_vec();
522 p.extend_from_slice(&((8u16) << 3).to_be_bytes());
523 p.extend_from_slice(&[1, 2]);
524 assert_eq!(
525 AacDepacketizer::new().push(&p),
526 Err(DepacketizeError::Truncated)
527 );
528 }
529
530 #[test]
531 fn single_nal_packet_emits_annexb_on_marker() {
532 let mut d = H264Depacketizer::new();
533 let out = d.push(&[0x41, 0x9A, 0xBC], true, 3000, 1).unwrap().unwrap();
535 assert_eq!(&out.data[..], &[0, 0, 0, 1, 0x41, 0x9A, 0xBC]);
536 assert!(!out.keyframe);
537 assert_eq!(out.timestamp, 3000);
538 }
539
540 #[test]
541 fn idr_single_nal_is_flagged_keyframe() {
542 let mut d = H264Depacketizer::new();
543 let out = d.push(&[0x65, 0x01], true, 0, 1).unwrap().unwrap();
544 assert!(out.keyframe);
545 }
546
547 #[test]
548 fn packetizer_single_nal_round_trips_through_depacketizer() {
549 let au = [0, 0, 0, 1, 0x67, 0x42, 0x00, 0, 0, 0, 1, 0x65, 0x88, 0x99];
551 let mut pkt = RtpPacketizer::new(96, 0xABCD, 1200);
552 let packets = pkt.packetize(&au, 3000);
553 assert_eq!(packets.len(), 2, "one packet per NAL");
554
555 let mut depack = H264Depacketizer::new();
556 let mut out = None;
557 for p in &packets {
558 let h = RtpHeader::parse(p).unwrap();
559 if let Some(au) = depack
560 .push(&p[h.payload_offset..], h.marker, h.timestamp, h.sequence)
561 .unwrap()
562 {
563 out = Some(au);
564 }
565 }
566 let out = out.expect("AU completed on the marker packet");
567 assert_eq!(&out.data[..], &au);
568 assert!(out.keyframe);
569 assert_eq!(out.timestamp, 3000);
570 }
571
572 #[test]
573 fn packetizer_fragments_oversized_nal_and_round_trips() {
574 let mut nal = vec![0, 0, 0, 1, 0x65]; nal.extend((0..600u16).map(|i| i as u8)); let mut pkt = RtpPacketizer::new(96, 1, 100); let packets = pkt.packetize(&nal, 90);
579 assert!(packets.len() > 1, "oversized NAL is fragmented");
580 let markers: Vec<bool> = packets
582 .iter()
583 .map(|p| RtpHeader::parse(p).unwrap().marker)
584 .collect();
585 assert_eq!(markers.iter().filter(|m| **m).count(), 1);
586 assert!(markers.last().unwrap());
587
588 let mut depack = H264Depacketizer::new();
589 let mut out = None;
590 for p in &packets {
591 let h = RtpHeader::parse(p).unwrap();
592 if let Some(au) = depack
593 .push(&p[h.payload_offset..], h.marker, h.timestamp, h.sequence)
594 .unwrap()
595 {
596 out = Some(au);
597 }
598 }
599 assert_eq!(&out.unwrap().data[..], &nal[..]);
600 }
601
602 #[test]
603 fn stap_a_splits_aggregated_nals() {
604 let payload = [24, 0, 2, 0xAA, 0xBB, 0, 3, 0xCC, 0xDD, 0xEE];
606 let mut d = H264Depacketizer::new();
607 let out = d.push(&payload, true, 0, 1).unwrap().unwrap();
608 assert_eq!(
609 &out.data[..],
610 &[0, 0, 0, 1, 0xAA, 0xBB, 0, 0, 0, 1, 0xCC, 0xDD, 0xEE]
611 );
612 }
613
614 #[test]
615 fn fu_a_reassembles_fragmented_nal() {
616 let mut d = H264Depacketizer::new();
617 assert!(d
619 .push(&[0x7C, 0x85, 0x11, 0x22], false, 0, 1)
620 .unwrap()
621 .is_none());
622 assert!(d.push(&[0x7C, 0x05, 0x33], false, 0, 2).unwrap().is_none());
624 let out = d.push(&[0x7C, 0x45, 0x44], true, 0, 3).unwrap().unwrap();
626 assert_eq!(&out.data[..], &[0, 0, 0, 1, 0x65, 0x11, 0x22, 0x33, 0x44]);
628 assert!(out.keyframe);
629 }
630
631 #[test]
632 fn fu_a_sequence_gap_reports_out_of_order() {
633 let mut d = H264Depacketizer::new();
634 d.push(&[0x7C, 0x85, 0x11], false, 0, 1).unwrap();
635 assert_eq!(
637 d.push(&[0x7C, 0x05, 0x22], false, 0, 5),
638 Err(DepacketizeError::OutOfOrder)
639 );
640 }
641
642 #[test]
643 fn timestamp_change_flushes_previous_au_without_marker() {
644 let mut d = H264Depacketizer::new();
645 assert!(d.push(&[0x41, 0x01], false, 1000, 1).unwrap().is_none());
647 let out = d.push(&[0x41, 0x02], false, 2000, 2).unwrap().unwrap();
649 assert_eq!(out.timestamp, 1000);
650 assert_eq!(&out.data[..], &[0, 0, 0, 1, 0x41, 0x01]);
651 }
652}