Skip to main content

reddb_server/storage/engine/btree/
value_layout.rs

1//! Large-value layout for B-tree leaf cells (slice E of PRD #662).
2//!
3//! Wires the standalone slice modules together at the write/read boundary:
4//!
5//! - [`value_codec`] (slice A, #698) decides whether the bytes are worth
6//!   compressing.
7//! - [`OverflowChain`] (slice B, #699) owns the chain of dedicated overflow
8//!   pages.
9//! - [`page_format::LeafCellFlags`] (slice C, #700) pins the two-bit
10//!   `(pointer, compressed)` shape — this module uses the same bit
11//!   layout for the per-cell flag byte so the page-format decoder
12//!   stays the single source of truth for the bit positions.
13//!
14//! The function entry points are [`encode`] and [`decode`]. They share
15//! one on-disk byte layout the rest of the engine can treat as opaque
16//! stored bytes:
17//!
18//! ```text
19//! Inline raw:        [0x00][bytes...]
20//! Inline compressed: [0x02][lz4_len: u32 LE][lz4 bytes...]
21//! Pointer raw:       [0x01][head_page_id: u32 LE][total_len: u64 LE]
22//! Pointer compressed:[0x03][head_page_id: u32 LE][total_len: u64 LE]
23//! ```
24//!
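//! All four shapes are valid leaf-cell payloads; the leaf layer never
//! decodes them. The read path materialises them via [`decode`], and the
//! delete and shrinking-update paths look only at the flag byte and head
//! page id via [`pointer_head`].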
27//!
28//! Decision ladder applied by [`encode`]:
29//!
30//! 1. `value.len() > MAX_VALUE_SIZE` → [`ValueLayoutError::ValueTooLarge`]
31//!    (no allocation, no LZ4 work).
32//! 2. `value.len() <= OVERFLOW_THRESHOLD` → inline raw.
33//! 3. Else try LZ4. If the compressed bytes fit inline → inline compressed.
34//! 4. Else spill via [`OverflowChain`], storing only the pointer in the
35//!    leaf cell.
36//!
37//! Per ADR 0025 (overflow chain MVCC) the chain identity is anchored at
38//! the leaf cell, so this slice does not add any WAL records of its own
39//! — the overflow page writes go through the existing pager path.
40
41use crate::storage::engine::overflow::{OverflowChain, OverflowError};
42use crate::storage::engine::pager::Pager;
43use crate::storage::engine::vector_btree::value_codec;
44
/// Inline budget, in bytes, for the body of a leaf-cell value (1 KiB).
///
/// A value whose raw length is at or below this is stored inline raw. A
/// longer value is still stored inline when its LZ4 form is at or below
/// this budget; otherwise it spills via [`OverflowChain`]. Set per
/// ADR 0023 — preserves leaf fanout regardless of page size.
pub const OVERFLOW_THRESHOLD: usize = 1 << 10;

/// Largest logical value length [`encode`] accepts: 2^28 bytes = 256 MiB
/// per ADR 0023. Longer values are rejected before any LZ4 or overflow
/// work runs.
pub const MAX_VALUE_SIZE: usize = 1 << 28;

// Flag-byte bits. Positions match
// [`crate::storage::engine::vector_btree::page_format::LeafCellFlags`] so
// the on-disk decoder stays the single source of truth.

/// Bit 0: the cell body is a spill pointer rather than value bytes.
const FLAG_POINTER: u8 = 1 << 0;
/// Bit 1: the value bytes (inline or spilled) are LZ4-encoded.
const FLAG_COMPRESSED: u8 = 1 << 1;
/// Every bit other than the two above; a stored flag byte must have
/// none of these set.
const FLAG_RESERVED_MASK: u8 = !FLAG_POINTER & !FLAG_COMPRESSED;

/// Byte length of a spill pointer body: `head_page_id: u32 LE` followed
/// by `total_len: u64 LE`.
const POINTER_PAYLOAD_LEN: usize = std::mem::size_of::<u32>() + std::mem::size_of::<u64>();

/// Byte length of a complete pointer cell: the flag byte plus the
/// pointer body.
pub const POINTER_CELL_LEN: usize = POINTER_PAYLOAD_LEN + 1;
67
68/// Errors returned by [`encode`] and [`decode`].
69#[derive(Debug)]
70pub enum ValueLayoutError {
71    /// Logical value length exceeds [`MAX_VALUE_SIZE`].
72    ValueTooLarge(usize),
73    /// Stored cell flag byte sets a reserved bit.
74    UnknownFlag(u8),
75    /// Stored bytes ended before the expected pointer payload.
76    TruncatedPointer { got: usize },
77    /// LZ4 decode failed.
78    Codec(value_codec::ValueCodecError),
79    /// Overflow chain operation failed.
80    Overflow(OverflowError),
81}
82
83impl std::fmt::Display for ValueLayoutError {
84    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85        match self {
86            Self::ValueTooLarge(n) => {
87                write!(f, "value too large: {} bytes (max {})", n, MAX_VALUE_SIZE)
88            }
89            Self::UnknownFlag(b) => write!(f, "unknown leaf-cell flag byte: 0b{:08b}", b),
90            Self::TruncatedPointer { got } => {
91                write!(
92                    f,
93                    "pointer cell truncated: need {} bytes after flag, got {}",
94                    POINTER_PAYLOAD_LEN, got
95                )
96            }
97            Self::Codec(e) => write!(f, "value codec: {}", e),
98            Self::Overflow(e) => write!(f, "overflow chain: {}", e),
99        }
100    }
101}
102
103impl std::error::Error for ValueLayoutError {}
104
105impl From<value_codec::ValueCodecError> for ValueLayoutError {
106    fn from(e: value_codec::ValueCodecError) -> Self {
107        Self::Codec(e)
108    }
109}
110
111impl From<OverflowError> for ValueLayoutError {
112    fn from(e: OverflowError) -> Self {
113        Self::Overflow(e)
114    }
115}
116
117/// Apply the decision ladder and return the bytes the B-tree should
118/// persist in the leaf cell value slot. May allocate overflow pages
119/// through `pager`.
120pub fn encode(pager: &Pager, value: &[u8]) -> Result<Vec<u8>, ValueLayoutError> {
121    if value.len() > MAX_VALUE_SIZE {
122        return Err(ValueLayoutError::ValueTooLarge(value.len()));
123    }
124
125    // Step 1: small values short-circuit to inline raw. No LZ4 work, no
126    // overflow allocation — preserves the legacy hot path for tiny
127    // values like entity IDs or short JSON.
128    if value.len() <= OVERFLOW_THRESHOLD {
129        let mut out = Vec::with_capacity(1 + value.len());
130        out.push(0);
131        out.extend_from_slice(value);
132        return Ok(out);
133    }
134
135    // Step 2: above the threshold, try LZ4. The codec returns Raw when
136    // compression would not shrink the input.
137    let (codec_flag, codec_bytes) = value_codec::encode(value);
138
139    // Step 3: if compressed bytes (including their length header) fit
140    // inline, keep them in the leaf. Note this only fires when codec
141    // actually produced Lz4 bytes — Raw bytes ≥ original length > threshold
142    // by construction, so they cannot inline.
143    if codec_flag == value_codec::ValueFlag::Lz4 && codec_bytes.len() <= OVERFLOW_THRESHOLD {
144        let mut out = Vec::with_capacity(1 + codec_bytes.len());
145        out.push(FLAG_COMPRESSED);
146        out.extend_from_slice(&codec_bytes);
147        return Ok(out);
148    }
149
150    // Step 4: spill. We always store the bytes the codec produced —
151    // either Lz4 (with its 4-byte length header) so the reader can
152    // round-trip via the same codec, or Raw bytes when compression
153    // failed to shrink.
154    let is_compressed = codec_flag == value_codec::ValueFlag::Lz4;
155    let chain = OverflowChain::new(pager);
156    let (head, total_len) = chain.store(&codec_bytes)?;
157
158    let mut flag = FLAG_POINTER;
159    if is_compressed {
160        flag |= FLAG_COMPRESSED;
161    }
162    let mut out = Vec::with_capacity(POINTER_CELL_LEN);
163    out.push(flag);
164    out.extend_from_slice(&head.to_le_bytes());
165    out.extend_from_slice(&total_len.to_le_bytes());
166    Ok(out)
167}
168
169/// Inspect leaf-cell flags, follow the pointer if any, concatenate the
170/// chain, decode if compressed, return the materialised value.
171pub fn decode(pager: &Pager, stored: &[u8]) -> Result<Vec<u8>, ValueLayoutError> {
172    if stored.is_empty() {
173        // An empty cell payload encodes an empty value the same way as
174        // an inline-raw cell of length zero — there is no flag byte to
175        // read, but the materialised value is unambiguous.
176        return Ok(Vec::new());
177    }
178
179    let flag = stored[0];
180    if flag & FLAG_RESERVED_MASK != 0 {
181        return Err(ValueLayoutError::UnknownFlag(flag));
182    }
183    let is_pointer = flag & FLAG_POINTER != 0;
184    let is_compressed = flag & FLAG_COMPRESSED != 0;
185    let payload = &stored[1..];
186
187    if is_pointer {
188        if payload.len() != POINTER_PAYLOAD_LEN {
189            return Err(ValueLayoutError::TruncatedPointer { got: payload.len() });
190        }
191        let head = u32::from_le_bytes([payload[0], payload[1], payload[2], payload[3]]);
192        let total_len = u64::from_le_bytes([
193            payload[4],
194            payload[5],
195            payload[6],
196            payload[7],
197            payload[8],
198            payload[9],
199            payload[10],
200            payload[11],
201        ]);
202        let chain = OverflowChain::new(pager);
203        let chain_bytes = chain.read(head, total_len)?;
204        if is_compressed {
205            Ok(value_codec::decode(
206                value_codec::ValueFlag::Lz4,
207                &chain_bytes,
208            )?)
209        } else {
210            Ok(chain_bytes)
211        }
212    } else if is_compressed {
213        Ok(value_codec::decode(value_codec::ValueFlag::Lz4, payload)?)
214    } else {
215        Ok(payload.to_vec())
216    }
217}
218
219/// Return the head overflow page id when `stored` represents a pointer
220/// cell, else `None`. Returns `None` for inline (raw or compressed)
221/// cells and for empty/malformed cells. Callers use this from the
222/// B-tree delete and shrinking-update paths (slice F of PRD #662) to
223/// free the chain before the leaf cell goes away.
224pub fn pointer_head(stored: &[u8]) -> Option<u32> {
225    if stored.is_empty() {
226        return None;
227    }
228    let flag = stored[0];
229    if flag & FLAG_RESERVED_MASK != 0 {
230        return None;
231    }
232    if flag & FLAG_POINTER == 0 {
233        return None;
234    }
235    if stored.len() < 1 + POINTER_PAYLOAD_LEN {
236        return None;
237    }
238    let payload = &stored[1..];
239    Some(u32::from_le_bytes([
240        payload[0], payload[1], payload[2], payload[3],
241    ]))
242}
243
244/// `true` when [`encode`] would emit a spill pointer for a value of
245/// this length (without actually spilling). Used by the leaf-fit check
246/// in `bulk_insert_sorted` to size cells before allocation.
247#[inline]
248#[allow(dead_code)]
249pub fn projected_cell_len(input: &[u8]) -> usize {
250    if input.len() <= OVERFLOW_THRESHOLD {
251        return 1 + input.len();
252    }
253    let codec_len = value_codec::would_encode_to(input);
254    // Track the codec's own decision: when LZ4 would not shrink the
255    // input, codec_len == input.len() and the encoded flag is Raw. In
256    // that case the encoded payload cannot fit inline (input > threshold
257    // is the entry condition) so we spill.
258    if codec_len < input.len() && codec_len <= OVERFLOW_THRESHOLD {
259        1 + codec_len
260    } else {
261        POINTER_CELL_LEN
262    }
263}
264
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::engine::pager::Pager;
    use std::path::{Path, PathBuf};
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Owns the path of a per-test database file and removes it (plus
    /// its sidecar files) on drop. Because removal happens in `Drop`, a
    /// failing assertion that unwinds the test still cleans up instead
    /// of leaking files into the temp directory.
    struct TempDbFile(PathBuf);

    impl Drop for TempDbFile {
        fn drop(&mut self) {
            cleanup(&self.0);
        }
    }

    /// Build a temp-dir path that is unique per process and per call.
    fn temp_db_path() -> PathBuf {
        static COUNTER: AtomicU64 = AtomicU64::new(0);
        let id = COUNTER.fetch_add(1, Ordering::Relaxed);
        let mut path = std::env::temp_dir();
        path.push(format!(
            "reddb_value_layout_test_{}_{}.db",
            std::process::id(),
            id
        ));
        path
    }

    /// Best-effort removal of the database file and its `-hdr`, `-meta`
    /// and `-dwb` siblings; missing files are ignored.
    fn cleanup(path: &Path) {
        let _ = std::fs::remove_file(path);
        for suffix in ["-hdr", "-meta", "-dwb"] {
            let mut p = path.to_path_buf().into_os_string();
            p.push(suffix);
            let _ = std::fs::remove_file(&p);
        }
    }

    /// Open a pager on a fresh database file. Keep the returned guard
    /// alive for the whole test; dropping it deletes the files.
    fn fresh_pager() -> (Pager, TempDbFile) {
        let path = temp_db_path();
        cleanup(&path);
        // Create the guard before opening so a failed open is cleaned up too.
        let db_file = TempDbFile(path);
        let pager = Pager::open_default(&db_file.0).unwrap();
        (pager, db_file)
    }

    #[test]
    fn inline_raw_round_trip_below_threshold() {
        let (pager, _db_file) = fresh_pager();
        let value = vec![0xABu8; OVERFLOW_THRESHOLD - 1];
        let stored = encode(&pager, &value).unwrap();
        assert_eq!(stored[0], 0, "inline raw flag must be zero");
        assert_eq!(stored.len(), 1 + value.len());
        let decoded = decode(&pager, &stored).unwrap();
        assert_eq!(decoded, value);
    }

    #[test]
    fn inline_raw_at_exact_threshold() {
        let (pager, _db_file) = fresh_pager();
        let value = vec![0x7Eu8; OVERFLOW_THRESHOLD];
        let stored = encode(&pager, &value).unwrap();
        assert_eq!(stored[0], 0);
        assert_eq!(decode(&pager, &stored).unwrap(), value);
    }

    #[test]
    fn compressible_above_threshold_inlines_compressed() {
        let (pager, _db_file) = fresh_pager();
        // ~44 KB of repeating text — compresses into a few hundred bytes
        // which fit inline.
        let value = "the quick brown fox jumps over the lazy dog\n"
            .repeat(1024)
            .into_bytes();
        assert!(value.len() > OVERFLOW_THRESHOLD);
        let stored = encode(&pager, &value).unwrap();
        assert_eq!(
            stored[0], FLAG_COMPRESSED,
            "highly repetitive payload must inline compressed"
        );
        assert!(
            stored.len() <= 1 + OVERFLOW_THRESHOLD,
            "compressed cell must fit inline budget"
        );
        let decoded = decode(&pager, &stored).unwrap();
        assert_eq!(decoded, value);
    }

    #[test]
    fn incompressible_above_threshold_spills_raw() {
        let (pager, _db_file) = fresh_pager();
        // Pseudo-random bytes — incompressible.
        let mut state: u64 = 0x1234_5678_9ABC_DEF0;
        let value: Vec<u8> = (0..5 * 1024 * 1024)
            .map(|_| {
                state ^= state << 13;
                state ^= state >> 7;
                state ^= state << 17;
                state as u8
            })
            .collect();
        let stored = encode(&pager, &value).unwrap();
        assert_eq!(
            stored[0] & FLAG_POINTER,
            FLAG_POINTER,
            "incompressible spill must set pointer flag"
        );
        assert_eq!(
            stored[0] & FLAG_COMPRESSED,
            0,
            "incompressible payload must not claim to be compressed"
        );
        assert_eq!(stored.len(), POINTER_CELL_LEN);
        let decoded = decode(&pager, &stored).unwrap();
        assert_eq!(decoded.len(), value.len());
        assert_eq!(decoded, value);
    }

    #[test]
    fn value_above_max_rejected_without_allocation() {
        let (pager, _db_file) = fresh_pager();
        let before = pager.page_count().unwrap();
        let value = vec![0u8; MAX_VALUE_SIZE + 1];
        let err = encode(&pager, &value).unwrap_err();
        assert!(matches!(err, ValueLayoutError::ValueTooLarge(_)));
        let after = pager.page_count().unwrap();
        assert_eq!(before, after, "rejected value must not allocate pages");
    }

    #[test]
    fn decode_rejects_reserved_flag_bits() {
        let (pager, _db_file) = fresh_pager();
        let stored = vec![0b0000_0100u8];
        let err = decode(&pager, &stored).unwrap_err();
        assert!(matches!(err, ValueLayoutError::UnknownFlag(_)));
    }

    #[test]
    fn decode_rejects_truncated_pointer_payload() {
        let (pager, _db_file) = fresh_pager();
        // Pointer flag followed by only two of the twelve payload bytes.
        let err = decode(&pager, &[FLAG_POINTER, 1, 2]).unwrap_err();
        assert!(matches!(
            err,
            ValueLayoutError::TruncatedPointer { got: 2 }
        ));
    }

    #[test]
    fn decode_empty_stored_yields_empty_value() {
        let (pager, _db_file) = fresh_pager();
        assert!(decode(&pager, &[]).unwrap().is_empty());
    }

    #[test]
    fn pointer_head_extracts_head_id_only_for_pointer_cells() {
        // Inline raw (flag 0x00) → None.
        assert_eq!(pointer_head(&[0x00, 1, 2, 3]), None);
        // Inline compressed (flag 0x02) → None.
        assert_eq!(pointer_head(&[0x02, 0, 0, 0, 5, 0xff, 0xfe]), None);
        // Empty stored bytes → None.
        assert_eq!(pointer_head(&[]), None);
        // Reserved bits set → None (callers must not free anything they
        // cannot interpret).
        assert_eq!(pointer_head(&[0b0000_0100]), None);
        // Pointer raw with head id 0x01020304 and total_len 0 → Some(...)
        let mut cell = vec![FLAG_POINTER];
        cell.extend_from_slice(&0x0102_0304u32.to_le_bytes());
        cell.extend_from_slice(&0u64.to_le_bytes());
        assert_eq!(pointer_head(&cell), Some(0x0102_0304));
        // Pointer compressed flag also yields the same head.
        cell[0] = FLAG_POINTER | FLAG_COMPRESSED;
        assert_eq!(pointer_head(&cell), Some(0x0102_0304));
        // Pointer flag but payload truncated → None (no UB).
        assert_eq!(pointer_head(&[FLAG_POINTER, 1, 2]), None);
    }

    #[test]
    fn empty_value_round_trips() {
        let (pager, _db_file) = fresh_pager();
        let stored = encode(&pager, &[]).unwrap();
        assert_eq!(stored, vec![0u8]);
        assert!(decode(&pager, &stored).unwrap().is_empty());
    }

    #[test]
    fn projected_cell_len_matches_inline_raw_encoding() {
        let (pager, _db_file) = fresh_pager();
        // At or below the threshold the projection is flag byte + raw
        // bytes, which is exactly what `encode` emits.
        for len in [0usize, 10, OVERFLOW_THRESHOLD] {
            let value = vec![0x5Au8; len];
            assert_eq!(projected_cell_len(&value), 1 + len);
            assert_eq!(
                projected_cell_len(&value),
                encode(&pager, &value).unwrap().len()
            );
        }
    }
}