use bytes::Bytes;
/// Four-byte Annex-B start code written in front of every NAL unit that is
/// appended to an access unit.
const ANNEXB_START: [u8; 4] = [0, 0, 0, 1];
/// Fields decoded from the front of an RTP packet, plus the index at which
/// the payload begins.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RtpHeader {
    /// Low seven bits of the second header byte.
    pub payload_type: u8,
    /// Top bit of the second header byte.
    pub marker: bool,
    /// Big-endian 16-bit sequence number (bytes 2..4).
    pub sequence: u16,
    /// Big-endian 32-bit timestamp (bytes 4..8).
    pub timestamp: u32,
    /// Big-endian 32-bit synchronization source identifier (bytes 8..12).
    pub ssrc: u32,
    /// Index of the first byte after the fixed header, the CSRC list and the
    /// optional header extension.
    pub payload_offset: usize,
}

impl RtpHeader {
    /// Decodes the RTP header at the start of `buf`.
    ///
    /// Returns `None` when `buf` holds fewer than 12 bytes, when the version
    /// field is not 2, or when the CSRC list / header extension announced by
    /// the first byte would run past the end of `buf`. A packet whose payload
    /// is empty is still accepted.
    pub fn parse(buf: &[u8]) -> Option<RtpHeader> {
        const FIXED_LEN: usize = 12;
        const EXT_PREFIX_LEN: usize = 4;

        let fixed = buf.get(..FIXED_LEN)?;
        let flags = fixed[0];
        if flags >> 6 != 2 {
            return None;
        }

        // Each CSRC entry counted in the low nibble occupies four bytes.
        let csrc_bytes = usize::from(flags & 0x0F) * 4;
        let mut payload_offset = FIXED_LEN + csrc_bytes;

        if flags & 0x10 != 0 {
            // The extension starts with a four-byte prefix whose last two
            // bytes give the extension length in 32-bit words.
            let prefix = buf.get(payload_offset..payload_offset + EXT_PREFIX_LEN)?;
            let ext_words = usize::from(u16::from_be_bytes([prefix[2], prefix[3]]));
            payload_offset += EXT_PREFIX_LEN + ext_words * 4;
        }

        if payload_offset > buf.len() {
            return None;
        }

        Some(RtpHeader {
            payload_type: fixed[1] & 0x7F,
            marker: fixed[1] & 0x80 != 0,
            sequence: u16::from_be_bytes([fixed[2], fixed[3]]),
            timestamp: u32::from_be_bytes([fixed[4], fixed[5], fixed[6], fixed[7]]),
            ssrc: u32::from_be_bytes([fixed[8], fixed[9], fixed[10], fixed[11]]),
            payload_offset,
        })
    }
}
/// Reasons an RTP payload can be rejected by the depacketizer.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`] so it can be
/// propagated with `?` into `Box<dyn Error>` or wrapped by error-reporting
/// crates.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum DepacketizeError {
    /// The payload was empty or shorter than its own headers / length fields
    /// require.
    Truncated,
    /// An FU-A continuation fragment arrived without a usable preceding
    /// fragment (no start fragment seen, or a sequence-number gap).
    OutOfOrder,
    /// The NAL unit type in the low five bits of the first payload byte is
    /// not handled; the offending type value is carried.
    Unsupported(u8),
}

impl std::fmt::Display for DepacketizeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            DepacketizeError::Truncated => f.write_str("RTP payload is truncated"),
            DepacketizeError::OutOfOrder => f.write_str("FU-A fragment arrived out of order"),
            DepacketizeError::Unsupported(nal_type) => {
                write!(f, "unsupported NAL unit type {}", nal_type)
            }
        }
    }
}

impl std::error::Error for DepacketizeError {}
/// Stateful reassembler that turns H.264 RTP payloads into Annex-B access
/// units. All fields start out empty / `None` via `Default`.
#[derive(Debug, Default)]
pub struct H264Depacketizer {
// Annex-B bytes (start code + NAL unit, repeated) of the access unit being built.
au: Vec<u8>,
// Reassembly buffer for the FU-A fragmented NAL unit currently in flight.
fua: Vec<u8>,
// True between an FU-A start fragment and its end fragment.
in_fragment: bool,
// NAL header byte rebuilt from the FU indicator and FU header of the start fragment.
fua_header: u8,
// Timestamp of the packets buffered in `au`; `None` when nothing has been pushed
// since the last emitted access unit.
current_ts: Option<u32>,
// Sequence number of the most recent packet that was accepted without error.
last_seq: Option<u16>,
}
impl H264Depacketizer {
    /// Creates a depacketizer with empty buffers and no timestamp or
    /// sequence-number history.
    pub fn new() -> Self {
        Self::default()
    }

    /// Appends `nal` to the access unit under construction, prefixed with
    /// the Annex-B start code.
    fn append_nal(&mut self, nal: &[u8]) {
        self.au.extend_from_slice(&ANNEXB_START);
        self.au.extend_from_slice(nal);
    }

    /// Discards any partially reassembled FU-A NAL unit.
    fn abort_fragment(&mut self) {
        self.in_fragment = false;
        self.fua.clear();
    }

    /// Reports whether buffered data belongs to a timestamp other than
    /// `timestamp`.
    fn timestamp_changed(&self, timestamp: u32) -> bool {
        matches!(self.current_ts, Some(ts) if ts != timestamp)
    }

    /// Switches the depacketizer to `timestamp`, returning the access unit
    /// buffered under the previous timestamp (if any).
    ///
    /// Must only be called once the incoming packet is known to be valid, so
    /// that a rejected packet never causes a flushed unit to be dropped.
    fn flush_for(&mut self, timestamp: u32) -> Option<AccessUnit> {
        let flushed = if self.timestamp_changed(timestamp) {
            self.take_au()
        } else {
            None
        };
        self.current_ts = Some(timestamp);
        flushed
    }

    /// Splits a STAP-A body (the payload without its first byte) into its
    /// length-prefixed NAL units.
    ///
    /// Fails with `Truncated` if a 16-bit size field announces more bytes
    /// than remain. A single trailing byte that cannot hold a size field is
    /// ignored.
    fn split_stap_a(mut body: &[u8]) -> Result<Vec<&[u8]>, DepacketizeError> {
        let mut units = Vec::new();
        while body.len() >= 2 {
            let size = usize::from(u16::from_be_bytes([body[0], body[1]]));
            let rest = &body[2..];
            if size > rest.len() {
                return Err(DepacketizeError::Truncated);
            }
            let (nal, tail) = rest.split_at(size);
            units.push(nal);
            body = tail;
        }
        Ok(units)
    }

    /// Returns `true` if the buffered access unit contains a NAL unit of
    /// type 5 (IDR slice) directly after a start code.
    fn pending_is_keyframe(&self) -> bool {
        self.au
            .windows(ANNEXB_START.len() + 1)
            .any(|w| w[..4] == ANNEXB_START && w[4] & 0x1F == 5)
    }

    /// Removes and returns the buffered access unit, or `None` if nothing is
    /// buffered. Clears the current timestamp when a unit is returned.
    fn take_au(&mut self) -> Option<AccessUnit> {
        if self.au.is_empty() {
            return None;
        }
        let keyframe = self.pending_is_keyframe();
        let timestamp = self.current_ts.take().unwrap_or(0);
        let data = Bytes::from(std::mem::take(&mut self.au));
        Some(AccessUnit {
            data,
            timestamp,
            keyframe,
        })
    }

    /// Feeds one RTP payload into the depacketizer.
    ///
    /// `payload` is the RTP payload (header already stripped); `marker`,
    /// `timestamp` and `sequence` are the corresponding RTP header fields.
    /// Single NAL unit packets (types 1-23), STAP-A (24) and FU-A (28) are
    /// handled.
    ///
    /// Returns `Ok(Some(unit))` when an access unit is complete: either the
    /// timestamp changed (the unit buffered under the previous timestamp is
    /// returned) or `marker` is set (the unit ending with this packet is
    /// returned). At most one unit is returned per call; if a timestamp
    /// change already produced one, a marker on the same packet leaves the
    /// new unit buffered until a later call.
    ///
    /// # Errors
    ///
    /// * `Truncated` - `payload` is empty, an FU-A packet has no FU header,
    ///   or a STAP-A size field exceeds the remaining bytes.
    /// * `OutOfOrder` - an FU-A continuation arrived with no fragment in
    ///   progress or after a sequence-number gap.
    /// * `Unsupported` - any other NAL unit type.
    ///
    /// A rejected packet never modifies or discards the buffered access
    /// unit; only an in-progress FU-A fragment may be abandoned.
    pub fn push(
        &mut self,
        payload: &[u8],
        marker: bool,
        timestamp: u32,
        sequence: u16,
    ) -> Result<Option<AccessUnit>, DepacketizeError> {
        let indicator = match payload.first() {
            Some(&byte) => byte,
            None => return Err(DepacketizeError::Truncated),
        };

        // All fragments of one NAL unit share a timestamp, so a pending
        // fragment can never be completed once the timestamp moves on.
        // Dropping it here keeps it from blocking the flush below and from
        // being glued onto data of the next frame.
        if self.in_fragment && self.timestamp_changed(timestamp) {
            self.abort_fragment();
        }

        // Every arm validates the packet completely before calling
        // `flush_for`, so an error return cannot lose a flushed unit.
        let completed = match indicator & 0x1F {
            1..=23 => {
                let flushed = self.flush_for(timestamp);
                self.append_nal(payload);
                flushed
            }
            24 => {
                let units = Self::split_stap_a(&payload[1..])?;
                let flushed = self.flush_for(timestamp);
                for nal in units {
                    self.append_nal(nal);
                }
                flushed
            }
            28 => {
                let fu_header = match payload.get(1) {
                    Some(&byte) => byte,
                    None => return Err(DepacketizeError::Truncated),
                };
                let is_start = fu_header & 0x80 != 0;
                let is_end = fu_header & 0x40 != 0;

                if !is_start {
                    if !self.in_fragment {
                        return Err(DepacketizeError::OutOfOrder);
                    }
                    if self.seq_gap(sequence) {
                        self.abort_fragment();
                        return Err(DepacketizeError::OutOfOrder);
                    }
                }

                let flushed = self.flush_for(timestamp);
                if is_start {
                    // Rebuild the original NAL header: F/NRI bits come from
                    // the FU indicator, the type from the FU header.
                    self.fua_header = (indicator & 0xE0) | (fu_header & 0x1F);
                    self.fua.clear();
                    self.fua.push(self.fua_header);
                    self.in_fragment = true;
                }
                self.fua.extend_from_slice(&payload[2..]);
                if is_end {
                    // Copy out of `fua` instead of taking it so its
                    // allocation is reused by the next fragmented NAL unit.
                    self.au.extend_from_slice(&ANNEXB_START);
                    self.au.extend_from_slice(&self.fua);
                    self.fua.clear();
                    self.in_fragment = false;
                }
                flushed
            }
            other => return Err(DepacketizeError::Unsupported(other)),
        };

        self.last_seq = Some(sequence);
        if completed.is_some() || !marker {
            Ok(completed)
        } else {
            Ok(self.take_au())
        }
    }

    /// Returns `true` when `sequence` does not directly follow the last
    /// accepted packet (wrap-around aware). With no history there is no gap.
    fn seq_gap(&self, sequence: u16) -> bool {
        match self.last_seq {
            Some(prev) => sequence.wrapping_sub(prev) != 1,
            None => false,
        }
    }
}
/// One reassembled access unit as emitted by `H264Depacketizer::push`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AccessUnit {
// NAL units in Annex-B form: each one is preceded by the start code `00 00 00 01`.
pub data: Bytes,
// Timestamp that was passed to `push` for the packets making up this unit.
pub timestamp: u32,
// True when the unit contains a NAL unit of type 5 directly after a start code.
pub keyframe: bool,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a version-2 RTP packet with payload type 96, SSRC 1, no CSRC
    /// entries and no header extension.
    fn rtp(seq: u16, ts: u32, marker: bool, payload: &[u8]) -> Vec<u8> {
        let marker_bit = if marker { 0x80 } else { 0x00 };
        let mut packet = Vec::with_capacity(12 + payload.len());
        packet.push(0x80);
        packet.push(marker_bit | 96);
        packet.extend_from_slice(&seq.to_be_bytes());
        packet.extend_from_slice(&ts.to_be_bytes());
        packet.extend_from_slice(&1u32.to_be_bytes());
        packet.extend_from_slice(payload);
        packet
    }

    #[test]
    fn parses_fixed_header_and_payload_offset() {
        let packet = rtp(7, 9000, true, &[0x65, 0xAA]);
        let header = RtpHeader::parse(&packet).expect("well-formed packet parses");
        assert_eq!(header.payload_type, 96);
        assert!(header.marker);
        assert_eq!(header.sequence, 7);
        assert_eq!(header.timestamp, 9000);
        assert_eq!(header.payload_offset, 12);
        assert_eq!(&packet[header.payload_offset..], &[0x65, 0xAA]);
    }

    #[test]
    fn rejects_wrong_version_and_short_buffers() {
        // Twelve zero bytes: long enough, but the version field is 0.
        assert_eq!(RtpHeader::parse(&[0x00; 12]), None);
        // Correct version bits, but shorter than the fixed header.
        assert_eq!(RtpHeader::parse(&[0x80; 4]), None);
    }

    #[test]
    fn honors_csrc_count_in_payload_offset() {
        // Fixed header announcing two CSRC entries, followed by those
        // entries and a one-byte payload.
        let mut packet = rtp(1, 0, false, &[]);
        packet[0] = 0x82;
        packet.extend_from_slice(&[0xDE, 0xAD, 0xBE, 0xEF, 0, 0, 0, 0]);
        packet.push(0x41);

        let header = RtpHeader::parse(&packet).expect("packet with CSRC list parses");
        assert_eq!(header.payload_offset, 20);
    }

    #[test]
    fn single_nal_packet_emits_annexb_on_marker() {
        let mut depacketizer = H264Depacketizer::new();
        let unit = depacketizer
            .push(&[0x41, 0x9A, 0xBC], true, 3000, 1)
            .expect("packet accepted")
            .expect("marker completes the access unit");
        assert_eq!(unit.timestamp, 3000);
        assert!(!unit.keyframe);
        assert_eq!(&unit.data[..], &[0, 0, 0, 1, 0x41, 0x9A, 0xBC]);
    }

    #[test]
    fn idr_single_nal_is_flagged_keyframe() {
        let mut depacketizer = H264Depacketizer::new();
        let unit = depacketizer
            .push(&[0x65, 0x01], true, 0, 1)
            .expect("packet accepted")
            .expect("marker completes the access unit");
        assert!(unit.keyframe);
    }

    #[test]
    fn stap_a_splits_aggregated_nals() {
        let mut depacketizer = H264Depacketizer::new();
        let aggregate = [24, 0, 2, 0xAA, 0xBB, 0, 3, 0xCC, 0xDD, 0xEE];
        let unit = depacketizer
            .push(&aggregate, true, 0, 1)
            .expect("packet accepted")
            .expect("marker completes the access unit");
        assert_eq!(
            &unit.data[..],
            &[0, 0, 0, 1, 0xAA, 0xBB, 0, 0, 0, 1, 0xCC, 0xDD, 0xEE]
        );
    }

    #[test]
    fn fu_a_reassembles_fragmented_nal() {
        let mut depacketizer = H264Depacketizer::new();

        let after_start = depacketizer
            .push(&[0x7C, 0x85, 0x11, 0x22], false, 0, 1)
            .expect("start fragment accepted");
        assert!(after_start.is_none());

        let after_middle = depacketizer
            .push(&[0x7C, 0x05, 0x33], false, 0, 2)
            .expect("middle fragment accepted");
        assert!(after_middle.is_none());

        let unit = depacketizer
            .push(&[0x7C, 0x45, 0x44], true, 0, 3)
            .expect("end fragment accepted")
            .expect("marker completes the access unit");
        assert_eq!(&unit.data[..], &[0, 0, 0, 1, 0x65, 0x11, 0x22, 0x33, 0x44]);
        assert!(unit.keyframe);
    }

    #[test]
    fn fu_a_sequence_gap_reports_out_of_order() {
        let mut depacketizer = H264Depacketizer::new();
        depacketizer
            .push(&[0x7C, 0x85, 0x11], false, 0, 1)
            .expect("start fragment accepted");

        let outcome = depacketizer.push(&[0x7C, 0x05, 0x22], false, 0, 5);
        assert_eq!(outcome, Err(DepacketizeError::OutOfOrder));
    }

    #[test]
    fn timestamp_change_flushes_previous_au_without_marker() {
        let mut depacketizer = H264Depacketizer::new();

        let first = depacketizer
            .push(&[0x41, 0x01], false, 1000, 1)
            .expect("first packet accepted");
        assert!(first.is_none());

        let flushed = depacketizer
            .push(&[0x41, 0x02], false, 2000, 2)
            .expect("second packet accepted")
            .expect("timestamp change flushes the previous unit");
        assert_eq!(flushed.timestamp, 1000);
        assert_eq!(&flushed.data[..], &[0, 0, 0, 1, 0x41, 0x01]);
    }
}