Skip to main content

obj_core/btree/
node.rs

1//! B+tree node encode / decode.
2//!
3//! See `docs/format.md` § B+tree for the on-disk layout. This module
4//! is the reference implementation; the tests at the bottom validate
5//! round-trip equality and rejection of every documented invariant
6//! violation.
7//!
8//! # Power-of-ten posture
9//!
10//! - **Rule 2.** Every loop iterates over `key_count`, the slot
11//!   directory length, or the page-byte range — all bounded by
12//!   `PAGE_SIZE`.
13//! - **Rule 3.** The in-memory [`DecodedNode`] uses `Vec`s, not
14//!   `heapless::Vec`s of the worst-case slot count: the worst-case
15//!   slot count times the worst-case per-key buffer (~1 MiB) would
16//!   overflow a 2 MiB stack. The B+tree's *traversal stack* uses
17//!   `heapless::Vec` to satisfy Rule 3 on the hot read path; the
18//!   *decoded representation* of a single node is a transient
19//!   per-page artifact whose heap allocation is unavoidable and is
20//!   not on a real-time path.
21//! - **Rule 5.** `validate_node` is the central invariant check; it
22//!   is `debug_assert!`-ed at every mutation site and surfaces as
23//!   `Error::Corruption` on decode of a malformed page.
24//! - **Rule 7.** No `unwrap` / `expect` on any error-bearing path;
25//!   length-prefixed varint reads return `Error::Corruption` on
26//!   underflow.
27
28#![forbid(unsafe_code)]
29
30use crate::error::{Error, Result};
31use crate::pager::page::{Page, PAGE_SIZE, PAGE_TRAILER_SIZE};
32
/// Page-type tag for an internal B+tree node. See `docs/format.md`
/// § Page types.
///
/// Stored in the first byte of the node header; [`decode_node`] maps
/// it to [`NodeKind::Internal`].
pub const PAGE_TYPE_BTREE_INTERNAL: u8 = 0x02;
/// Page-type tag for a leaf B+tree node.
///
/// Stored in the first byte of the node header; [`decode_node`] maps
/// it to [`NodeKind::Leaf`]. Any tag other than these two makes
/// [`decode_node`] return `Error::Corruption`.
pub const PAGE_TYPE_BTREE_LEAF: u8 = 0x03;
38
/// Fixed-size B+tree node header. See `docs/format.md` § Node header.
///
/// The slot directory (leaf) or the leftmost-child field (internal)
/// starts immediately after these bytes.
pub const NODE_HEADER_SIZE: usize = 24;

// Field offsets within the node header. Widths below are the ones
// `write_node_header` writes; all multi-byte fields are little-endian.
// Page-type tag, one byte.
const OFF_PAGE_TYPE: usize = 0;
// Tree level, one byte (0 for leaves, >= 1 for internal nodes).
const OFF_LEVEL: usize = 1;
// Key count, u16 (bytes 2..4).
const OFF_KEY_COUNT: usize = 2;
// Heap watermark `free_space_off`, u32 (bytes 4..8). The encoder
// writes it; `decode_node` does not read it back.
const OFF_FREE_SPACE_OFF: usize = 4;
// Right-sibling page-id, u64 (bytes 8..16); must be 0 on internal nodes.
const OFF_NEXT_SIBLING: usize = 8;
// Reserved, bytes 16..24; left as zero by the encoder.
const OFF_RESERVED: usize = 16;
49
/// Bytes per slot in a leaf slot directory. The entry is a single
/// u32 LE giving the byte offset of the corresponding heap entry.
pub const LEAF_SLOT_BYTES: usize = 4;

/// Bytes per slot in an internal slot directory. The entry is a u32
/// LE offset followed by a u64 LE right-child page-id.
pub const INTERNAL_SLOT_BYTES: usize = 12;

/// Bytes the leftmost-child page-id occupies in an internal node,
/// immediately after the node header.
pub const INTERNAL_LEFTMOST_CHILD_BYTES: usize = 8;

/// Byte offset of the first byte after the node header (leaf slot
/// directory start, OR internal leftmost-child).
const PAYLOAD_OFFSET: usize = NODE_HEADER_SIZE;

/// Last byte index (exclusive) of the in-node payload. Beyond this
/// lies the page trailer.
///
/// The heap grows downward from here while the slot directory grows
/// upward from `PAYLOAD_OFFSET`; the encoders fail when the two
/// would meet.
const PAYLOAD_END: usize = PAGE_SIZE - PAGE_TRAILER_SIZE;

/// Maximum number of slots a leaf can hold at the worst-case empty
/// key/value sizes.
///
/// This is a loose upper bound: it counts only slot-directory bytes
/// and ignores the heap bytes each entry also needs. The decoder and
/// `validate_node` use it to reject an out-of-range key count.
pub const LEAF_SLOT_CAP: usize = (PAYLOAD_END - PAYLOAD_OFFSET) / LEAF_SLOT_BYTES;

/// Maximum number of slots an internal node can hold at the worst
/// case.
///
/// Loose upper bound computed from slot-directory bytes only (after
/// the leftmost-child field). The decoder and `validate_node` use it
/// to reject an out-of-range key count.
pub const INTERNAL_SLOT_CAP: usize =
    (PAYLOAD_END - PAYLOAD_OFFSET - INTERNAL_LEFTMOST_CHILD_BYTES) / INTERNAL_SLOT_BYTES;
78
79/// Maximum key length the format permits. See `docs/format.md`
80/// § Key and value encoding.
81#[must_use]
82pub const fn max_key_len() -> usize {
83    PAGE_SIZE / 4
84}
85
86/// Maximum value length that still fits inline in a leaf alongside
87/// at least one slot. See `docs/format.md` § Key and value encoding.
88#[must_use]
89pub const fn max_inline_value(key_len: usize) -> usize {
90    // Available payload: PAGE_SIZE - trailer - node header - one slot
91    // dir entry. Minus the heap entry's two varints.
92    let base = PAYLOAD_END - PAYLOAD_OFFSET - LEAF_SLOT_BYTES;
93    // 9 bytes max per LEB128 varint at 64-bit; we use two of them.
94    let varints = 9 + 9;
95    if key_len + varints >= base {
96        0
97    } else {
98        base - key_len - varints
99    }
100}
101
/// Whether a node is an internal node or a leaf.
///
/// On disk the kind is the page-type tag in the first header byte:
/// `PAGE_TYPE_BTREE_INTERNAL` or `PAGE_TYPE_BTREE_LEAF`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NodeKind {
    /// Internal node — pivots + child pointers, no user values.
    /// Must sit at level >= 1.
    Internal,
    /// Leaf node — `(key, value)` slots. Must sit at level 0.
    Leaf,
}
110
/// One leaf slot in the decoded in-memory representation.
///
/// On disk the pair is stored in the heap as a varint key length,
/// the key bytes, a varint value length, then the value bytes.
#[derive(Debug, Clone)]
pub struct LeafEntry {
    /// The key bytes. Keys within one leaf must be strictly ascending.
    pub key: Vec<u8>,
    /// The value bytes.
    pub value: Vec<u8>,
}
119
/// One internal-node pivot entry in the decoded in-memory
/// representation. The corresponding right child is stored in
/// [`DecodedNode::children`] at index `i + 1`.
///
/// On disk the pivot is a heap entry (varint key length followed by
/// the key bytes); its slot holds the heap offset and the
/// right-child page-id.
#[derive(Debug, Clone)]
pub struct InternalEntry {
    /// The pivot key. Pivots within one node must be strictly
    /// ascending.
    pub key: Vec<u8>,
}
128
/// Alias for one child page-id of an internal node: either the
/// leftmost child or the right child of a pivot.
/// We store children as raw `u64`s; the on-disk format permits `0`
/// only as the leaf `next_sibling` sentinel, never as an internal
/// child pointer.
pub type ChildEntry = u64;
134
/// Decoded in-memory representation of a B+tree node.
///
/// One [`DecodedNode`] is built per `Pager::read_page` of a B+tree
/// page; lifetime is "until the caller is done inspecting the
/// node." See the module-level Rule 3 note for why we use `Vec`
/// rather than `heapless::Vec` for the slot collections.
///
/// Only the collections matching `kind` are meaningful: `leaves` for
/// a leaf, `internals` plus `children` for an internal node. The
/// decoder leaves the others empty.
#[derive(Debug, Clone)]
pub struct DecodedNode {
    /// Node kind (internal or leaf).
    pub kind: NodeKind,
    /// Tree level. Leaves are level 0; internals are level ≥ 1.
    pub level: u8,
    /// Right-sibling leaf page-id (leaves only; `0` on the last leaf
    /// and on internal nodes).
    pub next_sibling: u64,
    /// Child page-ids (internal nodes only). Length is
    /// `internals.len() + 1` when the node is internal, `0` for
    /// leaves. Index 0 is the leftmost child; index `i + 1` is the
    /// right child of pivot `i`. No entry may be `0`.
    pub children: Vec<ChildEntry>,
    /// Leaf slots (leaf nodes only), in strictly ascending key order.
    pub leaves: Vec<LeafEntry>,
    /// Internal-node pivots (internal nodes only), in strictly
    /// ascending key order.
    pub internals: Vec<InternalEntry>,
}
159
160impl DecodedNode {
161    /// Number of bytes the slot directory + key/value heap currently
162    /// occupy in a freshly-encoded page. Used by the insert path to
163    /// decide whether a node has room for one more entry.
164    #[must_use]
165    pub fn occupied_bytes(&self) -> usize {
166        match self.kind {
167            NodeKind::Leaf => {
168                let slot_bytes = self.leaves.len() * LEAF_SLOT_BYTES;
169                let heap_bytes: usize = self
170                    .leaves
171                    .iter()
172                    .map(|e| {
173                        varint_len(u64_from_usize(e.key.len()))
174                            + e.key.len()
175                            + varint_len(u64_from_usize(e.value.len()))
176                            + e.value.len()
177                    })
178                    .sum();
179                slot_bytes + heap_bytes
180            }
181            NodeKind::Internal => {
182                let slot_bytes = self.internals.len() * INTERNAL_SLOT_BYTES;
183                let heap_bytes: usize = self
184                    .internals
185                    .iter()
186                    .map(|e| varint_len(u64_from_usize(e.key.len())) + e.key.len())
187                    .sum();
188                INTERNAL_LEFTMOST_CHILD_BYTES + slot_bytes + heap_bytes
189            }
190        }
191    }
192
193    /// Available payload bytes a fresh encode would still have.
194    #[must_use]
195    pub fn free_bytes(&self) -> usize {
196        let cap = PAYLOAD_END - PAYLOAD_OFFSET;
197        cap.saturating_sub(self.occupied_bytes())
198    }
199}
200
201/// Encode `node` into `page`. Stamps the page trailer.
202///
203/// # Errors
204///
205/// Returns [`Error::BTreeInvariantViolated`] if the node's slot count
206/// or key/value sizes do not fit in a single page.
207pub fn encode_node(node: &DecodedNode, page: &mut Page) -> Result<()> {
208    validate_node_release(node)?;
209    let buf = page.as_bytes_mut();
210    buf.fill(0);
211    write_node_header(buf, node);
212    match node.kind {
213        NodeKind::Leaf => encode_leaf_body(buf, node)?,
214        NodeKind::Internal => encode_internal_body(buf, node)?,
215    }
216    // Stamp the trailer using the pager helper so the page is
217    // immediately readable through `Pager::write_page` (which leaves
218    // the trailer alone — it expects a stamped page).
219    crate::pager::checksum::write_page_trailer(page);
220    Ok(())
221}
222
223fn write_node_header(buf: &mut [u8; PAGE_SIZE], node: &DecodedNode) {
224    let page_type = match node.kind {
225        NodeKind::Leaf => PAGE_TYPE_BTREE_LEAF,
226        NodeKind::Internal => PAGE_TYPE_BTREE_INTERNAL,
227    };
228    buf[OFF_PAGE_TYPE] = page_type;
229    buf[OFF_LEVEL] = node.level;
230    let key_count = match node.kind {
231        NodeKind::Leaf => node.leaves.len(),
232        NodeKind::Internal => node.internals.len(),
233    };
234    // Bounded by LEAF_SLOT_CAP / INTERNAL_SLOT_CAP at encode time; the
235    // narrowing cast is safe because both caps fit in u16 (LEAF_SLOT_CAP
236    // = (4092 - 24) / 4 = 1017 < 65535).
237    let kc16 = u16_from_usize(key_count);
238    buf[OFF_KEY_COUNT..OFF_KEY_COUNT + 2].copy_from_slice(&kc16.to_le_bytes());
239    // `free_space_off` is the heap watermark — the byte offset of
240    // the first heap entry currently in use. After a compacting
241    // encode this is `PAYLOAD_END - heap_bytes`.
242    let heap_bytes: usize = match node.kind {
243        NodeKind::Leaf => node
244            .leaves
245            .iter()
246            .map(|e| {
247                varint_len(u64_from_usize(e.key.len()))
248                    + e.key.len()
249                    + varint_len(u64_from_usize(e.value.len()))
250                    + e.value.len()
251            })
252            .sum(),
253        NodeKind::Internal => node
254            .internals
255            .iter()
256            .map(|e| varint_len(u64_from_usize(e.key.len())) + e.key.len())
257            .sum(),
258    };
259    let fso = u32_from_usize(PAYLOAD_END.saturating_sub(heap_bytes));
260    buf[OFF_FREE_SPACE_OFF..OFF_FREE_SPACE_OFF + 4].copy_from_slice(&fso.to_le_bytes());
261    buf[OFF_NEXT_SIBLING..OFF_NEXT_SIBLING + 8].copy_from_slice(&node.next_sibling.to_le_bytes());
262    // OFF_RESERVED (16..24) stays zero per the spec.
263    let _ = OFF_RESERVED;
264}
265
266fn encode_leaf_body(buf: &mut [u8; PAGE_SIZE], node: &DecodedNode) -> Result<()> {
267    let mut heap_cursor = PAYLOAD_END;
268    let mut slot_off = PAYLOAD_OFFSET;
269    for entry in &node.leaves {
270        let entry_len = varint_len(u64_from_usize(entry.key.len()))
271            + entry.key.len()
272            + varint_len(u64_from_usize(entry.value.len()))
273            + entry.value.len();
274        if heap_cursor < entry_len + slot_off + LEAF_SLOT_BYTES {
275            return Err(Error::BTreeInvariantViolated {
276                reason: "leaf encode: slot dir and heap collide",
277            });
278        }
279        heap_cursor -= entry_len;
280        let mut cur = heap_cursor;
281        let kl = u64_from_usize(entry.key.len());
282        cur += write_varint(&mut buf[cur..], kl);
283        buf[cur..cur + entry.key.len()].copy_from_slice(&entry.key);
284        cur += entry.key.len();
285        let vl = u64_from_usize(entry.value.len());
286        cur += write_varint(&mut buf[cur..], vl);
287        buf[cur..cur + entry.value.len()].copy_from_slice(&entry.value);
288        let off32 = u32_from_usize(heap_cursor);
289        buf[slot_off..slot_off + LEAF_SLOT_BYTES].copy_from_slice(&off32.to_le_bytes());
290        slot_off += LEAF_SLOT_BYTES;
291    }
292    Ok(())
293}
294
295fn encode_internal_body(buf: &mut [u8; PAGE_SIZE], node: &DecodedNode) -> Result<()> {
296    if node.children.len() != node.internals.len() + 1 {
297        return Err(Error::BTreeInvariantViolated {
298            reason: "internal node has children.len() != pivots+1",
299        });
300    }
301    let leftmost = node.children[0];
302    buf[PAYLOAD_OFFSET..PAYLOAD_OFFSET + INTERNAL_LEFTMOST_CHILD_BYTES]
303        .copy_from_slice(&leftmost.to_le_bytes());
304    let mut heap_cursor = PAYLOAD_END;
305    let mut slot_off = PAYLOAD_OFFSET + INTERNAL_LEFTMOST_CHILD_BYTES;
306    for (i, pivot) in node.internals.iter().enumerate() {
307        let right_child = node.children[i + 1];
308        let entry_len = varint_len(u64_from_usize(pivot.key.len())) + pivot.key.len();
309        if heap_cursor < entry_len + slot_off + INTERNAL_SLOT_BYTES {
310            return Err(Error::BTreeInvariantViolated {
311                reason: "internal encode: slot dir and heap collide",
312            });
313        }
314        heap_cursor -= entry_len;
315        let mut cur = heap_cursor;
316        cur += write_varint(&mut buf[cur..], u64_from_usize(pivot.key.len()));
317        buf[cur..cur + pivot.key.len()].copy_from_slice(&pivot.key);
318        let off32 = u32_from_usize(heap_cursor);
319        buf[slot_off..slot_off + 4].copy_from_slice(&off32.to_le_bytes());
320        buf[slot_off + 4..slot_off + INTERNAL_SLOT_BYTES]
321            .copy_from_slice(&right_child.to_le_bytes());
322        slot_off += INTERNAL_SLOT_BYTES;
323    }
324    Ok(())
325}
326
327/// Decode a B+tree page into a [`DecodedNode`].
328///
329/// # Errors
330///
331/// Returns [`Error::Corruption`] if any field is malformed: bad
332/// page-type tag, key-count exceeding the slot cap, slot offset
333/// outside the page, varint length running past the page end, or
334/// keys not in ascending order.
335pub fn decode_node(buf: &[u8; PAGE_SIZE]) -> Result<DecodedNode> {
336    let page_type = buf[OFF_PAGE_TYPE];
337    let kind = match page_type {
338        PAGE_TYPE_BTREE_LEAF => NodeKind::Leaf,
339        PAGE_TYPE_BTREE_INTERNAL => NodeKind::Internal,
340        _ => return Err(Error::Corruption { page_id: 0 }),
341    };
342    let level = buf[OFF_LEVEL];
343    let key_count = u16::from_le_bytes([buf[OFF_KEY_COUNT], buf[OFF_KEY_COUNT + 1]]) as usize;
344    let next_sibling = u64::from_le_bytes([
345        buf[OFF_NEXT_SIBLING],
346        buf[OFF_NEXT_SIBLING + 1],
347        buf[OFF_NEXT_SIBLING + 2],
348        buf[OFF_NEXT_SIBLING + 3],
349        buf[OFF_NEXT_SIBLING + 4],
350        buf[OFF_NEXT_SIBLING + 5],
351        buf[OFF_NEXT_SIBLING + 6],
352        buf[OFF_NEXT_SIBLING + 7],
353    ]);
354    let mut node = DecodedNode {
355        kind,
356        level,
357        next_sibling,
358        children: Vec::new(),
359        leaves: Vec::new(),
360        internals: Vec::new(),
361    };
362    match kind {
363        NodeKind::Leaf => decode_leaf_body(buf, &mut node, key_count)?,
364        NodeKind::Internal => decode_internal_body(buf, &mut node, key_count)?,
365    }
366    validate_node_release(&node)?;
367    Ok(node)
368}
369
370fn decode_leaf_body(buf: &[u8; PAGE_SIZE], node: &mut DecodedNode, key_count: usize) -> Result<()> {
371    if key_count > LEAF_SLOT_CAP {
372        return Err(Error::Corruption { page_id: 0 });
373    }
374    node.leaves.reserve(key_count);
375    let mut prev_key: Option<Vec<u8>> = None;
376    for i in 0..key_count {
377        let slot_off = PAYLOAD_OFFSET + i * LEAF_SLOT_BYTES;
378        if slot_off + LEAF_SLOT_BYTES > PAYLOAD_END {
379            return Err(Error::Corruption { page_id: 0 });
380        }
381        let entry_off = u32::from_le_bytes([
382            buf[slot_off],
383            buf[slot_off + 1],
384            buf[slot_off + 2],
385            buf[slot_off + 3],
386        ]) as usize;
387        if !(PAYLOAD_OFFSET..PAYLOAD_END).contains(&entry_off) {
388            return Err(Error::Corruption { page_id: 0 });
389        }
390        let (key, after_key) = read_length_prefixed(buf, entry_off)?;
391        let (value, _) = read_length_prefixed(buf, after_key)?;
392        if key.len() > max_key_len() {
393            return Err(Error::Corruption { page_id: 0 });
394        }
395        let key_vec = key.to_vec();
396        if let Some(prev) = &prev_key {
397            if key_vec.as_slice() <= prev.as_slice() {
398                return Err(Error::Corruption { page_id: 0 });
399            }
400        }
401        prev_key = Some(key_vec.clone());
402        node.leaves.push(LeafEntry {
403            key: key_vec,
404            value: value.to_vec(),
405        });
406    }
407    Ok(())
408}
409
410fn decode_internal_body(
411    buf: &[u8; PAGE_SIZE],
412    node: &mut DecodedNode,
413    key_count: usize,
414) -> Result<()> {
415    if key_count > INTERNAL_SLOT_CAP {
416        return Err(Error::Corruption { page_id: 0 });
417    }
418    let leftmost = read_u64(buf, PAYLOAD_OFFSET);
419    if leftmost == 0 {
420        return Err(Error::Corruption { page_id: 0 });
421    }
422    node.children.reserve(key_count + 1);
423    node.internals.reserve(key_count);
424    node.children.push(leftmost);
425    let mut prev_key: Option<Vec<u8>> = None;
426    for i in 0..key_count {
427        let (key_vec, right_child) = decode_internal_slot(buf, i)?;
428        if let Some(prev) = &prev_key {
429            if key_vec.as_slice() <= prev.as_slice() {
430                return Err(Error::Corruption { page_id: 0 });
431            }
432        }
433        prev_key = Some(key_vec.clone());
434        node.internals.push(InternalEntry { key: key_vec });
435        node.children.push(right_child);
436    }
437    Ok(())
438}
439
440/// Decode the i-th slot of an internal-node body. Returns the pivot
441/// key and the right-child page-id, or `Error::Corruption` if the
442/// slot is malformed.
443fn decode_internal_slot(buf: &[u8; PAGE_SIZE], i: usize) -> Result<(Vec<u8>, u64)> {
444    let slot_off = PAYLOAD_OFFSET + INTERNAL_LEFTMOST_CHILD_BYTES + i * INTERNAL_SLOT_BYTES;
445    if slot_off + INTERNAL_SLOT_BYTES > PAYLOAD_END {
446        return Err(Error::Corruption { page_id: 0 });
447    }
448    let entry_off = u32::from_le_bytes([
449        buf[slot_off],
450        buf[slot_off + 1],
451        buf[slot_off + 2],
452        buf[slot_off + 3],
453    ]) as usize;
454    let right_child = read_u64(buf, slot_off + 4);
455    if right_child == 0 || !(PAYLOAD_OFFSET..PAYLOAD_END).contains(&entry_off) {
456        return Err(Error::Corruption { page_id: 0 });
457    }
458    let (key, _) = read_length_prefixed(buf, entry_off)?;
459    if key.len() > max_key_len() {
460        return Err(Error::Corruption { page_id: 0 });
461    }
462    Ok((key.to_vec(), right_child))
463}
464
465/// Read a u64 LE from `buf` at `offset`. Panics if `offset + 8` is
466/// out of bounds; callers must validate the offset.
467fn read_u64(buf: &[u8; PAGE_SIZE], offset: usize) -> u64 {
468    u64::from_le_bytes([
469        buf[offset],
470        buf[offset + 1],
471        buf[offset + 2],
472        buf[offset + 3],
473        buf[offset + 4],
474        buf[offset + 5],
475        buf[offset + 6],
476        buf[offset + 7],
477    ])
478}
479
480/// Read a `(length-prefixed bytes, next_offset)` pair from `buf`
481/// starting at `offset`. Length is an unsigned LEB128 varint; the
482/// payload is `length` raw bytes.
483///
484/// # Power-of-ten Rule 7 posture
485///
486/// `len` is caller-controlled (it lives on disk in a page byte that
487/// can be tampered with or random in a fuzz harness). The
488/// `after + len_usize` end-of-slice computation MUST use
489/// `checked_add`: with `overflow-checks = true` in release (per
490/// Rule 10) a raw `+` panics on a wrap; without overflow-checks the
491/// bounds check is silently bypassed. Either failure mode is a
492/// denial-of-service hazard reachable from
493/// `Pager::read_page → BTree::open → decode_node` on any tampered
494/// B+tree page (M13 #115).
495fn read_length_prefixed(buf: &[u8; PAGE_SIZE], offset: usize) -> Result<(&[u8], usize)> {
496    if offset >= PAYLOAD_END {
497        return Err(Error::Corruption { page_id: 0 });
498    }
499    let (len, after) = read_varint(buf, offset)?;
500    let len_usize = usize_from_u64(len)?;
501    // Rule 7: `len_usize` is unbounded (varint domain is u64); raw
502    // `after + len_usize` wraps on tampered input.
503    let end = after
504        .checked_add(len_usize)
505        .ok_or(Error::Corruption { page_id: 0 })?;
506    if end > PAYLOAD_END {
507        return Err(Error::Corruption { page_id: 0 });
508    }
509    Ok((&buf[after..end], end))
510}
511
/// Validate `node` against the format-version-0 invariants.
///
/// Public entry point that forwards unchanged to the private
/// `validate_node_release`, the same check `encode_node` and
/// `decode_node` run. It covers level/kind agreement, the slot caps,
/// strictly ascending keys and, for internal nodes, the child count,
/// non-zero child page-ids and a zero `next_sibling`.
///
/// # Errors
///
/// Returns [`Error::BTreeInvariantViolated`] if any invariant fails.
/// Callers can `debug_assert!` on a wrapping helper for development
/// builds; release builds get the error surfaced cleanly.
pub fn validate_node(node: &DecodedNode) -> Result<()> {
    validate_node_release(node)
}
522
523fn validate_node_release(node: &DecodedNode) -> Result<()> {
524    match node.kind {
525        NodeKind::Leaf => {
526            if node.level != 0 {
527                return Err(Error::BTreeInvariantViolated {
528                    reason: "leaf has non-zero level",
529                });
530            }
531            if node.leaves.len() > LEAF_SLOT_CAP {
532                return Err(Error::BTreeInvariantViolated {
533                    reason: "leaf key count exceeds slot cap",
534                });
535            }
536            for w in node.leaves.windows(2) {
537                if w[0].key.as_slice() >= w[1].key.as_slice() {
538                    return Err(Error::BTreeInvariantViolated {
539                        reason: "leaf keys not strictly sorted",
540                    });
541                }
542            }
543        }
544        NodeKind::Internal => {
545            if node.level == 0 {
546                return Err(Error::BTreeInvariantViolated {
547                    reason: "internal node at level 0",
548                });
549            }
550            if node.internals.len() > INTERNAL_SLOT_CAP {
551                return Err(Error::BTreeInvariantViolated {
552                    reason: "internal key count exceeds slot cap",
553                });
554            }
555            if node.children.len() != node.internals.len() + 1 {
556                return Err(Error::BTreeInvariantViolated {
557                    reason: "internal children.len() != pivots+1",
558                });
559            }
560            for c in &node.children {
561                if *c == 0 {
562                    return Err(Error::BTreeInvariantViolated {
563                        reason: "internal node has zero child page-id",
564                    });
565                }
566            }
567            for w in node.internals.windows(2) {
568                if w[0].key.as_slice() >= w[1].key.as_slice() {
569                    return Err(Error::BTreeInvariantViolated {
570                        reason: "internal keys not strictly sorted",
571                    });
572                }
573            }
574            if node.next_sibling != 0 {
575                return Err(Error::BTreeInvariantViolated {
576                    reason: "internal node has non-zero next_sibling",
577                });
578            }
579        }
580    }
581    Ok(())
582}
583
584// --------- varint helpers ---------
585
/// Encoded size, in bytes, of `v` as an unsigned LEB128 varint:
/// one byte per started group of seven bits, and at least one byte
/// (so `0` takes one byte and `u64::MAX` takes ten).
#[must_use]
pub fn varint_len(v: u64) -> usize {
    let mut remaining = v;
    let mut count = 1usize;
    // Each further byte is needed while more than 7 bits remain.
    while remaining >= 0x80 {
        remaining >>= 7;
        count += 1;
    }
    count
}
597
/// Encode `v` as an unsigned LEB128 varint at the start of `dst` and
/// return how many bytes were written.
///
/// Seven value bits go into each byte, least-significant group
/// first; every byte except the last has its top bit set.
///
/// # Panics
///
/// Panics if `dst` is too short for the encoding.
fn write_varint(dst: &mut [u8], mut v: u64) -> usize {
    let mut written = 0;
    // Continuation bytes: emitted while more than 7 bits remain.
    while v >= 0x80 {
        dst[written] = (v & 0x7F) as u8 | 0x80;
        v >>= 7;
        written += 1;
    }
    // Final byte: what remains fits in 7 bits, top bit clear.
    dst[written] = v as u8;
    written + 1
}
616
617/// Read an unsigned LEB128 varint from `buf` starting at `offset`.
618/// Returns `(value, next_offset)`. Power-of-ten Rule 2: the loop is
619/// bounded by 10 bytes (`ceil(64 / 7)`), which is the maximum a
620/// 64-bit varint can occupy.
621fn read_varint(buf: &[u8; PAGE_SIZE], offset: usize) -> Result<(u64, usize)> {
622    let mut value: u64 = 0;
623    let mut shift: u32 = 0;
624    let mut i = offset;
625    for _ in 0..10 {
626        if i >= PAYLOAD_END {
627            return Err(Error::Corruption { page_id: 0 });
628        }
629        let byte = buf[i];
630        i += 1;
631        let chunk = u64::from(byte & 0x7F);
632        // `shift` is in range [0..63]; the cast cannot truncate.
633        value |= chunk << shift;
634        if (byte & 0x80) == 0 {
635            return Ok((value, i));
636        }
637        shift += 7;
638        if shift >= 64 {
639            return Err(Error::Corruption { page_id: 0 });
640        }
641    }
642    Err(Error::Corruption { page_id: 0 })
643}
644
645// --------- narrow casts ---------
646
/// Narrow `v` to `u32`, saturating at `u32::MAX`.
///
/// Every caller passes a value no larger than `PAYLOAD_END`, which
/// is below 2^32; debug builds assert that contract. In release a
/// violation yields `u32::MAX`, an offset the decoder would reject
/// when the page is read back.
fn u32_from_usize(v: usize) -> u32 {
    let narrowed = u32::try_from(v);
    debug_assert!(narrowed.is_ok());
    narrowed.unwrap_or(u32::MAX)
}
655
/// Narrow `v` to `u16`, saturating at `u16::MAX`. Debug builds
/// assert that the value actually fits.
fn u16_from_usize(v: usize) -> u16 {
    let narrowed = u16::try_from(v);
    debug_assert!(narrowed.is_ok());
    narrowed.unwrap_or(u16::MAX)
}
660
/// Widen a `usize` to `u64` for varint encoding of lengths.
// NOTE(review): the `as` cast is lossless only while `usize` is at
// most 64 bits wide — true on current 32- and 64-bit targets.
fn u64_from_usize(v: usize) -> u64 {
    v as u64
}
664
665fn usize_from_u64(v: u64) -> Result<usize> {
666    usize::try_from(v).map_err(|_| Error::Corruption { page_id: 0 })
667}
668
669#[cfg(test)]
670mod tests {
671    use super::*;
672
    /// Fixture: a valid level-0 leaf with no entries and no right
    /// sibling.
    fn empty_leaf() -> DecodedNode {
        DecodedNode {
            kind: NodeKind::Leaf,
            level: 0,
            // 0 is the "no right sibling" sentinel.
            next_sibling: 0,
            children: Vec::new(),
            leaves: Vec::new(),
            internals: Vec::new(),
        }
    }
683
684    fn leaf_with(entries: &[(&[u8], &[u8])]) -> DecodedNode {
685        let mut leaf = empty_leaf();
686        for (k, v) in entries {
687            leaf.leaves.push(LeafEntry {
688                key: k.to_vec(),
689                value: v.to_vec(),
690            });
691        }
692        leaf
693    }
694
695    #[test]
696    fn round_trip_empty_leaf() {
697        let leaf = empty_leaf();
698        let mut page = Page::zeroed();
699        encode_node(&leaf, &mut page).expect("encode");
700        let decoded = decode_node(page.as_bytes()).expect("decode");
701        assert_eq!(decoded.kind, NodeKind::Leaf);
702        assert_eq!(decoded.level, 0);
703        assert_eq!(decoded.next_sibling, 0);
704        assert!(decoded.leaves.is_empty());
705    }
706
707    #[test]
708    fn round_trip_populated_leaf() {
709        let leaf = leaf_with(&[(b"alpha", b"AAA"), (b"bravo", b"BBB"), (b"charlie", b"CCC")]);
710        let mut page = Page::zeroed();
711        encode_node(&leaf, &mut page).expect("encode");
712        let decoded = decode_node(page.as_bytes()).expect("decode");
713        assert_eq!(decoded.leaves.len(), 3);
714        assert_eq!(decoded.leaves[0].key.as_slice(), b"alpha");
715        assert_eq!(decoded.leaves[0].value.as_slice(), b"AAA");
716        assert_eq!(decoded.leaves[2].key.as_slice(), b"charlie");
717    }
718
719    #[test]
720    fn round_trip_internal() {
721        let mut internal = DecodedNode {
722            kind: NodeKind::Internal,
723            level: 1,
724            next_sibling: 0,
725            children: Vec::new(),
726            leaves: Vec::new(),
727            internals: Vec::new(),
728        };
729        // 3 pivots → 4 children.
730        for raw in [10u64, 20, 30, 40] {
731            internal.children.push(raw);
732        }
733        for k in [b"d".as_slice(), b"h", b"m"] {
734            internal.internals.push(InternalEntry { key: k.to_vec() });
735        }
736        let mut page = Page::zeroed();
737        encode_node(&internal, &mut page).expect("encode");
738        let decoded = decode_node(page.as_bytes()).expect("decode");
739        assert_eq!(decoded.kind, NodeKind::Internal);
740        assert_eq!(decoded.level, 1);
741        assert_eq!(decoded.internals.len(), 3);
742        assert_eq!(decoded.children.as_slice(), &[10, 20, 30, 40]);
743        assert_eq!(decoded.internals[0].key.as_slice(), b"d");
744        assert_eq!(decoded.internals[2].key.as_slice(), b"m");
745    }
746
    /// A leaf whose slot directory lists keys out of order ("c"
    /// before "a") must be rejected by the decoder as corruption.
    #[test]
    fn decode_rejects_unsorted_leaf() {
        // Encode a deliberately-malformed leaf by hand: write two
        // out-of-order entries and check decode errors. `encode_node`
        // cannot produce this page because it validates ordering.
        let mut page = Page::zeroed();
        let buf = page.as_bytes_mut();
        buf[OFF_PAGE_TYPE] = PAGE_TYPE_BTREE_LEAF;
        buf[OFF_LEVEL] = 0;
        buf[OFF_KEY_COUNT..OFF_KEY_COUNT + 2].copy_from_slice(&2u16.to_le_bytes());
        // Each heap entry is 4 bytes (two 1-byte lengths + two 1-byte
        // payloads).
        // Slot 0 at PAYLOAD_END - 4 ("c" + "x").
        // Slot 1 at PAYLOAD_END - 8 ("a" + "y").
        let slot0 = u32_from_usize(PAYLOAD_END - 4);
        let slot1 = u32_from_usize(PAYLOAD_END - 8);
        buf[PAYLOAD_OFFSET..PAYLOAD_OFFSET + 4].copy_from_slice(&slot0.to_le_bytes());
        buf[PAYLOAD_OFFSET + 4..PAYLOAD_OFFSET + 8].copy_from_slice(&slot1.to_le_bytes());
        // Heap entry for slot 0: key="c", value="x"  -> 1,'c',1,'x'.
        buf[PAYLOAD_END - 4] = 1;
        buf[PAYLOAD_END - 3] = b'c';
        buf[PAYLOAD_END - 2] = 1;
        buf[PAYLOAD_END - 1] = b'x';
        // Heap entry for slot 1: key="a", value="y"  -> 1,'a',1,'y'.
        buf[PAYLOAD_END - 8] = 1;
        buf[PAYLOAD_END - 7] = b'a';
        buf[PAYLOAD_END - 6] = 1;
        buf[PAYLOAD_END - 5] = b'y';
        crate::pager::checksum::write_page_trailer(&mut page);
        let err = decode_node(page.as_bytes()).expect_err("decode must reject");
        assert!(matches!(err, Error::Corruption { .. }));
    }
776
777    #[test]
778    fn varint_round_trip() {
779        for v in [0u64, 1, 127, 128, 0xFFFF, 0xDEAD_BEEF, u64::MAX] {
780            let mut buf = [0u8; 16];
781            let n = write_varint(&mut buf, v);
782            assert_eq!(n, varint_len(v));
783            // Read back using a faux PAGE_SIZE buffer.
784            let mut page = [0u8; PAGE_SIZE];
785            page[..n].copy_from_slice(&buf[..n]);
786            let (decoded, after) = read_varint(&page, 0).expect("decode varint");
787            assert_eq!(decoded, v);
788            assert_eq!(after, n);
789        }
790    }
791
792    /// M13 #115 regression: a tampered B+tree leaf carrying a
793    /// length-prefix varint that encodes `u64::MAX` must return
794    /// `Error::Corruption`, NOT panic in `read_length_prefixed` on
795    /// the `after + len_usize` overflow.
796    ///
797    /// Critical: this test must also pass under `cargo test --release`
798    /// — the workspace profile enables `overflow-checks = true` per
799    /// power-of-ten Rule 10, which is the mechanism that turned the
800    /// arithmetic wrap into a panic in the first place. A `checked_add`
801    /// fix is the only thing that makes both debug and release happy.
802    #[test]
803    fn decode_node_varint_max_returns_corruption_not_panic() {
804        let mut page = Page::zeroed();
805        {
806            let buf = page.as_bytes_mut();
807            buf[OFF_PAGE_TYPE] = PAGE_TYPE_BTREE_LEAF;
808            buf[OFF_LEVEL] = 0;
809            // One slot. The varint at the slot's entry-offset will
810            // encode `u64::MAX`, which `read_length_prefixed` must
811            // reject rather than wrap.
812            buf[OFF_KEY_COUNT..OFF_KEY_COUNT + 2].copy_from_slice(&1u16.to_le_bytes());
813            // Place the heap entry near the end of the payload. The
814            // bug triggers BEFORE the bounds check, so the absolute
815            // offset value only matters to the extent that it points
816            // somewhere parseable.
817            let entry_off = PAYLOAD_END - 16;
818            let slot_off_bytes = u32_from_usize(entry_off).to_le_bytes();
819            buf[PAYLOAD_OFFSET..PAYLOAD_OFFSET + 4].copy_from_slice(&slot_off_bytes);
820            // 10-byte LEB128 encoding of `u64::MAX`. The 10th byte's
821            // top bit is clear so `read_varint` accepts it; the
822            // decoded length is `u64::MAX`, which `as usize` becomes
823            // `usize::MAX` on 64-bit and triggers the `after + len`
824            // overflow in `read_length_prefixed`.
825            let varint = [0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x01];
826            buf[entry_off..entry_off + varint.len()].copy_from_slice(&varint);
827        }
828        crate::pager::checksum::write_page_trailer(&mut page);
829        let result = decode_node(page.as_bytes());
830        match result {
831            Err(Error::Corruption { .. }) => {}
832            other => panic!(
833                "expected Err(Error::Corruption), got {:?} \
834                 (a panic here would indicate the M13 #115 bug \
835                  is still present in release builds)",
836                other.map(|_| "Ok(_)").unwrap_or("non-corruption err")
837            ),
838        }
839    }
840
841    /// M13 #115 regression for the internal-node variant: a tampered
842    /// internal-node entry pointing at a varint that encodes
843    /// `u64::MAX` must surface as `Error::Corruption`, not panic via
844    /// `decode_internal_slot → read_length_prefixed`.
845    #[test]
846    fn decode_internal_node_varint_max_returns_corruption() {
847        let mut page = Page::zeroed();
848        {
849            let buf = page.as_bytes_mut();
850            buf[OFF_PAGE_TYPE] = PAGE_TYPE_BTREE_INTERNAL;
851            buf[OFF_LEVEL] = 1;
852            buf[OFF_KEY_COUNT..OFF_KEY_COUNT + 2].copy_from_slice(&1u16.to_le_bytes());
853            // Leftmost child page-id — must be non-zero.
854            buf[PAYLOAD_OFFSET..PAYLOAD_OFFSET + 8].copy_from_slice(&42u64.to_le_bytes());
855            // One internal slot: u32 entry_off + u64 right_child.
856            let slot_base = PAYLOAD_OFFSET + INTERNAL_LEFTMOST_CHILD_BYTES;
857            let entry_off = PAYLOAD_END - 16;
858            buf[slot_base..slot_base + 4].copy_from_slice(&u32_from_usize(entry_off).to_le_bytes());
859            buf[slot_base + 4..slot_base + INTERNAL_SLOT_BYTES]
860                .copy_from_slice(&7u64.to_le_bytes());
861            // Same `u64::MAX` varint as the leaf test above.
862            let varint = [0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x01];
863            buf[entry_off..entry_off + varint.len()].copy_from_slice(&varint);
864        }
865        crate::pager::checksum::write_page_trailer(&mut page);
866        let result = decode_node(page.as_bytes());
867        assert!(
868            matches!(result, Err(Error::Corruption { .. })),
869            "expected Err(Error::Corruption) on u64::MAX varint in internal slot"
870        );
871    }
872
873    #[test]
874    fn max_inline_value_is_below_payload() {
875        let k = 8;
876        let v = max_inline_value(k);
877        // Compute slot+heap bytes for one entry at this max value
878        // length and assert it fits.
879        let entry_len = varint_len(u64_from_usize(k)) + k + varint_len(u64_from_usize(v)) + v;
880        let payload = entry_len + LEAF_SLOT_BYTES;
881        assert!(payload <= PAYLOAD_END - PAYLOAD_OFFSET);
882    }
883}