1use std::{
2 cmp::{max, min},
3 fmt::Debug,
4 io::{self},
5 path::Path,
6};
7
8use tokio::{
9 fs::File,
10 io::{AsyncRead, AsyncReadExt, AsyncSeek},
11};
12
13use crate::{
14 error::{ParsedError, ParsingErrorState},
15 parser::{
16 clear_and_skip_decide, parse_loop_step, Buf, LoopAction, ParsingState, SkipPlan,
17 MAX_PARSE_BUF_SIZE, MIN_GROW_SIZE,
18 },
19};
20
/// Maximum number of leading bytes read from a reader when an
/// `AsyncMediaSource` is built; the media type is detected from these bytes.
const HEADER_PARSE_BUF_SIZE: usize = 128;
23
/// Strategy for jumping over bytes the parser does not need.
///
/// Called with the reader and the number of bytes to skip. The returned
/// future resolves to `Ok(true)` when the function advanced the reader itself
/// (e.g. by seeking), or `Ok(false)` when it did nothing and the caller has to
/// skip by reading and discarding data instead.
pub(crate) type AsyncSkipBySeekFn<R> = for<'a> fn(
    &'a mut R,
    u64,
) -> core::pin::Pin<
    Box<dyn core::future::Future<Output = io::Result<bool>> + Send + 'a>,
>;
34
35pub struct AsyncMediaSource<R> {
36 pub(crate) reader: R,
37 pub(crate) buf: Vec<u8>,
38 pub(crate) mime: crate::file::MediaMime,
39 pub(crate) skip_by_seek: AsyncSkipBySeekFn<R>,
40}
41
42impl<R> Debug for AsyncMediaSource<R> {
43 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44 f.debug_struct("AsyncMediaSource")
45 .field("mime", &self.mime)
46 .finish_non_exhaustive()
47 }
48}
49
50impl<R: AsyncRead + Unpin> AsyncMediaSource<R> {
51 async fn build(mut reader: R, skip_by_seek: AsyncSkipBySeekFn<R>) -> crate::Result<Self> {
52 let mut buf = Vec::with_capacity(HEADER_PARSE_BUF_SIZE);
53 (&mut reader)
54 .take(HEADER_PARSE_BUF_SIZE as u64)
55 .read_to_end(&mut buf)
56 .await?;
57 let mime: crate::file::MediaMime = buf.as_slice().try_into()?;
58 Ok(Self {
59 reader,
60 buf,
61 mime,
62 skip_by_seek,
63 })
64 }
65
66 pub fn kind(&self) -> crate::MediaKind {
67 match self.mime {
68 crate::file::MediaMime::Image(_) => crate::MediaKind::Image,
69 crate::file::MediaMime::Track(_) => crate::MediaKind::Track,
70 }
71 }
72}
73
74fn make_seekable_skip<R: AsyncRead + AsyncSeek + Unpin + Send>() -> AsyncSkipBySeekFn<R> {
75 |r, n| {
76 Box::pin(async move {
77 use std::io::SeekFrom;
78 use tokio::io::AsyncSeekExt;
79 let signed: i64 = n
80 .try_into()
81 .map_err(|_| io::Error::from(io::ErrorKind::InvalidInput))?;
82 r.seek(SeekFrom::Current(signed)).await?;
83 Ok(true)
84 })
85 }
86}
87
88fn make_unseekable_skip<R: AsyncRead + Unpin + Send>() -> AsyncSkipBySeekFn<R> {
89 |_, _| Box::pin(async move { Ok(false) })
90}
91
92impl<R: AsyncRead + AsyncSeek + Unpin + Send> AsyncMediaSource<R> {
93 pub async fn seekable(reader: R) -> crate::Result<Self> {
94 Self::build(reader, make_seekable_skip::<R>()).await
95 }
96}
97
98impl<R: AsyncRead + Unpin + Send> AsyncMediaSource<R> {
99 pub async fn unseekable(reader: R) -> crate::Result<Self> {
100 Self::build(reader, make_unseekable_skip::<R>()).await
101 }
102}
103
104impl AsyncMediaSource<File> {
105 pub async fn open<P: AsRef<Path>>(path: P) -> crate::Result<Self> {
108 Self::seekable(File::open(path).await?).await
109 }
110}
111
112pub(crate) trait AsyncBufParser: Buf + Debug {
113 async fn fill_buf<R: AsyncRead + Unpin>(
114 &mut self,
115 reader: &mut R,
116 size: usize,
117 ) -> io::Result<usize>;
118
119 async fn load_and_parse<R: AsyncRead + Unpin, P, O>(
120 &mut self,
121 reader: &mut R,
122 skip_by_seek: AsyncSkipBySeekFn<R>,
123 parse: P,
124 ) -> Result<O, ParsedError>
125 where
126 P: Fn(&[u8], Option<ParsingState>) -> Result<O, ParsingErrorState>,
127 {
128 self.load_and_parse_with_offset(
129 reader,
130 skip_by_seek,
131 |data, _, state| parse(data, state),
132 0,
133 )
134 .await
135 }
136
137 #[tracing::instrument(skip_all)]
138 async fn load_and_parse_with_offset<R: AsyncRead + Unpin, P, O>(
139 &mut self,
140 reader: &mut R,
141 skip_by_seek: AsyncSkipBySeekFn<R>,
142 parse: P,
143 offset: usize,
144 ) -> Result<O, ParsedError>
145 where
146 P: Fn(&[u8], usize, Option<ParsingState>) -> Result<O, ParsingErrorState>,
147 {
148 if offset >= self.buffer().len() {
149 self.fill_buf(reader, MIN_GROW_SIZE).await?;
150 }
151
152 let mut parsing_state: Option<ParsingState> = None;
153 let mut parse = parse; loop {
155 match parse_loop_step(self.buffer(), offset, &mut parsing_state, &mut parse) {
156 LoopAction::Done(o) => return Ok(o),
157 LoopAction::NeedFill(needed) => {
158 let to_read = max(needed, MIN_GROW_SIZE);
159 let n = self.fill_buf(reader, to_read).await?;
160 if n == 0 {
161 return Err(ParsedError::NoEnoughBytes);
162 }
163 }
164 LoopAction::Skip(n) => {
165 self.clear_and_skip(reader, skip_by_seek, n).await?;
166 }
167 LoopAction::Failed(s) => return Err(ParsedError::Failed(s)),
168 }
169 }
170 }
171
172 #[tracing::instrument(skip(reader, skip_by_seek))]
173 async fn clear_and_skip<R: AsyncRead + Unpin>(
174 &mut self,
175 reader: &mut R,
176 skip_by_seek: AsyncSkipBySeekFn<R>,
177 n: usize,
178 ) -> Result<(), ParsedError> {
179 match clear_and_skip_decide(self.buffer().len(), n) {
180 SkipPlan::AdvanceOnly => {
181 self.set_position(self.position() + n);
182 return Ok(());
183 }
184 SkipPlan::ClearAndSkip { extra: skip_n } => {
185 self.clear();
186 let done = (skip_by_seek)(
187 reader,
188 skip_n
189 .try_into()
190 .map_err(|_| ParsedError::Failed("skip too many bytes".into()))?,
191 )
192 .await?;
193 if !done {
194 let mut skipped = 0;
195 while skipped < skip_n {
196 let mut to_skip = skip_n - skipped;
197 to_skip = min(to_skip, MAX_PARSE_BUF_SIZE);
198 let n = self.fill_buf(reader, to_skip).await?;
199 skipped += n;
200 if skipped <= skip_n {
201 self.clear();
202 } else {
203 let remain = skipped - skip_n;
204 self.set_position(self.buffer().len() - remain);
205 break;
206 }
207 }
208 }
209
210 if self.buffer().is_empty() {
211 self.fill_buf(reader, MIN_GROW_SIZE).await?;
212 }
213 Ok(())
214 }
215 }
216 }
217}
218
#[cfg(test)]
mod tests {
    use std::path::Path;

    use super::*;
    use crate::{ExifIter, TrackInfo};
    use test_case::case;

    /// Expected outcome for each test file in `parse_media`.
    enum TrackExif {
        /// Opens as a track and `parse_track_async` succeeds.
        Track,
        /// Opens as an image and `parse_exif_async` succeeds.
        Exif,
        /// Opens, but parsing (exif or track, chosen by `kind()`) returns an error.
        NoData,
        /// `AsyncMediaSource::open` itself returns an error.
        Invalid,
    }
    use tokio::fs::File;
    use TrackExif::*;

    // Opens each file under `testdata/` and checks the outcome against the
    // expected `TrackExif` variant.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    #[case("3gp_640x360.3gp", Track)]
    #[case("broken.jpg", Exif)]
    #[case("compatible-brands-fail.heic", Invalid)]
    #[case("compatible-brands-fail.mov", Invalid)]
    #[case("compatible-brands.heic", NoData)]
    #[case("compatible-brands.mov", NoData)]
    #[case("embedded-in-heic.mov", Track)]
    #[case("exif.heic", Exif)]
    #[case("exif.jpg", Exif)]
    #[case("meta.mov", Track)]
    #[case("meta.mp4", Track)]
    #[case("mka.mka", Track)]
    #[case("mkv_640x360.mkv", Track)]
    #[case("exif-one-entry.heic", Exif)]
    #[case("no-exif.jpg", NoData)]
    #[case("tif.tif", Exif)]
    #[case("ramdisk.img", Invalid)]
    #[case("webm_480.webm", Track)]
    async fn parse_media(path: &str, te: TrackExif) {
        use crate::MediaParser;
        let mut parser = MediaParser::new();
        let ms = AsyncMediaSource::open(Path::new("testdata").join(path)).await;
        match te {
            Track => {
                let ms = ms.unwrap();
                assert_eq!(ms.kind(), crate::MediaKind::Track);
                let _: TrackInfo = parser.parse_track_async(ms).await.unwrap();
            }
            Exif => {
                let ms = ms.unwrap();
                assert_eq!(ms.kind(), crate::MediaKind::Image);
                let mut it: ExifIter = parser.parse_exif_async(ms).await.unwrap();
                let _ = it.parse_gps();

                if path.contains("one-entry") {
                    // The iterator yields exactly one entry, and Orientation
                    // must be retrievable from a rewound copy.
                    assert!(it.next().is_some());
                    assert!(it.next().is_none());

                    let exif: crate::Exif = it.clone_rewound().into();
                    assert!(exif.get(ExifTag::Orientation).is_some());
                } else {
                    let _: crate::Exif = it.clone_rewound().into();
                }
            }
            NoData => {
                // The file opens, but the parser matching its kind must fail.
                let ms = ms.unwrap();
                match ms.kind() {
                    crate::MediaKind::Image => {
                        let res = parser.parse_exif_async(ms).await;
                        res.unwrap_err();
                    }
                    crate::MediaKind::Track => {
                        let res = parser.parse_track_async(ms).await;
                        res.unwrap_err();
                    }
                }
            }
            Invalid => {
                ms.unwrap_err();
            }
        }
    }

    use crate::{EntryValue, ExifTag, TrackInfoTag};
    use chrono::DateTime;
    use test_case::test_case;

    use crate::video::TrackInfoTag::*;

    // `check_fill_size` must account for bytes already buffered (argument
    // order presumably (existing_len, requested) — confirm against parser.rs):
    // MAX-1024 existing + 2048 requested exceeds the cap, MAX-4096 + 1024 does not.
    #[test]
    fn fill_buf_check_rejects_oversize_when_combined_with_existing() {
        use crate::parser::check_fill_size;
        let res = check_fill_size(MAX_PARSE_BUF_SIZE - 1024, 2 * 1024);
        assert!(res.is_err(), "expected Err, got Ok");
        let res = check_fill_size(MAX_PARSE_BUF_SIZE - 4096, 1024);
        assert!(res.is_ok());
    }

    // Each tag value must come out the same whether the file is read through a
    // seekable or an unseekable source (the latter may exercise the
    // read-and-discard skip path).
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    #[test_case("mkv_640x360.mkv", Width, 640_u32.into())]
    #[test_case("mkv_640x360.mkv", Height, 360_u32.into())]
    #[test_case("mkv_640x360.mkv", DurationMs, 13346_u64.into())]
    #[test_case("mkv_640x360.mkv", CreateDate, DateTime::parse_from_str("2008-08-08T08:08:08Z", "%+").unwrap().into())]
    #[test_case("meta.mov", Make, "Apple".into())]
    #[test_case("meta.mov", Model, "iPhone X".into())]
    #[test_case("meta.mov", GpsIso6709, "+27.1281+100.2508+000.000/".into())]
    #[test_case("meta.mp4", Width, 1920_u32.into())]
    #[test_case("meta.mp4", Height, 1080_u32.into())]
    #[test_case("meta.mp4", DurationMs, 1063_u64.into())]
    #[test_case("meta.mp4", GpsIso6709, "+27.2939+112.6932/".into())]
    #[test_case("meta.mp4", CreateDate, DateTime::parse_from_str("2024-02-03T07:05:38Z", "%+").unwrap().into())]
    async fn parse_track_info(path: &str, tag: TrackInfoTag, v: EntryValue) {
        use crate::MediaParser;
        let mut parser = MediaParser::new();

        let f = File::open(Path::new("testdata").join(path)).await.unwrap();
        let ms = AsyncMediaSource::seekable(f).await.unwrap();
        let info: TrackInfo = parser.parse_track_async(ms).await.unwrap();
        assert_eq!(info.get(tag).unwrap(), &v);

        let f = File::open(Path::new("testdata").join(path)).await.unwrap();
        let ms = AsyncMediaSource::unseekable(f).await.unwrap();
        let info: TrackInfo = parser.parse_track_async(ms).await.unwrap();
        assert_eq!(info.get(tag).unwrap(), &v);
    }

    // `kind()` distinguishes an image file from a track file.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn async_media_kind_classifies_image_and_track() {
        let img = AsyncMediaSource::open("testdata/exif.jpg").await.unwrap();
        assert_eq!(img.kind(), crate::MediaKind::Image);

        let trk = AsyncMediaSource::open("testdata/meta.mov").await.unwrap();
        assert_eq!(trk.kind(), crate::MediaKind::Track);
    }

    // `open` accepts a plain string path.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn async_media_source_open() {
        let ms = AsyncMediaSource::open("testdata/exif.jpg").await.unwrap();
        assert_eq!(ms.kind(), crate::MediaKind::Image);
    }
}