1use std::collections::HashMap;
2use std::fmt::Debug;
3use std::ops::Range;
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use bytes::{Bytes, BytesMut};
8use tokio::sync::Mutex;
9
10use crate::error::{HDF5Error, Result};
11
12#[async_trait]
17pub trait AsyncFileReader: Debug + Send + Sync + 'static {
18 async fn get_bytes(&self, range: Range<u64>) -> Result<Bytes>;
20
21 async fn get_byte_ranges(&self, ranges: Vec<Range<u64>>) -> Result<Vec<Bytes>> {
24 let mut result = Vec::with_capacity(ranges.len());
25 for range in ranges {
26 let data = self.get_bytes(range).await?;
27 result.push(data);
28 }
29 Ok(result)
30 }
31
32 async fn file_size(&self) -> Result<Option<u64>> {
35 Ok(None)
36 }
37}
38
39#[async_trait]
40impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
41 async fn get_bytes(&self, range: Range<u64>) -> Result<Bytes> {
42 self.as_ref().get_bytes(range).await
43 }
44
45 async fn get_byte_ranges(&self, ranges: Vec<Range<u64>>) -> Result<Vec<Bytes>> {
46 self.as_ref().get_byte_ranges(ranges).await
47 }
48
49 async fn file_size(&self) -> Result<Option<u64>> {
50 self.as_ref().file_size().await
51 }
52}
53
54#[async_trait]
55impl AsyncFileReader for Arc<dyn AsyncFileReader + '_> {
56 async fn get_bytes(&self, range: Range<u64>) -> Result<Bytes> {
57 self.as_ref().get_bytes(range).await
58 }
59
60 async fn get_byte_ranges(&self, ranges: Vec<Range<u64>>) -> Result<Vec<Bytes>> {
61 self.as_ref().get_byte_ranges(ranges).await
62 }
63
64 async fn file_size(&self) -> Result<Option<u64>> {
65 self.as_ref().file_size().await
66 }
67}
68
#[cfg(feature = "object_store")]
/// An [`AsyncFileReader`] that reads a single object from an
/// [`object_store::ObjectStore`].
#[derive(Debug, Clone)]
pub struct ObjectReader {
    /// Store that holds the object.
    store: Arc<dyn object_store::ObjectStore>,
    /// Location of the object inside `store`.
    path: object_store::path::Path,
}
78
#[cfg(feature = "object_store")]
impl ObjectReader {
    /// Create a reader for the object at `path` within `store`.
    pub fn new(
        store: Arc<dyn object_store::ObjectStore>,
        path: object_store::path::Path,
    ) -> Self {
        ObjectReader { store, path }
    }
}
86
#[cfg(feature = "object_store")]
#[async_trait]
impl AsyncFileReader for ObjectReader {
    /// Fetch `range` with a single ranged read against the store.
    async fn get_bytes(&self, range: Range<u64>) -> Result<Bytes> {
        use object_store::ObjectStoreExt;

        let bytes = self.store.get_range(&self.path, range).await?;
        Ok(bytes)
    }

    /// Delegate multi-range reads to the store's own `get_ranges`, which may
    /// batch or coalesce them.
    async fn get_byte_ranges(&self, ranges: Vec<Range<u64>>) -> Result<Vec<Bytes>> {
        let chunks = self.store.get_ranges(&self.path, &ranges).await?;
        Ok(chunks)
    }

    /// Report the object's size from a `head` (metadata) request.
    async fn file_size(&self) -> Result<Option<u64>> {
        use object_store::ObjectStoreExt;

        let meta: object_store::ObjectMeta = self.store.head(&self.path).await?;
        Ok(Some(meta.size))
    }
}
114
#[cfg(feature = "tokio")]
/// An [`AsyncFileReader`] over any seekable tokio reader (for example
/// `tokio::fs::File`).
///
/// The reader sits behind a mutex because each read is "seek, then read" on a
/// single shared cursor; concurrent `get_bytes` calls are therefore serialized.
#[derive(Debug)]
pub struct TokioReader<T>(Mutex<T>)
where
    T: tokio::io::AsyncRead + tokio::io::AsyncSeek + Unpin + Send + Debug;
123
#[cfg(feature = "tokio")]
impl<T> TokioReader<T>
where
    T: tokio::io::AsyncRead + tokio::io::AsyncSeek + Unpin + Send + Debug,
{
    /// Wrap `inner` so it can serve byte-range requests.
    pub fn new(inner: T) -> Self {
        TokioReader(Mutex::new(inner))
    }
}
131
#[cfg(feature = "tokio")]
#[async_trait]
impl<T: tokio::io::AsyncRead + tokio::io::AsyncSeek + Unpin + Send + Debug + 'static>
    AsyncFileReader for TokioReader<T>
{
    /// Seek to `range.start` and read up to `range.end - range.start` bytes.
    ///
    /// Reaching end-of-file early is not an error: the returned buffer is simply
    /// shorter than requested. An inverted range (`end < start`) reads nothing.
    ///
    /// # Errors
    /// Propagates seek/read I/O errors, and returns an `InvalidInput` I/O error if
    /// the requested length does not fit in `usize` on this platform.
    async fn get_bytes(&self, range: Range<u64>) -> Result<Bytes> {
        use std::io::{self, SeekFrom};

        use tokio::io::{AsyncReadExt, AsyncSeekExt};

        // `saturating_sub` keeps an inverted range from underflowing (a debug panic,
        // or a near-u64::MAX allocation in release); `try_from` refuses to silently
        // truncate the length on 32-bit targets.
        let to_read = usize::try_from(range.end.saturating_sub(range.start)).map_err(|_| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                "requested byte range is too large for this platform",
            )
        })?;

        // Hold the lock for the whole seek+read so another caller cannot move the
        // cursor in between.
        let mut file = self.0.lock().await;
        file.seek(SeekFrom::Start(range.start)).await?;

        let mut buffer = vec![0u8; to_read];
        let mut total_read = 0;
        // A single `read` may return fewer bytes than asked for, so keep reading;
        // `Ok(0)` signals end-of-file, in which case we return what we have.
        while total_read < to_read {
            let n = file.read(&mut buffer[total_read..]).await?;
            if n == 0 {
                break;
            }
            total_read += n;
        }
        buffer.truncate(total_read);
        Ok(buffer.into())
    }
}
162
#[cfg(feature = "reqwest")]
/// An [`AsyncFileReader`] that fetches byte ranges of a remote file over HTTP
/// using `Range` requests.
#[derive(Clone, Debug)]
pub struct ReqwestReader {
    /// HTTP client used for every request.
    client: reqwest::Client,
    /// URL of the remote file.
    url: reqwest::Url,
}
172
#[cfg(feature = "reqwest")]
impl ReqwestReader {
    /// Create a reader that requests byte ranges of `url` through `client`.
    pub fn new(client: reqwest::Client, url: reqwest::Url) -> Self {
        ReqwestReader { client, url }
    }
}
180
#[cfg(feature = "reqwest")]
#[async_trait]
impl AsyncFileReader for ReqwestReader {
    /// Fetch `range` with a `GET` carrying `Range: bytes=<start>-<end - 1>`.
    ///
    /// HTTP byte ranges are inclusive, so an empty range has no valid header form.
    /// `bytes=N-(N-1)` is an invalid spec that servers may ignore and answer with
    /// the whole file, and `end == 0` would underflow. Empty (or inverted) ranges
    /// therefore return no bytes without touching the network.
    ///
    /// If the server ignores the `Range` header and replies `200 OK` with the full
    /// body, the requested slice is cut out locally (clamped to the body length).
    ///
    /// # Errors
    /// Propagates transport errors and non-success HTTP statuses.
    async fn get_bytes(&self, range: Range<u64>) -> Result<Bytes> {
        if range.is_empty() {
            return Ok(Bytes::new());
        }

        let range_header = format!("bytes={}-{}", range.start, range.end - 1);
        let response = self
            .client
            .get(self.url.clone())
            .header("Range", range_header)
            .send()
            .await?
            .error_for_status()?;

        // A range-aware server answers 206 Partial Content; 200 means it sent the
        // entire representation instead.
        let sent_full_body = response.status() == reqwest::StatusCode::OK;
        let bytes = response.bytes().await?;

        if sent_full_body {
            let body_len = bytes.len() as u64;
            // Both bounds are clamped to `body_len` (itself a usize), so the casts
            // are lossless and `start <= end` still holds.
            let start = range.start.min(body_len) as usize;
            let end = range.end.min(body_len) as usize;
            return Ok(bytes.slice(start..end));
        }
        Ok(bytes)
    }
}
197
198#[derive(Debug)]
217pub struct BlockCache<F: AsyncFileReader> {
218 inner: F,
219 blocks: Arc<Mutex<HashMap<u64, Bytes>>>,
220 block_size: u64,
221}
222
223impl<F: AsyncFileReader> BlockCache<F> {
224 pub fn new(inner: F) -> Self {
227 Self {
228 inner,
229 blocks: Arc::new(Mutex::new(HashMap::new())),
230 block_size: 8 * 1024 * 1024,
231 }
232 }
233
234 pub fn inner(&self) -> &F {
236 &self.inner
237 }
238
239 pub fn with_block_size(mut self, block_size: u64) -> Self {
241 self.block_size = block_size;
242 self
243 }
244
245 fn block_start(&self, offset: u64) -> u64 {
247 offset / self.block_size * self.block_size
248 }
249}
250
251#[async_trait]
252impl<F: AsyncFileReader + Send + Sync> AsyncFileReader for BlockCache<F> {
253 async fn get_bytes(&self, range: Range<u64>) -> Result<Bytes> {
254 let len = (range.end - range.start) as usize;
255 if len == 0 {
256 return Ok(Bytes::new());
257 }
258
259 let first_block = self.block_start(range.start);
261 let last_block = self.block_start(range.end.saturating_sub(1));
262
263 if first_block == last_block {
265 let block = self.ensure_block(first_block).await?;
266 let local_start = (range.start - first_block) as usize;
267 let local_end = local_start + len;
268 let actual_end = local_end.min(block.len());
270 if local_start >= block.len() {
271 return Ok(Bytes::new());
272 }
273 return Ok(block.slice(local_start..actual_end));
274 }
275
276 let mut out = BytesMut::with_capacity(len);
278 let mut offset = range.start;
279 let mut block_offset = first_block;
280
281 while offset < range.end {
282 let block = self.ensure_block(block_offset).await?;
283 let local_start = (offset - block_offset) as usize;
284 let bytes_from_block =
285 ((block_offset + self.block_size) - offset).min(range.end - offset) as usize;
286 let actual_end = (local_start + bytes_from_block).min(block.len());
287 if local_start >= block.len() {
288 break; }
290 out.extend_from_slice(&block[local_start..actual_end]);
291 if actual_end < local_start + bytes_from_block {
292 break; }
294 offset += bytes_from_block as u64;
295 block_offset += self.block_size;
296 }
297
298 Ok(out.into())
299 }
300
301 async fn get_byte_ranges(&self, ranges: Vec<Range<u64>>) -> Result<Vec<Bytes>> {
302 let mut result = Vec::with_capacity(ranges.len());
303 for range in ranges {
304 result.push(self.get_bytes(range).await?);
305 }
306 Ok(result)
307 }
308}
309
310impl<F: AsyncFileReader> BlockCache<F> {
311 pub async fn pre_warm(&self, file_size: u64, max_bytes: u64) -> Result<()> {
322 let warm_end = file_size.min(max_bytes);
323 let num_blocks = warm_end.div_ceil(self.block_size);
324
325 let mut ranges: Vec<Range<u64>> = Vec::new();
327 let mut block_starts: Vec<u64> = Vec::new();
328 {
329 let cache = self.blocks.lock().await;
330 for i in 0..num_blocks {
331 let start = i * self.block_size;
332 if !cache.contains_key(&start) {
333 let end = (start + self.block_size).min(file_size);
334 ranges.push(start..end);
335 block_starts.push(start);
336 }
337 }
338 }
339
340 if ranges.is_empty() {
341 return Ok(());
342 }
343
344 let fetched = self.inner.get_byte_ranges(ranges).await?;
346
347 let mut cache = self.blocks.lock().await;
348 for (start, data) in block_starts.into_iter().zip(fetched) {
349 cache.insert(start, data);
350 }
351
352 Ok(())
353 }
354
355 async fn ensure_block(&self, block_start: u64) -> Result<Bytes> {
357 {
358 let cache = self.blocks.lock().await;
359 if let Some(block) = cache.get(&block_start) {
360 return Ok(block.clone());
361 }
362 }
363
364 let fetch_range = block_start..block_start + self.block_size;
366 let data = self.inner.get_bytes(fetch_range).await?;
367
368 let mut cache = self.blocks.lock().await;
369 cache.insert(block_start, data.clone());
370 Ok(data)
371 }
372}