use std::collections::HashMap;
use std::task::Poll;
use std::time::Duration;
use anyhow::Context;
use bytes::Bytes;
use hang::catalog::{AudioCodec, AudioConfig, Container, VideoCodec, VideoConfig};
use mpeg2ts::es::StreamId;
use mpeg2ts::es::StreamType;
use mpeg2ts::time::Timestamp as TsTimestamp;
use mpeg2ts::ts::payload::{Bytes as TsBytes, Pat, Pes, Pmt, Section};
use mpeg2ts::ts::{
AdaptationField, ContinuityCounter, Descriptor, EsInfo, Pid, ProgramAssociation, TransportScramblingControl,
TsHeader, TsPacket, TsPacketWriter, TsPayload, VersionNumber, WriteTsPacket,
};
use crate::catalog::CatalogFormat;
use crate::catalog::hang::Catalog;
use crate::codec::annexb;
use crate::container::{CatalogSource, ExportSource, Frame};
use super::adts;
use super::scte35;
const PMT_PID: u16 = 0x1000;
const FIRST_ES_PID: u16 = 0x1001;
const PSI_INTERVAL: Duration = Duration::from_millis(500);
pub struct Export<E: scte35::Catalog = ()> {
broadcast: moq_net::BroadcastConsumer,
catalog: Option<CatalogSource<E>>,
latency: Duration,
tracks: HashMap<String, Track>,
counters: HashMap<u16, ContinuityCounter>,
psi: Option<Psi>,
last_psi: Option<crate::container::Timestamp>,
}
struct Track {
source: ExportSource,
pending: Option<Frame>,
finished: bool,
pid: u16,
kind: Kind,
}
#[derive(Clone)]
enum Kind {
Video(StreamType),
Aac {
object_type: u8,
sample_rate: u32,
channel_count: u32,
},
Mp2 { sample_rate: u32 },
Ac3,
Eac3,
Scte35,
}
struct Psi {
pat: Pat,
pmt: Pmt,
pcr_pid: u16,
}
struct PesUnit {
pid: u16,
is_pcr: bool,
is_video: bool,
keyframe: bool,
timestamp: crate::container::Timestamp,
}
impl Export {
pub fn new(broadcast: moq_net::BroadcastConsumer) -> Result<Self, crate::Error> {
Self::with_catalog_format(broadcast, CatalogFormat::default())
}
pub fn with_catalog_format(
broadcast: moq_net::BroadcastConsumer,
catalog_format: CatalogFormat,
) -> Result<Self, crate::Error> {
Self::build(broadcast, catalog_format)
}
}
impl Export<scte35::Ext> {
pub fn with_scte35(
broadcast: moq_net::BroadcastConsumer,
catalog_format: CatalogFormat,
) -> Result<Self, crate::Error> {
Self::build(broadcast, catalog_format)
}
}
impl<E: scte35::Catalog> Export<E> {
fn build(broadcast: moq_net::BroadcastConsumer, catalog_format: CatalogFormat) -> Result<Self, crate::Error> {
let catalog = CatalogSource::new(&broadcast, catalog_format)?;
Ok(Self {
broadcast,
catalog: Some(catalog),
latency: Duration::ZERO,
tracks: HashMap::new(),
counters: HashMap::new(),
psi: None,
last_psi: None,
})
}
pub fn with_latency(mut self, latency: Duration) -> Self {
self.latency = latency;
self
}
pub async fn next(&mut self) -> anyhow::Result<Option<Bytes>> {
kio::wait(|waiter| self.poll_next(waiter)).await
}
pub fn poll_next(&mut self, waiter: &kio::Waiter) -> Poll<anyhow::Result<Option<Bytes>>> {
while let Some(catalog) = self.catalog.as_mut() {
match catalog.poll_next(waiter)? {
Poll::Ready(Some(snapshot)) => self.update_catalog(snapshot)?,
Poll::Ready(None) => {
self.catalog = None;
break;
}
Poll::Pending => break,
}
}
let waiting_for_header = self.psi.is_none();
for track in self.tracks.values_mut() {
if track.pending.is_some() || track.finished {
continue;
}
loop {
match track.source.poll_read(waiter) {
Poll::Ready(Ok(Some(frame))) => {
if waiting_for_header && !track.source.header_ready() {
continue;
}
track.pending = Some(frame);
break;
}
Poll::Ready(Ok(None)) => {
track.finished = true;
break;
}
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
Poll::Pending => break,
}
}
}
if self.psi.is_none() {
if self.tracks.is_empty() {
if self.catalog.is_none() {
return Poll::Ready(Ok(None));
}
return Poll::Pending;
}
if !self.header_ready() {
if self.catalog.is_none() && self.tracks.values().all(|t| t.finished) {
return Poll::Ready(Ok(None));
}
return Poll::Pending;
}
self.build_psi()?;
let header = self.write_psi()?;
return Poll::Ready(Ok(Some(header)));
}
if let Some(name) = self.pick_next_track() {
let frame = self.tracks.get_mut(&name).unwrap().pending.take().unwrap();
let chunk = self.write_frame(&name, frame)?;
return Poll::Ready(Ok(Some(chunk)));
}
if self.catalog.is_none() && !self.tracks.is_empty() && self.tracks.values().all(|t| t.finished) {
return Poll::Ready(Ok(None));
}
if self.catalog.is_none() && self.tracks.is_empty() {
return Poll::Ready(Ok(None));
}
Poll::Pending
}
fn update_catalog(&mut self, mut catalog: Catalog<E>) -> anyhow::Result<()> {
let scte35 = catalog.scte35_mut().cloned().unwrap_or_default();
let mut active: HashMap<String, ()> = HashMap::new();
for name in catalog.video.renditions.keys() {
active.insert(name.clone(), ());
}
for name in catalog.audio.renditions.keys() {
active.insert(name.clone(), ());
}
for name in scte35.renditions.keys() {
active.insert(name.clone(), ());
}
if self.psi.is_some() {
for name in active.keys() {
anyhow::ensure!(
self.tracks.contains_key(name),
"TS track layout changed after PAT/PMT was emitted: '{name}' added"
);
}
for name in self.tracks.keys() {
anyhow::ensure!(
active.contains_key(name),
"TS track layout changed after PAT/PMT was emitted: '{name}' removed"
);
}
return Ok(());
}
let mut next_pid = self
.tracks
.values()
.map(|t| t.pid)
.max()
.map(|p| p + 1)
.unwrap_or(FIRST_ES_PID);
for (name, config) in catalog.video.renditions.iter() {
if self.tracks.contains_key(name) {
continue;
}
let kind = video_kind(config, name)?;
let source = ExportSource::for_video(&self.broadcast, name, config, self.latency)?;
self.tracks.insert(
name.clone(),
Track {
source,
pending: None,
finished: false,
pid: next_pid,
kind,
},
);
next_pid += 1;
}
for (name, config) in catalog.audio.renditions.iter() {
if self.tracks.contains_key(name) {
continue;
}
let kind = audio_kind(config, name)?;
let source = ExportSource::for_audio(&self.broadcast, name, config, self.latency)?;
self.tracks.insert(
name.clone(),
Track {
source,
pending: None,
finished: false,
pid: next_pid,
kind,
},
);
next_pid += 1;
}
for (name, config) in scte35.renditions.iter() {
if self.tracks.contains_key(name) {
continue;
}
let kind = scte35_kind(config, name)?;
let source = ExportSource::for_scte35(&self.broadcast, name, config, self.latency)?;
self.tracks.insert(
name.clone(),
Track {
source,
pending: None,
finished: false,
pid: next_pid,
kind,
},
);
next_pid += 1;
}
self.tracks.retain(|name, _| active.contains_key(name));
Ok(())
}
fn header_ready(&self) -> bool {
self.tracks.values().all(|t| t.source.header_ready())
}
fn build_psi(&mut self) -> anyhow::Result<()> {
let mut tracks: Vec<&Track> = self.tracks.values().collect();
tracks.sort_by_key(|t| t.pid);
let has_scte = tracks.iter().any(|t| matches!(t.kind, Kind::Scte35));
let video = tracks.iter().find(|t| matches!(t.kind, Kind::Video(_)));
anyhow::ensure!(
!has_scte || video.is_some(),
"TS export of SCTE-35 requires a video track for the program clock"
);
let pcr_pid = video
.or_else(|| {
tracks
.iter()
.find(|t| matches!(t.kind, Kind::Aac { .. } | Kind::Mp2 { .. } | Kind::Ac3 | Kind::Eac3))
})
.map(|t| t.pid)
.context("TS export requires a video or audio track for the PCR")?;
let es_info = tracks
.iter()
.map(|t| {
Ok(EsInfo {
stream_type: match t.kind {
Kind::Video(stream_type) => stream_type,
Kind::Aac { .. } => StreamType::AdtsAac,
Kind::Mp2 { sample_rate } if sample_rate < 32000 => StreamType::Mpeg2HalvedSampleRateAudio,
Kind::Mp2 { .. } => StreamType::Mpeg1Audio,
Kind::Ac3 => StreamType::DolbyDigitalUpToSixChannelAudio,
Kind::Eac3 => StreamType::DolbyDigitalPlusUpTo16ChannelAudioForAtsc,
Kind::Scte35 => StreamType::Dts8ChannelLosslessAudio,
},
elementary_pid: Pid::new(t.pid)?,
descriptors: match t.kind {
Kind::Ac3 => vec![Descriptor {
tag: 0x05,
data: b"AC-3".to_vec(),
}],
Kind::Eac3 => vec![Descriptor {
tag: 0x05,
data: b"EAC3".to_vec(),
}],
_ => Vec::new(),
},
})
})
.collect::<anyhow::Result<Vec<_>>>()?;
let program_info = if tracks.iter().any(|t| matches!(t.kind, Kind::Scte35)) {
vec![Descriptor {
tag: 0x05,
data: b"CUEI".to_vec(),
}]
} else {
Vec::new()
};
let pat = Pat {
transport_stream_id: 1,
version_number: VersionNumber::default(),
table: vec![ProgramAssociation {
program_num: 1,
program_map_pid: Pid::new(PMT_PID)?,
}],
};
let pmt = Pmt {
program_num: 1,
pcr_pid: Some(Pid::new(pcr_pid)?),
version_number: VersionNumber::default(),
program_info,
es_info,
};
self.psi = Some(Psi { pat, pmt, pcr_pid });
Ok(())
}
fn write_psi(&mut self) -> anyhow::Result<Bytes> {
let psi = self.psi.as_ref().context("PSI not built")?;
let pat = TsPayload::Pat(psi.pat.clone());
let pmt = TsPayload::Pmt(psi.pmt.clone());
let mut out = Vec::with_capacity(2 * TsPacket::SIZE);
self.write_packet(&mut out, Pid::PAT, None, pat)?;
self.write_packet(&mut out, PMT_PID, None, pmt)?;
Ok(Bytes::from(out))
}
fn pick_next_track(&self) -> Option<String> {
self.tracks
.iter()
.filter_map(|(n, t)| t.pending.as_ref().map(|f| (n.clone(), f.timestamp)))
.min_by_key(|(_, ts)| *ts)
.map(|(n, _)| n)
}
fn write_frame(&mut self, name: &str, frame: Frame) -> anyhow::Result<Bytes> {
let track = self.tracks.get(name).context("missing track")?;
let pid = track.pid;
let kind = track.kind.clone();
let is_pcr = self.psi.as_ref().is_some_and(|p| p.pcr_pid == pid);
let is_video = matches!(kind, Kind::Video(_));
let es_payload = match &kind {
Kind::Video(stream_type) => Some(video_es_payload(*stream_type, track.source.description(), &frame)?),
Kind::Aac {
object_type,
sample_rate,
channel_count,
} => {
let header = adts::write_header(*object_type, *sample_rate, *channel_count, frame.payload.len())?;
let mut framed = Vec::with_capacity(7 + frame.payload.len());
framed.extend_from_slice(&header);
framed.extend_from_slice(&frame.payload);
Some(framed)
}
Kind::Mp2 { .. } | Kind::Ac3 | Kind::Eac3 => Some(frame.payload.to_vec()),
Kind::Scte35 => None,
};
let mut out = Vec::with_capacity(TsPacket::SIZE);
let psi_due = match self.last_psi {
None => true,
Some(last) => frame.timestamp >= last && (frame.timestamp - last) >= psi_interval(),
};
if (is_video && frame.keyframe) || psi_due {
let psi = self.psi.as_ref().context("PSI not built")?;
let pat = TsPayload::Pat(psi.pat.clone());
let pmt = TsPayload::Pmt(psi.pmt.clone());
self.write_packet(&mut out, Pid::PAT, None, pat)?;
self.write_packet(&mut out, PMT_PID, None, pmt)?;
self.last_psi = Some(frame.timestamp);
}
match es_payload {
None => self.write_section(&mut out, pid, &frame.payload)?,
Some(es_payload) => {
let unit = PesUnit {
pid,
is_pcr,
is_video,
keyframe: frame.keyframe,
timestamp: frame.timestamp,
};
self.write_pes(&mut out, &unit, &es_payload)?;
}
}
Ok(Bytes::from(out))
}
fn write_pes(&mut self, out: &mut Vec<u8>, unit: &PesUnit, payload: &[u8]) -> anyhow::Result<()> {
let pts = to_ts_timestamp(unit.timestamp)?;
let stream_id = if unit.is_video {
StreamId::new(StreamId::VIDEO_MIN)
} else {
StreamId::new(StreamId::AUDIO_MIN)
};
let header = mpeg2ts::pes::PesHeader {
stream_id,
priority: false,
data_alignment_indicator: true,
copyright: false,
original_or_copy: false,
pts: Some(pts),
dts: None,
escr: None,
};
let pes_packet_len = if unit.is_video {
0
} else {
u16::try_from(PES_OPTIONAL_LEN + payload.len()).unwrap_or(0)
};
let mut offset = 0;
let mut first = true;
loop {
let adaptation = if first && (unit.is_pcr || unit.keyframe) {
Some(AdaptationField {
discontinuity_indicator: false,
random_access_indicator: unit.keyframe,
es_priority_indicator: false,
pcr: if unit.is_pcr { Some(pts.into()) } else { None },
opcr: None,
splice_countdown: None,
transport_private_data: Vec::new(),
extension: None,
})
} else {
None
};
let header_len = if first { PES_HEADER_LEN } else { 0 };
let af_len = adaptation.as_ref().map(adaptation_size).unwrap_or(0);
let avail = TsBytes::MAX_SIZE - header_len - af_len;
let take = avail.min(payload.len() - offset);
let chunk = &payload[offset..offset + take];
let ts_payload = if first {
TsPayload::PesStart(Pes {
header: header.clone(),
pes_packet_len,
data: TsBytes::new(chunk).map_err(anyhow::Error::msg)?,
})
} else {
TsPayload::PesContinuation(TsBytes::new(chunk).map_err(anyhow::Error::msg)?)
};
self.write_packet(out, unit.pid, adaptation, ts_payload)?;
offset += take;
first = false;
if offset >= payload.len() {
break;
}
}
Ok(())
}
fn write_section(&mut self, out: &mut Vec<u8>, pid: u16, section: &[u8]) -> anyhow::Result<()> {
if !is_complete_scte35_section(section) {
tracing::warn!(pid, len = section.len(), "dropping malformed SCTE-35 section on export");
return Ok(());
}
let mut offset = 0;
let mut first = true;
loop {
let payload = if first {
let take = (TsBytes::MAX_SIZE - 1).min(section.len());
let chunk = §ion[..take];
offset = take;
TsPayload::Section(Section {
pointer_field: 0,
data: TsBytes::new(chunk).map_err(anyhow::Error::msg)?,
})
} else {
let take = TsBytes::MAX_SIZE.min(section.len() - offset);
let chunk = §ion[offset..offset + take];
offset += take;
TsPayload::Raw(TsBytes::new(chunk).map_err(anyhow::Error::msg)?)
};
self.write_packet(out, pid, None, payload)?;
first = false;
if offset >= section.len() {
break;
}
}
Ok(())
}
fn write_packet(
&mut self,
out: &mut Vec<u8>,
pid: u16,
adaptation_field: Option<AdaptationField>,
payload: TsPayload,
) -> anyhow::Result<()> {
let counter = self.counters.entry(pid).or_default();
let continuity_counter = *counter;
counter.increment();
let packet = TsPacket {
header: TsHeader {
transport_error_indicator: false,
transport_priority: false,
pid: Pid::new(pid)?,
transport_scrambling_control: TransportScramblingControl::NotScrambled,
continuity_counter,
},
adaptation_field,
payload: Some(payload),
};
let mut writer = TsPacketWriter::new(out);
writer.write_ts_packet(&packet).map_err(anyhow::Error::msg)?;
Ok(())
}
}
const PES_OPTIONAL_LEN: usize = 3 + 5;
const PES_HEADER_LEN: usize = 6 + PES_OPTIONAL_LEN;
fn psi_interval() -> crate::container::Timestamp {
crate::container::Timestamp::try_from(PSI_INTERVAL).unwrap_or(crate::container::Timestamp::ZERO)
}
fn adaptation_size(af: &AdaptationField) -> usize {
2 + if af.pcr.is_some() { 6 } else { 0 }
}
fn to_ts_timestamp(timestamp: crate::container::Timestamp) -> anyhow::Result<TsTimestamp> {
let micros = timestamp.as_micros();
let ticks = (micros * 90_000 / 1_000_000) as u64 & ((1 << 33) - 1);
TsTimestamp::new(ticks).map_err(anyhow::Error::msg)
}
fn video_kind(config: &VideoConfig, name: &str) -> anyhow::Result<Kind> {
ensure_raw(&config.container, "video", name)?;
match &config.codec {
VideoCodec::H264(_) => Ok(Kind::Video(StreamType::H264)),
VideoCodec::H265(_) => Ok(Kind::Video(StreamType::H265)),
other => anyhow::bail!("TS export does not support video codec {other:?} (track '{name}')"),
}
}
fn video_es_payload(stream_type: StreamType, description: Option<&Bytes>, frame: &Frame) -> anyhow::Result<Vec<u8>> {
let description = description.context("video codec config (avcC/hvcC) not resolved")?;
let (length_size, params) = match stream_type {
StreamType::H264 => crate::codec::h264::avcc_params(description)?,
StreamType::H265 => crate::codec::h265::hvcc_params(description)?,
other => anyhow::bail!("unsupported TS video stream type {other:?}"),
};
let mut out = Vec::with_capacity(frame.payload.len() + 64);
if frame.keyframe {
for nal in ¶ms {
out.extend_from_slice(&annexb::START_CODE);
out.extend_from_slice(nal);
}
}
annexb::length_prefixed_to_annexb(&frame.payload, length_size, &mut out)?;
Ok(out)
}
fn audio_kind(config: &AudioConfig, name: &str) -> anyhow::Result<Kind> {
ensure_raw(&config.container, "audio", name)?;
match &config.codec {
AudioCodec::AAC(aac) => Ok(Kind::Aac {
object_type: aac.profile,
sample_rate: config.sample_rate,
channel_count: config.channel_count,
}),
AudioCodec::Mp2 => Ok(Kind::Mp2 {
sample_rate: config.sample_rate,
}),
AudioCodec::Ac3 => Ok(Kind::Ac3),
AudioCodec::Ec3 => Ok(Kind::Eac3),
other => anyhow::bail!("TS export does not support audio codec {other:?} (track '{name}')"),
}
}
fn scte35_kind(config: &scte35::Config, name: &str) -> anyhow::Result<Kind> {
ensure_raw(&config.container, "scte35", name)?;
Ok(Kind::Scte35)
}
fn is_complete_scte35_section(section: &[u8]) -> bool {
section.len() >= 3
&& section[0] == 0xfc
&& section.len() == 3 + ((((section[1] & 0x0f) as usize) << 8) | section[2] as usize)
}
fn ensure_raw(container: &Container, kind: &str, name: &str) -> anyhow::Result<()> {
match container {
Container::Legacy | Container::Loc => Ok(()),
Container::Cmaf { .. } => anyhow::bail!("TS export does not support CMAF {kind} track '{name}'"),
}
}
#[cfg(test)]
mod tests {
use super::is_complete_scte35_section;
#[test]
fn scte35_section_validation() {
let mut ok = vec![0xfc, 0x30, 0x1b];
ok.resize(30, 0x00);
assert!(is_complete_scte35_section(&ok));
assert!(is_complete_scte35_section(&[0xfc, 0x00, 0x00]));
assert!(!is_complete_scte35_section(&[0xfc, 0x00]));
assert!(!is_complete_scte35_section(&[0x00, 0x00, 0x00]));
assert!(!is_complete_scte35_section(&[0xfc, 0x30, 0x1b]));
}
}