use std::task::{Poll, ready};
use std::time::Duration;
use bytes::Bytes;
use hang::catalog::{AudioConfig, VideoCodec, VideoConfig};
use crate::catalog::CatalogFormat;
use crate::catalog::hang::Container as HangContainer;
use crate::catalog::hang::{Catalog, CatalogExt};
use crate::codec::h264::Avc1;
use crate::codec::h265::Hvc1;
use crate::container::{Consumer, Frame};
/// Catalog track consumer for one of the supported catalog formats.
///
/// `E` is the catalog extension type used by the hang consumer and by the
/// `Catalog<E>` values this source yields; it defaults to `()`.
pub(crate) enum CatalogSource<E: CatalogExt = ()> {
/// Consumer reading a hang-format catalog track.
Hang(crate::catalog::hang::Consumer<E>),
/// Consumer reading an MSF-format catalog track.
Msf(crate::catalog::msf::Consumer),
}
impl<E: CatalogExt> CatalogSource<E> {
    /// Subscribes to the catalog track that corresponds to `format` on
    /// `broadcast` and wraps the resulting consumer in the matching variant.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `subscribe_track` when the subscription
    /// cannot be created.
    pub(crate) fn new(broadcast: &moq_net::BroadcastConsumer, format: CatalogFormat) -> Result<Self, crate::Error> {
        let source = match format {
            CatalogFormat::Hang => {
                let subscription = broadcast.subscribe_track(&hang::Catalog::default_track())?;
                Self::Hang(crate::catalog::hang::Consumer::new(subscription))
            }
            CatalogFormat::Msf => {
                let subscription = broadcast.subscribe_track(&moq_net::Track::new(moq_msf::DEFAULT_NAME))?;
                Self::Msf(crate::catalog::msf::Consumer::new(subscription))
            }
        };
        Ok(source)
    }

    /// Polls the underlying consumer for the next catalog update.
    ///
    /// Yields `Poll::Pending` while the consumer has nothing ready, forwards
    /// any consumer error, and passes an `Ok(None)` result through unchanged.
    /// MSF updates are repackaged as a [`Catalog`] carrying the MSF `video`
    /// and `audio` sections plus a default-constructed extension value.
    pub(crate) fn poll_next(&mut self, waiter: &kio::Waiter) -> Poll<anyhow::Result<Option<Catalog<E>>>> {
        let update = match self {
            // Hang catalogs already have the target shape.
            Self::Hang(consumer) => ready!(consumer.poll_next(waiter))?,
            // MSF has no extension payload, so `ext` falls back to its default.
            Self::Msf(consumer) => ready!(consumer.poll_next(waiter))?.map(|media| Catalog {
                video: media.video,
                audio: media.audio,
                ext: E::default(),
            }),
        };
        Poll::Ready(Ok(update))
    }
}
/// Per-codec payload transform applied to video frames as they are read.
///
/// `build_video_transform` picks the variant from the catalog's video codec.
pub(crate) enum VideoTransform {
/// Transform used for H.264 tracks, backed by [`Avc1`].
Avc1(Avc1),
/// Transform used for H.265 tracks, backed by [`Hvc1`].
Hvc1(Hvc1),
}
impl VideoTransform {
    /// Returns whatever the wrapped transform currently reports through its
    /// `avcc()` / `hvcc()` accessor, or `None` when it reports nothing.
    fn codec_private(&self) -> Option<&Bytes> {
        match self {
            Self::Avc1(avc) => avc.avcc(),
            Self::Hvc1(hevc) => hevc.hvcc(),
        }
    }

    /// Hands `payload` to the wrapped codec transform and returns its result.
    ///
    /// `Ok(None)` means the transform produced no payload for this input;
    /// `ExportSource::poll_read` skips such frames.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the wrapped transform.
    fn transform(&mut self, payload: Bytes) -> anyhow::Result<Option<Bytes>> {
        match self {
            Self::Avc1(avc) => avc.transform(payload),
            Self::Hvc1(hevc) => hevc.transform(payload),
        }
    }
}
/// A subscribed media track paired with an optional video payload transform
/// and the most recently known codec description.
pub(crate) struct ExportSource {
/// Frame reader for the subscribed track.
consumer: Consumer<HangContainer>,
/// Payload transform applied in `poll_read`; only `for_video` may set it,
/// and only when `build_video_transform` returns one.
transform: Option<VideoTransform>,
/// Codec description: seeded from a non-empty catalog description, otherwise
/// filled in from the transform's codec-private data as frames are read.
description: Option<Bytes>,
}
impl ExportSource {
pub fn for_video(
broadcast: &moq_net::BroadcastConsumer,
name: &str,
config: &VideoConfig,
latency: Duration,
) -> Result<Self, crate::Error> {
let media: HangContainer = (&config.container).try_into()?;
let track = broadcast.subscribe_track(&moq_net::Track::new(name.to_string()))?;
let consumer = Consumer::new(track, media).with_latency(latency);
let transform = build_video_transform(config);
let description = config.description.as_ref().filter(|b| !b.is_empty()).cloned();
Ok(Self {
consumer,
transform,
description,
})
}
pub fn for_audio(
broadcast: &moq_net::BroadcastConsumer,
name: &str,
config: &AudioConfig,
latency: Duration,
) -> Result<Self, crate::Error> {
let media: HangContainer = (&config.container).try_into()?;
let track = broadcast.subscribe_track(&moq_net::Track::new(name.to_string()))?;
let consumer = Consumer::new(track, media).with_latency(latency);
let description = config.description.as_ref().filter(|b| !b.is_empty()).cloned();
Ok(Self {
consumer,
transform: None,
description,
})
}
pub fn for_stream(
broadcast: &moq_net::BroadcastConsumer,
name: &str,
latency: Duration,
) -> Result<Self, crate::Error> {
let track = broadcast.subscribe_track(&moq_net::Track::new(name.to_string()))?;
let consumer = Consumer::new(track, HangContainer::Legacy).with_latency(latency);
Ok(Self {
consumer,
transform: None,
description: None,
})
}
pub fn description(&self) -> Option<&Bytes> {
self.description.as_ref()
}
pub fn header_ready(&self) -> bool {
self.transform.is_none() || self.description.is_some()
}
pub fn poll_read(&mut self, waiter: &kio::Waiter) -> Poll<anyhow::Result<Option<Frame>>> {
loop {
let frame = match self.consumer.poll_read(waiter) {
Poll::Ready(Ok(Some(f))) => f,
Poll::Ready(Ok(None)) => return Poll::Ready(Ok(None)),
Poll::Ready(Err(e)) => return Poll::Ready(Err(e.into())),
Poll::Pending => return Poll::Pending,
};
let Some(transform) = self.transform.as_mut() else {
return Poll::Ready(Ok(Some(frame)));
};
match transform.transform(frame.payload.clone())? {
None => {
self.refresh_description();
continue;
}
Some(payload) => {
self.refresh_description();
return Poll::Ready(Ok(Some(Frame { payload, ..frame })));
}
}
}
}
fn refresh_description(&mut self) {
if let Some(transform) = self.transform.as_ref()
&& let Some(d) = transform.codec_private()
&& self.description.as_ref() != Some(d)
{
self.description = Some(d.clone());
}
}
}
/// Chooses the payload transform for a video track, if one is required.
///
/// No transform is used when the catalog already carries a non-empty
/// description. Otherwise H.264 maps to [`VideoTransform::Avc1`], H.265 maps
/// to [`VideoTransform::Hvc1`], and every other codec gets no transform.
fn build_video_transform(config: &VideoConfig) -> Option<VideoTransform> {
    // A usable description from the catalog makes the transform unnecessary.
    if config.description.as_ref().is_some_and(|d| !d.is_empty()) {
        return None;
    }

    match &config.codec {
        VideoCodec::H264(_) => Some(VideoTransform::Avc1(Avc1::new())),
        VideoCodec::H265(_) => Some(VideoTransform::Hvc1(Hvc1::new())),
        _ => None,
    }
}