use std::io::Cursor;
use bytes::{Bytes, BytesMut};
use hang::catalog::{AAC, AudioConfig, Container, H264, VideoConfig};
use mpeg2ts::es::StreamType;
use mpeg2ts::pes::{PesPacketReader, ReadPesPacket};
use mpeg2ts::ts::{ReadTsPacket, TsPacketReader, TsPayload};
use crate::catalog::hang::Container as HangContainer;
use crate::container::ts::{Export, scte35};
use crate::container::{Frame, Producer, Timestamp};
// Four-byte start code that `annexb` writes in front of every NAL unit.
const SC: &[u8] = &[0, 0, 0, 1];
// Fixture bytes the H.264 tests use as the SPS NAL unit (sent in-band by the avc3 test,
// handed to `build_avcc` by the avc1 test).
const SPS: &[u8] = &[0x67, 0x42, 0xc0, 0x1f, 0xde];
// Fixture bytes the H.264 tests use as the PPS NAL unit; always paired with `SPS`.
const PPS: &[u8] = &[0x68, 0xce, 0x3c, 0x80];
// Payload the SCTE-35 tests publish on a `.scte35` track and expect back byte-for-byte.
// It begins with 0xfc, the value the fixture test checks as the splice_info_section table_id.
const CUE: &[u8] = &[
0xfc, 0x30, 0x1b, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xf0, 0x0a, 0x05, 0x00, 0x00, 0x2b, 0xb4, 0x7f,
0xdf, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0xad, 0x25, 0xe8, 0x39,
];
/// Concatenates `nals` into one buffer, writing the `SC` start code before each unit.
fn annexb(nals: &[&[u8]]) -> Bytes {
    let mut stream = BytesMut::new();
    nals.iter().for_each(|unit| {
        stream.extend_from_slice(SC);
        stream.extend_from_slice(unit);
    });
    stream.freeze()
}
/// Concatenates `nals` into one buffer, writing each unit's length as a 4-byte
/// big-endian integer before the unit itself.
///
/// # Panics
///
/// Panics if a unit is longer than `u32::MAX` bytes, since its length cannot be
/// represented in the 4-byte prefix.
fn length_prefixed(nals: &[&[u8]]) -> Bytes {
    let mut buf = BytesMut::new();
    for nal in nals {
        // A plain `as u32` cast would silently truncate an oversized length and emit a
        // corrupt prefix; fail loudly instead.
        let len = u32::try_from(nal.len()).expect("NAL unit too large for a 4-byte length prefix");
        buf.extend_from_slice(&len.to_be_bytes());
        buf.extend_from_slice(nal);
    }
    buf.freeze()
}
/// Builds an exporter for `consumer` with `Export::new` and collects everything it emits.
///
/// Panics if the exporter cannot be constructed.
async fn drain(consumer: moq_net::BroadcastConsumer) -> BytesMut {
    let exporter = Export::new(consumer).unwrap();
    drain_with(exporter).await
}
/// Collects every chunk `exporter` yields into a single buffer.
///
/// Stops when the exporter reports the end of the stream (`None`) or when a single
/// `next()` call does not resolve within one second. Panics with "exporter error" if
/// the exporter returns an error.
async fn drain_with<E: scte35::Catalog>(mut exporter: Export<E>) -> BytesMut {
    let mut collected = BytesMut::new();
    loop {
        let polled = tokio::time::timeout(std::time::Duration::from_secs(1), exporter.next()).await;
        match polled {
            // Timed out waiting for more output: return what has been gathered so far.
            Err(_) => break,
            Ok(res) => match res.expect("exporter error") {
                Some(chunk) => collected.extend_from_slice(&chunk),
                None => break,
            },
        }
    }
    collected
}
/// Asserts that `ts` is a non-empty sequence of whole 188-byte packets, each of which
/// begins with the 0x47 sync byte.
fn assert_packet_aligned(ts: &[u8]) {
    const PACKET_LEN: usize = 188;
    const SYNC_BYTE: u8 = 0x47;

    assert!(!ts.is_empty(), "no TS output");
    assert_eq!(
        ts.len() % PACKET_LEN,
        0,
        "output not a whole number of 188-byte packets"
    );
    // Every PACKET_LEN-th byte is the first byte of a packet.
    let all_synced = ts.iter().step_by(PACKET_LEN).all(|&first| first == SYNC_BYTE);
    assert!(all_synced, "every packet must start with the sync byte");
}
#[tokio::test(start_paused = true)]
async fn export_aac_roundtrip() {
let mut broadcast = moq_net::Broadcast::new().produce();
let consumer = broadcast.consume();
let mut catalog = crate::catalog::Producer::new(&mut broadcast).unwrap();
let track = broadcast.unique_track(".aac").unwrap();
let name = track.name.clone();
{
let mut cfg = AudioConfig::new(AAC { profile: 2 }, 48_000, 2);
cfg.container = Container::Legacy;
catalog.lock().audio.renditions.insert(name.clone(), cfg);
}
let mut producer = Producer::new(track, HangContainer::Legacy);
let frames: Vec<Bytes> = vec![
Bytes::from_static(&[0x01, 0x02, 0x03, 0x04]),
Bytes::from_static(&[0x10, 0x11, 0x12, 0x13, 0x14]),
Bytes::from(vec![0x20u8; 200]),
];
for (i, payload) in frames.iter().enumerate() {
producer
.write(Frame {
timestamp: Timestamp::from_millis(i as u64 * 20).unwrap(),
payload: payload.clone(),
keyframe: true,
})
.unwrap();
producer.finish_group().unwrap();
}
producer.finish().unwrap();
let ts = drain(consumer).await;
assert_packet_aligned(&ts);
let mut reader = TsPacketReader::new(Cursor::new(ts.as_ref()));
let mut saw_pat = false;
let mut saw_pmt = false;
while let Some(packet) = reader.read_ts_packet().unwrap() {
match packet.payload {
Some(TsPayload::Pat(_)) => saw_pat = true,
Some(TsPayload::Pmt(pmt)) => {
saw_pmt = true;
assert_eq!(pmt.es_info.len(), 1);
assert_eq!(pmt.es_info[0].stream_type, StreamType::AdtsAac);
}
_ => {}
}
}
assert!(saw_pat, "missing PAT");
assert!(saw_pmt, "missing PMT");
let mut pes = PesPacketReader::new(TsPacketReader::new(Cursor::new(ts.as_ref())));
let mut recovered: Vec<(u64, Vec<u8>)> = Vec::new();
while let Some(packet) = pes.read_pes_packet().unwrap() {
let pts = packet.header.pts.expect("PES carried no PTS").as_u64();
assert!(packet.data.len() >= 7, "PES payload shorter than an ADTS header");
recovered.push((pts, packet.data[7..].to_vec()));
}
assert_eq!(recovered.len(), frames.len());
for (i, payload) in frames.iter().enumerate() {
let (pts, raw) = &recovered[i];
assert_eq!(*pts, i as u64 * 20 * 90, "PTS should be ms * 90 (90 kHz)");
assert_eq!(raw.as_slice(), payload.as_ref(), "raw AAC payload mismatch");
}
}
/// Walks every TS packet in `ts` and returns the concatenated PES payload bytes.
///
/// Along the way it asserts that the PMT lists exactly one elementary stream of
/// `expected_stream_type`, that some PES-start packet carried the random-access
/// indicator and a PCR in its adaptation field, and that the last PES start seen
/// declared a zero (unbounded) packet length.
fn reassemble_video(ts: &[u8], expected_stream_type: StreamType) -> Vec<u8> {
    let mut packets = TsPacketReader::new(Cursor::new(ts));
    let mut video_pid = None;
    let mut random_access_seen = false;
    let mut pcr_seen = false;
    let mut last_pes_unbounded = false;
    let mut elementary: Vec<u8> = Vec::new();

    while let Some(packet) = packets.read_ts_packet().unwrap() {
        match packet.payload {
            Some(TsPayload::Pmt(pmt)) => {
                assert_eq!(pmt.es_info.len(), 1);
                let entry = &pmt.es_info[0];
                assert_eq!(entry.stream_type, expected_stream_type);
                video_pid = Some(entry.elementary_pid);
            }
            Some(TsPayload::PesStart(pes)) => {
                // Adaptation-field flags are only inspected on packets that start a PES.
                if let Some(field) = &packet.adaptation_field {
                    random_access_seen = random_access_seen || field.random_access_indicator;
                    pcr_seen = pcr_seen || field.pcr.is_some();
                }
                last_pes_unbounded = pes.pes_packet_len == 0;
                elementary.extend_from_slice(&pes.data);
            }
            Some(TsPayload::PesContinuation(bytes)) => {
                elementary.extend_from_slice(&bytes);
            }
            _ => {}
        }
    }

    assert!(video_pid.is_some(), "missing video PMT entry");
    assert!(random_access_seen, "keyframe should set random_access_indicator");
    assert!(pcr_seen, "PCR pid should carry a PCR on the keyframe");
    assert!(last_pes_unbounded, "video PES should be unbounded");
    elementary
}
/// Publishes one Annex B keyframe that carries SPS and PPS in-band (`inline: true`) and
/// checks that the exported video elementary stream equals the bytes that were written.
#[tokio::test(start_paused = true)]
async fn export_avc3_in_band_reassembles() {
    let mut broadcast = moq_net::Broadcast::new().produce();
    let consumer = broadcast.consume();
    let mut catalog = crate::catalog::Producer::new(&mut broadcast).unwrap();

    let track = broadcast.unique_track(".avc3").unwrap();
    let track_name = track.name.clone();
    {
        let codec = H264 {
            profile: 0x64,
            constraints: 0,
            level: 0x1f,
            inline: true,
        };
        let mut video = VideoConfig::new(codec);
        video.container = Container::Legacy;
        catalog.lock().video.renditions.insert(track_name.clone(), video);
    }

    let mut producer = Producer::new(track, HangContainer::Legacy);
    // One header byte (0x65) followed by 300 filler bytes.
    let idr: Vec<u8> = std::iter::once(0x65u8)
        .chain(std::iter::repeat_n(0xAB, 300))
        .collect();
    let expected = annexb(&[SPS, PPS, &idr]);
    let keyframe = Frame {
        timestamp: Timestamp::from_millis(0).unwrap(),
        payload: expected.clone(),
        keyframe: true,
    };
    producer.write(keyframe).unwrap();
    producer.finish().unwrap();

    let ts = drain(consumer).await;
    assert_packet_aligned(&ts);

    let reassembled = reassemble_video(&ts, StreamType::H264);
    assert_eq!(reassembled.as_slice(), expected.as_ref());
}
/// Publishes one length-prefixed keyframe whose SPS/PPS live only in the catalog
/// description (`inline: false`) and checks that the exported elementary stream is the
/// Annex B sequence SPS, PPS, IDR.
#[tokio::test(start_paused = true)]
async fn export_avc1_out_of_band_reassembles() {
    let mut broadcast = moq_net::Broadcast::new().produce();
    let consumer = broadcast.consume();
    let mut catalog = crate::catalog::Producer::new(&mut broadcast).unwrap();

    let avcc = crate::codec::h264::build_avcc(SPS, PPS).unwrap();
    let track = broadcast.unique_track(".avc1").unwrap();
    let track_name = track.name.clone();
    {
        let codec = H264 {
            profile: 0x64,
            constraints: 0,
            level: 0x1f,
            inline: false,
        };
        let mut video = VideoConfig::new(codec);
        video.container = Container::Legacy;
        video.description = Some(avcc);
        catalog.lock().video.renditions.insert(track_name.clone(), video);
    }

    let mut producer = Producer::new(track, HangContainer::Legacy);
    // One header byte (0x65) followed by 300 filler bytes.
    let idr: Vec<u8> = std::iter::once(0x65u8)
        .chain(std::iter::repeat_n(0xAB, 300))
        .collect();
    let keyframe = Frame {
        timestamp: Timestamp::from_millis(0).unwrap(),
        payload: length_prefixed(&[&idr]),
        keyframe: true,
    };
    producer.write(keyframe).unwrap();
    producer.finish().unwrap();

    let ts = drain(consumer).await;
    assert_packet_aligned(&ts);

    let reassembled = reassemble_video(&ts, StreamType::H264);
    let expected = annexb(&[SPS, PPS, &idr]);
    assert_eq!(reassembled.as_slice(), expected.as_ref());
}
/// Publishes `CUE` on a `.scte35` track next to the media imported from `bbb.ts`,
/// exports with SCTE-35 enabled, and checks the PMT signalling. It then re-imports the
/// exported TS and checks that the first cue frame read back equals `CUE`.
#[tokio::test(start_paused = true)]
async fn export_scte35_roundtrip() {
    let clip = include_bytes!("test_data/bbb.ts");

    let mut broadcast = moq_net::Broadcast::new().produce();
    let consumer = broadcast.consume();
    let mut catalog =
        crate::catalog::Producer::with_catalog(&mut broadcast, crate::catalog::hang::Catalog::<scte35::Ext>::default())
            .unwrap();

    // Hand-published cue track.
    let cue_track = broadcast.unique_track(".scte35").unwrap();
    let cue_track_name = cue_track.name.clone();
    {
        let mut cue_cfg = scte35::Config::new();
        cue_cfg.container = Container::Legacy;
        catalog.lock().scte35.renditions.insert(cue_track_name.clone(), cue_cfg);
    }
    let mut cue_producer = Producer::new(cue_track, HangContainer::Legacy);
    let cue_frame = Frame {
        timestamp: Timestamp::from_millis(40).unwrap(),
        payload: Bytes::from_static(CUE),
        keyframe: true,
    };
    cue_producer.write(cue_frame).unwrap();
    cue_producer.finish_group().unwrap();
    cue_producer.finish().unwrap();

    // Media tracks come from the clip.
    let mut import = crate::container::ts::Import::new(broadcast, catalog.clone());
    import.decode(&mut BytesMut::from(&clip[..])).unwrap();
    import.finish().unwrap();

    let exporter = Export::with_scte35(consumer, crate::catalog::CatalogFormat::Hang).unwrap();
    let ts = drain_with(exporter).await;
    assert_packet_aligned(&ts);

    // Only the first PMT is inspected.
    let mut packets = TsPacketReader::new(Cursor::new(ts.as_ref()));
    let mut saw_scte_es = false;
    let mut saw_cuei = false;
    while let Some(packet) = packets.read_ts_packet().unwrap() {
        let Some(TsPayload::Pmt(pmt)) = packet.payload else {
            continue;
        };
        // The assertion message below identifies this stream type as 0x86.
        saw_scte_es = pmt
            .es_info
            .iter()
            .any(|es| es.stream_type == StreamType::Dts8ChannelLosslessAudio);
        saw_cuei = pmt
            .program_info
            .iter()
            .any(|desc| desc.tag == 0x05 && desc.data.len() >= 4 && &desc.data[0..4] == b"CUEI");
        break;
    }
    assert!(saw_scte_es, "PMT missing the SCTE-35 elementary stream (0x86)");
    assert!(saw_cuei, "PMT missing the program-level CUEI registration descriptor");

    // Feed the exported TS into a fresh broadcast.
    let mut broadcast2 = moq_net::Broadcast::new().produce();
    let consumer2 = broadcast2.consume();
    let catalog2 = crate::catalog::Producer::with_catalog(
        &mut broadcast2,
        crate::catalog::hang::Catalog::<scte35::Ext>::default(),
    )
    .unwrap();
    let mut import2 = crate::container::ts::Import::new(broadcast2, catalog2.clone());
    import2.decode(&mut BytesMut::from(ts.as_ref())).unwrap();
    import2.finish().unwrap();

    let snapshot = catalog2.snapshot();
    assert_eq!(snapshot.scte35.renditions.len(), 1, "round-trip lost the SCTE-35 track");
    let roundtrip_name = snapshot.scte35.renditions.keys().next().unwrap();
    let roundtrip_track = consumer2
        .subscribe_track(&moq_net::Track::new(roundtrip_name.clone()))
        .unwrap();
    let mut cue_reader = crate::container::Consumer::new(roundtrip_track, HangContainer::Legacy);
    let first_cue = cue_reader
        .read()
        .await
        .unwrap()
        .expect("no SCTE-35 frame after round-trip");
    assert_eq!(
        first_cue.payload.as_ref(),
        CUE,
        "SCTE-35 section did not round-trip byte-for-byte"
    );
}
/// Publishes a broadcast whose only track is a SCTE-35 cue track and checks that the
/// SCTE-35 exporter fails with an error mentioning "requires a video track".
#[tokio::test(start_paused = true)]
async fn scte35_without_video_export_is_rejected() {
    let mut broadcast = moq_net::Broadcast::new().produce();
    let consumer = broadcast.consume();
    let mut catalog =
        crate::catalog::Producer::with_catalog(&mut broadcast, crate::catalog::hang::Catalog::<scte35::Ext>::default())
            .unwrap();

    let cue_track = broadcast.unique_track(".scte35").unwrap();
    let cue_track_name = cue_track.name.clone();
    {
        let mut cue_cfg = scte35::Config::new();
        cue_cfg.container = Container::Legacy;
        catalog.lock().scte35.renditions.insert(cue_track_name, cue_cfg);
    }

    let mut producer = Producer::new(cue_track, HangContainer::Legacy);
    let cue_frame = Frame {
        timestamp: Timestamp::from_millis(0).unwrap(),
        payload: Bytes::from_static(CUE),
        keyframe: true,
    };
    producer.write(cue_frame).unwrap();
    producer.finish_group().unwrap();
    producer.finish().unwrap();

    let mut exporter = Export::with_scte35(consumer, crate::catalog::CatalogFormat::Hang).unwrap();
    // Keep pulling output until the exporter fails; a clean finish or a timeout is a test failure.
    let err = loop {
        let step = tokio::time::timeout(std::time::Duration::from_secs(1), exporter.next()).await;
        match step {
            Err(_) => panic!("export neither errored nor completed"),
            Ok(Err(e)) => break e,
            Ok(Ok(None)) => panic!("export completed; a cue program without video must be rejected"),
            Ok(Ok(Some(_))) => continue,
        }
    };
    assert!(
        err.to_string().contains("requires a video track"),
        "expected a video-required rejection, got: {err}"
    );
}
/// Subscribes to track `name` on `consumer` and returns the payload of every frame read.
///
/// Reading stops when the track reports its end (`None`) or when a single read does not
/// resolve within 50 ms. Panics if subscribing or reading fails.
async fn read_frames(consumer: &moq_net::BroadcastConsumer, name: &str) -> Vec<Vec<u8>> {
    let subscription = consumer
        .subscribe_track(&moq_net::Track::new(name.to_string()))
        .unwrap();
    let mut reader = crate::container::Consumer::new(subscription, HangContainer::Legacy);
    let mut payloads = Vec::new();
    loop {
        let attempt = tokio::time::timeout(std::time::Duration::from_millis(50), reader.read()).await;
        match attempt {
            // No frame arrived in time: treat the track as drained.
            Err(_) => break,
            Ok(res) => match res.unwrap() {
                Some(frame) => payloads.push(frame.payload.to_vec()),
                None => break,
            },
        }
    }
    payloads
}
/// Imports the Kyrion clip, captures the frames of its two audio renditions, exports
/// them as MPEG-TS, re-imports that output, and checks the frames are unchanged.
///
/// The PMT check is guarded by `checked_pmt` so the test fails if the export never
/// carries a PMT, rather than silently skipping the stream-type assertion.
#[tokio::test(start_paused = true)]
async fn mp2_kyrion_roundtrip_byte_exact() {
    let data = include_bytes!("test_data/scte35/kyrion_dirtystart.ts");

    let mut broadcast = moq_net::Broadcast::new().produce();
    let consumer = broadcast.consume();
    let catalog = crate::catalog::Producer::new(&mut broadcast).unwrap();
    let mut import = crate::container::ts::Import::new(broadcast, catalog.clone());
    import.decode(&mut BytesMut::from(&data[..])).unwrap();
    import.finish().unwrap();

    let names: Vec<String> = catalog.snapshot().audio.renditions.keys().cloned().collect();
    assert_eq!(names.len(), 2, "both Kyrion MP2 programs");
    let mut ingested = Vec::new();
    for name in &names {
        let frames = read_frames(&consumer, name).await;
        assert!(!frames.is_empty(), "{name}: no MP2 frames");
        assert!(
            frames.iter().all(|f| f[0] == 0xFF && f[1] & 0xE0 == 0xE0),
            "{name}: whole-frame carriage starts at the Layer II sync word"
        );
        ingested.push(frames);
    }

    let ts = drain(consumer).await;
    assert_packet_aligned(&ts);

    // Inspect the first PMT and require that one was actually seen.
    let mut reader = TsPacketReader::new(Cursor::new(ts.as_ref()));
    let mut checked_pmt = false;
    while let Some(packet) = reader.read_ts_packet().unwrap() {
        if let Some(TsPayload::Pmt(pmt)) = packet.payload {
            let mp2 = pmt
                .es_info
                .iter()
                .filter(|e| e.stream_type == StreamType::Mpeg1Audio)
                .count();
            assert_eq!(mp2, 2, "PMT must re-announce both MP2 streams as 0x03");
            checked_pmt = true;
            break;
        }
    }
    assert!(checked_pmt, "missing PMT");

    let mut broadcast2 = moq_net::Broadcast::new().produce();
    let consumer2 = broadcast2.consume();
    let catalog2 = crate::catalog::Producer::new(&mut broadcast2).unwrap();
    let mut import2 = crate::container::ts::Import::new(broadcast2, catalog2.clone());
    import2.decode(&mut BytesMut::from(ts.as_ref())).unwrap();
    import2.finish().unwrap();

    let names2: Vec<String> = catalog2.snapshot().audio.renditions.keys().cloned().collect();
    assert_eq!(names2.len(), 2, "round-trip lost an MP2 track");
    let mut roundtripped = Vec::new();
    for name in &names2 {
        roundtripped.push(read_frames(&consumer2, name).await);
    }

    // Sort both sides so the comparison does not depend on the order of the track names.
    ingested.sort();
    roundtripped.sort();
    assert_eq!(roundtripped, ingested, "MP2 frames must survive byte-for-byte");
}
/// Imports `ac3.ts`, captures the frames of its first audio rendition, exports them as
/// MPEG-TS, checks the PMT entry and its "AC-3" registration descriptor, then re-imports
/// the output and checks the frames are unchanged.
#[tokio::test(start_paused = true)]
async fn ac3_roundtrip_byte_exact() {
    let clip = include_bytes!("test_data/ac3.ts");

    let mut broadcast = moq_net::Broadcast::new().produce();
    let consumer = broadcast.consume();
    let catalog = crate::catalog::Producer::new(&mut broadcast).unwrap();
    let mut import = crate::container::ts::Import::new(broadcast, catalog.clone());
    import.decode(&mut BytesMut::from(&clip[..])).unwrap();
    import.finish().unwrap();

    let source_name = catalog
        .snapshot()
        .audio
        .renditions
        .keys()
        .next()
        .cloned()
        .expect("an AC-3 track");
    let ingested = read_frames(&consumer, &source_name).await;
    assert!(!ingested.is_empty(), "no AC-3 frames");
    let starts_on_sync = ingested.iter().all(|frame| frame[0] == 0x0B && frame[1] == 0x77);
    assert!(starts_on_sync, "whole-frame carriage starts at the AC-3 sync word");

    let ts = drain(consumer).await;
    assert_packet_aligned(&ts);

    // Only the first PMT is inspected.
    let mut packets = TsPacketReader::new(Cursor::new(ts.as_ref()));
    let mut pmt_verified = false;
    while let Some(packet) = packets.read_ts_packet().unwrap() {
        let Some(TsPayload::Pmt(pmt)) = packet.payload else {
            continue;
        };
        assert_eq!(pmt.es_info.len(), 1);
        assert_eq!(pmt.es_info[0].stream_type, StreamType::DolbyDigitalUpToSixChannelAudio);
        let has_registration = pmt.es_info[0]
            .descriptors
            .iter()
            .any(|desc| desc.tag == 0x05 && desc.data.as_slice() == b"AC-3");
        assert!(
            has_registration,
            "PMT missing the ES-level 'AC-3' registration descriptor"
        );
        pmt_verified = true;
        break;
    }
    assert!(pmt_verified, "missing PMT");

    // Feed the exported TS into a fresh broadcast.
    let mut broadcast2 = moq_net::Broadcast::new().produce();
    let consumer2 = broadcast2.consume();
    let catalog2 = crate::catalog::Producer::new(&mut broadcast2).unwrap();
    let mut import2 = crate::container::ts::Import::new(broadcast2, catalog2.clone());
    import2.decode(&mut BytesMut::from(ts.as_ref())).unwrap();
    import2.finish().unwrap();

    let roundtrip_name = catalog2
        .snapshot()
        .audio
        .renditions
        .keys()
        .next()
        .cloned()
        .expect("round-trip lost the AC-3 track");
    let roundtripped = read_frames(&consumer2, &roundtrip_name).await;
    assert_eq!(roundtripped, ingested, "AC-3 frames must survive byte-for-byte");
}
/// Imports `eac3.ts`, captures the frames of its first audio rendition, exports them as
/// MPEG-TS, checks the PMT entry and its "EAC3" registration descriptor, then re-imports
/// the output and checks the frames are unchanged.
#[tokio::test(start_paused = true)]
async fn eac3_roundtrip_byte_exact() {
    let clip = include_bytes!("test_data/eac3.ts");

    let mut broadcast = moq_net::Broadcast::new().produce();
    let consumer = broadcast.consume();
    let catalog = crate::catalog::Producer::new(&mut broadcast).unwrap();
    let mut import = crate::container::ts::Import::new(broadcast, catalog.clone());
    import.decode(&mut BytesMut::from(&clip[..])).unwrap();
    import.finish().unwrap();

    let source_name = catalog
        .snapshot()
        .audio
        .renditions
        .keys()
        .next()
        .cloned()
        .expect("an E-AC-3 track");
    let ingested = read_frames(&consumer, &source_name).await;
    assert!(!ingested.is_empty(), "no E-AC-3 frames");
    let starts_on_sync = ingested.iter().all(|frame| frame[0] == 0x0B && frame[1] == 0x77);
    assert!(starts_on_sync, "whole-frame carriage starts at the E-AC-3 sync word");

    let ts = drain(consumer).await;
    assert_packet_aligned(&ts);

    // Only the first PMT is inspected.
    let mut packets = TsPacketReader::new(Cursor::new(ts.as_ref()));
    let mut pmt_verified = false;
    while let Some(packet) = packets.read_ts_packet().unwrap() {
        let Some(TsPayload::Pmt(pmt)) = packet.payload else {
            continue;
        };
        assert_eq!(pmt.es_info.len(), 1);
        assert_eq!(
            pmt.es_info[0].stream_type,
            StreamType::DolbyDigitalPlusUpTo16ChannelAudioForAtsc
        );
        let has_registration = pmt.es_info[0]
            .descriptors
            .iter()
            .any(|desc| desc.tag == 0x05 && desc.data.as_slice() == b"EAC3");
        assert!(
            has_registration,
            "PMT missing the ES-level 'EAC3' registration descriptor"
        );
        pmt_verified = true;
        break;
    }
    assert!(pmt_verified, "missing PMT");

    // Feed the exported TS into a fresh broadcast.
    let mut broadcast2 = moq_net::Broadcast::new().produce();
    let consumer2 = broadcast2.consume();
    let catalog2 = crate::catalog::Producer::new(&mut broadcast2).unwrap();
    let mut import2 = crate::container::ts::Import::new(broadcast2, catalog2.clone());
    import2.decode(&mut BytesMut::from(ts.as_ref())).unwrap();
    import2.finish().unwrap();

    let roundtrip_name = catalog2
        .snapshot()
        .audio
        .renditions
        .keys()
        .next()
        .cloned()
        .expect("round-trip lost the E-AC-3 track");
    let roundtripped = read_frames(&consumer2, &roundtrip_name).await;
    assert_eq!(roundtripped, ingested, "E-AC-3 frames must survive byte-for-byte");
}
/// Reads the frames of every audio rendition in `catalog`'s snapshot and returns them
/// keyed by the rendition's codec string.
///
/// If two renditions share a codec string, the one visited later replaces the earlier.
async fn read_audio_by_codec(
    consumer: &moq_net::BroadcastConsumer,
    catalog: &crate::catalog::Producer,
) -> std::collections::BTreeMap<String, Vec<Vec<u8>>> {
    let mut by_codec = std::collections::BTreeMap::new();
    for (track_name, audio) in &catalog.snapshot().audio.renditions {
        let codec = audio.codec.to_string();
        let frames = read_frames(consumer, track_name).await;
        by_codec.insert(codec, frames);
    }
    by_codec
}
/// Imports the Kyrion AC-3 + MP2 clip, captures both audio renditions keyed by codec,
/// exports them as MPEG-TS, re-imports that output, and checks the frames are unchanged.
///
/// The PMT check is guarded by `checked_pmt` so the test fails if the export never
/// carries a PMT, rather than silently skipping the signalling assertions.
#[tokio::test(start_paused = true)]
async fn kyrion_ac3_mp2_roundtrip_byte_exact() {
    let data = include_bytes!("test_data/kyrion_mpeg2av_ac3.ts");

    let mut broadcast = moq_net::Broadcast::new().produce();
    let consumer = broadcast.consume();
    let catalog = crate::catalog::Producer::new(&mut broadcast).unwrap();
    let mut import = crate::container::ts::Import::new(broadcast, catalog.clone());
    import.decode(&mut BytesMut::from(&data[..])).unwrap();
    import.finish().unwrap();

    let ingested = read_audio_by_codec(&consumer, &catalog).await;
    assert_eq!(
        ingested.keys().cloned().collect::<Vec<_>>(),
        ["ac-3", "mp2"],
        "both real audio codecs cataloged"
    );
    assert!(
        ingested["ac-3"].iter().all(|f| f[0] == 0x0B && f[1] == 0x77),
        "AC-3 whole-frame carriage"
    );
    assert!(
        ingested["mp2"].iter().all(|f| f[0] == 0xFF && f[1] & 0xE0 == 0xE0),
        "MP2 whole-frame carriage"
    );

    let ts = drain(consumer).await;
    assert_packet_aligned(&ts);

    // Inspect the first PMT and require that one was actually seen.
    let mut reader = TsPacketReader::new(Cursor::new(ts.as_ref()));
    let mut checked_pmt = false;
    while let Some(packet) = reader.read_ts_packet().unwrap() {
        if let Some(TsPayload::Pmt(pmt)) = packet.payload {
            assert_eq!(pmt.es_info.len(), 2, "audio-only program: AC-3 + MP2");
            let ac3 = pmt
                .es_info
                .iter()
                .find(|e| e.stream_type == StreamType::DolbyDigitalUpToSixChannelAudio)
                .expect("AC-3 ES re-announced as 0x81");
            assert!(
                ac3.descriptors
                    .iter()
                    .any(|d| d.tag == 0x05 && d.data.as_slice() == b"AC-3"),
                "AC-3 registration descriptor"
            );
            assert!(
                pmt.es_info.iter().any(|e| e.stream_type == StreamType::Mpeg1Audio),
                "MP2 re-announced as 0x03 (48 kHz is an MPEG-1 rate)"
            );
            checked_pmt = true;
            break;
        }
    }
    assert!(checked_pmt, "missing PMT");

    let mut broadcast2 = moq_net::Broadcast::new().produce();
    let consumer2 = broadcast2.consume();
    let catalog2 = crate::catalog::Producer::new(&mut broadcast2).unwrap();
    let mut import2 = crate::container::ts::Import::new(broadcast2, catalog2.clone());
    import2.decode(&mut BytesMut::from(ts.as_ref())).unwrap();
    import2.finish().unwrap();

    let roundtripped = read_audio_by_codec(&consumer2, &catalog2).await;
    assert_eq!(roundtripped, ingested, "both audio streams survive byte-for-byte");
}
/// Subscribes to track `name` on `consumer` and returns each frame's payload together
/// with its timestamp.
///
/// Reading stops when the track reports its end (`None`) or when a single read does not
/// resolve within 50 ms. Panics if subscribing or reading fails.
async fn read_cues(consumer: &moq_net::BroadcastConsumer, name: &str) -> Vec<(Vec<u8>, Timestamp)> {
    let subscription = consumer
        .subscribe_track(&moq_net::Track::new(name.to_string()))
        .unwrap();
    let mut reader = crate::container::Consumer::new(subscription, HangContainer::Legacy);
    let mut collected = Vec::new();
    loop {
        let attempt = tokio::time::timeout(std::time::Duration::from_millis(50), reader.read()).await;
        match attempt {
            // No frame arrived in time: treat the track as drained.
            Err(_) => break,
            Ok(res) => match res.unwrap() {
                Some(frame) => collected.push((frame.payload.to_vec(), frame.timestamp)),
                None => break,
            },
        }
    }
    collected
}
/// For each SCTE-35 fixture clip: imports it, checks the number, uniqueness, command
/// types and timestamps of the ingested cues, exports with SCTE-35 enabled, re-imports
/// the output, and checks that the cue payloads are unchanged and in the same order.
#[tokio::test(start_paused = true)]
async fn scte35_fixtures_survive_roundtrip() {
    // (label, total cues, distinct cue payloads, exact command-type set or empty to skip, clip bytes)
    type Fixture = (&'static str, usize, usize, &'static [u8], &'static [u8]);
    let fixtures: &[Fixture] = &[
        ("ffmpeg", 5, 5, &[], include_bytes!("test_data/scte35/ffmpeg.ts")),
        ("gst480i", 5, 5, &[], include_bytes!("test_data/scte35/gst480i.ts")),
        ("bbb5s", 5, 5, &[], include_bytes!("test_data/scte35/bbb5s.ts")),
        (
            "tsduck",
            10,
            5,
            &[0x00, 0x05, 0x06, 0xff],
            include_bytes!("test_data/scte35/tsduck.ts"),
        ),
        (
            "kyrion_dirtystart",
            6,
            6,
            &[],
            include_bytes!("test_data/scte35/kyrion_dirtystart.ts"),
        ),
    ];
    const KNOWN_SPLICE_COMMANDS: [u8; 6] = [0x00, 0x04, 0x05, 0x06, 0x07, 0xff];

    for &(source, total, distinct, command_types, data) in fixtures {
        // Ingest the fixture clip.
        let mut broadcast = moq_net::Broadcast::new().produce();
        let consumer = broadcast.consume();
        let catalog = crate::catalog::Producer::with_catalog(
            &mut broadcast,
            crate::catalog::hang::Catalog::<scte35::Ext>::default(),
        )
        .unwrap();
        let mut import = crate::container::ts::Import::new(broadcast, catalog.clone());
        import.decode(&mut BytesMut::from(&data[..])).unwrap();
        import.finish().unwrap();

        let snap = catalog.snapshot();
        assert!(!snap.video.renditions.is_empty(), "{source}: video track from the clip");
        let cue_track = snap.scte35.renditions.keys().next().expect("a scte35 track").clone();
        let ingested = read_cues(&consumer, &cue_track).await;

        assert_eq!(ingested.len(), total, "{source}: {total} cues on ingest");
        let all_sections = ingested.iter().all(|(bytes, _)| bytes.first() == Some(&0xfc));
        assert!(
            all_sections,
            "{source}: every cue is a splice_info_section (table_id 0xFC)"
        );

        let unique: std::collections::HashSet<&Vec<u8>> = ingested.iter().map(|(bytes, _)| bytes).collect();
        assert_eq!(
            unique.len(),
            distinct,
            "{source}: {distinct} distinct cue sections, not dups"
        );

        // Byte 13 of each section is checked as the splice_command_type.
        let all_known = ingested
            .iter()
            .all(|(bytes, _)| bytes.get(13).is_some_and(|kind| KNOWN_SPLICE_COMMANDS.contains(kind)));
        assert!(all_known, "{source}: every cue carries a known splice_command_type");

        if !command_types.is_empty() {
            let mut seen: Vec<u8> = ingested
                .iter()
                .filter_map(|(bytes, _)| bytes.get(13).copied())
                .collect();
            seen.sort_unstable();
            seen.dedup();
            assert_eq!(seen.as_slice(), command_types, "{source}: splice_command_type set");
        }

        let none_at_zero = ingested.iter().all(|(_, stamp)| *stamp != Timestamp::ZERO);
        assert!(none_at_zero, "{source}: cues stamped with the video PTS, not zero");

        // Export, then feed the result into a fresh broadcast.
        let exporter = Export::with_scte35(consumer, crate::catalog::CatalogFormat::Hang).unwrap();
        let ts = drain_with(exporter).await;
        assert_packet_aligned(&ts);

        let mut broadcast2 = moq_net::Broadcast::new().produce();
        let consumer2 = broadcast2.consume();
        let catalog2 = crate::catalog::Producer::with_catalog(
            &mut broadcast2,
            crate::catalog::hang::Catalog::<scte35::Ext>::default(),
        )
        .unwrap();
        let mut import2 = crate::container::ts::Import::new(broadcast2, catalog2.clone());
        import2.decode(&mut BytesMut::from(ts.as_ref())).unwrap();
        import2.finish().unwrap();

        let cue_track2 = catalog2
            .snapshot()
            .scte35
            .renditions
            .keys()
            .next()
            .expect("a scte35 track")
            .clone();
        let roundtripped = read_cues(&consumer2, &cue_track2).await;

        // Compare payloads only; timestamps are not part of this check.
        let before: Vec<&Vec<u8>> = ingested.iter().map(|(bytes, _)| bytes).collect();
        let after: Vec<&Vec<u8>> = roundtripped.iter().map(|(bytes, _)| bytes).collect();
        assert_eq!(
            after, before,
            "{source}: every section survived TS -> MoQ -> TS byte-for-byte"
        );
    }
}