use std::collections::HashMap;
use std::io::Cursor;
use std::task::Poll;
use std::time::Duration;
use anyhow::Context;
use bytes::{BufMut, Bytes, BytesMut};
use hang::catalog::{AudioCodec, AudioConfig, Catalog, Container, VideoCodec, VideoConfig};
use webm_iterable::matroska_spec::{Master, MatroskaSpec};
use webm_iterable::{WebmWriter, WriteOptions};
use crate::catalog::CatalogFormat;
use crate::container::Frame;
use crate::container::{CatalogSource, ExportSource};
const TIMESTAMP_SCALE_NS: u64 = 1_000_000;
pub struct Export {
broadcast: moq_net::BroadcastConsumer,
catalog: Option<CatalogSource>,
latency: Duration,
fragment_duration: Option<Duration>,
tracks: HashMap<String, MkvTrack>,
catalog_snapshot: Option<Catalog>,
header_emitted: bool,
cluster: Option<ClusterBuilder>,
}
struct MkvTrack {
source: ExportSource,
pending: Option<Frame>,
finished: bool,
track_number: u64,
kind: TrackKind,
}
#[derive(Clone, Copy, PartialEq, Eq)]
enum TrackKind {
Video,
Audio,
}
struct ClusterBuilder {
start_ticks: u64,
body: BytesMut,
max_ticks: u64,
has_video: bool,
}
impl ClusterBuilder {
fn new(start_ticks: u64) -> Self {
let mut body = BytesMut::with_capacity(64 * 1024);
write_tag_id(&mut body, ID_TIMESTAMP as u32);
let ts_bytes = encode_uint(start_ticks);
write_vint(&mut body, ts_bytes.len() as u64);
body.extend_from_slice(&ts_bytes);
Self {
start_ticks,
body,
max_ticks: start_ticks,
has_video: false,
}
}
fn append(
&mut self,
track_number: u64,
frame_ticks: u64,
keyframe: bool,
payload: &[u8],
is_video: bool,
) -> anyhow::Result<()> {
let rel = (frame_ticks as i64)
.checked_sub(self.start_ticks as i64)
.context("cluster underflow")?;
let rel: i16 = rel.try_into().context("block timestamp doesn't fit in i16")?;
let sb_body = encode_simple_block_body(track_number, rel, keyframe, payload);
write_tag_id(&mut self.body, ID_SIMPLEBLOCK as u32);
write_vint(&mut self.body, sb_body.len() as u64);
self.body.extend_from_slice(&sb_body);
if frame_ticks > self.max_ticks {
self.max_ticks = frame_ticks;
}
if is_video {
self.has_video = true;
}
Ok(())
}
fn fits(&self, frame_ticks: u64) -> bool {
match (frame_ticks as i64).checked_sub(self.start_ticks as i64) {
Some(rel) => i16::try_from(rel).is_ok(),
None => false,
}
}
fn finish(self) -> Bytes {
let mut out = BytesMut::with_capacity(self.body.len() + 16);
write_tag_id(&mut out, ID_CLUSTER);
write_vint(&mut out, self.body.len() as u64);
out.extend_from_slice(&self.body);
out.freeze()
}
}
impl Export {
pub fn new(broadcast: moq_net::BroadcastConsumer) -> Result<Self, crate::Error> {
Self::with_catalog_format(broadcast, CatalogFormat::default())
}
pub fn with_catalog_format(
broadcast: moq_net::BroadcastConsumer,
catalog_format: CatalogFormat,
) -> Result<Self, crate::Error> {
let catalog = CatalogSource::new(&broadcast, catalog_format)?;
Ok(Self {
broadcast,
catalog: Some(catalog),
latency: Duration::ZERO,
fragment_duration: None,
tracks: HashMap::new(),
catalog_snapshot: None,
header_emitted: false,
cluster: None,
})
}
pub fn with_latency(mut self, latency: Duration) -> Self {
self.latency = latency;
self
}
pub fn with_fragment_duration(mut self, duration: impl Into<Option<Duration>>) -> Self {
self.fragment_duration = duration.into();
self
}
pub async fn next(&mut self) -> anyhow::Result<Option<Bytes>> {
kio::wait(|waiter| self.poll_next(waiter)).await
}
pub fn poll_next(&mut self, waiter: &kio::Waiter) -> Poll<anyhow::Result<Option<Bytes>>> {
while let Some(catalog) = self.catalog.as_mut() {
match catalog.poll_next(waiter)? {
Poll::Ready(Some(snapshot)) => self.update_catalog(snapshot)?,
Poll::Ready(None) => {
self.catalog = None;
break;
}
Poll::Pending => break,
}
}
let waiting_for_header = !self.header_emitted;
for track in self.tracks.values_mut() {
if track.pending.is_some() || track.finished {
continue;
}
loop {
match track.source.poll_read(waiter) {
Poll::Ready(Ok(Some(frame))) => {
if waiting_for_header && !track.source.header_ready() {
continue;
}
track.pending = Some(frame);
break;
}
Poll::Ready(Ok(None)) => {
track.finished = true;
break;
}
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
Poll::Pending => break,
}
}
}
if !self.header_emitted {
if self.header_ready() {
let header = self.build_header()?;
self.header_emitted = true;
return Poll::Ready(Ok(Some(header)));
}
if self.catalog.is_none() && self.tracks.values().all(|t| t.finished) {
return Poll::Ready(Ok(None));
}
return Poll::Pending;
}
if let Some(name) = self.pick_next_track() {
let frame = self.tracks.get_mut(&name).unwrap().pending.take().unwrap();
if let Some(chunk) = self.feed_frame(&name, frame)? {
return Poll::Ready(Ok(Some(chunk)));
}
return self.poll_next(waiter);
}
if !self.tracks.is_empty() && self.tracks.values().all(|t| t.finished) {
if let Some(cluster) = self.cluster.take() {
return Poll::Ready(Ok(Some(cluster.finish())));
}
if self.catalog.is_none() {
return Poll::Ready(Ok(None));
}
} else if self.catalog.is_none() && self.tracks.is_empty() {
return Poll::Ready(Ok(None));
}
self.tracks.retain(|_, t| !(t.finished && t.pending.is_none()));
Poll::Pending
}
fn update_catalog(&mut self, catalog: Catalog) -> anyhow::Result<()> {
let mut active: HashMap<String, ()> = HashMap::new();
for name in catalog.video.renditions.keys() {
active.insert(name.clone(), ());
}
for name in catalog.audio.renditions.keys() {
active.insert(name.clone(), ());
}
if self.header_emitted {
for name in active.keys() {
if !self.tracks.contains_key(name) {
anyhow::bail!("MKV track layout changed after header was emitted: track '{name}' added");
}
}
for name in self.tracks.keys() {
if !active.contains_key(name) {
anyhow::bail!("MKV track layout changed after header was emitted: track '{name}' removed");
}
}
self.catalog_snapshot = Some(catalog);
return Ok(());
}
let mut next_track_number: u64 = self.tracks.values().map(|t| t.track_number).max().unwrap_or(0) + 1;
for (name, config) in catalog.video.renditions.iter() {
if self.tracks.contains_key(name) {
continue;
}
ensure_legacy(&config.container, "video", name)?;
let source = ExportSource::for_video(&self.broadcast, name, config, self.latency)?;
self.tracks.insert(
name.clone(),
MkvTrack {
source,
pending: None,
finished: false,
track_number: next_track_number,
kind: TrackKind::Video,
},
);
next_track_number += 1;
}
for (name, config) in catalog.audio.renditions.iter() {
if self.tracks.contains_key(name) {
continue;
}
ensure_legacy(&config.container, "audio", name)?;
let source = ExportSource::for_audio(&self.broadcast, name, config, self.latency)?;
self.tracks.insert(
name.clone(),
MkvTrack {
source,
pending: None,
finished: false,
track_number: next_track_number,
kind: TrackKind::Audio,
},
);
next_track_number += 1;
}
self.tracks.retain(|name, _| active.contains_key(name));
self.catalog_snapshot = Some(catalog);
Ok(())
}
fn header_ready(&self) -> bool {
self.tracks.values().all(|t| t.source.header_ready())
}
fn build_header(&self) -> anyhow::Result<Bytes> {
let catalog = self.catalog_snapshot.as_ref().context("no catalog snapshot")?;
let webm_only = catalog
.video
.renditions
.values()
.all(|c| matches!(c.codec, VideoCodec::VP8 | VideoCodec::VP9(_) | VideoCodec::AV1(_)))
&& catalog
.audio
.renditions
.values()
.all(|c| matches!(c.codec, AudioCodec::Opus));
let doc_type = if webm_only { "webm" } else { "matroska" };
let mut entries: Vec<MatroskaSpec> = Vec::new();
for (name, config) in catalog.video.renditions.iter() {
let track = self.tracks.get(name).context("video track not subscribed")?;
entries.push(build_video_track_entry(
track.track_number,
config,
track.source.description(),
)?);
}
for (name, config) in catalog.audio.renditions.iter() {
let track = self.tracks.get(name).context("audio track not subscribed")?;
entries.push(build_audio_track_entry(track.track_number, config)?);
}
let mut dest = Cursor::new(Vec::new());
{
let mut writer = WebmWriter::new(&mut dest);
writer.write(&MatroskaSpec::Ebml(Master::Full(vec![
MatroskaSpec::DocType(doc_type.to_string()),
MatroskaSpec::DocTypeVersion(4),
MatroskaSpec::DocTypeReadVersion(2),
])))?;
writer.write_advanced(
&MatroskaSpec::Segment(Master::Start),
WriteOptions::is_unknown_sized_element(),
)?;
writer.write(&MatroskaSpec::Info(Master::Full(vec![
MatroskaSpec::TimestampScale(TIMESTAMP_SCALE_NS),
MatroskaSpec::MuxingApp("moq-mux".to_string()),
MatroskaSpec::WritingApp("moq-mux".to_string()),
])))?;
writer.write(&MatroskaSpec::Tracks(Master::Full(entries)))?;
writer.flush()?;
}
Ok(Bytes::from(dest.into_inner()))
}
fn pick_next_track(&self) -> Option<String> {
self.tracks
.iter()
.filter_map(|(n, t)| t.pending.as_ref().map(|f| (n.clone(), f.timestamp)))
.min_by_key(|(_, ts)| *ts)
.map(|(n, _)| n)
}
fn feed_frame(&mut self, name: &str, frame: Frame) -> anyhow::Result<Option<Bytes>> {
let track = self.tracks.get(name).context("missing track")?;
let track_number = track.track_number;
let kind = track.kind;
let payload = &frame.payload;
let frame_ticks: u64 = (frame.timestamp.as_micros() / 1_000)
.try_into()
.context("timestamp doesn't fit in u64 ms")?;
let is_video = kind == TrackKind::Video;
let keyframe = frame.keyframe;
let roll_over = match &self.cluster {
None => true,
Some(cluster) => {
let overflow = !cluster.fits(frame_ticks);
let gop_boundary = is_video && keyframe && cluster.has_video;
let too_long = match self.fragment_duration {
Some(d) if d.is_zero() => !cluster.body.is_empty(),
Some(d) => frame_ticks.saturating_sub(cluster.start_ticks) >= d.as_millis() as u64,
None => false,
};
overflow || gop_boundary || too_long
}
};
let emit = if roll_over {
let finished = self.cluster.take().map(|c| c.finish());
self.cluster = Some(ClusterBuilder::new(frame_ticks));
finished
} else {
None
};
self.cluster
.as_mut()
.unwrap()
.append(track_number, frame_ticks, keyframe, payload, is_video)?;
Ok(emit)
}
}
fn ensure_legacy(container: &Container, kind: &str, name: &str) -> anyhow::Result<()> {
match container {
Container::Legacy | Container::Loc => Ok(()),
Container::Cmaf { .. } => {
anyhow::bail!("MKV export does not support CMAF {} track '{}'", kind, name);
}
}
}
fn build_video_track_entry(
track_number: u64,
config: &VideoConfig,
description: Option<&Bytes>,
) -> anyhow::Result<MatroskaSpec> {
let codec_private = description.map(|b| b.to_vec());
let (codec_id, codec_private) = match &config.codec {
VideoCodec::VP8 => ("V_VP8", None),
VideoCodec::VP9(_) => ("V_VP9", None),
VideoCodec::AV1(_) => ("V_AV1", codec_private),
VideoCodec::H264(_) => {
let avcc = codec_private.context("H.264 track missing AVCDecoderConfigurationRecord")?;
("V_MPEG4/ISO/AVC", Some(avcc))
}
VideoCodec::H265(_) => {
let hvcc = codec_private.context("H.265 track missing HEVCDecoderConfigurationRecord")?;
("V_MPEGH/ISO/HEVC", Some(hvcc))
}
other => anyhow::bail!("MKV export does not support video codec {:?}", other),
};
let mut video_children: Vec<MatroskaSpec> = Vec::new();
if let Some(w) = config.coded_width {
video_children.push(MatroskaSpec::PixelWidth(w as u64));
}
if let Some(h) = config.coded_height {
video_children.push(MatroskaSpec::PixelHeight(h as u64));
}
let mut entry: Vec<MatroskaSpec> = vec![
MatroskaSpec::TrackNumber(track_number),
MatroskaSpec::TrackUID(track_number),
MatroskaSpec::TrackType(1),
MatroskaSpec::CodecID(codec_id.to_string()),
];
if let Some(cp) = codec_private {
entry.push(MatroskaSpec::CodecPrivate(cp));
}
if !video_children.is_empty() {
entry.push(MatroskaSpec::Video(Master::Full(video_children)));
}
Ok(MatroskaSpec::TrackEntry(Master::Full(entry)))
}
fn build_audio_track_entry(track_number: u64, config: &AudioConfig) -> anyhow::Result<MatroskaSpec> {
let (codec_id, codec_private) = match &config.codec {
AudioCodec::Opus => (
"A_OPUS",
Some(
crate::codec::opus::Config {
sample_rate: config.sample_rate,
channel_count: config.channel_count,
}
.encode()
.to_vec(),
),
),
AudioCodec::AAC(_) => (
"A_AAC",
Some(
config
.description
.as_ref()
.context("AAC track missing AudioSpecificConfig (description)")?
.to_vec(),
),
),
other => anyhow::bail!("MKV export does not support audio codec {:?}", other),
};
let entry = vec![
MatroskaSpec::TrackNumber(track_number),
MatroskaSpec::TrackUID(track_number),
MatroskaSpec::TrackType(2),
MatroskaSpec::CodecID(codec_id.to_string()),
MatroskaSpec::CodecPrivate(codec_private.unwrap()),
MatroskaSpec::Audio(Master::Full(vec![
MatroskaSpec::SamplingFrequency(config.sample_rate as f64),
MatroskaSpec::Channels(config.channel_count as u64),
])),
];
Ok(MatroskaSpec::TrackEntry(Master::Full(entry)))
}
const ID_CLUSTER: u32 = 0x1F43B675;
const ID_TIMESTAMP: u16 = 0xE7;
const ID_SIMPLEBLOCK: u16 = 0xA3;
fn encode_simple_block_body(track_number: u64, rel_ts: i16, keyframe: bool, payload: &[u8]) -> Bytes {
let mut data = BytesMut::with_capacity(payload.len() + 11);
write_vint(&mut data, track_number);
data.put_i16(rel_ts);
let mut flags: u8 = 0;
if keyframe {
flags |= 0x80;
}
data.put_u8(flags);
data.extend_from_slice(payload);
data.freeze()
}
fn write_tag_id(buf: &mut BytesMut, id: u32) {
let bytes = id.to_be_bytes();
let start = bytes.iter().position(|&b| b != 0).unwrap_or(3);
buf.extend_from_slice(&bytes[start..]);
}
fn encode_uint(value: u64) -> Vec<u8> {
if value == 0 {
return vec![0];
}
let leading_zero_bytes = (value.leading_zeros() / 8) as usize;
let bytes = value.to_be_bytes();
bytes[leading_zero_bytes..].to_vec()
}
fn write_vint(buf: &mut BytesMut, value: u64) {
let mut width = 1;
while width < 8 && value >= (1u64 << (7 * width)) - 1 {
width += 1;
}
let marker = 1u8 << (8 - width);
let mut bytes = [0u8; 8];
for i in 0..width {
bytes[width - 1 - i] = (value >> (8 * i)) as u8;
}
bytes[0] |= marker;
buf.extend_from_slice(&bytes[..width]);
}