1use anyhow::{self, Context};
2use bytes::{Buf, Bytes};
3use moq_transport::serve::{SubgroupWriter, SubgroupsWriter, TrackWriter, TracksWriter};
4use mp4::{self, ReadBox, TrackType};
5use std::cmp::max;
6use std::collections::HashMap;
7use std::io::Cursor;
8use std::time;
9
10pub struct Media {
11 tracks: HashMap<u32, Track>,
13
14 broadcast: TracksWriter,
16
17 init: SubgroupsWriter,
19 catalog: SubgroupsWriter,
20
21 ftyp: Option<Bytes>,
23 moov: Option<mp4::MoovBox>,
24
25 current: Option<u32>,
27}
28
29impl Media {
30 pub fn new(mut broadcast: TracksWriter) -> anyhow::Result<Self> {
31 let catalog = broadcast
32 .create(".catalog")
33 .context("broadcast closed")?
34 .subgroups()?;
35 let init = broadcast
36 .create("0.mp4")
37 .context("broadcast closed")?
38 .subgroups()?;
39
40 Ok(Media {
41 tracks: Default::default(),
42 broadcast,
43 catalog,
44 init,
45 ftyp: None,
46 moov: None,
47 current: None,
48 })
49 }
50
51 pub fn reset(&mut self) {
52 for track in self.tracks.values_mut() {
53 track.end_group();
54 }
55 }
56
57 pub fn parse<B: Buf>(&mut self, buf: &mut B) -> anyhow::Result<()> {
60 while self.parse_atom(buf)? {}
61 Ok(())
62 }
63
64 fn parse_atom<B: Buf>(&mut self, buf: &mut B) -> anyhow::Result<bool> {
65 let atom = match next_atom(buf)? {
66 Some(atom) => atom,
67 None => return Ok(false),
68 };
69
70 let mut reader = Cursor::new(&atom);
71 let header = mp4::BoxHeader::read(&mut reader)?;
72
73 match header.name {
74 mp4::BoxType::FtypBox => {
75 if self.ftyp.is_some() {
76 tracing::debug!("multiple ftyp atoms");
77 return Ok(true);
78 }
79
80 self.ftyp = Some(atom)
82 }
83 mp4::BoxType::MoovBox => {
84 if self.moov.is_some() {
85 tracing::debug!("multiple moov atoms");
86 return Ok(true);
87 }
88
89 let moov = mp4::MoovBox::read_box(&mut reader, header.size)?;
91
92 self.setup(&moov, atom)?;
93 self.moov = Some(moov);
94 }
95 mp4::BoxType::MoofBox => {
96 let moof = mp4::MoofBox::read_box(&mut reader, header.size)?;
97
98 let fragment = Fragment::new(moof)?;
100
101 if fragment.keyframe {
102 if self
104 .tracks
105 .get(&fragment.track)
106 .context("failed to find track")?
107 .handler
108 == TrackType::Video
109 {
110 for track in self.tracks.values_mut() {
112 track.end_group();
113 }
114 }
115 }
116
117 let track = self
119 .tracks
120 .get_mut(&fragment.track)
121 .context("failed to find track")?;
122
123 anyhow::ensure!(self.current.is_none(), "multiple moof atoms");
125 self.current.replace(fragment.track);
126
127 track
129 .header(atom, fragment)
130 .context("failed to publish moof")?;
131 }
132 mp4::BoxType::MdatBox => {
133 let track = self.current.take().context("missing moof")?;
135 let track = self
136 .tracks
137 .get_mut(&track)
138 .context("failed to find track")?;
139
140 track.data(atom).context("failed to publish mdat")?;
142 }
143
144 _ => {
145 }
147 }
148
149 Ok(true)
150 }
151
152 fn setup(&mut self, moov: &mp4::MoovBox, raw: Bytes) -> anyhow::Result<()> {
153 let mut init = self.ftyp.clone().context("missing ftyp")?.to_vec();
155 init.extend_from_slice(&raw);
156
157 self.init.append(0)?.write(init.into())?;
159
160 let mut tracks = Vec::new();
161
162 for trak in &moov.traks {
164 let id = trak.tkhd.track_id;
165 let name = format!("{id}.m4s");
166
167 let timescale = track_timescale(moov, id);
168 let handler = (&trak.mdia.hdlr.handler_type).try_into()?;
169
170 let mut selection_params = moq_catalog::SelectionParam::default();
171
172 let mut track = moq_catalog::Track {
173 init_track: Some(self.init.name.clone()),
174 name: name.clone(),
175 namespace: Some(self.broadcast.namespace.to_utf8_path()),
176 packaging: Some(moq_catalog::TrackPackaging::Cmaf),
177 render_group: Some(1),
178 ..Default::default()
179 };
180
181 let stsd = &trak.mdia.minf.stbl.stsd;
182
183 if let Some(avc1) = &stsd.avc1 {
184 let profile = avc1.avcc.avc_profile_indication;
190 let constraints = avc1.avcc.profile_compatibility; let level = avc1.avcc.avc_level_indication;
192
193 let width = avc1.width;
194 let height = avc1.height;
195
196 let codec = rfc6381_codec::Codec::avc1(profile, constraints, level);
197 let codec_str = codec.to_string();
198
199 selection_params.codec = Some(codec_str);
200 selection_params.width = Some(width.into());
201 selection_params.height = Some(height.into());
202 } else if let Some(_hev1) = &stsd.hev1 {
203 anyhow::bail!("HEVC not yet supported")
205 } else if let Some(mp4a) = &stsd.mp4a {
206 let desc = &mp4a
207 .esds
208 .as_ref()
209 .context("missing esds box for MP4a")?
210 .es_desc
211 .dec_config;
212 let codec_str = format!(
213 "mp4a.{:02x}.{}",
214 desc.object_type_indication, desc.dec_specific.profile
215 );
216
217 selection_params.codec = Some(codec_str);
218 selection_params.channel_config = Some(mp4a.channelcount.to_string());
219 selection_params.samplerate = Some(mp4a.samplerate.value().into());
220
221 let bitrate = max(desc.max_bitrate, desc.avg_bitrate);
222 if bitrate > 0 {
223 selection_params.bitrate = Some(bitrate);
224 }
225 } else if let Some(vp09) = &stsd.vp09 {
226 let vpcc = &vp09.vpcc;
228 let codec_str = format!(
229 "vp09.0.{:02x}.{:02x}.{:02x}",
230 vpcc.profile, vpcc.level, vpcc.bit_depth
231 );
232
233 selection_params.codec = Some(codec_str);
234 selection_params.width = Some(vp09.width.into());
235 selection_params.height = Some(vp09.height.into());
236
237 anyhow::bail!("VP9 not yet supported")
239 } else {
240 anyhow::bail!("unknown codec for track: {}", trak.tkhd.track_id);
242 }
243
244 track.selection_params = selection_params;
245
246 tracks.push(track);
247
248 let track = self.broadcast.create(&name).context("broadcast closed")?;
250 let track = Track::new(track, handler, timescale);
251 self.tracks.insert(id, track);
252 }
253
254 let catalog = moq_catalog::Root {
255 version: 1,
256 streaming_format: 1,
257 streaming_format_version: "0.2".to_string(),
258 streaming_delta_updates: true,
259 common_track_fields: moq_catalog::CommonTrackFields::from_tracks(&mut tracks),
260 tracks,
261 };
262
263 let catalog_str = serde_json::to_string_pretty(&catalog)?;
264
265 log::info!("catalog: {}", catalog_str);
266
267 self.catalog.append(0)?.write(catalog_str.into())?;
269
270 Ok(())
271 }
272}
273
274fn next_atom<B: Buf>(buf: &mut B) -> anyhow::Result<Option<Bytes>> {
277 let mut peek = Cursor::new(buf.chunk());
278
279 if peek.remaining() < 8 {
280 if buf.remaining() != buf.chunk().len() {
281 anyhow::bail!("TODO: vectored Buf not yet supported");
283 }
284
285 return Ok(None);
286 }
287
288 let size = peek.get_u32();
290 let _type = peek.get_u32();
291
292 let size = match size {
293 0 => anyhow::bail!("TODO: unsupported EOF atom"),
295
296 1 => {
298 let size_ext = peek.get_u64();
299 anyhow::ensure!(size_ext >= 16, "impossible extended box size: {}", size_ext);
300 size_ext as usize
301 }
302
303 2..=7 => {
304 anyhow::bail!("impossible box size: {}", size)
305 }
306
307 size => size as usize,
308 };
309
310 if buf.remaining() < size {
311 return Ok(None);
312 }
313
314 let atom = buf.copy_to_bytes(size);
315
316 Ok(Some(atom))
317}
318
319struct Track {
320 track: SubgroupsWriter,
322
323 current: Option<SubgroupWriter>,
325
326 timescale: u64,
328
329 handler: TrackType,
331}
332
333impl Track {
334 fn new(track: TrackWriter, handler: TrackType, timescale: u64) -> Self {
335 Self {
336 track: track.subgroups().unwrap(),
337 current: None,
338 timescale,
339 handler,
340 }
341 }
342
343 pub fn header(&mut self, raw: Bytes, fragment: Fragment) -> anyhow::Result<()> {
344 if let Some(current) = self.current.as_mut() {
345 current.write(raw)?;
347 return Ok(());
348 }
349
350 let _timestamp: u32 = fragment
353 .timestamp(self.timescale)
354 .as_millis()
355 .try_into()
356 .context("timestamp too large")?;
357 let priority: u8 = 127;
361
362 let mut segment = self.track.append(priority)?;
364
365 println!(
366 "timestamp: {:?} segment: {:?}:{:?} priority: {:?}",
367 fragment.timestamp, segment.info.group_id, segment.info.subgroup_id, priority
368 );
369
370 segment.write(raw)?;
372
373 self.current = Some(segment);
375
376 Ok(())
377 }
378
379 pub fn data(&mut self, raw: Bytes) -> anyhow::Result<()> {
380 let segment = self.current.as_mut().context("missing current fragment")?;
381 segment.write(raw)?;
382
383 Ok(())
384 }
385
386 pub fn end_group(&mut self) {
387 self.current = None;
388 }
389}
390
391struct Fragment {
392 track: u32,
394
395 timestamp: u64,
397
398 keyframe: bool,
400}
401
402impl Fragment {
403 fn new(moof: mp4::MoofBox) -> anyhow::Result<Self> {
404 anyhow::ensure!(moof.trafs.len() == 1, "multiple tracks per moof atom");
406 let track = moof.trafs[0].tfhd.track_id;
407
408 let timestamp = sample_timestamp(&moof).expect("couldn't find timestamp");
410
411 let keyframe = sample_keyframe(&moof);
413
414 Ok(Self {
415 track,
416 timestamp,
417 keyframe,
418 })
419 }
420
421 fn timestamp(&self, timescale: u64) -> time::Duration {
423 time::Duration::from_millis(1000 * self.timestamp / timescale)
424 }
425}
426
427fn sample_timestamp(moof: &mp4::MoofBox) -> Option<u64> {
428 Some(moof.trafs.first()?.tfdt.as_ref()?.base_media_decode_time)
429}
430
431fn sample_keyframe(moof: &mp4::MoofBox) -> bool {
432 for traf in &moof.trafs {
433 let default_flags = traf.tfhd.default_sample_flags.unwrap_or_default();
435 let trun = match &traf.trun {
436 Some(t) => t,
437 None => return false,
438 };
439
440 for i in 0..trun.sample_count {
441 let mut flags = match trun.sample_flags.get(i as usize) {
442 Some(f) => *f,
443 None => default_flags,
444 };
445
446 if i == 0 {
447 if let Some(first_flags) = trun.first_sample_flags {
448 flags = first_flags;
449 }
450 }
451
452 let keyframe = (flags >> 24) & 0x3 == 0x2; let non_sync = (flags >> 16) & 0x1 == 0x1; if keyframe && !non_sync {
457 return true;
458 }
459 }
460 }
461
462 false
463}
464
465fn track_timescale(moov: &mp4::MoovBox, track_id: u32) -> u64 {
467 let trak = moov
468 .traks
469 .iter()
470 .find(|trak| trak.tkhd.track_id == track_id)
471 .expect("failed to find trak");
472
473 trak.mdia.mdhd.timescale as u64
474}