Skip to main content

reddb_server/storage/engine/btree/
impl.rs

1use super::*;
2
3impl BTree {
4    /// Create a new B+ tree using the given pager
5    pub fn new(pager: Arc<Pager>) -> Self {
6        Self {
7            pager,
8            root_page_id: RwLock::new(0),
9            rightmost_leaf: RwLock::new(None),
10        }
11    }
12
13    /// Create a B+ tree with an existing root
14    pub fn with_root(pager: Arc<Pager>, root_page_id: u32) -> Self {
15        Self {
16            pager,
17            root_page_id: RwLock::new(root_page_id),
18            rightmost_leaf: RwLock::new(None),
19        }
20    }
21
22    /// Get the root page ID
23    pub fn root_page_id(&self) -> u32 {
24        *self
25            .root_page_id
26            .read()
27            .unwrap_or_else(|poisoned| poisoned.into_inner())
28    }
29
30    /// Check if tree is empty
31    pub fn is_empty(&self) -> bool {
32        self.root_page_id() == 0
33    }
34
35    /// Get value for a key
36    pub fn get(&self, key: &[u8]) -> BTreeResult<Option<Vec<u8>>> {
37        let root_id = self.root_page_id();
38        if root_id == 0 {
39            return Ok(None);
40        }
41
42        // Descend from root to the target leaf
43        let (leaf_id, _) = self.find_leaf(root_id, key)?;
44
45        // D3 — Lehman-Yao right-link traversal.
46        //
47        // A concurrent insert may have split the leaf between our descent
48        // and this read. If the key we're searching for is greater than the
49        // current page's high_key (max stored key), the key lives on a
50        // right-sibling page — follow the right-link instead of re-descending
51        // from the root. Cap at MAX_RIGHTLINK_HOPS to guard against corrupt
52        // link chains.
53        const MAX_RIGHTLINK_HOPS: usize = 32;
54        let mut current_id = leaf_id;
55        for _ in 0..MAX_RIGHTLINK_HOPS {
56            let page = self.pager.read_page(current_id)?;
57
58            match search_leaf(&page, key)? {
59                SearchResult::Found(pos) => {
60                    let (_, value) = read_leaf_cell(&page, pos)?;
61                    return Ok(Some(value));
62                }
63                SearchResult::NotFound(_) => {
64                    // Check if a split pushed our key to the right sibling
65                    let right = leaf_right_sibling(&page);
66                    if right != 0 {
67                        if let Some(high_key) = leaf_high_key(&page)? {
68                            if key > high_key.as_slice() {
69                                // Key belongs on a right sibling — follow link
70                                current_id = right;
71                                continue;
72                            }
73                        }
74                    }
75                    // Key genuinely not present
76                    return Ok(None);
77                }
78            }
79        }
80
81        // Exhausted hop limit — fall back to root descent (corrupt link chain)
82        let (leaf_id, _) = self.find_leaf(self.root_page_id(), key)?;
83        let page = self.pager.read_page(leaf_id)?;
84        match search_leaf(&page, key)? {
85            SearchResult::Found(pos) => {
86                let (_, value) = read_leaf_cell(&page, pos)?;
87                Ok(Some(value))
88            }
89            SearchResult::NotFound(_) => Ok(None),
90        }
91    }
92
93    /// Insert a key-value pair
94    pub fn insert(&self, key: &[u8], value: &[u8]) -> BTreeResult<()> {
95        // Validate sizes
96        if key.len() > MAX_KEY_SIZE {
97            return Err(BTreeError::KeyTooLarge(key.len()));
98        }
99        if value.len() > MAX_VALUE_SIZE {
100            return Err(BTreeError::ValueTooLarge(value.len()));
101        }
102
103        let root_id = self.root_page_id();
104
105        // Empty tree - create root leaf
106        if root_id == 0 {
107            let mut page = self.pager.allocate_page(PageType::BTreeLeaf)?;
108            clear_leaf_cells(&mut page);
109            insert_into_leaf(&mut page, key, value)?;
110            init_leaf_links(&mut page, 0, 0);
111            page.update_checksum();
112            let new_root = page.page_id();
113            self.pager.write_page(new_root, page)?;
114            *self.root_page_id.write().map_err(|e| {
115                BTreeError::LockPoisoned(format!("insert: root_page_id write lock: {e}"))
116            })? = new_root;
117            return Ok(());
118        }
119
120        // D2 fastpath: if key > cached rightmost high_key, go directly to
121        // the cached leaf without descending from the root. This cuts
122        // tree descent entirely for monotonic append workloads (timeseries,
123        // auto-increment entity IDs).
124        let cached = {
125            self.rightmost_leaf
126                .read()
127                .map_err(|e| BTreeError::LockPoisoned(format!("rightmost_leaf read: {e}")))?
128                .clone()
129        };
130        if let Some((cached_page_id, ref high_key)) = cached {
131            if key > high_key.as_slice() {
132                let mut page = self.pager.read_page(cached_page_id)?;
133                // Confirm this page is still a valid leaf with a right-link
134                // of 0 (i.e., it IS the rightmost leaf — not mid-tree after
135                // a concurrent split moved the boundary).
136                let right_sibling = leaf_right_sibling(&page);
137                if right_sibling == 0 {
138                    // Still the rightmost leaf. Try fast append.
139                    if can_insert_leaf(&page, key, value) {
140                        insert_into_leaf(&mut page, key, value)?;
141                        // Skip update_checksum here — pager.write_page
142                        // below recomputes it. Saves one CRC32 on 4KB
143                        // per insert on the hot append path.
144                        let page_id = page.page_id();
145                        // Update cached high_key
146                        *self.rightmost_leaf.write().map_err(|e| {
147                            BTreeError::LockPoisoned(format!("rightmost_leaf write: {e}"))
148                        })? = Some((page_id, key.to_vec()));
149                        self.pager.write_page(page_id, page)?;
150                        return Ok(());
151                    }
152                    // Leaf is full — split. After split, invalidate cache
153                    // (the new rightmost leaf is the split result).
154                    let (new_leaf, separator_key) = self.split_leaf(&mut page, key, value)?;
155                    let new_leaf_id = new_leaf.page_id();
156                    page.update_checksum();
157                    let page_id = page.page_id();
158                    self.pager.write_page(page_id, page.clone())?;
159                    // Cache the new rightmost leaf after split
160                    *self.rightmost_leaf.write().map_err(|e| {
161                        BTreeError::LockPoisoned(format!("rightmost_leaf write: {e}"))
162                    })? = Some((new_leaf_id, key.to_vec()));
163                    // Get parent path for the split propagation.
164                    // The left child is the original cached leaf.
165                    let (_, path) = self.find_leaf(root_id, &separator_key)?;
166                    self.insert_into_parent(path, cached_page_id, &separator_key, new_leaf_id)?;
167                    return Ok(());
168                }
169                // right_sibling != 0: cached page is no longer rightmost.
170                // Fall through to the full find_leaf path below and
171                // refresh the cache.
172                *self.rightmost_leaf.write().map_err(|e| {
173                    BTreeError::LockPoisoned(format!("rightmost_leaf write: {e}"))
174                })? = None;
175            }
176        }
177
178        // Find the leaf and path to it
179        let (leaf_id, path) = self.find_leaf(root_id, key)?;
180        let mut page = self.pager.read_page(leaf_id)?;
181
182        // Check for duplicate
183        if let SearchResult::Found(_) = search_leaf(&page, key)? {
184            return Err(BTreeError::DuplicateKey);
185        }
186
187        // Try to insert into leaf
188        if can_insert_leaf(&page, key, value) {
189            insert_into_leaf(&mut page, key, value)?;
190            // Skip update_checksum — pager.write_page recomputes it.
191            let page_id = page.page_id();
192            // If this is the rightmost leaf (right_sibling == 0), cache it.
193            if leaf_right_sibling(&page) == 0 {
194                *self.rightmost_leaf.write().map_err(|e| {
195                    BTreeError::LockPoisoned(format!("rightmost_leaf write: {e}"))
196                })? = Some((page_id, key.to_vec()));
197            }
198            self.pager.write_page(page_id, page)?;
199            return Ok(());
200        }
201
202        // Need to split the leaf
203        let (new_leaf, separator_key) = self.split_leaf(&mut page, key, value)?;
204        let new_leaf_id = new_leaf.page_id();
205        page.update_checksum();
206        let page_id = page.page_id();
207        self.pager.write_page(page_id, page.clone())?;
208        // Cache the new rightmost leaf (split result gets the larger keys)
209        *self
210            .rightmost_leaf
211            .write()
212            .map_err(|e| BTreeError::LockPoisoned(format!("rightmost_leaf write: {e}")))? =
213            Some((new_leaf_id, key.to_vec()));
214
215        // Propagate split up the tree
216        self.insert_into_parent(path, page_id, &separator_key, new_leaf_id)?;
217
218        Ok(())
219    }
220
221    /// Insert or update a key. If the key already exists and the new
222    /// value has the same length as the old, the value bytes are
223    /// overwritten in place — no structural changes, no rebalance,
224    /// one leaf page write.
225    ///
226    /// Falls back to `delete + insert` when the key is missing or the
227    /// new value has a different length. Callers like
228    /// `persist_entities_to_pager` that re-serialize a mutated entity
229    /// with a fixed-width schema almost always hit the fast path,
230    /// eliminating the `BTree::delete + rebalance` cost that previously
231    /// dominated UPDATE workloads (~50% of `bulk_update` CPU).
232    pub fn upsert(&self, key: &[u8], value: &[u8]) -> BTreeResult<()> {
233        let root_id = self.root_page_id();
234        if root_id == 0 {
235            return self.insert(key, value);
236        }
237
238        let (leaf_id, _path) = self.find_leaf(root_id, key)?;
239        let mut page = self.pager.read_page(leaf_id)?;
240
241        if let SearchResult::Found(pos) = search_leaf(&page, key)? {
242            let (_, old_value) = read_leaf_cell(&page, pos)?;
243            if value.len() <= old_value.len() {
244                // Same-length or shrink: update cell in place. Shrinks
245                // leave a small gap of dead bytes until the page is
246                // naturally rewritten (split / compact) — slotted layout
247                // keeps correctness because each cell carries its own
248                // length header.
249                overwrite_leaf_value(&mut page, pos, value)?;
250                let page_id = page.page_id();
251                self.pager.write_page(page_id, page)?;
252                return Ok(());
253            }
254            // Grow case: fall through to delete+insert for a proper
255            // reallocation + potential rebalance.
256            // Same key, different size — structural re-layout needed.
257            // Delete + re-insert preserves ordering + handles rebalance.
258            let _ = self.delete(key);
259            return self.insert(key, value);
260        }
261
262        // Key is new — plain insert path.
263        self.insert(key, value)
264    }
265
266    /// Batch upsert for sorted keys landing in a small set of leaves.
267    ///
268    /// Walks to each leaf once, applies every same-or-shrink in-place
269    /// overwrite that belongs there, writes the page once, then moves
270    /// on. Keys that miss or whose new value grows fall back to the
271    /// generic single-key `upsert` path — correctness identical,
272    /// structural re-layout handled there.
273    ///
274    /// Callers pass lex-sorted `(key, value)` pairs. For the entity
275    /// UPDATE path, IDs are big-endian so u64 ordering = lex ordering.
276    pub fn upsert_batch_sorted(&self, items: &[(Vec<u8>, Vec<u8>)]) -> BTreeResult<()> {
277        if items.is_empty() {
278            return Ok(());
279        }
280        let root_id = self.root_page_id();
281        if root_id == 0 {
282            for (k, v) in items {
283                self.upsert(k, v)?;
284            }
285            return Ok(());
286        }
287
288        let mut current_leaf: Option<(u32, Page, bool)> = None; // (leaf_id, page, dirty)
289        for (key, value) in items {
290            // Locate the leaf for this key. If it matches the leaf we
291            // already have in hand, reuse the in-memory page.
292            let (leaf_id, _path) = self.find_leaf(root_id, key)?;
293            if current_leaf.as_ref().map(|(id, _, _)| *id) != Some(leaf_id) {
294                if let Some((id, page, dirty)) = current_leaf.take() {
295                    if dirty {
296                        self.pager.write_page(id, page)?;
297                    }
298                }
299                let page = self.pager.read_page(leaf_id)?;
300                current_leaf = Some((leaf_id, page, false));
301            }
302
303            let applied_in_place = {
304                let (_, page, dirty) = current_leaf.as_mut().expect("set above");
305                if let SearchResult::Found(pos) = search_leaf(page, key)? {
306                    let (_, old_value) = read_leaf_cell(page, pos)?;
307                    if value.len() <= old_value.len() {
308                        overwrite_leaf_value(page, pos, value)?;
309                        *dirty = true;
310                        true
311                    } else {
312                        false
313                    }
314                } else {
315                    false
316                }
317            };
318            if !applied_in_place {
319                if let Some((id, page, dirty)) = current_leaf.take() {
320                    if dirty {
321                        self.pager.write_page(id, page)?;
322                    }
323                }
324                self.upsert(key, value)?;
325            }
326        }
327
328        if let Some((id, page, dirty)) = current_leaf.take() {
329            if dirty {
330                self.pager.write_page(id, page)?;
331            }
332        }
333        Ok(())
334    }
335
336    /// Bulk insert for sorted key-value pairs.
337    ///
338    /// Optimized for monotonically increasing keys (e.g. entity IDs):
339    /// - Walks to the target leaf ONCE, then appends many entries
340    ///   before re-walking.
341    /// - Writes each leaf only once per batch (amortized over many inserts).
342    ///
343    /// Falls back to per-entity `insert` on splits.
344    pub fn bulk_insert_sorted(&self, items: &[(Vec<u8>, Vec<u8>)]) -> BTreeResult<()> {
345        if items.is_empty() {
346            return Ok(());
347        }
348        for (key, value) in items {
349            if key.len() > MAX_KEY_SIZE {
350                return Err(BTreeError::KeyTooLarge(key.len()));
351            }
352            if value.len() > MAX_VALUE_SIZE {
353                return Err(BTreeError::ValueTooLarge(value.len()));
354            }
355        }
356
357        // Callers are expected to supply lex-sorted keys. This is the
358        // case for the entity-insert path in `impl_entities.rs`, which
359        // encodes u64 IDs as big-endian bytes — so sequential IDs are
360        // monotonically increasing in lex order too, and the tail-append
361        // fast path below triggers naturally.
362        //
363        // If a caller ever passes out-of-order keys, the fast path will
364        // simply bail out on the first violation and delegate to the
365        // generic slow path for that item.
366        let items: Vec<(&[u8], &[u8])> = items
367            .iter()
368            .map(|(k, v)| (k.as_slice(), v.as_slice()))
369            .collect();
370
371        // After a filled leaf is written to disk, the next ascending
372        // key almost certainly belongs on the right-sibling leaf
373        // (leaves are linked via `leaf_right_sibling`). Cache the
374        // just-written leaf id so the next outer iteration can hop
375        // the sibling pointer in one page read instead of a full
376        // root-to-leaf walk. Any fallback to `self.insert` (which
377        // may split the tree) invalidates the hint.
378        let mut last_filled_leaf: Option<u32> = None;
379        let mut i = 0;
380        while i < items.len() {
381            let root_id = self.root_page_id();
382            if root_id == 0 {
383                // Empty tree — use single insert to create root
384                let (k, v) = items[i];
385                self.insert(k, v)?;
386                i += 1;
387                last_filled_leaf = None;
388                continue;
389            }
390
391            // Try the right-sibling hop first. Valid when: (a) we have
392            // a cached last-filled leaf, (b) it points to a non-zero
393            // sibling, (c) the current key > sibling's last key (or
394            // the sibling is empty). Any mismatch → fall back to
395            // `find_leaf`.
396            let hint_leaf_id: Option<u32> = if let Some(prev) = last_filled_leaf {
397                let prev_page = self.pager.read_page(prev)?;
398                let sibling = leaf_right_sibling(&prev_page);
399                if sibling != 0 {
400                    let sib_page = self.pager.read_page(sibling)?;
401                    match leaf_high_key(&sib_page)? {
402                        None => Some(sibling), // empty sibling — free real estate
403                        Some(hk) if items[i].0 > hk.as_slice() => Some(sibling),
404                        Some(_) => None,
405                    }
406                } else {
407                    None
408                }
409            } else {
410                None
411            };
412
413            let (leaf_id, _path) = match hint_leaf_id {
414                Some(id) => (id, Vec::new()),
415                None => self.find_leaf(root_id, items[i].0)?,
416            };
417            let mut page = self.pager.read_page(leaf_id)?;
418
419            // Snapshot the leaf's last key (via the O(1) slot lookup)
420            // and current free bytes. Both drive the append fast path
421            // below. The old O(M) forward-walk that this block used to
422            // do was made redundant by the slotted layout.
423            let mut last_key_in_leaf: Option<Vec<u8>> = {
424                let cell_count = page.cell_count() as usize;
425                if cell_count == 0 {
426                    None
427                } else {
428                    Some(read_leaf_cell(&page, cell_count - 1)?.0)
429                }
430            };
431
432            // Fast path for monotonically-ascending keys: append new
433            // cells at the tail of the cell-data area and push new
434            // slot pointers onto the slot array. Every key must be
435            // strictly greater than `last_key_in_leaf` and must still
436            // fit in the leaf's free bytes.
437            let mut inserted = 0usize;
438            while i + inserted < items.len() {
439                let (key, value) = items[i + inserted];
440
441                // Strictly-ascending check, doubles as duplicate
442                // detection without a leaf binary search.
443                if let Some(lk) = &last_key_in_leaf {
444                    match key.cmp(lk.as_slice()) {
445                        Ordering::Greater => {}
446                        Ordering::Equal => return Err(BTreeError::DuplicateKey),
447                        Ordering::Less => break,
448                    }
449                }
450
451                // Free-space check: account for every slot appended in this
452                // batch even though `page.cell_count()` is only updated once
453                // at the end. Otherwise we can overrun the slot array into the
454                // cell area near the end of the page and persist bad offsets.
455                let cell_size = 4 + key.len() + value.len();
456                let slot_end_after =
457                    LEAF_SLOT_ARRAY_OFFSET + (page.cell_count() as usize + inserted + 1) * 2;
458                let cells_start = leaf_cells_start(&page);
459                if slot_end_after + cell_size > cells_start {
460                    break;
461                }
462
463                // Append the cell at the tail of the cell-data area
464                // and push a new slot pointer. `page.cell_count()`
465                // still reflects the state before this inserted batch,
466                // so the new slot index is `cell_count + inserted`.
467                let cell_offset = cells_start - cell_size;
468                {
469                    let data = page.as_bytes_mut();
470                    data[cell_offset..cell_offset + 2]
471                        .copy_from_slice(&(key.len() as u16).to_le_bytes());
472                    data[cell_offset + 2..cell_offset + 4]
473                        .copy_from_slice(&(value.len() as u16).to_le_bytes());
474                    data[cell_offset + 4..cell_offset + 4 + key.len()].copy_from_slice(key);
475                    data[cell_offset + 4 + key.len()..cell_offset + cell_size]
476                        .copy_from_slice(value);
477                }
478                page.set_free_end(cell_offset as u16);
479
480                let new_slot_index = page.cell_count() as usize + inserted;
481                let slot_pos = leaf_slot_offset_for(new_slot_index);
482                {
483                    let data = page.as_bytes_mut();
484                    data[slot_pos..slot_pos + 2]
485                        .copy_from_slice(&(cell_offset as u16).to_le_bytes());
486                }
487
488                last_key_in_leaf = Some(key.to_vec());
489                inserted += 1;
490            }
491
492            if inserted > 0 {
493                let new_count = page.cell_count() as usize + inserted;
494                page.set_cell_count(new_count as u16);
495                page.set_free_start((LEAF_SLOT_ARRAY_OFFSET + new_count * 2) as u16);
496                page.update_checksum();
497                self.pager.write_page(leaf_id, page)?;
498                i += inserted;
499                // Remember this leaf; the next outer iteration will
500                // try its right sibling before re-walking from root.
501                last_filled_leaf = Some(leaf_id);
502            } else {
503                // Couldn't fit even one entry via the fast path —
504                // either the leaf is full, or the next key belongs
505                // in the middle of the leaf. Delegate to the generic
506                // insert path which handles both splitting and
507                // mid-leaf positioning. Splits can invalidate the
508                // leaf-chain topology so drop the hint.
509                let (k, v) = items[i];
510                self.insert(k, v)?;
511                i += 1;
512                last_filled_leaf = None;
513            }
514        }
515
516        Ok(())
517    }
518
519    /// Delete a key
520    pub fn delete(&self, key: &[u8]) -> BTreeResult<bool> {
521        let root_id = self.root_page_id();
522        if root_id == 0 {
523            return Ok(false);
524        }
525
526        // D3 — Lehman-Yao right-link traversal for delete.
527        // A split may have moved the key to a right sibling between our
528        // descent and the actual page read. Follow right-links before giving up.
529        let (leaf_id, path) = self.find_leaf(root_id, key)?;
530        let mut current_id = leaf_id;
531        const MAX_RIGHTLINK_HOPS: usize = 32;
532        for _ in 0..MAX_RIGHTLINK_HOPS {
533            let mut page = self.pager.read_page(current_id)?;
534            match search_leaf(&page, key)? {
535                SearchResult::Found(pos) => {
536                    // Found — delete from this page (may differ from leaf_id)
537                    delete_from_leaf(&mut page, pos)?;
538                    page.update_checksum();
539                    let page_id = page.page_id();
540                    self.pager.write_page(page_id, page.clone())?;
541
542                    *self
543                        .rightmost_leaf
544                        .write()
545                        .unwrap_or_else(|e| e.into_inner()) = None;
546
547                    if page.cell_count() == 0 && page_id == root_id {
548                        self.pager.free_page(root_id)?;
549                        *self.root_page_id.write().map_err(|e| {
550                            BTreeError::LockPoisoned(format!(
551                                "delete: root_page_id write lock: {e}"
552                            ))
553                        })? = 0;
554                    } else {
555                        self.rebalance_leaf(current_id, path)?;
556                    }
557                    return Ok(true);
558                }
559                SearchResult::NotFound(_) => {
560                    let right = leaf_right_sibling(&page);
561                    if right != 0 {
562                        if let Some(high_key) = leaf_high_key(&page)? {
563                            if key > high_key.as_slice() {
564                                current_id = right;
565                                continue;
566                            }
567                        }
568                    }
569                    return Ok(false);
570                }
571            }
572        }
573        Ok(false)
574    }
575
576    /// Create a cursor starting at the first entry
577    pub fn cursor_first(&self) -> BTreeResult<BTreeCursor> {
578        let root_id = self.root_page_id();
579        if root_id == 0 {
580            return Ok(BTreeCursor {
581                leaf_page_id: 0,
582                position: 0,
583                pager: self.pager.clone(),
584                prefetched_next: false,
585            });
586        }
587
588        // Find leftmost leaf
589        let first_leaf = self.find_first_leaf(root_id)?;
590
591        Ok(BTreeCursor {
592            leaf_page_id: first_leaf,
593            position: 0,
594            pager: self.pager.clone(),
595            prefetched_next: false,
596        })
597    }
598
599    /// Create a cursor starting at or after the given key
600    pub fn cursor_seek(&self, key: &[u8]) -> BTreeResult<BTreeCursor> {
601        let root_id = self.root_page_id();
602        if root_id == 0 {
603            return Ok(BTreeCursor {
604                leaf_page_id: 0,
605                position: 0,
606                pager: self.pager.clone(),
607                prefetched_next: false,
608            });
609        }
610
611        let (leaf_id, _) = self.find_leaf(root_id, key)?;
612        let page = self.pager.read_page(leaf_id)?;
613
614        let position = match search_leaf(&page, key)? {
615            SearchResult::Found(pos) => pos,
616            SearchResult::NotFound(pos) => pos,
617        };
618
619        Ok(BTreeCursor {
620            leaf_page_id: leaf_id,
621            position,
622            pager: self.pager.clone(),
623            prefetched_next: false,
624        })
625    }
626
627    /// Range scan from start_key to end_key (inclusive)
628    pub fn range(&self, start_key: &[u8], end_key: &[u8]) -> BTreeResult<Vec<(Vec<u8>, Vec<u8>)>> {
629        let mut results = Vec::new();
630        let mut cursor = self.cursor_seek(start_key)?;
631
632        while let Some((key, value)) = cursor.next()? {
633            if key.as_slice() > end_key {
634                break;
635            }
636            results.push((key, value));
637        }
638
639        Ok(results)
640    }
641
642    /// Count entries in the tree
643    pub fn count(&self) -> BTreeResult<usize> {
644        let root_id = self.root_page_id();
645        if root_id == 0 {
646            return Ok(0);
647        }
648
649        let mut count = 0;
650        let mut cursor = self.cursor_first()?;
651        while cursor.next()?.is_some() {
652            count += 1;
653        }
654
655        Ok(count)
656    }
657
658    // ==================== Internal Methods ====================
659
660    /// Find the leaf page containing the key
661    fn find_leaf(&self, page_id: u32, key: &[u8]) -> BTreeResult<(u32, Vec<u32>)> {
662        let mut current_id = page_id;
663        let mut path = Vec::new();
664
665        loop {
666            let page = self.pager.read_page(current_id)?;
667
668            match page.page_type()? {
669                PageType::BTreeLeaf => {
670                    return Ok((current_id, path));
671                }
672                PageType::BTreeInterior => {
673                    path.push(current_id);
674                    current_id = find_child(&page, key)?;
675                }
676                other => {
677                    return Err(BTreeError::Corrupted(format!(
678                        "Unexpected page type in B-tree: {:?}",
679                        other
680                    )));
681                }
682            }
683        }
684    }
685
686    /// Find the leftmost leaf page
687    pub(crate) fn find_first_leaf(&self, page_id: u32) -> BTreeResult<u32> {
688        let mut current_id = page_id;
689
690        loop {
691            let page = self.pager.read_page(current_id)?;
692
693            match page.page_type()? {
694                PageType::BTreeLeaf => return Ok(current_id),
695                PageType::BTreeInterior => {
696                    // Go to leftmost child
697                    current_id = find_first_child(&page)?;
698                }
699                _ => {
700                    return Err(BTreeError::Corrupted("Invalid page type".into()));
701                }
702            }
703        }
704    }
705
706    /// Split a leaf page
707    fn split_leaf(
708        &self,
709        page: &mut Page,
710        new_key: &[u8],
711        new_value: &[u8],
712    ) -> BTreeResult<(Page, Vec<u8>)> {
713        // Collect all entries including the new one
714        let mut entries: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
715        let cell_count = page.cell_count() as usize;
716
717        for i in 0..cell_count {
718            entries.push(read_leaf_cell(page, i)?);
719        }
720
721        // Insert new entry in sorted position
722        let insert_pos = entries.partition_point(|(k, _)| k.as_slice() < new_key);
723        entries.insert(insert_pos, (new_key.to_vec(), new_value.to_vec()));
724
725        // Split in half
726        let mid = entries.len() / 2;
727
728        // Create new leaf
729        let mut new_page = self.pager.allocate_page(PageType::BTreeLeaf)?;
730
731        // Update leaf links
732        let old_next = read_next_leaf(page);
733        init_leaf_links(&mut new_page, page.page_id(), old_next);
734        set_next_leaf(page, new_page.page_id());
735
736        // Rebuild both halves through the slotted layout so their
737        // cell_count, free_start and free_end headers are consistent.
738        write_leaf_entries(page, &entries[..mid])?;
739        write_leaf_entries(&mut new_page, &entries[mid..])?;
740
741        // Separator is first key of new leaf
742        let separator = entries[mid].0.clone();
743
744        new_page.update_checksum();
745        let new_page_id = new_page.page_id();
746        self.pager.write_page(new_page_id, new_page.clone())?;
747
748        Ok((new_page, separator))
749    }
750
751    /// Insert into parent after split
752    fn insert_into_parent(
753        &self,
754        mut path: Vec<u32>,
755        left_child: u32,
756        key: &[u8],
757        right_child: u32,
758    ) -> BTreeResult<()> {
759        // If path is empty, need new root
760        if path.is_empty() {
761            let mut new_root = self.pager.allocate_page(PageType::BTreeInterior)?;
762            // Single separator with two children — seed the page via the
763            // canonical slotted builder so the header is consistent.
764            write_interior_entries(&mut new_root, &[key.to_vec()], &[left_child, right_child])?;
765
766            new_root.update_checksum();
767            let new_root_id = new_root.page_id();
768            self.pager.write_page(new_root_id, new_root)?;
769            *self.root_page_id.write().map_err(|e| {
770                BTreeError::LockPoisoned(format!(
771                    "insert_into_parent: root_page_id write lock: {e}"
772                ))
773            })? = new_root_id;
774            return Ok(());
775        }
776
777        // Insert into parent — path is non-empty (checked above)
778        let parent_id = path.pop().ok_or_else(|| {
779            BTreeError::Corrupted("insert_into_parent: path unexpectedly empty".into())
780        })?;
781        let mut parent = self.pager.read_page(parent_id)?;
782
783        // Can we fit?
784        if can_insert_interior(&parent, key) {
785            insert_into_interior(&mut parent, key, left_child, right_child)?;
786            parent.update_checksum();
787            self.pager.write_page(parent_id, parent)?;
788            return Ok(());
789        }
790
791        // Need to split interior node
792        let (new_interior, separator) =
793            self.split_interior(&mut parent, key, left_child, right_child)?;
794        parent.update_checksum();
795        self.pager.write_page(parent_id, parent.clone())?;
796
797        // Propagate up
798        self.insert_into_parent(path, parent.page_id(), &separator, new_interior.page_id())
799    }
800
801    /// Split an interior node
802    fn split_interior(
803        &self,
804        page: &mut Page,
805        new_key: &[u8],
806        left_child: u32,
807        right_child: u32,
808    ) -> BTreeResult<(Page, Vec<u8>)> {
809        let (mut keys, mut children) = read_interior_keys_children(page)?;
810        let key_insert_pos = keys.partition_point(|key| key.as_slice() < new_key);
811        let child_insert_pos = children
812            .iter()
813            .position(|child| *child == left_child)
814            .unwrap_or(key_insert_pos);
815
816        keys.insert(key_insert_pos, new_key.to_vec());
817        children.insert(child_insert_pos + 1, right_child);
818
819        // The promoted separator does not stay in either side; each
820        // resulting page is rebuilt through the shared slotted writer.
821        let mid = keys.len() / 2;
822        let separator = keys[mid].clone();
823
824        // Create new interior node
825        let mut new_page = self.pager.allocate_page(PageType::BTreeInterior)?;
826
827        write_interior_entries(page, &keys[..mid], &children[..mid + 1])?;
828        write_interior_entries(&mut new_page, &keys[mid + 1..], &children[mid + 1..])?;
829
830        new_page.update_checksum();
831        let new_page_id = new_page.page_id();
832        self.pager.write_page(new_page_id, new_page.clone())?;
833
834        Ok((new_page, separator))
835    }
836
837    fn rebalance_leaf(&self, leaf_id: u32, path: Vec<u32>) -> BTreeResult<()> {
838        if path.is_empty() {
839            return Ok(());
840        }
841
842        let root_id = self.root_page_id();
843        if leaf_id == root_id {
844            return Ok(());
845        }
846
847        let mut leaf = self.pager.read_page(leaf_id)?;
848        let mut leaf_entries = read_leaf_entries(&leaf)?;
849        let min_bytes = leaf_min_bytes();
850
851        let parent_id = *path.last().ok_or_else(|| {
852            BTreeError::Corrupted("rebalance_leaf: path unexpectedly empty".into())
853        })?;
854        let mut parent = self.pager.read_page(parent_id)?;
855        let (mut parent_keys, mut parent_children) = read_interior_keys_children(&parent)?;
856
857        let child_index = parent_children
858            .iter()
859            .position(|&id| id == leaf_id)
860            .ok_or_else(|| BTreeError::Corrupted("Leaf missing from parent".into()))?;
861
862        if child_index > 0 {
863            if let Some((first_key, _)) = leaf_entries.first() {
864                if parent_keys.get(child_index - 1).map(|k| k.as_slice())
865                    != Some(first_key.as_slice())
866                {
867                    parent_keys[child_index - 1] = first_key.clone();
868                    write_interior_entries(&mut parent, &parent_keys, &parent_children)?;
869                    parent.update_checksum();
870                    self.pager.write_page(parent_id, parent.clone())?;
871                }
872            }
873        }
874
875        if leaf_entries_size(&leaf_entries) >= min_bytes {
876            return Ok(());
877        }
878
879        if child_index > 0 {
880            let left_id = parent_children[child_index - 1];
881            let mut left = self.pager.read_page(left_id)?;
882            let mut left_entries = read_leaf_entries(&left)?;
883            let mut borrowed = false;
884
885            while leaf_entries_size(&leaf_entries) < min_bytes {
886                let Some(entry) = left_entries.pop() else {
887                    break;
888                };
889                if leaf_entries_size(&left_entries) < min_bytes {
890                    left_entries.push(entry);
891                    break;
892                }
893                leaf_entries.insert(0, entry);
894                borrowed = true;
895            }
896
897            if borrowed {
898                write_leaf_entries(&mut left, &left_entries)?;
899                left.update_checksum();
900                self.pager.write_page(left_id, left)?;
901
902                write_leaf_entries(&mut leaf, &leaf_entries)?;
903                leaf.update_checksum();
904                self.pager.write_page(leaf_id, leaf)?;
905
906                if let Some((first_key, _)) = leaf_entries.first() {
907                    parent_keys[child_index - 1] = first_key.clone();
908                    write_interior_entries(&mut parent, &parent_keys, &parent_children)?;
909                    parent.update_checksum();
910                    self.pager.write_page(parent_id, parent)?;
911                }
912
913                return Ok(());
914            }
915        }
916
917        if child_index + 1 < parent_children.len() {
918            let right_id = parent_children[child_index + 1];
919            let mut right = self.pager.read_page(right_id)?;
920            let mut right_entries = read_leaf_entries(&right)?;
921            let mut borrowed = false;
922
923            while leaf_entries_size(&leaf_entries) < min_bytes {
924                if right_entries.is_empty() {
925                    break;
926                }
927                let entry = right_entries.remove(0);
928                if leaf_entries_size(&right_entries) < min_bytes {
929                    right_entries.insert(0, entry);
930                    break;
931                }
932                leaf_entries.push(entry);
933                borrowed = true;
934            }
935
936            if borrowed {
937                write_leaf_entries(&mut right, &right_entries)?;
938                right.update_checksum();
939                self.pager.write_page(right_id, right)?;
940
941                write_leaf_entries(&mut leaf, &leaf_entries)?;
942                leaf.update_checksum();
943                self.pager.write_page(leaf_id, leaf)?;
944
945                if let Some((first_key, _)) = right_entries.first() {
946                    parent_keys[child_index] = first_key.clone();
947                    write_interior_entries(&mut parent, &parent_keys, &parent_children)?;
948                    parent.update_checksum();
949                    self.pager.write_page(parent_id, parent)?;
950                }
951
952                return Ok(());
953            }
954        }
955
956        if child_index > 0 {
957            let left_id = parent_children[child_index - 1];
958            let mut left = self.pager.read_page(left_id)?;
959            let mut left_entries = read_leaf_entries(&left)?;
960
961            left_entries.extend(leaf_entries);
962            write_leaf_entries(&mut left, &left_entries)?;
963
964            let next_leaf = read_next_leaf(&leaf);
965            set_next_leaf(&mut left, next_leaf);
966            if next_leaf != 0 {
967                let mut next = self.pager.read_page(next_leaf)?;
968                set_prev_leaf(&mut next, left_id);
969                next.update_checksum();
970                self.pager.write_page(next_leaf, next)?;
971            }
972
973            left.update_checksum();
974            self.pager.write_page(left_id, left)?;
975            self.pager.free_page(leaf_id)?;
976
977            parent_keys.remove(child_index - 1);
978            parent_children.remove(child_index);
979            write_interior_entries(&mut parent, &parent_keys, &parent_children)?;
980            parent.update_checksum();
981            self.pager.write_page(parent_id, parent)?;
982
983            let mut parent_path = path;
984            parent_path.pop();
985            return self.rebalance_interior(parent_id, parent_path);
986        }
987
988        if child_index + 1 < parent_children.len() {
989            let right_id = parent_children[child_index + 1];
990            let right = self.pager.read_page(right_id)?;
991            let right_entries = read_leaf_entries(&right)?;
992
993            leaf_entries.extend(right_entries);
994            write_leaf_entries(&mut leaf, &leaf_entries)?;
995
996            let next_leaf = read_next_leaf(&right);
997            set_next_leaf(&mut leaf, next_leaf);
998            if next_leaf != 0 {
999                let mut next = self.pager.read_page(next_leaf)?;
1000                set_prev_leaf(&mut next, leaf_id);
1001                next.update_checksum();
1002                self.pager.write_page(next_leaf, next)?;
1003            }
1004
1005            leaf.update_checksum();
1006            self.pager.write_page(leaf_id, leaf)?;
1007            self.pager.free_page(right_id)?;
1008
1009            parent_keys.remove(child_index);
1010            parent_children.remove(child_index + 1);
1011            write_interior_entries(&mut parent, &parent_keys, &parent_children)?;
1012            parent.update_checksum();
1013            self.pager.write_page(parent_id, parent)?;
1014
1015            let mut parent_path = path;
1016            parent_path.pop();
1017            return self.rebalance_interior(parent_id, parent_path);
1018        }
1019
1020        Ok(())
1021    }
1022
1023    fn rebalance_interior(&self, node_id: u32, mut path: Vec<u32>) -> BTreeResult<()> {
1024        let root_id = self.root_page_id();
1025        let mut node = self.pager.read_page(node_id)?;
1026        let (mut node_keys, mut node_children) = read_interior_keys_children(&node)?;
1027        let min_bytes = interior_min_bytes();
1028
1029        if node_id == root_id {
1030            if node_keys.is_empty() {
1031                let next_root = node_children.first().copied().unwrap_or(0);
1032                self.pager.free_page(node_id)?;
1033                *self.root_page_id.write().map_err(|e| {
1034                    BTreeError::LockPoisoned(format!(
1035                        "rebalance_interior: root_page_id write lock: {e}"
1036                    ))
1037                })? = next_root;
1038            }
1039            return Ok(());
1040        }
1041
1042        if interior_entries_size(&node_keys) >= min_bytes {
1043            return Ok(());
1044        }
1045
1046        let parent_id = match path.pop() {
1047            Some(id) => id,
1048            None => return Ok(()),
1049        };
1050        let mut parent = self.pager.read_page(parent_id)?;
1051        let (mut parent_keys, mut parent_children) = read_interior_keys_children(&parent)?;
1052
1053        let child_index = parent_children
1054            .iter()
1055            .position(|&id| id == node_id)
1056            .ok_or_else(|| BTreeError::Corrupted("Interior missing from parent".into()))?;
1057
1058        if child_index > 0 {
1059            let left_id = parent_children[child_index - 1];
1060            let mut left = self.pager.read_page(left_id)?;
1061            let (mut left_keys, mut left_children) = read_interior_keys_children(&left)?;
1062
1063            if let Some(borrow_key) = left_keys.last().cloned() {
1064                let borrow_size = interior_key_size(&borrow_key);
1065                if interior_entries_size(&left_keys).saturating_sub(borrow_size) >= min_bytes {
1066                    let parent_key = parent_keys[child_index - 1].clone();
1067                    let borrowed_key = left_keys.pop().ok_or_else(|| {
1068                        BTreeError::Corrupted(
1069                            "rebalance_interior: left_keys empty after check".into(),
1070                        )
1071                    })?;
1072                    let borrowed_child = left_children.pop().ok_or_else(|| {
1073                        BTreeError::Corrupted("rebalance_interior: left_children empty".into())
1074                    })?;
1075
1076                    node_keys.insert(0, parent_key);
1077                    node_children.insert(0, borrowed_child);
1078                    parent_keys[child_index - 1] = borrowed_key;
1079
1080                    write_interior_entries(&mut left, &left_keys, &left_children)?;
1081                    left.update_checksum();
1082                    self.pager.write_page(left_id, left)?;
1083
1084                    write_interior_entries(&mut node, &node_keys, &node_children)?;
1085                    node.update_checksum();
1086                    self.pager.write_page(node_id, node)?;
1087
1088                    write_interior_entries(&mut parent, &parent_keys, &parent_children)?;
1089                    parent.update_checksum();
1090                    self.pager.write_page(parent_id, parent)?;
1091
1092                    return Ok(());
1093                }
1094            }
1095        }
1096
1097        if child_index + 1 < parent_children.len() {
1098            let right_id = parent_children[child_index + 1];
1099            let mut right = self.pager.read_page(right_id)?;
1100            let (mut right_keys, mut right_children) = read_interior_keys_children(&right)?;
1101
1102            if let Some(borrow_key) = right_keys.first().cloned() {
1103                let borrow_size = interior_key_size(&borrow_key);
1104                if interior_entries_size(&right_keys).saturating_sub(borrow_size) >= min_bytes {
1105                    let parent_key = parent_keys[child_index].clone();
1106                    let new_parent_key = right_keys.remove(0);
1107                    let borrowed_child = right_children.remove(0);
1108
1109                    node_keys.push(parent_key);
1110                    node_children.push(borrowed_child);
1111                    parent_keys[child_index] = new_parent_key;
1112
1113                    write_interior_entries(&mut right, &right_keys, &right_children)?;
1114                    right.update_checksum();
1115                    self.pager.write_page(right_id, right)?;
1116
1117                    write_interior_entries(&mut node, &node_keys, &node_children)?;
1118                    node.update_checksum();
1119                    self.pager.write_page(node_id, node)?;
1120
1121                    write_interior_entries(&mut parent, &parent_keys, &parent_children)?;
1122                    parent.update_checksum();
1123                    self.pager.write_page(parent_id, parent)?;
1124
1125                    return Ok(());
1126                }
1127            }
1128        }
1129
1130        if child_index > 0 {
1131            let left_id = parent_children[child_index - 1];
1132            let mut left = self.pager.read_page(left_id)?;
1133            let (mut left_keys, mut left_children) = read_interior_keys_children(&left)?;
1134            let parent_key = parent_keys.remove(child_index - 1);
1135            parent_children.remove(child_index);
1136
1137            left_keys.push(parent_key);
1138            left_keys.extend(node_keys);
1139            left_children.extend(node_children);
1140
1141            write_interior_entries(&mut left, &left_keys, &left_children)?;
1142            left.update_checksum();
1143            self.pager.write_page(left_id, left)?;
1144            self.pager.free_page(node_id)?;
1145
1146            write_interior_entries(&mut parent, &parent_keys, &parent_children)?;
1147            parent.update_checksum();
1148            self.pager.write_page(parent_id, parent)?;
1149
1150            return self.rebalance_interior(parent_id, path);
1151        }
1152
1153        if child_index + 1 < parent_children.len() {
1154            let right_id = parent_children[child_index + 1];
1155            let right = self.pager.read_page(right_id)?;
1156            let (right_keys, right_children) = read_interior_keys_children(&right)?;
1157            let parent_key = parent_keys.remove(child_index);
1158            parent_children.remove(child_index + 1);
1159
1160            node_keys.push(parent_key);
1161            node_keys.extend(right_keys);
1162            node_children.extend(right_children);
1163
1164            write_interior_entries(&mut node, &node_keys, &node_children)?;
1165            node.update_checksum();
1166            self.pager.write_page(node_id, node)?;
1167            self.pager.free_page(right_id)?;
1168
1169            write_interior_entries(&mut parent, &parent_keys, &parent_children)?;
1170            parent.update_checksum();
1171            self.pager.write_page(parent_id, parent)?;
1172
1173            return self.rebalance_interior(parent_id, path);
1174        }
1175
1176        Ok(())
1177    }
1178}