Skip to main content

reddb_server/storage/engine/btree/
impl.rs

1use super::*;
2use crate::storage::engine::overflow::OverflowChain;
3
4impl BTree {
5    /// If `stored` is a pointer cell (slice E layout), walk the overflow
6    /// chain anchored at it and return every page to the free list.
7    /// No-op for inline cells (raw or compressed) and for empty cells.
8    ///
9    /// Used by the B-tree paths that drop or overwrite a pointer cell —
10    /// `delete` and the shrinking branches of `upsert` /
11    /// `upsert_batch_sorted` (slice F of PRD #662). At the B-tree layer
12    /// this is synchronous: a higher transactional layer that needs
13    /// MVCC-aware reclamation must defer freeing through its own
14    /// snapshot machinery rather than calling these primitives during a
15    /// rollback-capable window.
16    fn free_chain_if_pointer(&self, stored: &[u8]) -> BTreeResult<()> {
17        if let Some(head) = value_layout::pointer_head(stored) {
18            OverflowChain::new(&self.pager)
19                .free(head)
20                .map_err(value_layout::ValueLayoutError::from)?;
21        }
22        Ok(())
23    }
24    /// Create a new B+ tree using the given pager
25    pub fn new(pager: Arc<Pager>) -> Self {
26        Self {
27            pager,
28            root_page_id: RwLock::new(0),
29            rightmost_leaf: RwLock::new(None),
30        }
31    }
32
33    /// Create a B+ tree with an existing root
34    pub fn with_root(pager: Arc<Pager>, root_page_id: u32) -> Self {
35        Self {
36            pager,
37            root_page_id: RwLock::new(root_page_id),
38            rightmost_leaf: RwLock::new(None),
39        }
40    }
41
42    /// Get the root page ID
43    pub fn root_page_id(&self) -> u32 {
44        *self
45            .root_page_id
46            .read()
47            .unwrap_or_else(|poisoned| poisoned.into_inner())
48    }
49
50    /// Check if tree is empty
51    pub fn is_empty(&self) -> bool {
52        self.root_page_id() == 0
53    }
54
55    /// Get value for a key
56    pub fn get(&self, key: &[u8]) -> BTreeResult<Option<Vec<u8>>> {
57        let root_id = self.root_page_id();
58        if root_id == 0 {
59            return Ok(None);
60        }
61
62        // Descend from root to the target leaf
63        let (leaf_id, _) = self.find_leaf(root_id, key)?;
64
65        // D3 — Lehman-Yao right-link traversal.
66        //
67        // A concurrent insert may have split the leaf between our descent
68        // and this read. If the key we're searching for is greater than the
69        // current page's high_key (max stored key), the key lives on a
70        // right-sibling page — follow the right-link instead of re-descending
71        // from the root. Cap at MAX_RIGHTLINK_HOPS to guard against corrupt
72        // link chains.
73        const MAX_RIGHTLINK_HOPS: usize = 32;
74        let mut current_id = leaf_id;
75        for _ in 0..MAX_RIGHTLINK_HOPS {
76            let page = self.pager.read_page(current_id)?;
77
78            match search_leaf(&page, key)? {
79                SearchResult::Found(pos) => {
80                    let (_, stored) = read_leaf_cell(&page, pos)?;
81                    let value = value_layout::decode(&self.pager, &stored)?;
82                    return Ok(Some(value));
83                }
84                SearchResult::NotFound(_) => {
85                    // Check if a split pushed our key to the right sibling
86                    let right = leaf_right_sibling(&page);
87                    if right != 0 {
88                        if let Some(high_key) = leaf_high_key(&page)? {
89                            if key > high_key.as_slice() {
90                                // Key belongs on a right sibling — follow link
91                                current_id = right;
92                                continue;
93                            }
94                        }
95                    }
96                    // Key genuinely not present
97                    return Ok(None);
98                }
99            }
100        }
101
102        // Exhausted hop limit — fall back to root descent (corrupt link chain)
103        let (leaf_id, _) = self.find_leaf(self.root_page_id(), key)?;
104        let page = self.pager.read_page(leaf_id)?;
105        match search_leaf(&page, key)? {
106            SearchResult::Found(pos) => {
107                let (_, stored) = read_leaf_cell(&page, pos)?;
108                let value = value_layout::decode(&self.pager, &stored)?;
109                Ok(Some(value))
110            }
111            SearchResult::NotFound(_) => Ok(None),
112        }
113    }
114
115    /// Insert a key-value pair.
116    ///
117    /// Applies the value-layout spill ladder before touching the tree:
118    /// the 256 MB ceiling rejects here without any allocation; values
119    /// at or below `OVERFLOW_THRESHOLD` go inline raw; larger values may
120    /// inline compressed or spill into an [`crate::storage::engine::overflow::OverflowChain`].
121    /// The encoded bytes are then committed through [`Self::insert_encoded`].
122    pub fn insert(&self, key: &[u8], value: &[u8]) -> BTreeResult<()> {
123        if key.len() > MAX_KEY_SIZE {
124            return Err(BTreeError::KeyTooLarge(key.len()));
125        }
126        let stored = value_layout::encode(&self.pager, value)?;
127        self.insert_encoded(key, &stored)
128    }
129
130    /// Insert a key with bytes already produced by [`value_layout::encode`].
131    ///
132    /// Internal entry point used by `bulk_insert_sorted` so its fast
133    /// path can pre-encode the whole batch (eagerly allocating overflow
134    /// pages, surfacing oversize rows up front) and still re-use the
135    /// generic insert machinery for fallback. Public callers should go
136    /// through [`Self::insert`] which encodes for them.
137    pub(crate) fn insert_encoded(&self, key: &[u8], value: &[u8]) -> BTreeResult<()> {
138        let root_id = self.root_page_id();
139
140        // Empty tree - create root leaf
141        if root_id == 0 {
142            let mut page = self.pager.allocate_page(PageType::BTreeLeaf)?;
143            clear_leaf_cells(&mut page);
144            insert_into_leaf(&mut page, key, value)?;
145            init_leaf_links(&mut page, 0, 0);
146            page.update_checksum();
147            let new_root = page.page_id();
148            self.pager.write_page(new_root, page)?;
149            *self.root_page_id.write().map_err(|e| {
150                BTreeError::LockPoisoned(format!("insert: root_page_id write lock: {e}"))
151            })? = new_root;
152            return Ok(());
153        }
154
155        // D2 fastpath: if key > cached rightmost high_key, go directly to
156        // the cached leaf without descending from the root. This cuts
157        // tree descent entirely for monotonic append workloads (timeseries,
158        // auto-increment entity IDs).
159        let cached = {
160            self.rightmost_leaf
161                .read()
162                .map_err(|e| BTreeError::LockPoisoned(format!("rightmost_leaf read: {e}")))?
163                .clone()
164        };
165        if let Some((cached_page_id, ref high_key)) = cached {
166            if key > high_key.as_slice() {
167                let mut page = self.pager.read_page(cached_page_id)?;
168                // Confirm this page is still a valid leaf with a right-link
169                // of 0 (i.e., it IS the rightmost leaf — not mid-tree after
170                // a concurrent split moved the boundary).
171                let right_sibling = leaf_right_sibling(&page);
172                if right_sibling == 0 {
173                    // Still the rightmost leaf. Try fast append.
174                    if can_insert_leaf(&page, key, value) {
175                        insert_into_leaf(&mut page, key, value)?;
176                        // Skip update_checksum here — pager.write_page
177                        // below recomputes it. Saves one CRC32 on 4KB
178                        // per insert on the hot append path.
179                        let page_id = page.page_id();
180                        // Update cached high_key
181                        *self.rightmost_leaf.write().map_err(|e| {
182                            BTreeError::LockPoisoned(format!("rightmost_leaf write: {e}"))
183                        })? = Some((page_id, key.to_vec()));
184                        self.pager.write_page(page_id, page)?;
185                        return Ok(());
186                    }
187                    // Leaf is full — split. After split, invalidate cache
188                    // (the new rightmost leaf is the split result).
189                    let (new_leaf, separator_key) = self.split_leaf(&mut page, key, value)?;
190                    let new_leaf_id = new_leaf.page_id();
191                    page.update_checksum();
192                    let page_id = page.page_id();
193                    self.pager.write_page(page_id, page.clone())?;
194                    // Cache the new rightmost leaf after split
195                    *self.rightmost_leaf.write().map_err(|e| {
196                        BTreeError::LockPoisoned(format!("rightmost_leaf write: {e}"))
197                    })? = Some((new_leaf_id, key.to_vec()));
198                    // Get parent path for the split propagation.
199                    // The left child is the original cached leaf.
200                    let (_, path) = self.find_leaf(root_id, &separator_key)?;
201                    self.insert_into_parent(path, cached_page_id, &separator_key, new_leaf_id)?;
202                    return Ok(());
203                }
204                // right_sibling != 0: cached page is no longer rightmost.
205                // Fall through to the full find_leaf path below and
206                // refresh the cache.
207                *self.rightmost_leaf.write().map_err(|e| {
208                    BTreeError::LockPoisoned(format!("rightmost_leaf write: {e}"))
209                })? = None;
210            }
211        }
212
213        // Find the leaf and path to it
214        let (leaf_id, path) = self.find_leaf(root_id, key)?;
215        let mut page = self.pager.read_page(leaf_id)?;
216
217        // Check for duplicate
218        if let SearchResult::Found(_) = search_leaf(&page, key)? {
219            return Err(BTreeError::DuplicateKey);
220        }
221
222        // Try to insert into leaf
223        if can_insert_leaf(&page, key, value) {
224            insert_into_leaf(&mut page, key, value)?;
225            // Skip update_checksum — pager.write_page recomputes it.
226            let page_id = page.page_id();
227            // If this is the rightmost leaf (right_sibling == 0), cache it.
228            if leaf_right_sibling(&page) == 0 {
229                *self.rightmost_leaf.write().map_err(|e| {
230                    BTreeError::LockPoisoned(format!("rightmost_leaf write: {e}"))
231                })? = Some((page_id, key.to_vec()));
232            }
233            self.pager.write_page(page_id, page)?;
234            return Ok(());
235        }
236
237        // Need to split the leaf
238        let (new_leaf, separator_key) = self.split_leaf(&mut page, key, value)?;
239        let new_leaf_id = new_leaf.page_id();
240        page.update_checksum();
241        let page_id = page.page_id();
242        self.pager.write_page(page_id, page.clone())?;
243        // Cache the new rightmost leaf (split result gets the larger keys)
244        *self
245            .rightmost_leaf
246            .write()
247            .map_err(|e| BTreeError::LockPoisoned(format!("rightmost_leaf write: {e}")))? =
248            Some((new_leaf_id, key.to_vec()));
249
250        // Propagate split up the tree
251        self.insert_into_parent(path, page_id, &separator_key, new_leaf_id)?;
252
253        Ok(())
254    }
255
256    /// Insert or update a key. If the key already exists and the new
257    /// value has the same length as the old, the value bytes are
258    /// overwritten in place — no structural changes, no rebalance,
259    /// one leaf page write.
260    ///
261    /// Falls back to `delete + insert` when the key is missing or the
262    /// new value has a different length. Callers like
263    /// `persist_entities_to_pager` that re-serialize a mutated entity
264    /// with a fixed-width schema almost always hit the fast path,
265    /// eliminating the `BTree::delete + rebalance` cost that previously
266    /// dominated UPDATE workloads (~50% of `bulk_update` CPU).
267    pub fn upsert(&self, key: &[u8], value: &[u8]) -> BTreeResult<()> {
268        let root_id = self.root_page_id();
269        if root_id == 0 {
270            return self.insert(key, value);
271        }
272
273        // Encode the new value once. The in-place fast path compares
274        // encoded lengths so the leaf cell stays in the on-disk format
275        // documented in `value_layout`; without this the in-place write
276        // would scribble raw bytes over a flag-prefixed cell.
277        let new_stored = value_layout::encode(&self.pager, value)?;
278
279        let (leaf_id, _path) = self.find_leaf(root_id, key)?;
280        let mut page = self.pager.read_page(leaf_id)?;
281
282        if let SearchResult::Found(pos) = search_leaf(&page, key)? {
283            let (_, old_value) = read_leaf_cell(&page, pos)?;
284            if new_stored.len() <= old_value.len() {
285                // Shrinking (or same-size) overwrite. If the old cell
286                // owned an overflow chain, free it before the new bytes
287                // overwrite the pointer header — otherwise the chain
288                // is unreachable but still allocated (slice F).
289                self.free_chain_if_pointer(&old_value)?;
290                overwrite_leaf_value(&mut page, pos, &new_stored)?;
291                let page_id = page.page_id();
292                self.pager.write_page(page_id, page)?;
293                return Ok(());
294            }
295            // Grow case: fall through to delete+insert for a proper
296            // reallocation + potential rebalance.
297            let _ = self.delete(key);
298            return self.insert_encoded(key, &new_stored);
299        }
300
301        // Key is new — plain insert path. Reuse the bytes we already
302        // encoded above instead of re-running LZ4 / re-allocating.
303        self.insert_encoded(key, &new_stored)
304    }
305
306    /// Batch upsert for sorted keys landing in a small set of leaves.
307    ///
308    /// Walks to each leaf once, applies every same-or-shrink in-place
309    /// overwrite that belongs there, writes the page once, then moves
310    /// on. Keys that miss or whose new value grows fall back to the
311    /// generic single-key `upsert` path — correctness identical,
312    /// structural re-layout handled there.
313    ///
314    /// Callers pass lex-sorted `(key, value)` pairs. For the entity
315    /// UPDATE path, IDs are big-endian so u64 ordering = lex ordering.
316    pub fn upsert_batch_sorted(&self, items: &[(Vec<u8>, Vec<u8>)]) -> BTreeResult<()> {
317        if items.is_empty() {
318            return Ok(());
319        }
320        let root_id = self.root_page_id();
321        if root_id == 0 {
322            for (k, v) in items {
323                self.upsert(k, v)?;
324            }
325            return Ok(());
326        }
327
328        let mut current_leaf: Option<(u32, Page, bool)> = None; // (leaf_id, page, dirty)
329        for (key, value) in items {
330            // Locate the leaf for this key. If it matches the leaf we
331            // already have in hand, reuse the in-memory page.
332            let (leaf_id, _path) = self.find_leaf(root_id, key)?;
333            if current_leaf.as_ref().map(|(id, _, _)| *id) != Some(leaf_id) {
334                if let Some((id, page, dirty)) = current_leaf.take() {
335                    if dirty {
336                        self.pager.write_page(id, page)?;
337                    }
338                }
339                let page = self.pager.read_page(leaf_id)?;
340                current_leaf = Some((leaf_id, page, false));
341            }
342
343            // Encode the new value through the spill ladder so the
344            // length comparison is between encoded bytes (matching the
345            // on-disk cell format). Compute lazily: only the in-place
346            // branch needs it, and an oversize value gets caught by the
347            // single-key `upsert` fallback below.
348            let applied_in_place = {
349                let (_, page, dirty) = current_leaf.as_mut().expect("set above");
350                if let SearchResult::Found(pos) = search_leaf(page, key)? {
351                    let (_, old_value) = read_leaf_cell(page, pos)?;
352                    match value_layout::encode(&self.pager, value) {
353                        Ok(new_stored) if new_stored.len() <= old_value.len() => {
354                            // Shrinking overwrite: free the old chain
355                            // before the pointer header is scribbled over
356                            // (slice F).
357                            self.free_chain_if_pointer(&old_value)?;
358                            overwrite_leaf_value(page, pos, &new_stored)?;
359                            *dirty = true;
360                            true
361                        }
362                        _ => false,
363                    }
364                } else {
365                    false
366                }
367            };
368            if !applied_in_place {
369                if let Some((id, page, dirty)) = current_leaf.take() {
370                    if dirty {
371                        self.pager.write_page(id, page)?;
372                    }
373                }
374                self.upsert(key, value)?;
375            }
376        }
377
378        if let Some((id, page, dirty)) = current_leaf.take() {
379            if dirty {
380                self.pager.write_page(id, page)?;
381            }
382        }
383        Ok(())
384    }
385
386    /// Bulk insert for sorted key-value pairs.
387    ///
388    /// Optimized for monotonically increasing keys (e.g. entity IDs):
389    /// - Walks to the target leaf ONCE, then appends many entries
390    ///   before re-walking.
391    /// - Writes each leaf only once per batch (amortized over many inserts).
392    ///
393    /// Falls back to per-entity `insert` on splits.
394    pub fn bulk_insert_sorted(&self, items: &[(Vec<u8>, Vec<u8>)]) -> BTreeResult<()> {
395        if items.is_empty() {
396            return Ok(());
397        }
398        // Key check is a hard fail (oversized keys would corrupt the
399        // ordering invariant). Value oversize, by contrast, is skipped
400        // per-row so a single huge row in the middle of a batch cannot
401        // poison the rest — the caller still gets a signal because we
402        // return the first ValueTooLarge after the rest have landed.
403        for (key, _) in items {
404            if key.len() > MAX_KEY_SIZE {
405                return Err(BTreeError::KeyTooLarge(key.len()));
406            }
407        }
408
409        // Pre-encode every value through the spill ladder. Oversized
410        // rows are dropped from the encoded batch but remembered so we
411        // can surface the first violation at the end of the call.
412        // Encoding here also allocates overflow pages eagerly, which
413        // mirrors single-key `insert` behaviour and keeps the bulk fast
414        // path below opaque to value shape.
415        let mut encoded: Vec<(&[u8], Vec<u8>)> = Vec::with_capacity(items.len());
416        let mut first_oversize: Option<usize> = None;
417        for (key, value) in items {
418            match value_layout::encode(&self.pager, value) {
419                Ok(stored) => encoded.push((key.as_slice(), stored)),
420                Err(value_layout::ValueLayoutError::ValueTooLarge(n)) => {
421                    if first_oversize.is_none() {
422                        first_oversize = Some(n);
423                    }
424                }
425                Err(other) => return Err(other.into()),
426            }
427        }
428
429        let items: Vec<(&[u8], &[u8])> = encoded.iter().map(|(k, v)| (*k, v.as_slice())).collect();
430
431        // After a filled leaf is written to disk, the next ascending
432        // key almost certainly belongs on the right-sibling leaf
433        // (leaves are linked via `leaf_right_sibling`). Cache the
434        // just-written leaf id so the next outer iteration can hop
435        // the sibling pointer in one page read instead of a full
436        // root-to-leaf walk. Any fallback to `self.insert` (which
437        // may split the tree) invalidates the hint.
438        let mut last_filled_leaf: Option<u32> = None;
439        let mut i = 0;
440        while i < items.len() {
441            let root_id = self.root_page_id();
442            if root_id == 0 {
443                // Empty tree — use single insert to create root.
444                // Bytes are already encoded, so go through the internal
445                // entry point that skips the spill ladder.
446                let (k, v) = items[i];
447                self.insert_encoded(k, v)?;
448                i += 1;
449                last_filled_leaf = None;
450                continue;
451            }
452
453            // Try the right-sibling hop first. Valid when: (a) we have
454            // a cached last-filled leaf, (b) it points to a non-zero
455            // sibling, (c) the current key > sibling's last key (or
456            // the sibling is empty). Any mismatch → fall back to
457            // `find_leaf`.
458            let hint_leaf_id: Option<u32> = if let Some(prev) = last_filled_leaf {
459                let prev_page = self.pager.read_page(prev)?;
460                let sibling = leaf_right_sibling(&prev_page);
461                if sibling != 0 {
462                    let sib_page = self.pager.read_page(sibling)?;
463                    match leaf_high_key(&sib_page)? {
464                        None => Some(sibling), // empty sibling — free real estate
465                        Some(hk) if items[i].0 > hk.as_slice() => Some(sibling),
466                        Some(_) => None,
467                    }
468                } else {
469                    None
470                }
471            } else {
472                None
473            };
474
475            let (leaf_id, _path) = match hint_leaf_id {
476                Some(id) => (id, Vec::new()),
477                None => self.find_leaf(root_id, items[i].0)?,
478            };
479            let mut page = self.pager.read_page(leaf_id)?;
480
481            // Snapshot the leaf's last key (via the O(1) slot lookup)
482            // and current free bytes. Both drive the append fast path
483            // below. The old O(M) forward-walk that this block used to
484            // do was made redundant by the slotted layout.
485            let mut last_key_in_leaf: Option<Vec<u8>> = {
486                let cell_count = page.cell_count() as usize;
487                if cell_count == 0 {
488                    None
489                } else {
490                    Some(read_leaf_cell(&page, cell_count - 1)?.0)
491                }
492            };
493
494            // Fast path for monotonically-ascending keys: append new
495            // cells at the tail of the cell-data area and push new
496            // slot pointers onto the slot array. Every key must be
497            // strictly greater than `last_key_in_leaf` and must still
498            // fit in the leaf's free bytes.
499            let mut inserted = 0usize;
500            while i + inserted < items.len() {
501                let (key, value) = items[i + inserted];
502
503                // Strictly-ascending check, doubles as duplicate
504                // detection without a leaf binary search.
505                if let Some(lk) = &last_key_in_leaf {
506                    match key.cmp(lk.as_slice()) {
507                        Ordering::Greater => {}
508                        Ordering::Equal => return Err(BTreeError::DuplicateKey),
509                        Ordering::Less => break,
510                    }
511                }
512
513                // Free-space check: account for every slot appended in this
514                // batch even though `page.cell_count()` is only updated once
515                // at the end. Otherwise we can overrun the slot array into the
516                // cell area near the end of the page and persist bad offsets.
517                let cell_size = 4 + key.len() + value.len();
518                let slot_end_after =
519                    LEAF_SLOT_ARRAY_OFFSET + (page.cell_count() as usize + inserted + 1) * 2;
520                let cells_start = leaf_cells_start(&page);
521                if slot_end_after + cell_size > cells_start {
522                    break;
523                }
524
525                // Append the cell at the tail of the cell-data area
526                // and push a new slot pointer. `page.cell_count()`
527                // still reflects the state before this inserted batch,
528                // so the new slot index is `cell_count + inserted`.
529                let cell_offset = cells_start - cell_size;
530                {
531                    let data = page.as_bytes_mut();
532                    data[cell_offset..cell_offset + 2]
533                        .copy_from_slice(&(key.len() as u16).to_le_bytes());
534                    data[cell_offset + 2..cell_offset + 4]
535                        .copy_from_slice(&(value.len() as u16).to_le_bytes());
536                    data[cell_offset + 4..cell_offset + 4 + key.len()].copy_from_slice(key);
537                    data[cell_offset + 4 + key.len()..cell_offset + cell_size]
538                        .copy_from_slice(value);
539                }
540                page.set_free_end(cell_offset as u16);
541
542                let new_slot_index = page.cell_count() as usize + inserted;
543                let slot_pos = leaf_slot_offset_for(new_slot_index);
544                {
545                    let data = page.as_bytes_mut();
546                    data[slot_pos..slot_pos + 2]
547                        .copy_from_slice(&(cell_offset as u16).to_le_bytes());
548                }
549
550                last_key_in_leaf = Some(key.to_vec());
551                inserted += 1;
552            }
553
554            if inserted > 0 {
555                let new_count = page.cell_count() as usize + inserted;
556                page.set_cell_count(new_count as u16);
557                page.set_free_start((LEAF_SLOT_ARRAY_OFFSET + new_count * 2) as u16);
558                page.update_checksum();
559                self.pager.write_page(leaf_id, page)?;
560                i += inserted;
561                // Remember this leaf; the next outer iteration will
562                // try its right sibling before re-walking from root.
563                last_filled_leaf = Some(leaf_id);
564            } else {
565                // Couldn't fit even one entry via the fast path —
566                // either the leaf is full, or the next key belongs
567                // in the middle of the leaf. Delegate to the generic
568                // insert path which handles both splitting and
569                // mid-leaf positioning. Splits can invalidate the
570                // leaf-chain topology so drop the hint.
571                let (k, v) = items[i];
572                self.insert_encoded(k, v)?;
573                i += 1;
574                last_filled_leaf = None;
575            }
576        }
577
578        // Valid rows have all landed by this point; surface the first
579        // oversize so callers still see a `ValueTooLarge` signal
580        // without losing the rest of the batch (AC #5 — one oversized
581        // row in the middle is rejected but the rest land).
582        if let Some(n) = first_oversize {
583            return Err(BTreeError::ValueTooLarge(n));
584        }
585
586        Ok(())
587    }
588
589    /// Delete a key
590    pub fn delete(&self, key: &[u8]) -> BTreeResult<bool> {
591        let root_id = self.root_page_id();
592        if root_id == 0 {
593            return Ok(false);
594        }
595
596        // D3 — Lehman-Yao right-link traversal for delete.
597        // A split may have moved the key to a right sibling between our
598        // descent and the actual page read. Follow right-links before giving up.
599        let (leaf_id, path) = self.find_leaf(root_id, key)?;
600        let mut current_id = leaf_id;
601        const MAX_RIGHTLINK_HOPS: usize = 32;
602        for _ in 0..MAX_RIGHTLINK_HOPS {
603            let mut page = self.pager.read_page(current_id)?;
604            match search_leaf(&page, key)? {
605                SearchResult::Found(pos) => {
606                    // Found — delete from this page (may differ from leaf_id).
607                    // If the cell carries a spill pointer, return every
608                    // overflow page to the free list before the leaf cell
609                    // goes away; otherwise the chain leaks (slice F).
610                    let (_, stored) = read_leaf_cell(&page, pos)?;
611                    self.free_chain_if_pointer(&stored)?;
612                    delete_from_leaf(&mut page, pos)?;
613                    page.update_checksum();
614                    let page_id = page.page_id();
615                    self.pager.write_page(page_id, page.clone())?;
616
617                    *self
618                        .rightmost_leaf
619                        .write()
620                        .unwrap_or_else(|e| e.into_inner()) = None;
621
622                    if page.cell_count() == 0 && page_id == root_id {
623                        self.pager.free_page(root_id)?;
624                        *self.root_page_id.write().map_err(|e| {
625                            BTreeError::LockPoisoned(format!(
626                                "delete: root_page_id write lock: {e}"
627                            ))
628                        })? = 0;
629                    } else {
630                        self.rebalance_leaf(current_id, path)?;
631                    }
632                    return Ok(true);
633                }
634                SearchResult::NotFound(_) => {
635                    let right = leaf_right_sibling(&page);
636                    if right != 0 {
637                        if let Some(high_key) = leaf_high_key(&page)? {
638                            if key > high_key.as_slice() {
639                                current_id = right;
640                                continue;
641                            }
642                        }
643                    }
644                    return Ok(false);
645                }
646            }
647        }
648        Ok(false)
649    }
650
651    /// Create a cursor starting at the first entry
652    pub fn cursor_first(&self) -> BTreeResult<BTreeCursor> {
653        let root_id = self.root_page_id();
654        if root_id == 0 {
655            return Ok(BTreeCursor {
656                leaf_page_id: 0,
657                position: 0,
658                pager: self.pager.clone(),
659                prefetched_next: false,
660            });
661        }
662
663        // Find leftmost leaf
664        let first_leaf = self.find_first_leaf(root_id)?;
665
666        Ok(BTreeCursor {
667            leaf_page_id: first_leaf,
668            position: 0,
669            pager: self.pager.clone(),
670            prefetched_next: false,
671        })
672    }
673
674    /// Create a cursor starting at or after the given key
675    pub fn cursor_seek(&self, key: &[u8]) -> BTreeResult<BTreeCursor> {
676        let root_id = self.root_page_id();
677        if root_id == 0 {
678            return Ok(BTreeCursor {
679                leaf_page_id: 0,
680                position: 0,
681                pager: self.pager.clone(),
682                prefetched_next: false,
683            });
684        }
685
686        let (leaf_id, _) = self.find_leaf(root_id, key)?;
687        let page = self.pager.read_page(leaf_id)?;
688
689        let position = match search_leaf(&page, key)? {
690            SearchResult::Found(pos) => pos,
691            SearchResult::NotFound(pos) => pos,
692        };
693
694        Ok(BTreeCursor {
695            leaf_page_id: leaf_id,
696            position,
697            pager: self.pager.clone(),
698            prefetched_next: false,
699        })
700    }
701
702    /// Range scan from start_key to end_key (inclusive)
703    pub fn range(&self, start_key: &[u8], end_key: &[u8]) -> BTreeResult<Vec<(Vec<u8>, Vec<u8>)>> {
704        let mut results = Vec::new();
705        let mut cursor = self.cursor_seek(start_key)?;
706
707        while let Some((key, value)) = cursor.next()? {
708            if key.as_slice() > end_key {
709                break;
710            }
711            results.push((key, value));
712        }
713
714        Ok(results)
715    }
716
717    /// Count entries in the tree
718    pub fn count(&self) -> BTreeResult<usize> {
719        let root_id = self.root_page_id();
720        if root_id == 0 {
721            return Ok(0);
722        }
723
724        let mut count = 0;
725        let mut cursor = self.cursor_first()?;
726        while cursor.next()?.is_some() {
727            count += 1;
728        }
729
730        Ok(count)
731    }
732
733    // ==================== Internal Methods ====================
734
735    /// Find the leaf page containing the key
736    fn find_leaf(&self, page_id: u32, key: &[u8]) -> BTreeResult<(u32, Vec<u32>)> {
737        let mut current_id = page_id;
738        let mut path = Vec::new();
739
740        loop {
741            let page = self.pager.read_page(current_id)?;
742
743            match page.page_type()? {
744                PageType::BTreeLeaf => {
745                    return Ok((current_id, path));
746                }
747                PageType::BTreeInterior => {
748                    path.push(current_id);
749                    current_id = find_child(&page, key)?;
750                }
751                other => {
752                    return Err(BTreeError::Corrupted(format!(
753                        "Unexpected page type in B-tree: {:?}",
754                        other
755                    )));
756                }
757            }
758        }
759    }
760
761    /// Find the leftmost leaf page
762    pub(crate) fn find_first_leaf(&self, page_id: u32) -> BTreeResult<u32> {
763        let mut current_id = page_id;
764
765        loop {
766            let page = self.pager.read_page(current_id)?;
767
768            match page.page_type()? {
769                PageType::BTreeLeaf => return Ok(current_id),
770                PageType::BTreeInterior => {
771                    // Go to leftmost child
772                    current_id = find_first_child(&page)?;
773                }
774                _ => {
775                    return Err(BTreeError::Corrupted("Invalid page type".into()));
776                }
777            }
778        }
779    }
780
781    /// Split a leaf page
782    fn split_leaf(
783        &self,
784        page: &mut Page,
785        new_key: &[u8],
786        new_value: &[u8],
787    ) -> BTreeResult<(Page, Vec<u8>)> {
788        // Collect all entries including the new one
789        let mut entries: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
790        let cell_count = page.cell_count() as usize;
791
792        for i in 0..cell_count {
793            entries.push(read_leaf_cell(page, i)?);
794        }
795
796        // Insert new entry in sorted position
797        let insert_pos = entries.partition_point(|(k, _)| k.as_slice() < new_key);
798        entries.insert(insert_pos, (new_key.to_vec(), new_value.to_vec()));
799
800        // Split in half
801        let mid = entries.len() / 2;
802
803        // Create new leaf
804        let mut new_page = self.pager.allocate_page(PageType::BTreeLeaf)?;
805
806        // Update leaf links
807        let old_next = read_next_leaf(page);
808        init_leaf_links(&mut new_page, page.page_id(), old_next);
809        set_next_leaf(page, new_page.page_id());
810
811        // Rebuild both halves through the slotted layout so their
812        // cell_count, free_start and free_end headers are consistent.
813        write_leaf_entries(page, &entries[..mid])?;
814        write_leaf_entries(&mut new_page, &entries[mid..])?;
815
816        // Separator is first key of new leaf
817        let separator = entries[mid].0.clone();
818
819        new_page.update_checksum();
820        let new_page_id = new_page.page_id();
821        self.pager.write_page(new_page_id, new_page.clone())?;
822
823        Ok((new_page, separator))
824    }
825
826    /// Insert into parent after split
827    fn insert_into_parent(
828        &self,
829        mut path: Vec<u32>,
830        left_child: u32,
831        key: &[u8],
832        right_child: u32,
833    ) -> BTreeResult<()> {
834        // If path is empty, need new root
835        if path.is_empty() {
836            let mut new_root = self.pager.allocate_page(PageType::BTreeInterior)?;
837            // Single separator with two children — seed the page via the
838            // canonical slotted builder so the header is consistent.
839            write_interior_entries(&mut new_root, &[key.to_vec()], &[left_child, right_child])?;
840
841            new_root.update_checksum();
842            let new_root_id = new_root.page_id();
843            self.pager.write_page(new_root_id, new_root)?;
844            *self.root_page_id.write().map_err(|e| {
845                BTreeError::LockPoisoned(format!(
846                    "insert_into_parent: root_page_id write lock: {e}"
847                ))
848            })? = new_root_id;
849            return Ok(());
850        }
851
852        // Insert into parent — path is non-empty (checked above)
853        let parent_id = path.pop().ok_or_else(|| {
854            BTreeError::Corrupted("insert_into_parent: path unexpectedly empty".into())
855        })?;
856        let mut parent = self.pager.read_page(parent_id)?;
857
858        // Can we fit?
859        if can_insert_interior(&parent, key) {
860            insert_into_interior(&mut parent, key, left_child, right_child)?;
861            parent.update_checksum();
862            self.pager.write_page(parent_id, parent)?;
863            return Ok(());
864        }
865
866        // Need to split interior node
867        let (new_interior, separator) =
868            self.split_interior(&mut parent, key, left_child, right_child)?;
869        parent.update_checksum();
870        self.pager.write_page(parent_id, parent.clone())?;
871
872        // Propagate up
873        self.insert_into_parent(path, parent.page_id(), &separator, new_interior.page_id())
874    }
875
876    /// Split an interior node
877    fn split_interior(
878        &self,
879        page: &mut Page,
880        new_key: &[u8],
881        left_child: u32,
882        right_child: u32,
883    ) -> BTreeResult<(Page, Vec<u8>)> {
884        let (mut keys, mut children) = read_interior_keys_children(page)?;
885        let key_insert_pos = keys.partition_point(|key| key.as_slice() < new_key);
886        let child_insert_pos = children
887            .iter()
888            .position(|child| *child == left_child)
889            .unwrap_or(key_insert_pos);
890
891        keys.insert(key_insert_pos, new_key.to_vec());
892        children.insert(child_insert_pos + 1, right_child);
893
894        // The promoted separator does not stay in either side; each
895        // resulting page is rebuilt through the shared slotted writer.
896        let mid = keys.len() / 2;
897        let separator = keys[mid].clone();
898
899        // Create new interior node
900        let mut new_page = self.pager.allocate_page(PageType::BTreeInterior)?;
901
902        write_interior_entries(page, &keys[..mid], &children[..mid + 1])?;
903        write_interior_entries(&mut new_page, &keys[mid + 1..], &children[mid + 1..])?;
904
905        new_page.update_checksum();
906        let new_page_id = new_page.page_id();
907        self.pager.write_page(new_page_id, new_page.clone())?;
908
909        Ok((new_page, separator))
910    }
911
912    fn rebalance_leaf(&self, leaf_id: u32, path: Vec<u32>) -> BTreeResult<()> {
913        if path.is_empty() {
914            return Ok(());
915        }
916
917        let root_id = self.root_page_id();
918        if leaf_id == root_id {
919            return Ok(());
920        }
921
922        let mut leaf = self.pager.read_page(leaf_id)?;
923        let mut leaf_entries = read_leaf_entries(&leaf)?;
924        let min_bytes = leaf_min_bytes();
925
926        let parent_id = *path.last().ok_or_else(|| {
927            BTreeError::Corrupted("rebalance_leaf: path unexpectedly empty".into())
928        })?;
929        let mut parent = self.pager.read_page(parent_id)?;
930        let (mut parent_keys, mut parent_children) = read_interior_keys_children(&parent)?;
931
932        let child_index = parent_children
933            .iter()
934            .position(|&id| id == leaf_id)
935            .ok_or_else(|| BTreeError::Corrupted("Leaf missing from parent".into()))?;
936
937        if child_index > 0 {
938            if let Some((first_key, _)) = leaf_entries.first() {
939                if parent_keys.get(child_index - 1).map(|k| k.as_slice())
940                    != Some(first_key.as_slice())
941                {
942                    parent_keys[child_index - 1] = first_key.clone();
943                    write_interior_entries(&mut parent, &parent_keys, &parent_children)?;
944                    parent.update_checksum();
945                    self.pager.write_page(parent_id, parent.clone())?;
946                }
947            }
948        }
949
950        if leaf_entries_size(&leaf_entries) >= min_bytes {
951            return Ok(());
952        }
953
954        if child_index > 0 {
955            let left_id = parent_children[child_index - 1];
956            let mut left = self.pager.read_page(left_id)?;
957            let mut left_entries = read_leaf_entries(&left)?;
958            let mut borrowed = false;
959
960            while leaf_entries_size(&leaf_entries) < min_bytes {
961                let Some(entry) = left_entries.pop() else {
962                    break;
963                };
964                if leaf_entries_size(&left_entries) < min_bytes {
965                    left_entries.push(entry);
966                    break;
967                }
968                leaf_entries.insert(0, entry);
969                borrowed = true;
970            }
971
972            if borrowed {
973                write_leaf_entries(&mut left, &left_entries)?;
974                left.update_checksum();
975                self.pager.write_page(left_id, left)?;
976
977                write_leaf_entries(&mut leaf, &leaf_entries)?;
978                leaf.update_checksum();
979                self.pager.write_page(leaf_id, leaf)?;
980
981                if let Some((first_key, _)) = leaf_entries.first() {
982                    parent_keys[child_index - 1] = first_key.clone();
983                    write_interior_entries(&mut parent, &parent_keys, &parent_children)?;
984                    parent.update_checksum();
985                    self.pager.write_page(parent_id, parent)?;
986                }
987
988                return Ok(());
989            }
990        }
991
992        if child_index + 1 < parent_children.len() {
993            let right_id = parent_children[child_index + 1];
994            let mut right = self.pager.read_page(right_id)?;
995            let mut right_entries = read_leaf_entries(&right)?;
996            let mut borrowed = false;
997
998            while leaf_entries_size(&leaf_entries) < min_bytes {
999                if right_entries.is_empty() {
1000                    break;
1001                }
1002                let entry = right_entries.remove(0);
1003                if leaf_entries_size(&right_entries) < min_bytes {
1004                    right_entries.insert(0, entry);
1005                    break;
1006                }
1007                leaf_entries.push(entry);
1008                borrowed = true;
1009            }
1010
1011            if borrowed {
1012                write_leaf_entries(&mut right, &right_entries)?;
1013                right.update_checksum();
1014                self.pager.write_page(right_id, right)?;
1015
1016                write_leaf_entries(&mut leaf, &leaf_entries)?;
1017                leaf.update_checksum();
1018                self.pager.write_page(leaf_id, leaf)?;
1019
1020                if let Some((first_key, _)) = right_entries.first() {
1021                    parent_keys[child_index] = first_key.clone();
1022                    write_interior_entries(&mut parent, &parent_keys, &parent_children)?;
1023                    parent.update_checksum();
1024                    self.pager.write_page(parent_id, parent)?;
1025                }
1026
1027                return Ok(());
1028            }
1029        }
1030
1031        if child_index > 0 {
1032            let left_id = parent_children[child_index - 1];
1033            let mut left = self.pager.read_page(left_id)?;
1034            let mut left_entries = read_leaf_entries(&left)?;
1035
1036            left_entries.extend(leaf_entries);
1037            write_leaf_entries(&mut left, &left_entries)?;
1038
1039            let next_leaf = read_next_leaf(&leaf);
1040            set_next_leaf(&mut left, next_leaf);
1041            if next_leaf != 0 {
1042                let mut next = self.pager.read_page(next_leaf)?;
1043                set_prev_leaf(&mut next, left_id);
1044                next.update_checksum();
1045                self.pager.write_page(next_leaf, next)?;
1046            }
1047
1048            left.update_checksum();
1049            self.pager.write_page(left_id, left)?;
1050            self.pager.free_page(leaf_id)?;
1051
1052            parent_keys.remove(child_index - 1);
1053            parent_children.remove(child_index);
1054            write_interior_entries(&mut parent, &parent_keys, &parent_children)?;
1055            parent.update_checksum();
1056            self.pager.write_page(parent_id, parent)?;
1057
1058            let mut parent_path = path;
1059            parent_path.pop();
1060            return self.rebalance_interior(parent_id, parent_path);
1061        }
1062
1063        if child_index + 1 < parent_children.len() {
1064            let right_id = parent_children[child_index + 1];
1065            let right = self.pager.read_page(right_id)?;
1066            let right_entries = read_leaf_entries(&right)?;
1067
1068            leaf_entries.extend(right_entries);
1069            write_leaf_entries(&mut leaf, &leaf_entries)?;
1070
1071            let next_leaf = read_next_leaf(&right);
1072            set_next_leaf(&mut leaf, next_leaf);
1073            if next_leaf != 0 {
1074                let mut next = self.pager.read_page(next_leaf)?;
1075                set_prev_leaf(&mut next, leaf_id);
1076                next.update_checksum();
1077                self.pager.write_page(next_leaf, next)?;
1078            }
1079
1080            leaf.update_checksum();
1081            self.pager.write_page(leaf_id, leaf)?;
1082            self.pager.free_page(right_id)?;
1083
1084            parent_keys.remove(child_index);
1085            parent_children.remove(child_index + 1);
1086            write_interior_entries(&mut parent, &parent_keys, &parent_children)?;
1087            parent.update_checksum();
1088            self.pager.write_page(parent_id, parent)?;
1089
1090            let mut parent_path = path;
1091            parent_path.pop();
1092            return self.rebalance_interior(parent_id, parent_path);
1093        }
1094
1095        Ok(())
1096    }
1097
1098    fn rebalance_interior(&self, node_id: u32, mut path: Vec<u32>) -> BTreeResult<()> {
1099        let root_id = self.root_page_id();
1100        let mut node = self.pager.read_page(node_id)?;
1101        let (mut node_keys, mut node_children) = read_interior_keys_children(&node)?;
1102        let min_bytes = interior_min_bytes();
1103
1104        if node_id == root_id {
1105            if node_keys.is_empty() {
1106                let next_root = node_children.first().copied().unwrap_or(0);
1107                self.pager.free_page(node_id)?;
1108                *self.root_page_id.write().map_err(|e| {
1109                    BTreeError::LockPoisoned(format!(
1110                        "rebalance_interior: root_page_id write lock: {e}"
1111                    ))
1112                })? = next_root;
1113            }
1114            return Ok(());
1115        }
1116
1117        if interior_entries_size(&node_keys) >= min_bytes {
1118            return Ok(());
1119        }
1120
1121        let parent_id = match path.pop() {
1122            Some(id) => id,
1123            None => return Ok(()),
1124        };
1125        let mut parent = self.pager.read_page(parent_id)?;
1126        let (mut parent_keys, mut parent_children) = read_interior_keys_children(&parent)?;
1127
1128        let child_index = parent_children
1129            .iter()
1130            .position(|&id| id == node_id)
1131            .ok_or_else(|| BTreeError::Corrupted("Interior missing from parent".into()))?;
1132
1133        if child_index > 0 {
1134            let left_id = parent_children[child_index - 1];
1135            let mut left = self.pager.read_page(left_id)?;
1136            let (mut left_keys, mut left_children) = read_interior_keys_children(&left)?;
1137
1138            if let Some(borrow_key) = left_keys.last().cloned() {
1139                let borrow_size = interior_key_size(&borrow_key);
1140                if interior_entries_size(&left_keys).saturating_sub(borrow_size) >= min_bytes {
1141                    let parent_key = parent_keys[child_index - 1].clone();
1142                    let borrowed_key = left_keys.pop().ok_or_else(|| {
1143                        BTreeError::Corrupted(
1144                            "rebalance_interior: left_keys empty after check".into(),
1145                        )
1146                    })?;
1147                    let borrowed_child = left_children.pop().ok_or_else(|| {
1148                        BTreeError::Corrupted("rebalance_interior: left_children empty".into())
1149                    })?;
1150
1151                    node_keys.insert(0, parent_key);
1152                    node_children.insert(0, borrowed_child);
1153                    parent_keys[child_index - 1] = borrowed_key;
1154
1155                    write_interior_entries(&mut left, &left_keys, &left_children)?;
1156                    left.update_checksum();
1157                    self.pager.write_page(left_id, left)?;
1158
1159                    write_interior_entries(&mut node, &node_keys, &node_children)?;
1160                    node.update_checksum();
1161                    self.pager.write_page(node_id, node)?;
1162
1163                    write_interior_entries(&mut parent, &parent_keys, &parent_children)?;
1164                    parent.update_checksum();
1165                    self.pager.write_page(parent_id, parent)?;
1166
1167                    return Ok(());
1168                }
1169            }
1170        }
1171
1172        if child_index + 1 < parent_children.len() {
1173            let right_id = parent_children[child_index + 1];
1174            let mut right = self.pager.read_page(right_id)?;
1175            let (mut right_keys, mut right_children) = read_interior_keys_children(&right)?;
1176
1177            if let Some(borrow_key) = right_keys.first().cloned() {
1178                let borrow_size = interior_key_size(&borrow_key);
1179                if interior_entries_size(&right_keys).saturating_sub(borrow_size) >= min_bytes {
1180                    let parent_key = parent_keys[child_index].clone();
1181                    let new_parent_key = right_keys.remove(0);
1182                    let borrowed_child = right_children.remove(0);
1183
1184                    node_keys.push(parent_key);
1185                    node_children.push(borrowed_child);
1186                    parent_keys[child_index] = new_parent_key;
1187
1188                    write_interior_entries(&mut right, &right_keys, &right_children)?;
1189                    right.update_checksum();
1190                    self.pager.write_page(right_id, right)?;
1191
1192                    write_interior_entries(&mut node, &node_keys, &node_children)?;
1193                    node.update_checksum();
1194                    self.pager.write_page(node_id, node)?;
1195
1196                    write_interior_entries(&mut parent, &parent_keys, &parent_children)?;
1197                    parent.update_checksum();
1198                    self.pager.write_page(parent_id, parent)?;
1199
1200                    return Ok(());
1201                }
1202            }
1203        }
1204
1205        if child_index > 0 {
1206            let left_id = parent_children[child_index - 1];
1207            let mut left = self.pager.read_page(left_id)?;
1208            let (mut left_keys, mut left_children) = read_interior_keys_children(&left)?;
1209            let parent_key = parent_keys.remove(child_index - 1);
1210            parent_children.remove(child_index);
1211
1212            left_keys.push(parent_key);
1213            left_keys.extend(node_keys);
1214            left_children.extend(node_children);
1215
1216            write_interior_entries(&mut left, &left_keys, &left_children)?;
1217            left.update_checksum();
1218            self.pager.write_page(left_id, left)?;
1219            self.pager.free_page(node_id)?;
1220
1221            write_interior_entries(&mut parent, &parent_keys, &parent_children)?;
1222            parent.update_checksum();
1223            self.pager.write_page(parent_id, parent)?;
1224
1225            return self.rebalance_interior(parent_id, path);
1226        }
1227
1228        if child_index + 1 < parent_children.len() {
1229            let right_id = parent_children[child_index + 1];
1230            let right = self.pager.read_page(right_id)?;
1231            let (right_keys, right_children) = read_interior_keys_children(&right)?;
1232            let parent_key = parent_keys.remove(child_index);
1233            parent_children.remove(child_index + 1);
1234
1235            node_keys.push(parent_key);
1236            node_keys.extend(right_keys);
1237            node_children.extend(right_children);
1238
1239            write_interior_entries(&mut node, &node_keys, &node_children)?;
1240            node.update_checksum();
1241            self.pager.write_page(node_id, node)?;
1242            self.pager.free_page(right_id)?;
1243
1244            write_interior_entries(&mut parent, &parent_keys, &parent_children)?;
1245            parent.update_checksum();
1246            self.pager.write_page(parent_id, parent)?;
1247
1248            return self.rebalance_interior(parent_id, path);
1249        }
1250
1251        Ok(())
1252    }
1253}