1use std::fs::File;
2use std::num::NonZeroUsize;
3use std::ops::Deref;
4use std::path::Path;
5use std::sync::Arc;
6
7use lru::LruCache;
8use memmap2::Mmap;
9use parking_lot::Mutex;
10
11use crate::error::{Error, Result};
12
13#[derive(Clone)]
14enum StorageBacking {
15 Bytes(Arc<[u8]>),
16 Mmap(Arc<Mmap>),
17}
18
19#[derive(Clone)]
21pub struct StorageBuffer {
22 backing: StorageBacking,
23 start: usize,
24 len: usize,
25}
26
27impl StorageBuffer {
28 pub fn from_vec(bytes: Vec<u8>) -> Self {
29 let len = bytes.len();
30 Self {
31 backing: StorageBacking::Bytes(Arc::<[u8]>::from(bytes)),
32 start: 0,
33 len,
34 }
35 }
36
37 pub(crate) fn from_arc_bytes(bytes: Arc<[u8]>, start: usize, len: usize) -> Self {
38 Self {
39 backing: StorageBacking::Bytes(bytes),
40 start,
41 len,
42 }
43 }
44
45 pub(crate) fn from_arc_mmap(mmap: Arc<Mmap>, start: usize, len: usize) -> Self {
46 Self {
47 backing: StorageBacking::Mmap(mmap),
48 start,
49 len,
50 }
51 }
52
53 pub fn len(&self) -> usize {
54 self.len
55 }
56
57 pub fn is_empty(&self) -> bool {
58 self.len == 0
59 }
60}
61
62impl AsRef<[u8]> for StorageBuffer {
63 fn as_ref(&self) -> &[u8] {
64 self
65 }
66}
67
68impl Deref for StorageBuffer {
69 type Target = [u8];
70
71 fn deref(&self) -> &Self::Target {
72 match &self.backing {
73 StorageBacking::Bytes(bytes) => &bytes[self.start..self.start + self.len],
74 StorageBacking::Mmap(mmap) => &mmap[self.start..self.start + self.len],
75 }
76 }
77}
78
79pub trait Storage: Send + Sync {
81 fn len(&self) -> u64;
83
84 fn is_empty(&self) -> bool {
86 self.len() == 0
87 }
88
89 fn read_range(&self, offset: u64, len: usize) -> Result<StorageBuffer>;
91}
92
93pub type DynStorage = Arc<dyn Storage>;
94
95fn check_storage_range(total_len: u64, offset: u64, len: usize) -> Result<u64> {
96 let needed = u64::try_from(len).map_err(|_| Error::OffsetOutOfBounds(offset))?;
97 let end = offset
98 .checked_add(needed)
99 .ok_or(Error::OffsetOutOfBounds(offset))?;
100 if end > total_len {
101 return Err(Error::UnexpectedEof {
102 offset,
103 needed,
104 available: total_len.saturating_sub(offset),
105 });
106 }
107 Ok(end)
108}
109
/// Storage backed by an in-memory, reference-counted byte slice.
pub struct BytesStorage {
    /// The complete contents of the storage.
    data: Arc<[u8]>,
}
114
115impl BytesStorage {
116 pub fn new(data: Vec<u8>) -> Self {
117 Self {
118 data: Arc::<[u8]>::from(data),
119 }
120 }
121}
122
123impl Storage for BytesStorage {
124 fn len(&self) -> u64 {
125 self.data.len() as u64
126 }
127
128 fn read_range(&self, offset: u64, len: usize) -> Result<StorageBuffer> {
129 let start = usize::try_from(offset).map_err(|_| Error::OffsetOutOfBounds(offset))?;
130 let end = start
131 .checked_add(len)
132 .ok_or(Error::OffsetOutOfBounds(offset))?;
133 if end > self.data.len() {
134 return Err(Error::UnexpectedEof {
135 offset,
136 needed: len as u64,
137 available: self.len().saturating_sub(offset),
138 });
139 }
140 Ok(StorageBuffer::from_arc_bytes(self.data.clone(), start, len))
141 }
142}
143
144pub struct MmapStorage {
146 mmap: Arc<Mmap>,
147}
148
149impl MmapStorage {
150 pub fn new(mmap: Mmap) -> Self {
151 Self {
152 mmap: Arc::new(mmap),
153 }
154 }
155}
156
157impl Storage for MmapStorage {
158 fn len(&self) -> u64 {
159 self.mmap.len() as u64
160 }
161
162 fn read_range(&self, offset: u64, len: usize) -> Result<StorageBuffer> {
163 let start = usize::try_from(offset).map_err(|_| Error::OffsetOutOfBounds(offset))?;
164 let end = start
165 .checked_add(len)
166 .ok_or(Error::OffsetOutOfBounds(offset))?;
167 if end > self.mmap.len() {
168 return Err(Error::UnexpectedEof {
169 offset,
170 needed: len as u64,
171 available: self.len().saturating_sub(offset),
172 });
173 }
174 Ok(StorageBuffer::from_arc_mmap(self.mmap.clone(), start, len))
175 }
176}
177
/// Storage that serves reads from an open file.
pub struct FileStorage {
    /// Handle that ranges are read from.
    file: Arc<File>,
    /// Size of the storage in bytes, as reported by `len()`.
    len: u64,
}
183
184impl FileStorage {
185 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
186 let file = File::open(path)?;
187 let len = file.metadata()?.len();
188 Ok(Self {
189 file: Arc::new(file),
190 len,
191 })
192 }
193}
194
195impl Storage for FileStorage {
196 fn len(&self) -> u64 {
197 self.len
198 }
199
200 fn read_range(&self, offset: u64, len: usize) -> Result<StorageBuffer> {
201 let needed = u64::try_from(len).map_err(|_| Error::OffsetOutOfBounds(offset))?;
202 let end = offset
203 .checked_add(needed)
204 .ok_or(Error::OffsetOutOfBounds(offset))?;
205 if end > self.len {
206 return Err(Error::UnexpectedEof {
207 offset,
208 needed,
209 available: self.len.saturating_sub(offset),
210 });
211 }
212
213 let mut buf = vec![0u8; len];
214 read_exact_at(self.file.as_ref(), &mut buf, offset)?;
215 Ok(StorageBuffer::from_vec(buf))
216 }
217}
218
219pub struct RangeRequestStorage {
224 len: u64,
225 reader: Arc<RangeReader>,
226}
227
228type RangeReader = dyn Fn(u64, usize) -> Result<Vec<u8>> + Send + Sync;
229
230impl RangeRequestStorage {
231 pub fn new<F>(len: u64, reader: F) -> Self
233 where
234 F: Fn(u64, usize) -> Result<Vec<u8>> + Send + Sync + 'static,
235 {
236 Self {
237 len,
238 reader: Arc::new(reader),
239 }
240 }
241}
242
243impl Storage for RangeRequestStorage {
244 fn len(&self) -> u64 {
245 self.len
246 }
247
248 fn read_range(&self, offset: u64, len: usize) -> Result<StorageBuffer> {
249 check_storage_range(self.len, offset, len)?;
250 let bytes = (self.reader)(offset, len)?;
251 if bytes.len() != len {
252 return Err(Error::UnexpectedEof {
253 offset,
254 needed: len as u64,
255 available: bytes.len() as u64,
256 });
257 }
258 Ok(StorageBuffer::from_vec(bytes))
259 }
260}
261
/// Point-in-time snapshot of a block cache's counters and configuration.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct BlockCacheStats {
    /// Block lookups answered from the cache.
    pub hits: u64,
    /// Block lookups that were not in the cache.
    pub misses: u64,
    /// Blocks inserted into the cache.
    pub inserts: u64,
    /// Blocks removed to make room for new ones.
    pub evictions: u64,
    /// Total length in bytes of the blocks currently cached.
    pub current_bytes: usize,
    /// Number of blocks currently cached.
    pub entries: usize,
    /// Configured block size in bytes.
    pub block_size: usize,
    /// Configured maximum number of cached blocks.
    pub max_blocks: usize,
}
274
275pub struct BlockCacheStorage {
281 inner: DynStorage,
282 block_size: NonZeroUsize,
283 max_blocks: NonZeroUsize,
284 state: Mutex<BlockCacheState>,
285}
286
287struct BlockCacheState {
288 cache: LruCache<u64, Arc<[u8]>>,
289 current_bytes: usize,
290 hits: u64,
291 misses: u64,
292 inserts: u64,
293 evictions: u64,
294}
295
296impl BlockCacheStorage {
297 pub fn new(inner: DynStorage, block_size: usize, max_blocks: usize) -> Self {
301 let block_size = NonZeroUsize::new(block_size).unwrap_or(NonZeroUsize::new(1).unwrap());
302 let max_blocks = NonZeroUsize::new(max_blocks).unwrap_or(NonZeroUsize::new(1).unwrap());
303 Self {
304 inner,
305 block_size,
306 max_blocks,
307 state: Mutex::new(BlockCacheState {
308 cache: LruCache::new(max_blocks),
309 current_bytes: 0,
310 hits: 0,
311 misses: 0,
312 inserts: 0,
313 evictions: 0,
314 }),
315 }
316 }
317
318 pub fn with_defaults(inner: DynStorage) -> Self {
320 Self::new(inner, 1024 * 1024, 128)
321 }
322
323 pub fn stats(&self) -> BlockCacheStats {
325 let state = self.state.lock();
326 BlockCacheStats {
327 hits: state.hits,
328 misses: state.misses,
329 inserts: state.inserts,
330 evictions: state.evictions,
331 current_bytes: state.current_bytes,
332 entries: state.cache.len(),
333 block_size: self.block_size.get(),
334 max_blocks: self.max_blocks.get(),
335 }
336 }
337
338 fn read_block(&self, block_index: u64) -> Result<Arc<[u8]>> {
339 {
340 let mut state = self.state.lock();
341 if let Some(block) = state.cache.get(&block_index).cloned() {
342 state.hits += 1;
343 return Ok(block);
344 }
345 state.misses += 1;
346 }
347
348 let block_size = self.block_size.get();
349 let block_start = block_index
350 .checked_mul(block_size as u64)
351 .ok_or(Error::OffsetOutOfBounds(u64::MAX))?;
352 let remaining = self.inner.len().saturating_sub(block_start);
353 let read_len = block_size.min(usize::try_from(remaining).unwrap_or(usize::MAX));
354 let bytes = self.inner.read_range(block_start, read_len)?;
355 let block = Arc::<[u8]>::from(bytes.as_ref());
356
357 let mut state = self.state.lock();
358 if let Some(replaced) = state.cache.peek(&block_index) {
359 state.current_bytes = state.current_bytes.saturating_sub(replaced.len());
360 } else {
361 while state.cache.len() >= self.max_blocks.get() && !state.cache.is_empty() {
362 if let Some((_, evicted)) = state.cache.pop_lru() {
363 state.current_bytes = state.current_bytes.saturating_sub(evicted.len());
364 state.evictions += 1;
365 }
366 }
367 }
368
369 state.current_bytes += block.len();
370 state.inserts += 1;
371 state.cache.put(block_index, block.clone());
372 Ok(block)
373 }
374}
375
376impl Storage for BlockCacheStorage {
377 fn len(&self) -> u64 {
378 self.inner.len()
379 }
380
381 fn read_range(&self, offset: u64, len: usize) -> Result<StorageBuffer> {
382 let end = check_storage_range(self.len(), offset, len)?;
383 if len == 0 {
384 return Ok(StorageBuffer::from_vec(Vec::new()));
385 }
386
387 let block_size = self.block_size.get() as u64;
388 let first_block = offset / block_size;
389 let last_block = (end - 1) / block_size;
390
391 if first_block == last_block {
392 let block = self.read_block(first_block)?;
393 let block_start = first_block
394 .checked_mul(block_size)
395 .ok_or(Error::OffsetOutOfBounds(offset))?;
396 let start = usize::try_from(offset - block_start)
397 .map_err(|_| Error::OffsetOutOfBounds(offset))?;
398 return Ok(StorageBuffer::from_arc_bytes(block, start, len));
399 }
400
401 let mut output = vec![0u8; len];
402 let mut written = 0usize;
403 for block_index in first_block..=last_block {
404 let block = self.read_block(block_index)?;
405 let block_start = block_index
406 .checked_mul(block_size)
407 .ok_or(Error::OffsetOutOfBounds(offset))?;
408 let copy_start = offset.max(block_start);
409 let copy_end = end.min(block_start + block.len() as u64);
410 let src_start = usize::try_from(copy_start - block_start)
411 .map_err(|_| Error::OffsetOutOfBounds(offset))?;
412 let copy_len = usize::try_from(copy_end - copy_start)
413 .map_err(|_| Error::OffsetOutOfBounds(offset))?;
414 output[written..written + copy_len]
415 .copy_from_slice(&block[src_start..src_start + copy_len]);
416 written += copy_len;
417 }
418
419 Ok(StorageBuffer::from_vec(output))
420 }
421}
422
/// Fills `buf` completely with bytes read from `file`, starting at byte `offset`.
///
/// Delegates to the standard library's positioned exact read. Unlike a hand-written
/// `read_at` loop that returns on the first error, it retries reads that fail with
/// `ErrorKind::Interrupted`, so a signal arriving mid-read does not fail the call.
///
/// # Errors
///
/// Returns an `ErrorKind::UnexpectedEof` error if the file ends before `buf` is full.
/// Any other I/O error is returned unchanged.
#[cfg(unix)]
fn read_exact_at(file: &File, buf: &mut [u8], offset: u64) -> std::io::Result<()> {
    use std::os::unix::fs::FileExt;

    file.read_exact_at(buf, offset)
}
440
/// Fills `buf` completely with bytes read from `file`, starting at byte `offset`.
///
/// Repeats `seek_read` until the buffer is full. Reads that fail with
/// `ErrorKind::Interrupted` are retried, matching the convention of the standard library's
/// exact-read helpers, instead of failing the whole call.
///
/// # Errors
///
/// Returns an `ErrorKind::UnexpectedEof` error if the file ends before `buf` is full.
/// Any other I/O error is returned unchanged.
#[cfg(windows)]
fn read_exact_at(file: &File, mut buf: &mut [u8], mut offset: u64) -> std::io::Result<()> {
    use std::io::ErrorKind;
    use std::os::windows::fs::FileExt;

    while !buf.is_empty() {
        match file.seek_read(buf, offset) {
            // A zero-length read with bytes still wanted means the file ended early.
            Ok(0) => {
                return Err(std::io::Error::new(
                    ErrorKind::UnexpectedEof,
                    "failed to fill whole buffer",
                ));
            }
            Ok(n) => {
                offset += n as u64;
                buf = &mut buf[n..];
            }
            // Nothing was consumed; retry the same read.
            Err(err) if err.kind() == ErrorKind::Interrupted => {}
            Err(err) => return Err(err),
        }
    }
    Ok(())
}
458
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::Mutex as StdMutex;

    /// Builds `count` bytes where every byte's value equals its index.
    fn counting_bytes(count: u8) -> Arc<[u8]> {
        Arc::from((0..count).collect::<Vec<u8>>())
    }

    /// A reader that returns exactly the requested bytes must round-trip them unchanged.
    #[test]
    fn range_request_storage_reads_exact_ranges() {
        let data = counting_bytes(32);
        let source = Arc::clone(&data);
        let storage = RangeRequestStorage::new(data.len() as u64, move |offset, len| {
            let start = offset as usize;
            Ok(source[start..start + len].to_vec())
        });

        let bytes = storage.read_range(4, 6).unwrap();
        assert_eq!(bytes.as_ref(), &[4, 5, 6, 7, 8, 9]);
    }

    /// A reader that returns fewer bytes than requested must surface as `UnexpectedEof`.
    #[test]
    fn range_request_storage_rejects_short_reads() {
        let storage = RangeRequestStorage::new(16, |_offset, _len| Ok(vec![1, 2]));

        // `StorageBuffer` has no `Debug` impl, so `expect_err` is unavailable here.
        let err = storage
            .read_range(0, 4)
            .err()
            .expect("short range read should fail");
        assert!(matches!(err, Error::UnexpectedEof { .. }));
    }

    /// Overlapping reads must reuse cached blocks and evict the least recently used one.
    #[test]
    fn block_cache_storage_reuses_aligned_blocks() {
        let data = counting_bytes(64);
        let reads = Arc::new(StdMutex::new(Vec::new()));

        let source = Arc::clone(&data);
        let log = Arc::clone(&reads);
        let inner = Arc::new(RangeRequestStorage::new(
            data.len() as u64,
            move |offset, len| {
                log.lock().unwrap().push((offset, len));
                let start = offset as usize;
                Ok(source[start..start + len].to_vec())
            },
        ));
        let storage = BlockCacheStorage::new(inner, 8, 2);

        // Block 0 is fetched once, then reused by the second and third reads.
        assert_eq!(storage.read_range(2, 4).unwrap().as_ref(), &[2, 3, 4, 5]);
        assert_eq!(storage.read_range(4, 2).unwrap().as_ref(), &[4, 5]);
        // Spans blocks 0 and 1, so block 1 is fetched.
        assert_eq!(
            storage.read_range(6, 6).unwrap().as_ref(),
            &[6, 7, 8, 9, 10, 11]
        );
        // Block 2 does not fit alongside the other two, forcing one eviction.
        assert_eq!(storage.read_range(18, 2).unwrap().as_ref(), &[18, 19]);

        assert_eq!(*reads.lock().unwrap(), vec![(0, 8), (8, 8), (16, 8)]);

        let stats = storage.stats();
        assert_eq!(stats.hits, 2);
        assert_eq!(stats.misses, 3);
        assert_eq!(stats.inserts, 3);
        assert_eq!(stats.evictions, 1);
        assert_eq!(stats.entries, 2);
    }
}