use bytes::Bytes;
/// Four-byte Annex B start code (`00 00 00 01`) that `H264Depacketizer` writes in
/// front of every NAL unit it appends to the pending access unit, and that it
/// searches for when looking for an IDR slice.
const ANNEXB_START: [u8; 4] = [0, 0, 0, 1];
/// Fields decoded from the header of an RTP packet by [`RtpHeader::parse`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RtpHeader {
/// Low seven bits of the second header byte.
pub payload_type: u8,
/// Top bit of the second header byte.
pub marker: bool,
/// Big-endian 16-bit sequence number read from bytes 2..4.
pub sequence: u16,
/// Big-endian 32-bit timestamp read from bytes 4..8.
pub timestamp: u32,
/// Big-endian 32-bit synchronization source identifier read from bytes 8..12.
pub ssrc: u32,
/// Reader position after the fixed header, the CSRC list and the optional
/// header extension, i.e. the index in the parsed buffer where the payload starts.
pub payload_offset: usize,
}
impl RtpHeader {
    /// Decodes the RTP header at the start of `buf`.
    ///
    /// Returns `None` when the two version bits are not `2` or when the reader
    /// runs out of bytes while consuming the fixed header, the CSRC list or the
    /// header extension. On success `payload_offset` is the reader position
    /// right after everything that was consumed.
    ///
    /// The padding bit (`0x20` of the first byte) is not inspected, so any
    /// trailing padding stays part of the payload as far as this parser is
    /// concerned.
    pub fn parse(buf: &[u8]) -> Option<RtpHeader> {
        use super::byteops::ByteReader;

        let mut reader = ByteReader::new(buf);

        // First byte: version (2 bits), padding, extension flag, CSRC count (4 bits).
        let first = reader.u8()?;
        let version = first >> 6;
        if version != 2 {
            return None;
        }
        let extension_present = (first & 0x10) != 0;
        let csrc_entries = (first & 0x0F) as usize;

        // Second byte: marker bit followed by the 7-bit payload type.
        let second = reader.u8()?;

        let sequence = reader.u16_be()?;
        let timestamp = reader.u32_be()?;
        let ssrc = reader.u32_be()?;

        // Each CSRC entry is one 32-bit word.
        reader.skip(csrc_entries * 4)?;

        if extension_present {
            // Two bytes are skipped, then a 16-bit count of 32-bit extension words follows.
            reader.skip(2)?;
            let extension_words = reader.u16_be()? as usize;
            reader.skip(extension_words * 4)?;
        }

        Some(RtpHeader {
            payload_type: second & 0x7F,
            marker: (second & 0x80) != 0,
            sequence,
            timestamp,
            ssrc,
            payload_offset: reader.position(),
        })
    }
}
/// Reasons a depacketizer in this module rejects an RTP payload.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum DepacketizeError {
/// The payload is shorter than its own headers or length fields require.
Truncated,
/// An FU-A fragment arrived without a preceding start fragment, or after a
/// gap in the RTP sequence numbers.
OutOfOrder,
/// The payload uses something this module does not handle; the value is the
/// offending NAL unit type (H.264) or the rejected size-field width (AAC).
Unsupported(u8),
}
/// Splits an RTP payload that starts with an AU-header section into the
/// individual AAC access units it carries.
///
/// The payload layout handled by [`AacDepacketizer::push`] is:
/// a 16-bit big-endian count of header *bits*, then that many bits of packed
/// AU headers (padded to a whole byte), then the access units back to back.
/// Each AU header is `size_length` bits of AU size followed by `index_length`
/// bits that are skipped.
#[derive(Debug, Clone, Copy)]
pub struct AacDepacketizer {
    /// Width in bits of the AU-size field of every AU header (1..=16 supported).
    size_length: u8,
    /// Width in bits of the index field that follows the size; its value is ignored.
    index_length: u8,
}

impl Default for AacDepacketizer {
    /// Same configuration as [`AacDepacketizer::new`], so a defaulted value is
    /// usable instead of rejecting every payload with zero-width fields.
    fn default() -> Self {
        Self::new()
    }
}

impl AacDepacketizer {
    /// Creates a depacketizer with a 13-bit size field and a 3-bit index field
    /// (16-bit, byte-aligned AU headers).
    pub fn new() -> Self {
        Self::with_lengths(13, 3)
    }

    /// Creates a depacketizer with explicit AU-header field widths, in bits.
    ///
    /// The widths are not validated here; [`AacDepacketizer::push`] rejects a
    /// `size_length` of 0 or above 16.
    pub fn with_lengths(size_length: u8, index_length: u8) -> Self {
        Self {
            size_length,
            index_length,
        }
    }

    /// Splits one RTP payload into its access units.
    ///
    /// # Errors
    ///
    /// * [`DepacketizeError::Truncated`] when the payload is shorter than two
    ///   bytes, shorter than the announced header section, or shorter than the
    ///   sum of the announced AU sizes.
    /// * [`DepacketizeError::Unsupported`] carrying `size_length` when the
    ///   configured size width is 0 or larger than 16.
    pub fn push(&self, payload: &[u8]) -> Result<Vec<Bytes>, DepacketizeError> {
        if payload.len() < 2 {
            return Err(DepacketizeError::Truncated);
        }
        if self.size_length == 0 || self.size_length > 16 {
            return Err(DepacketizeError::Unsupported(self.size_length));
        }

        let header_bits = usize::from(u16::from_be_bytes([payload[0], payload[1]]));
        // At least 1 because `size_length` was just checked to be non-zero.
        let au_header_bits = usize::from(self.size_length) + usize::from(self.index_length);
        let data_start = 2 + header_bits.div_ceil(8);
        let headers = payload
            .get(2..data_start)
            .ok_or(DepacketizeError::Truncated)?;

        let au_count = header_bits / au_header_bits;
        let mut out = Vec::with_capacity(au_count);
        let mut data_off = data_start;
        for i in 0..au_count {
            // AU headers are bit-packed, so header `i` starts at an arbitrary bit
            // offset; read the size field bit-exactly rather than as a whole word.
            let size = Self::read_bits(headers, i * au_header_bits, self.size_length)
                .ok_or(DepacketizeError::Truncated)?;
            let end = data_off + size;
            let au = payload
                .get(data_off..end)
                .ok_or(DepacketizeError::Truncated)?;
            out.push(Bytes::copy_from_slice(au));
            data_off = end;
        }
        Ok(out)
    }

    /// Reads `width` bits (most significant first) starting `bit_offset` bits
    /// into `buf`. Returns `None` if the field runs past the end of `buf`.
    fn read_bits(buf: &[u8], bit_offset: usize, width: u8) -> Option<usize> {
        let mut value = 0usize;
        for bit in bit_offset..bit_offset + usize::from(width) {
            let byte = *buf.get(bit / 8)?;
            let shift = 7 - (bit % 8);
            value = (value << 1) | usize::from((byte >> shift) & 1);
        }
        Some(value)
    }
}
/// Turns Annex B access units into RTP packets, one packet per NAL unit or
/// several FU-A fragments when a NAL unit exceeds the payload budget.
#[derive(Debug, Clone)]
pub struct RtpPacketizer {
/// Payload type written into every header; only the low seven bits are used.
payload_type: u8,
/// Synchronization source identifier written into every header.
ssrc: u32,
/// Sequence number of the next packet; starts at 0 and wraps on overflow.
sequence: u16,
/// Payload byte budget per packet: the MTU minus the 12-byte header, at least 1.
max_payload: usize,
}
impl RtpPacketizer {
    /// Creates a packetizer whose packets carry `payload_type` and `ssrc`.
    ///
    /// `mtu` is the maximum packet size including the 12-byte RTP header; the
    /// remaining payload budget is clamped to at least one byte.
    pub fn new(payload_type: u8, ssrc: u32, mtu: usize) -> Self {
        Self {
            payload_type,
            ssrc,
            sequence: 0,
            max_payload: mtu.saturating_sub(12).max(1),
        }
    }

    /// Appends a 12-byte RTP header (version 2, no padding, no extension, no
    /// CSRCs) to `out` and advances the sequence number.
    fn header(&mut self, marker: bool, timestamp: u32, out: &mut Vec<u8>) {
        let marker_bit: u8 = if marker { 0x80 } else { 0x00 };
        out.push(0x80);
        out.push(marker_bit | (self.payload_type & 0x7F));
        out.extend_from_slice(&self.sequence.to_be_bytes());
        out.extend_from_slice(&timestamp.to_be_bytes());
        out.extend_from_slice(&self.ssrc.to_be_bytes());
        self.sequence = self.sequence.wrapping_add(1);
    }

    /// Packetizes one Annex B access unit.
    ///
    /// Empty NAL units are skipped. A NAL unit that fits the payload budget is
    /// sent as-is in a single packet; a larger one is split into FU-A
    /// fragments. The marker bit is set only on the final packet of the last
    /// NAL unit. All packets share `timestamp`.
    pub fn packetize(&mut self, access_unit: &[u8], timestamp: u32) -> Vec<Vec<u8>> {
        let nals: Vec<&[u8]> = crate::codec::h264::iter_nals_annexb(access_unit)
            .filter(|n| !n.is_empty())
            .collect();
        // At least one packet per NAL unit; fragmentation may add more.
        let mut packets = Vec::with_capacity(nals.len());
        for (i, nal) in nals.iter().enumerate() {
            let last_nal = i + 1 == nals.len();
            if nal.len() <= self.max_payload {
                let mut pkt = Vec::with_capacity(12 + nal.len());
                self.header(last_nal, timestamp, &mut pkt);
                pkt.extend_from_slice(nal);
                packets.push(pkt);
            } else {
                self.fragment_fua(nal, timestamp, last_nal, &mut packets);
            }
        }
        packets
    }

    /// Splits `nal` into FU-A fragments and appends the resulting packets to `out`.
    ///
    /// The original NAL header byte is not transmitted: its top three bits go
    /// into the FU indicator (type 28) and its type into the FU header, whose
    /// `0x80`/`0x40` bits flag the first and last fragment. An empty `nal`
    /// produces no packets.
    fn fragment_fua(&mut self, nal: &[u8], timestamp: u32, last_nal: bool, out: &mut Vec<Vec<u8>>) {
        let Some((&nal_header, body)) = nal.split_first() else {
            return;
        };
        let fu_indicator = (nal_header & 0xE0) | 28;
        let nal_type = nal_header & 0x1F;
        // Two payload bytes per packet are taken by the FU indicator and FU header.
        let chunk = self.max_payload.saturating_sub(2).max(1);
        let n_chunks = body.len().div_ceil(chunk);
        for (idx, part) in body.chunks(chunk).enumerate() {
            let end = idx + 1 == n_chunks;
            let mut fu_header = nal_type;
            if idx == 0 {
                fu_header |= 0x80;
            }
            if end {
                fu_header |= 0x40;
            }
            let mut pkt = Vec::with_capacity(12 + 2 + part.len());
            self.header(last_nal && end, timestamp, &mut pkt);
            pkt.push(fu_indicator);
            pkt.push(fu_header);
            pkt.extend_from_slice(part);
            out.push(pkt);
        }
    }
}
/// Reassembles H.264 RTP payloads (single NAL, STAP-A, FU-A) into Annex B
/// access units.
#[derive(Debug, Default)]
pub struct H264Depacketizer {
/// Access unit being built: start-code-prefixed NAL units collected so far.
au: Vec<u8>,
/// NAL unit being reassembled from FU-A fragments (header byte first).
fua: Vec<u8>,
/// True between an FU-A start fragment and its end fragment.
in_fragment: bool,
/// NAL header byte reconstructed from the most recent FU-A start fragment.
fua_header: u8,
/// RTP timestamp of the access unit being built, if any packet was accepted.
current_ts: Option<u32>,
/// Sequence number of the last accepted packet, used to detect FU-A gaps.
last_seq: Option<u16>,
}
impl H264Depacketizer {
    /// Creates an empty depacketizer.
    pub fn new() -> Self {
        Self::default()
    }

    /// Appends `nal` to the pending access unit, prefixed by the Annex B start code.
    fn append_nal(&mut self, nal: &[u8]) {
        self.au.extend_from_slice(&ANNEXB_START);
        self.au.extend_from_slice(nal);
    }

    /// Returns true when the pending access unit contains a start code that is
    /// immediately followed by a NAL header of type 5 (IDR slice).
    fn pending_is_keyframe(&self) -> bool {
        self.au
            .windows(ANNEXB_START.len() + 1)
            .any(|w| w[..4] == ANNEXB_START && (w[4] & 0x1F) == 5)
    }

    /// Takes the pending access unit, if any, and clears the current timestamp.
    fn take_au(&mut self) -> Option<AccessUnit> {
        if self.au.is_empty() {
            return None;
        }
        let keyframe = self.pending_is_keyframe();
        let timestamp = self.current_ts.unwrap_or(0);
        let data = Bytes::from(std::mem::take(&mut self.au));
        self.current_ts = None;
        Some(AccessUnit {
            data,
            timestamp,
            keyframe,
        })
    }

    /// Feeds one RTP payload into the depacketizer.
    ///
    /// Returns `Ok(Some(_))` when an access unit is complete: either the
    /// previous one, because `timestamp` changed while no FU-A reassembly was
    /// in progress, or the current one, because `marker` is set. When both
    /// apply, the previous unit is returned and the current one stays pending.
    ///
    /// The payload is validated before any pending data is touched, so a
    /// rejected packet neither discards the pending access unit nor leaves
    /// part of itself in it.
    ///
    /// # Errors
    ///
    /// * [`DepacketizeError::Truncated`] for an empty payload, an FU-A payload
    ///   shorter than two bytes, or a STAP-A entry whose length runs past the
    ///   end of the payload.
    /// * [`DepacketizeError::OutOfOrder`] for an FU-A continuation that has no
    ///   start fragment or follows a sequence-number gap; in the latter case
    ///   the partial fragment is dropped.
    /// * [`DepacketizeError::Unsupported`] for any NAL type other than 1..=23,
    ///   24 (STAP-A) and 28 (FU-A).
    pub fn push(
        &mut self,
        payload: &[u8],
        marker: bool,
        timestamp: u32,
        sequence: u16,
    ) -> Result<Option<AccessUnit>, DepacketizeError> {
        let Some(&first) = payload.first() else {
            return Err(DepacketizeError::Truncated);
        };
        let nal_type = first & 0x1F;

        // Phase 1: reject bad packets before the pending access unit or the
        // current timestamp are modified.
        match nal_type {
            1..=23 => {}
            24 => Self::walk_stap_a(payload, |_| {})?,
            28 => {
                let fu_header = *payload.get(1).ok_or(DepacketizeError::Truncated)?;
                let starts_nal = fu_header & 0x80 != 0;
                if !starts_nal {
                    if !self.in_fragment {
                        return Err(DepacketizeError::OutOfOrder);
                    }
                    if self.seq_gap(sequence) {
                        // A fragment went missing; the partial NAL is unusable.
                        self.in_fragment = false;
                        self.fua.clear();
                        return Err(DepacketizeError::OutOfOrder);
                    }
                }
            }
            other => return Err(DepacketizeError::Unsupported(other)),
        }

        // Phase 2: a new timestamp closes the previous access unit, unless a
        // fragmented NAL is still being reassembled.
        let ts_changed = self.current_ts.is_some_and(|ts| ts != timestamp);
        let completed = if ts_changed && !self.in_fragment {
            self.take_au()
        } else {
            None
        };
        self.current_ts = Some(timestamp);

        // Phase 3: apply the (already validated) payload.
        match nal_type {
            24 => Self::walk_stap_a(payload, |nal| self.append_nal(nal))?,
            28 => self.absorb_fu_a(payload),
            _ => self.append_nal(payload),
        }
        self.last_seq = Some(sequence);

        if completed.is_some() {
            return Ok(completed);
        }
        Ok(if marker { self.take_au() } else { None })
    }

    /// Walks the NAL units aggregated in a STAP-A payload, calling `visit` for
    /// each one. Every entry is a 16-bit big-endian size followed by that many
    /// bytes; a single trailing byte is ignored.
    fn walk_stap_a(
        payload: &[u8],
        mut visit: impl FnMut(&[u8]),
    ) -> Result<(), DepacketizeError> {
        // Skip the STAP-A header byte.
        let mut rest = payload.get(1..).unwrap_or_default();
        while rest.len() >= 2 {
            let size = usize::from(u16::from_be_bytes([rest[0], rest[1]]));
            let nal = rest
                .get(2..2 + size)
                .ok_or(DepacketizeError::Truncated)?;
            visit(nal);
            rest = &rest[2 + size..];
        }
        Ok(())
    }

    /// Applies an FU-A payload that `push` has already validated (at least two
    /// bytes; a continuation only while a fragment is in progress).
    fn absorb_fu_a(&mut self, payload: &[u8]) {
        let fu_header = payload[1];
        if fu_header & 0x80 != 0 {
            // Rebuild the original NAL header from the indicator's top three
            // bits and the type carried in the FU header.
            self.fua_header = (payload[0] & 0xE0) | (fu_header & 0x1F);
            self.fua.clear();
            self.fua.push(self.fua_header);
            self.in_fragment = true;
        }
        self.fua.extend_from_slice(&payload[2..]);
        if fu_header & 0x40 != 0 {
            // Copy rather than take, so the reassembly buffer keeps its capacity.
            self.au.extend_from_slice(&ANNEXB_START);
            self.au.extend_from_slice(&self.fua);
            self.fua.clear();
            self.in_fragment = false;
        }
    }

    /// Returns true when `sequence` is not exactly one past the last accepted
    /// sequence number (with 16-bit wrap-around). No history means no gap.
    fn seq_gap(&self, sequence: u16) -> bool {
        self.last_seq
            .is_some_and(|prev| sequence.wrapping_sub(prev) != 1)
    }
}
/// One reassembled H.264 access unit as produced by `H264Depacketizer`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AccessUnit {
/// The NAL units of the access unit, each prefixed by a 4-byte Annex B start code.
pub data: Bytes,
/// RTP timestamp of the packets the unit was built from (0 if none was recorded).
pub timestamp: u32,
/// True when the unit contains a NAL unit of type 5 (IDR slice).
pub keyframe: bool,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an RTP packet with version 2, payload type 96, SSRC 1 and no CSRCs.
    fn rtp(seq: u16, ts: u32, marker: bool, payload: &[u8]) -> Vec<u8> {
        let second = if marker { 0x80 | 96 } else { 96 };
        let mut packet = vec![0x80, second];
        packet.extend_from_slice(&seq.to_be_bytes());
        packet.extend_from_slice(&ts.to_be_bytes());
        packet.extend_from_slice(&[0, 0, 0, 1]);
        packet.extend_from_slice(payload);
        packet
    }

    /// Builds an AAC payload: header bit count, one 16-bit AU header per size
    /// (13-bit size, 3-bit zero index), then the raw AU bytes.
    fn aac_payload(header_bits: u16, sizes: &[u16], data: &[u8]) -> Vec<u8> {
        let mut payload = header_bits.to_be_bytes().to_vec();
        for size in sizes {
            payload.extend_from_slice(&(*size << 3).to_be_bytes());
        }
        payload.extend_from_slice(data);
        payload
    }

    /// Parses every packet and feeds it to a fresh depacketizer, returning the
    /// last access unit that was emitted.
    fn depacketize_all(packets: &[Vec<u8>]) -> Option<AccessUnit> {
        let mut depack = H264Depacketizer::new();
        let mut last = None;
        for packet in packets {
            let header = RtpHeader::parse(packet).unwrap();
            let emitted = depack
                .push(
                    &packet[header.payload_offset..],
                    header.marker,
                    header.timestamp,
                    header.sequence,
                )
                .unwrap();
            if emitted.is_some() {
                last = emitted;
            }
        }
        last
    }

    #[test]
    fn parses_fixed_header_and_payload_offset() {
        let packet = rtp(7, 9000, true, &[0x65, 0xAA]);
        let header = RtpHeader::parse(&packet).unwrap();
        assert_eq!(header.sequence, 7);
        assert_eq!(header.timestamp, 9000);
        assert!(header.marker);
        assert_eq!(header.payload_type, 96);
        assert_eq!(header.payload_offset, 12);
        assert_eq!(&packet[header.payload_offset..], &[0x65, 0xAA]);
    }

    #[test]
    fn rejects_wrong_version_and_short_buffers() {
        // Version bits 0 instead of 2.
        assert!(RtpHeader::parse(&[0x00; 12]).is_none());
        // Correct version but only four bytes.
        assert!(RtpHeader::parse(&[0x80; 4]).is_none());
    }

    #[test]
    fn honors_csrc_count_in_payload_offset() {
        let mut base = rtp(1, 0, false, &[0x41]);
        // Version 2 with a CSRC count of 2.
        base[0] = 0x82;
        let mut packet = base[..12].to_vec();
        packet.extend_from_slice(&[0xDE, 0xAD, 0xBE, 0xEF, 0, 0, 0, 0]);
        packet.push(0x41);
        let header = RtpHeader::parse(&packet).unwrap();
        assert_eq!(header.payload_offset, 20);
    }

    #[test]
    fn aac_hbr_splits_two_access_units() {
        let payload = aac_payload(32, &[3, 2], &[0xA1, 0xA2, 0xA3, 0xB1, 0xB2]);
        let units = AacDepacketizer::new().push(&payload).unwrap();
        assert_eq!(units.len(), 2);
        assert_eq!(&units[0][..], &[0xA1, 0xA2, 0xA3]);
        assert_eq!(&units[1][..], &[0xB1, 0xB2]);
    }

    #[test]
    fn aac_hbr_single_au() {
        let payload = aac_payload(16, &[4], &[1, 2, 3, 4]);
        let units = AacDepacketizer::new().push(&payload).unwrap();
        assert_eq!(units.len(), 1);
        assert_eq!(&units[0][..], &[1, 2, 3, 4]);
    }

    #[test]
    fn aac_truncated_payload_errors() {
        // Too short to even hold the header bit count.
        assert_eq!(
            AacDepacketizer::new().push(&[0x00]),
            Err(DepacketizeError::Truncated)
        );
        // Announces an 8-byte AU but carries only two bytes of data.
        let payload = aac_payload(16, &[8], &[1, 2]);
        assert_eq!(
            AacDepacketizer::new().push(&payload),
            Err(DepacketizeError::Truncated)
        );
    }

    #[test]
    fn single_nal_packet_emits_annexb_on_marker() {
        let mut depack = H264Depacketizer::new();
        let unit = depack
            .push(&[0x41, 0x9A, 0xBC], true, 3000, 1)
            .unwrap()
            .unwrap();
        assert_eq!(&unit.data[..], &[0, 0, 0, 1, 0x41, 0x9A, 0xBC]);
        assert!(!unit.keyframe);
        assert_eq!(unit.timestamp, 3000);
    }

    #[test]
    fn idr_single_nal_is_flagged_keyframe() {
        let mut depack = H264Depacketizer::new();
        let unit = depack.push(&[0x65, 0x01], true, 0, 1).unwrap().unwrap();
        assert!(unit.keyframe);
    }

    #[test]
    fn packetizer_single_nal_round_trips_through_depacketizer() {
        let au = [0, 0, 0, 1, 0x67, 0x42, 0x00, 0, 0, 0, 1, 0x65, 0x88, 0x99];
        let mut packetizer = RtpPacketizer::new(96, 0xABCD, 1200);
        let packets = packetizer.packetize(&au, 3000);
        assert_eq!(packets.len(), 2, "one packet per NAL");

        let unit = depacketize_all(&packets).expect("AU completed on the marker packet");
        assert_eq!(&unit.data[..], &au);
        assert!(unit.keyframe);
        assert_eq!(unit.timestamp, 3000);
    }

    #[test]
    fn packetizer_fragments_oversized_nal_and_round_trips() {
        let mut nal = vec![0, 0, 0, 1, 0x65];
        nal.extend((0..600u16).map(|i| i as u8));
        let mut packetizer = RtpPacketizer::new(96, 1, 100);
        let packets = packetizer.packetize(&nal, 90);
        assert!(packets.len() > 1, "oversized NAL is fragmented");

        // Exactly one marker, and it sits on the final fragment.
        let markers: Vec<bool> = packets
            .iter()
            .map(|packet| RtpHeader::parse(packet).unwrap().marker)
            .collect();
        assert_eq!(markers.iter().filter(|m| **m).count(), 1);
        assert!(markers.last().unwrap());

        let unit = depacketize_all(&packets);
        assert_eq!(&unit.unwrap().data[..], &nal[..]);
    }

    #[test]
    fn stap_a_splits_aggregated_nals() {
        let payload = [24, 0, 2, 0xAA, 0xBB, 0, 3, 0xCC, 0xDD, 0xEE];
        let mut depack = H264Depacketizer::new();
        let unit = depack.push(&payload, true, 0, 1).unwrap().unwrap();
        assert_eq!(
            &unit.data[..],
            &[0, 0, 0, 1, 0xAA, 0xBB, 0, 0, 0, 1, 0xCC, 0xDD, 0xEE]
        );
    }

    #[test]
    fn fu_a_reassembles_fragmented_nal() {
        let mut depack = H264Depacketizer::new();
        let first = depack.push(&[0x7C, 0x85, 0x11, 0x22], false, 0, 1).unwrap();
        assert!(first.is_none());
        let middle = depack.push(&[0x7C, 0x05, 0x33], false, 0, 2).unwrap();
        assert!(middle.is_none());
        let unit = depack
            .push(&[0x7C, 0x45, 0x44], true, 0, 3)
            .unwrap()
            .unwrap();
        assert_eq!(&unit.data[..], &[0, 0, 0, 1, 0x65, 0x11, 0x22, 0x33, 0x44]);
        assert!(unit.keyframe);
    }

    #[test]
    fn fu_a_sequence_gap_reports_out_of_order() {
        let mut depack = H264Depacketizer::new();
        depack.push(&[0x7C, 0x85, 0x11], false, 0, 1).unwrap();
        let result = depack.push(&[0x7C, 0x05, 0x22], false, 0, 5);
        assert_eq!(result, Err(DepacketizeError::OutOfOrder));
    }

    #[test]
    fn timestamp_change_flushes_previous_au_without_marker() {
        let mut depack = H264Depacketizer::new();
        let first = depack.push(&[0x41, 0x01], false, 1000, 1).unwrap();
        assert!(first.is_none());
        let unit = depack
            .push(&[0x41, 0x02], false, 2000, 2)
            .unwrap()
            .unwrap();
        assert_eq!(unit.timestamp, 1000);
        assert_eq!(&unit.data[..], &[0, 0, 0, 1, 0x41, 0x01]);
    }
}