// async_hdf5/reader.rs — async byte-range readers (`object_store`, `reqwest`,
// `tokio::fs`) and an aligned-block cache for HDF5 metadata access.

1use std::collections::HashMap;
2use std::fmt::Debug;
3use std::ops::Range;
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use bytes::{Bytes, BytesMut};
8use tokio::sync::Mutex;
9
10use crate::error::{HDF5Error, Result};
11
12/// Async interface for reading byte ranges from an HDF5 file.
13///
14/// Modeled after async-tiff's `AsyncFileReader` trait. Implementations exist
15/// for `object_store::ObjectStore`, `reqwest`, and `tokio::fs::File`.
16#[async_trait]
17pub trait AsyncFileReader: Debug + Send + Sync + 'static {
18    /// Fetch the bytes in the given range.
19    async fn get_bytes(&self, range: Range<u64>) -> Result<Bytes>;
20
21    /// Fetch multiple byte ranges. The default implementation calls `get_bytes`
22    /// sequentially; `ObjectReader` overrides this with `get_ranges()`.
23    async fn get_byte_ranges(&self, ranges: Vec<Range<u64>>) -> Result<Vec<Bytes>> {
24        let mut result = Vec::with_capacity(ranges.len());
25        for range in ranges {
26            let data = self.get_bytes(range).await?;
27            result.push(data);
28        }
29        Ok(result)
30    }
31
32    /// Return the total file size, if known.  Used by `BlockCache::pre_warm`
33    /// to fetch all blocks in parallel.  The default returns `None`.
34    async fn file_size(&self) -> Result<Option<u64>> {
35        Ok(None)
36    }
37}
38
39#[async_trait]
40impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
41    async fn get_bytes(&self, range: Range<u64>) -> Result<Bytes> {
42        self.as_ref().get_bytes(range).await
43    }
44
45    async fn get_byte_ranges(&self, ranges: Vec<Range<u64>>) -> Result<Vec<Bytes>> {
46        self.as_ref().get_byte_ranges(ranges).await
47    }
48
49    async fn file_size(&self) -> Result<Option<u64>> {
50        self.as_ref().file_size().await
51    }
52}
53
54#[async_trait]
55impl AsyncFileReader for Arc<dyn AsyncFileReader + '_> {
56    async fn get_bytes(&self, range: Range<u64>) -> Result<Bytes> {
57        self.as_ref().get_bytes(range).await
58    }
59
60    async fn get_byte_ranges(&self, ranges: Vec<Range<u64>>) -> Result<Vec<Bytes>> {
61        self.as_ref().get_byte_ranges(ranges).await
62    }
63
64    async fn file_size(&self) -> Result<Option<u64>> {
65        self.as_ref().file_size().await
66    }
67}
68
69// ── ObjectReader ────────────────────────────────────────────────────────────
70
/// An AsyncFileReader that reads from an [`ObjectStore`](object_store::ObjectStore).
#[cfg(feature = "object_store")]
#[derive(Clone, Debug)]
pub struct ObjectReader {
    // Shared handle to the backing store; `Arc` keeps `Clone` cheap.
    store: Arc<dyn object_store::ObjectStore>,
    // Location of the file within the store.
    path: object_store::path::Path,
}
78
#[cfg(feature = "object_store")]
impl ObjectReader {
    /// Construct a reader for `path` inside the given object store.
    pub fn new(store: Arc<dyn object_store::ObjectStore>, path: object_store::path::Path) -> Self {
        ObjectReader { store, path }
    }
}
86
#[cfg(feature = "object_store")]
#[async_trait]
impl AsyncFileReader for ObjectReader {
    /// Fetch one byte range via `ObjectStore::get_range`.
    ///
    /// `get_range`/`get_ranges` are provided methods on the `ObjectStore`
    /// trait itself, so no extension-trait import is required —
    /// `get_byte_ranges` below already compiled without one, and the stray
    /// `use object_store::ObjectStoreExt` imports have been removed for
    /// consistency.
    async fn get_bytes(&self, range: Range<u64>) -> Result<Bytes> {
        self.store
            .get_range(&self.path, range)
            .await
            .map_err(HDF5Error::from)
    }

    /// Batched fetch delegated to `ObjectStore::get_ranges`, which can
    /// coalesce adjacent ranges and issue requests in parallel.
    async fn get_byte_ranges(&self, ranges: Vec<Range<u64>>) -> Result<Vec<Bytes>> {
        self.store
            .get_ranges(&self.path, &ranges)
            .await
            .map_err(HDF5Error::from)
    }

    /// Object size taken from the store's `head` metadata.
    async fn file_size(&self) -> Result<Option<u64>> {
        let meta: object_store::ObjectMeta =
            self.store.head(&self.path).await.map_err(HDF5Error::from)?;
        Ok(Some(meta.size))
    }
}
114
115// ── TokioReader ─────────────────────────────────────────────────────────────
116
/// An AsyncFileReader that wraps a `tokio::fs::File` or similar async reader.
///
/// The inner reader sits behind an async `Mutex` because each `get_bytes`
/// must seek and then read as one uninterrupted sequence; concurrent calls
/// are therefore serialized.
#[cfg(feature = "tokio")]
#[derive(Debug)]
pub struct TokioReader<T: tokio::io::AsyncRead + tokio::io::AsyncSeek + Unpin + Send + Debug>(
    tokio::sync::Mutex<T>,
);
123
#[cfg(feature = "tokio")]
impl<T: tokio::io::AsyncRead + tokio::io::AsyncSeek + Unpin + Send + Debug> TokioReader<T> {
    /// Wrap an async reader, taking ownership and guarding it with a mutex.
    pub fn new(inner: T) -> Self {
        let guarded = tokio::sync::Mutex::new(inner);
        TokioReader(guarded)
    }
}
131
#[cfg(feature = "tokio")]
#[async_trait]
impl<T: tokio::io::AsyncRead + tokio::io::AsyncSeek + Unpin + Send + Debug + 'static>
    AsyncFileReader for TokioReader<T>
{
    /// Seek to `range.start` and read up to `range.end - range.start` bytes.
    ///
    /// Uses a manual `read` loop rather than `read_exact` so that a range
    /// extending past the end of the file yields a short (possibly empty)
    /// buffer instead of an error — the `BlockCache` may request ranges
    /// past EOF.
    async fn get_bytes(&self, range: Range<u64>) -> Result<Bytes> {
        use std::io::SeekFrom;

        use tokio::io::{AsyncReadExt, AsyncSeekExt};

        // Hold the lock for the whole seek+read sequence so concurrent
        // callers cannot move the cursor between the two steps.
        let mut guard = self.0.lock().await;
        guard.seek(SeekFrom::Start(range.start)).await?;

        let wanted = (range.end - range.start) as usize;
        let mut buf = vec![0u8; wanted];
        let mut filled = 0;
        while filled < wanted {
            match guard.read(&mut buf[filled..]).await? {
                0 => break, // EOF — return what we have
                n => filled += n,
            }
        }
        buf.truncate(filled);
        Ok(Bytes::from(buf))
    }
}
162
163// ── ReqwestReader ───────────────────────────────────────────────────────────
164
/// An AsyncFileReader that reads from a URL using reqwest HTTP range requests.
#[cfg(feature = "reqwest")]
#[derive(Debug, Clone)]
pub struct ReqwestReader {
    // HTTP client reused across requests (connection pooling).
    client: reqwest::Client,
    // Target URL; each `get_bytes` issues a GET with a `Range` header.
    url: reqwest::Url,
}
172
#[cfg(feature = "reqwest")]
impl ReqwestReader {
    /// Construct a reader that issues range requests for `url` with `client`.
    pub fn new(client: reqwest::Client, url: reqwest::Url) -> Self {
        ReqwestReader { client, url }
    }
}
180
#[cfg(feature = "reqwest")]
#[async_trait]
impl AsyncFileReader for ReqwestReader {
    /// Fetch a byte range with an HTTP `Range` request.
    ///
    /// # Errors
    /// Returns an error when the request fails or the server responds with
    /// a non-success status.
    async fn get_bytes(&self, range: Range<u64>) -> Result<Bytes> {
        // Guard the empty range: `range.end - 1` below would underflow when
        // `range.end == 0` (panic in debug builds), and a `bytes=a-b` header
        // with b < a is invalid anyway.
        if range.start >= range.end {
            return Ok(Bytes::new());
        }
        // HTTP Range headers are inclusive on both ends (RFC 7233).
        let range_header = format!("bytes={}-{}", range.start, range.end - 1);
        let response = self
            .client
            .get(self.url.clone())
            .header("Range", range_header)
            .send()
            .await?
            .error_for_status()?;
        // NOTE(review): a server that ignores `Range` replies 200 with the
        // full body, so callers would receive more bytes than requested.
        // Consider requiring `StatusCode::PARTIAL_CONTENT` — confirm the
        // desired behavior for such servers.
        let bytes = response.bytes().await?;
        Ok(bytes)
    }
}
197
198// ── BlockCache ──────────────────────────────────────────────────────────────
199
/// A caching wrapper that fetches fixed-size aligned blocks around each
/// accessed offset.
///
/// Unlike a sequential readahead cache, a block cache handles the scattered
/// access patterns of HDF5 metadata efficiently: the superblock is at offset 0,
/// object headers and B-tree nodes are scattered across the file, and a
/// sequential cache would waste bandwidth fetching unneeded array data between
/// metadata structures.
///
/// When a byte range is requested, the cache fetches any aligned blocks that
/// overlap the range and caches them for future reads.  Nearby metadata reads
/// (e.g., an object header and its continuation chunk, or adjacent B-tree
/// nodes) naturally share blocks.
///
/// Default block size is 8 MiB, which typically resolves all metadata for a
/// GOES-16 MCMIPF file (~164 datasets) in about 10 requests.
#[derive(Debug)]
pub struct BlockCache<F: AsyncFileReader> {
    // The wrapped reader that actually performs fetches.
    inner: F,
    // Cached blocks keyed by their aligned start offset.  Blocks are never
    // evicted; the cache grows for the lifetime of the `BlockCache`.
    blocks: Arc<Mutex<HashMap<u64, Bytes>>>,
    // Size of each aligned block in bytes (default 8 MiB).
    block_size: u64,
}
222
223impl<F: AsyncFileReader> BlockCache<F> {
224    /// Create a new BlockCache wrapping the given reader.
225    /// Default block size: 8 MiB.
226    pub fn new(inner: F) -> Self {
227        Self {
228            inner,
229            blocks: Arc::new(Mutex::new(HashMap::new())),
230            block_size: 8 * 1024 * 1024,
231        }
232    }
233
234    /// Access the inner reader.
235    pub fn inner(&self) -> &F {
236        &self.inner
237    }
238
239    /// Set the block size in bytes.
240    pub fn with_block_size(mut self, block_size: u64) -> Self {
241        self.block_size = block_size;
242        self
243    }
244
245    /// Aligned block start for a given offset.
246    fn block_start(&self, offset: u64) -> u64 {
247        offset / self.block_size * self.block_size
248    }
249}
250
#[async_trait]
impl<F: AsyncFileReader + Send + Sync> AsyncFileReader for BlockCache<F> {
    /// Serve a byte range from cached aligned blocks, fetching any missing
    /// blocks via the inner reader.  Reads past EOF return a short (possibly
    /// empty) buffer rather than an error.
    async fn get_bytes(&self, range: Range<u64>) -> Result<Bytes> {
        let len = (range.end - range.start) as usize;
        if len == 0 {
            return Ok(Bytes::new());
        }

        // Determine which blocks we need.
        let first_block = self.block_start(range.start);
        // `saturating_sub` keeps `range.end == 0` from underflowing (already
        // excluded by the len check above, but harmless belt-and-braces).
        let last_block = self.block_start(range.end.saturating_sub(1));

        // Fast path: single block (most common case for metadata reads).
        if first_block == last_block {
            let block = self.ensure_block(first_block).await?;
            let local_start = (range.start - first_block) as usize;
            let local_end = local_start + len;
            // Handle reads near EOF where block may be shorter.
            let actual_end = local_end.min(block.len());
            if local_start >= block.len() {
                return Ok(Bytes::new());
            }
            // Zero-copy slice into the cached block.
            return Ok(block.slice(local_start..actual_end));
        }

        // Multi-block: assemble from consecutive blocks.
        let mut out = BytesMut::with_capacity(len);
        let mut offset = range.start;
        let mut block_offset = first_block;

        while offset < range.end {
            let block = self.ensure_block(block_offset).await?;
            let local_start = (offset - block_offset) as usize;
            // Bytes remaining in this block, capped by bytes remaining in
            // the requested range.
            let bytes_from_block =
                ((block_offset + self.block_size) - offset).min(range.end - offset) as usize;
            let actual_end = (local_start + bytes_from_block).min(block.len());
            if local_start >= block.len() {
                break; // EOF
            }
            out.extend_from_slice(&block[local_start..actual_end]);
            if actual_end < local_start + bytes_from_block {
                break; // EOF mid-block
            }
            offset += bytes_from_block as u64;
            block_offset += self.block_size;
        }

        Ok(out.into())
    }

    /// Each range is served through the caching `get_bytes` above, so
    /// batching here still benefits from shared blocks (unlike delegating
    /// to the inner reader's batched fetch, which would bypass the cache).
    async fn get_byte_ranges(&self, ranges: Vec<Range<u64>>) -> Result<Vec<Bytes>> {
        let mut result = Vec::with_capacity(ranges.len());
        for range in ranges {
            result.push(self.get_bytes(range).await?);
        }
        Ok(result)
    }
}
309
impl<F: AsyncFileReader> BlockCache<F> {
    /// Pre-fetch blocks in parallel to eliminate sequential cache misses
    /// during metadata traversal (B-tree parsing, object headers, etc.).
    ///
    /// - If `max_bytes` covers the entire file, every block is fetched.
    /// - Otherwise only the first `max_bytes / block_size` blocks are fetched
    ///   (HDF5 metadata — superblock, root group, B-tree nodes — is typically
    ///   concentrated near the beginning of the file).
    ///
    /// Issues a single batched `get_byte_ranges` call so that the backend
    /// can coalesce and parallelize the fetches.
    pub async fn pre_warm(&self, file_size: u64, max_bytes: u64) -> Result<()> {
        let warm_end = file_size.min(max_bytes);
        let num_blocks = warm_end.div_ceil(self.block_size);

        // Collect ranges for blocks not already cached.  The lock is held
        // only while scanning; it is released before the network fetch.
        let mut ranges: Vec<Range<u64>> = Vec::new();
        let mut block_starts: Vec<u64> = Vec::new();
        {
            let cache = self.blocks.lock().await;
            for i in 0..num_blocks {
                let start = i * self.block_size;
                if !cache.contains_key(&start) {
                    // Clamp to file_size so the final block is stored at its
                    // true (short) length, matching ensure_block's EOF shape.
                    let end = (start + self.block_size).min(file_size);
                    ranges.push(start..end);
                    block_starts.push(start);
                }
            }
        }

        if ranges.is_empty() {
            return Ok(());
        }

        // Single batched fetch — object_store will coalesce and parallelize.
        let fetched = self.inner.get_byte_ranges(ranges).await?;

        let mut cache = self.blocks.lock().await;
        for (start, data) in block_starts.into_iter().zip(fetched) {
            cache.insert(start, data);
        }

        Ok(())
    }

    /// Ensure a block is in the cache, fetching it if not.
    ///
    /// Returns a cheap `Bytes` clone of the cached block.  The lock is
    /// dropped between the lookup and the fetch, so two tasks missing the
    /// same block may both fetch it — a benign race: the second insert
    /// simply overwrites the first with identical data.
    async fn ensure_block(&self, block_start: u64) -> Result<Bytes> {
        {
            let cache = self.blocks.lock().await;
            if let Some(block) = cache.get(&block_start) {
                return Ok(block.clone());
            }
        }

        // Fetch the block.  The block may be shorter than block_size at EOF.
        let fetch_range = block_start..block_start + self.block_size;
        let data = self.inner.get_bytes(fetch_range).await?;

        let mut cache = self.blocks.lock().await;
        cache.insert(block_start, data.clone());
        Ok(data)
    }
}