1use bytes::Bytes;
34
/// Four-byte Annex B start code that the H.264/H.265 depacketizers write in
/// front of every NAL unit they append to a pending access unit.
const ANNEXB_START: [u8; 4] = [0, 0, 0, 1];
37
/// Fields extracted from the header of an RTP packet by [`RtpHeader::parse`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RtpHeader {
    /// Low seven bits of the second header byte.
    pub payload_type: u8,
    /// Top bit of the second header byte.
    pub marker: bool,
    /// Big-endian 16-bit sequence number.
    pub sequence: u16,
    /// Big-endian 32-bit timestamp.
    pub timestamp: u32,
    /// Big-endian 32-bit synchronization source identifier.
    pub ssrc: u32,
    /// Offset of the first payload byte in the parsed buffer, i.e. the reader
    /// position after the fixed header, the CSRC list and any header extension.
    pub payload_offset: usize,
}
54
55impl RtpHeader {
56 pub fn parse(buf: &[u8]) -> Option<RtpHeader> {
60 use super::byteops::ByteReader;
61 let mut r = ByteReader::new(buf);
62 let b0 = r.u8()?;
63 if b0 >> 6 != 2 {
64 return None; }
66 let has_extension = b0 & 0x10 != 0;
67 let csrc_count = (b0 & 0x0F) as usize;
68 let b1 = r.u8()?;
69 let marker = b1 & 0x80 != 0;
70 let payload_type = b1 & 0x7F;
71 let sequence = r.u16_be()?;
72 let timestamp = r.u32_be()?;
73 let ssrc = r.u32_be()?;
74 r.skip(csrc_count * 4)?; if has_extension {
77 r.skip(2)?;
79 let ext_words = r.u16_be()? as usize;
80 r.skip(ext_words * 4)?;
81 }
82 Some(RtpHeader {
83 payload_type,
84 marker,
85 sequence,
86 timestamp,
87 ssrc,
88 payload_offset: r.position(),
89 })
90 }
91}
92
/// Reasons a depacketizer in this module rejects an RTP payload.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum DepacketizeError {
    /// The payload is shorter than its own headers or declared sizes require.
    Truncated,
    /// A fragment arrived without its start fragment, or after a sequence gap.
    OutOfOrder,
    /// The payload uses a packet type or configuration that is not handled;
    /// the offending value is carried along.
    Unsupported(u8),
}
105
106#[derive(Debug, Clone, Copy, Default)]
115pub struct AacDepacketizer {
116 size_length: u8,
118 index_length: u8,
120}
121
122impl AacDepacketizer {
123 pub fn new() -> Self {
126 Self {
127 size_length: 13,
128 index_length: 3,
129 }
130 }
131
132 pub fn with_lengths(size_length: u8, index_length: u8) -> Self {
134 Self {
135 size_length,
136 index_length,
137 }
138 }
139
140 pub fn push(&self, payload: &[u8]) -> Result<Vec<Bytes>, DepacketizeError> {
142 if payload.len() < 2 {
143 return Err(DepacketizeError::Truncated);
144 }
145 if self.size_length == 0 || self.size_length > 16 {
148 return Err(DepacketizeError::Unsupported(self.size_length));
149 }
150 let header_bits = u16::from_be_bytes([payload[0], payload[1]]) as usize;
151 let au_header_bits = self.size_length as usize + self.index_length as usize;
152 if au_header_bits == 0 {
153 return Err(DepacketizeError::Unsupported(0));
154 }
155 let header_bytes = header_bits.div_ceil(8);
156 let au_count = header_bits / au_header_bits;
157 let headers = payload
158 .get(2..2 + header_bytes)
159 .ok_or(DepacketizeError::Truncated)?;
160 let mut data_off = 2 + header_bytes;
161 let mut out = Vec::with_capacity(au_count);
162 for i in 0..au_count {
163 let bit = i * au_header_bits;
166 let byte = bit / 8;
167 let hdr = headers
168 .get(byte..byte + 2)
169 .ok_or(DepacketizeError::Truncated)?;
170 let size = (u16::from_be_bytes([hdr[0], hdr[1]]) >> (16 - self.size_length)) as usize;
171 let end = data_off + size;
172 let au = payload
173 .get(data_off..end)
174 .ok_or(DepacketizeError::Truncated)?;
175 out.push(Bytes::copy_from_slice(au));
176 data_off = end;
177 }
178 Ok(out)
179 }
180}
181
/// Stateful packetizer that turns an access unit into RTP packets, one NAL
/// unit per packet, fragmenting NAL units that exceed `max_payload`.
#[derive(Debug, Clone)]
pub struct RtpPacketizer {
    /// Payload type written into every RTP header.
    payload_type: u8,
    /// SSRC written into every RTP header.
    ssrc: u32,
    /// Sequence number of the next packet; starts at 0 and wraps.
    sequence: u16,
    /// Largest payload per packet: the MTU minus 12 header bytes, at least 1.
    max_payload: usize,
    /// Selects which fragmentation scheme is used for oversized NAL units.
    codec: NalCodec,
}
199
/// NAL-based codec handled by [`RtpPacketizer`]; decides how oversized NAL
/// units are fragmented.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum NalCodec {
    /// One-byte NAL header; fragments use a two-byte FU indicator + FU header.
    H264,
    /// Two-byte NAL header; fragments use a two-byte payload header + FU header.
    H265,
}
208
209impl RtpPacketizer {
210 pub fn new(payload_type: u8, ssrc: u32, mtu: usize) -> Self {
214 Self::with_codec(payload_type, ssrc, mtu, NalCodec::H264)
215 }
216
217 pub fn new_h265(payload_type: u8, ssrc: u32, mtu: usize) -> Self {
220 Self::with_codec(payload_type, ssrc, mtu, NalCodec::H265)
221 }
222
223 fn with_codec(payload_type: u8, ssrc: u32, mtu: usize, codec: NalCodec) -> Self {
224 Self {
225 payload_type,
226 ssrc,
227 sequence: 0,
228 max_payload: mtu.saturating_sub(12).max(1),
229 codec,
230 }
231 }
232
233 fn emit(
237 &mut self,
238 recycle: &mut Vec<Vec<u8>>,
239 out: &mut Vec<Vec<u8>>,
240 marker: bool,
241 timestamp: u32,
242 fill: impl FnOnce(&mut Vec<u8>),
243 ) {
244 let mut buf = recycle.pop().unwrap_or_default();
245 buf.clear();
246 write_rtp_header(
247 &mut buf,
248 self.payload_type,
249 marker,
250 self.sequence,
251 timestamp,
252 self.ssrc,
253 );
254 self.sequence = self.sequence.wrapping_add(1);
255 fill(&mut buf);
256 out.push(buf);
257 }
258
259 pub fn packetize(&mut self, access_unit: &[u8], timestamp: u32) -> Vec<Vec<u8>> {
268 let mut out = Vec::new();
269 self.packetize_into(access_unit, timestamp, &mut out);
270 out
271 }
272
273 pub fn packetize_into(&mut self, access_unit: &[u8], timestamp: u32, out: &mut Vec<Vec<u8>>) {
281 let mut recycle = std::mem::take(out);
283 let nals: Vec<&[u8]> = crate::codec::nal::iter_nals(access_unit)
286 .filter(|n| !n.is_empty())
287 .collect();
288 for (i, nal) in nals.iter().enumerate() {
289 let last_nal = i + 1 == nals.len();
290 if nal.len() <= self.max_payload {
291 self.emit(&mut recycle, out, last_nal, timestamp, |b| {
293 b.extend_from_slice(nal)
294 });
295 } else {
296 match self.codec {
297 NalCodec::H264 => {
298 self.fragment_fua(nal, timestamp, last_nal, &mut recycle, out)
299 }
300 NalCodec::H265 => {
301 self.fragment_fu_h265(nal, timestamp, last_nal, &mut recycle, out)
302 }
303 }
304 }
305 }
306 }
307
308 fn fragment_fua(
310 &mut self,
311 nal: &[u8],
312 timestamp: u32,
313 last_nal: bool,
314 recycle: &mut Vec<Vec<u8>>,
315 out: &mut Vec<Vec<u8>>,
316 ) {
317 let nal_header = nal[0];
318 let fu_indicator = (nal_header & 0xE0) | 28; let nal_type = nal_header & 0x1F;
320 let body = &nal[1..];
321 let chunk = self.max_payload.saturating_sub(2).max(1);
323 let n_chunks = body.len().div_ceil(chunk);
324 for (idx, part) in body.chunks(chunk).enumerate() {
325 let start = idx == 0;
326 let end = idx + 1 == n_chunks;
327 let mut fu_header = nal_type;
328 if start {
329 fu_header |= 0x80;
330 }
331 if end {
332 fu_header |= 0x40;
333 }
334 self.emit(recycle, out, last_nal && end, timestamp, |pkt| {
336 pkt.push(fu_indicator);
337 pkt.push(fu_header);
338 pkt.extend_from_slice(part);
339 });
340 }
341 }
342
343 fn fragment_fu_h265(
349 &mut self,
350 nal: &[u8],
351 timestamp: u32,
352 last_nal: bool,
353 recycle: &mut Vec<Vec<u8>>,
354 out: &mut Vec<Vec<u8>>,
355 ) {
356 if nal.len() < 2 {
359 self.emit(recycle, out, last_nal, timestamp, |pkt| {
360 pkt.extend_from_slice(nal)
361 });
362 return;
363 }
364 let nal_type = (nal[0] >> 1) & 0x3F;
365 let payload_hdr0 = (nal[0] & 0x81) | (49 << 1);
368 let payload_hdr1 = nal[1];
369 let body = &nal[2..];
370 let chunk = self.max_payload.saturating_sub(3).max(1);
372 let n_chunks = body.len().div_ceil(chunk);
373 for (idx, part) in body.chunks(chunk).enumerate() {
374 let start = idx == 0;
375 let end = idx + 1 == n_chunks;
376 let mut fu_header = nal_type;
377 if start {
378 fu_header |= 0x80;
379 }
380 if end {
381 fu_header |= 0x40;
382 }
383 self.emit(recycle, out, last_nal && end, timestamp, |pkt| {
384 pkt.push(payload_hdr0);
385 pkt.push(payload_hdr1);
386 pkt.push(fu_header);
387 pkt.extend_from_slice(part);
388 });
389 }
390 }
391}
392
393#[derive(Debug, Default)]
401pub struct H264Depacketizer {
402 au: Vec<u8>,
404 fua: Vec<u8>,
406 in_fragment: bool,
408 fua_header: u8,
410 current_ts: Option<u32>,
412 last_seq: Option<u16>,
414}
415
416impl H264Depacketizer {
417 pub fn new() -> Self {
419 Self::default()
420 }
421
422 fn append_nal(&mut self, nal: &[u8]) {
424 self.au.extend_from_slice(&ANNEXB_START);
425 self.au.extend_from_slice(nal);
426 }
427
428 fn pending_is_keyframe(&self) -> bool {
430 let mut i = 0;
432 while i + 4 < self.au.len() {
433 if self.au[i..i + 4] == ANNEXB_START {
434 let nal_type = self.au[i + 4] & 0x1F;
435 if nal_type == 5 {
436 return true;
437 }
438 }
439 i += 1;
440 }
441 false
442 }
443
444 fn take_au(&mut self) -> Option<AccessUnit> {
446 if self.au.is_empty() {
447 return None;
448 }
449 let keyframe = self.pending_is_keyframe();
450 let timestamp = self.current_ts.unwrap_or(0);
451 let data = Bytes::from(std::mem::take(&mut self.au));
452 self.current_ts = None;
453 Some(AccessUnit {
454 data,
455 timestamp,
456 keyframe,
457 })
458 }
459
460 pub fn push(
463 &mut self,
464 payload: &[u8],
465 marker: bool,
466 timestamp: u32,
467 sequence: u16,
468 ) -> Result<Option<AccessUnit>, DepacketizeError> {
469 if payload.is_empty() {
470 return Err(DepacketizeError::Truncated);
471 }
472
473 let mut completed = None;
476 if let Some(ts) = self.current_ts {
477 if ts != timestamp && !self.in_fragment {
478 completed = self.take_au();
479 }
480 }
481 self.current_ts = Some(timestamp);
482
483 let nal_type = payload[0] & 0x1F;
484 match nal_type {
485 1..=23 => {
486 self.append_nal(payload);
488 }
489 24 => {
490 let mut i = 1;
492 while i + 2 <= payload.len() {
493 let size = u16::from_be_bytes([payload[i], payload[i + 1]]) as usize;
494 i += 2;
495 if i + size > payload.len() {
496 return Err(DepacketizeError::Truncated);
497 }
498 self.append_nal(&payload[i..i + size]);
499 i += size;
500 }
501 }
502 28 => {
503 if payload.len() < 2 {
505 return Err(DepacketizeError::Truncated);
506 }
507 let fu_header = payload[1];
508 let start = fu_header & 0x80 != 0;
509 let end = fu_header & 0x40 != 0;
510 let frag_type = fu_header & 0x1F;
511
512 if start {
513 self.fua_header = (payload[0] & 0xE0) | frag_type;
516 self.fua.clear();
517 self.fua.push(self.fua_header);
518 self.in_fragment = true;
519 } else if !self.in_fragment {
520 return Err(DepacketizeError::OutOfOrder);
522 } else if self.seq_gap(sequence) {
523 self.in_fragment = false;
524 self.fua.clear();
525 return Err(DepacketizeError::OutOfOrder);
526 }
527 self.fua.extend_from_slice(&payload[2..]);
528
529 if end && self.in_fragment {
530 let nal = std::mem::take(&mut self.fua);
531 self.append_nal(&nal);
532 self.in_fragment = false;
533 }
534 }
535 other => return Err(DepacketizeError::Unsupported(other)),
536 }
537
538 self.last_seq = Some(sequence);
539
540 if completed.is_some() {
541 return Ok(completed);
542 }
543 if marker {
544 return Ok(self.take_au());
545 }
546 Ok(None)
547 }
548
549 fn seq_gap(&self, sequence: u16) -> bool {
551 match self.last_seq {
552 Some(prev) => sequence.wrapping_sub(prev) != 1,
553 None => false,
554 }
555 }
556}
557
558#[derive(Debug, Default)]
569pub struct H265Depacketizer {
570 au: Vec<u8>,
572 fu: Vec<u8>,
574 in_fragment: bool,
576 current_ts: Option<u32>,
578 last_seq: Option<u16>,
580}
581
582impl H265Depacketizer {
583 pub fn new() -> Self {
585 Self::default()
586 }
587
588 fn append_nal(&mut self, nal: &[u8]) {
590 self.au.extend_from_slice(&ANNEXB_START);
591 self.au.extend_from_slice(nal);
592 }
593
594 fn pending_is_keyframe(&self) -> bool {
597 let mut i = 0;
598 while i + 4 < self.au.len() {
599 if self.au[i..i + 4] == ANNEXB_START {
600 let nal_type = (self.au[i + 4] >> 1) & 0x3F;
601 if (16..=23).contains(&nal_type) {
602 return true;
603 }
604 }
605 i += 1;
606 }
607 false
608 }
609
610 fn take_au(&mut self) -> Option<AccessUnit> {
612 if self.au.is_empty() {
613 return None;
614 }
615 let keyframe = self.pending_is_keyframe();
616 let timestamp = self.current_ts.unwrap_or(0);
617 let data = Bytes::from(std::mem::take(&mut self.au));
618 self.current_ts = None;
619 Some(AccessUnit {
620 data,
621 timestamp,
622 keyframe,
623 })
624 }
625
626 fn seq_gap(&self, sequence: u16) -> bool {
628 match self.last_seq {
629 Some(prev) => sequence.wrapping_sub(prev) != 1,
630 None => false,
631 }
632 }
633
634 pub fn push(
637 &mut self,
638 payload: &[u8],
639 marker: bool,
640 timestamp: u32,
641 sequence: u16,
642 ) -> Result<Option<AccessUnit>, DepacketizeError> {
643 if payload.len() < 2 {
645 return Err(DepacketizeError::Truncated);
646 }
647
648 let mut completed = None;
651 if let Some(ts) = self.current_ts {
652 if ts != timestamp && !self.in_fragment {
653 completed = self.take_au();
654 }
655 }
656 self.current_ts = Some(timestamp);
657
658 let nal_type = (payload[0] >> 1) & 0x3F;
659 match nal_type {
660 0..=47 => self.append_nal(payload),
662 48 => {
663 let mut i = 2;
665 while i + 2 <= payload.len() {
666 let size = u16::from_be_bytes([payload[i], payload[i + 1]]) as usize;
667 i += 2;
668 if i + size > payload.len() {
669 return Err(DepacketizeError::Truncated);
670 }
671 self.append_nal(&payload[i..i + size]);
672 i += size;
673 }
674 }
675 49 => {
676 if payload.len() < 3 {
678 return Err(DepacketizeError::Truncated);
679 }
680 let fu_header = payload[2];
681 let start = fu_header & 0x80 != 0;
682 let end = fu_header & 0x40 != 0;
683 let fu_type = fu_header & 0x3F;
684
685 if start {
686 let hdr0 = (payload[0] & 0x81) | (fu_type << 1);
689 let hdr1 = payload[1];
690 self.fu.clear();
691 self.fu.push(hdr0);
692 self.fu.push(hdr1);
693 self.in_fragment = true;
694 } else if !self.in_fragment {
695 return Err(DepacketizeError::OutOfOrder);
696 } else if self.seq_gap(sequence) {
697 self.in_fragment = false;
698 self.fu.clear();
699 return Err(DepacketizeError::OutOfOrder);
700 }
701 self.fu.extend_from_slice(&payload[3..]);
702
703 if end && self.in_fragment {
704 let nal = std::mem::take(&mut self.fu);
705 self.append_nal(&nal);
706 self.in_fragment = false;
707 }
708 }
709 other => return Err(DepacketizeError::Unsupported(other)),
710 }
711
712 self.last_seq = Some(sequence);
713
714 if completed.is_some() {
715 return Ok(completed);
716 }
717 if marker {
718 return Ok(self.take_au());
719 }
720 Ok(None)
721 }
722}
723
/// One reassembled frame / access unit produced by the depacketizers in this
/// module.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AccessUnit {
    /// Reassembled bitstream bytes.
    pub data: Bytes,
    /// RTP timestamp associated with the unit.
    pub timestamp: u32,
    /// Keyframe flag as determined by the producing depacketizer.
    pub keyframe: bool,
}
738
/// Appends a 12-byte RTP header to `out`.
///
/// The first byte is always `0x80` (version 2, no padding, no extension, no
/// CSRCs). The second byte carries the marker bit and the low seven bits of
/// `pt`; `seq`, `ts` and `ssrc` follow in big-endian order.
fn write_rtp_header(out: &mut Vec<u8>, pt: u8, marker: bool, seq: u16, ts: u32, ssrc: u32) {
    let marker_bit: u8 = if marker { 0x80 } else { 0x00 };

    let mut header = [0u8; 12];
    header[0] = 0x80;
    header[1] = marker_bit | (pt & 0x7F);
    header[2..4].copy_from_slice(&seq.to_be_bytes());
    header[4..8].copy_from_slice(&ts.to_be_bytes());
    header[8..12].copy_from_slice(&ssrc.to_be_bytes());

    out.extend_from_slice(&header);
}
748
749#[derive(Debug, Clone)]
756pub struct OpusPacketizer {
757 payload_type: u8,
758 ssrc: u32,
759 sequence: u16,
760}
761
762impl OpusPacketizer {
763 pub fn new(payload_type: u8, ssrc: u32) -> Self {
765 Self {
766 payload_type,
767 ssrc,
768 sequence: 0,
769 }
770 }
771
772 pub fn packetize_into(&mut self, opus: &[u8], timestamp: u32, out: &mut Vec<Vec<u8>>) {
776 let mut recycle = std::mem::take(out);
777 let mut pkt = recycle.pop().unwrap_or_default();
778 pkt.clear();
779 write_rtp_header(
780 &mut pkt,
781 self.payload_type,
782 false,
783 self.sequence,
784 timestamp,
785 self.ssrc,
786 );
787 self.sequence = self.sequence.wrapping_add(1);
788 pkt.extend_from_slice(opus);
789 out.push(pkt);
790 }
791}
792
793#[derive(Debug, Clone)]
804pub struct Vp9Packetizer {
805 payload_type: u8,
806 ssrc: u32,
807 sequence: u16,
808 max_payload: usize,
809 picture_id: u16,
810}
811
812impl Vp9Packetizer {
813 pub fn new(payload_type: u8, ssrc: u32, mtu: usize) -> Self {
816 Self {
817 payload_type,
818 ssrc,
819 sequence: 0,
820 max_payload: mtu.saturating_sub(12 + 3).max(1),
822 picture_id: 0,
823 }
824 }
825
826 pub fn packetize(&mut self, frame: &[u8], timestamp: u32, keyframe: bool) -> Vec<Vec<u8>> {
829 let mut out = Vec::new();
830 self.packetize_into(frame, timestamp, keyframe, &mut out);
831 out
832 }
833
834 pub fn packetize_into(
837 &mut self,
838 frame: &[u8],
839 timestamp: u32,
840 keyframe: bool,
841 out: &mut Vec<Vec<u8>>,
842 ) {
843 let pid = self.picture_id & 0x7FFF;
844 self.picture_id = self.picture_id.wrapping_add(1);
845
846 let mut recycle = std::mem::take(out);
847 let chunks: Vec<&[u8]> = if frame.is_empty() {
848 vec![&[]]
849 } else {
850 frame.chunks(self.max_payload).collect()
851 };
852 let n = chunks.len();
853 for (i, chunk) in chunks.into_iter().enumerate() {
854 let begin = i == 0;
855 let end = i + 1 == n;
856 let mut pkt = recycle.pop().unwrap_or_default();
857 pkt.clear();
858 write_rtp_header(
859 &mut pkt,
860 self.payload_type,
861 end,
862 self.sequence,
863 timestamp,
864 self.ssrc,
865 );
866 self.sequence = self.sequence.wrapping_add(1);
867
868 let mut desc0 = 0x80; if !keyframe {
871 desc0 |= 0x40; }
873 if begin {
874 desc0 |= 0x08; }
876 if end {
877 desc0 |= 0x04; }
879 pkt.push(desc0);
880 pkt.push(0x80 | (pid >> 8) as u8);
882 pkt.push((pid & 0xFF) as u8);
883 pkt.extend_from_slice(chunk);
884 out.push(pkt);
885 }
886 }
887}
888
889#[derive(Debug, Default)]
892pub struct Vp9Depacketizer {
893 frame: Vec<u8>,
894 in_frame: bool,
895 keyframe: bool,
896 current_ts: Option<u32>,
897}
898
899impl Vp9Depacketizer {
900 pub fn new() -> Self {
902 Self::default()
903 }
904
905 pub fn push(
908 &mut self,
909 payload: &[u8],
910 marker: bool,
911 timestamp: u32,
912 ) -> Result<Option<AccessUnit>, DepacketizeError> {
913 if payload.is_empty() {
914 return Err(DepacketizeError::Truncated);
915 }
916 let desc0 = payload[0];
917 let has_pid = desc0 & 0x80 != 0;
918 let has_layer = desc0 & 0x20 != 0;
919 let flexible = desc0 & 0x10 != 0;
920 let begin = desc0 & 0x08 != 0;
921 let end = desc0 & 0x04 != 0;
922 let predicted = desc0 & 0x40 != 0;
923
924 let mut off = 1;
926 if has_pid {
927 let m = payload.get(off).ok_or(DepacketizeError::Truncated)? & 0x80 != 0;
929 off += if m { 2 } else { 1 };
930 }
931 if has_layer {
932 off += 1; if !flexible {
934 off += 1; }
936 }
937 if off > payload.len() {
938 return Err(DepacketizeError::Truncated);
939 }
940
941 if begin {
942 self.frame.clear();
943 self.in_frame = true;
944 self.keyframe = !predicted;
945 self.current_ts = Some(timestamp);
946 } else if !self.in_frame {
947 return Err(DepacketizeError::OutOfOrder);
948 }
949 self.frame.extend_from_slice(&payload[off..]);
950
951 if end && marker && self.in_frame {
952 self.in_frame = false;
953 return Ok(Some(AccessUnit {
954 data: Bytes::from(std::mem::take(&mut self.frame)),
955 timestamp: self.current_ts.unwrap_or(timestamp),
956 keyframe: self.keyframe,
957 }));
958 }
959 Ok(None)
960 }
961}
962
963#[cfg(feature = "codec-av1")]
/// Appends `v` to `out` in LEB128 form: seven value bits per byte, least
/// significant group first, with the top bit set on every byte except the
/// last.
fn leb128_encode(mut v: u64, out: &mut Vec<u8>) {
    while v >= 0x80 {
        out.push((v as u8 & 0x7F) | 0x80);
        v >>= 7;
    }
    // The final group fits in seven bits, so its continuation bit is clear.
    out.push(v as u8);
}
980
/// OBU type value that the AV1 packetizer treats as a sequence header (it
/// flags the start of a new coded video sequence).
#[cfg(feature = "codec-av1")]
const AV1_OBU_SEQUENCE_HEADER: u8 = 1;
/// OBU type value that the AV1 packetizer treats as a temporal delimiter (such
/// OBUs are dropped before transmission).
#[cfg(feature = "codec-av1")]
const AV1_OBU_TEMPORAL_DELIMITER: u8 = 2;
985
/// Packetizer that turns an AV1 temporal unit into RTP packets, each starting
/// with a one-byte aggregation header.
#[cfg(feature = "codec-av1")]
#[derive(Debug, Clone)]
pub struct Av1Packetizer {
    /// Payload type written into every RTP header.
    payload_type: u8,
    /// SSRC written into every RTP header.
    ssrc: u32,
    /// Sequence number of the next packet; starts at 0 and wraps.
    sequence: u16,
    /// Stream bytes per packet: MTU minus RTP header (12) and aggregation
    /// header (1), at least 1.
    max_payload: usize,
}
1003
#[cfg(feature = "codec-av1")]
impl Av1Packetizer {
    /// Creates a packetizer for the given payload type, SSRC and MTU.
    pub fn new(payload_type: u8, ssrc: u32, mtu: usize) -> Self {
        let max_payload = mtu.saturating_sub(12 + 1).max(1);
        Self {
            payload_type,
            ssrc,
            sequence: 0,
            max_payload,
        }
    }

    /// Packetizes `temporal_unit` into freshly allocated packets.
    pub fn packetize(&mut self, temporal_unit: &[u8], timestamp: u32) -> Vec<Vec<u8>> {
        let mut packets = Vec::new();
        self.packetize_into(temporal_unit, timestamp, &mut packets);
        packets
    }

    /// Packetizes `temporal_unit` into `out`, replacing its previous contents
    /// and reusing the old packet buffers where possible.
    ///
    /// Temporal delimiters are dropped. Every other OBU is rewritten as a
    /// LEB128 length followed by the OBU header with its size flag cleared,
    /// the optional extension byte and the OBU payload. The resulting byte
    /// stream is cut into `max_payload`-sized pieces; an empty stream still
    /// yields one packet.
    pub fn packetize_into(&mut self, temporal_unit: &[u8], timestamp: u32, out: &mut Vec<Vec<u8>>) {
        let mut stream = Vec::with_capacity(temporal_unit.len());
        let mut new_cvs = false;

        for obu in crate::codec::obu::iter_obus(temporal_unit) {
            if obu.obu_type == AV1_OBU_TEMPORAL_DELIMITER {
                continue;
            }
            if obu.obu_type == AV1_OBU_SEQUENCE_HEADER {
                new_cvs = true;
            }
            // Element = header byte (+ extension byte) + payload.
            let element_len = 1 + obu.has_extension as usize + obu.payload.len();
            leb128_encode(element_len as u64, &mut stream);
            stream.push(obu.raw[0] & !0x02);
            if obu.has_extension {
                stream.push(obu.raw[1]);
            }
            stream.extend_from_slice(obu.payload);
        }

        let mut recycle = std::mem::take(out);

        let fragment_count = if stream.is_empty() {
            1
        } else {
            stream.len().div_ceil(self.max_payload)
        };

        for index in 0..fragment_count {
            let lo = index * self.max_payload;
            let hi = (lo + self.max_payload).min(stream.len());
            let body = &stream[lo..hi];
            let is_first = index == 0;
            let is_last = index + 1 == fragment_count;

            let mut packet = match recycle.pop() {
                Some(mut reused) => {
                    reused.clear();
                    reused
                }
                None => Vec::new(),
            };

            let seq = self.sequence;
            self.sequence = seq.wrapping_add(1);
            write_rtp_header(
                &mut packet,
                self.payload_type,
                is_last,
                seq,
                timestamp,
                self.ssrc,
            );

            // Aggregation header: 0x80 = continues a previous packet,
            // 0x40 = continued in the next packet, 0x08 = new coded video
            // sequence (first packet only).
            let continues_previous: u8 = if is_first { 0x00 } else { 0x80 };
            let continues_next: u8 = if is_last { 0x00 } else { 0x40 };
            let starts_sequence: u8 = if is_first && new_cvs { 0x08 } else { 0x00 };
            packet.push(continues_previous | continues_next | starts_sequence);

            packet.extend_from_slice(body);
            out.push(packet);
        }
    }
}
1088
/// Reassembles AV1 RTP payloads into temporal units.
#[cfg(feature = "codec-av1")]
#[derive(Debug, Default)]
pub struct Av1Depacketizer {
    /// Packet bodies (aggregation header stripped) collected so far.
    stream: Vec<u8>,
    /// Set when any collected packet had bit 0x08 of its aggregation header
    /// set; reported as the keyframe flag of the emitted unit.
    new_cvs: bool,
    /// Timestamp of the first packet collected for the pending unit.
    current_ts: Option<u32>,
}
1100
1101#[cfg(feature = "codec-av1")]
1102impl Av1Depacketizer {
1103 pub fn new() -> Self {
1105 Self::default()
1106 }
1107
1108 pub fn push(
1111 &mut self,
1112 payload: &[u8],
1113 marker: bool,
1114 timestamp: u32,
1115 ) -> Result<Option<AccessUnit>, DepacketizeError> {
1116 if payload.is_empty() {
1117 return Err(DepacketizeError::Truncated);
1118 }
1119 let agg = payload[0];
1120 if agg & 0x08 != 0 {
1121 self.new_cvs = true; }
1123 if self.current_ts.is_none() {
1124 self.current_ts = Some(timestamp);
1125 }
1126 self.stream.extend_from_slice(&payload[1..]);
1127
1128 if !marker {
1129 return Ok(None);
1130 }
1131
1132 let stream = std::mem::take(&mut self.stream);
1134 let mut tu = Vec::with_capacity(stream.len() + 8);
1135 let mut pos = 0;
1136 while pos < stream.len() {
1137 let len = leb128_decode(&stream, &mut pos).ok_or(DepacketizeError::Truncated)?;
1138 let end = pos.checked_add(len).ok_or(DepacketizeError::Truncated)?;
1139 let element = stream.get(pos..end).ok_or(DepacketizeError::Truncated)?;
1140 pos = end;
1141 let hdr0 = *element.first().ok_or(DepacketizeError::Truncated)?;
1143 let has_ext = (hdr0 >> 2) & 1 == 1;
1144 let header_len = 1 + has_ext as usize;
1145 let obu_payload = element
1146 .get(header_len..)
1147 .ok_or(DepacketizeError::Truncated)?;
1148 tu.push(hdr0 | 0x02);
1149 if has_ext {
1150 tu.push(element[1]);
1151 }
1152 leb128_encode(obu_payload.len() as u64, &mut tu);
1153 tu.extend_from_slice(obu_payload);
1154 }
1155
1156 let keyframe = std::mem::take(&mut self.new_cvs);
1157 let ts = self.current_ts.take().unwrap_or(timestamp);
1158 Ok(Some(AccessUnit {
1159 data: Bytes::from(tu),
1160 timestamp: ts,
1161 keyframe,
1162 }))
1163 }
1164}
1165
1166#[cfg(feature = "codec-av1")]
/// Decodes a LEB128 value from `data` starting at `*pos`, advancing `*pos`
/// past every byte consumed.
///
/// Returns `None` if the data ends before the value does, if the value is
/// still unterminated after eight bytes, or if it does not fit in `usize`.
fn leb128_decode(data: &[u8], pos: &mut usize) -> Option<usize> {
    let mut value = 0u64;
    let mut shift = 0u32;
    // Eight groups of seven bits: shifts 0, 7, ..., 49.
    while shift < 56 {
        let byte = *data.get(*pos)?;
        *pos += 1;
        value |= u64::from(byte & 0x7F) << shift;
        if byte & 0x80 == 0 {
            return usize::try_from(value).ok();
        }
        shift += 7;
    }
    None
}
1181
// Unit tests for the RTP header parser and the packetizers/depacketizers in
// this module.
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an RTP packet with version 2, payload type 96, SSRC 1 and the
    /// given sequence number, timestamp, marker bit and payload.
    fn rtp(seq: u16, ts: u32, marker: bool, payload: &[u8]) -> Vec<u8> {
        let mut p = vec![0x80, if marker { 0x80 | 96 } else { 96 }];
        p.extend_from_slice(&seq.to_be_bytes());
        p.extend_from_slice(&ts.to_be_bytes());
        // SSRC = 1.
        p.extend_from_slice(&[0, 0, 0, 1]);
        p.extend_from_slice(payload);
        p
    }

    #[test]
    fn parses_fixed_header_and_payload_offset() {
        let pkt = rtp(7, 9000, true, &[0x65, 0xAA]);
        let h = RtpHeader::parse(&pkt).unwrap();
        assert_eq!(h.sequence, 7);
        assert_eq!(h.timestamp, 9000);
        assert!(h.marker);
        assert_eq!(h.payload_type, 96);
        assert_eq!(h.payload_offset, 12);
        assert_eq!(&pkt[h.payload_offset..], &[0x65, 0xAA]);
    }

    #[test]
    fn rejects_wrong_version_and_short_buffers() {
        // Version bits are 0.
        assert!(RtpHeader::parse(&[0x00; 12]).is_none());
        // Only four bytes of header.
        assert!(RtpHeader::parse(&[0x80; 4]).is_none());
    }

    #[test]
    fn honors_csrc_count_in_payload_offset() {
        let mut pkt = rtp(1, 0, false, &[0x41]);
        // CSRC count = 2.
        pkt[0] = 0x82;
        let mut with_csrc = pkt[..12].to_vec();
        // Two four-byte CSRC entries.
        with_csrc.extend_from_slice(&[0xDE, 0xAD, 0xBE, 0xEF, 0, 0, 0, 0]);
        with_csrc.push(0x41);
        let h = RtpHeader::parse(&with_csrc).unwrap();
        assert_eq!(h.payload_offset, 20);
    }

    #[test]
    fn aac_hbr_splits_two_access_units() {
        let mut p = Vec::new();
        // 32 header bits = two 16-bit AU headers (sizes 3 and 2).
        p.extend_from_slice(&32u16.to_be_bytes());
        p.extend_from_slice(&((3u16) << 3).to_be_bytes());
        p.extend_from_slice(&((2u16) << 3).to_be_bytes());
        p.extend_from_slice(&[0xA1, 0xA2, 0xA3]);
        p.extend_from_slice(&[0xB1, 0xB2]);
        let aus = AacDepacketizer::new().push(&p).unwrap();
        assert_eq!(aus.len(), 2);
        assert_eq!(&aus[0][..], &[0xA1, 0xA2, 0xA3]);
        assert_eq!(&aus[1][..], &[0xB1, 0xB2]);
    }

    #[test]
    fn aac_hbr_single_au() {
        let mut p = Vec::new();
        // One 16-bit AU header declaring a four-byte access unit.
        p.extend_from_slice(&16u16.to_be_bytes());
        p.extend_from_slice(&((4u16) << 3).to_be_bytes());
        p.extend_from_slice(&[1, 2, 3, 4]);
        let aus = AacDepacketizer::new().push(&p).unwrap();
        assert_eq!(aus.len(), 1);
        assert_eq!(&aus[0][..], &[1, 2, 3, 4]);
    }

    #[test]
    fn aac_truncated_payload_errors() {
        assert_eq!(
            AacDepacketizer::new().push(&[0x00]),
            Err(DepacketizeError::Truncated)
        );
        // Header declares eight bytes but only two follow.
        let mut p = 16u16.to_be_bytes().to_vec();
        p.extend_from_slice(&((8u16) << 3).to_be_bytes());
        p.extend_from_slice(&[1, 2]);
        assert_eq!(
            AacDepacketizer::new().push(&p),
            Err(DepacketizeError::Truncated)
        );
    }

    #[test]
    fn single_nal_packet_emits_annexb_on_marker() {
        let mut d = H264Depacketizer::new();
        let out = d.push(&[0x41, 0x9A, 0xBC], true, 3000, 1).unwrap().unwrap();
        assert_eq!(&out.data[..], &[0, 0, 0, 1, 0x41, 0x9A, 0xBC]);
        assert!(!out.keyframe);
        assert_eq!(out.timestamp, 3000);
    }

    #[test]
    fn idr_single_nal_is_flagged_keyframe() {
        let mut d = H264Depacketizer::new();
        let out = d.push(&[0x65, 0x01], true, 0, 1).unwrap().unwrap();
        assert!(out.keyframe);
    }

    #[test]
    fn packetizer_single_nal_round_trips_through_depacketizer() {
        // Two NAL units (types 7 and 5), each behind a four-byte start code.
        let au = [0, 0, 0, 1, 0x67, 0x42, 0x00, 0, 0, 0, 1, 0x65, 0x88, 0x99];
        let mut pkt = RtpPacketizer::new(96, 0xABCD, 1200);
        let packets = pkt.packetize(&au, 3000);
        assert_eq!(packets.len(), 2, "one packet per NAL");

        let mut depack = H264Depacketizer::new();
        let mut out = None;
        for p in &packets {
            let h = RtpHeader::parse(p).unwrap();
            if let Some(au) = depack
                .push(&p[h.payload_offset..], h.marker, h.timestamp, h.sequence)
                .unwrap()
            {
                out = Some(au);
            }
        }
        let out = out.expect("AU completed on the marker packet");
        assert_eq!(&out.data[..], &au);
        assert!(out.keyframe);
        assert_eq!(out.timestamp, 3000);
    }

    #[test]
    fn packetize_into_recycles_buffers_without_changing_output() {
        let au1 = [0, 0, 0, 1, 0x67, 0x42, 0x00, 0, 0, 0, 1, 0x65, 0x88, 0x99];
        let au2 = [0, 0, 0, 1, 0x65, 0x11, 0x22, 0x33];

        // `a` allocates fresh packets, `b` reuses `reused` across calls; both
        // start from the same state and must produce identical packets.
        let mut a = RtpPacketizer::new(96, 0xABCD, 1200);
        let mut b = RtpPacketizer::new(96, 0xABCD, 1200);
        let mut reused: Vec<Vec<u8>> = Vec::new();

        for au in [&au1[..], &au2[..], &au1[..]] {
            let expected = a.packetize(au, 3000);
            b.packetize_into(au, 3000, &mut reused);
            assert_eq!(
                reused, expected,
                "recycled output matches allocating output"
            );
        }
    }

    #[test]
    fn packetizer_fragments_oversized_nal_and_round_trips() {
        // One type-5 NAL unit with a 600-byte body.
        let mut nal = vec![0, 0, 0, 1, 0x65];
        nal.extend((0..600u16).map(|i| i as u8));
        // MTU of 100 forces fragmentation.
        let mut pkt = RtpPacketizer::new(96, 1, 100);
        let packets = pkt.packetize(&nal, 90);
        assert!(packets.len() > 1, "oversized NAL is fragmented");
        // Exactly one marker, on the last packet.
        let markers: Vec<bool> = packets
            .iter()
            .map(|p| RtpHeader::parse(p).unwrap().marker)
            .collect();
        assert_eq!(markers.iter().filter(|m| **m).count(), 1);
        assert!(markers.last().unwrap());

        let mut depack = H264Depacketizer::new();
        let mut out = None;
        for p in &packets {
            let h = RtpHeader::parse(p).unwrap();
            if let Some(au) = depack
                .push(&p[h.payload_offset..], h.marker, h.timestamp, h.sequence)
                .unwrap()
            {
                out = Some(au);
            }
        }
        assert_eq!(&out.unwrap().data[..], &nal[..]);
    }

    #[test]
    fn stap_a_splits_aggregated_nals() {
        // STAP-A header (24), then two length-prefixed NAL units.
        let payload = [24, 0, 2, 0xAA, 0xBB, 0, 3, 0xCC, 0xDD, 0xEE];
        let mut d = H264Depacketizer::new();
        let out = d.push(&payload, true, 0, 1).unwrap().unwrap();
        assert_eq!(
            &out.data[..],
            &[0, 0, 0, 1, 0xAA, 0xBB, 0, 0, 0, 1, 0xCC, 0xDD, 0xEE]
        );
    }

    #[test]
    fn fu_a_reassembles_fragmented_nal() {
        let mut d = H264Depacketizer::new();
        // Start fragment (S bit set, type 5).
        assert!(d
            .push(&[0x7C, 0x85, 0x11, 0x22], false, 0, 1)
            .unwrap()
            .is_none());
        // Middle fragment.
        assert!(d.push(&[0x7C, 0x05, 0x33], false, 0, 2).unwrap().is_none());
        // End fragment (E bit set) with the marker.
        let out = d.push(&[0x7C, 0x45, 0x44], true, 0, 3).unwrap().unwrap();
        assert_eq!(&out.data[..], &[0, 0, 0, 1, 0x65, 0x11, 0x22, 0x33, 0x44]);
        assert!(out.keyframe);
    }

    #[test]
    fn fu_a_sequence_gap_reports_out_of_order() {
        let mut d = H264Depacketizer::new();
        d.push(&[0x7C, 0x85, 0x11], false, 0, 1).unwrap();
        // Sequence jumps from 1 to 5 in the middle of a fragment.
        assert_eq!(
            d.push(&[0x7C, 0x05, 0x22], false, 0, 5),
            Err(DepacketizeError::OutOfOrder)
        );
    }

    #[test]
    fn timestamp_change_flushes_previous_au_without_marker() {
        let mut d = H264Depacketizer::new();
        assert!(d.push(&[0x41, 0x01], false, 1000, 1).unwrap().is_none());
        // New timestamp: the previous access unit is returned.
        let out = d.push(&[0x41, 0x02], false, 2000, 2).unwrap().unwrap();
        assert_eq!(out.timestamp, 1000);
        assert_eq!(&out.data[..], &[0, 0, 0, 1, 0x41, 0x01]);
    }

    #[test]
    fn h265_single_nal_round_trips_through_depacketizer() {
        // Two NAL units with two-byte headers (types 32 and 19).
        let au = [
            0, 0, 0, 1, 0x40, 0x01, 0xAA, 0, 0, 0, 1, 0x26, 0x01, 0x88, 0x99,
        ];
        let mut pkt = RtpPacketizer::new_h265(96, 0xABCD, 1200);
        let packets = pkt.packetize(&au, 3000);
        assert_eq!(packets.len(), 2, "one packet per NAL");

        let mut depack = H265Depacketizer::new();
        let mut out = None;
        for p in &packets {
            let h = RtpHeader::parse(p).unwrap();
            if let Some(au) = depack
                .push(&p[h.payload_offset..], h.marker, h.timestamp, h.sequence)
                .unwrap()
            {
                out = Some(au);
            }
        }
        let out = out.expect("AU completed on the marker packet");
        assert_eq!(&out.data[..], &au);
        assert!(out.keyframe, "IRAP type 19 is a keyframe");
        assert_eq!(out.timestamp, 3000);
    }

    #[test]
    fn h265_fragments_oversized_nal_and_round_trips() {
        // One type-19 NAL unit with a 600-byte body.
        let mut nal = vec![0, 0, 0, 1, 0x26, 0x01];
        nal.extend((0..600u16).map(|i| i as u8));
        // MTU of 100 forces fragmentation.
        let mut pkt = RtpPacketizer::new_h265(96, 1, 100);
        let packets = pkt.packetize(&nal, 90);
        assert!(packets.len() > 1, "oversized NAL is fragmented");
        // Exactly one marker, on the last packet.
        let markers: Vec<bool> = packets
            .iter()
            .map(|p| RtpHeader::parse(p).unwrap().marker)
            .collect();
        assert_eq!(markers.iter().filter(|m| **m).count(), 1);
        assert!(markers.last().unwrap());
        // Every packet carries payload-header type 49.
        for p in &packets {
            let h = RtpHeader::parse(p).unwrap();
            let pt = (p[h.payload_offset] >> 1) & 0x3F;
            assert_eq!(pt, 49, "FU payload type");
        }

        let mut depack = H265Depacketizer::new();
        let mut out = None;
        for p in &packets {
            let h = RtpHeader::parse(p).unwrap();
            if let Some(au) = depack
                .push(&p[h.payload_offset..], h.marker, h.timestamp, h.sequence)
                .unwrap()
            {
                out = Some(au);
            }
        }
        assert_eq!(&out.unwrap().data[..], &nal[..]);
    }

    #[test]
    fn h265_ap_splits_aggregated_nals() {
        // Two-byte payload header (type 48), then two length-prefixed NAL units.
        let payload = [0x60, 0x01, 0, 2, 0xAA, 0xBB, 0, 3, 0xCC, 0xDD, 0xEE];
        let mut d = H265Depacketizer::new();
        let out = d.push(&payload, true, 0, 1).unwrap().unwrap();
        assert_eq!(
            &out.data[..],
            &[0, 0, 0, 1, 0xAA, 0xBB, 0, 0, 0, 1, 0xCC, 0xDD, 0xEE]
        );
    }

    #[test]
    fn h265_rejects_truncated_and_unsupported() {
        let mut d = H265Depacketizer::new();
        // One byte is shorter than the two-byte NAL header.
        assert_eq!(
            d.push(&[0x26], true, 0, 1),
            Err(DepacketizeError::Truncated)
        );
        // Type 50 is not handled.
        assert_eq!(
            d.push(&[50 << 1, 0x01, 0x00], true, 0, 2),
            Err(DepacketizeError::Unsupported(50))
        );
    }

    /// Runs `packets` through a fresh `Vp9Depacketizer` and returns the last
    /// frame it produced, if any.
    fn vp9_depacketize(packets: &[Vec<u8>]) -> Option<AccessUnit> {
        let mut d = Vp9Depacketizer::new();
        let mut out = None;
        for p in packets {
            let h = RtpHeader::parse(p).unwrap();
            if let Some(f) = d
                .push(&p[h.payload_offset..], h.marker, h.timestamp)
                .unwrap()
            {
                out = Some(f);
            }
        }
        out
    }

    #[test]
    fn vp9_fragmented_frame_round_trips() {
        let frame: Vec<u8> = (0..500u16).map(|i| i as u8).collect();
        // MTU of 100 forces fragmentation.
        let mut pkt = Vp9Packetizer::new(98, 0x1234, 100);
        let packets = pkt.packetize(&frame, 9000, true);
        assert!(packets.len() > 1, "frame fragmented");

        // Exactly one marker, on the last packet.
        let markers: Vec<bool> = packets
            .iter()
            .map(|p| RtpHeader::parse(p).unwrap().marker)
            .collect();
        assert_eq!(markers.iter().filter(|m| **m).count(), 1);
        assert!(markers.last().unwrap());

        let out = vp9_depacketize(&packets).expect("frame completed");
        assert_eq!(&out.data[..], &frame[..]);
        assert!(out.keyframe, "keyframe → P bit clear");
        assert_eq!(out.timestamp, 9000);
    }

    #[test]
    fn vp9_inter_frame_is_not_a_keyframe() {
        let mut pkt = Vp9Packetizer::new(98, 1, 1200);
        let packets = pkt.packetize(&[1, 2, 3], 0, false);
        assert_eq!(packets.len(), 1);
        let out = vp9_depacketize(&packets).expect("frame");
        assert_eq!(&out.data[..], &[1, 2, 3]);
        assert!(!out.keyframe, "P bit set → inter frame");
    }

    /// Runs `packets` through a fresh `Av1Depacketizer` and returns the last
    /// temporal unit it produced, if any.
    #[cfg(feature = "codec-av1")]
    fn av1_depacketize(packets: &[Vec<u8>]) -> Option<AccessUnit> {
        let mut d = Av1Depacketizer::new();
        let mut out = None;
        for p in packets {
            let h = RtpHeader::parse(p).unwrap();
            if let Some(f) = d
                .push(&p[h.payload_offset..], h.marker, h.timestamp)
                .unwrap()
            {
                out = Some(f);
            }
        }
        out
    }

    #[cfg(feature = "codec-av1")]
    #[test]
    fn av1_temporal_unit_round_trips_without_delimiter() {
        // Temporal delimiter, sequence header and frame OBUs, each with a
        // size field.
        let td = [0x12u8, 0x00];
        let seq = [0x0Au8, 0x02, 0xAA, 0xBB];
        let frame = [0x32u8, 0x03, 0x11, 0x22, 0x33];
        let mut tu = Vec::new();
        tu.extend_from_slice(&td);
        tu.extend_from_slice(&seq);
        tu.extend_from_slice(&frame);

        let mut pkt = Av1Packetizer::new(99, 7, 1200);
        let packets = pkt.packetize(&tu, 1000);
        let out = av1_depacketize(&packets).expect("TU completed");

        // The temporal delimiter is not expected back.
        let mut expected = Vec::new();
        expected.extend_from_slice(&seq);
        expected.extend_from_slice(&frame);
        assert_eq!(&out.data[..], &expected[..]);
        assert!(out.keyframe, "sequence header → new coded video sequence");
        assert_eq!(out.timestamp, 1000);
    }

    #[cfg(feature = "codec-av1")]
    #[test]
    fn av1_large_temporal_unit_fragments_and_round_trips() {
        // Frame OBU whose size field (0xAC 0x02 = 300) spans two bytes.
        let mut frame = vec![0x32u8, 0xAC, 0x02];
        frame.extend((0..300u16).map(|i| i as u8));
        let mut tu = vec![0x12u8, 0x00];
        tu.extend_from_slice(&frame);

        // MTU of 64 forces fragmentation.
        let mut pkt = Av1Packetizer::new(99, 1, 64);
        let packets = pkt.packetize(&tu, 0);
        assert!(packets.len() > 1, "large TU fragmented");
        for (i, p) in packets.iter().enumerate() {
            let agg = p[RtpHeader::parse(p).unwrap().payload_offset];
            assert_eq!((agg & 0x80 != 0), i > 0, "Z continuation bit");
            assert_eq!(
                (agg & 0x40 != 0),
                i + 1 < packets.len(),
                "Y continuation bit"
            );
        }

        let out = av1_depacketize(&packets).expect("TU completed");
        assert_eq!(&out.data[..], &frame[..], "frame OBU reconstructed");
    }
}