use std::io::Cursor;
use bytes::Bytes;
use hang::catalog::{AudioCodec, VideoCodec};
use webm_iterable::WebmIterator;
use webm_iterable::matroska_spec::{Master, MatroskaSpec, SimpleBlock};
#[tokio::test(start_paused = true)]
async fn export_header_roundtrip_vp9_opus() {
let import_bytes = synth_webm();
let broadcast = moq_net::Broadcast::new();
let mut producer = broadcast.produce();
let consumer = producer.consume();
let catalog = crate::catalog::hang::Producer::new(&mut producer).unwrap();
let mut importer = crate::container::mkv::Import::new(producer, catalog.clone());
let mut buf = bytes::BytesMut::from(import_bytes.as_slice());
importer.decode(&mut buf).unwrap();
importer.finish().unwrap();
let mut exporter = crate::container::mkv::Export::new(consumer).unwrap();
let header = tokio::time::timeout(std::time::Duration::from_secs(1), exporter.next())
.await
.expect("exporter timed out")
.expect("exporter result")
.expect("expected header bytes");
let mut cursor = Cursor::new(header.as_ref());
let iter = WebmIterator::new(
&mut cursor,
&[
MatroskaSpec::Ebml(Master::Start),
MatroskaSpec::Tracks(Master::Start),
MatroskaSpec::TrackEntry(Master::Start),
MatroskaSpec::Info(Master::Start),
],
);
let mut saw_ebml = false;
let mut saw_segment_start = false;
let mut saw_info = false;
let mut track_entries: Vec<Vec<MatroskaSpec>> = Vec::new();
for tag in iter {
match tag.expect("parse header") {
MatroskaSpec::Ebml(Master::Full(children)) => {
saw_ebml = true;
let doc_type = children
.iter()
.find_map(|c| {
if let MatroskaSpec::DocType(d) = c {
Some(d.clone())
} else {
None
}
})
.expect("DocType in header");
assert_eq!(doc_type, "webm", "should be webm when only VP9+Opus");
}
MatroskaSpec::Segment(Master::Start) => saw_segment_start = true,
MatroskaSpec::Info(Master::Full(children)) => {
saw_info = true;
let scale = children
.iter()
.find_map(|c| {
if let MatroskaSpec::TimestampScale(v) = c {
Some(*v)
} else {
None
}
})
.expect("TimestampScale");
assert_eq!(scale, 1_000_000);
}
MatroskaSpec::Tracks(Master::Full(entries)) => {
for entry in entries {
if let MatroskaSpec::TrackEntry(Master::Full(children)) = entry {
track_entries.push(children);
}
}
}
_ => {}
}
}
assert!(saw_ebml, "header missing EBML");
assert!(saw_segment_start, "header missing Segment::Start");
assert!(saw_info, "header missing Info");
assert_eq!(track_entries.len(), 2, "expected 2 track entries (1 video + 1 audio)");
let codec_ids: Vec<String> = track_entries
.iter()
.map(|e| {
e.iter()
.find_map(|c| {
if let MatroskaSpec::CodecID(s) = c {
Some(s.clone())
} else {
None
}
})
.unwrap()
})
.collect();
assert!(codec_ids.iter().any(|c| c == "V_VP9"));
assert!(codec_ids.iter().any(|c| c == "A_OPUS"));
let mut broadcast2 = moq_net::Broadcast::new().produce();
let catalog2 = crate::catalog::hang::Producer::new(&mut broadcast2).unwrap();
let mut importer2 = crate::container::mkv::Import::new(broadcast2, catalog2.clone());
let mut hbuf = bytes::BytesMut::from(header.as_ref());
importer2.decode(&mut hbuf).unwrap();
let snap = catalog2.snapshot();
assert_eq!(snap.video.renditions.len(), 1);
assert_eq!(snap.audio.renditions.len(), 1);
let v = snap.video.renditions.values().next().unwrap();
assert!(matches!(v.codec, VideoCodec::VP9(_)));
let a = snap.audio.renditions.values().next().unwrap();
assert!(matches!(a.codec, AudioCodec::Opus));
assert_eq!(a.sample_rate, 48000);
}
#[tokio::test(start_paused = true)]
async fn export_waits_for_catalog_before_header() {
let broadcast = moq_net::Broadcast::new();
let mut producer = broadcast.produce();
let consumer = producer.consume();
let _catalog = crate::catalog::hang::Producer::new(&mut producer).unwrap();
let mut exporter = crate::container::mkv::Export::new(consumer).unwrap();
let result = tokio::time::timeout(std::time::Duration::from_secs(1), exporter.next()).await;
assert!(
result.is_err(),
"exporter should stay pending before any rendition arrives, got {result:?}"
);
drop(producer);
}
#[tokio::test(start_paused = true)]
async fn export_emits_blocks_for_each_frame() {
let import_bytes = synth_webm_with_frames();
let broadcast = moq_net::Broadcast::new();
let mut producer = broadcast.produce();
let consumer = producer.consume();
let catalog = crate::catalog::hang::Producer::new(&mut producer).unwrap();
let mut importer = crate::container::mkv::Import::new(producer, catalog.clone());
let mut buf = bytes::BytesMut::from(import_bytes.as_slice());
importer.decode(&mut buf).unwrap();
importer.finish().unwrap();
let mut exporter = crate::container::mkv::Export::new(consumer)
.unwrap()
.with_fragment_duration(std::time::Duration::ZERO);
let mut exported: Vec<u8> = Vec::new();
let mut importer = Some(importer);
for _ in 0..32 {
let next = tokio::time::timeout(std::time::Duration::from_millis(100), exporter.next()).await;
match next {
Ok(Ok(Some(chunk))) => exported.extend_from_slice(&chunk),
Ok(Ok(None)) => break,
Ok(Err(e)) => panic!("exporter error: {e}"),
Err(_) => {
importer = None;
}
}
}
drop(importer);
drop(exporter);
let mut cursor = Cursor::new(exported.as_slice());
let iter = WebmIterator::new(&mut cursor, &[]);
let mut blocks_per_track: std::collections::HashMap<u64, usize> = Default::default();
for tag in iter {
if let Ok(MatroskaSpec::SimpleBlock(data)) = tag
&& let Ok(sb) = SimpleBlock::try_from(data.as_slice())
{
*blocks_per_track.entry(sb.track).or_default() += 1;
}
}
assert_eq!(blocks_per_track.values().sum::<usize>(), 5, "expected 5 total blocks");
assert_eq!(blocks_per_track.len(), 2, "expected 2 tracks with blocks");
let mut bcast2 = moq_net::Broadcast::new().produce();
let cat2 = crate::catalog::hang::Producer::new(&mut bcast2).unwrap();
let mut imp2 = crate::container::mkv::Import::new(bcast2, cat2.clone());
let mut rt = bytes::BytesMut::from(exported.as_slice());
imp2.decode(&mut rt).unwrap();
imp2.finish().unwrap();
let snap = cat2.snapshot();
assert_eq!(snap.video.renditions.len(), 1);
assert_eq!(snap.audio.renditions.len(), 1);
assert!(matches!(
snap.video.renditions.values().next().unwrap().codec,
VideoCodec::VP9(_)
));
assert!(matches!(
snap.audio.renditions.values().next().unwrap().codec,
AudioCodec::Opus
));
}
#[tokio::test(start_paused = true)]
async fn export_rejects_cmaf_track() {
use hang::catalog::{Container, H264, VideoConfig};
let broadcast = moq_net::Broadcast::new();
let mut producer = broadcast.produce();
let consumer = producer.consume();
let mut catalog = crate::catalog::hang::Producer::new(&mut producer).unwrap();
let track = producer.unique_track(".avc1").unwrap();
let mut config = VideoConfig::new(H264 {
profile: 0x64,
constraints: 0,
level: 0x1f,
inline: false,
});
config.coded_width = Some(640);
config.coded_height = Some(480);
config.description = Some(Bytes::from(vec![0u8; 8]));
config.container = Container::Cmaf {
init: Bytes::from(vec![0u8; 32]),
timescale: None,
track_id: None,
};
catalog.lock().video.renditions.insert(track.name.clone(), config);
let mut exporter = crate::container::mkv::Export::new(consumer).unwrap();
let result = tokio::time::timeout(std::time::Duration::from_secs(1), exporter.next())
.await
.expect("exporter timed out");
let err = result.expect_err("expected an error");
assert!(err.to_string().contains("CMAF"), "expected CMAF rejection, got: {err}");
}
#[tokio::test(start_paused = true)]
async fn export_avc3_source_synthesizes_avcc_and_length_prefixes() {
use crate::container::Timestamp;
use hang::catalog::{Container, H264, VideoConfig};
let broadcast = moq_net::Broadcast::new();
let mut producer = broadcast.produce();
let consumer = producer.consume();
let mut catalog = crate::catalog::hang::Producer::new(&mut producer).unwrap();
let track = producer.unique_track(".avc3").unwrap();
let mut config = VideoConfig::new(H264 {
profile: 0x42,
constraints: 0xc0,
level: 0x1f,
inline: true,
});
config.coded_width = Some(320);
config.coded_height = Some(240);
config.container = Container::Legacy;
catalog.lock().video.renditions.insert(track.name.clone(), config);
const SC: &[u8] = &[0, 0, 0, 1];
let sps = &[0x67u8, 0x42, 0xc0, 0x1f, 0xde, 0xad, 0xbe, 0xef][..];
let pps = &[0x68u8, 0xce, 0x3c, 0x80][..];
let idr = &[0x65u8, 0x88, 0x84, 0x21, 0x00, 0x11, 0x22, 0x33][..];
let pslice = &[0x61u8, 0xe0, 0x12, 0x34][..];
let mut keyframe_payload = bytes::BytesMut::new();
for nal in [sps, pps, idr] {
keyframe_payload.extend_from_slice(SC);
keyframe_payload.extend_from_slice(nal);
}
let keyframe_payload = keyframe_payload.freeze();
let mut pslice_payload = bytes::BytesMut::new();
pslice_payload.extend_from_slice(SC);
pslice_payload.extend_from_slice(pslice);
let pslice_payload = pslice_payload.freeze();
let mut track_producer = crate::container::Producer::new(track, crate::catalog::hang::Container::Legacy);
track_producer
.write(crate::container::Frame {
timestamp: Timestamp::from_micros(0).unwrap(),
payload: keyframe_payload,
keyframe: true,
})
.unwrap();
track_producer
.write(crate::container::Frame {
timestamp: Timestamp::from_micros(33_000).unwrap(),
payload: pslice_payload,
keyframe: false,
})
.unwrap();
track_producer.finish().unwrap();
let mut catalog = catalog;
catalog.finish().unwrap();
let mut exporter = crate::container::mkv::Export::new(consumer)
.unwrap()
.with_fragment_duration(std::time::Duration::ZERO);
let mut exported: Vec<u8> = Vec::new();
let mut held_producer = Some(producer);
for _ in 0..32 {
let next = tokio::time::timeout(std::time::Duration::from_millis(100), exporter.next()).await;
match next {
Ok(Ok(Some(chunk))) => exported.extend_from_slice(&chunk),
Ok(Ok(None)) => break,
Ok(Err(e)) => panic!("exporter error: {e}"),
Err(_) => {
held_producer = None;
}
}
}
drop(held_producer);
drop(catalog);
drop(track_producer);
drop(exporter);
let mut cursor = Cursor::new(exported.as_slice());
let iter = WebmIterator::new(
&mut cursor,
&[
MatroskaSpec::Tracks(Master::Start),
MatroskaSpec::TrackEntry(Master::Start),
],
);
let mut codec_id: Option<String> = None;
let mut codec_private: Option<Vec<u8>> = None;
let mut sample_payloads: Vec<Vec<u8>> = Vec::new();
for tag in iter {
match tag.expect("parse exported") {
MatroskaSpec::Tracks(Master::Full(entries)) => {
for entry in entries {
if let MatroskaSpec::TrackEntry(Master::Full(children)) = entry {
for c in children {
match c {
MatroskaSpec::CodecID(s) => codec_id = Some(s),
MatroskaSpec::CodecPrivate(p) => codec_private = Some(p),
_ => {}
}
}
}
}
}
MatroskaSpec::SimpleBlock(data) => {
let sb = SimpleBlock::try_from(data.as_slice()).expect("parse SimpleBlock");
sample_payloads.push(sb.raw_frame_data().to_vec());
}
_ => {}
}
}
assert_eq!(codec_id.as_deref(), Some("V_MPEG4/ISO/AVC"));
let avcc = codec_private.expect("avcC in CodecPrivate");
assert_eq!(avcc[0], 1, "configurationVersion");
assert_eq!(avcc[1], sps[1], "AVCProfileIndication");
assert_eq!(avcc[2], sps[2], "profile_compatibility");
assert_eq!(avcc[3], sps[3], "AVCLevelIndication");
assert_eq!(avcc[4] & 0x03, 3, "lengthSizeMinusOne");
let sps_len = u16::from_be_bytes([avcc[6], avcc[7]]) as usize;
assert_eq!(sps_len, sps.len());
assert_eq!(&avcc[8..8 + sps_len], sps);
let pps_offset = 8 + sps_len + 3;
assert_eq!(&avcc[pps_offset..pps_offset + pps.len()], pps);
assert_eq!(sample_payloads.len(), 2, "expected 2 sample blocks");
let first = &sample_payloads[0];
let idr_len = u32::from_be_bytes([first[0], first[1], first[2], first[3]]) as usize;
assert_eq!(idr_len, idr.len(), "IDR length prefix");
assert_eq!(&first[4..4 + idr_len], idr, "IDR payload");
let second = &sample_payloads[1];
let pslice_len = u32::from_be_bytes([second[0], second[1], second[2], second[3]]) as usize;
assert_eq!(pslice_len, pslice.len(), "P-slice length prefix");
assert_eq!(&second[4..4 + pslice_len], pslice, "P-slice payload");
let mut bcast2 = moq_net::Broadcast::new().produce();
let cat2 = crate::catalog::hang::Producer::new(&mut bcast2).unwrap();
let mut imp2 = crate::container::mkv::Import::new(bcast2, cat2.clone());
let mut rt = bytes::BytesMut::from(exported.as_slice());
imp2.decode(&mut rt).unwrap();
imp2.finish().unwrap();
let snap = cat2.snapshot();
assert_eq!(snap.video.renditions.len(), 1);
let v = snap.video.renditions.values().next().unwrap();
assert!(matches!(v.codec, hang::catalog::VideoCodec::H264(_)));
assert_eq!(
v.description.as_ref().map(|b| b.as_ref()),
Some(avcc.as_slice()),
"re-imported description should equal the avcC we wrote"
);
}
#[tokio::test(start_paused = true)]
async fn export_fragment_duration_batches_blocks() {
let import_bytes = synth_webm_with_frames();
let broadcast = moq_net::Broadcast::new();
let mut producer = broadcast.produce();
let consumer = producer.consume();
let mut catalog = crate::catalog::hang::Producer::new(&mut producer).unwrap();
let mut importer = crate::container::mkv::Import::new(producer, catalog.clone());
let mut buf = bytes::BytesMut::from(import_bytes.as_slice());
importer.decode(&mut buf).unwrap();
importer.finish().unwrap();
catalog.finish().unwrap();
let mut exporter = crate::container::mkv::Export::new(consumer)
.unwrap()
.with_fragment_duration(std::time::Duration::from_secs(2));
let mut exported: Vec<u8> = Vec::new();
let mut importer = Some(importer);
for _ in 0..32 {
let next = tokio::time::timeout(std::time::Duration::from_millis(100), exporter.next()).await;
match next {
Ok(Ok(Some(chunk))) => exported.extend_from_slice(&chunk),
Ok(Ok(None)) => break,
Ok(Err(e)) => panic!("exporter error: {e}"),
Err(_) => {
importer = None;
}
}
}
drop(importer);
drop(catalog);
drop(exporter);
let mut cursor = Cursor::new(exported.as_slice());
let iter = WebmIterator::new(&mut cursor, &[]);
let mut cluster_count = 0;
let mut block_count = 0;
for tag in iter {
match tag.expect("parse") {
MatroskaSpec::Cluster(Master::Start) => cluster_count += 1,
MatroskaSpec::SimpleBlock(_) => block_count += 1,
_ => {}
}
}
assert_eq!(block_count, 5, "all blocks should be emitted");
assert_eq!(cluster_count, 1, "all blocks should batch into one cluster");
}
fn synth_webm_with_frames() -> Vec<u8> {
use webm_iterable::WebmWriter;
let mut opus_head = Vec::new();
opus_head.extend_from_slice(b"OpusHead");
opus_head.push(1);
opus_head.push(2);
opus_head.extend_from_slice(&0u16.to_le_bytes());
opus_head.extend_from_slice(&48000u32.to_le_bytes());
opus_head.extend_from_slice(&0i16.to_le_bytes());
opus_head.push(0);
let simple_block = |track: u64, rel_ts: i16, keyframe: bool, payload: &[u8]| -> MatroskaSpec {
SimpleBlock::new_uncheked(payload, track, rel_ts, false, None, false, keyframe).into()
};
let tags: Vec<MatroskaSpec> = vec![
MatroskaSpec::Ebml(Master::Full(vec![
MatroskaSpec::DocType("webm".to_string()),
MatroskaSpec::DocTypeVersion(2),
MatroskaSpec::DocTypeReadVersion(2),
])),
MatroskaSpec::Segment(Master::Start),
MatroskaSpec::Info(Master::Full(vec![MatroskaSpec::TimestampScale(1_000_000)])),
MatroskaSpec::Tracks(Master::Full(vec![
MatroskaSpec::TrackEntry(Master::Full(vec![
MatroskaSpec::TrackNumber(1),
MatroskaSpec::TrackUID(1),
MatroskaSpec::TrackType(1),
MatroskaSpec::CodecID("V_VP9".to_string()),
MatroskaSpec::Video(Master::Full(vec![
MatroskaSpec::PixelWidth(320),
MatroskaSpec::PixelHeight(240),
])),
])),
MatroskaSpec::TrackEntry(Master::Full(vec![
MatroskaSpec::TrackNumber(2),
MatroskaSpec::TrackUID(2),
MatroskaSpec::TrackType(2),
MatroskaSpec::CodecID("A_OPUS".to_string()),
MatroskaSpec::CodecPrivate(opus_head),
MatroskaSpec::Audio(Master::Full(vec![
MatroskaSpec::SamplingFrequency(48000.0),
MatroskaSpec::Channels(2),
])),
])),
])),
MatroskaSpec::Cluster(Master::Start),
MatroskaSpec::Timestamp(0),
simple_block(1, 0, true, b"v0"),
simple_block(2, 0, true, b"a0"),
simple_block(1, 33, false, b"v1"),
simple_block(2, 20, true, b"a1"),
simple_block(1, 66, false, b"v2"),
MatroskaSpec::Cluster(Master::End),
MatroskaSpec::Segment(Master::End),
];
let mut dest = Cursor::new(Vec::new());
{
let mut writer = WebmWriter::new(&mut dest);
for tag in &tags {
writer.write(tag).unwrap();
}
writer.flush().unwrap();
}
dest.into_inner()
}
fn synth_webm() -> Vec<u8> {
use webm_iterable::WebmWriter;
let mut opus_head = Vec::new();
opus_head.extend_from_slice(b"OpusHead");
opus_head.push(1); opus_head.push(2); opus_head.extend_from_slice(&0u16.to_le_bytes()); opus_head.extend_from_slice(&48000u32.to_le_bytes()); opus_head.extend_from_slice(&0i16.to_le_bytes()); opus_head.push(0);
let tags: Vec<MatroskaSpec> = vec![
MatroskaSpec::Ebml(Master::Full(vec![
MatroskaSpec::DocType("webm".to_string()),
MatroskaSpec::DocTypeVersion(2),
MatroskaSpec::DocTypeReadVersion(2),
])),
MatroskaSpec::Segment(Master::Start),
MatroskaSpec::Info(Master::Full(vec![MatroskaSpec::TimestampScale(1_000_000)])),
MatroskaSpec::Tracks(Master::Full(vec![
MatroskaSpec::TrackEntry(Master::Full(vec![
MatroskaSpec::TrackNumber(1),
MatroskaSpec::TrackUID(1),
MatroskaSpec::TrackType(1),
MatroskaSpec::CodecID("V_VP9".to_string()),
MatroskaSpec::Video(Master::Full(vec![
MatroskaSpec::PixelWidth(640),
MatroskaSpec::PixelHeight(480),
])),
])),
MatroskaSpec::TrackEntry(Master::Full(vec![
MatroskaSpec::TrackNumber(2),
MatroskaSpec::TrackUID(2),
MatroskaSpec::TrackType(2),
MatroskaSpec::CodecID("A_OPUS".to_string()),
MatroskaSpec::CodecPrivate(opus_head),
MatroskaSpec::Audio(Master::Full(vec![
MatroskaSpec::SamplingFrequency(48000.0),
MatroskaSpec::Channels(2),
])),
])),
])),
MatroskaSpec::Segment(Master::End),
];
let mut dest = Cursor::new(Vec::new());
{
let mut writer = WebmWriter::new(&mut dest);
for tag in &tags {
writer.write(tag).unwrap();
}
}
dest.into_inner()
}