1use bytes::Bytes;
34
35const ANNEXB_START: [u8; 4] = [0, 0, 0, 1];
37
38#[derive(Debug, Clone, Copy, PartialEq, Eq)]
40pub struct RtpHeader {
41 pub payload_type: u8,
43 pub marker: bool,
45 pub sequence: u16,
47 pub timestamp: u32,
49 pub ssrc: u32,
51 pub payload_offset: usize,
53}
54
55impl RtpHeader {
56 pub fn parse(buf: &[u8]) -> Option<RtpHeader> {
60 use super::byteops::ByteReader;
61 let mut r = ByteReader::new(buf);
62 let b0 = r.u8()?;
63 if b0 >> 6 != 2 {
64 return None; }
66 let has_extension = b0 & 0x10 != 0;
67 let csrc_count = (b0 & 0x0F) as usize;
68 let b1 = r.u8()?;
69 let marker = b1 & 0x80 != 0;
70 let payload_type = b1 & 0x7F;
71 let sequence = r.u16_be()?;
72 let timestamp = r.u32_be()?;
73 let ssrc = r.u32_be()?;
74 r.skip(csrc_count * 4)?; if has_extension {
77 r.skip(2)?;
79 let ext_words = r.u16_be()? as usize;
80 r.skip(ext_words * 4)?;
81 }
82 Some(RtpHeader {
83 payload_type,
84 marker,
85 sequence,
86 timestamp,
87 ssrc,
88 payload_offset: r.position(),
89 })
90 }
91}
92
93#[derive(Debug, Clone, Copy, PartialEq, Eq)]
95#[non_exhaustive]
96pub enum DepacketizeError {
97 Truncated,
99 OutOfOrder,
102 Unsupported(u8),
104}
105
106#[derive(Debug, Clone, Copy, Default)]
115pub struct AacDepacketizer {
116 size_length: u8,
118 index_length: u8,
120}
121
122impl AacDepacketizer {
123 pub fn new() -> Self {
126 Self {
127 size_length: 13,
128 index_length: 3,
129 }
130 }
131
132 pub fn with_lengths(size_length: u8, index_length: u8) -> Self {
134 Self {
135 size_length,
136 index_length,
137 }
138 }
139
140 pub fn push(&self, payload: &[u8]) -> Result<Vec<Bytes>, DepacketizeError> {
142 if payload.len() < 2 {
143 return Err(DepacketizeError::Truncated);
144 }
145 if self.size_length == 0 || self.size_length > 16 {
148 return Err(DepacketizeError::Unsupported(self.size_length));
149 }
150 let header_bits = u16::from_be_bytes([payload[0], payload[1]]) as usize;
151 let au_header_bits = self.size_length as usize + self.index_length as usize;
152 if au_header_bits == 0 {
153 return Err(DepacketizeError::Unsupported(0));
154 }
155 let header_bytes = header_bits.div_ceil(8);
156 let au_count = header_bits / au_header_bits;
157 let headers = payload
158 .get(2..2 + header_bytes)
159 .ok_or(DepacketizeError::Truncated)?;
160 let mut data_off = 2 + header_bytes;
161 let mut out = Vec::with_capacity(au_count);
162 for i in 0..au_count {
163 let bit = i * au_header_bits;
166 let byte = bit / 8;
167 let hdr = headers
168 .get(byte..byte + 2)
169 .ok_or(DepacketizeError::Truncated)?;
170 let size = (u16::from_be_bytes([hdr[0], hdr[1]]) >> (16 - self.size_length)) as usize;
171 let end = data_off + size;
172 let au = payload
173 .get(data_off..end)
174 .ok_or(DepacketizeError::Truncated)?;
175 out.push(Bytes::copy_from_slice(au));
176 data_off = end;
177 }
178 Ok(out)
179 }
180}
181
182#[derive(Debug, Clone)]
189pub struct RtpPacketizer {
190 payload_type: u8,
191 ssrc: u32,
192 sequence: u16,
193 max_payload: usize,
195 codec: NalCodec,
198}
199
200#[derive(Debug, Clone, Copy, PartialEq, Eq)]
204enum NalCodec {
205 H264,
206 H265,
207}
208
209impl RtpPacketizer {
210 pub fn new(payload_type: u8, ssrc: u32, mtu: usize) -> Self {
214 Self::with_codec(payload_type, ssrc, mtu, NalCodec::H264)
215 }
216
217 pub fn new_h265(payload_type: u8, ssrc: u32, mtu: usize) -> Self {
220 Self::with_codec(payload_type, ssrc, mtu, NalCodec::H265)
221 }
222
223 fn with_codec(payload_type: u8, ssrc: u32, mtu: usize, codec: NalCodec) -> Self {
224 Self {
225 payload_type,
226 ssrc,
227 sequence: 0,
228 max_payload: mtu.saturating_sub(12).max(1),
229 codec,
230 }
231 }
232
233 fn header(&mut self, marker: bool, timestamp: u32, out: &mut Vec<u8>) {
235 write_rtp_header(
236 out,
237 self.payload_type,
238 marker,
239 self.sequence,
240 timestamp,
241 self.ssrc,
242 );
243 self.sequence = self.sequence.wrapping_add(1);
244 }
245
246 pub fn packetize(&mut self, access_unit: &[u8], timestamp: u32) -> Vec<Vec<u8>> {
252 let nals: Vec<&[u8]> = crate::codec::nal::iter_nals(access_unit)
255 .filter(|n| !n.is_empty())
256 .collect();
257 let mut packets = Vec::new();
258 for (i, nal) in nals.iter().enumerate() {
259 let last_nal = i + 1 == nals.len();
260 if nal.len() <= self.max_payload {
261 let mut pkt = Vec::with_capacity(12 + nal.len());
263 self.header(last_nal, timestamp, &mut pkt);
264 pkt.extend_from_slice(nal);
265 packets.push(pkt);
266 } else {
267 match self.codec {
268 NalCodec::H264 => self.fragment_fua(nal, timestamp, last_nal, &mut packets),
269 NalCodec::H265 => self.fragment_fu_h265(nal, timestamp, last_nal, &mut packets),
270 }
271 }
272 }
273 packets
274 }
275
276 fn fragment_fua(&mut self, nal: &[u8], timestamp: u32, last_nal: bool, out: &mut Vec<Vec<u8>>) {
278 let nal_header = nal[0];
279 let fu_indicator = (nal_header & 0xE0) | 28; let nal_type = nal_header & 0x1F;
281 let body = &nal[1..];
282 let chunk = self.max_payload.saturating_sub(2).max(1);
284 let n_chunks = body.len().div_ceil(chunk);
285 for (idx, part) in body.chunks(chunk).enumerate() {
286 let start = idx == 0;
287 let end = idx + 1 == n_chunks;
288 let mut fu_header = nal_type;
289 if start {
290 fu_header |= 0x80;
291 }
292 if end {
293 fu_header |= 0x40;
294 }
295 let mut pkt = Vec::with_capacity(12 + 2 + part.len());
296 self.header(last_nal && end, timestamp, &mut pkt);
298 pkt.push(fu_indicator);
299 pkt.push(fu_header);
300 pkt.extend_from_slice(part);
301 out.push(pkt);
302 }
303 }
304
305 fn fragment_fu_h265(
311 &mut self,
312 nal: &[u8],
313 timestamp: u32,
314 last_nal: bool,
315 out: &mut Vec<Vec<u8>>,
316 ) {
317 if nal.len() < 2 {
320 let mut pkt = Vec::with_capacity(12 + nal.len());
321 self.header(last_nal, timestamp, &mut pkt);
322 pkt.extend_from_slice(nal);
323 out.push(pkt);
324 return;
325 }
326 let nal_type = (nal[0] >> 1) & 0x3F;
327 let payload_hdr0 = (nal[0] & 0x81) | (49 << 1);
330 let payload_hdr1 = nal[1];
331 let body = &nal[2..];
332 let chunk = self.max_payload.saturating_sub(3).max(1);
334 let n_chunks = body.len().div_ceil(chunk);
335 for (idx, part) in body.chunks(chunk).enumerate() {
336 let start = idx == 0;
337 let end = idx + 1 == n_chunks;
338 let mut fu_header = nal_type;
339 if start {
340 fu_header |= 0x80;
341 }
342 if end {
343 fu_header |= 0x40;
344 }
345 let mut pkt = Vec::with_capacity(12 + 3 + part.len());
346 self.header(last_nal && end, timestamp, &mut pkt);
347 pkt.push(payload_hdr0);
348 pkt.push(payload_hdr1);
349 pkt.push(fu_header);
350 pkt.extend_from_slice(part);
351 out.push(pkt);
352 }
353 }
354}
355
356#[derive(Debug, Default)]
364pub struct H264Depacketizer {
365 au: Vec<u8>,
367 fua: Vec<u8>,
369 in_fragment: bool,
371 fua_header: u8,
373 current_ts: Option<u32>,
375 last_seq: Option<u16>,
377}
378
379impl H264Depacketizer {
380 pub fn new() -> Self {
382 Self::default()
383 }
384
385 fn append_nal(&mut self, nal: &[u8]) {
387 self.au.extend_from_slice(&ANNEXB_START);
388 self.au.extend_from_slice(nal);
389 }
390
391 fn pending_is_keyframe(&self) -> bool {
393 let mut i = 0;
395 while i + 4 < self.au.len() {
396 if self.au[i..i + 4] == ANNEXB_START {
397 let nal_type = self.au[i + 4] & 0x1F;
398 if nal_type == 5 {
399 return true;
400 }
401 }
402 i += 1;
403 }
404 false
405 }
406
407 fn take_au(&mut self) -> Option<AccessUnit> {
409 if self.au.is_empty() {
410 return None;
411 }
412 let keyframe = self.pending_is_keyframe();
413 let timestamp = self.current_ts.unwrap_or(0);
414 let data = Bytes::from(std::mem::take(&mut self.au));
415 self.current_ts = None;
416 Some(AccessUnit {
417 data,
418 timestamp,
419 keyframe,
420 })
421 }
422
423 pub fn push(
426 &mut self,
427 payload: &[u8],
428 marker: bool,
429 timestamp: u32,
430 sequence: u16,
431 ) -> Result<Option<AccessUnit>, DepacketizeError> {
432 if payload.is_empty() {
433 return Err(DepacketizeError::Truncated);
434 }
435
436 let mut completed = None;
439 if let Some(ts) = self.current_ts {
440 if ts != timestamp && !self.in_fragment {
441 completed = self.take_au();
442 }
443 }
444 self.current_ts = Some(timestamp);
445
446 let nal_type = payload[0] & 0x1F;
447 match nal_type {
448 1..=23 => {
449 self.append_nal(payload);
451 }
452 24 => {
453 let mut i = 1;
455 while i + 2 <= payload.len() {
456 let size = u16::from_be_bytes([payload[i], payload[i + 1]]) as usize;
457 i += 2;
458 if i + size > payload.len() {
459 return Err(DepacketizeError::Truncated);
460 }
461 self.append_nal(&payload[i..i + size]);
462 i += size;
463 }
464 }
465 28 => {
466 if payload.len() < 2 {
468 return Err(DepacketizeError::Truncated);
469 }
470 let fu_header = payload[1];
471 let start = fu_header & 0x80 != 0;
472 let end = fu_header & 0x40 != 0;
473 let frag_type = fu_header & 0x1F;
474
475 if start {
476 self.fua_header = (payload[0] & 0xE0) | frag_type;
479 self.fua.clear();
480 self.fua.push(self.fua_header);
481 self.in_fragment = true;
482 } else if !self.in_fragment {
483 return Err(DepacketizeError::OutOfOrder);
485 } else if self.seq_gap(sequence) {
486 self.in_fragment = false;
487 self.fua.clear();
488 return Err(DepacketizeError::OutOfOrder);
489 }
490 self.fua.extend_from_slice(&payload[2..]);
491
492 if end && self.in_fragment {
493 let nal = std::mem::take(&mut self.fua);
494 self.append_nal(&nal);
495 self.in_fragment = false;
496 }
497 }
498 other => return Err(DepacketizeError::Unsupported(other)),
499 }
500
501 self.last_seq = Some(sequence);
502
503 if completed.is_some() {
504 return Ok(completed);
505 }
506 if marker {
507 return Ok(self.take_au());
508 }
509 Ok(None)
510 }
511
512 fn seq_gap(&self, sequence: u16) -> bool {
514 match self.last_seq {
515 Some(prev) => sequence.wrapping_sub(prev) != 1,
516 None => false,
517 }
518 }
519}
520
521#[derive(Debug, Default)]
532pub struct H265Depacketizer {
533 au: Vec<u8>,
535 fu: Vec<u8>,
537 in_fragment: bool,
539 current_ts: Option<u32>,
541 last_seq: Option<u16>,
543}
544
545impl H265Depacketizer {
546 pub fn new() -> Self {
548 Self::default()
549 }
550
551 fn append_nal(&mut self, nal: &[u8]) {
553 self.au.extend_from_slice(&ANNEXB_START);
554 self.au.extend_from_slice(nal);
555 }
556
557 fn pending_is_keyframe(&self) -> bool {
560 let mut i = 0;
561 while i + 4 < self.au.len() {
562 if self.au[i..i + 4] == ANNEXB_START {
563 let nal_type = (self.au[i + 4] >> 1) & 0x3F;
564 if (16..=23).contains(&nal_type) {
565 return true;
566 }
567 }
568 i += 1;
569 }
570 false
571 }
572
573 fn take_au(&mut self) -> Option<AccessUnit> {
575 if self.au.is_empty() {
576 return None;
577 }
578 let keyframe = self.pending_is_keyframe();
579 let timestamp = self.current_ts.unwrap_or(0);
580 let data = Bytes::from(std::mem::take(&mut self.au));
581 self.current_ts = None;
582 Some(AccessUnit {
583 data,
584 timestamp,
585 keyframe,
586 })
587 }
588
589 fn seq_gap(&self, sequence: u16) -> bool {
591 match self.last_seq {
592 Some(prev) => sequence.wrapping_sub(prev) != 1,
593 None => false,
594 }
595 }
596
597 pub fn push(
600 &mut self,
601 payload: &[u8],
602 marker: bool,
603 timestamp: u32,
604 sequence: u16,
605 ) -> Result<Option<AccessUnit>, DepacketizeError> {
606 if payload.len() < 2 {
608 return Err(DepacketizeError::Truncated);
609 }
610
611 let mut completed = None;
614 if let Some(ts) = self.current_ts {
615 if ts != timestamp && !self.in_fragment {
616 completed = self.take_au();
617 }
618 }
619 self.current_ts = Some(timestamp);
620
621 let nal_type = (payload[0] >> 1) & 0x3F;
622 match nal_type {
623 0..=47 => self.append_nal(payload),
625 48 => {
626 let mut i = 2;
628 while i + 2 <= payload.len() {
629 let size = u16::from_be_bytes([payload[i], payload[i + 1]]) as usize;
630 i += 2;
631 if i + size > payload.len() {
632 return Err(DepacketizeError::Truncated);
633 }
634 self.append_nal(&payload[i..i + size]);
635 i += size;
636 }
637 }
638 49 => {
639 if payload.len() < 3 {
641 return Err(DepacketizeError::Truncated);
642 }
643 let fu_header = payload[2];
644 let start = fu_header & 0x80 != 0;
645 let end = fu_header & 0x40 != 0;
646 let fu_type = fu_header & 0x3F;
647
648 if start {
649 let hdr0 = (payload[0] & 0x81) | (fu_type << 1);
652 let hdr1 = payload[1];
653 self.fu.clear();
654 self.fu.push(hdr0);
655 self.fu.push(hdr1);
656 self.in_fragment = true;
657 } else if !self.in_fragment {
658 return Err(DepacketizeError::OutOfOrder);
659 } else if self.seq_gap(sequence) {
660 self.in_fragment = false;
661 self.fu.clear();
662 return Err(DepacketizeError::OutOfOrder);
663 }
664 self.fu.extend_from_slice(&payload[3..]);
665
666 if end && self.in_fragment {
667 let nal = std::mem::take(&mut self.fu);
668 self.append_nal(&nal);
669 self.in_fragment = false;
670 }
671 }
672 other => return Err(DepacketizeError::Unsupported(other)),
673 }
674
675 self.last_seq = Some(sequence);
676
677 if completed.is_some() {
678 return Ok(completed);
679 }
680 if marker {
681 return Ok(self.take_au());
682 }
683 Ok(None)
684 }
685}
686
687#[derive(Debug, Clone, PartialEq, Eq)]
693pub struct AccessUnit {
694 pub data: Bytes,
696 pub timestamp: u32,
698 pub keyframe: bool,
700}
701
702fn write_rtp_header(out: &mut Vec<u8>, pt: u8, marker: bool, seq: u16, ts: u32, ssrc: u32) {
705 out.push(0x80); out.push(if marker { 0x80 } else { 0 } | (pt & 0x7F));
707 out.extend_from_slice(&seq.to_be_bytes());
708 out.extend_from_slice(&ts.to_be_bytes());
709 out.extend_from_slice(&ssrc.to_be_bytes());
710}
711
712#[derive(Debug, Clone)]
723pub struct Vp9Packetizer {
724 payload_type: u8,
725 ssrc: u32,
726 sequence: u16,
727 max_payload: usize,
728 picture_id: u16,
729}
730
731impl Vp9Packetizer {
732 pub fn new(payload_type: u8, ssrc: u32, mtu: usize) -> Self {
735 Self {
736 payload_type,
737 ssrc,
738 sequence: 0,
739 max_payload: mtu.saturating_sub(12 + 3).max(1),
741 picture_id: 0,
742 }
743 }
744
745 pub fn packetize(&mut self, frame: &[u8], timestamp: u32, keyframe: bool) -> Vec<Vec<u8>> {
748 let pid = self.picture_id & 0x7FFF;
749 self.picture_id = self.picture_id.wrapping_add(1);
750
751 let mut packets = Vec::new();
752 let chunks: Vec<&[u8]> = if frame.is_empty() {
753 vec![&[]]
754 } else {
755 frame.chunks(self.max_payload).collect()
756 };
757 let n = chunks.len();
758 for (i, chunk) in chunks.into_iter().enumerate() {
759 let begin = i == 0;
760 let end = i + 1 == n;
761 let mut pkt = Vec::with_capacity(12 + 3 + chunk.len());
762 write_rtp_header(
763 &mut pkt,
764 self.payload_type,
765 end,
766 self.sequence,
767 timestamp,
768 self.ssrc,
769 );
770 self.sequence = self.sequence.wrapping_add(1);
771
772 let mut desc0 = 0x80; if !keyframe {
775 desc0 |= 0x40; }
777 if begin {
778 desc0 |= 0x08; }
780 if end {
781 desc0 |= 0x04; }
783 pkt.push(desc0);
784 pkt.push(0x80 | (pid >> 8) as u8);
786 pkt.push((pid & 0xFF) as u8);
787 pkt.extend_from_slice(chunk);
788 packets.push(pkt);
789 }
790 packets
791 }
792}
793
794#[derive(Debug, Default)]
797pub struct Vp9Depacketizer {
798 frame: Vec<u8>,
799 in_frame: bool,
800 keyframe: bool,
801 current_ts: Option<u32>,
802}
803
804impl Vp9Depacketizer {
805 pub fn new() -> Self {
807 Self::default()
808 }
809
810 pub fn push(
813 &mut self,
814 payload: &[u8],
815 marker: bool,
816 timestamp: u32,
817 ) -> Result<Option<AccessUnit>, DepacketizeError> {
818 if payload.is_empty() {
819 return Err(DepacketizeError::Truncated);
820 }
821 let desc0 = payload[0];
822 let has_pid = desc0 & 0x80 != 0;
823 let has_layer = desc0 & 0x20 != 0;
824 let flexible = desc0 & 0x10 != 0;
825 let begin = desc0 & 0x08 != 0;
826 let end = desc0 & 0x04 != 0;
827 let predicted = desc0 & 0x40 != 0;
828
829 let mut off = 1;
831 if has_pid {
832 let m = payload.get(off).ok_or(DepacketizeError::Truncated)? & 0x80 != 0;
834 off += if m { 2 } else { 1 };
835 }
836 if has_layer {
837 off += 1; if !flexible {
839 off += 1; }
841 }
842 if off > payload.len() {
843 return Err(DepacketizeError::Truncated);
844 }
845
846 if begin {
847 self.frame.clear();
848 self.in_frame = true;
849 self.keyframe = !predicted;
850 self.current_ts = Some(timestamp);
851 } else if !self.in_frame {
852 return Err(DepacketizeError::OutOfOrder);
853 }
854 self.frame.extend_from_slice(&payload[off..]);
855
856 if end && marker && self.in_frame {
857 self.in_frame = false;
858 return Ok(Some(AccessUnit {
859 data: Bytes::from(std::mem::take(&mut self.frame)),
860 timestamp: self.current_ts.unwrap_or(timestamp),
861 keyframe: self.keyframe,
862 }));
863 }
864 Ok(None)
865 }
866}
867
868#[cfg(feature = "codec-av1")]
872fn leb128_encode(mut v: u64, out: &mut Vec<u8>) {
873 loop {
874 let mut byte = (v & 0x7F) as u8;
875 v >>= 7;
876 if v != 0 {
877 byte |= 0x80;
878 }
879 out.push(byte);
880 if v == 0 {
881 break;
882 }
883 }
884}
885
886const AV1_OBU_SEQUENCE_HEADER: u8 = 1;
887const AV1_OBU_TEMPORAL_DELIMITER: u8 = 2;
888
889#[cfg(feature = "codec-av1")]
899#[derive(Debug, Clone)]
900pub struct Av1Packetizer {
901 payload_type: u8,
902 ssrc: u32,
903 sequence: u16,
904 max_payload: usize,
905}
906
907#[cfg(feature = "codec-av1")]
908impl Av1Packetizer {
909 pub fn new(payload_type: u8, ssrc: u32, mtu: usize) -> Self {
912 Self {
913 payload_type,
914 ssrc,
915 sequence: 0,
916 max_payload: mtu.saturating_sub(12 + 1).max(1),
917 }
918 }
919
920 pub fn packetize(&mut self, temporal_unit: &[u8], timestamp: u32) -> Vec<Vec<u8>> {
922 let mut stream = Vec::with_capacity(temporal_unit.len());
925 let mut new_cvs = false;
926 for obu in crate::codec::obu::iter_obus(temporal_unit) {
927 if obu.obu_type == AV1_OBU_TEMPORAL_DELIMITER {
928 continue;
929 }
930 if obu.obu_type == AV1_OBU_SEQUENCE_HEADER {
931 new_cvs = true;
932 }
933 let header_len = 1 + obu.has_extension as usize;
934 let mut element = Vec::with_capacity(header_len + obu.payload.len());
935 element.push(obu.raw[0] & !0x02); if obu.has_extension {
937 element.push(obu.raw[1]);
938 }
939 element.extend_from_slice(obu.payload);
940 leb128_encode(element.len() as u64, &mut stream);
941 stream.extend_from_slice(&element);
942 }
943
944 let mut packets = Vec::new();
945 let chunks: Vec<&[u8]> = if stream.is_empty() {
946 vec![&[]]
947 } else {
948 stream.chunks(self.max_payload).collect()
949 };
950 let n = chunks.len();
951 for (i, chunk) in chunks.into_iter().enumerate() {
952 let last = i + 1 == n;
953 let mut pkt = Vec::with_capacity(12 + 1 + chunk.len());
954 write_rtp_header(
955 &mut pkt,
956 self.payload_type,
957 last,
958 self.sequence,
959 timestamp,
960 self.ssrc,
961 );
962 self.sequence = self.sequence.wrapping_add(1);
963
964 let mut agg = 0u8;
967 if i > 0 {
968 agg |= 0x80; }
970 if !last {
971 agg |= 0x40; }
973 if i == 0 && new_cvs {
974 agg |= 0x08; }
976 pkt.push(agg);
977 pkt.extend_from_slice(chunk);
978 packets.push(pkt);
979 }
980 packets
981 }
982}
983
984#[cfg(feature = "codec-av1")]
989#[derive(Debug, Default)]
990pub struct Av1Depacketizer {
991 stream: Vec<u8>,
992 new_cvs: bool,
993 current_ts: Option<u32>,
994}
995
996#[cfg(feature = "codec-av1")]
997impl Av1Depacketizer {
998 pub fn new() -> Self {
1000 Self::default()
1001 }
1002
1003 pub fn push(
1006 &mut self,
1007 payload: &[u8],
1008 marker: bool,
1009 timestamp: u32,
1010 ) -> Result<Option<AccessUnit>, DepacketizeError> {
1011 if payload.is_empty() {
1012 return Err(DepacketizeError::Truncated);
1013 }
1014 let agg = payload[0];
1015 if agg & 0x08 != 0 {
1016 self.new_cvs = true; }
1018 if self.current_ts.is_none() {
1019 self.current_ts = Some(timestamp);
1020 }
1021 self.stream.extend_from_slice(&payload[1..]);
1022
1023 if !marker {
1024 return Ok(None);
1025 }
1026
1027 let stream = std::mem::take(&mut self.stream);
1029 let mut tu = Vec::with_capacity(stream.len() + 8);
1030 let mut pos = 0;
1031 while pos < stream.len() {
1032 let len = leb128_decode(&stream, &mut pos).ok_or(DepacketizeError::Truncated)?;
1033 let end = pos.checked_add(len).ok_or(DepacketizeError::Truncated)?;
1034 let element = stream.get(pos..end).ok_or(DepacketizeError::Truncated)?;
1035 pos = end;
1036 let hdr0 = *element.first().ok_or(DepacketizeError::Truncated)?;
1038 let has_ext = (hdr0 >> 2) & 1 == 1;
1039 let header_len = 1 + has_ext as usize;
1040 let obu_payload = element
1041 .get(header_len..)
1042 .ok_or(DepacketizeError::Truncated)?;
1043 tu.push(hdr0 | 0x02);
1044 if has_ext {
1045 tu.push(element[1]);
1046 }
1047 leb128_encode(obu_payload.len() as u64, &mut tu);
1048 tu.extend_from_slice(obu_payload);
1049 }
1050
1051 let keyframe = std::mem::take(&mut self.new_cvs);
1052 let ts = self.current_ts.take().unwrap_or(timestamp);
1053 Ok(Some(AccessUnit {
1054 data: Bytes::from(tu),
1055 timestamp: ts,
1056 keyframe,
1057 }))
1058 }
1059}
1060
1061#[cfg(feature = "codec-av1")]
1064fn leb128_decode(data: &[u8], pos: &mut usize) -> Option<usize> {
1065 let mut value: u64 = 0;
1066 for i in 0..8 {
1067 let byte = *data.get(*pos)?;
1068 *pos += 1;
1069 value |= ((byte & 0x7F) as u64) << (i * 7);
1070 if byte & 0x80 == 0 {
1071 return usize::try_from(value).ok();
1072 }
1073 }
1074 None
1075}
1076
1077#[cfg(test)]
1078mod tests {
1079 use super::*;
1080
1081 fn rtp(seq: u16, ts: u32, marker: bool, payload: &[u8]) -> Vec<u8> {
1083 let mut p = vec![0x80, if marker { 0x80 | 96 } else { 96 }];
1084 p.extend_from_slice(&seq.to_be_bytes());
1085 p.extend_from_slice(&ts.to_be_bytes());
1086 p.extend_from_slice(&[0, 0, 0, 1]); p.extend_from_slice(payload);
1088 p
1089 }
1090
1091 #[test]
1092 fn parses_fixed_header_and_payload_offset() {
1093 let pkt = rtp(7, 9000, true, &[0x65, 0xAA]);
1094 let h = RtpHeader::parse(&pkt).unwrap();
1095 assert_eq!(h.sequence, 7);
1096 assert_eq!(h.timestamp, 9000);
1097 assert!(h.marker);
1098 assert_eq!(h.payload_type, 96);
1099 assert_eq!(h.payload_offset, 12);
1100 assert_eq!(&pkt[h.payload_offset..], &[0x65, 0xAA]);
1101 }
1102
1103 #[test]
1104 fn rejects_wrong_version_and_short_buffers() {
1105 assert!(RtpHeader::parse(&[0x00; 12]).is_none()); assert!(RtpHeader::parse(&[0x80; 4]).is_none()); }
1108
1109 #[test]
1110 fn honors_csrc_count_in_payload_offset() {
1111 let mut pkt = rtp(1, 0, false, &[0x41]);
1112 pkt[0] = 0x82; let mut with_csrc = pkt[..12].to_vec();
1114 with_csrc.extend_from_slice(&[0xDE, 0xAD, 0xBE, 0xEF, 0, 0, 0, 0]); with_csrc.push(0x41);
1116 let h = RtpHeader::parse(&with_csrc).unwrap();
1117 assert_eq!(h.payload_offset, 20);
1118 }
1119
1120 #[test]
1121 fn aac_hbr_splits_two_access_units() {
1122 let mut p = Vec::new();
1125 p.extend_from_slice(&32u16.to_be_bytes()); p.extend_from_slice(&((3u16) << 3).to_be_bytes()); p.extend_from_slice(&((2u16) << 3).to_be_bytes()); p.extend_from_slice(&[0xA1, 0xA2, 0xA3]); p.extend_from_slice(&[0xB1, 0xB2]); let aus = AacDepacketizer::new().push(&p).unwrap();
1131 assert_eq!(aus.len(), 2);
1132 assert_eq!(&aus[0][..], &[0xA1, 0xA2, 0xA3]);
1133 assert_eq!(&aus[1][..], &[0xB1, 0xB2]);
1134 }
1135
1136 #[test]
1137 fn aac_hbr_single_au() {
1138 let mut p = Vec::new();
1139 p.extend_from_slice(&16u16.to_be_bytes()); p.extend_from_slice(&((4u16) << 3).to_be_bytes()); p.extend_from_slice(&[1, 2, 3, 4]);
1142 let aus = AacDepacketizer::new().push(&p).unwrap();
1143 assert_eq!(aus.len(), 1);
1144 assert_eq!(&aus[0][..], &[1, 2, 3, 4]);
1145 }
1146
1147 #[test]
1148 fn aac_truncated_payload_errors() {
1149 assert_eq!(
1150 AacDepacketizer::new().push(&[0x00]),
1151 Err(DepacketizeError::Truncated)
1152 );
1153 let mut p = 16u16.to_be_bytes().to_vec();
1155 p.extend_from_slice(&((8u16) << 3).to_be_bytes());
1156 p.extend_from_slice(&[1, 2]);
1157 assert_eq!(
1158 AacDepacketizer::new().push(&p),
1159 Err(DepacketizeError::Truncated)
1160 );
1161 }
1162
1163 #[test]
1164 fn single_nal_packet_emits_annexb_on_marker() {
1165 let mut d = H264Depacketizer::new();
1166 let out = d.push(&[0x41, 0x9A, 0xBC], true, 3000, 1).unwrap().unwrap();
1168 assert_eq!(&out.data[..], &[0, 0, 0, 1, 0x41, 0x9A, 0xBC]);
1169 assert!(!out.keyframe);
1170 assert_eq!(out.timestamp, 3000);
1171 }
1172
1173 #[test]
1174 fn idr_single_nal_is_flagged_keyframe() {
1175 let mut d = H264Depacketizer::new();
1176 let out = d.push(&[0x65, 0x01], true, 0, 1).unwrap().unwrap();
1177 assert!(out.keyframe);
1178 }
1179
1180 #[test]
1181 fn packetizer_single_nal_round_trips_through_depacketizer() {
1182 let au = [0, 0, 0, 1, 0x67, 0x42, 0x00, 0, 0, 0, 1, 0x65, 0x88, 0x99];
1184 let mut pkt = RtpPacketizer::new(96, 0xABCD, 1200);
1185 let packets = pkt.packetize(&au, 3000);
1186 assert_eq!(packets.len(), 2, "one packet per NAL");
1187
1188 let mut depack = H264Depacketizer::new();
1189 let mut out = None;
1190 for p in &packets {
1191 let h = RtpHeader::parse(p).unwrap();
1192 if let Some(au) = depack
1193 .push(&p[h.payload_offset..], h.marker, h.timestamp, h.sequence)
1194 .unwrap()
1195 {
1196 out = Some(au);
1197 }
1198 }
1199 let out = out.expect("AU completed on the marker packet");
1200 assert_eq!(&out.data[..], &au);
1201 assert!(out.keyframe);
1202 assert_eq!(out.timestamp, 3000);
1203 }
1204
1205 #[test]
1206 fn packetizer_fragments_oversized_nal_and_round_trips() {
1207 let mut nal = vec![0, 0, 0, 1, 0x65]; nal.extend((0..600u16).map(|i| i as u8)); let mut pkt = RtpPacketizer::new(96, 1, 100); let packets = pkt.packetize(&nal, 90);
1212 assert!(packets.len() > 1, "oversized NAL is fragmented");
1213 let markers: Vec<bool> = packets
1215 .iter()
1216 .map(|p| RtpHeader::parse(p).unwrap().marker)
1217 .collect();
1218 assert_eq!(markers.iter().filter(|m| **m).count(), 1);
1219 assert!(markers.last().unwrap());
1220
1221 let mut depack = H264Depacketizer::new();
1222 let mut out = None;
1223 for p in &packets {
1224 let h = RtpHeader::parse(p).unwrap();
1225 if let Some(au) = depack
1226 .push(&p[h.payload_offset..], h.marker, h.timestamp, h.sequence)
1227 .unwrap()
1228 {
1229 out = Some(au);
1230 }
1231 }
1232 assert_eq!(&out.unwrap().data[..], &nal[..]);
1233 }
1234
1235 #[test]
1236 fn stap_a_splits_aggregated_nals() {
1237 let payload = [24, 0, 2, 0xAA, 0xBB, 0, 3, 0xCC, 0xDD, 0xEE];
1239 let mut d = H264Depacketizer::new();
1240 let out = d.push(&payload, true, 0, 1).unwrap().unwrap();
1241 assert_eq!(
1242 &out.data[..],
1243 &[0, 0, 0, 1, 0xAA, 0xBB, 0, 0, 0, 1, 0xCC, 0xDD, 0xEE]
1244 );
1245 }
1246
1247 #[test]
1248 fn fu_a_reassembles_fragmented_nal() {
1249 let mut d = H264Depacketizer::new();
1250 assert!(d
1252 .push(&[0x7C, 0x85, 0x11, 0x22], false, 0, 1)
1253 .unwrap()
1254 .is_none());
1255 assert!(d.push(&[0x7C, 0x05, 0x33], false, 0, 2).unwrap().is_none());
1257 let out = d.push(&[0x7C, 0x45, 0x44], true, 0, 3).unwrap().unwrap();
1259 assert_eq!(&out.data[..], &[0, 0, 0, 1, 0x65, 0x11, 0x22, 0x33, 0x44]);
1261 assert!(out.keyframe);
1262 }
1263
1264 #[test]
1265 fn fu_a_sequence_gap_reports_out_of_order() {
1266 let mut d = H264Depacketizer::new();
1267 d.push(&[0x7C, 0x85, 0x11], false, 0, 1).unwrap();
1268 assert_eq!(
1270 d.push(&[0x7C, 0x05, 0x22], false, 0, 5),
1271 Err(DepacketizeError::OutOfOrder)
1272 );
1273 }
1274
1275 #[test]
1276 fn timestamp_change_flushes_previous_au_without_marker() {
1277 let mut d = H264Depacketizer::new();
1278 assert!(d.push(&[0x41, 0x01], false, 1000, 1).unwrap().is_none());
1280 let out = d.push(&[0x41, 0x02], false, 2000, 2).unwrap().unwrap();
1282 assert_eq!(out.timestamp, 1000);
1283 assert_eq!(&out.data[..], &[0, 0, 0, 1, 0x41, 0x01]);
1284 }
1285
1286 #[test]
1291 fn h265_single_nal_round_trips_through_depacketizer() {
1292 let au = [
1294 0, 0, 0, 1, 0x40, 0x01, 0xAA, 0, 0, 0, 1, 0x26, 0x01, 0x88, 0x99, ];
1297 let mut pkt = RtpPacketizer::new_h265(96, 0xABCD, 1200);
1298 let packets = pkt.packetize(&au, 3000);
1299 assert_eq!(packets.len(), 2, "one packet per NAL");
1300
1301 let mut depack = H265Depacketizer::new();
1302 let mut out = None;
1303 for p in &packets {
1304 let h = RtpHeader::parse(p).unwrap();
1305 if let Some(au) = depack
1306 .push(&p[h.payload_offset..], h.marker, h.timestamp, h.sequence)
1307 .unwrap()
1308 {
1309 out = Some(au);
1310 }
1311 }
1312 let out = out.expect("AU completed on the marker packet");
1313 assert_eq!(&out.data[..], &au);
1314 assert!(out.keyframe, "IRAP type 19 is a keyframe");
1315 assert_eq!(out.timestamp, 3000);
1316 }
1317
1318 #[test]
1319 fn h265_fragments_oversized_nal_and_round_trips() {
1320 let mut nal = vec![0, 0, 0, 1, 0x26, 0x01]; nal.extend((0..600u16).map(|i| i as u8));
1323 let mut pkt = RtpPacketizer::new_h265(96, 1, 100); let packets = pkt.packetize(&nal, 90);
1325 assert!(packets.len() > 1, "oversized NAL is fragmented");
1326 let markers: Vec<bool> = packets
1328 .iter()
1329 .map(|p| RtpHeader::parse(p).unwrap().marker)
1330 .collect();
1331 assert_eq!(markers.iter().filter(|m| **m).count(), 1);
1332 assert!(markers.last().unwrap());
1333 for p in &packets {
1335 let h = RtpHeader::parse(p).unwrap();
1336 let pt = (p[h.payload_offset] >> 1) & 0x3F;
1337 assert_eq!(pt, 49, "FU payload type");
1338 }
1339
1340 let mut depack = H265Depacketizer::new();
1341 let mut out = None;
1342 for p in &packets {
1343 let h = RtpHeader::parse(p).unwrap();
1344 if let Some(au) = depack
1345 .push(&p[h.payload_offset..], h.marker, h.timestamp, h.sequence)
1346 .unwrap()
1347 {
1348 out = Some(au);
1349 }
1350 }
1351 assert_eq!(&out.unwrap().data[..], &nal[..]);
1352 }
1353
1354 #[test]
1355 fn h265_ap_splits_aggregated_nals() {
1356 let payload = [0x60, 0x01, 0, 2, 0xAA, 0xBB, 0, 3, 0xCC, 0xDD, 0xEE];
1358 let mut d = H265Depacketizer::new();
1359 let out = d.push(&payload, true, 0, 1).unwrap().unwrap();
1360 assert_eq!(
1361 &out.data[..],
1362 &[0, 0, 0, 1, 0xAA, 0xBB, 0, 0, 0, 1, 0xCC, 0xDD, 0xEE]
1363 );
1364 }
1365
1366 #[test]
1367 fn h265_rejects_truncated_and_unsupported() {
1368 let mut d = H265Depacketizer::new();
1369 assert_eq!(
1371 d.push(&[0x26], true, 0, 1),
1372 Err(DepacketizeError::Truncated)
1373 );
1374 assert_eq!(
1376 d.push(&[50 << 1, 0x01, 0x00], true, 0, 2),
1377 Err(DepacketizeError::Unsupported(50))
1378 );
1379 }
1380
1381 fn vp9_depacketize(packets: &[Vec<u8>]) -> Option<AccessUnit> {
1384 let mut d = Vp9Depacketizer::new();
1385 let mut out = None;
1386 for p in packets {
1387 let h = RtpHeader::parse(p).unwrap();
1388 if let Some(f) = d
1389 .push(&p[h.payload_offset..], h.marker, h.timestamp)
1390 .unwrap()
1391 {
1392 out = Some(f);
1393 }
1394 }
1395 out
1396 }
1397
1398 #[test]
1399 fn vp9_fragmented_frame_round_trips() {
1400 let frame: Vec<u8> = (0..500u16).map(|i| i as u8).collect();
1401 let mut pkt = Vp9Packetizer::new(98, 0x1234, 100); let packets = pkt.packetize(&frame, 9000, true);
1403 assert!(packets.len() > 1, "frame fragmented");
1404
1405 let markers: Vec<bool> = packets
1407 .iter()
1408 .map(|p| RtpHeader::parse(p).unwrap().marker)
1409 .collect();
1410 assert_eq!(markers.iter().filter(|m| **m).count(), 1);
1411 assert!(markers.last().unwrap());
1412
1413 let out = vp9_depacketize(&packets).expect("frame completed");
1414 assert_eq!(&out.data[..], &frame[..]);
1415 assert!(out.keyframe, "keyframe → P bit clear");
1416 assert_eq!(out.timestamp, 9000);
1417 }
1418
1419 #[test]
1420 fn vp9_inter_frame_is_not_a_keyframe() {
1421 let mut pkt = Vp9Packetizer::new(98, 1, 1200);
1422 let packets = pkt.packetize(&[1, 2, 3], 0, false);
1423 assert_eq!(packets.len(), 1);
1424 let out = vp9_depacketize(&packets).expect("frame");
1425 assert_eq!(&out.data[..], &[1, 2, 3]);
1426 assert!(!out.keyframe, "P bit set → inter frame");
1427 }
1428
1429 #[cfg(feature = "codec-av1")]
1432 fn av1_depacketize(packets: &[Vec<u8>]) -> Option<AccessUnit> {
1433 let mut d = Av1Depacketizer::new();
1434 let mut out = None;
1435 for p in packets {
1436 let h = RtpHeader::parse(p).unwrap();
1437 if let Some(f) = d
1438 .push(&p[h.payload_offset..], h.marker, h.timestamp)
1439 .unwrap()
1440 {
1441 out = Some(f);
1442 }
1443 }
1444 out
1445 }
1446
1447 #[cfg(feature = "codec-av1")]
1448 #[test]
1449 fn av1_temporal_unit_round_trips_without_delimiter() {
1450 let td = [0x12u8, 0x00];
1452 let seq = [0x0Au8, 0x02, 0xAA, 0xBB];
1453 let frame = [0x32u8, 0x03, 0x11, 0x22, 0x33];
1454 let mut tu = Vec::new();
1455 tu.extend_from_slice(&td);
1456 tu.extend_from_slice(&seq);
1457 tu.extend_from_slice(&frame);
1458
1459 let mut pkt = Av1Packetizer::new(99, 7, 1200);
1460 let packets = pkt.packetize(&tu, 1000);
1461 let out = av1_depacketize(&packets).expect("TU completed");
1462
1463 let mut expected = Vec::new();
1465 expected.extend_from_slice(&seq);
1466 expected.extend_from_slice(&frame);
1467 assert_eq!(&out.data[..], &expected[..]);
1468 assert!(out.keyframe, "sequence header → new coded video sequence");
1469 assert_eq!(out.timestamp, 1000);
1470 }
1471
1472 #[cfg(feature = "codec-av1")]
1473 #[test]
1474 fn av1_large_temporal_unit_fragments_and_round_trips() {
1475 let mut frame = vec![0x32u8, 0xAC, 0x02];
1477 frame.extend((0..300u16).map(|i| i as u8));
1478 let mut tu = vec![0x12u8, 0x00]; tu.extend_from_slice(&frame);
1480
1481 let mut pkt = Av1Packetizer::new(99, 1, 64); let packets = pkt.packetize(&tu, 0);
1483 assert!(packets.len() > 1, "large TU fragmented");
1484 for (i, p) in packets.iter().enumerate() {
1486 let agg = p[RtpHeader::parse(p).unwrap().payload_offset];
1487 assert_eq!((agg & 0x80 != 0), i > 0, "Z continuation bit");
1488 assert_eq!(
1489 (agg & 0x40 != 0),
1490 i + 1 < packets.len(),
1491 "Y continuation bit"
1492 );
1493 }
1494
1495 let out = av1_depacketize(&packets).expect("TU completed");
1496 assert_eq!(&out.data[..], &frame[..], "frame OBU reconstructed");
1497 }
1498}