Skip to main content

obj_core/btree/
delete.rs

1//! B+tree delete path with merge / borrow rebalancing.
2//!
3//! Mirrors `insert.rs`: every node touched is rewritten to a fresh
4//! page; pre-delete pages enter the freelist only after the new
5//! root is staged. The caller still owns the commit boundary —
6//! `BTree::delete` does NOT call `Pager::commit`.
7//!
8//! # Underflow policy
9//!
10//! A non-root node is "underflowing" when its post-delete occupied
11//! byte count falls below `MIN_OCCUPIED_BYTES = PAYLOAD_BYTES / 2`.
12//! At underflow:
13//! - If a sibling is rich enough that **moving one slot from it**
14//!   keeps both halves above the threshold, borrow.
15//! - Otherwise merge with the sibling. The two halves combine into
16//!   a single page; the separator pivot in the parent is removed
17//!   (leaf merge) or absorbed (internal merge).
18//!
19//! Root collapse: an internal root with exactly one child (zero
20//! pivots) is replaced by its child. A leaf root with zero slots
21//! is allowed and is the bootstrap "empty tree" state.
22//!
23//! # Power-of-ten posture
24//!
25//! - **Rule 1.** Iterative descent + iterative bubble-up; no
26//!   recursion.
27//! - **Rule 2.** Every loop is bounded by `MAX_BTREE_DEPTH` or a
28//!   node's slot count.
29//! - **Rule 5.** `validate_node` runs on every encoded node.
30
31#![forbid(unsafe_code)]
32
33use heapless::Vec as HeaplessVec;
34
35use crate::btree::insert::write_new_node;
36use crate::btree::node::{
37    decode_node, DecodedNode, InternalEntry, LeafEntry, NodeKind, NODE_HEADER_SIZE,
38};
39use crate::btree::{BTree, MAX_BTREE_DEPTH};
40use crate::error::{Error, Result};
41use crate::pager::page::{PageId, PAGE_SIZE, PAGE_TRAILER_SIZE};
42use crate::pager::Pager;
43use crate::platform::FileBackend;
44
45/// Available payload bytes (excludes the node header and the page
46/// trailer).
47const PAYLOAD_BYTES: usize = PAGE_SIZE - PAGE_TRAILER_SIZE - NODE_HEADER_SIZE;
48
49/// Underflow threshold: half-full. A non-root node whose
50/// `occupied_bytes()` drops below this is rebalanced via borrow or
51/// merge. This is the smallest threshold that keeps the B+tree's
52/// `O(log_F n)` guarantees with a worst-case fanout of 2.
53const MIN_OCCUPIED_BYTES: usize = PAYLOAD_BYTES / 2;
54
55struct PathFrame {
56    page_id: PageId,
57    node: DecodedNode,
58    /// Child index this frame's descent followed.
59    child_index: usize,
60}
61
62/// Outcome of replacing one node along the delete path: a fresh
63/// page-id and an "underflow" hint the parent uses to decide
64/// whether to rebalance.
65#[derive(Debug, Clone, Copy)]
66struct ReplaceOutcome {
67    new_id: PageId,
68    /// Whether the new node falls below the underflow threshold.
69    /// The parent inspects this to decide whether to borrow or
70    /// merge.
71    underflow: bool,
72}
73
74impl<F: FileBackend> BTree<F> {
75    /// Remove `key` from the tree. Returns `Ok(true)` if it was
76    /// present (and removed), `Ok(false)` if the key was absent.
77    ///
78    /// # Errors
79    ///
80    /// Propagates pager / decode errors. Surfaces
81    /// `Error::BTreeDepthExceeded` if the tree height exceeds
82    /// `MAX_BTREE_DEPTH`. No `panic`s.
83    pub fn delete(&mut self, pager: &mut Pager<F>, key: &[u8]) -> Result<bool> {
84        let path = self.descend_path_for_delete(pager, key)?;
85        self.apply_delete(pager, path, key)
86    }
87
88    fn descend_path_for_delete(
89        &self,
90        pager: &mut Pager<F>,
91        key: &[u8],
92    ) -> Result<HeaplessVec<PathFrame, MAX_BTREE_DEPTH>> {
93        let mut path: HeaplessVec<PathFrame, MAX_BTREE_DEPTH> = HeaplessVec::new();
94        let mut current = self.root;
95        loop {
96            let decoded = {
97                let page_ref = pager.read_page(current)?;
98                decode_node(page_ref.as_bytes())?
99            };
100            match decoded.kind {
101                NodeKind::Leaf => {
102                    let frame = PathFrame {
103                        page_id: current,
104                        node: decoded,
105                        child_index: 0,
106                    };
107                    push_path(&mut path, frame)?;
108                    return Ok(path);
109                }
110                NodeKind::Internal => {
111                    let child_index = pivot_index(&decoded, key);
112                    let next = PageId::new(decoded.children[child_index]).ok_or(
113                        Error::BTreeInvariantViolated {
114                            reason: "internal node had zero child page-id",
115                        },
116                    )?;
117                    let frame = PathFrame {
118                        page_id: current,
119                        node: decoded,
120                        child_index,
121                    };
122                    push_path(&mut path, frame)?;
123                    current = next;
124                }
125            }
126        }
127    }
128
129    fn apply_delete(
130        &mut self,
131        pager: &mut Pager<F>,
132        mut path: HeaplessVec<PathFrame, MAX_BTREE_DEPTH>,
133        key: &[u8],
134    ) -> Result<bool> {
135        let mut freed: HeaplessVec<PageId, { MAX_BTREE_DEPTH * 4 }> = HeaplessVec::new();
136        let Some(leaf_frame) = path.pop() else {
137            return Err(Error::BTreeInvariantViolated {
138                reason: "delete: descend returned empty path",
139            });
140        };
141        let found = leaf_frame
142            .node
143            .leaves
144            .iter()
145            .any(|e| e.key.as_slice() == key);
146        if !found {
147            return Ok(false);
148        }
149        let leaf_outcome = remove_from_leaf(pager, leaf_frame, key, &mut freed)?;
150        let mut outcome = leaf_outcome;
151        while let Some(parent_frame) = path.pop() {
152            outcome = process_parent(pager, parent_frame, outcome, &mut freed, &mut path)?;
153        }
154        self.root = outcome.new_id;
155        // Try collapsing a single-child internal root into its child.
156        self.collapse_root(pager, &mut freed)?;
157        for old_id in freed.iter().copied() {
158            pager.free_page(old_id)?;
159        }
160        Ok(true)
161    }
162
163    /// If the current root is an internal node with exactly one
164    /// child (zero pivots), replace it with that child and free the
165    /// old root. Idempotent for any other root shape.
166    fn collapse_root(
167        &mut self,
168        pager: &mut Pager<F>,
169        freed: &mut HeaplessVec<PageId, { MAX_BTREE_DEPTH * 4 }>,
170    ) -> Result<()> {
171        loop {
172            let root_id = self.root;
173            let decoded = {
174                let page_ref = pager.read_page(root_id)?;
175                decode_node(page_ref.as_bytes())?
176            };
177            if !matches!(decoded.kind, NodeKind::Internal) {
178                return Ok(());
179            }
180            if !decoded.internals.is_empty() || decoded.children.len() != 1 {
181                return Ok(());
182            }
183            let only_child =
184                PageId::new(decoded.children[0]).ok_or(Error::BTreeInvariantViolated {
185                    reason: "collapse_root: zero child page-id",
186                })?;
187            self.root = only_child;
188            push_freed(freed, root_id)?;
189        }
190    }
191}
192
193fn push_path(path: &mut HeaplessVec<PathFrame, MAX_BTREE_DEPTH>, frame: PathFrame) -> Result<()> {
194    path.push(frame).map_err(|_| Error::BTreeDepthExceeded {
195        limit: MAX_BTREE_DEPTH,
196    })
197}
198
199fn push_freed(freed: &mut HeaplessVec<PageId, { MAX_BTREE_DEPTH * 4 }>, id: PageId) -> Result<()> {
200    freed.push(id).map_err(|_| Error::BTreeInvariantViolated {
201        reason: "delete: too many displaced pages to track",
202    })
203}
204
205fn pivot_index(node: &DecodedNode, key: &[u8]) -> usize {
206    let mut idx = node.internals.len();
207    for (i, pivot) in node.internals.iter().enumerate() {
208        if pivot.key.as_slice() > key {
209            idx = i;
210            break;
211        }
212    }
213    idx
214}
215
216/// Remove the slot whose key equals `key` from `frame.node` (a
217/// leaf), rewrite it as a fresh page, and report underflow.
218fn remove_from_leaf<F: FileBackend>(
219    pager: &mut Pager<F>,
220    frame: PathFrame,
221    key: &[u8],
222    freed: &mut HeaplessVec<PageId, { MAX_BTREE_DEPTH * 4 }>,
223) -> Result<ReplaceOutcome> {
224    let mut leaf = frame.node;
225    let Some(idx) = leaf.leaves.iter().position(|e| e.key.as_slice() == key) else {
226        return Err(Error::BTreeInvariantViolated {
227            reason: "remove_from_leaf: descend located absent key",
228        });
229    };
230    leaf.leaves.remove(idx);
231    let occupied = leaf.occupied_bytes();
232    let underflow = occupied < MIN_OCCUPIED_BYTES;
233    push_freed(freed, frame.page_id)?;
234    let new_id = write_new_node(pager, &leaf)?;
235    Ok(ReplaceOutcome { new_id, underflow })
236}
237
238/// Apply the child's outcome to a parent frame, possibly
239/// rebalancing on underflow.
240fn process_parent<F: FileBackend>(
241    pager: &mut Pager<F>,
242    frame: PathFrame,
243    child_outcome: ReplaceOutcome,
244    freed: &mut HeaplessVec<PageId, { MAX_BTREE_DEPTH * 4 }>,
245    _path: &mut HeaplessVec<PathFrame, MAX_BTREE_DEPTH>,
246) -> Result<ReplaceOutcome> {
247    let mut internal = frame.node;
248    let child_index = frame.child_index;
249    internal.children[child_index] = child_outcome.new_id.get();
250    if child_outcome.underflow {
251        rebalance_under_parent(pager, &mut internal, child_index, freed)?;
252    }
253    finalize_parent(pager, &internal, frame.page_id, freed)
254}
255
256/// Decide whether the rebalanced parent itself underflows, encode
257/// it, and return the outcome.
258fn finalize_parent<F: FileBackend>(
259    pager: &mut Pager<F>,
260    internal: &DecodedNode,
261    old_id: PageId,
262    freed: &mut HeaplessVec<PageId, { MAX_BTREE_DEPTH * 4 }>,
263) -> Result<ReplaceOutcome> {
264    let occupied = internal.occupied_bytes();
265    let underflow = occupied < MIN_OCCUPIED_BYTES;
266    push_freed(freed, old_id)?;
267    let new_id = write_new_node(pager, internal)?;
268    Ok(ReplaceOutcome { new_id, underflow })
269}
270
271/// Rebalance an underflowing child of `internal` at `child_index`.
272/// Tries to merge first (one fewer page in the tree, better for
273/// cache locality); falls back to borrow if merge would overflow.
274///
275/// # Residual-underflow policy (#64) — best-effort, bounded, NOT a
276/// correctness bug
277///
278/// In four ordered attempts (merge-right, merge-left, borrow-right,
279/// borrow-left) this resolves the underflow in every ordinary case.
280/// A *residual* underflow — where ALL FOUR attempts decline — can
281/// only arise with variable-length keys, and only when both of these
282/// hold simultaneously for every available sibling:
283///
284/// 1. **Merge declines**: combined `occupied_bytes` of the child, the
285///    sibling, and (for internal nodes) the descended separator pivot
286///    would exceed `PAYLOAD_BYTES` — i.e. the sibling is too FULL to
287///    absorb the child; and
288/// 2. **Borrow declines**: any of — the sibling cannot spare a slot and
289///    stay `>= MIN_OCCUPIED_BYTES` ([`sibling_has_spare`]); moving its
290///    boundary slot up would install a separator pivot LARGER than the
291///    one it replaces and push the PARENT past `PAYLOAD_BYTES`
292///    ([`parent_fits_after_pivot_swap`]) — the "borrow-grows-parent"
293///    hazard the #29 large-key oracle pinned; or (issue #137) the
294///    single borrowed slot is large enough that the receiving child
295///    would itself exceed `PAYLOAD_BYTES` ([`child_fits_after_borrow`]).
296///
297/// When that conjunction holds we deliberately leave the child below
298/// `MIN_OCCUPIED_BYTES`. This is sound:
299///
300/// - **Correctness is preserved.** The child still holds every key it
301///   should; the parent's pivots and child pointers stay consistent;
302///   `get` / range-scan / subsequent `insert` and `delete` all remain
303///   correct. The B+tree oracle (`btree_oracle`, `index_oracle`) only
304///   observes externally-visible state and would not — and must not —
305///   change. The ONLY degraded property is the *minimum-occupancy*
306///   guarantee on that one node.
307/// - **The degradation is bounded.** At most one node per
308///   delete-rebalance bubble-up frame can be left underflowing, and a
309///   later `insert` into that subtree, or a `delete` that makes a
310///   sibling spare a slot, repairs the shape. It does not compound
311///   into emptiness: a node that reaches zero slots is handled by the
312///   merge path (its sibling can always absorb a zero-occupancy node),
313///   and the root-collapse path handles a single-child root.
314///
315/// Implementing a full cascade here (recursively splitting the
316/// over-full sibling, or borrow-then-rebalance-the-parent) was
317/// considered and rejected for this change: it is materially more
318/// invasive and risks a B-tree CORRECTNESS regression, which is a far
319/// worse failure mode than a transient occupancy dip. The
320/// `delete_rebalance_residual_underflow_is_bounded_and_correct` test
321/// makes the degradation visible and asserts it stays bounded.
322fn rebalance_under_parent<F: FileBackend>(
323    pager: &mut Pager<F>,
324    internal: &mut DecodedNode,
325    child_index: usize,
326    freed: &mut HeaplessVec<PageId, { MAX_BTREE_DEPTH * 4 }>,
327) -> Result<()> {
328    // Try merge first: prefer the right sibling.
329    if child_index + 1 < internal.children.len()
330        && try_merge(pager, internal, child_index, MergeDirection::Right, freed)?
331    {
332        return Ok(());
333    }
334    if child_index > 0 && try_merge(pager, internal, child_index, MergeDirection::Left, freed)? {
335        return Ok(());
336    }
337    // Merge couldn't fit; fall back to a borrow. Borrow is best-
338    // effort: if it does not occur the underflow persists for now
339    // (see the residual-underflow policy in the doc comment above).
340    if child_index + 1 < internal.children.len()
341        && try_borrow_from_right(pager, internal, child_index, freed)?
342    {
343        return Ok(());
344    }
345    if child_index > 0 && try_borrow_from_left(pager, internal, child_index, freed)? {
346        return Ok(());
347    }
348    Ok(())
349}
350
351#[derive(Debug, Clone, Copy)]
352enum MergeDirection {
353    Left,
354    Right,
355}
356
357/// Attempt to merge with the indicated sibling. Returns `Ok(true)`
358/// if the merge happened; `Ok(false)` if combined sizes would not
359/// fit in a single page and the merge was abandoned.
360fn try_merge<F: FileBackend>(
361    pager: &mut Pager<F>,
362    internal: &mut DecodedNode,
363    child_index: usize,
364    direction: MergeDirection,
365    freed: &mut HeaplessVec<PageId, { MAX_BTREE_DEPTH * 4 }>,
366) -> Result<bool> {
367    let (left_idx, right_idx) = match direction {
368        MergeDirection::Right => (child_index, child_index + 1),
369        MergeDirection::Left => (child_index - 1, child_index),
370    };
371    let left_id = pid(internal.children[left_idx])?;
372    let right_id = pid(internal.children[right_idx])?;
373    let left = read_node(pager, left_id)?;
374    let right = read_node(pager, right_id)?;
375    let separator_bytes = match left.kind {
376        NodeKind::Leaf => 0,
377        NodeKind::Internal => {
378            use crate::btree::node::{varint_len, INTERNAL_SLOT_BYTES};
379            let pivot = &internal.internals[left_idx].key;
380            INTERNAL_SLOT_BYTES + varint_len(pivot.len() as u64) + pivot.len()
381        }
382    };
383    let combined = left.occupied_bytes() + right.occupied_bytes() + separator_bytes
384        - match left.kind {
385            NodeKind::Internal => crate::btree::node::INTERNAL_LEFTMOST_CHILD_BYTES,
386            NodeKind::Leaf => 0,
387        };
388    if combined > PAYLOAD_BYTES {
389        return Ok(false);
390    }
391    drop(left);
392    drop(right);
393    match direction {
394        MergeDirection::Right => merge_with_right(pager, internal, child_index, freed)?,
395        MergeDirection::Left => merge_with_left(pager, internal, child_index, freed)?,
396    }
397    Ok(true)
398}
399
400/// Attempt to borrow one slot from the right sibling. Returns
401/// `Ok(true)` if the borrow occurred.
402///
403/// A borrow REPLACES the separator pivot in the parent: the new
404/// pivot can be larger than the old one, growing the parent. If
405/// that growth would push the parent past `PAYLOAD_BYTES` the borrow
406/// is abandoned (returns `Ok(false)`) so the caller can fall through
407/// to the "leave the underflowing child as-is" branch — better than
408/// raising a hard encode error for a delete-rebalance edge case.
409fn try_borrow_from_right<F: FileBackend>(
410    pager: &mut Pager<F>,
411    internal: &mut DecodedNode,
412    child_index: usize,
413    freed: &mut HeaplessVec<PageId, { MAX_BTREE_DEPTH * 4 }>,
414) -> Result<bool> {
415    let child_id = pid(internal.children[child_index])?;
416    let right_id = pid(internal.children[child_index + 1])?;
417    let mut child = read_node(pager, child_id)?;
418    let mut right = read_node(pager, right_id)?;
419    if !sibling_has_spare(&right) {
420        return Ok(false);
421    }
422    let new_pivot = pick_borrow_right_pivot(&right, child.kind)?;
423    if !parent_fits_after_pivot_swap(internal, child_index, &new_pivot) {
424        return Ok(false);
425    }
426    // Issue #137: a borrow moves ONE slot INTO the (underflowing)
427    // child. With variable-length entries that one slot can be large
428    // enough to push the child past `PAYLOAD_BYTES` and produce a
429    // "slot dir and heap collide" encode error. If the receiving
430    // child would overflow, decline the borrow (fall through to the
431    // bounded residual-underflow policy) rather than corrupt.
432    let moved_bytes = borrow_right_moved_bytes(internal, &right, child.kind, child_index)?;
433    if !child_fits_after_borrow(&child, moved_bytes) {
434        return Ok(false);
435    }
436    apply_borrow_from_right(internal, &mut child, &mut right, child_index, &new_pivot)?;
437    push_freed(freed, child_id)?;
438    push_freed(freed, right_id)?;
439    internal.children[child_index] = write_new_node(pager, &child)?.get();
440    internal.children[child_index + 1] = write_new_node(pager, &right)?.get();
441    Ok(true)
442}
443
444/// Choose the key that will be promoted from the right sibling to
445/// become the parent's new separator pivot.
446fn pick_borrow_right_pivot(right: &DecodedNode, child_kind: NodeKind) -> Result<Vec<u8>> {
447    match (child_kind, right.kind) {
448        (NodeKind::Leaf, NodeKind::Leaf) => {
449            right
450                .leaves
451                .get(1)
452                .map(|e| e.key.clone())
453                .ok_or(Error::BTreeInvariantViolated {
454                    reason: "borrow_from_right: right leaf has < 2 entries",
455                })
456        }
457        (NodeKind::Internal, NodeKind::Internal) => right
458            .internals
459            .first()
460            .map(|p| p.key.clone())
461            .ok_or(Error::BTreeInvariantViolated {
462                reason: "borrow_from_right: right internal has no pivots",
463            }),
464        _ => Err(Error::BTreeInvariantViolated {
465            reason: "borrow_from_right: mixed kinds",
466        }),
467    }
468}
469
470/// Mutate `child`, `right`, and `internal` in place to complete the
471/// borrow once the parent-fit check has passed.
472fn apply_borrow_from_right(
473    internal: &mut DecodedNode,
474    child: &mut DecodedNode,
475    right: &mut DecodedNode,
476    child_index: usize,
477    new_pivot: &[u8],
478) -> Result<()> {
479    match (child.kind, right.kind) {
480        (NodeKind::Leaf, NodeKind::Leaf) => {
481            let entry = right.leaves.remove(0);
482            child.leaves.push(entry);
483        }
484        (NodeKind::Internal, NodeKind::Internal) => {
485            let pivot_down = internal.internals[child_index].key.clone();
486            right.internals.remove(0);
487            let moved_child = right.children.remove(0);
488            child.internals.push(InternalEntry { key: pivot_down });
489            child.children.push(moved_child);
490        }
491        _ => {
492            return Err(Error::BTreeInvariantViolated {
493                reason: "borrow_from_right: mixed kinds",
494            });
495        }
496    }
497    internal.internals[child_index].key = new_pivot.to_vec();
498    Ok(())
499}
500
501/// Attempt to borrow one slot from the left sibling. Same parent-
502/// fit check as [`try_borrow_from_right`].
503fn try_borrow_from_left<F: FileBackend>(
504    pager: &mut Pager<F>,
505    internal: &mut DecodedNode,
506    child_index: usize,
507    freed: &mut HeaplessVec<PageId, { MAX_BTREE_DEPTH * 4 }>,
508) -> Result<bool> {
509    let child_id = pid(internal.children[child_index])?;
510    let left_id = pid(internal.children[child_index - 1])?;
511    let mut child = read_node(pager, child_id)?;
512    let mut left = read_node(pager, left_id)?;
513    if !sibling_has_spare(&left) {
514        return Ok(false);
515    }
516    let new_pivot = pick_borrow_left_pivot(&left, child.kind)?;
517    if !parent_fits_after_pivot_swap(internal, child_index - 1, &new_pivot) {
518        return Ok(false);
519    }
520    // Issue #137: see `try_borrow_from_right` — decline if the moved
521    // slot would push the receiving child past `PAYLOAD_BYTES`.
522    let moved_bytes = borrow_left_moved_bytes(internal, &left, child.kind, child_index)?;
523    if !child_fits_after_borrow(&child, moved_bytes) {
524        return Ok(false);
525    }
526    apply_borrow_from_left(internal, &mut child, &mut left, child_index, &new_pivot)?;
527    push_freed(freed, child_id)?;
528    push_freed(freed, left_id)?;
529    internal.children[child_index - 1] = write_new_node(pager, &left)?.get();
530    internal.children[child_index] = write_new_node(pager, &child)?.get();
531    Ok(true)
532}
533
534fn pick_borrow_left_pivot(left: &DecodedNode, child_kind: NodeKind) -> Result<Vec<u8>> {
535    match (child_kind, left.kind) {
536        (NodeKind::Leaf, NodeKind::Leaf) => {
537            left.leaves
538                .last()
539                .map(|e| e.key.clone())
540                .ok_or(Error::BTreeInvariantViolated {
541                    reason: "borrow_from_left: empty left leaf",
542                })
543        }
544        (NodeKind::Internal, NodeKind::Internal) => left
545            .internals
546            .last()
547            .map(|p| p.key.clone())
548            .ok_or(Error::BTreeInvariantViolated {
549                reason: "borrow_from_left: empty left pivots",
550            }),
551        _ => Err(Error::BTreeInvariantViolated {
552            reason: "borrow_from_left: mixed kinds",
553        }),
554    }
555}
556
557fn apply_borrow_from_left(
558    internal: &mut DecodedNode,
559    child: &mut DecodedNode,
560    left: &mut DecodedNode,
561    child_index: usize,
562    new_pivot: &[u8],
563) -> Result<()> {
564    match (child.kind, left.kind) {
565        (NodeKind::Leaf, NodeKind::Leaf) => {
566            let entry = left.leaves.pop().ok_or(Error::BTreeInvariantViolated {
567                reason: "borrow_from_left: empty left leaf",
568            })?;
569            child.leaves.insert(0, entry);
570        }
571        (NodeKind::Internal, NodeKind::Internal) => {
572            let pivot_down = internal.internals[child_index - 1].key.clone();
573            left.internals.pop().ok_or(Error::BTreeInvariantViolated {
574                reason: "borrow_from_left: empty left pivots",
575            })?;
576            let moved_child = left.children.pop().ok_or(Error::BTreeInvariantViolated {
577                reason: "borrow_from_left: empty left children",
578            })?;
579            child.internals.insert(0, InternalEntry { key: pivot_down });
580            child.children.insert(0, moved_child);
581        }
582        _ => {
583            return Err(Error::BTreeInvariantViolated {
584                reason: "borrow_from_left: mixed kinds",
585            });
586        }
587    }
588    internal.internals[child_index - 1].key = new_pivot.to_vec();
589    Ok(())
590}
591
592/// Compute whether the parent internal node would still fit in
593/// `PAYLOAD_BYTES` after swapping the pivot at `pivot_index` for a
594/// new pivot of `new_key`. Borrow may grow the parent if
595/// `new_key.len() + varint_len(new_key.len())` exceeds the same for
596/// the old pivot.
597fn parent_fits_after_pivot_swap(parent: &DecodedNode, pivot_index: usize, new_key: &[u8]) -> bool {
598    use crate::btree::node::varint_len;
599    let Some(old_pivot) = parent.internals.get(pivot_index) else {
600        return false;
601    };
602    let old_entry = varint_len(old_pivot.key.len() as u64) + old_pivot.key.len();
603    let new_entry = varint_len(new_key.len() as u64) + new_key.len();
604    let parent_occupied = parent.occupied_bytes();
605    parent_occupied + new_entry <= PAYLOAD_BYTES + old_entry
606}
607
608/// Whether the receiving child stays within `PAYLOAD_BYTES` after a
609/// borrow appends/prepends a slot of `moved_bytes`. Issue #137: with
610/// variable-length entries a single borrowed slot can be large enough
611/// to overflow the (previously underflowing) child; declining the
612/// borrow in that case preserves the no-overflow encode invariant.
613fn child_fits_after_borrow(child: &DecodedNode, moved_bytes: usize) -> bool {
614    child.occupied_bytes().saturating_add(moved_bytes) <= PAYLOAD_BYTES
615}
616
617/// Byte footprint the right-borrow moves INTO the child: a leaf gains
618/// `right.leaves[0]`; an internal gains the descended separator pivot
619/// (`internal.internals[child_index]`) plus one child pointer.
620fn borrow_right_moved_bytes(
621    internal: &DecodedNode,
622    right: &DecodedNode,
623    child_kind: NodeKind,
624    child_index: usize,
625) -> Result<usize> {
626    match (child_kind, right.kind) {
627        (NodeKind::Leaf, NodeKind::Leaf) => {
628            let e = right.leaves.first().ok_or(Error::BTreeInvariantViolated {
629                reason: "borrow_right_moved_bytes: empty right leaf",
630            })?;
631            Ok(leaf_slot_bytes(e))
632        }
633        (NodeKind::Internal, NodeKind::Internal) => {
634            let pivot =
635                internal
636                    .internals
637                    .get(child_index)
638                    .ok_or(Error::BTreeInvariantViolated {
639                        reason: "borrow_right_moved_bytes: missing separator pivot",
640                    })?;
641            Ok(internal_slot_bytes(&pivot.key))
642        }
643        _ => Err(Error::BTreeInvariantViolated {
644            reason: "borrow_right_moved_bytes: mixed kinds",
645        }),
646    }
647}
648
649/// Byte footprint the left-borrow moves INTO the child: a leaf gains
650/// `left.leaves.last()`; an internal gains the descended separator
651/// pivot (`internal.internals[child_index - 1]`) plus a child pointer.
652fn borrow_left_moved_bytes(
653    internal: &DecodedNode,
654    left: &DecodedNode,
655    child_kind: NodeKind,
656    child_index: usize,
657) -> Result<usize> {
658    match (child_kind, left.kind) {
659        (NodeKind::Leaf, NodeKind::Leaf) => {
660            let e = left.leaves.last().ok_or(Error::BTreeInvariantViolated {
661                reason: "borrow_left_moved_bytes: empty left leaf",
662            })?;
663            Ok(leaf_slot_bytes(e))
664        }
665        (NodeKind::Internal, NodeKind::Internal) => {
666            let pivot =
667                internal
668                    .internals
669                    .get(child_index - 1)
670                    .ok_or(Error::BTreeInvariantViolated {
671                        reason: "borrow_left_moved_bytes: missing separator pivot",
672                    })?;
673            Ok(internal_slot_bytes(&pivot.key))
674        }
675        _ => Err(Error::BTreeInvariantViolated {
676            reason: "borrow_left_moved_bytes: mixed kinds",
677        }),
678    }
679}
680
681/// Encoded byte footprint of one leaf slot (directory entry + heap
682/// entry). Matches the per-entry term of `DecodedNode::occupied_bytes`.
683fn leaf_slot_bytes(e: &LeafEntry) -> usize {
684    use crate::btree::node::{varint_len, LEAF_SLOT_BYTES};
685    LEAF_SLOT_BYTES
686        + varint_len(e.key.len() as u64)
687        + e.key.len()
688        + varint_len(e.value.len() as u64)
689        + e.value.len()
690}
691
692/// Encoded byte footprint of one internal slot (directory entry +
693/// heap pivot key). The child pointer lives in the fixed-width slot
694/// directory, so no separate accounting is needed.
695fn internal_slot_bytes(key: &[u8]) -> usize {
696    use crate::btree::node::{varint_len, INTERNAL_SLOT_BYTES};
697    INTERNAL_SLOT_BYTES + varint_len(key.len() as u64) + key.len()
698}
699
700/// Merge the child at `child_index` with its right sibling.
701/// Removes the separator pivot and the right child id from the
702/// parent.
703fn merge_with_right<F: FileBackend>(
704    pager: &mut Pager<F>,
705    internal: &mut DecodedNode,
706    child_index: usize,
707    freed: &mut HeaplessVec<PageId, { MAX_BTREE_DEPTH * 4 }>,
708) -> Result<()> {
709    let child_id = pid(internal.children[child_index])?;
710    let right_id = pid(internal.children[child_index + 1])?;
711    let mut child = read_node(pager, child_id)?;
712    let mut right = read_node(pager, right_id)?;
713    match (child.kind, right.kind) {
714        (NodeKind::Leaf, NodeKind::Leaf) => {
715            child.leaves.append(&mut right.leaves);
716            child.next_sibling = right.next_sibling;
717        }
718        (NodeKind::Internal, NodeKind::Internal) => {
719            let pivot_down = internal.internals[child_index].key.clone();
720            child.internals.push(InternalEntry { key: pivot_down });
721            child.internals.append(&mut right.internals);
722            child.children.append(&mut right.children);
723        }
724        _ => {
725            return Err(Error::BTreeInvariantViolated {
726                reason: "merge_with_right: mixed kinds",
727            });
728        }
729    }
730    internal.internals.remove(child_index);
731    internal.children.remove(child_index + 1);
732    push_freed(freed, child_id)?;
733    push_freed(freed, right_id)?;
734    internal.children[child_index] = write_new_node(pager, &child)?.get();
735    Ok(())
736}
737
738/// Merge the child at `child_index` with its left sibling. The
739/// merged node lives in the left's slot; the parent's separator
740/// pivot and the right (current) child id are removed.
741fn merge_with_left<F: FileBackend>(
742    pager: &mut Pager<F>,
743    internal: &mut DecodedNode,
744    child_index: usize,
745    freed: &mut HeaplessVec<PageId, { MAX_BTREE_DEPTH * 4 }>,
746) -> Result<()> {
747    let left_id = pid(internal.children[child_index - 1])?;
748    let child_id = pid(internal.children[child_index])?;
749    let mut left = read_node(pager, left_id)?;
750    let mut child = read_node(pager, child_id)?;
751    match (left.kind, child.kind) {
752        (NodeKind::Leaf, NodeKind::Leaf) => {
753            left.leaves.append(&mut child.leaves);
754            left.next_sibling = child.next_sibling;
755        }
756        (NodeKind::Internal, NodeKind::Internal) => {
757            let pivot_down = internal.internals[child_index - 1].key.clone();
758            left.internals.push(InternalEntry { key: pivot_down });
759            left.internals.append(&mut child.internals);
760            left.children.append(&mut child.children);
761        }
762        _ => {
763            return Err(Error::BTreeInvariantViolated {
764                reason: "merge_with_left: mixed kinds",
765            });
766        }
767    }
768    internal.internals.remove(child_index - 1);
769    internal.children.remove(child_index);
770    push_freed(freed, left_id)?;
771    push_freed(freed, child_id)?;
772    internal.children[child_index - 1] = write_new_node(pager, &left)?.get();
773    Ok(())
774}
775
776fn sibling_has_spare(node: &DecodedNode) -> bool {
777    // We borrow if removing one slot keeps the sibling at or above
778    // the underflow threshold. Using `occupied_bytes` as the size
779    // proxy means we always check after a hypothetical removal.
780    let one_slot_bytes = one_slot_size(node);
781    let after_borrow = node.occupied_bytes().saturating_sub(one_slot_bytes);
782    after_borrow >= MIN_OCCUPIED_BYTES
783}
784
785/// Approximate the byte footprint of one slot in `node`. Used by
786/// the underflow / borrow heuristic.
787fn one_slot_size(node: &DecodedNode) -> usize {
788    use crate::btree::node::{varint_len, INTERNAL_SLOT_BYTES, LEAF_SLOT_BYTES};
789    match node.kind {
790        NodeKind::Leaf => {
791            let Some(e) = node.leaves.first() else {
792                return 0;
793            };
794            LEAF_SLOT_BYTES
795                + varint_len(e.key.len() as u64)
796                + e.key.len()
797                + varint_len(e.value.len() as u64)
798                + e.value.len()
799        }
800        NodeKind::Internal => {
801            let Some(e) = node.internals.first() else {
802                return 0;
803            };
804            INTERNAL_SLOT_BYTES + varint_len(e.key.len() as u64) + e.key.len()
805        }
806    }
807}
808
809fn read_node<F: FileBackend>(pager: &mut Pager<F>, id: PageId) -> Result<DecodedNode> {
810    let page_ref = pager.read_page(id)?;
811    decode_node(page_ref.as_bytes())
812}
813
814fn pid(raw: u64) -> Result<PageId> {
815    PageId::new(raw).ok_or(Error::BTreeInvariantViolated {
816        reason: "child page-id was zero",
817    })
818}
819
820// Keep the LeafEntry import referenced even when no leaf-specific
821// helper actually constructs one (every leaf entry is taken from a
822// decoded node and moved/cloned around).
823const _: fn(LeafEntry) = drop;
824
825#[cfg(test)]
826mod tests {
827    use super::*;
828    use crate::pager::{Config, Pager};
829    use crate::platform::FileHandle;
830
831    use rand::Rng;
832    use rand::SeedableRng;
833    use rand_chacha::ChaCha8Rng;
834    use std::collections::BTreeMap;
835
836    fn config() -> Config {
837        Config::default()
838    }
839
840    #[test]
841    fn delete_absent_returns_false() {
842        let mut pager = Pager::<FileHandle>::memory(config()).expect("pager");
843        let mut tree = BTree::<FileHandle>::empty(&mut pager).expect("empty");
844        let removed = tree.delete(&mut pager, b"missing").expect("del");
845        assert!(!removed);
846    }
847
848    #[test]
849    fn delete_single_key_round_trip() {
850        let mut pager = Pager::<FileHandle>::memory(config()).expect("pager");
851        let mut tree = BTree::<FileHandle>::empty(&mut pager).expect("empty");
852        tree.insert(&mut pager, b"k", b"v").expect("ins");
853        assert!(tree.delete(&mut pager, b"k").expect("del"));
854        assert_eq!(tree.get(&mut pager, b"k").expect("get"), None);
855    }
856
857    #[test]
858    fn delete_collapses_tall_tree_back_to_one_level() {
859        let mut pager = Pager::<FileHandle>::memory(config()).expect("pager");
860        let mut tree = BTree::<FileHandle>::empty(&mut pager).expect("empty");
861        let value = vec![0xCDu8; 256];
862        for i in 0..200u32 {
863            let key = format!("key-{i:08}");
864            tree.insert(&mut pager, key.as_bytes(), &value)
865                .expect("ins");
866        }
867        // Confirm we grew tall.
868        let root = tree.root();
869        let level_before = {
870            let pr = pager.read_page(root).expect("read");
871            decode_node(pr.as_bytes()).expect("dec").level
872        };
873        assert!(level_before >= 1);
874        // Delete all but one key.
875        for i in 0..199u32 {
876            let key = format!("key-{i:08}");
877            assert!(tree.delete(&mut pager, key.as_bytes()).expect("del"));
878        }
879        let final_root = tree.root();
880        let level_after = {
881            let pr = pager.read_page(final_root).expect("read");
882            decode_node(pr.as_bytes()).expect("dec").level
883        };
884        assert!(
885            level_after < level_before,
886            "tree should have collapsed (before {level_before}, after {level_after})"
887        );
888        let leftover_key = format!("key-{i:08}", i = 199);
889        assert_eq!(
890            tree.get(&mut pager, leftover_key.as_bytes()).expect("get"),
891            Some(value.clone())
892        );
893    }
894
895    #[test]
896    fn insert_delete_oracle_10k() {
897        for seed in 0..3u64 {
898            run_oracle(seed, 10_000);
899        }
900    }
901
902    /// Regression for the borrow-grows-parent bug surfaced by the
903    /// 1M-op oracle (#29). With random keys up to 64 bytes a borrow
904    /// during delete-rebalance can install a new separator pivot in
905    /// the parent that is much larger than the one it replaced,
906    /// pushing the parent past `PAYLOAD_BYTES` and causing the
907    /// subsequent encode to fail with the "slot dir and heap
908    /// collide" invariant. Drive 30 000 mixed insert/delete ops with
909    /// the same key-length distribution as #29 to flush out the
910    /// edge case without paying the 1M-op cost in the unit-test
911    /// harness.
912    #[test]
913    fn insert_delete_oracle_large_keys_30k() {
914        for seed in 0..3u64 {
915            run_oracle_with(seed, 30_000, random_key_up_to_64, random_value_up_to_256);
916        }
917    }
918
919    fn random_key_up_to_64(rng: &mut ChaCha8Rng) -> Vec<u8> {
920        let len = rng.random_range(1u32..=64);
921        (0..len).map(|_| rng.random_range(b'a'..=b'z')).collect()
922    }
923
924    fn random_value_up_to_256(rng: &mut ChaCha8Rng) -> Vec<u8> {
925        let len = rng.random_range(0u32..=256);
926        (0..len).map(|_| rng.random()).collect()
927    }
928
929    fn run_oracle_with(
930        seed: u64,
931        ops: usize,
932        gen_key: fn(&mut ChaCha8Rng) -> Vec<u8>,
933        gen_value: fn(&mut ChaCha8Rng) -> Vec<u8>,
934    ) {
935        let mut rng = ChaCha8Rng::seed_from_u64(seed);
936        let mut pager = Pager::<FileHandle>::memory(config()).expect("pager");
937        let mut tree = BTree::<FileHandle>::empty(&mut pager).expect("empty");
938        let mut oracle: BTreeMap<Vec<u8>, Vec<u8>> = BTreeMap::new();
939        for op in 0..ops {
940            // Balance insert/delete so the tree grows tall (>= 2
941            // levels) and the parent-fit edge case is exercised.
942            let pick = rng.random_range(0u32..3);
943            if pick < 2 {
944                let key = gen_key(&mut rng);
945                let value = gen_value(&mut rng);
946                if let std::collections::btree_map::Entry::Vacant(slot) = oracle.entry(key.clone())
947                {
948                    tree.insert(&mut pager, &key, &value)
949                        .unwrap_or_else(|e| panic!("seed {seed} op {op} ins: {e:?}"));
950                    slot.insert(value);
951                }
952            } else if !oracle.is_empty() {
953                let pick_in_oracle = rng.random_range(0u32..5) > 0;
954                let candidate = if pick_in_oracle {
955                    let n = oracle.len();
956                    let i = rng.random_range(0..n);
957                    oracle.keys().nth(i).cloned().unwrap_or_default()
958                } else {
959                    gen_key(&mut rng)
960                };
961                let want = oracle.remove(&candidate).is_some();
962                let got = tree
963                    .delete(&mut pager, &candidate)
964                    .unwrap_or_else(|e| panic!("seed {seed} op {op} del: {e:?}"));
965                assert_eq!(got, want, "seed {seed} op {op}: delete-presence disagrees");
966            }
967        }
968        // Final consistency: every oracle key resolves.
969        for (k, v) in &oracle {
970            assert_eq!(
971                tree.get(&mut pager, k).expect("get").as_ref(),
972                Some(v),
973                "seed {seed} final: key {k:?}"
974            );
975        }
976    }
977
978    fn run_oracle(seed: u64, ops: usize) {
979        let mut rng = ChaCha8Rng::seed_from_u64(seed);
980        let mut pager = Pager::<FileHandle>::memory(config()).expect("pager");
981        let mut tree = BTree::<FileHandle>::empty(&mut pager).expect("empty");
982        let mut oracle: BTreeMap<Vec<u8>, Vec<u8>> = BTreeMap::new();
983        for op in 0..ops {
984            let pick = rng.random_range(0u32..3);
985            if pick < 2 {
986                oracle_step_insert(&mut pager, &mut tree, &mut oracle, &mut rng, seed, op);
987            } else {
988                oracle_step_delete(&mut pager, &mut tree, &mut oracle, &mut rng, seed, op);
989            }
990            if op.is_multiple_of(257) {
991                oracle_sample_check(&mut pager, &tree, &oracle, seed, op);
992            }
993        }
994        // Final pass: every oracle key resolves; tree returns None
995        // for a few synthesized absent keys.
996        for (k, v) in &oracle {
997            assert_eq!(
998                tree.get(&mut pager, k).expect("get").as_ref(),
999                Some(v),
1000                "seed {seed} final: key {k:?}"
1001            );
1002        }
1003    }
1004
1005    fn oracle_step_insert(
1006        pager: &mut Pager<FileHandle>,
1007        tree: &mut BTree<FileHandle>,
1008        oracle: &mut BTreeMap<Vec<u8>, Vec<u8>>,
1009        rng: &mut ChaCha8Rng,
1010        seed: u64,
1011        op: usize,
1012    ) {
1013        let key = random_key(rng);
1014        let value = random_value(rng);
1015        match oracle.entry(key.clone()) {
1016            std::collections::btree_map::Entry::Occupied(existing) => {
1017                assert_eq!(
1018                    tree.get(pager, &key).expect("get").as_ref(),
1019                    Some(existing.get()),
1020                    "seed {seed} op {op}: existing key disagrees"
1021                );
1022            }
1023            std::collections::btree_map::Entry::Vacant(slot) => {
1024                tree.insert(pager, &key, &value)
1025                    .unwrap_or_else(|e| panic!("seed {seed} op {op} ins: {e:?}"));
1026                slot.insert(value);
1027            }
1028        }
1029    }
1030
1031    fn oracle_step_delete(
1032        pager: &mut Pager<FileHandle>,
1033        tree: &mut BTree<FileHandle>,
1034        oracle: &mut BTreeMap<Vec<u8>, Vec<u8>>,
1035        rng: &mut ChaCha8Rng,
1036        seed: u64,
1037        op: usize,
1038    ) {
1039        let candidate = if !oracle.is_empty() && rng.random_range(0u32..4) > 0 {
1040            let n = oracle.len();
1041            let pick = rng.random_range(0..n);
1042            oracle.keys().nth(pick).cloned().unwrap_or_default()
1043        } else {
1044            random_key(rng)
1045        };
1046        let want = oracle.remove(&candidate).is_some();
1047        let got = tree
1048            .delete(pager, &candidate)
1049            .unwrap_or_else(|e| panic!("seed {seed} op {op} del: {e:?}"));
1050        assert_eq!(got, want, "seed {seed} op {op}: delete-presence disagrees");
1051    }
1052
1053    fn oracle_sample_check(
1054        pager: &mut Pager<FileHandle>,
1055        tree: &BTree<FileHandle>,
1056        oracle: &BTreeMap<Vec<u8>, Vec<u8>>,
1057        seed: u64,
1058        op: usize,
1059    ) {
1060        let mut sample_keys: Vec<&Vec<u8>> = oracle.keys().take(5).collect();
1061        sample_keys.extend(oracle.keys().rev().take(5));
1062        for k in sample_keys {
1063            assert_eq!(
1064                tree.get(pager, k).expect("get").as_ref(),
1065                oracle.get(k),
1066                "seed {seed} op {op}: mid-run get disagrees on key {k:?}"
1067            );
1068        }
1069    }
1070
1071    fn random_key(rng: &mut ChaCha8Rng) -> Vec<u8> {
1072        let len = rng.random_range(1..16);
1073        (0..len).map(|_| rng.random_range(b'a'..=b'z')).collect()
1074    }
1075
1076    fn random_value(rng: &mut ChaCha8Rng) -> Vec<u8> {
1077        let len = rng.random_range(0..64);
1078        (0..len).map(|_| rng.random()).collect()
1079    }
1080
1081    /// Result of walking every node in a tree once.
1082    struct OccupancyWalk {
1083        nodes: usize,
1084        /// Non-root nodes left below `MIN_OCCUPIED_BYTES` — the #64
1085        /// residual-underflow population. Bounded and made visible
1086        /// here so a future regression that lets it grow unbounded
1087        /// (or that turns it into a correctness bug) is caught.
1088        residual_underflows: usize,
1089    }
1090
1091    /// Walk the whole tree from `root` with an explicit stack
1092    /// (Power-of-ten Rule 1: no recursion; Rule 2: the visit count is
1093    /// bounded by `pager.page_count()`, which strictly exceeds the
1094    /// node count). Counts nodes and non-root nodes that sit below the
1095    /// min-occupancy threshold. Empty/zero-slot leaves are NOT counted
1096    /// as underflows: an empty leaf root is the legitimate empty-tree
1097    /// state, and the merge path keeps non-root leaves from stranding
1098    /// at zero.
1099    fn walk_occupancy(pager: &mut Pager<FileHandle>, root: PageId) -> OccupancyWalk {
1100        let mut stack: Vec<(PageId, bool)> = vec![(root, true)];
1101        let mut nodes = 0usize;
1102        let mut residual_underflows = 0usize;
1103        let visit_budget = usize::try_from(pager.page_count())
1104            .unwrap_or(usize::MAX)
1105            .saturating_add(1);
1106        let mut visited = 0usize;
1107        while let Some((id, is_root)) = stack.pop() {
1108            visited += 1;
1109            assert!(
1110                visited <= visit_budget,
1111                "occupancy walk exceeded node bound"
1112            );
1113            let node = {
1114                let pr = pager.read_page(id).expect("read node");
1115                decode_node(pr.as_bytes()).expect("decode node")
1116            };
1117            nodes += 1;
1118            let occupied = node.occupied_bytes();
1119            let empty = match node.kind {
1120                NodeKind::Leaf => node.leaves.is_empty(),
1121                NodeKind::Internal => node.internals.is_empty(),
1122            };
1123            if !is_root && !empty && occupied < MIN_OCCUPIED_BYTES {
1124                residual_underflows += 1;
1125            }
1126            if matches!(node.kind, NodeKind::Internal) {
1127                for &child in &node.children {
1128                    let cid = PageId::new(child).expect("child id nonzero");
1129                    stack.push((cid, false));
1130                }
1131            }
1132        }
1133        OccupancyWalk {
1134            nodes,
1135            residual_underflows,
1136        }
1137    }
1138
1139    /// #64: after a delete-heavy, large-key workload (the distribution
1140    /// most likely to trip the merge-and-borrow-both-decline residual
1141    /// underflow), assert that:
1142    ///
1143    /// 1. the tree is still fully CORRECT — every surviving key
1144    ///    resolves to its expected value (this is the invariant a
1145    ///    cascade-handling regression must never break); and
1146    /// 2. the residual-underflow degradation is BOUNDED and VISIBLE —
1147    ///    the number of underflowing non-root nodes never exceeds the
1148    ///    node count (it cannot "leak" unboundedly) and is reported.
1149    ///
1150    /// The assertion is intentionally loose on the exact count (the
1151    /// snapshot shape is workload-dependent); its job is to make the
1152    /// degradation observable and to fail loudly if it ever stops
1153    /// being bounded by the node population.
1154    #[test]
1155    fn delete_rebalance_residual_underflow_is_bounded_and_correct() {
1156        for seed in 0..4u64 {
1157            let mut rng = ChaCha8Rng::seed_from_u64(seed);
1158            let mut pager = Pager::<FileHandle>::memory(config()).expect("pager");
1159            let mut tree = BTree::<FileHandle>::empty(&mut pager).expect("empty");
1160            let mut oracle: BTreeMap<Vec<u8>, Vec<u8>> = BTreeMap::new();
1161            // Grow tall with large keys, then delete-heavy to exercise
1162            // rebalance.
1163            for op in 0..20_000usize {
1164                let deleting = op >= 6_000 && rng.random_range(0u32..3) > 0;
1165                if deleting && !oracle.is_empty() {
1166                    let n = oracle.len();
1167                    let i = rng.random_range(0..n);
1168                    let cand = oracle.keys().nth(i).cloned().unwrap_or_default();
1169                    let want = oracle.remove(&cand).is_some();
1170                    let got = tree
1171                        .delete(&mut pager, &cand)
1172                        .unwrap_or_else(|e| panic!("seed {seed} op {op} del: {e:?}"));
1173                    assert_eq!(got, want, "seed {seed} op {op}: delete presence");
1174                } else {
1175                    let key = random_key_up_to_64(&mut rng);
1176                    let value = random_value_up_to_256(&mut rng);
1177                    if let std::collections::btree_map::Entry::Vacant(slot) =
1178                        oracle.entry(key.clone())
1179                    {
1180                        tree.insert(&mut pager, &key, &value)
1181                            .unwrap_or_else(|e| panic!("seed {seed} op {op} ins: {e:?}"));
1182                        slot.insert(value);
1183                    }
1184                }
1185            }
1186            // (1) Correctness: every surviving key resolves.
1187            for (k, v) in &oracle {
1188                assert_eq!(
1189                    tree.get(&mut pager, k).expect("get").as_ref(),
1190                    Some(v),
1191                    "seed {seed}: surviving key must resolve: {k:?}",
1192                );
1193            }
1194            // (2) Bounded + visible degradation.
1195            let root = tree.root();
1196            let walk = walk_occupancy(&mut pager, root);
1197            eprintln!(
1198                "OCCUPANCY #64 seed {seed}: nodes={} residual_underflows={}",
1199                walk.nodes, walk.residual_underflows,
1200            );
1201            assert!(
1202                walk.residual_underflows <= walk.nodes,
1203                "seed {seed}: residual underflows ({}) must stay bounded by \
1204                 node count ({}) — an unbounded count signals a rebalance \
1205                 regression",
1206                walk.residual_underflows,
1207                walk.nodes,
1208            );
1209        }
1210    }
1211}