use bytes::BytesMut;
/// Feeds `data` to a freshly constructed TS importer in a single `decode`
/// call, finishes the import, and returns a snapshot of the catalog that the
/// importer was handed.
///
/// Test helper: every fallible step is unwrapped, so any failure panics.
fn import_ts(data: &[u8]) -> crate::catalog::hang::Catalog {
    let mut producer = moq_net::Broadcast::new().produce();
    let catalog = crate::catalog::Producer::new(&mut producer).unwrap();
    let mut importer = crate::container::ts::Import::new(producer, catalog.clone());

    // The importer takes a mutable buffer, so copy the input into one.
    let mut pending = BytesMut::from(data);
    importer.decode(&mut pending).unwrap();
    importer.finish().unwrap();

    catalog.snapshot()
}
/// Imports the `bbb.ts` fixture and checks the catalog it produces: exactly
/// one video and one audio rendition, codec strings starting with `avc3` and
/// `mp4a` respectively, and a description present on the audio rendition.
#[test]
fn import_bbb_catalog() {
    let catalog = import_ts(include_bytes!("test_data/bbb.ts"));
    let video_tracks = &catalog.video.renditions;
    let audio_tracks = &catalog.audio.renditions;

    assert_eq!(video_tracks.len(), 1, "expected one H.264 track");
    assert_eq!(audio_tracks.len(), 1, "expected one AAC track");

    let video = video_tracks.values().next().unwrap();
    let video_codec = video.codec.to_string();
    assert!(video_codec.starts_with("avc3"), "video codec was {}", video.codec);

    let audio = audio_tracks.values().next().unwrap();
    let audio_codec = audio.codec.to_string();
    assert!(audio_codec.starts_with("mp4a"), "audio codec was {}", audio.codec);
    assert!(audio.description.is_some(), "AAC track missing AudioSpecificConfig");
}
/// Imports the `kyrion_dirtystart.ts` fixture and checks that it yields two
/// audio renditions, each reported as `mp2`, 48 kHz, two channels, and
/// without a description.
#[test]
fn import_kyrion_mp2_catalog() {
    let catalog = import_ts(include_bytes!("test_data/scte35/kyrion_dirtystart.ts"));
    let tracks = &catalog.audio.renditions;

    assert_eq!(tracks.len(), 2, "expected both MP2 tracks");

    for (name, track) in tracks {
        let codec = track.codec.to_string();
        assert_eq!(codec, "mp2", "track {name}");
        assert_eq!(track.sample_rate, 48_000, "track {name}");
        assert_eq!(track.channel_count, 2, "track {name}");
        assert!(
            track.description.is_none(),
            "verbatim MP2 needs no description (track {name})"
        );
    }
}
/// Imports the `ac3.ts` fixture and checks for an audio-only catalog with a
/// single `ac-3` rendition at 48 kHz, six channels, and no description.
#[test]
fn import_ac3_catalog() {
    let catalog = import_ts(include_bytes!("test_data/ac3.ts"));
    let audio_tracks = &catalog.audio.renditions;

    assert_eq!(catalog.video.renditions.len(), 0);
    assert_eq!(audio_tracks.len(), 1, "expected one AC-3 track");

    let track = audio_tracks.values().next().unwrap();
    let codec = track.codec.to_string();
    assert_eq!(codec, "ac-3");
    assert_eq!(track.sample_rate, 48_000);
    assert_eq!(track.channel_count, 6, "5 full-bandwidth channels + LFE");
    assert!(track.description.is_none(), "verbatim AC-3 needs no description");
}
/// Imports the `eac3.ts` fixture and checks for an audio-only catalog with a
/// single `ec-3` rendition at 48 kHz, six channels, and no description.
#[test]
fn import_eac3_catalog() {
    let catalog = import_ts(include_bytes!("test_data/eac3.ts"));
    let audio_tracks = &catalog.audio.renditions;

    assert_eq!(catalog.video.renditions.len(), 0);
    assert_eq!(audio_tracks.len(), 1, "expected one E-AC-3 track");

    let track = audio_tracks.values().next().unwrap();
    let codec = track.codec.to_string();
    assert_eq!(codec, "ec-3");
    assert_eq!(track.sample_rate, 48_000);
    assert_eq!(track.channel_count, 6, "5 full-bandwidth channels + LFE");
    assert!(track.description.is_none(), "verbatim E-AC-3 needs no description");
}
/// Imports the `kyrion_mpeg2av_ac3.ts` fixture and checks that no video
/// rendition is published while two audio renditions are: each 48 kHz stereo
/// without a description, one per codec out of `ac-3` and `mp2`.
#[test]
fn import_kyrion_ac3_mp2_catalog() {
    let catalog = import_ts(include_bytes!("test_data/kyrion_mpeg2av_ac3.ts"));
    let audio_tracks = &catalog.audio.renditions;

    assert_eq!(catalog.video.renditions.len(), 0, "MPEG-2 video is clock-only");
    assert_eq!(audio_tracks.len(), 2, "expected AC-3 + MP2 tracks");

    for (name, track) in audio_tracks {
        let codec = track.codec.to_string();
        assert!(
            matches!(codec.as_str(), "ac-3" | "mp2"),
            "unexpected codec {} (track {name})",
            track.codec
        );
        assert_eq!(track.sample_rate, 48_000, "track {name}");
        assert_eq!(track.channel_count, 2, "track {name}");
        assert!(track.description.is_none(), "track {name}");
    }

    // Each rendition passed the allow-list above; two distinct codec strings
    // across two renditions means neither codec appears twice.
    let mut seen = std::collections::HashSet::<String>::new();
    for track in audio_tracks.values() {
        seen.insert(track.codec.to_string());
    }
    assert_eq!(seen.len(), 2, "one rendition per codec");
}
/// Prepends three stray bytes to the `bbb.ts` fixture and checks that the
/// import still publishes one video and one audio rendition.
#[test]
fn import_resyncs_after_byte_misalignment() {
    let fixture = include_bytes!("test_data/bbb.ts");

    // Three junk bytes followed by the untouched fixture.
    let shifted: Vec<u8> = [&[0x00u8, 0x11, 0x22][..], &fixture[..]].concat();

    let catalog = import_ts(&shifted);
    assert_eq!(catalog.video.renditions.len(), 1, "resync failed: no video track");
    assert_eq!(catalog.audio.renditions.len(), 1, "resync failed: no audio track");
}
/// Prepends 202 bytes of padding that contain a lone `0x47` at offset 1 to
/// the `bbb.ts` fixture, then checks that the import still publishes one
/// video and one audio rendition.
#[test]
fn resyncs_past_false_sync_byte() {
    let fixture = include_bytes!("test_data/bbb.ts");

    // 202 zero bytes with a single 0x47 planted at index 1.
    let mut stream = vec![0x00u8; 202];
    stream[1] = 0x47;
    stream.extend_from_slice(fixture);

    let catalog = import_ts(&stream);
    assert_eq!(catalog.video.renditions.len(), 1, "false sync derailed demux: no video");
    assert_eq!(catalog.audio.renditions.len(), 1, "false sync derailed demux: no audio");
}
/// Same three-byte misalignment as the single-shot resync test, but the
/// stream is handed to the importer in 100-byte pieces, one `decode` call per
/// piece, before checking the catalog.
#[test]
fn resyncs_across_chunk_boundaries() {
    let fixture = include_bytes!("test_data/bbb.ts");
    let shifted: Vec<u8> = [&[0x00u8, 0x11, 0x22][..], &fixture[..]].concat();

    let mut producer = moq_net::Broadcast::new().produce();
    let catalog = crate::catalog::Producer::new(&mut producer).unwrap();
    let mut importer = crate::container::ts::Import::new(producer, catalog.clone());

    for piece in shifted.chunks(100) {
        // Each piece gets its own buffer, which is dropped after the call.
        let mut pending = BytesMut::from(piece);
        importer.decode(&mut pending).unwrap();
    }
    importer.finish().unwrap();

    let published = catalog.snapshot();
    assert_eq!(
        published.video.renditions.len(),
        1,
        "chunked resync failed: no video track"
    );
    assert_eq!(
        published.audio.renditions.len(),
        1,
        "chunked resync failed: no audio track"
    );
}
/// Imports `bbb.ts`, exports the resulting broadcast back to TS bytes, and
/// re-imports those bytes, checking that the output is non-empty, a multiple
/// of 188 bytes long, and still yields one video and one audio rendition.
///
/// The runtime starts with its clock paused; per tokio's documentation the
/// paused clock auto-advances when the runtime is idle, so the one-second
/// timeout below should not cost real time.
#[tokio::test(start_paused = true)]
async fn import_export_import_roundtrip() {
    let fixture = include_bytes!("test_data/bbb.ts");

    let mut producer = moq_net::Broadcast::new().produce();
    let subscriber = producer.consume();
    let catalog = crate::catalog::Producer::new(&mut producer).unwrap();
    let mut importer = crate::container::ts::Import::new(producer, catalog.clone());

    let mut pending = BytesMut::from(&fixture[..]);
    importer.decode(&mut pending).unwrap();
    importer.finish().unwrap();

    let mut exporter = crate::container::ts::Export::new(subscriber).unwrap();
    let mut exported = BytesMut::new();
    let patience = std::time::Duration::from_secs(1);
    loop {
        // A timeout ends the drain quietly; an exporter error is a test failure.
        let polled = match tokio::time::timeout(patience, exporter.next()).await {
            Ok(polled) => polled,
            Err(_) => break,
        };
        match polled.expect("exporter error") {
            Some(chunk) => exported.extend_from_slice(&chunk),
            None => break,
        }
    }

    assert!(!exported.is_empty(), "exporter produced no TS");
    assert_eq!(exported.len() % 188, 0, "exported TS not packet-aligned");

    let second_pass = import_ts(&exported);
    assert_eq!(second_pass.video.renditions.len(), 1, "round-trip lost the video track");
    assert_eq!(second_pass.audio.renditions.len(), 1, "round-trip lost the audio track");
}
/// Builds a short stream from six 188-byte packets of `bbb.ts` taken out of
/// file order (indices 5, 1, 2, 5, 8, 9), imports it, and checks that the
/// video rendition exists and that exactly one frame, a keyframe, can be read
/// back from its track.
///
/// NOTE(review): the index sequence presumably simulates joining mid-stream
/// before the stream tables arrive — confirm against the fixture's layout.
#[tokio::test(start_paused = true)]
async fn survives_midstream_join() {
    const PACKET_LEN: usize = 188;
    let fixture = include_bytes!("test_data/bbb.ts");

    let mut stream = Vec::new();
    for index in [5usize, 1, 2, 5, 8, 9] {
        let start = index * PACKET_LEN;
        stream.extend_from_slice(&fixture[start..start + PACKET_LEN]);
    }

    let mut producer = moq_net::Broadcast::new().produce();
    let subscriber = producer.consume();
    let catalog = crate::catalog::Producer::new(&mut producer).unwrap();
    let mut importer = crate::container::ts::Import::new(producer, catalog.clone());

    let mut pending = BytesMut::from(stream.as_slice());
    importer
        .decode(&mut pending)
        .expect("a mid-stream join must not abort the demux");
    importer.finish().unwrap();

    let published = catalog.snapshot();
    assert_eq!(published.video.renditions.len(), 1, "video track lost across the join");

    let name = published.video.renditions.keys().next().unwrap().clone();
    let wanted = moq_net::Track::new(name);
    let track = subscriber.subscribe_track(&wanted).unwrap();
    let mut reader = crate::container::Consumer::new(track, crate::catalog::hang::Container::Legacy);

    // Drain frames until a read times out, errors, or reports the end.
    let poll_window = std::time::Duration::from_millis(50);
    let mut frames = Vec::new();
    loop {
        match tokio::time::timeout(poll_window, reader.read()).await {
            Ok(Ok(Some(frame))) => frames.push(frame),
            _ => break,
        }
    }

    assert_eq!(frames.len(), 1, "expected only the post-join IDR, got {}", frames.len());
    assert!(frames[0].keyframe, "the first surviving frame must be the keyframe");
}
/// Imports `kyrion_dirtystart.ts` into a catalog carrying the SCTE-35
/// extension, then reads the first SCTE-35 track and checks the cues on it:
/// six of them, all distinct, each starting with `0xFC`, each with `0x05` at
/// byte 13, and none stamped with a zero timestamp.
#[tokio::test(start_paused = true)]
async fn kyrion_dirtystart_extracts_real_cues() {
    let fixture = include_bytes!("test_data/scte35/kyrion_dirtystart.ts");

    let mut producer = moq_net::Broadcast::new().produce();
    let subscriber = producer.consume();
    let initial = crate::catalog::hang::Catalog::<crate::container::ts::scte35::Ext>::default();
    let catalog = crate::catalog::Producer::with_catalog(&mut producer, initial).unwrap();
    let mut importer = crate::container::ts::Import::new(producer, catalog.clone());

    let mut pending = BytesMut::from(&fixture[..]);
    importer
        .decode(&mut pending)
        .expect("a dirty mid-stream join must not abort the demux");
    importer.finish().unwrap();

    let published = catalog.snapshot();
    assert_eq!(published.video.renditions.len(), 1, "video track lost across the dirty join");

    let name = published.scte35.renditions.keys().next().expect("scte35 track").clone();
    let wanted = moq_net::Track::new(name);
    let track = subscriber.subscribe_track(&wanted).unwrap();
    let mut reader = crate::container::Consumer::new(track, crate::catalog::hang::Container::Legacy);

    // Collect (payload bytes, timestamp) pairs until a read times out, errors,
    // or reports the end of the track.
    let poll_window = std::time::Duration::from_millis(50);
    let mut cues = Vec::new();
    loop {
        match tokio::time::timeout(poll_window, reader.read()).await {
            Ok(Ok(Some(frame))) => cues.push((frame.payload.to_vec(), frame.timestamp)),
            _ => break,
        }
    }

    assert_eq!(cues.len(), 6, "expected the six real splice_inserts");
    assert!(
        cues.iter().all(|(section, _)| section.first() == Some(&0xfc)),
        "every cue is a splice_info_section (table_id 0xFC)"
    );
    assert!(
        cues.iter().all(|(section, _)| section.get(13) == Some(&0x05)),
        "every cue is a splice_insert (command type 0x05)"
    );

    let unique: std::collections::HashSet<&Vec<u8>> =
        cues.iter().map(|(section, _)| section).collect();
    assert_eq!(unique.len(), 6, "six distinct cue sections");

    assert!(
        cues.iter()
            .all(|(_, stamp)| *stamp != crate::container::Timestamp::ZERO),
        "cues stamped with the video PTS, not zero"
    );
}
/// Feeds the `bbb.ts` fixture to the importer in 100-byte pieces (a size that
/// does not divide the 188-byte packet length asserted elsewhere in these
/// tests) and checks that one video and one audio rendition are published.
#[test]
fn import_handles_unaligned_chunks() {
    let fixture = include_bytes!("test_data/bbb.ts");

    let mut producer = moq_net::Broadcast::new().produce();
    let catalog = crate::catalog::Producer::new(&mut producer).unwrap();
    let mut importer = crate::container::ts::Import::new(producer, catalog.clone());

    for piece in fixture.chunks(100) {
        // A fresh buffer per piece; whatever the call leaves in it is dropped.
        importer.decode(&mut BytesMut::from(piece)).unwrap();
    }
    importer.finish().unwrap();

    let published = catalog.snapshot();
    assert_eq!(published.video.renditions.len(), 1);
    assert_eq!(published.audio.renditions.len(), 1);
}