1use std::collections::HashMap;
2use std::io;
3use std::ops::RangeInclusive;
4use std::sync::Arc;
5use std::sync::Mutex;
6
7use crate::error::DbError;
8use crate::storage::Store;
9
/// Default page size: `16 * 1024`, i.e. 16 KiB when read as a byte count (which is how
/// `PagedStore` treats its `page_size`).
///
/// NOTE(review): nothing in this file reads this constant; presumably callers pass it as the
/// `page_size` argument of `PagedStore::new` — confirm at the call sites.
pub const DEFAULT_PAGE_SIZE: u64 = 16 * 1024;
11
/// A [`Store`] wrapper that keeps whole pages read from `inner` in an in-memory map.
///
/// Reads are served one page at a time through the cache; writes and truncations drop the
/// cached pages they affect. Entries are only ever removed by those invalidations — this type
/// has no size limit or eviction policy of its own.
#[derive(Debug)]
pub struct PagedStore<S: Store> {
    /// The wrapped store. Cache misses, writes, syncs and truncations are forwarded to it.
    inner: S,
    /// Page size in bytes; `new` raises any smaller value to 512.
    page_size: u64,
    /// Cached pages keyed by page index. Every entry is `page_size` bytes long; a page that
    /// extends past the end of the store is zero-padded.
    cache: Arc<Mutex<HashMap<u64, Vec<u8>>>>,
}
23
24impl<S: Store> PagedStore<S> {
25 pub fn new(inner: S, page_size: u64) -> Self {
26 let page_size = page_size.max(512); Self {
28 inner,
29 page_size,
30 cache: Arc::new(Mutex::new(HashMap::new())),
31 }
32 }
33
34 pub fn into_inner(self) -> S {
35 self.inner
36 }
37
38 pub fn page_size(&self) -> u64 {
39 self.page_size
40 }
41
42 fn page_range_touched(&self, offset: u64, len: usize) -> RangeInclusive<u64> {
43 if len == 0 {
44 return 0..=0;
45 }
46 let start = offset / self.page_size;
47 let end = offset.saturating_add(len as u64 - 1) / self.page_size;
48 start..=end
49 }
50
51 fn get_page(&mut self, page_idx: u64) -> Result<Vec<u8>, DbError> {
52 if let Some(hit) = self
53 .cache
54 .lock()
55 .unwrap_or_else(|e| e.into_inner())
56 .get(&page_idx)
57 .cloned()
58 {
59 return Ok(hit);
60 }
61
62 let len = self.inner.len()?;
63 let page_start = page_idx
64 .checked_mul(self.page_size)
65 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "page offset overflow"))?;
66
67 if page_start >= len {
69 return Err(DbError::Io(io::Error::new(
70 io::ErrorKind::UnexpectedEof,
71 "read past end of store",
72 )));
73 }
74
75 let to_read = (len - page_start).min(self.page_size) as usize;
76 let mut page = vec![0u8; self.page_size as usize];
77 self.inner.read_exact_at(page_start, &mut page[..to_read])?;
78
79 self.cache
80 .lock()
81 .unwrap_or_else(|e| e.into_inner())
82 .insert(page_idx, page.clone());
83
84 Ok(page)
85 }
86
87 fn invalidate_range(&mut self, offset: u64, len: usize) -> Result<(), DbError> {
88 if len == 0 {
89 return Ok(());
90 }
91 let pages = self.page_range_touched(offset, len);
92 let mut cache = self.cache.lock().unwrap_or_else(|e| e.into_inner());
93 for p in pages {
94 cache.remove(&p);
95 }
96 Ok(())
97 }
98
99 fn clear_truncated(&mut self, new_len: u64) -> Result<(), DbError> {
100 let mut cache = self.cache.lock().unwrap_or_else(|e| e.into_inner());
101 let ps = self.page_size;
102 cache.retain(|page_idx, _| {
103 let start = page_idx.saturating_mul(ps);
104 start < new_len && start.saturating_add(ps) <= new_len
105 });
106 Ok(())
107 }
108}
109
110impl<S: Store> Store for PagedStore<S> {
111 fn len(&self) -> Result<u64, DbError> {
112 self.inner.len()
113 }
114
115 fn read_exact_at(&mut self, offset: u64, buf: &mut [u8]) -> Result<(), DbError> {
116 let len = self.inner.len()?;
117 let end = offset
118 .checked_add(buf.len() as u64)
119 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "overflow"))?;
120 if end > len {
121 return Err(DbError::Io(io::Error::new(
122 io::ErrorKind::UnexpectedEof,
123 "read past end of store",
124 )));
125 }
126
127 let mut remaining = buf.len();
128 let mut out_pos = 0usize;
129 while remaining > 0 {
130 let cur_off = offset + out_pos as u64;
131 let page_idx = cur_off / self.page_size;
132 let page_off = (cur_off % self.page_size) as usize;
133 let take = remaining.min(self.page_size as usize - page_off);
134
135 let page = self.get_page(page_idx)?;
136 buf[out_pos..out_pos + take].copy_from_slice(&page[page_off..page_off + take]);
137
138 out_pos += take;
139 remaining -= take;
140 }
141 Ok(())
142 }
143
144 fn write_all_at(&mut self, offset: u64, buf: &[u8]) -> Result<(), DbError> {
145 self.inner.write_all_at(offset, buf)?;
146 self.invalidate_range(offset, buf.len())?;
147 Ok(())
148 }
149
150 fn sync(&mut self) -> Result<(), DbError> {
151 self.inner.sync()
152 }
153
154 fn truncate(&mut self, len: u64) -> Result<(), DbError> {
155 self.inner.truncate(len)?;
156 self.clear_truncated(len)?;
157 Ok(())
158 }
159}
160
// The unit tests live in a separate file under the crate's manifest directory and are
// textually included here, so they are compiled as the body of this `tests` child module
// (and only when `cfg(test)` is active).
#[cfg(test)]
mod tests {
    include!(concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/tests/unit/src_pager_tests.rs"
    ));
}