1use anyhow::{self, Context};
6use bytes::{Buf, BufMut, Bytes, BytesMut};
7use moq_transport::serve::{SubgroupWriter, SubgroupsWriter, TrackWriter, TracksWriter};
8use mp4::{self, ReadBox, TrackType};
9use std::cmp::max;
10use std::collections::HashMap;
11use std::io::Cursor;
12use std::time;
13
14pub struct Media {
15 tracks: HashMap<u32, Track>,
17
18 broadcast: TracksWriter,
20
21 init: SubgroupsWriter,
23 catalog: SubgroupsWriter,
24
25 ftyp: Option<Bytes>,
27 moov: Option<mp4::MoovBox>,
28
29 current: Option<u32>,
31}
32
33impl Media {
34 pub fn new(mut broadcast: TracksWriter) -> anyhow::Result<Self> {
35 let catalog = broadcast
36 .create(".catalog")
37 .context("broadcast closed")?
38 .subgroups()?;
39 let init = broadcast
40 .create("0.mp4")
41 .context("broadcast closed")?
42 .subgroups()?;
43
44 Ok(Media {
45 tracks: Default::default(),
46 broadcast,
47 catalog,
48 init,
49 ftyp: None,
50 moov: None,
51 current: None,
52 })
53 }
54
55 pub fn reset(&mut self) {
56 for track in self.tracks.values_mut() {
57 track.end_group();
58 }
59 }
60
61 pub fn parse<B: Buf>(&mut self, buf: &mut B) -> anyhow::Result<()> {
64 while self.parse_atom(buf)? {}
65 Ok(())
66 }
67
68 fn parse_atom<B: Buf>(&mut self, buf: &mut B) -> anyhow::Result<bool> {
69 let atom = match next_atom(buf)? {
70 Some(atom) => atom,
71 None => return Ok(false),
72 };
73
74 let mut reader = Cursor::new(&atom);
75 let header = mp4::BoxHeader::read(&mut reader)?;
76
77 match header.name {
78 mp4::BoxType::FtypBox => {
79 if self.ftyp.is_some() {
80 tracing::debug!("multiple ftyp atoms");
81 return Ok(true);
82 }
83
84 self.ftyp = Some(atom)
86 }
87 mp4::BoxType::MoovBox => {
88 if self.moov.is_some() {
89 tracing::debug!("multiple moov atoms");
90 return Ok(true);
91 }
92
93 let moov = mp4::MoovBox::read_box(&mut reader, header.size)?;
95
96 self.setup(&moov, atom)?;
97 self.moov = Some(moov);
98 }
99 mp4::BoxType::MoofBox => {
100 let moof = mp4::MoofBox::read_box(&mut reader, header.size)?;
101
102 let fragment = Fragment::new(moof)?;
104
105 if fragment.keyframe {
106 if self
108 .tracks
109 .get(&fragment.track)
110 .context("failed to find track")?
111 .handler
112 == TrackType::Video
113 {
114 for track in self.tracks.values_mut() {
116 track.end_group();
117 }
118 }
119 }
120
121 let track = self
123 .tracks
124 .get_mut(&fragment.track)
125 .context("failed to find track")?;
126
127 anyhow::ensure!(self.current.is_none(), "multiple moof atoms");
129 self.current.replace(fragment.track);
130
131 track
133 .header(atom, fragment)
134 .context("failed to publish moof")?;
135 }
136 mp4::BoxType::MdatBox => {
137 let track = self.current.take().context("missing moof")?;
139 let track = self
140 .tracks
141 .get_mut(&track)
142 .context("failed to find track")?;
143
144 track.data(atom).context("failed to publish mdat")?;
146 }
147
148 _ => {
149 }
151 }
152
153 Ok(true)
154 }
155
156 fn setup(&mut self, moov: &mp4::MoovBox, raw: Bytes) -> anyhow::Result<()> {
157 let mut init = self.ftyp.clone().context("missing ftyp")?.to_vec();
159 init.extend_from_slice(&raw);
160
161 self.init.append(0)?.write(init.into())?;
163
164 let mut tracks = Vec::new();
165
166 for trak in &moov.traks {
168 let id = trak.tkhd.track_id;
169 let name = format!("{id}.m4s");
170
171 let timescale = track_timescale(moov, id);
172 let handler = (&trak.mdia.hdlr.handler_type).try_into()?;
173
174 let mut selection_params = moq_catalog::SelectionParam::default();
175
176 let mut track = moq_catalog::Track {
177 init_track: Some(self.init.name.clone()),
178 name: name.clone(),
179 namespace: Some(self.broadcast.namespace.to_utf8_path()),
180 packaging: Some(moq_catalog::TrackPackaging::Cmaf),
181 render_group: Some(1),
182 ..Default::default()
183 };
184
185 let stsd = &trak.mdia.minf.stbl.stsd;
186
187 if let Some(avc1) = &stsd.avc1 {
188 let profile = avc1.avcc.avc_profile_indication;
194 let constraints = avc1.avcc.profile_compatibility; let level = avc1.avcc.avc_level_indication;
196
197 let width = avc1.width;
198 let height = avc1.height;
199
200 let codec = rfc6381_codec::Codec::avc1(profile, constraints, level);
201 let codec_str = codec.to_string();
202
203 selection_params.codec = Some(codec_str);
204 selection_params.width = Some(width.into());
205 selection_params.height = Some(height.into());
206 } else if let Some(_hev1) = &stsd.hev1 {
207 anyhow::bail!("HEVC not yet supported")
209 } else if let Some(mp4a) = &stsd.mp4a {
210 let desc = &mp4a
211 .esds
212 .as_ref()
213 .context("missing esds box for MP4a")?
214 .es_desc
215 .dec_config;
216 let codec_str = format!(
217 "mp4a.{:02x}.{}",
218 desc.object_type_indication, desc.dec_specific.profile
219 );
220
221 selection_params.codec = Some(codec_str);
222 selection_params.channel_config = Some(mp4a.channelcount.to_string());
223 selection_params.samplerate = Some(mp4a.samplerate.value().into());
224
225 let bitrate = max(desc.max_bitrate, desc.avg_bitrate);
226 if bitrate > 0 {
227 selection_params.bitrate = Some(bitrate);
228 }
229 } else if let Some(vp09) = &stsd.vp09 {
230 let vpcc = &vp09.vpcc;
232 let codec_str = format!(
233 "vp09.0.{:02x}.{:02x}.{:02x}",
234 vpcc.profile, vpcc.level, vpcc.bit_depth
235 );
236
237 selection_params.codec = Some(codec_str);
238 selection_params.width = Some(vp09.width.into());
239 selection_params.height = Some(vp09.height.into());
240
241 anyhow::bail!("VP9 not yet supported")
243 } else {
244 anyhow::bail!("unknown codec for track: {}", trak.tkhd.track_id);
246 }
247
248 track.selection_params = selection_params;
249
250 tracks.push(track);
251
252 let track = self.broadcast.create(&name).context("broadcast closed")?;
254 let track = Track::new(track, handler, timescale);
255 self.tracks.insert(id, track);
256 }
257
258 let catalog = moq_catalog::Root {
259 version: 1,
260 streaming_format: 1,
261 streaming_format_version: "0.2".to_string(),
262 streaming_delta_updates: true,
263 common_track_fields: moq_catalog::CommonTrackFields::from_tracks(&mut tracks),
264 tracks,
265 };
266
267 let catalog_str = serde_json::to_string_pretty(&catalog)?;
268
269 tracing::info!("catalog: {}", catalog_str);
270
271 self.catalog.append(0)?.write(catalog_str.into())?;
273
274 Ok(())
275 }
276}
277
278fn next_atom<B: Buf>(buf: &mut B) -> anyhow::Result<Option<Bytes>> {
281 let mut peek = Cursor::new(buf.chunk());
282
283 if peek.remaining() < 8 {
284 if buf.remaining() != buf.chunk().len() {
285 anyhow::bail!("TODO: vectored Buf not yet supported");
287 }
288
289 return Ok(None);
290 }
291
292 let size = peek.get_u32();
294 let _type = peek.get_u32();
295
296 let size = match size {
297 0 => anyhow::bail!("TODO: unsupported EOF atom"),
299
300 1 => {
302 let size_ext = peek.get_u64();
303 anyhow::ensure!(size_ext >= 16, "impossible extended box size: {}", size_ext);
304 size_ext as usize
305 }
306
307 2..=7 => {
308 anyhow::bail!("impossible box size: {}", size)
309 }
310
311 size => size as usize,
312 };
313
314 if buf.remaining() < size {
315 return Ok(None);
316 }
317
318 let atom = buf.copy_to_bytes(size);
319
320 Ok(Some(atom))
321}
322
323struct Track {
324 track: SubgroupsWriter,
326
327 current: Option<SubgroupWriter>,
329
330 pending_moof: Option<Bytes>,
334
335 timescale: u64,
337
338 handler: TrackType,
340}
341
342impl Track {
343 fn new(track: TrackWriter, handler: TrackType, timescale: u64) -> Self {
344 Self {
345 track: track.subgroups().unwrap(),
346 current: None,
347 pending_moof: None,
348 timescale,
349 handler,
350 }
351 }
352
353 pub fn header(&mut self, raw: Bytes, fragment: Fragment) -> anyhow::Result<()> {
354 if self.current.is_some() {
355 debug_assert!(self.pending_moof.is_none(), "overwriting pending moof");
357 self.pending_moof = Some(raw);
358 return Ok(());
359 }
360
361 let _timestamp: u32 = fragment
364 .timestamp(self.timescale)
365 .as_millis()
366 .try_into()
367 .context("timestamp too large")?;
368 let priority: u8 = 127;
372
373 let segment = self.track.append(priority)?;
375
376 println!(
377 "timestamp: {:?} segment: {:?}:{:?} priority: {:?}",
378 fragment.timestamp, segment.info.group_id, segment.info.subgroup_id, priority
379 );
380
381 self.pending_moof = Some(raw);
383
384 self.current = Some(segment);
386
387 Ok(())
388 }
389
390 pub fn data(&mut self, raw: Bytes) -> anyhow::Result<()> {
391 let moof = self.pending_moof.take().context("missing pending moof")?;
392 let segment = self.current.as_mut().context("missing current fragment")?;
393 let mut combined = BytesMut::with_capacity(moof.len() + raw.len());
395 combined.put_slice(&moof);
396 combined.put_slice(&raw);
397 segment.write(combined.freeze())?;
398
399 Ok(())
400 }
401
402 pub fn end_group(&mut self) {
403 self.current = None;
404 self.pending_moof = None;
405 }
406}
407
/// Summary of a single `moof` atom.
struct Fragment {
    /// Track id taken from the `tfhd` box of the only `traf`.
    track: u32,

    /// Base media decode time from the `tfdt` box, in units of the track timescale.
    timestamp: u64,

    /// Whether any sample in the fragment is flagged as a sync sample.
    keyframe: bool,
}
418
419impl Fragment {
420 fn new(moof: mp4::MoofBox) -> anyhow::Result<Self> {
421 anyhow::ensure!(moof.trafs.len() == 1, "multiple tracks per moof atom");
423 let track = moof.trafs[0].tfhd.track_id;
424
425 let timestamp = sample_timestamp(&moof).expect("couldn't find timestamp");
427
428 let keyframe = sample_keyframe(&moof);
430
431 Ok(Self {
432 track,
433 timestamp,
434 keyframe,
435 })
436 }
437
438 fn timestamp(&self, timescale: u64) -> time::Duration {
440 time::Duration::from_millis(1000 * self.timestamp / timescale)
441 }
442}
443
444fn sample_timestamp(moof: &mp4::MoofBox) -> Option<u64> {
445 Some(moof.trafs.first()?.tfdt.as_ref()?.base_media_decode_time)
446}
447
448fn sample_keyframe(moof: &mp4::MoofBox) -> bool {
449 for traf in &moof.trafs {
450 let default_flags = traf.tfhd.default_sample_flags.unwrap_or_default();
452 let trun = match &traf.trun {
453 Some(t) => t,
454 None => return false,
455 };
456
457 for i in 0..trun.sample_count {
458 let mut flags = match trun.sample_flags.get(i as usize) {
459 Some(f) => *f,
460 None => default_flags,
461 };
462
463 if i == 0 {
464 if let Some(first_flags) = trun.first_sample_flags {
465 flags = first_flags;
466 }
467 }
468
469 let keyframe = (flags >> 24) & 0x3 == 0x2; let non_sync = (flags >> 16) & 0x1 == 0x1; if keyframe && !non_sync {
474 return true;
475 }
476 }
477 }
478
479 false
480}
481
482fn track_timescale(moov: &mp4::MoovBox, track_id: u32) -> u64 {
484 let trak = moov
485 .traks
486 .iter()
487 .find(|trak| trak.tkhd.track_id == track_id)
488 .expect("failed to find trak");
489
490 trak.mdia.mdhd.timescale as u64
491}