Skip to main content

nom_exif/
parser_async.rs

1use std::{
2    cmp::{max, min},
3    fmt::Debug,
4    io::{self},
5    path::Path,
6};
7
8use tokio::{
9    fs::File,
10    io::{AsyncRead, AsyncReadExt, AsyncSeek},
11};
12
13use crate::{
14    error::{ParsedError, ParsingErrorState},
15    parser::{
16        clear_and_skip_decide, parse_loop_step, Buf, LoopAction, ParsingState, SkipPlan,
17        MAX_PARSE_BUF_SIZE, MIN_GROW_SIZE,
18    },
19};
20
/// Number of leading bytes that `AsyncMediaSource::build` reads up front in
/// order to detect the media type. Should be enough for parsing the header.
const HEADER_PARSE_BUF_SIZE: usize = 128;
23
/// Async equivalent of `crate::parser::SkipBySeekFn<R>`: a plain function
/// pointer that tries to move a reader forward by the given number of bytes.
///
/// A closure returning a future cannot be coerced to a plain `fn` type, so
/// the future is returned as a `Pin<Box<dyn Future>>`. One `Box` allocation
/// per skip is negligible next to the async I/O it stands in front of.
///
/// The future resolves to `Ok(true)` when the skip was carried out by
/// seeking, and to `Ok(false)` when seeking is not available and the caller
/// has to skip by reading and discarding instead.
pub(crate) type AsyncSkipBySeekFn<R> = for<'r> fn(
    &'r mut R,
    u64,
) -> std::pin::Pin<
    Box<dyn std::future::Future<Output = io::Result<bool>> + Send + 'r>,
>;
34
35pub struct AsyncMediaSource<R> {
36    pub(crate) reader: R,
37    pub(crate) buf: Vec<u8>,
38    pub(crate) mime: crate::file::MediaMime,
39    pub(crate) skip_by_seek: AsyncSkipBySeekFn<R>,
40}
41
42impl<R> Debug for AsyncMediaSource<R> {
43    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44        f.debug_struct("AsyncMediaSource")
45            .field("mime", &self.mime)
46            .finish_non_exhaustive()
47    }
48}
49
50impl<R: AsyncRead + Unpin> AsyncMediaSource<R> {
51    async fn build(mut reader: R, skip_by_seek: AsyncSkipBySeekFn<R>) -> crate::Result<Self> {
52        let mut buf = Vec::with_capacity(HEADER_PARSE_BUF_SIZE);
53        (&mut reader)
54            .take(HEADER_PARSE_BUF_SIZE as u64)
55            .read_to_end(&mut buf)
56            .await?;
57        let mime: crate::file::MediaMime = buf.as_slice().try_into()?;
58        Ok(Self {
59            reader,
60            buf,
61            mime,
62            skip_by_seek,
63        })
64    }
65
66    pub fn kind(&self) -> crate::MediaKind {
67        match self.mime {
68            crate::file::MediaMime::Image(_) => crate::MediaKind::Image,
69            crate::file::MediaMime::Track(_) => crate::MediaKind::Track,
70        }
71    }
72}
73
74fn make_seekable_skip<R: AsyncRead + AsyncSeek + Unpin + Send>() -> AsyncSkipBySeekFn<R> {
75    |r, n| {
76        Box::pin(async move {
77            use std::io::SeekFrom;
78            use tokio::io::AsyncSeekExt;
79            let signed: i64 = n
80                .try_into()
81                .map_err(|_| io::Error::from(io::ErrorKind::InvalidInput))?;
82            r.seek(SeekFrom::Current(signed)).await?;
83            Ok(true)
84        })
85    }
86}
87
88fn make_unseekable_skip<R: AsyncRead + Unpin + Send>() -> AsyncSkipBySeekFn<R> {
89    |_, _| Box::pin(async move { Ok(false) })
90}
91
92impl<R: AsyncRead + AsyncSeek + Unpin + Send> AsyncMediaSource<R> {
93    pub async fn seekable(reader: R) -> crate::Result<Self> {
94        Self::build(reader, make_seekable_skip::<R>()).await
95    }
96}
97
98impl<R: AsyncRead + Unpin + Send> AsyncMediaSource<R> {
99    pub async fn unseekable(reader: R) -> crate::Result<Self> {
100        Self::build(reader, make_unseekable_skip::<R>()).await
101    }
102}
103
104impl AsyncMediaSource<File> {
105    /// Open a file at `path` (via `tokio::fs::File`) and parse its header.
106    /// For an already-open async `File` use [`Self::seekable`].
107    pub async fn open<P: AsRef<Path>>(path: P) -> crate::Result<Self> {
108        Self::seekable(File::open(path).await?).await
109    }
110}
111
112pub(crate) trait AsyncBufParser: Buf + Debug {
113    async fn fill_buf<R: AsyncRead + Unpin>(
114        &mut self,
115        reader: &mut R,
116        size: usize,
117    ) -> io::Result<usize>;
118
119    async fn load_and_parse<R: AsyncRead + Unpin, P, O>(
120        &mut self,
121        reader: &mut R,
122        skip_by_seek: AsyncSkipBySeekFn<R>,
123        parse: P,
124    ) -> Result<O, ParsedError>
125    where
126        P: Fn(&[u8], Option<ParsingState>) -> Result<O, ParsingErrorState>,
127    {
128        self.load_and_parse_with_offset(
129            reader,
130            skip_by_seek,
131            |data, _, state| parse(data, state),
132            0,
133        )
134        .await
135    }
136
137    #[tracing::instrument(skip_all)]
138    async fn load_and_parse_with_offset<R: AsyncRead + Unpin, P, O>(
139        &mut self,
140        reader: &mut R,
141        skip_by_seek: AsyncSkipBySeekFn<R>,
142        parse: P,
143        offset: usize,
144    ) -> Result<O, ParsedError>
145    where
146        P: Fn(&[u8], usize, Option<ParsingState>) -> Result<O, ParsingErrorState>,
147    {
148        if offset >= self.buffer().len() {
149            self.fill_buf(reader, MIN_GROW_SIZE).await?;
150        }
151
152        let mut parsing_state: Option<ParsingState> = None;
153        let mut parse = parse; // coerce Fn → FnMut
154        loop {
155            match parse_loop_step(self.buffer(), offset, &mut parsing_state, &mut parse) {
156                LoopAction::Done(o) => return Ok(o),
157                LoopAction::NeedFill(needed) => {
158                    let to_read = max(needed, MIN_GROW_SIZE);
159                    let n = self.fill_buf(reader, to_read).await?;
160                    if n == 0 {
161                        return Err(ParsedError::NoEnoughBytes);
162                    }
163                }
164                LoopAction::Skip(n) => {
165                    self.clear_and_skip(reader, skip_by_seek, n).await?;
166                }
167                LoopAction::Failed(s) => return Err(ParsedError::Failed(s)),
168            }
169        }
170    }
171
172    #[tracing::instrument(skip(reader, skip_by_seek))]
173    async fn clear_and_skip<R: AsyncRead + Unpin>(
174        &mut self,
175        reader: &mut R,
176        skip_by_seek: AsyncSkipBySeekFn<R>,
177        n: usize,
178    ) -> Result<(), ParsedError> {
179        match clear_and_skip_decide(self.buffer().len(), n) {
180            SkipPlan::AdvanceOnly => {
181                self.set_position(self.position() + n);
182                return Ok(());
183            }
184            SkipPlan::ClearAndSkip { extra: skip_n } => {
185                self.clear();
186                let done = (skip_by_seek)(
187                    reader,
188                    skip_n
189                        .try_into()
190                        .map_err(|_| ParsedError::Failed("skip too many bytes".into()))?,
191                )
192                .await?;
193                if !done {
194                    let mut skipped = 0;
195                    while skipped < skip_n {
196                        let mut to_skip = skip_n - skipped;
197                        to_skip = min(to_skip, MAX_PARSE_BUF_SIZE);
198                        let n = self.fill_buf(reader, to_skip).await?;
199                        skipped += n;
200                        if skipped <= skip_n {
201                            self.clear();
202                        } else {
203                            let remain = skipped - skip_n;
204                            self.set_position(self.buffer().len() - remain);
205                            break;
206                        }
207                    }
208                }
209
210                if self.buffer().is_empty() {
211                    self.fill_buf(reader, MIN_GROW_SIZE).await?;
212                }
213                Ok(())
214            }
215        }
216    }
217}
218
#[cfg(test)]
mod tests {
    use std::path::Path;

    use chrono::DateTime;
    use test_case::{case, test_case};
    use tokio::fs::File;

    use super::*;
    use crate::video::TrackInfoTag::*;
    use crate::{EntryValue, ExifIter, ExifTag, TrackInfo, TrackInfoTag};
    use TrackExif::*;

    /// What a sample file in `testdata` is expected to produce.
    enum TrackExif {
        /// Opens as a track source and track info parses.
        Track,
        /// Opens as an image source and Exif parses.
        Exif,
        /// Opens fine, but parsing returns an error.
        NoData,
        /// Opening the source itself fails.
        Invalid,
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    #[case("3gp_640x360.3gp", Track)]
    #[case("broken.jpg", Exif)]
    #[case("compatible-brands-fail.heic", Invalid)]
    #[case("compatible-brands-fail.mov", Invalid)]
    #[case("compatible-brands.heic", NoData)]
    #[case("compatible-brands.mov", NoData)]
    #[case("embedded-in-heic.mov", Track)]
    #[case("exif.heic", Exif)]
    #[case("exif.jpg", Exif)]
    #[case("meta.mov", Track)]
    #[case("meta.mp4", Track)]
    #[case("mka.mka", Track)]
    #[case("mkv_640x360.mkv", Track)]
    #[case("exif-one-entry.heic", Exif)]
    #[case("no-exif.jpg", NoData)]
    #[case("tif.tif", Exif)]
    #[case("ramdisk.img", Invalid)]
    #[case("webm_480.webm", Track)]
    async fn parse_media(path: &str, te: TrackExif) {
        use crate::MediaParser;
        let mut parser = MediaParser::new();
        let opened = AsyncMediaSource::open(Path::new("testdata").join(path)).await;
        match te {
            Track => {
                let source = opened.unwrap();
                assert_eq!(source.kind(), crate::MediaKind::Track);
                let _: TrackInfo = parser.parse_track_async(source).await.unwrap();
            }
            Exif => {
                let source = opened.unwrap();
                assert_eq!(source.kind(), crate::MediaKind::Image);
                let mut it: ExifIter = parser.parse_exif_async(source).await.unwrap();
                let _ = it.parse_gps();

                if !path.contains("one-entry") {
                    let _: crate::Exif = it.clone_rewound().into();
                } else {
                    // The single-entry sample yields exactly one item.
                    assert!(it.next().is_some());
                    assert!(it.next().is_none());

                    let exif: crate::Exif = it.clone_rewound().into();
                    assert!(exif.get(ExifTag::Orientation).is_some());
                }
            }
            NoData => {
                let source = opened.unwrap();
                match source.kind() {
                    crate::MediaKind::Image => {
                        parser.parse_exif_async(source).await.unwrap_err();
                    }
                    crate::MediaKind::Track => {
                        parser.parse_track_async(source).await.unwrap_err();
                    }
                }
            }
            Invalid => {
                opened.unwrap_err();
            }
        }
    }

    #[test]
    fn fill_buf_check_rejects_oversize_when_combined_with_existing() {
        use crate::parser::check_fill_size;
        // The combined size guard used by both sync and async fill_buf.
        // existing=MAX-1024, requested=2*1024 => existing+requested > MAX => Err.
        let over_limit = check_fill_size(MAX_PARSE_BUF_SIZE - 1024, 2 * 1024);
        assert!(over_limit.is_err(), "expected Err, got Ok");
        // Below the threshold passes.
        let within_limit = check_fill_size(MAX_PARSE_BUF_SIZE - 4096, 1024);
        assert!(within_limit.is_ok());
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    #[test_case("mkv_640x360.mkv", Width, 640_u32.into())]
    #[test_case("mkv_640x360.mkv", Height, 360_u32.into())]
    #[test_case("mkv_640x360.mkv", DurationMs, 13346_u64.into())]
    #[test_case("mkv_640x360.mkv", CreateDate, DateTime::parse_from_str("2008-08-08T08:08:08Z", "%+").unwrap().into())]
    #[test_case("meta.mov", Make, "Apple".into())]
    #[test_case("meta.mov", Model, "iPhone X".into())]
    #[test_case("meta.mov", GpsIso6709, "+27.1281+100.2508+000.000/".into())]
    #[test_case("meta.mp4", Width, 1920_u32.into())]
    #[test_case("meta.mp4", Height, 1080_u32.into())]
    #[test_case("meta.mp4", DurationMs, 1063_u64.into())]
    #[test_case("meta.mp4", GpsIso6709, "+27.2939+112.6932/".into())]
    #[test_case("meta.mp4", CreateDate, DateTime::parse_from_str("2024-02-03T07:05:38Z", "%+").unwrap().into())]
    async fn parse_track_info(path: &str, tag: TrackInfoTag, v: EntryValue) {
        use crate::MediaParser;
        let mut parser = MediaParser::new();
        let file_path = Path::new("testdata").join(path);

        // First pass: skipping is done by seeking.
        let file = File::open(&file_path).await.unwrap();
        let source = AsyncMediaSource::seekable(file).await.unwrap();
        let info: TrackInfo = parser.parse_track_async(source).await.unwrap();
        assert_eq!(info.get(tag).unwrap(), &v);

        // Second pass: same file, but treated as unseekable.
        let file = File::open(&file_path).await.unwrap();
        let source = AsyncMediaSource::unseekable(file).await.unwrap();
        let info: TrackInfo = parser.parse_track_async(source).await.unwrap();
        assert_eq!(info.get(tag).unwrap(), &v);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn async_media_kind_classifies_image_and_track() {
        let image = AsyncMediaSource::open("testdata/exif.jpg").await.unwrap();
        assert_eq!(image.kind(), crate::MediaKind::Image);

        let track = AsyncMediaSource::open("testdata/meta.mov").await.unwrap();
        assert_eq!(track.kind(), crate::MediaKind::Track);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn async_media_source_open() {
        let source = AsyncMediaSource::open("testdata/exif.jpg").await.unwrap();
        assert_eq!(source.kind(), crate::MediaKind::Image);
    }
}