1use std::fmt::Debug;
4use std::io::Read;
5use std::ops::Range;
6use std::sync::Arc;
7
8use byteorder::{BigEndian, LittleEndian, ReadBytesExt};
9use bytes::buf::Reader;
10use bytes::{Buf, Bytes};
11use futures::future::{BoxFuture, FutureExt, TryFutureExt};
12use object_store::ObjectStore;
13
14use crate::error::{AsyncTiffError, AsyncTiffResult};
15
16pub trait AsyncFileReader: Debug + Send + Sync {
33 fn get_bytes(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>>;
35
36 fn get_byte_ranges(
39 &self,
40 ranges: Vec<Range<u64>>,
41 ) -> BoxFuture<'_, AsyncTiffResult<Vec<Bytes>>> {
42 async move {
43 let mut result = Vec::with_capacity(ranges.len());
44
45 for range in ranges.into_iter() {
46 let data = self.get_bytes(range).await?;
47 result.push(data);
48 }
49
50 Ok(result)
51 }
52 .boxed()
53 }
54}
55
56impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
58 fn get_bytes(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {
59 self.as_ref().get_bytes(range)
60 }
61
62 fn get_byte_ranges(
63 &self,
64 ranges: Vec<Range<u64>>,
65 ) -> BoxFuture<'_, AsyncTiffResult<Vec<Bytes>>> {
66 self.as_ref().get_byte_ranges(ranges)
67 }
68}
69
70#[derive(Clone, Debug)]
95pub struct ObjectReader {
96 store: Arc<dyn ObjectStore>,
97 path: object_store::path::Path,
98}
99
100impl ObjectReader {
101 pub fn new(store: Arc<dyn ObjectStore>, path: object_store::path::Path) -> Self {
105 Self { store, path }
106 }
107}
108
109impl AsyncFileReader for ObjectReader {
110 fn get_bytes(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {
111 let range = range.start as _..range.end as _;
112 self.store
113 .get_range(&self.path, range)
114 .map_err(|e| e.into())
115 .boxed()
116 }
117
118 fn get_byte_ranges(&self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, AsyncTiffResult<Vec<Bytes>>>
119 where
120 Self: Send,
121 {
122 let ranges = ranges
123 .into_iter()
124 .map(|r| r.start as _..r.end as _)
125 .collect::<Vec<_>>();
126 async move {
127 self.store
128 .get_ranges(&self.path, &ranges)
129 .await
130 .map_err(|e| e.into())
131 }
132 .boxed()
133 }
134}
135
136#[derive(Debug, Clone)]
138pub struct ReqwestReader {
139 client: reqwest::Client,
140 url: reqwest::Url,
141}
142
143impl ReqwestReader {
144 pub fn new(client: reqwest::Client, url: reqwest::Url) -> Self {
146 Self { client, url }
147 }
148}
149
150impl AsyncFileReader for ReqwestReader {
151 fn get_bytes(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {
152 let url = self.url.clone();
153 let client = self.client.clone();
154 let range = format!("bytes={}-{}", range.start, range.end - 1);
156 async move {
157 let response = client.get(url).header("Range", range).send().await?;
158 let bytes = response.bytes().await?;
159 Ok(bytes)
160 }
161 .boxed()
162 }
163}
164
165#[derive(Debug)]
167pub struct PrefetchReader {
168 reader: Box<dyn AsyncFileReader>,
169 buffer: Bytes,
170}
171
172impl PrefetchReader {
173 pub async fn new(reader: Box<dyn AsyncFileReader>, prefetch: u64) -> AsyncTiffResult<Self> {
175 let buffer = reader.get_bytes(0..prefetch).await?;
176 Ok(Self { reader, buffer })
177 }
178}
179
180impl AsyncFileReader for PrefetchReader {
181 fn get_bytes(&self, range: Range<u64>) -> BoxFuture<'_, AsyncTiffResult<Bytes>> {
182 if range.start < self.buffer.len() as _ {
183 if range.end < self.buffer.len() as _ {
184 let usize_range = range.start as usize..range.end as usize;
185 let result = self.buffer.slice(usize_range);
186 async { Ok(result) }.boxed()
187 } else {
188 self.reader.get_bytes(range)
190 }
191 } else {
192 self.reader.get_bytes(range)
193 }
194 }
195
196 fn get_byte_ranges(&self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, AsyncTiffResult<Vec<Bytes>>>
197 where
198 Self: Send,
199 {
200 self.reader.get_byte_ranges(ranges)
203 }
204}
205
/// Byte order used when decoding multi-byte values.
#[derive(Clone, Copy, Debug)]
pub(crate) enum Endianness {
    /// Least-significant byte first; selected by the `II` magic bytes.
    LittleEndian,
    /// Most-significant byte first; selected by the `MM` magic bytes.
    BigEndian,
}
211
212#[derive(Debug)]
215pub(crate) struct AsyncCursor {
216 reader: Box<dyn AsyncFileReader>,
217 offset: u64,
218 endianness: Endianness,
219}
220
221impl AsyncCursor {
222 pub(crate) fn new(reader: Box<dyn AsyncFileReader>, endianness: Endianness) -> Self {
224 Self {
225 reader,
226 offset: 0,
227 endianness,
228 }
229 }
230
231 pub(crate) async fn try_open_tiff(reader: Box<dyn AsyncFileReader>) -> AsyncTiffResult<Self> {
234 let mut cursor = Self::new(reader, Endianness::LittleEndian);
236 let magic_bytes = cursor.read(2).await?;
237 let magic_bytes = magic_bytes.as_ref();
238
239 if magic_bytes == Bytes::from_static(b"II") {
241 cursor.endianness = Endianness::LittleEndian;
242 } else if magic_bytes == Bytes::from_static(b"MM") {
243 cursor.endianness = Endianness::BigEndian;
244 } else {
245 return Err(AsyncTiffError::General(format!(
246 "unexpected magic bytes {magic_bytes:?}"
247 )));
248 };
249
250 Ok(cursor)
251 }
252
253 #[allow(dead_code)]
255 pub(crate) fn into_inner(self) -> Box<dyn AsyncFileReader> {
256 self.reader
257 }
258
259 pub(crate) async fn read(&mut self, length: u64) -> AsyncTiffResult<EndianAwareReader> {
261 let range = self.offset as _..(self.offset + length) as _;
262 self.offset += length;
263 let bytes = self.reader.get_bytes(range).await?;
264 Ok(EndianAwareReader {
265 reader: bytes.reader(),
266 endianness: self.endianness,
267 })
268 }
269
270 pub(crate) async fn read_u8(&mut self) -> AsyncTiffResult<u8> {
272 self.read(1).await?.read_u8()
273 }
274
275 pub(crate) async fn read_i8(&mut self) -> AsyncTiffResult<i8> {
277 self.read(1).await?.read_i8()
278 }
279
280 pub(crate) async fn read_u16(&mut self) -> AsyncTiffResult<u16> {
282 self.read(2).await?.read_u16()
283 }
284
285 pub(crate) async fn read_i16(&mut self) -> AsyncTiffResult<i16> {
287 self.read(2).await?.read_i16()
288 }
289
290 pub(crate) async fn read_u32(&mut self) -> AsyncTiffResult<u32> {
292 self.read(4).await?.read_u32()
293 }
294
295 pub(crate) async fn read_i32(&mut self) -> AsyncTiffResult<i32> {
297 self.read(4).await?.read_i32()
298 }
299
300 pub(crate) async fn read_u64(&mut self) -> AsyncTiffResult<u64> {
302 self.read(8).await?.read_u64()
303 }
304
305 pub(crate) async fn read_i64(&mut self) -> AsyncTiffResult<i64> {
307 self.read(8).await?.read_i64()
308 }
309
310 pub(crate) async fn read_f32(&mut self) -> AsyncTiffResult<f32> {
311 self.read(4).await?.read_f32()
312 }
313
314 pub(crate) async fn read_f64(&mut self) -> AsyncTiffResult<f64> {
315 self.read(8).await?.read_f64()
316 }
317
318 #[allow(dead_code)]
319 pub(crate) fn reader(&self) -> &dyn AsyncFileReader {
320 &self.reader
321 }
322
323 #[allow(dead_code)]
324 pub(crate) fn endianness(&self) -> Endianness {
325 self.endianness
326 }
327
328 pub(crate) fn advance(&mut self, amount: u64) {
330 self.offset += amount;
331 }
332
333 pub(crate) fn seek(&mut self, offset: u64) {
334 self.offset = offset;
335 }
336
337 pub(crate) fn position(&self) -> u64 {
338 self.offset
339 }
340}
341
342pub(crate) struct EndianAwareReader {
343 reader: Reader<Bytes>,
344 endianness: Endianness,
345}
346
347impl EndianAwareReader {
348 pub(crate) fn read_u8(&mut self) -> AsyncTiffResult<u8> {
350 Ok(self.reader.read_u8()?)
351 }
352
353 pub(crate) fn read_i8(&mut self) -> AsyncTiffResult<i8> {
355 Ok(self.reader.read_i8()?)
356 }
357
358 pub(crate) fn read_u16(&mut self) -> AsyncTiffResult<u16> {
359 match self.endianness {
360 Endianness::LittleEndian => Ok(self.reader.read_u16::<LittleEndian>()?),
361 Endianness::BigEndian => Ok(self.reader.read_u16::<BigEndian>()?),
362 }
363 }
364
365 pub(crate) fn read_i16(&mut self) -> AsyncTiffResult<i16> {
366 match self.endianness {
367 Endianness::LittleEndian => Ok(self.reader.read_i16::<LittleEndian>()?),
368 Endianness::BigEndian => Ok(self.reader.read_i16::<BigEndian>()?),
369 }
370 }
371
372 pub(crate) fn read_u32(&mut self) -> AsyncTiffResult<u32> {
373 match self.endianness {
374 Endianness::LittleEndian => Ok(self.reader.read_u32::<LittleEndian>()?),
375 Endianness::BigEndian => Ok(self.reader.read_u32::<BigEndian>()?),
376 }
377 }
378
379 pub(crate) fn read_i32(&mut self) -> AsyncTiffResult<i32> {
380 match self.endianness {
381 Endianness::LittleEndian => Ok(self.reader.read_i32::<LittleEndian>()?),
382 Endianness::BigEndian => Ok(self.reader.read_i32::<BigEndian>()?),
383 }
384 }
385
386 pub(crate) fn read_u64(&mut self) -> AsyncTiffResult<u64> {
387 match self.endianness {
388 Endianness::LittleEndian => Ok(self.reader.read_u64::<LittleEndian>()?),
389 Endianness::BigEndian => Ok(self.reader.read_u64::<BigEndian>()?),
390 }
391 }
392
393 pub(crate) fn read_i64(&mut self) -> AsyncTiffResult<i64> {
394 match self.endianness {
395 Endianness::LittleEndian => Ok(self.reader.read_i64::<LittleEndian>()?),
396 Endianness::BigEndian => Ok(self.reader.read_i64::<BigEndian>()?),
397 }
398 }
399
400 pub(crate) fn read_f32(&mut self) -> AsyncTiffResult<f32> {
401 match self.endianness {
402 Endianness::LittleEndian => Ok(self.reader.read_f32::<LittleEndian>()?),
403 Endianness::BigEndian => Ok(self.reader.read_f32::<BigEndian>()?),
404 }
405 }
406
407 pub(crate) fn read_f64(&mut self) -> AsyncTiffResult<f64> {
408 match self.endianness {
409 Endianness::LittleEndian => Ok(self.reader.read_f64::<LittleEndian>()?),
410 Endianness::BigEndian => Ok(self.reader.read_f64::<BigEndian>()?),
411 }
412 }
413
414 #[allow(dead_code)]
415 pub(crate) fn into_inner(self) -> (Reader<Bytes>, Endianness) {
416 (self.reader, self.endianness)
417 }
418}
419
420impl AsRef<[u8]> for EndianAwareReader {
421 fn as_ref(&self) -> &[u8] {
422 self.reader.get_ref().as_ref()
423 }
424}
425
426impl Read for EndianAwareReader {
427 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
428 self.reader.read(buf)
429 }
430}