use bytes::Bytes;
const ANNEXB_START: [u8; 4] = [0, 0, 0, 1];
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RtpHeader {
pub payload_type: u8,
pub marker: bool,
pub sequence: u16,
pub timestamp: u32,
pub ssrc: u32,
pub payload_offset: usize,
}
impl RtpHeader {
pub fn parse(buf: &[u8]) -> Option<RtpHeader> {
use super::byteops::ByteReader;
let mut r = ByteReader::new(buf);
let b0 = r.u8()?;
if b0 >> 6 != 2 {
return None; }
let has_extension = b0 & 0x10 != 0;
let csrc_count = (b0 & 0x0F) as usize;
let b1 = r.u8()?;
let marker = b1 & 0x80 != 0;
let payload_type = b1 & 0x7F;
let sequence = r.u16_be()?;
let timestamp = r.u32_be()?;
let ssrc = r.u32_be()?;
r.skip(csrc_count * 4)?;
if has_extension {
r.skip(2)?;
let ext_words = r.u16_be()? as usize;
r.skip(ext_words * 4)?;
}
Some(RtpHeader {
payload_type,
marker,
sequence,
timestamp,
ssrc,
payload_offset: r.position(),
})
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum DepacketizeError {
Truncated,
OutOfOrder,
Unsupported(u8),
}
#[derive(Debug, Clone, Copy, Default)]
pub struct AacDepacketizer {
size_length: u8,
index_length: u8,
}
impl AacDepacketizer {
pub fn new() -> Self {
Self {
size_length: 13,
index_length: 3,
}
}
pub fn with_lengths(size_length: u8, index_length: u8) -> Self {
Self {
size_length,
index_length,
}
}
pub fn push(&self, payload: &[u8]) -> Result<Vec<Bytes>, DepacketizeError> {
if payload.len() < 2 {
return Err(DepacketizeError::Truncated);
}
if self.size_length == 0 || self.size_length > 16 {
return Err(DepacketizeError::Unsupported(self.size_length));
}
let header_bits = u16::from_be_bytes([payload[0], payload[1]]) as usize;
let au_header_bits = self.size_length as usize + self.index_length as usize;
if au_header_bits == 0 {
return Err(DepacketizeError::Unsupported(0));
}
let header_bytes = header_bits.div_ceil(8);
let au_count = header_bits / au_header_bits;
let headers = payload
.get(2..2 + header_bytes)
.ok_or(DepacketizeError::Truncated)?;
let mut data_off = 2 + header_bytes;
let mut out = Vec::with_capacity(au_count);
for i in 0..au_count {
let bit = i * au_header_bits;
let byte = bit / 8;
let hdr = headers
.get(byte..byte + 2)
.ok_or(DepacketizeError::Truncated)?;
let size = (u16::from_be_bytes([hdr[0], hdr[1]]) >> (16 - self.size_length)) as usize;
let end = data_off + size;
let au = payload
.get(data_off..end)
.ok_or(DepacketizeError::Truncated)?;
out.push(Bytes::copy_from_slice(au));
data_off = end;
}
Ok(out)
}
}
#[derive(Debug, Clone)]
pub struct RtpPacketizer {
payload_type: u8,
ssrc: u32,
sequence: u16,
max_payload: usize,
codec: NalCodec,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum NalCodec {
H264,
H265,
}
impl RtpPacketizer {
pub fn new(payload_type: u8, ssrc: u32, mtu: usize) -> Self {
Self::with_codec(payload_type, ssrc, mtu, NalCodec::H264)
}
pub fn new_h265(payload_type: u8, ssrc: u32, mtu: usize) -> Self {
Self::with_codec(payload_type, ssrc, mtu, NalCodec::H265)
}
fn with_codec(payload_type: u8, ssrc: u32, mtu: usize, codec: NalCodec) -> Self {
Self {
payload_type,
ssrc,
sequence: 0,
max_payload: mtu.saturating_sub(12).max(1),
codec,
}
}
fn emit(
&mut self,
recycle: &mut Vec<Vec<u8>>,
out: &mut Vec<Vec<u8>>,
marker: bool,
timestamp: u32,
fill: impl FnOnce(&mut Vec<u8>),
) {
let mut buf = recycle.pop().unwrap_or_default();
buf.clear();
write_rtp_header(
&mut buf,
self.payload_type,
marker,
self.sequence,
timestamp,
self.ssrc,
);
self.sequence = self.sequence.wrapping_add(1);
fill(&mut buf);
out.push(buf);
}
pub fn packetize(&mut self, access_unit: &[u8], timestamp: u32) -> Vec<Vec<u8>> {
let mut out = Vec::new();
self.packetize_into(access_unit, timestamp, &mut out);
out
}
pub fn packetize_into(&mut self, access_unit: &[u8], timestamp: u32, out: &mut Vec<Vec<u8>>) {
let mut recycle = std::mem::take(out);
let nals: Vec<&[u8]> = crate::codec::nal::iter_nals(access_unit)
.filter(|n| !n.is_empty())
.collect();
for (i, nal) in nals.iter().enumerate() {
let last_nal = i + 1 == nals.len();
if nal.len() <= self.max_payload {
self.emit(&mut recycle, out, last_nal, timestamp, |b| {
b.extend_from_slice(nal)
});
} else {
match self.codec {
NalCodec::H264 => {
self.fragment_fua(nal, timestamp, last_nal, &mut recycle, out)
}
NalCodec::H265 => {
self.fragment_fu_h265(nal, timestamp, last_nal, &mut recycle, out)
}
}
}
}
}
fn fragment_fua(
&mut self,
nal: &[u8],
timestamp: u32,
last_nal: bool,
recycle: &mut Vec<Vec<u8>>,
out: &mut Vec<Vec<u8>>,
) {
let nal_header = nal[0];
let fu_indicator = (nal_header & 0xE0) | 28; let nal_type = nal_header & 0x1F;
let body = &nal[1..];
let chunk = self.max_payload.saturating_sub(2).max(1);
let n_chunks = body.len().div_ceil(chunk);
for (idx, part) in body.chunks(chunk).enumerate() {
let start = idx == 0;
let end = idx + 1 == n_chunks;
let mut fu_header = nal_type;
if start {
fu_header |= 0x80;
}
if end {
fu_header |= 0x40;
}
self.emit(recycle, out, last_nal && end, timestamp, |pkt| {
pkt.push(fu_indicator);
pkt.push(fu_header);
pkt.extend_from_slice(part);
});
}
}
fn fragment_fu_h265(
&mut self,
nal: &[u8],
timestamp: u32,
last_nal: bool,
recycle: &mut Vec<Vec<u8>>,
out: &mut Vec<Vec<u8>>,
) {
if nal.len() < 2 {
self.emit(recycle, out, last_nal, timestamp, |pkt| {
pkt.extend_from_slice(nal)
});
return;
}
let nal_type = (nal[0] >> 1) & 0x3F;
let payload_hdr0 = (nal[0] & 0x81) | (49 << 1);
let payload_hdr1 = nal[1];
let body = &nal[2..];
let chunk = self.max_payload.saturating_sub(3).max(1);
let n_chunks = body.len().div_ceil(chunk);
for (idx, part) in body.chunks(chunk).enumerate() {
let start = idx == 0;
let end = idx + 1 == n_chunks;
let mut fu_header = nal_type;
if start {
fu_header |= 0x80;
}
if end {
fu_header |= 0x40;
}
self.emit(recycle, out, last_nal && end, timestamp, |pkt| {
pkt.push(payload_hdr0);
pkt.push(payload_hdr1);
pkt.push(fu_header);
pkt.extend_from_slice(part);
});
}
}
}
#[derive(Debug, Default)]
pub struct H264Depacketizer {
au: Vec<u8>,
fua: Vec<u8>,
in_fragment: bool,
fua_header: u8,
current_ts: Option<u32>,
last_seq: Option<u16>,
}
impl H264Depacketizer {
pub fn new() -> Self {
Self::default()
}
fn append_nal(&mut self, nal: &[u8]) {
self.au.extend_from_slice(&ANNEXB_START);
self.au.extend_from_slice(nal);
}
fn pending_is_keyframe(&self) -> bool {
let mut i = 0;
while i + 4 < self.au.len() {
if self.au[i..i + 4] == ANNEXB_START {
let nal_type = self.au[i + 4] & 0x1F;
if nal_type == 5 {
return true;
}
}
i += 1;
}
false
}
fn take_au(&mut self) -> Option<AccessUnit> {
if self.au.is_empty() {
return None;
}
let keyframe = self.pending_is_keyframe();
let timestamp = self.current_ts.unwrap_or(0);
let data = Bytes::from(std::mem::take(&mut self.au));
self.current_ts = None;
Some(AccessUnit {
data,
timestamp,
keyframe,
})
}
pub fn push(
&mut self,
payload: &[u8],
marker: bool,
timestamp: u32,
sequence: u16,
) -> Result<Option<AccessUnit>, DepacketizeError> {
if payload.is_empty() {
return Err(DepacketizeError::Truncated);
}
let mut completed = None;
if let Some(ts) = self.current_ts {
if ts != timestamp && !self.in_fragment {
completed = self.take_au();
}
}
self.current_ts = Some(timestamp);
let nal_type = payload[0] & 0x1F;
match nal_type {
1..=23 => {
self.append_nal(payload);
}
24 => {
let mut i = 1;
while i + 2 <= payload.len() {
let size = u16::from_be_bytes([payload[i], payload[i + 1]]) as usize;
i += 2;
if i + size > payload.len() {
return Err(DepacketizeError::Truncated);
}
self.append_nal(&payload[i..i + size]);
i += size;
}
}
28 => {
if payload.len() < 2 {
return Err(DepacketizeError::Truncated);
}
let fu_header = payload[1];
let start = fu_header & 0x80 != 0;
let end = fu_header & 0x40 != 0;
let frag_type = fu_header & 0x1F;
if start {
self.fua_header = (payload[0] & 0xE0) | frag_type;
self.fua.clear();
self.fua.push(self.fua_header);
self.in_fragment = true;
} else if !self.in_fragment {
return Err(DepacketizeError::OutOfOrder);
} else if self.seq_gap(sequence) {
self.in_fragment = false;
self.fua.clear();
return Err(DepacketizeError::OutOfOrder);
}
self.fua.extend_from_slice(&payload[2..]);
if end && self.in_fragment {
let nal = std::mem::take(&mut self.fua);
self.append_nal(&nal);
self.in_fragment = false;
}
}
other => return Err(DepacketizeError::Unsupported(other)),
}
self.last_seq = Some(sequence);
if completed.is_some() {
return Ok(completed);
}
if marker {
return Ok(self.take_au());
}
Ok(None)
}
fn seq_gap(&self, sequence: u16) -> bool {
match self.last_seq {
Some(prev) => sequence.wrapping_sub(prev) != 1,
None => false,
}
}
}
#[derive(Debug, Default)]
pub struct H265Depacketizer {
au: Vec<u8>,
fu: Vec<u8>,
in_fragment: bool,
current_ts: Option<u32>,
last_seq: Option<u16>,
}
impl H265Depacketizer {
pub fn new() -> Self {
Self::default()
}
fn append_nal(&mut self, nal: &[u8]) {
self.au.extend_from_slice(&ANNEXB_START);
self.au.extend_from_slice(nal);
}
fn pending_is_keyframe(&self) -> bool {
let mut i = 0;
while i + 4 < self.au.len() {
if self.au[i..i + 4] == ANNEXB_START {
let nal_type = (self.au[i + 4] >> 1) & 0x3F;
if (16..=23).contains(&nal_type) {
return true;
}
}
i += 1;
}
false
}
fn take_au(&mut self) -> Option<AccessUnit> {
if self.au.is_empty() {
return None;
}
let keyframe = self.pending_is_keyframe();
let timestamp = self.current_ts.unwrap_or(0);
let data = Bytes::from(std::mem::take(&mut self.au));
self.current_ts = None;
Some(AccessUnit {
data,
timestamp,
keyframe,
})
}
fn seq_gap(&self, sequence: u16) -> bool {
match self.last_seq {
Some(prev) => sequence.wrapping_sub(prev) != 1,
None => false,
}
}
pub fn push(
&mut self,
payload: &[u8],
marker: bool,
timestamp: u32,
sequence: u16,
) -> Result<Option<AccessUnit>, DepacketizeError> {
if payload.len() < 2 {
return Err(DepacketizeError::Truncated);
}
let mut completed = None;
if let Some(ts) = self.current_ts {
if ts != timestamp && !self.in_fragment {
completed = self.take_au();
}
}
self.current_ts = Some(timestamp);
let nal_type = (payload[0] >> 1) & 0x3F;
match nal_type {
0..=47 => self.append_nal(payload),
48 => {
let mut i = 2;
while i + 2 <= payload.len() {
let size = u16::from_be_bytes([payload[i], payload[i + 1]]) as usize;
i += 2;
if i + size > payload.len() {
return Err(DepacketizeError::Truncated);
}
self.append_nal(&payload[i..i + size]);
i += size;
}
}
49 => {
if payload.len() < 3 {
return Err(DepacketizeError::Truncated);
}
let fu_header = payload[2];
let start = fu_header & 0x80 != 0;
let end = fu_header & 0x40 != 0;
let fu_type = fu_header & 0x3F;
if start {
let hdr0 = (payload[0] & 0x81) | (fu_type << 1);
let hdr1 = payload[1];
self.fu.clear();
self.fu.push(hdr0);
self.fu.push(hdr1);
self.in_fragment = true;
} else if !self.in_fragment {
return Err(DepacketizeError::OutOfOrder);
} else if self.seq_gap(sequence) {
self.in_fragment = false;
self.fu.clear();
return Err(DepacketizeError::OutOfOrder);
}
self.fu.extend_from_slice(&payload[3..]);
if end && self.in_fragment {
let nal = std::mem::take(&mut self.fu);
self.append_nal(&nal);
self.in_fragment = false;
}
}
other => return Err(DepacketizeError::Unsupported(other)),
}
self.last_seq = Some(sequence);
if completed.is_some() {
return Ok(completed);
}
if marker {
return Ok(self.take_au());
}
Ok(None)
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AccessUnit {
pub data: Bytes,
pub timestamp: u32,
pub keyframe: bool,
}
fn write_rtp_header(out: &mut Vec<u8>, pt: u8, marker: bool, seq: u16, ts: u32, ssrc: u32) {
out.push(0x80); out.push(if marker { 0x80 } else { 0 } | (pt & 0x7F));
out.extend_from_slice(&seq.to_be_bytes());
out.extend_from_slice(&ts.to_be_bytes());
out.extend_from_slice(&ssrc.to_be_bytes());
}
#[derive(Debug, Clone)]
pub struct OpusPacketizer {
payload_type: u8,
ssrc: u32,
sequence: u16,
}
impl OpusPacketizer {
pub fn new(payload_type: u8, ssrc: u32) -> Self {
Self {
payload_type,
ssrc,
sequence: 0,
}
}
pub fn packetize_into(&mut self, opus: &[u8], timestamp: u32, out: &mut Vec<Vec<u8>>) {
let mut recycle = std::mem::take(out);
let mut pkt = recycle.pop().unwrap_or_default();
pkt.clear();
write_rtp_header(
&mut pkt,
self.payload_type,
false,
self.sequence,
timestamp,
self.ssrc,
);
self.sequence = self.sequence.wrapping_add(1);
pkt.extend_from_slice(opus);
out.push(pkt);
}
}
#[derive(Debug, Clone)]
pub struct Vp9Packetizer {
payload_type: u8,
ssrc: u32,
sequence: u16,
max_payload: usize,
picture_id: u16,
}
impl Vp9Packetizer {
pub fn new(payload_type: u8, ssrc: u32, mtu: usize) -> Self {
Self {
payload_type,
ssrc,
sequence: 0,
max_payload: mtu.saturating_sub(12 + 3).max(1),
picture_id: 0,
}
}
pub fn packetize(&mut self, frame: &[u8], timestamp: u32, keyframe: bool) -> Vec<Vec<u8>> {
let mut out = Vec::new();
self.packetize_into(frame, timestamp, keyframe, &mut out);
out
}
pub fn packetize_into(
&mut self,
frame: &[u8],
timestamp: u32,
keyframe: bool,
out: &mut Vec<Vec<u8>>,
) {
let pid = self.picture_id & 0x7FFF;
self.picture_id = self.picture_id.wrapping_add(1);
let mut recycle = std::mem::take(out);
let chunks: Vec<&[u8]> = if frame.is_empty() {
vec![&[]]
} else {
frame.chunks(self.max_payload).collect()
};
let n = chunks.len();
for (i, chunk) in chunks.into_iter().enumerate() {
let begin = i == 0;
let end = i + 1 == n;
let mut pkt = recycle.pop().unwrap_or_default();
pkt.clear();
write_rtp_header(
&mut pkt,
self.payload_type,
end,
self.sequence,
timestamp,
self.ssrc,
);
self.sequence = self.sequence.wrapping_add(1);
let mut desc0 = 0x80; if !keyframe {
desc0 |= 0x40; }
if begin {
desc0 |= 0x08; }
if end {
desc0 |= 0x04; }
pkt.push(desc0);
pkt.push(0x80 | (pid >> 8) as u8);
pkt.push((pid & 0xFF) as u8);
pkt.extend_from_slice(chunk);
out.push(pkt);
}
}
}
#[derive(Debug, Default)]
pub struct Vp9Depacketizer {
frame: Vec<u8>,
in_frame: bool,
keyframe: bool,
current_ts: Option<u32>,
}
impl Vp9Depacketizer {
pub fn new() -> Self {
Self::default()
}
pub fn push(
&mut self,
payload: &[u8],
marker: bool,
timestamp: u32,
) -> Result<Option<AccessUnit>, DepacketizeError> {
if payload.is_empty() {
return Err(DepacketizeError::Truncated);
}
let desc0 = payload[0];
let has_pid = desc0 & 0x80 != 0;
let has_layer = desc0 & 0x20 != 0;
let flexible = desc0 & 0x10 != 0;
let begin = desc0 & 0x08 != 0;
let end = desc0 & 0x04 != 0;
let predicted = desc0 & 0x40 != 0;
let mut off = 1;
if has_pid {
let m = payload.get(off).ok_or(DepacketizeError::Truncated)? & 0x80 != 0;
off += if m { 2 } else { 1 };
}
if has_layer {
off += 1; if !flexible {
off += 1; }
}
if off > payload.len() {
return Err(DepacketizeError::Truncated);
}
if begin {
self.frame.clear();
self.in_frame = true;
self.keyframe = !predicted;
self.current_ts = Some(timestamp);
} else if !self.in_frame {
return Err(DepacketizeError::OutOfOrder);
}
self.frame.extend_from_slice(&payload[off..]);
if end && marker && self.in_frame {
self.in_frame = false;
return Ok(Some(AccessUnit {
data: Bytes::from(std::mem::take(&mut self.frame)),
timestamp: self.current_ts.unwrap_or(timestamp),
keyframe: self.keyframe,
}));
}
Ok(None)
}
}
#[cfg(feature = "codec-av1")]
fn leb128_encode(mut v: u64, out: &mut Vec<u8>) {
loop {
let mut byte = (v & 0x7F) as u8;
v >>= 7;
if v != 0 {
byte |= 0x80;
}
out.push(byte);
if v == 0 {
break;
}
}
}
#[cfg(feature = "codec-av1")]
const AV1_OBU_SEQUENCE_HEADER: u8 = 1;
#[cfg(feature = "codec-av1")]
const AV1_OBU_TEMPORAL_DELIMITER: u8 = 2;
#[cfg(feature = "codec-av1")]
#[derive(Debug, Clone)]
pub struct Av1Packetizer {
payload_type: u8,
ssrc: u32,
sequence: u16,
max_payload: usize,
}
#[cfg(feature = "codec-av1")]
impl Av1Packetizer {
pub fn new(payload_type: u8, ssrc: u32, mtu: usize) -> Self {
Self {
payload_type,
ssrc,
sequence: 0,
max_payload: mtu.saturating_sub(12 + 1).max(1),
}
}
pub fn packetize(&mut self, temporal_unit: &[u8], timestamp: u32) -> Vec<Vec<u8>> {
let mut out = Vec::new();
self.packetize_into(temporal_unit, timestamp, &mut out);
out
}
pub fn packetize_into(&mut self, temporal_unit: &[u8], timestamp: u32, out: &mut Vec<Vec<u8>>) {
let mut stream = Vec::with_capacity(temporal_unit.len());
let mut new_cvs = false;
for obu in crate::codec::obu::iter_obus(temporal_unit) {
if obu.obu_type == AV1_OBU_TEMPORAL_DELIMITER {
continue;
}
if obu.obu_type == AV1_OBU_SEQUENCE_HEADER {
new_cvs = true;
}
let header_len = 1 + obu.has_extension as usize;
let mut element = Vec::with_capacity(header_len + obu.payload.len());
element.push(obu.raw[0] & !0x02); if obu.has_extension {
element.push(obu.raw[1]);
}
element.extend_from_slice(obu.payload);
leb128_encode(element.len() as u64, &mut stream);
stream.extend_from_slice(&element);
}
let mut recycle = std::mem::take(out);
let chunks: Vec<&[u8]> = if stream.is_empty() {
vec![&[]]
} else {
stream.chunks(self.max_payload).collect()
};
let n = chunks.len();
for (i, chunk) in chunks.into_iter().enumerate() {
let last = i + 1 == n;
let mut pkt = recycle.pop().unwrap_or_default();
pkt.clear();
write_rtp_header(
&mut pkt,
self.payload_type,
last,
self.sequence,
timestamp,
self.ssrc,
);
self.sequence = self.sequence.wrapping_add(1);
let mut agg = 0u8;
if i > 0 {
agg |= 0x80; }
if !last {
agg |= 0x40; }
if i == 0 && new_cvs {
agg |= 0x08; }
pkt.push(agg);
pkt.extend_from_slice(chunk);
out.push(pkt);
}
}
}
#[cfg(feature = "codec-av1")]
#[derive(Debug, Default)]
pub struct Av1Depacketizer {
stream: Vec<u8>,
new_cvs: bool,
current_ts: Option<u32>,
}
#[cfg(feature = "codec-av1")]
impl Av1Depacketizer {
pub fn new() -> Self {
Self::default()
}
pub fn push(
&mut self,
payload: &[u8],
marker: bool,
timestamp: u32,
) -> Result<Option<AccessUnit>, DepacketizeError> {
if payload.is_empty() {
return Err(DepacketizeError::Truncated);
}
let agg = payload[0];
if agg & 0x08 != 0 {
self.new_cvs = true; }
if self.current_ts.is_none() {
self.current_ts = Some(timestamp);
}
self.stream.extend_from_slice(&payload[1..]);
if !marker {
return Ok(None);
}
let stream = std::mem::take(&mut self.stream);
let mut tu = Vec::with_capacity(stream.len() + 8);
let mut pos = 0;
while pos < stream.len() {
let len = leb128_decode(&stream, &mut pos).ok_or(DepacketizeError::Truncated)?;
let end = pos.checked_add(len).ok_or(DepacketizeError::Truncated)?;
let element = stream.get(pos..end).ok_or(DepacketizeError::Truncated)?;
pos = end;
let hdr0 = *element.first().ok_or(DepacketizeError::Truncated)?;
let has_ext = (hdr0 >> 2) & 1 == 1;
let header_len = 1 + has_ext as usize;
let obu_payload = element
.get(header_len..)
.ok_or(DepacketizeError::Truncated)?;
tu.push(hdr0 | 0x02);
if has_ext {
tu.push(element[1]);
}
leb128_encode(obu_payload.len() as u64, &mut tu);
tu.extend_from_slice(obu_payload);
}
let keyframe = std::mem::take(&mut self.new_cvs);
let ts = self.current_ts.take().unwrap_or(timestamp);
Ok(Some(AccessUnit {
data: Bytes::from(tu),
timestamp: ts,
keyframe,
}))
}
}
#[cfg(feature = "codec-av1")]
fn leb128_decode(data: &[u8], pos: &mut usize) -> Option<usize> {
let mut value: u64 = 0;
for i in 0..8 {
let byte = *data.get(*pos)?;
*pos += 1;
value |= ((byte & 0x7F) as u64) << (i * 7);
if byte & 0x80 == 0 {
return usize::try_from(value).ok();
}
}
None
}
#[cfg(test)]
mod tests {
use super::*;
fn rtp(seq: u16, ts: u32, marker: bool, payload: &[u8]) -> Vec<u8> {
let mut p = vec![0x80, if marker { 0x80 | 96 } else { 96 }];
p.extend_from_slice(&seq.to_be_bytes());
p.extend_from_slice(&ts.to_be_bytes());
p.extend_from_slice(&[0, 0, 0, 1]); p.extend_from_slice(payload);
p
}
#[test]
fn parses_fixed_header_and_payload_offset() {
let pkt = rtp(7, 9000, true, &[0x65, 0xAA]);
let h = RtpHeader::parse(&pkt).unwrap();
assert_eq!(h.sequence, 7);
assert_eq!(h.timestamp, 9000);
assert!(h.marker);
assert_eq!(h.payload_type, 96);
assert_eq!(h.payload_offset, 12);
assert_eq!(&pkt[h.payload_offset..], &[0x65, 0xAA]);
}
#[test]
fn rejects_wrong_version_and_short_buffers() {
assert!(RtpHeader::parse(&[0x00; 12]).is_none()); assert!(RtpHeader::parse(&[0x80; 4]).is_none()); }
#[test]
fn honors_csrc_count_in_payload_offset() {
let mut pkt = rtp(1, 0, false, &[0x41]);
pkt[0] = 0x82; let mut with_csrc = pkt[..12].to_vec();
with_csrc.extend_from_slice(&[0xDE, 0xAD, 0xBE, 0xEF, 0, 0, 0, 0]); with_csrc.push(0x41);
let h = RtpHeader::parse(&with_csrc).unwrap();
assert_eq!(h.payload_offset, 20);
}
#[test]
fn aac_hbr_splits_two_access_units() {
let mut p = Vec::new();
p.extend_from_slice(&32u16.to_be_bytes()); p.extend_from_slice(&((3u16) << 3).to_be_bytes()); p.extend_from_slice(&((2u16) << 3).to_be_bytes()); p.extend_from_slice(&[0xA1, 0xA2, 0xA3]); p.extend_from_slice(&[0xB1, 0xB2]); let aus = AacDepacketizer::new().push(&p).unwrap();
assert_eq!(aus.len(), 2);
assert_eq!(&aus[0][..], &[0xA1, 0xA2, 0xA3]);
assert_eq!(&aus[1][..], &[0xB1, 0xB2]);
}
#[test]
fn aac_hbr_single_au() {
let mut p = Vec::new();
p.extend_from_slice(&16u16.to_be_bytes()); p.extend_from_slice(&((4u16) << 3).to_be_bytes()); p.extend_from_slice(&[1, 2, 3, 4]);
let aus = AacDepacketizer::new().push(&p).unwrap();
assert_eq!(aus.len(), 1);
assert_eq!(&aus[0][..], &[1, 2, 3, 4]);
}
#[test]
fn aac_truncated_payload_errors() {
assert_eq!(
AacDepacketizer::new().push(&[0x00]),
Err(DepacketizeError::Truncated)
);
let mut p = 16u16.to_be_bytes().to_vec();
p.extend_from_slice(&((8u16) << 3).to_be_bytes());
p.extend_from_slice(&[1, 2]);
assert_eq!(
AacDepacketizer::new().push(&p),
Err(DepacketizeError::Truncated)
);
}
#[test]
fn single_nal_packet_emits_annexb_on_marker() {
let mut d = H264Depacketizer::new();
let out = d.push(&[0x41, 0x9A, 0xBC], true, 3000, 1).unwrap().unwrap();
assert_eq!(&out.data[..], &[0, 0, 0, 1, 0x41, 0x9A, 0xBC]);
assert!(!out.keyframe);
assert_eq!(out.timestamp, 3000);
}
#[test]
fn idr_single_nal_is_flagged_keyframe() {
let mut d = H264Depacketizer::new();
let out = d.push(&[0x65, 0x01], true, 0, 1).unwrap().unwrap();
assert!(out.keyframe);
}
#[test]
fn packetizer_single_nal_round_trips_through_depacketizer() {
let au = [0, 0, 0, 1, 0x67, 0x42, 0x00, 0, 0, 0, 1, 0x65, 0x88, 0x99];
let mut pkt = RtpPacketizer::new(96, 0xABCD, 1200);
let packets = pkt.packetize(&au, 3000);
assert_eq!(packets.len(), 2, "one packet per NAL");
let mut depack = H264Depacketizer::new();
let mut out = None;
for p in &packets {
let h = RtpHeader::parse(p).unwrap();
if let Some(au) = depack
.push(&p[h.payload_offset..], h.marker, h.timestamp, h.sequence)
.unwrap()
{
out = Some(au);
}
}
let out = out.expect("AU completed on the marker packet");
assert_eq!(&out.data[..], &au);
assert!(out.keyframe);
assert_eq!(out.timestamp, 3000);
}
#[test]
fn packetize_into_recycles_buffers_without_changing_output() {
let au1 = [0, 0, 0, 1, 0x67, 0x42, 0x00, 0, 0, 0, 1, 0x65, 0x88, 0x99];
let au2 = [0, 0, 0, 1, 0x65, 0x11, 0x22, 0x33];
let mut a = RtpPacketizer::new(96, 0xABCD, 1200);
let mut b = RtpPacketizer::new(96, 0xABCD, 1200);
let mut reused: Vec<Vec<u8>> = Vec::new();
for au in [&au1[..], &au2[..], &au1[..]] {
let expected = a.packetize(au, 3000);
b.packetize_into(au, 3000, &mut reused);
assert_eq!(
reused, expected,
"recycled output matches allocating output"
);
}
}
#[test]
fn packetizer_fragments_oversized_nal_and_round_trips() {
let mut nal = vec![0, 0, 0, 1, 0x65]; nal.extend((0..600u16).map(|i| i as u8)); let mut pkt = RtpPacketizer::new(96, 1, 100); let packets = pkt.packetize(&nal, 90);
assert!(packets.len() > 1, "oversized NAL is fragmented");
let markers: Vec<bool> = packets
.iter()
.map(|p| RtpHeader::parse(p).unwrap().marker)
.collect();
assert_eq!(markers.iter().filter(|m| **m).count(), 1);
assert!(markers.last().unwrap());
let mut depack = H264Depacketizer::new();
let mut out = None;
for p in &packets {
let h = RtpHeader::parse(p).unwrap();
if let Some(au) = depack
.push(&p[h.payload_offset..], h.marker, h.timestamp, h.sequence)
.unwrap()
{
out = Some(au);
}
}
assert_eq!(&out.unwrap().data[..], &nal[..]);
}
#[test]
fn stap_a_splits_aggregated_nals() {
let payload = [24, 0, 2, 0xAA, 0xBB, 0, 3, 0xCC, 0xDD, 0xEE];
let mut d = H264Depacketizer::new();
let out = d.push(&payload, true, 0, 1).unwrap().unwrap();
assert_eq!(
&out.data[..],
&[0, 0, 0, 1, 0xAA, 0xBB, 0, 0, 0, 1, 0xCC, 0xDD, 0xEE]
);
}
#[test]
fn fu_a_reassembles_fragmented_nal() {
let mut d = H264Depacketizer::new();
assert!(d
.push(&[0x7C, 0x85, 0x11, 0x22], false, 0, 1)
.unwrap()
.is_none());
assert!(d.push(&[0x7C, 0x05, 0x33], false, 0, 2).unwrap().is_none());
let out = d.push(&[0x7C, 0x45, 0x44], true, 0, 3).unwrap().unwrap();
assert_eq!(&out.data[..], &[0, 0, 0, 1, 0x65, 0x11, 0x22, 0x33, 0x44]);
assert!(out.keyframe);
}
#[test]
fn fu_a_sequence_gap_reports_out_of_order() {
let mut d = H264Depacketizer::new();
d.push(&[0x7C, 0x85, 0x11], false, 0, 1).unwrap();
assert_eq!(
d.push(&[0x7C, 0x05, 0x22], false, 0, 5),
Err(DepacketizeError::OutOfOrder)
);
}
#[test]
fn timestamp_change_flushes_previous_au_without_marker() {
let mut d = H264Depacketizer::new();
assert!(d.push(&[0x41, 0x01], false, 1000, 1).unwrap().is_none());
let out = d.push(&[0x41, 0x02], false, 2000, 2).unwrap().unwrap();
assert_eq!(out.timestamp, 1000);
assert_eq!(&out.data[..], &[0, 0, 0, 1, 0x41, 0x01]);
}
#[test]
fn h265_single_nal_round_trips_through_depacketizer() {
let au = [
0, 0, 0, 1, 0x40, 0x01, 0xAA, 0, 0, 0, 1, 0x26, 0x01, 0x88, 0x99, ];
let mut pkt = RtpPacketizer::new_h265(96, 0xABCD, 1200);
let packets = pkt.packetize(&au, 3000);
assert_eq!(packets.len(), 2, "one packet per NAL");
let mut depack = H265Depacketizer::new();
let mut out = None;
for p in &packets {
let h = RtpHeader::parse(p).unwrap();
if let Some(au) = depack
.push(&p[h.payload_offset..], h.marker, h.timestamp, h.sequence)
.unwrap()
{
out = Some(au);
}
}
let out = out.expect("AU completed on the marker packet");
assert_eq!(&out.data[..], &au);
assert!(out.keyframe, "IRAP type 19 is a keyframe");
assert_eq!(out.timestamp, 3000);
}
#[test]
fn h265_fragments_oversized_nal_and_round_trips() {
let mut nal = vec![0, 0, 0, 1, 0x26, 0x01]; nal.extend((0..600u16).map(|i| i as u8));
let mut pkt = RtpPacketizer::new_h265(96, 1, 100); let packets = pkt.packetize(&nal, 90);
assert!(packets.len() > 1, "oversized NAL is fragmented");
let markers: Vec<bool> = packets
.iter()
.map(|p| RtpHeader::parse(p).unwrap().marker)
.collect();
assert_eq!(markers.iter().filter(|m| **m).count(), 1);
assert!(markers.last().unwrap());
for p in &packets {
let h = RtpHeader::parse(p).unwrap();
let pt = (p[h.payload_offset] >> 1) & 0x3F;
assert_eq!(pt, 49, "FU payload type");
}
let mut depack = H265Depacketizer::new();
let mut out = None;
for p in &packets {
let h = RtpHeader::parse(p).unwrap();
if let Some(au) = depack
.push(&p[h.payload_offset..], h.marker, h.timestamp, h.sequence)
.unwrap()
{
out = Some(au);
}
}
assert_eq!(&out.unwrap().data[..], &nal[..]);
}
#[test]
fn h265_ap_splits_aggregated_nals() {
let payload = [0x60, 0x01, 0, 2, 0xAA, 0xBB, 0, 3, 0xCC, 0xDD, 0xEE];
let mut d = H265Depacketizer::new();
let out = d.push(&payload, true, 0, 1).unwrap().unwrap();
assert_eq!(
&out.data[..],
&[0, 0, 0, 1, 0xAA, 0xBB, 0, 0, 0, 1, 0xCC, 0xDD, 0xEE]
);
}
#[test]
fn h265_rejects_truncated_and_unsupported() {
let mut d = H265Depacketizer::new();
assert_eq!(
d.push(&[0x26], true, 0, 1),
Err(DepacketizeError::Truncated)
);
assert_eq!(
d.push(&[50 << 1, 0x01, 0x00], true, 0, 2),
Err(DepacketizeError::Unsupported(50))
);
}
fn vp9_depacketize(packets: &[Vec<u8>]) -> Option<AccessUnit> {
let mut d = Vp9Depacketizer::new();
let mut out = None;
for p in packets {
let h = RtpHeader::parse(p).unwrap();
if let Some(f) = d
.push(&p[h.payload_offset..], h.marker, h.timestamp)
.unwrap()
{
out = Some(f);
}
}
out
}
#[test]
fn vp9_fragmented_frame_round_trips() {
let frame: Vec<u8> = (0..500u16).map(|i| i as u8).collect();
let mut pkt = Vp9Packetizer::new(98, 0x1234, 100); let packets = pkt.packetize(&frame, 9000, true);
assert!(packets.len() > 1, "frame fragmented");
let markers: Vec<bool> = packets
.iter()
.map(|p| RtpHeader::parse(p).unwrap().marker)
.collect();
assert_eq!(markers.iter().filter(|m| **m).count(), 1);
assert!(markers.last().unwrap());
let out = vp9_depacketize(&packets).expect("frame completed");
assert_eq!(&out.data[..], &frame[..]);
assert!(out.keyframe, "keyframe → P bit clear");
assert_eq!(out.timestamp, 9000);
}
#[test]
fn vp9_inter_frame_is_not_a_keyframe() {
let mut pkt = Vp9Packetizer::new(98, 1, 1200);
let packets = pkt.packetize(&[1, 2, 3], 0, false);
assert_eq!(packets.len(), 1);
let out = vp9_depacketize(&packets).expect("frame");
assert_eq!(&out.data[..], &[1, 2, 3]);
assert!(!out.keyframe, "P bit set → inter frame");
}
#[cfg(feature = "codec-av1")]
fn av1_depacketize(packets: &[Vec<u8>]) -> Option<AccessUnit> {
let mut d = Av1Depacketizer::new();
let mut out = None;
for p in packets {
let h = RtpHeader::parse(p).unwrap();
if let Some(f) = d
.push(&p[h.payload_offset..], h.marker, h.timestamp)
.unwrap()
{
out = Some(f);
}
}
out
}
#[cfg(feature = "codec-av1")]
#[test]
fn av1_temporal_unit_round_trips_without_delimiter() {
let td = [0x12u8, 0x00];
let seq = [0x0Au8, 0x02, 0xAA, 0xBB];
let frame = [0x32u8, 0x03, 0x11, 0x22, 0x33];
let mut tu = Vec::new();
tu.extend_from_slice(&td);
tu.extend_from_slice(&seq);
tu.extend_from_slice(&frame);
let mut pkt = Av1Packetizer::new(99, 7, 1200);
let packets = pkt.packetize(&tu, 1000);
let out = av1_depacketize(&packets).expect("TU completed");
let mut expected = Vec::new();
expected.extend_from_slice(&seq);
expected.extend_from_slice(&frame);
assert_eq!(&out.data[..], &expected[..]);
assert!(out.keyframe, "sequence header → new coded video sequence");
assert_eq!(out.timestamp, 1000);
}
#[cfg(feature = "codec-av1")]
#[test]
fn av1_large_temporal_unit_fragments_and_round_trips() {
let mut frame = vec![0x32u8, 0xAC, 0x02];
frame.extend((0..300u16).map(|i| i as u8));
let mut tu = vec![0x12u8, 0x00]; tu.extend_from_slice(&frame);
let mut pkt = Av1Packetizer::new(99, 1, 64); let packets = pkt.packetize(&tu, 0);
assert!(packets.len() > 1, "large TU fragmented");
for (i, p) in packets.iter().enumerate() {
let agg = p[RtpHeader::parse(p).unwrap().payload_offset];
assert_eq!((agg & 0x80 != 0), i > 0, "Z continuation bit");
assert_eq!(
(agg & 0x40 != 0),
i + 1 < packets.len(),
"Y continuation bit"
);
}
let out = av1_depacketize(&packets).expect("TU completed");
assert_eq!(&out.data[..], &frame[..], "frame OBU reconstructed");
}
}