//! `hdf5_reader/storage.rs`: random-access, immutable byte storage backends for HDF5 parsing and reads.

1use std::fs::File;
2use std::num::NonZeroUsize;
3use std::ops::Deref;
4use std::path::Path;
5use std::sync::Arc;
6
7use lru::LruCache;
8use memmap2::Mmap;
9use parking_lot::Mutex;
10
11use crate::error::{Error, Result};
12
13#[derive(Clone)]
14enum StorageBacking {
15    Bytes(Arc<[u8]>),
16    Mmap(Arc<Mmap>),
17}
18
19/// An immutable byte range returned by a storage backend.
20#[derive(Clone)]
21pub struct StorageBuffer {
22    backing: StorageBacking,
23    start: usize,
24    len: usize,
25}
26
27impl StorageBuffer {
28    pub fn from_vec(bytes: Vec<u8>) -> Self {
29        let len = bytes.len();
30        Self {
31            backing: StorageBacking::Bytes(Arc::<[u8]>::from(bytes)),
32            start: 0,
33            len,
34        }
35    }
36
37    pub(crate) fn from_arc_bytes(bytes: Arc<[u8]>, start: usize, len: usize) -> Self {
38        Self {
39            backing: StorageBacking::Bytes(bytes),
40            start,
41            len,
42        }
43    }
44
45    pub(crate) fn from_arc_mmap(mmap: Arc<Mmap>, start: usize, len: usize) -> Self {
46        Self {
47            backing: StorageBacking::Mmap(mmap),
48            start,
49            len,
50        }
51    }
52
53    pub fn len(&self) -> usize {
54        self.len
55    }
56
57    pub fn is_empty(&self) -> bool {
58        self.len == 0
59    }
60}
61
62impl AsRef<[u8]> for StorageBuffer {
63    fn as_ref(&self) -> &[u8] {
64        self
65    }
66}
67
68impl Deref for StorageBuffer {
69    type Target = [u8];
70
71    fn deref(&self) -> &Self::Target {
72        match &self.backing {
73            StorageBacking::Bytes(bytes) => &bytes[self.start..self.start + self.len],
74            StorageBacking::Mmap(mmap) => &mmap[self.start..self.start + self.len],
75        }
76    }
77}
78
79/// Random-access, immutable byte storage for HDF5 parsing and reads.
80pub trait Storage: Send + Sync {
81    /// Total length in bytes.
82    fn len(&self) -> u64;
83
84    /// Returns `true` if the storage is empty.
85    fn is_empty(&self) -> bool {
86        self.len() == 0
87    }
88
89    /// Read a byte range from `offset..offset+len`.
90    fn read_range(&self, offset: u64, len: usize) -> Result<StorageBuffer>;
91}
92
93pub type DynStorage = Arc<dyn Storage>;
94
95fn check_storage_range(total_len: u64, offset: u64, len: usize) -> Result<u64> {
96    let needed = u64::try_from(len).map_err(|_| Error::OffsetOutOfBounds(offset))?;
97    let end = offset
98        .checked_add(needed)
99        .ok_or(Error::OffsetOutOfBounds(offset))?;
100    if end > total_len {
101        return Err(Error::UnexpectedEof {
102            offset,
103            needed,
104            available: total_len.saturating_sub(offset),
105        });
106    }
107    Ok(end)
108}
109
/// Storage that serves ranges out of a shared, owned in-memory byte slice.
pub struct BytesStorage {
    data: Arc<[u8]>,
}

impl BytesStorage {
    /// Takes ownership of `data` and moves it into reference-counted storage.
    pub fn new(data: Vec<u8>) -> Self {
        let data: Arc<[u8]> = data.into();
        Self { data }
    }
}
122
123impl Storage for BytesStorage {
124    fn len(&self) -> u64 {
125        self.data.len() as u64
126    }
127
128    fn read_range(&self, offset: u64, len: usize) -> Result<StorageBuffer> {
129        let start = usize::try_from(offset).map_err(|_| Error::OffsetOutOfBounds(offset))?;
130        let end = start
131            .checked_add(len)
132            .ok_or(Error::OffsetOutOfBounds(offset))?;
133        if end > self.data.len() {
134            return Err(Error::UnexpectedEof {
135                offset,
136                needed: len as u64,
137                available: self.len().saturating_sub(offset),
138            });
139        }
140        Ok(StorageBuffer::from_arc_bytes(self.data.clone(), start, len))
141    }
142}
143
144/// In-memory storage backed by a read-only memory map.
145pub struct MmapStorage {
146    mmap: Arc<Mmap>,
147}
148
149impl MmapStorage {
150    pub fn new(mmap: Mmap) -> Self {
151        Self {
152            mmap: Arc::new(mmap),
153        }
154    }
155}
156
157impl Storage for MmapStorage {
158    fn len(&self) -> u64 {
159        self.mmap.len() as u64
160    }
161
162    fn read_range(&self, offset: u64, len: usize) -> Result<StorageBuffer> {
163        let start = usize::try_from(offset).map_err(|_| Error::OffsetOutOfBounds(offset))?;
164        let end = start
165            .checked_add(len)
166            .ok_or(Error::OffsetOutOfBounds(offset))?;
167        if end > self.mmap.len() {
168            return Err(Error::UnexpectedEof {
169                offset,
170                needed: len as u64,
171                available: self.len().saturating_sub(offset),
172            });
173        }
174        Ok(StorageBuffer::from_arc_mmap(self.mmap.clone(), start, len))
175    }
176}
177
178/// File-backed storage that serves explicit byte ranges via positional reads.
179pub struct FileStorage {
180    file: Arc<File>,
181    len: u64,
182}
183
184impl FileStorage {
185    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
186        let file = File::open(path)?;
187        let len = file.metadata()?.len();
188        Ok(Self {
189            file: Arc::new(file),
190            len,
191        })
192    }
193}
194
195impl Storage for FileStorage {
196    fn len(&self) -> u64 {
197        self.len
198    }
199
200    fn read_range(&self, offset: u64, len: usize) -> Result<StorageBuffer> {
201        let needed = u64::try_from(len).map_err(|_| Error::OffsetOutOfBounds(offset))?;
202        let end = offset
203            .checked_add(needed)
204            .ok_or(Error::OffsetOutOfBounds(offset))?;
205        if end > self.len {
206            return Err(Error::UnexpectedEof {
207                offset,
208                needed,
209                available: self.len.saturating_sub(offset),
210            });
211        }
212
213        let mut buf = vec![0u8; len];
214        read_exact_at(self.file.as_ref(), &mut buf, offset)?;
215        Ok(StorageBuffer::from_vec(buf))
216    }
217}
218
219/// Storage backed by a caller-provided byte-range reader.
220///
221/// This is intended for HTTP range requests, S3/object-store clients, or other
222/// remote sources that can return exactly the requested byte range.
223pub struct RangeRequestStorage {
224    len: u64,
225    reader: Arc<RangeReader>,
226}
227
228type RangeReader = dyn Fn(u64, usize) -> Result<Vec<u8>> + Send + Sync;
229
230impl RangeRequestStorage {
231    /// Create a storage backend from a total length and a range reader.
232    pub fn new<F>(len: u64, reader: F) -> Self
233    where
234        F: Fn(u64, usize) -> Result<Vec<u8>> + Send + Sync + 'static,
235    {
236        Self {
237            len,
238            reader: Arc::new(reader),
239        }
240    }
241}
242
243impl Storage for RangeRequestStorage {
244    fn len(&self) -> u64 {
245        self.len
246    }
247
248    fn read_range(&self, offset: u64, len: usize) -> Result<StorageBuffer> {
249        check_storage_range(self.len, offset, len)?;
250        let bytes = (self.reader)(offset, len)?;
251        if bytes.len() != len {
252            return Err(Error::UnexpectedEof {
253                offset,
254                needed: len as u64,
255                available: bytes.len() as u64,
256            });
257        }
258        Ok(StorageBuffer::from_vec(bytes))
259    }
260}
261
/// Snapshot of the counters and sizing of a [`BlockCacheStorage`] at one moment.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct BlockCacheStats {
    /// Block lookups answered from the cache.
    pub hits: u64,
    /// Block lookups that were not found in the cache.
    pub misses: u64,
    /// Blocks stored into the cache.
    pub inserts: u64,
    /// Blocks removed to make room for new ones.
    pub evictions: u64,
    /// Combined length in bytes of the blocks currently cached.
    pub current_bytes: usize,
    /// Number of blocks currently cached.
    pub entries: usize,
    /// Configured block size in bytes.
    pub block_size: usize,
    /// Configured maximum number of cached blocks.
    pub max_blocks: usize,
}
274
275/// Block-aligned LRU cache for any random-access storage backend.
276///
277/// This is useful when the underlying storage is remote or has high per-request
278/// latency. Reads are rounded out to fixed-size blocks and cached by block
279/// number; callers still see the normal [`Storage::read_range`] interface.
280pub struct BlockCacheStorage {
281    inner: DynStorage,
282    block_size: NonZeroUsize,
283    max_blocks: NonZeroUsize,
284    state: Mutex<BlockCacheState>,
285}
286
287struct BlockCacheState {
288    cache: LruCache<u64, Arc<[u8]>>,
289    current_bytes: usize,
290    hits: u64,
291    misses: u64,
292    inserts: u64,
293    evictions: u64,
294}
295
296impl BlockCacheStorage {
297    /// Create a block cache around an existing storage backend.
298    ///
299    /// `block_size` and `max_blocks` values of zero are normalized to 1.
300    pub fn new(inner: DynStorage, block_size: usize, max_blocks: usize) -> Self {
301        let block_size = NonZeroUsize::new(block_size).unwrap_or(NonZeroUsize::new(1).unwrap());
302        let max_blocks = NonZeroUsize::new(max_blocks).unwrap_or(NonZeroUsize::new(1).unwrap());
303        Self {
304            inner,
305            block_size,
306            max_blocks,
307            state: Mutex::new(BlockCacheState {
308                cache: LruCache::new(max_blocks),
309                current_bytes: 0,
310                hits: 0,
311                misses: 0,
312                inserts: 0,
313                evictions: 0,
314            }),
315        }
316    }
317
318    /// Create a block cache with conservative defaults: 1 MiB blocks, 128 slots.
319    pub fn with_defaults(inner: DynStorage) -> Self {
320        Self::new(inner, 1024 * 1024, 128)
321    }
322
323    /// Return current cache statistics.
324    pub fn stats(&self) -> BlockCacheStats {
325        let state = self.state.lock();
326        BlockCacheStats {
327            hits: state.hits,
328            misses: state.misses,
329            inserts: state.inserts,
330            evictions: state.evictions,
331            current_bytes: state.current_bytes,
332            entries: state.cache.len(),
333            block_size: self.block_size.get(),
334            max_blocks: self.max_blocks.get(),
335        }
336    }
337
338    fn read_block(&self, block_index: u64) -> Result<Arc<[u8]>> {
339        {
340            let mut state = self.state.lock();
341            if let Some(block) = state.cache.get(&block_index).cloned() {
342                state.hits += 1;
343                return Ok(block);
344            }
345            state.misses += 1;
346        }
347
348        let block_size = self.block_size.get();
349        let block_start = block_index
350            .checked_mul(block_size as u64)
351            .ok_or(Error::OffsetOutOfBounds(u64::MAX))?;
352        let remaining = self.inner.len().saturating_sub(block_start);
353        let read_len = block_size.min(usize::try_from(remaining).unwrap_or(usize::MAX));
354        let bytes = self.inner.read_range(block_start, read_len)?;
355        let block = Arc::<[u8]>::from(bytes.as_ref());
356
357        let mut state = self.state.lock();
358        if let Some(replaced) = state.cache.peek(&block_index) {
359            state.current_bytes = state.current_bytes.saturating_sub(replaced.len());
360        } else {
361            while state.cache.len() >= self.max_blocks.get() && !state.cache.is_empty() {
362                if let Some((_, evicted)) = state.cache.pop_lru() {
363                    state.current_bytes = state.current_bytes.saturating_sub(evicted.len());
364                    state.evictions += 1;
365                }
366            }
367        }
368
369        state.current_bytes += block.len();
370        state.inserts += 1;
371        state.cache.put(block_index, block.clone());
372        Ok(block)
373    }
374}
375
376impl Storage for BlockCacheStorage {
377    fn len(&self) -> u64 {
378        self.inner.len()
379    }
380
381    fn read_range(&self, offset: u64, len: usize) -> Result<StorageBuffer> {
382        let end = check_storage_range(self.len(), offset, len)?;
383        if len == 0 {
384            return Ok(StorageBuffer::from_vec(Vec::new()));
385        }
386
387        let block_size = self.block_size.get() as u64;
388        let first_block = offset / block_size;
389        let last_block = (end - 1) / block_size;
390
391        if first_block == last_block {
392            let block = self.read_block(first_block)?;
393            let block_start = first_block
394                .checked_mul(block_size)
395                .ok_or(Error::OffsetOutOfBounds(offset))?;
396            let start = usize::try_from(offset - block_start)
397                .map_err(|_| Error::OffsetOutOfBounds(offset))?;
398            return Ok(StorageBuffer::from_arc_bytes(block, start, len));
399        }
400
401        let mut output = vec![0u8; len];
402        let mut written = 0usize;
403        for block_index in first_block..=last_block {
404            let block = self.read_block(block_index)?;
405            let block_start = block_index
406                .checked_mul(block_size)
407                .ok_or(Error::OffsetOutOfBounds(offset))?;
408            let copy_start = offset.max(block_start);
409            let copy_end = end.min(block_start + block.len() as u64);
410            let src_start = usize::try_from(copy_start - block_start)
411                .map_err(|_| Error::OffsetOutOfBounds(offset))?;
412            let copy_len = usize::try_from(copy_end - copy_start)
413                .map_err(|_| Error::OffsetOutOfBounds(offset))?;
414            output[written..written + copy_len]
415                .copy_from_slice(&block[src_start..src_start + copy_len]);
416            written += copy_len;
417        }
418
419        Ok(StorageBuffer::from_vec(output))
420    }
421}
422
/// Fills `buf` completely with the bytes of `file` that start at `offset`.
///
/// Uses positional reads, so the file's own cursor is left untouched. This
/// delegates to the standard library's `FileExt::read_exact_at`, which retries
/// reads that fail with `ErrorKind::Interrupted` instead of reporting them.
///
/// # Errors
///
/// Returns `ErrorKind::UnexpectedEof` if the file ends before `buf` is full,
/// or the underlying I/O error for any other read failure.
#[cfg(unix)]
fn read_exact_at(file: &File, buf: &mut [u8], offset: u64) -> std::io::Result<()> {
    use std::os::unix::fs::FileExt;

    file.read_exact_at(buf, offset)
}
440
/// Fills `buf` completely with the bytes of `file` that start at `offset`.
///
/// Built on repeated `seek_read` calls, advancing past whatever each call
/// returned until the buffer is full. Reads that fail with
/// `ErrorKind::Interrupted` are retried rather than reported, matching the
/// contract of the standard library's exact-read helpers.
///
/// # Errors
///
/// Returns `ErrorKind::UnexpectedEof` if the file ends before `buf` is full,
/// or the underlying I/O error for any other read failure.
#[cfg(windows)]
fn read_exact_at(file: &File, mut buf: &mut [u8], mut offset: u64) -> std::io::Result<()> {
    use std::io::ErrorKind;
    use std::os::windows::fs::FileExt;

    while !buf.is_empty() {
        match file.seek_read(buf, offset) {
            // Zero bytes with space still left in `buf` means end of file.
            Ok(0) => {
                return Err(std::io::Error::new(
                    ErrorKind::UnexpectedEof,
                    "failed to fill whole buffer",
                ));
            }
            Ok(n) => {
                offset += n as u64;
                buf = &mut buf[n..];
            }
            // Transient interruption: try the same read again.
            Err(err) if err.kind() == ErrorKind::Interrupted => {}
            Err(err) => return Err(err),
        }
    }
    Ok(())
}
458
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::Mutex as StdMutex;

    /// Shared fixture holding the bytes `0, 1, .., count - 1`.
    fn sequential_bytes(count: u8) -> Arc<[u8]> {
        Arc::from((0..count).collect::<Vec<u8>>())
    }

    /// A range reader hands back exactly the requested slice.
    #[test]
    fn range_request_storage_reads_exact_ranges() {
        let data = sequential_bytes(32);
        let source = Arc::clone(&data);
        let storage = RangeRequestStorage::new(data.len() as u64, move |offset, len| {
            let start = offset as usize;
            Ok(source[start..start + len].to_vec())
        });

        let bytes = storage.read_range(4, 6).unwrap();
        assert_eq!(bytes.as_ref(), &[4, 5, 6, 7, 8, 9]);
    }

    /// A reader that returns fewer bytes than requested is reported as an EOF error.
    #[test]
    fn range_request_storage_rejects_short_reads() {
        let storage = RangeRequestStorage::new(16, |_offset, _len| Ok(vec![1, 2]));
        // `StorageBuffer` has no `Debug` impl, so `unwrap_err` is not available here.
        let err = storage
            .read_range(0, 4)
            .err()
            .expect("short range read should fail");
        assert!(matches!(err, Error::UnexpectedEof { .. }));
    }

    /// Overlapping reads hit the cache; the inner storage sees each block once.
    #[test]
    fn block_cache_storage_reuses_aligned_blocks() {
        let data = sequential_bytes(64);
        let reads = Arc::new(StdMutex::new(Vec::new()));

        let source = Arc::clone(&data);
        let read_log = Arc::clone(&reads);
        let inner = Arc::new(RangeRequestStorage::new(
            data.len() as u64,
            move |offset, len| {
                read_log.lock().unwrap().push((offset, len));
                let start = offset as usize;
                Ok(source[start..start + len].to_vec())
            },
        ));
        let storage = BlockCacheStorage::new(inner, 8, 2);

        assert_eq!(storage.read_range(2, 4).unwrap().as_ref(), &[2, 3, 4, 5]);
        assert_eq!(storage.read_range(4, 2).unwrap().as_ref(), &[4, 5]);
        assert_eq!(
            storage.read_range(6, 6).unwrap().as_ref(),
            &[6, 7, 8, 9, 10, 11]
        );
        assert_eq!(storage.read_range(18, 2).unwrap().as_ref(), &[18, 19]);

        assert_eq!(*reads.lock().unwrap(), vec![(0, 8), (8, 8), (16, 8)]);

        let stats = storage.stats();
        assert_eq!(stats.hits, 2);
        assert_eq!(stats.misses, 3);
        assert_eq!(stats.inserts, 3);
        assert_eq!(stats.evictions, 1);
        assert_eq!(stats.entries, 2);
    }
}