use std::collections::{HashMap, HashSet};
use std::io::Read;
use std::sync::{Arc, Mutex};
use bytes::{Buf, BytesMut};
use mpeg2ts::es::StreamType;
use mpeg2ts::pes::PesHeader;
use mpeg2ts::ts::payload::Pes;
use mpeg2ts::ts::{Pid, ReadTsPacket, TsPacket, TsPacketReader, TsPayload};
use tokio::io::{AsyncRead, AsyncReadExt};
use super::adts;
use super::scte35;
use crate::catalog::hang::CatalogExt;
use crate::codec::{aac, ac3, eac3, h264, h265, legacy, mp2};
use crate::container::Timestamp;
pub struct Import<E: scte35::Catalog = ()> {
broadcast: moq_net::BroadcastProducer,
catalog: crate::catalog::Producer<E>,
feed: Feed,
reader: TsPacketReader<Feed>,
pmt_pids: HashSet<Pid>,
streams: HashMap<Pid, Stream<E>>,
pending: HashMap<Pid, Pending>,
initialized: bool,
audio_burst: Option<u64>,
scratch: Vec<u8>,
synced: bool,
scte: HashMap<u16, ScteStream<E>>,
supports_scte35: bool,
last_pts: Option<Timestamp>,
media_unwrap: PtsUnwrap,
}
impl<E: scte35::Catalog> Import<E> {
pub fn new(broadcast: moq_net::BroadcastProducer, catalog: crate::catalog::Producer<E>) -> Self {
let feed = Feed::default();
let mut snapshot = catalog.snapshot();
let supports_scte35 = snapshot.scte35_mut().is_some();
Self {
broadcast,
catalog,
reader: TsPacketReader::new(feed.clone()),
feed,
pmt_pids: HashSet::new(),
streams: HashMap::new(),
pending: HashMap::new(),
initialized: false,
audio_burst: None,
scratch: Vec::new(),
synced: false,
scte: HashMap::new(),
supports_scte35,
last_pts: None,
media_unwrap: PtsUnwrap::default(),
}
}
pub fn is_initialized(&self) -> bool {
self.initialized
}
pub async fn decode_from<T: AsyncRead + Unpin>(&mut self, reader: &mut T) -> anyhow::Result<()> {
let mut chunk = BytesMut::with_capacity(64 * 1024);
loop {
chunk.clear();
let n = reader.read_buf(&mut chunk).await?;
if n == 0 {
break;
}
self.decode(&mut chunk)?;
}
Ok(())
}
pub fn decode<T: Buf + AsRef<[u8]>>(&mut self, buf: &mut T) -> anyhow::Result<()> {
while buf.has_remaining() {
let chunk = buf.chunk();
self.scratch.extend_from_slice(chunk);
let len = chunk.len();
buf.advance(len);
}
let mut off = 0;
while off + TsPacket::SIZE <= self.scratch.len() {
if !self.synced || self.scratch[off] != 0x47 {
self.synced = false;
loop {
let Some(rel) = memchr::memchr(0x47, &self.scratch[off..]) else {
off = self.scratch.len();
break;
};
off += rel;
match self.scratch.get(off + TsPacket::SIZE) {
Some(&0x47) => {
self.synced = true;
break;
}
Some(_) => off += 1,
None => break,
}
}
if !self.synced {
break;
}
continue;
}
let pkt: [u8; TsPacket::SIZE] = self.scratch[off..off + TsPacket::SIZE].try_into().unwrap();
off += TsPacket::SIZE;
let pid = (((pkt[1] & 0x1f) as u16) << 8) | pkt[2] as u16;
let pts = self.last_pts.unwrap_or(Timestamp::ZERO);
if let Some(scte) = self.scte.get_mut(&pid) {
scte.packet(&pkt, pts)?;
continue;
}
if let Ok(p) = Pid::new(pid)
&& matches!(self.streams.get(&p), Some(Stream::Ignored))
{
continue;
}
if pid != Pid::PAT
&& !Pid::new(pid).is_ok_and(|p| self.pmt_pids.contains(&p) || self.streams.contains_key(&p))
{
continue;
}
{
let mut state = self.feed.lock();
state.data.clear();
state.data.extend_from_slice(&pkt);
state.pos = 0;
}
while let Some(packet) = self.reader.read_ts_packet()? {
self.handle_packet(packet)?;
}
}
self.scratch.drain(..off);
Ok(())
}
fn handle_packet(&mut self, packet: TsPacket) -> anyhow::Result<()> {
let pid = packet.header.pid;
match packet.payload {
Some(TsPayload::Pmt(pmt)) => {
let scte = pmt
.program_info
.iter()
.any(|d| d.tag == 0x05 && d.data.len() >= 4 && &d.data[0..4] == b"CUEI");
for es in &pmt.es_info {
if scte && matches!(es.stream_type, StreamType::Dts8ChannelLosslessAudio) {
self.ensure_scte(es.elementary_pid)?;
} else {
self.ensure_stream(es.elementary_pid, es.stream_type)?;
}
}
}
Some(TsPayload::PesStart(pes)) => self.handle_pes_start(pid, pes)?,
Some(TsPayload::PesContinuation(bytes)) => self.handle_pes_continuation(pid, &bytes)?,
Some(TsPayload::Pat(pat)) => {
self.pmt_pids
.extend(pat.table.iter().map(|entry| entry.program_map_pid));
}
_ => {}
}
Ok(())
}
fn ensure_stream(&mut self, pid: Pid, stream_type: StreamType) -> anyhow::Result<()> {
if self.streams.contains_key(&pid) {
return Ok(());
}
let stream = match stream_type {
StreamType::H264 => {
let import =
h264::Import::new(self.broadcast.clone(), self.catalog.clone()).with_mode(h264::Mode::Avc3)?;
Stream::H264 {
import: Box::new(import),
unwrap: PtsUnwrap::default(),
}
}
StreamType::H265 => Stream::H265 {
import: Box::new(h265::Import::new(self.broadcast.clone(), self.catalog.clone())),
unwrap: PtsUnwrap::default(),
},
StreamType::AdtsAac => Stream::Aac(Box::new(AacStream {
import: None,
broadcast: self.broadcast.clone(),
catalog: self.catalog.clone(),
unwrap: PtsUnwrap::default(),
jitter: None,
})),
StreamType::Mpeg1Audio | StreamType::Mpeg2HalvedSampleRateAudio => self.legacy_stream(&mp2::DESCRIPTOR),
StreamType::DolbyDigitalUpToSixChannelAudio => self.legacy_stream(&ac3::DESCRIPTOR),
StreamType::DolbyDigitalPlusUpTo16ChannelAudioForAtsc => self.legacy_stream(&eac3::DESCRIPTOR),
StreamType::Mpeg1Video | StreamType::Mpeg2Video => Stream::Clock,
other => {
tracing::warn!(?other, pid = pid.as_u16(), "unsupported TS stream type, dropping");
Stream::Ignored
}
};
if !matches!(stream, Stream::Ignored | Stream::Clock) {
self.initialized = true;
}
self.streams.insert(pid, stream);
Ok(())
}
fn legacy_stream(&self, descriptor: &'static legacy::Descriptor) -> Stream<E> {
Stream::Legacy(Box::new(LegacyStream {
descriptor,
import: None,
broadcast: self.broadcast.clone(),
catalog: self.catalog.clone(),
unwrap: PtsUnwrap::default(),
tail: Vec::new(),
tail_pts: None,
}))
}
fn ensure_scte(&mut self, pid: Pid) -> anyhow::Result<()> {
if self.scte.contains_key(&pid.as_u16()) {
return Ok(());
}
self.pending.remove(&pid);
if !self.supports_scte35 {
if !matches!(self.streams.insert(pid, Stream::Ignored), Some(Stream::Ignored)) {
tracing::warn!(
pid = pid.as_u16(),
"SCTE-35 detected without catalog support; dropping cues"
);
}
return Ok(());
}
self.streams.remove(&pid);
let stream = ScteStream::new(self.broadcast.clone(), self.catalog.clone())?;
self.scte.insert(pid.as_u16(), stream);
self.initialized = true;
tracing::debug!(
pid = pid.as_u16(),
"SCTE-35 stream detected (CUEI); intercepting before the reader"
);
Ok(())
}
fn handle_pes_start(&mut self, pid: Pid, pes: Pes) -> anyhow::Result<()> {
if self.pending.contains_key(&pid) {
self.flush(pid)?;
}
let Some(stream) = self.streams.get(&pid) else {
return Ok(());
};
let is_video = matches!(stream, Stream::H264 { .. } | Stream::H265 { .. } | Stream::Clock);
let is_clock = matches!(stream, Stream::Clock);
if is_video {
self.audio_burst = None;
if pes.header.pts.is_some() {
self.last_pts = unwrap_pts(&mut self.media_unwrap, pes.header.pts.map(|t| t.as_u64()))?;
}
}
if is_clock {
return Ok(());
}
let data_len = pes_data_len(&pes.header, pes.pes_packet_len);
let mut pending = Pending {
pts: pes.header.pts.map(|t| t.as_u64()),
data: Vec::with_capacity(pes.data.len()),
data_len,
};
pending.data.extend_from_slice(&pes.data);
let complete = matches!(data_len, Some(len) if pending.data.len() >= len);
self.pending.insert(pid, pending);
if complete {
self.flush(pid)?;
}
Ok(())
}
fn handle_pes_continuation(&mut self, pid: Pid, data: &[u8]) -> anyhow::Result<()> {
let Some(pending) = self.pending.get_mut(&pid) else {
return Ok(());
};
pending.data.extend_from_slice(data);
if matches!(pending.data_len, Some(len) if pending.data.len() >= len) {
self.flush(pid)?;
}
Ok(())
}
fn flush(&mut self, pid: Pid) -> anyhow::Result<()> {
let Some(pending) = self.pending.remove(&pid) else {
return Ok(());
};
let is_video = matches!(self.streams.get(&pid), Some(Stream::H264 { .. } | Stream::H265 { .. }));
let run_start = if is_video {
self.audio_burst = None;
None
} else if matches!(self.streams.get(&pid), Some(Stream::Aac(_))) {
pending.pts.map(|audio| *self.audio_burst.get_or_insert(audio))
} else {
None
};
let Some(stream) = self.streams.get_mut(&pid) else {
return Ok(());
};
stream.write(pending, run_start)
}
pub fn seek(&mut self, sequence: u64) -> anyhow::Result<()> {
for stream in self.streams.values_mut() {
stream.seek(sequence)?;
}
for scte in self.scte.values_mut() {
scte.seek(sequence)?;
}
Ok(())
}
pub fn finish(&mut self) -> anyhow::Result<()> {
let pids: Vec<Pid> = self.pending.keys().copied().collect();
for pid in pids {
self.flush(pid)?;
}
for stream in self.streams.values_mut() {
stream.finish()?;
}
for scte in self.scte.values_mut() {
scte.finish()?;
}
Ok(())
}
}
struct Pending {
pts: Option<u64>,
data: Vec<u8>,
data_len: Option<usize>,
}
struct ScteStream<E: scte35::Catalog> {
track: crate::container::Producer<crate::catalog::hang::Container>,
catalog: crate::catalog::Producer<E>,
reassembler: ScteReassembler,
}
impl<E: scte35::Catalog> ScteStream<E> {
fn new(
mut broadcast: moq_net::BroadcastProducer,
mut catalog: crate::catalog::Producer<E>,
) -> anyhow::Result<Self> {
let mut guard = catalog.lock();
let Some(scte35) = guard.scte35_mut() else {
anyhow::bail!("catalog extension no longer carries a scte35 section");
};
let track = broadcast.unique_track(".scte35")?;
let mut config = scte35::Config::new();
config.container = hang::catalog::Container::Legacy;
scte35.renditions.insert(track.name.clone(), config);
drop(guard);
Ok(Self {
track: crate::container::Producer::new(track, crate::catalog::hang::Container::Legacy),
catalog,
reassembler: ScteReassembler::default(),
})
}
fn packet(&mut self, pkt: &[u8], pts: Timestamp) -> anyhow::Result<()> {
let mut sections = Vec::new();
self.reassembler.push(pkt, &mut sections);
for section in sections {
self.emit(section, pts)?;
}
Ok(())
}
fn emit(&mut self, section: Vec<u8>, pts: Timestamp) -> anyhow::Result<()> {
let frame = crate::container::Frame {
timestamp: pts,
payload: bytes::Bytes::from(section),
keyframe: true,
};
self.track.write(frame)?;
self.track.finish_group()?;
Ok(())
}
fn seek(&mut self, sequence: u64) -> anyhow::Result<()> {
self.track.seek(sequence)?;
Ok(())
}
fn finish(&mut self) -> anyhow::Result<()> {
self.track.finish()?;
Ok(())
}
}
impl<E: scte35::Catalog> Drop for ScteStream<E> {
fn drop(&mut self) {
if let Some(scte35) = self.catalog.lock().scte35_mut() {
scte35.renditions.remove(&self.track.name);
}
}
}
#[derive(Default)]
struct ScteReassembler {
acc: Vec<u8>,
last_cc: Option<u8>,
last_pkt: Option<[u8; 188]>,
}
impl ScteReassembler {
fn push(&mut self, pkt: &[u8], out: &mut Vec<Vec<u8>>) {
if pkt[1] & 0x80 != 0 {
self.acc.clear();
self.last_cc = None;
self.last_pkt = None;
return;
}
let pusi = pkt[1] & 0x40 != 0;
let afc = (pkt[3] >> 4) & 0x3;
let cc = pkt[3] & 0x0f;
let has_payload = afc & 0x1 != 0;
let mut off = 4;
let mut discontinuity = false;
if afc & 0x2 != 0 {
let af_len = pkt[4] as usize;
discontinuity = af_len > 0 && pkt[5] & 0x80 != 0;
off = 5 + af_len;
}
if !has_payload {
if discontinuity {
self.acc.clear();
self.last_cc = None;
self.last_pkt = None;
}
return;
}
if self.last_pkt.as_ref().is_some_and(|last| last[..] == pkt[..]) {
return;
}
self.last_pkt = pkt.try_into().ok();
let cc_gap = matches!(self.last_cc, Some(last) if cc != (last + 1) & 0x0f);
let reset = discontinuity || cc_gap;
if reset {
self.acc.clear();
}
self.last_cc = Some(cc);
if off >= pkt.len() {
return;
}
let payload = &pkt[off..];
if pusi {
let ptr = payload[0] as usize;
if 1 + ptr > payload.len() {
self.acc.clear();
return;
}
if !self.acc.is_empty() {
self.acc.extend_from_slice(&payload[1..1 + ptr]);
self.drain(out);
}
self.acc.clear();
self.acc.extend_from_slice(&payload[1 + ptr..]);
self.drain(out);
} else if !self.acc.is_empty() {
self.acc.extend_from_slice(payload);
self.drain(out);
}
}
fn drain(&mut self, out: &mut Vec<Vec<u8>>) {
loop {
match self.acc.first() {
None => return,
Some(&0xff) => {
self.acc.clear();
return;
}
_ => {}
}
if self.acc.len() < 3 {
return;
}
let section_length = (((self.acc[1] & 0x0f) as usize) << 8) | self.acc[2] as usize;
if section_length > 4093 {
self.acc.clear();
return;
}
let full = 3 + section_length;
if self.acc.len() < full {
return;
}
let section: Vec<u8> = self.acc.drain(..full).collect();
if section.first() == Some(&0xfc) {
out.push(section);
}
}
}
}
enum Stream<E: CatalogExt = ()> {
H264 {
import: Box<h264::Import<E>>,
unwrap: PtsUnwrap,
},
H265 {
import: Box<h265::Import<E>>,
unwrap: PtsUnwrap,
},
Aac(Box<AacStream<E>>),
Legacy(Box<LegacyStream<E>>),
Clock,
Ignored,
}
impl<E: CatalogExt> Stream<E> {
fn write(&mut self, pending: Pending, burst: Option<u64>) -> anyhow::Result<()> {
match self {
Stream::H264 { import, unwrap } => {
let pts = unwrap_pts(unwrap, pending.pts)?;
import.decode_frame(&mut pending.data.as_slice(), pts)
}
Stream::H265 { import, unwrap } => {
let pts = unwrap_pts(unwrap, pending.pts)?;
import.decode_frame(&mut pending.data.as_slice(), pts)
}
Stream::Aac(stream) => stream.write(pending, burst),
Stream::Legacy(stream) => stream.write(pending),
Stream::Clock | Stream::Ignored => Ok(()),
}
}
fn seek(&mut self, sequence: u64) -> anyhow::Result<()> {
match self {
Stream::H264 { import, .. } => import.seek(sequence),
Stream::H265 { import, .. } => import.seek(sequence),
Stream::Aac(stream) => stream.seek(sequence),
Stream::Legacy(stream) => stream.seek(sequence),
Stream::Clock | Stream::Ignored => Ok(()),
}
}
fn finish(&mut self) -> anyhow::Result<()> {
match self {
Stream::H264 { import, .. } => import.finish(),
Stream::H265 { import, .. } => import.finish(),
Stream::Aac(stream) => stream.finish(),
Stream::Legacy(stream) => stream.finish(),
Stream::Clock | Stream::Ignored => Ok(()),
}
}
}
struct AacStream<E: CatalogExt = ()> {
import: Option<aac::Import<E>>,
broadcast: moq_net::BroadcastProducer,
catalog: crate::catalog::Producer<E>,
unwrap: PtsUnwrap,
jitter: Option<Timestamp>,
}
impl<E: CatalogExt> AacStream<E> {
fn write(&mut self, pending: Pending, run_start: Option<u64>) -> anyhow::Result<()> {
let base = unwrap_pts(&mut self.unwrap, pending.pts)?;
let data = &pending.data;
let mut offset = 0;
let mut index = 0u64;
let mut sample_rate = None;
while offset + 7 <= data.len() {
let header = adts::Header::parse(&data[offset..])?;
let end = offset + header.frame_len;
anyhow::ensure!(end <= data.len(), "ADTS frame exceeds PES payload");
sample_rate = Some(header.sample_rate);
let import = match &mut self.import {
Some(import) => import,
None => {
let config = aac::Config {
profile: header.object_type,
sample_rate: header.sample_rate,
channel_count: header.channel_count,
};
let description = config.encode();
let import = aac::Import::new(self.broadcast.clone(), self.catalog.clone(), config)?;
let name = import.track().name.clone();
if let Some(rendition) = self.catalog.lock().audio.renditions.get_mut(&name) {
rendition.description = Some(description);
}
self.import.insert(import)
}
};
let pts = match base {
Some(base) if index > 0 => {
let advance = Timestamp::from_scale(index * 1024, header.sample_rate as u64)?;
Some(base + advance)
}
other => other,
};
let mut raw = &data[offset + header.header_len..end];
import.decode(&mut raw, pts)?;
offset = end;
index += 1;
}
self.update_jitter(run_start, pending.pts, index, sample_rate)
}
fn update_jitter(
&mut self,
run_start: Option<u64>,
pes_pts: Option<u64>,
frames: u64,
sample_rate: Option<u32>,
) -> anyhow::Result<()> {
let (Some(start), Some(pts), Some(rate)) = (run_start, pes_pts, sample_rate) else {
return Ok(());
};
if frames == 0 {
return Ok(());
}
let frame = 1024 * 90_000 / rate as u64;
let span = pts.saturating_sub(start) + frames * frame;
if span > 90_000 * 4 {
return Ok(());
}
let jitter = Timestamp::from_scale(span, 90_000)?;
if jitter <= self.jitter.unwrap_or(Timestamp::ZERO) {
return Ok(());
}
self.jitter = Some(jitter);
if let Some(import) = &self.import {
let name = import.track().name.clone();
if let Some(rendition) = self.catalog.lock().audio.renditions.get_mut(&name) {
rendition.jitter = Some(jitter.convert()?);
}
}
Ok(())
}
fn seek(&mut self, sequence: u64) -> anyhow::Result<()> {
if let Some(import) = &mut self.import {
import.seek(sequence)?;
}
Ok(())
}
fn finish(&mut self) -> anyhow::Result<()> {
if let Some(import) = &mut self.import {
import.finish()?;
}
Ok(())
}
}
struct LegacyStream<E: CatalogExt = ()> {
descriptor: &'static legacy::Descriptor,
import: Option<legacy::Import<E>>,
broadcast: moq_net::BroadcastProducer,
catalog: crate::catalog::Producer<E>,
unwrap: PtsUnwrap,
tail: Vec<u8>,
tail_pts: Option<Timestamp>,
}
impl<E: CatalogExt> LegacyStream<E> {
fn write(&mut self, pending: Pending) -> anyhow::Result<()> {
let pes_base = unwrap_pts(&mut self.unwrap, pending.pts)?;
let carried = self.tail.len();
let joined;
let data: &[u8] = if carried == 0 {
&pending.data
} else {
let mut j = std::mem::take(&mut self.tail);
j.extend_from_slice(&pending.data);
joined = j;
&joined
};
let mut pts = if carried > 0 { self.tail_pts.take() } else { pes_base };
let mut in_tail = carried > 0;
let mut offset = 0;
while offset + self.descriptor.min_header_len <= data.len() {
if in_tail && offset >= carried {
pts = pes_base;
in_tail = false;
}
let header = (self.descriptor.parse)(&data[offset..])?;
let end = offset + header.len;
if end > data.len() {
break;
}
let import = match &mut self.import {
Some(import) => import,
None => {
let config = legacy::Config {
sample_rate: header.sample_rate,
channel_count: header.channel_count,
};
let import =
legacy::Import::new(self.descriptor, self.broadcast.clone(), self.catalog.clone(), config)?;
self.import.insert(import)
}
};
let mut frame = &data[offset..end];
import.decode(&mut frame, pts)?;
pts = match pts {
Some(pts) => Some(pts + Timestamp::from_scale(header.samples, header.sample_rate as u64)?),
None => None,
};
offset = end;
}
if offset < data.len() {
if in_tail && offset >= carried {
pts = pes_base;
}
self.tail = data[offset..].to_vec();
self.tail_pts = pts;
}
Ok(())
}
fn seek(&mut self, sequence: u64) -> anyhow::Result<()> {
self.tail.clear();
self.tail_pts = None;
if let Some(import) = &mut self.import {
import.seek(sequence)?;
}
Ok(())
}
fn finish(&mut self) -> anyhow::Result<()> {
if !self.tail.is_empty() {
tracing::debug!(
suffix = self.descriptor.track_suffix,
bytes = self.tail.len(),
"dropping partial frame at end of stream"
);
}
if let Some(import) = &mut self.import {
import.finish()?;
}
Ok(())
}
}
fn pes_data_len(header: &PesHeader, pes_packet_len: u16) -> Option<usize> {
if pes_packet_len == 0 {
return None;
}
let optional = 3 + header.pts.map_or(0, |_| 5) + header.dts.map_or(0, |_| 5) + header.escr.map_or(0, |_| 6);
pes_packet_len.checked_sub(optional).map(|n| n as usize)
}
fn unwrap_pts(unwrap: &mut PtsUnwrap, pts: Option<u64>) -> anyhow::Result<Option<Timestamp>> {
let Some(raw) = pts else {
return Ok(None);
};
let extended = unwrap.unwrap(raw);
Ok(Some(Timestamp::from_scale(extended, 90_000)?))
}
#[derive(Default)]
struct PtsUnwrap {
last: Option<u64>,
offset: u64,
}
impl PtsUnwrap {
fn unwrap(&mut self, raw: u64) -> u64 {
const WRAP: u64 = 1 << 33;
const HALF: i64 = (WRAP / 2) as i64;
if let Some(last) = self.last {
let diff = raw as i64 - last as i64;
if diff < -HALF {
self.offset += WRAP;
} else if diff > HALF && self.offset >= WRAP {
self.offset -= WRAP;
}
}
self.last = Some(raw);
self.offset + raw
}
}
#[derive(Clone, Default)]
struct Feed(Arc<Mutex<FeedState>>);
#[derive(Default)]
struct FeedState {
data: BytesMut,
pos: usize,
}
impl Feed {
fn lock(&self) -> std::sync::MutexGuard<'_, FeedState> {
self.0.lock().unwrap()
}
}
impl Read for Feed {
fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
let mut state = self.lock();
let n = out.len().min(state.data.len() - state.pos);
if n == 0 {
return Ok(0);
}
out[..n].copy_from_slice(&state.data[state.pos..state.pos + n]);
state.pos += n;
Ok(n)
}
}
#[cfg(test)]
mod test {
use mpeg2ts::es::StreamType;
use super::ScteReassembler;
use crate::container::Timestamp;
const CUE: [u8; 30] = [
0xfc, 0x30, 0x1b, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xf0, 0x0a, 0x05, 0x00, 0x00, 0x2b, 0xb4,
0x7f, 0xdf, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0xad, 0x25, 0xe8, 0x39,
];
fn packet(pusi: bool, cc: u8, pointer: u8, body: &[u8]) -> Vec<u8> {
let mut p = vec![0x47, 0x00, 0x21, 0x10 | (cc & 0x0f)];
if pusi {
p[1] |= 0x40;
p.push(pointer);
}
p.extend_from_slice(body);
assert!(p.len() <= 188, "test packet body overflows 188 bytes");
p.resize(188, 0xff);
p
}
fn discontinuity_packet(cc: u8, body: &[u8]) -> Vec<u8> {
let mut p = vec![0x47, 0x00, 0x21, 0x30 | (cc & 0x0f), 0x01, 0x80];
p.extend_from_slice(body);
assert!(p.len() <= 188, "test packet body overflows 188 bytes");
p.resize(188, 0xff);
p
}
fn fake_section(table_id: u8, body_len: usize) -> Vec<u8> {
let mut s = vec![table_id, ((body_len >> 8) & 0x0f) as u8, (body_len & 0xff) as u8];
s.resize(3 + body_len, 0x00);
s
}
fn run(pkts: &[Vec<u8>]) -> Vec<Vec<u8>> {
let mut r = ScteReassembler::default();
let mut out = Vec::new();
for p in pkts {
r.push(p, &mut out);
}
out
}
#[test]
fn single_section() {
assert_eq!(run(&[packet(true, 0, 0, &CUE)]), vec![CUE.to_vec()]);
}
#[test]
fn filters_non_scte() {
let mut body = fake_section(0x00, 5);
body.extend_from_slice(&CUE);
assert_eq!(run(&[packet(true, 0, 0, &body)]), vec![CUE.to_vec()]);
}
#[test]
fn stuffing_only() {
assert!(run(&[packet(true, 0, 0, &[])]).is_empty());
}
#[test]
fn payload_split_across_packets() {
let section = fake_section(0xfc, 247);
let p1 = packet(true, 0, 0, §ion[..183]);
let p2 = packet(false, 1, 0, §ion[183..]);
assert_eq!(run(&[p1, p2]), vec![section]);
}
#[test]
fn header_split_across_packets() {
let a = fake_section(0xfc, 178);
let mut body = a.clone();
body.extend_from_slice(&CUE[..2]);
let p1 = packet(true, 0, 0, &body);
let p2 = packet(false, 1, 0, &CUE[2..]);
assert_eq!(run(&[p1, p2]), vec![a, CUE.to_vec()]);
}
#[test]
fn continuity_gap_drops_partial() {
let section = fake_section(0xfc, 247);
let p1 = packet(true, 0, 0, §ion[..183]);
let p2 = packet(false, 3, 0, §ion[183..]);
assert!(run(&[p1, p2]).is_empty());
}
#[test]
fn discontinuity_drops_partial() {
let section = fake_section(0xfc, 247);
let p1 = packet(true, 0, 0, §ion[..183]);
let p2 = discontinuity_packet(1, §ion[183..]);
assert!(run(&[p1, p2]).is_empty());
}
#[test]
fn gap_then_unaligned_payload_is_not_emitted() {
let section = fake_section(0xfc, 247);
let p1 = packet(true, 0, 0, §ion[..183]);
let p2 = packet(false, 2, 0, &CUE); assert!(run(&[p1, p2]).is_empty());
}
#[test]
fn corrupt_pointer_field_is_dropped() {
assert!(run(&[packet(true, 0, 200, &CUE)]).is_empty());
}
#[test]
fn stays_desynced_until_next_pusi() {
let section = fake_section(0xfc, 247);
let p1 = packet(true, 0, 0, §ion[..183]);
let p2 = packet(false, 2, 0, §ion[183..]); let p3 = packet(false, 3, 0, &CUE); let p4 = packet(true, 4, 0, &CUE); assert_eq!(run(&[p1, p2, p3, p4]), vec![CUE.to_vec()]);
}
#[test]
fn orphan_tail_before_section_is_skipped() {
let mut body = CUE.to_vec(); body.extend_from_slice(&CUE); let pkt = packet(true, 0, CUE.len() as u8, &body);
assert_eq!(run(&[pkt]), vec![CUE.to_vec()]);
}
fn synth_pmt(es: &[(StreamType, u16)], cuei: bool) -> Vec<u8> {
use mpeg2ts::ts::payload::{Pat, Pmt};
use mpeg2ts::ts::{
ContinuityCounter, Descriptor, EsInfo, Pid, ProgramAssociation, TransportScramblingControl, TsHeader,
TsPacket, TsPacketWriter, TsPayload, VersionNumber, WriteTsPacket,
};
const PMT_PID: u16 = 0x0100;
let pat = Pat {
transport_stream_id: 1,
version_number: VersionNumber::default(),
table: vec![ProgramAssociation {
program_num: 1,
program_map_pid: Pid::new(PMT_PID).unwrap(),
}],
};
let pmt = Pmt {
program_num: 1,
pcr_pid: None,
version_number: VersionNumber::default(),
program_info: if cuei {
vec![Descriptor {
tag: 0x05,
data: b"CUEI".to_vec(),
}]
} else {
Vec::new()
},
es_info: es
.iter()
.map(|&(stream_type, pid)| EsInfo {
stream_type,
elementary_pid: Pid::new(pid).unwrap(),
descriptors: Vec::new(),
})
.collect(),
};
let write = |out: &mut Vec<u8>, pid: u16, payload: TsPayload| {
let packet = TsPacket {
header: TsHeader {
transport_error_indicator: false,
transport_priority: false,
pid: Pid::new(pid).unwrap(),
transport_scrambling_control: TransportScramblingControl::NotScrambled,
continuity_counter: ContinuityCounter::default(),
},
adaptation_field: None,
payload: Some(payload),
};
TsPacketWriter::new(out).write_ts_packet(&packet).unwrap();
};
let mut out = Vec::new();
write(&mut out, Pid::PAT, TsPayload::Pat(pat));
write(&mut out, PMT_PID, TsPayload::Pmt(pmt));
out
}
#[test]
fn scte35_extension_catalogs_the_cue_track() {
use crate::catalog::hang::Catalog;
use crate::container::ts::scte35;
let mut broadcast = moq_net::Broadcast::new().produce();
let catalog =
crate::catalog::Producer::with_catalog(&mut broadcast, Catalog::<scte35::Ext>::default()).unwrap();
let mut import = super::Import::new(broadcast, catalog.clone());
let mut bytes = bytes::BytesMut::new();
bytes.extend_from_slice(&synth_pmt(&[(StreamType::Dts8ChannelLosslessAudio, 0x21)], true));
bytes.extend_from_slice(&packet(true, 0, 0, &CUE));
import.decode(&mut bytes).unwrap();
import.finish().unwrap();
assert_eq!(
catalog.snapshot().scte35.renditions.len(),
1,
"expected one scte35 rendition"
);
}
#[tokio::test(start_paused = true)]
async fn base_catalog_routes_cue_pid_to_ignored() {
let mut broadcast = moq_net::Broadcast::new().produce();
let catalog = crate::catalog::Producer::new(&mut broadcast).unwrap();
let mut updates = catalog.consume().unwrap();
let mut import = super::Import::new(broadcast, catalog.clone());
let mut bytes = bytes::BytesMut::new();
bytes.extend_from_slice(&synth_pmt(&[(StreamType::Dts8ChannelLosslessAudio, 0x21)], true));
bytes.extend_from_slice(&packet(true, 0, 0, &CUE));
import.decode(&mut bytes).unwrap(); import.finish().unwrap();
assert!(import.scte.is_empty(), "no cue stream is created for a base catalog");
assert!(
matches!(
import.streams.get(&mpeg2ts::ts::Pid::new(0x21).unwrap()),
Some(super::Stream::Ignored)
),
"the CUEI PID routes to Ignored"
);
assert!(
tokio::time::timeout(std::time::Duration::from_millis(10), updates.next())
.await
.is_err(),
"SCTE detection must not publish the base catalog"
);
}
#[tokio::test(start_paused = true)]
async fn pmt_without_cuei_then_with_cuei_upgrades() {
use crate::catalog::hang::{Catalog, Container};
use crate::container::Consumer;
use crate::container::ts::scte35;
const SECTION_PID: u16 = 0x0021;
let pid = mpeg2ts::ts::Pid::new(SECTION_PID).unwrap();
let mut broadcast = moq_net::Broadcast::new().produce();
let consumer = broadcast.consume();
let catalog =
crate::catalog::Producer::with_catalog(&mut broadcast, Catalog::<scte35::Ext>::default()).unwrap();
let mut import = super::Import::new(broadcast, catalog.clone());
let mut bytes = bytes::BytesMut::new();
bytes.extend_from_slice(&synth_pmt(
&[(StreamType::Dts8ChannelLosslessAudio, SECTION_PID)],
false,
));
import.decode(&mut bytes).unwrap();
assert!(
matches!(import.streams.get(&pid), Some(super::Stream::Ignored)),
"pre-CUEI PMT routes the PID to Ignored"
);
let mut bytes = bytes::BytesMut::new();
bytes.extend_from_slice(&synth_pmt(&[(StreamType::Dts8ChannelLosslessAudio, SECTION_PID)], true));
bytes.extend_from_slice(&packet(true, 0, 0, &CUE));
import.decode(&mut bytes).unwrap();
import.finish().unwrap();
assert!(
!import.streams.contains_key(&pid),
"upgrade drops the stale Ignored route"
);
assert_eq!(
catalog.snapshot().scte35.renditions.len(),
1,
"upgrade advertises the cue track"
);
let name = catalog.snapshot().scte35.renditions.keys().next().unwrap().clone();
let track = consumer.subscribe_track(&moq_net::Track::new(name)).unwrap();
let mut reader = Consumer::new(track, Container::Legacy).with_latency(std::time::Duration::ZERO);
let frame = tokio::time::timeout(std::time::Duration::from_secs(1), reader.read())
.await
.expect("cue read timed out")
.unwrap()
.expect("a published cue frame");
assert_eq!(
&frame.payload[..],
&CUE[..],
"verbatim splice_info_section after upgrade"
);
}
fn pes_packet(pid: u16, pts: u64) -> Vec<u8> {
let pts_field = [
0x21 | (((pts >> 30) & 0x07) << 1) as u8,
((pts >> 22) & 0xff) as u8,
0x01 | (((pts >> 15) & 0x7f) << 1) as u8,
((pts >> 7) & 0xff) as u8,
0x01 | ((pts & 0x7f) << 1) as u8,
];
let mut pes = vec![0x00, 0x00, 0x01, 0xe0]; let pes_len = 3 + 5 + 1; pes.push((pes_len >> 8) as u8);
pes.push((pes_len & 0xff) as u8);
pes.push(0x80); pes.push(0x80); pes.push(0x05); pes.extend_from_slice(&pts_field);
pes.push(0xff);
let mut p = vec![0x47, 0x40 | ((pid >> 8) as u8 & 0x1f), (pid & 0xff) as u8, 0x10];
p.extend_from_slice(&pes);
assert!(p.len() <= 188, "PES packet overflows 188 bytes");
p.resize(188, 0xff);
p
}
#[test]
fn media_clock_follows_video_not_private_pes() {
const VIDEO_PID: u16 = 0x0050;
const PRIVATE_PID: u16 = 0x0051;
let mut broadcast = moq_net::Broadcast::new().produce();
let catalog = crate::catalog::Producer::new(&mut broadcast).unwrap();
let mut import = super::Import::new(broadcast, catalog);
let mut bytes = bytes::BytesMut::new();
bytes.extend_from_slice(&synth_pmt(
&[
(StreamType::Mpeg2Video, VIDEO_PID),
(StreamType::Mpeg2PacketizedData, PRIVATE_PID),
],
true,
));
import.decode(&mut bytes).unwrap();
import.decode(&mut pes_packet(PRIVATE_PID, 1_000).as_slice()).unwrap();
assert!(import.last_pts.is_none(), "a private PES must not start the clock");
import.decode(&mut pes_packet(VIDEO_PID, 90_000).as_slice()).unwrap();
let after_video = import.last_pts;
assert!(after_video.is_some(), "MPEG-2 video PTS must set the clock");
import.decode(&mut pes_packet(PRIVATE_PID, 270_000).as_slice()).unwrap();
assert_eq!(
import.last_pts, after_video,
"a later private PES must not overwrite the clock"
);
}
fn audio_pes_packet(pid: u16, cc: u8, pts: u64, payload: &[u8]) -> Vec<u8> {
let pts_field = [
0x21 | (((pts >> 30) & 0x07) << 1) as u8,
((pts >> 22) & 0xff) as u8,
0x01 | (((pts >> 15) & 0x7f) << 1) as u8,
((pts >> 7) & 0xff) as u8,
0x01 | ((pts & 0x7f) << 1) as u8,
];
let mut pes = vec![0x00, 0x00, 0x01, 0xc0];
let pes_len = 3 + 5 + payload.len();
pes.push((pes_len >> 8) as u8);
pes.push((pes_len & 0xff) as u8);
pes.extend_from_slice(&[0x80, 0x80, 0x05]); pes.extend_from_slice(&pts_field);
pes.extend_from_slice(payload);
let af_len = 184 - 1 - pes.len();
let mut p = vec![
0x47,
0x40 | ((pid >> 8) as u8 & 0x1f),
(pid & 0xff) as u8,
0x30 | (cc & 0x0f),
];
p.push(af_len as u8);
if af_len > 0 {
p.push(0x00); p.extend(std::iter::repeat_n(0xff, af_len - 1));
}
p.extend_from_slice(&pes);
assert_eq!(p.len(), 188, "audio PES packet must fill exactly one TS packet");
p
}
#[test]
fn verbatim_audio_does_not_anchor_aac_jitter() {
const AAC_PID: u16 = 0x0060;
const MP2_PID: u16 = 0x0061;
let mut broadcast = moq_net::Broadcast::new().produce();
let catalog = crate::catalog::Producer::new(&mut broadcast).unwrap();
let mut import = super::Import::new(broadcast, catalog.clone());
let mut bytes = bytes::BytesMut::new();
bytes.extend_from_slice(&synth_pmt(
&[(StreamType::AdtsAac, AAC_PID), (StreamType::Mpeg1Audio, MP2_PID)],
false,
));
let mut mp2 = vec![0xFF, 0xFD, 0x14, 0x00];
mp2.resize(96, 0xAA);
bytes.extend_from_slice(&audio_pes_packet(MP2_PID, 0, 90_000, &mp2));
let mut aac = super::adts::write_header(2, 48_000, 2, 8).unwrap().to_vec();
aac.extend_from_slice(&[0u8; 8]);
bytes.extend_from_slice(&audio_pes_packet(AAC_PID, 0, 270_000, &aac));
import.decode(&mut bytes).unwrap();
import.finish().unwrap();
let snap = catalog.snapshot();
assert_eq!(snap.audio.renditions.len(), 2, "AAC and MP2 renditions");
let aac_rendition = snap
.audio
.renditions
.values()
.find(|a| a.codec.to_string().starts_with("mp4a"))
.expect("AAC rendition");
let jitter = aac_rendition.jitter.expect("AAC publishes a jitter");
assert!(
jitter <= moq_net::Time::from_millis_unchecked(100),
"AAC jitter anchored on a foreign PID: {jitter:?}"
);
}
async fn read_audio_frames(
consumer: &moq_net::BroadcastConsumer,
catalog: &crate::catalog::Producer,
) -> Vec<crate::container::Frame> {
let name = catalog
.snapshot()
.audio
.renditions
.keys()
.next()
.expect("an audio track")
.clone();
let track = consumer.subscribe_track(&moq_net::Track::new(name)).unwrap();
let mut reader = crate::container::Consumer::new(track, crate::catalog::hang::Container::Legacy);
let mut frames = Vec::new();
while let Ok(Ok(Some(frame))) = tokio::time::timeout(std::time::Duration::from_millis(50), reader.read()).await
{
frames.push(frame);
}
frames
}
#[tokio::test(start_paused = true)]
async fn legacy_frame_split_across_pes_reassembles() {
const MP2_PID: u16 = 0x0061;
let mut broadcast = moq_net::Broadcast::new().produce();
let consumer = broadcast.consume();
let catalog = crate::catalog::Producer::new(&mut broadcast).unwrap();
let mut import = super::Import::new(broadcast, catalog.clone());
let pmt = synth_pmt(&[(StreamType::Mpeg1Audio, MP2_PID)], false);
import.decode(&mut bytes::BytesMut::from(&pmt[..])).unwrap();
let mut frame_a = vec![0xFF, 0xFD, 0x14, 0x00];
frame_a.extend((4..96).map(|i| i as u8));
let mut frame_b = vec![0xFF, 0xFD, 0x14, 0x00];
frame_b.extend((4..96).rev().map(|i| i as u8));
let mut second = frame_a[50..].to_vec();
second.extend_from_slice(&frame_b);
import
.decode(&mut audio_pes_packet(MP2_PID, 0, 90_000, &frame_a[..50]).as_slice())
.unwrap();
import
.decode(&mut audio_pes_packet(MP2_PID, 1, 270_000, &second).as_slice())
.unwrap();
import.finish().unwrap();
let frames = read_audio_frames(&consumer, &catalog).await;
assert_eq!(frames.len(), 2, "both frames must survive the split");
assert_eq!(
frames[0].payload.as_ref(),
&frame_a[..],
"frame A reassembled byte-exact"
);
assert_eq!(frames[1].payload.as_ref(), &frame_b[..], "frame B intact");
assert_eq!(frames[0].timestamp, Timestamp::from_millis(1_000).unwrap());
assert_eq!(frames[1].timestamp, Timestamp::from_millis(3_000).unwrap());
}
#[tokio::test(start_paused = true)]
async fn legacy_header_split_keeps_origin_pts() {
const MP2_PID: u16 = 0x0061;
let mut broadcast = moq_net::Broadcast::new().produce();
let consumer = broadcast.consume();
let catalog = crate::catalog::Producer::new(&mut broadcast).unwrap();
let mut import = super::Import::new(broadcast, catalog.clone());
let pmt = synth_pmt(&[(StreamType::Mpeg1Audio, MP2_PID)], false);
import.decode(&mut bytes::BytesMut::from(&pmt[..])).unwrap();
let mut frame_a = vec![0xFF, 0xFD, 0x14, 0x00];
frame_a.resize(96, 0x55);
let mut frame_b = vec![0xFF, 0xFD, 0x14, 0x00];
frame_b.resize(96, 0x66);
let mut first = frame_a.clone();
first.extend_from_slice(&frame_b[..2]);
import
.decode(&mut audio_pes_packet(MP2_PID, 0, 90_000, &first).as_slice())
.unwrap();
import
.decode(&mut audio_pes_packet(MP2_PID, 1, 900_000, &frame_b[2..]).as_slice())
.unwrap();
import.finish().unwrap();
let frames = read_audio_frames(&consumer, &catalog).await;
assert_eq!(frames.len(), 2, "both frames must survive the header split");
assert_eq!(
frames[1].payload.as_ref(),
&frame_b[..],
"frame B reassembled byte-exact"
);
assert_eq!(frames[1].timestamp, Timestamp::from_micros(1_024_000).unwrap());
}
#[tokio::test(start_paused = true)]
async fn scte35_cue_stamped_with_video_pts() {
use crate::catalog::hang::{Catalog, Container};
use crate::container::ts::scte35;
use crate::container::{Consumer, Timestamp};
const VIDEO_PID: u16 = 0x0050;
let mut broadcast = moq_net::Broadcast::new().produce();
let consumer = broadcast.consume();
let catalog =
crate::catalog::Producer::with_catalog(&mut broadcast, Catalog::<scte35::Ext>::default()).unwrap();
let mut import = super::Import::new(broadcast, catalog.clone());
let mut bytes = bytes::BytesMut::new();
bytes.extend_from_slice(&synth_pmt(
&[
(StreamType::Mpeg2Video, VIDEO_PID),
(StreamType::Dts8ChannelLosslessAudio, 0x21),
],
true,
));
bytes.extend_from_slice(&pes_packet(VIDEO_PID, 90_000)); bytes.extend_from_slice(&packet(true, 0, 0, &CUE)); import.decode(&mut bytes).unwrap();
let clock = import.last_pts.expect("video set the media clock");
import.finish().unwrap();
let name = catalog.snapshot().scte35.renditions.keys().next().unwrap().clone();
let track = consumer.subscribe_track(&moq_net::Track::new(name)).unwrap();
let mut reader = Consumer::new(track, Container::Legacy).with_latency(std::time::Duration::ZERO);
let frame = tokio::time::timeout(std::time::Duration::from_secs(1), reader.read())
.await
.expect("cue read timed out")
.unwrap()
.expect("a published cue frame");
assert_eq!(&frame.payload[..], &CUE[..], "verbatim splice_info_section");
assert_ne!(frame.timestamp, Timestamp::ZERO, "cue must not stamp zero");
assert_eq!(frame.timestamp, clock, "cue stamped with the video media clock");
}
#[test]
fn section_pid_without_cuei_is_dropped_not_cataloged() {
use crate::catalog::hang::Catalog;
use crate::container::ts::scte35;
const VIDEO_PID: u16 = 0x0050;
const SECTION_PID: u16 = 0x0021;
let mut broadcast = moq_net::Broadcast::new().produce();
let catalog =
crate::catalog::Producer::with_catalog(&mut broadcast, Catalog::<scte35::Ext>::default()).unwrap();
let mut import = super::Import::new(broadcast, catalog.clone());
let mut bytes = bytes::BytesMut::new();
bytes.extend_from_slice(&synth_pmt(
&[
(StreamType::Mpeg2Video, VIDEO_PID),
(StreamType::Dts8ChannelLosslessAudio, SECTION_PID),
],
false,
));
bytes.extend_from_slice(&packet(true, 0, 0, &CUE)); bytes.extend_from_slice(&pes_packet(VIDEO_PID, 90_000)); import.decode(&mut bytes).unwrap();
assert!(
import.last_pts.is_some(),
"video kept importing past the dropped section PID"
);
assert!(
catalog.snapshot().scte35.renditions.is_empty(),
"a 0x86 PID without CUEI must not be cataloged"
);
}
#[test]
fn duplicate_mid_section_packet_is_skipped() {
let section = fake_section(0xfc, 400); let p1 = packet(true, 0, 0, §ion[..183]);
let p2 = packet(false, 1, 0, §ion[183..367]);
let p3 = packet(false, 2, 0, §ion[367..]);
assert_eq!(run(&[p1, p2.clone(), p2, p3]), vec![section]);
}
#[test]
fn duplicate_pusi_packet_emits_once() {
let p = packet(true, 0, 0, &CUE);
assert_eq!(run(&[p.clone(), p]), vec![CUE.to_vec()]);
}
#[test]
fn tei_continuation_drops_partial_and_resyncs() {
let section = fake_section(0xfc, 247);
let p1 = packet(true, 0, 0, §ion[..183]);
let mut p2 = packet(false, 1, 0, §ion[183..]);
p2[1] |= 0x80; let p3 = packet(false, 2, 0, &CUE); let p4 = packet(true, 3, 0, &CUE); assert_eq!(run(&[p1, p2, p3, p4]), vec![CUE.to_vec()]);
}
}