Skip to main content

obj_core/btree/
node.rs

1//! B+tree node encode / decode.
2//!
3//! See `docs/format.md` § B+tree for the on-disk layout. This module
4//! is the reference implementation; the tests at the bottom validate
5//! round-trip equality and rejection of every documented invariant
6//! violation.
7//!
8//! # Power-of-ten posture
9//!
10//! - **Rule 2.** Every loop iterates over `key_count`, the slot
11//!   directory length, or the page-byte range — all bounded by
12//!   `PAGE_SIZE`.
13//! - **Rule 3.** The in-memory [`DecodedNode`] uses `Vec`s, not
14//!   `heapless::Vec`s of the worst-case slot count: the worst-case
15//!   slot count times the worst-case per-key buffer (~1 MiB) would
16//!   overflow a 2 MiB stack. The B+tree's *traversal stack* uses
17//!   `heapless::Vec` to satisfy Rule 3 on the hot read path; the
18//!   *decoded representation* of a single node is a transient
19//!   per-page artifact whose heap allocation is unavoidable and is
20//!   not on a real-time path.
21//!
22//!   Phase 7A perf-audit note (extended). The initial Power-of-Ten
23//!   audit flagged the three `Vec`s in [`DecodedNode`] (`children`,
24//!   `leaves`, `internals`) as a hot-path allocation cost. Inspection
25//!   shows the actual cost is far smaller than it looks and a
26//!   `heapless`-conversion is not worth shipping. Four reasons:
27//!   (1) `Vec::new()` does not allocate — the first `reserve` does —
28//!   so a leaf decode allocates exactly ONE outer `Vec` (the `leaves`
29//!   spine) and an internal decode allocates TWO (`children` +
30//!   `internals`); the unused vecs stay at zero capacity.
31//!   (2) Each leaf entry's `key` and `value` are themselves `Vec<u8>`;
32//!   for an N-entry leaf that's 2N inner allocations vs. the 1
33//!   outer, so converting the outer saves at best 1/(2N+1) on a real
34//!   workload (~0.5 % on a 100-entry leaf).
35//!   (3) `LEAF_SLOT_CAP ≈ 1017` and `sizeof(LeafEntry) = 48` — a
36//!   stack-allocated `heapless::Vec<LeafEntry, LEAF_SLOT_CAP>` adds
37//!   ~48 KiB to every [`decode_node`] frame, a stack-overflow hazard
38//!   on small-stack targets (embedded, fiber runtimes) and a 48 KiB
39//!   zero-write per call under debug + stack-protectors. The
40//!   conversion makes the hot path *worse* in latency-sensitive
41//!   deployments.
42//!   (4) `index_lookup` (warm-cache point read) spends most of its
43//!   time in `pager.read_page` (page-cache hash lookup), CRC32C
44//!   verification of the page trailer, and the descend stack — not
45//!   in `Vec::reserve`. A 100k-leaf `collection_scan` does ~100 000
46//!   outer allocs vs. ~50 000 000 inner allocs (~500 bytes/doc with
47//!   2 inner `Vec`s per entry); the outer count is a noise floor.
48//!
49//!   The conclusion: the outer `Vec`s stay. A real perf win for
50//!   `decode_node` requires attacking the inner-vec allocations
51//!   (e.g. decode-in-place with `&[u8]` slices into the page buffer,
52//!   borrowing the pager's page-cache buffer for the iterator's
53//!   lifetime), which is a format-aware refactor of the node API
54//!   and is out of scope for Phase 7 polish.
55//! - **Rule 5.** `validate_node` is the central invariant check; it
56//!   is `debug_assert!`-ed at every mutation site and surfaces as
57//!   `Error::Corruption` on decode of a malformed page.
58//! - **Rule 7.** No `unwrap` / `expect` on any error-bearing path;
59//!   length-prefixed varint reads return `Error::Corruption` on
60//!   underflow.
61
62#![forbid(unsafe_code)]
63
64use crate::error::{Error, Result};
65use crate::pager::page::{Page, PAGE_SIZE, PAGE_TRAILER_SIZE};
66
67/// Page-type tag for an internal B+tree node. See `docs/format.md`
68/// § Page types.
69pub const PAGE_TYPE_BTREE_INTERNAL: u8 = 0x02;
70/// Page-type tag for a leaf B+tree node.
71pub const PAGE_TYPE_BTREE_LEAF: u8 = 0x03;
72
73/// Fixed-size B+tree node header. See `docs/format.md` § Node header.
74pub const NODE_HEADER_SIZE: usize = 24;
75
76// Field offsets within the node header.
77const OFF_PAGE_TYPE: usize = 0;
78const OFF_LEVEL: usize = 1;
79const OFF_KEY_COUNT: usize = 2;
80const OFF_FREE_SPACE_OFF: usize = 4;
81const OFF_NEXT_SIBLING: usize = 8;
82const OFF_RESERVED: usize = 16;
83
84/// Bytes per slot in a leaf slot directory. The entry is a single
85/// u32 LE giving the byte offset of the corresponding heap entry.
86pub const LEAF_SLOT_BYTES: usize = 4;
87
88/// Bytes per slot in an internal slot directory. The entry is a u32
89/// LE offset followed by a u64 LE right-child page-id.
90pub const INTERNAL_SLOT_BYTES: usize = 12;
91
92/// Bytes the leftmost-child page-id occupies in an internal node,
93/// immediately after the node header.
94pub const INTERNAL_LEFTMOST_CHILD_BYTES: usize = 8;
95
96/// Byte offset of the first byte after the node header (leaf slot
97/// directory start, OR internal leftmost-child).
98const PAYLOAD_OFFSET: usize = NODE_HEADER_SIZE;
99
100/// Last byte index (exclusive) of the in-node payload. Beyond this
101/// lies the page trailer.
102const PAYLOAD_END: usize = PAGE_SIZE - PAGE_TRAILER_SIZE;
103
104/// Maximum number of slots a leaf can hold at the worst-case empty
105/// key/value sizes. Used to size the in-memory slot vector.
106pub const LEAF_SLOT_CAP: usize = (PAYLOAD_END - PAYLOAD_OFFSET) / LEAF_SLOT_BYTES;
107
108/// Maximum number of slots an internal node can hold at the worst
109/// case. Used to size the in-memory slot vector.
110pub const INTERNAL_SLOT_CAP: usize =
111    (PAYLOAD_END - PAYLOAD_OFFSET - INTERNAL_LEFTMOST_CHILD_BYTES) / INTERNAL_SLOT_BYTES;
112
113/// Maximum key length the format permits. See `docs/format.md`
114/// § Key and value encoding.
115#[must_use]
116pub const fn max_key_len() -> usize {
117    PAGE_SIZE / 4
118}
119
120/// Maximum value length that still fits inline in a leaf alongside
121/// at least one slot. See `docs/format.md` § Key and value encoding.
122#[must_use]
123pub const fn max_inline_value(key_len: usize) -> usize {
124    // Available payload: PAGE_SIZE - trailer - node header - one slot
125    // dir entry. Minus the heap entry's two varints.
126    let base = PAYLOAD_END - PAYLOAD_OFFSET - LEAF_SLOT_BYTES;
127    // 9 bytes max per LEB128 varint at 64-bit; we use two of them.
128    let varints = 9 + 9;
129    if key_len + varints >= base {
130        0
131    } else {
132        base - key_len - varints
133    }
134}
135
136/// Whether a node is an internal node or a leaf.
137#[derive(Debug, Clone, Copy, PartialEq, Eq)]
138pub enum NodeKind {
139    /// Internal node — pivots + child pointers, no user values.
140    Internal,
141    /// Leaf node — `(key, value)` slots.
142    Leaf,
143}
144
145/// One leaf slot in the decoded in-memory representation.
146#[derive(Debug, Clone)]
147pub struct LeafEntry {
148    /// The key bytes.
149    pub key: Vec<u8>,
150    /// The value bytes.
151    pub value: Vec<u8>,
152}
153
154/// One internal-node pivot entry in the decoded in-memory
155/// representation. The corresponding right child is stored in
156/// [`DecodedNode::children`] at index `i + 1`.
157#[derive(Debug, Clone)]
158pub struct InternalEntry {
159    /// The pivot key.
160    pub key: Vec<u8>,
161}
162
163/// Alias for "one of the right-child page-ids in an internal node".
164/// We store children as raw `u64`s; the on-disk format permits `0`
165/// only as the leaf `next_sibling` sentinel, never as an internal
166/// child pointer.
167pub type ChildEntry = u64;
168
169/// Decoded in-memory representation of a B+tree node.
170///
171/// One [`DecodedNode`] is built per `Pager::read_page` of a B+tree
172/// page; lifetime is "until the caller is done inspecting the
173/// node." See the module-level Rule 3 note for why we use `Vec`
174/// rather than `heapless::Vec` for the slot collections.
175#[derive(Debug, Clone)]
176pub struct DecodedNode {
177    /// Node kind (internal or leaf).
178    pub kind: NodeKind,
179    /// Tree level. Leaves are level 0; internals are level ≥ 1.
180    pub level: u8,
181    /// Right-sibling leaf page-id (leaves only; `0` on the last leaf
182    /// and on internal nodes).
183    pub next_sibling: u64,
184    /// Child page-ids (internal nodes only). Length is
185    /// `internals.len() + 1` when the node is internal, `0` for
186    /// leaves.
187    pub children: Vec<ChildEntry>,
188    /// Leaf slots (leaf nodes only).
189    pub leaves: Vec<LeafEntry>,
190    /// Internal-node pivots (internal nodes only).
191    pub internals: Vec<InternalEntry>,
192}
193
194impl DecodedNode {
195    /// Number of bytes the slot directory + key/value heap currently
196    /// occupy in a freshly-encoded page. Used by the insert path to
197    /// decide whether a node has room for one more entry.
198    #[must_use]
199    pub fn occupied_bytes(&self) -> usize {
200        match self.kind {
201            NodeKind::Leaf => {
202                let slot_bytes = self.leaves.len() * LEAF_SLOT_BYTES;
203                let heap_bytes: usize = self
204                    .leaves
205                    .iter()
206                    .map(|e| {
207                        varint_len(u64_from_usize(e.key.len()))
208                            + e.key.len()
209                            + varint_len(u64_from_usize(e.value.len()))
210                            + e.value.len()
211                    })
212                    .sum();
213                slot_bytes + heap_bytes
214            }
215            NodeKind::Internal => {
216                let slot_bytes = self.internals.len() * INTERNAL_SLOT_BYTES;
217                let heap_bytes: usize = self
218                    .internals
219                    .iter()
220                    .map(|e| varint_len(u64_from_usize(e.key.len())) + e.key.len())
221                    .sum();
222                INTERNAL_LEFTMOST_CHILD_BYTES + slot_bytes + heap_bytes
223            }
224        }
225    }
226
227    /// Available payload bytes a fresh encode would still have.
228    #[must_use]
229    pub fn free_bytes(&self) -> usize {
230        let cap = PAYLOAD_END - PAYLOAD_OFFSET;
231        cap.saturating_sub(self.occupied_bytes())
232    }
233}
234
235/// Encode `node` into `page`. Stamps the page trailer.
236///
237/// # Errors
238///
239/// Returns [`Error::BTreeInvariantViolated`] if the node's slot count
240/// or key/value sizes do not fit in a single page.
241pub fn encode_node(node: &DecodedNode, page: &mut Page) -> Result<()> {
242    validate_node_release(node)?;
243    let buf = page.as_bytes_mut();
244    buf.fill(0);
245    write_node_header(buf, node);
246    match node.kind {
247        NodeKind::Leaf => encode_leaf_body(buf, node)?,
248        NodeKind::Internal => encode_internal_body(buf, node)?,
249    }
250    // Stamp the trailer using the pager helper so the page is
251    // immediately readable through `Pager::write_page` (which leaves
252    // the trailer alone — it expects a stamped page).
253    crate::pager::checksum::write_page_trailer(page);
254    Ok(())
255}
256
257fn write_node_header(buf: &mut [u8; PAGE_SIZE], node: &DecodedNode) {
258    let page_type = match node.kind {
259        NodeKind::Leaf => PAGE_TYPE_BTREE_LEAF,
260        NodeKind::Internal => PAGE_TYPE_BTREE_INTERNAL,
261    };
262    buf[OFF_PAGE_TYPE] = page_type;
263    buf[OFF_LEVEL] = node.level;
264    let key_count = match node.kind {
265        NodeKind::Leaf => node.leaves.len(),
266        NodeKind::Internal => node.internals.len(),
267    };
268    // Bounded by LEAF_SLOT_CAP / INTERNAL_SLOT_CAP at encode time; the
269    // narrowing cast is safe because both caps fit in u16 (LEAF_SLOT_CAP
270    // = (4092 - 24) / 4 = 1017 < 65535).
271    let kc16 = u16_from_usize(key_count);
272    buf[OFF_KEY_COUNT..OFF_KEY_COUNT + 2].copy_from_slice(&kc16.to_le_bytes());
273    // `free_space_off` is the heap watermark — the byte offset of
274    // the first heap entry currently in use. After a compacting
275    // encode this is `PAYLOAD_END - heap_bytes`.
276    let heap_bytes: usize = match node.kind {
277        NodeKind::Leaf => node
278            .leaves
279            .iter()
280            .map(|e| {
281                varint_len(u64_from_usize(e.key.len()))
282                    + e.key.len()
283                    + varint_len(u64_from_usize(e.value.len()))
284                    + e.value.len()
285            })
286            .sum(),
287        NodeKind::Internal => node
288            .internals
289            .iter()
290            .map(|e| varint_len(u64_from_usize(e.key.len())) + e.key.len())
291            .sum(),
292    };
293    let fso = u32_from_usize(PAYLOAD_END.saturating_sub(heap_bytes));
294    buf[OFF_FREE_SPACE_OFF..OFF_FREE_SPACE_OFF + 4].copy_from_slice(&fso.to_le_bytes());
295    buf[OFF_NEXT_SIBLING..OFF_NEXT_SIBLING + 8].copy_from_slice(&node.next_sibling.to_le_bytes());
296    // OFF_RESERVED (16..24) stays zero per the spec.
297    let _ = OFF_RESERVED;
298}
299
300fn encode_leaf_body(buf: &mut [u8; PAGE_SIZE], node: &DecodedNode) -> Result<()> {
301    let mut heap_cursor = PAYLOAD_END;
302    let mut slot_off = PAYLOAD_OFFSET;
303    for entry in &node.leaves {
304        let entry_len = varint_len(u64_from_usize(entry.key.len()))
305            + entry.key.len()
306            + varint_len(u64_from_usize(entry.value.len()))
307            + entry.value.len();
308        if heap_cursor < entry_len + slot_off + LEAF_SLOT_BYTES {
309            return Err(Error::BTreeInvariantViolated {
310                reason: "leaf encode: slot dir and heap collide",
311            });
312        }
313        heap_cursor -= entry_len;
314        let mut cur = heap_cursor;
315        let kl = u64_from_usize(entry.key.len());
316        cur += write_varint(&mut buf[cur..], kl);
317        buf[cur..cur + entry.key.len()].copy_from_slice(&entry.key);
318        cur += entry.key.len();
319        let vl = u64_from_usize(entry.value.len());
320        cur += write_varint(&mut buf[cur..], vl);
321        buf[cur..cur + entry.value.len()].copy_from_slice(&entry.value);
322        let off32 = u32_from_usize(heap_cursor);
323        buf[slot_off..slot_off + LEAF_SLOT_BYTES].copy_from_slice(&off32.to_le_bytes());
324        slot_off += LEAF_SLOT_BYTES;
325    }
326    Ok(())
327}
328
329fn encode_internal_body(buf: &mut [u8; PAGE_SIZE], node: &DecodedNode) -> Result<()> {
330    if node.children.len() != node.internals.len() + 1 {
331        return Err(Error::BTreeInvariantViolated {
332            reason: "internal node has children.len() != pivots+1",
333        });
334    }
335    let leftmost = node.children[0];
336    buf[PAYLOAD_OFFSET..PAYLOAD_OFFSET + INTERNAL_LEFTMOST_CHILD_BYTES]
337        .copy_from_slice(&leftmost.to_le_bytes());
338    let mut heap_cursor = PAYLOAD_END;
339    let mut slot_off = PAYLOAD_OFFSET + INTERNAL_LEFTMOST_CHILD_BYTES;
340    for (i, pivot) in node.internals.iter().enumerate() {
341        let right_child = node.children[i + 1];
342        let entry_len = varint_len(u64_from_usize(pivot.key.len())) + pivot.key.len();
343        if heap_cursor < entry_len + slot_off + INTERNAL_SLOT_BYTES {
344            return Err(Error::BTreeInvariantViolated {
345                reason: "internal encode: slot dir and heap collide",
346            });
347        }
348        heap_cursor -= entry_len;
349        let mut cur = heap_cursor;
350        cur += write_varint(&mut buf[cur..], u64_from_usize(pivot.key.len()));
351        buf[cur..cur + pivot.key.len()].copy_from_slice(&pivot.key);
352        let off32 = u32_from_usize(heap_cursor);
353        buf[slot_off..slot_off + 4].copy_from_slice(&off32.to_le_bytes());
354        buf[slot_off + 4..slot_off + INTERNAL_SLOT_BYTES]
355            .copy_from_slice(&right_child.to_le_bytes());
356        slot_off += INTERNAL_SLOT_BYTES;
357    }
358    Ok(())
359}
360
361/// Decode a B+tree page into a [`DecodedNode`].
362///
363/// # Errors
364///
365/// Returns [`Error::Corruption`] if any field is malformed: bad
366/// page-type tag, key-count exceeding the slot cap, slot offset
367/// outside the page, varint length running past the page end, or
368/// keys not in ascending order.
369///
370/// # Perf note (Phase 7A audit)
371///
372/// The three `Vec`s on [`DecodedNode`] (`children`, `leaves`,
373/// `internals`) stay heap-allocated rather than `heapless::Vec`-
374/// backed. See the module-level Rule 3 note for the full
375/// allocator-vs-stack-frame analysis; in summary, the outer-vec
376/// cost is one allocation per leaf / two per internal — dwarfed
377/// by the 2N inner `Vec<u8>` allocations for the entry keys and
378/// values. The inner allocations are the real hot-path tax and
379/// require a borrowed-slice refactor of the node API to remove.
380pub fn decode_node(buf: &[u8; PAGE_SIZE]) -> Result<DecodedNode> {
381    let page_type = buf[OFF_PAGE_TYPE];
382    let kind = match page_type {
383        PAGE_TYPE_BTREE_LEAF => NodeKind::Leaf,
384        PAGE_TYPE_BTREE_INTERNAL => NodeKind::Internal,
385        _ => return Err(Error::Corruption { page_id: 0 }),
386    };
387    let level = buf[OFF_LEVEL];
388    let key_count = u16::from_le_bytes([buf[OFF_KEY_COUNT], buf[OFF_KEY_COUNT + 1]]) as usize;
389    let next_sibling = u64::from_le_bytes([
390        buf[OFF_NEXT_SIBLING],
391        buf[OFF_NEXT_SIBLING + 1],
392        buf[OFF_NEXT_SIBLING + 2],
393        buf[OFF_NEXT_SIBLING + 3],
394        buf[OFF_NEXT_SIBLING + 4],
395        buf[OFF_NEXT_SIBLING + 5],
396        buf[OFF_NEXT_SIBLING + 6],
397        buf[OFF_NEXT_SIBLING + 7],
398    ]);
399    let mut node = DecodedNode {
400        kind,
401        level,
402        next_sibling,
403        children: Vec::new(),
404        leaves: Vec::new(),
405        internals: Vec::new(),
406    };
407    match kind {
408        NodeKind::Leaf => decode_leaf_body(buf, &mut node, key_count)?,
409        NodeKind::Internal => decode_internal_body(buf, &mut node, key_count)?,
410    }
411    validate_node_release(&node)?;
412    Ok(node)
413}
414
415/// Read only the node-kind tag from a B+tree page header.
416///
417/// O(1), allocation-free. Used by the snapshot read path to decide
418/// whether to descend (full [`decode_node`] for an internal node, so
419/// `choose_child` has the pivots) or to copy out a single value via
420/// [`decode_node_find`] (a leaf). A malformed page-type tag surfaces
421/// as [`Error::Corruption`] identically to [`decode_node`].
422///
423/// # Errors
424///
425/// [`Error::Corruption`] if the page-type tag is neither leaf nor
426/// internal.
427pub fn peek_node_kind(buf: &[u8; PAGE_SIZE]) -> Result<NodeKind> {
428    match buf[OFF_PAGE_TYPE] {
429        PAGE_TYPE_BTREE_LEAF => Ok(NodeKind::Leaf),
430        PAGE_TYPE_BTREE_INTERNAL => Ok(NodeKind::Internal),
431        _ => Err(Error::Corruption { page_id: 0 }),
432    }
433}
434
435/// Find-only leaf decode: scan a leaf page for `target` and copy out
436/// ONLY the matched value as an owned `Vec<u8>` — never the whole
437/// 2N-entry node.
438///
439/// This is the read-path fast lane for [`crate::btree::BTree::get`] /
440/// `get_via_snapshot`. The page must be a leaf; an internal-node tag
441/// is rejected as [`Error::Corruption`] (a read never calls this on an
442/// internal node — `get_via_snapshot` gates on [`peek_node_kind`] and
443/// `BTree::get` descends to a leaf first).
444///
445/// # Safety / corruption posture
446///
447/// This path keeps EVERY per-slot integrity check that [`decode_node`]
448/// applies to the slots it actually reads: the leaf header invariants
449/// (`level == 0`, `key_count <= LEAF_SLOT_CAP`), the slot-offset range
450/// check, `read_length_prefixed`'s `checked_add` overflow guard and
451/// payload-end bound, the `key.len() <= max_key_len()` cap, and the
452/// inline strictly-ascending-key check. A tampered page therefore
453/// yields [`Error::Corruption`] identically to the full decode. The
454/// ONLY check it skips is the whole-node `windows(2)` ordering
455/// re-walk in `validate_node_release`, which is redundant because
456/// the inline ascending check is re-derived on every key the scan
457/// compares. No data-derived index touches `buf[i]` raw — every such
458/// access goes through `read_length_prefixed`'s checked slicing.
459///
460/// # Errors
461///
462/// [`Error::Corruption`] on any malformed header, slot, or varint;
463/// [`Error::BTreeInvariantViolated`] if the leaf header invariants
464/// fail.
465pub fn decode_node_find(buf: &[u8; PAGE_SIZE], target: &[u8]) -> Result<Option<Vec<u8>>> {
466    match peek_node_kind(buf)? {
467        NodeKind::Leaf => {}
468        NodeKind::Internal => {
469            return Err(Error::BTreeInvariantViolated {
470                reason: "decode_node_find called on an internal node",
471            });
472        }
473    }
474    // Leaf structural header invariants (the O(1) subset of the full
475    // validate that the inline scan does not cover).
476    if buf[OFF_LEVEL] != 0 {
477        return Err(Error::BTreeInvariantViolated {
478            reason: "leaf has non-zero level",
479        });
480    }
481    let key_count = u16::from_le_bytes([buf[OFF_KEY_COUNT], buf[OFF_KEY_COUNT + 1]]) as usize;
482    if key_count > LEAF_SLOT_CAP {
483        return Err(Error::Corruption { page_id: 0 });
484    }
485    find_leaf_value(buf, key_count, target)
486}
487
488/// Linear scan of a validated-header leaf for `target`. Applies the
489/// identical per-slot bounds / varint / ascending-key checks as
490/// [`decode_leaf_body`] on every slot it visits, comparing keys as
491/// borrowed `&[u8]` slices into `buf` — no per-entry `to_vec`.
492fn find_leaf_value(
493    buf: &[u8; PAGE_SIZE],
494    key_count: usize,
495    target: &[u8],
496) -> Result<Option<Vec<u8>>> {
497    let mut prev_key: Option<&[u8]> = None;
498    for i in 0..key_count {
499        let slot_off = PAYLOAD_OFFSET + i * LEAF_SLOT_BYTES;
500        if slot_off + LEAF_SLOT_BYTES > PAYLOAD_END {
501            return Err(Error::Corruption { page_id: 0 });
502        }
503        let entry_off = u32::from_le_bytes([
504            buf[slot_off],
505            buf[slot_off + 1],
506            buf[slot_off + 2],
507            buf[slot_off + 3],
508        ]) as usize;
509        if !(PAYLOAD_OFFSET..PAYLOAD_END).contains(&entry_off) {
510            return Err(Error::Corruption { page_id: 0 });
511        }
512        let (key, after_key) = read_length_prefixed(buf, entry_off)?;
513        let (value, _) = read_length_prefixed(buf, after_key)?;
514        if key.len() > max_key_len() {
515            return Err(Error::Corruption { page_id: 0 });
516        }
517        // Identical inline strictly-ascending check as decode_leaf_body.
518        if let Some(prev) = prev_key {
519            if key <= prev {
520                return Err(Error::Corruption { page_id: 0 });
521            }
522        }
523        // Copy out ONLY the matched value; keep scanning otherwise so
524        // every slot's checks still run (corruption past the match is
525        // not silently ignored on a hit at index i).
526        if key == target {
527            return Ok(Some(value.to_vec()));
528        }
529        prev_key = Some(key);
530    }
531    Ok(None)
532}
533
534fn decode_leaf_body(buf: &[u8; PAGE_SIZE], node: &mut DecodedNode, key_count: usize) -> Result<()> {
535    if key_count > LEAF_SLOT_CAP {
536        return Err(Error::Corruption { page_id: 0 });
537    }
538    node.leaves.reserve(key_count);
539    for i in 0..key_count {
540        let slot_off = PAYLOAD_OFFSET + i * LEAF_SLOT_BYTES;
541        if slot_off + LEAF_SLOT_BYTES > PAYLOAD_END {
542            return Err(Error::Corruption { page_id: 0 });
543        }
544        let entry_off = u32::from_le_bytes([
545            buf[slot_off],
546            buf[slot_off + 1],
547            buf[slot_off + 2],
548            buf[slot_off + 3],
549        ]) as usize;
550        if !(PAYLOAD_OFFSET..PAYLOAD_END).contains(&entry_off) {
551            return Err(Error::Corruption { page_id: 0 });
552        }
553        let (key, after_key) = read_length_prefixed(buf, entry_off)?;
554        let (value, _) = read_length_prefixed(buf, after_key)?;
555        if key.len() > max_key_len() {
556            return Err(Error::Corruption { page_id: 0 });
557        }
558        // Inline strictly-ascending check against the last key already
559        // in the vec — no per-iteration `prev_key` clone needed.
560        if let Some(prev) = node.leaves.last() {
561            if key <= prev.key.as_slice() {
562                return Err(Error::Corruption { page_id: 0 });
563            }
564        }
565        node.leaves.push(LeafEntry {
566            key: key.to_vec(),
567            value: value.to_vec(),
568        });
569    }
570    Ok(())
571}
572
573fn decode_internal_body(
574    buf: &[u8; PAGE_SIZE],
575    node: &mut DecodedNode,
576    key_count: usize,
577) -> Result<()> {
578    if key_count > INTERNAL_SLOT_CAP {
579        return Err(Error::Corruption { page_id: 0 });
580    }
581    let leftmost = read_u64(buf, PAYLOAD_OFFSET);
582    if leftmost == 0 {
583        return Err(Error::Corruption { page_id: 0 });
584    }
585    node.children.reserve(key_count + 1);
586    node.internals.reserve(key_count);
587    node.children.push(leftmost);
588    for i in 0..key_count {
589        let (key_vec, right_child) = decode_internal_slot(buf, i)?;
590        // Inline strictly-ascending check against the last pivot
591        // already in the vec — no per-iteration `prev_key` clone.
592        if let Some(prev) = node.internals.last() {
593            if key_vec.as_slice() <= prev.key.as_slice() {
594                return Err(Error::Corruption { page_id: 0 });
595            }
596        }
597        node.internals.push(InternalEntry { key: key_vec });
598        node.children.push(right_child);
599    }
600    Ok(())
601}
602
603/// Decode the i-th slot of an internal-node body. Returns the pivot
604/// key and the right-child page-id, or `Error::Corruption` if the
605/// slot is malformed.
606fn decode_internal_slot(buf: &[u8; PAGE_SIZE], i: usize) -> Result<(Vec<u8>, u64)> {
607    let slot_off = PAYLOAD_OFFSET + INTERNAL_LEFTMOST_CHILD_BYTES + i * INTERNAL_SLOT_BYTES;
608    if slot_off + INTERNAL_SLOT_BYTES > PAYLOAD_END {
609        return Err(Error::Corruption { page_id: 0 });
610    }
611    let entry_off = u32::from_le_bytes([
612        buf[slot_off],
613        buf[slot_off + 1],
614        buf[slot_off + 2],
615        buf[slot_off + 3],
616    ]) as usize;
617    let right_child = read_u64(buf, slot_off + 4);
618    if right_child == 0 || !(PAYLOAD_OFFSET..PAYLOAD_END).contains(&entry_off) {
619        return Err(Error::Corruption { page_id: 0 });
620    }
621    let (key, _) = read_length_prefixed(buf, entry_off)?;
622    if key.len() > max_key_len() {
623        return Err(Error::Corruption { page_id: 0 });
624    }
625    Ok((key.to_vec(), right_child))
626}
627
628/// Read a u64 LE from `buf` at `offset`. Panics if `offset + 8` is
629/// out of bounds; callers must validate the offset.
630fn read_u64(buf: &[u8; PAGE_SIZE], offset: usize) -> u64 {
631    u64::from_le_bytes([
632        buf[offset],
633        buf[offset + 1],
634        buf[offset + 2],
635        buf[offset + 3],
636        buf[offset + 4],
637        buf[offset + 5],
638        buf[offset + 6],
639        buf[offset + 7],
640    ])
641}
642
643/// Read a `(length-prefixed bytes, next_offset)` pair from `buf`
644/// starting at `offset`. Length is an unsigned LEB128 varint; the
645/// payload is `length` raw bytes.
646///
647/// # Power-of-ten Rule 7 posture
648///
649/// `len` is caller-controlled (it lives on disk in a page byte that
650/// can be tampered with or random in a fuzz harness). The
651/// `after + len_usize` end-of-slice computation MUST use
652/// `checked_add`: with `overflow-checks = true` in release (per
653/// Rule 10) a raw `+` panics on a wrap; without overflow-checks the
654/// bounds check is silently bypassed. Either failure mode is a
655/// denial-of-service hazard reachable from
656/// `Pager::read_page → BTree::open → decode_node` on any tampered
657/// B+tree page (M13 #115).
658fn read_length_prefixed(buf: &[u8; PAGE_SIZE], offset: usize) -> Result<(&[u8], usize)> {
659    if offset >= PAYLOAD_END {
660        return Err(Error::Corruption { page_id: 0 });
661    }
662    let (len, after) = read_varint(buf, offset)?;
663    let len_usize = usize_from_u64(len)?;
664    // Rule 7: `len_usize` is unbounded (varint domain is u64); raw
665    // `after + len_usize` wraps on tampered input.
666    let end = after
667        .checked_add(len_usize)
668        .ok_or(Error::Corruption { page_id: 0 })?;
669    if end > PAYLOAD_END {
670        return Err(Error::Corruption { page_id: 0 });
671    }
672    Ok((&buf[after..end], end))
673}
674
675/// Validate `node` against the format-version-0 invariants.
676///
677/// # Errors
678///
679/// Returns [`Error::BTreeInvariantViolated`] if any invariant fails.
680/// Callers can `debug_assert!` on a wrapping helper for development
681/// builds; release builds get the error surfaced cleanly.
682pub fn validate_node(node: &DecodedNode) -> Result<()> {
683    validate_node_release(node)
684}
685
686fn validate_node_release(node: &DecodedNode) -> Result<()> {
687    validate_node_structural(node)?;
688    validate_node_ordering(node)
689}
690
691/// The cheap O(1) header / capacity / children invariants that the
692/// inline decode loop does *not* enforce: leaf `level == 0`, internal
693/// `level != 0`, internal `next_sibling == 0`,
694/// `children.len() == pivots + 1`, slot-cap bounds, and non-zero
695/// internal child page-ids.
696///
697/// This is the only part of the full validate that the read find path
698/// needs: the strictly-ascending key ordering ([`validate_node_ordering`])
699/// is enforced inline during decode, so the find path — which already
700/// re-derives the same ordering on every slot it touches — does not
701/// repeat the whole-node `windows(2)` re-walk.
702fn validate_node_structural(node: &DecodedNode) -> Result<()> {
703    match node.kind {
704        NodeKind::Leaf => {
705            if node.level != 0 {
706                return Err(Error::BTreeInvariantViolated {
707                    reason: "leaf has non-zero level",
708                });
709            }
710            if node.leaves.len() > LEAF_SLOT_CAP {
711                return Err(Error::BTreeInvariantViolated {
712                    reason: "leaf key count exceeds slot cap",
713                });
714            }
715        }
716        NodeKind::Internal => {
717            if node.level == 0 {
718                return Err(Error::BTreeInvariantViolated {
719                    reason: "internal node at level 0",
720                });
721            }
722            if node.internals.len() > INTERNAL_SLOT_CAP {
723                return Err(Error::BTreeInvariantViolated {
724                    reason: "internal key count exceeds slot cap",
725                });
726            }
727            if node.children.len() != node.internals.len() + 1 {
728                return Err(Error::BTreeInvariantViolated {
729                    reason: "internal children.len() != pivots+1",
730                });
731            }
732            for c in &node.children {
733                if *c == 0 {
734                    return Err(Error::BTreeInvariantViolated {
735                        reason: "internal node has zero child page-id",
736                    });
737                }
738            }
739            if node.next_sibling != 0 {
740                return Err(Error::BTreeInvariantViolated {
741                    reason: "internal node has non-zero next_sibling",
742                });
743            }
744        }
745    }
746    Ok(())
747}
748
749/// The strictly-ascending key-ordering re-walk. On the full decode and
750/// every mutation/integrity caller this confirms the whole node is
751/// sorted; the read find path skips it because [`decode_leaf_body`] /
752/// [`decode_internal_body`] enforce the identical inline check on every
753/// key as it is pushed (node.rs leaf/internal `last()` compares).
754fn validate_node_ordering(node: &DecodedNode) -> Result<()> {
755    match node.kind {
756        NodeKind::Leaf => {
757            for w in node.leaves.windows(2) {
758                if w[0].key.as_slice() >= w[1].key.as_slice() {
759                    return Err(Error::BTreeInvariantViolated {
760                        reason: "leaf keys not strictly sorted",
761                    });
762                }
763            }
764        }
765        NodeKind::Internal => {
766            for w in node.internals.windows(2) {
767                if w[0].key.as_slice() >= w[1].key.as_slice() {
768                    return Err(Error::BTreeInvariantViolated {
769                        reason: "internal keys not strictly sorted",
770                    });
771                }
772            }
773        }
774    }
775    Ok(())
776}
777
778// --------- varint helpers ---------
779
780/// Number of bytes an unsigned LEB128 varint would use for `v`.
781#[must_use]
782pub fn varint_len(v: u64) -> usize {
783    let mut n: usize = 1;
784    let mut x = v >> 7;
785    while x != 0 {
786        n += 1;
787        x >>= 7;
788    }
789    n
790}
791
792/// Write an unsigned LEB128 varint into `dst` starting at byte 0.
793/// Returns the number of bytes written.
794fn write_varint(dst: &mut [u8], mut v: u64) -> usize {
795    let mut i = 0;
796    loop {
797        let mut byte = (v & 0x7F) as u8;
798        v >>= 7;
799        if v != 0 {
800            byte |= 0x80;
801            dst[i] = byte;
802            i += 1;
803        } else {
804            dst[i] = byte;
805            i += 1;
806            return i;
807        }
808    }
809}
810
811/// Read an unsigned LEB128 varint from `buf` starting at `offset`.
812/// Returns `(value, next_offset)`. Power-of-ten Rule 2: the loop is
813/// bounded by 10 bytes (`ceil(64 / 7)`), which is the maximum a
814/// 64-bit varint can occupy.
815fn read_varint(buf: &[u8; PAGE_SIZE], offset: usize) -> Result<(u64, usize)> {
816    let mut value: u64 = 0;
817    let mut shift: u32 = 0;
818    let mut i = offset;
819    for _ in 0..10 {
820        if i >= PAYLOAD_END {
821            return Err(Error::Corruption { page_id: 0 });
822        }
823        let byte = buf[i];
824        i += 1;
825        let chunk = u64::from(byte & 0x7F);
826        // `shift` is in range [0..63]; the cast cannot truncate.
827        value |= chunk << shift;
828        if (byte & 0x80) == 0 {
829            return Ok((value, i));
830        }
831        shift += 7;
832        if shift >= 64 {
833            return Err(Error::Corruption { page_id: 0 });
834        }
835    }
836    Err(Error::Corruption { page_id: 0 })
837}
838
839// --------- narrow casts ---------
840
841fn u32_from_usize(v: usize) -> u32 {
842    // PAYLOAD_END < 2^32; every caller passes such a value.
843    debug_assert!(u32::try_from(v).is_ok());
844    // Saturating downcast: the asserted bound is the binding contract;
845    // a release-mode violation would still return a too-large u32, which
846    // an encoder cross-check (decode_node) would catch on read.
847    u32::try_from(v).unwrap_or(u32::MAX)
848}
849
850fn u16_from_usize(v: usize) -> u16 {
851    debug_assert!(u16::try_from(v).is_ok());
852    u16::try_from(v).unwrap_or(u16::MAX)
853}
854
855fn u64_from_usize(v: usize) -> u64 {
856    v as u64
857}
858
859fn usize_from_u64(v: u64) -> Result<usize> {
860    usize::try_from(v).map_err(|_| Error::Corruption { page_id: 0 })
861}
862
863#[cfg(test)]
864mod tests {
865    use super::*;
866
867    fn empty_leaf() -> DecodedNode {
868        DecodedNode {
869            kind: NodeKind::Leaf,
870            level: 0,
871            next_sibling: 0,
872            children: Vec::new(),
873            leaves: Vec::new(),
874            internals: Vec::new(),
875        }
876    }
877
878    fn leaf_with(entries: &[(&[u8], &[u8])]) -> DecodedNode {
879        let mut leaf = empty_leaf();
880        for (k, v) in entries {
881            leaf.leaves.push(LeafEntry {
882                key: k.to_vec(),
883                value: v.to_vec(),
884            });
885        }
886        leaf
887    }
888
889    #[test]
890    fn round_trip_empty_leaf() {
891        let leaf = empty_leaf();
892        let mut page = Page::zeroed();
893        encode_node(&leaf, &mut page).expect("encode");
894        let decoded = decode_node(page.as_bytes()).expect("decode");
895        assert_eq!(decoded.kind, NodeKind::Leaf);
896        assert_eq!(decoded.level, 0);
897        assert_eq!(decoded.next_sibling, 0);
898        assert!(decoded.leaves.is_empty());
899    }
900
901    #[test]
902    fn round_trip_populated_leaf() {
903        let leaf = leaf_with(&[(b"alpha", b"AAA"), (b"bravo", b"BBB"), (b"charlie", b"CCC")]);
904        let mut page = Page::zeroed();
905        encode_node(&leaf, &mut page).expect("encode");
906        let decoded = decode_node(page.as_bytes()).expect("decode");
907        assert_eq!(decoded.leaves.len(), 3);
908        assert_eq!(decoded.leaves[0].key.as_slice(), b"alpha");
909        assert_eq!(decoded.leaves[0].value.as_slice(), b"AAA");
910        assert_eq!(decoded.leaves[2].key.as_slice(), b"charlie");
911    }
912
913    #[test]
914    fn round_trip_internal() {
915        let mut internal = DecodedNode {
916            kind: NodeKind::Internal,
917            level: 1,
918            next_sibling: 0,
919            children: Vec::new(),
920            leaves: Vec::new(),
921            internals: Vec::new(),
922        };
923        // 3 pivots → 4 children.
924        for raw in [10u64, 20, 30, 40] {
925            internal.children.push(raw);
926        }
927        for k in [b"d".as_slice(), b"h", b"m"] {
928            internal.internals.push(InternalEntry { key: k.to_vec() });
929        }
930        let mut page = Page::zeroed();
931        encode_node(&internal, &mut page).expect("encode");
932        let decoded = decode_node(page.as_bytes()).expect("decode");
933        assert_eq!(decoded.kind, NodeKind::Internal);
934        assert_eq!(decoded.level, 1);
935        assert_eq!(decoded.internals.len(), 3);
936        assert_eq!(decoded.children.as_slice(), &[10, 20, 30, 40]);
937        assert_eq!(decoded.internals[0].key.as_slice(), b"d");
938        assert_eq!(decoded.internals[2].key.as_slice(), b"m");
939    }
940
941    #[test]
942    fn decode_rejects_unsorted_leaf() {
943        // Encode a deliberately-malformed leaf by hand: write two
944        // out-of-order entries and check decode errors.
945        let mut page = Page::zeroed();
946        let buf = page.as_bytes_mut();
947        buf[OFF_PAGE_TYPE] = PAGE_TYPE_BTREE_LEAF;
948        buf[OFF_LEVEL] = 0;
949        buf[OFF_KEY_COUNT..OFF_KEY_COUNT + 2].copy_from_slice(&2u16.to_le_bytes());
950        // Slot 0 at PAYLOAD_END - 5  ("c" + "x").
951        // Slot 1 at PAYLOAD_END - 10 ("a" + "y").
952        let slot0 = u32_from_usize(PAYLOAD_END - 4);
953        let slot1 = u32_from_usize(PAYLOAD_END - 8);
954        buf[PAYLOAD_OFFSET..PAYLOAD_OFFSET + 4].copy_from_slice(&slot0.to_le_bytes());
955        buf[PAYLOAD_OFFSET + 4..PAYLOAD_OFFSET + 8].copy_from_slice(&slot1.to_le_bytes());
956        // Heap entry for slot 0: key="c", value="x"  -> 1,'c',1,'x'.
957        buf[PAYLOAD_END - 4] = 1;
958        buf[PAYLOAD_END - 3] = b'c';
959        buf[PAYLOAD_END - 2] = 1;
960        buf[PAYLOAD_END - 1] = b'x';
961        // Heap entry for slot 1: key="a", value="y"  -> 1,'a',1,'y'.
962        buf[PAYLOAD_END - 8] = 1;
963        buf[PAYLOAD_END - 7] = b'a';
964        buf[PAYLOAD_END - 6] = 1;
965        buf[PAYLOAD_END - 5] = b'y';
966        crate::pager::checksum::write_page_trailer(&mut page);
967        let err = decode_node(page.as_bytes()).expect_err("decode must reject");
968        assert!(matches!(err, Error::Corruption { .. }));
969    }
970
971    #[test]
972    fn varint_round_trip() {
973        for v in [0u64, 1, 127, 128, 0xFFFF, 0xDEAD_BEEF, u64::MAX] {
974            let mut buf = [0u8; 16];
975            let n = write_varint(&mut buf, v);
976            assert_eq!(n, varint_len(v));
977            // Read back using a faux PAGE_SIZE buffer.
978            let mut page = [0u8; PAGE_SIZE];
979            page[..n].copy_from_slice(&buf[..n]);
980            let (decoded, after) = read_varint(&page, 0).expect("decode varint");
981            assert_eq!(decoded, v);
982            assert_eq!(after, n);
983        }
984    }
985
986    /// M13 #115 regression: a tampered B+tree leaf carrying a
987    /// length-prefix varint that encodes `u64::MAX` must return
988    /// `Error::Corruption`, NOT panic in `read_length_prefixed` on
989    /// the `after + len_usize` overflow.
990    ///
991    /// Critical: this test must also pass under `cargo test --release`
992    /// — the workspace profile enables `overflow-checks = true` per
993    /// power-of-ten Rule 10, which is the mechanism that turned the
994    /// arithmetic wrap into a panic in the first place. A `checked_add`
995    /// fix is the only thing that makes both debug and release happy.
996    #[test]
997    fn decode_node_varint_max_returns_corruption_not_panic() {
998        let mut page = Page::zeroed();
999        {
1000            let buf = page.as_bytes_mut();
1001            buf[OFF_PAGE_TYPE] = PAGE_TYPE_BTREE_LEAF;
1002            buf[OFF_LEVEL] = 0;
1003            // One slot. The varint at the slot's entry-offset will
1004            // encode `u64::MAX`, which `read_length_prefixed` must
1005            // reject rather than wrap.
1006            buf[OFF_KEY_COUNT..OFF_KEY_COUNT + 2].copy_from_slice(&1u16.to_le_bytes());
1007            // Place the heap entry near the end of the payload. The
1008            // bug triggers BEFORE the bounds check, so the absolute
1009            // offset value only matters to the extent that it points
1010            // somewhere parseable.
1011            let entry_off = PAYLOAD_END - 16;
1012            let slot_off_bytes = u32_from_usize(entry_off).to_le_bytes();
1013            buf[PAYLOAD_OFFSET..PAYLOAD_OFFSET + 4].copy_from_slice(&slot_off_bytes);
1014            // 10-byte LEB128 encoding of `u64::MAX`. The 10th byte's
1015            // top bit is clear so `read_varint` accepts it; the
1016            // decoded length is `u64::MAX`, which `as usize` becomes
1017            // `usize::MAX` on 64-bit and triggers the `after + len`
1018            // overflow in `read_length_prefixed`.
1019            let varint = [0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x01];
1020            buf[entry_off..entry_off + varint.len()].copy_from_slice(&varint);
1021        }
1022        crate::pager::checksum::write_page_trailer(&mut page);
1023        let result = decode_node(page.as_bytes());
1024        match result {
1025            Err(Error::Corruption { .. }) => {}
1026            other => panic!(
1027                "expected Err(Error::Corruption), got {:?} \
1028                 (a panic here would indicate the M13 #115 bug \
1029                  is still present in release builds)",
1030                other.map_or("non-corruption err", |_| "Ok(_)")
1031            ),
1032        }
1033    }
1034
1035    /// M13 #115 regression for the internal-node variant: a tampered
1036    /// internal-node entry pointing at a varint that encodes
1037    /// `u64::MAX` must surface as `Error::Corruption`, not panic via
1038    /// `decode_internal_slot → read_length_prefixed`.
1039    #[test]
1040    fn decode_internal_node_varint_max_returns_corruption() {
1041        let mut page = Page::zeroed();
1042        {
1043            let buf = page.as_bytes_mut();
1044            buf[OFF_PAGE_TYPE] = PAGE_TYPE_BTREE_INTERNAL;
1045            buf[OFF_LEVEL] = 1;
1046            buf[OFF_KEY_COUNT..OFF_KEY_COUNT + 2].copy_from_slice(&1u16.to_le_bytes());
1047            // Leftmost child page-id — must be non-zero.
1048            buf[PAYLOAD_OFFSET..PAYLOAD_OFFSET + 8].copy_from_slice(&42u64.to_le_bytes());
1049            // One internal slot: u32 entry_off + u64 right_child.
1050            let slot_base = PAYLOAD_OFFSET + INTERNAL_LEFTMOST_CHILD_BYTES;
1051            let entry_off = PAYLOAD_END - 16;
1052            buf[slot_base..slot_base + 4].copy_from_slice(&u32_from_usize(entry_off).to_le_bytes());
1053            buf[slot_base + 4..slot_base + INTERNAL_SLOT_BYTES]
1054                .copy_from_slice(&7u64.to_le_bytes());
1055            // Same `u64::MAX` varint as the leaf test above.
1056            let varint = [0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x01];
1057            buf[entry_off..entry_off + varint.len()].copy_from_slice(&varint);
1058        }
1059        crate::pager::checksum::write_page_trailer(&mut page);
1060        let result = decode_node(page.as_bytes());
1061        assert!(
1062            matches!(result, Err(Error::Corruption { .. })),
1063            "expected Err(Error::Corruption) on u64::MAX varint in internal slot"
1064        );
1065    }
1066
1067    #[test]
1068    fn max_inline_value_is_below_payload() {
1069        let k = 8;
1070        let v = max_inline_value(k);
1071        // Compute slot+heap bytes for one entry at this max value
1072        // length and assert it fits.
1073        let entry_len = varint_len(u64_from_usize(k)) + k + varint_len(u64_from_usize(v)) + v;
1074        let payload = entry_len + LEAF_SLOT_BYTES;
1075        assert!(payload <= PAYLOAD_END - PAYLOAD_OFFSET);
1076    }
1077
1078    // ---- find-only path (#82) ----
1079
1080    /// The find-only path returns the matched value and `None` for a
1081    /// miss, agreeing with a full decode.
1082    #[test]
1083    fn decode_node_find_matches_full_decode() {
1084        let leaf = leaf_with(&[(b"alpha", b"AAA"), (b"bravo", b"BBB"), (b"charlie", b"CCC")]);
1085        let mut page = Page::zeroed();
1086        encode_node(&leaf, &mut page).expect("encode");
1087        let found = decode_node_find(page.as_bytes(), b"bravo").expect("find");
1088        assert_eq!(found.as_deref(), Some(b"BBB".as_slice()));
1089        let missing = decode_node_find(page.as_bytes(), b"zeta").expect("find");
1090        assert_eq!(missing, None);
1091        // First and last keys, exercising both scan ends.
1092        assert_eq!(
1093            decode_node_find(page.as_bytes(), b"alpha")
1094                .expect("find")
1095                .as_deref(),
1096            Some(b"AAA".as_slice())
1097        );
1098        assert_eq!(
1099            decode_node_find(page.as_bytes(), b"charlie")
1100                .expect("find")
1101                .as_deref(),
1102            Some(b"CCC".as_slice())
1103        );
1104    }
1105
1106    /// #82 safety regression (i): a tampered slot offset pointing
1107    /// outside the payload range must surface `Error::Corruption`
1108    /// THROUGH THE FIND PATH, not only through the full decode. The
1109    /// search targets `"b"`, which forces the scan to read the damaged
1110    /// slot 0 first.
1111    #[test]
1112    fn decode_node_find_rejects_corrupted_slot_offset() {
1113        let leaf = leaf_with(&[(b"a", b"x"), (b"b", b"y")]);
1114        let mut page = Page::zeroed();
1115        encode_node(&leaf, &mut page).expect("encode");
1116        let buf = page.as_bytes_mut();
1117        // Point slot 0's offset past the payload end → out of range.
1118        let bad = u32_from_usize(PAYLOAD_END + 4);
1119        buf[PAYLOAD_OFFSET..PAYLOAD_OFFSET + 4].copy_from_slice(&bad.to_le_bytes());
1120        crate::pager::checksum::write_page_trailer(&mut page);
1121        let r = decode_node_find(page.as_bytes(), b"b");
1122        assert!(
1123            matches!(r, Err(Error::Corruption { .. })),
1124            "corrupted slot offset not caught by find path: {r:?}"
1125        );
1126    }
1127
1128    /// #82 safety regression (ii): two out-of-order keys must surface
1129    /// `Error::Corruption` THROUGH THE FIND PATH. Built by hand since
1130    /// `encode_node` would reject a descending leaf; the search target
1131    /// `"z"` (a miss) forces the full scan to reach the descending
1132    /// pair at slot 1.
1133    #[test]
1134    fn decode_node_find_rejects_out_of_order_keys() {
1135        let mut page = Page::zeroed();
1136        {
1137            let buf = page.as_bytes_mut();
1138            buf[OFF_PAGE_TYPE] = PAGE_TYPE_BTREE_LEAF;
1139            buf[OFF_LEVEL] = 0;
1140            buf[OFF_KEY_COUNT..OFF_KEY_COUNT + 2].copy_from_slice(&2u16.to_le_bytes());
1141            // Slot 0 -> ("c","x"); slot 1 -> ("a","y") — descending.
1142            let slot0 = u32_from_usize(PAYLOAD_END - 4);
1143            let slot1 = u32_from_usize(PAYLOAD_END - 8);
1144            buf[PAYLOAD_OFFSET..PAYLOAD_OFFSET + 4].copy_from_slice(&slot0.to_le_bytes());
1145            buf[PAYLOAD_OFFSET + 4..PAYLOAD_OFFSET + 8].copy_from_slice(&slot1.to_le_bytes());
1146            buf[PAYLOAD_END - 4] = 1;
1147            buf[PAYLOAD_END - 3] = b'c';
1148            buf[PAYLOAD_END - 2] = 1;
1149            buf[PAYLOAD_END - 1] = b'x';
1150            buf[PAYLOAD_END - 8] = 1;
1151            buf[PAYLOAD_END - 7] = b'a';
1152            buf[PAYLOAD_END - 6] = 1;
1153            buf[PAYLOAD_END - 5] = b'y';
1154        }
1155        crate::pager::checksum::write_page_trailer(&mut page);
1156        let r = decode_node_find(page.as_bytes(), b"z");
1157        assert!(
1158            matches!(r, Err(Error::Corruption { .. })),
1159            "out-of-order key not caught by find path: {r:?}"
1160        );
1161    }
1162
1163    /// #82 safety regression (iii): a key whose length prefix claims
1164    /// `max_key_len() + 1` bytes (in-bounds for the page but over the
1165    /// format cap) must surface `Error::Corruption` THROUGH THE FIND
1166    /// PATH on the only slot it reads.
1167    #[test]
1168    fn decode_node_find_rejects_over_long_key() {
1169        let mut page = Page::zeroed();
1170        {
1171            let buf = page.as_bytes_mut();
1172            buf[OFF_PAGE_TYPE] = PAGE_TYPE_BTREE_LEAF;
1173            buf[OFF_LEVEL] = 0;
1174            buf[OFF_KEY_COUNT..OFF_KEY_COUNT + 2].copy_from_slice(&1u16.to_le_bytes());
1175            let over = max_key_len() + 1;
1176            let entry_off = PAYLOAD_OFFSET + LEAF_SLOT_BYTES;
1177            buf[PAYLOAD_OFFSET..PAYLOAD_OFFSET + 4]
1178                .copy_from_slice(&u32_from_usize(entry_off).to_le_bytes());
1179            let n = write_varint(&mut buf[entry_off..], u64_from_usize(over));
1180            assert_eq!(n, varint_len(u64_from_usize(over)));
1181            // Claimed key bytes stay zero; they fit because
1182            // max_key_len() = PAGE_SIZE/4 < the remaining payload.
1183        }
1184        crate::pager::checksum::write_page_trailer(&mut page);
1185        let r = decode_node_find(page.as_bytes(), b"anything");
1186        assert!(
1187            matches!(r, Err(Error::Corruption { .. })),
1188            "over-long key length not caught by find path: {r:?}"
1189        );
1190    }
1191
1192    /// The find path rejects an internal-node page: a read never calls
1193    /// it on an internal node, and doing so is an invariant violation.
1194    #[test]
1195    fn decode_node_find_rejects_internal_node() {
1196        let mut internal = DecodedNode {
1197            kind: NodeKind::Internal,
1198            level: 1,
1199            next_sibling: 0,
1200            children: Vec::new(),
1201            leaves: Vec::new(),
1202            internals: Vec::new(),
1203        };
1204        internal.children.push(10);
1205        internal.children.push(20);
1206        internal
1207            .internals
1208            .push(InternalEntry { key: b"m".to_vec() });
1209        let mut page = Page::zeroed();
1210        encode_node(&internal, &mut page).expect("encode");
1211        let r = decode_node_find(page.as_bytes(), b"m");
1212        assert!(matches!(r, Err(Error::BTreeInvariantViolated { .. })));
1213    }
1214}