// nom_exif/parser_async.rs

1use std::{
2    cmp::{max, min},
3    fmt::Debug,
4    io::{self},
5    path::Path,
6};
7
8use tokio::{
9    fs::File,
10    io::{AsyncRead, AsyncReadExt, AsyncSeek},
11};
12
13use crate::{
14    error::{ParsedError, ParsingErrorState},
15    parser::{
16        clear_and_skip_decide, parse_loop_step, Buf, LoopAction, ParsingState, SkipPlan,
17        MAX_PARSE_BUF_SIZE, MIN_GROW_SIZE,
18    },
19};
20
/// Number of leading bytes read up front to sniff the media MIME type.
/// Expected to be enough to recognize any supported container header.
const HEADER_PARSE_BUF_SIZE: usize = 0x80;
23
/// Boxed `Send` future returned by an [`AsyncSkipBySeekFn`]. It borrows the
/// reader for `'a` and resolves to `Ok(true)` when the bytes were skipped by
/// seeking, or `Ok(false)` to ask the caller to read through them instead.
pub(crate) type SkipBySeekFuture<'a> =
    std::pin::Pin<Box<dyn std::future::Future<Output = io::Result<bool>> + Send + 'a>>;

/// Async counterpart to `crate::parser::SkipBySeekFn<R>`.
///
/// A plain `fn` pointer cannot name the anonymous future type of an `async`
/// block, so every implementation (a non-capturing closure) boxes its future.
/// One allocation per skip is negligible next to the async I/O it performs.
pub(crate) type AsyncSkipBySeekFn<R> = for<'a> fn(&'a mut R, u64) -> SkipBySeekFuture<'a>;
34
35pub struct AsyncMediaSource<R> {
36    pub(crate) reader: R,
37    pub(crate) buf: Vec<u8>,
38    pub(crate) mime: crate::file::MediaMime,
39    pub(crate) skip_by_seek: AsyncSkipBySeekFn<R>,
40    /// Set when this source was constructed via [`Self::from_memory`].
41    /// The full payload lives here as a zero-copy [`bytes::Bytes`]; the
42    /// async parse methods branch on this field to take the memory path
43    /// instead of `fill_buf`-ing from `reader`.
44    pub(crate) memory: Option<bytes::Bytes>,
45}
46
47impl<R> Debug for AsyncMediaSource<R> {
48    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49        f.debug_struct("AsyncMediaSource")
50            .field("mime", &self.mime)
51            .finish_non_exhaustive()
52    }
53}
54
55impl<R: AsyncRead + Unpin> AsyncMediaSource<R> {
56    async fn build(mut reader: R, skip_by_seek: AsyncSkipBySeekFn<R>) -> crate::Result<Self> {
57        let mut buf = Vec::with_capacity(HEADER_PARSE_BUF_SIZE);
58        (&mut reader)
59            .take(HEADER_PARSE_BUF_SIZE as u64)
60            .read_to_end(&mut buf)
61            .await?;
62        let mime: crate::file::MediaMime = buf.as_slice().try_into()?;
63        Ok(Self {
64            reader,
65            buf,
66            mime,
67            skip_by_seek,
68            memory: None,
69        })
70    }
71
72    pub fn kind(&self) -> crate::MediaKind {
73        match self.mime {
74            crate::file::MediaMime::Image(_) => crate::MediaKind::Image,
75            crate::file::MediaMime::Track(_) => crate::MediaKind::Track,
76        }
77    }
78}
79
80impl AsyncMediaSource<tokio::io::Empty> {
81    /// Build an [`AsyncMediaSource`] from an in-memory byte payload.
82    ///
83    /// Async counterpart of [`crate::MediaSource::from_memory`]. Returns
84    /// `AsyncMediaSource<tokio::io::Empty>`, which satisfies the
85    /// `<R: AsyncRead + Unpin + Send>` bound on
86    /// [`MediaParser::parse_exif_async`](crate::MediaParser::parse_exif_async),
87    /// [`parse_track_async`](crate::MediaParser::parse_track_async), and
88    /// [`parse_image_metadata_async`](crate::MediaParser::parse_image_metadata_async)
89    /// so a single async entry point per "what to parse" handles both
90    /// streaming and in-memory inputs.
91    ///
92    /// Accepts any type convertible into [`bytes::Bytes`] — `Bytes`,
93    /// `Vec<u8>`, `&'static [u8]`, `String`, `Box<[u8]>`, plus HTTP-stack
94    /// body types implementing `Into<Bytes>`. Zero-copy: parsed
95    /// `ExifIter` / sub-IFDs share the original `Bytes` via reference
96    /// counting, no copy.
97    ///
98    /// # Example
99    ///
100    /// ```rust,no_run
101    /// # async fn run() -> Result<(), nom_exif::Error> {
102    /// use nom_exif::{AsyncMediaSource, MediaKind, MediaParser};
103    ///
104    /// let bytes = tokio::fs::read("./testdata/exif.jpg").await?;
105    /// let ms = AsyncMediaSource::from_memory(bytes)?;
106    /// assert_eq!(ms.kind(), MediaKind::Image);
107    ///
108    /// let mut parser = MediaParser::new();
109    /// let _iter = parser.parse_exif_async(ms).await?;
110    /// # Ok(()) }
111    /// ```
112    pub fn from_memory(bytes: impl Into<bytes::Bytes>) -> crate::Result<Self> {
113        let bytes = bytes.into();
114        let head_end = bytes.len().min(HEADER_PARSE_BUF_SIZE);
115        let mime: crate::file::MediaMime = bytes[..head_end].try_into()?;
116        Ok(Self {
117            reader: tokio::io::empty(),
118            buf: Vec::new(),
119            mime,
120            // Placeholder: never invoked in memory mode (AdvanceOnly path).
121            skip_by_seek: |_, _| Box::pin(async move { Ok(false) }),
122            memory: Some(bytes),
123        })
124    }
125}
126
127fn make_seekable_skip<R: AsyncRead + AsyncSeek + Unpin + Send>() -> AsyncSkipBySeekFn<R> {
128    |r, n| {
129        Box::pin(async move {
130            use std::io::SeekFrom;
131            use tokio::io::AsyncSeekExt;
132            let signed: i64 = n
133                .try_into()
134                .map_err(|_| io::Error::from(io::ErrorKind::InvalidInput))?;
135            r.seek(SeekFrom::Current(signed)).await?;
136            Ok(true)
137        })
138    }
139}
140
141fn make_unseekable_skip<R: AsyncRead + Unpin + Send>() -> AsyncSkipBySeekFn<R> {
142    |_, _| Box::pin(async move { Ok(false) })
143}
144
145impl<R: AsyncRead + AsyncSeek + Unpin + Send> AsyncMediaSource<R> {
146    pub async fn seekable(reader: R) -> crate::Result<Self> {
147        Self::build(reader, make_seekable_skip::<R>()).await
148    }
149}
150
151impl<R: AsyncRead + Unpin + Send> AsyncMediaSource<R> {
152    pub async fn unseekable(reader: R) -> crate::Result<Self> {
153        Self::build(reader, make_unseekable_skip::<R>()).await
154    }
155}
156
157impl AsyncMediaSource<File> {
158    /// Open a file at `path` (via `tokio::fs::File`) and parse its header.
159    /// For an already-open async `File` use [`Self::seekable`].
160    pub async fn open<P: AsRef<Path>>(path: P) -> crate::Result<Self> {
161        Self::seekable(File::open(path).await?).await
162    }
163}
164
165pub(crate) trait AsyncBufParser: Buf + Debug {
166    async fn fill_buf<R: AsyncRead + Unpin>(
167        &mut self,
168        reader: &mut R,
169        size: usize,
170    ) -> io::Result<usize>;
171
172    async fn load_and_parse<R: AsyncRead + Unpin, P, O>(
173        &mut self,
174        reader: &mut R,
175        skip_by_seek: AsyncSkipBySeekFn<R>,
176        parse: P,
177    ) -> Result<O, ParsedError>
178    where
179        P: Fn(&[u8], Option<ParsingState>) -> Result<O, ParsingErrorState>,
180    {
181        self.load_and_parse_with_offset(
182            reader,
183            skip_by_seek,
184            |data, _, state| parse(data, state),
185            0,
186        )
187        .await
188    }
189
190    #[tracing::instrument(skip_all)]
191    async fn load_and_parse_with_offset<R: AsyncRead + Unpin, P, O>(
192        &mut self,
193        reader: &mut R,
194        skip_by_seek: AsyncSkipBySeekFn<R>,
195        parse: P,
196        offset: usize,
197    ) -> Result<O, ParsedError>
198    where
199        P: Fn(&[u8], usize, Option<ParsingState>) -> Result<O, ParsingErrorState>,
200    {
201        if offset >= self.buffer().len() {
202            self.fill_buf(reader, MIN_GROW_SIZE).await?;
203        }
204
205        let mut parsing_state: Option<ParsingState> = None;
206        let mut parse = parse; // coerce Fn → FnMut
207        loop {
208            match parse_loop_step(self.buffer(), offset, &mut parsing_state, &mut parse) {
209                LoopAction::Done(o) => return Ok(o),
210                LoopAction::NeedFill(needed) => {
211                    let to_read = max(needed, MIN_GROW_SIZE);
212                    let n = self.fill_buf(reader, to_read).await?;
213                    if n == 0 {
214                        return Err(ParsedError::NoEnoughBytes);
215                    }
216                }
217                LoopAction::Skip(n) => {
218                    self.clear_and_skip(reader, skip_by_seek, n).await?;
219                }
220                LoopAction::Failed { kind, message } => {
221                    return Err(ParsedError::Failed { kind, message })
222                }
223            }
224        }
225    }
226
227    #[tracing::instrument(skip(reader, skip_by_seek))]
228    async fn clear_and_skip<R: AsyncRead + Unpin>(
229        &mut self,
230        reader: &mut R,
231        skip_by_seek: AsyncSkipBySeekFn<R>,
232        n: usize,
233    ) -> Result<(), ParsedError> {
234        match clear_and_skip_decide(self.buffer().len(), n) {
235            SkipPlan::AdvanceOnly => {
236                self.set_position(self.position() + n);
237                return Ok(());
238            }
239            SkipPlan::ClearAndSkip { extra: skip_n } => {
240                self.clear();
241                let done = (skip_by_seek)(
242                    reader,
243                    skip_n.try_into().map_err(|_| ParsedError::Failed {
244                        // No format context available here: the parser
245                        // hit an internal limit honoring a caller's skip.
246                        // Pick a sensible default — see #55 follow-up.
247                        kind: crate::error::MalformedKind::IsoBmffBox,
248                        message: "skip too many bytes".into(),
249                    })?,
250                )
251                .await?;
252                if !done {
253                    let mut skipped = 0;
254                    while skipped < skip_n {
255                        let mut to_skip = skip_n - skipped;
256                        to_skip = min(to_skip, MAX_PARSE_BUF_SIZE);
257                        let n = self.fill_buf(reader, to_skip).await?;
258                        skipped += n;
259                        if skipped <= skip_n {
260                            self.clear();
261                        } else {
262                            let remain = skipped - skip_n;
263                            self.set_position(self.buffer().len() - remain);
264                            break;
265                        }
266                    }
267                }
268
269                if self.buffer().is_empty() {
270                    self.fill_buf(reader, MIN_GROW_SIZE).await?;
271                }
272                Ok(())
273            }
274        }
275    }
276}
277
#[cfg(test)]
mod tests {
    use std::path::Path;

    use super::*;
    use crate::{ExifIter, TrackInfo};
    use test_case::case;

    // Expected outcome for each fixture in `parse_media`.
    enum TrackExif {
        // Opens as a track; `parse_track_async` must succeed.
        Track,
        // Opens as an image; `parse_exif_async` must succeed.
        Exif,
        // Opens fine, but the parse matching its kind must return an error.
        NoData,
        // `AsyncMediaSource::open` itself must fail.
        Invalid,
    }
    use tokio::fs::File;
    use TrackExif::*;

    // Open each fixture under `testdata/` and check both the detected media
    // kind and whether the matching async parse succeeds or fails.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    #[case("3gp_640x360.3gp", Track)]
    #[case("broken.jpg", Exif)]
    #[case("compatible-brands-fail.heic", Invalid)]
    #[case("compatible-brands-fail.mov", Invalid)]
    #[case("compatible-brands.heic", NoData)]
    #[case("compatible-brands.mov", NoData)]
    #[case("embedded-in-heic.mov", Track)]
    #[case("exif.heic", Exif)]
    #[case("exif.jpg", Exif)]
    #[case("meta.mov", Track)]
    #[case("meta.mp4", Track)]
    #[case("mka.mka", Track)]
    #[case("mkv_640x360.mkv", Track)]
    #[case("exif-one-entry.heic", Exif)]
    #[case("no-exif.jpg", NoData)]
    #[case("tif.tif", Exif)]
    #[case("ramdisk.img", Invalid)]
    #[case("webm_480.webm", Track)]
    async fn parse_media(path: &str, te: TrackExif) {
        use crate::MediaParser;
        let mut parser = MediaParser::new();
        let ms = AsyncMediaSource::open(Path::new("testdata").join(path)).await;
        match te {
            Track => {
                let ms = ms.unwrap();
                assert_eq!(ms.kind(), crate::MediaKind::Track);
                let _: TrackInfo = parser.parse_track_async(ms).await.unwrap();
            }
            Exif => {
                let ms = ms.unwrap();
                assert_eq!(ms.kind(), crate::MediaKind::Image);
                let mut it: ExifIter = parser.parse_exif_async(ms).await.unwrap();
                // GPS parsing result is ignored; only checks it doesn't panic.
                let _ = it.parse_gps();

                if path.contains("one-entry") {
                    // This fixture yields exactly one entry, and Orientation
                    // is present once the iterator is rewound into an `Exif`.
                    assert!(it.next().is_some());
                    assert!(it.next().is_none());

                    let exif: crate::Exif = it.clone_rewound().into();
                    assert!(exif.get(ExifTag::Orientation).is_some());
                } else {
                    let _: crate::Exif = it.clone_rewound().into();
                }
            }
            NoData => {
                // Dispatch on the detected kind; either parse must error.
                let ms = ms.unwrap();
                match ms.kind() {
                    crate::MediaKind::Image => {
                        let res = parser.parse_exif_async(ms).await;
                        res.unwrap_err();
                    }
                    crate::MediaKind::Track => {
                        let res = parser.parse_track_async(ms).await;
                        res.unwrap_err();
                    }
                }
            }
            Invalid => {
                ms.unwrap_err();
            }
        }
    }

    use crate::{EntryValue, ExifTag, TrackInfoTag};
    use chrono::DateTime;
    use test_case::test_case;

    use crate::video::TrackInfoTag::*;

    #[test]
    fn fill_buf_check_rejects_oversize_when_combined_with_existing() {
        use crate::parser::check_fill_size;
        // The combined size guard used by both sync and async fill_buf.
        // existing=MAX-1024, requested=2*1024 => existing+requested > MAX => Err.
        let res = check_fill_size(MAX_PARSE_BUF_SIZE - 1024, 2 * 1024);
        assert!(res.is_err(), "expected Err, got Ok");
        // Below the threshold passes.
        let res = check_fill_size(MAX_PARSE_BUF_SIZE - 4096, 1024);
        assert!(res.is_ok());
    }

    // Parse each fixture twice: once via a seekable source and once via an
    // unseekable one. The unseekable pass lets `clear_and_skip` exercise its
    // read-through fallback whenever a skip goes past the buffer. Both passes
    // must report the same tag value.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    #[test_case("mkv_640x360.mkv", Width, 640_u32.into())]
    #[test_case("mkv_640x360.mkv", Height, 360_u32.into())]
    #[test_case("mkv_640x360.mkv", DurationMs, 13346_u64.into())]
    #[test_case("mkv_640x360.mkv", CreateDate, DateTime::parse_from_str("2008-08-08T08:08:08Z", "%+").unwrap().into())]
    #[test_case("meta.mov", Make, "Apple".into())]
    #[test_case("meta.mov", Model, "iPhone X".into())]
    #[test_case("meta.mov", GpsIso6709, "+27.1281+100.2508+000.000/".into())]
    #[test_case("meta.mp4", Width, 1920_u32.into())]
    #[test_case("meta.mp4", Height, 1080_u32.into())]
    #[test_case("meta.mp4", DurationMs, 1063_u64.into())]
    #[test_case("meta.mp4", GpsIso6709, "+27.2939+112.6932/".into())]
    #[test_case("meta.mp4", CreateDate, DateTime::parse_from_str("2024-02-03T07:05:38Z", "%+").unwrap().into())]
    async fn parse_track_info(path: &str, tag: TrackInfoTag, v: EntryValue) {
        use crate::MediaParser;
        let mut parser = MediaParser::new();

        let f = File::open(Path::new("testdata").join(path)).await.unwrap();
        let ms = AsyncMediaSource::seekable(f).await.unwrap();
        let info: TrackInfo = parser.parse_track_async(ms).await.unwrap();
        assert_eq!(info.get(tag).unwrap(), &v);

        let f = File::open(Path::new("testdata").join(path)).await.unwrap();
        let ms = AsyncMediaSource::unseekable(f).await.unwrap();
        let info: TrackInfo = parser.parse_track_async(ms).await.unwrap();
        assert_eq!(info.get(tag).unwrap(), &v);
    }

    // `kind()` maps the sniffed MIME type to Image vs Track.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn async_media_kind_classifies_image_and_track() {
        let img = AsyncMediaSource::open("testdata/exif.jpg").await.unwrap();
        assert_eq!(img.kind(), crate::MediaKind::Image);

        let trk = AsyncMediaSource::open("testdata/meta.mov").await.unwrap();
        assert_eq!(trk.kind(), crate::MediaKind::Track);
    }

    // Smoke test for the path-based `open` constructor.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn async_media_source_open() {
        let ms = AsyncMediaSource::open("testdata/exif.jpg").await.unwrap();
        assert_eq!(ms.kind(), crate::MediaKind::Image);
    }
}