Skip to main content

obj_core/btree/
insert.rs

1//! B+tree insert path with copy-on-write splits.
2//!
3//! See `docs/format.md` § B+tree write semantics. Every node touched
4//! along the insert path is rewritten to a freshly-allocated page;
5//! the displaced pages enter the freelist only after the new root is
6//! staged in the pager's WAL transaction buffer. The caller is
7//! responsible for calling `Pager::commit` to make the insert
8//! durable.
9//!
10//! # Power-of-ten posture
11//!
12//! - **Rule 1.** The walk root → leaf uses an explicit
13//!   `heapless::Vec<_, MAX_BTREE_DEPTH>` path stack. The bubble-up
14//!   loop is a `while let Some(...)` over that stack — no
15//!   recursion.
16//! - **Rule 2.** Every loop is bounded: by the path length (≤
17//!   `MAX_BTREE_DEPTH`), by `key_count` (a node's slot count), or
18//!   by `LEAF_SLOT_CAP` / `INTERNAL_SLOT_CAP`.
19//! - **Rule 5.** `validate_node` runs on every encoded node behind
20//!   a `debug_assert!`; release builds surface invariant breaks as
21//!   `Error::BTreeInvariantViolated`.
22
23#![forbid(unsafe_code)]
24
25use heapless::Vec as HeaplessVec;
26
27use crate::btree::node::{
28    decode_node, encode_node, max_inline_value, max_key_len, DecodedNode, InternalEntry, LeafEntry,
29    NodeKind, INTERNAL_LEFTMOST_CHILD_BYTES, INTERNAL_SLOT_BYTES, LEAF_SLOT_BYTES,
30};
31use crate::btree::{BTree, MAX_BTREE_DEPTH};
32use crate::error::{Error, Result};
33use crate::pager::page::{Page, PageId, PAGE_SIZE, PAGE_TRAILER_SIZE};
34use crate::pager::Pager;
35use crate::platform::FileBackend;
36
/// One step in a descending root → leaf path.
///
/// `descend_with_path` pushes one frame per visited node, so the
/// last frame on the stack is always the leaf and every earlier
/// frame is an internal ancestor.
struct PathFrame {
    /// Page-id the node was read from (the pre-insert page, which is
    /// later queued for freeing once its replacement is written).
    page_id: PageId,
    /// Decoded in-memory form of that page.
    node: DecodedNode,
    /// For internal-node frames: the index into `node.children` that
    /// the descent followed. Set to `0` and unused for the leaf frame
    /// at the bottom of the stack.
    child_index: usize,
}
46
/// Outcome of replacing one node along the insert path: either the
/// node still fits in a single page (it is simply rewritten to a new
/// page), or it had to split into two siblings with a promoted pivot
/// key that the parent must absorb.
enum ReplaceOutcome {
    /// Node fits in one page; `new_id` replaces the old page-id in
    /// its parent's child slot (or becomes the root).
    Fits { new_id: PageId },
    /// Node split. `left_id` is the new left half (same role as the
    /// pre-insert node), `right_id` is the brand-new right sibling,
    /// `promoted_key` is the smallest key of the right half (for
    /// leaves, where it is a copy) or the pivot moved up (for
    /// internals, where it is removed from both halves).
    Split {
        left_id: PageId,
        right_id: PageId,
        promoted_key: Vec<u8>,
    },
}
64
/// Total bytes available for the slot directory + heap in a single
/// node (excludes the node header and the page trailer).
///
/// `replace_leaf` and `replace_internal` compare a node's
/// `occupied_bytes()` against this budget to decide between a plain
/// rewrite and a split.
const PAYLOAD_BYTES: usize = PAGE_SIZE - PAGE_TRAILER_SIZE - crate::btree::node::NODE_HEADER_SIZE;
68
69impl<F: FileBackend> BTree<F> {
70    /// Insert `key → value` into the tree. Returns
71    /// [`Error::BTreeKeyExists`] if `key` is already present.
72    ///
73    /// Copy-on-write: every node touched on the path is allocated as
74    /// a fresh page through the pager. The pre-insert pages enter the
75    /// freelist before this function returns, but only after every
76    /// new page has been staged in the WAL transaction. The caller
77    /// must call [`Pager::commit`] to make the insert durable.
78    ///
79    /// # Errors
80    ///
81    /// - [`Error::BTreeKeyTooLarge`] if the key exceeds `PAGE_SIZE / 4`.
82    /// - [`Error::BTreeValueTooLarge`] if the (key, value) pair will
83    ///   not fit inline in a leaf.
84    /// - [`Error::BTreeKeyExists`] if `key` is already present.
85    /// - [`Error::BTreeDepthExceeded`] if the tree height would
86    ///   exceed `MAX_BTREE_DEPTH`.
87    /// - [`Error::Corruption`] / [`Error::Io`] propagated from the
88    ///   pager.
89    pub fn insert(&mut self, pager: &mut Pager<F>, key: &[u8], value: &[u8]) -> Result<()> {
90        check_key_value_size(key, value)?;
91        let path = self.descend_with_path(pager, key)?;
92        self.apply_insert(pager, path, key, value)
93    }
94
95    /// Walk root → leaf, recording each ancestor's page-id, decoded
96    /// representation, and the child index that was followed.
97    fn descend_with_path(
98        &self,
99        pager: &mut Pager<F>,
100        key: &[u8],
101    ) -> Result<HeaplessVec<PathFrame, MAX_BTREE_DEPTH>> {
102        let mut path: HeaplessVec<PathFrame, MAX_BTREE_DEPTH> = HeaplessVec::new();
103        let mut current = self.root;
104        loop {
105            let decoded = {
106                let page_ref = pager.read_page(current)?;
107                decode_node(page_ref.as_bytes())?
108            };
109            match decoded.kind {
110                NodeKind::Leaf => {
111                    let frame = PathFrame {
112                        page_id: current,
113                        node: decoded,
114                        child_index: 0,
115                    };
116                    if path.push(frame).is_err() {
117                        return Err(Error::BTreeDepthExceeded {
118                            limit: MAX_BTREE_DEPTH,
119                        });
120                    }
121                    return Ok(path);
122                }
123                NodeKind::Internal => {
124                    let child_index = pivot_index(&decoded, key);
125                    let raw = decoded.children[child_index];
126                    let next = PageId::new(raw).ok_or(Error::BTreeInvariantViolated {
127                        reason: "internal node had zero child page-id",
128                    })?;
129                    let frame = PathFrame {
130                        page_id: current,
131                        node: decoded,
132                        child_index,
133                    };
134                    if path.push(frame).is_err() {
135                        return Err(Error::BTreeDepthExceeded {
136                            limit: MAX_BTREE_DEPTH,
137                        });
138                    }
139                    current = next;
140                }
141            }
142        }
143    }
144
145    /// Apply the (key, value) insertion to the leaf at the bottom of
146    /// the path, then bubble any split up to the root.
147    fn apply_insert(
148        &mut self,
149        pager: &mut Pager<F>,
150        mut path: HeaplessVec<PathFrame, MAX_BTREE_DEPTH>,
151        key: &[u8],
152        value: &[u8],
153    ) -> Result<()> {
154        let mut freed: HeaplessVec<PageId, { MAX_BTREE_DEPTH * 2 }> = HeaplessVec::new();
155        let Some(leaf_frame) = path.pop() else {
156            return Err(Error::BTreeInvariantViolated {
157                reason: "insert: descend returned empty path",
158            });
159        };
160        let mut outcome = replace_leaf(pager, leaf_frame, key, value, &mut freed)?;
161        while let Some(parent_frame) = path.pop() {
162            outcome = replace_internal(pager, parent_frame, outcome, &mut freed)?;
163        }
164        let new_root = build_new_root(pager, outcome)?;
165        self.root = new_root;
166        // Free displaced pages AFTER every new page is staged in the
167        // WAL transaction buffer. This is the COW contract: pre-
168        // insert pages must remain readable through the previous
169        // root until the new root is committed.
170        for old_id in freed.iter().copied() {
171            pager.free_page(old_id)?;
172        }
173        Ok(())
174    }
175}
176
177fn replace_leaf<F: FileBackend>(
178    pager: &mut Pager<F>,
179    frame: PathFrame,
180    key: &[u8],
181    value: &[u8],
182    freed: &mut HeaplessVec<PageId, { MAX_BTREE_DEPTH * 2 }>,
183) -> Result<ReplaceOutcome> {
184    let mut leaf = frame.node;
185    if leaf.leaves.iter().any(|e| e.key.as_slice() == key) {
186        return Err(Error::BTreeKeyExists);
187    }
188    let insert_at = leaf
189        .leaves
190        .iter()
191        .position(|e| e.key.as_slice() > key)
192        .unwrap_or(leaf.leaves.len());
193    leaf.leaves.insert(
194        insert_at,
195        LeafEntry {
196            key: key.to_vec(),
197            value: value.to_vec(),
198        },
199    );
200    push_freed(freed, frame.page_id)?;
201    if leaf.occupied_bytes() <= PAYLOAD_BYTES {
202        let new_id = write_new_node(pager, &leaf)?;
203        return Ok(ReplaceOutcome::Fits { new_id });
204    }
205    split_leaf(pager, leaf)
206}
207
208fn replace_internal<F: FileBackend>(
209    pager: &mut Pager<F>,
210    frame: PathFrame,
211    child_outcome: ReplaceOutcome,
212    freed: &mut HeaplessVec<PageId, { MAX_BTREE_DEPTH * 2 }>,
213) -> Result<ReplaceOutcome> {
214    let mut internal = frame.node;
215    let idx = frame.child_index;
216    match child_outcome {
217        ReplaceOutcome::Fits { new_id } => {
218            internal.children[idx] = new_id.get();
219        }
220        ReplaceOutcome::Split {
221            left_id,
222            right_id,
223            promoted_key,
224        } => {
225            internal.children[idx] = left_id.get();
226            internal
227                .internals
228                .insert(idx, InternalEntry { key: promoted_key });
229            internal.children.insert(idx + 1, right_id.get());
230        }
231    }
232    push_freed(freed, frame.page_id)?;
233    if internal.occupied_bytes() <= PAYLOAD_BYTES {
234        let new_id = write_new_node(pager, &internal)?;
235        return Ok(ReplaceOutcome::Fits { new_id });
236    }
237    split_internal(pager, internal)
238}
239
240/// Build the new root from the outcome of `apply_insert`'s final
241/// bubble. If no split, the root is just the new id; if a split, we
242/// allocate a fresh internal node holding (left, promoted, right).
243fn build_new_root<F: FileBackend>(pager: &mut Pager<F>, outcome: ReplaceOutcome) -> Result<PageId> {
244    let (left_id, right_id, promoted_key) = match outcome {
245        ReplaceOutcome::Fits { new_id } => return Ok(new_id),
246        ReplaceOutcome::Split {
247            left_id,
248            right_id,
249            promoted_key,
250        } => (left_id, right_id, promoted_key),
251    };
252    let level = node_level_after_split(pager, left_id)?;
253    let next_level = level.checked_add(1).ok_or(Error::BTreeDepthExceeded {
254        limit: MAX_BTREE_DEPTH,
255    })?;
256    let root_node = DecodedNode {
257        kind: NodeKind::Internal,
258        level: next_level,
259        next_sibling: 0,
260        children: vec![left_id.get(), right_id.get()],
261        leaves: Vec::new(),
262        internals: vec![InternalEntry { key: promoted_key }],
263    };
264    write_new_node(pager, &root_node)
265}
266
267fn push_freed(freed: &mut HeaplessVec<PageId, { MAX_BTREE_DEPTH * 2 }>, id: PageId) -> Result<()> {
268    freed.push(id).map_err(|_| Error::BTreeInvariantViolated {
269        reason: "insert: too many displaced pages to track",
270    })
271}
272
273/// Pick the child index in `node` whose subtree contains `key`.
274fn pivot_index(node: &DecodedNode, key: &[u8]) -> usize {
275    let mut idx = node.internals.len();
276    for (i, pivot) in node.internals.iter().enumerate() {
277        if pivot.key.as_slice() > key {
278            idx = i;
279            break;
280        }
281    }
282    idx
283}
284
285/// Validate that `key` / `value` fit the format-version-0 bounds.
286fn check_key_value_size(key: &[u8], value: &[u8]) -> Result<()> {
287    if key.len() > max_key_len() {
288        return Err(Error::BTreeKeyTooLarge {
289            key_len: key.len(),
290            max: max_key_len(),
291        });
292    }
293    let v_max = max_inline_value(key.len());
294    if value.len() > v_max {
295        return Err(Error::BTreeValueTooLarge {
296            value_len: value.len(),
297            max: v_max,
298        });
299    }
300    Ok(())
301}
302
303/// Encode `node` into a fresh page and write it through the pager.
304/// Returns the newly-allocated `PageId`.
305pub(crate) fn write_new_node<F: FileBackend>(
306    pager: &mut Pager<F>,
307    node: &DecodedNode,
308) -> Result<PageId> {
309    let new_id = pager.alloc_page()?;
310    let mut page = Page::zeroed();
311    encode_node(node, &mut page)?;
312    pager.write_page(new_id, &page)?;
313    Ok(new_id)
314}
315
316/// Split an overflowing leaf into two. Allocates new ids for both
317/// halves; the right half's `next_sibling` is inherited, the left
318/// half points at the new right.
319///
320/// Takes the leaf by value because the function relocates the
321/// internal `Vec`s into two new nodes (no allocation of a clone).
322fn split_leaf<F: FileBackend>(
323    pager: &mut Pager<F>,
324    mut leaf: DecodedNode,
325) -> Result<ReplaceOutcome> {
326    let mid = leaf.leaves.len() / 2;
327    let original_sibling = leaf.next_sibling;
328    // Split off the right half by draining; the left half stays in
329    // `leaf.leaves`. `Vec::split_off` returns a new `Vec` and
330    // truncates the receiver — no clone, no extra alloc.
331    let right_entries: Vec<LeafEntry> = leaf.leaves.split_off(mid);
332    let promoted_key = right_entries[0].key.clone();
333    let right_node = DecodedNode {
334        kind: NodeKind::Leaf,
335        level: 0,
336        next_sibling: original_sibling,
337        children: Vec::new(),
338        leaves: right_entries,
339        internals: Vec::new(),
340    };
341    let right_id = write_new_node(pager, &right_node)?;
342    let left_node = DecodedNode {
343        kind: NodeKind::Leaf,
344        level: 0,
345        next_sibling: right_id.get(),
346        children: Vec::new(),
347        leaves: leaf.leaves,
348        internals: Vec::new(),
349    };
350    let left_id = write_new_node(pager, &left_node)?;
351    Ok(ReplaceOutcome::Split {
352        left_id,
353        right_id,
354        promoted_key,
355    })
356}
357
358/// Split an overflowing internal node. The pivot at the split point
359/// moves *up* (not duplicated, unlike leaves); each half keeps
360/// `mid` / `K - mid - 1` pivots respectively.
361fn split_internal<F: FileBackend>(
362    pager: &mut Pager<F>,
363    mut internal: DecodedNode,
364) -> Result<ReplaceOutcome> {
365    let k = internal.internals.len();
366    debug_assert!(k >= 2, "internal split needs ≥ 2 pivots");
367    let mid = k / 2;
368    // Right half: pivots [mid+1..K) and children [mid+1..K+1).
369    // The pivot at `mid` is promoted (not retained in either half).
370    let right_pivots: Vec<InternalEntry> = internal.internals.split_off(mid + 1);
371    let right_children: Vec<u64> = internal.children.split_off(mid + 1);
372    // Remove the promoted pivot from the left half (now at the end of
373    // the truncated `internals` vec).
374    let promoted_pivot = internal
375        .internals
376        .pop()
377        .ok_or(Error::BTreeInvariantViolated {
378            reason: "internal split: missing promoted pivot",
379        })?;
380    let level = internal.level;
381    let right_node = DecodedNode {
382        kind: NodeKind::Internal,
383        level,
384        next_sibling: 0,
385        children: right_children,
386        leaves: Vec::new(),
387        internals: right_pivots,
388    };
389    let right_id = write_new_node(pager, &right_node)?;
390    let left_node = DecodedNode {
391        kind: NodeKind::Internal,
392        level,
393        next_sibling: 0,
394        children: internal.children,
395        leaves: Vec::new(),
396        internals: internal.internals,
397    };
398    let left_id = write_new_node(pager, &left_node)?;
399    Ok(ReplaceOutcome::Split {
400        left_id,
401        right_id,
402        promoted_key: promoted_pivot.key,
403    })
404}
405
406/// Determine the level of a node that just emerged from a split. We
407/// read the left half's page back through the pager (cheap — it was
408/// just staged in the WAL view) to learn its level.
409fn node_level_after_split<F: FileBackend>(pager: &mut Pager<F>, id: PageId) -> Result<u8> {
410    let page_ref = pager.read_page(id)?;
411    let decoded = decode_node(page_ref.as_bytes())?;
412    Ok(decoded.level)
413}
414
// Anonymous const that names `PAYLOAD_BYTES` once more at item level.
// It has no runtime effect; the value itself is consumed by the
// `occupied_bytes() <= PAYLOAD_BYTES` checks in `replace_leaf` and
// `replace_internal`.
// NOTE(review): the earlier comment justified this as keeping "the
// linker happy", but a private const that is already used needs no
// such reference — presumably this item can be removed; confirm.
const _: usize = PAYLOAD_BYTES;
419
420/// Force-link the unused private slot/header byte constants so any
421/// future refactor that drops them gets a hard error here rather
422/// than silently changing the encode layout.
423const _UNUSED_CHECKS: () = {
424    let _ = INTERNAL_LEFTMOST_CHILD_BYTES;
425    let _ = INTERNAL_SLOT_BYTES;
426    let _ = LEAF_SLOT_BYTES;
427};
428
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pager::{Config, Pager};
    use crate::platform::FileHandle;

    use proptest::prelude::*;
    use rand::prelude::IndexedRandom;
    use rand::SeedableRng;
    use rand_chacha::ChaCha8Rng;
    use std::collections::BTreeMap;

    /// Pager configuration shared by every test: the defaults.
    fn config() -> Config {
        Config::default()
    }

    /// A single inserted key can be read back with its value.
    #[test]
    fn insert_single_key_round_trip() {
        let mut pager = Pager::<FileHandle>::memory(config()).expect("pager");
        let mut tree = BTree::<FileHandle>::empty(&mut pager).expect("empty");
        tree.insert(&mut pager, b"hello", b"world").expect("ins");
        assert_eq!(
            tree.get(&mut pager, b"hello").expect("get"),
            Some(b"world".to_vec())
        );
    }

    /// Re-inserting an existing key fails with `BTreeKeyExists`.
    #[test]
    fn duplicate_key_errors() {
        let mut pager = Pager::<FileHandle>::memory(config()).expect("pager");
        let mut tree = BTree::<FileHandle>::empty(&mut pager).expect("empty");
        tree.insert(&mut pager, b"k", b"v1").expect("ins");
        let err = tree
            .insert(&mut pager, b"k", b"v2")
            .expect_err("dup must fail");
        assert!(matches!(err, Error::BTreeKeyExists));
    }

    /// Enough inserts to overflow the root leaf: every key must remain
    /// readable and the root must have become an internal node.
    #[test]
    fn insert_growth_splits_root() {
        let mut pager = Pager::<FileHandle>::memory(config()).expect("pager");
        let mut tree = BTree::<FileHandle>::empty(&mut pager).expect("empty");
        // 200 entries with 256-byte values are expected to overflow a
        // single leaf; the exact insert count at which the first split
        // happens depends on `PAGE_SIZE`.
        let value = vec![0xABu8; 256];
        for i in 0..200u32 {
            let key = format!("key-{i:08}");
            tree.insert(&mut pager, key.as_bytes(), &value)
                .expect("ins");
        }
        for i in 0..200u32 {
            let key = format!("key-{i:08}");
            assert_eq!(
                tree.get(&mut pager, key.as_bytes()).expect("get"),
                Some(value.clone()),
                "key {key}"
            );
        }
        // Tree should have at least 2 levels now.
        let root = tree.root();
        let page_ref = pager.read_page(root).expect("read root");
        let decoded = decode_node(page_ref.as_bytes()).expect("decode root");
        assert!(
            decoded.level >= 1,
            "expected internal root, got {decoded:?}"
        );
    }

    proptest! {
        #![proptest_config(ProptestConfig {
            cases: 16,
            max_shrink_iters: 32,
            .. ProptestConfig::default()
        })]

        // 16 arbitrary seeds, 200 oracle-checked operations each.
        #[test]
        fn insert_oracle_property(seed in any::<u64>()) {
            run_insert_oracle(seed, 200);
        }
    }

    /// Run 10k random insert operations against a `BTreeMap` oracle
    /// for each of three fixed seeds (30k operations in total).
    /// This is the in-module sanity check; the full 1M-op oracle
    /// lives in `tests/btree_oracle.rs` (issue #29).
    #[test]
    fn insert_oracle_10k() {
        for seed in 0..3u64 {
            run_insert_oracle(seed, 10_000);
        }
    }

    /// Drive `ops` random inserts from a `ChaCha8Rng` seeded with
    /// `seed`, mirroring every successful insert into a `BTreeMap`.
    ///
    /// Keys the oracle already holds must be rejected with
    /// `BTreeKeyExists`; all others must succeed. Every 127th
    /// operation spot-checks up to four oracle keys, and a final pass
    /// checks every key. The RNG is shared between key/value
    /// generation and sampling, so statement order is part of the
    /// reproducibility of a given seed.
    fn run_insert_oracle(seed: u64, ops: usize) {
        let mut rng = ChaCha8Rng::seed_from_u64(seed);
        let mut pager = Pager::<FileHandle>::memory(config()).expect("pager");
        let mut tree = BTree::<FileHandle>::empty(&mut pager).expect("empty");
        let mut oracle: BTreeMap<Vec<u8>, Vec<u8>> = BTreeMap::new();
        for op in 0..ops {
            let key = random_key(&mut rng);
            let value = random_value(&mut rng);
            // Decide the expected outcome before touching the tree.
            let key_already = oracle.contains_key(&key);
            let res = tree.insert(&mut pager, &key, &value);
            if key_already {
                assert!(
                    matches!(res, Err(Error::BTreeKeyExists)),
                    "seed {seed} op {op}: expected BTreeKeyExists, got {res:?}"
                );
            } else {
                res.unwrap_or_else(|e| panic!("seed {seed} op {op}: insert err {e:?}"));
                oracle.insert(key.clone(), value.clone());
            }
            if op.is_multiple_of(127) {
                // Spot-check a few random oracle keys.
                let keys: Vec<&Vec<u8>> = oracle.keys().collect();
                if !keys.is_empty() {
                    let sample: Vec<&Vec<u8>> =
                        keys.choose_multiple(&mut rng, 4).copied().collect();
                    for k in sample {
                        assert_eq!(
                            tree.get(&mut pager, k).expect("get").as_ref(),
                            oracle.get(k),
                            "seed {seed} op {op}: key {k:?}"
                        );
                    }
                }
            }
        }
        // Final pass: every oracle key resolves correctly.
        for (k, v) in &oracle {
            assert_eq!(
                tree.get(&mut pager, k).expect("get").as_ref(),
                Some(v),
                "seed {seed} final: key {k:?}"
            );
        }
    }

    /// Random lowercase-ASCII key of 1 to 15 bytes. The short alphabet
    /// and length make collisions (and thus the duplicate path) likely.
    fn random_key(rng: &mut ChaCha8Rng) -> Vec<u8> {
        use rand::Rng;
        let len = rng.random_range(1..16);
        (0..len).map(|_| rng.random_range(b'a'..=b'z')).collect()
    }

    /// Random value of 0 to 63 arbitrary bytes.
    fn random_value(rng: &mut ChaCha8Rng) -> Vec<u8> {
        use rand::Rng;
        let len = rng.random_range(0..64);
        (0..len).map(|_| rng.random()).collect()
    }
}