1use std::fs::File;
2use std::num::NonZeroUsize;
3use std::ops::Deref;
4use std::path::Path;
5use std::sync::Arc;
6
7use lru::LruCache;
8use memmap2::Mmap;
9use parking_lot::Mutex;
10
11use crate::error::{Error, Result};
12
13#[derive(Clone)]
14enum StorageBacking {
15 Bytes(Arc<[u8]>),
16 Mmap(Arc<Mmap>),
17}
18
19#[derive(Clone)]
21pub struct StorageBuffer {
22 backing: StorageBacking,
23 start: usize,
24 len: usize,
25}
26
27impl StorageBuffer {
28 pub fn from_vec(bytes: Vec<u8>) -> Self {
29 let len = bytes.len();
30 Self {
31 backing: StorageBacking::Bytes(Arc::<[u8]>::from(bytes)),
32 start: 0,
33 len,
34 }
35 }
36
37 pub(crate) fn from_arc_bytes(bytes: Arc<[u8]>, start: usize, len: usize) -> Self {
38 Self {
39 backing: StorageBacking::Bytes(bytes),
40 start,
41 len,
42 }
43 }
44
45 pub(crate) fn from_arc_mmap(mmap: Arc<Mmap>, start: usize, len: usize) -> Self {
46 Self {
47 backing: StorageBacking::Mmap(mmap),
48 start,
49 len,
50 }
51 }
52
53 pub fn len(&self) -> usize {
54 self.len
55 }
56
57 pub fn is_empty(&self) -> bool {
58 self.len == 0
59 }
60}
61
62impl AsRef<[u8]> for StorageBuffer {
63 fn as_ref(&self) -> &[u8] {
64 self
65 }
66}
67
68impl Deref for StorageBuffer {
69 type Target = [u8];
70
71 fn deref(&self) -> &Self::Target {
72 match &self.backing {
73 StorageBacking::Bytes(bytes) => &bytes[self.start..self.start + self.len],
74 StorageBacking::Mmap(mmap) => &mmap[self.start..self.start + self.len],
75 }
76 }
77}
78
79pub trait Storage: Send + Sync {
81 fn len(&self) -> u64;
83
84 fn is_empty(&self) -> bool {
86 self.len() == 0
87 }
88
89 fn read_range(&self, offset: u64, len: usize) -> Result<StorageBuffer>;
91}
92
93pub type DynStorage = Arc<dyn Storage>;
94
95fn check_storage_range(total_len: u64, offset: u64, len: usize) -> Result<u64> {
96 let needed = u64::try_from(len).map_err(|_| Error::OffsetOutOfBounds(offset))?;
97 let end = offset
98 .checked_add(needed)
99 .ok_or(Error::OffsetOutOfBounds(offset))?;
100 if end > total_len {
101 return Err(Error::UnexpectedEof {
102 offset,
103 needed,
104 available: total_len.saturating_sub(offset),
105 });
106 }
107 Ok(end)
108}
109
/// [`Storage`] that serves reads from an in-memory byte slice.
pub struct BytesStorage {
    data: Arc<[u8]>,
}

impl BytesStorage {
    /// Takes ownership of `data` and moves it into a shared slice.
    pub fn new(data: Vec<u8>) -> Self {
        let data: Arc<[u8]> = data.into();
        Self { data }
    }
}
122
123impl Storage for BytesStorage {
124 fn len(&self) -> u64 {
125 self.data.len() as u64
126 }
127
128 fn read_range(&self, offset: u64, len: usize) -> Result<StorageBuffer> {
129 let start = usize::try_from(offset).map_err(|_| Error::OffsetOutOfBounds(offset))?;
130 let end = start
131 .checked_add(len)
132 .ok_or(Error::OffsetOutOfBounds(offset))?;
133 if end > self.data.len() {
134 return Err(Error::UnexpectedEof {
135 offset,
136 needed: len as u64,
137 available: self.len().saturating_sub(offset),
138 });
139 }
140 Ok(StorageBuffer::from_arc_bytes(self.data.clone(), start, len))
141 }
142}
143
144pub struct MmapStorage {
146 mmap: Arc<Mmap>,
147}
148
149impl MmapStorage {
150 pub fn new(mmap: Mmap) -> Self {
151 Self {
152 mmap: Arc::new(mmap),
153 }
154 }
155}
156
157impl Storage for MmapStorage {
158 fn len(&self) -> u64 {
159 self.mmap.len() as u64
160 }
161
162 fn read_range(&self, offset: u64, len: usize) -> Result<StorageBuffer> {
163 let start = usize::try_from(offset).map_err(|_| Error::OffsetOutOfBounds(offset))?;
164 let end = start
165 .checked_add(len)
166 .ok_or(Error::OffsetOutOfBounds(offset))?;
167 if end > self.mmap.len() {
168 return Err(Error::UnexpectedEof {
169 offset,
170 needed: len as u64,
171 available: self.len().saturating_sub(offset),
172 });
173 }
174 Ok(StorageBuffer::from_arc_mmap(self.mmap.clone(), start, len))
175 }
176}
177
178pub struct FileStorage {
180 file: Arc<File>,
181 len: u64,
182}
183
184impl FileStorage {
185 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
186 let file = File::open(path)?;
187 Self::from_file(file)
188 }
189
190 pub fn from_file(file: File) -> Result<Self> {
191 let len = file.metadata()?.len();
192 Ok(Self {
193 file: Arc::new(file),
194 len,
195 })
196 }
197}
198
199impl Storage for FileStorage {
200 fn len(&self) -> u64 {
201 self.len
202 }
203
204 fn read_range(&self, offset: u64, len: usize) -> Result<StorageBuffer> {
205 let needed = u64::try_from(len).map_err(|_| Error::OffsetOutOfBounds(offset))?;
206 let end = offset
207 .checked_add(needed)
208 .ok_or(Error::OffsetOutOfBounds(offset))?;
209 if end > self.len {
210 return Err(Error::UnexpectedEof {
211 offset,
212 needed,
213 available: self.len.saturating_sub(offset),
214 });
215 }
216
217 let mut buf = vec![0u8; len];
218 read_exact_at(self.file.as_ref(), &mut buf, offset)?;
219 Ok(StorageBuffer::from_vec(buf))
220 }
221}
222
223pub struct RangeRequestStorage {
228 len: u64,
229 reader: Arc<RangeReader>,
230}
231
232type RangeReader = dyn Fn(u64, usize) -> Result<Vec<u8>> + Send + Sync;
233
234impl RangeRequestStorage {
235 pub fn new<F>(len: u64, reader: F) -> Self
237 where
238 F: Fn(u64, usize) -> Result<Vec<u8>> + Send + Sync + 'static,
239 {
240 Self {
241 len,
242 reader: Arc::new(reader),
243 }
244 }
245}
246
247impl Storage for RangeRequestStorage {
248 fn len(&self) -> u64 {
249 self.len
250 }
251
252 fn read_range(&self, offset: u64, len: usize) -> Result<StorageBuffer> {
253 check_storage_range(self.len, offset, len)?;
254 let bytes = (self.reader)(offset, len)?;
255 if bytes.len() != len {
256 return Err(Error::UnexpectedEof {
257 offset,
258 needed: len as u64,
259 available: bytes.len() as u64,
260 });
261 }
262 Ok(StorageBuffer::from_vec(bytes))
263 }
264}
265
/// Snapshot of a block cache's counters and configuration.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct BlockCacheStats {
    /// Block lookups served from the cache.
    pub hits: u64,
    /// Block lookups that were not found in the cache.
    pub misses: u64,
    /// Blocks stored into the cache.
    pub inserts: u64,
    /// Blocks removed to make room for new ones.
    pub evictions: u64,
    /// Sum of the lengths of the currently cached blocks.
    pub current_bytes: usize,
    /// Number of blocks currently cached.
    pub entries: usize,
    /// Configured block size in bytes.
    pub block_size: usize,
    /// Configured maximum number of cached blocks.
    pub max_blocks: usize,
}
278
279pub struct BlockCacheStorage {
285 inner: DynStorage,
286 block_size: NonZeroUsize,
287 max_blocks: NonZeroUsize,
288 state: Mutex<BlockCacheState>,
289}
290
291struct BlockCacheState {
292 cache: LruCache<u64, Arc<[u8]>>,
293 current_bytes: usize,
294 hits: u64,
295 misses: u64,
296 inserts: u64,
297 evictions: u64,
298}
299
300impl BlockCacheStorage {
301 pub fn new(inner: DynStorage, block_size: usize, max_blocks: usize) -> Self {
305 let block_size = NonZeroUsize::new(block_size).unwrap_or(NonZeroUsize::new(1).unwrap());
306 let max_blocks = NonZeroUsize::new(max_blocks).unwrap_or(NonZeroUsize::new(1).unwrap());
307 Self {
308 inner,
309 block_size,
310 max_blocks,
311 state: Mutex::new(BlockCacheState {
312 cache: LruCache::new(max_blocks),
313 current_bytes: 0,
314 hits: 0,
315 misses: 0,
316 inserts: 0,
317 evictions: 0,
318 }),
319 }
320 }
321
322 pub fn with_defaults(inner: DynStorage) -> Self {
324 Self::new(inner, 1024 * 1024, 128)
325 }
326
327 pub fn stats(&self) -> BlockCacheStats {
329 let state = self.state.lock();
330 BlockCacheStats {
331 hits: state.hits,
332 misses: state.misses,
333 inserts: state.inserts,
334 evictions: state.evictions,
335 current_bytes: state.current_bytes,
336 entries: state.cache.len(),
337 block_size: self.block_size.get(),
338 max_blocks: self.max_blocks.get(),
339 }
340 }
341
342 fn read_block(&self, block_index: u64) -> Result<Arc<[u8]>> {
343 {
344 let mut state = self.state.lock();
345 if let Some(block) = state.cache.get(&block_index).cloned() {
346 state.hits += 1;
347 return Ok(block);
348 }
349 state.misses += 1;
350 }
351
352 let block_size = self.block_size.get();
353 let block_start = block_index
354 .checked_mul(block_size as u64)
355 .ok_or(Error::OffsetOutOfBounds(u64::MAX))?;
356 let remaining = self.inner.len().saturating_sub(block_start);
357 let read_len = block_size.min(usize::try_from(remaining).unwrap_or(usize::MAX));
358 let bytes = self.inner.read_range(block_start, read_len)?;
359 let block = Arc::<[u8]>::from(bytes.as_ref());
360
361 let mut state = self.state.lock();
362 if let Some(replaced) = state.cache.peek(&block_index) {
363 state.current_bytes = state.current_bytes.saturating_sub(replaced.len());
364 } else {
365 while state.cache.len() >= self.max_blocks.get() && !state.cache.is_empty() {
366 if let Some((_, evicted)) = state.cache.pop_lru() {
367 state.current_bytes = state.current_bytes.saturating_sub(evicted.len());
368 state.evictions += 1;
369 }
370 }
371 }
372
373 state.current_bytes += block.len();
374 state.inserts += 1;
375 state.cache.put(block_index, block.clone());
376 Ok(block)
377 }
378}
379
380impl Storage for BlockCacheStorage {
381 fn len(&self) -> u64 {
382 self.inner.len()
383 }
384
385 fn read_range(&self, offset: u64, len: usize) -> Result<StorageBuffer> {
386 let end = check_storage_range(self.len(), offset, len)?;
387 if len == 0 {
388 return Ok(StorageBuffer::from_vec(Vec::new()));
389 }
390
391 let block_size = self.block_size.get() as u64;
392 let first_block = offset / block_size;
393 let last_block = (end - 1) / block_size;
394
395 if first_block == last_block {
396 let block = self.read_block(first_block)?;
397 let block_start = first_block
398 .checked_mul(block_size)
399 .ok_or(Error::OffsetOutOfBounds(offset))?;
400 let start = usize::try_from(offset - block_start)
401 .map_err(|_| Error::OffsetOutOfBounds(offset))?;
402 return Ok(StorageBuffer::from_arc_bytes(block, start, len));
403 }
404
405 let mut output = vec![0u8; len];
406 let mut written = 0usize;
407 for block_index in first_block..=last_block {
408 let block = self.read_block(block_index)?;
409 let block_start = block_index
410 .checked_mul(block_size)
411 .ok_or(Error::OffsetOutOfBounds(offset))?;
412 let copy_start = offset.max(block_start);
413 let copy_end = end.min(block_start + block.len() as u64);
414 let src_start = usize::try_from(copy_start - block_start)
415 .map_err(|_| Error::OffsetOutOfBounds(offset))?;
416 let copy_len = usize::try_from(copy_end - copy_start)
417 .map_err(|_| Error::OffsetOutOfBounds(offset))?;
418 output[written..written + copy_len]
419 .copy_from_slice(&block[src_start..src_start + copy_len]);
420 written += copy_len;
421 }
422
423 Ok(StorageBuffer::from_vec(output))
424 }
425}
426
/// Fills `buf` completely with the bytes of `file` starting at `offset`.
///
/// Delegates to the standard library's positional `read_exact_at`, which
/// retries reads that fail with `ErrorKind::Interrupted`.
///
/// # Errors
///
/// Returns `ErrorKind::UnexpectedEof` when the file ends before `buf` is
/// full, or the underlying I/O error otherwise.
#[cfg(unix)]
fn read_exact_at(file: &File, buf: &mut [u8], offset: u64) -> std::io::Result<()> {
    use std::os::unix::fs::FileExt;

    file.read_exact_at(buf, offset)
}
444
/// Fills `buf` completely with the bytes of `file` starting at `offset`,
/// issuing positional reads until the buffer is full.
///
/// # Errors
///
/// Returns `ErrorKind::UnexpectedEof` when a read returns zero bytes before
/// `buf` is full, or the underlying I/O error otherwise.
#[cfg(windows)]
fn read_exact_at(file: &File, buf: &mut [u8], offset: u64) -> std::io::Result<()> {
    use std::os::windows::fs::FileExt;

    let mut remaining = buf;
    let mut position = offset;
    loop {
        if remaining.is_empty() {
            return Ok(());
        }
        match file.seek_read(remaining, position)? {
            0 => {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::UnexpectedEof,
                    "failed to fill whole buffer",
                ));
            }
            read => {
                position += read as u64;
                remaining = &mut remaining[read..];
            }
        }
    }
}
462
/// Stand-in for targets that are neither unix nor windows: always fails with
/// `ErrorKind::Unsupported` and never touches the file or the buffer.
#[cfg(not(any(unix, windows)))]
fn read_exact_at(_file: &File, _buf: &mut [u8], _offset: u64) -> std::io::Result<()> {
    use std::io::{Error as IoError, ErrorKind};

    const MESSAGE: &str =
        "FileStorage is unavailable on this target; use BytesStorage or Hdf5File::from_bytes";

    Err(IoError::new(ErrorKind::Unsupported, MESSAGE))
}
475
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::Mutex as StdMutex;

    /// Builds a shared buffer holding the bytes `0, 1, ..., count - 1`.
    fn sequential_bytes(count: u8) -> Arc<[u8]> {
        Arc::from((0..count).collect::<Vec<u8>>())
    }

    #[test]
    fn range_request_storage_reads_exact_ranges() {
        let data = sequential_bytes(32);
        let source = Arc::clone(&data);
        let storage = RangeRequestStorage::new(data.len() as u64, move |offset, len| {
            let start = offset as usize;
            Ok(source[start..start + len].to_vec())
        });

        let bytes = storage.read_range(4, 6).unwrap();
        assert_eq!(bytes.as_ref(), &[4, 5, 6, 7, 8, 9]);
    }

    #[test]
    fn range_request_storage_rejects_short_reads() {
        // The reader always returns two bytes, regardless of the request.
        let storage = RangeRequestStorage::new(16, |_offset, _len| Ok(vec![1, 2]));

        let outcome = storage.read_range(0, 4);
        let err = match outcome {
            Err(err) => err,
            Ok(_) => panic!("short range read should fail"),
        };
        assert!(matches!(err, Error::UnexpectedEof { .. }));
    }

    #[test]
    fn block_cache_storage_reuses_aligned_blocks() {
        let data = sequential_bytes(64);
        let reads = Arc::new(StdMutex::new(Vec::new()));

        // The inner storage records every range it is asked for.
        let source = Arc::clone(&data);
        let recorder = Arc::clone(&reads);
        let inner = Arc::new(RangeRequestStorage::new(
            data.len() as u64,
            move |offset, len| {
                recorder.lock().unwrap().push((offset, len));
                let start = offset as usize;
                Ok(source[start..start + len].to_vec())
            },
        ));
        let storage = BlockCacheStorage::new(inner, 8, 2);

        // Block 0: miss, then hit.
        assert_eq!(storage.read_range(2, 4).unwrap().as_ref(), &[2, 3, 4, 5]);
        assert_eq!(storage.read_range(4, 2).unwrap().as_ref(), &[4, 5]);
        // Spans blocks 0 and 1: hit on 0, miss on 1.
        assert_eq!(
            storage.read_range(6, 6).unwrap().as_ref(),
            &[6, 7, 8, 9, 10, 11]
        );
        // Block 2: miss, evicting one block because the capacity is 2.
        assert_eq!(storage.read_range(18, 2).unwrap().as_ref(), &[18, 19]);

        assert_eq!(*reads.lock().unwrap(), vec![(0, 8), (8, 8), (16, 8)]);

        let stats = storage.stats();
        assert_eq!(stats.hits, 2);
        assert_eq!(stats.misses, 3);
        assert_eq!(stats.inserts, 3);
        assert_eq!(stats.evictions, 1);
        assert_eq!(stats.entries, 2);
    }
}