Skip to main content

obj_core/btree/
delete.rs

1//! B+tree delete path with merge / borrow rebalancing.
2//!
3//! Mirrors `insert.rs`: every node touched is rewritten to a fresh
4//! page; pre-delete pages enter the freelist only after the new
5//! root is staged. The caller still owns the commit boundary —
6//! `BTree::delete` does NOT call `Pager::commit`.
7//!
8//! # Underflow policy
9//!
10//! A non-root node is "underflowing" when its post-delete occupied
11//! byte count falls below `MIN_OCCUPIED_BYTES = PAYLOAD_BYTES / 2`.
12//! At underflow:
13//! - If a sibling is rich enough that **moving one slot from it**
14//!   keeps both halves above the threshold, borrow.
15//! - Otherwise merge with the sibling. The two halves combine into
16//!   a single page; the separator pivot in the parent is removed
17//!   (leaf merge) or absorbed (internal merge).
18//!
19//! Root collapse: an internal root with exactly one child (zero
20//! pivots) is replaced by its child. A leaf root with zero slots
21//! is allowed and is the bootstrap "empty tree" state.
22//!
23//! # Power-of-ten posture
24//!
25//! - **Rule 1.** Iterative descent + iterative bubble-up; no
26//!   recursion.
27//! - **Rule 2.** Every loop is bounded by `MAX_BTREE_DEPTH` or a
28//!   node's slot count.
29//! - **Rule 5.** `validate_node` runs on every encoded node.
30
31#![forbid(unsafe_code)]
32
33use heapless::Vec as HeaplessVec;
34
35use crate::btree::insert::write_new_node;
36use crate::btree::node::{
37    decode_node, DecodedNode, InternalEntry, LeafEntry, NodeKind, NODE_HEADER_SIZE,
38};
39use crate::btree::{BTree, MAX_BTREE_DEPTH};
40use crate::error::{Error, Result};
41use crate::pager::page::{PageId, PAGE_SIZE, PAGE_TRAILER_SIZE};
42use crate::pager::Pager;
43use crate::platform::FileBackend;
44
45/// Available payload bytes (excludes the node header and the page
46/// trailer).
47const PAYLOAD_BYTES: usize = PAGE_SIZE - PAGE_TRAILER_SIZE - NODE_HEADER_SIZE;
48
49/// Underflow threshold: half-full. A non-root node whose
50/// `occupied_bytes()` drops below this is rebalanced via borrow or
51/// merge. This is the smallest threshold that keeps the B+tree's
52/// `O(log_F n)` guarantees with a worst-case fanout of 2.
53const MIN_OCCUPIED_BYTES: usize = PAYLOAD_BYTES / 2;
54
55struct PathFrame {
56    page_id: PageId,
57    node: DecodedNode,
58    /// Child index this frame's descent followed.
59    child_index: usize,
60}
61
62/// Outcome of replacing one node along the delete path: a fresh
63/// page-id and an "underflow" hint the parent uses to decide
64/// whether to rebalance.
65#[derive(Debug, Clone, Copy)]
66struct ReplaceOutcome {
67    new_id: PageId,
68    /// Whether the new node falls below the underflow threshold.
69    /// The parent inspects this to decide whether to borrow or
70    /// merge.
71    underflow: bool,
72}
73
74impl<F: FileBackend> BTree<F> {
75    /// Remove `key` from the tree. Returns `Ok(true)` if it was
76    /// present (and removed), `Ok(false)` if the key was absent.
77    ///
78    /// # Errors
79    ///
80    /// Propagates pager / decode errors. Surfaces
81    /// `Error::BTreeDepthExceeded` if the tree height exceeds
82    /// `MAX_BTREE_DEPTH`. No `panic`s.
83    pub fn delete(&mut self, pager: &mut Pager<F>, key: &[u8]) -> Result<bool> {
84        let path = self.descend_path_for_delete(pager, key)?;
85        self.apply_delete(pager, path, key)
86    }
87
88    fn descend_path_for_delete(
89        &self,
90        pager: &mut Pager<F>,
91        key: &[u8],
92    ) -> Result<HeaplessVec<PathFrame, MAX_BTREE_DEPTH>> {
93        let mut path: HeaplessVec<PathFrame, MAX_BTREE_DEPTH> = HeaplessVec::new();
94        let mut current = self.root;
95        loop {
96            let decoded = {
97                let page_ref = pager.read_page(current)?;
98                decode_node(page_ref.as_bytes())?
99            };
100            match decoded.kind {
101                NodeKind::Leaf => {
102                    let frame = PathFrame {
103                        page_id: current,
104                        node: decoded,
105                        child_index: 0,
106                    };
107                    push_path(&mut path, frame)?;
108                    return Ok(path);
109                }
110                NodeKind::Internal => {
111                    let child_index = pivot_index(&decoded, key);
112                    let next = PageId::new(decoded.children[child_index]).ok_or(
113                        Error::BTreeInvariantViolated {
114                            reason: "internal node had zero child page-id",
115                        },
116                    )?;
117                    let frame = PathFrame {
118                        page_id: current,
119                        node: decoded,
120                        child_index,
121                    };
122                    push_path(&mut path, frame)?;
123                    current = next;
124                }
125            }
126        }
127    }
128
129    fn apply_delete(
130        &mut self,
131        pager: &mut Pager<F>,
132        mut path: HeaplessVec<PathFrame, MAX_BTREE_DEPTH>,
133        key: &[u8],
134    ) -> Result<bool> {
135        let mut freed: HeaplessVec<PageId, { MAX_BTREE_DEPTH * 4 }> = HeaplessVec::new();
136        let Some(leaf_frame) = path.pop() else {
137            return Err(Error::BTreeInvariantViolated {
138                reason: "delete: descend returned empty path",
139            });
140        };
141        let found = leaf_frame
142            .node
143            .leaves
144            .iter()
145            .any(|e| e.key.as_slice() == key);
146        if !found {
147            return Ok(false);
148        }
149        let leaf_outcome = remove_from_leaf(pager, leaf_frame, key, &mut freed)?;
150        let mut outcome = leaf_outcome;
151        while let Some(parent_frame) = path.pop() {
152            outcome = process_parent(pager, parent_frame, outcome, &mut freed, &mut path)?;
153        }
154        self.root = outcome.new_id;
155        // Try collapsing a single-child internal root into its child.
156        self.collapse_root(pager, &mut freed)?;
157        for old_id in freed.iter().copied() {
158            pager.free_page(old_id)?;
159        }
160        Ok(true)
161    }
162
163    /// If the current root is an internal node with exactly one
164    /// child (zero pivots), replace it with that child and free the
165    /// old root. Idempotent for any other root shape.
166    fn collapse_root(
167        &mut self,
168        pager: &mut Pager<F>,
169        freed: &mut HeaplessVec<PageId, { MAX_BTREE_DEPTH * 4 }>,
170    ) -> Result<()> {
171        loop {
172            let root_id = self.root;
173            let decoded = {
174                let page_ref = pager.read_page(root_id)?;
175                decode_node(page_ref.as_bytes())?
176            };
177            if !matches!(decoded.kind, NodeKind::Internal) {
178                return Ok(());
179            }
180            if !decoded.internals.is_empty() || decoded.children.len() != 1 {
181                return Ok(());
182            }
183            let only_child =
184                PageId::new(decoded.children[0]).ok_or(Error::BTreeInvariantViolated {
185                    reason: "collapse_root: zero child page-id",
186                })?;
187            self.root = only_child;
188            push_freed(freed, root_id)?;
189        }
190    }
191}
192
193fn push_path(path: &mut HeaplessVec<PathFrame, MAX_BTREE_DEPTH>, frame: PathFrame) -> Result<()> {
194    path.push(frame).map_err(|_| Error::BTreeDepthExceeded {
195        limit: MAX_BTREE_DEPTH,
196    })
197}
198
199fn push_freed(freed: &mut HeaplessVec<PageId, { MAX_BTREE_DEPTH * 4 }>, id: PageId) -> Result<()> {
200    freed.push(id).map_err(|_| Error::BTreeInvariantViolated {
201        reason: "delete: too many displaced pages to track",
202    })
203}
204
205fn pivot_index(node: &DecodedNode, key: &[u8]) -> usize {
206    let mut idx = node.internals.len();
207    for (i, pivot) in node.internals.iter().enumerate() {
208        if pivot.key.as_slice() > key {
209            idx = i;
210            break;
211        }
212    }
213    idx
214}
215
216/// Remove the slot whose key equals `key` from `frame.node` (a
217/// leaf), rewrite it as a fresh page, and report underflow.
218fn remove_from_leaf<F: FileBackend>(
219    pager: &mut Pager<F>,
220    frame: PathFrame,
221    key: &[u8],
222    freed: &mut HeaplessVec<PageId, { MAX_BTREE_DEPTH * 4 }>,
223) -> Result<ReplaceOutcome> {
224    let mut leaf = frame.node;
225    let Some(idx) = leaf.leaves.iter().position(|e| e.key.as_slice() == key) else {
226        return Err(Error::BTreeInvariantViolated {
227            reason: "remove_from_leaf: descend located absent key",
228        });
229    };
230    leaf.leaves.remove(idx);
231    let occupied = leaf.occupied_bytes();
232    let underflow = occupied < MIN_OCCUPIED_BYTES;
233    push_freed(freed, frame.page_id)?;
234    let new_id = write_new_node(pager, &leaf)?;
235    Ok(ReplaceOutcome { new_id, underflow })
236}
237
238/// Apply the child's outcome to a parent frame, possibly
239/// rebalancing on underflow.
240fn process_parent<F: FileBackend>(
241    pager: &mut Pager<F>,
242    frame: PathFrame,
243    child_outcome: ReplaceOutcome,
244    freed: &mut HeaplessVec<PageId, { MAX_BTREE_DEPTH * 4 }>,
245    _path: &mut HeaplessVec<PathFrame, MAX_BTREE_DEPTH>,
246) -> Result<ReplaceOutcome> {
247    let mut internal = frame.node;
248    let child_index = frame.child_index;
249    internal.children[child_index] = child_outcome.new_id.get();
250    if child_outcome.underflow {
251        rebalance_under_parent(pager, &mut internal, child_index, freed)?;
252    }
253    finalize_parent(pager, &internal, frame.page_id, freed)
254}
255
256/// Decide whether the rebalanced parent itself underflows, encode
257/// it, and return the outcome.
258fn finalize_parent<F: FileBackend>(
259    pager: &mut Pager<F>,
260    internal: &DecodedNode,
261    old_id: PageId,
262    freed: &mut HeaplessVec<PageId, { MAX_BTREE_DEPTH * 4 }>,
263) -> Result<ReplaceOutcome> {
264    let occupied = internal.occupied_bytes();
265    let underflow = occupied < MIN_OCCUPIED_BYTES;
266    push_freed(freed, old_id)?;
267    let new_id = write_new_node(pager, internal)?;
268    Ok(ReplaceOutcome { new_id, underflow })
269}
270
271/// Rebalance an underflowing child of `internal` at `child_index`.
272/// Tries to merge first (one fewer page in the tree, better for
273/// cache locality); falls back to borrow if merge would overflow.
274///
275/// # Residual-underflow policy (#64) — best-effort, bounded, NOT a
276/// correctness bug
277///
278/// In four ordered attempts (merge-right, merge-left, borrow-right,
279/// borrow-left) this resolves the underflow in every ordinary case.
280/// A *residual* underflow — where ALL FOUR attempts decline — can
281/// only arise with variable-length keys, and only when both of these
282/// hold simultaneously for every available sibling:
283///
284/// 1. **Merge declines**: combined `occupied_bytes` of the child, the
285///    sibling, and (for internal nodes) the descended separator pivot
286///    would exceed `PAYLOAD_BYTES` — i.e. the sibling is too FULL to
287///    absorb the child; and
288/// 2. **Borrow declines**: either the sibling cannot spare a slot and
289///    stay `>= MIN_OCCUPIED_BYTES` ([`sibling_has_spare`]), or moving
290///    its boundary slot up would install a separator pivot LARGER than
291///    the one it replaces and push the PARENT past `PAYLOAD_BYTES`
292///    ([`parent_fits_after_pivot_swap`]). The latter is the
293///    "borrow-grows-parent" hazard the #29 large-key oracle pinned.
294///
295/// When that conjunction holds we deliberately leave the child below
296/// `MIN_OCCUPIED_BYTES`. This is sound:
297///
298/// - **Correctness is preserved.** The child still holds every key it
299///   should; the parent's pivots and child pointers stay consistent;
300///   `get` / range-scan / subsequent `insert` and `delete` all remain
301///   correct. The B+tree oracle (`btree_oracle`, `index_oracle`) only
302///   observes externally-visible state and would not — and must not —
303///   change. The ONLY degraded property is the *minimum-occupancy*
304///   guarantee on that one node.
305/// - **The degradation is bounded.** At most one node per
306///   delete-rebalance bubble-up frame can be left underflowing, and a
307///   later `insert` into that subtree, or a `delete` that makes a
308///   sibling spare a slot, repairs the shape. It does not compound
309///   into emptiness: a node that reaches zero slots is handled by the
310///   merge path (its sibling can always absorb a zero-occupancy node),
311///   and the root-collapse path handles a single-child root.
312///
313/// Implementing a full cascade here (recursively splitting the
314/// over-full sibling, or borrow-then-rebalance-the-parent) was
315/// considered and rejected for this change: it is materially more
316/// invasive and risks a B-tree CORRECTNESS regression, which is a far
317/// worse failure mode than a transient occupancy dip. The
318/// `delete_rebalance_residual_underflow_is_bounded_and_correct` test
319/// makes the degradation visible and asserts it stays bounded.
320fn rebalance_under_parent<F: FileBackend>(
321    pager: &mut Pager<F>,
322    internal: &mut DecodedNode,
323    child_index: usize,
324    freed: &mut HeaplessVec<PageId, { MAX_BTREE_DEPTH * 4 }>,
325) -> Result<()> {
326    // Try merge first: prefer the right sibling.
327    if child_index + 1 < internal.children.len()
328        && try_merge(pager, internal, child_index, MergeDirection::Right, freed)?
329    {
330        return Ok(());
331    }
332    if child_index > 0 && try_merge(pager, internal, child_index, MergeDirection::Left, freed)? {
333        return Ok(());
334    }
335    // Merge couldn't fit; fall back to a borrow. Borrow is best-
336    // effort: if it does not occur the underflow persists for now
337    // (see the residual-underflow policy in the doc comment above).
338    if child_index + 1 < internal.children.len()
339        && try_borrow_from_right(pager, internal, child_index, freed)?
340    {
341        return Ok(());
342    }
343    if child_index > 0 && try_borrow_from_left(pager, internal, child_index, freed)? {
344        return Ok(());
345    }
346    Ok(())
347}
348
349#[derive(Debug, Clone, Copy)]
350enum MergeDirection {
351    Left,
352    Right,
353}
354
355/// Attempt to merge with the indicated sibling. Returns `Ok(true)`
356/// if the merge happened; `Ok(false)` if combined sizes would not
357/// fit in a single page and the merge was abandoned.
358fn try_merge<F: FileBackend>(
359    pager: &mut Pager<F>,
360    internal: &mut DecodedNode,
361    child_index: usize,
362    direction: MergeDirection,
363    freed: &mut HeaplessVec<PageId, { MAX_BTREE_DEPTH * 4 }>,
364) -> Result<bool> {
365    let (left_idx, right_idx) = match direction {
366        MergeDirection::Right => (child_index, child_index + 1),
367        MergeDirection::Left => (child_index - 1, child_index),
368    };
369    let left_id = pid(internal.children[left_idx])?;
370    let right_id = pid(internal.children[right_idx])?;
371    let left = read_node(pager, left_id)?;
372    let right = read_node(pager, right_id)?;
373    let separator_bytes = match left.kind {
374        NodeKind::Leaf => 0,
375        NodeKind::Internal => {
376            use crate::btree::node::{varint_len, INTERNAL_SLOT_BYTES};
377            let pivot = &internal.internals[left_idx].key;
378            INTERNAL_SLOT_BYTES + varint_len(pivot.len() as u64) + pivot.len()
379        }
380    };
381    let combined = left.occupied_bytes() + right.occupied_bytes() + separator_bytes
382        - match left.kind {
383            NodeKind::Internal => crate::btree::node::INTERNAL_LEFTMOST_CHILD_BYTES,
384            NodeKind::Leaf => 0,
385        };
386    if combined > PAYLOAD_BYTES {
387        return Ok(false);
388    }
389    drop(left);
390    drop(right);
391    match direction {
392        MergeDirection::Right => merge_with_right(pager, internal, child_index, freed)?,
393        MergeDirection::Left => merge_with_left(pager, internal, child_index, freed)?,
394    }
395    Ok(true)
396}
397
398/// Attempt to borrow one slot from the right sibling. Returns
399/// `Ok(true)` if the borrow occurred.
400///
401/// A borrow REPLACES the separator pivot in the parent: the new
402/// pivot can be larger than the old one, growing the parent. If
403/// that growth would push the parent past `PAYLOAD_BYTES` the borrow
404/// is abandoned (returns `Ok(false)`) so the caller can fall through
405/// to the "leave the underflowing child as-is" branch — better than
406/// raising a hard encode error for a delete-rebalance edge case.
407fn try_borrow_from_right<F: FileBackend>(
408    pager: &mut Pager<F>,
409    internal: &mut DecodedNode,
410    child_index: usize,
411    freed: &mut HeaplessVec<PageId, { MAX_BTREE_DEPTH * 4 }>,
412) -> Result<bool> {
413    let child_id = pid(internal.children[child_index])?;
414    let right_id = pid(internal.children[child_index + 1])?;
415    let mut child = read_node(pager, child_id)?;
416    let mut right = read_node(pager, right_id)?;
417    if !sibling_has_spare(&right) {
418        return Ok(false);
419    }
420    let new_pivot = pick_borrow_right_pivot(&right, child.kind)?;
421    if !parent_fits_after_pivot_swap(internal, child_index, &new_pivot) {
422        return Ok(false);
423    }
424    apply_borrow_from_right(internal, &mut child, &mut right, child_index, &new_pivot)?;
425    push_freed(freed, child_id)?;
426    push_freed(freed, right_id)?;
427    internal.children[child_index] = write_new_node(pager, &child)?.get();
428    internal.children[child_index + 1] = write_new_node(pager, &right)?.get();
429    Ok(true)
430}
431
432/// Choose the key that will be promoted from the right sibling to
433/// become the parent's new separator pivot.
434fn pick_borrow_right_pivot(right: &DecodedNode, child_kind: NodeKind) -> Result<Vec<u8>> {
435    match (child_kind, right.kind) {
436        (NodeKind::Leaf, NodeKind::Leaf) => {
437            right
438                .leaves
439                .get(1)
440                .map(|e| e.key.clone())
441                .ok_or(Error::BTreeInvariantViolated {
442                    reason: "borrow_from_right: right leaf has < 2 entries",
443                })
444        }
445        (NodeKind::Internal, NodeKind::Internal) => right
446            .internals
447            .first()
448            .map(|p| p.key.clone())
449            .ok_or(Error::BTreeInvariantViolated {
450                reason: "borrow_from_right: right internal has no pivots",
451            }),
452        _ => Err(Error::BTreeInvariantViolated {
453            reason: "borrow_from_right: mixed kinds",
454        }),
455    }
456}
457
458/// Mutate `child`, `right`, and `internal` in place to complete the
459/// borrow once the parent-fit check has passed.
460fn apply_borrow_from_right(
461    internal: &mut DecodedNode,
462    child: &mut DecodedNode,
463    right: &mut DecodedNode,
464    child_index: usize,
465    new_pivot: &[u8],
466) -> Result<()> {
467    match (child.kind, right.kind) {
468        (NodeKind::Leaf, NodeKind::Leaf) => {
469            let entry = right.leaves.remove(0);
470            child.leaves.push(entry);
471        }
472        (NodeKind::Internal, NodeKind::Internal) => {
473            let pivot_down = internal.internals[child_index].key.clone();
474            right.internals.remove(0);
475            let moved_child = right.children.remove(0);
476            child.internals.push(InternalEntry { key: pivot_down });
477            child.children.push(moved_child);
478        }
479        _ => {
480            return Err(Error::BTreeInvariantViolated {
481                reason: "borrow_from_right: mixed kinds",
482            });
483        }
484    }
485    internal.internals[child_index].key = new_pivot.to_vec();
486    Ok(())
487}
488
489/// Attempt to borrow one slot from the left sibling. Same parent-
490/// fit check as [`try_borrow_from_right`].
491fn try_borrow_from_left<F: FileBackend>(
492    pager: &mut Pager<F>,
493    internal: &mut DecodedNode,
494    child_index: usize,
495    freed: &mut HeaplessVec<PageId, { MAX_BTREE_DEPTH * 4 }>,
496) -> Result<bool> {
497    let child_id = pid(internal.children[child_index])?;
498    let left_id = pid(internal.children[child_index - 1])?;
499    let mut child = read_node(pager, child_id)?;
500    let mut left = read_node(pager, left_id)?;
501    if !sibling_has_spare(&left) {
502        return Ok(false);
503    }
504    let new_pivot = pick_borrow_left_pivot(&left, child.kind)?;
505    if !parent_fits_after_pivot_swap(internal, child_index - 1, &new_pivot) {
506        return Ok(false);
507    }
508    apply_borrow_from_left(internal, &mut child, &mut left, child_index, &new_pivot)?;
509    push_freed(freed, child_id)?;
510    push_freed(freed, left_id)?;
511    internal.children[child_index - 1] = write_new_node(pager, &left)?.get();
512    internal.children[child_index] = write_new_node(pager, &child)?.get();
513    Ok(true)
514}
515
516fn pick_borrow_left_pivot(left: &DecodedNode, child_kind: NodeKind) -> Result<Vec<u8>> {
517    match (child_kind, left.kind) {
518        (NodeKind::Leaf, NodeKind::Leaf) => {
519            left.leaves
520                .last()
521                .map(|e| e.key.clone())
522                .ok_or(Error::BTreeInvariantViolated {
523                    reason: "borrow_from_left: empty left leaf",
524                })
525        }
526        (NodeKind::Internal, NodeKind::Internal) => left
527            .internals
528            .last()
529            .map(|p| p.key.clone())
530            .ok_or(Error::BTreeInvariantViolated {
531                reason: "borrow_from_left: empty left pivots",
532            }),
533        _ => Err(Error::BTreeInvariantViolated {
534            reason: "borrow_from_left: mixed kinds",
535        }),
536    }
537}
538
539fn apply_borrow_from_left(
540    internal: &mut DecodedNode,
541    child: &mut DecodedNode,
542    left: &mut DecodedNode,
543    child_index: usize,
544    new_pivot: &[u8],
545) -> Result<()> {
546    match (child.kind, left.kind) {
547        (NodeKind::Leaf, NodeKind::Leaf) => {
548            let entry = left.leaves.pop().ok_or(Error::BTreeInvariantViolated {
549                reason: "borrow_from_left: empty left leaf",
550            })?;
551            child.leaves.insert(0, entry);
552        }
553        (NodeKind::Internal, NodeKind::Internal) => {
554            let pivot_down = internal.internals[child_index - 1].key.clone();
555            left.internals.pop().ok_or(Error::BTreeInvariantViolated {
556                reason: "borrow_from_left: empty left pivots",
557            })?;
558            let moved_child = left.children.pop().ok_or(Error::BTreeInvariantViolated {
559                reason: "borrow_from_left: empty left children",
560            })?;
561            child.internals.insert(0, InternalEntry { key: pivot_down });
562            child.children.insert(0, moved_child);
563        }
564        _ => {
565            return Err(Error::BTreeInvariantViolated {
566                reason: "borrow_from_left: mixed kinds",
567            });
568        }
569    }
570    internal.internals[child_index - 1].key = new_pivot.to_vec();
571    Ok(())
572}
573
574/// Compute whether the parent internal node would still fit in
575/// `PAYLOAD_BYTES` after swapping the pivot at `pivot_index` for a
576/// new pivot of `new_key`. Borrow may grow the parent if
577/// `new_key.len() + varint_len(new_key.len())` exceeds the same for
578/// the old pivot.
579fn parent_fits_after_pivot_swap(parent: &DecodedNode, pivot_index: usize, new_key: &[u8]) -> bool {
580    use crate::btree::node::varint_len;
581    let Some(old_pivot) = parent.internals.get(pivot_index) else {
582        return false;
583    };
584    let old_entry = varint_len(old_pivot.key.len() as u64) + old_pivot.key.len();
585    let new_entry = varint_len(new_key.len() as u64) + new_key.len();
586    let parent_occupied = parent.occupied_bytes();
587    parent_occupied + new_entry <= PAYLOAD_BYTES + old_entry
588}
589
590/// Merge the child at `child_index` with its right sibling.
591/// Removes the separator pivot and the right child id from the
592/// parent.
593fn merge_with_right<F: FileBackend>(
594    pager: &mut Pager<F>,
595    internal: &mut DecodedNode,
596    child_index: usize,
597    freed: &mut HeaplessVec<PageId, { MAX_BTREE_DEPTH * 4 }>,
598) -> Result<()> {
599    let child_id = pid(internal.children[child_index])?;
600    let right_id = pid(internal.children[child_index + 1])?;
601    let mut child = read_node(pager, child_id)?;
602    let mut right = read_node(pager, right_id)?;
603    match (child.kind, right.kind) {
604        (NodeKind::Leaf, NodeKind::Leaf) => {
605            child.leaves.append(&mut right.leaves);
606            child.next_sibling = right.next_sibling;
607        }
608        (NodeKind::Internal, NodeKind::Internal) => {
609            let pivot_down = internal.internals[child_index].key.clone();
610            child.internals.push(InternalEntry { key: pivot_down });
611            child.internals.append(&mut right.internals);
612            child.children.append(&mut right.children);
613        }
614        _ => {
615            return Err(Error::BTreeInvariantViolated {
616                reason: "merge_with_right: mixed kinds",
617            });
618        }
619    }
620    internal.internals.remove(child_index);
621    internal.children.remove(child_index + 1);
622    push_freed(freed, child_id)?;
623    push_freed(freed, right_id)?;
624    internal.children[child_index] = write_new_node(pager, &child)?.get();
625    Ok(())
626}
627
628/// Merge the child at `child_index` with its left sibling. The
629/// merged node lives in the left's slot; the parent's separator
630/// pivot and the right (current) child id are removed.
631fn merge_with_left<F: FileBackend>(
632    pager: &mut Pager<F>,
633    internal: &mut DecodedNode,
634    child_index: usize,
635    freed: &mut HeaplessVec<PageId, { MAX_BTREE_DEPTH * 4 }>,
636) -> Result<()> {
637    let left_id = pid(internal.children[child_index - 1])?;
638    let child_id = pid(internal.children[child_index])?;
639    let mut left = read_node(pager, left_id)?;
640    let mut child = read_node(pager, child_id)?;
641    match (left.kind, child.kind) {
642        (NodeKind::Leaf, NodeKind::Leaf) => {
643            left.leaves.append(&mut child.leaves);
644            left.next_sibling = child.next_sibling;
645        }
646        (NodeKind::Internal, NodeKind::Internal) => {
647            let pivot_down = internal.internals[child_index - 1].key.clone();
648            left.internals.push(InternalEntry { key: pivot_down });
649            left.internals.append(&mut child.internals);
650            left.children.append(&mut child.children);
651        }
652        _ => {
653            return Err(Error::BTreeInvariantViolated {
654                reason: "merge_with_left: mixed kinds",
655            });
656        }
657    }
658    internal.internals.remove(child_index - 1);
659    internal.children.remove(child_index);
660    push_freed(freed, left_id)?;
661    push_freed(freed, child_id)?;
662    internal.children[child_index - 1] = write_new_node(pager, &left)?.get();
663    Ok(())
664}
665
666fn sibling_has_spare(node: &DecodedNode) -> bool {
667    // We borrow if removing one slot keeps the sibling at or above
668    // the underflow threshold. Using `occupied_bytes` as the size
669    // proxy means we always check after a hypothetical removal.
670    let one_slot_bytes = one_slot_size(node);
671    let after_borrow = node.occupied_bytes().saturating_sub(one_slot_bytes);
672    after_borrow >= MIN_OCCUPIED_BYTES
673}
674
675/// Approximate the byte footprint of one slot in `node`. Used by
676/// the underflow / borrow heuristic.
677fn one_slot_size(node: &DecodedNode) -> usize {
678    use crate::btree::node::{varint_len, INTERNAL_SLOT_BYTES, LEAF_SLOT_BYTES};
679    match node.kind {
680        NodeKind::Leaf => {
681            let Some(e) = node.leaves.first() else {
682                return 0;
683            };
684            LEAF_SLOT_BYTES
685                + varint_len(e.key.len() as u64)
686                + e.key.len()
687                + varint_len(e.value.len() as u64)
688                + e.value.len()
689        }
690        NodeKind::Internal => {
691            let Some(e) = node.internals.first() else {
692                return 0;
693            };
694            INTERNAL_SLOT_BYTES + varint_len(e.key.len() as u64) + e.key.len()
695        }
696    }
697}
698
699fn read_node<F: FileBackend>(pager: &mut Pager<F>, id: PageId) -> Result<DecodedNode> {
700    let page_ref = pager.read_page(id)?;
701    decode_node(page_ref.as_bytes())
702}
703
704fn pid(raw: u64) -> Result<PageId> {
705    PageId::new(raw).ok_or(Error::BTreeInvariantViolated {
706        reason: "child page-id was zero",
707    })
708}
709
710// Keep the LeafEntry import referenced even when no leaf-specific
711// helper actually constructs one (every leaf entry is taken from a
712// decoded node and moved/cloned around).
713const _: fn(LeafEntry) = drop;
714
715#[cfg(test)]
716mod tests {
717    use super::*;
718    use crate::pager::{Config, Pager};
719    use crate::platform::FileHandle;
720
721    use rand::Rng;
722    use rand::SeedableRng;
723    use rand_chacha::ChaCha8Rng;
724    use std::collections::BTreeMap;
725
726    fn config() -> Config {
727        Config::default()
728    }
729
730    #[test]
731    fn delete_absent_returns_false() {
732        let mut pager = Pager::<FileHandle>::memory(config()).expect("pager");
733        let mut tree = BTree::<FileHandle>::empty(&mut pager).expect("empty");
734        let removed = tree.delete(&mut pager, b"missing").expect("del");
735        assert!(!removed);
736    }
737
738    #[test]
739    fn delete_single_key_round_trip() {
740        let mut pager = Pager::<FileHandle>::memory(config()).expect("pager");
741        let mut tree = BTree::<FileHandle>::empty(&mut pager).expect("empty");
742        tree.insert(&mut pager, b"k", b"v").expect("ins");
743        assert!(tree.delete(&mut pager, b"k").expect("del"));
744        assert_eq!(tree.get(&mut pager, b"k").expect("get"), None);
745    }
746
747    #[test]
748    fn delete_collapses_tall_tree_back_to_one_level() {
749        let mut pager = Pager::<FileHandle>::memory(config()).expect("pager");
750        let mut tree = BTree::<FileHandle>::empty(&mut pager).expect("empty");
751        let value = vec![0xCDu8; 256];
752        for i in 0..200u32 {
753            let key = format!("key-{i:08}");
754            tree.insert(&mut pager, key.as_bytes(), &value)
755                .expect("ins");
756        }
757        // Confirm we grew tall.
758        let root = tree.root();
759        let level_before = {
760            let pr = pager.read_page(root).expect("read");
761            decode_node(pr.as_bytes()).expect("dec").level
762        };
763        assert!(level_before >= 1);
764        // Delete all but one key.
765        for i in 0..199u32 {
766            let key = format!("key-{i:08}");
767            assert!(tree.delete(&mut pager, key.as_bytes()).expect("del"));
768        }
769        let final_root = tree.root();
770        let level_after = {
771            let pr = pager.read_page(final_root).expect("read");
772            decode_node(pr.as_bytes()).expect("dec").level
773        };
774        assert!(
775            level_after < level_before,
776            "tree should have collapsed (before {level_before}, after {level_after})"
777        );
778        let leftover_key = format!("key-{i:08}", i = 199);
779        assert_eq!(
780            tree.get(&mut pager, leftover_key.as_bytes()).expect("get"),
781            Some(value.clone())
782        );
783    }
784
785    #[test]
786    fn insert_delete_oracle_10k() {
787        for seed in 0..3u64 {
788            run_oracle(seed, 10_000);
789        }
790    }
791
792    /// Regression for the borrow-grows-parent bug surfaced by the
793    /// 1M-op oracle (#29). With random keys up to 64 bytes a borrow
794    /// during delete-rebalance can install a new separator pivot in
795    /// the parent that is much larger than the one it replaced,
796    /// pushing the parent past `PAYLOAD_BYTES` and causing the
797    /// subsequent encode to fail with the "slot dir and heap
798    /// collide" invariant. Drive 30 000 mixed insert/delete ops with
799    /// the same key-length distribution as #29 to flush out the
800    /// edge case without paying the 1M-op cost in the unit-test
801    /// harness.
802    #[test]
803    fn insert_delete_oracle_large_keys_30k() {
804        for seed in 0..3u64 {
805            run_oracle_with(seed, 30_000, random_key_up_to_64, random_value_up_to_256);
806        }
807    }
808
809    fn random_key_up_to_64(rng: &mut ChaCha8Rng) -> Vec<u8> {
810        let len = rng.random_range(1u32..=64);
811        (0..len).map(|_| rng.random_range(b'a'..=b'z')).collect()
812    }
813
814    fn random_value_up_to_256(rng: &mut ChaCha8Rng) -> Vec<u8> {
815        let len = rng.random_range(0u32..=256);
816        (0..len).map(|_| rng.random()).collect()
817    }
818
819    fn run_oracle_with(
820        seed: u64,
821        ops: usize,
822        gen_key: fn(&mut ChaCha8Rng) -> Vec<u8>,
823        gen_value: fn(&mut ChaCha8Rng) -> Vec<u8>,
824    ) {
825        let mut rng = ChaCha8Rng::seed_from_u64(seed);
826        let mut pager = Pager::<FileHandle>::memory(config()).expect("pager");
827        let mut tree = BTree::<FileHandle>::empty(&mut pager).expect("empty");
828        let mut oracle: BTreeMap<Vec<u8>, Vec<u8>> = BTreeMap::new();
829        for op in 0..ops {
830            // Balance insert/delete so the tree grows tall (>= 2
831            // levels) and the parent-fit edge case is exercised.
832            let pick = rng.random_range(0u32..3);
833            if pick < 2 {
834                let key = gen_key(&mut rng);
835                let value = gen_value(&mut rng);
836                if let std::collections::btree_map::Entry::Vacant(slot) = oracle.entry(key.clone())
837                {
838                    tree.insert(&mut pager, &key, &value)
839                        .unwrap_or_else(|e| panic!("seed {seed} op {op} ins: {e:?}"));
840                    slot.insert(value);
841                }
842            } else if !oracle.is_empty() {
843                let pick_in_oracle = rng.random_range(0u32..5) > 0;
844                let candidate = if pick_in_oracle {
845                    let n = oracle.len();
846                    let i = rng.random_range(0..n);
847                    oracle.keys().nth(i).cloned().unwrap_or_default()
848                } else {
849                    gen_key(&mut rng)
850                };
851                let want = oracle.remove(&candidate).is_some();
852                let got = tree
853                    .delete(&mut pager, &candidate)
854                    .unwrap_or_else(|e| panic!("seed {seed} op {op} del: {e:?}"));
855                assert_eq!(got, want, "seed {seed} op {op}: delete-presence disagrees");
856            }
857        }
858        // Final consistency: every oracle key resolves.
859        for (k, v) in &oracle {
860            assert_eq!(
861                tree.get(&mut pager, k).expect("get").as_ref(),
862                Some(v),
863                "seed {seed} final: key {k:?}"
864            );
865        }
866    }
867
868    fn run_oracle(seed: u64, ops: usize) {
869        let mut rng = ChaCha8Rng::seed_from_u64(seed);
870        let mut pager = Pager::<FileHandle>::memory(config()).expect("pager");
871        let mut tree = BTree::<FileHandle>::empty(&mut pager).expect("empty");
872        let mut oracle: BTreeMap<Vec<u8>, Vec<u8>> = BTreeMap::new();
873        for op in 0..ops {
874            let pick = rng.random_range(0u32..3);
875            if pick < 2 {
876                oracle_step_insert(&mut pager, &mut tree, &mut oracle, &mut rng, seed, op);
877            } else {
878                oracle_step_delete(&mut pager, &mut tree, &mut oracle, &mut rng, seed, op);
879            }
880            if op.is_multiple_of(257) {
881                oracle_sample_check(&mut pager, &tree, &oracle, seed, op);
882            }
883        }
884        // Final pass: every oracle key resolves; tree returns None
885        // for a few synthesized absent keys.
886        for (k, v) in &oracle {
887            assert_eq!(
888                tree.get(&mut pager, k).expect("get").as_ref(),
889                Some(v),
890                "seed {seed} final: key {k:?}"
891            );
892        }
893    }
894
895    fn oracle_step_insert(
896        pager: &mut Pager<FileHandle>,
897        tree: &mut BTree<FileHandle>,
898        oracle: &mut BTreeMap<Vec<u8>, Vec<u8>>,
899        rng: &mut ChaCha8Rng,
900        seed: u64,
901        op: usize,
902    ) {
903        let key = random_key(rng);
904        let value = random_value(rng);
905        match oracle.entry(key.clone()) {
906            std::collections::btree_map::Entry::Occupied(existing) => {
907                assert_eq!(
908                    tree.get(pager, &key).expect("get").as_ref(),
909                    Some(existing.get()),
910                    "seed {seed} op {op}: existing key disagrees"
911                );
912            }
913            std::collections::btree_map::Entry::Vacant(slot) => {
914                tree.insert(pager, &key, &value)
915                    .unwrap_or_else(|e| panic!("seed {seed} op {op} ins: {e:?}"));
916                slot.insert(value);
917            }
918        }
919    }
920
921    fn oracle_step_delete(
922        pager: &mut Pager<FileHandle>,
923        tree: &mut BTree<FileHandle>,
924        oracle: &mut BTreeMap<Vec<u8>, Vec<u8>>,
925        rng: &mut ChaCha8Rng,
926        seed: u64,
927        op: usize,
928    ) {
929        let candidate = if !oracle.is_empty() && rng.random_range(0u32..4) > 0 {
930            let n = oracle.len();
931            let pick = rng.random_range(0..n);
932            oracle.keys().nth(pick).cloned().unwrap_or_default()
933        } else {
934            random_key(rng)
935        };
936        let want = oracle.remove(&candidate).is_some();
937        let got = tree
938            .delete(pager, &candidate)
939            .unwrap_or_else(|e| panic!("seed {seed} op {op} del: {e:?}"));
940        assert_eq!(got, want, "seed {seed} op {op}: delete-presence disagrees");
941    }
942
943    fn oracle_sample_check(
944        pager: &mut Pager<FileHandle>,
945        tree: &BTree<FileHandle>,
946        oracle: &BTreeMap<Vec<u8>, Vec<u8>>,
947        seed: u64,
948        op: usize,
949    ) {
950        let mut sample_keys: Vec<&Vec<u8>> = oracle.keys().take(5).collect();
951        sample_keys.extend(oracle.keys().rev().take(5));
952        for k in sample_keys {
953            assert_eq!(
954                tree.get(pager, k).expect("get").as_ref(),
955                oracle.get(k),
956                "seed {seed} op {op}: mid-run get disagrees on key {k:?}"
957            );
958        }
959    }
960
961    fn random_key(rng: &mut ChaCha8Rng) -> Vec<u8> {
962        let len = rng.random_range(1..16);
963        (0..len).map(|_| rng.random_range(b'a'..=b'z')).collect()
964    }
965
966    fn random_value(rng: &mut ChaCha8Rng) -> Vec<u8> {
967        let len = rng.random_range(0..64);
968        (0..len).map(|_| rng.random()).collect()
969    }
970
971    /// Result of walking every node in a tree once.
972    struct OccupancyWalk {
973        nodes: usize,
974        /// Non-root nodes left below `MIN_OCCUPIED_BYTES` — the #64
975        /// residual-underflow population. Bounded and made visible
976        /// here so a future regression that lets it grow unbounded
977        /// (or that turns it into a correctness bug) is caught.
978        residual_underflows: usize,
979    }
980
981    /// Walk the whole tree from `root` with an explicit stack
982    /// (Power-of-ten Rule 1: no recursion; Rule 2: the visit count is
983    /// bounded by `pager.page_count()`, which strictly exceeds the
984    /// node count). Counts nodes and non-root nodes that sit below the
985    /// min-occupancy threshold. Empty/zero-slot leaves are NOT counted
986    /// as underflows: an empty leaf root is the legitimate empty-tree
987    /// state, and the merge path keeps non-root leaves from stranding
988    /// at zero.
989    fn walk_occupancy(pager: &mut Pager<FileHandle>, root: PageId) -> OccupancyWalk {
990        let mut stack: Vec<(PageId, bool)> = vec![(root, true)];
991        let mut nodes = 0usize;
992        let mut residual_underflows = 0usize;
993        let visit_budget = usize::try_from(pager.page_count())
994            .unwrap_or(usize::MAX)
995            .saturating_add(1);
996        let mut visited = 0usize;
997        while let Some((id, is_root)) = stack.pop() {
998            visited += 1;
999            assert!(
1000                visited <= visit_budget,
1001                "occupancy walk exceeded node bound"
1002            );
1003            let node = {
1004                let pr = pager.read_page(id).expect("read node");
1005                decode_node(pr.as_bytes()).expect("decode node")
1006            };
1007            nodes += 1;
1008            let occupied = node.occupied_bytes();
1009            let empty = match node.kind {
1010                NodeKind::Leaf => node.leaves.is_empty(),
1011                NodeKind::Internal => node.internals.is_empty(),
1012            };
1013            if !is_root && !empty && occupied < MIN_OCCUPIED_BYTES {
1014                residual_underflows += 1;
1015            }
1016            if matches!(node.kind, NodeKind::Internal) {
1017                for &child in &node.children {
1018                    let cid = PageId::new(child).expect("child id nonzero");
1019                    stack.push((cid, false));
1020                }
1021            }
1022        }
1023        OccupancyWalk {
1024            nodes,
1025            residual_underflows,
1026        }
1027    }
1028
1029    /// #64: after a delete-heavy, large-key workload (the distribution
1030    /// most likely to trip the merge-and-borrow-both-decline residual
1031    /// underflow), assert that:
1032    ///
1033    /// 1. the tree is still fully CORRECT — every surviving key
1034    ///    resolves to its expected value (this is the invariant a
1035    ///    cascade-handling regression must never break); and
1036    /// 2. the residual-underflow degradation is BOUNDED and VISIBLE —
1037    ///    the number of underflowing non-root nodes never exceeds the
1038    ///    node count (it cannot "leak" unboundedly) and is reported.
1039    ///
1040    /// The assertion is intentionally loose on the exact count (the
1041    /// snapshot shape is workload-dependent); its job is to make the
1042    /// degradation observable and to fail loudly if it ever stops
1043    /// being bounded by the node population.
1044    #[test]
1045    fn delete_rebalance_residual_underflow_is_bounded_and_correct() {
1046        for seed in 0..4u64 {
1047            let mut rng = ChaCha8Rng::seed_from_u64(seed);
1048            let mut pager = Pager::<FileHandle>::memory(config()).expect("pager");
1049            let mut tree = BTree::<FileHandle>::empty(&mut pager).expect("empty");
1050            let mut oracle: BTreeMap<Vec<u8>, Vec<u8>> = BTreeMap::new();
1051            // Grow tall with large keys, then delete-heavy to exercise
1052            // rebalance.
1053            for op in 0..20_000usize {
1054                let deleting = op >= 6_000 && rng.random_range(0u32..3) > 0;
1055                if deleting && !oracle.is_empty() {
1056                    let n = oracle.len();
1057                    let i = rng.random_range(0..n);
1058                    let cand = oracle.keys().nth(i).cloned().unwrap_or_default();
1059                    let want = oracle.remove(&cand).is_some();
1060                    let got = tree
1061                        .delete(&mut pager, &cand)
1062                        .unwrap_or_else(|e| panic!("seed {seed} op {op} del: {e:?}"));
1063                    assert_eq!(got, want, "seed {seed} op {op}: delete presence");
1064                } else {
1065                    let key = random_key_up_to_64(&mut rng);
1066                    let value = random_value_up_to_256(&mut rng);
1067                    if let std::collections::btree_map::Entry::Vacant(slot) =
1068                        oracle.entry(key.clone())
1069                    {
1070                        tree.insert(&mut pager, &key, &value)
1071                            .unwrap_or_else(|e| panic!("seed {seed} op {op} ins: {e:?}"));
1072                        slot.insert(value);
1073                    }
1074                }
1075            }
1076            // (1) Correctness: every surviving key resolves.
1077            for (k, v) in &oracle {
1078                assert_eq!(
1079                    tree.get(&mut pager, k).expect("get").as_ref(),
1080                    Some(v),
1081                    "seed {seed}: surviving key must resolve: {k:?}",
1082                );
1083            }
1084            // (2) Bounded + visible degradation.
1085            let root = tree.root();
1086            let walk = walk_occupancy(&mut pager, root);
1087            eprintln!(
1088                "OCCUPANCY #64 seed {seed}: nodes={} residual_underflows={}",
1089                walk.nodes, walk.residual_underflows,
1090            );
1091            assert!(
1092                walk.residual_underflows <= walk.nodes,
1093                "seed {seed}: residual underflows ({}) must stay bounded by \
1094                 node count ({}) — an unbounded count signals a rebalance \
1095                 regression",
1096                walk.residual_underflows,
1097                walk.nodes,
1098            );
1099        }
1100    }
1101}