Skip to main content

pure_magic/readers/
cache.rs

1#![deny(unsafe_code)]
2
3use std::{
4    cmp::{max, min},
5    fs::File,
6    io::{self, Read, Seek, SeekFrom, Write},
7    ops::Range,
8    path::Path,
9};
10
11use crate::readers::DataRead;
12use memmap2::MmapMut;
13
/// Zero-length slice handed back for reads that select no bytes.
const EMPTY_RANGE: &[u8] = b"";
15
16/// A lazy-loading cache reader with a multi-tiered caching strategy.
17///
18/// Wraps a [`Read`] + [`Seek`] type and provides efficient cached reads using
19/// a hierarchy of caches: hot (head/tail), warm (memory-mapped), and cold (direct).
20///
21/// The cache automatically loads data in blocks as needed, minimizing I/O operations
22/// for sequential and random access patterns.
23///
24/// # Cache Tiers
25///
26/// - **Hot cache**: Small buffers at the head and tail of the source, always available.
27/// - **Warm cache**: Memory-mapped region for frequently accessed data.
28/// - **Cold cache**: Fallback buffer for reads that don't fit in other caches.
29///
30/// See [`LazyCache::from_read_seek`], [`LazyCache::open`], [`LazyCache::with_hot_cache`],
31/// and [`LazyCache::with_warm_cache`] for construction.
32pub struct LazyCache<R>
33where
34    R: Read + Seek,
35{
36    source: R,
37    loaded: Vec<bool>,
38    hot_head: Vec<u8>,
39    hot_tail: Vec<u8>,
40    warm: Option<MmapMut>,
41    cold: Vec<u8>,
42    block_size: u64,
43    warm_size: Option<u64>,
44    stream_pos: u64,
45    pos_end: u64,
46}
47
/// Default block size in bytes (4 KiB) used when building a `LazyCache`.
const BLOCK_SIZE: usize = 4 * 1024;
49
50impl<R> DataRead for LazyCache<R>
51where
52    R: Read + Seek,
53{
54    #[inline(always)]
55    fn stream_position(&self) -> u64 {
56        self.stream_pos
57    }
58
59    fn read_range(&mut self, range: Range<u64>) -> Result<&[u8], io::Error> {
60        self.get_range_u64(range)
61    }
62
63    fn read_until_any_delim_or_limit(
64        &mut self,
65        delims: &[u8],
66        limit: u64,
67    ) -> Result<&[u8], io::Error> {
68        self._read_while_or_limit(|b| !delims.contains(&b), limit, true)
69    }
70
71    fn read_until_or_limit(&mut self, byte: u8, limit: u64) -> Result<&[u8], io::Error> {
72        self._read_while_or_limit(|b| b != byte, limit, true)
73    }
74
75    fn read_while_or_limit<F>(&mut self, f: F, limit: u64) -> Result<&[u8], io::Error>
76    where
77        F: Fn(u8) -> bool,
78    {
79        self._read_while_or_limit(f, limit, false)
80    }
81
82    fn read_until_utf16_or_limit(
83        &mut self,
84        utf16_char: &[u8; 2],
85        limit: u64,
86    ) -> Result<&[u8], io::Error> {
87        let start = self.stream_pos;
88        let mut end = 0;
89
90        let even_bs = if self.block_size.is_multiple_of(2) {
91            self.block_size
92        } else {
93            self.block_size.saturating_add(1)
94        };
95
96        'outer: while limit.saturating_sub(end) > 0 {
97            let buf = self.read_count(even_bs)?;
98
99            let even = buf
100                .iter()
101                .enumerate()
102                .filter(|(i, _)| i % 2 == 0)
103                .map(|t| t.1);
104
105            let odd = buf
106                .iter()
107                .enumerate()
108                .filter(|(i, _)| i % 2 != 0)
109                .map(|t| t.1);
110
111            for t in even.zip(odd) {
112                if limit.saturating_sub(end) == 0 {
113                    break 'outer;
114                }
115
116                end += 2;
117
118                // tail check
119                if t.0 == &utf16_char[0] && t.1 == &utf16_char[1] {
120                    // we include char
121                    break 'outer;
122                }
123            }
124
125            // we processed the last chunk
126            if buf.len() as u64 != even_bs {
127                // if we arrive here we reached end of file
128                if buf.len() % 2 != 0 {
129                    // we include last byte missed by zip
130                    end += 1
131                }
132                break;
133            }
134        }
135
136        self.read_exact_range(start..start + end)
137    }
138
139    fn data_size(&self) -> u64 {
140        self.pos_end
141    }
142
143    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
144        self.stream_pos = self.offset_from_start(pos);
145        Ok(self.stream_pos)
146    }
147}
148
149impl LazyCache<File> {
150    /// Opens a file and creates a new `LazyCache` for it.
151    ///
152    /// This is a convenience constructor equivalent to calling [`LazyCache::from_read_seek`]
153    /// with a [`File`].
154    ///
155    /// # Errors
156    ///
157    /// Returns an error if the file cannot be opened.
158    ///
159    /// # Examples
160    ///
161    /// ```no_run
162    /// use pure_magic::readers::LazyCache;
163    /// use std::path::Path;
164    ///
165    /// let cache = LazyCache::<std::fs::File>::open(Path::new("file.bin"))?;
166    /// # Ok::<_, std::io::Error>(())
167    /// ```
168    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, io::Error> {
169        Self::from_read_seek(File::open(path)?)
170    }
171}
172
173impl<R> io::Read for LazyCache<R>
174where
175    R: Read + Seek,
176{
177    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
178        let r = self.read_count(buf.len() as u64)?;
179        for (i, b) in r.iter().enumerate() {
180            buf[i] = *b;
181        }
182        Ok(r.len())
183    }
184}
185
186impl<R> LazyCache<R>
187where
188    R: Read + Seek,
189{
190    /// Creates a new `LazyCache` wrapping a [`Read`] + [`Seek`] type.
191    ///
192    /// The cache is initialized with default settings: no hot or warm caches.
193    /// Use [`LazyCache::with_hot_cache`] and [`LazyCache::with_warm_cache`] to enable additional cache tiers.
194    ///
195    /// # Errors
196    ///
197    /// Returns an error if seeking to the end of the source fails.
198    ///
199    /// # Examples
200    ///
201    /// ```
202    /// use pure_magic::readers::{LazyCache, DataRead};
203    /// use std::io::Cursor;
204    ///
205    /// let data = b"hello world";
206    /// let cache = LazyCache::from_read_seek(Cursor::new(data)).unwrap();
207    /// assert_eq!(cache.data_size(), data.len() as u64);
208    /// ```
209    pub fn from_read_seek(mut rs: R) -> Result<Self, io::Error> {
210        let block_size = BLOCK_SIZE as u64;
211        let pos_end = rs.seek(SeekFrom::End(0))?;
212        let cache_cap = pos_end.div_ceil(BLOCK_SIZE as u64);
213
214        Ok(Self {
215            source: rs,
216            hot_head: vec![],
217            hot_tail: vec![],
218            warm: None,
219            cold: vec![0; block_size as usize],
220            loaded: vec![false; cache_cap as usize],
221            block_size,
222            warm_size: None,
223            stream_pos: 0,
224            pos_end,
225        })
226    }
227
228    /// Enables the hot cache with the specified size.
229    ///
230    /// The hot cache maintains two buffers: one at the head (beginning) and one
231    /// at the tail (end) of the source, each with size `size / 2`. This is useful
232    /// for optimizing access to the start and end of files.
233    ///
234    /// # Errors
235    ///
236    /// Returns an error if seeking or reading from the source fails.
237    pub fn with_hot_cache(mut self, size: usize) -> Result<Self, io::Error> {
238        let head_tail_size = size / 2;
239
240        self.source.seek(SeekFrom::Start(0))?;
241
242        if self.pos_end > size as u64 {
243            self.hot_head = vec![0u8; head_tail_size];
244            self.source.read_exact(self.hot_head.as_mut_slice())?;
245
246            self.source.seek(SeekFrom::End(-(head_tail_size as i64)))?;
247            self.hot_tail = vec![0u8; head_tail_size];
248            self.source.read_exact(self.hot_tail.as_mut_slice())?;
249        } else {
250            self.hot_head = vec![0u8; self.pos_end as usize];
251            self.source.read_exact(self.hot_head.as_mut())?;
252        }
253
254        Ok(self)
255    }
256
257    /// Enables the warm cache with the specified size.
258    ///
259    /// The warm cache uses memory-mapped storage for improved performance when
260    /// reading larger regions. The size is clamped to be at least as large as
261    /// the block size to ensure proper alignment.
262    ///
263    /// Note: The memory mapping is performed lazily on first access.
264    pub fn with_warm_cache(mut self, mut warm_size: u64) -> Self {
265        // if warm_size is smaller than block_size we will not
266        // be able to write chunks into the warm cache
267        warm_size = max(warm_size, self.block_size);
268        self.warm_size = Some(warm_size);
269        self
270    }
271
272    #[inline(always)]
273    fn warm(&mut self) -> Result<&mut MmapMut, io::Error> {
274        if self.warm.is_none() && self.warm_size.is_some() {
275            self.warm = Some(MmapMut::map_anon(
276                self.warm_size.unwrap_or_default() as usize
277            )?);
278        }
279        Ok(self.warm.as_mut().unwrap())
280    }
281
282    #[inline(always)]
283    fn range_warmup(&mut self, range: Range<u64>) -> Result<(), io::Error> {
284        let start_chunk_id = range.start / self.block_size;
285        let end_chunk_id = (range.end.saturating_sub(1)) / self.block_size;
286
287        if self.loaded.is_empty() {
288            return Ok(());
289        }
290
291        for chunk_id in start_chunk_id..=end_chunk_id {
292            if self.loaded[chunk_id as usize] {
293                continue;
294            }
295
296            let offset = chunk_id * self.block_size;
297            let buf_size = min(
298                self.block_size as usize,
299                (self.pos_end.saturating_sub(offset)) as usize,
300            );
301            let mut buf = vec![0u8; buf_size];
302            self.source.seek(SeekFrom::Start(offset))?;
303            self.source.read_exact(&mut buf)?;
304
305            (&mut self.warm()?[offset as usize..]).write_all(&buf)?;
306            self.loaded[chunk_id as usize] = true;
307        }
308
309        Ok(())
310    }
311
312    #[inline(always)]
313    fn get_range_u64(&mut self, range: Range<u64>) -> Result<&[u8], io::Error> {
314        // we fix range in case we attempt at reading beyond end of file
315        let range = if range.end > self.pos_end {
316            range.start..self.pos_end
317        } else {
318            range
319        };
320
321        let range_len = range.end.saturating_sub(range.start);
322
323        if range.start > self.pos_end || range_len == 0 {
324            Ok(EMPTY_RANGE)
325        } else if range.start < self.hot_head.len() as u64
326            && range.end <= self.hot_head.len() as u64
327        {
328            self.seek(SeekFrom::Start(range.end))?;
329
330            Ok(&self.hot_head[range.start as usize..range.end as usize])
331        } else if range.start >= (self.pos_end.saturating_sub(self.hot_tail.len() as u64)) {
332            let tail_base = self.pos_end.saturating_sub(self.hot_tail.len() as u64);
333
334            let start = range.start - tail_base;
335            let end = range.end - tail_base;
336
337            self.seek(SeekFrom::Start(range.end))?;
338
339            Ok(&self.hot_tail[start as usize..end as usize])
340        } else if range.end < self.warm_size.unwrap_or_default() {
341            self.range_warmup(range.clone())?;
342            self.seek(SeekFrom::Start(range.end))?;
343
344            Ok(&self.warm()?[range.start as usize..range.end as usize])
345        } else {
346            if range_len > self.cold.len() as u64 {
347                self.cold.resize(range_len as usize, 0);
348            }
349
350            self.source.seek(SeekFrom::Start(range.start))?;
351            let n = self.source.read(self.cold[..range_len as usize].as_mut())?;
352            self.seek(SeekFrom::Start(range.end))?;
353
354            Ok(&self.cold[..n])
355        }
356    }
357
358    // reads while f returns true or we reach limit
359    #[inline(always)]
360    fn _read_while_or_limit<F>(
361        &mut self,
362        f: F,
363        limit: u64,
364        include_last: bool,
365    ) -> Result<&[u8], io::Error>
366    where
367        F: Fn(u8) -> bool,
368    {
369        let start = self.stream_pos;
370        let mut end = 0;
371
372        'outer: while limit - end > 0 {
373            let buf = self.read_count(self.block_size)?;
374
375            for b in buf {
376                if limit - end == 0 {
377                    break 'outer;
378                }
379
380                if !f(*b) {
381                    if include_last && end < self.data_size() {
382                        end += 1;
383                    }
384                    // read_until includes delimiter
385                    break 'outer;
386                }
387
388                end += 1;
389            }
390
391            // we processed last chunk
392            if buf.len() as u64 != self.block_size {
393                break;
394            }
395        }
396
397        self.read_exact_range(start..start + end)
398    }
399}
400
#[cfg(test)]
mod tests {
    use super::*;

    /// A real file shipped with the crate, resolved independently of the
    /// current working directory.
    const LIB_RS: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/src/lib.rs");

    macro_rules! lazy_cache {
        ($content: literal) => {
            LazyCache::from_read_seek(std::io::Cursor::new($content)).unwrap()
        };
    }

    /// reads io::Reader `r` by chunks of size `cs` until the end
    macro_rules! read_to_end {
        ($r: expr, $cs: literal) => {{
            let mut buf = [0u8; $cs];
            let mut out: Vec<u8> = vec![];
            while let Ok(n) = $r.read(&mut buf[..]) {
                if n == 0 {
                    break;
                }
                out.extend(&buf[..n]);
            }
            out
        }};
    }

    #[test]
    fn test_get_single_block() {
        let mut cache = lazy_cache!(b"hello world");
        let data = cache.read_range(0..4).unwrap();
        assert_eq!(data, b"hell");
    }

    #[test]
    fn test_get_across_blocks() {
        let mut cache = lazy_cache!(b"hello world");
        let data = cache.read_range(2..7).unwrap();
        assert_eq!(data, b"llo w");
    }

    #[test]
    fn test_get_entire_file() {
        let mut cache = lazy_cache!(b"hello world");
        let data = cache.read_range(0..11).unwrap();
        assert_eq!(data, b"hello world");
    }

    #[test]
    fn test_get_empty_range() {
        let mut cache = lazy_cache!(b"hello world");
        let data = cache.read_range(0..0).unwrap();
        assert!(data.is_empty());
    }

    #[test]
    fn test_get_out_of_bounds() {
        let mut cache = lazy_cache!(b"hello world");
        // a range starting past the end of the data yields an empty slice, not an error
        assert!(cache.read_range(20..30).unwrap().is_empty());
    }

    #[test]
    fn test_cache_eviction() {
        let mut cache = lazy_cache!(b"0123456789abcdef");
        // read two consecutive ranges
        let _ = cache.read_range(0..8).unwrap();
        let _ = cache.read_range(8..12).unwrap();
        // re-reading a previously read range must return the same bytes
        let data = cache.read_range(8..12).unwrap();
        assert_eq!(data, b"89ab");
    }

    #[test]
    fn test_chunk_consolidation() {
        let mut cache = lazy_cache!(b"0123456789abcdef");
        // read several small adjacent ranges
        let _ = cache.read_range(0..4).unwrap();
        let _ = cache.read_range(4..8).unwrap();
        let _ = cache.read_range(8..12).unwrap();
        // then a range straddling two of them
        let _ = cache.read_range(2..6).unwrap();
        // a range spanning the earlier reads must be contiguous and correct
        let data = cache.read_range(0..8).unwrap();
        assert_eq!(data, b"01234567");
    }

    #[test]
    fn test_overlapping_ranges() {
        let mut cache = lazy_cache!(b"0123456789abcdef");
        // Load overlapping ranges
        let _ = cache.read_range(2..6).unwrap();
        let _ = cache.read_range(4..10).unwrap();
        // Check that the data is correct
        let data = cache.read_range(2..10).unwrap();
        assert_eq!(data, b"23456789");
    }

    #[test]
    fn test_lru_behavior() {
        let mut cache = lazy_cache!(b"0123456789abcdef");
        // read ranges in increasing order
        let _ = cache.read_range(0..4).unwrap();
        let _ = cache.read_range(4..8).unwrap();
        let _ = cache.read_range(8..12).unwrap();
        // going back to the first range still returns the right bytes
        let data = cache.read_range(0..4).unwrap();
        assert_eq!(data, b"0123");
    }

    #[test]
    fn test_small_block_size() {
        let mut cache = lazy_cache!(b"abc");
        let data = cache.read_range(0..3).unwrap();
        assert_eq!(data, b"abc");
    }

    #[test]
    fn test_large_block_size() {
        let mut cache = lazy_cache!(b"hello world");
        let data = cache.read_range(0..11).unwrap();
        assert_eq!(data, b"hello world");
    }

    #[test]
    fn test_file_smaller_than_block() {
        let mut cache = lazy_cache!(b"abc");
        let data = cache.read_range(0..3).unwrap();
        assert_eq!(data, b"abc");
    }

    #[test]
    fn test_multiple_gets_same_block() {
        let mut cache = lazy_cache!(b"0123456789abcdef");
        // Get the same range multiple times
        let _ = cache.read_range(0..4).unwrap();
        let _ = cache.read_range(0..4).unwrap();
        let _ = cache.read_range(0..4).unwrap();
        // repeated reads keep returning the same bytes
        let data = cache.read_range(0..4).unwrap();
        assert_eq!(data, b"0123");
    }

    #[test]
    fn test_read_method() {
        let mut cache = lazy_cache!(b"hello world");
        let _ = cache.read_count(6).unwrap();
        let data = cache.read_count(5).unwrap();
        assert_eq!(data, b"world");
        // We reached the end so next read should bring an empty slice
        assert!(cache.read_count(1).unwrap().is_empty());
    }

    #[test]
    fn test_read_empty() {
        let mut cache = lazy_cache!(b"hello world");
        let data = cache.read_count(0).unwrap();
        assert!(data.is_empty());
    }

    #[test]
    fn test_read_beyond_end() {
        let mut cache = lazy_cache!(b"hello world");
        let _ = cache.read_count(11).unwrap();
        let data = cache.read_count(5).unwrap();
        assert!(data.is_empty());
    }

    #[test]
    fn test_read_exact_range() {
        let mut cache = lazy_cache!(b"hello world");
        let data = cache.read_exact_range(0..5).unwrap();
        assert_eq!(data, b"hello");
        assert_eq!(cache.read_exact_range(5..11).unwrap(), b" world");
        assert!(cache.read_exact_range(12..13).is_err());
    }

    #[test]
    fn test_read_exact_range_error() {
        let mut cache = lazy_cache!(b"hello world");
        let result = cache.read_exact_range(0..20);
        assert!(result.is_err());
    }

    #[test]
    fn test_read_exact() {
        let mut cache = lazy_cache!(b"hello world");
        let data = cache.read_exact_count(5).unwrap();
        assert_eq!(data, b"hello");
        assert_eq!(cache.read_exact_count(6).unwrap(), b" world");
        assert!(cache.read_exact_count(0).is_ok());
        assert!(cache.read_exact_count(1).is_err());
    }

    #[test]
    fn test_read_exact_error() {
        let mut cache = lazy_cache!(b"hello world");
        let result = cache.read_exact_count(20);
        assert!(result.is_err());
    }

    #[test]
    fn test_read_until_limit() {
        let mut cache = lazy_cache!(b"hello world");
        let data = cache.read_until_or_limit(b' ', 10).unwrap();
        assert_eq!(data, b"hello ");
        assert_eq!(cache.read_exact_count(5).unwrap(), b"world");
    }

    #[test]
    fn test_read_until_limit_not_found() {
        let mut cache = lazy_cache!(b"hello world");
        let data = cache.read_until_or_limit(b'\n', 11).unwrap();
        assert_eq!(data, b"hello world");
        assert!(cache.read_count(1).unwrap().is_empty());
    }

    #[test]
    fn test_read_until_limit_beyond_stream() {
        let mut cache = lazy_cache!(b"hello world");
        let data = cache.read_until_or_limit(b'\n', 42).unwrap();
        assert_eq!(data, b"hello world");
        assert!(cache.read_count(1).unwrap().is_empty());
    }

    #[test]
    fn test_read_until_limit_with_limit() {
        let mut cache = lazy_cache!(b"hello world");
        let data = cache.read_until_or_limit(b' ', 42).unwrap();
        assert_eq!(data, b"hello ");

        let data = cache.read_until_or_limit(b' ', 2).unwrap();
        assert_eq!(data, b"wo");

        let data = cache.read_until_or_limit(b' ', 42).unwrap();
        assert_eq!(data, b"rld");
    }

    #[test]
    fn test_read_until_utf16_limit() {
        let mut cache = lazy_cache!(
            b"\x61\x00\x62\x00\x63\x00\x64\x00\x00\x00\x61\x00\x62\x00\x63\x00\x64\x00\x00"
        );
        let data = cache.read_until_utf16_or_limit(b"\x00\x00", 512).unwrap();
        assert_eq!(data, b"\x61\x00\x62\x00\x63\x00\x64\x00\x00\x00");

        let data = cache.read_until_utf16_or_limit(b"\x00\x00", 1).unwrap();
        assert_eq!(data, b"\x61\x00");

        assert_eq!(
            cache.read_until_utf16_or_limit(b"\xff\xff", 64).unwrap(),
            b"\x62\x00\x63\x00\x64\x00\x00"
        );
    }

    #[test]
    fn test_io_read() {
        let mut f = File::open(LIB_RS).unwrap();
        let mut lr = LazyCache::from_read_seek(File::open(LIB_RS).unwrap())
            .unwrap()
            .with_hot_cache(512)
            .unwrap()
            .with_warm_cache(1024);

        let fb = read_to_end!(f, 32);
        let lcb = read_to_end!(lr, 16);

        assert_eq!(lcb, fb);
    }

    #[test]
    fn test_data_size() {
        let f = File::open(LIB_RS).unwrap();
        // `Metadata::len` is portable, unlike the unix-only `MetadataExt::size`
        let size = f.metadata().unwrap().len();

        let c = LazyCache::from_read_seek(f).unwrap();
        assert_eq!(size, c.data_size());

        assert_eq!(
            LazyCache::from_read_seek(io::Cursor::new(&[]))
                .unwrap()
                .data_size(),
            0
        );
    }
}