1use std::{
2 cmp::{max, min},
3 fmt::Debug,
4 io::{self},
5 path::Path,
6};
7
8use tokio::{
9 fs::File,
10 io::{AsyncRead, AsyncReadExt, AsyncSeek},
11};
12
13use crate::{
14 error::{ParsedError, ParsingErrorState},
15 parser::{
16 clear_and_skip_decide, parse_loop_step, Buf, LoopAction, ParsingState, SkipPlan,
17 MAX_PARSE_BUF_SIZE, MIN_GROW_SIZE,
18 },
19};
20
/// Number of leading bytes inspected to detect the media MIME type.
///
/// Reader-backed sources read at most this many bytes up front; in-memory
/// sources look at a prefix of at most this length.
const HEADER_PARSE_BUF_SIZE: usize = 128;
23
/// Function pointer used to skip forward in a reader by seeking.
///
/// It receives the reader and the number of bytes to skip, and returns a
/// boxed, `Send` future resolving to `io::Result<bool>`:
///
/// * `Ok(true)`  - the skip was performed by the callback;
/// * `Ok(false)` - the callback did not skip, and the caller has to consume
///   the bytes some other way (e.g. by reading and discarding them).
///
/// A plain `fn` pointer is stored instead of a generic parameter so that
/// seekable and unseekable readers share the same `AsyncMediaSource<R>` type.
pub(crate) type AsyncSkipBySeekFn<R> = for<'a> fn(
    &'a mut R,
    u64,
) -> std::pin::Pin<
    Box<dyn std::future::Future<Output = io::Result<bool>> + Send + 'a>,
>;
34
/// An asynchronous media input together with its detected MIME type.
///
/// Instances are created through `seekable`, `unseekable`, `open` or
/// `from_memory`.
pub struct AsyncMediaSource<R> {
    /// Underlying reader, positioned just after the header bytes held in `buf`.
    pub(crate) reader: R,
    /// Header bytes already consumed from `reader` for MIME detection
    /// (left empty for in-memory sources).
    pub(crate) buf: Vec<u8>,
    /// MIME type detected from the header bytes.
    pub(crate) mime: crate::file::MediaMime,
    /// Callback used to skip forward in `reader` by seeking, when supported.
    pub(crate) skip_by_seek: AsyncSkipBySeekFn<R>,
    /// Whole payload for sources built by `from_memory`; `None` for
    /// reader-backed sources.
    pub(crate) memory: Option<bytes::Bytes>,
}
46
47impl<R> Debug for AsyncMediaSource<R> {
48 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49 f.debug_struct("AsyncMediaSource")
50 .field("mime", &self.mime)
51 .finish_non_exhaustive()
52 }
53}
54
55impl<R: AsyncRead + Unpin> AsyncMediaSource<R> {
56 async fn build(mut reader: R, skip_by_seek: AsyncSkipBySeekFn<R>) -> crate::Result<Self> {
57 let mut buf = Vec::with_capacity(HEADER_PARSE_BUF_SIZE);
58 (&mut reader)
59 .take(HEADER_PARSE_BUF_SIZE as u64)
60 .read_to_end(&mut buf)
61 .await?;
62 let mime: crate::file::MediaMime = buf.as_slice().try_into()?;
63 Ok(Self {
64 reader,
65 buf,
66 mime,
67 skip_by_seek,
68 memory: None,
69 })
70 }
71
72 pub fn kind(&self) -> crate::MediaKind {
73 match self.mime {
74 crate::file::MediaMime::Image(_) => crate::MediaKind::Image,
75 crate::file::MediaMime::Track(_) => crate::MediaKind::Track,
76 }
77 }
78}
79
80impl AsyncMediaSource<tokio::io::Empty> {
81 pub fn from_memory(bytes: impl Into<bytes::Bytes>) -> crate::Result<Self> {
113 let bytes = bytes.into();
114 let head_end = bytes.len().min(HEADER_PARSE_BUF_SIZE);
115 let mime: crate::file::MediaMime = bytes[..head_end].try_into()?;
116 Ok(Self {
117 reader: tokio::io::empty(),
118 buf: Vec::new(),
119 mime,
120 skip_by_seek: |_, _| Box::pin(async move { Ok(false) }),
122 memory: Some(bytes),
123 })
124 }
125}
126
127fn make_seekable_skip<R: AsyncRead + AsyncSeek + Unpin + Send>() -> AsyncSkipBySeekFn<R> {
128 |r, n| {
129 Box::pin(async move {
130 use std::io::SeekFrom;
131 use tokio::io::AsyncSeekExt;
132 let signed: i64 = n
133 .try_into()
134 .map_err(|_| io::Error::from(io::ErrorKind::InvalidInput))?;
135 r.seek(SeekFrom::Current(signed)).await?;
136 Ok(true)
137 })
138 }
139}
140
141fn make_unseekable_skip<R: AsyncRead + Unpin + Send>() -> AsyncSkipBySeekFn<R> {
142 |_, _| Box::pin(async move { Ok(false) })
143}
144
145impl<R: AsyncRead + AsyncSeek + Unpin + Send> AsyncMediaSource<R> {
146 pub async fn seekable(reader: R) -> crate::Result<Self> {
147 Self::build(reader, make_seekable_skip::<R>()).await
148 }
149}
150
151impl<R: AsyncRead + Unpin + Send> AsyncMediaSource<R> {
152 pub async fn unseekable(reader: R) -> crate::Result<Self> {
153 Self::build(reader, make_unseekable_skip::<R>()).await
154 }
155}
156
157impl AsyncMediaSource<File> {
158 pub async fn open<P: AsRef<Path>>(path: P) -> crate::Result<Self> {
161 Self::seekable(File::open(path).await?).await
162 }
163}
164
165pub(crate) trait AsyncBufParser: Buf + Debug {
166 async fn fill_buf<R: AsyncRead + Unpin>(
167 &mut self,
168 reader: &mut R,
169 size: usize,
170 ) -> io::Result<usize>;
171
172 async fn load_and_parse<R: AsyncRead + Unpin, P, O>(
173 &mut self,
174 reader: &mut R,
175 skip_by_seek: AsyncSkipBySeekFn<R>,
176 parse: P,
177 ) -> Result<O, ParsedError>
178 where
179 P: Fn(&[u8], Option<ParsingState>) -> Result<O, ParsingErrorState>,
180 {
181 self.load_and_parse_with_offset(
182 reader,
183 skip_by_seek,
184 |data, _, state| parse(data, state),
185 0,
186 )
187 .await
188 }
189
190 #[tracing::instrument(skip_all)]
191 async fn load_and_parse_with_offset<R: AsyncRead + Unpin, P, O>(
192 &mut self,
193 reader: &mut R,
194 skip_by_seek: AsyncSkipBySeekFn<R>,
195 parse: P,
196 offset: usize,
197 ) -> Result<O, ParsedError>
198 where
199 P: Fn(&[u8], usize, Option<ParsingState>) -> Result<O, ParsingErrorState>,
200 {
201 if offset >= self.buffer().len() {
202 self.fill_buf(reader, MIN_GROW_SIZE).await?;
203 }
204
205 let mut parsing_state: Option<ParsingState> = None;
206 let mut parse = parse; loop {
208 match parse_loop_step(self.buffer(), offset, &mut parsing_state, &mut parse) {
209 LoopAction::Done(o) => return Ok(o),
210 LoopAction::NeedFill(needed) => {
211 let to_read = max(needed, MIN_GROW_SIZE);
212 let n = self.fill_buf(reader, to_read).await?;
213 if n == 0 {
214 return Err(ParsedError::NoEnoughBytes);
215 }
216 }
217 LoopAction::Skip(n) => {
218 self.clear_and_skip(reader, skip_by_seek, n).await?;
219 }
220 LoopAction::Failed { kind, message } => {
221 return Err(ParsedError::Failed { kind, message })
222 }
223 }
224 }
225 }
226
227 #[tracing::instrument(skip(reader, skip_by_seek))]
228 async fn clear_and_skip<R: AsyncRead + Unpin>(
229 &mut self,
230 reader: &mut R,
231 skip_by_seek: AsyncSkipBySeekFn<R>,
232 n: usize,
233 ) -> Result<(), ParsedError> {
234 match clear_and_skip_decide(self.buffer().len(), n) {
235 SkipPlan::AdvanceOnly => {
236 self.set_position(self.position() + n);
237 return Ok(());
238 }
239 SkipPlan::ClearAndSkip { extra: skip_n } => {
240 self.clear();
241 let done = (skip_by_seek)(
242 reader,
243 skip_n.try_into().map_err(|_| ParsedError::Failed {
244 kind: crate::error::MalformedKind::IsoBmffBox,
248 message: "skip too many bytes".into(),
249 })?,
250 )
251 .await?;
252 if !done {
253 let mut skipped = 0;
254 while skipped < skip_n {
255 let mut to_skip = skip_n - skipped;
256 to_skip = min(to_skip, MAX_PARSE_BUF_SIZE);
257 let n = self.fill_buf(reader, to_skip).await?;
258 skipped += n;
259 if skipped <= skip_n {
260 self.clear();
261 } else {
262 let remain = skipped - skip_n;
263 self.set_position(self.buffer().len() - remain);
264 break;
265 }
266 }
267 }
268
269 if self.buffer().is_empty() {
270 self.fill_buf(reader, MIN_GROW_SIZE).await?;
271 }
272 Ok(())
273 }
274 }
275 }
276}
277
// Integration-style tests for `AsyncMediaSource`; they read fixture files
// from the `testdata` directory.
#[cfg(test)]
mod tests {
    use std::path::Path;

    use super::*;
    use crate::{ExifIter, TrackInfo};
    use test_case::case;

    // Expected outcome for a fixture in `parse_media`:
    //   Track   - opens as a track and track parsing succeeds;
    //   Exif    - opens as an image and Exif parsing succeeds;
    //   NoData  - opens, but parsing returns an error;
    //   Invalid - opening the source itself fails.
    enum TrackExif {
        Track,
        Exif,
        NoData,
        Invalid,
    }
    use tokio::fs::File;
    use TrackExif::*;

    // Opens every fixture through `AsyncMediaSource::open` and checks that it
    // behaves as its `TrackExif` expectation says.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    #[case("3gp_640x360.3gp", Track)]
    #[case("broken.jpg", Exif)]
    #[case("compatible-brands-fail.heic", Invalid)]
    #[case("compatible-brands-fail.mov", Invalid)]
    #[case("compatible-brands.heic", NoData)]
    #[case("compatible-brands.mov", NoData)]
    #[case("embedded-in-heic.mov", Track)]
    #[case("exif.heic", Exif)]
    #[case("exif.jpg", Exif)]
    #[case("meta.mov", Track)]
    #[case("meta.mp4", Track)]
    #[case("mka.mka", Track)]
    #[case("mkv_640x360.mkv", Track)]
    #[case("exif-one-entry.heic", Exif)]
    #[case("no-exif.jpg", NoData)]
    #[case("tif.tif", Exif)]
    #[case("ramdisk.img", Invalid)]
    #[case("webm_480.webm", Track)]
    async fn parse_media(path: &str, te: TrackExif) {
        use crate::MediaParser;
        let mut parser = MediaParser::new();
        let ms = AsyncMediaSource::open(Path::new("testdata").join(path)).await;
        match te {
            Track => {
                let ms = ms.unwrap();
                assert_eq!(ms.kind(), crate::MediaKind::Track);
                let _: TrackInfo = parser.parse_track_async(ms).await.unwrap();
            }
            Exif => {
                let ms = ms.unwrap();
                assert_eq!(ms.kind(), crate::MediaKind::Image);
                let mut it: ExifIter = parser.parse_exif_async(ms).await.unwrap();
                let _ = it.parse_gps();

                // The single-entry fixture must yield exactly one entry, and
                // that entry must be visible after converting to `Exif`.
                if path.contains("one-entry") {
                    assert!(it.next().is_some());
                    assert!(it.next().is_none());

                    let exif: crate::Exif = it.clone_rewound().into();
                    assert!(exif.get(ExifTag::Orientation).is_some());
                } else {
                    let _: crate::Exif = it.clone_rewound().into();
                }
            }
            NoData => {
                // The source opens, but whichever parser matches its kind
                // must report an error.
                let ms = ms.unwrap();
                match ms.kind() {
                    crate::MediaKind::Image => {
                        let res = parser.parse_exif_async(ms).await;
                        res.unwrap_err();
                    }
                    crate::MediaKind::Track => {
                        let res = parser.parse_track_async(ms).await;
                        res.unwrap_err();
                    }
                }
            }
            Invalid => {
                ms.unwrap_err();
            }
        }
    }

    use crate::{EntryValue, ExifTag, TrackInfoTag};
    use chrono::DateTime;
    use test_case::test_case;

    use crate::video::TrackInfoTag::*;

    // `check_fill_size` must reject a request that pushes past
    // `MAX_PARSE_BUF_SIZE` and accept one that stays below it.
    // NOTE(review): the arguments look like (existing length, requested
    // size) - confirm against `check_fill_size`.
    #[test]
    fn fill_buf_check_rejects_oversize_when_combined_with_existing() {
        use crate::parser::check_fill_size;
        let res = check_fill_size(MAX_PARSE_BUF_SIZE - 1024, 2 * 1024);
        assert!(res.is_err(), "expected Err, got Ok");
        let res = check_fill_size(MAX_PARSE_BUF_SIZE - 4096, 1024);
        assert!(res.is_ok());
    }

    // Parses each fixture twice - once as a seekable and once as an
    // unseekable source - and expects the same tag value both times.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    #[test_case("mkv_640x360.mkv", Width, 640_u32.into())]
    #[test_case("mkv_640x360.mkv", Height, 360_u32.into())]
    #[test_case("mkv_640x360.mkv", DurationMs, 13346_u64.into())]
    #[test_case("mkv_640x360.mkv", CreateDate, DateTime::parse_from_str("2008-08-08T08:08:08Z", "%+").unwrap().into())]
    #[test_case("meta.mov", Make, "Apple".into())]
    #[test_case("meta.mov", Model, "iPhone X".into())]
    #[test_case("meta.mov", GpsIso6709, "+27.1281+100.2508+000.000/".into())]
    #[test_case("meta.mp4", Width, 1920_u32.into())]
    #[test_case("meta.mp4", Height, 1080_u32.into())]
    #[test_case("meta.mp4", DurationMs, 1063_u64.into())]
    #[test_case("meta.mp4", GpsIso6709, "+27.2939+112.6932/".into())]
    #[test_case("meta.mp4", CreateDate, DateTime::parse_from_str("2024-02-03T07:05:38Z", "%+").unwrap().into())]
    async fn parse_track_info(path: &str, tag: TrackInfoTag, v: EntryValue) {
        use crate::MediaParser;
        let mut parser = MediaParser::new();

        let f = File::open(Path::new("testdata").join(path)).await.unwrap();
        let ms = AsyncMediaSource::seekable(f).await.unwrap();
        let info: TrackInfo = parser.parse_track_async(ms).await.unwrap();
        assert_eq!(info.get(tag).unwrap(), &v);

        let f = File::open(Path::new("testdata").join(path)).await.unwrap();
        let ms = AsyncMediaSource::unseekable(f).await.unwrap();
        let info: TrackInfo = parser.parse_track_async(ms).await.unwrap();
        assert_eq!(info.get(tag).unwrap(), &v);
    }

    // `kind()` must report `Image` for a JPEG and `Track` for a MOV file.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn async_media_kind_classifies_image_and_track() {
        let img = AsyncMediaSource::open("testdata/exif.jpg").await.unwrap();
        assert_eq!(img.kind(), crate::MediaKind::Image);

        let trk = AsyncMediaSource::open("testdata/meta.mov").await.unwrap();
        assert_eq!(trk.kind(), crate::MediaKind::Track);
    }

    // Smoke test: `open` accepts a plain string path.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn async_media_source_open() {
        let ms = AsyncMediaSource::open("testdata/exif.jpg").await.unwrap();
        assert_eq!(ms.kind(), crate::MediaKind::Image);
    }
}