Skip to main content

reddb_server/storage/engine/
overflow.rs

1//! Overflow chain storage
2//!
3//! Owns chains of dedicated overflow pages (PageType::Overflow), independent of
4//! B-tree semantics and MVCC. Backs the large-value spill path from ADR 0023
5//! (slice B of PRD #662).
6//!
7//! Each overflow page lays a small chain-local header at the start of its
8//! content area:
9//!
10//! ```text
11//! Offset  Size  Field
12//! ------  ----  -----
13//!   0      4    next_page_id (u32 LE, 0 = tail sentinel)
14//!   4      4    payload_len  (u32 LE, payload bytes on this page)
15//!   8     ..    payload bytes
16//! ```
17//!
18//! `version` (per ADR 0025) is added in slice C (#700); this slice only owns
19//! the chain mechanics — allocate, link, walk, free.
20//!
21//! Whole-value materialisation only — no streaming reads.
22
23use super::page::{Page, PageType, CONTENT_SIZE, HEADER_SIZE, PAGE_SIZE};
24use super::pager::{Pager, PagerError};
25
26/// Bytes consumed by the per-page chain header (next + payload_len).
27pub const OVERFLOW_PAGE_HEADER: usize = 8;
28
29/// Payload bytes that fit on a single overflow page.
30pub const OVERFLOW_PAYLOAD_PER_PAGE: usize = CONTENT_SIZE - OVERFLOW_PAGE_HEADER;
31
32const _: () = assert!(OVERFLOW_PAYLOAD_PER_PAGE == PAGE_SIZE - HEADER_SIZE - OVERFLOW_PAGE_HEADER);
33
34/// Errors returned by overflow-chain operations.
35#[derive(Debug)]
36pub enum OverflowError {
37    /// Underlying pager call failed.
38    Pager(PagerError),
39    /// A page reached while walking the chain is not an Overflow page.
40    NotOverflowPage { page_id: u32 },
41    /// A page declared a payload longer than the per-page capacity.
42    PayloadTooLarge { page_id: u32, len: u32 },
43    /// The advertised `total_len` disagrees with what the chain actually holds.
44    LengthMismatch { expected: u64, actual: u64 },
45    /// Caller asked to free a non-overflow page as a chain head.
46    InvalidHead { page_id: u32 },
47}
48
49impl std::fmt::Display for OverflowError {
50    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51        match self {
52            Self::Pager(e) => write!(f, "pager error: {}", e),
53            Self::NotOverflowPage { page_id } => {
54                write!(f, "page {} is not an overflow page", page_id)
55            }
56            Self::PayloadTooLarge { page_id, len } => {
57                write!(
58                    f,
59                    "overflow page {} declares payload_len {} (max {})",
60                    page_id, len, OVERFLOW_PAYLOAD_PER_PAGE
61                )
62            }
63            Self::LengthMismatch { expected, actual } => write!(
64                f,
65                "overflow chain length mismatch: expected {} bytes, chain holds {}",
66                expected, actual
67            ),
68            Self::InvalidHead { page_id } => {
69                write!(f, "free called with non-overflow head page {}", page_id)
70            }
71        }
72    }
73}
74
75impl std::error::Error for OverflowError {}
76
77impl From<PagerError> for OverflowError {
78    fn from(e: PagerError) -> Self {
79        Self::Pager(e)
80    }
81}
82
83/// Number of overflow pages required to hold `len` bytes.
84pub fn pages_needed(len: usize) -> usize {
85    if len == 0 {
86        1
87    } else {
88        len.div_ceil(OVERFLOW_PAYLOAD_PER_PAGE)
89    }
90}
91
92fn read_chain_header(page: &Page) -> Result<(u32, u32), OverflowError> {
93    if page.page_type().map_err(PagerError::from)? != PageType::Overflow {
94        return Err(OverflowError::NotOverflowPage {
95            page_id: page.page_id(),
96        });
97    }
98    let content = page.content();
99    let next = u32::from_le_bytes([content[0], content[1], content[2], content[3]]);
100    let len = u32::from_le_bytes([content[4], content[5], content[6], content[7]]);
101    if len as usize > OVERFLOW_PAYLOAD_PER_PAGE {
102        return Err(OverflowError::PayloadTooLarge {
103            page_id: page.page_id(),
104            len,
105        });
106    }
107    Ok((next, len))
108}
109
110fn write_chain_header(page: &mut Page, next: u32, payload_len: u32) {
111    let content = page.content_mut();
112    content[0..4].copy_from_slice(&next.to_le_bytes());
113    content[4..8].copy_from_slice(&payload_len.to_le_bytes());
114}
115
116/// Chain operations over a pager.
117///
118/// Held by value rather than as `impl Pager` methods to keep overflow concerns
119/// out of `engine::pager`. Slice E will hold one of these from inside the
120/// B-tree write path.
121pub struct OverflowChain<'p> {
122    pager: &'p Pager,
123}
124
125impl<'p> OverflowChain<'p> {
126    pub fn new(pager: &'p Pager) -> Self {
127        Self { pager }
128    }
129
130    /// Allocate enough overflow pages to hold `bytes`, link them, and return
131    /// the head page id together with the total length.
132    ///
133    /// Empty input still produces a single zero-length head page so that the
134    /// leaf-side pointer is always valid.
135    pub fn store(&self, bytes: &[u8]) -> Result<(u32, u64), OverflowError> {
136        let total_len = bytes.len() as u64;
137        let n_pages = pages_needed(bytes.len());
138
139        let mut page_ids = Vec::with_capacity(n_pages);
140        for _ in 0..n_pages {
141            let page = self.pager.allocate_page(PageType::Overflow)?;
142            page_ids.push(page.page_id());
143        }
144
145        let mut offset = 0usize;
146        for (i, &pid) in page_ids.iter().enumerate() {
147            let next = if i + 1 < page_ids.len() {
148                page_ids[i + 1]
149            } else {
150                0
151            };
152            let chunk_end = (offset + OVERFLOW_PAYLOAD_PER_PAGE).min(bytes.len());
153            let chunk = &bytes[offset..chunk_end];
154            offset = chunk_end;
155
156            let mut page = Page::new(PageType::Overflow, pid);
157            write_chain_header(&mut page, next, chunk.len() as u32);
158            page.content_mut()[OVERFLOW_PAGE_HEADER..OVERFLOW_PAGE_HEADER + chunk.len()]
159                .copy_from_slice(chunk);
160
161            self.pager.write_page(pid, page)?;
162        }
163
164        Ok((page_ids[0], total_len))
165    }
166
167    /// Walk the chain and return the concatenated payload.
168    ///
169    /// `total_len` must match the actual bytes carried by the chain; if it
170    /// does not, a `LengthMismatch` error is returned (no truncation, no
171    /// silent extension).
172    pub fn read(&self, head_page_id: u32, total_len: u64) -> Result<Vec<u8>, OverflowError> {
173        let expected = total_len as usize;
174        let mut out = Vec::with_capacity(expected);
175        let mut current = head_page_id;
176        let mut collected: u64 = 0;
177
178        while current != 0 {
179            let page = self.pager.read_page(current).map_err(OverflowError::from)?;
180            let (next, len) = read_chain_header(&page)?;
181            let len_usize = len as usize;
182            collected += len as u64;
183
184            if collected > total_len {
185                return Err(OverflowError::LengthMismatch {
186                    expected: total_len,
187                    actual: collected,
188                });
189            }
190
191            let payload = &page.content()[OVERFLOW_PAGE_HEADER..OVERFLOW_PAGE_HEADER + len_usize];
192            out.extend_from_slice(payload);
193            current = next;
194        }
195
196        if collected != total_len {
197            return Err(OverflowError::LengthMismatch {
198                expected: total_len,
199                actual: collected,
200            });
201        }
202
203        Ok(out)
204    }
205
206    /// Walk the chain starting at `head_page_id` and return every page to the
207    /// free list.
208    pub fn free(&self, head_page_id: u32) -> Result<(), OverflowError> {
209        let mut current = head_page_id;
210        let mut first = true;
211        while current != 0 {
212            let page = self.pager.read_page(current).map_err(OverflowError::from)?;
213            if page.page_type().map_err(PagerError::from)? != PageType::Overflow {
214                return Err(if first {
215                    OverflowError::InvalidHead { page_id: current }
216                } else {
217                    OverflowError::NotOverflowPage { page_id: current }
218                });
219            }
220            let (next, _) = read_chain_header(&page)?;
221            self.pager.free_page(current)?;
222            current = next;
223            first = false;
224        }
225        Ok(())
226    }
227}
228
229#[cfg(test)]
230mod tests {
231    use super::*;
232    use crate::storage::engine::pager::Pager;
233    use std::path::PathBuf;
234    use std::sync::atomic::{AtomicU64, Ordering};
235
236    fn temp_db_path() -> PathBuf {
237        static COUNTER: AtomicU64 = AtomicU64::new(0);
238        let id = COUNTER.fetch_add(1, Ordering::Relaxed);
239        let mut path = std::env::temp_dir();
240        path.push(format!(
241            "reddb_overflow_test_{}_{}.db",
242            std::process::id(),
243            id
244        ));
245        path
246    }
247
248    fn cleanup(path: &std::path::Path) {
249        let _ = std::fs::remove_file(path);
250        for sidecar in reddb_file::layout::pager_shadow_sidecar_paths(path) {
251            let _ = std::fs::remove_file(sidecar);
252        }
253    }
254
255    fn pattern(len: usize) -> Vec<u8> {
256        (0..len).map(|i| ((i * 31 + 7) & 0xff) as u8).collect()
257    }
258
259    #[test]
260    fn pages_needed_boundaries() {
261        assert_eq!(pages_needed(0), 1);
262        assert_eq!(pages_needed(1), 1);
263        assert_eq!(pages_needed(OVERFLOW_PAYLOAD_PER_PAGE), 1);
264        assert_eq!(pages_needed(OVERFLOW_PAYLOAD_PER_PAGE + 1), 2);
265        assert_eq!(pages_needed(OVERFLOW_PAYLOAD_PER_PAGE * 4), 4);
266        assert_eq!(pages_needed(OVERFLOW_PAYLOAD_PER_PAGE * 4 + 1), 5);
267    }
268
269    fn roundtrip(pager: &Pager, len: usize) {
270        let chain = OverflowChain::new(pager);
271        let data = pattern(len);
272        let (head, total) = chain.store(&data).unwrap();
273        assert_eq!(total, len as u64);
274        let read_back = chain.read(head, total).unwrap();
275        assert_eq!(read_back.len(), len);
276        assert_eq!(read_back, data);
277        chain.free(head).unwrap();
278    }
279
280    #[test]
281    fn store_read_roundtrips_across_sizes() {
282        let path = temp_db_path();
283        cleanup(&path);
284        {
285            let pager = Pager::open_default(&path).unwrap();
286            // one page
287            roundtrip(&pager, 1);
288            roundtrip(&pager, 100);
289            roundtrip(&pager, OVERFLOW_PAYLOAD_PER_PAGE - 1);
290            // exact one-page boundary
291            roundtrip(&pager, OVERFLOW_PAYLOAD_PER_PAGE);
292            // two pages
293            roundtrip(&pager, OVERFLOW_PAYLOAD_PER_PAGE + 1);
294            roundtrip(&pager, OVERFLOW_PAYLOAD_PER_PAGE * 2);
295            // several pages
296            roundtrip(&pager, OVERFLOW_PAYLOAD_PER_PAGE * 5 + 123);
297            // many pages
298            roundtrip(&pager, OVERFLOW_PAYLOAD_PER_PAGE * 32);
299            // exact multi-page boundary
300            roundtrip(&pager, OVERFLOW_PAYLOAD_PER_PAGE * 7);
301        }
302        cleanup(&path);
303    }
304
305    #[test]
306    fn store_empty_value_produces_single_page() {
307        let path = temp_db_path();
308        cleanup(&path);
309        {
310            let pager = Pager::open_default(&path).unwrap();
311            let chain = OverflowChain::new(&pager);
312            let (head, total) = chain.store(&[]).unwrap();
313            assert_eq!(total, 0);
314            let bytes = chain.read(head, total).unwrap();
315            assert!(bytes.is_empty());
316
317            // Confirm it's exactly one page in the chain.
318            let page = pager.read_page(head).unwrap();
319            let (next, len) = read_chain_header(&page).unwrap();
320            assert_eq!(next, 0);
321            assert_eq!(len, 0);
322
323            chain.free(head).unwrap();
324        }
325        cleanup(&path);
326    }
327
328    #[test]
329    fn read_with_wrong_total_len_errors() {
330        let path = temp_db_path();
331        cleanup(&path);
332        {
333            let pager = Pager::open_default(&path).unwrap();
334            let chain = OverflowChain::new(&pager);
335            let data = pattern(OVERFLOW_PAYLOAD_PER_PAGE * 3 + 50);
336            let (head, total) = chain.store(&data).unwrap();
337
338            // Too short: chain reports more bytes than caller expects.
339            let err = chain.read(head, total - 10).unwrap_err();
340            assert!(matches!(err, OverflowError::LengthMismatch { .. }));
341
342            // Too long: chain ends before caller's expected length.
343            let err = chain.read(head, total + 10).unwrap_err();
344            assert!(matches!(err, OverflowError::LengthMismatch { .. }));
345
346            chain.free(head).unwrap();
347        }
348        cleanup(&path);
349    }
350
351    #[test]
352    fn free_returns_pages_to_freelist_observably() {
353        let path = temp_db_path();
354        cleanup(&path);
355        {
356            let pager = Pager::open_default(&path).unwrap();
357            let chain = OverflowChain::new(&pager);
358
359            let len = OVERFLOW_PAYLOAD_PER_PAGE * 6 + 17;
360            let n = pages_needed(len);
361            let data = pattern(len);
362
363            let before_alloc = pager.page_count().unwrap();
364            let (head, _) = chain.store(&data).unwrap();
365            let after_alloc = pager.page_count().unwrap();
366            assert_eq!((after_alloc - before_alloc) as usize, n);
367
368            chain.free(head).unwrap();
369
370            // A second store of the same size must reuse the freed pages
371            // rather than extending the file.
372            let after_free = pager.page_count().unwrap();
373            let (head2, _) = chain.store(&data).unwrap();
374            let after_realloc = pager.page_count().unwrap();
375            assert_eq!(
376                after_realloc, after_free,
377                "second store should reuse freed pages"
378            );
379
380            chain.free(head2).unwrap();
381        }
382        cleanup(&path);
383    }
384
385    #[test]
386    fn free_then_store_reuses_pages_exact_count() {
387        let path = temp_db_path();
388        cleanup(&path);
389        {
390            let pager = Pager::open_default(&path).unwrap();
391            let chain = OverflowChain::new(&pager);
392
393            let len = OVERFLOW_PAYLOAD_PER_PAGE * 4;
394            let (head, _) = chain.store(&pattern(len)).unwrap();
395            let baseline = pager.page_count().unwrap();
396            chain.free(head).unwrap();
397            // free does not shrink the file
398            assert_eq!(pager.page_count().unwrap(), baseline);
399
400            let (head2, _) = chain.store(&pattern(len)).unwrap();
401            assert_eq!(pager.page_count().unwrap(), baseline);
402            chain.free(head2).unwrap();
403        }
404        cleanup(&path);
405    }
406}