// modelvault_core/pager.rs

1use std::collections::HashMap;
2use std::io;
3use std::ops::RangeInclusive;
4use std::sync::Arc;
5use std::sync::Mutex;
6
7use crate::error::DbError;
8use crate::storage::Store;
9
/// Default page size in bytes (16 KiB).
pub const DEFAULT_PAGE_SIZE: u64 = 16 << 10;
11
12/// A simple fixed-size page cache wrapper over any [`Store`].
13///
14/// This is intentionally minimal (no eviction policy yet). It exists to decouple the engine’s
15/// random-access reads from the OS file descriptor and provide a hook for future buffer pool work.
16#[derive(Debug)]
17pub struct PagedStore<S: Store> {
18    inner: S,
19    page_size: u64,
20    // Interior mutability so we can keep the `Store` trait surface unchanged.
21    cache: Arc<Mutex<HashMap<u64, Vec<u8>>>>,
22}
23
24impl<S: Store> PagedStore<S> {
25    pub fn new(inner: S, page_size: u64) -> Self {
26        let page_size = page_size.max(512); // basic sanity guard
27        Self {
28            inner,
29            page_size,
30            cache: Arc::new(Mutex::new(HashMap::new())),
31        }
32    }
33
34    pub fn into_inner(self) -> S {
35        self.inner
36    }
37
38    pub fn page_size(&self) -> u64 {
39        self.page_size
40    }
41
42    fn page_range_touched(&self, offset: u64, len: usize) -> RangeInclusive<u64> {
43        if len == 0 {
44            return 0..=0;
45        }
46        let start = offset / self.page_size;
47        let end = offset.saturating_add(len as u64 - 1) / self.page_size;
48        start..=end
49    }
50
51    fn get_page(&mut self, page_idx: u64) -> Result<Vec<u8>, DbError> {
52        if let Some(hit) = self
53            .cache
54            .lock()
55            .unwrap_or_else(|e| e.into_inner())
56            .get(&page_idx)
57            .cloned()
58        {
59            return Ok(hit);
60        }
61
62        let len = self.inner.len()?;
63        let page_start = page_idx
64            .checked_mul(self.page_size)
65            .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "page offset overflow"))?;
66
67        // Missing pages beyond EOF are never valid; fail deterministically.
68        if page_start >= len {
69            return Err(DbError::Io(io::Error::new(
70                io::ErrorKind::UnexpectedEof,
71                "read past end of store",
72            )));
73        }
74
75        let to_read = (len - page_start).min(self.page_size) as usize;
76        let mut page = vec![0u8; self.page_size as usize];
77        self.inner.read_exact_at(page_start, &mut page[..to_read])?;
78
79        self.cache
80            .lock()
81            .unwrap_or_else(|e| e.into_inner())
82            .insert(page_idx, page.clone());
83
84        Ok(page)
85    }
86
87    fn invalidate_range(&mut self, offset: u64, len: usize) -> Result<(), DbError> {
88        if len == 0 {
89            return Ok(());
90        }
91        let pages = self.page_range_touched(offset, len);
92        let mut cache = self.cache.lock().unwrap_or_else(|e| e.into_inner());
93        for p in pages {
94            cache.remove(&p);
95        }
96        Ok(())
97    }
98
99    fn clear_truncated(&mut self, new_len: u64) -> Result<(), DbError> {
100        let mut cache = self.cache.lock().unwrap_or_else(|e| e.into_inner());
101        let ps = self.page_size;
102        cache.retain(|page_idx, _| {
103            let start = page_idx.saturating_mul(ps);
104            start < new_len && start.saturating_add(ps) <= new_len
105        });
106        Ok(())
107    }
108}
109
110impl<S: Store> Store for PagedStore<S> {
111    fn len(&self) -> Result<u64, DbError> {
112        self.inner.len()
113    }
114
115    fn read_exact_at(&mut self, offset: u64, buf: &mut [u8]) -> Result<(), DbError> {
116        let len = self.inner.len()?;
117        let end = offset
118            .checked_add(buf.len() as u64)
119            .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "overflow"))?;
120        if end > len {
121            return Err(DbError::Io(io::Error::new(
122                io::ErrorKind::UnexpectedEof,
123                "read past end of store",
124            )));
125        }
126
127        let mut remaining = buf.len();
128        let mut out_pos = 0usize;
129        while remaining > 0 {
130            let cur_off = offset + out_pos as u64;
131            let page_idx = cur_off / self.page_size;
132            let page_off = (cur_off % self.page_size) as usize;
133            let take = remaining.min(self.page_size as usize - page_off);
134
135            let page = self.get_page(page_idx)?;
136            buf[out_pos..out_pos + take].copy_from_slice(&page[page_off..page_off + take]);
137
138            out_pos += take;
139            remaining -= take;
140        }
141        Ok(())
142    }
143
144    fn write_all_at(&mut self, offset: u64, buf: &[u8]) -> Result<(), DbError> {
145        self.inner.write_all_at(offset, buf)?;
146        self.invalidate_range(offset, buf.len())?;
147        Ok(())
148    }
149
150    fn sync(&mut self) -> Result<(), DbError> {
151        self.inner.sync()
152    }
153
154    fn truncate(&mut self, len: u64) -> Result<(), DbError> {
155        self.inner.truncate(len)?;
156        self.clear_truncated(len)?;
157        Ok(())
158    }
159}
160
// The unit tests for this file are kept in a separate source file under the crate's
// `tests/unit` directory and are textually included into this module.
#[cfg(test)]
mod tests {
    include!(concat!(env!("CARGO_MANIFEST_DIR"), "/tests/unit/src_pager_tests.rs"));
}