1use std::{
2 cmp::{max, min},
3 fmt::Debug,
4 io::{self},
5 path::Path,
6};
7
8use tokio::{
9 fs::File,
10 io::{AsyncRead, AsyncReadExt, AsyncSeek},
11};
12
13use crate::{
14 error::{ParsedError, ParsingErrorState},
15 parser::{
16 clear_and_skip_decide, parse_loop_step, Buf, LoopAction, ParsingState, SkipPlan,
17 MAX_PARSE_BUF_SIZE, MIN_GROW_SIZE,
18 },
19};
20
/// Maximum number of leading bytes inspected when detecting the media type
/// of a source (used both for reader-backed and in-memory sources).
const HEADER_PARSE_BUF_SIZE: usize = 128;
23
/// Function pointer that tries to skip `u64` bytes of the reader by seeking.
///
/// The returned future resolves to `Ok(true)` when the skip was performed by
/// a seek and to `Ok(false)` when the reader cannot seek, in which case the
/// caller has to skip by reading and discarding instead.
pub(crate) type AsyncSkipBySeekFn<R> = for<'r> fn(
    &'r mut R,
    u64,
) -> std::pin::Pin<
    Box<dyn std::future::Future<Output = io::Result<bool>> + Send + 'r>,
>;
34
/// Asynchronous media input: either a reader plus the header bytes already
/// consumed from it, or a payload held entirely in memory.
pub struct AsyncMediaSource<R> {
    /// Underlying reader; `tokio::io::empty()` for in-memory sources.
    pub(crate) reader: R,
    /// Header bytes already read from `reader` while detecting the media type
    /// (left empty for in-memory sources).
    pub(crate) buf: Vec<u8>,
    /// Media type detected from the leading bytes of the input.
    pub(crate) mime: crate::file::MediaMime,
    /// Strategy used to skip bytes by seeking; resolves to `false` when the
    /// reader cannot seek.
    pub(crate) skip_by_seek: AsyncSkipBySeekFn<R>,
    /// Whole payload when built via `from_memory`; `None` for reader-backed sources.
    pub(crate) memory: Option<bytes::Bytes>,
}
46
47impl<R> Debug for AsyncMediaSource<R> {
48 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49 f.debug_struct("AsyncMediaSource")
50 .field("mime", &self.mime)
51 .finish_non_exhaustive()
52 }
53}
54
55impl<R: AsyncRead + Unpin> AsyncMediaSource<R> {
56 async fn build(mut reader: R, skip_by_seek: AsyncSkipBySeekFn<R>) -> crate::Result<Self> {
57 let mut buf = Vec::with_capacity(HEADER_PARSE_BUF_SIZE);
58 (&mut reader)
59 .take(HEADER_PARSE_BUF_SIZE as u64)
60 .read_to_end(&mut buf)
61 .await?;
62 let mime: crate::file::MediaMime = buf.as_slice().try_into()?;
63 Ok(Self {
64 reader,
65 buf,
66 mime,
67 skip_by_seek,
68 memory: None,
69 })
70 }
71
72 pub fn kind(&self) -> crate::MediaKind {
73 match self.mime {
74 crate::file::MediaMime::Image(_) => crate::MediaKind::Image,
75 crate::file::MediaMime::Track(_) => crate::MediaKind::Track,
76 }
77 }
78}
79
80impl AsyncMediaSource<tokio::io::Empty> {
81 pub fn from_memory(bytes: impl Into<bytes::Bytes>) -> crate::Result<Self> {
113 let bytes = bytes.into();
114 let head_end = bytes.len().min(HEADER_PARSE_BUF_SIZE);
115 let mime: crate::file::MediaMime = bytes[..head_end].try_into()?;
116 Ok(Self {
117 reader: tokio::io::empty(),
118 buf: Vec::new(),
119 mime,
120 skip_by_seek: |_, _| Box::pin(async move { Ok(false) }),
122 memory: Some(bytes),
123 })
124 }
125}
126
127fn make_seekable_skip<R: AsyncRead + AsyncSeek + Unpin + Send>() -> AsyncSkipBySeekFn<R> {
128 |r, n| {
129 Box::pin(async move {
130 use std::io::SeekFrom;
131 use tokio::io::AsyncSeekExt;
132 let signed: i64 = n
133 .try_into()
134 .map_err(|_| io::Error::from(io::ErrorKind::InvalidInput))?;
135 r.seek(SeekFrom::Current(signed)).await?;
136 Ok(true)
137 })
138 }
139}
140
141fn make_unseekable_skip<R: AsyncRead + Unpin + Send>() -> AsyncSkipBySeekFn<R> {
142 |_, _| Box::pin(async move { Ok(false) })
143}
144
145impl<R: AsyncRead + AsyncSeek + Unpin + Send> AsyncMediaSource<R> {
146 pub async fn seekable(reader: R) -> crate::Result<Self> {
147 Self::build(reader, make_seekable_skip::<R>()).await
148 }
149}
150
151impl<R: AsyncRead + Unpin + Send> AsyncMediaSource<R> {
152 pub async fn unseekable(reader: R) -> crate::Result<Self> {
153 Self::build(reader, make_unseekable_skip::<R>()).await
154 }
155}
156
157impl AsyncMediaSource<File> {
158 pub async fn open<P: AsRef<Path>>(path: P) -> crate::Result<Self> {
161 Self::seekable(File::open(path).await?).await
162 }
163}
164
165pub(crate) trait AsyncBufParser: Buf + Debug {
166 async fn fill_buf<R: AsyncRead + Unpin>(
167 &mut self,
168 reader: &mut R,
169 size: usize,
170 ) -> io::Result<usize>;
171
172 async fn load_and_parse<R: AsyncRead + Unpin, P, O>(
173 &mut self,
174 reader: &mut R,
175 skip_by_seek: AsyncSkipBySeekFn<R>,
176 parse: P,
177 ) -> Result<O, ParsedError>
178 where
179 P: Fn(&[u8], Option<ParsingState>) -> Result<O, ParsingErrorState>,
180 {
181 self.load_and_parse_with_offset(
182 reader,
183 skip_by_seek,
184 |data, _, state| parse(data, state),
185 0,
186 )
187 .await
188 }
189
190 #[tracing::instrument(skip_all)]
191 async fn load_and_parse_with_offset<R: AsyncRead + Unpin, P, O>(
192 &mut self,
193 reader: &mut R,
194 skip_by_seek: AsyncSkipBySeekFn<R>,
195 parse: P,
196 offset: usize,
197 ) -> Result<O, ParsedError>
198 where
199 P: Fn(&[u8], usize, Option<ParsingState>) -> Result<O, ParsingErrorState>,
200 {
201 if offset >= self.buffer().len() {
202 self.fill_buf(reader, MIN_GROW_SIZE).await?;
203 }
204
205 let mut parsing_state: Option<ParsingState> = None;
206 let mut parse = parse; loop {
208 match parse_loop_step(self.buffer(), offset, &mut parsing_state, &mut parse) {
209 LoopAction::Done(o) => return Ok(o),
210 LoopAction::NeedFill(needed) => {
211 let to_read = max(needed, MIN_GROW_SIZE);
212 let n = self.fill_buf(reader, to_read).await?;
213 if n == 0 {
214 return Err(ParsedError::NoEnoughBytes);
215 }
216 }
217 LoopAction::Skip(n) => {
218 self.clear_and_skip(reader, skip_by_seek, n).await?;
219 }
220 LoopAction::Failed(s) => return Err(ParsedError::Failed(s)),
221 }
222 }
223 }
224
225 #[tracing::instrument(skip(reader, skip_by_seek))]
226 async fn clear_and_skip<R: AsyncRead + Unpin>(
227 &mut self,
228 reader: &mut R,
229 skip_by_seek: AsyncSkipBySeekFn<R>,
230 n: usize,
231 ) -> Result<(), ParsedError> {
232 match clear_and_skip_decide(self.buffer().len(), n) {
233 SkipPlan::AdvanceOnly => {
234 self.set_position(self.position() + n);
235 return Ok(());
236 }
237 SkipPlan::ClearAndSkip { extra: skip_n } => {
238 self.clear();
239 let done = (skip_by_seek)(
240 reader,
241 skip_n
242 .try_into()
243 .map_err(|_| ParsedError::Failed("skip too many bytes".into()))?,
244 )
245 .await?;
246 if !done {
247 let mut skipped = 0;
248 while skipped < skip_n {
249 let mut to_skip = skip_n - skipped;
250 to_skip = min(to_skip, MAX_PARSE_BUF_SIZE);
251 let n = self.fill_buf(reader, to_skip).await?;
252 skipped += n;
253 if skipped <= skip_n {
254 self.clear();
255 } else {
256 let remain = skipped - skip_n;
257 self.set_position(self.buffer().len() - remain);
258 break;
259 }
260 }
261 }
262
263 if self.buffer().is_empty() {
264 self.fill_buf(reader, MIN_GROW_SIZE).await?;
265 }
266 Ok(())
267 }
268 }
269 }
270}
271
#[cfg(test)]
mod tests {
    use std::path::Path;

    use chrono::DateTime;
    use test_case::{case, test_case};
    use tokio::fs::File;

    use super::*;
    use crate::video::TrackInfoTag::*;
    use crate::{EntryValue, ExifIter, ExifTag, MediaParser, TrackInfo, TrackInfoTag};
    use TrackExif::*;

    /// Expected outcome for a fixture file in `parse_media`.
    enum TrackExif {
        /// Opens as a track and track info parses.
        Track,
        /// Opens as an image and Exif parses.
        Exif,
        /// Opens fine but parsing reports an error.
        NoData,
        /// Opening the source itself fails.
        Invalid,
    }

    /// Opens each fixture and checks that it is classified and parsed as expected.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    #[case("3gp_640x360.3gp", Track)]
    #[case("broken.jpg", Exif)]
    #[case("compatible-brands-fail.heic", Invalid)]
    #[case("compatible-brands-fail.mov", Invalid)]
    #[case("compatible-brands.heic", NoData)]
    #[case("compatible-brands.mov", NoData)]
    #[case("embedded-in-heic.mov", Track)]
    #[case("exif.heic", Exif)]
    #[case("exif.jpg", Exif)]
    #[case("meta.mov", Track)]
    #[case("meta.mp4", Track)]
    #[case("mka.mka", Track)]
    #[case("mkv_640x360.mkv", Track)]
    #[case("exif-one-entry.heic", Exif)]
    #[case("no-exif.jpg", NoData)]
    #[case("tif.tif", Exif)]
    #[case("ramdisk.img", Invalid)]
    #[case("webm_480.webm", Track)]
    async fn parse_media(path: &str, te: TrackExif) {
        let mut parser = MediaParser::new();
        let opened = AsyncMediaSource::open(Path::new("testdata").join(path)).await;
        match te {
            Track => {
                let source = opened.unwrap();
                assert_eq!(source.kind(), crate::MediaKind::Track);
                let _: TrackInfo = parser.parse_track_async(source).await.unwrap();
            }
            Exif => {
                let source = opened.unwrap();
                assert_eq!(source.kind(), crate::MediaKind::Image);
                let mut iter: ExifIter = parser.parse_exif_async(source).await.unwrap();
                let _ = iter.parse_gps();

                if path.contains("one-entry") {
                    // Exactly one entry is expected from the iterator.
                    assert!(iter.next().is_some());
                    assert!(iter.next().is_none());

                    let exif: crate::Exif = iter.clone_rewound().into();
                    assert!(exif.get(ExifTag::Orientation).is_some());
                } else {
                    let _: crate::Exif = iter.clone_rewound().into();
                }
            }
            NoData => {
                let source = opened.unwrap();
                match source.kind() {
                    crate::MediaKind::Image => {
                        let outcome = parser.parse_exif_async(source).await;
                        outcome.unwrap_err();
                    }
                    crate::MediaKind::Track => {
                        let outcome = parser.parse_track_async(source).await;
                        outcome.unwrap_err();
                    }
                }
            }
            Invalid => {
                opened.unwrap_err();
            }
        }
    }

    /// A fill that would push the buffer past `MAX_PARSE_BUF_SIZE` is
    /// rejected, while one that stays below the limit is accepted.
    #[test]
    fn fill_buf_check_rejects_oversize_when_combined_with_existing() {
        use crate::parser::check_fill_size;

        let over_limit = check_fill_size(MAX_PARSE_BUF_SIZE - 1024, 2 * 1024);
        assert!(over_limit.is_err(), "expected Err, got Ok");

        let within_limit = check_fill_size(MAX_PARSE_BUF_SIZE - 4096, 1024);
        assert!(within_limit.is_ok());
    }

    /// The same tag value must come out of a seekable and an unseekable source.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    #[test_case("mkv_640x360.mkv", Width, 640_u32.into())]
    #[test_case("mkv_640x360.mkv", Height, 360_u32.into())]
    #[test_case("mkv_640x360.mkv", DurationMs, 13346_u64.into())]
    #[test_case("mkv_640x360.mkv", CreateDate, DateTime::parse_from_str("2008-08-08T08:08:08Z", "%+").unwrap().into())]
    #[test_case("meta.mov", Make, "Apple".into())]
    #[test_case("meta.mov", Model, "iPhone X".into())]
    #[test_case("meta.mov", GpsIso6709, "+27.1281+100.2508+000.000/".into())]
    #[test_case("meta.mp4", Width, 1920_u32.into())]
    #[test_case("meta.mp4", Height, 1080_u32.into())]
    #[test_case("meta.mp4", DurationMs, 1063_u64.into())]
    #[test_case("meta.mp4", GpsIso6709, "+27.2939+112.6932/".into())]
    #[test_case("meta.mp4", CreateDate, DateTime::parse_from_str("2024-02-03T07:05:38Z", "%+").unwrap().into())]
    async fn parse_track_info(path: &str, tag: TrackInfoTag, v: EntryValue) {
        let mut parser = MediaParser::new();
        let fixture = Path::new("testdata").join(path);

        // Seekable path.
        let file = File::open(&fixture).await.unwrap();
        let source = AsyncMediaSource::seekable(file).await.unwrap();
        let info: TrackInfo = parser.parse_track_async(source).await.unwrap();
        assert_eq!(info.get(tag).unwrap(), &v);

        // Unseekable path over the same file.
        let file = File::open(&fixture).await.unwrap();
        let source = AsyncMediaSource::unseekable(file).await.unwrap();
        let info: TrackInfo = parser.parse_track_async(source).await.unwrap();
        assert_eq!(info.get(tag).unwrap(), &v);
    }

    /// `kind()` distinguishes an image fixture from a track fixture.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn async_media_kind_classifies_image_and_track() {
        let image = AsyncMediaSource::open("testdata/exif.jpg").await.unwrap();
        assert_eq!(image.kind(), crate::MediaKind::Image);

        let track = AsyncMediaSource::open("testdata/meta.mov").await.unwrap();
        assert_eq!(track.kind(), crate::MediaKind::Track);
    }

    /// `open` accepts a plain string path.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn async_media_source_open() {
        let source = AsyncMediaSource::open("testdata/exif.jpg").await.unwrap();
        assert_eq!(source.kind(), crate::MediaKind::Image);
    }
}