Skip to main content

reddb_server/storage/engine/
overflow.rs

1//! Overflow chain storage
2//!
3//! Owns chains of dedicated overflow pages (PageType::Overflow), independent of
4//! B-tree semantics and MVCC. Backs the large-value spill path from ADR 0023
5//! (slice B of PRD #662).
6//!
7//! Each overflow page lays a small chain-local header at the start of its
8//! content area:
9//!
10//! ```text
11//! Offset  Size  Field
12//! ------  ----  -----
13//!   0      4    next_page_id (u32 LE, 0 = tail sentinel)
14//!   4      4    payload_len  (u32 LE, payload bytes on this page)
15//!   8     ..    payload bytes
16//! ```
17//!
18//! `version` (per ADR 0025) is added in slice C (#700); this slice only owns
19//! the chain mechanics — allocate, link, walk, free.
20//!
21//! Whole-value materialisation only — no streaming reads.
22
23use super::page::{Page, PageType, CONTENT_SIZE, HEADER_SIZE, PAGE_SIZE};
24use super::pager::{Pager, PagerError};
25
26/// Bytes consumed by the per-page chain header (next + payload_len).
27pub const OVERFLOW_PAGE_HEADER: usize = 8;
28
29/// Payload bytes that fit on a single overflow page.
30pub const OVERFLOW_PAYLOAD_PER_PAGE: usize = CONTENT_SIZE - OVERFLOW_PAGE_HEADER;
31
32const _: () = assert!(OVERFLOW_PAYLOAD_PER_PAGE == PAGE_SIZE - HEADER_SIZE - OVERFLOW_PAGE_HEADER);
33
34/// Errors returned by overflow-chain operations.
35#[derive(Debug)]
36pub enum OverflowError {
37    /// Underlying pager call failed.
38    Pager(PagerError),
39    /// A page reached while walking the chain is not an Overflow page.
40    NotOverflowPage { page_id: u32 },
41    /// A page declared a payload longer than the per-page capacity.
42    PayloadTooLarge { page_id: u32, len: u32 },
43    /// The advertised `total_len` disagrees with what the chain actually holds.
44    LengthMismatch { expected: u64, actual: u64 },
45    /// Caller asked to free a non-overflow page as a chain head.
46    InvalidHead { page_id: u32 },
47}
48
49impl std::fmt::Display for OverflowError {
50    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51        match self {
52            Self::Pager(e) => write!(f, "pager error: {}", e),
53            Self::NotOverflowPage { page_id } => {
54                write!(f, "page {} is not an overflow page", page_id)
55            }
56            Self::PayloadTooLarge { page_id, len } => {
57                write!(
58                    f,
59                    "overflow page {} declares payload_len {} (max {})",
60                    page_id, len, OVERFLOW_PAYLOAD_PER_PAGE
61                )
62            }
63            Self::LengthMismatch { expected, actual } => write!(
64                f,
65                "overflow chain length mismatch: expected {} bytes, chain holds {}",
66                expected, actual
67            ),
68            Self::InvalidHead { page_id } => {
69                write!(f, "free called with non-overflow head page {}", page_id)
70            }
71        }
72    }
73}
74
75impl std::error::Error for OverflowError {}
76
77impl From<PagerError> for OverflowError {
78    fn from(e: PagerError) -> Self {
79        Self::Pager(e)
80    }
81}
82
83/// Number of overflow pages required to hold `len` bytes.
84pub fn pages_needed(len: usize) -> usize {
85    if len == 0 {
86        1
87    } else {
88        len.div_ceil(OVERFLOW_PAYLOAD_PER_PAGE)
89    }
90}
91
92fn read_chain_header(page: &Page) -> Result<(u32, u32), OverflowError> {
93    if page.page_type().map_err(PagerError::from)? != PageType::Overflow {
94        return Err(OverflowError::NotOverflowPage {
95            page_id: page.page_id(),
96        });
97    }
98    let content = page.content();
99    let next = u32::from_le_bytes([content[0], content[1], content[2], content[3]]);
100    let len = u32::from_le_bytes([content[4], content[5], content[6], content[7]]);
101    if len as usize > OVERFLOW_PAYLOAD_PER_PAGE {
102        return Err(OverflowError::PayloadTooLarge {
103            page_id: page.page_id(),
104            len,
105        });
106    }
107    Ok((next, len))
108}
109
110fn write_chain_header(page: &mut Page, next: u32, payload_len: u32) {
111    let content = page.content_mut();
112    content[0..4].copy_from_slice(&next.to_le_bytes());
113    content[4..8].copy_from_slice(&payload_len.to_le_bytes());
114}
115
116/// Chain operations over a pager.
117///
118/// Held by value rather than as `impl Pager` methods to keep overflow concerns
119/// out of `engine::pager`. Slice E will hold one of these from inside the
120/// B-tree write path.
121pub struct OverflowChain<'p> {
122    pager: &'p Pager,
123}
124
125impl<'p> OverflowChain<'p> {
126    pub fn new(pager: &'p Pager) -> Self {
127        Self { pager }
128    }
129
130    /// Allocate enough overflow pages to hold `bytes`, link them, and return
131    /// the head page id together with the total length.
132    ///
133    /// Empty input still produces a single zero-length head page so that the
134    /// leaf-side pointer is always valid.
135    pub fn store(&self, bytes: &[u8]) -> Result<(u32, u64), OverflowError> {
136        let total_len = bytes.len() as u64;
137        let n_pages = pages_needed(bytes.len());
138
139        let mut page_ids = Vec::with_capacity(n_pages);
140        for _ in 0..n_pages {
141            let page = self.pager.allocate_page(PageType::Overflow)?;
142            page_ids.push(page.page_id());
143        }
144
145        let mut offset = 0usize;
146        for (i, &pid) in page_ids.iter().enumerate() {
147            let next = if i + 1 < page_ids.len() {
148                page_ids[i + 1]
149            } else {
150                0
151            };
152            let chunk_end = (offset + OVERFLOW_PAYLOAD_PER_PAGE).min(bytes.len());
153            let chunk = &bytes[offset..chunk_end];
154            offset = chunk_end;
155
156            let mut page = Page::new(PageType::Overflow, pid);
157            write_chain_header(&mut page, next, chunk.len() as u32);
158            page.content_mut()[OVERFLOW_PAGE_HEADER..OVERFLOW_PAGE_HEADER + chunk.len()]
159                .copy_from_slice(chunk);
160
161            self.pager.write_page(pid, page)?;
162        }
163
164        Ok((page_ids[0], total_len))
165    }
166
167    /// Walk the chain and return the concatenated payload.
168    ///
169    /// `total_len` must match the actual bytes carried by the chain; if it
170    /// does not, a `LengthMismatch` error is returned (no truncation, no
171    /// silent extension).
172    pub fn read(&self, head_page_id: u32, total_len: u64) -> Result<Vec<u8>, OverflowError> {
173        let expected = total_len as usize;
174        let mut out = Vec::with_capacity(expected);
175        let mut current = head_page_id;
176        let mut collected: u64 = 0;
177
178        while current != 0 {
179            let page = self.pager.read_page(current).map_err(OverflowError::from)?;
180            let (next, len) = read_chain_header(&page)?;
181            let len_usize = len as usize;
182            collected += len as u64;
183
184            if collected > total_len {
185                return Err(OverflowError::LengthMismatch {
186                    expected: total_len,
187                    actual: collected,
188                });
189            }
190
191            let payload = &page.content()[OVERFLOW_PAGE_HEADER..OVERFLOW_PAGE_HEADER + len_usize];
192            out.extend_from_slice(payload);
193            current = next;
194        }
195
196        if collected != total_len {
197            return Err(OverflowError::LengthMismatch {
198                expected: total_len,
199                actual: collected,
200            });
201        }
202
203        Ok(out)
204    }
205
206    /// Walk the chain starting at `head_page_id` and return every page to the
207    /// free list.
208    pub fn free(&self, head_page_id: u32) -> Result<(), OverflowError> {
209        let mut current = head_page_id;
210        let mut first = true;
211        while current != 0 {
212            let page = self.pager.read_page(current).map_err(OverflowError::from)?;
213            if page.page_type().map_err(PagerError::from)? != PageType::Overflow {
214                return Err(if first {
215                    OverflowError::InvalidHead { page_id: current }
216                } else {
217                    OverflowError::NotOverflowPage { page_id: current }
218                });
219            }
220            let (next, _) = read_chain_header(&page)?;
221            self.pager.free_page(current)?;
222            current = next;
223            first = false;
224        }
225        Ok(())
226    }
227}
228
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::engine::pager::Pager;
    use std::path::PathBuf;
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Unique per-test database path whose files are removed on drop.
    ///
    /// Cleanup runs from `Drop`, so the temp files are removed even when an
    /// assertion panics mid-test. An explicit cleanup call at the end of the
    /// test body would be skipped in that case. Declare the guard *before*
    /// the `Pager` so the pager (declared later) is dropped first.
    struct TempDb {
        path: PathBuf,
    }

    impl TempDb {
        fn new() -> Self {
            static COUNTER: AtomicU64 = AtomicU64::new(0);
            let id = COUNTER.fetch_add(1, Ordering::Relaxed);
            let mut path = std::env::temp_dir();
            path.push(format!(
                "reddb_overflow_test_{}_{}.db",
                std::process::id(),
                id
            ));
            // Clear leftovers from an earlier aborted run with the same pid/id.
            cleanup(&path);
            Self { path }
        }
    }

    impl Drop for TempDb {
        fn drop(&mut self) {
            cleanup(&self.path);
        }
    }

    fn cleanup(path: &std::path::Path) {
        let _ = std::fs::remove_file(path);
        for suffix in ["-hdr", "-meta", "-dwb"] {
            let mut p = path.to_path_buf().into_os_string();
            p.push(suffix);
            let _ = std::fs::remove_file(&p);
        }
    }

    fn pattern(len: usize) -> Vec<u8> {
        (0..len).map(|i| ((i * 31 + 7) & 0xff) as u8).collect()
    }

    #[test]
    fn pages_needed_boundaries() {
        assert_eq!(pages_needed(0), 1);
        assert_eq!(pages_needed(1), 1);
        assert_eq!(pages_needed(OVERFLOW_PAYLOAD_PER_PAGE), 1);
        assert_eq!(pages_needed(OVERFLOW_PAYLOAD_PER_PAGE + 1), 2);
        assert_eq!(pages_needed(OVERFLOW_PAYLOAD_PER_PAGE * 4), 4);
        assert_eq!(pages_needed(OVERFLOW_PAYLOAD_PER_PAGE * 4 + 1), 5);
    }

    fn roundtrip(pager: &Pager, len: usize) {
        let chain = OverflowChain::new(pager);
        let data = pattern(len);
        let (head, total) = chain.store(&data).unwrap();
        assert_eq!(total, len as u64);
        let read_back = chain.read(head, total).unwrap();
        assert_eq!(read_back.len(), len);
        assert_eq!(read_back, data);
        chain.free(head).unwrap();
    }

    #[test]
    fn store_read_roundtrips_across_sizes() {
        let db = TempDb::new();
        let pager = Pager::open_default(&db.path).unwrap();
        // one page
        roundtrip(&pager, 1);
        roundtrip(&pager, 100);
        roundtrip(&pager, OVERFLOW_PAYLOAD_PER_PAGE - 1);
        // exact one-page boundary
        roundtrip(&pager, OVERFLOW_PAYLOAD_PER_PAGE);
        // two pages
        roundtrip(&pager, OVERFLOW_PAYLOAD_PER_PAGE + 1);
        roundtrip(&pager, OVERFLOW_PAYLOAD_PER_PAGE * 2);
        // several pages
        roundtrip(&pager, OVERFLOW_PAYLOAD_PER_PAGE * 5 + 123);
        // many pages
        roundtrip(&pager, OVERFLOW_PAYLOAD_PER_PAGE * 32);
        // exact multi-page boundary
        roundtrip(&pager, OVERFLOW_PAYLOAD_PER_PAGE * 7);
    }

    #[test]
    fn store_empty_value_produces_single_page() {
        let db = TempDb::new();
        let pager = Pager::open_default(&db.path).unwrap();
        let chain = OverflowChain::new(&pager);
        let (head, total) = chain.store(&[]).unwrap();
        assert_eq!(total, 0);
        let bytes = chain.read(head, total).unwrap();
        assert!(bytes.is_empty());

        // Confirm it's exactly one page in the chain.
        let page = pager.read_page(head).unwrap();
        let (next, len) = read_chain_header(&page).unwrap();
        assert_eq!(next, 0);
        assert_eq!(len, 0);

        chain.free(head).unwrap();
    }

    #[test]
    fn read_with_wrong_total_len_errors() {
        let db = TempDb::new();
        let pager = Pager::open_default(&db.path).unwrap();
        let chain = OverflowChain::new(&pager);
        let data = pattern(OVERFLOW_PAYLOAD_PER_PAGE * 3 + 50);
        let (head, total) = chain.store(&data).unwrap();

        // Too short: chain reports more bytes than caller expects.
        let err = chain.read(head, total - 10).unwrap_err();
        assert!(matches!(err, OverflowError::LengthMismatch { .. }));

        // Too long: chain ends before caller's expected length.
        let err = chain.read(head, total + 10).unwrap_err();
        assert!(matches!(err, OverflowError::LengthMismatch { .. }));

        chain.free(head).unwrap();
    }

    #[test]
    fn free_returns_pages_to_freelist_observably() {
        let db = TempDb::new();
        let pager = Pager::open_default(&db.path).unwrap();
        let chain = OverflowChain::new(&pager);

        let len = OVERFLOW_PAYLOAD_PER_PAGE * 6 + 17;
        let n = pages_needed(len);
        let data = pattern(len);

        let before_alloc = pager.page_count().unwrap();
        let (head, _) = chain.store(&data).unwrap();
        let after_alloc = pager.page_count().unwrap();
        assert_eq!((after_alloc - before_alloc) as usize, n);

        chain.free(head).unwrap();

        // A second store of the same size must reuse the freed pages
        // rather than extending the file.
        let after_free = pager.page_count().unwrap();
        let (head2, _) = chain.store(&data).unwrap();
        let after_realloc = pager.page_count().unwrap();
        assert_eq!(
            after_realloc, after_free,
            "second store should reuse freed pages"
        );

        chain.free(head2).unwrap();
    }

    #[test]
    fn free_then_store_reuses_pages_exact_count() {
        let db = TempDb::new();
        let pager = Pager::open_default(&db.path).unwrap();
        let chain = OverflowChain::new(&pager);

        let len = OVERFLOW_PAYLOAD_PER_PAGE * 4;
        let (head, _) = chain.store(&pattern(len)).unwrap();
        let baseline = pager.page_count().unwrap();
        chain.free(head).unwrap();
        // free does not shrink the file
        assert_eq!(pager.page_count().unwrap(), baseline);

        let (head2, _) = chain.store(&pattern(len)).unwrap();
        assert_eq!(pager.page_count().unwrap(), baseline);
        chain.free(head2).unwrap();
    }
}