Skip to main content

pure_magic/readers/
cache.rs

1#![deny(unsafe_code)]
2
3use std::{
4    cmp::{max, min},
5    fs::File,
6    io::{self, Read, Seek, SeekFrom, Write},
7    ops::Range,
8    path::Path,
9};
10
11use crate::readers::DataRead;
12use memmap2::MmapMut;
13
14/// A lazy-loading cache reader with a multi-tiered caching strategy.
15///
16/// Wraps a [`Read`] + [`Seek`] type and provides efficient cached reads using
17/// a hierarchy of caches: hot (head/tail), warm (memory-mapped), and cold (direct).
18///
19/// The cache automatically loads data in blocks as needed, minimizing I/O operations
20/// for sequential and random access patterns.
21///
22/// # Cache Tiers
23///
24/// - **Hot cache**: Small buffers at the head and tail of the source, always available.
25/// - **Warm cache**: Memory-mapped region for frequently accessed data.
26/// - **Cold cache**: Fallback buffer for reads that don't fit in other caches.
27///
28/// See [`LazyCache::from_read_seek`], [`LazyCache::open`], [`LazyCache::with_hot_cache`],
29/// and [`LazyCache::with_warm_cache`] for construction.
30pub struct LazyCache<R>
31where
32    R: Read + Seek,
33{
34    source: R,
35    loaded: Vec<bool>,
36    hot_head: Vec<u8>,
37    hot_tail: Vec<u8>,
38    warm: Option<MmapMut>,
39    cold_range: Range<u64>,
40    cold: Vec<u8>,
41    block_size: u64,
42    warm_size: Option<u64>,
43    stream_pos: u64,
44    pos_end: u64,
45}
46
/// Size, in bytes, of the blocks the cache loads and tracks (4 KiB).
const BLOCK_SIZE: usize = 4 * 1024;
48
49impl<R> DataRead for LazyCache<R>
50where
51    R: Read + Seek,
52{
53    #[inline(always)]
54    fn stream_position(&self) -> u64 {
55        self.stream_pos
56    }
57
58    fn read_range(&mut self, range: Range<u64>) -> Result<&[u8], io::Error> {
59        self.get_range_u64(range)
60    }
61
62    fn read_until_any_delim_or_limit(
63        &mut self,
64        delims: &[u8],
65        limit: u64,
66    ) -> Result<&[u8], io::Error> {
67        self._read_while_or_limit(|b| !delims.contains(&b), limit, true)
68    }
69
70    fn read_until_or_limit(&mut self, byte: u8, limit: u64) -> Result<&[u8], io::Error> {
71        self._read_while_or_limit(|b| b != byte, limit, true)
72    }
73
74    fn read_while_or_limit<F>(&mut self, f: F, limit: u64) -> Result<&[u8], io::Error>
75    where
76        F: Fn(u8) -> bool,
77    {
78        self._read_while_or_limit(f, limit, false)
79    }
80
81    fn read_until_utf16_or_limit(
82        &mut self,
83        utf16_char: &[u8; 2],
84        limit: u64,
85    ) -> Result<&[u8], io::Error> {
86        let start = self.stream_pos;
87        let mut end = 0;
88
89        let even_bs = if self.block_size.is_multiple_of(2) {
90            self.block_size
91        } else {
92            self.block_size.saturating_add(1)
93        };
94
95        'outer: while limit.saturating_sub(end) > 0 {
96            let buf = self.read_count(even_bs)?;
97
98            let even = buf
99                .iter()
100                .enumerate()
101                .filter(|(i, _)| i % 2 == 0)
102                .map(|t| t.1);
103
104            let odd = buf
105                .iter()
106                .enumerate()
107                .filter(|(i, _)| i % 2 != 0)
108                .map(|t| t.1);
109
110            for t in even.zip(odd) {
111                if limit.saturating_sub(end) == 0 {
112                    break 'outer;
113                }
114
115                end += 2;
116
117                // tail check
118                if t.0 == &utf16_char[0] && t.1 == &utf16_char[1] {
119                    // we include char
120                    break 'outer;
121                }
122            }
123
124            // we processed the last chunk
125            if buf.len() as u64 != even_bs {
126                // if we arrive here we reached end of file
127                if buf.len() % 2 != 0 {
128                    // we include last byte missed by zip
129                    end += 1
130                }
131                break;
132            }
133        }
134
135        self.read_exact_range(start..start + end)
136    }
137
138    fn data_size(&self) -> u64 {
139        self.pos_end
140    }
141
142    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
143        self.stream_pos = self.offset_from_start(pos);
144        Ok(self.stream_pos)
145    }
146}
147
148impl LazyCache<File> {
149    /// Opens a file and creates a new `LazyCache` for it.
150    ///
151    /// This is a convenience constructor equivalent to calling [`LazyCache::from_read_seek`]
152    /// with a [`File`].
153    ///
154    /// # Errors
155    ///
156    /// Returns an error if the file cannot be opened.
157    ///
158    /// # Examples
159    ///
160    /// ```no_run
161    /// use pure_magic::readers::LazyCache;
162    /// use std::path::Path;
163    ///
164    /// let cache = LazyCache::<std::fs::File>::open(Path::new("file.bin"))?;
165    /// # Ok::<_, std::io::Error>(())
166    /// ```
167    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, io::Error> {
168        Self::from_read_seek(File::open(path)?)
169    }
170}
171
172impl<R> io::Read for LazyCache<R>
173where
174    R: Read + Seek,
175{
176    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
177        let r = self.read_count(buf.len() as u64)?;
178        for (i, b) in r.iter().enumerate() {
179            buf[i] = *b;
180        }
181        Ok(r.len())
182    }
183}
184
185impl<R> LazyCache<R>
186where
187    R: Read + Seek,
188{
189    /// Creates a new `LazyCache` wrapping a [`Read`] + [`Seek`] type.
190    ///
191    /// The cache is initialized with default settings: no hot or warm caches.
192    /// Use [`LazyCache::with_hot_cache`] and [`LazyCache::with_warm_cache`] to enable additional cache tiers.
193    ///
194    /// # Errors
195    ///
196    /// Returns an error if seeking to the end of the source fails.
197    ///
198    /// # Examples
199    ///
200    /// ```
201    /// use pure_magic::readers::{LazyCache, DataRead};
202    /// use std::io::Cursor;
203    ///
204    /// let data = b"hello world";
205    /// let cache = LazyCache::from_read_seek(Cursor::new(data)).unwrap();
206    /// assert_eq!(cache.data_size(), data.len() as u64);
207    /// ```
208    pub fn from_read_seek(mut rs: R) -> Result<Self, io::Error> {
209        let block_size = BLOCK_SIZE as u64;
210        let pos_end = rs.seek(SeekFrom::End(0))?;
211        let cache_cap = pos_end.div_ceil(BLOCK_SIZE as u64);
212
213        Ok(Self {
214            source: rs,
215            hot_head: vec![],
216            hot_tail: vec![],
217            warm: None,
218            cold_range: 0..0,
219            cold: vec![0; block_size as usize],
220            loaded: vec![false; cache_cap as usize],
221            block_size,
222            warm_size: None,
223            stream_pos: 0,
224            pos_end,
225        })
226    }
227
228    /// Enables the hot cache with the specified size.
229    ///
230    /// The hot cache maintains two buffers: one at the head (beginning) and one
231    /// at the tail (end) of the source, each with size `size / 2`. This is useful
232    /// for optimizing access to the start and end of files.
233    ///
234    /// # Errors
235    ///
236    /// Returns an error if seeking or reading from the source fails.
237    pub fn with_hot_cache(mut self, size: usize) -> Result<Self, io::Error> {
238        let head_tail_size = size / 2;
239
240        self.source.seek(SeekFrom::Start(0))?;
241
242        if self.pos_end > size as u64 {
243            self.hot_head = vec![0u8; head_tail_size];
244            self.source.read_exact(self.hot_head.as_mut_slice())?;
245
246            self.source.seek(SeekFrom::End(-(head_tail_size as i64)))?;
247            self.hot_tail = vec![0u8; head_tail_size];
248            self.source.read_exact(self.hot_tail.as_mut_slice())?;
249        } else {
250            self.hot_head = vec![0u8; self.pos_end as usize];
251            self.source.read_exact(self.hot_head.as_mut())?;
252        }
253
254        Ok(self)
255    }
256
257    /// Enables the warm cache with the specified size.
258    ///
259    /// The warm cache uses memory-mapped storage for improved performance when
260    /// reading larger regions. The size is clamped to be at least as large as
261    /// the block size to ensure proper alignment.
262    ///
263    /// Note: The memory mapping is performed lazily on first access.
264    pub fn with_warm_cache(mut self, mut warm_size: u64) -> Self {
265        // if warm_size is smaller than block_size we will not
266        // be able to write chunks into the warm cache
267        warm_size = max(warm_size, self.block_size);
268        self.warm_size = Some(warm_size);
269        self
270    }
271
272    #[inline(always)]
273    fn warm(&mut self) -> Result<&mut MmapMut, io::Error> {
274        if self.warm.is_none() && self.warm_size.is_some() {
275            self.warm = Some(MmapMut::map_anon(
276                self.warm_size.unwrap_or_default() as usize
277            )?);
278        }
279        Ok(self.warm.as_mut().unwrap())
280    }
281
282    #[inline(always)]
283    fn range_warmup(&mut self, range: Range<u64>) -> Result<(), io::Error> {
284        let start_chunk_id = range.start / self.block_size;
285        let end_chunk_id = (range.end.saturating_sub(1)) / self.block_size;
286
287        if self.loaded.is_empty() {
288            return Ok(());
289        }
290
291        for chunk_id in start_chunk_id..=end_chunk_id {
292            if self.loaded[chunk_id as usize] {
293                continue;
294            }
295
296            let offset = chunk_id * self.block_size;
297            let buf_size = min(
298                self.block_size as usize,
299                (self.pos_end.saturating_sub(offset)) as usize,
300            );
301            let mut buf = vec![0u8; buf_size];
302            self.source.seek(SeekFrom::Start(offset))?;
303            self.source.read_exact(&mut buf)?;
304
305            (&mut self.warm()?[offset as usize..]).write_all(&buf)?;
306            self.loaded[chunk_id as usize] = true;
307        }
308
309        Ok(())
310    }
311
312    #[inline(always)]
313    fn get_range_u64(&mut self, range: Range<u64>) -> Result<&[u8], io::Error> {
314        // we fix range in case we attempt at reading beyond end of file
315        let range = if range.end > self.pos_end {
316            range.start..self.pos_end
317        } else {
318            range
319        };
320
321        let range_len = range.end.saturating_sub(range.start);
322
323        if range.start > self.pos_end || range_len == 0 {
324            Ok(&[])
325        } else if range.start < self.hot_head.len() as u64
326            && range.end <= self.hot_head.len() as u64
327        {
328            self.seek(SeekFrom::Start(range.end))?;
329            Ok(&self.hot_head[range.start as usize..range.end as usize])
330        } else if range.start >= (self.pos_end.saturating_sub(self.hot_tail.len() as u64)) {
331            let tail_base = self.pos_end.saturating_sub(self.hot_tail.len() as u64);
332
333            let start = range.start - tail_base;
334            let end = range.end - tail_base;
335
336            self.seek(SeekFrom::Start(range.end))?;
337
338            Ok(&self.hot_tail[start as usize..end as usize])
339        } else if range.end < self.warm_size.unwrap_or_default() {
340            self.range_warmup(range.clone())?;
341            self.seek(SeekFrom::Start(range.end))?;
342
343            Ok(&self.warm()?[range.start as usize..range.end as usize])
344        } else {
345            if self.cold_range.contains(&range.start)
346                && self.cold_range.contains(&range.end.saturating_sub(1))
347            {
348                let rel_start = range.start - self.cold_range.start;
349                self.seek(SeekFrom::Start(range.end))?;
350
351                Ok(&self.cold[rel_start as usize..(rel_start + range_len) as usize])
352            } else {
353                // we read one block in advance
354                let range_len_ext = range_len.saturating_add(self.block_size);
355                if range_len_ext > self.cold.len() as u64 {
356                    self.cold.resize(range_len_ext as usize, 0);
357                }
358
359                self.source.seek(SeekFrom::Start(range.start))?;
360                let n = self
361                    .source
362                    .read(self.cold[..range_len_ext as usize].as_mut())?;
363                self.seek(SeekFrom::Start(range.end))?;
364
365                Ok(&self.cold[..min(range_len as usize, n)])
366            }
367        }
368    }
369
370    // reads while f returns true or we reach limit
371    #[inline(always)]
372    fn _read_while_or_limit<F>(
373        &mut self,
374        f: F,
375        limit: u64,
376        include_last: bool,
377    ) -> Result<&[u8], io::Error>
378    where
379        F: Fn(u8) -> bool,
380    {
381        let start = self.stream_pos;
382        let mut end = 0;
383
384        'outer: while limit - end > 0 {
385            let buf = self.read_count(self.block_size)?;
386
387            for b in buf {
388                if limit - end == 0 {
389                    break 'outer;
390                }
391
392                if !f(*b) {
393                    if include_last && end < self.data_size() {
394                        end += 1;
395                    }
396                    // read_until includes delimiter
397                    break 'outer;
398                }
399
400                end += 1;
401            }
402
403            // we processed last chunk
404            if buf.len() as u64 != self.block_size {
405                break;
406            }
407        }
408
409        self.read_exact_range(start..start + end)
410    }
411}
412
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `LazyCache` over an in-memory byte-string literal.
    macro_rules! lazy_cache {
        ($content: literal) => {
            LazyCache::from_read_seek(std::io::Cursor::new($content)).unwrap()
        };
    }

    /// Reads the `io::Read` implementor `r` in chunks of `cs` bytes until it reports
    /// end of data, and returns everything read. A read error fails the test instead
    /// of silently truncating the output.
    macro_rules! read_to_end {
        ($r: expr, $cs: literal) => {{
            let mut buf = [0u8; $cs];
            let mut out: Vec<u8> = vec![];
            loop {
                let n = $r.read(&mut buf[..]).unwrap();
                if n == 0 {
                    break;
                }
                out.extend(&buf[..n]);
            }
            out
        }};
    }

    #[test]
    fn test_get_single_block() {
        let mut cache = lazy_cache!(b"hello world");
        let data = cache.read_range(0..4).unwrap();
        assert_eq!(data, b"hell");
    }

    #[test]
    fn test_get_across_blocks() {
        let mut cache = lazy_cache!(b"hello world");
        let data = cache.read_range(2..7).unwrap();
        assert_eq!(data, b"llo w");
    }

    #[test]
    fn test_get_entire_file() {
        let mut cache = lazy_cache!(b"hello world");
        let data = cache.read_range(0..11).unwrap();
        assert_eq!(data, b"hello world");
    }

    #[test]
    fn test_get_empty_range() {
        let mut cache = lazy_cache!(b"hello world");
        let data = cache.read_range(0..0).unwrap();
        assert!(data.is_empty());
    }

    #[test]
    fn test_get_out_of_bounds() {
        let mut cache = lazy_cache!(b"hello world");
        // A range starting past the end of the data yields an empty slice,
        // neither an error nor a panic.
        assert!(cache.read_range(20..30).unwrap().is_empty());
    }

    #[test]
    fn test_cache_eviction() {
        let mut cache = lazy_cache!(b"0123456789abcdef");
        // Read two adjacent ranges...
        let _ = cache.read_range(0..8).unwrap();
        let _ = cache.read_range(8..12).unwrap();
        // ...then re-read the second one: it must still match the source.
        let data = cache.read_range(8..12).unwrap();
        assert_eq!(data, b"89ab");
    }

    #[test]
    fn test_chunk_consolidation() {
        let mut cache = lazy_cache!(b"0123456789abcdef");
        // Read several small ranges, including one overlapping two earlier reads.
        let _ = cache.read_range(0..4).unwrap();
        let _ = cache.read_range(4..8).unwrap();
        let _ = cache.read_range(8..12).unwrap();
        let _ = cache.read_range(2..6).unwrap();
        // A wider range covering the earlier reads must return the source bytes.
        let data = cache.read_range(0..8).unwrap();
        assert_eq!(data, b"01234567");
    }

    #[test]
    fn test_overlapping_ranges() {
        let mut cache = lazy_cache!(b"0123456789abcdef");
        // Load overlapping ranges
        let _ = cache.read_range(2..6).unwrap();
        let _ = cache.read_range(4..10).unwrap();
        // Check that the data is correct
        let data = cache.read_range(2..10).unwrap();
        assert_eq!(data, b"23456789");
    }

    #[test]
    fn test_lru_behavior() {
        let mut cache = lazy_cache!(b"0123456789abcdef");
        // Read three consecutive ranges...
        let _ = cache.read_range(0..4).unwrap();
        let _ = cache.read_range(4..8).unwrap();
        let _ = cache.read_range(8..12).unwrap();
        // ...then go back to the first one: reading backwards must return the
        // original bytes.
        let data = cache.read_range(0..4).unwrap();
        assert_eq!(data, b"0123");
    }

    #[test]
    fn test_small_block_size() {
        // Data much shorter than one block.
        let mut cache = lazy_cache!(b"abc");
        let data = cache.read_range(0..3).unwrap();
        assert_eq!(data, b"abc");
    }

    #[test]
    fn test_large_block_size() {
        // The whole data in a single request.
        let mut cache = lazy_cache!(b"hello world");
        let data = cache.read_range(0..11).unwrap();
        assert_eq!(data, b"hello world");
    }

    #[test]
    fn test_file_smaller_than_block() {
        let mut cache = lazy_cache!(b"abc");
        let data = cache.read_range(0..3).unwrap();
        assert_eq!(data, b"abc");
    }

    #[test]
    fn test_multiple_gets_same_block() {
        let mut cache = lazy_cache!(b"0123456789abcdef");
        // Get the same range multiple times
        let _ = cache.read_range(0..4).unwrap();
        let _ = cache.read_range(0..4).unwrap();
        let _ = cache.read_range(0..4).unwrap();
        // Repeated reads must keep returning the same bytes
        let data = cache.read_range(0..4).unwrap();
        assert_eq!(data, b"0123");
    }

    #[test]
    fn test_read_method() {
        let mut cache = lazy_cache!(b"hello world");
        let _ = cache.read_count(6).unwrap();
        let data = cache.read_count(5).unwrap();
        assert_eq!(data, b"world");
        // We reached the end so next read should bring an empty slice
        assert!(cache.read_count(1).unwrap().is_empty());
    }

    #[test]
    fn test_read_empty() {
        let mut cache = lazy_cache!(b"hello world");
        let data = cache.read_count(0).unwrap();
        assert!(data.is_empty());
    }

    #[test]
    fn test_read_beyond_end() {
        let mut cache = lazy_cache!(b"hello world");
        let _ = cache.read_count(11).unwrap();
        let data = cache.read_count(5).unwrap();
        assert!(data.is_empty());
    }

    #[test]
    fn test_read_exact_range() {
        let mut cache = lazy_cache!(b"hello world");
        let data = cache.read_exact_range(0..5).unwrap();
        assert_eq!(data, b"hello");
        assert_eq!(cache.read_exact_range(5..11).unwrap(), b" world");
        assert!(cache.read_exact_range(12..13).is_err());
    }

    #[test]
    fn test_read_exact_range_error() {
        let mut cache = lazy_cache!(b"hello world");
        let result = cache.read_exact_range(0..20);
        assert!(result.is_err());
    }

    #[test]
    fn test_read_exact() {
        let mut cache = lazy_cache!(b"hello world");
        let data = cache.read_exact_count(5).unwrap();
        assert_eq!(data, b"hello");
        assert_eq!(cache.read_exact_count(6).unwrap(), b" world");
        assert!(cache.read_exact_count(0).is_ok());
        assert!(cache.read_exact_count(1).is_err());
    }

    #[test]
    fn test_read_exact_error() {
        let mut cache = lazy_cache!(b"hello world");
        let result = cache.read_exact_count(20);
        assert!(result.is_err());
    }

    #[test]
    fn test_read_until_limit() {
        let mut cache = lazy_cache!(b"hello world");
        let data = cache.read_until_or_limit(b' ', 10).unwrap();
        assert_eq!(data, b"hello ");
        assert_eq!(cache.read_exact_count(5).unwrap(), b"world");
    }

    #[test]
    fn test_read_until_limit_not_found() {
        let mut cache = lazy_cache!(b"hello world");
        let data = cache.read_until_or_limit(b'\n', 11).unwrap();
        assert_eq!(data, b"hello world");
        assert!(cache.read_count(1).unwrap().is_empty());
    }

    #[test]
    fn test_read_until_limit_beyond_stream() {
        let mut cache = lazy_cache!(b"hello world");
        let data = cache.read_until_or_limit(b'\n', 42).unwrap();
        assert_eq!(data, b"hello world");
        assert!(cache.read_count(1).unwrap().is_empty());
    }

    #[test]
    fn test_read_until_limit_with_limit() {
        let mut cache = lazy_cache!(b"hello world");
        let data = cache.read_until_or_limit(b' ', 42).unwrap();
        assert_eq!(data, b"hello ");

        let data = cache.read_until_or_limit(b' ', 2).unwrap();
        assert_eq!(data, b"wo");

        let data = cache.read_until_or_limit(b' ', 42).unwrap();
        assert_eq!(data, b"rld");
    }

    #[test]
    fn test_read_until_utf16_limit() {
        let mut cache = lazy_cache!(
            b"\x61\x00\x62\x00\x63\x00\x64\x00\x00\x00\x61\x00\x62\x00\x63\x00\x64\x00\x00"
        );
        let data = cache.read_until_utf16_or_limit(b"\x00\x00", 512).unwrap();
        assert_eq!(data, b"\x61\x00\x62\x00\x63\x00\x64\x00\x00\x00");

        let data = cache.read_until_utf16_or_limit(b"\x00\x00", 1).unwrap();
        assert_eq!(data, b"\x61\x00");

        assert_eq!(
            cache.read_until_utf16_or_limit(b"\xff\xff", 64).unwrap(),
            b"\x62\x00\x63\x00\x64\x00\x00"
        );
    }

    #[test]
    fn test_io_read() {
        let p = "./src/lib.rs";
        let mut f = File::open(p).unwrap();
        let mut lr = LazyCache::from_read_seek(File::open(p).unwrap())
            .unwrap()
            .with_hot_cache(512)
            .unwrap()
            .with_warm_cache(1024);

        let fb = read_to_end!(f, 32);
        let lcb = read_to_end!(lr, 16);

        assert_eq!(lcb, fb);
    }

    #[test]
    fn test_data_size() {
        let f = File::open("./src/lib.rs").unwrap();
        // `Metadata::len` is portable, unlike the Unix-only `MetadataExt::size`.
        let size = f.metadata().unwrap().len();

        let c = LazyCache::from_read_seek(f).unwrap();
        assert_eq!(size, c.data_size());

        assert_eq!(
            LazyCache::from_read_seek(io::Cursor::new(&[]))
                .unwrap()
                .data_size(),
            0
        );
    }
}