Skip to main content

nom_exif/
parser_async.rs

1use std::{
2    cmp::{max, min},
3    fmt::Debug,
4    io::{self},
5    path::Path,
6};
7
8use tokio::{
9    fs::File,
10    io::{AsyncRead, AsyncReadExt, AsyncSeek},
11};
12
13use crate::{
14    error::{ParsedError, ParsingErrorState},
15    parser::{
16        clear_and_skip_decide, parse_loop_step, Buf, LoopAction, ParsingState, SkipPlan,
17        MAX_PARSE_BUF_SIZE, MIN_GROW_SIZE,
18    },
19};
20
// Upper bound on the number of leading bytes inspected to detect the media
// type when a source is constructed.
// Should be enough for parsing header
const HEADER_PARSE_BUF_SIZE: usize = 128;
23
/// Async counterpart to `crate::parser::SkipBySeekFn<R>`.
///
/// A plain `fn` pointer type cannot name the anonymous future an async
/// closure returns, so the pointer instead returns a boxed, pinned
/// `dyn Future`. The Box-per-skip overhead is trivial against actual async
/// I/O.
///
/// The function receives the reader and the number of bytes to skip. In this
/// module the future resolves to `Ok(true)` when the skip was carried out by
/// seeking, and to `Ok(false)` when no seek happened and the caller has to
/// consume the bytes by reading them.
pub(crate) type AsyncSkipBySeekFn<R> = for<'a> fn(
    &'a mut R,
    u64,
) -> std::pin::Pin<
    Box<dyn std::future::Future<Output = io::Result<bool>> + Send + 'a>,
>;
34
/// A media input for the async parse methods: a reader (or an in-memory
/// payload) together with the media type detected from its leading bytes.
pub struct AsyncMediaSource<R> {
    /// The underlying reader. In-memory sources store `tokio::io::empty()`.
    pub(crate) reader: R,
    /// Header bytes already read from `reader` while detecting the media
    /// type. Left empty for in-memory sources.
    pub(crate) buf: Vec<u8>,
    /// Media type detected from the first bytes of the input.
    pub(crate) mime: crate::file::MediaMime,
    /// Function used to skip bytes by seeking; it reports `false` when the
    /// reader cannot seek.
    pub(crate) skip_by_seek: AsyncSkipBySeekFn<R>,
    /// Set when this source was constructed via [`Self::from_memory`].
    /// The full payload lives here as a zero-copy [`bytes::Bytes`]; the
    /// async parse methods branch on this field to take the memory path
    /// instead of `fill_buf`-ing from `reader`.
    pub(crate) memory: Option<bytes::Bytes>,
}
46
47impl<R> Debug for AsyncMediaSource<R> {
48    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49        f.debug_struct("AsyncMediaSource")
50            .field("mime", &self.mime)
51            .finish_non_exhaustive()
52    }
53}
54
55impl<R: AsyncRead + Unpin> AsyncMediaSource<R> {
56    async fn build(mut reader: R, skip_by_seek: AsyncSkipBySeekFn<R>) -> crate::Result<Self> {
57        let mut buf = Vec::with_capacity(HEADER_PARSE_BUF_SIZE);
58        (&mut reader)
59            .take(HEADER_PARSE_BUF_SIZE as u64)
60            .read_to_end(&mut buf)
61            .await?;
62        let mime: crate::file::MediaMime = buf.as_slice().try_into()?;
63        Ok(Self {
64            reader,
65            buf,
66            mime,
67            skip_by_seek,
68            memory: None,
69        })
70    }
71
72    pub fn kind(&self) -> crate::MediaKind {
73        match self.mime {
74            crate::file::MediaMime::Image(_) => crate::MediaKind::Image,
75            crate::file::MediaMime::Track(_) => crate::MediaKind::Track,
76        }
77    }
78}
79
80impl AsyncMediaSource<tokio::io::Empty> {
81    /// Build an [`AsyncMediaSource`] from an in-memory byte payload.
82    ///
83    /// Async counterpart of [`crate::MediaSource::from_memory`]. Returns
84    /// `AsyncMediaSource<tokio::io::Empty>`, which satisfies the
85    /// `<R: AsyncRead + Unpin + Send>` bound on
86    /// [`MediaParser::parse_exif_async`](crate::MediaParser::parse_exif_async),
87    /// [`parse_track_async`](crate::MediaParser::parse_track_async), and
88    /// [`parse_image_metadata_async`](crate::MediaParser::parse_image_metadata_async)
89    /// so a single async entry point per "what to parse" handles both
90    /// streaming and in-memory inputs.
91    ///
92    /// Accepts any type convertible into [`bytes::Bytes`] — `Bytes`,
93    /// `Vec<u8>`, `&'static [u8]`, `String`, `Box<[u8]>`, plus HTTP-stack
94    /// body types implementing `Into<Bytes>`. Zero-copy: parsed
95    /// `ExifIter` / sub-IFDs share the original `Bytes` via reference
96    /// counting, no copy.
97    ///
98    /// # Example
99    ///
100    /// ```rust,no_run
101    /// # async fn run() -> Result<(), nom_exif::Error> {
102    /// use nom_exif::{AsyncMediaSource, MediaKind, MediaParser};
103    ///
104    /// let bytes = tokio::fs::read("./testdata/exif.jpg").await?;
105    /// let ms = AsyncMediaSource::from_memory(bytes)?;
106    /// assert_eq!(ms.kind(), MediaKind::Image);
107    ///
108    /// let mut parser = MediaParser::new();
109    /// let _iter = parser.parse_exif_async(ms).await?;
110    /// # Ok(()) }
111    /// ```
112    pub fn from_memory(bytes: impl Into<bytes::Bytes>) -> crate::Result<Self> {
113        let bytes = bytes.into();
114        let head_end = bytes.len().min(HEADER_PARSE_BUF_SIZE);
115        let mime: crate::file::MediaMime = bytes[..head_end].try_into()?;
116        Ok(Self {
117            reader: tokio::io::empty(),
118            buf: Vec::new(),
119            mime,
120            // Placeholder: never invoked in memory mode (AdvanceOnly path).
121            skip_by_seek: |_, _| Box::pin(async move { Ok(false) }),
122            memory: Some(bytes),
123        })
124    }
125}
126
127fn make_seekable_skip<R: AsyncRead + AsyncSeek + Unpin + Send>() -> AsyncSkipBySeekFn<R> {
128    |r, n| {
129        Box::pin(async move {
130            use std::io::SeekFrom;
131            use tokio::io::AsyncSeekExt;
132            let signed: i64 = n
133                .try_into()
134                .map_err(|_| io::Error::from(io::ErrorKind::InvalidInput))?;
135            r.seek(SeekFrom::Current(signed)).await?;
136            Ok(true)
137        })
138    }
139}
140
141fn make_unseekable_skip<R: AsyncRead + Unpin + Send>() -> AsyncSkipBySeekFn<R> {
142    |_, _| Box::pin(async move { Ok(false) })
143}
144
145impl<R: AsyncRead + AsyncSeek + Unpin + Send> AsyncMediaSource<R> {
146    pub async fn seekable(reader: R) -> crate::Result<Self> {
147        Self::build(reader, make_seekable_skip::<R>()).await
148    }
149}
150
151impl<R: AsyncRead + Unpin + Send> AsyncMediaSource<R> {
152    pub async fn unseekable(reader: R) -> crate::Result<Self> {
153        Self::build(reader, make_unseekable_skip::<R>()).await
154    }
155}
156
157impl AsyncMediaSource<File> {
158    /// Open a file at `path` (via `tokio::fs::File`) and parse its header.
159    /// For an already-open async `File` use [`Self::seekable`].
160    pub async fn open<P: AsRef<Path>>(path: P) -> crate::Result<Self> {
161        Self::seekable(File::open(path).await?).await
162    }
163}
164
165pub(crate) trait AsyncBufParser: Buf + Debug {
166    async fn fill_buf<R: AsyncRead + Unpin>(
167        &mut self,
168        reader: &mut R,
169        size: usize,
170    ) -> io::Result<usize>;
171
172    async fn load_and_parse<R: AsyncRead + Unpin, P, O>(
173        &mut self,
174        reader: &mut R,
175        skip_by_seek: AsyncSkipBySeekFn<R>,
176        parse: P,
177    ) -> Result<O, ParsedError>
178    where
179        P: Fn(&[u8], Option<ParsingState>) -> Result<O, ParsingErrorState>,
180    {
181        self.load_and_parse_with_offset(
182            reader,
183            skip_by_seek,
184            |data, _, state| parse(data, state),
185            0,
186        )
187        .await
188    }
189
190    #[tracing::instrument(skip_all)]
191    async fn load_and_parse_with_offset<R: AsyncRead + Unpin, P, O>(
192        &mut self,
193        reader: &mut R,
194        skip_by_seek: AsyncSkipBySeekFn<R>,
195        parse: P,
196        offset: usize,
197    ) -> Result<O, ParsedError>
198    where
199        P: Fn(&[u8], usize, Option<ParsingState>) -> Result<O, ParsingErrorState>,
200    {
201        if offset >= self.buffer().len() {
202            self.fill_buf(reader, MIN_GROW_SIZE).await?;
203        }
204
205        let mut parsing_state: Option<ParsingState> = None;
206        let mut parse = parse; // coerce Fn → FnMut
207        loop {
208            match parse_loop_step(self.buffer(), offset, &mut parsing_state, &mut parse) {
209                LoopAction::Done(o) => return Ok(o),
210                LoopAction::NeedFill(needed) => {
211                    let to_read = max(needed, MIN_GROW_SIZE);
212                    let n = self.fill_buf(reader, to_read).await?;
213                    if n == 0 {
214                        return Err(ParsedError::NoEnoughBytes);
215                    }
216                }
217                LoopAction::Skip(n) => {
218                    self.clear_and_skip(reader, skip_by_seek, n).await?;
219                }
220                LoopAction::Failed(s) => return Err(ParsedError::Failed(s)),
221            }
222        }
223    }
224
225    #[tracing::instrument(skip(reader, skip_by_seek))]
226    async fn clear_and_skip<R: AsyncRead + Unpin>(
227        &mut self,
228        reader: &mut R,
229        skip_by_seek: AsyncSkipBySeekFn<R>,
230        n: usize,
231    ) -> Result<(), ParsedError> {
232        match clear_and_skip_decide(self.buffer().len(), n) {
233            SkipPlan::AdvanceOnly => {
234                self.set_position(self.position() + n);
235                return Ok(());
236            }
237            SkipPlan::ClearAndSkip { extra: skip_n } => {
238                self.clear();
239                let done = (skip_by_seek)(
240                    reader,
241                    skip_n
242                        .try_into()
243                        .map_err(|_| ParsedError::Failed("skip too many bytes".into()))?,
244                )
245                .await?;
246                if !done {
247                    let mut skipped = 0;
248                    while skipped < skip_n {
249                        let mut to_skip = skip_n - skipped;
250                        to_skip = min(to_skip, MAX_PARSE_BUF_SIZE);
251                        let n = self.fill_buf(reader, to_skip).await?;
252                        skipped += n;
253                        if skipped <= skip_n {
254                            self.clear();
255                        } else {
256                            let remain = skipped - skip_n;
257                            self.set_position(self.buffer().len() - remain);
258                            break;
259                        }
260                    }
261                }
262
263                if self.buffer().is_empty() {
264                    self.fill_buf(reader, MIN_GROW_SIZE).await?;
265                }
266                Ok(())
267            }
268        }
269    }
270}
271
// Tests for the async media source and the async parse entry points. Most of
// them read fixture files from the `testdata` directory.
#[cfg(test)]
mod tests {
    use std::path::Path;

    use super::*;
    use crate::{ExifIter, TrackInfo};
    use test_case::case;

    // Expected outcome for a fixture in `parse_media`.
    enum TrackExif {
        // Source opens as a track and `parse_track_async` succeeds.
        Track,
        // Source opens as an image and `parse_exif_async` succeeds.
        Exif,
        // Source opens, but parsing it returns an error.
        NoData,
        // Opening the source itself returns an error.
        Invalid,
    }
    use tokio::fs::File;
    use TrackExif::*;

    // Opens each fixture with `AsyncMediaSource::open` and checks that it
    // behaves as its `TrackExif` category describes.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    #[case("3gp_640x360.3gp", Track)]
    #[case("broken.jpg", Exif)]
    #[case("compatible-brands-fail.heic", Invalid)]
    #[case("compatible-brands-fail.mov", Invalid)]
    #[case("compatible-brands.heic", NoData)]
    #[case("compatible-brands.mov", NoData)]
    #[case("embedded-in-heic.mov", Track)]
    #[case("exif.heic", Exif)]
    #[case("exif.jpg", Exif)]
    #[case("meta.mov", Track)]
    #[case("meta.mp4", Track)]
    #[case("mka.mka", Track)]
    #[case("mkv_640x360.mkv", Track)]
    #[case("exif-one-entry.heic", Exif)]
    #[case("no-exif.jpg", NoData)]
    #[case("tif.tif", Exif)]
    #[case("ramdisk.img", Invalid)]
    #[case("webm_480.webm", Track)]
    async fn parse_media(path: &str, te: TrackExif) {
        use crate::MediaParser;
        let mut parser = MediaParser::new();
        let ms = AsyncMediaSource::open(Path::new("testdata").join(path)).await;
        match te {
            Track => {
                let ms = ms.unwrap();
                assert_eq!(ms.kind(), crate::MediaKind::Track);
                let _: TrackInfo = parser.parse_track_async(ms).await.unwrap();
            }
            Exif => {
                let ms = ms.unwrap();
                assert_eq!(ms.kind(), crate::MediaKind::Image);
                let mut it: ExifIter = parser.parse_exif_async(ms).await.unwrap();
                // Only exercised; the GPS result itself is not checked.
                let _ = it.parse_gps();

                if path.contains("one-entry") {
                    // The single-entry fixture must yield exactly one item.
                    assert!(it.next().is_some());
                    assert!(it.next().is_none());

                    let exif: crate::Exif = it.clone_rewound().into();
                    assert!(exif.get(ExifTag::Orientation).is_some());
                } else {
                    let _: crate::Exif = it.clone_rewound().into();
                }
            }
            NoData => {
                let ms = ms.unwrap();
                // Parse with the method that matches the detected kind; either
                // way the parse must fail.
                match ms.kind() {
                    crate::MediaKind::Image => {
                        let res = parser.parse_exif_async(ms).await;
                        res.unwrap_err();
                    }
                    crate::MediaKind::Track => {
                        let res = parser.parse_track_async(ms).await;
                        res.unwrap_err();
                    }
                }
            }
            Invalid => {
                ms.unwrap_err();
            }
        }
    }

    use crate::{EntryValue, ExifTag, TrackInfoTag};
    use chrono::DateTime;
    use test_case::test_case;

    use crate::video::TrackInfoTag::*;

    // Checks the size guard `check_fill_size(existing, requested)` on both
    // sides of the `MAX_PARSE_BUF_SIZE` limit.
    #[test]
    fn fill_buf_check_rejects_oversize_when_combined_with_existing() {
        use crate::parser::check_fill_size;
        // The combined size guard used by both sync and async fill_buf.
        // existing=MAX-1024, requested=2*1024 => existing+requested > MAX => Err.
        let res = check_fill_size(MAX_PARSE_BUF_SIZE - 1024, 2 * 1024);
        assert!(res.is_err(), "expected Err, got Ok");
        // Below the threshold passes.
        let res = check_fill_size(MAX_PARSE_BUF_SIZE - 4096, 1024);
        assert!(res.is_ok());
    }

    // Parses each fixture twice, once through `seekable` and once through
    // `unseekable`, and expects the same tag value from both.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    #[test_case("mkv_640x360.mkv", Width, 640_u32.into())]
    #[test_case("mkv_640x360.mkv", Height, 360_u32.into())]
    #[test_case("mkv_640x360.mkv", DurationMs, 13346_u64.into())]
    #[test_case("mkv_640x360.mkv", CreateDate, DateTime::parse_from_str("2008-08-08T08:08:08Z", "%+").unwrap().into())]
    #[test_case("meta.mov", Make, "Apple".into())]
    #[test_case("meta.mov", Model, "iPhone X".into())]
    #[test_case("meta.mov", GpsIso6709, "+27.1281+100.2508+000.000/".into())]
    #[test_case("meta.mp4", Width, 1920_u32.into())]
    #[test_case("meta.mp4", Height, 1080_u32.into())]
    #[test_case("meta.mp4", DurationMs, 1063_u64.into())]
    #[test_case("meta.mp4", GpsIso6709, "+27.2939+112.6932/".into())]
    #[test_case("meta.mp4", CreateDate, DateTime::parse_from_str("2024-02-03T07:05:38Z", "%+").unwrap().into())]
    async fn parse_track_info(path: &str, tag: TrackInfoTag, v: EntryValue) {
        use crate::MediaParser;
        let mut parser = MediaParser::new();

        // Seekable source.
        let f = File::open(Path::new("testdata").join(path)).await.unwrap();
        let ms = AsyncMediaSource::seekable(f).await.unwrap();
        let info: TrackInfo = parser.parse_track_async(ms).await.unwrap();
        assert_eq!(info.get(tag).unwrap(), &v);

        // Same file through the unseekable constructor.
        let f = File::open(Path::new("testdata").join(path)).await.unwrap();
        let ms = AsyncMediaSource::unseekable(f).await.unwrap();
        let info: TrackInfo = parser.parse_track_async(ms).await.unwrap();
        assert_eq!(info.get(tag).unwrap(), &v);
    }

    // `kind()` reports `Image` for the JPEG fixture and `Track` for the MOV
    // fixture.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn async_media_kind_classifies_image_and_track() {
        let img = AsyncMediaSource::open("testdata/exif.jpg").await.unwrap();
        assert_eq!(img.kind(), crate::MediaKind::Image);

        let trk = AsyncMediaSource::open("testdata/meta.mov").await.unwrap();
        assert_eq!(trk.kind(), crate::MediaKind::Track);
    }

    // `open` accepts a plain string path and detects the media kind.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn async_media_source_open() {
        let ms = AsyncMediaSource::open("testdata/exif.jpg").await.unwrap();
        assert_eq!(ms.kind(), crate::MediaKind::Image);
    }
}