1use std::{
2 cmp::{max, min},
3 fmt::Debug,
4 io::{self},
5};
6
7#[cfg(feature = "tokio-fs")]
8use std::path::Path;
9#[cfg(feature = "tokio-fs")]
10use tokio::fs::File;
11use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek};
12
13use crate::{
14 error::{ParsedError, ParsingErrorState},
15 parser::{
16 clear_and_skip_decide, parse_loop_step, Buf, LoopAction, ParsingState, SkipPlan,
17 MAX_PARSE_BUF_SIZE, MIN_GROW_SIZE,
18 },
19};
20
/// Number of leading bytes inspected when detecting a source's media type.
///
/// `AsyncMediaSource::build` reads at most this many bytes from the reader
/// before attempting MIME detection, and `AsyncMediaSource::from_memory`
/// looks at no more than this many bytes of the in-memory payload.
const HEADER_PARSE_BUF_SIZE: usize = 128;
23
/// Function pointer that tries to skip forward in a reader of type `R`.
///
/// It receives the reader and a byte count, and returns a boxed, `Send`
/// future resolving to `io::Result<bool>`. Within this file the result is
/// interpreted as follows:
///
/// * `Ok(true)`  - the skip was performed (the seekable implementation seeks
///   forward relative to the current position);
/// * `Ok(false)` - the skip was not performed (the unseekable implementation
///   always answers this), so the caller must consume the bytes by reading;
/// * `Err(_)`    - the underlying operation failed.
///
/// A plain `fn` pointer is used rather than a generic closure, so the value
/// can be stored in `AsyncMediaSource` without adding a type parameter.
pub(crate) type AsyncSkipBySeekFn<R> = for<'a> fn(
    &'a mut R,
    u64,
) -> std::pin::Pin<
    Box<dyn std::future::Future<Output = io::Result<bool>> + Send + 'a>,
>;
34
/// An asynchronous media input whose media type has already been detected.
///
/// Instances are created through `seekable`, `unseekable`, `open` (with the
/// `tokio-fs` feature) or `from_memory`; every constructor performs MIME
/// detection up front and fails if the content is not recognised.
pub struct AsyncMediaSource<R> {
    /// The underlying reader. For sources built from a reader it has already
    /// been advanced past the header bytes stored in `buf`.
    pub(crate) reader: R,
    /// Header bytes consumed from `reader` during construction (at most
    /// `HEADER_PARSE_BUF_SIZE`); left empty by `from_memory`.
    pub(crate) buf: Vec<u8>,
    /// Media type detected from the header bytes.
    pub(crate) mime: crate::file::MediaMime,
    /// Strategy used to skip bytes in `reader` (seek-based or a no-op that
    /// reports "not skipped").
    pub(crate) skip_by_seek: AsyncSkipBySeekFn<R>,
    /// Whole payload when the source was created by `from_memory`; `None`
    /// for reader-backed sources.
    // NOTE(review): how consumers use this field is not visible in this file.
    pub(crate) memory: Option<bytes::Bytes>,
}
46
47impl<R> Debug for AsyncMediaSource<R> {
48 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49 f.debug_struct("AsyncMediaSource")
50 .field("mime", &self.mime)
51 .finish_non_exhaustive()
52 }
53}
54
55impl<R: AsyncRead + Unpin> AsyncMediaSource<R> {
56 async fn build(mut reader: R, skip_by_seek: AsyncSkipBySeekFn<R>) -> crate::Result<Self> {
57 let mut buf = Vec::with_capacity(HEADER_PARSE_BUF_SIZE);
58 (&mut reader)
59 .take(HEADER_PARSE_BUF_SIZE as u64)
60 .read_to_end(&mut buf)
61 .await?;
62 let mime: crate::file::MediaMime = buf.as_slice().try_into()?;
63 Ok(Self {
64 reader,
65 buf,
66 mime,
67 skip_by_seek,
68 memory: None,
69 })
70 }
71
72 pub fn kind(&self) -> crate::MediaKind {
73 match self.mime {
74 crate::file::MediaMime::Image(_) => crate::MediaKind::Image,
75 crate::file::MediaMime::Track(_) => crate::MediaKind::Track,
76 }
77 }
78}
79
80impl AsyncMediaSource<tokio::io::Empty> {
81 pub fn from_memory(bytes: impl Into<bytes::Bytes>) -> crate::Result<Self> {
113 let bytes = bytes.into();
114 let head_end = bytes.len().min(HEADER_PARSE_BUF_SIZE);
115 let mime: crate::file::MediaMime = bytes[..head_end].try_into()?;
116 Ok(Self {
117 reader: tokio::io::empty(),
118 buf: Vec::new(),
119 mime,
120 skip_by_seek: |_, _| Box::pin(async move { Ok(false) }),
122 memory: Some(bytes),
123 })
124 }
125}
126
127fn make_seekable_skip<R: AsyncRead + AsyncSeek + Unpin + Send>() -> AsyncSkipBySeekFn<R> {
128 |r, n| {
129 Box::pin(async move {
130 use std::io::SeekFrom;
131 use tokio::io::AsyncSeekExt;
132 let signed: i64 = n
133 .try_into()
134 .map_err(|_| io::Error::from(io::ErrorKind::InvalidInput))?;
135 r.seek(SeekFrom::Current(signed)).await?;
136 Ok(true)
137 })
138 }
139}
140
141fn make_unseekable_skip<R: AsyncRead + Unpin + Send>() -> AsyncSkipBySeekFn<R> {
142 |_, _| Box::pin(async move { Ok(false) })
143}
144
145impl<R: AsyncRead + AsyncSeek + Unpin + Send> AsyncMediaSource<R> {
146 pub async fn seekable(reader: R) -> crate::Result<Self> {
147 Self::build(reader, make_seekable_skip::<R>()).await
148 }
149}
150
151impl<R: AsyncRead + Unpin + Send> AsyncMediaSource<R> {
152 pub async fn unseekable(reader: R) -> crate::Result<Self> {
153 Self::build(reader, make_unseekable_skip::<R>()).await
154 }
155}
156
#[cfg(feature = "tokio-fs")]
impl AsyncMediaSource<File> {
    /// Opens the file at `path` and wraps it as a seekable source.
    ///
    /// # Errors
    ///
    /// Fails if the file cannot be opened, its header cannot be read, or its
    /// media type is not recognised.
    pub async fn open<P: AsRef<Path>>(path: P) -> crate::Result<Self> {
        let file = File::open(path).await?;
        Self::seekable(file).await
    }
}
165
166pub(crate) trait AsyncBufParser: Buf + Debug {
167 async fn fill_buf<R: AsyncRead + Unpin>(
168 &mut self,
169 reader: &mut R,
170 size: usize,
171 ) -> io::Result<usize>;
172
173 async fn load_and_parse<R: AsyncRead + Unpin, P, O>(
174 &mut self,
175 reader: &mut R,
176 skip_by_seek: AsyncSkipBySeekFn<R>,
177 parse: P,
178 ) -> Result<O, ParsedError>
179 where
180 P: Fn(&[u8], Option<ParsingState>) -> Result<O, ParsingErrorState>,
181 {
182 self.load_and_parse_with_offset(
183 reader,
184 skip_by_seek,
185 |data, _, state| parse(data, state),
186 0,
187 )
188 .await
189 }
190
191 #[tracing::instrument(skip_all)]
192 async fn load_and_parse_with_offset<R: AsyncRead + Unpin, P, O>(
193 &mut self,
194 reader: &mut R,
195 skip_by_seek: AsyncSkipBySeekFn<R>,
196 parse: P,
197 offset: usize,
198 ) -> Result<O, ParsedError>
199 where
200 P: Fn(&[u8], usize, Option<ParsingState>) -> Result<O, ParsingErrorState>,
201 {
202 if offset >= self.buffer().len() {
203 self.fill_buf(reader, MIN_GROW_SIZE).await?;
204 }
205
206 let mut parsing_state: Option<ParsingState> = None;
207 let mut parse = parse; loop {
209 match parse_loop_step(self.buffer(), offset, &mut parsing_state, &mut parse) {
210 LoopAction::Done(o) => return Ok(o),
211 LoopAction::NeedFill(needed) => {
212 let to_read = max(needed, MIN_GROW_SIZE);
213 let n = self.fill_buf(reader, to_read).await?;
214 if n == 0 {
215 return Err(ParsedError::NoEnoughBytes);
216 }
217 }
218 LoopAction::Skip(n) => {
219 self.clear_and_skip(reader, skip_by_seek, n).await?;
220 }
221 LoopAction::Failed { kind, message } => {
222 return Err(ParsedError::Failed { kind, message })
223 }
224 }
225 }
226 }
227
228 #[tracing::instrument(skip(reader, skip_by_seek))]
229 async fn clear_and_skip<R: AsyncRead + Unpin>(
230 &mut self,
231 reader: &mut R,
232 skip_by_seek: AsyncSkipBySeekFn<R>,
233 n: usize,
234 ) -> Result<(), ParsedError> {
235 match clear_and_skip_decide(self.buffer().len(), n) {
236 SkipPlan::AdvanceOnly => {
237 self.set_position(self.position() + n);
238 return Ok(());
239 }
240 SkipPlan::ClearAndSkip { extra: skip_n } => {
241 self.clear();
242 let done = (skip_by_seek)(
243 reader,
244 skip_n.try_into().map_err(|_| ParsedError::Failed {
245 kind: crate::error::MalformedKind::IsoBmffBox,
249 message: "skip too many bytes".into(),
250 })?,
251 )
252 .await?;
253 if !done {
254 let mut skipped = 0;
255 while skipped < skip_n {
256 let mut to_skip = skip_n - skipped;
257 to_skip = min(to_skip, MAX_PARSE_BUF_SIZE);
258 let n = self.fill_buf(reader, to_skip).await?;
259 skipped += n;
260 if skipped <= skip_n {
261 self.clear();
262 } else {
263 let remain = skipped - skip_n;
264 self.set_position(self.buffer().len() - remain);
265 break;
266 }
267 }
268 }
269
270 if self.buffer().is_empty() {
271 self.fill_buf(reader, MIN_GROW_SIZE).await?;
272 }
273 Ok(())
274 }
275 }
276 }
277}
278
#[cfg(all(test, feature = "tokio-fs"))]
mod tests {
    use std::path::{Path, PathBuf};

    use chrono::DateTime;
    use test_case::{case, test_case};
    use tokio::fs::File;

    use super::*;
    use crate::video::TrackInfoTag::*;
    use crate::{EntryValue, ExifIter, ExifTag, TrackInfo, TrackInfoTag};
    use TrackExif::*;

    /// Expected outcome for a fixture in `parse_media`.
    enum TrackExif {
        /// Opens as a track and track parsing succeeds.
        Track,
        /// Opens as an image and EXIF parsing succeeds.
        Exif,
        /// Opens successfully, but parsing yields an error.
        NoData,
        /// Opening the source itself fails.
        Invalid,
    }

    /// Resolves a fixture file name inside the `testdata` directory.
    fn fixture(name: &str) -> PathBuf {
        Path::new("testdata").join(name)
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    #[case("3gp_640x360.3gp", Track)]
    #[case("broken.jpg", Exif)]
    #[case("compatible-brands-fail.heic", Invalid)]
    #[case("compatible-brands-fail.mov", Invalid)]
    #[case("compatible-brands.heic", NoData)]
    #[case("compatible-brands.mov", NoData)]
    #[case("embedded-in-heic.mov", Track)]
    #[case("exif.heic", Exif)]
    #[case("exif.jpg", Exif)]
    #[case("meta.mov", Track)]
    #[case("meta.mp4", Track)]
    #[case("mka.mka", Track)]
    #[case("mkv_640x360.mkv", Track)]
    #[case("exif-one-entry.heic", Exif)]
    #[case("no-exif.jpg", NoData)]
    #[case("tif.tif", Exif)]
    #[case("ramdisk.img", Invalid)]
    #[case("webm_480.webm", Track)]
    async fn parse_media(path: &str, te: TrackExif) {
        use crate::MediaParser;

        let mut parser = MediaParser::new();
        let opened = AsyncMediaSource::open(fixture(path)).await;

        match te {
            Track => {
                let source = opened.unwrap();
                assert_eq!(source.kind(), crate::MediaKind::Track);
                let _: TrackInfo = parser.parse_track_async(source).await.unwrap();
            }
            Exif => {
                let source = opened.unwrap();
                assert_eq!(source.kind(), crate::MediaKind::Image);
                let mut it: ExifIter = parser.parse_exif_async(source).await.unwrap();
                let _ = it.parse_gps();

                if path.contains("one-entry") {
                    // Exactly one entry is expected from this fixture.
                    assert!(it.next().is_some());
                    assert!(it.next().is_none());

                    let exif: crate::Exif = it.clone_rewound().into();
                    assert!(exif.get(ExifTag::Orientation).is_some());
                } else {
                    let _: crate::Exif = it.clone_rewound().into();
                }
            }
            NoData => {
                let source = opened.unwrap();
                match source.kind() {
                    crate::MediaKind::Image => {
                        parser.parse_exif_async(source).await.unwrap_err();
                    }
                    crate::MediaKind::Track => {
                        parser.parse_track_async(source).await.unwrap_err();
                    }
                }
            }
            Invalid => {
                opened.unwrap_err();
            }
        }
    }

    #[test]
    fn fill_buf_check_rejects_oversize_when_combined_with_existing() {
        use crate::parser::check_fill_size;

        // Existing bytes plus the request exceed the cap.
        let over = check_fill_size(MAX_PARSE_BUF_SIZE - 1024, 2 * 1024);
        assert!(over.is_err(), "expected Err, got Ok");

        // Existing bytes plus the request stay under the cap.
        let within = check_fill_size(MAX_PARSE_BUF_SIZE - 4096, 1024);
        assert!(within.is_ok());
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    #[test_case("mkv_640x360.mkv", Width, 640_u32.into())]
    #[test_case("mkv_640x360.mkv", Height, 360_u32.into())]
    #[test_case("mkv_640x360.mkv", DurationMs, 13346_u64.into())]
    #[test_case("mkv_640x360.mkv", CreateDate, DateTime::parse_from_str("2008-08-08T08:08:08Z", "%+").unwrap().into())]
    #[test_case("meta.mov", Make, "Apple".into())]
    #[test_case("meta.mov", Model, "iPhone X".into())]
    #[test_case("meta.mov", GpsIso6709, "+27.1281+100.2508+000.000/".into())]
    #[test_case("meta.mp4", Width, 1920_u32.into())]
    #[test_case("meta.mp4", Height, 1080_u32.into())]
    #[test_case("meta.mp4", DurationMs, 1063_u64.into())]
    #[test_case("meta.mp4", GpsIso6709, "+27.2939+112.6932/".into())]
    #[test_case("meta.mp4", CreateDate, DateTime::parse_from_str("2024-02-03T07:05:38Z", "%+").unwrap().into())]
    async fn parse_track_info(path: &str, tag: TrackInfoTag, v: EntryValue) {
        use crate::MediaParser;

        let mut parser = MediaParser::new();

        // First pass: skips are performed by seeking.
        let file = File::open(fixture(path)).await.unwrap();
        let source = AsyncMediaSource::seekable(file).await.unwrap();
        let info: TrackInfo = parser.parse_track_async(source).await.unwrap();
        assert_eq!(info.get(tag).unwrap(), &v);

        // Second pass: same file, but skips are performed by reading.
        let file = File::open(fixture(path)).await.unwrap();
        let source = AsyncMediaSource::unseekable(file).await.unwrap();
        let info: TrackInfo = parser.parse_track_async(source).await.unwrap();
        assert_eq!(info.get(tag).unwrap(), &v);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn async_media_kind_classifies_image_and_track() {
        let image = AsyncMediaSource::open("testdata/exif.jpg").await.unwrap();
        assert_eq!(image.kind(), crate::MediaKind::Image);

        let track = AsyncMediaSource::open("testdata/meta.mov").await.unwrap();
        assert_eq!(track.kind(), crate::MediaKind::Track);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn async_media_source_open() {
        let source = AsyncMediaSource::open("testdata/exif.jpg").await.unwrap();
        assert_eq!(source.kind(), crate::MediaKind::Image);
    }
}