use std::convert::TryFrom;
use std::fmt::Write;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use h264_reader::nal::{NalHeader, UnitType};
use log::{debug, log_enabled, trace};
use crate::{
rtp::{ReceivedPacket, ReceivedPacketBuilder},
Error, Timestamp,
};
use super::VideoFrame;
#[derive(Debug)]
pub(crate) struct Depacketizer {
input_state: DepacketizerInputState,
pending: Option<VideoFrame>,
parameters: Option<InternalParameters>,
pieces: Vec<Bytes>,
nals: Vec<Nal>,
}
#[derive(Debug)]
struct Nal {
hdr: h264_reader::nal::NalHeader,
next_piece_idx: u32,
len: u32,
}
#[derive(Debug)]
struct AccessUnit {
start_ctx: crate::PacketContext,
end_ctx: crate::PacketContext,
timestamp: crate::Timestamp,
stream_id: usize,
in_fu_a: bool,
loss: u16,
same_ts_as_prev: bool,
}
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
enum DepacketizerInputState {
New,
Loss {
timestamp: crate::Timestamp,
pkts: u16,
},
PreMark(AccessUnit),
PostMark {
timestamp: crate::Timestamp,
loss: u16,
},
}
impl Depacketizer {
pub(super) fn new(
clock_rate: u32,
format_specific_params: Option<&str>,
) -> Result<Self, String> {
if clock_rate != 90_000 {
return Err(format!(
"invalid H.264 clock rate {}; must always be 90000",
clock_rate
));
}
let parameters = match format_specific_params {
None => None,
Some(fp) => match InternalParameters::parse_format_specific_params(fp) {
Ok(p) => Some(p),
Err(e) => {
log::warn!("Ignoring bad H.264 format-specific-params {:?}: {}", fp, e);
None
}
},
};
Ok(Depacketizer {
input_state: DepacketizerInputState::New,
pending: None,
pieces: Vec::new(),
nals: Vec::new(),
parameters,
})
}
pub(super) fn parameters(&self) -> Option<super::ParametersRef> {
self.parameters
.as_ref()
.map(|p| super::ParametersRef::Video(&p.generic_parameters))
}
pub(super) fn push(&mut self, pkt: ReceivedPacket) -> Result<(), String> {
if let Some(p) = self.pending.as_ref() {
panic!("push with data already pending: {:?}", p);
}
let mut access_unit =
match std::mem::replace(&mut self.input_state, DepacketizerInputState::New) {
DepacketizerInputState::New => {
debug_assert!(self.nals.is_empty());
debug_assert!(self.pieces.is_empty());
AccessUnit::start(&pkt, 0, false)
}
DepacketizerInputState::PreMark(mut access_unit) => {
let loss = pkt.loss();
if loss > 0 {
self.nals.clear();
self.pieces.clear();
if access_unit.timestamp.timestamp == pkt.timestamp().timestamp {
self.input_state = if pkt.mark() {
DepacketizerInputState::PostMark {
timestamp: pkt.timestamp(),
loss,
}
} else {
self.pieces.clear();
self.nals.clear();
DepacketizerInputState::Loss {
timestamp: pkt.timestamp(),
pkts: loss,
}
};
return Ok(());
}
AccessUnit::start(&pkt, 0, false)
} else if access_unit.timestamp.timestamp != pkt.timestamp().timestamp {
if access_unit.in_fu_a {
return Err(format!(
"Timestamp changed from {} to {} in the middle of a fragmented NAL",
access_unit.timestamp,
pkt.timestamp()
));
}
let last_nal_hdr = self.nals.last().unwrap().hdr;
if can_end_au(last_nal_hdr.nal_unit_type()) {
access_unit.end_ctx = *pkt.ctx();
self.pending =
Some(self.finalize_access_unit(access_unit, "ts change")?);
AccessUnit::start(&pkt, 0, false)
} else {
log::debug!(
"Bogus mid-access unit timestamp change after {:?}",
last_nal_hdr
);
access_unit.timestamp.timestamp = pkt.timestamp().timestamp;
access_unit
}
} else {
access_unit
}
}
DepacketizerInputState::PostMark {
timestamp: state_ts,
loss,
} => {
debug_assert!(self.nals.is_empty());
debug_assert!(self.pieces.is_empty());
AccessUnit::start(&pkt, loss, state_ts.timestamp == pkt.timestamp().timestamp)
}
DepacketizerInputState::Loss {
timestamp,
mut pkts,
} => {
debug_assert!(self.nals.is_empty());
debug_assert!(self.pieces.is_empty());
if pkt.timestamp().timestamp == timestamp.timestamp {
pkts += pkt.loss();
self.input_state = DepacketizerInputState::Loss { timestamp, pkts };
return Ok(());
}
AccessUnit::start(&pkt, pkts, false)
}
};
let ctx = *pkt.ctx();
let mark = pkt.mark();
let loss = pkt.loss();
let timestamp = pkt.timestamp();
let mut data = pkt.into_payload_bytes();
if data.is_empty() {
return Err("Empty NAL".into());
}
let nal_header = data[0];
if (nal_header >> 7) != 0 {
return Err(format!("NAL header {:02x} has F bit set", nal_header));
}
data.advance(1); match nal_header & 0b11111 {
1..=23 => {
if access_unit.in_fu_a {
return Err(format!(
"Non-fragmented NAL {:02x} while fragment in progress",
nal_header
));
}
let len = u32::try_from(data.len()).expect("data len < u16::MAX") + 1;
let next_piece_idx = self.add_piece(data)?;
self.nals.push(Nal {
hdr: NalHeader::new(nal_header).expect("header w/o F bit set is valid"),
next_piece_idx,
len,
});
}
24 => {
loop {
if data.remaining() < 3 {
return Err(format!(
"STAP-A has {} remaining bytes; expecting 2-byte length, non-empty NAL",
data.remaining()
));
}
let len = data.get_u16();
if len == 0 {
return Err("zero length in STAP-A".into());
}
let hdr = NalHeader::new(data[0])
.map_err(|_| format!("bad header {:02x} in STAP-A", data[0]))?;
match data.remaining().cmp(&usize::from(len)) {
std::cmp::Ordering::Less => {
return Err(format!(
"STAP-A too short: {} bytes remaining, expecting {}-byte NAL",
data.remaining(),
len
))
}
std::cmp::Ordering::Equal => {
data.advance(1);
let next_piece_idx = self.add_piece(data)?;
self.nals.push(Nal {
hdr,
next_piece_idx,
len: u32::from(len),
});
break;
}
std::cmp::Ordering::Greater => {
let mut piece = data.split_to(usize::from(len));
piece.advance(1);
let next_piece_idx = self.add_piece(piece)?;
self.nals.push(Nal {
hdr,
next_piece_idx,
len: u32::from(len),
});
}
}
}
}
25..=27 | 29 => {
return Err(format!(
"unimplemented/unexpected interleaved mode NAL ({:02x})",
nal_header,
))
}
28 => {
if data.len() < 2 {
return Err(format!("FU-A len {} too short", data.len()));
}
let fu_header = data[0];
let start = (fu_header & 0b10000000) != 0;
let end = (fu_header & 0b01000000) != 0;
let reserved = (fu_header & 0b00100000) != 0;
let nal_header =
NalHeader::new((nal_header & 0b011100000) | (fu_header & 0b00011111))
.expect("NalHeader is valid");
data.advance(1);
if (start && end) || reserved {
return Err(format!("Invalid FU-A header {:02x}", fu_header));
}
if !end && mark {
return Err("FU-A pkt with MARK && !END".into());
}
let u32_len = u32::try_from(data.len()).expect("RTP packet len must be < u16::MAX");
match (start, access_unit.in_fu_a) {
(true, true) => return Err("FU-A with start bit while frag in progress".into()),
(true, false) => {
self.add_piece(data)?;
self.nals.push(Nal {
hdr: nal_header,
next_piece_idx: u32::MAX, len: 1 + u32_len,
});
access_unit.in_fu_a = true;
}
(false, true) => {
let pieces = self.add_piece(data)?;
let nal = self.nals.last_mut().expect("nals non-empty while in fu-a");
if u8::from(nal_header) != u8::from(nal.hdr) {
return Err(format!(
"FU-A has inconsistent NAL type: {:?} then {:?}",
nal.hdr, nal_header,
));
}
nal.len += u32_len;
if end {
nal.next_piece_idx = pieces;
access_unit.in_fu_a = false;
} else if mark {
return Err("FU-A has MARK and no END".into());
}
}
(false, false) => {
if loss > 0 {
self.pieces.clear();
self.nals.clear();
self.input_state = DepacketizerInputState::Loss {
timestamp,
pkts: loss,
};
return Ok(());
}
return Err("FU-A has start bit unset while no frag in progress".into());
}
}
}
_ => return Err(format!("bad nal header {:02x}", nal_header)),
}
self.input_state = if mark {
let last_nal_hdr = self.nals.last().unwrap().hdr;
if can_end_au(last_nal_hdr.nal_unit_type()) {
access_unit.end_ctx = ctx;
self.pending = Some(self.finalize_access_unit(access_unit, "mark")?);
DepacketizerInputState::PostMark { timestamp, loss: 0 }
} else {
log::debug!(
"Bogus mid-access unit timestamp change after {:?}",
last_nal_hdr
);
access_unit.timestamp.timestamp = timestamp.timestamp;
DepacketizerInputState::PreMark(access_unit)
}
} else {
DepacketizerInputState::PreMark(access_unit)
};
Ok(())
}
pub(super) fn pull(&mut self) -> Option<super::CodecItem> {
self.pending.take().map(super::CodecItem::VideoFrame)
}
fn add_piece(&mut self, piece: Bytes) -> Result<u32, String> {
self.pieces.push(piece);
u32::try_from(self.pieces.len()).map_err(|_| "more than u32::MAX pieces!".to_string())
}
fn log_access_unit(&self, au: &AccessUnit, reason: &str) {
let mut errs = String::new();
if au.same_ts_as_prev {
errs.push_str("\n* same timestamp as previous access unit");
}
validate_order(&self.nals, &mut errs);
if !errs.is_empty() {
let mut nals = String::new();
for (i, nal) in self.nals.iter().enumerate() {
let _ = write!(&mut nals, "\n {}: {:?}", i, nal.hdr);
}
debug!(
"bad access unit (ended by {}) at ts {}\nerrors are:{}\nNALs are:{}",
reason, au.timestamp, errs, nals
);
} else if log_enabled!(log::Level::Trace) {
let mut nals = String::new();
for (i, nal) in self.nals.iter().enumerate() {
let _ = write!(&mut nals, "\n {}: {:?}", i, nal.hdr);
}
trace!(
"access unit (ended by {}) at ts {}; NALS are:{}",
reason,
au.timestamp,
nals
);
}
}
fn finalize_access_unit(&mut self, au: AccessUnit, reason: &str) -> Result<VideoFrame, String> {
let mut piece_idx = 0;
let mut retained_len = 0usize;
let mut is_random_access_point = false;
let mut is_disposable = true;
let mut new_sps = None;
let mut new_pps = None;
if log_enabled!(log::Level::Debug) {
self.log_access_unit(&au, reason);
}
for nal in &self.nals {
let next_piece_idx = usize::try_from(nal.next_piece_idx).expect("u32 fits in usize");
let nal_pieces = &self.pieces[piece_idx..next_piece_idx];
match nal.hdr.nal_unit_type() {
UnitType::SeqParameterSet => {
if self
.parameters
.as_ref()
.map(|p| !nal_matches(&p.sps_nal[..], nal.hdr, nal_pieces))
.unwrap_or(true)
{
new_sps = Some(to_bytes(nal.hdr, nal.len, nal_pieces));
}
}
UnitType::PicParameterSet => {
if self
.parameters
.as_ref()
.map(|p| !nal_matches(&p.pps_nal[..], nal.hdr, nal_pieces))
.unwrap_or(true)
{
new_pps = Some(to_bytes(nal.hdr, nal.len, nal_pieces));
}
}
UnitType::SliceLayerWithoutPartitioningIdr => is_random_access_point = true,
_ => {}
}
if nal.hdr.nal_ref_idc() != 0 {
is_disposable = false;
}
retained_len += 4usize + usize::try_from(nal.len).expect("u32 fits in usize");
piece_idx = next_piece_idx;
}
let mut data = Vec::with_capacity(retained_len);
piece_idx = 0;
for nal in &self.nals {
let next_piece_idx = usize::try_from(nal.next_piece_idx).expect("u32 fits in usize");
let nal_pieces = &self.pieces[piece_idx..next_piece_idx];
data.extend_from_slice(&nal.len.to_be_bytes()[..]);
data.push(nal.hdr.into());
let mut actual_len = 1;
for piece in nal_pieces {
data.extend_from_slice(&piece[..]);
actual_len += piece.len();
}
debug_assert_eq!(
usize::try_from(nal.len).expect("u32 fits in usize"),
actual_len
);
piece_idx = next_piece_idx;
}
debug_assert_eq!(retained_len, data.len());
self.nals.clear();
self.pieces.clear();
let has_new_parameters = match (
new_sps.as_deref(),
new_pps.as_deref(),
self.parameters.as_ref(),
) {
(Some(sps_nal), Some(pps_nal), _) => {
self.parameters = Some(InternalParameters::parse_sps_and_pps(sps_nal, pps_nal)?);
true
}
(Some(_), None, Some(old_ip)) | (None, Some(_), Some(old_ip)) => {
let sps_nal = new_sps.as_deref().unwrap_or(&old_ip.sps_nal);
let pps_nal = new_pps.as_deref().unwrap_or(&old_ip.pps_nal);
self.parameters = Some(InternalParameters::parse_sps_and_pps(sps_nal, pps_nal)?);
true
}
_ => false,
};
Ok(VideoFrame {
has_new_parameters,
loss: au.loss,
start_ctx: au.start_ctx,
end_ctx: au.end_ctx,
timestamp: au.timestamp,
stream_id: au.stream_id,
is_random_access_point,
is_disposable,
data,
})
}
}
fn can_end_au(nal_unit_type: UnitType) -> bool {
nal_unit_type != UnitType::SeqParameterSet && nal_unit_type != UnitType::PicParameterSet
}
impl AccessUnit {
fn start(
pkt: &crate::rtp::ReceivedPacket,
additional_loss: u16,
same_ts_as_prev: bool,
) -> Self {
AccessUnit {
start_ctx: *pkt.ctx(),
end_ctx: *pkt.ctx(),
timestamp: pkt.timestamp(),
stream_id: pkt.stream_id(),
in_fu_a: false,
loss: pkt.loss() + additional_loss,
same_ts_as_prev,
}
}
}
fn validate_order(nals: &[Nal], errs: &mut String) {
let mut seen_vcl = false;
for (i, nal) in nals.iter().enumerate() {
match nal.hdr.nal_unit_type() {
UnitType::SliceLayerWithoutPartitioningNonIdr |
UnitType::SliceDataPartitionALayer |
UnitType::SliceDataPartitionBLayer |
UnitType::SliceDataPartitionCLayer |
UnitType::SliceLayerWithoutPartitioningIdr => {
seen_vcl = true;
}
UnitType::SEI => {
if seen_vcl {
errs.push_str("\n* SEI after VCL");
}
}
UnitType::AccessUnitDelimiter => {
if i != 0 {
let _ = write!(errs, "access unit delimiter must be first in AU; was preceded by {:?}",
nals[i-1].hdr);
}
}
UnitType::EndOfSeq => {
if !seen_vcl {
errs.push_str("\n* end of sequence without VCL");
}
}
UnitType::EndOfStream => {
if i != nals.len() - 1 {
errs.push_str("\n* end of stream NAL isn't last");
}
}
_ => {}
}
}
if !seen_vcl {
errs.push_str("\n* missing VCL");
}
}
#[derive(Clone, Debug)]
struct InternalParameters {
generic_parameters: super::VideoParameters,
sps_nal: Bytes,
pps_nal: Bytes,
}
impl InternalParameters {
fn parse_format_specific_params(format_specific_params: &str) -> Result<Self, String> {
let mut sprop_parameter_sets = None;
for p in format_specific_params.split(';') {
match p.trim().split_once('=') {
Some((key, value)) if key == "sprop-parameter-sets" => {
sprop_parameter_sets = Some(value)
}
None => return Err("key without value".into()),
_ => (),
}
}
let sprop_parameter_sets = sprop_parameter_sets
.ok_or_else(|| "no sprop-parameter-sets in H.264 format-specific-params".to_string())?;
let mut sps_nal = None;
let mut pps_nal = None;
for nal in sprop_parameter_sets.split(',') {
let nal = base64::decode(nal).map_err(|_| {
"bad sprop-parameter-sets: NAL has invalid base64 encoding".to_string()
})?;
if nal.is_empty() {
return Err("bad sprop-parameter-sets: empty NAL".into());
}
let header = h264_reader::nal::NalHeader::new(nal[0])
.map_err(|_| format!("bad sprop-parameter-sets: bad NAL header {:0x}", nal[0]))?;
match header.nal_unit_type() {
UnitType::SeqParameterSet => {
if sps_nal.is_some() {
return Err("multiple SPSs".into());
}
sps_nal = Some(nal);
}
UnitType::PicParameterSet => {
if pps_nal.is_some() {
return Err("multiple PPSs".into());
}
pps_nal = Some(nal);
}
_ => return Err("only SPS and PPS expected in parameter sets".into()),
}
}
let sps_nal = sps_nal.ok_or_else(|| "no sps".to_string())?;
let pps_nal = pps_nal.ok_or_else(|| "no pps".to_string())?;
let sps_nal = sps_nal
.strip_suffix(b"\x00\x00\x00\x01")
.unwrap_or(&sps_nal);
let pps_nal = pps_nal
.strip_suffix(b"\x00\x00\x00\x01")
.unwrap_or(&pps_nal);
Self::parse_sps_and_pps(sps_nal, pps_nal)
}
fn parse_sps_and_pps(sps_nal: &[u8], pps_nal: &[u8]) -> Result<InternalParameters, String> {
let sps_rbsp = h264_reader::rbsp::decode_nal(sps_nal).map_err(|_| "bad sps")?;
if sps_rbsp.len() < 5 {
return Err("bad sps".into());
}
let rfc6381_codec = format!(
"avc1.{:02X}{:02X}{:02X}",
sps_rbsp[0], sps_rbsp[1], sps_rbsp[2]
);
let sps = h264_reader::nal::sps::SeqParameterSet::from_bits(
h264_reader::rbsp::BitReader::new(&*sps_rbsp),
)
.map_err(|e| format!("Bad SPS: {:?}", e))?;
debug!("sps: {:#?}", &sps);
let pixel_dimensions = sps
.pixel_dimensions()
.map_err(|e| format!("SPS has invalid pixel dimensions: {:?}", e))?;
let mut avc_decoder_config = BytesMut::with_capacity(11 + sps_nal.len() + pps_nal.len());
avc_decoder_config.put_u8(1); avc_decoder_config.extend(&sps_rbsp[0..=2]);
avc_decoder_config.put_u8(0xff);
avc_decoder_config.put_u8(0xe1);
avc_decoder_config.extend(
&u16::try_from(sps_nal.len())
.map_err(|_| format!("SPS NAL is {} bytes long; must fit in u16", sps_nal.len()))?
.to_be_bytes()[..],
);
let sps_nal_start = avc_decoder_config.len();
avc_decoder_config.extend_from_slice(sps_nal);
let sps_nal_end = avc_decoder_config.len();
avc_decoder_config.put_u8(1); avc_decoder_config.extend(
&u16::try_from(pps_nal.len())
.map_err(|_| format!("PPS NAL is {} bytes long; must fit in u16", pps_nal.len()))?
.to_be_bytes()[..],
);
let pps_nal_start = avc_decoder_config.len();
avc_decoder_config.extend_from_slice(pps_nal);
let pps_nal_end = avc_decoder_config.len();
assert_eq!(avc_decoder_config.len(), 11 + sps_nal.len() + pps_nal.len());
let (pixel_aspect_ratio, frame_rate);
match sps.vui_parameters {
Some(ref vui) => {
pixel_aspect_ratio = vui
.aspect_ratio_info
.as_ref()
.and_then(|a| a.clone().get())
.map(|(h, v)| (u32::from(h), (u32::from(v))));
frame_rate = vui.timing_info.as_ref().and_then(|t| {
t.num_units_in_tick
.checked_mul(2)
.map(|doubled| (doubled, t.time_scale))
});
}
None => {
pixel_aspect_ratio = None;
frame_rate = None;
}
}
let avc_decoder_config = avc_decoder_config.freeze();
let sps_nal = avc_decoder_config.slice(sps_nal_start..sps_nal_end);
let pps_nal = avc_decoder_config.slice(pps_nal_start..pps_nal_end);
Ok(InternalParameters {
generic_parameters: super::VideoParameters {
rfc6381_codec,
pixel_dimensions,
pixel_aspect_ratio,
frame_rate,
extra_data: avc_decoder_config,
},
sps_nal,
pps_nal,
})
}
}
fn nal_matches(nal: &[u8], hdr: NalHeader, pieces: &[Bytes]) -> bool {
if nal.is_empty() || nal[0] != u8::from(hdr) {
return false;
}
let mut nal_pos = 1;
for piece in pieces {
let new_pos = nal_pos + piece.len();
if nal.len() < new_pos {
return false;
}
if piece[..] != nal[nal_pos..new_pos] {
return false;
}
nal_pos = new_pos;
}
nal_pos == nal.len()
}
fn to_bytes(hdr: NalHeader, len: u32, pieces: &[Bytes]) -> Bytes {
let len = usize::try_from(len).expect("u32 fits in usize");
let mut out = Vec::with_capacity(len);
out.push(hdr.into());
for piece in pieces {
out.extend_from_slice(&piece[..]);
}
debug_assert_eq!(len, out.len());
out.into()
}
#[doc(hidden)]
pub struct Packetizer {
max_payload_size: u16,
next_sequence_number: u16,
stream_id: usize,
ssrc: u32,
payload_type: u8,
state: PacketizerState,
}
impl Packetizer {
pub fn new(
max_payload_size: u16,
stream_id: usize,
initial_sequence_number: u16,
payload_type: u8,
ssrc: u32,
) -> Result<Self, String> {
if max_payload_size < 3 {
return Err("max_payload_size must be > 3".into());
}
Ok(Self {
max_payload_size,
stream_id,
next_sequence_number: initial_sequence_number,
ssrc,
payload_type,
state: PacketizerState::Idle,
})
}
pub fn push(&mut self, timestamp: Timestamp, data: Bytes) -> Result<(), Error> {
assert!(matches!(self.state, PacketizerState::Idle));
self.state = PacketizerState::HaveData { timestamp, data };
Ok(())
}
pub fn pull(&mut self) -> Result<Option<ReceivedPacket>, String> {
let max_payload_size = usize::from(self.max_payload_size);
match std::mem::replace(&mut self.state, PacketizerState::Idle) {
PacketizerState::Idle => Ok(None),
PacketizerState::HaveData {
timestamp,
mut data,
} => {
if data.len() < 5 {
return Err(format!(
"have only {} bytes; expected 4-byte length + non-empty NAL",
data.len()
));
}
let len = data.get_u32();
let usize_len = usize::try_from(len).expect("u32 fits in usize");
if data.len() < usize_len || len == 0 {
return Err(format!(
"bad length of {} bytes; expected [1, {}]",
len,
data.len()
));
}
let sequence_number = self.next_sequence_number;
self.next_sequence_number = self.next_sequence_number.wrapping_add(1);
let hdr = NalHeader::new(data[0]).map_err(|_| "F bit in NAL header".to_owned())?;
if matches!(hdr.nal_unit_type(), UnitType::Unspecified(_)) {
return Err(format!("bad NAL header {:?}", hdr));
}
if usize_len > max_payload_size {
data.advance(1);
let fu_indicator = (hdr.nal_ref_idc() << 5) | 28;
let fu_header = 0b1000_0000 | hdr.nal_unit_type().id(); let payload = [fu_indicator, fu_header]
.into_iter()
.chain(data[..max_payload_size - 2].iter().copied());
let pkt = ReceivedPacketBuilder {
ctx: crate::PacketContext::dummy(),
stream_id: self.stream_id,
timestamp,
ssrc: self.ssrc,
sequence_number,
loss: 0,
mark: false,
payload_type: self.payload_type,
}
.build(payload)?;
data.advance(max_payload_size - 2);
self.state = PacketizerState::InFragment {
timestamp,
hdr,
left: len + 1 - u32::from(self.max_payload_size),
data,
};
return Ok(Some(pkt));
}
let mark;
if data.len() == usize_len {
mark = true;
} else {
self.state = PacketizerState::HaveData {
timestamp,
data: data.split_off(usize_len),
};
mark = false;
}
Ok(Some(
ReceivedPacketBuilder {
ctx: crate::PacketContext::dummy(),
stream_id: self.stream_id,
timestamp,
ssrc: self.ssrc,
sequence_number,
loss: 0,
mark,
payload_type: self.payload_type,
}
.build(data)?,
))
}
PacketizerState::InFragment {
timestamp,
hdr,
left,
mut data,
} => {
let sequence_number = self.next_sequence_number;
self.next_sequence_number = self.next_sequence_number.wrapping_add(1);
let mut payload;
let mark;
if left > u32::from(self.max_payload_size) - 2 {
mark = false;
payload = Vec::with_capacity(max_payload_size);
let fu_indicator = (hdr.nal_ref_idc() << 5) | 28;
let fu_header = hdr.nal_unit_type().id(); payload.extend_from_slice(&[fu_indicator, fu_header]);
payload.extend_from_slice(&data[..max_payload_size - 2]);
data.advance(max_payload_size - 2);
self.state = PacketizerState::InFragment {
timestamp,
hdr,
left: left + 2 - u32::from(self.max_payload_size),
data,
};
} else {
let usize_left = usize::try_from(left).expect("u32 fits in usize");
payload = Vec::with_capacity(usize_left + 2);
let fu_indicator = (hdr.nal_ref_idc() << 5) | 28;
let fu_header = 0b0100_0000 | hdr.nal_unit_type().id(); payload.extend_from_slice(&[fu_indicator, fu_header]);
payload.extend_from_slice(&data[..usize_left]);
if data.len() == usize_left {
mark = true;
self.state = PacketizerState::Idle;
} else {
mark = false;
data.advance(usize_left);
self.state = PacketizerState::HaveData { timestamp, data };
}
}
Ok(Some(
ReceivedPacketBuilder {
ctx: crate::PacketContext::dummy(),
stream_id: self.stream_id,
timestamp,
ssrc: self.ssrc,
sequence_number,
loss: 0,
mark,
payload_type: self.payload_type,
}
.build(payload)?,
))
}
}
}
}
enum PacketizerState {
Idle,
HaveData {
timestamp: Timestamp,
data: Bytes,
},
InFragment {
timestamp: Timestamp,
hdr: NalHeader,
left: u32,
data: Bytes,
},
}
#[cfg(test)]
mod tests {
use std::num::NonZeroU32;
use crate::testutil::init_logging;
use crate::{codec::CodecItem, rtp::ReceivedPacketBuilder};
#[test]
fn depacketize() {
init_logging();
let mut d = super::Depacketizer::new(90_000, Some("packetization-mode=1;profile-level-id=64001E;sprop-parameter-sets=Z2QAHqwsaoLA9puCgIKgAAADACAAAAMD0IAA,aO4xshsA")).unwrap();
let timestamp = crate::Timestamp {
timestamp: 0,
clock_rate: NonZeroU32::new(90_000).unwrap(),
start: 0,
};
d.push(
ReceivedPacketBuilder {
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 0,
loss: 0,
mark: false,
payload_type: 0,
}
.build(b"\x06plain".iter().copied())
.unwrap(),
)
.unwrap();
assert!(d.pull().is_none());
d.push(
ReceivedPacketBuilder {
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 1,
loss: 0,
mark: false,
payload_type: 0,
}
.build(*b"\x18\x00\x09\x06stap-a 1\x00\x09\x06stap-a 2")
.unwrap(),
)
.unwrap();
assert!(d.pull().is_none());
d.push(
ReceivedPacketBuilder {
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 2,
loss: 0,
mark: false,
payload_type: 0,
}
.build(*b"\x7c\x86fu-a start, ")
.unwrap(),
)
.unwrap();
assert!(d.pull().is_none());
d.push(
ReceivedPacketBuilder {
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 3,
loss: 0,
mark: false,
payload_type: 0,
}
.build(*b"\x7c\x06fu-a middle, ")
.unwrap(),
)
.unwrap();
assert!(d.pull().is_none());
d.push(
ReceivedPacketBuilder {
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 4,
loss: 0,
mark: true,
payload_type: 0,
}
.build(*b"\x7c\x46fu-a end")
.unwrap(),
)
.unwrap();
let frame = match d.pull() {
Some(CodecItem::VideoFrame(frame)) => frame,
_ => panic!(),
};
assert_eq!(
&frame.data()[..],
b"\x00\x00\x00\x06\x06plain\
\x00\x00\x00\x09\x06stap-a 1\
\x00\x00\x00\x09\x06stap-a 2\
\x00\x00\x00\x22\x66fu-a start, fu-a middle, fu-a end"
);
}
#[test]
fn depacketize_reolink_bad_framing_at_start() {
init_logging();
let mut d = super::Depacketizer::new(90_000, Some("packetization-mode=1;profile-level-id=640033;sprop-parameter-sets=Z2QAM6wVFKCgL/lQ,aO48sA==")).unwrap();
let ts1 = crate::Timestamp {
timestamp: 0,
clock_rate: NonZeroU32::new(90_000).unwrap(),
start: 0,
};
let ts2 = crate::Timestamp {
timestamp: 1,
clock_rate: NonZeroU32::new(90_000).unwrap(),
start: 0,
};
d.push(
ReceivedPacketBuilder {
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp: ts1,
ssrc: 0,
sequence_number: 0,
loss: 0,
mark: true,
payload_type: 0,
}
.build(*b"\x67\x64\x00\x33\xac\x15\x14\xa0\xa0\x2f\xf9\x50")
.unwrap(),
)
.unwrap();
assert!(d.pull().is_none());
d.push(
ReceivedPacketBuilder {
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp: ts1,
ssrc: 0,
sequence_number: 1,
loss: 0,
mark: false,
payload_type: 0,
}
.build(*b"\x68\xee\x3c\xb0")
.unwrap(),
)
.unwrap();
assert!(d.pull().is_none());
d.push(
ReceivedPacketBuilder {
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp: ts2,
ssrc: 0,
sequence_number: 2,
loss: 0,
mark: true,
payload_type: 0,
}
.build(*b"\x65slice")
.unwrap(),
)
.unwrap();
let frame = match d.pull() {
Some(CodecItem::VideoFrame(frame)) => frame,
o => panic!("unexpected pull result {:#?}", o),
};
assert_eq!(
&frame.data()[..],
b"\x00\x00\x00\x0C\x67\x64\x00\x33\xac\x15\x14\xa0\xa0\x2f\xf9\x50\
\x00\x00\x00\x04\x68\xee\x3c\xb0\
\x00\x00\x00\x06\x65slice"
);
assert_eq!(frame.timestamp, ts2); }
#[test]
fn depacketize_reolink_gop_boundary() {
init_logging();
let mut d = super::Depacketizer::new(90_000, Some("packetization-mode=1;profile-level-id=640033;sprop-parameter-sets=Z2QAM6wVFKCgL/lQ,aO48sA==")).unwrap();
let ts1 = crate::Timestamp {
timestamp: 0,
clock_rate: NonZeroU32::new(90_000).unwrap(),
start: 0,
};
let ts2 = crate::Timestamp {
timestamp: 1,
clock_rate: NonZeroU32::new(90_000).unwrap(),
start: 0,
};
d.push(
ReceivedPacketBuilder {
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp: ts1,
ssrc: 0,
sequence_number: 0,
loss: 0,
mark: true,
payload_type: 0,
}
.build(*b"\x01slice")
.unwrap(),
)
.unwrap();
let frame = match d.pull() {
Some(CodecItem::VideoFrame(frame)) => frame,
o => panic!("unexpected pull result {:#?}", o),
};
assert_eq!(&frame.data()[..], b"\x00\x00\x00\x06\x01slice");
assert_eq!(frame.timestamp, ts1);
d.push(
ReceivedPacketBuilder {
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp: ts1,
ssrc: 0,
sequence_number: 1,
loss: 0,
mark: false, payload_type: 0,
}
.build(*b"\x67\x64\x00\x33\xac\x15\x14\xa0\xa0\x2f\xf9\x50")
.unwrap(),
)
.unwrap();
assert!(d.pull().is_none());
d.push(
ReceivedPacketBuilder {
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp: ts1,
ssrc: 0,
sequence_number: 2,
loss: 0,
mark: false,
payload_type: 0,
}
.build(*b"\x68\xee\x3c\xb0")
.unwrap(),
)
.unwrap();
assert!(d.pull().is_none());
d.push(
ReceivedPacketBuilder {
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp: ts2,
ssrc: 0,
sequence_number: 3,
loss: 0,
mark: true,
payload_type: 0,
}
.build(*b"\x65slice")
.unwrap(),
)
.unwrap();
let frame = match d.pull() {
Some(CodecItem::VideoFrame(frame)) => frame,
o => panic!("unexpected pull result {:#?}", o),
};
assert_eq!(
&frame.data()[..],
b"\x00\x00\x00\x0C\x67\x64\x00\x33\xac\x15\x14\xa0\xa0\x2f\xf9\x50\
\x00\x00\x00\x04\x68\xee\x3c\xb0\
\x00\x00\x00\x06\x65slice"
);
assert_eq!(frame.timestamp, ts2); }
#[test]
fn depacketize_parameter_change() {
init_logging();
let mut d = super::Depacketizer::new(90_000, Some("a=fmtp:96 packetization-mode=1;profile-level-id=4d002a;sprop-parameter-sets=Z00AKp2oHgCJ+WbgICAoAAADAAgAAAMAfCA=,aO48gA==")).unwrap();
match d.parameters() {
Some(crate::codec::ParametersRef::Video(v)) => {
assert_eq!(v.pixel_dimensions(), (1920, 1080));
}
o => panic!("{:?}", o),
}
let timestamp = crate::Timestamp {
timestamp: 0,
clock_rate: NonZeroU32::new(90_000).unwrap(),
start: 0,
};
d.push(ReceivedPacketBuilder { ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 0,
loss: 0,
mark: false,
payload_type: 0,
}.build(*b"\x67\x4d\x40\x1e\x9a\x64\x05\x01\xef\xf3\x50\x10\x10\x14\x00\x00\x0f\xa0\x00\x01\x38\x80\x10").unwrap()).unwrap();
assert!(d.pull().is_none());
d.push(
ReceivedPacketBuilder {
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 1,
loss: 0,
mark: false,
payload_type: 0,
}
.build(*b"\x68\xee\x3c\x80")
.unwrap(),
)
.unwrap();
assert!(d.pull().is_none());
d.push(
ReceivedPacketBuilder {
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 2,
loss: 0,
mark: true,
payload_type: 0,
}
.build(*b"\x65slice")
.unwrap(),
)
.unwrap();
let frame = match d.pull() {
Some(CodecItem::VideoFrame(frame)) => frame,
o => panic!("unexpected pull result {:#?}", o),
};
assert!(frame.has_new_parameters);
match d.parameters() {
Some(crate::codec::ParametersRef::Video(v)) => {
assert_eq!(v.pixel_dimensions(), (640, 480));
}
_ => unreachable!(),
}
}
#[test]
fn depacketize_empty() {
init_logging();
assert!(super::InternalParameters::parse_format_specific_params("").is_err());
assert!(super::InternalParameters::parse_format_specific_params(" ").is_err());
}
#[test]
fn gw_security_params() {
init_logging();
let params = super::InternalParameters::parse_format_specific_params(
"packetization-mode=1;\
profile-level-id=5046302;\
sprop-parameter-sets=Z00AHpWoLQ9puAgICBAAAAAB,aO48gAAAAAE=",
)
.unwrap();
assert_eq!(
¶ms.sps_nal[..],
b"\x67\x4d\x00\x1e\x95\xa8\x2d\x0f\x69\xb8\x08\x08\x08\x10"
);
assert_eq!(¶ms.pps_nal[..], b"\x68\xee\x3c\x80");
}
#[test]
fn bad_format_specific_params() {
init_logging();
const BAD_PARAMS: &str = "packetization-mode=1;\
profile-level-id=00f004;\
sprop-parameter-sets=6QDwBE/LCAAAH0gAB1TgIAAAAAA=,AAAAAA==";
super::InternalParameters::parse_format_specific_params(BAD_PARAMS).unwrap_err();
let mut d = super::Depacketizer::new(90_000, Some(BAD_PARAMS)).unwrap();
assert!(d.parameters().is_none());
let timestamp = crate::Timestamp {
timestamp: 0,
clock_rate: NonZeroU32::new(90_000).unwrap(),
start: 0,
};
d.push(ReceivedPacketBuilder {
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 0,
loss: 0,
mark: false,
payload_type: 0,
}.build(
*b"\x67\x4d\x00\x28\xe9\x00\xf0\x04\x4f\xcb\x08\x00\x00\x1f\x48\x00\x07\x54\xe0\x20",
).unwrap()).unwrap();
assert!(d.pull().is_none());
d.push(
ReceivedPacketBuilder {
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 1,
loss: 0,
mark: false,
payload_type: 0,
}
.build(*b"\x68\xea\x8f\x20")
.unwrap(),
)
.unwrap();
assert!(d.pull().is_none());
d.push(
ReceivedPacketBuilder {
ctx: crate::PacketContext::dummy(),
stream_id: 0,
timestamp,
ssrc: 0,
sequence_number: 2,
loss: 0,
mark: true,
payload_type: 0,
}
.build(*b"\x65idr slice")
.unwrap(),
)
.unwrap();
let frame = match d.pull() {
Some(CodecItem::VideoFrame(frame)) => frame,
_ => panic!(),
};
assert!(frame.has_new_parameters);
assert!(d.parameters().is_some());
}
}