use std::collections::HashMap;
use std::task::Poll;
use std::time::Duration;
use anyhow::Context;
use bytes::Bytes;
use hang::catalog::{AudioCodec, AudioConfig, Container, VideoCodec, VideoConfig};
use mpeg2ts::es::StreamId;
use mpeg2ts::es::StreamType;
use mpeg2ts::time::Timestamp as TsTimestamp;
use mpeg2ts::ts::payload::{Bytes as TsBytes, Pat, Pes, Pmt, Section};
use mpeg2ts::ts::{
AdaptationField, ContinuityCounter, Descriptor, EsInfo, Pid, ProgramAssociation, TransportScramblingControl,
TsHeader, TsPacket, TsPacketWriter, TsPayload, VersionNumber, WriteTsPacket,
};
use crate::catalog::CatalogFormat;
use crate::catalog::hang::Catalog;
use crate::codec::annexb;
use crate::container::{CatalogSource, ExportSource, Frame};
use super::adts;
use super::catalog;
const PMT_PID: u16 = 0x1000;
const FIRST_ES_PID: u16 = 0x1001;
const PSI_INTERVAL: Duration = Duration::from_millis(500);
pub struct Export<E: catalog::Catalog = ()> {
broadcast: moq_net::BroadcastConsumer,
catalog: Option<CatalogSource<E>>,
latency: Duration,
tracks: HashMap<String, Track>,
counters: HashMap<u16, ContinuityCounter>,
program_descriptors: Vec<catalog::Descriptor>,
psi: Option<Psi>,
last_psi: Option<crate::container::Timestamp>,
}
struct Track {
source: ExportSource,
pending: Option<Frame>,
finished: bool,
pid: u16,
kind: Kind,
descriptors: Vec<catalog::Descriptor>,
last_dts: Option<u64>,
dts_reserve: u64,
}
#[derive(Clone)]
enum Kind {
Video(StreamType),
Aac {
object_type: u8,
sample_rate: u32,
channel_count: u32,
},
Mp2 { sample_rate: u32 },
Ac3,
Eac3,
Verbatim {
stream_type: u8,
framing: catalog::Framing,
stream_id: Option<u8>,
},
}
struct Psi {
pat: Pat,
pmt: Pmt,
pcr_pid: u16,
}
struct PesUnit {
pid: u16,
is_pcr: bool,
is_video: bool,
keyframe: bool,
timestamp: crate::container::Timestamp,
dts: Option<u64>,
stream_id: Option<u8>,
}
impl Export {
pub fn new(broadcast: moq_net::BroadcastConsumer) -> Result<Self, crate::Error> {
Self::with_catalog_format(broadcast, CatalogFormat::default())
}
pub fn with_catalog_format(
broadcast: moq_net::BroadcastConsumer,
catalog_format: CatalogFormat,
) -> Result<Self, crate::Error> {
Self::build(broadcast, catalog_format)
}
}
impl Export<catalog::Ext> {
pub fn with_ts(broadcast: moq_net::BroadcastConsumer, catalog_format: CatalogFormat) -> Result<Self, crate::Error> {
Self::build(broadcast, catalog_format)
}
}
impl<E: catalog::Catalog> Export<E> {
fn build(broadcast: moq_net::BroadcastConsumer, catalog_format: CatalogFormat) -> Result<Self, crate::Error> {
let catalog = CatalogSource::new(&broadcast, catalog_format)?;
Ok(Self {
broadcast,
catalog: Some(catalog),
latency: Duration::ZERO,
tracks: HashMap::new(),
counters: HashMap::new(),
program_descriptors: Vec::new(),
psi: None,
last_psi: None,
})
}
pub fn with_latency(mut self, latency: Duration) -> Self {
self.latency = latency;
self
}
pub async fn next(&mut self) -> anyhow::Result<Option<Bytes>> {
kio::wait(|waiter| self.poll_next(waiter)).await
}
pub fn poll_next(&mut self, waiter: &kio::Waiter) -> Poll<anyhow::Result<Option<Bytes>>> {
while let Some(catalog) = self.catalog.as_mut() {
match catalog.poll_next(waiter)? {
Poll::Ready(Some(snapshot)) => self.update_catalog(snapshot)?,
Poll::Ready(None) => {
self.catalog = None;
break;
}
Poll::Pending => break,
}
}
let waiting_for_header = self.psi.is_none();
for track in self.tracks.values_mut() {
if track.pending.is_some() || track.finished {
continue;
}
loop {
match track.source.poll_read(waiter) {
Poll::Ready(Ok(Some(frame))) => {
if waiting_for_header && !track.source.header_ready() {
continue;
}
track.pending = Some(frame);
break;
}
Poll::Ready(Ok(None)) => {
track.finished = true;
break;
}
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
Poll::Pending => break,
}
}
}
if self.psi.is_none() {
if self.tracks.is_empty() {
if self.catalog.is_none() {
return Poll::Ready(Ok(None));
}
return Poll::Pending;
}
if !self.header_ready() {
if self.catalog.is_none() && self.tracks.values().all(|t| t.finished) {
return Poll::Ready(Ok(None));
}
return Poll::Pending;
}
self.build_psi()?;
let header = self.write_psi()?;
return Poll::Ready(Ok(Some(header)));
}
if let Some(name) = self.pick_next_track() {
let frame = self.tracks.get_mut(&name).unwrap().pending.take().unwrap();
let chunk = self.write_frame(&name, frame)?;
return Poll::Ready(Ok(Some(chunk)));
}
if self.catalog.is_none() && !self.tracks.is_empty() && self.tracks.values().all(|t| t.finished) {
return Poll::Ready(Ok(None));
}
if self.catalog.is_none() && self.tracks.is_empty() {
return Poll::Ready(Ok(None));
}
Poll::Pending
}
fn update_catalog(&mut self, mut catalog: Catalog<E>) -> anyhow::Result<()> {
let mpegts = catalog.mpegts_mut().cloned().unwrap_or_default();
self.program_descriptors = mpegts.program_descriptors.clone();
let mut active: HashMap<String, ()> = HashMap::new();
for name in catalog.video.renditions.keys() {
active.insert(name.clone(), ());
}
for name in catalog.audio.renditions.keys() {
active.insert(name.clone(), ());
}
for (name, track) in mpegts.tracks.iter() {
if track.verbatim.is_some() {
active.insert(name.clone(), ());
}
}
if self.psi.is_some() {
for name in active.keys() {
anyhow::ensure!(
self.tracks.contains_key(name),
"TS track layout changed after PAT/PMT was emitted: '{name}' added"
);
}
for name in self.tracks.keys() {
anyhow::ensure!(
active.contains_key(name),
"TS track layout changed after PAT/PMT was emitted: '{name}' removed"
);
}
return Ok(());
}
let mut used: Vec<u16> = vec![0x0000, PMT_PID, 0x1FFF];
let mut pids: HashMap<String, u16> = HashMap::new();
for name in active.keys() {
if let Some(pid) = mpegts.tracks.get(name).map(|t| t.pid)
&& !used.contains(&pid)
{
used.push(pid);
pids.insert(name.clone(), pid);
}
}
for name in active.keys() {
if !pids.contains_key(name) {
let mut pid = FIRST_ES_PID;
while used.contains(&pid) {
pid += 1;
}
used.push(pid);
pids.insert(name.clone(), pid);
}
}
let mut old = std::mem::take(&mut self.tracks);
for (name, config) in catalog.video.renditions.iter() {
let kind = video_kind(config, name)?;
let descriptors = track_descriptors(&mpegts, name);
let pid = pids[name];
let reserve = dts_reserve(config);
match old.remove(name) {
Some(mut track) => {
track.pid = pid;
track.kind = kind;
track.descriptors = descriptors;
track.dts_reserve = reserve;
self.tracks.insert(name.clone(), track);
}
None => {
let source = ExportSource::for_video(&self.broadcast, name, config, self.latency)?;
self.insert_track(name, source, pid, kind, descriptors, reserve);
}
}
}
for (name, config) in catalog.audio.renditions.iter() {
let kind = audio_kind(config, name)?;
let descriptors = track_descriptors(&mpegts, name);
let pid = pids[name];
match old.remove(name) {
Some(mut track) => {
track.pid = pid;
track.kind = kind;
track.descriptors = descriptors;
self.tracks.insert(name.clone(), track);
}
None => {
let source = ExportSource::for_audio(&self.broadcast, name, config, self.latency)?;
self.insert_track(name, source, pid, kind, descriptors, DEFAULT_DTS_RESERVE);
}
}
}
for (name, track) in mpegts.tracks.iter() {
let Some(verbatim) = &track.verbatim else {
continue;
};
let kind = Kind::Verbatim {
stream_type: verbatim.stream_type,
framing: verbatim.framing,
stream_id: verbatim.stream_id,
};
let descriptors = track.descriptors.clone();
let pid = pids[name];
match old.remove(name) {
Some(mut existing) => {
existing.pid = pid;
existing.kind = kind;
existing.descriptors = descriptors;
self.tracks.insert(name.clone(), existing);
}
None => {
let source = ExportSource::for_stream(&self.broadcast, name, self.latency)?;
self.insert_track(name, source, pid, kind, descriptors, DEFAULT_DTS_RESERVE);
}
}
}
Ok(())
}
fn insert_track(
&mut self,
name: &str,
source: ExportSource,
pid: u16,
kind: Kind,
descriptors: Vec<catalog::Descriptor>,
dts_reserve: u64,
) {
self.tracks.insert(
name.to_string(),
Track {
source,
pending: None,
finished: false,
pid,
kind,
descriptors,
last_dts: None,
dts_reserve,
},
);
}
fn header_ready(&self) -> bool {
self.tracks.values().all(|t| t.source.header_ready())
}
fn build_psi(&mut self) -> anyhow::Result<()> {
let mut tracks: Vec<&Track> = self.tracks.values().collect();
tracks.sort_by_key(|t| t.pid);
let needs_clock = tracks.iter().any(|t| {
matches!(
&t.kind,
Kind::Verbatim {
framing: catalog::Framing::Section,
..
}
)
});
let video = tracks.iter().find(|t| matches!(t.kind, Kind::Video(_)));
anyhow::ensure!(
!needs_clock || video.is_some(),
"TS export of section-framed verbatim streams (e.g. SCTE-35) requires a video track for the program clock"
);
let pcr_pid = video
.or_else(|| {
tracks
.iter()
.find(|t| matches!(t.kind, Kind::Aac { .. } | Kind::Mp2 { .. } | Kind::Ac3 | Kind::Eac3))
})
.map(|t| t.pid)
.context("TS export requires a video or audio track for the PCR")?;
let es_info = tracks
.iter()
.map(|t| {
let stream_type = match &t.kind {
Kind::Video(stream_type) => *stream_type,
Kind::Aac { .. } => StreamType::AdtsAac,
Kind::Mp2 { sample_rate } if *sample_rate < 32000 => StreamType::Mpeg2HalvedSampleRateAudio,
Kind::Mp2 { .. } => StreamType::Mpeg1Audio,
Kind::Ac3 => StreamType::DolbyDigitalUpToSixChannelAudio,
Kind::Eac3 => StreamType::DolbyDigitalPlusUpTo16ChannelAudioForAtsc,
Kind::Verbatim { stream_type, .. } => {
StreamType::from_u8(*stream_type).map_err(anyhow::Error::msg)?
}
};
let descriptors = if !t.descriptors.is_empty() {
to_pmt_descriptors(&t.descriptors)
} else {
match &t.kind {
Kind::Ac3 => vec![Descriptor {
tag: 0x05,
data: b"AC-3".to_vec(),
}],
Kind::Eac3 => vec![Descriptor {
tag: 0x05,
data: b"EAC3".to_vec(),
}],
_ => Vec::new(),
}
};
Ok(EsInfo {
stream_type,
elementary_pid: Pid::new(t.pid)?,
descriptors,
})
})
.collect::<anyhow::Result<Vec<_>>>()?;
let program_info = if !self.program_descriptors.is_empty() {
to_pmt_descriptors(&self.program_descriptors)
} else if tracks.iter().any(|t| {
matches!(
&t.kind,
Kind::Verbatim {
stream_type: 0x86,
framing: catalog::Framing::Section,
..
}
)
}) {
vec![Descriptor {
tag: 0x05,
data: b"CUEI".to_vec(),
}]
} else {
Vec::new()
};
let pat = Pat {
transport_stream_id: 1,
version_number: VersionNumber::default(),
table: vec![ProgramAssociation {
program_num: 1,
program_map_pid: Pid::new(PMT_PID)?,
}],
};
let pmt = Pmt {
program_num: 1,
pcr_pid: Some(Pid::new(pcr_pid)?),
version_number: VersionNumber::default(),
program_info,
es_info,
};
self.psi = Some(Psi { pat, pmt, pcr_pid });
Ok(())
}
fn write_psi(&mut self) -> anyhow::Result<Bytes> {
let psi = self.psi.as_ref().context("PSI not built")?;
let pat = TsPayload::Pat(psi.pat.clone());
let pmt = TsPayload::Pmt(psi.pmt.clone());
let mut out = Vec::with_capacity(2 * TsPacket::SIZE);
self.write_packet(&mut out, Pid::PAT, None, pat)?;
self.write_packet(&mut out, PMT_PID, None, pmt)?;
Ok(Bytes::from(out))
}
fn pick_next_track(&self) -> Option<String> {
self.tracks
.iter()
.filter_map(|(n, t)| t.pending.as_ref().map(|f| (n.clone(), f.timestamp)))
.min_by_key(|(_, ts)| *ts)
.map(|(n, _)| n)
}
fn write_frame(&mut self, name: &str, frame: Frame) -> anyhow::Result<Bytes> {
let track = self.tracks.get(name).context("missing track")?;
let pid = track.pid;
let kind = track.kind.clone();
let is_pcr = self.psi.as_ref().is_some_and(|p| p.pcr_pid == pid);
let is_video = matches!(kind, Kind::Video(_));
let es_payload = match &kind {
Kind::Video(stream_type) => Some(video_es_payload(*stream_type, track.source.description(), &frame)?),
Kind::Aac {
object_type,
sample_rate,
channel_count,
} => {
let header = adts::write_header(*object_type, *sample_rate, *channel_count, frame.payload.len())?;
let mut framed = Vec::with_capacity(7 + frame.payload.len());
framed.extend_from_slice(&header);
framed.extend_from_slice(&frame.payload);
Some(framed)
}
Kind::Mp2 { .. } | Kind::Ac3 | Kind::Eac3 => Some(frame.payload.to_vec()),
Kind::Verbatim {
framing: catalog::Framing::Pes,
..
} => Some(frame.payload.to_vec()),
Kind::Verbatim {
framing: catalog::Framing::Section,
..
} => None,
};
let dts = if is_video {
let pts = to_ticks(frame.timestamp);
let track = self.tracks.get_mut(name).context("missing track")?;
author_dts(pts, track.dts_reserve, &mut track.last_dts)
} else {
None
};
let mut out = Vec::with_capacity(TsPacket::SIZE);
let psi_due = match self.last_psi {
None => true,
Some(last) => frame.timestamp >= last && (frame.timestamp - last) >= psi_interval(),
};
if (is_video && frame.keyframe) || psi_due {
let psi = self.psi.as_ref().context("PSI not built")?;
let pat = TsPayload::Pat(psi.pat.clone());
let pmt = TsPayload::Pmt(psi.pmt.clone());
self.write_packet(&mut out, Pid::PAT, None, pat)?;
self.write_packet(&mut out, PMT_PID, None, pmt)?;
self.last_psi = Some(frame.timestamp);
}
match es_payload {
None => self.write_section(&mut out, pid, &frame.payload)?,
Some(es_payload) => {
let stream_id = match &kind {
Kind::Verbatim { stream_id, .. } => Some(stream_id.unwrap_or(StreamId::PRIVATE_STREAM_1)),
_ => None,
};
let unit = PesUnit {
pid,
is_pcr,
is_video,
keyframe: frame.keyframe,
timestamp: frame.timestamp,
dts,
stream_id,
};
self.write_pes(&mut out, &unit, &es_payload)?;
}
}
Ok(Bytes::from(out))
}
fn write_pes(&mut self, out: &mut Vec<u8>, unit: &PesUnit, payload: &[u8]) -> anyhow::Result<()> {
let pts = to_ts_timestamp(unit.timestamp)?;
let dts = unit
.dts
.map(|t| TsTimestamp::new(t & TS_TIMESTAMP_MASK).map_err(anyhow::Error::msg))
.transpose()?;
let stream_id = match unit.stream_id {
Some(id) => StreamId::new(id),
None if unit.is_video => StreamId::new(StreamId::VIDEO_MIN),
None => StreamId::new(StreamId::AUDIO_MIN),
};
let header = mpeg2ts::pes::PesHeader {
stream_id,
priority: false,
data_alignment_indicator: true,
copyright: false,
original_or_copy: false,
pts: Some(pts),
dts,
escr: None,
};
let optional_len = PES_OPTIONAL_LEN + if dts.is_some() { PES_DTS_LEN } else { 0 };
let pes_packet_len = if unit.is_video {
0
} else {
u16::try_from(optional_len + payload.len()).unwrap_or(0)
};
let pcr = dts.unwrap_or(pts);
let mut offset = 0;
let mut first = true;
loop {
let adaptation = if first && (unit.is_pcr || unit.keyframe) {
Some(AdaptationField {
discontinuity_indicator: false,
random_access_indicator: unit.keyframe,
es_priority_indicator: false,
pcr: if unit.is_pcr { Some(pcr.into()) } else { None },
opcr: None,
splice_countdown: None,
transport_private_data: Vec::new(),
extension: None,
})
} else {
None
};
let header_len = if first { 6 + optional_len } else { 0 };
let af_len = adaptation.as_ref().map(adaptation_size).unwrap_or(0);
let avail = TsBytes::MAX_SIZE - header_len - af_len;
let take = avail.min(payload.len() - offset);
let chunk = &payload[offset..offset + take];
let ts_payload = if first {
TsPayload::PesStart(Pes {
header: header.clone(),
pes_packet_len,
data: TsBytes::new(chunk).map_err(anyhow::Error::msg)?,
})
} else {
TsPayload::PesContinuation(TsBytes::new(chunk).map_err(anyhow::Error::msg)?)
};
self.write_packet(out, unit.pid, adaptation, ts_payload)?;
offset += take;
first = false;
if offset >= payload.len() {
break;
}
}
Ok(())
}
fn write_section(&mut self, out: &mut Vec<u8>, pid: u16, section: &[u8]) -> anyhow::Result<()> {
if !is_complete_section(section) {
tracing::warn!(pid, len = section.len(), "dropping malformed private section on export");
return Ok(());
}
let mut offset = 0;
let mut first = true;
loop {
let payload = if first {
let take = (TsBytes::MAX_SIZE - 1).min(section.len());
let chunk = §ion[..take];
offset = take;
TsPayload::Section(Section {
pointer_field: 0,
data: TsBytes::new(chunk).map_err(anyhow::Error::msg)?,
})
} else {
let take = TsBytes::MAX_SIZE.min(section.len() - offset);
let chunk = §ion[offset..offset + take];
offset += take;
TsPayload::Raw(TsBytes::new(chunk).map_err(anyhow::Error::msg)?)
};
self.write_packet(out, pid, None, payload)?;
first = false;
if offset >= section.len() {
break;
}
}
Ok(())
}
fn write_packet(
&mut self,
out: &mut Vec<u8>,
pid: u16,
adaptation_field: Option<AdaptationField>,
payload: TsPayload,
) -> anyhow::Result<()> {
let counter = self.counters.entry(pid).or_default();
let continuity_counter = *counter;
counter.increment();
let packet = TsPacket {
header: TsHeader {
transport_error_indicator: false,
transport_priority: false,
pid: Pid::new(pid)?,
transport_scrambling_control: TransportScramblingControl::NotScrambled,
continuity_counter,
},
adaptation_field,
payload: Some(payload),
};
let mut writer = TsPacketWriter::new(out);
writer.write_ts_packet(&packet).map_err(anyhow::Error::msg)?;
Ok(())
}
}
const PES_OPTIONAL_LEN: usize = 3 + 5;
const PES_DTS_LEN: usize = 5;
const DEFAULT_DTS_RESERVE: u64 = 16;
fn psi_interval() -> crate::container::Timestamp {
crate::container::Timestamp::try_from(PSI_INTERVAL).unwrap_or(crate::container::Timestamp::ZERO)
}
fn adaptation_size(af: &AdaptationField) -> usize {
2 + if af.pcr.is_some() { 6 } else { 0 }
}
const TS_TIMESTAMP_MASK: u64 = (1 << 33) - 1;
fn to_ticks(timestamp: crate::container::Timestamp) -> u64 {
(timestamp.as_micros() * 90_000 / 1_000_000) as u64
}
fn to_ts_timestamp(timestamp: crate::container::Timestamp) -> anyhow::Result<TsTimestamp> {
TsTimestamp::new(to_ticks(timestamp) & TS_TIMESTAMP_MASK).map_err(anyhow::Error::msg)
}
fn video_kind(config: &VideoConfig, name: &str) -> anyhow::Result<Kind> {
ensure_raw(&config.container, "video", name)?;
match &config.codec {
VideoCodec::H264(_) => Ok(Kind::Video(StreamType::H264)),
VideoCodec::H265(_) => Ok(Kind::Video(StreamType::H265)),
other => anyhow::bail!("TS export does not support video codec {other:?} (track '{name}')"),
}
}
fn video_es_payload(stream_type: StreamType, description: Option<&Bytes>, frame: &Frame) -> anyhow::Result<Vec<u8>> {
let description = description.context("video codec config (avcC/hvcC) not resolved")?;
let (length_size, params) = match stream_type {
StreamType::H264 => crate::codec::h264::avcc_params(description)?,
StreamType::H265 => crate::codec::h265::hvcc_params(description)?,
other => anyhow::bail!("unsupported TS video stream type {other:?}"),
};
let mut out = Vec::with_capacity(frame.payload.len() + 64);
if frame.keyframe {
for nal in ¶ms {
out.extend_from_slice(&annexb::START_CODE);
out.extend_from_slice(nal);
}
}
annexb::length_prefixed_to_annexb(&frame.payload, length_size, &mut out)?;
Ok(out)
}
fn audio_kind(config: &AudioConfig, name: &str) -> anyhow::Result<Kind> {
ensure_raw(&config.container, "audio", name)?;
match &config.codec {
AudioCodec::AAC(aac) => Ok(Kind::Aac {
object_type: aac.profile,
sample_rate: config.sample_rate,
channel_count: config.channel_count,
}),
AudioCodec::Mp2 => Ok(Kind::Mp2 {
sample_rate: config.sample_rate,
}),
AudioCodec::Ac3 => Ok(Kind::Ac3),
AudioCodec::Ec3 => Ok(Kind::Eac3),
other => anyhow::bail!("TS export does not support audio codec {other:?} (track '{name}')"),
}
}
fn track_descriptors(mpegts: &catalog::Mpegts, name: &str) -> Vec<catalog::Descriptor> {
mpegts
.tracks
.get(name)
.map(|t| t.descriptors.clone())
.unwrap_or_default()
}
fn to_pmt_descriptors(descriptors: &[catalog::Descriptor]) -> Vec<Descriptor> {
descriptors
.iter()
.map(|d| Descriptor {
tag: d.tag,
data: d.data.to_vec(),
})
.collect()
}
fn is_complete_section(section: &[u8]) -> bool {
section.len() >= 3 && section.len() == 3 + ((((section[1] & 0x0f) as usize) << 8) | section[2] as usize)
}
fn ensure_raw(container: &Container, kind: &str, name: &str) -> anyhow::Result<()> {
match container {
Container::Legacy | Container::Loc => Ok(()),
Container::Cmaf { .. } => anyhow::bail!("TS export does not support CMAF {kind} track '{name}'"),
}
}
fn author_dts(pts: u64, reserve: u64, last: &mut Option<u64>) -> Option<u64> {
let mut dts = pts.saturating_sub(reserve);
if let Some(prev) = *last
&& dts <= prev
{
dts = prev + 1;
}
*last = Some(dts);
(dts != pts).then_some(dts)
}
fn dts_reserve(config: &VideoConfig) -> u64 {
config
.jitter
.map(|t| t.as_scale(90_000) as u64)
.filter(|&ticks| ticks > 0)
.unwrap_or(DEFAULT_DTS_RESERVE)
}
#[cfg(test)]
mod tests {
use super::{DEFAULT_DTS_RESERVE, author_dts, is_complete_section};
fn run_clock(pts: &[u64], reserve: u64) -> Vec<u64> {
let mut last = None;
pts.iter()
.map(|&p| author_dts(p, reserve, &mut last).unwrap_or(p))
.collect()
}
fn decode_order(refs: usize, b: usize, dur: u64, base: u64) -> Vec<u64> {
let pts = |display: usize| base + display as u64 * dur;
let span = b + 1;
let mut out = vec![pts(0)]; for g in 1..refs {
let reference = g * span;
out.push(pts(reference)); for j in 1..=b {
out.push(pts(reference - span + j)); }
}
out
}
#[test]
fn dts_is_monotonic_across_reorder() {
for b in [1, 3, 5] {
let pts = decode_order(40, b, 3_600, 10_000_000);
let dts = run_clock(&pts, DEFAULT_DTS_RESERVE);
assert!(pts.windows(2).any(|w| w[1] < w[0]), "b={b}: stream must reorder PTS");
for (i, win) in dts.windows(2).enumerate() {
assert!(win[1] > win[0], "b={b}: DTS not strictly increasing at {i}: {win:?}");
}
}
}
#[test]
fn sufficient_reserve_keeps_dts_under_pts() {
let dur = 3_600;
for b in [1, 3, 5] {
let reserve = (b as u64 + 1) * dur; let pts = decode_order(40, b, dur, 10_000_000);
let dts = run_clock(&pts, reserve);
for (i, win) in dts.windows(2).enumerate() {
assert!(win[1] > win[0], "b={b}: DTS not strictly increasing at {i}: {win:?}");
}
for (i, (&d, &p)) in dts.iter().zip(pts.iter()).enumerate() {
assert!(d <= p, "b={b}: DTS {d} after PTS {p} at {i}");
}
}
}
#[test]
fn dts_clock_survives_33bit_wrap() {
let wrap = 1u64 << 33;
let pts = decode_order(40, 3, 3_600, wrap - 20 * 3_600);
let dts = run_clock(&pts, DEFAULT_DTS_RESERVE);
assert!(pts.iter().any(|&p| p >= wrap), "test must cross the wrap boundary");
for (i, win) in dts.windows(2).enumerate() {
assert!(
win[1] > win[0],
"DTS not strictly increasing across wrap at {i}: {win:?}"
);
}
}
#[test]
fn dts_without_reorder_trails_pts_by_the_reserve() {
let pts: Vec<u64> = (0..40).map(|i| 10_000_000 + i * 3_600).collect();
let dts = run_clock(&pts, DEFAULT_DTS_RESERVE);
for (i, win) in dts.windows(2).enumerate() {
assert!(win[1] > win[0], "DTS not strictly increasing at {i}: {win:?}");
}
for (i, (&d, &p)) in dts.iter().zip(pts.iter()).enumerate() {
assert_eq!(d, p - DEFAULT_DTS_RESERVE, "DTS should trail PTS by the reserve at {i}");
}
}
#[test]
fn section_validation() {
let mut ok = vec![0xfc, 0x30, 0x1b];
ok.resize(30, 0x00);
assert!(is_complete_section(&ok));
assert!(is_complete_section(&[0xfc, 0x00, 0x00]));
assert!(is_complete_section(&[0x00, 0x00, 0x00]));
assert!(!is_complete_section(&[0xfc, 0x00]));
assert!(!is_complete_section(&[0xfc, 0x30, 0x1b]));
}
}