1use bytes::Bytes;
2use futures::Future;
3use std::collections::{BTreeSet, HashMap};
4use std::convert::TryInto;
5use std::iter::FromIterator;
6use std::ops::Range;
7use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, SeekFrom};
8
9use crate::{BlockReader, BoxHeader, BoxType, EmsgBox, Error, FtypBox, MoofBox, MoovBox};
10use crate::{Mp4Track, HEADER_SIZE};
11
12const MAX_MEM_MDAT_SIZE: u64 = 128 * 1024 * 1024; pub trait DataStorage {
15 type Error;
16 type Id;
17
18 fn save_data(
19 &mut self,
20 reader: &mut (impl AsyncRead + Unpin),
21 ) -> impl Future<Output = Result<Self::Id, Self::Error>>;
22
23 fn read_data(
24 &self,
25 id: &Self::Id,
26 range: Range<u64>,
27 ) -> impl Future<Output = Result<Bytes, Self::Error>>;
28}
29
30#[derive(Default)]
31pub struct MemoryStorage {
32 pub data: Vec<Bytes>,
33}
34
35impl DataStorage for MemoryStorage {
36 type Error = Error;
37 type Id = usize;
38
39 #[inline]
40 async fn save_data(
41 &mut self,
42 reader: &mut (impl AsyncRead + Unpin),
43 ) -> Result<Self::Id, Self::Error> {
44 let mut buffer = Vec::new();
45 let index = self.data.len();
46 tokio::io::copy(reader, &mut buffer).await?;
47 self.data.push(buffer.into());
48
49 Ok(index)
50 }
51
52 #[inline]
53 async fn read_data(&self, id: &Self::Id, range: Range<u64>) -> Result<Bytes, Self::Error> {
54 let buff = self.data.get(*id).ok_or(Error::DataBufferNotFound(*id))?;
55
56 Ok(buff.slice(range.start as usize..range.end as usize))
57 }
58}
59
60enum DataBlockBody {
61 Memory(Bytes),
62 Reader,
63}
64
65pub struct DataBlock {
66 _kind: BoxType,
67 offset: u64,
68 size: u64,
69 buffer: DataBlockBody,
70}
71
72pub trait ReadSampleFormat: Default {
73 fn format(&self, data: &mut [u8]) -> Result<(), Error>;
74}
75
76#[derive(Default)]
77pub struct Annexb {}
78
79impl ReadSampleFormat for Annexb {
80 fn format(&self, data: &mut [u8]) -> Result<(), Error> {
81 let mut i = 0;
92 while i < data.len() - 3 {
93 let bytes = &mut data[i..i + 4];
95 let nalu_length = u32::from_be_bytes(bytes.try_into().unwrap()) as usize;
96 bytes.copy_from_slice(&[0, 0, 0, 1]);
97
98 i += 4 + nalu_length;
99
100 if i > data.len() {
101 return Err(Error::NaluLengthDelimetedRedFail);
102 }
103 }
104
105 if i < data.len() {
106 return Err(Error::NaluLengthDelimetedRedFail);
107 }
108
109 Ok(())
110 }
111}
112
113#[derive(Default)]
114pub struct LengthDelimited {}
115
116impl ReadSampleFormat for LengthDelimited {
117 fn format(&self, _data: &mut [u8]) -> Result<(), Error> {
118 Ok(())
119 }
120}
121
122pub struct Mp4File<R, F = Annexb>
123where
124 R: AsyncRead + AsyncSeek + Unpin,
125 F: ReadSampleFormat,
126{
127 pub ftyp: Option<FtypBox>,
128 pub emsgs: Vec<EmsgBox>,
129 pub tracks: HashMap<u32, Mp4Track>,
130 pub reader: R,
131 pub offsets: BTreeSet<u64>,
132 pub data_blocks: Vec<DataBlock>,
133 format_conv: F,
134}
135
136impl<R> Mp4File<R>
137where
138 R: AsyncRead + Unpin + AsyncSeek,
139{
140 pub fn new_annexb(reader: R) -> Self {
141 Self {
142 ftyp: None,
143 emsgs: Vec::new(),
144 tracks: HashMap::new(),
145 reader,
146 offsets: BTreeSet::new(),
147 data_blocks: Vec::new(),
148 format_conv: Default::default(),
149 }
150 }
151}
152
153impl<R> Mp4File<R, LengthDelimited>
154where
155 R: AsyncRead + Unpin + AsyncSeek,
156{
157 pub fn new(reader: R) -> Self {
158 Self {
159 ftyp: None,
160 emsgs: Vec::new(),
161 tracks: HashMap::new(),
162 reader,
163 offsets: BTreeSet::new(),
164 data_blocks: Vec::new(),
165 format_conv: Default::default(),
166 }
167 }
168}
169
170impl<R, F> Mp4File<R, F>
171where
172 R: AsyncRead + Unpin + AsyncSeek,
173 F: ReadSampleFormat,
174{
175 pub async fn read_header(&mut self) -> Result<bool, Error> {
176 let mut buff = Vec::with_capacity(8192);
177 let mut got_moov = false;
178 let mut offset = 0u64;
179
180 while let Some(BoxHeader { kind, size: mut s }) =
181 BoxHeader::read(&mut self.reader, &mut offset).await?
182 {
183 if s >= HEADER_SIZE {
184 s -= HEADER_SIZE; }
186 match kind {
187 BoxType::FtypBox => {
188 log::debug!("ftyp");
189
190 if buff.len() < s as usize {
191 buff.resize(s as usize, 0);
192 }
193 self.reader.read_exact(&mut buff[0..s as usize]).await?;
194 offset += s;
195
196 self.ftyp = Some(FtypBox::read_block(&mut &buff[0..s as usize])?);
197 }
198
199 BoxType::MoovBox => {
200 log::debug!("moov");
201
202 if buff.len() < s as usize {
203 buff.resize(s as usize, 0);
204 }
205
206 self.reader.read_exact(&mut buff[0..s as usize]).await?;
207 offset += s;
208
209 got_moov = true;
210 self.set_moov(MoovBox::read_block(&mut &buff[0..s as usize])?)?;
211 }
212
213 BoxType::MoofBox => {
214 log::debug!("moof");
215
216 if buff.len() < s as usize {
217 buff.resize(s as usize, 0);
218 }
219
220 let begin_offset = offset;
221 self.reader.read_exact(&mut buff[0..s as usize]).await?;
222 offset += s;
223
224 self.add_moof(
225 begin_offset,
226 MoofBox::read_block(&mut &buff[0..s as usize])?,
227 )?;
228 }
229
230 BoxType::EmsgBox => {
231 log::debug!("emsg");
232
233 if buff.len() < s as usize {
234 buff.resize(s as usize, 0);
235 }
236
237 self.reader.read_exact(&mut buff[0..s as usize]).await?;
238 offset += s;
239
240 self.emsgs
241 .push(EmsgBox::read_block(&mut &buff[0..s as usize])?);
242 }
243
244 BoxType::MdatBox => {
245 log::debug!("mdat");
246 self.save_box(BoxType::MdatBox, s, offset).await?;
247 offset += s;
248 }
249
250 bt => {
251 log::debug!("{}", bt);
252
253 self.skip_box(bt, s).await?;
254 offset += s;
255 }
256 }
257 }
258
259 Ok(got_moov)
260 }
261
262 async fn skip_box(&mut self, bt: BoxType, size: u64) -> Result<(), Error> {
263 log::debug!("skip {:?}", bt);
264 self.reader.seek(SeekFrom::Current(size as _)).await?;
265 Ok(())
266 }
267
268 async fn save_box(&mut self, kind: BoxType, size: u64, offset: u64) -> Result<(), Error> {
269 log::debug!("data_block {:?} {} - {}", kind, offset, offset + size);
270 let reader = &mut self.reader;
271
272 if size < MAX_MEM_MDAT_SIZE {
273 let mut buffer = Vec::new();
274 tokio::io::copy(&mut reader.take(size), &mut buffer).await?;
275 self.data_blocks.push(DataBlock {
276 _kind: kind,
277 offset,
278 size,
279 buffer: DataBlockBody::Memory(buffer.into()),
280 });
281 } else {
282 self.skip_box(kind, size).await?;
283
284 self.data_blocks.push(DataBlock {
285 _kind: kind,
286 offset,
287 size,
288 buffer: DataBlockBody::Reader,
289 });
290 }
291
292 Ok(())
293 }
294
295 fn set_moov(&mut self, moov: MoovBox) -> Result<(), Error> {
296 for trak in moov.traks {
297 self.tracks
298 .insert(trak.tkhd.track_id, Mp4Track::new(trak, &mut self.offsets)?);
299 }
300
301 Ok(())
302 }
303
304 fn add_moof(&mut self, offset: u64, moof: MoofBox) -> Result<(), Error> {
305 for traf in moof.trafs {
306 let track_id = traf.tfhd.track_id;
307
308 if let Some(track) = self.tracks.get_mut(&track_id) {
309 track.add_traf(offset, moof.mfhd.sequence_number, traf, &mut self.offsets)
310 } else {
311 return Err(Error::TrakNotFound(track_id));
312 }
313 }
314
315 Ok(())
316 }
317
318 #[inline]
319 pub async fn read_sample_data(
320 &mut self,
321 track_id: u32,
322 sample_idx: usize,
323 ) -> Result<Option<Bytes>, Error> {
324 let Some(track) = self.tracks.get(&track_id) else {
325 return Ok(None);
326 };
327
328 let Some(sample) = track.samples.get(sample_idx) else {
329 return Ok(None);
330 };
331
332 for block in &self.data_blocks {
333 let range = block.offset..block.offset + block.size;
334
335 if range.contains(&sample.offset) {
336 return Ok(Some(match &block.buffer {
337 DataBlockBody::Memory(mem) => {
338 let offset = sample.offset - block.offset;
339 let mut slice = mem
340 .slice(offset as usize..offset as usize + sample.size as usize)
341 .to_vec();
342
343 self.format_conv.format(&mut slice).unwrap();
344 Bytes::from(slice)
345 }
346
347 DataBlockBody::Reader => {
348 let mut buff = vec![0u8; sample.size as _];
349 self.reader.seek(SeekFrom::Start(sample.offset)).await?;
350 self.reader.read_exact(&mut buff).await?;
351 self.format_conv.format(&mut buff).unwrap();
352 Bytes::from_iter(buff)
353 }
354 }));
355 }
356 }
357
358 Ok(None)
359 }
360}
361
362