use std::collections::HashMap;
use std::task::Poll;
use std::time::Duration;
use anyhow::Context;
use bytes::Bytes;
use hang::catalog::{Catalog, Container, VideoConfig};
use mp4_atom::{DecodeMaybe, Encode};
use crate::catalog::CatalogFormat;
use crate::container::Frame;
use crate::container::{CatalogSource, ExportSource};
pub struct Export {
broadcast: moq_net::BroadcastConsumer,
catalog: Option<CatalogSource>,
latency: Duration,
fragment_duration: Option<Duration>,
tracks: HashMap<String, Fmp4Track>,
catalog_snapshot: Option<Catalog>,
init_emitted: bool,
}
struct Fmp4Track {
source: ExportSource,
pending: Option<Frame>,
buffer: Vec<Frame>,
is_video: bool,
finished: bool,
track_id: u32,
timescale: u64,
sequence_number: u32,
}
impl Export {
pub fn new(broadcast: moq_net::BroadcastConsumer) -> Result<Self, crate::Error> {
Self::with_catalog_format(broadcast, CatalogFormat::default())
}
pub fn with_catalog_format(
broadcast: moq_net::BroadcastConsumer,
catalog_format: CatalogFormat,
) -> Result<Self, crate::Error> {
let catalog = CatalogSource::new(&broadcast, catalog_format)?;
Ok(Self {
broadcast,
catalog: Some(catalog),
latency: Duration::ZERO,
fragment_duration: None,
tracks: HashMap::new(),
catalog_snapshot: None,
init_emitted: false,
})
}
pub fn with_latency(mut self, latency: Duration) -> Self {
self.latency = latency;
self
}
pub fn with_fragment_duration(mut self, duration: impl Into<Option<Duration>>) -> Self {
self.fragment_duration = duration.into();
self
}
pub async fn next(&mut self) -> anyhow::Result<Option<Bytes>> {
kio::wait(|waiter| self.poll_next(waiter)).await
}
pub fn poll_next(&mut self, waiter: &kio::Waiter) -> Poll<anyhow::Result<Option<Bytes>>> {
while let Some(catalog) = self.catalog.as_mut() {
match catalog.poll_next(waiter)? {
Poll::Ready(Some(snapshot)) => self.update_catalog(&snapshot.media())?,
Poll::Ready(None) => {
self.catalog = None;
break;
}
Poll::Pending => break,
}
}
let waiting_for_init = !self.init_emitted;
for track in self.tracks.values_mut() {
if track.pending.is_some() || track.finished {
continue;
}
loop {
match track.source.poll_read(waiter) {
Poll::Ready(Ok(Some(frame))) => {
if waiting_for_init && !track.source.header_ready() {
continue;
}
track.pending = Some(frame);
break;
}
Poll::Ready(Ok(None)) => {
track.finished = true;
break;
}
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
Poll::Pending => break,
}
}
}
if !self.init_emitted {
if self.init_ready() {
let init = self.build_init()?;
self.init_emitted = true;
return Poll::Ready(Ok(Some(init)));
}
if self.catalog.is_none() && self.tracks.values().all(|t| t.finished) {
return Poll::Ready(Ok(None));
}
return Poll::Pending;
}
let chosen = self
.tracks
.iter()
.filter_map(|(name, t)| t.pending.as_ref().map(|f| (name.clone(), f.timestamp)))
.min_by_key(|(_, ts)| *ts)
.map(|(name, _)| name);
if let Some(name) = chosen {
let frag = self.fragment_duration;
let has_video_track = self.tracks.values().any(|t| t.is_video);
let track = self.tracks.get_mut(&name).unwrap();
let frame = track.pending.take().unwrap();
let flush_before = should_flush(track, &frame, frag, has_video_track);
if flush_before {
let frames = std::mem::take(&mut track.buffer);
let emit = encode_fragment(track, frames)?;
track.buffer.push(frame);
return Poll::Ready(Ok(Some(emit)));
}
track.buffer.push(frame);
return self.poll_next(waiter);
}
let flushable = self
.tracks
.iter()
.filter_map(|(name, t)| {
if t.finished && !t.buffer.is_empty() {
Some((name.clone(), t.buffer.first().unwrap().timestamp))
} else {
None
}
})
.min_by_key(|(_, ts)| *ts)
.map(|(name, _)| name);
if let Some(name) = flushable {
let track = self.tracks.get_mut(&name).unwrap();
let frames = std::mem::take(&mut track.buffer);
let emit = encode_fragment(track, frames)?;
return Poll::Ready(Ok(Some(emit)));
}
if self.catalog.is_none() && self.tracks.values().all(|t| t.finished && t.buffer.is_empty()) {
return Poll::Ready(Ok(None));
}
self.tracks
.retain(|_, t| !(t.finished && t.pending.is_none() && t.buffer.is_empty()));
Poll::Pending
}
fn update_catalog(&mut self, catalog: &Catalog) -> anyhow::Result<()> {
let mut active: HashMap<String, ()> = HashMap::new();
for name in catalog.video.renditions.keys() {
active.insert(name.clone(), ());
}
for name in catalog.audio.renditions.keys() {
active.insert(name.clone(), ());
}
let mut next_track_id = self.tracks.values().map(|t| t.track_id).max().unwrap_or(0) + 1;
for (name, config) in &catalog.video.renditions {
if self.tracks.contains_key(name) {
continue;
}
let source = ExportSource::for_video(&self.broadcast, name, config, self.latency)?;
let timescale = catalog_timescale_video(config);
self.tracks.insert(
name.clone(),
Fmp4Track {
source,
pending: None,
buffer: Vec::new(),
is_video: true,
finished: false,
track_id: next_track_id,
timescale,
sequence_number: 1,
},
);
next_track_id += 1;
}
for (name, config) in &catalog.audio.renditions {
if self.tracks.contains_key(name) {
continue;
}
let source = ExportSource::for_audio(&self.broadcast, name, config, self.latency)?;
let timescale = catalog_timescale_audio(config);
self.tracks.insert(
name.clone(),
Fmp4Track {
source,
pending: None,
buffer: Vec::new(),
is_video: false,
finished: false,
track_id: next_track_id,
timescale,
sequence_number: 1,
},
);
next_track_id += 1;
}
self.tracks.retain(|name, _| active.contains_key(name));
self.catalog_snapshot = Some(catalog.clone());
Ok(())
}
fn init_ready(&self) -> bool {
self.catalog_snapshot.is_some() && self.tracks.values().all(|t| t.source.header_ready())
}
fn build_init(&self) -> anyhow::Result<Bytes> {
let catalog = self.catalog_snapshot.as_ref().context("no catalog snapshot")?;
let mut traks: Vec<mp4_atom::Trak> = Vec::new();
let mut trexs: Vec<mp4_atom::Trex> = Vec::new();
let mut ftyp_data: Option<mp4_atom::Ftyp> = None;
for (name, config) in &catalog.video.renditions {
let track = self.tracks.get(name).context("video track not subscribed")?;
match &config.container {
Container::Cmaf { init, .. } => {
extract_init(init, &mut ftyp_data, &mut traks, &mut trexs)?;
}
Container::Legacy | Container::Loc => {
let description = track.source.description();
let trak = crate::container::fmp4::synthesize_video_trak(
track.track_id,
track.timescale,
config,
description.map(|d| d.as_ref()),
)?;
trexs.push(mp4_atom::Trex {
track_id: trak.tkhd.track_id,
default_sample_description_index: 1,
..Default::default()
});
traks.push(trak);
}
}
}
for (name, config) in &catalog.audio.renditions {
let track = self.tracks.get(name).context("audio track not subscribed")?;
match &config.container {
Container::Cmaf { init, .. } => {
extract_init(init, &mut ftyp_data, &mut traks, &mut trexs)?;
}
Container::Legacy | Container::Loc => {
let trak = crate::container::fmp4::synthesize_audio_trak(track.track_id, track.timescale, config)?;
trexs.push(mp4_atom::Trex {
track_id: trak.tkhd.track_id,
default_sample_description_index: 1,
..Default::default()
});
traks.push(trak);
}
}
}
let ftyp = ftyp_data.unwrap_or(mp4_atom::Ftyp {
major_brand: b"isom".into(),
minor_version: 0x200,
compatible_brands: vec![b"isom".into(), b"iso6".into(), b"mp41".into()],
});
let timescale = traks.first().map(|t| t.mdia.mdhd.timescale).unwrap_or(1000);
let moov = mp4_atom::Moov {
mvhd: mp4_atom::Mvhd {
timescale,
..Default::default()
},
trak: traks,
mvex: if trexs.is_empty() {
None
} else {
Some(mp4_atom::Mvex {
trex: trexs,
..Default::default()
})
},
..Default::default()
};
let mut buf = Vec::new();
ftyp.encode(&mut buf)?;
moov.encode(&mut buf)?;
Ok(Bytes::from(buf))
}
}
fn extract_init(
init: &Bytes,
ftyp_data: &mut Option<mp4_atom::Ftyp>,
traks: &mut Vec<mp4_atom::Trak>,
trexs: &mut Vec<mp4_atom::Trex>,
) -> anyhow::Result<()> {
let mut cursor = std::io::Cursor::new(init.as_ref());
while let Some(atom) = mp4_atom::Any::decode_maybe(&mut cursor)? {
match atom {
mp4_atom::Any::Ftyp(f) if ftyp_data.is_none() => {
*ftyp_data = Some(f);
}
mp4_atom::Any::Moov(moov) => {
for trak in moov.trak {
traks.push(trak);
}
if let Some(mvex) = moov.mvex {
for trex in mvex.trex {
trexs.push(trex);
}
}
}
_ => {}
}
}
Ok(())
}
fn should_flush(track: &Fmp4Track, frame: &Frame, fragment_duration: Option<Duration>, has_video_track: bool) -> bool {
if track.buffer.is_empty() {
return false;
}
if track.is_video && frame.keyframe {
return true;
}
match fragment_duration {
Some(d) if d.is_zero() => true,
Some(d) => {
let first = track.buffer.first().unwrap();
let delta_us = frame.timestamp.as_micros().saturating_sub(first.timestamp.as_micros());
delta_us >= d.as_micros()
}
None => !track.is_video && !has_video_track,
}
}
fn encode_fragment(track: &mut Fmp4Track, frames: Vec<Frame>) -> anyhow::Result<Bytes> {
anyhow::ensure!(!frames.is_empty(), "encode_fragment called with no frames");
let seq = track.sequence_number;
track.sequence_number += 1;
Ok(crate::container::fmp4::encode_fragment(
track.track_id,
track.timescale,
seq,
&frames,
)?)
}
fn catalog_timescale_video(config: &VideoConfig) -> u64 {
match &config.container {
Container::Cmaf { init, .. } => {
parse_timescale_from_init(init).unwrap_or_else(|_| crate::container::fmp4::default_video_timescale(config))
}
Container::Loc | Container::Legacy => crate::container::fmp4::default_video_timescale(config),
}
}
fn catalog_timescale_audio(config: &hang::catalog::AudioConfig) -> u64 {
match &config.container {
Container::Cmaf { init, .. } => parse_timescale_from_init(init).unwrap_or(config.sample_rate as u64),
Container::Loc | Container::Legacy => config.sample_rate as u64,
}
}
fn parse_timescale_from_init(init: &[u8]) -> anyhow::Result<u64> {
let mut cursor = std::io::Cursor::new(init);
while let Some(atom) = mp4_atom::Any::decode_maybe(&mut cursor)? {
if let mp4_atom::Any::Moov(moov) = atom {
let trak = moov.trak.first().context("no tracks in moov")?;
return Ok(trak.mdia.mdhd.timescale as u64);
}
}
anyhow::bail!("no moov in init data")
}