use anyhow::Context;
use bytes::{Buf, Bytes, BytesMut};
use tokio::io::{AsyncRead, AsyncReadExt};
use super::Sps;
use crate::catalog::hang::CatalogExt;
use crate::codec::annexb::{NalIterator, START_CODE};
use crate::container::jitter::MinFrameDuration;
/// Input framing handled by [`Import`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Mode {
/// Initialized from an out-of-band AVCC record; the catalog entry carries
/// that record as its description and is marked `inline: false`.
Avc1,
/// Start-code delimited NAL units with in-band parameter sets; the catalog
/// entry is derived from the SPS and marked `inline: true`.
Avc3,
}
/// H.264 importer that publishes frames to a track and keeps the matching
/// video rendition in the catalog up to date.
///
/// The rendition is removed from the catalog again when the importer is dropped.
pub struct Import<E: CatalogExt = ()> {
// Source of the output track; also carries the track-name suffix.
tracks: crate::track_provider::TrackProvider,
// Catalog in which the video rendition is inserted, updated and removed.
catalog: crate::catalog::Producer<E>,
// Producer for the active track; `None` until a track has been created.
track: Option<crate::container::Producer<crate::catalog::hang::Container>>,
// Last configuration written to the catalog, used to skip redundant updates.
config: Option<hang::catalog::VideoConfig>,
// Current mode and per-mode parsing state.
state: State,
// Reference instant for generated timestamps; set on first use.
zero: Option<tokio::time::Instant>,
// Fed every written timestamp; its output is stored as the rendition's `jitter`.
jitter: MinFrameDuration,
}
/// Mode-dependent state of an [`Import`].
enum State {
// Not initialized yet. A hint, when present, overrides detection from the
// first buffer passed to `initialize`.
Pending { mode_hint: Option<Mode> },
// AVCC input; `length_size` comes from the parsed AVCC record and is used
// to walk the NAL length prefixes of each sample.
Avc1 { length_size: usize },
// Annex B input.
Avc3 {
// Frame currently being assembled.
current: Avc3Frame,
// Most recently seen SPS NAL unit, without start code.
sps: Option<Bytes>,
// Most recently seen PPS NAL unit, without start code.
pps: Option<Bytes>,
},
}
/// Accumulator for the avc3 frame that is currently being assembled.
#[derive(Default)]
struct Avc3Frame {
// Start-code prefixed NAL units collected so far.
chunks: BytesMut,
// An IDR slice has been added; the frame is written as a keyframe.
contains_idr: bool,
// At least one slice has been added; only then is the frame written out.
contains_slice: bool,
// An SPS is present, either received or re-inserted from the cache.
contains_sps: bool,
// A PPS is present, either received or re-inserted from the cache.
contains_pps: bool,
}
impl<E: CatalogExt> Import<E> {
/// Creates an importer that obtains its track from `broadcast`.
///
/// The importer starts uninitialized, without a mode hint, and with the
/// `.avc3` track suffix; no track is created until initialization.
pub fn new(broadcast: moq_net::BroadcastProducer, catalog: crate::catalog::Producer<E>) -> Self {
    let tracks = crate::track_provider::TrackProvider::unique(broadcast, ".avc3");
    let state = State::Pending { mode_hint: None };
    let jitter = MinFrameDuration::new();
    Self {
        tracks,
        catalog,
        state,
        jitter,
        track: None,
        config: None,
        zero: None,
    }
}
/// Creates an importer that publishes to the caller-supplied `track`.
///
/// The importer starts uninitialized and without a mode hint.
pub fn new_with_track(track: moq_net::TrackProducer, catalog: crate::catalog::Producer<E>) -> Self {
    let tracks = crate::track_provider::TrackProvider::fixed(track);
    let state = State::Pending { mode_hint: None };
    let jitter = MinFrameDuration::new();
    Self {
        tracks,
        catalog,
        state,
        jitter,
        track: None,
        config: None,
        zero: None,
    }
}
/// Selects the input mode up front instead of detecting it in `initialize`.
///
/// * `Mode::Avc1` switches the track suffix to `.avc1` and records the hint;
///   the track is still created later, during initialization.
/// * `Mode::Avc3` switches the suffix to `.avc3`, creates the track right
///   away and enters the avc3 state.
///
/// # Errors
/// Returns an error if the avc3 track cannot be created.
pub fn with_mode(mut self, mode: Mode) -> anyhow::Result<Self> {
    let suffix = match mode {
        Mode::Avc1 => ".avc1",
        Mode::Avc3 => ".avc3",
    };
    self.tracks.set_suffix(suffix);

    self.state = match mode {
        Mode::Avc1 => State::Pending {
            mode_hint: Some(Mode::Avc1),
        },
        Mode::Avc3 => {
            let created = self.tracks.create()?;
            let producer =
                crate::container::Producer::new(created, crate::catalog::hang::Container::Legacy)
                    .with_lenient_start();
            self.track = Some(producer);
            State::Avc3 {
                current: Avc3Frame::default(),
                sps: None,
                pps: None,
            }
        }
    };
    Ok(self)
}
/// Returns the underlying track producer, or `None` if no track exists yet.
pub fn track(&self) -> Option<&moq_net::TrackProducer> {
    let producer = self.track.as_ref()?;
    Some(producer.track())
}
/// Initializes the importer from `buf`.
///
/// The mode is taken from the current state when one is already active,
/// from the mode hint when one was given, and otherwise detected from the
/// leading bytes of `buf`.
///
/// # Errors
/// Propagates any error from the mode-specific initialization.
pub fn initialize<T: Buf + AsRef<[u8]>>(&mut self, buf: &mut T) -> anyhow::Result<()> {
    let mode = match &self.state {
        State::Avc1 { .. } => Mode::Avc1,
        State::Avc3 { .. } => Mode::Avc3,
        State::Pending {
            mode_hint: Some(hint),
        } => *hint,
        State::Pending { mode_hint: None } => detect_mode(buf.as_ref()),
    };

    match mode {
        Mode::Avc3 => self.initialize_avc3(buf),
        Mode::Avc1 => self.initialize_avc1(buf),
    }
}
/// Initializes avc1 mode from an AVCC record held in `buf`.
///
/// The record is parsed, turned into a catalog configuration (with the raw
/// record as its description and `inline: false`), and published through
/// `swap_config`. On success the whole buffer is consumed.
///
/// # Errors
/// Returns an error if the record cannot be parsed or the configuration
/// cannot be applied. In both cases the importer state is left untouched.
fn initialize_avc1<T: Buf + AsRef<[u8]>>(&mut self, buf: &mut T) -> anyhow::Result<()> {
    let avcc_bytes = buf.as_ref();
    let avcc = super::Avcc::parse(avcc_bytes)?;
    let length_size = avcc.length_size;

    let mut config = hang::catalog::VideoConfig::new(hang::catalog::H264 {
        profile: avcc.profile,
        constraints: avcc.constraints,
        level: avcc.level,
        inline: false,
    });
    config.coded_width = avcc.coded_width;
    config.coded_height = avcc.coded_height;
    config.description = Some(Bytes::copy_from_slice(avcc_bytes));
    config.container = hang::catalog::Container::Legacy;

    self.tracks.set_suffix(".avc1");
    self.swap_config(config)?;

    // Commit the new length size only once the configuration has actually
    // been applied; a rejected swap must not leave a length size that
    // disagrees with the description still published for the track.
    self.state = State::Avc1 { length_size };

    buf.advance(buf.remaining());
    Ok(())
}
/// Initializes avc3 mode and feeds every NAL unit in `buf` to `decode_nal`.
///
/// When the importer is not yet in avc3 mode, the track is created first (if
/// none exists) and only then is the state switched. The final NAL unit in
/// `buf` is flushed as well, so the buffer is treated as complete. NAL units
/// are processed without a timestamp.
///
/// # Errors
/// Returns an error if the track cannot be created, if the buffer cannot be
/// split into NAL units, or if processing a NAL unit fails.
fn initialize_avc3<T: Buf + AsRef<[u8]>>(&mut self, buf: &mut T) -> anyhow::Result<()> {
    if !matches!(self.state, State::Avc3 { .. }) {
        // Create the track before committing to avc3 mode: if creation fails
        // the importer stays uninitialized and a later call can retry,
        // instead of ending up in avc3 mode without a track.
        if self.track.is_none() {
            self.tracks.set_suffix(".avc3");
            let track = self.tracks.create()?;
            self.track = Some(
                crate::container::Producer::new(track, crate::catalog::hang::Container::Legacy)
                    .with_lenient_start(),
            );
        }
        self.state = State::Avc3 {
            current: Avc3Frame::default(),
            sps: None,
            pps: None,
        };
    }

    let mut nals = NalIterator::new(buf);
    while let Some(nal) = nals.next().transpose()? {
        self.decode_nal(nal, None)?;
    }
    // The buffer is complete, so the trailing NAL unit can be emitted too.
    if let Some(nal) = nals.flush()? {
        self.decode_nal(nal, None)?;
    }
    Ok(())
}
/// Reports whether a track has been created.
pub fn is_initialized(&self) -> bool {
    matches!(self.track, Some(_))
}
/// Reads from `reader` until it reports end of input, passing the
/// accumulated bytes to `decode_stream` after every read.
///
/// Timestamps are generated by the importer. Bytes still buffered when the
/// reader ends are not flushed by this method.
///
/// # Errors
/// Returns the first read or decode error.
pub async fn decode_from<T: AsyncRead + Unpin>(&mut self, reader: &mut T) -> anyhow::Result<()> {
    let mut pending = BytesMut::new();
    loop {
        let read = reader.read_buf(&mut pending).await?;
        if read == 0 {
            return Ok(());
        }
        self.decode_stream(&mut pending, None)?;
    }
}
pub fn decode_stream<T: Buf + AsRef<[u8]>>(
&mut self,
buf: &mut T,
pts: Option<crate::container::Timestamp>,
) -> anyhow::Result<()> {
anyhow::ensure!(matches!(self.state, State::Avc3 { .. }), "decode_stream is avc3 only");
let pts = self.pts(pts)?;
let nals = NalIterator::new(buf);
for nal in nals {
self.decode_nal(nal?, Some(pts))?;
}
Ok(())
}
/// Decodes one complete frame from `buf`, dispatching on the active mode.
///
/// # Errors
/// Fails when the importer has not been initialized, or when the
/// mode-specific decoder fails.
pub fn decode_frame<T: Buf + AsRef<[u8]>>(
    &mut self,
    buf: &mut T,
    pts: Option<crate::container::Timestamp>,
) -> anyhow::Result<()> {
    if matches!(self.state, State::Pending { .. }) {
        anyhow::bail!("not initialized; call initialize() or with_mode() first");
    }

    if matches!(self.state, State::Avc1 { .. }) {
        self.decode_avc1(buf, pts)
    } else {
        self.decode_avc3_frame(buf, pts)
    }
}
/// Writes the whole of `buf` as a single avc1 frame.
///
/// The payload is copied unchanged; the keyframe flag comes from scanning the
/// length-prefixed NAL units for an IDR slice. The observed timestamp is fed
/// to the jitter tracker and any resulting value is stored on the catalog
/// rendition named after the track. The buffer is fully consumed on success.
///
/// # Errors
/// Fails when no timestamp can be produced, when no track exists, or when
/// writing the frame fails.
fn decode_avc1<T: Buf + AsRef<[u8]>>(
    &mut self,
    buf: &mut T,
    pts: Option<crate::container::Timestamp>,
) -> anyhow::Result<()> {
    let length_size = match self.state {
        State::Avc1 { length_size } => length_size,
        _ => unreachable!("checked by decode_frame"),
    };

    let sample = buf.as_ref();
    let timestamp = self.pts(pts)?;
    let keyframe = avc1_is_keyframe(sample, length_size);

    let track = self
        .track
        .as_mut()
        .context("not initialized; call initialize() first")?;
    let frame = crate::container::Frame {
        timestamp,
        payload: sample.to_vec().into(),
        keyframe,
    };
    track.write(frame)?;

    if let Some(jitter) = self.jitter.observe(timestamp) {
        if let Some(rendition) = self.catalog.lock().video.renditions.get_mut(&track.name) {
            rendition.jitter = Some(jitter);
        }
    }

    buf.advance(buf.remaining());
    Ok(())
}
/// Decodes a buffer holding one complete avc3 frame.
///
/// All NAL units, including the trailing one, are processed with the same
/// timestamp, after which the assembled frame is written out.
///
/// # Errors
/// Fails when no timestamp can be produced, or when splitting, processing or
/// writing fails.
fn decode_avc3_frame<T: Buf + AsRef<[u8]>>(
    &mut self,
    buf: &mut T,
    pts: Option<crate::container::Timestamp>,
) -> anyhow::Result<()> {
    let timestamp = self.pts(pts)?;

    let mut units = NalIterator::new(buf);
    for unit in units.by_ref() {
        self.decode_nal(unit?, Some(timestamp))?;
    }
    match units.flush()? {
        Some(last) => self.decode_nal(last, Some(timestamp))?,
        None => {}
    }

    self.maybe_start_frame(Some(timestamp))
}
/// Processes one NAL unit (without its start code) in avc3 mode.
///
/// Depending on the NAL type this may first write out the frame assembled so
/// far, update the catalog from a new SPS, refresh the cached SPS/PPS, or
/// re-insert cached parameter sets ahead of an IDR slice. In every case the
/// NAL unit is then appended, start-code prefixed, to the current frame.
///
/// `pts` is the timestamp used if a pending frame has to be written; writing
/// a frame without one is an error.
///
/// # Errors
/// Fails on an empty NAL unit, a set forbidden-zero bit, an unparsable SPS, a
/// slice NAL unit too short to inspect (non-IDR and data-partition types
/// only), or any failure while writing the previous frame.
fn decode_nal(&mut self, nal: Bytes, pts: Option<crate::container::Timestamp>) -> anyhow::Result<()> {
    let header = nal.first().context("NAL unit is too short")?;
    let forbidden_zero_bit = (header >> 7) & 1;
    anyhow::ensure!(forbidden_zero_bit == 0, "forbidden zero bit is not zero");

    // The low five bits of the header byte carry the NAL unit type.
    let nal_unit_type = header & 0b11111;
    let nal_type = Avc3NalType::try_from(nal_unit_type).ok();

    match nal_type {
        Some(Avc3NalType::Sps) => {
            // Parameter sets belong to the next frame: close the previous one.
            self.maybe_start_frame(pts)?;
            let sps = Sps::parse(&nal)?;
            self.init_from_sps(&sps)?;
            let State::Avc3 { current, sps, pps } = &mut self.state else {
                unreachable!("decode_nal is avc3 only")
            };
            // A different SPS invalidates the cached PPS and discards whatever
            // had been buffered for the current frame.
            if sps.as_ref().is_some_and(|cached| cached != &nal) {
                *pps = None;
                current.chunks.clear();
                current.contains_pps = false;
                current.contains_sps = false;
            }
            *sps = Some(nal.clone());
            current.contains_sps = true;
        }
        Some(Avc3NalType::Pps) => {
            self.maybe_start_frame(pts)?;
            let State::Avc3 { current, pps, .. } = &mut self.state else {
                unreachable!()
            };
            *pps = Some(nal.clone());
            current.contains_pps = true;
        }
        Some(Avc3NalType::Aud) | Some(Avc3NalType::Sei) => {
            self.maybe_start_frame(pts)?;
        }
        Some(Avc3NalType::IdrSlice) => {
            // Same first-slice test as for the other slice types below. Without
            // it, an IDR that is not preceded by an AUD/SEI/SPS/PPS would be
            // appended to the previous picture, and the cached parameter sets
            // would be injected into the middle of that frame. A one-byte NAL
            // unit is tolerated here so that no new error is introduced.
            if nal.get(1).is_some_and(|byte| byte & 0x80 != 0) {
                self.maybe_start_frame(pts)?;
            }
            let State::Avc3 { current, sps, pps } = &mut self.state else {
                unreachable!()
            };
            // Re-insert cached parameter sets that this frame does not carry.
            if !current.contains_sps
                && let Some(sps) = sps.as_ref()
            {
                current.chunks.extend_from_slice(&START_CODE);
                current.chunks.extend_from_slice(sps);
                current.contains_sps = true;
            }
            if !current.contains_pps
                && let Some(pps) = pps.as_ref()
            {
                current.chunks.extend_from_slice(&START_CODE);
                current.chunks.extend_from_slice(pps);
                current.contains_pps = true;
            }
            current.contains_idr = true;
            current.contains_slice = true;
        }
        Some(Avc3NalType::NonIdrSlice)
        | Some(Avc3NalType::DataPartitionA)
        | Some(Avc3NalType::DataPartitionB)
        | Some(Avc3NalType::DataPartitionC) => {
            // The top bit of the first slice-header byte is set when the slice
            // starts a new picture (first_mb_in_slice == 0 in Exp-Golomb form).
            if nal.get(1).context("NAL unit is too short")? & 0x80 != 0 {
                self.maybe_start_frame(pts)?;
            }
            let State::Avc3 { current, .. } = &mut self.state else {
                unreachable!()
            };
            current.contains_slice = true;
        }
        _ => {}
    }

    tracing::trace!(kind = ?nal_type, "parsed NAL");

    let State::Avc3 { current, .. } = &mut self.state else {
        unreachable!()
    };
    current.chunks.extend_from_slice(&START_CODE);
    current.chunks.extend_from_slice(&nal);
    Ok(())
}
/// Publishes the configuration described by `sps` for the avc3 track.
///
/// Nothing is written when the resulting configuration equals the one
/// published last. Otherwise the rendition keyed by the track name is
/// inserted (or replaced) in the catalog.
///
/// # Errors
/// Fails when the avc3 track has not been created.
fn init_from_sps(&mut self, sps: &Sps) -> anyhow::Result<()> {
    let codec = hang::catalog::H264 {
        profile: sps.profile,
        constraints: sps.constraints,
        level: sps.level,
        inline: true,
    };
    let mut config = hang::catalog::VideoConfig::new(codec);
    config.coded_width = Some(sps.coded_width);
    config.coded_height = Some(sps.coded_height);
    config.container = hang::catalog::Container::Legacy;

    let unchanged = self.config.as_ref().is_some_and(|previous| previous == &config);
    if unchanged {
        return Ok(());
    }

    let name = self.track.as_ref().context("avc3 track not created")?.name.clone();
    let mut catalog = self.catalog.lock();
    catalog.video.renditions.insert(name, config.clone());
    self.config = Some(config);
    Ok(())
}
fn maybe_start_frame(&mut self, pts: Option<crate::container::Timestamp>) -> anyhow::Result<()> {
let State::Avc3 { current, .. } = &mut self.state else {
return Ok(());
};
if !current.contains_slice {
return Ok(());
}
let pts = pts.context("missing timestamp")?;
let payload = std::mem::take(&mut current.chunks).freeze();
let keyframe = current.contains_idr;
current.contains_idr = false;
current.contains_slice = false;
current.contains_sps = false;
current.contains_pps = false;
let track = self.track.as_mut().context("avc3 track not created")?;
track.write(crate::container::Frame {
timestamp: pts,
payload,
keyframe,
})?;
if let Some(jitter) = self.jitter.observe(pts)
&& let Some(c) = self.catalog.lock().video.renditions.get_mut(&track.name)
{
c.jitter = Some(jitter);
}
Ok(())
}
/// Replaces the published track and catalog rendition with ones for `config`.
///
/// Returns immediately when `config` equals the configuration published
/// last. Otherwise the existing track (if any) is dropped and its rendition
/// removed, a new track is created, and `config` is inserted under the new
/// track's name.
///
/// # Errors
/// Fails when a track already exists and the track provider is fixed (the
/// existing track is kept in that case), or when a new track cannot be
/// created.
fn swap_config(&mut self, config: hang::catalog::VideoConfig) -> anyhow::Result<()> {
    if let Some(old) = &self.config
        && old == &config
    {
        return Ok(());
    }

    let mut catalog = self.catalog.lock();

    if let Some(track) = self.track.take() {
        if self.tracks.is_fixed() {
            // Put the track back so the importer stays usable.
            self.track = Some(track);
            anyhow::bail!("fixed track cannot be reconfigured");
        }
        tracing::debug!(name = ?track.name, "reinitializing H.264 track");
        catalog.video.renditions.remove(&track.name);
        // The old configuration is no longer published. Forget it so that,
        // should creating the replacement fail, a later call with the same
        // configuration is not mistaken for a no-op while no track exists.
        self.config = None;
    }

    let track = self.tracks.create()?;
    tracing::debug!(name = ?track.name, ?config, "starting H.264 track");
    catalog.video.renditions.insert(track.name.clone(), config.clone());
    self.config = Some(config);
    self.track =
        Some(crate::container::Producer::new(track, crate::catalog::hang::Container::Legacy).with_lenient_start());
    Ok(())
}
/// Calls `finish` on the active track.
///
/// # Errors
/// Fails when no track exists or when the track reports an error.
pub fn finish(&mut self) -> anyhow::Result<()> {
    self.track.as_mut().context("not initialized")?.finish()?;
    Ok(())
}
/// Forwards `sequence` to the active track's `seek`.
///
/// # Errors
/// Fails when no track exists or when the track reports an error.
pub fn seek(&mut self, sequence: u64) -> anyhow::Result<()> {
    self.track.as_mut().context("not initialized")?.seek(sequence)?;
    Ok(())
}
/// Resolves the timestamp to use for the next frame.
///
/// A supplied `hint` is returned unchanged. Otherwise the result is the
/// number of microseconds elapsed since the first call that had to generate
/// a timestamp (that call records the reference instant).
///
/// # Errors
/// Fails when the elapsed time cannot be converted into a timestamp.
fn pts(&mut self, hint: Option<crate::container::Timestamp>) -> anyhow::Result<crate::container::Timestamp> {
    match hint {
        Some(provided) => Ok(provided),
        None => {
            let started = self.zero.get_or_insert_with(tokio::time::Instant::now);
            let micros = started.elapsed().as_micros() as u64;
            Ok(crate::container::Timestamp::from_micros(micros)?)
        }
    }
}
}
/// Removes the importer's rendition from the catalog when it goes away.
impl<E: CatalogExt> Drop for Import<E> {
    fn drop(&mut self) {
        let Some(track) = self.track.take() else {
            // No track was ever created, so nothing was published.
            return;
        };
        tracing::debug!(name = ?track.name, "ending H.264 track");
        self.catalog.lock().video.renditions.remove(&track.name);
    }
}
/// Guesses the input mode from the first bytes of a buffer.
///
/// A buffer beginning with a three-byte (`00 00 01`) or four-byte
/// (`00 00 00 01`) start code is treated as avc3; anything else, including an
/// empty buffer, as avc1.
fn detect_mode(bytes: &[u8]) -> Mode {
    match bytes {
        [0, 0, 1, ..] | [0, 0, 0, 1, ..] => Mode::Avc3,
        _ => Mode::Avc1,
    }
}
/// Reports whether a length-prefixed sample contains an IDR slice.
///
/// `data` is read as a sequence of NAL units, each preceded by a big-endian
/// length of `length_size` bytes. Scanning stops at the first NAL unit whose
/// type (low five bits of its first byte) is 5, at a NAL unit whose declared
/// length runs past the end of `data`, or when fewer than `length_size`
/// bytes remain.
///
/// Returns `false` for any `length_size` outside `1..=4`.
fn avc1_is_keyframe(data: &[u8], length_size: usize) -> bool {
    const NAL_TYPE_MASK: u8 = 0x1f;
    const NAL_TYPE_IDR: u8 = 5;

    if !(1..=4).contains(&length_size) {
        return false;
    }

    let mut rest = data;
    while rest.len() >= length_size {
        let (prefix, body) = rest.split_at(length_size);
        // At most four bytes are folded in, so this fits even a 32-bit usize.
        let nal_len = prefix
            .iter()
            .fold(0usize, |acc, &byte| (acc << 8) | usize::from(byte));

        // Slicing by length never adds untrusted values to an offset, so an
        // oversized length cannot overflow; it simply ends the scan.
        let Some(nal) = body.get(..nal_len) else {
            break;
        };
        if nal
            .first()
            .is_some_and(|header| header & NAL_TYPE_MASK == NAL_TYPE_IDR)
        {
            return true;
        }
        rest = &body[nal_len..];
    }
    false
}
// Unit tests for mode detection and for catalog publication on initialize.
#[cfg(test)]
mod tests {
use super::*;
// A buffer that does not begin with a start code is classified as avc1.
#[test]
fn detect_mode_avc1_avcc_buffer() {
let avcc: &[u8] = &[
0x01, 0x42, 0xc0, 0x1f, 0xff, 0xe1, 0x00, 0x06, 0x67, 0x42, 0xc0, 0x1f, 0xde, 0xad,
];
assert_eq!(detect_mode(avcc), Mode::Avc1);
}
// A leading three-byte start code selects avc3.
#[test]
fn detect_mode_avc3_3byte_start_code() {
let nals: &[u8] = &[0x00, 0x00, 0x01, 0x67, 0x42, 0xc0, 0x1f];
assert_eq!(detect_mode(nals), Mode::Avc3);
}
// A leading four-byte start code selects avc3 as well.
#[test]
fn detect_mode_avc3_4byte_start_code() {
let nals: &[u8] = &[0x00, 0x00, 0x00, 0x01, 0x67, 0x42, 0xc0, 0x1f];
assert_eq!(detect_mode(nals), Mode::Avc3);
}
// Initializing from an AVCC record without a mode hint publishes one
// rendition with inline=false and the raw record as its description.
#[tokio::test(start_paused = true)]
async fn auto_detect_avc1_lands_in_catalog() {
// Hand-built AVCC record carrying one SPS entry and one PPS entry.
let sps_nal = [0x67, 0x42, 0xc0, 0x1f];
let mut avcc = vec![0x01, 0x42, 0xc0, 0x1f, 0xff, 0xe1, 0x00, sps_nal.len() as u8];
avcc.extend_from_slice(&sps_nal);
avcc.extend_from_slice(&[0x01, 0x00, 0x04, 0x68, 0xce, 0x3c, 0x80]);
let broadcast = moq_net::Broadcast::new();
let mut producer = broadcast.produce();
let catalog = crate::catalog::Producer::new(&mut producer).unwrap();
let mut importer = Import::new(producer, catalog.clone());
let mut buf = bytes::BytesMut::from(avcc.as_slice());
importer.initialize(&mut buf).expect("initialize avc1");
let snapshot = catalog.snapshot();
assert_eq!(snapshot.video.renditions.len(), 1);
let cfg = snapshot.video.renditions.values().next().unwrap();
let hang::catalog::VideoCodec::H264(h264) = &cfg.codec else {
panic!("expected H.264 codec")
};
assert!(!h264.inline, "avc1 source should land as inline=false");
assert_eq!(h264.profile, 0x42);
assert_eq!(h264.level, 0x1f);
let desc = cfg.description.as_ref().expect("description set");
assert_eq!(desc.as_ref(), avcc.as_slice());
}
// Initializing from start-code delimited SPS and PPS without a mode hint
// publishes one rendition with inline=true and no description.
#[tokio::test(start_paused = true)]
async fn auto_detect_avc3_lands_in_catalog() {
let sps: &[u8] = &[
0x67, 0x42, 0xc0, 0x1f, 0xda, 0x01, 0x40, 0x16, 0xe9, 0xb8, 0x08, 0x08, 0x0a, 0x00, 0x00, 0x07, 0xd0, 0x00,
0x01, 0xd4, 0xc0, 0x80,
];
let pps: &[u8] = &[0x68, 0xce, 0x3c, 0x80];
let mut annexb = bytes::BytesMut::new();
annexb.extend_from_slice(&[0, 0, 0, 1]);
annexb.extend_from_slice(sps);
annexb.extend_from_slice(&[0, 0, 0, 1]);
annexb.extend_from_slice(pps);
let broadcast = moq_net::Broadcast::new();
let mut producer = broadcast.produce();
let catalog = crate::catalog::Producer::new(&mut producer).unwrap();
let mut importer = Import::new(producer, catalog.clone());
importer.initialize(&mut annexb).expect("initialize avc3");
let snapshot = catalog.snapshot();
assert_eq!(snapshot.video.renditions.len(), 1);
let cfg = snapshot.video.renditions.values().next().unwrap();
let hang::catalog::VideoCodec::H264(h264) = &cfg.codec else {
panic!("expected H.264 codec")
};
assert!(h264.inline, "avc3 source should land as inline=true");
assert!(cfg.description.is_none(), "avc3 has no out-of-band description");
// Profile and level are expected to mirror bytes 1 and 3 of the SPS.
assert_eq!(h264.profile, sps[1]);
assert_eq!(h264.level, sps[3]);
}
}
/// NAL unit types recognised by the avc3 decoder.
///
/// Discriminants are matched against the low five bits of the NAL header
/// byte via the derived `TryFrom<u8>`; values without a variant are treated
/// as unknown and passed through untouched.
#[derive(Debug, Clone, Copy, PartialEq, Eq, num_enum::TryFromPrimitive)]
#[repr(u8)]
enum Avc3NalType {
Unspecified = 0,
// Slice of a non-IDR picture.
NonIdrSlice = 1,
DataPartitionA = 2,
DataPartitionB = 3,
DataPartitionC = 4,
// Slice of an IDR picture; marks the frame as a keyframe.
IdrSlice = 5,
Sei = 6,
// Sequence parameter set; drives the catalog configuration.
Sps = 7,
// Picture parameter set.
Pps = 8,
// Access unit delimiter.
Aud = 9,
EndOfSeq = 10,
EndOfStream = 11,
Filler = 12,
SpsExt = 13,
Prefix = 14,
SubsetSps = 15,
DepthParameterSet = 16,
}