Skip to main content

async_tiff/
reader.rs

1//! Abstractions for network reading.
2
3use std::fmt::Debug;
4use std::io::Read;
5use std::ops::Range;
6use std::sync::Arc;
7
8use async_trait::async_trait;
9use byteorder::{BigEndian, LittleEndian, ReadBytesExt};
10use bytes::buf::Reader;
11use bytes::{Buf, Bytes};
12use futures::TryFutureExt;
13
14use crate::error::AsyncTiffResult;
15
16/// The asynchronous interface used to read COG files
17///
18/// This was derived from the Parquet
19/// [`AsyncFileReader`](https://docs.rs/parquet/latest/parquet/arrow/async_reader/trait.AsyncFileReader.html)
20///
21/// Notes:
22///
23/// 1. [`ObjectReader`], available when the `object_store` crate feature
24///    is enabled, implements this interface for [`ObjectStore`].
25///
26/// 2. You can use [`TokioReader`] to implement [`AsyncFileReader`] for types that implement
27///    [`tokio::io::AsyncRead`] and [`tokio::io::AsyncSeek`], for example [`tokio::fs::File`].
28///
29/// [`ObjectStore`]: object_store::ObjectStore
30///
31/// [`tokio::fs::File`]: https://docs.rs/tokio/latest/tokio/fs/struct.File.html
32#[async_trait]
33pub trait AsyncFileReader: Debug + Send + Sync + 'static {
34    /// Retrieve the bytes in `range` as part of a request for image data, not header metadata.
35    ///
36    /// This is also used as the default implementation of
37    /// [`MetadataFetch`][crate::metadata::MetadataFetch] if not overridden.
38    async fn get_bytes(&self, range: Range<u64>) -> AsyncTiffResult<Bytes>;
39
40    /// Retrieve multiple byte ranges as part of a request for image data, not header metadata. The
41    /// default implementation will call `get_bytes` sequentially
42    async fn get_byte_ranges(&self, ranges: Vec<Range<u64>>) -> AsyncTiffResult<Vec<Bytes>> {
43        let mut result = Vec::with_capacity(ranges.len());
44
45        for range in ranges.into_iter() {
46            let data = self.get_bytes(range).await?;
47            result.push(data);
48        }
49
50        Ok(result)
51    }
52}
53
54/// This allows Box<dyn AsyncFileReader + '_> to be used as an AsyncFileReader,
55#[async_trait]
56impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
57    async fn get_bytes(&self, range: Range<u64>) -> AsyncTiffResult<Bytes> {
58        self.as_ref().get_bytes(range).await
59    }
60
61    async fn get_byte_ranges(&self, ranges: Vec<Range<u64>>) -> AsyncTiffResult<Vec<Bytes>> {
62        self.as_ref().get_byte_ranges(ranges).await
63    }
64}
65
66/// This allows Arc<dyn AsyncFileReader + '_> to be used as an AsyncFileReader,
67#[async_trait]
68impl AsyncFileReader for Arc<dyn AsyncFileReader + '_> {
69    async fn get_bytes(&self, range: Range<u64>) -> AsyncTiffResult<Bytes> {
70        self.as_ref().get_bytes(range).await
71    }
72
73    async fn get_byte_ranges(&self, ranges: Vec<Range<u64>>) -> AsyncTiffResult<Vec<Bytes>> {
74        self.as_ref().get_byte_ranges(ranges).await
75    }
76}
77
/// Adapter that lets anything implementing [AsyncRead] and [AsyncSeek] act as an
/// [AsyncFileReader].
///
/// `AsyncRead` and `AsyncSeek` need mutable access to seek and read, whereas the
/// `AsyncFileReader` trait only hands out shared (`&self`) access. To bridge the two, the inner
/// reader is kept behind a `Mutex`.
///
/// [AsyncRead]: tokio::io::AsyncRead
/// [AsyncSeek]: tokio::io::AsyncSeek
#[cfg(feature = "tokio")]
#[derive(Debug)]
pub struct TokioReader<T>(tokio::sync::Mutex<T>)
where
    T: tokio::io::AsyncRead + tokio::io::AsyncSeek + Unpin + Send + Debug;
93
#[cfg(feature = "tokio")]
impl<T: tokio::io::AsyncRead + tokio::io::AsyncSeek + Unpin + Send + Debug> TokioReader<T> {
    /// Create a new TokioReader from a reader.
    pub fn new(inner: T) -> Self {
        Self(tokio::sync::Mutex::new(inner))
    }

    /// Read exactly the bytes in `range` from the wrapped reader.
    ///
    /// The inner reader is locked for the whole seek-then-read sequence so that concurrent
    /// requests cannot move the cursor in between.
    ///
    /// # Errors
    ///
    /// * An I/O error (`InvalidInput`) if `range.start > range.end` or the range length does not
    ///   fit in `usize`.
    /// * Any I/O error reported by the underlying seek or read.
    /// * `AsyncTiffError::EndOfFile(expected, actual)` if the reader ends before the full range
    ///   has been read.
    async fn make_range_request(&self, range: Range<u64>) -> AsyncTiffResult<Bytes> {
        use std::io::{Error as IoError, ErrorKind, SeekFrom};

        use tokio::io::{AsyncReadExt, AsyncSeekExt};

        use crate::error::AsyncTiffError;

        // Reject inverted ranges up front instead of underflowing the subtraction.
        let to_read = range.end.checked_sub(range.start).ok_or_else(|| {
            IoError::new(
                ErrorKind::InvalidInput,
                format!(
                    "invalid byte range: start {} is greater than end {}",
                    range.start, range.end
                ),
            )
        })?;
        let capacity = usize::try_from(to_read).map_err(|_| {
            IoError::new(
                ErrorKind::InvalidInput,
                format!("byte range of length {to_read} does not fit in memory"),
            )
        })?;

        let mut file = self.0.lock().await;

        file.seek(SeekFrom::Start(range.start)).await?;

        // `Vec::with_capacity` yields a zero-length vector, so passing it to `read` as a slice
        // would read nothing. `take(..).read_to_end(..)` appends into the vector and keeps
        // reading across short reads until `to_read` bytes were read or the reader hit EOF.
        let mut buffer = Vec::with_capacity(capacity);
        let read = (&mut *file)
            .take(to_read)
            .read_to_end(&mut buffer)
            .await? as u64;
        if read != to_read {
            return Err(AsyncTiffError::EndOfFile(to_read, read));
        }

        Ok(buffer.into())
    }
}
122
/// [`AsyncFileReader`] implementation for [`TokioReader`]: each request is served by a
/// seek-and-read on the wrapped reader. Batch requests use the trait's default
/// `get_byte_ranges`.
#[cfg(feature = "tokio")]
#[async_trait]
impl<T> AsyncFileReader for TokioReader<T>
where
    T: tokio::io::AsyncRead + tokio::io::AsyncSeek + Unpin + Send + Debug + 'static,
{
    async fn get_bytes(&self, range: Range<u64>) -> AsyncTiffResult<Bytes> {
        let request = self.make_range_request(range);
        request.await
    }
}
132
/// An [`AsyncFileReader`] backed by an [`ObjectStore`][object_store::ObjectStore] instance and
/// the path of a single object within it.
#[cfg(feature = "object_store")]
#[derive(Clone, Debug)]
pub struct ObjectReader {
    // Shared handle to the store that requests are issued against.
    store: Arc<dyn object_store::ObjectStore>,
    // Location of the object inside `store`.
    path: object_store::path::Path,
}
140
#[cfg(feature = "object_store")]
impl ObjectReader {
    /// Build an [`ObjectReader`] that reads the object at `path` from the given
    /// [`ObjectStore`][object_store::ObjectStore].
    pub fn new(store: Arc<dyn object_store::ObjectStore>, path: object_store::path::Path) -> Self {
        Self { store, path }
    }

    /// Fetch a single byte range from the store, converting the store's error into this crate's
    /// error type.
    async fn make_range_request(&self, range: Range<u64>) -> AsyncTiffResult<Bytes> {
        use object_store::ObjectStoreExt;

        // Cast the bounds to whichever integer type `get_range` expects.
        let Range { start, end } = range;
        let store_range = start as _..end as _;

        let pending = self.store.get_range(&self.path, store_range);
        pending.map_err(|err| err.into()).await
    }
}
159
/// [`AsyncFileReader`] implementation for [`ObjectReader`]. Batch requests are passed to the
/// store's own `get_ranges` rather than the trait's sequential default.
#[cfg(feature = "object_store")]
#[async_trait]
impl AsyncFileReader for ObjectReader {
    async fn get_bytes(&self, range: Range<u64>) -> AsyncTiffResult<Bytes> {
        let request = self.make_range_request(range);
        request.await
    }

    async fn get_byte_ranges(&self, ranges: Vec<Range<u64>>) -> AsyncTiffResult<Vec<Bytes>>
    where
        Self: Send,
    {
        // Cast every bound to whichever integer type `get_ranges` expects.
        let store_ranges: Vec<_> = ranges
            .into_iter()
            .map(|Range { start, end }| start as _..end as _)
            .collect();

        let outcome = self.store.get_ranges(&self.path, &store_ranges).await;
        outcome.map_err(|err| err.into())
    }
}
181
/// An [`AsyncFileReader`] that fetches byte ranges of a single URL through a reqwest client.
#[cfg(feature = "reqwest")]
#[derive(Debug, Clone)]
pub struct ReqwestReader {
    // Client used to issue the requests.
    client: reqwest::Client,
    // Resource that every range request targets.
    url: reqwest::Url,
}
189
#[cfg(feature = "reqwest")]
impl ReqwestReader {
    /// Construct a new ReqwestReader from a reqwest client and URL.
    pub fn new(client: reqwest::Client, url: reqwest::Url) -> Self {
        Self { client, url }
    }

    /// Fetch the half-open byte range `range` from the URL with an HTTP `Range` request.
    ///
    /// An empty range returns empty [`Bytes`] without contacting the server, because an HTTP
    /// byte range cannot express zero bytes.
    ///
    /// # Errors
    ///
    /// * An I/O error (`InvalidInput`) if `range.start > range.end`.
    /// * Any error from sending the request, a non-success HTTP status, or reading the body.
    async fn make_range_request(&self, range: Range<u64>) -> AsyncTiffResult<Bytes> {
        if range.start > range.end {
            let invalid = std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                format!(
                    "invalid byte range: start {} is greater than end {}",
                    range.start, range.end
                ),
            );
            return Err(invalid.into());
        }
        // `start == end`: the inclusive form below would be `bytes=N-(N-1)` (or underflow when
        // `end` is 0), which is not a valid range and may make a server return the whole body.
        if range.start == range.end {
            return Ok(Bytes::new());
        }

        // HTTP range is inclusive, so we need to subtract 1 from the end
        let header_value = format!("bytes={}-{}", range.start, range.end - 1);
        let response = self
            .client
            .get(self.url.clone())
            .header("Range", header_value)
            .send()
            .await?
            .error_for_status()?;
        let bytes = response.bytes().await?;
        Ok(bytes)
    }
}
212
/// [`AsyncFileReader`] implementation for [`ReqwestReader`]: each request becomes one HTTP range
/// request. Batch requests use the trait's default `get_byte_ranges`.
#[cfg(feature = "reqwest")]
#[async_trait]
impl AsyncFileReader for ReqwestReader {
    async fn get_bytes(&self, range: Range<u64>) -> AsyncTiffResult<Bytes> {
        let request = self.make_range_request(range);
        request.await
    }
}
220
/// Byte order used when decoding multi-byte values.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Endianness {
    /// Little Endian
    LittleEndian,
    /// Big Endian
    BigEndian,
}

impl Endianness {
    /// Report whether this byte order is the one used natively by the compilation target.
    ///
    /// ```
    /// use async_tiff::reader::Endianness;
    ///
    /// if cfg!(target_endian = "little") {
    ///     assert!(Endianness::LittleEndian.is_native());
    ///     assert!(!Endianness::BigEndian.is_native());
    /// } else {
    ///     assert!(Endianness::BigEndian.is_native());
    ///     assert!(!Endianness::LittleEndian.is_native());
    /// }
    /// ```
    pub fn is_native(&self) -> bool {
        // Any target that is not little-endian is treated as big-endian.
        let target_is_little = cfg!(target_endian = "little");

        match self {
            Endianness::LittleEndian => target_is_little,
            Endianness::BigEndian => !target_is_little,
        }
    }
}
254
255pub(crate) struct EndianAwareReader {
256    reader: Reader<Bytes>,
257    endianness: Endianness,
258}
259
260impl EndianAwareReader {
261    pub(crate) fn new(bytes: Bytes, endianness: Endianness) -> Self {
262        Self {
263            reader: bytes.reader(),
264            endianness,
265        }
266    }
267
268    /// Read a u8 from the cursor, advancing the internal state by 1 byte.
269    pub(crate) fn read_u8(&mut self) -> AsyncTiffResult<u8> {
270        Ok(self.reader.read_u8()?)
271    }
272
273    /// Read a i8 from the cursor, advancing the internal state by 1 byte.
274    pub(crate) fn read_i8(&mut self) -> AsyncTiffResult<i8> {
275        Ok(self.reader.read_i8()?)
276    }
277
278    pub(crate) fn read_u16(&mut self) -> AsyncTiffResult<u16> {
279        match self.endianness {
280            Endianness::LittleEndian => Ok(self.reader.read_u16::<LittleEndian>()?),
281            Endianness::BigEndian => Ok(self.reader.read_u16::<BigEndian>()?),
282        }
283    }
284
285    pub(crate) fn read_i16(&mut self) -> AsyncTiffResult<i16> {
286        match self.endianness {
287            Endianness::LittleEndian => Ok(self.reader.read_i16::<LittleEndian>()?),
288            Endianness::BigEndian => Ok(self.reader.read_i16::<BigEndian>()?),
289        }
290    }
291
292    pub(crate) fn read_u32(&mut self) -> AsyncTiffResult<u32> {
293        match self.endianness {
294            Endianness::LittleEndian => Ok(self.reader.read_u32::<LittleEndian>()?),
295            Endianness::BigEndian => Ok(self.reader.read_u32::<BigEndian>()?),
296        }
297    }
298
299    pub(crate) fn read_i32(&mut self) -> AsyncTiffResult<i32> {
300        match self.endianness {
301            Endianness::LittleEndian => Ok(self.reader.read_i32::<LittleEndian>()?),
302            Endianness::BigEndian => Ok(self.reader.read_i32::<BigEndian>()?),
303        }
304    }
305
306    pub(crate) fn read_u64(&mut self) -> AsyncTiffResult<u64> {
307        match self.endianness {
308            Endianness::LittleEndian => Ok(self.reader.read_u64::<LittleEndian>()?),
309            Endianness::BigEndian => Ok(self.reader.read_u64::<BigEndian>()?),
310        }
311    }
312
313    pub(crate) fn read_i64(&mut self) -> AsyncTiffResult<i64> {
314        match self.endianness {
315            Endianness::LittleEndian => Ok(self.reader.read_i64::<LittleEndian>()?),
316            Endianness::BigEndian => Ok(self.reader.read_i64::<BigEndian>()?),
317        }
318    }
319
320    pub(crate) fn read_f32(&mut self) -> AsyncTiffResult<f32> {
321        match self.endianness {
322            Endianness::LittleEndian => Ok(self.reader.read_f32::<LittleEndian>()?),
323            Endianness::BigEndian => Ok(self.reader.read_f32::<BigEndian>()?),
324        }
325    }
326
327    pub(crate) fn read_f64(&mut self) -> AsyncTiffResult<f64> {
328        match self.endianness {
329            Endianness::LittleEndian => Ok(self.reader.read_f64::<LittleEndian>()?),
330            Endianness::BigEndian => Ok(self.reader.read_f64::<BigEndian>()?),
331        }
332    }
333
334    #[allow(dead_code)]
335    pub(crate) fn into_inner(self) -> (Reader<Bytes>, Endianness) {
336        (self.reader, self.endianness)
337    }
338}
339
340impl AsRef<[u8]> for EndianAwareReader {
341    fn as_ref(&self) -> &[u8] {
342        self.reader.get_ref().as_ref()
343    }
344}
345
346impl Read for EndianAwareReader {
347    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
348        self.reader.read(buf)
349    }
350}