flowly_mp4/
file.rs

1use bytes::Bytes;
2use futures::Future;
3use std::collections::{BTreeSet, HashMap};
4use std::convert::TryInto;
5use std::iter::FromIterator;
6use std::ops::Range;
7use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, SeekFrom};
8
9use crate::{BlockReader, BoxHeader, BoxType, EmsgBox, Error, FtypBox, MoofBox, MoovBox};
10use crate::{Mp4Track, HEADER_SIZE};
11
/// Size threshold used by `save_box`: payloads strictly smaller than this are
/// buffered in memory, anything else is left in the reader and only indexed.
const MAX_MEM_MDAT_SIZE: u64 = 128 * 1024 * 1024; // 128 MiB
13
/// Storage backend for raw payload data that can be saved once and read back
/// by range later.
pub trait DataStorage {
    /// Error type produced by both storage operations.
    type Error;
    /// Handle returned by `save_data` and accepted by `read_data`.
    type Id;

    /// Stores data obtained from `reader` and resolves to the id under which
    /// it can be read back.
    fn save_data(
        &mut self,
        reader: &mut (impl AsyncRead + Unpin),
    ) -> impl Future<Output = Result<Self::Id, Self::Error>>;

    /// Resolves to the part of the payload identified by `id` that is selected
    /// by `range`.
    ///
    /// NOTE(review): `range` is presumably a half-open byte range relative to
    /// the start of the stored payload (that is how `MemoryStorage` treats it)
    /// — confirm for other implementations.
    fn read_data(
        &self,
        id: &Self::Id,
        range: Range<u64>,
    ) -> impl Future<Output = Result<Bytes, Self::Error>>;
}
29
/// `DataStorage` implementation that keeps every saved payload in memory.
#[derive(Default)]
pub struct MemoryStorage {
    /// Saved payloads; the position of a payload in this vector is its id.
    pub data: Vec<Bytes>,
}
34
35impl DataStorage for MemoryStorage {
36    type Error = Error;
37    type Id = usize;
38
39    #[inline]
40    async fn save_data(
41        &mut self,
42        reader: &mut (impl AsyncRead + Unpin),
43    ) -> Result<Self::Id, Self::Error> {
44        let mut buffer = Vec::new();
45        let index = self.data.len();
46        tokio::io::copy(reader, &mut buffer).await?;
47        self.data.push(buffer.into());
48
49        Ok(index)
50    }
51
52    #[inline]
53    async fn read_data(&self, id: &Self::Id, range: Range<u64>) -> Result<Bytes, Self::Error> {
54        let buff = self.data.get(*id).ok_or(Error::DataBufferNotFound(*id))?;
55
56        Ok(buff.slice(range.start as usize..range.end as usize))
57    }
58}
59
/// Where the payload of a `DataBlock` lives.
enum DataBlockBody {
    /// The payload was copied into memory when the block was indexed.
    Memory(Bytes),
    /// The payload was skipped over; it has to be fetched from the file's
    /// reader by seeking when a sample is requested.
    Reader,
}
64
/// Index entry for one data-carrying box recorded by `Mp4File::save_box`.
pub struct DataBlock {
    /// Box type the block was created from (currently stored but not read).
    _kind: BoxType,
    /// Stream offset of the block's payload, as tracked by `read_header`.
    offset: u64,
    /// Payload size in bytes.
    size: u64,
    /// Location of the payload bytes (in memory or still in the reader).
    buffer: DataBlockBody,
}
71
/// In-place post-processing applied to every sample returned by
/// `Mp4File::read_sample_data`.
pub trait ReadSampleFormat: Default {
    /// Rewrites the sample bytes in `data` in place, or returns an error when
    /// the sample cannot be converted.
    fn format(&self, data: &mut [u8]) -> Result<(), Error>;
}
75
/// Sample format that replaces each 4-byte NAL length prefix with the Annex B
/// start code `00 00 00 01`.
#[derive(Default)]
pub struct Annexb {}
78
79impl ReadSampleFormat for Annexb {
80    fn format(&self, data: &mut [u8]) -> Result<(), Error> {
81        // TODO:
82        // * For each IDR frame, copy the SPS and PPS from the stream's
83        //   parameters, rather than depend on it being present in the frame
84        //   already. In-band parameters aren't guaranteed. This is awkward
85        //   with h264_reader v0.5's h264_reader::avcc::AvcDecoderRecord because it
86        //   strips off the NAL header byte from each parameter. The next major
87        //   version shouldn't do this.
88        // * Copy only the slice data. In particular, don't copy SEI, which confuses
89        //   Safari: <https://github.com/scottlamb/retina/issues/60#issuecomment-1178369955>
90
91        let mut i = 0;
92        while i < data.len() - 3 {
93            // Replace each NAL's length with the Annex B start code b"\x00\x00\x00\x01".
94            let bytes = &mut data[i..i + 4];
95            let nalu_length = u32::from_be_bytes(bytes.try_into().unwrap()) as usize;
96            bytes.copy_from_slice(&[0, 0, 0, 1]);
97
98            i += 4 + nalu_length;
99
100            if i > data.len() {
101                return Err(Error::NaluLengthDelimetedRedFail);
102            }
103        }
104
105        if i < data.len() {
106            return Err(Error::NaluLengthDelimetedRedFail);
107        }
108
109        Ok(())
110    }
111}
112
/// Sample format that hands samples out exactly as they are stored, i.e. with
/// their length prefixes left in place.
#[derive(Default)]
pub struct LengthDelimited {}
115
impl ReadSampleFormat for LengthDelimited {
    /// No-op: the sample bytes are left unchanged and the call always succeeds.
    fn format(&self, _data: &mut [u8]) -> Result<(), Error> {
        Ok(())
    }
}
121
/// An MP4 file read from an async, seekable reader.
///
/// `R` is the underlying reader; `F` selects how sample bytes are
/// post-processed before being returned (defaults to `Annexb`).
pub struct Mp4File<R, F = Annexb>
where
    R: AsyncRead + AsyncSeek + Unpin,
    F: ReadSampleFormat,
{
    /// The parsed `ftyp` box, once `read_header` has seen one.
    pub ftyp: Option<FtypBox>,
    /// Every `emsg` box encountered by `read_header`, in file order.
    pub emsgs: Vec<EmsgBox>,
    /// Tracks from the `moov` box, keyed by the track id from `tkhd`.
    pub tracks: HashMap<u32, Mp4Track>,
    /// The underlying reader; also used to fetch samples of blocks that were
    /// not buffered in memory.
    pub reader: R,
    /// Offset set handed to `Mp4Track::new` and `Mp4Track::add_traf`.
    /// NOTE(review): presumably the set of sample offsets — confirm in `Mp4Track`.
    pub offsets: BTreeSet<u64>,
    /// Index of the data blocks (e.g. `mdat` payloads) recorded by `read_header`.
    pub data_blocks: Vec<DataBlock>,
    /// Converter applied to each sample in `read_sample_data`.
    format_conv: F,
}
135
136impl<R> Mp4File<R>
137where
138    R: AsyncRead + Unpin + AsyncSeek,
139{
140    pub fn new_annexb(reader: R) -> Self {
141        Self {
142            ftyp: None,
143            emsgs: Vec::new(),
144            tracks: HashMap::new(),
145            reader,
146            offsets: BTreeSet::new(),
147            data_blocks: Vec::new(),
148            format_conv: Default::default(),
149        }
150    }
151}
152
153impl<R> Mp4File<R, LengthDelimited>
154where
155    R: AsyncRead + Unpin + AsyncSeek,
156{
157    pub fn new(reader: R) -> Self {
158        Self {
159            ftyp: None,
160            emsgs: Vec::new(),
161            tracks: HashMap::new(),
162            reader,
163            offsets: BTreeSet::new(),
164            data_blocks: Vec::new(),
165            format_conv: Default::default(),
166        }
167    }
168}
169
170impl<R, F> Mp4File<R, F>
171where
172    R: AsyncRead + Unpin + AsyncSeek,
173    F: ReadSampleFormat,
174{
175    pub async fn read_header(&mut self) -> Result<bool, Error> {
176        let mut buff = Vec::with_capacity(8192);
177        let mut got_moov = false;
178        let mut offset = 0u64;
179
180        while let Some(BoxHeader { kind, size: mut s }) =
181            BoxHeader::read(&mut self.reader, &mut offset).await?
182        {
183            if s >= HEADER_SIZE {
184                s -= HEADER_SIZE; // size without header
185            }
186            match kind {
187                BoxType::FtypBox => {
188                    log::debug!("ftyp");
189
190                    if buff.len() < s as usize {
191                        buff.resize(s as usize, 0);
192                    }
193                    self.reader.read_exact(&mut buff[0..s as usize]).await?;
194                    offset += s;
195
196                    self.ftyp = Some(FtypBox::read_block(&mut &buff[0..s as usize])?);
197                }
198
199                BoxType::MoovBox => {
200                    log::debug!("moov");
201
202                    if buff.len() < s as usize {
203                        buff.resize(s as usize, 0);
204                    }
205
206                    self.reader.read_exact(&mut buff[0..s as usize]).await?;
207                    offset += s;
208
209                    got_moov = true;
210                    self.set_moov(MoovBox::read_block(&mut &buff[0..s as usize])?)?;
211                }
212
213                BoxType::MoofBox => {
214                    log::debug!("moof");
215
216                    if buff.len() < s as usize {
217                        buff.resize(s as usize, 0);
218                    }
219
220                    let begin_offset = offset;
221                    self.reader.read_exact(&mut buff[0..s as usize]).await?;
222                    offset += s;
223
224                    self.add_moof(
225                        begin_offset,
226                        MoofBox::read_block(&mut &buff[0..s as usize])?,
227                    )?;
228                }
229
230                BoxType::EmsgBox => {
231                    log::debug!("emsg");
232
233                    if buff.len() < s as usize {
234                        buff.resize(s as usize, 0);
235                    }
236
237                    self.reader.read_exact(&mut buff[0..s as usize]).await?;
238                    offset += s;
239
240                    self.emsgs
241                        .push(EmsgBox::read_block(&mut &buff[0..s as usize])?);
242                }
243
244                BoxType::MdatBox => {
245                    log::debug!("mdat");
246                    self.save_box(BoxType::MdatBox, s, offset).await?;
247                    offset += s;
248                }
249
250                bt => {
251                    log::debug!("{}", bt);
252
253                    self.skip_box(bt, s).await?;
254                    offset += s;
255                }
256            }
257        }
258
259        Ok(got_moov)
260    }
261
262    async fn skip_box(&mut self, bt: BoxType, size: u64) -> Result<(), Error> {
263        log::debug!("skip {:?}", bt);
264        self.reader.seek(SeekFrom::Current(size as _)).await?;
265        Ok(())
266    }
267
268    async fn save_box(&mut self, kind: BoxType, size: u64, offset: u64) -> Result<(), Error> {
269        log::debug!("data_block {:?} {} - {}", kind, offset, offset + size);
270        let reader = &mut self.reader;
271
272        if size < MAX_MEM_MDAT_SIZE {
273            let mut buffer = Vec::new();
274            tokio::io::copy(&mut reader.take(size), &mut buffer).await?;
275            self.data_blocks.push(DataBlock {
276                _kind: kind,
277                offset,
278                size,
279                buffer: DataBlockBody::Memory(buffer.into()),
280            });
281        } else {
282            self.skip_box(kind, size).await?;
283
284            self.data_blocks.push(DataBlock {
285                _kind: kind,
286                offset,
287                size,
288                buffer: DataBlockBody::Reader,
289            });
290        }
291
292        Ok(())
293    }
294
295    fn set_moov(&mut self, moov: MoovBox) -> Result<(), Error> {
296        for trak in moov.traks {
297            self.tracks
298                .insert(trak.tkhd.track_id, Mp4Track::new(trak, &mut self.offsets)?);
299        }
300
301        Ok(())
302    }
303
304    fn add_moof(&mut self, offset: u64, moof: MoofBox) -> Result<(), Error> {
305        for traf in moof.trafs {
306            let track_id = traf.tfhd.track_id;
307
308            if let Some(track) = self.tracks.get_mut(&track_id) {
309                track.add_traf(offset, moof.mfhd.sequence_number, traf, &mut self.offsets)
310            } else {
311                return Err(Error::TrakNotFound(track_id));
312            }
313        }
314
315        Ok(())
316    }
317
318    #[inline]
319    pub async fn read_sample_data(
320        &mut self,
321        track_id: u32,
322        sample_idx: usize,
323    ) -> Result<Option<Bytes>, Error> {
324        let Some(track) = self.tracks.get(&track_id) else {
325            return Ok(None);
326        };
327
328        let Some(sample) = track.samples.get(sample_idx) else {
329            return Ok(None);
330        };
331
332        for block in &self.data_blocks {
333            let range = block.offset..block.offset + block.size;
334
335            if range.contains(&sample.offset) {
336                return Ok(Some(match &block.buffer {
337                    DataBlockBody::Memory(mem) => {
338                        let offset = sample.offset - block.offset;
339                        let mut slice = mem
340                            .slice(offset as usize..offset as usize + sample.size as usize)
341                            .to_vec();
342
343                        self.format_conv.format(&mut slice).unwrap();
344                        Bytes::from(slice)
345                    }
346
347                    DataBlockBody::Reader => {
348                        let mut buff = vec![0u8; sample.size as _];
349                        self.reader.seek(SeekFrom::Start(sample.offset)).await?;
350                        self.reader.read_exact(&mut buff).await?;
351                        self.format_conv.format(&mut buff).unwrap();
352                        Bytes::from_iter(buff)
353                    }
354                }));
355            }
356        }
357
358        Ok(None)
359    }
360}
361
362// #[derive(Debug, Clone)]
363// pub struct Mp4Demuxer {
364//     annexb: bool,
365// }
366
367// impl Mp4Demuxer {
368//     pub fn new(annexb: bool) -> Self {
369//         Self { annexb }
370//     }
371// }
372
373// impl<F: DataFrame> Service<F> for Mp4Demuxer {
374//     type Out = Result<Mp4Frame<F::Source>, Error>;
375
376//     fn handle(
377//         &mut self,
378//         input: F,
379//         cx: &flowly::Context,
380//     ) -> impl futures::Stream<Item = Self::Out> + Send {
381//         async_stream::stream! {}
382//     }
383// }