1use std::fmt::Debug;
4use std::io::Read;
5use std::ops::Range;
6use std::sync::Arc;
7
8use async_trait::async_trait;
9use byteorder::{BigEndian, LittleEndian, ReadBytesExt};
10use bytes::buf::Reader;
11use bytes::{Buf, Bytes};
12use futures::TryFutureExt;
13
14use crate::error::AsyncTiffResult;
15
16#[async_trait]
33pub trait AsyncFileReader: Debug + Send + Sync + 'static {
34 async fn get_bytes(&self, range: Range<u64>) -> AsyncTiffResult<Bytes>;
39
40 async fn get_byte_ranges(&self, ranges: Vec<Range<u64>>) -> AsyncTiffResult<Vec<Bytes>> {
43 let mut result = Vec::with_capacity(ranges.len());
44
45 for range in ranges.into_iter() {
46 let data = self.get_bytes(range).await?;
47 result.push(data);
48 }
49
50 Ok(result)
51 }
52}
53
54#[async_trait]
56impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
57 async fn get_bytes(&self, range: Range<u64>) -> AsyncTiffResult<Bytes> {
58 self.as_ref().get_bytes(range).await
59 }
60
61 async fn get_byte_ranges(&self, ranges: Vec<Range<u64>>) -> AsyncTiffResult<Vec<Bytes>> {
62 self.as_ref().get_byte_ranges(ranges).await
63 }
64}
65
66#[async_trait]
68impl AsyncFileReader for Arc<dyn AsyncFileReader + '_> {
69 async fn get_bytes(&self, range: Range<u64>) -> AsyncTiffResult<Bytes> {
70 self.as_ref().get_bytes(range).await
71 }
72
73 async fn get_byte_ranges(&self, ranges: Vec<Range<u64>>) -> AsyncTiffResult<Vec<Bytes>> {
74 self.as_ref().get_byte_ranges(ranges).await
75 }
76}
77
/// An [`AsyncFileReader`] backed by any Tokio reader that supports seeking.
///
/// The wrapped reader is kept behind a [`tokio::sync::Mutex`] because every range request has
/// to seek and then read, which needs exclusive access even though the trait methods only
/// receive `&self`.
///
/// Only available when the `tokio` feature is enabled.
#[cfg(feature = "tokio")]
#[derive(Debug)]
pub struct TokioReader<T: tokio::io::AsyncRead + tokio::io::AsyncSeek + Unpin + Send + Debug>(
    tokio::sync::Mutex<T>,
);
93
#[cfg(feature = "tokio")]
impl<T: tokio::io::AsyncRead + tokio::io::AsyncSeek + Unpin + Send + Debug> TokioReader<T> {
    /// Wraps `inner` so it can be used as an [`AsyncFileReader`].
    pub fn new(inner: T) -> Self {
        Self(tokio::sync::Mutex::new(inner))
    }

    /// Seeks to `range.start` and reads exactly `range.end - range.start` bytes.
    ///
    /// # Errors
    ///
    /// * An `InvalidInput` I/O error (converted into the crate error type) if `range.end` is
    ///   smaller than `range.start`.
    /// * Any I/O error raised while seeking or reading.
    /// * `AsyncTiffError::EndOfFile(expected, actual)` if the reader ran out of data before
    ///   the whole range could be read.
    async fn make_range_request(&self, range: Range<u64>) -> AsyncTiffResult<Bytes> {
        use std::io::SeekFrom;

        use tokio::io::{AsyncReadExt, AsyncSeekExt};

        use crate::error::AsyncTiffError;

        // Reject inverted ranges up front instead of underflowing the subtraction.
        let to_read = range.end.checked_sub(range.start).ok_or_else(|| {
            std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "range end precedes range start",
            )
        })?;

        // Hold the lock across seek + read so concurrent requests cannot interleave.
        let mut file = self.0.lock().await;

        file.seek(SeekFrom::Start(range.start)).await?;

        // Capacity is only a hint; `read_to_end` grows the buffer as needed, so a truncating
        // cast on 32-bit targets cannot cause a short read.
        let mut buffer = Vec::with_capacity(to_read as usize);

        // `read` on a `Vec` created via `with_capacity` sees a zero-length slice and would
        // read nothing. `take(..).read_to_end(..)` appends up to `to_read` bytes, looping over
        // short reads until the limit or EOF is reached.
        let read = (&mut *file)
            .take(to_read)
            .read_to_end(&mut buffer)
            .await? as u64;
        if read != to_read {
            return Err(AsyncTiffError::EndOfFile(to_read, read));
        }

        Ok(buffer.into())
    }
}
122
/// Serves [`AsyncFileReader::get_bytes`] through `TokioReader::make_range_request`; the
/// trait's default `get_byte_ranges` is used as-is.
#[cfg(feature = "tokio")]
#[async_trait]
impl<T: tokio::io::AsyncRead + tokio::io::AsyncSeek + Unpin + Send + Debug + 'static>
    AsyncFileReader for TokioReader<T>
{
    async fn get_bytes(&self, range: Range<u64>) -> AsyncTiffResult<Bytes> {
        let bytes = self.make_range_request(range).await?;
        Ok(bytes)
    }
}
132
/// An [`AsyncFileReader`] that reads a single object out of an [`object_store::ObjectStore`].
///
/// Only available when the `object_store` feature is enabled.
#[cfg(feature = "object_store")]
#[derive(Clone, Debug)]
pub struct ObjectReader {
    /// Store the object is fetched from.
    store: Arc<dyn object_store::ObjectStore>,
    /// Location of the object inside `store`.
    path: object_store::path::Path,
}
140
#[cfg(feature = "object_store")]
impl ObjectReader {
    /// Creates a reader for the object located at `path` inside `store`.
    pub fn new(store: Arc<dyn object_store::ObjectStore>, path: object_store::path::Path) -> Self {
        ObjectReader { store, path }
    }

    /// Fetches `range` of the object with a single `get_range` call, converting the store's
    /// error into the crate error type.
    async fn make_range_request(&self, range: Range<u64>) -> AsyncTiffResult<Bytes> {
        use object_store::ObjectStoreExt;

        // `as _` lets the compiler pick whichever integer type `get_range` expects.
        let requested = range.start as _..range.end as _;
        let pending = self.store.get_range(&self.path, requested);
        pending.map_err(|e| e.into()).await
    }
}
159
/// Serves single ranges through `ObjectReader::make_range_request` and overrides
/// `get_byte_ranges` to hand the whole batch to the store's `get_ranges` in one call.
#[cfg(feature = "object_store")]
#[async_trait]
impl AsyncFileReader for ObjectReader {
    async fn get_bytes(&self, range: Range<u64>) -> AsyncTiffResult<Bytes> {
        let bytes = self.make_range_request(range).await?;
        Ok(bytes)
    }

    async fn get_byte_ranges(&self, ranges: Vec<Range<u64>>) -> AsyncTiffResult<Vec<Bytes>>
    where
        Self: Send,
    {
        // `as _` lets the compiler pick whichever integer type `get_ranges` expects.
        let requested: Vec<_> = ranges
            .into_iter()
            .map(|r| r.start as _..r.end as _)
            .collect();
        let outcome = self.store.get_ranges(&self.path, &requested).await;
        outcome.map_err(|e| e.into())
    }
}
181
/// An [`AsyncFileReader`] that fetches byte ranges of a remote resource with HTTP `Range`
/// requests issued through [`reqwest`].
///
/// Only available when the `reqwest` feature is enabled.
#[cfg(feature = "reqwest")]
#[derive(Debug, Clone)]
pub struct ReqwestReader {
    /// HTTP client used for every request.
    client: reqwest::Client,
    /// Address of the resource being read.
    url: reqwest::Url,
}
189
#[cfg(feature = "reqwest")]
impl ReqwestReader {
    /// Creates a reader that fetches `url` using `client`.
    pub fn new(client: reqwest::Client, url: reqwest::Url) -> Self {
        Self { client, url }
    }

    /// Fetches `range` of the resource with a single `GET` carrying a `Range` header.
    ///
    /// An empty range (`range.start >= range.end`) yields empty [`Bytes`] without touching the
    /// network: HTTP byte ranges are inclusive on both ends, so an empty range cannot be
    /// expressed, and computing `range.end - 1` would underflow when `range.end` is `0`.
    ///
    /// # Errors
    ///
    /// Fails if the request cannot be sent, if the server answers with an error status, or if
    /// the body cannot be read.
    async fn make_range_request(&self, range: Range<u64>) -> AsyncTiffResult<Bytes> {
        if range.is_empty() {
            return Ok(Bytes::new());
        }

        // HTTP ranges are inclusive, while `Range<u64>` excludes its end; hence the `- 1`,
        // which cannot underflow because the range is known to be non-empty here.
        let header_value = format!("bytes={}-{}", range.start, range.end - 1);

        // NOTE(review): only 4xx/5xx statuses are turned into errors; a server that ignores
        // the `Range` header and replies with the full body is not detected here.
        let response = self
            .client
            .get(self.url.clone())
            .header("Range", header_value)
            .send()
            .await?
            .error_for_status()?;
        let bytes = response.bytes().await?;
        Ok(bytes)
    }
}
212
/// Serves [`AsyncFileReader::get_bytes`] through `ReqwestReader::make_range_request`; the
/// trait's default `get_byte_ranges` is used as-is.
#[cfg(feature = "reqwest")]
#[async_trait]
impl AsyncFileReader for ReqwestReader {
    async fn get_bytes(&self, range: Range<u64>) -> AsyncTiffResult<Bytes> {
        let bytes = self.make_range_request(range).await?;
        Ok(bytes)
    }
}
220
/// Byte order used when decoding multi-byte values.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Endianness {
    /// Least significant byte first.
    LittleEndian,
    /// Most significant byte first.
    BigEndian,
}

impl Endianness {
    /// Returns `true` when this byte order is the one of the compilation target.
    ///
    /// The decision is taken from `cfg!(target_endian = "little")`: little-endian targets
    /// report [`Endianness::LittleEndian`] as native, every other target reports
    /// [`Endianness::BigEndian`].
    pub fn is_native(&self) -> bool {
        let target_is_little = cfg!(target_endian = "little");

        match self {
            Endianness::LittleEndian => target_is_little,
            Endianness::BigEndian => !target_is_little,
        }
    }
}
254
/// Reader over an in-memory [`Bytes`] buffer that decodes multi-byte numbers using a byte
/// order chosen at construction time.
pub(crate) struct EndianAwareReader {
    /// Synchronous reader over the wrapped buffer.
    reader: Reader<Bytes>,
    /// Byte order applied by the multi-byte `read_*` methods.
    endianness: Endianness,
}
259
260impl EndianAwareReader {
261 pub(crate) fn new(bytes: Bytes, endianness: Endianness) -> Self {
262 Self {
263 reader: bytes.reader(),
264 endianness,
265 }
266 }
267
268 pub(crate) fn read_u8(&mut self) -> AsyncTiffResult<u8> {
270 Ok(self.reader.read_u8()?)
271 }
272
273 pub(crate) fn read_i8(&mut self) -> AsyncTiffResult<i8> {
275 Ok(self.reader.read_i8()?)
276 }
277
278 pub(crate) fn read_u16(&mut self) -> AsyncTiffResult<u16> {
279 match self.endianness {
280 Endianness::LittleEndian => Ok(self.reader.read_u16::<LittleEndian>()?),
281 Endianness::BigEndian => Ok(self.reader.read_u16::<BigEndian>()?),
282 }
283 }
284
285 pub(crate) fn read_i16(&mut self) -> AsyncTiffResult<i16> {
286 match self.endianness {
287 Endianness::LittleEndian => Ok(self.reader.read_i16::<LittleEndian>()?),
288 Endianness::BigEndian => Ok(self.reader.read_i16::<BigEndian>()?),
289 }
290 }
291
292 pub(crate) fn read_u32(&mut self) -> AsyncTiffResult<u32> {
293 match self.endianness {
294 Endianness::LittleEndian => Ok(self.reader.read_u32::<LittleEndian>()?),
295 Endianness::BigEndian => Ok(self.reader.read_u32::<BigEndian>()?),
296 }
297 }
298
299 pub(crate) fn read_i32(&mut self) -> AsyncTiffResult<i32> {
300 match self.endianness {
301 Endianness::LittleEndian => Ok(self.reader.read_i32::<LittleEndian>()?),
302 Endianness::BigEndian => Ok(self.reader.read_i32::<BigEndian>()?),
303 }
304 }
305
306 pub(crate) fn read_u64(&mut self) -> AsyncTiffResult<u64> {
307 match self.endianness {
308 Endianness::LittleEndian => Ok(self.reader.read_u64::<LittleEndian>()?),
309 Endianness::BigEndian => Ok(self.reader.read_u64::<BigEndian>()?),
310 }
311 }
312
313 pub(crate) fn read_i64(&mut self) -> AsyncTiffResult<i64> {
314 match self.endianness {
315 Endianness::LittleEndian => Ok(self.reader.read_i64::<LittleEndian>()?),
316 Endianness::BigEndian => Ok(self.reader.read_i64::<BigEndian>()?),
317 }
318 }
319
320 pub(crate) fn read_f32(&mut self) -> AsyncTiffResult<f32> {
321 match self.endianness {
322 Endianness::LittleEndian => Ok(self.reader.read_f32::<LittleEndian>()?),
323 Endianness::BigEndian => Ok(self.reader.read_f32::<BigEndian>()?),
324 }
325 }
326
327 pub(crate) fn read_f64(&mut self) -> AsyncTiffResult<f64> {
328 match self.endianness {
329 Endianness::LittleEndian => Ok(self.reader.read_f64::<LittleEndian>()?),
330 Endianness::BigEndian => Ok(self.reader.read_f64::<BigEndian>()?),
331 }
332 }
333
334 #[allow(dead_code)]
335 pub(crate) fn into_inner(self) -> (Reader<Bytes>, Endianness) {
336 (self.reader, self.endianness)
337 }
338}
339
340impl AsRef<[u8]> for EndianAwareReader {
341 fn as_ref(&self) -> &[u8] {
342 self.reader.get_ref().as_ref()
343 }
344}
345
346impl Read for EndianAwareReader {
347 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
348 self.reader.read(buf)
349 }
350}