Skip to main content

obj_core/btree/
insert.rs

1//! B+tree insert path with copy-on-write splits.
2//!
3//! See `docs/format.md` § B+tree write semantics. Every node touched
4//! along the insert path is rewritten to a freshly-allocated page;
5//! the displaced pages enter the freelist only after the new root is
6//! staged in the pager's WAL transaction buffer. The caller is
7//! responsible for calling `Pager::commit` to make the insert
8//! durable.
9//!
10//! # Power-of-ten posture
11//!
12//! - **Rule 1.** The walk root → leaf uses an explicit
13//!   `heapless::Vec<_, MAX_BTREE_DEPTH>` path stack. The bubble-up
14//!   loop is a `while let Some(...)` over that stack — no
15//!   recursion.
16//! - **Rule 2.** Every loop is bounded: by the path length (≤
17//!   `MAX_BTREE_DEPTH`), by `key_count` (a node's slot count), or
18//!   by `LEAF_SLOT_CAP` / `INTERNAL_SLOT_CAP`.
19//! - **Rule 5.** `validate_node` runs on every encoded node behind
20//!   a `debug_assert!`; release builds surface invariant breaks as
21//!   `Error::BTreeInvariantViolated`.
22
23#![forbid(unsafe_code)]
24
25use heapless::Vec as HeaplessVec;
26
27use crate::btree::node::{
28    decode_node, encode_node, max_inline_value, max_key_len, DecodedNode, InternalEntry, LeafEntry,
29    NodeKind, INTERNAL_LEFTMOST_CHILD_BYTES, INTERNAL_SLOT_BYTES, LEAF_SLOT_BYTES,
30};
31use crate::btree::{BTree, MAX_BTREE_DEPTH};
32use crate::error::{Error, Result};
33use crate::pager::page::{Page, PageId, PAGE_SIZE, PAGE_TRAILER_SIZE};
34use crate::pager::Pager;
35use crate::platform::FileBackend;
36
37/// One step in a descending path: the node's id, its decoded
38/// representation, and the index of the child we followed.
39struct PathFrame {
40    page_id: PageId,
41    node: DecodedNode,
42    /// For internal-node frames: the child index we recursed into.
43    /// Unused for the leaf frame at the bottom of the stack.
44    child_index: usize,
45}
46
47/// One promoted (pivot key, right-child page-id) pair produced by a
48/// split. The parent inserts these in order immediately after the
49/// slot the split node occupied.
50struct Promoted {
51    /// Smallest key of the right sibling (leaf split) or the pivot
52    /// moved up (internal split).
53    key: Vec<u8>,
54    /// Page-id of the right sibling this pivot routes to.
55    right_id: PageId,
56}
57
58/// Outcome of replacing one node along the insert path: either the
59/// node still fits in a single page (we just COW it), or it had to
60/// split into N siblings (N ≥ 2) with N-1 promoted pivot keys.
61///
62/// # Split invariant (issue #137)
63///
64/// A [`ReplaceOutcome::Split`] always reports a `left_id` plus a
65/// non-empty `promoted` list. EVERY node referenced — `left_id` and
66/// every `promoted[i].right_id` — was written by [`write_new_node`],
67/// which calls [`encode_node`] and therefore guarantees
68/// `occupied_bytes() <= PAYLOAD_BYTES`. The split routines below
69/// (`split_leaf` / `split_internal`) pack entries by BYTES, never by
70/// count, so no resulting node can overflow a page even when one
71/// entry is large relative to the page. The parent
72/// (`replace_internal` / `build_new_root`) absorbs the WHOLE
73/// `promoted` list, which may itself overflow the parent and cascade
74/// the split upward — that is handled by the same byte-aware
75/// `split_internal`.
76enum ReplaceOutcome {
77    /// Node fits in one page; new page-id replaces the old one in
78    /// its parent's child slot.
79    Fits { new_id: PageId },
80    /// Node split into `left_id` plus one or more right siblings. The
81    /// `promoted` pivots/children are inserted, in order, immediately
82    /// after the original slot in the parent.
83    Split {
84        left_id: PageId,
85        promoted: Vec<Promoted>,
86    },
87}
88
89/// Total bytes available for the slot directory + heap in a single
90/// node (excludes the node header and the page trailer).
91const PAYLOAD_BYTES: usize = PAGE_SIZE - PAGE_TRAILER_SIZE - crate::btree::node::NODE_HEADER_SIZE;
92
93impl<F: FileBackend> BTree<F> {
94    /// Insert `key → value` into the tree. Returns
95    /// [`Error::BTreeKeyExists`] if `key` is already present.
96    ///
97    /// Copy-on-write: every node touched on the path is allocated as
98    /// a fresh page through the pager. The pre-insert pages enter the
99    /// freelist before this function returns, but only after every
100    /// new page has been staged in the WAL transaction. The caller
101    /// must call [`Pager::commit`] to make the insert durable.
102    ///
103    /// # Errors
104    ///
105    /// - [`Error::BTreeKeyTooLarge`] if the key exceeds `PAGE_SIZE / 4`.
106    /// - [`Error::BTreeValueTooLarge`] if the (key, value) pair will
107    ///   not fit inline in a leaf.
108    /// - [`Error::BTreeKeyExists`] if `key` is already present.
109    /// - [`Error::BTreeDepthExceeded`] if the tree height would
110    ///   exceed `MAX_BTREE_DEPTH`.
111    /// - [`Error::Corruption`] / [`Error::Io`] propagated from the
112    ///   pager.
113    pub fn insert(&mut self, pager: &mut Pager<F>, key: &[u8], value: &[u8]) -> Result<()> {
114        check_key_value_size(key, value)?;
115        let path = self.descend_with_path(pager, key)?;
116        self.apply_insert(pager, path, key, value)
117    }
118
119    /// Walk root → leaf, recording each ancestor's page-id, decoded
120    /// representation, and the child index that was followed.
121    fn descend_with_path(
122        &self,
123        pager: &mut Pager<F>,
124        key: &[u8],
125    ) -> Result<HeaplessVec<PathFrame, MAX_BTREE_DEPTH>> {
126        let mut path: HeaplessVec<PathFrame, MAX_BTREE_DEPTH> = HeaplessVec::new();
127        let mut current = self.root;
128        loop {
129            let decoded = {
130                let page_ref = pager.read_page(current)?;
131                decode_node(page_ref.as_bytes())?
132            };
133            match decoded.kind {
134                NodeKind::Leaf => {
135                    let frame = PathFrame {
136                        page_id: current,
137                        node: decoded,
138                        child_index: 0,
139                    };
140                    if path.push(frame).is_err() {
141                        return Err(Error::BTreeDepthExceeded {
142                            limit: MAX_BTREE_DEPTH,
143                        });
144                    }
145                    return Ok(path);
146                }
147                NodeKind::Internal => {
148                    let child_index = pivot_index(&decoded, key);
149                    let raw = decoded.children[child_index];
150                    let next = PageId::new(raw).ok_or(Error::BTreeInvariantViolated {
151                        reason: "internal node had zero child page-id",
152                    })?;
153                    let frame = PathFrame {
154                        page_id: current,
155                        node: decoded,
156                        child_index,
157                    };
158                    if path.push(frame).is_err() {
159                        return Err(Error::BTreeDepthExceeded {
160                            limit: MAX_BTREE_DEPTH,
161                        });
162                    }
163                    current = next;
164                }
165            }
166        }
167    }
168
169    /// Apply the (key, value) insertion to the leaf at the bottom of
170    /// the path, then bubble any split up to the root.
171    fn apply_insert(
172        &mut self,
173        pager: &mut Pager<F>,
174        mut path: HeaplessVec<PathFrame, MAX_BTREE_DEPTH>,
175        key: &[u8],
176        value: &[u8],
177    ) -> Result<()> {
178        let mut freed: HeaplessVec<PageId, { MAX_BTREE_DEPTH * 2 }> = HeaplessVec::new();
179        let Some(leaf_frame) = path.pop() else {
180            return Err(Error::BTreeInvariantViolated {
181                reason: "insert: descend returned empty path",
182            });
183        };
184        let mut outcome = replace_leaf(pager, leaf_frame, key, value, &mut freed)?;
185        while let Some(parent_frame) = path.pop() {
186            outcome = replace_internal(pager, parent_frame, outcome, &mut freed)?;
187        }
188        let new_root = build_new_root(pager, outcome)?;
189        self.root = new_root;
190        // Free displaced pages AFTER every new page is staged in the
191        // WAL transaction buffer. This is the COW contract: pre-
192        // insert pages must remain readable through the previous
193        // root until the new root is committed.
194        for old_id in freed.iter().copied() {
195            pager.free_page(old_id)?;
196        }
197        Ok(())
198    }
199}
200
201fn replace_leaf<F: FileBackend>(
202    pager: &mut Pager<F>,
203    frame: PathFrame,
204    key: &[u8],
205    value: &[u8],
206    freed: &mut HeaplessVec<PageId, { MAX_BTREE_DEPTH * 2 }>,
207) -> Result<ReplaceOutcome> {
208    let mut leaf = frame.node;
209    if leaf.leaves.iter().any(|e| e.key.as_slice() == key) {
210        return Err(Error::BTreeKeyExists);
211    }
212    let insert_at = leaf
213        .leaves
214        .iter()
215        .position(|e| e.key.as_slice() > key)
216        .unwrap_or(leaf.leaves.len());
217    leaf.leaves.insert(
218        insert_at,
219        LeafEntry {
220            key: key.to_vec(),
221            value: value.to_vec(),
222        },
223    );
224    push_freed(freed, frame.page_id)?;
225    if leaf.occupied_bytes() <= PAYLOAD_BYTES {
226        let new_id = write_new_node(pager, &leaf)?;
227        return Ok(ReplaceOutcome::Fits { new_id });
228    }
229    split_leaf(pager, leaf)
230}
231
232fn replace_internal<F: FileBackend>(
233    pager: &mut Pager<F>,
234    frame: PathFrame,
235    child_outcome: ReplaceOutcome,
236    freed: &mut HeaplessVec<PageId, { MAX_BTREE_DEPTH * 2 }>,
237) -> Result<ReplaceOutcome> {
238    let mut internal = frame.node;
239    let idx = frame.child_index;
240    match child_outcome {
241        ReplaceOutcome::Fits { new_id } => {
242            internal.children[idx] = new_id.get();
243        }
244        ReplaceOutcome::Split { left_id, promoted } => {
245            internal.children[idx] = left_id.get();
246            // Absorb the WHOLE promoted list (issue #137 multi-way
247            // split): pivot[idx + i] / child[idx + 1 + i] for each
248            // promoted entry, preserving order. Bounded by
249            // `promoted.len()` ≤ the original node's slot count.
250            for (i, p) in promoted.into_iter().enumerate() {
251                internal
252                    .internals
253                    .insert(idx + i, InternalEntry { key: p.key });
254                internal.children.insert(idx + 1 + i, p.right_id.get());
255            }
256        }
257    }
258    push_freed(freed, frame.page_id)?;
259    if internal.occupied_bytes() <= PAYLOAD_BYTES {
260        let new_id = write_new_node(pager, &internal)?;
261        return Ok(ReplaceOutcome::Fits { new_id });
262    }
263    split_internal(pager, internal)
264}
265
266/// Build the new root from the outcome of `apply_insert`'s final
267/// bubble. If no split, the root is just the new id; if a split, we
268/// allocate a fresh internal node holding (left, promoted...).
269///
270/// The new root absorbs the WHOLE promoted list. With a byte-aware
271/// multi-way split the promoted list can carry several large pivots,
272/// so the freshly-built root may itself overflow `PAYLOAD_BYTES`; in
273/// that case it is split via [`split_internal`] and a level is added
274/// per split. The loop is bounded by `MAX_BTREE_DEPTH` (Rule 2): each
275/// iteration adds one tree level, and a tree deeper than the bound is
276/// a [`Error::BTreeDepthExceeded`].
277fn build_new_root<F: FileBackend>(
278    pager: &mut Pager<F>,
279    mut outcome: ReplaceOutcome,
280) -> Result<PageId> {
281    for _ in 0..MAX_BTREE_DEPTH {
282        let (left_id, promoted) = match outcome {
283            ReplaceOutcome::Fits { new_id } => return Ok(new_id),
284            ReplaceOutcome::Split { left_id, promoted } => (left_id, promoted),
285        };
286        let level = node_level_after_split(pager, left_id)?;
287        let next_level = level.checked_add(1).ok_or(Error::BTreeDepthExceeded {
288            limit: MAX_BTREE_DEPTH,
289        })?;
290        let mut children = Vec::with_capacity(promoted.len() + 1);
291        let mut internals = Vec::with_capacity(promoted.len());
292        children.push(left_id.get());
293        for p in promoted {
294            internals.push(InternalEntry { key: p.key });
295            children.push(p.right_id.get());
296        }
297        let root_node = DecodedNode {
298            kind: NodeKind::Internal,
299            level: next_level,
300            next_sibling: 0,
301            children,
302            leaves: Vec::new(),
303            internals,
304        };
305        if root_node.occupied_bytes() <= PAYLOAD_BYTES {
306            return write_new_node(pager, &root_node);
307        }
308        outcome = split_internal(pager, root_node)?;
309    }
310    Err(Error::BTreeDepthExceeded {
311        limit: MAX_BTREE_DEPTH,
312    })
313}
314
315fn push_freed(freed: &mut HeaplessVec<PageId, { MAX_BTREE_DEPTH * 2 }>, id: PageId) -> Result<()> {
316    freed.push(id).map_err(|_| Error::BTreeInvariantViolated {
317        reason: "insert: too many displaced pages to track",
318    })
319}
320
321/// Pick the child index in `node` whose subtree contains `key`.
322fn pivot_index(node: &DecodedNode, key: &[u8]) -> usize {
323    let mut idx = node.internals.len();
324    for (i, pivot) in node.internals.iter().enumerate() {
325        if pivot.key.as_slice() > key {
326            idx = i;
327            break;
328        }
329    }
330    idx
331}
332
333/// Validate that `key` / `value` fit the format-version-0 bounds.
334fn check_key_value_size(key: &[u8], value: &[u8]) -> Result<()> {
335    if key.len() > max_key_len() {
336        return Err(Error::BTreeKeyTooLarge {
337            key_len: key.len(),
338            max: max_key_len(),
339        });
340    }
341    let v_max = max_inline_value(key.len());
342    if value.len() > v_max {
343        return Err(Error::BTreeValueTooLarge {
344            value_len: value.len(),
345            max: v_max,
346        });
347    }
348    Ok(())
349}
350
351/// Encode `node` into a fresh page and write it through the pager.
352/// Returns the newly-allocated `PageId`.
353pub(crate) fn write_new_node<F: FileBackend>(
354    pager: &mut Pager<F>,
355    node: &DecodedNode,
356) -> Result<PageId> {
357    let new_id = pager.alloc_page()?;
358    let mut page = Page::zeroed();
359    encode_node(node, &mut page)?;
360    pager.write_page(new_id, &page)?;
361    Ok(new_id)
362}
363
364/// Byte footprint one leaf entry occupies in an encoded node: its
365/// slot-directory entry plus its heap entry (two length-prefixed
366/// byte strings). Mirrors the accounting in
367/// [`DecodedNode::occupied_bytes`] for a leaf exactly.
368fn leaf_entry_bytes(entry: &LeafEntry) -> usize {
369    use crate::btree::node::varint_len;
370    LEAF_SLOT_BYTES
371        + varint_len(entry.key.len() as u64)
372        + entry.key.len()
373        + varint_len(entry.value.len() as u64)
374        + entry.value.len()
375}
376
377/// Byte footprint one internal pivot occupies in an encoded node: its
378/// slot-directory entry plus its length-prefixed heap key. Mirrors
379/// the per-pivot term of [`DecodedNode::occupied_bytes`] for an
380/// internal node. The leftmost-child term is accounted separately by
381/// the caller because it is per-NODE, not per-pivot.
382fn internal_pivot_bytes(entry: &InternalEntry) -> usize {
383    use crate::btree::node::varint_len;
384    INTERNAL_SLOT_BYTES + varint_len(entry.key.len() as u64) + entry.key.len()
385}
386
387/// Split an overflowing leaf into the minimum number of byte-bounded
388/// siblings (issue #137).
389///
390/// # Why by BYTES, not by count
391///
392/// Entries are variable-length. A count-midpoint split can leave a
393/// half whose `occupied_bytes()` still exceeds `PAYLOAD_BYTES` (a
394/// near-full leaf of small entries plus one large entry), which made
395/// [`encode_node`] raise the "slot dir and heap collide" invariant
396/// and surfaced as a spurious write error. This greedy packer walks
397/// entries left → right, opening a new sibling whenever the next
398/// entry would push the current sibling past `PAYLOAD_BYTES`.
399///
400/// # Split invariant
401///
402/// Every emitted sibling satisfies `occupied_bytes() <=
403/// PAYLOAD_BYTES`. Each sibling holds at least one entry, because
404/// [`check_key_value_size`] guarantees a single (key, value) pair
405/// always fits in one node (`max_inline_value` reserves a slot plus
406/// two varints), so a fresh-sibling never immediately overflows on
407/// its first entry. The siblings' `next_sibling` chain is rebuilt
408/// left → right so a forward range scan visits every entry in order.
409fn split_leaf<F: FileBackend>(pager: &mut Pager<F>, leaf: DecodedNode) -> Result<ReplaceOutcome> {
410    debug_assert!(matches!(leaf.kind, NodeKind::Leaf));
411    debug_assert!(leaf.leaves.len() >= 2, "leaf split needs ≥ 2 entries");
412    let original_sibling = leaf.next_sibling;
413    let groups = pack_leaf_groups(leaf.leaves)?;
414    write_leaf_groups(pager, groups, original_sibling)
415}
416
417/// Greedily pack leaf entries into byte-bounded groups. The first
418/// group is the left node (keeps the original slot in the parent);
419/// the rest become promoted right siblings. Bounded by the entry
420/// count (Rule 2).
421fn pack_leaf_groups(entries: Vec<LeafEntry>) -> Result<Vec<Vec<LeafEntry>>> {
422    let mut groups: Vec<Vec<LeafEntry>> = Vec::new();
423    let mut current: Vec<LeafEntry> = Vec::new();
424    let mut current_bytes = 0usize;
425    for entry in entries {
426        let entry_bytes = leaf_entry_bytes(&entry);
427        if !current.is_empty() && current_bytes + entry_bytes > PAYLOAD_BYTES {
428            groups.push(std::mem::take(&mut current));
429            current_bytes = 0;
430        }
431        current_bytes += entry_bytes;
432        current.push(entry);
433    }
434    if !current.is_empty() {
435        groups.push(current);
436    }
437    if groups.len() < 2 {
438        return Err(Error::BTreeInvariantViolated {
439            reason: "leaf split produced fewer than two groups",
440        });
441    }
442    Ok(groups)
443}
444
445/// Write each leaf group to a fresh page, chaining `next_sibling`
446/// left → right (the last group inherits the original right-sibling
447/// pointer). Returns the left id plus the promoted right siblings.
448/// Bounded by the group count (Rule 2).
449fn write_leaf_groups<F: FileBackend>(
450    pager: &mut Pager<F>,
451    groups: Vec<Vec<LeafEntry>>,
452    original_sibling: u64,
453) -> Result<ReplaceOutcome> {
454    let count = groups.len();
455    // Write right → left so each node can point at the already-known
456    // page-id of the sibling to its right.
457    let mut next_sibling = original_sibling;
458    let mut ids: Vec<(PageId, Vec<u8>)> = Vec::with_capacity(count);
459    for (i, entries) in groups.into_iter().enumerate().rev() {
460        let promoted_key =
461            entries
462                .first()
463                .map(|e| e.key.clone())
464                .ok_or(Error::BTreeInvariantViolated {
465                    reason: "leaf split produced an empty group",
466                })?;
467        let node = DecodedNode {
468            kind: NodeKind::Leaf,
469            level: 0,
470            next_sibling,
471            children: Vec::new(),
472            leaves: entries,
473            internals: Vec::new(),
474        };
475        let id = write_new_node(pager, &node)?;
476        next_sibling = id.get();
477        // The promoted key is only used for siblings i >= 1.
478        let key = if i == 0 { Vec::new() } else { promoted_key };
479        ids.push((id, key));
480    }
481    ids.reverse();
482    assemble_split_outcome(ids)
483}
484
485/// Turn a left → right ordered `(page_id, promoted_key)` list into a
486/// [`ReplaceOutcome::Split`]. The first element is the left node
487/// (its key is unused); the rest are promoted right siblings.
488fn assemble_split_outcome(mut ids: Vec<(PageId, Vec<u8>)>) -> Result<ReplaceOutcome> {
489    if ids.len() < 2 {
490        return Err(Error::BTreeInvariantViolated {
491            reason: "split outcome needs a left node and ≥ 1 promoted sibling",
492        });
493    }
494    let mut iter = ids.drain(..);
495    let (left_id, _) = iter.next().ok_or(Error::BTreeInvariantViolated {
496        reason: "split outcome missing left node",
497    })?;
498    let mut promoted = Vec::with_capacity(iter.len());
499    for (right_id, key) in iter {
500        promoted.push(Promoted { key, right_id });
501    }
502    Ok(ReplaceOutcome::Split { left_id, promoted })
503}
504
505/// Split an overflowing internal node into the minimum number of
506/// byte-bounded siblings (issue #137). Like the leaf split this packs
507/// by BYTES, not by count, so no sibling overflows `PAYLOAD_BYTES`
508/// even when a pivot key is large relative to the page.
509///
510/// An internal split PROMOTES the boundary pivot between two adjacent
511/// siblings (it is not retained in either), exactly like the original
512/// 2-way split — so for N siblings, N-1 pivots are promoted and the
513/// remaining pivots are partitioned among the siblings.
514///
515/// # Split invariant
516///
517/// Every emitted sibling satisfies `occupied_bytes() <=
518/// PAYLOAD_BYTES` and keeps `children.len() == internals.len() + 1`.
519/// Each sibling holds at least one pivot (so each has ≥ 2 children),
520/// because a single pivot entry plus the leftmost-child term always
521/// fits in a node (`max_key_len = PAGE_SIZE / 4` ≪ `PAYLOAD_BYTES`).
522fn split_internal<F: FileBackend>(
523    pager: &mut Pager<F>,
524    internal: DecodedNode,
525) -> Result<ReplaceOutcome> {
526    debug_assert!(matches!(internal.kind, NodeKind::Internal));
527    debug_assert!(
528        internal.internals.len() >= 2,
529        "internal split needs ≥ 2 pivots"
530    );
531    let level = internal.level;
532    let groups = pack_internal_groups(internal.internals, internal.children)?;
533    write_internal_groups(pager, groups, level)
534}
535
536/// One byte-bounded internal sibling produced by the split: its
537/// child pointers, its retained pivots, and (for siblings after the
538/// first) the boundary pivot promoted to the parent.
539struct InternalGroup {
540    children: Vec<u64>,
541    internals: Vec<InternalEntry>,
542    /// `None` for the first group (left node); `Some(key)` is the
543    /// pivot promoted to the parent ahead of this sibling.
544    promoted: Option<Vec<u8>>,
545}
546
547/// Greedily pack internal pivots/children into byte-bounded groups,
548/// promoting the boundary pivot between adjacent groups. Bounded by
549/// the pivot count (Rule 2).
550///
551/// `children.len()` is `internals.len() + 1`. We walk pivots left →
552/// right; child `i` always travels with pivot `i` (it is the child to
553/// the pivot's LEFT), and the trailing child `K` closes the last
554/// group. When the running group is non-empty and adding the next
555/// pivot+child would exceed `PAYLOAD_BYTES`, that pivot is PROMOTED
556/// (it becomes the parent separator) and a fresh group is opened
557/// starting with the pivot's right child.
558fn pack_internal_groups(
559    pivots: Vec<InternalEntry>,
560    children: Vec<u64>,
561) -> Result<Vec<InternalGroup>> {
562    if children.len() != pivots.len() + 1 {
563        return Err(Error::BTreeInvariantViolated {
564            reason: "internal split: children.len() != pivots+1",
565        });
566    }
567    let mut groups: Vec<InternalGroup> = Vec::new();
568    let mut cur_children: Vec<u64> = Vec::new();
569    let mut cur_pivots: Vec<InternalEntry> = Vec::new();
570    let mut cur_bytes = INTERNAL_LEFTMOST_CHILD_BYTES;
571    let mut pending_promote: Option<Vec<u8>> = None;
572    let mut child_iter = children.into_iter();
573    let leftmost = child_iter.next().ok_or(Error::BTreeInvariantViolated {
574        reason: "internal split: missing leftmost child",
575    })?;
576    cur_children.push(leftmost);
577    for pivot in pivots {
578        let right_child = child_iter.next().ok_or(Error::BTreeInvariantViolated {
579            reason: "internal split: missing right child for pivot",
580        })?;
581        let pivot_bytes = internal_pivot_bytes(&pivot);
582        // A non-empty group whose next pivot would overflow: close it
583        // and PROMOTE this pivot (it routes between the closed group
584        // and the new one), opening a fresh group with `right_child`.
585        if !cur_pivots.is_empty() && cur_bytes + pivot_bytes > PAYLOAD_BYTES {
586            groups.push(InternalGroup {
587                children: std::mem::take(&mut cur_children),
588                internals: std::mem::take(&mut cur_pivots),
589                promoted: pending_promote.take(),
590            });
591            pending_promote = Some(pivot.key);
592            cur_children.push(right_child);
593            cur_bytes = INTERNAL_LEFTMOST_CHILD_BYTES;
594        } else {
595            cur_bytes += pivot_bytes;
596            cur_pivots.push(pivot);
597            cur_children.push(right_child);
598        }
599    }
600    groups.push(InternalGroup {
601        children: cur_children,
602        internals: cur_pivots,
603        promoted: pending_promote,
604    });
605    if groups.len() < 2 {
606        return Err(Error::BTreeInvariantViolated {
607            reason: "internal split produced fewer than two groups",
608        });
609    }
610    Ok(groups)
611}
612
613/// Write each internal group to a fresh page and assemble the
614/// promoted (pivot, right-id) list. Bounded by the group count.
615fn write_internal_groups<F: FileBackend>(
616    pager: &mut Pager<F>,
617    groups: Vec<InternalGroup>,
618    level: u8,
619) -> Result<ReplaceOutcome> {
620    let mut left_id: Option<PageId> = None;
621    let mut promoted: Vec<Promoted> = Vec::new();
622    for group in groups {
623        let node = DecodedNode {
624            kind: NodeKind::Internal,
625            level,
626            next_sibling: 0,
627            children: group.children,
628            leaves: Vec::new(),
629            internals: group.internals,
630        };
631        let id = write_new_node(pager, &node)?;
632        match group.promoted {
633            None => left_id = Some(id),
634            Some(key) => promoted.push(Promoted { key, right_id: id }),
635        }
636    }
637    let left_id = left_id.ok_or(Error::BTreeInvariantViolated {
638        reason: "internal split produced no left node",
639    })?;
640    if promoted.is_empty() {
641        return Err(Error::BTreeInvariantViolated {
642            reason: "internal split produced no promoted siblings",
643        });
644    }
645    Ok(ReplaceOutcome::Split { left_id, promoted })
646}
647
648/// Determine the level of a node that just emerged from a split. We
649/// read the left half's page back through the pager (cheap — it was
650/// just staged in the WAL view) to learn its level.
651fn node_level_after_split<F: FileBackend>(pager: &mut Pager<F>, id: PageId) -> Result<u8> {
652    let page_ref = pager.read_page(id)?;
653    let decoded = decode_node(page_ref.as_bytes())?;
654    Ok(decoded.level)
655}
656
657#[cfg(test)]
658mod tests {
659    use super::*;
660    use crate::pager::{Config, Pager};
661    use crate::platform::FileHandle;
662
663    use proptest::prelude::*;
664    use rand::prelude::IndexedRandom;
665    use rand::SeedableRng;
666    use rand_chacha::ChaCha8Rng;
667    use std::collections::BTreeMap;
668
669    fn config() -> Config {
670        Config::default()
671    }
672
673    #[test]
674    fn insert_single_key_round_trip() {
675        let mut pager = Pager::<FileHandle>::memory(config()).expect("pager");
676        let mut tree = BTree::<FileHandle>::empty(&mut pager).expect("empty");
677        tree.insert(&mut pager, b"hello", b"world").expect("ins");
678        assert_eq!(
679            tree.get(&mut pager, b"hello").expect("get"),
680            Some(b"world".to_vec())
681        );
682    }
683
684    #[test]
685    fn duplicate_key_errors() {
686        let mut pager = Pager::<FileHandle>::memory(config()).expect("pager");
687        let mut tree = BTree::<FileHandle>::empty(&mut pager).expect("empty");
688        tree.insert(&mut pager, b"k", b"v1").expect("ins");
689        let err = tree
690            .insert(&mut pager, b"k", b"v2")
691            .expect_err("dup must fail");
692        assert!(matches!(err, Error::BTreeKeyExists));
693    }
694
695    #[test]
696    fn insert_growth_splits_root() {
697        let mut pager = Pager::<FileHandle>::memory(config()).expect("pager");
698        let mut tree = BTree::<FileHandle>::empty(&mut pager).expect("empty");
699        // 256-byte values guarantee splits after ~15 inserts.
700        let value = vec![0xABu8; 256];
701        for i in 0..200u32 {
702            let key = format!("key-{i:08}");
703            tree.insert(&mut pager, key.as_bytes(), &value)
704                .expect("ins");
705        }
706        for i in 0..200u32 {
707            let key = format!("key-{i:08}");
708            assert_eq!(
709                tree.get(&mut pager, key.as_bytes()).expect("get"),
710                Some(value.clone()),
711                "key {key}"
712            );
713        }
714        // Tree should have at least 2 levels now.
715        let root = tree.root();
716        let page_ref = pager.read_page(root).expect("read root");
717        let decoded = decode_node(page_ref.as_bytes()).expect("decode root");
718        assert!(
719            decoded.level >= 1,
720            "expected internal root, got {decoded:?}"
721        );
722    }
723
724    proptest! {
725        #![proptest_config(ProptestConfig {
726            cases: 16,
727            max_shrink_iters: 32,
728            .. ProptestConfig::default()
729        })]
730
731        #[test]
732        fn insert_oracle_property(seed in any::<u64>()) {
733            run_insert_oracle(seed, 200);
734        }
735    }
736
737    /// Run 10k random insert operations against a `BTreeMap` oracle.
738    /// This is the in-module sanity check; the full 1M-op oracle
739    /// lives in `tests/btree_oracle.rs` (issue #29).
740    #[test]
741    fn insert_oracle_10k() {
742        for seed in 0..3u64 {
743            run_insert_oracle(seed, 10_000);
744        }
745    }
746
747    fn run_insert_oracle(seed: u64, ops: usize) {
748        let mut rng = ChaCha8Rng::seed_from_u64(seed);
749        let mut pager = Pager::<FileHandle>::memory(config()).expect("pager");
750        let mut tree = BTree::<FileHandle>::empty(&mut pager).expect("empty");
751        let mut oracle: BTreeMap<Vec<u8>, Vec<u8>> = BTreeMap::new();
752        for op in 0..ops {
753            let key = random_key(&mut rng);
754            let value = random_value(&mut rng);
755            let key_already = oracle.contains_key(&key);
756            let res = tree.insert(&mut pager, &key, &value);
757            if key_already {
758                assert!(
759                    matches!(res, Err(Error::BTreeKeyExists)),
760                    "seed {seed} op {op}: expected BTreeKeyExists, got {res:?}"
761                );
762            } else {
763                res.unwrap_or_else(|e| panic!("seed {seed} op {op}: insert err {e:?}"));
764                oracle.insert(key.clone(), value.clone());
765            }
766            if op.is_multiple_of(127) {
767                // Spot-check a few random oracle keys.
768                let keys: Vec<&Vec<u8>> = oracle.keys().collect();
769                if !keys.is_empty() {
770                    let sample: Vec<&Vec<u8>> =
771                        keys.choose_multiple(&mut rng, 4).copied().collect();
772                    for k in sample {
773                        assert_eq!(
774                            tree.get(&mut pager, k).expect("get").as_ref(),
775                            oracle.get(k),
776                            "seed {seed} op {op}: key {k:?}"
777                        );
778                    }
779                }
780            }
781        }
782        // Final pass: every oracle key resolves correctly.
783        for (k, v) in &oracle {
784            assert_eq!(
785                tree.get(&mut pager, k).expect("get").as_ref(),
786                Some(v),
787                "seed {seed} final: key {k:?}"
788            );
789        }
790    }
791
792    fn random_key(rng: &mut ChaCha8Rng) -> Vec<u8> {
793        use rand::Rng;
794        let len = rng.random_range(1..16);
795        (0..len).map(|_| rng.random_range(b'a'..=b'z')).collect()
796    }
797
798    fn random_value(rng: &mut ChaCha8Rng) -> Vec<u8> {
799        use rand::Rng;
800        let len = rng.random_range(0..64);
801        (0..len).map(|_| rng.random()).collect()
802    }
803
804    // ---- issue #137: byte-aware split fuzz / oracle ----
805
806    use crate::btree::node::{validate_node, NODE_HEADER_SIZE};
807
808    /// Payload budget every encoded node must respect. Mirrors the
809    /// `PAYLOAD_BYTES` const used by the insert path so the test holds
810    /// the implementation to its own contract.
811    const TEST_PAYLOAD_BYTES: usize = PAGE_SIZE - PAGE_TRAILER_SIZE - NODE_HEADER_SIZE;
812
813    /// A random key in `[1, max_key_len()]`, heavily biased toward
814    /// small keys but reaching the format cap a fraction of the time
815    /// so large index-key entries exercise the internal-node split.
816    fn fuzz_key(rng: &mut ChaCha8Rng) -> Vec<u8> {
817        use rand::Rng;
818        let cap = max_key_len();
819        let len = match rng.random_range(0u32..100) {
820            0..=84 => rng.random_range(1..=16usize),
821            85..=96 => rng.random_range(16..=256usize),
822            _ => rng.random_range(256..=cap),
823        };
824        let len = len.clamp(1, cap);
825        (0..len).map(|_| rng.random::<u8>()).collect()
826    }
827
828    /// A random value whose length spans `[0, max_inline_value(key)]`,
829    /// heavily sampling LARGE values (hundreds–thousands of bytes) so
830    /// a populated leaf is repeatedly hit with an entry big enough to
831    /// force a byte-aware (not count) split.
832    fn fuzz_value(rng: &mut ChaCha8Rng, key_len: usize) -> Vec<u8> {
833        use rand::Rng;
834        let cap = max_inline_value(key_len);
835        if cap == 0 {
836            return Vec::new();
837        }
838        let len = match rng.random_range(0u32..100) {
839            0..=49 => rng.random_range(0..=64usize),
840            50..=79 => rng.random_range(64..=512usize),
841            _ => rng.random_range(512..=cap.max(512)),
842        };
843        let len = len.min(cap);
844        (0..len).map(|_| rng.random::<u8>()).collect()
845    }
846
847    /// Walk every node in the tree (explicit stack, no recursion —
848    /// Rule 1; bounded by `page_count` — Rule 2) and assert the split
849    /// invariant on each: it decodes (which re-runs
850    /// `validate_node_release`), passes `validate_node`, and its
851    /// `occupied_bytes()` never exceeds the per-node payload budget.
852    fn assert_tree_invariants(pager: &mut Pager<FileHandle>, root: PageId) {
853        let mut stack: Vec<PageId> = vec![root];
854        let budget = usize::try_from(pager.page_count())
855            .unwrap_or(usize::MAX)
856            .saturating_add(1);
857        let mut visited = 0usize;
858        while let Some(id) = stack.pop() {
859            visited += 1;
860            assert!(visited <= budget, "node walk exceeded page-count bound");
861            let node = {
862                let pr = pager.read_page(id).expect("read node");
863                decode_node(pr.as_bytes()).expect("decode node")
864            };
865            validate_node(&node).expect("node must satisfy validate_node");
866            assert!(
867                node.occupied_bytes() <= TEST_PAYLOAD_BYTES,
868                "node {id:?} occupies {} bytes > payload cap {} — a split \
869                 left an overflowing node (issue #137 regression)",
870                node.occupied_bytes(),
871                TEST_PAYLOAD_BYTES,
872            );
873            if matches!(node.kind, NodeKind::Internal) {
874                for &child in &node.children {
875                    stack.push(PageId::new(child).expect("child id nonzero"));
876                }
877            }
878        }
879    }
880
881    /// Drive `ops` random inserts of RANDOM-sized keys and values
882    /// (heavily sampling large values) against a `BTreeMap` oracle.
883    /// After EACH op the only tolerated error is a documented size
884    /// guard (which `fuzz_key`/`fuzz_value` never trip); any
885    /// `BTreeInvariantViolated` (the "slot dir and heap collide"
886    /// class) fails the test. At the end every oracle key must
887    /// round-trip with its EXACT value, an ordered range scan must
888    /// match the oracle's order, and every page must satisfy the
889    /// split invariant.
890    fn run_split_oracle(seed: u64, ops: usize) {
891        let mut rng = ChaCha8Rng::seed_from_u64(seed);
892        let mut pager = Pager::<FileHandle>::memory(config()).expect("pager");
893        let mut tree = BTree::<FileHandle>::empty(&mut pager).expect("empty");
894        let mut oracle: BTreeMap<Vec<u8>, Vec<u8>> = BTreeMap::new();
895        for op in 0..ops {
896            let key = fuzz_key(&mut rng);
897            let value = fuzz_value(&mut rng, key.len());
898            let already = oracle.contains_key(&key);
899            let res = tree.insert(&mut pager, &key, &value);
900            match res {
901                Ok(()) => {
902                    assert!(!already, "seed {seed} op {op}: dup insert unexpectedly Ok");
903                    oracle.insert(key, value);
904                }
905                Err(Error::BTreeKeyExists) => {
906                    assert!(already, "seed {seed} op {op}: spurious BTreeKeyExists");
907                }
908                Err(e) => panic!(
909                    "seed {seed} op {op}: unexpected insert error {e:?} \
910                     (key_len={}, value_len={})",
911                    key.len(),
912                    value.len(),
913                ),
914            }
915            if op.is_multiple_of(521) {
916                assert_tree_invariants(&mut pager, tree.root());
917            }
918        }
919        // Full round-trip vs the oracle: every key present with its
920        // exact value.
921        for (k, v) in &oracle {
922            assert_eq!(
923                tree.get(&mut pager, k).expect("get").as_ref(),
924                Some(v),
925                "seed {seed} final: key {k:?} value mismatch",
926            );
927        }
928        // Ordered range scan matches the oracle's iteration order.
929        let scanned: Vec<(Vec<u8>, Vec<u8>)> = tree
930            .range(&mut pager, ..)
931            .expect("range")
932            .map(|r| r.expect("range item"))
933            .collect();
934        let expected: Vec<(Vec<u8>, Vec<u8>)> =
935            oracle.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
936        assert_eq!(
937            scanned.len(),
938            expected.len(),
939            "seed {seed}: range scan count disagrees with oracle",
940        );
941        assert_eq!(
942            scanned, expected,
943            "seed {seed}: ordered range scan mismatch"
944        );
945        assert_tree_invariants(&mut pager, tree.root());
946    }
947
948    proptest! {
949        #![proptest_config(ProptestConfig {
950            cases: 24,
951            max_shrink_iters: 16,
952            .. ProptestConfig::default()
953        })]
954
955        /// Many seeds × 600 large-value inserts each. The shrink-free
956        /// 10k×many-seeds run lives in `split_oracle_large_values_10k`.
957        #[test]
958        fn split_oracle_property(seed in any::<u64>()) {
959            run_split_oracle(seed, 600);
960        }
961    }
962
963    /// ≥10k random ops across several seeds with random key+value
964    /// sizes (heavy on large values). Pre-fix this trips
965    /// `BTreeInvariantViolated { "leaf encode: slot dir and heap
966    /// collide" }`; post-fix it is green with zero invariant
967    /// violations and zero oracle mismatches.
968    #[test]
969    fn split_oracle_large_values_10k() {
970        for seed in 0..5u64 {
971            run_split_oracle(seed, 10_000);
972        }
973    }
974
975    /// Deterministic reproduction of issue #137: populate a leaf with
976    /// ~30 small entries, then insert one ~1.8 KB-record entry that
977    /// (pre-fix) lands so a count-midpoint split leaves an
978    /// overflowing half. Post-fix every insert succeeds and the tree
979    /// satisfies the split invariant.
980    #[test]
981    fn issue_137_large_entry_into_populated_leaf() {
982        let mut pager = Pager::<FileHandle>::memory(config()).expect("pager");
983        let mut tree = BTree::<FileHandle>::empty(&mut pager).expect("empty");
984        // ~30 small entries to populate (and nearly fill) the leaf.
985        for i in 0..30u32 {
986            let key = format!("k{i:04}");
987            let value = vec![0x5Au8; 8];
988            tree.insert(&mut pager, key.as_bytes(), &value)
989                .expect("small insert");
990        }
991        // One large entry whose record is ~1.8 KB. Pre-fix this is the
992        // exact case that produced "slot dir and heap collide".
993        let big_key = vec![b'm'; 16];
994        let big_value = vec![0xA5u8; 1_800];
995        tree.insert(&mut pager, &big_key, &big_value)
996            .expect("large insert must succeed (issue #137)");
997        // A second, near-max large entry to stress the multi-way path.
998        let big_key2 = vec![b'n'; 16];
999        let big_value2 = vec![0xC3u8; max_inline_value(big_key2.len())];
1000        tree.insert(&mut pager, &big_key2, &big_value2)
1001            .expect("near-max large insert must succeed");
1002        assert_eq!(
1003            tree.get(&mut pager, &big_key).expect("get"),
1004            Some(big_value)
1005        );
1006        assert_eq!(
1007            tree.get(&mut pager, &big_key2).expect("get"),
1008            Some(big_value2)
1009        );
1010        for i in 0..30u32 {
1011            let key = format!("k{i:04}");
1012            assert_eq!(
1013                tree.get(&mut pager, key.as_bytes()).expect("get"),
1014                Some(vec![0x5Au8; 8]),
1015                "small key {key} must survive the split",
1016            );
1017        }
1018        assert_tree_invariants(&mut pager, tree.root());
1019    }
1020
1021    /// Pathological size distribution the issue called out: a leaf
1022    /// near-full of small entries with a large entry landing in the
1023    /// MIDDLE, such that a naive 2-way split would leave BOTH halves
1024    /// overflowing. The multi-way split must still emit only
1025    /// payload-bounded nodes.
1026    #[test]
1027    fn issue_137_large_entry_in_the_middle_multiway() {
1028        let mut pager = Pager::<FileHandle>::memory(config()).expect("pager");
1029        let mut tree = BTree::<FileHandle>::empty(&mut pager).expect("empty");
1030        // Fill a leaf with medium entries so total occupancy is high.
1031        for i in 0..20u32 {
1032            let key = format!("a{i:03}");
1033            let value = vec![0x11u8; 180];
1034            tree.insert(&mut pager, key.as_bytes(), &value)
1035                .expect("fill");
1036        }
1037        // Insert a large entry whose key sorts into the MIDDLE of the
1038        // existing run (between "a009" and "a010").
1039        let mid_key = b"a0095".to_vec();
1040        let mid_value = vec![0x22u8; max_inline_value(mid_key.len())];
1041        tree.insert(&mut pager, &mid_key, &mid_value)
1042            .expect("mid large insert must succeed");
1043        assert_eq!(
1044            tree.get(&mut pager, &mid_key).expect("get"),
1045            Some(mid_value)
1046        );
1047        assert_tree_invariants(&mut pager, tree.root());
1048    }
1049
1050    /// Insert-then-delete mixed workload of random large/small sizes,
1051    /// then an integrity walk: every page satisfies the split
1052    /// invariant and `validate_node`, and every surviving key
1053    /// round-trips. Guards the interaction between the byte-aware
1054    /// split and the delete-rebalance path.
1055    #[test]
1056    fn split_then_delete_integrity_walk() {
1057        use rand::Rng;
1058        for seed in 0..4u64 {
1059            let mut rng = ChaCha8Rng::seed_from_u64(seed);
1060            let mut pager = Pager::<FileHandle>::memory(config()).expect("pager");
1061            let mut tree = BTree::<FileHandle>::empty(&mut pager).expect("empty");
1062            let mut oracle: BTreeMap<Vec<u8>, Vec<u8>> = BTreeMap::new();
1063            for _ in 0..6_000usize {
1064                let deleting = !oracle.is_empty() && rng.random_range(0u32..3) == 0;
1065                if deleting {
1066                    let n = oracle.len();
1067                    let i = rng.random_range(0..n);
1068                    let cand = oracle.keys().nth(i).cloned().unwrap_or_default();
1069                    let want = oracle.remove(&cand).is_some();
1070                    let got = tree.delete(&mut pager, &cand).expect("delete");
1071                    assert_eq!(got, want, "seed {seed}: delete presence");
1072                } else {
1073                    let key = fuzz_key(&mut rng);
1074                    let value = fuzz_value(&mut rng, key.len());
1075                    if let std::collections::btree_map::Entry::Vacant(slot) =
1076                        oracle.entry(key.clone())
1077                    {
1078                        tree.insert(&mut pager, &key, &value).expect("insert");
1079                        slot.insert(value);
1080                    }
1081                }
1082            }
1083            for (k, v) in &oracle {
1084                assert_eq!(
1085                    tree.get(&mut pager, k).expect("get").as_ref(),
1086                    Some(v),
1087                    "seed {seed}: surviving key {k:?}",
1088                );
1089            }
1090            assert_tree_invariants(&mut pager, tree.root());
1091        }
1092    }
1093}