Skip to main content

nom_exif/
parser_async.rs

1use std::{
2    cmp::{max, min},
3    fmt::Debug,
4    io::{self},
5};
6
7#[cfg(feature = "tokio-fs")]
8use std::path::Path;
9#[cfg(feature = "tokio-fs")]
10use tokio::fs::File;
11use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek};
12
13use crate::{
14    error::{ParsedError, ParsingErrorState},
15    parser::{
16        clear_and_skip_decide, parse_loop_step, Buf, LoopAction, ParsingState, SkipPlan,
17        MAX_PARSE_BUF_SIZE, MIN_GROW_SIZE,
18    },
19};
20
/// Number of leading bytes read (or inspected, for in-memory sources) to
/// detect the media MIME type. Should be enough for parsing the header.
const HEADER_PARSE_BUF_SIZE: usize = 128;
23
/// Async counterpart to `crate::parser::SkipBySeekFn<R>`.
///
/// A closure returning a future cannot coerce to a plain `fn` type, so the
/// skip strategy is stored as a fn pointer that hands back a boxed, pinned
/// future borrowing the reader for `'a`. Arguments are the reader and the
/// number of bytes to skip; the callers in this file read the resolved
/// `bool` as "the skip was performed by seeking" (`false` makes them fall
/// back to reading and discarding). The Box-per-skip overhead is trivial
/// against actual async I/O.
pub(crate) type AsyncSkipBySeekFn<R> =
    for<'a> fn(
        &'a mut R,
        u64,
    )
        -> std::pin::Pin<Box<dyn std::future::Future<Output = io::Result<bool>> + Send + 'a>>;
34
35pub struct AsyncMediaSource<R> {
36    pub(crate) reader: R,
37    pub(crate) buf: Vec<u8>,
38    pub(crate) mime: crate::file::MediaMime,
39    pub(crate) skip_by_seek: AsyncSkipBySeekFn<R>,
40    /// Set when this source was constructed via [`Self::from_memory`].
41    /// The full payload lives here as a zero-copy [`bytes::Bytes`]; the
42    /// async parse methods branch on this field to take the memory path
43    /// instead of `fill_buf`-ing from `reader`.
44    pub(crate) memory: Option<bytes::Bytes>,
45}
46
47impl<R> Debug for AsyncMediaSource<R> {
48    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49        f.debug_struct("AsyncMediaSource")
50            .field("mime", &self.mime)
51            .finish_non_exhaustive()
52    }
53}
54
55impl<R: AsyncRead + Unpin> AsyncMediaSource<R> {
56    async fn build(mut reader: R, skip_by_seek: AsyncSkipBySeekFn<R>) -> crate::Result<Self> {
57        let mut buf = Vec::with_capacity(HEADER_PARSE_BUF_SIZE);
58        (&mut reader)
59            .take(HEADER_PARSE_BUF_SIZE as u64)
60            .read_to_end(&mut buf)
61            .await?;
62        let mime: crate::file::MediaMime = buf.as_slice().try_into()?;
63        Ok(Self {
64            reader,
65            buf,
66            mime,
67            skip_by_seek,
68            memory: None,
69        })
70    }
71
72    pub fn kind(&self) -> crate::MediaKind {
73        match self.mime {
74            crate::file::MediaMime::Image(_) => crate::MediaKind::Image,
75            crate::file::MediaMime::Track(_) => crate::MediaKind::Track,
76        }
77    }
78}
79
80impl AsyncMediaSource<tokio::io::Empty> {
81    /// Build an [`AsyncMediaSource`] from an in-memory byte payload.
82    ///
83    /// Async counterpart of [`crate::MediaSource::from_memory`]. Returns
84    /// `AsyncMediaSource<tokio::io::Empty>`, which satisfies the
85    /// `<R: AsyncRead + Unpin + Send>` bound on
86    /// [`MediaParser::parse_exif_async`](crate::MediaParser::parse_exif_async),
87    /// [`parse_track_async`](crate::MediaParser::parse_track_async), and
88    /// [`parse_image_metadata_async`](crate::MediaParser::parse_image_metadata_async)
89    /// so a single async entry point per "what to parse" handles both
90    /// streaming and in-memory inputs.
91    ///
92    /// Accepts any type convertible into [`bytes::Bytes`] — `Bytes`,
93    /// `Vec<u8>`, `&'static [u8]`, `String`, `Box<[u8]>`, plus HTTP-stack
94    /// body types implementing `Into<Bytes>`. Zero-copy: parsed
95    /// `ExifIter` / sub-IFDs share the original `Bytes` via reference
96    /// counting, no copy.
97    ///
98    /// # Example
99    ///
100    /// ```rust,no_run
101    /// # async fn run() -> Result<(), nom_exif::Error> {
102    /// use nom_exif::{AsyncMediaSource, MediaKind, MediaParser};
103    ///
104    /// let bytes = tokio::fs::read("./testdata/exif.jpg").await?;
105    /// let ms = AsyncMediaSource::from_memory(bytes)?;
106    /// assert_eq!(ms.kind(), MediaKind::Image);
107    ///
108    /// let mut parser = MediaParser::new();
109    /// let _iter = parser.parse_exif_async(ms).await?;
110    /// # Ok(()) }
111    /// ```
112    pub fn from_memory(bytes: impl Into<bytes::Bytes>) -> crate::Result<Self> {
113        let bytes = bytes.into();
114        let head_end = bytes.len().min(HEADER_PARSE_BUF_SIZE);
115        let mime: crate::file::MediaMime = bytes[..head_end].try_into()?;
116        Ok(Self {
117            reader: tokio::io::empty(),
118            buf: Vec::new(),
119            mime,
120            // Placeholder: never invoked in memory mode (AdvanceOnly path).
121            skip_by_seek: |_, _| Box::pin(async move { Ok(false) }),
122            memory: Some(bytes),
123        })
124    }
125}
126
127fn make_seekable_skip<R: AsyncRead + AsyncSeek + Unpin + Send>() -> AsyncSkipBySeekFn<R> {
128    |r, n| {
129        Box::pin(async move {
130            use std::io::SeekFrom;
131            use tokio::io::AsyncSeekExt;
132            let signed: i64 = n
133                .try_into()
134                .map_err(|_| io::Error::from(io::ErrorKind::InvalidInput))?;
135            r.seek(SeekFrom::Current(signed)).await?;
136            Ok(true)
137        })
138    }
139}
140
141fn make_unseekable_skip<R: AsyncRead + Unpin + Send>() -> AsyncSkipBySeekFn<R> {
142    |_, _| Box::pin(async move { Ok(false) })
143}
144
145impl<R: AsyncRead + AsyncSeek + Unpin + Send> AsyncMediaSource<R> {
146    pub async fn seekable(reader: R) -> crate::Result<Self> {
147        Self::build(reader, make_seekable_skip::<R>()).await
148    }
149}
150
151impl<R: AsyncRead + Unpin + Send> AsyncMediaSource<R> {
152    pub async fn unseekable(reader: R) -> crate::Result<Self> {
153        Self::build(reader, make_unseekable_skip::<R>()).await
154    }
155}
156
#[cfg(feature = "tokio-fs")]
impl AsyncMediaSource<File> {
    /// Open a file at `path` (via `tokio::fs::File`) and parse its header.
    /// For an already-open async `File` use [`Self::seekable`].
    ///
    /// Fails if the file cannot be opened or its header is not recognized.
    pub async fn open<P: AsRef<Path>>(path: P) -> crate::Result<Self> {
        let file = File::open(path).await?;
        Self::seekable(file).await
    }
}
165
166pub(crate) trait AsyncBufParser: Buf + Debug {
167    async fn fill_buf<R: AsyncRead + Unpin>(
168        &mut self,
169        reader: &mut R,
170        size: usize,
171    ) -> io::Result<usize>;
172
173    async fn load_and_parse<R: AsyncRead + Unpin, P, O>(
174        &mut self,
175        reader: &mut R,
176        skip_by_seek: AsyncSkipBySeekFn<R>,
177        parse: P,
178    ) -> Result<O, ParsedError>
179    where
180        P: Fn(&[u8], Option<ParsingState>) -> Result<O, ParsingErrorState>,
181    {
182        self.load_and_parse_with_offset(
183            reader,
184            skip_by_seek,
185            |data, _, state| parse(data, state),
186            0,
187        )
188        .await
189    }
190
191    #[tracing::instrument(skip_all)]
192    async fn load_and_parse_with_offset<R: AsyncRead + Unpin, P, O>(
193        &mut self,
194        reader: &mut R,
195        skip_by_seek: AsyncSkipBySeekFn<R>,
196        parse: P,
197        offset: usize,
198    ) -> Result<O, ParsedError>
199    where
200        P: Fn(&[u8], usize, Option<ParsingState>) -> Result<O, ParsingErrorState>,
201    {
202        if offset >= self.buffer().len() {
203            self.fill_buf(reader, MIN_GROW_SIZE).await?;
204        }
205
206        let mut parsing_state: Option<ParsingState> = None;
207        let mut parse = parse; // coerce Fn → FnMut
208        loop {
209            match parse_loop_step(self.buffer(), offset, &mut parsing_state, &mut parse) {
210                LoopAction::Done(o) => return Ok(o),
211                LoopAction::NeedFill(needed) => {
212                    let to_read = max(needed, MIN_GROW_SIZE);
213                    let n = self.fill_buf(reader, to_read).await?;
214                    if n == 0 {
215                        return Err(ParsedError::NoEnoughBytes);
216                    }
217                }
218                LoopAction::Skip(n) => {
219                    self.clear_and_skip(reader, skip_by_seek, n).await?;
220                }
221                LoopAction::Failed { kind, message } => {
222                    return Err(ParsedError::Failed { kind, message })
223                }
224            }
225        }
226    }
227
228    #[tracing::instrument(skip(reader, skip_by_seek))]
229    async fn clear_and_skip<R: AsyncRead + Unpin>(
230        &mut self,
231        reader: &mut R,
232        skip_by_seek: AsyncSkipBySeekFn<R>,
233        n: usize,
234    ) -> Result<(), ParsedError> {
235        match clear_and_skip_decide(self.buffer().len(), n) {
236            SkipPlan::AdvanceOnly => {
237                self.set_position(self.position() + n);
238                return Ok(());
239            }
240            SkipPlan::ClearAndSkip { extra: skip_n } => {
241                self.clear();
242                let done = (skip_by_seek)(
243                    reader,
244                    skip_n.try_into().map_err(|_| ParsedError::Failed {
245                        // No format context available here: the parser
246                        // hit an internal limit honoring a caller's skip.
247                        // Pick a sensible default — see #55 follow-up.
248                        kind: crate::error::MalformedKind::IsoBmffBox,
249                        message: "skip too many bytes".into(),
250                    })?,
251                )
252                .await?;
253                if !done {
254                    let mut skipped = 0;
255                    while skipped < skip_n {
256                        let mut to_skip = skip_n - skipped;
257                        to_skip = min(to_skip, MAX_PARSE_BUF_SIZE);
258                        let n = self.fill_buf(reader, to_skip).await?;
259                        skipped += n;
260                        if skipped <= skip_n {
261                            self.clear();
262                        } else {
263                            let remain = skipped - skip_n;
264                            self.set_position(self.buffer().len() - remain);
265                            break;
266                        }
267                    }
268                }
269
270                if self.buffer().is_empty() {
271                    self.fill_buf(reader, MIN_GROW_SIZE).await?;
272                }
273                Ok(())
274            }
275        }
276    }
277}
278
#[cfg(all(test, feature = "tokio-fs"))]
mod tests {
    use std::path::Path;

    use chrono::DateTime;
    use test_case::{case, test_case};
    use tokio::fs::File;

    use super::*;
    use crate::video::TrackInfoTag::*;
    use crate::{EntryValue, ExifIter, ExifTag, TrackInfo, TrackInfoTag};

    /// Expected outcome for a fixture in `testdata/`.
    enum TrackExif {
        /// Opens as a track and track info parses.
        Track,
        /// Opens as an image and Exif parses.
        Exif,
        /// Opens fine, but parsing reports an error.
        NoData,
        /// Opening the source itself fails.
        Invalid,
    }
    use TrackExif::*;

    // Opens each fixture through `AsyncMediaSource::open` and checks that it
    // behaves as its expected outcome says.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    #[case("3gp_640x360.3gp", Track)]
    #[case("broken.jpg", Exif)]
    #[case("compatible-brands-fail.heic", Invalid)]
    #[case("compatible-brands-fail.mov", Invalid)]
    #[case("compatible-brands.heic", NoData)]
    #[case("compatible-brands.mov", NoData)]
    #[case("embedded-in-heic.mov", Track)]
    #[case("exif.heic", Exif)]
    #[case("exif.jpg", Exif)]
    #[case("meta.mov", Track)]
    #[case("meta.mp4", Track)]
    #[case("mka.mka", Track)]
    #[case("mkv_640x360.mkv", Track)]
    #[case("exif-one-entry.heic", Exif)]
    #[case("no-exif.jpg", NoData)]
    #[case("tif.tif", Exif)]
    #[case("ramdisk.img", Invalid)]
    #[case("webm_480.webm", Track)]
    async fn parse_media(path: &str, te: TrackExif) {
        use crate::MediaParser;
        let mut parser = MediaParser::new();
        let opened = AsyncMediaSource::open(Path::new("testdata").join(path)).await;
        match te {
            Track => {
                let source = opened.unwrap();
                assert_eq!(source.kind(), crate::MediaKind::Track);
                let _: TrackInfo = parser.parse_track_async(source).await.unwrap();
            }
            Exif => {
                let source = opened.unwrap();
                assert_eq!(source.kind(), crate::MediaKind::Image);
                let mut iter: ExifIter = parser.parse_exif_async(source).await.unwrap();
                let _ = iter.parse_gps();

                let single_entry = path.contains("one-entry");
                if single_entry {
                    // Exactly one entry is expected from this fixture.
                    assert!(iter.next().is_some());
                    assert!(iter.next().is_none());
                }

                let exif: crate::Exif = iter.clone_rewound().into();
                if single_entry {
                    assert!(exif.get(ExifTag::Orientation).is_some());
                }
            }
            NoData => {
                let source = opened.unwrap();
                match source.kind() {
                    crate::MediaKind::Image => {
                        parser.parse_exif_async(source).await.unwrap_err();
                    }
                    crate::MediaKind::Track => {
                        parser.parse_track_async(source).await.unwrap_err();
                    }
                }
            }
            Invalid => {
                opened.unwrap_err();
            }
        }
    }

    #[test]
    fn fill_buf_check_rejects_oversize_when_combined_with_existing() {
        use crate::parser::check_fill_size;
        // The combined size guard used by both sync and async fill_buf.
        // existing=MAX-1024, requested=2*1024 => existing+requested > MAX => Err.
        let over_limit = check_fill_size(MAX_PARSE_BUF_SIZE - 1024, 2 * 1024);
        assert!(over_limit.is_err(), "expected Err, got Ok");
        // Below the threshold passes.
        let within_limit = check_fill_size(MAX_PARSE_BUF_SIZE - 4096, 1024);
        assert!(within_limit.is_ok());
    }

    // Parses each fixture twice, once through a seekable and once through an
    // unseekable source, and expects the same tag value both times.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    #[test_case("mkv_640x360.mkv", Width, 640_u32.into())]
    #[test_case("mkv_640x360.mkv", Height, 360_u32.into())]
    #[test_case("mkv_640x360.mkv", DurationMs, 13346_u64.into())]
    #[test_case("mkv_640x360.mkv", CreateDate, DateTime::parse_from_str("2008-08-08T08:08:08Z", "%+").unwrap().into())]
    #[test_case("meta.mov", Make, "Apple".into())]
    #[test_case("meta.mov", Model, "iPhone X".into())]
    #[test_case("meta.mov", GpsIso6709, "+27.1281+100.2508+000.000/".into())]
    #[test_case("meta.mp4", Width, 1920_u32.into())]
    #[test_case("meta.mp4", Height, 1080_u32.into())]
    #[test_case("meta.mp4", DurationMs, 1063_u64.into())]
    #[test_case("meta.mp4", GpsIso6709, "+27.2939+112.6932/".into())]
    #[test_case("meta.mp4", CreateDate, DateTime::parse_from_str("2024-02-03T07:05:38Z", "%+").unwrap().into())]
    async fn parse_track_info(path: &str, tag: TrackInfoTag, v: EntryValue) {
        use crate::MediaParser;
        let mut parser = MediaParser::new();
        let fixture = Path::new("testdata").join(path);

        // Seekable path.
        let file = File::open(&fixture).await.unwrap();
        let source = AsyncMediaSource::seekable(file).await.unwrap();
        let info: TrackInfo = parser.parse_track_async(source).await.unwrap();
        assert_eq!(info.get(tag).unwrap(), &v);

        // Unseekable path over a fresh handle to the same file.
        let file = File::open(&fixture).await.unwrap();
        let source = AsyncMediaSource::unseekable(file).await.unwrap();
        let info: TrackInfo = parser.parse_track_async(source).await.unwrap();
        assert_eq!(info.get(tag).unwrap(), &v);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn async_media_kind_classifies_image_and_track() {
        let image = AsyncMediaSource::open("testdata/exif.jpg").await.unwrap();
        assert_eq!(image.kind(), crate::MediaKind::Image);

        let track = AsyncMediaSource::open("testdata/meta.mov").await.unwrap();
        assert_eq!(track.kind(), crate::MediaKind::Track);
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn async_media_source_open() {
        let source = AsyncMediaSource::open("testdata/exif.jpg").await.unwrap();
        assert_eq!(source.kind(), crate::MediaKind::Image);
    }
}