use anyhow::Context;
use bytes::{Buf, Bytes, BytesMut};
use hang::catalog::{AAC, AV1, AudioCodec, AudioConfig, Container, H264, H265, VP9, VideoCodec, VideoConfig};
use hang::container::Timestamp;
use mp4_atom::{Any, Atom, DecodeMaybe, Mdat, Moof, Moov, Trak};
use std::collections::HashMap;
use tokio::io::{AsyncRead, AsyncReadExt};
/// Options controlling how the fMP4 demuxer publishes media.
#[derive(Clone, Default)]
pub struct Fmp4Config {
    /// When true, forward the original moof+mdat fragments unmodified
    /// (CMAF passthrough) instead of re-publishing individual frames.
    pub passthrough: bool,
}
/// Demuxes a fragmented MP4 stream into moq tracks, publishing either
/// individual frames or raw CMAF fragments (passthrough mode) and keeping
/// catalog metadata up to date.
pub struct Fmp4 {
    /// Broadcast used to allocate the output tracks.
    broadcast: moq_lite::BroadcastProducer,
    /// Catalog where per-track configs (and jitter updates) are published.
    catalog: crate::CatalogProducer,
    /// Active tracks, keyed by MP4 track id.
    tracks: HashMap<u32, Fmp4Track>,
    /// Movie header; set once `init` has processed a moov box.
    moov: Option<Moov>,
    /// Pending fragment header, waiting to be paired with its mdat.
    moof: Option<Moof>,
    /// Encoded size in bytes of the pending moof box.
    moof_size: usize,
    config: Fmp4Config,
    /// Raw bytes of the pending moof (only kept in passthrough mode).
    moof_raw: Option<Bytes>,
}
/// Media type of a track, derived from the MP4 handler box (`vide`/`soun`).
#[derive(PartialEq, Debug)]
enum TrackKind {
    Video,
    Audio,
}
/// Upper bound on the duration of an audio group produced by the ordered producer.
const MAX_AUDIO_GROUP_DURATION: Timestamp = Timestamp::from_millis_unchecked(100);
/// How frames are written to a moq track.
enum Fmp4Producer {
    /// Groups are opened and closed by hand at keyframe boundaries.
    Manual {
        track: moq_lite::TrackProducer,
        /// The currently open group, if any.
        group: Option<moq_lite::GroupProducer>,
    },
    /// Frames are handed to an ordered producer that manages grouping itself.
    Ordered(hang::container::OrderedProducer),
}
impl Fmp4Producer {
    /// The underlying track's metadata, regardless of producer flavor.
    fn info(&self) -> &moq_lite::Track {
        match self {
            Self::Manual { track, .. } => &track.info,
            Self::Ordered(ordered) => &ordered.track.info,
        }
    }
}
/// Per-track demuxing state.
struct Fmp4Track {
    kind: TrackKind,
    producer: Fmp4Producer,
    /// Smallest jitter estimate observed so far (mirrored into the catalog).
    jitter: Option<Timestamp>,
    /// Timestamp of the most recently processed sample.
    last_timestamp: Option<Timestamp>,
    /// Smallest observed gap between consecutive samples.
    min_duration: Option<Timestamp>,
}
impl Fmp4 {
/// Create a demuxer that publishes into `broadcast` and registers its
/// tracks in `catalog`, with behavior tuned by `config`.
pub fn new(broadcast: moq_lite::BroadcastProducer, catalog: crate::CatalogProducer, config: Fmp4Config) -> Self {
    Self {
        broadcast,
        catalog,
        config,
        tracks: HashMap::new(),
        moov: None,
        moof: None,
        moof_size: 0,
        moof_raw: None,
    }
}
/// Repeatedly read bytes from `reader` and run them through [`Self::decode`]
/// until the reader reports EOF.
pub async fn decode_from<T: AsyncRead + Unpin>(&mut self, reader: &mut T) -> anyhow::Result<()> {
    let mut buffer = BytesMut::new();
    loop {
        // read_buf returns 0 only at EOF.
        if reader.read_buf(&mut buffer).await? == 0 {
            return Ok(());
        }
        self.decode(&mut buffer)?;
    }
}
/// Consume as many complete top-level atoms as possible from `buf`.
///
/// Only fully-decoded atoms are consumed; a trailing partial atom stays in
/// the buffer so the caller can append more bytes and call again.
pub fn decode<T: Buf + AsRef<[u8]>>(&mut self, buf: &mut T) -> anyhow::Result<()> {
    let mut cursor = std::io::Cursor::new(buf);
    // Byte offset where the atom currently being decoded starts.
    let mut position = 0;
    while let Some(atom) = mp4_atom::Any::decode_maybe(&mut cursor)? {
        let size = cursor.position() as usize - position;
        // Raw bytes of the atom just decoded, needed for passthrough mode.
        let raw = &cursor.get_ref().as_ref()[position..position + size];
        match atom {
            // File/segment type headers carry no media data.
            Any::Ftyp(_) | Any::Styp(_) => {}
            Any::Moov(moov) => {
                self.init(moov)?;
            }
            Any::Moof(moof) => {
                // Each moof must be paired with its mdat before the next moof arrives.
                anyhow::ensure!(self.moof.is_none(), "duplicate moof box");
                self.moof.replace(moof);
                self.moof_size = size;
                if self.config.passthrough {
                    self.moof_raw.replace(Bytes::copy_from_slice(raw));
                }
            }
            Any::Mdat(mdat) => {
                self.extract(mdat, raw)?;
            }
            _ => {
                // Other boxes are ignored.
            }
        }
        position = cursor.position() as usize;
    }
    // Drop the consumed prefix; any partial atom remains buffered.
    cursor.into_inner().advance(position);
    Ok(())
}
/// Returns true once a `moov` box has been processed.
pub fn is_initialized(&self) -> bool {
    self.moov.is_some()
}
/// Process the `moov` box: create a moq track plus a catalog entry for
/// every trak, and remember the moov for later fragment parsing.
fn init(&mut self, moov: Moov) -> anyhow::Result<()> {
    let mut catalog = self.catalog.clone();
    let mut catalog = catalog.lock();
    for trak in &moov.trak {
        let track_id = trak.tkhd.track_id;
        let handler = &trak.mdia.hdlr.handler;
        // Track name extension advertises the wire format: CMAF segments
        // in passthrough, hang frames otherwise.
        let ext = if self.config.passthrough { "m4s" } else { "hang" };
        let track = self.broadcast.unique_track(&format!(".{ext}"))?;
        let kind = match handler.as_ref() {
            b"vide" => {
                let config = self.init_video(trak)?;
                catalog.video.insert(&track.info.name, config)?;
                TrackKind::Video
            }
            b"soun" => {
                let config = self.init_audio(trak)?;
                catalog.audio.insert(&track.info.name, config)?;
                TrackKind::Audio
            }
            b"sbtl" => anyhow::bail!("subtitle tracks are not supported"),
            handler => anyhow::bail!("unknown track type: {:?}", handler),
        };
        // Outside passthrough, audio uses the ordered producer so frames are
        // batched into groups capped at MAX_AUDIO_GROUP_DURATION; everything
        // else is grouped manually at keyframe boundaries.
        let producer = if kind == TrackKind::Audio && !self.config.passthrough {
            Fmp4Producer::Ordered(
                hang::container::OrderedProducer::new(track).with_max_group_duration(MAX_AUDIO_GROUP_DURATION),
            )
        } else {
            Fmp4Producer::Manual { track, group: None }
        };
        self.tracks.insert(
            track_id,
            Fmp4Track {
                kind,
                producer,
                jitter: None,
                last_timestamp: None,
                min_duration: None,
            },
        );
    }
    // Release the catalog lock before storing the moov.
    drop(catalog);
    self.moov = Some(moov);
    Ok(())
}
/// Describe how this track's payload is containerized on the wire.
///
/// Passthrough forwards CMAF fragments, so the receiver needs the original
/// timescale and track id; otherwise the legacy hang container is used.
fn container(&self, trak: &Trak) -> Container {
    match self.config.passthrough {
        true => Container::Cmaf {
            timescale: trak.mdia.mdhd.timescale as u64,
            track_id: trak.tkhd.track_id,
        },
        false => Container::Legacy,
    }
}
/// Build a hang [`VideoConfig`] from the sample description of a video `trak`.
///
/// Exactly one codec entry must be present in the `stsd` box; H.264, H.265,
/// VP8, VP9 and AV1 are supported.
fn init_video(&mut self, trak: &Trak) -> anyhow::Result<VideoConfig> {
    let container = self.container(trak);
    let stsd = &trak.mdia.minf.stbl.stsd;
    // A fragmented track is expected to carry a single sample entry.
    let codec = match stsd.codecs.len() {
        0 => anyhow::bail!("missing codec"),
        1 => &stsd.codecs[0],
        _ => anyhow::bail!("multiple codecs"),
    };
    let config = match codec {
        mp4_atom::Codec::Avc1(avc1) => {
            let avcc = &avc1.avcc;
            // The avcC body (without the atom header) doubles as the decoder description.
            let mut description = BytesMut::new();
            avcc.encode_body(&mut description)?;
            VideoConfig {
                coded_width: Some(avc1.visual.width as _),
                coded_height: Some(avc1.visual.height as _),
                codec: H264 {
                    profile: avcc.avc_profile_indication,
                    constraints: avcc.profile_compatibility,
                    level: avcc.avc_level_indication,
                    inline: false,
                }
                .into(),
                description: Some(description.freeze()),
                framerate: None,
                bitrate: None,
                display_ratio_width: None,
                display_ratio_height: None,
                optimize_for_latency: None,
                container,
                jitter: None,
            }
        }
        // hev1 allows in-band parameter sets; hvc1 does not.
        mp4_atom::Codec::Hev1(hev1) => self.init_h265(true, &hev1.hvcc, &hev1.visual, container)?,
        mp4_atom::Codec::Hvc1(hvc1) => self.init_h265(false, &hvc1.hvcc, &hvc1.visual, container)?,
        mp4_atom::Codec::Vp08(vp08) => VideoConfig {
            codec: VideoCodec::VP8,
            description: Default::default(),
            coded_width: Some(vp08.visual.width as _),
            coded_height: Some(vp08.visual.height as _),
            framerate: None,
            bitrate: None,
            display_ratio_width: None,
            display_ratio_height: None,
            optimize_for_latency: None,
            container,
            jitter: None,
        },
        mp4_atom::Codec::Vp09(vp09) => {
            let vpcc = &vp09.vpcc;
            VideoConfig {
                codec: VP9 {
                    profile: vpcc.profile,
                    level: vpcc.level,
                    bit_depth: vpcc.bit_depth,
                    color_primaries: vpcc.color_primaries,
                    chroma_subsampling: vpcc.chroma_subsampling,
                    transfer_characteristics: vpcc.transfer_characteristics,
                    matrix_coefficients: vpcc.matrix_coefficients,
                    full_range: vpcc.video_full_range_flag,
                }
                .into(),
                description: Default::default(),
                coded_width: Some(vp09.visual.width as _),
                coded_height: Some(vp09.visual.height as _),
                display_ratio_width: None,
                display_ratio_height: None,
                optimize_for_latency: None,
                bitrate: None,
                framerate: None,
                container,
                jitter: None,
            }
        }
        mp4_atom::Codec::Av01(av01) => {
            let av1c = &av01.av1c;
            VideoConfig {
                codec: AV1 {
                    profile: av1c.seq_profile,
                    level: av1c.seq_level_idx_0,
                    // Per the AV1-ISOBMFF av1C box, bit depth is signalled by
                    // the high_bitdepth/twelve_bit flag pair: 8, 10 or 12.
                    // FIX: previously keyed off seq_tier_0, which is the tier
                    // flag and unrelated to bit depth.
                    bitdepth: match (av1c.high_bitdepth, av1c.twelve_bit) {
                        (true, true) => 12,
                        (true, false) => 10,
                        (false, _) => 8,
                    },
                    mono_chrome: av1c.monochrome,
                    chroma_subsampling_x: av1c.chroma_subsampling_x,
                    chroma_subsampling_y: av1c.chroma_subsampling_y,
                    chroma_sample_position: av1c.chroma_sample_position,
                    ..Default::default()
                }
                .into(),
                description: Default::default(),
                coded_width: Some(av01.visual.width as _),
                coded_height: Some(av01.visual.height as _),
                display_ratio_width: None,
                display_ratio_height: None,
                optimize_for_latency: None,
                bitrate: None,
                framerate: None,
                container,
                jitter: None,
            }
        }
        mp4_atom::Codec::Unknown(unknown) => anyhow::bail!("unknown codec: {:?}", unknown),
        unsupported => anyhow::bail!("unsupported codec: {:?}", unsupported),
    };
    Ok(config)
}
/// Build a hang [`VideoConfig`] for H.265 from the `hvcC` configuration box.
///
/// `in_band` is true for `hev1` sample entries and false for `hvc1`
/// (see the callers in `init_video`).
fn init_h265(
    &mut self,
    in_band: bool,
    hvcc: &mp4_atom::Hvcc,
    visual: &mp4_atom::Visual,
    container: Container,
) -> anyhow::Result<VideoConfig> {
    // The hvcC body (without the atom header) serves as the decoder description.
    let mut description = BytesMut::new();
    hvcc.encode_body(&mut description)?;
    Ok(VideoConfig {
        codec: H265 {
            in_band,
            profile_space: hvcc.general_profile_space,
            profile_idc: hvcc.general_profile_idc,
            profile_compatibility_flags: hvcc.general_profile_compatibility_flags,
            tier_flag: hvcc.general_tier_flag,
            level_idc: hvcc.general_level_idc,
            constraint_flags: hvcc.general_constraint_indicator_flags,
        }
        .into(),
        description: Some(description.freeze()),
        coded_width: Some(visual.width as _),
        coded_height: Some(visual.height as _),
        bitrate: None,
        framerate: None,
        display_ratio_width: None,
        display_ratio_height: None,
        optimize_for_latency: None,
        container,
        jitter: None,
    })
}
/// Build a hang [`AudioConfig`] from the sample description of an audio `trak`.
///
/// Supports AAC (MPEG-4 audio inside `mp4a`) and Opus.
fn init_audio(&mut self, trak: &Trak) -> anyhow::Result<AudioConfig> {
    let container = self.container(trak);
    let stsd = &trak.mdia.minf.stbl.stsd;
    // A fragmented track is expected to carry a single sample entry.
    let codec = match stsd.codecs.len() {
        0 => anyhow::bail!("missing codec"),
        1 => &stsd.codecs[0],
        _ => anyhow::bail!("multiple codecs"),
    };
    let config = match codec {
        mp4_atom::Codec::Mp4a(mp4a) => {
            let desc = &mp4a.esds.es_desc.dec_config;
            // 0x40 is the object type indication for ISO/IEC 14496-3
            // (MPEG-4) audio; anything else is rejected.
            if desc.object_type_indication != 0x40 {
                anyhow::bail!("unsupported codec: MPEG2");
            }
            let bitrate = desc.avg_bitrate.max(desc.max_bitrate);
            let profile = desc.dec_specific.profile;
            let sample_rate = mp4a.audio.sample_rate.integer() as u32;
            let channel_count = mp4a.audio.channel_count as u32;
            // Rebuild a minimal AudioSpecificConfig as the decoder description.
            let description = build_aac_audio_specific_config(profile, sample_rate, channel_count);
            AudioConfig {
                codec: AAC { profile }.into(),
                sample_rate,
                channel_count,
                bitrate: Some(bitrate.into()),
                description: Some(description),
                container,
                jitter: None,
            }
        }
        mp4_atom::Codec::Opus(opus) => {
            AudioConfig {
                codec: AudioCodec::Opus,
                sample_rate: opus.audio.sample_rate.integer() as _,
                channel_count: opus.audio.channel_count as _,
                bitrate: None,
                // No out-of-band decoder description is provided for Opus.
                description: None,
                container,
                jitter: None,
            }
        }
        mp4_atom::Codec::Unknown(unknown) => anyhow::bail!("unknown codec: {:?}", unknown),
        unsupported => anyhow::bail!("unsupported codec: {:?}", unsupported),
    };
    Ok(config)
}
/// Process an `mdat` box: pair it with the pending `moof`, walk every
/// track fragment (`traf`) and publish the samples it describes.
///
/// `mdat_raw` is the complete mdat box (header included) so passthrough
/// mode can forward it verbatim.
fn extract(&mut self, mdat: Mdat, mdat_raw: &[u8]) -> anyhow::Result<()> {
    let moov = self.moov.as_ref().context("missing moov box")?;
    let moof = self.moof.take().context("missing moof box")?;
    let moof_size = self.moof_size;
    // Size of the mdat box header; sample offsets index into the payload.
    let header_size = mdat_raw.len() - mdat.data.len();
    for traf in &moof.traf {
        let track_id = traf.tfhd.track_id;
        let track = self.tracks.get_mut(&track_id).context("unknown track")?;
        let trak = moov
            .trak
            .iter()
            .find(|trak| trak.tkhd.track_id == track_id)
            .context("unknown track")?;
        // Movie-level sample defaults (trex), used when neither the trun
        // entry nor the tfhd supplies a value.
        let trex = moov
            .mvex
            .as_ref()
            .and_then(|mvex| mvex.trex.iter().find(|trex| trex.track_id == track_id));
        let default_sample_duration = trex.map(|trex| trex.default_sample_duration).unwrap_or_default();
        let default_sample_size = trex.map(|trex| trex.default_sample_size).unwrap_or_default();
        let default_sample_flags = trex.map(|trex| trex.default_sample_flags).unwrap_or_default();
        let tfdt = traf.tfdt.as_ref().context("missing tfdt box")?;
        // Decode time of the first sample, in the track's timescale.
        let mut dts = tfdt.base_media_decode_time;
        let timescale = trak.mdia.mdhd.timescale as u64;
        let mut offset = traf.tfhd.base_data_offset.unwrap_or_default() as usize;
        if traf.trun.is_empty() {
            anyhow::bail!("missing trun box");
        }
        let mut min_timestamp = None;
        let mut max_timestamp = None;
        let mut contains_keyframe = false;
        for trun in &traf.trun {
            let tfhd = &traf.tfhd;
            if let Some(data_offset) = trun.data_offset {
                // The trun data offset is taken relative to the start of the
                // moof with the mdat immediately following, so subtract the
                // moof and mdat header sizes to index into the mdat payload.
                let base_offset = tfhd.base_data_offset.unwrap_or_default() as usize;
                let data_offset: usize = data_offset.try_into().context("invalid data offset")?;
                let relative_offset = data_offset
                    .checked_sub(moof_size)
                    .and_then(|v| v.checked_sub(header_size))
                    .context("invalid data offset: underflow")?;
                offset = base_offset
                    .checked_add(relative_offset)
                    .context("invalid data offset: overflow")?;
            }
            for entry in &trun.entries {
                // Per-sample values fall back to tfhd defaults, then trex defaults.
                let flags = entry
                    .flags
                    .unwrap_or(tfhd.default_sample_flags.unwrap_or(default_sample_flags));
                let duration = entry
                    .duration
                    .unwrap_or(tfhd.default_sample_duration.unwrap_or(default_sample_duration));
                let size = entry
                    .size
                    .unwrap_or(tfhd.default_sample_size.unwrap_or(default_sample_size)) as usize;
                // Presentation time = decode time plus the (possibly negative) composition offset.
                let pts = (dts as i64 + entry.cts.unwrap_or_default() as i64) as u64;
                let timestamp = hang::container::Timestamp::from_scale(pts, timescale)?;
                if offset + size > mdat.data.len() {
                    anyhow::bail!("invalid data offset");
                }
                let keyframe = match track.kind {
                    TrackKind::Video => {
                        // sample_depends_on == 2 (does not depend on others)
                        // and the non-sync-sample bit cleared.
                        let keyframe = (flags >> 24) & 0x3 == 0x2;
                        let non_sync = (flags >> 16) & 0x1 == 0x1;
                        keyframe && !non_sync
                    }
                    TrackKind::Audio => {
                        // Audio samples are treated as independently decodable.
                        true
                    }
                };
                contains_keyframe |= keyframe;
                if !self.config.passthrough {
                    let payload = Bytes::copy_from_slice(&mdat.data[offset..(offset + size)]);
                    let frame = hang::container::Frame {
                        timestamp,
                        payload: payload.into(),
                    };
                    match &mut track.producer {
                        Fmp4Producer::Manual { track: raw, group } => {
                            // Start a new group on every keyframe, finishing
                            // the previous group first.
                            let mut g = if keyframe {
                                if let Some(mut prev) = group.take() {
                                    prev.finish()?;
                                }
                                raw.append_group()?
                            } else {
                                group.take().context("no keyframe at start")?
                            };
                            frame.encode(&mut g)?;
                            *group = Some(g);
                        }
                        Fmp4Producer::Ordered(ordered) => {
                            ordered.write(frame)?;
                        }
                    }
                }
                if timestamp >= max_timestamp.unwrap_or(Timestamp::ZERO) {
                    max_timestamp = Some(timestamp);
                }
                if timestamp <= min_timestamp.unwrap_or(Timestamp::MAX) {
                    min_timestamp = Some(timestamp);
                }
                // Track the smallest observed inter-sample gap as an
                // estimate of the frame duration.
                if let Some(last_timestamp) = track.last_timestamp
                    && let Ok(duration) = timestamp.checked_sub(last_timestamp)
                    && duration < track.min_duration.unwrap_or(Timestamp::MAX)
                {
                    track.min_duration = Some(duration);
                }
                track.last_timestamp = Some(timestamp);
                dts += duration as u64;
                offset += size;
            }
        }
        if self.config.passthrough {
            // Forward the original moof+mdat pair as a single frame.
            let Fmp4Producer::Manual { track: raw, group } = &mut track.producer else {
                unreachable!("passthrough always uses Manual");
            };
            let mut g = if contains_keyframe {
                if let Some(mut prev) = group.take() {
                    prev.finish()?;
                }
                raw.append_group()?
            } else {
                group.take().context("no keyframe at start")?
            };
            let moof_raw = self.moof_raw.as_ref().context("missing moof box")?;
            let mut frame = g.create_frame(moq_lite::Frame {
                size: moof_raw.len() as u64 + mdat_raw.len() as u64,
            })?;
            frame.write(moof_raw.clone())?;
            frame.write(Bytes::copy_from_slice(mdat_raw))?;
            frame.finish()?;
            *group = Some(g);
        }
        // Jitter estimate: the PTS spread within this fragment plus one
        // minimum frame duration. It only ever shrinks and is mirrored into
        // the catalog entry for this track.
        if let (Some(min), Some(max), Some(min_duration)) = (min_timestamp, max_timestamp, track.min_duration) {
            let jitter = max - min + min_duration;
            if jitter < track.jitter.unwrap_or(Timestamp::MAX) {
                track.jitter = Some(jitter);
                let mut catalog = self.catalog.lock();
                match track.kind {
                    TrackKind::Video => {
                        let config = catalog
                            .video
                            .renditions
                            .get_mut(&track.producer.info().name)
                            .context("missing video config")?;
                        config.jitter = Some(jitter.convert()?);
                    }
                    TrackKind::Audio => {
                        let config = catalog
                            .audio
                            .renditions
                            .get_mut(&track.producer.info().name)
                            .context("missing audio config")?;
                        config.jitter = Some(jitter.convert()?);
                    }
                }
            }
        }
    }
    Ok(())
}
}
impl Fmp4 {
    /// Cleanly close every track producer, finishing any open group first.
    pub fn finish(&mut self) -> anyhow::Result<()> {
        for track in self.tracks.values_mut() {
            match &mut track.producer {
                Fmp4Producer::Manual { track: raw, group } => {
                    if let Some(mut g) = group.take() {
                        g.finish()?;
                    }
                    raw.finish()?;
                }
                Fmp4Producer::Ordered(ordered) => {
                    ordered.finish()?;
                }
            }
        }
        Ok(())
    }
}
impl Drop for Fmp4 {
    /// Remove this demuxer's entries from the catalog when it goes away.
    fn drop(&mut self) {
        let mut catalog = self.catalog.lock();
        for track in self.tracks.values() {
            let name = &track.producer.info().name;
            // The removal result is deliberately ignored; the entry may
            // already be gone. (Previously `.is_some()` was tacked on just
            // to unify the arm types, discarding the bool anyway.)
            match track.kind {
                TrackKind::Video => {
                    catalog.video.remove(name);
                }
                TrackKind::Audio => {
                    catalog.audio.remove(name);
                }
            }
        }
    }
}
/// Build a minimal AAC `AudioSpecificConfig` (ISO/IEC 14496-3) for the given
/// object type (`profile`), sample rate and channel configuration.
///
/// The 2-byte form is used when the sample rate matches a standard frequency
/// index; otherwise the 5-byte escape form spells the rate out in 24 bits.
fn build_aac_audio_specific_config(profile: u8, sample_rate: u32, channels: u32) -> Bytes {
    // audioObjectType is a 5-bit field.
    let profile = profile & 0x1F;
    // Standard sampling-frequency index table; a miss means the escape
    // index (0xF) with an explicit 24-bit rate.
    const RATES: [u32; 13] = [
        96000, 88200, 64000, 48000, 44100, 32000, 24000, 22050, 16000, 12000, 11025, 8000, 7350,
    ];
    match RATES.iter().position(|&rate| rate == sample_rate) {
        Some(index) => {
            // 5 bits object type | 4 bits frequency index | 4 bits channel
            // configuration | 3 bits padding = 16 bits.
            let packed: u16 =
                ((profile as u16) << 11) | ((index as u16) << 7) | (((channels as u16) & 0xF) << 3);
            Bytes::copy_from_slice(&packed.to_be_bytes())
        }
        None => {
            // Escape form: 5 bits object type | 0xF | 24-bit sample rate |
            // 4 bits channel configuration | 3 bits padding = 40 bits.
            let mut packed: u64 = profile as u64;
            packed = (packed << 4) | 0xF;
            packed = (packed << 24) | sample_rate as u64;
            packed = (packed << 4) | (channels as u64 & 0xF);
            packed <<= 3;
            // Keep only the low 5 bytes of the big-endian representation.
            let bytes = packed.to_be_bytes();
            Bytes::copy_from_slice(&bytes[3..8])
        }
    }
}