use std::str::FromStr;
use std::task::Poll;
use anyhow::Context;
use base64::Engine;
use hang::catalog::{AudioCodec, AudioConfig, Container, VideoCodec, VideoConfig};
pub struct Consumer {
pub track: moq_net::TrackConsumer,
group: Option<moq_net::GroupConsumer>,
}
impl Consumer {
pub fn new(track: moq_net::TrackConsumer) -> Self {
Self { track, group: None }
}
pub fn poll_next(&mut self, waiter: &kio::Waiter) -> Poll<anyhow::Result<Option<hang::Catalog>>> {
let track_finished = loop {
match self.track.poll_next_group(waiter)? {
Poll::Ready(Some(group)) => self.group = Some(group),
Poll::Ready(None) => break true,
Poll::Pending => break false,
}
};
if let Some(group) = &mut self.group {
match group.poll_read_frame(waiter)? {
Poll::Ready(Some(frame)) => {
self.group = None;
let json = std::str::from_utf8(&frame).context("MSF catalog frame is not valid UTF-8")?;
let msf = moq_msf::Catalog::from_str(json).context("failed to parse MSF catalog frame")?;
let catalog = from_msf(&msf)?;
return Poll::Ready(Ok(Some(catalog)));
}
Poll::Ready(None) => self.group = None,
Poll::Pending => return Poll::Pending,
}
}
if track_finished {
Poll::Ready(Ok(None))
} else {
Poll::Pending
}
}
pub async fn next(&mut self) -> anyhow::Result<Option<hang::Catalog>> {
kio::wait(|waiter| self.poll_next(waiter)).await
}
}
impl From<moq_net::TrackConsumer> for Consumer {
fn from(inner: moq_net::TrackConsumer) -> Self {
Self::new(inner)
}
}
pub(crate) fn from_msf(msf: &moq_msf::Catalog) -> anyhow::Result<hang::Catalog> {
let mut catalog = hang::Catalog::default();
for track in &msf.tracks {
let Some(role) = track.role.as_ref() else {
tracing::warn!(track = %track.name, "skipping MSF track with no role");
continue;
};
match role {
moq_msf::Role::Video => match video_config_from_msf(track)? {
Some(config) => {
catalog.video.renditions.insert(track.name.clone(), config);
}
None => {
tracing::warn!(
track = %track.name,
packaging = %track.packaging,
"skipping MSF video track with unsupported packaging",
);
}
},
moq_msf::Role::Audio => match audio_config_from_msf(track)? {
Some(config) => {
catalog.audio.renditions.insert(track.name.clone(), config);
}
None => {
tracing::warn!(
track = %track.name,
packaging = %track.packaging,
"skipping MSF audio track with unsupported packaging",
);
}
},
other => {
tracing::warn!(track = %track.name, role = %other, "skipping MSF track with unsupported role");
}
}
}
Ok(catalog)
}
fn container_from_msf(track: &moq_msf::Track) -> anyhow::Result<Option<Container>> {
match &track.packaging {
moq_msf::Packaging::Loc | moq_msf::Packaging::Legacy => Ok(Some(Container::Legacy)),
moq_msf::Packaging::Cmaf => {
let init = decode_init_data(track)?
.with_context(|| format!("MSF CMAF track {:?} missing init_data", track.name))?;
#[allow(deprecated)]
Ok(Some(Container::Cmaf {
init,
timescale: None,
track_id: None,
}))
}
_ => Ok(None),
}
}
fn decode_init_data(track: &moq_msf::Track) -> anyhow::Result<Option<bytes::Bytes>> {
track
.init_data
.as_ref()
.map(|b64| {
base64::engine::general_purpose::STANDARD
.decode(b64)
.map(bytes::Bytes::from)
.with_context(|| format!("MSF track {:?} has malformed init_data", track.name))
})
.transpose()
}
fn legacy_description(track: &moq_msf::Track) -> anyhow::Result<Option<bytes::Bytes>> {
match track.packaging {
moq_msf::Packaging::Loc | moq_msf::Packaging::Legacy => decode_init_data(track),
_ => Ok(None),
}
}
fn video_config_from_msf(track: &moq_msf::Track) -> anyhow::Result<Option<VideoConfig>> {
let Some(container) = container_from_msf(track)? else {
return Ok(None);
};
let codec_str = track
.codec
.as_deref()
.with_context(|| format!("MSF video track {:?} missing codec", track.name))?;
let codec = VideoCodec::from_str(codec_str)
.with_context(|| format!("MSF video track {:?} has invalid codec {codec_str:?}", track.name))?;
let mut config = VideoConfig::new(codec);
config.description = legacy_description(track)?;
config.coded_width = track.width;
config.coded_height = track.height;
config.bitrate = track.bitrate;
config.framerate = track.framerate;
config.container = container;
config.jitter = track
.jitter
.filter(|v| v.is_finite() && *v >= 0.0)
.and_then(|v| moq_net::Time::from_millis(v as u64).ok());
Ok(Some(config))
}
fn audio_config_from_msf(track: &moq_msf::Track) -> anyhow::Result<Option<AudioConfig>> {
let Some(container) = container_from_msf(track)? else {
return Ok(None);
};
let codec_str = track
.codec
.as_deref()
.with_context(|| format!("MSF audio track {:?} missing codec", track.name))?;
let codec = AudioCodec::from_str(codec_str)
.with_context(|| format!("MSF audio track {:?} has invalid codec {codec_str:?}", track.name))?;
let channel_count_from_field = track.channel_config.as_deref().and_then(|s| s.parse::<u32>().ok());
let (sample_rate, channel_count) = match (track.samplerate, channel_count_from_field) {
(Some(sr), Some(cc)) => (sr, cc),
(sr_opt, cc_opt) => {
let derived = derive_audio_params(track, &codec)?;
(
sr_opt.unwrap_or(derived.sample_rate),
cc_opt.unwrap_or(derived.channel_count),
)
}
};
let mut config = AudioConfig::new(codec, sample_rate, channel_count);
config.bitrate = track.bitrate;
config.description = legacy_description(track)?;
config.container = container;
config.jitter = track
.jitter
.filter(|v| v.is_finite() && *v >= 0.0)
.and_then(|v| moq_net::Time::from_millis(v as u64).ok());
Ok(Some(config))
}
struct DerivedAudio {
sample_rate: u32,
channel_count: u32,
}
fn derive_audio_params(track: &moq_msf::Track, codec: &AudioCodec) -> anyhow::Result<DerivedAudio> {
let init = decode_init_data(track)?.with_context(|| {
format!(
"MSF audio track {:?} omits samplerate/channelConfig and has no init_data to derive from",
track.name
)
})?;
match track.packaging {
moq_msf::Packaging::Loc | moq_msf::Packaging::Legacy => derive_from_codec_config(track, codec, init),
moq_msf::Packaging::Cmaf => derive_from_cmaf_moov(track, init),
_ => anyhow::bail!(
"MSF audio track {:?} packaging {:?} is unsupported for parameter derivation",
track.name,
track.packaging
),
}
}
fn derive_from_codec_config(
track: &moq_msf::Track,
codec: &AudioCodec,
init: bytes::Bytes,
) -> anyhow::Result<DerivedAudio> {
use bytes::Buf;
let mut buf = init;
match codec {
AudioCodec::AAC(_) => {
let cfg = crate::codec::aac::Config::parse(&mut buf)
.with_context(|| format!("MSF audio track {:?} has malformed AudioSpecificConfig", track.name))?;
anyhow::ensure!(
!buf.has_remaining(),
"MSF audio track {:?} AudioSpecificConfig has trailing bytes",
track.name,
);
Ok(DerivedAudio {
sample_rate: cfg.sample_rate,
channel_count: cfg.channel_count,
})
}
AudioCodec::Opus => {
let cfg = crate::codec::opus::Config::parse(&mut buf)
.with_context(|| format!("MSF audio track {:?} has malformed OpusHead", track.name))?;
anyhow::ensure!(
!buf.has_remaining(),
"MSF audio track {:?} OpusHead has trailing bytes",
track.name,
);
Ok(DerivedAudio {
sample_rate: cfg.sample_rate,
channel_count: cfg.channel_count,
})
}
_ => anyhow::bail!(
"MSF audio track {:?} omits samplerate/channelConfig; codec {:?} has no init_data parser",
track.name,
codec,
),
}
}
fn derive_from_cmaf_moov(track: &moq_msf::Track, init: bytes::Bytes) -> anyhow::Result<DerivedAudio> {
use mp4_atom::{Any, DecodeMaybe};
let mut cursor = std::io::Cursor::new(init.as_ref());
let mut moov: Option<mp4_atom::Moov> = None;
while let Some(atom) = mp4_atom::Any::decode_maybe(&mut cursor)
.with_context(|| format!("MSF audio track {:?} init segment is malformed", track.name))?
{
if let Any::Moov(m) = atom {
moov = Some(m);
break;
}
}
let moov = moov.with_context(|| format!("MSF audio track {:?} init segment missing moov", track.name))?;
for trak in &moov.trak {
let stbl = &trak.mdia.minf.stbl;
for sample in &stbl.stsd.codecs {
match sample {
mp4_atom::Codec::Mp4a(mp4a) => {
return Ok(DerivedAudio {
sample_rate: mp4a.audio.sample_rate.integer() as u32,
channel_count: mp4a.audio.channel_count as u32,
});
}
mp4_atom::Codec::Opus(opus) => {
return Ok(DerivedAudio {
sample_rate: opus.audio.sample_rate.integer() as u32,
channel_count: opus.audio.channel_count as u32,
});
}
_ => {}
}
}
}
anyhow::bail!(
"MSF audio track {:?} CMAF init has no audio sample entry to derive samplerate/channelConfig from",
track.name,
)
}
#[cfg(test)]
mod test {
use super::*;
fn video_track(name: &str, packaging: moq_msf::Packaging, init_data: Option<&str>) -> moq_msf::Track {
let mut track = moq_msf::Track::new(name, packaging);
track.is_live = true;
track.role = Some(moq_msf::Role::Video);
track.codec = Some("avc1.640028".to_string());
track.width = Some(1920);
track.height = Some(1080);
track.framerate = Some(30.0);
track.bitrate = Some(5_000_000);
track.init_data = init_data.map(str::to_string);
track.render_group = Some(1);
track
}
fn audio_track(name: &str, packaging: moq_msf::Packaging) -> moq_msf::Track {
let mut track = moq_msf::Track::new(name, packaging);
track.is_live = true;
track.role = Some(moq_msf::Role::Audio);
track.codec = Some("opus".to_string());
track.samplerate = Some(48_000);
track.channel_config = Some("2".to_string());
track.bitrate = Some(128_000);
track.render_group = Some(1);
track
}
#[test]
fn cmaf_video_yields_cmaf_container() {
let init_b64 = "AAAYZ2Z0eXA=";
let expected_init = base64::engine::general_purpose::STANDARD.decode(init_b64).unwrap();
let msf = moq_msf::Catalog {
version: 1,
tracks: vec![video_track("video0", moq_msf::Packaging::Cmaf, Some(init_b64))],
};
let catalog = from_msf(&msf).expect("CMAF video should convert");
let video = catalog.video.renditions.get("video0").expect("video0 rendition");
match &video.container {
Container::Cmaf { init, .. } => assert_eq!(init.as_ref(), expected_init.as_slice()),
other => panic!("expected Cmaf container, got {other:?}"),
}
assert_eq!(video.coded_width, Some(1920));
assert_eq!(video.coded_height, Some(1080));
assert_eq!(video.framerate, Some(30.0));
assert_eq!(video.bitrate, Some(5_000_000));
}
#[test]
fn loc_audio_yields_legacy_container() {
let msf = moq_msf::Catalog {
version: 1,
tracks: vec![audio_track("audio0", moq_msf::Packaging::Loc)],
};
let catalog = from_msf(&msf).expect("LOC audio should convert");
let audio = catalog.audio.renditions.get("audio0").expect("audio0 rendition");
assert_eq!(audio.container, Container::Legacy);
assert_eq!(audio.codec, AudioCodec::Opus);
assert_eq!(audio.sample_rate, 48_000);
assert_eq!(audio.channel_count, 2);
assert_eq!(audio.bitrate, Some(128_000));
}
#[test]
fn legacy_init_data_round_trips_into_description() {
let description_bytes: &[u8] = &[0x01, 0x42, 0xc0, 0x1e, 0xff, 0xe1];
let init_b64 = base64::engine::general_purpose::STANDARD.encode(description_bytes);
let mut video = video_track("video0", moq_msf::Packaging::Legacy, Some(&init_b64));
video.codec = Some("avc1.42c01e".to_string());
let mut audio = audio_track("audio0", moq_msf::Packaging::Loc);
audio.init_data = Some(init_b64);
let msf = moq_msf::Catalog {
version: 1,
tracks: vec![video, audio],
};
let catalog = from_msf(&msf).expect("legacy tracks should convert");
let v = catalog.video.renditions.get("video0").expect("video0 rendition");
let a = catalog.audio.renditions.get("audio0").expect("audio0 rendition");
assert_eq!(v.description.as_deref(), Some(description_bytes));
assert_eq!(a.description.as_deref(), Some(description_bytes));
}
#[test]
fn cmaf_description_stays_none() {
let init_b64 = "AAAYZ2Z0eXA=";
let msf = moq_msf::Catalog {
version: 1,
tracks: vec![video_track("video0", moq_msf::Packaging::Cmaf, Some(init_b64))],
};
let catalog = from_msf(&msf).unwrap();
assert!(catalog.video.renditions["video0"].description.is_none());
}
#[test]
fn legacy_malformed_init_data_is_error() {
let mut track = video_track("video0", moq_msf::Packaging::Legacy, Some("!!!not-base64!!!"));
track.codec = Some("avc1.42c01e".to_string());
let msf = moq_msf::Catalog {
version: 1,
tracks: vec![track],
};
let err = from_msf(&msf).expect_err("malformed base64 should error");
assert!(
err.to_string().contains("malformed init_data"),
"unexpected error: {}",
err
);
}
#[test]
fn unknown_codec_yields_unknown_variant() {
let mut track = video_track("video0", moq_msf::Packaging::Legacy, None);
track.codec = Some("weirdcodec".to_string());
let msf = moq_msf::Catalog {
version: 1,
tracks: vec![track],
};
let catalog = from_msf(&msf).expect("unknown codec is not an error");
let video = catalog.video.renditions.get("video0").expect("video0 rendition");
assert_eq!(video.codec, VideoCodec::Unknown("weirdcodec".to_string()));
}
#[test]
fn cmaf_without_init_data_is_error() {
let msf = moq_msf::Catalog {
version: 1,
tracks: vec![video_track("video0", moq_msf::Packaging::Cmaf, None)],
};
let err = from_msf(&msf).expect_err("CMAF without init_data must error");
let msg = format!("{err:#}");
assert!(msg.contains("init_data"), "expected init_data in error, got: {msg}");
}
#[test]
fn empty_catalog_is_empty_hang_catalog() {
let msf = moq_msf::Catalog {
version: 1,
tracks: vec![],
};
let catalog = from_msf(&msf).expect("empty catalog should convert");
assert!(catalog.video.renditions.is_empty());
assert!(catalog.audio.renditions.is_empty());
}
#[test]
fn track_without_role_is_skipped() {
let mut track = video_track("video0", moq_msf::Packaging::Legacy, None);
track.role = None;
let msf = moq_msf::Catalog {
version: 1,
tracks: vec![track],
};
let catalog = from_msf(&msf).expect("no-role track should be skipped, not error");
assert!(catalog.video.renditions.is_empty());
assert!(catalog.audio.renditions.is_empty());
}
#[test]
fn unsupported_role_is_skipped() {
let mut track = audio_track("caption0", moq_msf::Packaging::Legacy);
track.role = Some(moq_msf::Role::Caption);
let msf = moq_msf::Catalog {
version: 1,
tracks: vec![track],
};
let catalog = from_msf(&msf).expect("unsupported role should be skipped, not error");
assert!(catalog.audio.renditions.is_empty());
assert!(catalog.video.renditions.is_empty());
}
#[test]
fn audio_missing_samplerate_and_channels_without_init_data_errors() {
let mut track = audio_track("audio0", moq_msf::Packaging::Legacy);
track.samplerate = None;
track.channel_config = None;
track.init_data = None;
let msf = moq_msf::Catalog {
version: 1,
tracks: vec![track],
};
let err = from_msf(&msf).expect_err("missing fields with no init_data should error");
assert!(err.to_string().contains("no init_data"), "unexpected error: {}", err);
}
#[test]
fn audio_missing_samplerate_and_channels_derived_from_opus_head() {
let mut head = Vec::with_capacity(19);
head.extend_from_slice(b"OpusHead");
head.push(1); head.push(6); head.extend_from_slice(&0u16.to_le_bytes()); head.extend_from_slice(&24_000u32.to_le_bytes()); head.extend_from_slice(&[0, 0, 0]); let init_b64 = base64::engine::general_purpose::STANDARD.encode(&head);
let mut track = audio_track("audio0", moq_msf::Packaging::Loc);
track.codec = Some("opus".to_string());
track.samplerate = None;
track.channel_config = None;
track.init_data = Some(init_b64);
let msf = moq_msf::Catalog {
version: 1,
tracks: vec![track],
};
let catalog = from_msf(&msf).expect("Opus OpusHead should parse");
let audio = catalog.audio.renditions.get("audio0").expect("audio0 rendition");
assert_eq!(audio.sample_rate, 24_000);
assert_eq!(audio.channel_count, 6);
}
#[test]
fn audio_missing_samplerate_and_channels_derived_from_aac_config() {
let asc = [0x11u8, 0x90];
let init_b64 = base64::engine::general_purpose::STANDARD.encode(asc);
let mut track = audio_track("audio0", moq_msf::Packaging::Legacy);
track.codec = Some("mp4a.40.2".to_string());
track.samplerate = None;
track.channel_config = None;
track.init_data = Some(init_b64);
let msf = moq_msf::Catalog {
version: 1,
tracks: vec![track],
};
let catalog = from_msf(&msf).expect("AAC AudioSpecificConfig should parse");
let audio = catalog.audio.renditions.get("audio0").expect("audio0 rendition");
assert_eq!(audio.sample_rate, 48_000);
assert_eq!(audio.channel_count, 2);
}
#[test]
fn audio_only_channels_missing_uses_explicit_samplerate() {
let mut head = Vec::with_capacity(19);
head.extend_from_slice(b"OpusHead");
head.push(1);
head.push(2); head.extend_from_slice(&0u16.to_le_bytes());
head.extend_from_slice(&48_000u32.to_le_bytes()); head.extend_from_slice(&[0, 0, 0]);
let init_b64 = base64::engine::general_purpose::STANDARD.encode(&head);
let mut track = audio_track("audio0", moq_msf::Packaging::Loc);
track.codec = Some("opus".to_string());
track.samplerate = Some(24_000); track.channel_config = None; track.init_data = Some(init_b64);
let msf = moq_msf::Catalog {
version: 1,
tracks: vec![track],
};
let catalog = from_msf(&msf).expect("partial derivation should succeed");
let audio = catalog.audio.renditions.get("audio0").expect("audio0 rendition");
assert_eq!(audio.sample_rate, 24_000);
assert_eq!(audio.channel_count, 2);
}
#[test]
fn unsupported_packaging_video_is_skipped() {
let bad = video_track("timeline0", moq_msf::Packaging::MediaTimeline, None);
let good = video_track("video0", moq_msf::Packaging::Legacy, None);
let msf = moq_msf::Catalog {
version: 1,
tracks: vec![bad, good],
};
let catalog = from_msf(&msf).expect("unsupported packaging should be skipped, not error");
assert!(
!catalog.video.renditions.contains_key("timeline0"),
"timeline track must be skipped"
);
assert!(
catalog.video.renditions.contains_key("video0"),
"sibling track must still be parsed"
);
}
#[test]
fn unsupported_packaging_audio_is_skipped() {
let mut bad = audio_track("event0", moq_msf::Packaging::EventTimeline);
bad.codec = None;
let good = audio_track("audio0", moq_msf::Packaging::Loc);
let msf = moq_msf::Catalog {
version: 1,
tracks: vec![bad, good],
};
let catalog = from_msf(&msf).expect("unsupported packaging should be skipped, not error");
assert!(!catalog.audio.renditions.contains_key("event0"));
assert!(catalog.audio.renditions.contains_key("audio0"));
}
#[test]
fn unknown_packaging_variant_is_skipped() {
let track = video_track("video0", moq_msf::Packaging::Unknown("custom".to_string()), None);
let msf = moq_msf::Catalog {
version: 1,
tracks: vec![track],
};
let catalog = from_msf(&msf).expect("unknown packaging should be skipped, not error");
assert!(catalog.video.renditions.is_empty());
}
#[test]
fn missing_video_codec_is_error() {
let mut track = video_track("video0", moq_msf::Packaging::Legacy, None);
track.codec = None;
let msf = moq_msf::Catalog {
version: 1,
tracks: vec![track],
};
let err = from_msf(&msf).expect_err("missing video codec must error");
let msg = format!("{err:#}");
assert!(
msg.contains("missing codec"),
"expected 'missing codec' in error, got: {msg}"
);
}
#[test]
fn missing_audio_codec_is_error() {
let mut track = audio_track("audio0", moq_msf::Packaging::Legacy);
track.codec = None;
let msf = moq_msf::Catalog {
version: 1,
tracks: vec![track],
};
let err = from_msf(&msf).expect_err("missing audio codec must error");
let msg = format!("{err:#}");
assert!(
msg.contains("missing codec"),
"expected 'missing codec' in error, got: {msg}"
);
}
#[test]
fn invalid_video_codec_includes_codec_in_error() {
let mut track = video_track("video0", moq_msf::Packaging::Legacy, None);
track.codec = Some("avc1.0".to_string());
let msf = moq_msf::Catalog {
version: 1,
tracks: vec![track],
};
let err = from_msf(&msf).expect_err("malformed avc1 codec must error");
let msg = format!("{err:#}");
assert!(msg.contains("avc1.0"), "expected codec string in error, got: {msg}");
}
}