async_tiff/
reader.rs

1//! Abstractions for network reading.
2
3use std::fmt::Debug;
4use std::io::Read;
5use std::ops::Range;
6use std::sync::Arc;
7
8use byteorder::{BigEndian, LittleEndian, ReadBytesExt};
9use bytes::buf::Reader;
10use bytes::{Buf, Bytes};
11use futures::future::{BoxFuture, FutureExt, TryFutureExt};
12use object_store::ObjectStore;
13
14use crate::error::{AsyncTiffError, AsyncTiffResult};
15
16/// The asynchronous interface used to read COG files
17///
18/// This was derived from the Parquet
19/// [`AsyncFileReader`](https://docs.rs/parquet/latest/parquet/arrow/async_reader/trait.AsyncFileReader.html)
20///
21/// Notes:
22///
23/// 1. There is a default implementation for types that implement [`tokio::io::AsyncRead`]
24///    and [`tokio::io::AsyncSeek`], for example [`tokio::fs::File`].
25///
26/// 2. [`ObjectReader`], available when the `object_store` crate feature
27///    is enabled, implements this interface for [`ObjectStore`].
28///
29/// [`ObjectStore`]: object_store::ObjectStore
30///
31/// [`tokio::fs::File`]: https://docs.rs/tokio/latest/tokio/fs/struct.File.html
32pub trait AsyncFileReader: Debug + Send + Sync {
33    /// Retrieve the bytes in `range`
34    fn get_bytes(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>>;
35
36    /// Retrieve multiple byte ranges. The default implementation will call `get_bytes`
37    /// sequentially
38    fn get_byte_ranges(
39        &self,
40        ranges: Vec<Range<u64>>,
41    ) -> BoxFuture<'_, AsyncTiffResult<Vec<Bytes>>> {
42        async move {
43            let mut result = Vec::with_capacity(ranges.len());
44
45            for range in ranges.into_iter() {
46                let data = self.get_bytes(range).await?;
47                result.push(data);
48            }
49
50            Ok(result)
51        }
52        .boxed()
53    }
54}
55
56/// This allows Box<dyn AsyncFileReader + '_> to be used as an AsyncFileReader,
57impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
58    fn get_bytes(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {
59        self.as_ref().get_bytes(range)
60    }
61
62    fn get_byte_ranges(
63        &self,
64        ranges: Vec<Range<u64>>,
65    ) -> BoxFuture<'_, AsyncTiffResult<Vec<Bytes>>> {
66        self.as_ref().get_byte_ranges(ranges)
67    }
68}
69
70// #[cfg(feature = "tokio")]
71// impl<T: tokio::io::AsyncRead + tokio::io::AsyncSeek + Unpin + Debug + Send + Sync> AsyncFileReader
72//     for T
73// {
74//     fn get_bytes(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {
75//         use tokio::io::{AsyncReadExt, AsyncSeekExt};
76
77//         async move {
78//             self.seek(std::io::SeekFrom::Start(range.start)).await?;
79
80//             let to_read = (range.end - range.start).try_into().unwrap();
81//             let mut buffer = Vec::with_capacity(to_read);
82//             let read = self.take(to_read as u64).read_to_end(&mut buffer).await?;
83//             if read != to_read {
84//                 return Err(AsyncTiffError::EndOfFile(to_read, read));
85//             }
86
87//             Ok(buffer.into())
88//         }
89//         .boxed()
90//     }
91// }
92
93/// An AsyncFileReader that reads from an [`ObjectStore`] instance.
94#[derive(Clone, Debug)]
95pub struct ObjectReader {
96    store: Arc<dyn ObjectStore>,
97    path: object_store::path::Path,
98}
99
100impl ObjectReader {
101    /// Creates a new [`ObjectReader`] for the provided [`ObjectStore`] and path
102    ///
103    /// [`ObjectMeta`] can be obtained using [`ObjectStore::list`] or [`ObjectStore::head`]
104    pub fn new(store: Arc<dyn ObjectStore>, path: object_store::path::Path) -> Self {
105        Self { store, path }
106    }
107}
108
109impl AsyncFileReader for ObjectReader {
110    fn get_bytes(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {
111        let range = range.start as _..range.end as _;
112        self.store
113            .get_range(&self.path, range)
114            .map_err(|e| e.into())
115            .boxed()
116    }
117
118    fn get_byte_ranges(&self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, AsyncTiffResult<Vec<Bytes>>>
119    where
120        Self: Send,
121    {
122        let ranges = ranges
123            .into_iter()
124            .map(|r| r.start as _..r.end as _)
125            .collect::<Vec<_>>();
126        async move {
127            self.store
128                .get_ranges(&self.path, &ranges)
129                .await
130                .map_err(|e| e.into())
131        }
132        .boxed()
133    }
134}
135
136/// An AsyncFileReader that reads from a URL using reqwest.
137#[derive(Debug, Clone)]
138pub struct ReqwestReader {
139    client: reqwest::Client,
140    url: reqwest::Url,
141}
142
143impl ReqwestReader {
144    /// Construct a new ReqwestReader from a reqwest client and URL.
145    pub fn new(client: reqwest::Client, url: reqwest::Url) -> Self {
146        Self { client, url }
147    }
148}
149
150impl AsyncFileReader for ReqwestReader {
151    fn get_bytes(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {
152        let url = self.url.clone();
153        let client = self.client.clone();
154        // HTTP range is inclusive, so we need to subtract 1 from the end
155        let range = format!("bytes={}-{}", range.start, range.end - 1);
156        async move {
157            let response = client.get(url).header("Range", range).send().await?;
158            let bytes = response.bytes().await?;
159            Ok(bytes)
160        }
161        .boxed()
162    }
163}
164
165/// An AsyncFileReader that caches the first `prefetch` bytes of a file.
166#[derive(Debug)]
167pub struct PrefetchReader {
168    reader: Box<dyn AsyncFileReader>,
169    buffer: Bytes,
170}
171
172impl PrefetchReader {
173    /// Construct a new PrefetchReader, catching the first `prefetch` bytes of the file.
174    pub async fn new(reader: Box<dyn AsyncFileReader>, prefetch: u64) -> AsyncTiffResult<Self> {
175        let buffer = reader.get_bytes(0..prefetch).await?;
176        Ok(Self { reader, buffer })
177    }
178}
179
180impl AsyncFileReader for PrefetchReader {
181    fn get_bytes(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {
182        if range.start < self.buffer.len() as _ {
183            if range.end < self.buffer.len() as _ {
184                let usize_range = range.start as usize..range.end as usize;
185                let result = self.buffer.slice(usize_range);
186                async { Ok(result) }.boxed()
187            } else {
188                // TODO: reuse partial internal buffer
189                self.reader.get_bytes(range)
190            }
191        } else {
192            self.reader.get_bytes(range)
193        }
194    }
195
196    fn get_byte_ranges(&self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, AsyncTiffResult<Vec<Bytes>>>
197    where
198        Self: Send,
199    {
200        // In practice, get_byte_ranges is only used for fetching tiles, which are unlikely to
201        // overlap a metadata prefetch.
202        self.reader.get_byte_ranges(ranges)
203    }
204}
205
/// Byte order used when decoding multi-byte values.
#[derive(Clone, Copy, Debug)]
pub(crate) enum Endianness {
    /// Least significant byte first.
    LittleEndian,
    /// Most significant byte first.
    BigEndian,
}
211
212/// A wrapper around an [ObjectStore] that provides a seek-oriented interface
213// TODO: in the future add buffering to this
214#[derive(Debug)]
215pub(crate) struct AsyncCursor {
216    reader: Box<dyn AsyncFileReader>,
217    offset: u64,
218    endianness: Endianness,
219}
220
221impl AsyncCursor {
222    /// Create a new AsyncCursor from a reader and endianness.
223    pub(crate) fn new(reader: Box<dyn AsyncFileReader>, endianness: Endianness) -> Self {
224        Self {
225            reader,
226            offset: 0,
227            endianness,
228        }
229    }
230
231    /// Create a new AsyncCursor for a TIFF file, automatically inferring endianness from the first
232    /// two bytes.
233    pub(crate) async fn try_open_tiff(reader: Box<dyn AsyncFileReader>) -> AsyncTiffResult<Self> {
234        // Initialize with little endianness and then set later
235        let mut cursor = Self::new(reader, Endianness::LittleEndian);
236        let magic_bytes = cursor.read(2).await?;
237        let magic_bytes = magic_bytes.as_ref();
238
239        // Should be b"II" for little endian or b"MM" for big endian
240        if magic_bytes == Bytes::from_static(b"II") {
241            cursor.endianness = Endianness::LittleEndian;
242        } else if magic_bytes == Bytes::from_static(b"MM") {
243            cursor.endianness = Endianness::BigEndian;
244        } else {
245            return Err(AsyncTiffError::General(format!(
246                "unexpected magic bytes {magic_bytes:?}"
247            )));
248        };
249
250        Ok(cursor)
251    }
252
253    /// Consume self and return the underlying [`AsyncFileReader`].
254    #[allow(dead_code)]
255    pub(crate) fn into_inner(self) -> Box<dyn AsyncFileReader> {
256        self.reader
257    }
258
259    /// Read the given number of bytes, advancing the internal cursor state by the same amount.
260    pub(crate) async fn read(&mut self, length: u64) -> AsyncTiffResult<EndianAwareReader> {
261        let range = self.offset as _..(self.offset + length) as _;
262        self.offset += length;
263        let bytes = self.reader.get_bytes(range).await?;
264        Ok(EndianAwareReader {
265            reader: bytes.reader(),
266            endianness: self.endianness,
267        })
268    }
269
270    /// Read a u8 from the cursor, advancing the internal state by 1 byte.
271    pub(crate) async fn read_u8(&mut self) -> AsyncTiffResult<u8> {
272        self.read(1).await?.read_u8()
273    }
274
275    /// Read a i8 from the cursor, advancing the internal state by 1 byte.
276    pub(crate) async fn read_i8(&mut self) -> AsyncTiffResult<i8> {
277        self.read(1).await?.read_i8()
278    }
279
280    /// Read a u16 from the cursor, advancing the internal state by 2 bytes.
281    pub(crate) async fn read_u16(&mut self) -> AsyncTiffResult<u16> {
282        self.read(2).await?.read_u16()
283    }
284
285    /// Read a i16 from the cursor, advancing the internal state by 2 bytes.
286    pub(crate) async fn read_i16(&mut self) -> AsyncTiffResult<i16> {
287        self.read(2).await?.read_i16()
288    }
289
290    /// Read a u32 from the cursor, advancing the internal state by 4 bytes.
291    pub(crate) async fn read_u32(&mut self) -> AsyncTiffResult<u32> {
292        self.read(4).await?.read_u32()
293    }
294
295    /// Read a i32 from the cursor, advancing the internal state by 4 bytes.
296    pub(crate) async fn read_i32(&mut self) -> AsyncTiffResult<i32> {
297        self.read(4).await?.read_i32()
298    }
299
300    /// Read a u64 from the cursor, advancing the internal state by 8 bytes.
301    pub(crate) async fn read_u64(&mut self) -> AsyncTiffResult<u64> {
302        self.read(8).await?.read_u64()
303    }
304
305    /// Read a i64 from the cursor, advancing the internal state by 8 bytes.
306    pub(crate) async fn read_i64(&mut self) -> AsyncTiffResult<i64> {
307        self.read(8).await?.read_i64()
308    }
309
310    pub(crate) async fn read_f32(&mut self) -> AsyncTiffResult<f32> {
311        self.read(4).await?.read_f32()
312    }
313
314    pub(crate) async fn read_f64(&mut self) -> AsyncTiffResult<f64> {
315        self.read(8).await?.read_f64()
316    }
317
318    #[allow(dead_code)]
319    pub(crate) fn reader(&self) -> &dyn AsyncFileReader {
320        &self.reader
321    }
322
323    #[allow(dead_code)]
324    pub(crate) fn endianness(&self) -> Endianness {
325        self.endianness
326    }
327
328    /// Advance cursor position by a set amount
329    pub(crate) fn advance(&mut self, amount: u64) {
330        self.offset += amount;
331    }
332
333    pub(crate) fn seek(&mut self, offset: u64) {
334        self.offset = offset;
335    }
336
337    pub(crate) fn position(&self) -> u64 {
338        self.offset
339    }
340}
341
342pub(crate) struct EndianAwareReader {
343    reader: Reader<Bytes>,
344    endianness: Endianness,
345}
346
347impl EndianAwareReader {
348    /// Read a u8 from the cursor, advancing the internal state by 1 byte.
349    pub(crate) fn read_u8(&mut self) -> AsyncTiffResult<u8> {
350        Ok(self.reader.read_u8()?)
351    }
352
353    /// Read a i8 from the cursor, advancing the internal state by 1 byte.
354    pub(crate) fn read_i8(&mut self) -> AsyncTiffResult<i8> {
355        Ok(self.reader.read_i8()?)
356    }
357
358    pub(crate) fn read_u16(&mut self) -> AsyncTiffResult<u16> {
359        match self.endianness {
360            Endianness::LittleEndian => Ok(self.reader.read_u16::<LittleEndian>()?),
361            Endianness::BigEndian => Ok(self.reader.read_u16::<BigEndian>()?),
362        }
363    }
364
365    pub(crate) fn read_i16(&mut self) -> AsyncTiffResult<i16> {
366        match self.endianness {
367            Endianness::LittleEndian => Ok(self.reader.read_i16::<LittleEndian>()?),
368            Endianness::BigEndian => Ok(self.reader.read_i16::<BigEndian>()?),
369        }
370    }
371
372    pub(crate) fn read_u32(&mut self) -> AsyncTiffResult<u32> {
373        match self.endianness {
374            Endianness::LittleEndian => Ok(self.reader.read_u32::<LittleEndian>()?),
375            Endianness::BigEndian => Ok(self.reader.read_u32::<BigEndian>()?),
376        }
377    }
378
379    pub(crate) fn read_i32(&mut self) -> AsyncTiffResult<i32> {
380        match self.endianness {
381            Endianness::LittleEndian => Ok(self.reader.read_i32::<LittleEndian>()?),
382            Endianness::BigEndian => Ok(self.reader.read_i32::<BigEndian>()?),
383        }
384    }
385
386    pub(crate) fn read_u64(&mut self) -> AsyncTiffResult<u64> {
387        match self.endianness {
388            Endianness::LittleEndian => Ok(self.reader.read_u64::<LittleEndian>()?),
389            Endianness::BigEndian => Ok(self.reader.read_u64::<BigEndian>()?),
390        }
391    }
392
393    pub(crate) fn read_i64(&mut self) -> AsyncTiffResult<i64> {
394        match self.endianness {
395            Endianness::LittleEndian => Ok(self.reader.read_i64::<LittleEndian>()?),
396            Endianness::BigEndian => Ok(self.reader.read_i64::<BigEndian>()?),
397        }
398    }
399
400    pub(crate) fn read_f32(&mut self) -> AsyncTiffResult<f32> {
401        match self.endianness {
402            Endianness::LittleEndian => Ok(self.reader.read_f32::<LittleEndian>()?),
403            Endianness::BigEndian => Ok(self.reader.read_f32::<BigEndian>()?),
404        }
405    }
406
407    pub(crate) fn read_f64(&mut self) -> AsyncTiffResult<f64> {
408        match self.endianness {
409            Endianness::LittleEndian => Ok(self.reader.read_f64::<LittleEndian>()?),
410            Endianness::BigEndian => Ok(self.reader.read_f64::<BigEndian>()?),
411        }
412    }
413
414    #[allow(dead_code)]
415    pub(crate) fn into_inner(self) -> (Reader<Bytes>, Endianness) {
416        (self.reader, self.endianness)
417    }
418}
419
420impl AsRef<[u8]> for EndianAwareReader {
421    fn as_ref(&self) -> &[u8] {
422        self.reader.get_ref().as_ref()
423    }
424}
425
426impl Read for EndianAwareReader {
427    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
428        self.reader.read(buf)
429    }
430}