// reddb_server/storage/engine/overflow.rs
use super::page::{Page, PageType, CONTENT_SIZE, HEADER_SIZE, PAGE_SIZE};
use super::pager::{Pager, PagerError};
25
/// Size in bytes of the chain header at the start of every overflow page's
/// content area: a little-endian `u32` next-page id followed by a
/// little-endian `u32` payload length.
pub const OVERFLOW_PAGE_HEADER: usize = 2 * std::mem::size_of::<u32>();
28
29pub const OVERFLOW_PAYLOAD_PER_PAGE: usize = CONTENT_SIZE - OVERFLOW_PAGE_HEADER;
31
32const _: () = assert!(OVERFLOW_PAYLOAD_PER_PAGE == PAGE_SIZE - HEADER_SIZE - OVERFLOW_PAGE_HEADER);
33
34#[derive(Debug)]
36pub enum OverflowError {
37 Pager(PagerError),
39 NotOverflowPage { page_id: u32 },
41 PayloadTooLarge { page_id: u32, len: u32 },
43 LengthMismatch { expected: u64, actual: u64 },
45 InvalidHead { page_id: u32 },
47}
48
49impl std::fmt::Display for OverflowError {
50 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51 match self {
52 Self::Pager(e) => write!(f, "pager error: {}", e),
53 Self::NotOverflowPage { page_id } => {
54 write!(f, "page {} is not an overflow page", page_id)
55 }
56 Self::PayloadTooLarge { page_id, len } => {
57 write!(
58 f,
59 "overflow page {} declares payload_len {} (max {})",
60 page_id, len, OVERFLOW_PAYLOAD_PER_PAGE
61 )
62 }
63 Self::LengthMismatch { expected, actual } => write!(
64 f,
65 "overflow chain length mismatch: expected {} bytes, chain holds {}",
66 expected, actual
67 ),
68 Self::InvalidHead { page_id } => {
69 write!(f, "free called with non-overflow head page {}", page_id)
70 }
71 }
72 }
73}
74
75impl std::error::Error for OverflowError {}
76
77impl From<PagerError> for OverflowError {
78 fn from(e: PagerError) -> Self {
79 Self::Pager(e)
80 }
81}
82
83pub fn pages_needed(len: usize) -> usize {
85 if len == 0 {
86 1
87 } else {
88 len.div_ceil(OVERFLOW_PAYLOAD_PER_PAGE)
89 }
90}
91
92fn read_chain_header(page: &Page) -> Result<(u32, u32), OverflowError> {
93 if page.page_type().map_err(PagerError::from)? != PageType::Overflow {
94 return Err(OverflowError::NotOverflowPage {
95 page_id: page.page_id(),
96 });
97 }
98 let content = page.content();
99 let next = u32::from_le_bytes([content[0], content[1], content[2], content[3]]);
100 let len = u32::from_le_bytes([content[4], content[5], content[6], content[7]]);
101 if len as usize > OVERFLOW_PAYLOAD_PER_PAGE {
102 return Err(OverflowError::PayloadTooLarge {
103 page_id: page.page_id(),
104 len,
105 });
106 }
107 Ok((next, len))
108}
109
110fn write_chain_header(page: &mut Page, next: u32, payload_len: u32) {
111 let content = page.content_mut();
112 content[0..4].copy_from_slice(&next.to_le_bytes());
113 content[4..8].copy_from_slice(&payload_len.to_le_bytes());
114}
115
116pub struct OverflowChain<'p> {
122 pager: &'p Pager,
123}
124
125impl<'p> OverflowChain<'p> {
126 pub fn new(pager: &'p Pager) -> Self {
127 Self { pager }
128 }
129
130 pub fn store(&self, bytes: &[u8]) -> Result<(u32, u64), OverflowError> {
136 let total_len = bytes.len() as u64;
137 let n_pages = pages_needed(bytes.len());
138
139 let mut page_ids = Vec::with_capacity(n_pages);
140 for _ in 0..n_pages {
141 let page = self.pager.allocate_page(PageType::Overflow)?;
142 page_ids.push(page.page_id());
143 }
144
145 let mut offset = 0usize;
146 for (i, &pid) in page_ids.iter().enumerate() {
147 let next = if i + 1 < page_ids.len() {
148 page_ids[i + 1]
149 } else {
150 0
151 };
152 let chunk_end = (offset + OVERFLOW_PAYLOAD_PER_PAGE).min(bytes.len());
153 let chunk = &bytes[offset..chunk_end];
154 offset = chunk_end;
155
156 let mut page = Page::new(PageType::Overflow, pid);
157 write_chain_header(&mut page, next, chunk.len() as u32);
158 page.content_mut()[OVERFLOW_PAGE_HEADER..OVERFLOW_PAGE_HEADER + chunk.len()]
159 .copy_from_slice(chunk);
160
161 self.pager.write_page(pid, page)?;
162 }
163
164 Ok((page_ids[0], total_len))
165 }
166
167 pub fn read(&self, head_page_id: u32, total_len: u64) -> Result<Vec<u8>, OverflowError> {
173 let expected = total_len as usize;
174 let mut out = Vec::with_capacity(expected);
175 let mut current = head_page_id;
176 let mut collected: u64 = 0;
177
178 while current != 0 {
179 let page = self.pager.read_page(current).map_err(OverflowError::from)?;
180 let (next, len) = read_chain_header(&page)?;
181 let len_usize = len as usize;
182 collected += len as u64;
183
184 if collected > total_len {
185 return Err(OverflowError::LengthMismatch {
186 expected: total_len,
187 actual: collected,
188 });
189 }
190
191 let payload = &page.content()[OVERFLOW_PAGE_HEADER..OVERFLOW_PAGE_HEADER + len_usize];
192 out.extend_from_slice(payload);
193 current = next;
194 }
195
196 if collected != total_len {
197 return Err(OverflowError::LengthMismatch {
198 expected: total_len,
199 actual: collected,
200 });
201 }
202
203 Ok(out)
204 }
205
206 pub fn free(&self, head_page_id: u32) -> Result<(), OverflowError> {
209 let mut current = head_page_id;
210 let mut first = true;
211 while current != 0 {
212 let page = self.pager.read_page(current).map_err(OverflowError::from)?;
213 if page.page_type().map_err(PagerError::from)? != PageType::Overflow {
214 return Err(if first {
215 OverflowError::InvalidHead { page_id: current }
216 } else {
217 OverflowError::NotOverflowPage { page_id: current }
218 });
219 }
220 let (next, _) = read_chain_header(&page)?;
221 self.pager.free_page(current)?;
222 current = next;
223 first = false;
224 }
225 Ok(())
226 }
227}
228
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::engine::pager::Pager;
    use std::path::PathBuf;
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Builds a database path in the system temp directory that is unique per
    /// process and per call.
    fn temp_db_path() -> PathBuf {
        static COUNTER: AtomicU64 = AtomicU64::new(0);
        let id = COUNTER.fetch_add(1, Ordering::Relaxed);
        let mut path = std::env::temp_dir();
        path.push(format!(
            "reddb_overflow_test_{}_{}.db",
            std::process::id(),
            id
        ));
        path
    }

    /// Removes the database file and its `-hdr` / `-meta` / `-dwb` sidecars,
    /// ignoring files that do not exist.
    fn cleanup(path: &std::path::Path) {
        let _ = std::fs::remove_file(path);
        for suffix in ["-hdr", "-meta", "-dwb"] {
            let mut p = path.to_path_buf().into_os_string();
            p.push(suffix);
            let _ = std::fs::remove_file(&p);
        }
    }

    /// Temporary database location that is wiped on creation and again on
    /// drop, so files are removed even when a test assertion panics.
    ///
    /// Declare the guard *before* the `Pager` that uses it: locals drop in
    /// reverse declaration order, so the pager is dropped before the files
    /// are deleted.
    struct TempDb {
        path: PathBuf,
    }

    impl TempDb {
        fn new() -> Self {
            let path = temp_db_path();
            cleanup(&path);
            Self { path }
        }
    }

    impl Drop for TempDb {
        fn drop(&mut self) {
            cleanup(&self.path);
        }
    }

    /// Deterministic, non-constant byte pattern of the requested length.
    fn pattern(len: usize) -> Vec<u8> {
        (0..len).map(|i| ((i * 31 + 7) & 0xff) as u8).collect()
    }

    #[test]
    fn pages_needed_boundaries() {
        assert_eq!(pages_needed(0), 1);
        assert_eq!(pages_needed(1), 1);
        assert_eq!(pages_needed(OVERFLOW_PAYLOAD_PER_PAGE), 1);
        assert_eq!(pages_needed(OVERFLOW_PAYLOAD_PER_PAGE + 1), 2);
        assert_eq!(pages_needed(OVERFLOW_PAYLOAD_PER_PAGE * 4), 4);
        assert_eq!(pages_needed(OVERFLOW_PAYLOAD_PER_PAGE * 4 + 1), 5);
    }

    /// Stores `len` patterned bytes, reads them back, checks equality, then
    /// frees the chain.
    fn roundtrip(pager: &Pager, len: usize) {
        let chain = OverflowChain::new(pager);
        let data = pattern(len);
        let (head, total) = chain.store(&data).unwrap();
        assert_eq!(total, len as u64);
        let read_back = chain.read(head, total).unwrap();
        assert_eq!(read_back.len(), len);
        assert_eq!(read_back, data);
        chain.free(head).unwrap();
    }

    #[test]
    fn store_read_roundtrips_across_sizes() {
        let db = TempDb::new();
        let pager = Pager::open_default(&db.path).unwrap();

        let sizes = [
            1,
            100,
            OVERFLOW_PAYLOAD_PER_PAGE - 1,
            OVERFLOW_PAYLOAD_PER_PAGE,
            OVERFLOW_PAYLOAD_PER_PAGE + 1,
            OVERFLOW_PAYLOAD_PER_PAGE * 2,
            OVERFLOW_PAYLOAD_PER_PAGE * 5 + 123,
            OVERFLOW_PAYLOAD_PER_PAGE * 32,
            OVERFLOW_PAYLOAD_PER_PAGE * 7,
        ];
        for len in sizes {
            roundtrip(&pager, len);
        }
    }

    #[test]
    fn store_empty_value_produces_single_page() {
        let db = TempDb::new();
        let pager = Pager::open_default(&db.path).unwrap();
        let chain = OverflowChain::new(&pager);

        let (head, total) = chain.store(&[]).unwrap();
        assert_eq!(total, 0);
        let bytes = chain.read(head, total).unwrap();
        assert!(bytes.is_empty());

        // The single page must terminate the chain and carry no payload.
        let page = pager.read_page(head).unwrap();
        let (next, len) = read_chain_header(&page).unwrap();
        assert_eq!(next, 0);
        assert_eq!(len, 0);

        chain.free(head).unwrap();
    }

    #[test]
    fn read_with_wrong_total_len_errors() {
        let db = TempDb::new();
        let pager = Pager::open_default(&db.path).unwrap();
        let chain = OverflowChain::new(&pager);

        let data = pattern(OVERFLOW_PAYLOAD_PER_PAGE * 3 + 50);
        let (head, total) = chain.store(&data).unwrap();

        // Caller expects fewer bytes than the chain holds.
        let err = chain.read(head, total - 10).unwrap_err();
        assert!(matches!(err, OverflowError::LengthMismatch { .. }));

        // Caller expects more bytes than the chain holds.
        let err = chain.read(head, total + 10).unwrap_err();
        assert!(matches!(err, OverflowError::LengthMismatch { .. }));

        chain.free(head).unwrap();
    }

    #[test]
    fn free_returns_pages_to_freelist_observably() {
        let db = TempDb::new();
        let pager = Pager::open_default(&db.path).unwrap();
        let chain = OverflowChain::new(&pager);

        let len = OVERFLOW_PAYLOAD_PER_PAGE * 6 + 17;
        let n = pages_needed(len);
        let data = pattern(len);

        let before_alloc = pager.page_count().unwrap();
        let (head, _) = chain.store(&data).unwrap();
        let after_alloc = pager.page_count().unwrap();
        assert_eq!((after_alloc - before_alloc) as usize, n);

        chain.free(head).unwrap();

        // Storing the same amount again must not grow the file.
        let after_free = pager.page_count().unwrap();
        let (head2, _) = chain.store(&data).unwrap();
        let after_realloc = pager.page_count().unwrap();
        assert_eq!(
            after_realloc, after_free,
            "second store should reuse freed pages"
        );

        chain.free(head2).unwrap();
    }

    #[test]
    fn free_then_store_reuses_pages_exact_count() {
        let db = TempDb::new();
        let pager = Pager::open_default(&db.path).unwrap();
        let chain = OverflowChain::new(&pager);

        let len = OVERFLOW_PAYLOAD_PER_PAGE * 4;
        let (head, _) = chain.store(&pattern(len)).unwrap();
        let baseline = pager.page_count().unwrap();
        chain.free(head).unwrap();
        // Freeing does not change the page count.
        assert_eq!(pager.page_count().unwrap(), baseline);

        let (head2, _) = chain.store(&pattern(len)).unwrap();
        assert_eq!(pager.page_count().unwrap(), baseline);
        chain.free(head2).unwrap();
    }
}