use anyhow::{self, Context};
use bytes::{Buf, Bytes};
use moq_transport::serve::{GroupWriter, GroupsWriter, TrackWriter, TracksWriter};
use mp4::{self, ReadBox, TrackType};
use serde_json::json;
use std::cmp::max;
use std::collections::HashMap;
use std::io::Cursor;
use std::time;
/// Parses a stream of MP4 atoms and republishes it as tracks on a broadcast.
///
/// Atoms are fed in through `parse`; the `ftyp` and `moov` atoms are combined
/// into an init segment and a JSON catalog, and each `moof`/`mdat` pair is
/// forwarded to the track named by the `moof`.
pub struct Media {
/// Per-track publishing state, keyed by the `tkhd` track id of each `trak`.
tracks: HashMap<u32, Track>,
/// Writer used to create the catalog, init and per-track media tracks.
broadcast: TracksWriter,
/// Writer for the `0.mp4` track, which receives the `ftyp` + `moov` bytes.
init: GroupsWriter,
/// Writer for the `.catalog` track, which receives the JSON catalog.
catalog: GroupsWriter,
/// Raw `ftyp` atom, kept until the `moov` arrives so both can be published together.
ftyp: Option<Bytes>,
/// Parsed `moov` atom; `Some` once setup has completed successfully.
moov: Option<mp4::MoovBox>,
/// Track id of the most recent `moof`, held until the following `mdat` is consumed.
current: Option<u32>,
}
impl Media {
pub fn new(mut broadcast: TracksWriter) -> anyhow::Result<Self> {
let catalog = broadcast.create(".catalog").context("broadcast closed")?.groups()?;
let init = broadcast.create("0.mp4").context("broadcast closed")?.groups()?;
Ok(Media {
tracks: Default::default(),
broadcast,
catalog,
init,
ftyp: None,
moov: None,
current: None,
})
}
pub fn parse<B: Buf>(&mut self, buf: &mut B) -> anyhow::Result<()> {
while self.parse_atom(buf)? {}
Ok(())
}
fn parse_atom<B: Buf>(&mut self, buf: &mut B) -> anyhow::Result<bool> {
let atom = match next_atom(buf)? {
Some(atom) => atom,
None => return Ok(false),
};
let mut reader = Cursor::new(&atom);
let header = mp4::BoxHeader::read(&mut reader)?;
match header.name {
mp4::BoxType::FtypBox => {
if self.ftyp.is_some() {
anyhow::bail!("multiple ftyp atoms");
}
self.ftyp = Some(atom)
}
mp4::BoxType::MoovBox => {
if self.moov.is_some() {
anyhow::bail!("multiple moov atoms");
}
let moov = mp4::MoovBox::read_box(&mut reader, header.size)?;
self.setup(&moov, atom)?;
self.moov = Some(moov);
}
mp4::BoxType::MoofBox => {
let moof = mp4::MoofBox::read_box(&mut reader, header.size)?;
let fragment = Fragment::new(moof)?;
if fragment.keyframe {
if self
.tracks
.get(&fragment.track)
.context("failed to find track")?
.handler == TrackType::Video
{
for track in self.tracks.values_mut() {
track.end_group();
}
}
}
let track = self.tracks.get_mut(&fragment.track).context("failed to find track")?;
anyhow::ensure!(self.current.is_none(), "multiple moof atoms");
self.current.replace(fragment.track);
track.header(atom, fragment).context("failed to publish moof")?;
}
mp4::BoxType::MdatBox => {
let track = self.current.take().context("missing moof")?;
let track = self.tracks.get_mut(&track).context("failed to find track")?;
track.data(atom).context("failed to publish mdat")?;
}
_ => {
}
}
Ok(true)
}
fn setup(&mut self, moov: &mp4::MoovBox, raw: Bytes) -> anyhow::Result<()> {
for trak in &moov.traks {
let id = trak.tkhd.track_id;
let name = format!("{}.m4s", id);
let timescale = track_timescale(moov, id);
let handler = (&trak.mdia.hdlr.handler_type).try_into()?;
let track = self.broadcast.create(&name).context("broadcast closed")?;
let track = Track::new(track, handler, timescale);
self.tracks.insert(id, track);
}
let mut init = self.ftyp.clone().context("missing ftyp")?.to_vec();
init.extend_from_slice(&raw);
self.init.append(0)?.write(init.into())?;
let mut tracks = Vec::new();
for trak in &moov.traks {
let mut track = json!({
"container": "mp4",
"init_track": "0.mp4",
"data_track": format!("{}.m4s", trak.tkhd.track_id),
});
let stsd = &trak.mdia.minf.stbl.stsd;
if let Some(avc1) = &stsd.avc1 {
let profile = avc1.avcc.avc_profile_indication;
let constraints = avc1.avcc.profile_compatibility; let level = avc1.avcc.avc_level_indication;
let width = avc1.width;
let height = avc1.height;
let codec = rfc6381_codec::Codec::avc1(profile, constraints, level);
let codec_str = codec.to_string();
track["kind"] = json!("video");
track["codec"] = json!(codec_str);
track["width"] = json!(width);
track["height"] = json!(height);
} else if let Some(_hev1) = &stsd.hev1 {
anyhow::bail!("HEVC not yet supported")
} else if let Some(mp4a) = &stsd.mp4a {
let desc = &mp4a
.esds
.as_ref()
.context("missing esds box for MP4a")?
.es_desc
.dec_config;
let codec_str = format!("mp4a.{:02x}.{}", desc.object_type_indication, desc.dec_specific.profile);
track["kind"] = json!("audio");
track["codec"] = json!(codec_str);
track["channel_count"] = json!(mp4a.channelcount);
track["sample_rate"] = json!(mp4a.samplerate.value());
track["sample_size"] = json!(mp4a.samplesize);
let bitrate = max(desc.max_bitrate, desc.avg_bitrate);
if bitrate > 0 {
track["bit_rate"] = json!(bitrate);
}
} else if let Some(vp09) = &stsd.vp09 {
let vpcc = &vp09.vpcc;
let codec_str = format!("vp09.0.{:02x}.{:02x}.{:02x}", vpcc.profile, vpcc.level, vpcc.bit_depth);
track["kind"] = json!("video");
track["codec"] = json!(codec_str);
track["width"] = json!(vp09.width); track["height"] = json!(vp09.height); anyhow::bail!("VP9 not yet supported")
} else {
anyhow::bail!("unknown codec for track: {}", trak.tkhd.track_id);
}
tracks.push(track);
}
let catalog = json!({
"tracks": tracks
});
let catalog_str = serde_json::to_string_pretty(&catalog)?;
log::info!("catalog: {}", catalog_str);
self.catalog.append(0)?.write(catalog_str.into())?;
Ok(())
}
}
/// Removes the next complete MP4 atom from the front of `buf`.
///
/// Returns `Ok(Some(atom))` with the atom's bytes (header included) when one
/// is fully available, and `Ok(None)` when more data is needed; in the latter
/// case `buf` is left untouched.
///
/// # Errors
///
/// * the atom header is split across chunks of a non-contiguous `Buf`;
/// * the atom has size `0` ("runs to end of file"), which is unsupported;
/// * the declared size is smaller than the header that declares it;
/// * an extended size does not fit in `usize`.
fn next_atom<B: Buf>(buf: &mut B) -> anyhow::Result<Option<Bytes>> {
    let mut peek = Cursor::new(buf.chunk());

    // The header is only ever read from the first chunk, so a short chunk is
    // "need more data" for a contiguous buffer but an error for a vectored one.
    let contiguous = buf.remaining() == peek.remaining();

    if peek.remaining() < 8 {
        anyhow::ensure!(contiguous, "TODO: vectored Buf not yet supported");
        return Ok(None);
    }

    let size = peek.get_u32();
    // The type is not needed here; reading it just advances past the field.
    let _type = peek.get_u32();

    let size = match size {
        0 => anyhow::bail!("TODO: unsupported EOF atom"),
        1 => {
            // The real size lives in the next 8 bytes. They may not have
            // arrived yet, and `get_u64` panics on a short read.
            if peek.remaining() < 8 {
                anyhow::ensure!(contiguous, "TODO: vectored Buf not yet supported");
                return Ok(None);
            }

            let size_ext = peek.get_u64();
            anyhow::ensure!(size_ext >= 16, "impossible extended box size: {}", size_ext);
            usize::try_from(size_ext).context("extended box size does not fit in usize")?
        }
        2..=7 => {
            anyhow::bail!("impossible box size: {}", size)
        }
        size => size as usize,
    };

    if buf.remaining() < size {
        return Ok(None);
    }

    Ok(Some(buf.copy_to_bytes(size)))
}
/// Publishing state for a single media track.
struct Track {
/// Writer that new groups are appended to.
track: GroupsWriter,
/// Group currently being written; `None` until the next header starts a new one.
current: Option<GroupWriter>,
/// Number of timestamp units per second, used to convert fragment timestamps to milliseconds.
timescale: u64,
/// Kind of track (compared against `TrackType::Video` when deciding group boundaries).
handler: TrackType,
}
impl Track {
    /// Wraps `track` for group-based publishing.
    ///
    /// # Panics
    ///
    /// Panics if `track` cannot be switched to group mode.
    fn new(track: TrackWriter, handler: TrackType, timescale: u64) -> Self {
        Self {
            track: track.groups().expect("failed to switch track to group mode"),
            current: None,
            timescale,
            handler,
        }
    }

    /// Publishes a raw `moof` atom.
    ///
    /// If a group is already open the atom is appended to it. Otherwise a new
    /// group is started, with a priority of `u32::MAX` minus the fragment's
    /// timestamp in milliseconds, so earlier fragments get larger values.
    ///
    /// # Errors
    ///
    /// Fails if the track's timescale is zero, if the timestamp in
    /// milliseconds does not fit in a `u32`, or if writing fails.
    pub fn header(&mut self, raw: Bytes, fragment: Fragment) -> anyhow::Result<()> {
        if let Some(current) = self.current.as_mut() {
            current.write(raw)?;
            return Ok(());
        }

        // The timescale comes from the stream; a zero would otherwise cause a
        // divide-by-zero panic in the timestamp conversion.
        anyhow::ensure!(self.timescale > 0, "track timescale is zero");

        let timestamp: u32 = fragment
            .timestamp(self.timescale)
            .as_millis()
            .try_into()
            .context("timestamp too large")?;

        // Cannot underflow: `timestamp` is a `u32`.
        let priority = (u32::MAX - timestamp).into();

        let mut segment = self.track.append(priority)?;
        segment.write(raw)?;
        self.current = Some(segment);

        Ok(())
    }

    /// Appends a raw `mdat` atom to the open group.
    ///
    /// # Errors
    ///
    /// Fails if no group is open or if writing fails.
    pub fn data(&mut self, raw: Bytes) -> anyhow::Result<()> {
        let segment = self.current.as_mut().context("missing current fragment")?;
        segment.write(raw)?;
        Ok(())
    }

    /// Drops the open group, if any, so the next header starts a new one.
    pub fn end_group(&mut self) {
        self.current = None;
    }
}
/// Summary of a single `moof` atom.
struct Fragment {
/// Track id taken from the `tfhd` of the fragment's only `traf`.
track: u32,
/// Base media decode time from the `tfdt`, in units of the track's timescale.
timestamp: u64,
/// Whether any sample in the fragment was detected as a keyframe.
keyframe: bool,
}
impl Fragment {
    /// Extracts the track id, timestamp and keyframe flag from `moof`.
    ///
    /// # Errors
    ///
    /// Fails unless the `moof` contains exactly one `traf`, or if that `traf`
    /// carries no `tfdt` to take the timestamp from.
    fn new(moof: mp4::MoofBox) -> anyhow::Result<Self> {
        anyhow::ensure!(moof.trafs.len() == 1, "multiple tracks per moof atom");
        let track = moof.trafs[0].tfhd.track_id;

        // The moof comes from the input stream, so a missing tfdt is reported
        // as an error rather than a panic.
        let timestamp = sample_timestamp(&moof).context("couldn't find timestamp")?;
        let keyframe = sample_keyframe(&moof);

        Ok(Self {
            track,
            timestamp,
            keyframe,
        })
    }

    /// Converts the fragment timestamp to a duration, truncated to whole milliseconds.
    ///
    /// `timescale` is the number of timestamp units per second. The result
    /// saturates at `u64::MAX` milliseconds.
    ///
    /// # Panics
    ///
    /// Panics if `timescale` is zero.
    fn timestamp(&self, timescale: u64) -> time::Duration {
        // Widen before multiplying: `1000 * timestamp` can overflow a u64.
        let millis = u128::from(self.timestamp) * 1000 / u128::from(timescale);
        time::Duration::from_millis(u64::try_from(millis).unwrap_or(u64::MAX))
    }
}
/// Returns the base media decode time of the first `traf` in `moof`.
///
/// Yields `None` when there is no `traf` or the first one has no `tfdt`.
fn sample_timestamp(moof: &mp4::MoofBox) -> Option<u64> {
    moof.trafs
        .first()
        .and_then(|traf| traf.tfdt.as_ref())
        .map(|tfdt| tfdt.base_media_decode_time)
}
/// Reports whether `moof` contains at least one keyframe sample.
///
/// For each sample the flags come from, in order of preference: the `trun`
/// first-sample flags (first sample only), the per-sample flags, then the
/// `tfhd` default flags (zero when absent). Scanning stops with `false` at
/// the first `traf` that has no `trun`.
fn sample_keyframe(moof: &mp4::MoofBox) -> bool {
    for traf in &moof.trafs {
        let trun = match traf.trun.as_ref() {
            Some(trun) => trun,
            None => return false,
        };
        let fallback = traf.tfhd.default_sample_flags.unwrap_or_default();

        for index in 0..trun.sample_count {
            let flags = match (index, trun.first_sample_flags) {
                // An explicit first-sample override wins for sample 0.
                (0, Some(first)) => first,
                _ => trun.sample_flags.get(index as usize).copied().unwrap_or(fallback),
            };

            // Bits 24-25 equal to 2 mark the sample as a keyframe candidate and
            // bit 16 marks it as non-sync (presumably the ISO BMFF
            // `sample_depends_on` / `sample_is_non_sync_sample` fields).
            let depends_on_none = (flags >> 24) & 0x3 == 0x2;
            let is_non_sync = (flags >> 16) & 0x1 == 0x1;

            if depends_on_none && !is_non_sync {
                return true;
            }
        }
    }

    false
}
/// Looks up the `mdhd` timescale of the track with id `track_id`.
///
/// # Panics
///
/// Panics if `moov` has no `trak` with that id.
fn track_timescale(moov: &mp4::MoovBox, track_id: u32) -> u64 {
    for trak in &moov.traks {
        if trak.tkhd.track_id == track_id {
            return trak.mdia.mdhd.timescale as u64;
        }
    }

    panic!("failed to find trak")
}