Skip to main content

hdf5_reader/
storage.rs

1use std::fs::File;
2use std::num::NonZeroUsize;
3use std::ops::Deref;
4use std::path::Path;
5use std::sync::Arc;
6
7use lru::LruCache;
8use memmap2::Mmap;
9use parking_lot::Mutex;
10
11use crate::error::{Error, Result};
12
13#[derive(Clone)]
14enum StorageBacking {
15    Bytes(Arc<[u8]>),
16    Mmap(Arc<Mmap>),
17}
18
19/// An immutable byte range returned by a storage backend.
20#[derive(Clone)]
21pub struct StorageBuffer {
22    backing: StorageBacking,
23    start: usize,
24    len: usize,
25}
26
27impl StorageBuffer {
28    pub fn from_vec(bytes: Vec<u8>) -> Self {
29        let len = bytes.len();
30        Self {
31            backing: StorageBacking::Bytes(Arc::<[u8]>::from(bytes)),
32            start: 0,
33            len,
34        }
35    }
36
37    pub(crate) fn from_arc_bytes(bytes: Arc<[u8]>, start: usize, len: usize) -> Self {
38        Self {
39            backing: StorageBacking::Bytes(bytes),
40            start,
41            len,
42        }
43    }
44
45    pub(crate) fn from_arc_mmap(mmap: Arc<Mmap>, start: usize, len: usize) -> Self {
46        Self {
47            backing: StorageBacking::Mmap(mmap),
48            start,
49            len,
50        }
51    }
52
53    pub fn len(&self) -> usize {
54        self.len
55    }
56
57    pub fn is_empty(&self) -> bool {
58        self.len == 0
59    }
60}
61
62impl AsRef<[u8]> for StorageBuffer {
63    fn as_ref(&self) -> &[u8] {
64        self
65    }
66}
67
68impl Deref for StorageBuffer {
69    type Target = [u8];
70
71    fn deref(&self) -> &Self::Target {
72        match &self.backing {
73            StorageBacking::Bytes(bytes) => &bytes[self.start..self.start + self.len],
74            StorageBacking::Mmap(mmap) => &mmap[self.start..self.start + self.len],
75        }
76    }
77}
78
79/// Random-access, immutable byte storage for HDF5 parsing and reads.
80pub trait Storage: Send + Sync {
81    /// Total length in bytes.
82    fn len(&self) -> u64;
83
84    /// Returns `true` if the storage is empty.
85    fn is_empty(&self) -> bool {
86        self.len() == 0
87    }
88
89    /// Read a byte range from `offset..offset+len`.
90    fn read_range(&self, offset: u64, len: usize) -> Result<StorageBuffer>;
91}
92
93pub type DynStorage = Arc<dyn Storage>;
94
95fn check_storage_range(total_len: u64, offset: u64, len: usize) -> Result<u64> {
96    let needed = u64::try_from(len).map_err(|_| Error::OffsetOutOfBounds(offset))?;
97    let end = offset
98        .checked_add(needed)
99        .ok_or(Error::OffsetOutOfBounds(offset))?;
100    if end > total_len {
101        return Err(Error::UnexpectedEof {
102            offset,
103            needed,
104            available: total_len.saturating_sub(offset),
105        });
106    }
107    Ok(end)
108}
109
110/// In-memory storage backed by owned bytes.
111pub struct BytesStorage {
112    data: Arc<[u8]>,
113}
114
115impl BytesStorage {
116    pub fn new(data: Vec<u8>) -> Self {
117        Self {
118            data: Arc::<[u8]>::from(data),
119        }
120    }
121}
122
123impl Storage for BytesStorage {
124    fn len(&self) -> u64 {
125        self.data.len() as u64
126    }
127
128    fn read_range(&self, offset: u64, len: usize) -> Result<StorageBuffer> {
129        let start = usize::try_from(offset).map_err(|_| Error::OffsetOutOfBounds(offset))?;
130        let end = start
131            .checked_add(len)
132            .ok_or(Error::OffsetOutOfBounds(offset))?;
133        if end > self.data.len() {
134            return Err(Error::UnexpectedEof {
135                offset,
136                needed: len as u64,
137                available: self.len().saturating_sub(offset),
138            });
139        }
140        Ok(StorageBuffer::from_arc_bytes(self.data.clone(), start, len))
141    }
142}
143
144/// In-memory storage backed by a read-only memory map.
145pub struct MmapStorage {
146    mmap: Arc<Mmap>,
147}
148
149impl MmapStorage {
150    pub fn new(mmap: Mmap) -> Self {
151        Self {
152            mmap: Arc::new(mmap),
153        }
154    }
155}
156
157impl Storage for MmapStorage {
158    fn len(&self) -> u64 {
159        self.mmap.len() as u64
160    }
161
162    fn read_range(&self, offset: u64, len: usize) -> Result<StorageBuffer> {
163        let start = usize::try_from(offset).map_err(|_| Error::OffsetOutOfBounds(offset))?;
164        let end = start
165            .checked_add(len)
166            .ok_or(Error::OffsetOutOfBounds(offset))?;
167        if end > self.mmap.len() {
168            return Err(Error::UnexpectedEof {
169                offset,
170                needed: len as u64,
171                available: self.len().saturating_sub(offset),
172            });
173        }
174        Ok(StorageBuffer::from_arc_mmap(self.mmap.clone(), start, len))
175    }
176}
177
178/// File-backed storage that serves explicit byte ranges via positional reads.
179pub struct FileStorage {
180    file: Arc<File>,
181    len: u64,
182}
183
184impl FileStorage {
185    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
186        let file = File::open(path)?;
187        Self::from_file(file)
188    }
189
190    pub fn from_file(file: File) -> Result<Self> {
191        let len = file.metadata()?.len();
192        Ok(Self {
193            file: Arc::new(file),
194            len,
195        })
196    }
197}
198
199impl Storage for FileStorage {
200    fn len(&self) -> u64 {
201        self.len
202    }
203
204    fn read_range(&self, offset: u64, len: usize) -> Result<StorageBuffer> {
205        let needed = u64::try_from(len).map_err(|_| Error::OffsetOutOfBounds(offset))?;
206        let end = offset
207            .checked_add(needed)
208            .ok_or(Error::OffsetOutOfBounds(offset))?;
209        if end > self.len {
210            return Err(Error::UnexpectedEof {
211                offset,
212                needed,
213                available: self.len.saturating_sub(offset),
214            });
215        }
216
217        let mut buf = vec![0u8; len];
218        read_exact_at(self.file.as_ref(), &mut buf, offset)?;
219        Ok(StorageBuffer::from_vec(buf))
220    }
221}
222
223/// Storage backed by a caller-provided byte-range reader.
224///
225/// This is intended for HTTP range requests, S3/object-store clients, or other
226/// remote sources that can return exactly the requested byte range.
227pub struct RangeRequestStorage {
228    len: u64,
229    reader: Arc<RangeReader>,
230}
231
232type RangeReader = dyn Fn(u64, usize) -> Result<Vec<u8>> + Send + Sync;
233
234impl RangeRequestStorage {
235    /// Create a storage backend from a total length and a range reader.
236    pub fn new<F>(len: u64, reader: F) -> Self
237    where
238        F: Fn(u64, usize) -> Result<Vec<u8>> + Send + Sync + 'static,
239    {
240        Self {
241            len,
242            reader: Arc::new(reader),
243        }
244    }
245}
246
247impl Storage for RangeRequestStorage {
248    fn len(&self) -> u64 {
249        self.len
250    }
251
252    fn read_range(&self, offset: u64, len: usize) -> Result<StorageBuffer> {
253        check_storage_range(self.len, offset, len)?;
254        let bytes = (self.reader)(offset, len)?;
255        if bytes.len() != len {
256            return Err(Error::UnexpectedEof {
257                offset,
258                needed: len as u64,
259                available: bytes.len() as u64,
260            });
261        }
262        Ok(StorageBuffer::from_vec(bytes))
263    }
264}
265
/// Point-in-time statistics for [`BlockCacheStorage`].
///
/// Produced by `BlockCacheStorage::stats`. The four counters only ever increase
/// over the lifetime of the cache; the remaining fields describe its current
/// contents and configuration.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct BlockCacheStats {
    /// Block lookups served from the cache.
    pub hits: u64,
    /// Block lookups not found in the cache; each one triggers a read from the
    /// inner storage.
    pub misses: u64,
    /// Blocks inserted after being read. This includes replacing a block that
    /// another reader cached while this one was fetching it.
    pub inserts: u64,
    /// Least-recently-used blocks dropped to make room for new ones.
    pub evictions: u64,
    /// Total length in bytes of all blocks currently cached.
    pub current_bytes: usize,
    /// Number of blocks currently cached.
    pub entries: usize,
    /// Configured block size in bytes.
    pub block_size: usize,
    /// Configured maximum number of cached blocks.
    pub max_blocks: usize,
}
278
279/// Block-aligned LRU cache for any random-access storage backend.
280///
281/// This is useful when the underlying storage is remote or has high per-request
282/// latency. Reads are rounded out to fixed-size blocks and cached by block
283/// number; callers still see the normal [`Storage::read_range`] interface.
284pub struct BlockCacheStorage {
285    inner: DynStorage,
286    block_size: NonZeroUsize,
287    max_blocks: NonZeroUsize,
288    state: Mutex<BlockCacheState>,
289}
290
291struct BlockCacheState {
292    cache: LruCache<u64, Arc<[u8]>>,
293    current_bytes: usize,
294    hits: u64,
295    misses: u64,
296    inserts: u64,
297    evictions: u64,
298}
299
300impl BlockCacheStorage {
301    /// Create a block cache around an existing storage backend.
302    ///
303    /// `block_size` and `max_blocks` values of zero are normalized to 1.
304    pub fn new(inner: DynStorage, block_size: usize, max_blocks: usize) -> Self {
305        let block_size = NonZeroUsize::new(block_size).unwrap_or(NonZeroUsize::new(1).unwrap());
306        let max_blocks = NonZeroUsize::new(max_blocks).unwrap_or(NonZeroUsize::new(1).unwrap());
307        Self {
308            inner,
309            block_size,
310            max_blocks,
311            state: Mutex::new(BlockCacheState {
312                cache: LruCache::new(max_blocks),
313                current_bytes: 0,
314                hits: 0,
315                misses: 0,
316                inserts: 0,
317                evictions: 0,
318            }),
319        }
320    }
321
322    /// Create a block cache with conservative defaults: 1 MiB blocks, 128 slots.
323    pub fn with_defaults(inner: DynStorage) -> Self {
324        Self::new(inner, 1024 * 1024, 128)
325    }
326
327    /// Return current cache statistics.
328    pub fn stats(&self) -> BlockCacheStats {
329        let state = self.state.lock();
330        BlockCacheStats {
331            hits: state.hits,
332            misses: state.misses,
333            inserts: state.inserts,
334            evictions: state.evictions,
335            current_bytes: state.current_bytes,
336            entries: state.cache.len(),
337            block_size: self.block_size.get(),
338            max_blocks: self.max_blocks.get(),
339        }
340    }
341
342    fn read_block(&self, block_index: u64) -> Result<Arc<[u8]>> {
343        {
344            let mut state = self.state.lock();
345            if let Some(block) = state.cache.get(&block_index).cloned() {
346                state.hits += 1;
347                return Ok(block);
348            }
349            state.misses += 1;
350        }
351
352        let block_size = self.block_size.get();
353        let block_start = block_index
354            .checked_mul(block_size as u64)
355            .ok_or(Error::OffsetOutOfBounds(u64::MAX))?;
356        let remaining = self.inner.len().saturating_sub(block_start);
357        let read_len = block_size.min(usize::try_from(remaining).unwrap_or(usize::MAX));
358        let bytes = self.inner.read_range(block_start, read_len)?;
359        let block = Arc::<[u8]>::from(bytes.as_ref());
360
361        let mut state = self.state.lock();
362        if let Some(replaced) = state.cache.peek(&block_index) {
363            state.current_bytes = state.current_bytes.saturating_sub(replaced.len());
364        } else {
365            while state.cache.len() >= self.max_blocks.get() && !state.cache.is_empty() {
366                if let Some((_, evicted)) = state.cache.pop_lru() {
367                    state.current_bytes = state.current_bytes.saturating_sub(evicted.len());
368                    state.evictions += 1;
369                }
370            }
371        }
372
373        state.current_bytes += block.len();
374        state.inserts += 1;
375        state.cache.put(block_index, block.clone());
376        Ok(block)
377    }
378}
379
380impl Storage for BlockCacheStorage {
381    fn len(&self) -> u64 {
382        self.inner.len()
383    }
384
385    fn read_range(&self, offset: u64, len: usize) -> Result<StorageBuffer> {
386        let end = check_storage_range(self.len(), offset, len)?;
387        if len == 0 {
388            return Ok(StorageBuffer::from_vec(Vec::new()));
389        }
390
391        let block_size = self.block_size.get() as u64;
392        let first_block = offset / block_size;
393        let last_block = (end - 1) / block_size;
394
395        if first_block == last_block {
396            let block = self.read_block(first_block)?;
397            let block_start = first_block
398                .checked_mul(block_size)
399                .ok_or(Error::OffsetOutOfBounds(offset))?;
400            let start = usize::try_from(offset - block_start)
401                .map_err(|_| Error::OffsetOutOfBounds(offset))?;
402            return Ok(StorageBuffer::from_arc_bytes(block, start, len));
403        }
404
405        let mut output = vec![0u8; len];
406        let mut written = 0usize;
407        for block_index in first_block..=last_block {
408            let block = self.read_block(block_index)?;
409            let block_start = block_index
410                .checked_mul(block_size)
411                .ok_or(Error::OffsetOutOfBounds(offset))?;
412            let copy_start = offset.max(block_start);
413            let copy_end = end.min(block_start + block.len() as u64);
414            let src_start = usize::try_from(copy_start - block_start)
415                .map_err(|_| Error::OffsetOutOfBounds(offset))?;
416            let copy_len = usize::try_from(copy_end - copy_start)
417                .map_err(|_| Error::OffsetOutOfBounds(offset))?;
418            output[written..written + copy_len]
419                .copy_from_slice(&block[src_start..src_start + copy_len]);
420            written += copy_len;
421        }
422
423        Ok(StorageBuffer::from_vec(output))
424    }
425}
426
/// Fill `buf` with the bytes of `file` starting at `offset`, without moving
/// the file cursor.
///
/// Delegates to [`std::os::unix::fs::FileExt::read_exact_at`]. Unlike a bare
/// `read_at` loop, it retries reads interrupted by a signal
/// (`ErrorKind::Interrupted` / EINTR) instead of failing the whole read. It
/// returns `ErrorKind::UnexpectedEof` if the file ends before `buf` is full.
#[cfg(unix)]
fn read_exact_at(file: &File, buf: &mut [u8], offset: u64) -> std::io::Result<()> {
    use std::os::unix::fs::FileExt;

    FileExt::read_exact_at(file, buf, offset)
}
444
/// Fill `buf` with the bytes of `file` starting at `offset`.
///
/// Windows' `FileExt` has no `read_exact_at`, so this loops over `seek_read`.
/// Reads interrupted with `ErrorKind::Interrupted` are retried rather than
/// surfaced, matching `std::io::Read::read_exact`. Reaching end of file before
/// `buf` is full yields `ErrorKind::UnexpectedEof`. Unlike Unix `read_at`,
/// `seek_read` also moves the file cursor.
#[cfg(windows)]
fn read_exact_at(file: &File, mut buf: &mut [u8], mut offset: u64) -> std::io::Result<()> {
    use std::os::windows::fs::FileExt;

    while !buf.is_empty() {
        match file.seek_read(buf, offset) {
            Ok(0) => {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::UnexpectedEof,
                    "failed to fill whole buffer",
                ));
            }
            Ok(n) => {
                offset += n as u64;
                let rest = buf;
                buf = &mut rest[n..];
            }
            Err(err) if err.kind() == std::io::ErrorKind::Interrupted => {}
            Err(err) => return Err(err),
        }
    }
    Ok(())
}
462
/// Stand-in used on every target that is neither Unix nor Windows, where this
/// module has no positional-read implementation.
///
/// Always fails with `ErrorKind::Unsupported`, so `FileStorage` reads return
/// an error on such targets instead of the crate failing to build. The other
/// backends (`BytesStorage`, `MmapStorage`, `RangeRequestStorage`) never call
/// this function and are unaffected.
#[cfg(not(any(unix, windows)))]
fn read_exact_at(_file: &File, _buf: &mut [u8], _offset: u64) -> std::io::Result<()> {
    let kind = std::io::ErrorKind::Unsupported;
    Err(std::io::Error::new(
        kind,
        "FileStorage is unavailable on this target; use BytesStorage or Hdf5File::from_bytes",
    ))
}
475
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::Mutex as StdMutex;

    #[test]
    fn range_request_storage_reads_exact_ranges() {
        let data: Arc<[u8]> = Arc::from((0u8..32).collect::<Vec<_>>());
        let storage = RangeRequestStorage::new(data.len() as u64, {
            let data = data.clone();
            move |offset, len| {
                let start = offset as usize;
                Ok(data[start..start + len].to_vec())
            }
        });

        let bytes = storage.read_range(4, 6).unwrap();
        assert_eq!(bytes.as_ref(), &[4, 5, 6, 7, 8, 9]);
    }

    #[test]
    fn range_request_storage_rejects_short_reads() {
        let storage = RangeRequestStorage::new(16, |_offset, _len| Ok(vec![1, 2]));
        let err = match storage.read_range(0, 4) {
            Ok(_) => panic!("short range read should fail"),
            Err(err) => err,
        };
        assert!(matches!(err, Error::UnexpectedEof { .. }));
    }

    /// Out-of-range requests must be rejected locally, without a remote call.
    #[test]
    fn range_request_storage_checks_bounds_before_calling_reader() {
        let calls = Arc::new(StdMutex::new(0usize));
        let storage = RangeRequestStorage::new(8, {
            let calls = calls.clone();
            move |_offset, len| {
                *calls.lock().unwrap() += 1;
                Ok(vec![0; len])
            }
        });

        assert!(matches!(
            storage.read_range(6, 4),
            Err(Error::UnexpectedEof {
                offset: 6,
                needed: 4,
                available: 2
            })
        ));
        assert_eq!(*calls.lock().unwrap(), 0);
    }

    #[test]
    fn block_cache_storage_reuses_aligned_blocks() {
        let data: Arc<[u8]> = Arc::from((0u8..64).collect::<Vec<_>>());
        let reads = Arc::new(StdMutex::new(Vec::new()));
        let inner = Arc::new(RangeRequestStorage::new(data.len() as u64, {
            let data = data.clone();
            let reads = reads.clone();
            move |offset, len| {
                reads.lock().unwrap().push((offset, len));
                let start = offset as usize;
                Ok(data[start..start + len].to_vec())
            }
        }));
        let storage = BlockCacheStorage::new(inner, 8, 2);

        assert_eq!(storage.read_range(2, 4).unwrap().as_ref(), &[2, 3, 4, 5]);
        assert_eq!(storage.read_range(4, 2).unwrap().as_ref(), &[4, 5]);
        assert_eq!(
            storage.read_range(6, 6).unwrap().as_ref(),
            &[6, 7, 8, 9, 10, 11]
        );
        assert_eq!(storage.read_range(18, 2).unwrap().as_ref(), &[18, 19]);

        assert_eq!(*reads.lock().unwrap(), vec![(0, 8), (8, 8), (16, 8)]);
        let stats = storage.stats();
        assert_eq!(stats.hits, 2);
        assert_eq!(stats.misses, 3);
        assert_eq!(stats.inserts, 3);
        assert_eq!(stats.evictions, 1);
        assert_eq!(stats.entries, 2);
    }

    /// The last block of a storage whose length is not a multiple of the block
    /// size is shorter than `block_size`, and reads must still line up with it.
    #[test]
    fn block_cache_storage_handles_partial_tail_block() {
        let inner = Arc::new(BytesStorage::new((0u8..20).collect()));
        let storage = BlockCacheStorage::new(inner, 8, 4);

        assert_eq!(
            storage.read_range(14, 6).unwrap().as_ref(),
            &[14, 15, 16, 17, 18, 19]
        );
        assert_eq!(storage.read_range(16, 4).unwrap().as_ref(), &[16, 17, 18, 19]);
        assert!(storage.read_range(20, 0).unwrap().is_empty());
        assert!(matches!(
            storage.read_range(18, 4),
            Err(Error::UnexpectedEof {
                offset: 18,
                needed: 4,
                available: 2
            })
        ));

        let stats = storage.stats();
        // Blocks 1 (8 bytes) and 2 (the 4-byte tail) are cached.
        assert_eq!(stats.entries, 2);
        assert_eq!(stats.current_bytes, 12);
    }

    #[cfg(any(unix, windows))]
    #[test]
    fn file_storage_reads_exact_ranges() {
        use std::io::Write;

        let path = std::env::temp_dir().join(format!(
            "hdf5_reader_file_storage_test_{}",
            std::process::id()
        ));
        File::create(&path)
            .and_then(|mut file| file.write_all(&(0u8..32).collect::<Vec<_>>()))
            .unwrap();

        let storage = FileStorage::open(&path).unwrap();
        assert_eq!(storage.len(), 32);
        assert_eq!(
            storage.read_range(10, 4).unwrap().as_ref(),
            &[10, 11, 12, 13]
        );
        assert!(matches!(
            storage.read_range(30, 4),
            Err(Error::UnexpectedEof {
                offset: 30,
                needed: 4,
                available: 2
            })
        ));

        // Close the file before deleting it (required on Windows).
        drop(storage);
        std::fs::remove_file(&path).unwrap();
    }
}