Skip to main content

moq_pub/
media.rs

1// SPDX-FileCopyrightText: 2024-2026 Cloudflare Inc., Luke Curley, Mike English and contributors
2// SPDX-FileCopyrightText: 2023-2024 Luke Curley and contributors
3// SPDX-License-Identifier: MIT OR Apache-2.0
4
5use anyhow::{self, Context};
6use bytes::{Buf, BufMut, Bytes, BytesMut};
7use moq_transport::serve::{SubgroupWriter, SubgroupsWriter, TrackWriter, TracksWriter};
8use mp4::{self, ReadBox, TrackType};
9use std::cmp::max;
10use std::collections::HashMap;
11use std::io::Cursor;
12use std::time;
13
14pub struct Media {
15    // Tracks based on their track ID.
16    tracks: HashMap<u32, Track>,
17
18    // The full broadcast of tracks
19    broadcast: TracksWriter,
20
21    // The init and catalog tracks
22    init: SubgroupsWriter,
23    catalog: SubgroupsWriter,
24
25    // The ftyp and moov atoms at the start of the file.
26    ftyp: Option<Bytes>,
27    moov: Option<mp4::MoovBox>,
28
29    // The current track name
30    current: Option<u32>,
31}
32
33impl Media {
34    pub fn new(mut broadcast: TracksWriter) -> anyhow::Result<Self> {
35        let catalog = broadcast
36            .create(".catalog")
37            .context("broadcast closed")?
38            .subgroups()?;
39        let init = broadcast
40            .create("0.mp4")
41            .context("broadcast closed")?
42            .subgroups()?;
43
44        Ok(Media {
45            tracks: Default::default(),
46            broadcast,
47            catalog,
48            init,
49            ftyp: None,
50            moov: None,
51            current: None,
52        })
53    }
54
55    pub fn reset(&mut self) {
56        for track in self.tracks.values_mut() {
57            track.end_group();
58        }
59    }
60
61    // Parse the input buffer, reading any full atoms we can find.
62    // Keep appending more data and calling parse.
63    pub fn parse<B: Buf>(&mut self, buf: &mut B) -> anyhow::Result<()> {
64        while self.parse_atom(buf)? {}
65        Ok(())
66    }
67
68    fn parse_atom<B: Buf>(&mut self, buf: &mut B) -> anyhow::Result<bool> {
69        let atom = match next_atom(buf)? {
70            Some(atom) => atom,
71            None => return Ok(false),
72        };
73
74        let mut reader = Cursor::new(&atom);
75        let header = mp4::BoxHeader::read(&mut reader)?;
76
77        match header.name {
78            mp4::BoxType::FtypBox => {
79                if self.ftyp.is_some() {
80                    tracing::debug!("multiple ftyp atoms");
81                    return Ok(true);
82                }
83
84                // Save the ftyp atom for later.
85                self.ftyp = Some(atom)
86            }
87            mp4::BoxType::MoovBox => {
88                if self.moov.is_some() {
89                    tracing::debug!("multiple moov atoms");
90                    return Ok(true);
91                }
92
93                // Parse the moov box so we can detect the timescales for each track.
94                let moov = mp4::MoovBox::read_box(&mut reader, header.size)?;
95
96                self.setup(&moov, atom)?;
97                self.moov = Some(moov);
98            }
99            mp4::BoxType::MoofBox => {
100                let moof = mp4::MoofBox::read_box(&mut reader, header.size)?;
101
102                // Process the moof.
103                let fragment = Fragment::new(moof)?;
104
105                if fragment.keyframe {
106                    // Gross but thanks to rust we have to do a separate hashmap lookup
107                    if self
108                        .tracks
109                        .get(&fragment.track)
110                        .context("failed to find track")?
111                        .handler
112                        == TrackType::Video
113                    {
114                        // Start a new group for the keyframe.
115                        for track in self.tracks.values_mut() {
116                            track.end_group();
117                        }
118                    }
119                }
120
121                // Get the track for this moof.
122                let track = self
123                    .tracks
124                    .get_mut(&fragment.track)
125                    .context("failed to find track")?;
126
127                // Save the track ID for the next iteration, which must be a mdat.
128                anyhow::ensure!(self.current.is_none(), "multiple moof atoms");
129                self.current.replace(fragment.track);
130
131                // Publish the moof header, creating a new segment if it's a keyframe.
132                track
133                    .header(atom, fragment)
134                    .context("failed to publish moof")?;
135            }
136            mp4::BoxType::MdatBox => {
137                // Get the track ID from the previous moof.
138                let track = self.current.take().context("missing moof")?;
139                let track = self
140                    .tracks
141                    .get_mut(&track)
142                    .context("failed to find track")?;
143
144                // Publish the mdat atom.
145                track.data(atom).context("failed to publish mdat")?;
146            }
147
148            _ => {
149                // Skip unknown atoms
150            }
151        }
152
153        Ok(true)
154    }
155
156    fn setup(&mut self, moov: &mp4::MoovBox, raw: Bytes) -> anyhow::Result<()> {
157        // Combine the ftyp+moov atoms into a single object.
158        let mut init = self.ftyp.clone().context("missing ftyp")?.to_vec();
159        init.extend_from_slice(&raw);
160
161        // Create the catalog track with a single segment.
162        self.init.append(0)?.write(init.into())?;
163
164        let mut tracks = Vec::new();
165
166        // Produce the catalog
167        for trak in &moov.traks {
168            let id = trak.tkhd.track_id;
169            let name = format!("{id}.m4s");
170
171            let timescale = track_timescale(moov, id);
172            let handler = (&trak.mdia.hdlr.handler_type).try_into()?;
173
174            let mut selection_params = moq_catalog::SelectionParam::default();
175
176            let mut track = moq_catalog::Track {
177                init_track: Some(self.init.name.clone()),
178                name: name.clone(),
179                namespace: Some(self.broadcast.namespace.to_utf8_path()),
180                packaging: Some(moq_catalog::TrackPackaging::Cmaf),
181                render_group: Some(1),
182                ..Default::default()
183            };
184
185            let stsd = &trak.mdia.minf.stbl.stsd;
186
187            if let Some(avc1) = &stsd.avc1 {
188                // avc1[.PPCCLL]
189                //
190                // let profile = 0x64;
191                // let constraints = 0x00;
192                // let level = 0x1f;
193                let profile = avc1.avcc.avc_profile_indication;
194                let constraints = avc1.avcc.profile_compatibility; // Not 100% certain here, but it's 0x00 on my current test video
195                let level = avc1.avcc.avc_level_indication;
196
197                let width = avc1.width;
198                let height = avc1.height;
199
200                let codec = rfc6381_codec::Codec::avc1(profile, constraints, level);
201                let codec_str = codec.to_string();
202
203                selection_params.codec = Some(codec_str);
204                selection_params.width = Some(width.into());
205                selection_params.height = Some(height.into());
206            } else if let Some(_hev1) = &stsd.hev1 {
207                // TODO https://github.com/gpac/mp4box.js/blob/325741b592d910297bf609bc7c400fc76101077b/src/box-codecs.js#L106
208                anyhow::bail!("HEVC not yet supported")
209            } else if let Some(mp4a) = &stsd.mp4a {
210                let desc = &mp4a
211                    .esds
212                    .as_ref()
213                    .context("missing esds box for MP4a")?
214                    .es_desc
215                    .dec_config;
216                let codec_str = format!(
217                    "mp4a.{:02x}.{}",
218                    desc.object_type_indication, desc.dec_specific.profile
219                );
220
221                selection_params.codec = Some(codec_str);
222                selection_params.channel_config = Some(mp4a.channelcount.to_string());
223                selection_params.samplerate = Some(mp4a.samplerate.value().into());
224
225                let bitrate = max(desc.max_bitrate, desc.avg_bitrate);
226                if bitrate > 0 {
227                    selection_params.bitrate = Some(bitrate);
228                }
229            } else if let Some(vp09) = &stsd.vp09 {
230                // https://github.com/gpac/mp4box.js/blob/325741b592d910297bf609bc7c400fc76101077b/src/box-codecs.js#L238
231                let vpcc = &vp09.vpcc;
232                let codec_str = format!(
233                    "vp09.0.{:02x}.{:02x}.{:02x}",
234                    vpcc.profile, vpcc.level, vpcc.bit_depth
235                );
236
237                selection_params.codec = Some(codec_str);
238                selection_params.width = Some(vp09.width.into());
239                selection_params.height = Some(vp09.height.into());
240
241                // TODO Test if this actually works; I'm just guessing based on mp4box.js
242                anyhow::bail!("VP9 not yet supported")
243            } else {
244                // TODO add av01 support: https://github.com/gpac/mp4box.js/blob/325741b592d910297bf609bc7c400fc76101077b/src/box-codecs.js#L251
245                anyhow::bail!("unknown codec for track: {}", trak.tkhd.track_id);
246            }
247
248            track.selection_params = selection_params;
249
250            tracks.push(track);
251
252            // Store the track publisher in a map so we can update it later.
253            let track = self.broadcast.create(&name).context("broadcast closed")?;
254            let track = Track::new(track, handler, timescale);
255            self.tracks.insert(id, track);
256        }
257
258        let catalog = moq_catalog::Root {
259            version: 1,
260            streaming_format: 1,
261            streaming_format_version: "0.2".to_string(),
262            streaming_delta_updates: true,
263            common_track_fields: moq_catalog::CommonTrackFields::from_tracks(&mut tracks),
264            tracks,
265        };
266
267        let catalog_str = serde_json::to_string_pretty(&catalog)?;
268
269        tracing::info!("catalog: {}", catalog_str);
270
271        // Create a single fragment for the segment.
272        self.catalog.append(0)?.write(catalog_str.into())?;
273
274        Ok(())
275    }
276}
277
278// Find the next full atom in the buffer.
279// TODO return the amount of data still needed in Err?
280fn next_atom<B: Buf>(buf: &mut B) -> anyhow::Result<Option<Bytes>> {
281    let mut peek = Cursor::new(buf.chunk());
282
283    if peek.remaining() < 8 {
284        if buf.remaining() != buf.chunk().len() {
285            // TODO figure out a way to peek at the first 8 bytes
286            anyhow::bail!("TODO: vectored Buf not yet supported");
287        }
288
289        return Ok(None);
290    }
291
292    // Convert the first 4 bytes into the size.
293    let size = peek.get_u32();
294    let _type = peek.get_u32();
295
296    let size = match size {
297        // Runs until the end of the file.
298        0 => anyhow::bail!("TODO: unsupported EOF atom"),
299
300        // The next 8 bytes are the extended size to be used instead.
301        1 => {
302            let size_ext = peek.get_u64();
303            anyhow::ensure!(size_ext >= 16, "impossible extended box size: {}", size_ext);
304            size_ext as usize
305        }
306
307        2..=7 => {
308            anyhow::bail!("impossible box size: {}", size)
309        }
310
311        size => size as usize,
312    };
313
314    if buf.remaining() < size {
315        return Ok(None);
316    }
317
318    let atom = buf.copy_to_bytes(size);
319
320    Ok(Some(atom))
321}
322
323struct Track {
324    // The track we're producing
325    track: SubgroupsWriter,
326
327    // The current segment
328    current: Option<SubgroupWriter>,
329
330    // Pending moof header bytes, waiting to be combined with mdat.
331    // Per CMSF (draft-ietf-moq-cmsf-00 §3.3), each MoQ Object must contain
332    // at least one complete CMAF Chunk (moof+mdat pair).
333    pending_moof: Option<Bytes>,
334
335    // The number of units per second.
336    timescale: u64,
337
338    // The type of track, ex. "vide" or "soun"
339    handler: TrackType,
340}
341
342impl Track {
343    fn new(track: TrackWriter, handler: TrackType, timescale: u64) -> Self {
344        Self {
345            track: track.subgroups().unwrap(),
346            current: None,
347            pending_moof: None,
348            timescale,
349            handler,
350        }
351    }
352
353    pub fn header(&mut self, raw: Bytes, fragment: Fragment) -> anyhow::Result<()> {
354        if self.current.is_some() {
355            // Use the existing segment — just stash the moof for now.
356            debug_assert!(self.pending_moof.is_none(), "overwriting pending moof");
357            self.pending_moof = Some(raw);
358            return Ok(());
359        }
360
361        // Otherwise make a new segment
362
363        let _timestamp: u32 = fragment
364            .timestamp(self.timescale)
365            .as_millis()
366            .try_into()
367            .context("timestamp too large")?;
368        // Prioritize each group equally for now
369        // (A u8 doesn't give us granularity for ms since epoch)
370        // TODO: Revisit post draft-05 prioritization
371        let priority: u8 = 127;
372
373        // Create a new segment.
374        let segment = self.track.append(priority)?;
375
376        println!(
377            "timestamp: {:?} segment: {:?}:{:?} priority: {:?}",
378            fragment.timestamp, segment.info.group_id, segment.info.subgroup_id, priority
379        );
380
381        // Stash the moof — it will be combined with mdat in data().
382        self.pending_moof = Some(raw);
383
384        // Save for the next iteration
385        self.current = Some(segment);
386
387        Ok(())
388    }
389
390    pub fn data(&mut self, raw: Bytes) -> anyhow::Result<()> {
391        let moof = self.pending_moof.take().context("missing pending moof")?;
392        let segment = self.current.as_mut().context("missing current fragment")?;
393        // Combine moof+mdat into a single MoQ Object (CMSF §3.3 compliance).
394        let mut combined = BytesMut::with_capacity(moof.len() + raw.len());
395        combined.put_slice(&moof);
396        combined.put_slice(&raw);
397        segment.write(combined.freeze())?;
398
399        Ok(())
400    }
401
402    pub fn end_group(&mut self) {
403        self.current = None;
404        self.pending_moof = None;
405    }
406}
407
/// Timing and keyframe information extracted from one moof atom.
struct Fragment {
    /// MP4 track ID from the fragment's `tfhd`.
    track: u32,

    /// Base media decode time from the `tfdt`, in the track's timescale units.
    timestamp: u64,

    /// Whether the fragment contains a keyframe sample (see `sample_keyframe`).
    keyframe: bool,
}
418
419impl Fragment {
420    fn new(moof: mp4::MoofBox) -> anyhow::Result<Self> {
421        // We can't split the mdat atom, so this is impossible to support
422        anyhow::ensure!(moof.trafs.len() == 1, "multiple tracks per moof atom");
423        let track = moof.trafs[0].tfhd.track_id;
424
425        // Parse the moof to get some timing information to sleep.
426        let timestamp = sample_timestamp(&moof).expect("couldn't find timestamp");
427
428        // Detect if we should start a new segment.
429        let keyframe = sample_keyframe(&moof);
430
431        Ok(Self {
432            track,
433            timestamp,
434            keyframe,
435        })
436    }
437
438    // Convert from timescale units to a duration.
439    fn timestamp(&self, timescale: u64) -> time::Duration {
440        time::Duration::from_millis(1000 * self.timestamp / timescale)
441    }
442}
443
444fn sample_timestamp(moof: &mp4::MoofBox) -> Option<u64> {
445    Some(moof.trafs.first()?.tfdt.as_ref()?.base_media_decode_time)
446}
447
448fn sample_keyframe(moof: &mp4::MoofBox) -> bool {
449    for traf in &moof.trafs {
450        // TODO trak default flags if this is None
451        let default_flags = traf.tfhd.default_sample_flags.unwrap_or_default();
452        let trun = match &traf.trun {
453            Some(t) => t,
454            None => return false,
455        };
456
457        for i in 0..trun.sample_count {
458            let mut flags = match trun.sample_flags.get(i as usize) {
459                Some(f) => *f,
460                None => default_flags,
461            };
462
463            if i == 0 {
464                if let Some(first_flags) = trun.first_sample_flags {
465                    flags = first_flags;
466                }
467            }
468
469            // https://chromium.googlesource.com/chromium/src/media/+/master/formats/mp4/track_run_iterator.cc#177
470            let keyframe = (flags >> 24) & 0x3 == 0x2; // kSampleDependsOnNoOther
471            let non_sync = (flags >> 16) & 0x1 == 0x1; // kSampleIsNonSyncSample
472
473            if keyframe && !non_sync {
474                return true;
475            }
476        }
477    }
478
479    false
480}
481
482// Find the timescale for the given track.
483fn track_timescale(moov: &mp4::MoovBox, track_id: u32) -> u64 {
484    let trak = moov
485        .traks
486        .iter()
487        .find(|trak| trak.tkhd.track_id == track_id)
488        .expect("failed to find trak");
489
490    trak.mdia.mdhd.timescale as u64
491}