use std::task::Poll;
use std::time::Duration;
use anyhow::Context;
use bytes::{BufMut, Bytes, BytesMut};
use hang::catalog::{AudioCodec, Catalog, Container, VideoCodec};
use super::{
AAC_AUDIO_TAG_HEADER, AAC_RAW, AAC_SEQUENCE_HEADER, AVC_NALU, AVC_SEQUENCE_HEADER, FRAME_TYPE_INTER,
FRAME_TYPE_KEY, TAG_AUDIO, TAG_HEADER_LEN, TAG_VIDEO, VIDEO_CODEC_AVC,
};
use crate::catalog::CatalogFormat;
use crate::container::{CatalogSource, ExportSource, Frame};
/// Converts a broadcast into a stream of FLV byte chunks.
///
/// The first chunk produced is the FLV file header together with the codec
/// sequence headers; each later chunk is one encoded audio or video tag.
pub struct Export {
// Broadcast handle handed to the catalog source and to every track source.
broadcast: moq_net::BroadcastConsumer,
// Catalog reader; replaced with `None` once it reports end-of-stream.
catalog: Option<CatalogSource>,
// Latency value forwarded to each `ExportSource` when a track is opened.
latency: Duration,
// Selected video track, if the catalog has offered one.
video: Option<FlvTrack>,
// Selected audio track, if the catalog has offered one.
audio: Option<FlvTrack>,
// Set once the FLV header chunk has been returned to the caller.
header_emitted: bool,
}
/// Per-track state kept by [`Export`] for the single video or audio rendition it exports.
struct FlvTrack {
// Rendition name taken from the catalog; used to detect removal in later snapshots.
name: String,
// Frame reader for this rendition.
source: ExportSource,
// At most one frame read from `source` that has not been emitted yet.
pending: Option<Frame>,
// Set when `source` reports that no more frames will arrive.
finished: bool,
}
impl Export {
/// Creates an exporter for `broadcast` using the default [`CatalogFormat`].
///
/// # Errors
/// Propagates any error from [`Export::with_catalog_format`].
pub fn new(broadcast: moq_net::BroadcastConsumer) -> Result<Self, crate::Error> {
    let format = CatalogFormat::default();
    Self::with_catalog_format(broadcast, format)
}
/// Creates an exporter for `broadcast` whose catalog is read in `catalog_format`.
///
/// The exporter starts with zero latency, no selected tracks and no header
/// emitted; tracks are picked later from catalog snapshots.
///
/// # Errors
/// Fails if the catalog source cannot be created for `broadcast`.
pub fn with_catalog_format(
    broadcast: moq_net::BroadcastConsumer,
    catalog_format: CatalogFormat,
) -> Result<Self, crate::Error> {
    let source = CatalogSource::new(&broadcast, catalog_format)?;
    let exporter = Self {
        catalog: Some(source),
        broadcast,
        latency: Duration::ZERO,
        header_emitted: false,
        video: None,
        audio: None,
    };
    Ok(exporter)
}
/// Builder-style setter for the latency value that is passed to each track's
/// `ExportSource` when the track is opened.
pub fn with_latency(self, latency: Duration) -> Self {
    let mut exporter = self;
    exporter.latency = latency;
    exporter
}
/// Awaits the next FLV chunk by driving [`Export::poll_next`] through `kio::wait`.
///
/// Resolves to `Ok(None)` when `poll_next` reports the end of the stream.
pub async fn next(&mut self) -> anyhow::Result<Option<Bytes>> {
    let chunk = kio::wait(|w| self.poll_next(w));
    chunk.await
}
/// Polls for the next chunk of FLV output.
///
/// Each call (1) applies every catalog snapshot that is already available,
/// (2) tries to buffer one frame for each track that has none, then
/// (3) either emits the header (once, as soon as every track reports
/// `header_ready`) or emits the pending frame chosen by `pick_next_track`.
///
/// Returns `Ready(Ok(Some(bytes)))` for the header or a single tag,
/// `Ready(Ok(None))` when the catalog has ended and there is nothing left to
/// emit, and `Pending` otherwise.
///
/// # Errors
/// Propagates errors from the catalog source, `update_catalog`, the track
/// sources, `build_header` and `encode_frame`.
pub fn poll_next(&mut self, waiter: &kio::Waiter) -> Poll<anyhow::Result<Option<Bytes>>> {
// Drain catalog snapshots that are ready right now. Stop at the first
// `Pending`; drop the catalog source once it reports end-of-stream.
while let Some(catalog) = self.catalog.as_mut() {
match catalog.poll_next(waiter)? {
Poll::Ready(Some(snapshot)) => self.update_catalog(snapshot.media())?,
Poll::Ready(None) => {
self.catalog = None;
break;
}
Poll::Pending => break,
}
}
// Header state captured up front for use inside the per-track read loop.
let waiting_for_header = !self.header_emitted;
for track in [self.video.as_mut(), self.audio.as_mut()].into_iter().flatten() {
// A track buffers at most one frame; skip tracks that already hold one
// or that have ended.
if track.pending.is_some() || track.finished {
continue;
}
loop {
match track.source.poll_read(waiter) {
Poll::Ready(Ok(Some(frame))) => {
// Before the header is out, a frame read while this track's
// source still reports no header is discarded and the next one
// is read. NOTE(review): presumably the source extracts its
// codec description from such frames — confirm.
if waiting_for_header && !track.source.header_ready() {
continue;
}
track.pending = Some(frame);
break;
}
Poll::Ready(Ok(None)) => {
track.finished = true;
break;
}
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
Poll::Pending => break,
}
}
}
// Header phase: nothing but the header may be emitted until it is out.
if !self.header_emitted {
if self.header_ready() {
let header = self.build_header()?;
self.header_emitted = true;
return Poll::Ready(Ok(Some(header)));
}
// The catalog is gone and no track can still deliver a header: end.
if self.catalog.is_none() && (!self.has_tracks() || self.tracks().all(|t| t.finished)) {
return Poll::Ready(Ok(None));
}
// NOTE(review): this `Pending` relies on a poll earlier in this call
// having been made with `waiter`. If the catalog is gone, one track
// finished without `header_ready`, and the other already holds a
// pending frame (so it was skipped above), nothing was polled this
// round — verify this cannot stall.
return Poll::Pending;
}
// `pick_next_track` only names a track that exists and holds a pending
// frame, so both `unwrap` calls below are backed by that invariant.
if let Some(is_video) = self.pick_next_track() {
let track = if is_video {
self.video.as_mut()
} else {
self.audio.as_mut()
};
let track = track.unwrap();
let frame = track.pending.take().unwrap();
let chunk = self.encode_frame(is_video, frame)?;
return Poll::Ready(Ok(Some(chunk)));
}
// Nothing to emit. End the stream only when every track is done and the
// catalog has ended too; a live catalog keeps the export pending.
if self.has_tracks() && self.tracks().all(|t| t.finished && t.pending.is_none()) {
if self.catalog.is_none() {
return Poll::Ready(Ok(None));
}
// NOTE(review): the header is only emitted when `has_tracks()` is true and
// the visible code never clears a track, so this branch looks unreachable
// after the header phase — confirm before removing.
} else if self.catalog.is_none() && !self.has_tracks() {
return Poll::Ready(Ok(None));
}
Poll::Pending
}
/// Iterates over the selected tracks: video first (if any), then audio (if any).
fn tracks(&self) -> impl Iterator<Item = &FlvTrack> {
    self.video.iter().chain(self.audio.iter())
}
/// Reports whether at least one of the video/audio tracks has been selected.
fn has_tracks(&self) -> bool {
    !(self.video.is_none() && self.audio.is_none())
}
/// Applies one catalog snapshot to the exporter.
///
/// For video and then audio: if no track of that kind is selected yet, the
/// first rendition listed is validated (codec, then container) and opened as
/// the track for that kind. A warning is logged whenever a snapshot lists more
/// than one rendition of a kind. Finally, any already-selected track must
/// still be listed in the snapshot.
///
/// # Errors
/// Fails on an unsupported codec or container, when a track source cannot be
/// opened, or when a selected track is missing from the snapshot.
fn update_catalog(&mut self, catalog: Catalog) -> anyhow::Result<()> {
    let video_renditions = &catalog.video.renditions;
    let audio_renditions = &catalog.audio.renditions;

    // Video: adopt the first rendition only while no video track is selected.
    let video_candidate = video_renditions
        .iter()
        .next()
        .filter(|_| self.video.is_none());
    if let Some((name, config)) = video_candidate {
        ensure_supported_video(config)?;
        ensure_legacy(&config.container, "video", name)?;
        let source = ExportSource::for_video(&self.broadcast, name, config, self.latency)?;
        self.video = Some(FlvTrack {
            name: name.clone(),
            source,
            pending: None,
            finished: false,
        });
    }
    if video_renditions.len() > 1 {
        tracing::warn!("FLV export only supports one video track; ignoring the rest");
    }

    // Audio: same procedure as video.
    let audio_candidate = audio_renditions
        .iter()
        .next()
        .filter(|_| self.audio.is_none());
    if let Some((name, config)) = audio_candidate {
        ensure_supported_audio(config)?;
        ensure_legacy(&config.container, "audio", name)?;
        let source = ExportSource::for_audio(&self.broadcast, name, config, self.latency)?;
        self.audio = Some(FlvTrack {
            name: name.clone(),
            source,
            pending: None,
            finished: false,
        });
    }
    if audio_renditions.len() > 1 {
        tracing::warn!("FLV export only supports one audio track; ignoring the rest");
    }

    // A selected track vanishing from the catalog is treated as fatal.
    match &self.video {
        Some(track) if !video_renditions.contains_key(&track.name) => {
            anyhow::bail!("FLV video track '{}' removed mid-stream", track.name);
        }
        _ => {}
    }
    match &self.audio {
        Some(track) if !audio_renditions.contains_key(&track.name) => {
            anyhow::bail!("FLV audio track '{}' removed mid-stream", track.name);
        }
        _ => {}
    }

    Ok(())
}
/// True when at least one track is selected and no selected track's source
/// is still waiting for its header.
fn header_ready(&self) -> bool {
    if !self.has_tracks() {
        return false;
    }
    !self.tracks().any(|track| !track.source.header_ready())
}
/// Builds the opening chunk of the FLV stream.
///
/// Layout: the 9-byte file header (`"FLV"`, version 1, track flags, data
/// offset 9), a zero `PreviousTagSize0`, then one sequence-header tag per
/// selected track (video first, then audio), each at timestamp 0.
///
/// # Errors
/// Fails if a selected track's source has no codec description, or if
/// `write_tag` rejects a body.
fn build_header(&self) -> anyhow::Result<Bytes> {
    // Flag bits: 0x01 = video present, 0x04 = audio present.
    let video_flag: u8 = if self.video.is_some() { 0x01 } else { 0 };
    let audio_flag: u8 = if self.audio.is_some() { 0x04 } else { 0 };

    let mut header = BytesMut::new();
    header.put_slice(&[b'F', b'L', b'V', 1, video_flag | audio_flag]);
    header.put_u32(9);
    header.put_u32(0);

    if let Some(video) = self.video.as_ref() {
        let avcc = video
            .source
            .description()
            .context("H.264 track missing avcC")?;
        let mut body = BytesMut::with_capacity(5 + avcc.len());
        // Frame type/codec byte, packet type, then three zero bytes.
        body.put_slice(&[
            (FRAME_TYPE_KEY << 4) | VIDEO_CODEC_AVC,
            AVC_SEQUENCE_HEADER,
            0,
            0,
            0,
        ]);
        body.put_slice(avcc);
        write_tag(&mut header, TAG_VIDEO, 0, &body)?;
    }

    if let Some(audio) = self.audio.as_ref() {
        let asc = audio
            .source
            .description()
            .context("AAC track missing AudioSpecificConfig")?;
        let mut body = BytesMut::with_capacity(2 + asc.len());
        body.put_slice(&[AAC_AUDIO_TAG_HEADER, AAC_SEQUENCE_HEADER]);
        body.put_slice(asc);
        write_tag(&mut header, TAG_AUDIO, 0, &body)?;
    }

    Ok(header.freeze())
}
fn pick_next_track(&self) -> Option<bool> {
let video = self
.video
.as_ref()
.and_then(|t| t.pending.as_ref())
.map(|f| f.timestamp);
let audio = self
.audio
.as_ref()
.and_then(|t| t.pending.as_ref())
.map(|f| f.timestamp);
match (video, audio) {
(Some(v), Some(a)) => Some(v <= a),
(Some(_), None) => Some(true),
(None, Some(_)) => Some(false),
(None, None) => None,
}
}
/// Encodes one frame as a complete FLV tag (tag header, body and trailing
/// `PreviousTagSize`).
///
/// Video bodies start with the frame-type/codec byte, the `AVC_NALU` packet
/// type and three zero bytes; audio bodies start with the AAC tag header byte
/// and the `AAC_RAW` packet type. The frame payload follows unchanged.
///
/// # Errors
/// Fails if the frame timestamp in milliseconds does not fit in 32 bits, or
/// if `write_tag` rejects the body.
fn encode_frame(&self, is_video: bool, frame: Frame) -> anyhow::Result<Bytes> {
    let timestamp_ms: u32 = frame
        .timestamp
        .as_millis()
        .try_into()
        .context("FLV timestamp exceeds 32 bits")?;

    let payload_len = frame.payload.len();
    let (tag_type, body) = if is_video {
        let frame_type = if frame.keyframe {
            FRAME_TYPE_KEY
        } else {
            FRAME_TYPE_INTER
        };
        let mut body = BytesMut::with_capacity(5 + payload_len);
        body.put_u8((frame_type << 4) | VIDEO_CODEC_AVC);
        body.put_u8(AVC_NALU);
        body.put_slice(&[0, 0, 0]);
        body.put_slice(&frame.payload);
        (TAG_VIDEO, body)
    } else {
        let mut body = BytesMut::with_capacity(2 + payload_len);
        body.put_u8(AAC_AUDIO_TAG_HEADER);
        body.put_u8(AAC_RAW);
        body.put_slice(&frame.payload);
        (TAG_AUDIO, body)
    };

    // Size the output from the finished body: tag header + body + the 4-byte
    // trailer that `write_tag` appends. The previous fixed `+ 8` hint was one
    // byte short for video tags, so the buffer had to grow after the copy.
    let mut out = BytesMut::with_capacity(TAG_HEADER_LEN + body.len() + 4);
    write_tag(&mut out, tag_type, timestamp_ms, &body)?;
    Ok(out.freeze())
}
}
/// Appends one FLV tag to `out`.
///
/// Bytes written, in order: `tag_type` (1), body length as a 24-bit
/// big-endian integer (3), the low 24 bits of `timestamp_ms` big-endian (3),
/// the high 8 bits of `timestamp_ms` (1), three zero bytes, `body`, and
/// finally a 32-bit big-endian trailer equal to `TAG_HEADER_LEN` plus the
/// body length.
///
/// # Errors
/// Fails without writing anything if `body` is longer than `0x00FF_FFFF`
/// bytes.
fn write_tag(out: &mut BytesMut, tag_type: u8, timestamp_ms: u32, body: &[u8]) -> anyhow::Result<()> {
    let size: u32 = u32::try_from(body.len())
        .ok()
        .filter(|n| *n <= 0x00FF_FFFF)
        .context("FLV tag body exceeds the 24-bit DataSize limit")?;

    // Split the timestamp into its extension byte (bits 31..24) and the
    // 24-bit base that precedes it on the wire.
    let [timestamp_ext, timestamp_low @ ..] = timestamp_ms.to_be_bytes();

    out.put_u8(tag_type);
    out.put_slice(&size.to_be_bytes()[1..]);
    out.put_slice(&timestamp_low);
    out.put_u8(timestamp_ext);
    out.put_slice(&[0, 0, 0]);
    out.put_slice(body);
    out.put_u32(TAG_HEADER_LEN as u32 + size);
    Ok(())
}
/// Rejects containers the FLV exporter cannot consume.
///
/// `Legacy` and `Loc` are accepted; `Cmaf` yields an error that names the
/// track `kind` and `name`.
fn ensure_legacy(container: &Container, kind: &str, name: &str) -> anyhow::Result<()> {
    match container {
        Container::Cmaf { .. } => Err(anyhow::anyhow!(
            "FLV export does not support CMAF {kind} track '{name}'"
        )),
        Container::Legacy | Container::Loc => Ok(()),
    }
}
/// Accepts only H.264 video configurations; any other codec is reported in the error.
fn ensure_supported_video(config: &hang::catalog::VideoConfig) -> anyhow::Result<()> {
    let codec = &config.codec;
    if matches!(codec, VideoCodec::H264(_)) {
        return Ok(());
    }
    Err(anyhow::anyhow!(
        "FLV export only supports H.264 video, got {codec:?}"
    ))
}
/// Accepts only AAC audio configurations; any other codec is reported in the error.
fn ensure_supported_audio(config: &hang::catalog::AudioConfig) -> anyhow::Result<()> {
    let codec = &config.codec;
    if matches!(codec, AudioCodec::AAC(_)) {
        return Ok(());
    }
    Err(anyhow::anyhow!(
        "FLV export only supports AAC audio, got {codec:?}"
    ))
}