Skip to main content

moq_pub/
media.rs

1use anyhow::{self, Context};
2use bytes::{Buf, Bytes};
3use moq_transport::serve::{SubgroupWriter, SubgroupsWriter, TrackWriter, TracksWriter};
4use mp4::{self, ReadBox, TrackType};
5use std::cmp::max;
6use std::collections::HashMap;
7use std::io::Cursor;
8use std::time;
9
/// Parses a fragmented MP4 byte stream atom by atom and publishes the result
/// as tracks on a broadcast.
pub struct Media {
    // Per-track publishing state, keyed by the MP4 track ID.
    tracks: HashMap<u32, Track>,

    // The full broadcast of tracks; new track writers are created from it.
    broadcast: TracksWriter,

    // The init ("0.mp4") and catalog (".catalog") tracks
    init: SubgroupsWriter,
    catalog: SubgroupsWriter,

    // The ftyp and moov atoms at the start of the file.
    // ftyp is kept as raw bytes, moov in parsed form.
    ftyp: Option<Bytes>,
    moov: Option<mp4::MoovBox>,

    // The ID of the track whose moof was parsed last; the next mdat belongs to it.
    current: Option<u32>,
}
28
29impl Media {
30    pub fn new(mut broadcast: TracksWriter) -> anyhow::Result<Self> {
31        let catalog = broadcast
32            .create(".catalog")
33            .context("broadcast closed")?
34            .subgroups()?;
35        let init = broadcast
36            .create("0.mp4")
37            .context("broadcast closed")?
38            .subgroups()?;
39
40        Ok(Media {
41            tracks: Default::default(),
42            broadcast,
43            catalog,
44            init,
45            ftyp: None,
46            moov: None,
47            current: None,
48        })
49    }
50
51    pub fn reset(&mut self) {
52        for track in self.tracks.values_mut() {
53            track.end_group();
54        }
55    }
56
57    // Parse the input buffer, reading any full atoms we can find.
58    // Keep appending more data and calling parse.
59    pub fn parse<B: Buf>(&mut self, buf: &mut B) -> anyhow::Result<()> {
60        while self.parse_atom(buf)? {}
61        Ok(())
62    }
63
64    fn parse_atom<B: Buf>(&mut self, buf: &mut B) -> anyhow::Result<bool> {
65        let atom = match next_atom(buf)? {
66            Some(atom) => atom,
67            None => return Ok(false),
68        };
69
70        let mut reader = Cursor::new(&atom);
71        let header = mp4::BoxHeader::read(&mut reader)?;
72
73        match header.name {
74            mp4::BoxType::FtypBox => {
75                if self.ftyp.is_some() {
76                    tracing::debug!("multiple ftyp atoms");
77                    return Ok(true);
78                }
79
80                // Save the ftyp atom for later.
81                self.ftyp = Some(atom)
82            }
83            mp4::BoxType::MoovBox => {
84                if self.moov.is_some() {
85                    tracing::debug!("multiple moov atoms");
86                    return Ok(true);
87                }
88
89                // Parse the moov box so we can detect the timescales for each track.
90                let moov = mp4::MoovBox::read_box(&mut reader, header.size)?;
91
92                self.setup(&moov, atom)?;
93                self.moov = Some(moov);
94            }
95            mp4::BoxType::MoofBox => {
96                let moof = mp4::MoofBox::read_box(&mut reader, header.size)?;
97
98                // Process the moof.
99                let fragment = Fragment::new(moof)?;
100
101                if fragment.keyframe {
102                    // Gross but thanks to rust we have to do a separate hashmap lookup
103                    if self
104                        .tracks
105                        .get(&fragment.track)
106                        .context("failed to find track")?
107                        .handler
108                        == TrackType::Video
109                    {
110                        // Start a new group for the keyframe.
111                        for track in self.tracks.values_mut() {
112                            track.end_group();
113                        }
114                    }
115                }
116
117                // Get the track for this moof.
118                let track = self
119                    .tracks
120                    .get_mut(&fragment.track)
121                    .context("failed to find track")?;
122
123                // Save the track ID for the next iteration, which must be a mdat.
124                anyhow::ensure!(self.current.is_none(), "multiple moof atoms");
125                self.current.replace(fragment.track);
126
127                // Publish the moof header, creating a new segment if it's a keyframe.
128                track
129                    .header(atom, fragment)
130                    .context("failed to publish moof")?;
131            }
132            mp4::BoxType::MdatBox => {
133                // Get the track ID from the previous moof.
134                let track = self.current.take().context("missing moof")?;
135                let track = self
136                    .tracks
137                    .get_mut(&track)
138                    .context("failed to find track")?;
139
140                // Publish the mdat atom.
141                track.data(atom).context("failed to publish mdat")?;
142            }
143
144            _ => {
145                // Skip unknown atoms
146            }
147        }
148
149        Ok(true)
150    }
151
152    fn setup(&mut self, moov: &mp4::MoovBox, raw: Bytes) -> anyhow::Result<()> {
153        // Combine the ftyp+moov atoms into a single object.
154        let mut init = self.ftyp.clone().context("missing ftyp")?.to_vec();
155        init.extend_from_slice(&raw);
156
157        // Create the catalog track with a single segment.
158        self.init.append(0)?.write(init.into())?;
159
160        let mut tracks = Vec::new();
161
162        // Produce the catalog
163        for trak in &moov.traks {
164            let id = trak.tkhd.track_id;
165            let name = format!("{id}.m4s");
166
167            let timescale = track_timescale(moov, id);
168            let handler = (&trak.mdia.hdlr.handler_type).try_into()?;
169
170            let mut selection_params = moq_catalog::SelectionParam::default();
171
172            let mut track = moq_catalog::Track {
173                init_track: Some(self.init.name.clone()),
174                name: name.clone(),
175                namespace: Some(self.broadcast.namespace.to_utf8_path()),
176                packaging: Some(moq_catalog::TrackPackaging::Cmaf),
177                render_group: Some(1),
178                ..Default::default()
179            };
180
181            let stsd = &trak.mdia.minf.stbl.stsd;
182
183            if let Some(avc1) = &stsd.avc1 {
184                // avc1[.PPCCLL]
185                //
186                // let profile = 0x64;
187                // let constraints = 0x00;
188                // let level = 0x1f;
189                let profile = avc1.avcc.avc_profile_indication;
190                let constraints = avc1.avcc.profile_compatibility; // Not 100% certain here, but it's 0x00 on my current test video
191                let level = avc1.avcc.avc_level_indication;
192
193                let width = avc1.width;
194                let height = avc1.height;
195
196                let codec = rfc6381_codec::Codec::avc1(profile, constraints, level);
197                let codec_str = codec.to_string();
198
199                selection_params.codec = Some(codec_str);
200                selection_params.width = Some(width.into());
201                selection_params.height = Some(height.into());
202            } else if let Some(_hev1) = &stsd.hev1 {
203                // TODO https://github.com/gpac/mp4box.js/blob/325741b592d910297bf609bc7c400fc76101077b/src/box-codecs.js#L106
204                anyhow::bail!("HEVC not yet supported")
205            } else if let Some(mp4a) = &stsd.mp4a {
206                let desc = &mp4a
207                    .esds
208                    .as_ref()
209                    .context("missing esds box for MP4a")?
210                    .es_desc
211                    .dec_config;
212                let codec_str = format!(
213                    "mp4a.{:02x}.{}",
214                    desc.object_type_indication, desc.dec_specific.profile
215                );
216
217                selection_params.codec = Some(codec_str);
218                selection_params.channel_config = Some(mp4a.channelcount.to_string());
219                selection_params.samplerate = Some(mp4a.samplerate.value().into());
220
221                let bitrate = max(desc.max_bitrate, desc.avg_bitrate);
222                if bitrate > 0 {
223                    selection_params.bitrate = Some(bitrate);
224                }
225            } else if let Some(vp09) = &stsd.vp09 {
226                // https://github.com/gpac/mp4box.js/blob/325741b592d910297bf609bc7c400fc76101077b/src/box-codecs.js#L238
227                let vpcc = &vp09.vpcc;
228                let codec_str = format!(
229                    "vp09.0.{:02x}.{:02x}.{:02x}",
230                    vpcc.profile, vpcc.level, vpcc.bit_depth
231                );
232
233                selection_params.codec = Some(codec_str);
234                selection_params.width = Some(vp09.width.into());
235                selection_params.height = Some(vp09.height.into());
236
237                // TODO Test if this actually works; I'm just guessing based on mp4box.js
238                anyhow::bail!("VP9 not yet supported")
239            } else {
240                // TODO add av01 support: https://github.com/gpac/mp4box.js/blob/325741b592d910297bf609bc7c400fc76101077b/src/box-codecs.js#L251
241                anyhow::bail!("unknown codec for track: {}", trak.tkhd.track_id);
242            }
243
244            track.selection_params = selection_params;
245
246            tracks.push(track);
247
248            // Store the track publisher in a map so we can update it later.
249            let track = self.broadcast.create(&name).context("broadcast closed")?;
250            let track = Track::new(track, handler, timescale);
251            self.tracks.insert(id, track);
252        }
253
254        let catalog = moq_catalog::Root {
255            version: 1,
256            streaming_format: 1,
257            streaming_format_version: "0.2".to_string(),
258            streaming_delta_updates: true,
259            common_track_fields: moq_catalog::CommonTrackFields::from_tracks(&mut tracks),
260            tracks,
261        };
262
263        let catalog_str = serde_json::to_string_pretty(&catalog)?;
264
265        log::info!("catalog: {}", catalog_str);
266
267        // Create a single fragment for the segment.
268        self.catalog.append(0)?.write(catalog_str.into())?;
269
270        Ok(())
271    }
272}
273
274// Find the next full atom in the buffer.
275// TODO return the amount of data still needed in Err?
276fn next_atom<B: Buf>(buf: &mut B) -> anyhow::Result<Option<Bytes>> {
277    let mut peek = Cursor::new(buf.chunk());
278
279    if peek.remaining() < 8 {
280        if buf.remaining() != buf.chunk().len() {
281            // TODO figure out a way to peek at the first 8 bytes
282            anyhow::bail!("TODO: vectored Buf not yet supported");
283        }
284
285        return Ok(None);
286    }
287
288    // Convert the first 4 bytes into the size.
289    let size = peek.get_u32();
290    let _type = peek.get_u32();
291
292    let size = match size {
293        // Runs until the end of the file.
294        0 => anyhow::bail!("TODO: unsupported EOF atom"),
295
296        // The next 8 bytes are the extended size to be used instead.
297        1 => {
298            let size_ext = peek.get_u64();
299            anyhow::ensure!(size_ext >= 16, "impossible extended box size: {}", size_ext);
300            size_ext as usize
301        }
302
303        2..=7 => {
304            anyhow::bail!("impossible box size: {}", size)
305        }
306
307        size => size as usize,
308    };
309
310    if buf.remaining() < size {
311        return Ok(None);
312    }
313
314    let atom = buf.copy_to_bytes(size);
315
316    Ok(Some(atom))
317}
318
// Publishing state for a single media track.
struct Track {
    // The track we're producing
    track: SubgroupsWriter,

    // The segment currently being written, if any; cleared by `end_group`.
    current: Option<SubgroupWriter>,

    // The number of timescale units per second.
    timescale: u64,

    // The type of track, ex. "vide" or "soun"
    handler: TrackType,
}
332
333impl Track {
334    fn new(track: TrackWriter, handler: TrackType, timescale: u64) -> Self {
335        Self {
336            track: track.subgroups().unwrap(),
337            current: None,
338            timescale,
339            handler,
340        }
341    }
342
343    pub fn header(&mut self, raw: Bytes, fragment: Fragment) -> anyhow::Result<()> {
344        if let Some(current) = self.current.as_mut() {
345            // Use the existing segment
346            current.write(raw)?;
347            return Ok(());
348        }
349
350        // Otherwise make a new segment
351
352        let _timestamp: u32 = fragment
353            .timestamp(self.timescale)
354            .as_millis()
355            .try_into()
356            .context("timestamp too large")?;
357        // Prioritize each group equally for now
358        // (A u8 doesn't give us granularity for ms since epoch)
359        // TODO: Revisit post draft-05 prioritization
360        let priority: u8 = 127;
361
362        // Create a new segment.
363        let mut segment = self.track.append(priority)?;
364
365        println!(
366            "timestamp: {:?} segment: {:?}:{:?} priority: {:?}",
367            fragment.timestamp, segment.info.group_id, segment.info.subgroup_id, priority
368        );
369
370        // Write the fragment in it's own object.
371        segment.write(raw)?;
372
373        // Save for the next iteration
374        self.current = Some(segment);
375
376        Ok(())
377    }
378
379    pub fn data(&mut self, raw: Bytes) -> anyhow::Result<()> {
380        let segment = self.current.as_mut().context("missing current fragment")?;
381        segment.write(raw)?;
382
383        Ok(())
384    }
385
386    pub fn end_group(&mut self) {
387        self.current = None;
388    }
389}
390
// Summary of a single moof atom.
struct Fragment {
    // The ID of the track this fragment belongs to.
    track: u32,

    // The timestamp of the first sample in this fragment, in timescale units.
    timestamp: u64,

    // True if this fragment is a keyframe.
    keyframe: bool,
}
401
402impl Fragment {
403    fn new(moof: mp4::MoofBox) -> anyhow::Result<Self> {
404        // We can't split the mdat atom, so this is impossible to support
405        anyhow::ensure!(moof.trafs.len() == 1, "multiple tracks per moof atom");
406        let track = moof.trafs[0].tfhd.track_id;
407
408        // Parse the moof to get some timing information to sleep.
409        let timestamp = sample_timestamp(&moof).expect("couldn't find timestamp");
410
411        // Detect if we should start a new segment.
412        let keyframe = sample_keyframe(&moof);
413
414        Ok(Self {
415            track,
416            timestamp,
417            keyframe,
418        })
419    }
420
421    // Convert from timescale units to a duration.
422    fn timestamp(&self, timescale: u64) -> time::Duration {
423        time::Duration::from_millis(1000 * self.timestamp / timescale)
424    }
425}
426
427fn sample_timestamp(moof: &mp4::MoofBox) -> Option<u64> {
428    Some(moof.trafs.first()?.tfdt.as_ref()?.base_media_decode_time)
429}
430
431fn sample_keyframe(moof: &mp4::MoofBox) -> bool {
432    for traf in &moof.trafs {
433        // TODO trak default flags if this is None
434        let default_flags = traf.tfhd.default_sample_flags.unwrap_or_default();
435        let trun = match &traf.trun {
436            Some(t) => t,
437            None => return false,
438        };
439
440        for i in 0..trun.sample_count {
441            let mut flags = match trun.sample_flags.get(i as usize) {
442                Some(f) => *f,
443                None => default_flags,
444            };
445
446            if i == 0 {
447                if let Some(first_flags) = trun.first_sample_flags {
448                    flags = first_flags;
449                }
450            }
451
452            // https://chromium.googlesource.com/chromium/src/media/+/master/formats/mp4/track_run_iterator.cc#177
453            let keyframe = (flags >> 24) & 0x3 == 0x2; // kSampleDependsOnNoOther
454            let non_sync = (flags >> 16) & 0x1 == 0x1; // kSampleIsNonSyncSample
455
456            if keyframe && !non_sync {
457                return true;
458            }
459        }
460    }
461
462    false
463}
464
465// Find the timescale for the given track.
466fn track_timescale(moov: &mp4::MoovBox, track_id: u32) -> u64 {
467    let trak = moov
468        .traks
469        .iter()
470        .find(|trak| trak.tkhd.track_id == track_id)
471        .expect("failed to find trak");
472
473    trak.mdia.mdhd.timescale as u64
474}