Skip to main content

citadel_buffer/
btree.rs

1//! CoW B+ tree engine. Mutations clone pages; old pages go to pending-free list.
2
3use crate::allocator::PageAllocator;
4use citadel_core::types::{PageId, PageType, TxnId, ValueType};
5use citadel_core::{Error, Result};
6use citadel_page::page::Page;
7use citadel_page::{branch_node, leaf_node};
8use rustc_hash::FxHashMap;
9
10/// B+ tree metadata. Pages stored externally.
11#[derive(Clone)]
12pub struct BTree {
13    pub root: PageId,
14    pub depth: u16,
15    pub entry_count: u64,
16    last_insert: Option<(Vec<(PageId, usize)>, PageId)>,
17    last_delete: Option<(Vec<(PageId, usize)>, PageId)>,
18}
19
20#[derive(Debug, Clone)]
21pub enum UpsertOutcome {
22    Inserted,
23    Updated,
24    Skipped,
25}
26
27#[derive(Debug, Clone)]
28pub enum UpsertAction {
29    Replace(Vec<u8>),
30    Skip,
31}
32
33impl BTree {
34    /// Create a new empty B+ tree with a single leaf root.
35    pub fn new(
36        pages: &mut FxHashMap<PageId, Page>,
37        alloc: &mut PageAllocator,
38        txn_id: TxnId,
39    ) -> Self {
40        let root_id = alloc.allocate();
41        let root = Page::new(root_id, PageType::Leaf, txn_id);
42        pages.insert(root_id, root);
43        Self {
44            root: root_id,
45            depth: 1,
46            entry_count: 0,
47            last_insert: None,
48            last_delete: None,
49        }
50    }
51
52    /// Create a BTree from existing metadata (e.g., loaded from commit slot).
53    pub fn from_existing(root: PageId, depth: u16, entry_count: u64) -> Self {
54        Self {
55            root,
56            depth,
57            entry_count,
58            last_insert: None,
59            last_delete: None,
60        }
61    }
62
63    pub fn search_at_leaf(
64        pages: &FxHashMap<PageId, Page>,
65        leaf_id: PageId,
66        key: &[u8],
67    ) -> Result<Option<(ValueType, Vec<u8>)>> {
68        let page = pages.get(&leaf_id).ok_or(Error::PageOutOfBounds(leaf_id))?;
69        match leaf_node::search(page, key) {
70            Ok(idx) => {
71                let cell = leaf_node::read_cell(page, idx);
72                Ok(Some((cell.val_type, cell.value.to_vec())))
73            }
74            Err(_) => Ok(None),
75        }
76    }
77
78    pub fn search(
79        &self,
80        pages: &FxHashMap<PageId, Page>,
81        key: &[u8],
82    ) -> Result<Option<(ValueType, Vec<u8>)>> {
83        let mut current = self.root;
84        loop {
85            let page = pages.get(&current).ok_or(Error::PageOutOfBounds(current))?;
86            match page.page_type() {
87                Some(PageType::Leaf) => {
88                    return match leaf_node::search(page, key) {
89                        Ok(idx) => {
90                            let cell = leaf_node::read_cell(page, idx);
91                            Ok(Some((cell.val_type, cell.value.to_vec())))
92                        }
93                        Err(_) => Ok(None),
94                    };
95                }
96                Some(PageType::Branch) => {
97                    let idx = branch_node::search_child_index(page, key);
98                    current = branch_node::get_child(page, idx);
99                }
100                _ => {
101                    return Err(Error::InvalidPageType(page.page_type_raw(), current));
102                }
103            }
104        }
105    }
106
107    /// Clear both LIL caches. Used at every site that previously cleared
108    /// `last_insert` alone, so a delete cache cannot survive a mutation
109    /// that may have shifted the cached leaf's path or range.
110    #[inline]
111    fn clear_lil_caches(&mut self) {
112        self.last_insert = None;
113        self.last_delete = None;
114    }
115
116    pub fn debug_assert_lil_disjoint(&self) {
117        debug_assert!(
118            self.last_insert.is_none() || self.last_delete.is_none(),
119            "LIL caches must be mutually exclusive (both Some indicates a missed clear site)"
120        );
121    }
122
123    pub fn lil_would_hit(&self, pages: &FxHashMap<PageId, Page>, key: &[u8]) -> bool {
124        if let Some((_, cached_leaf)) = &self.last_insert {
125            if let Some(page) = pages.get(cached_leaf) {
126                let n = page.num_cells();
127                return n > 0 && key > leaf_node::read_cell(page, n - 1).key;
128            }
129        }
130        false
131    }
132
133    /// Combined LIL check + insert. Returns `Some(was_new)` on hit, `None` on miss.
134    pub fn try_lil_insert(
135        &mut self,
136        pages: &mut FxHashMap<PageId, Page>,
137        alloc: &mut PageAllocator,
138        txn_id: TxnId,
139        key: &[u8],
140        val_type: ValueType,
141        value: &[u8],
142    ) -> Result<Option<bool>> {
143        let cached_leaf = match self.last_insert.as_ref() {
144            Some((_, leaf)) => *leaf,
145            None => return Ok(None),
146        };
147        let (hit, needs_cow) = {
148            let page = pages
149                .get(&cached_leaf)
150                .ok_or(Error::PageOutOfBounds(cached_leaf))?;
151            let n = page.num_cells();
152            let h = n > 0 && key > leaf_node::read_cell(page, n - 1).key;
153            let nc = page.txn_id() != txn_id;
154            (h, nc)
155        };
156        if !hit {
157            return Ok(None);
158        }
159        let mut cached_path = self.last_insert.take().unwrap().0;
160        let cow_id = if needs_cow {
161            cow_page(pages, alloc, cached_leaf, txn_id)
162        } else {
163            cached_leaf
164        };
165        let ok = {
166            let page = pages.get_mut(&cow_id).unwrap();
167            leaf_node::insert_append_direct(page, key, val_type, value)
168        };
169        if ok {
170            if cow_id != cached_leaf {
171                self.root = propagate_cow_up(pages, alloc, txn_id, &mut cached_path, cow_id);
172            }
173            self.entry_count += 1;
174            self.last_delete = None;
175            self.last_insert = Some((cached_path, cow_id));
176            return Ok(Some(true));
177        }
178        let (sep_key, right_id) =
179            split_leaf_with_insert(pages, alloc, txn_id, cow_id, key, val_type, value);
180        self.root = propagate_split_up(
181            pages,
182            alloc,
183            txn_id,
184            &cached_path,
185            cow_id,
186            &sep_key,
187            right_id,
188            &mut self.depth,
189        );
190        self.clear_lil_caches();
191        self.entry_count += 1;
192        Ok(Some(true))
193    }
194
195    /// LIL fast-path delete. Returns `Some((deleted, overflow_head))` on
196    /// cache hit, `None` on miss. Mirrors `try_lil_insert` but with a
197    /// range predicate (`first_key <= key <= last_key`) and the empty-leaf
198    /// fallback from `delete_at_leaf`.
199    pub fn try_lil_delete(
200        &mut self,
201        pages: &mut FxHashMap<PageId, Page>,
202        alloc: &mut PageAllocator,
203        txn_id: TxnId,
204        key: &[u8],
205    ) -> Result<Option<(bool, Option<PageId>)>> {
206        let cached_leaf = match self.last_delete.as_ref() {
207            Some((_, leaf)) => *leaf,
208            None => return Ok(None),
209        };
210        let (in_range, found_idx, overflow_head, needs_cow) = {
211            let Some(page) = pages.get(&cached_leaf) else {
212                self.last_delete = None;
213                return Ok(None);
214            };
215            if !matches!(page.page_type(), Some(PageType::Leaf)) {
216                self.last_delete = None;
217                return Ok(None);
218            }
219            let n = page.num_cells();
220            if n == 0 {
221                self.last_delete = None;
222                return Ok(None);
223            }
224            let first = leaf_node::read_cell(page, 0).key;
225            let last = leaf_node::read_cell(page, n - 1).key;
226            let in_range = key >= first && key <= last;
227            if !in_range {
228                return Ok(None);
229            }
230            let (found_idx, overflow_head) = match leaf_node::search(page, key) {
231                Ok(idx) => {
232                    let cell = leaf_node::read_cell(page, idx);
233                    let head = if cell.val_type == ValueType::Overflow {
234                        Some(leaf_node::OverflowRef::from_bytes(cell.value).first_page)
235                    } else {
236                        None
237                    };
238                    (Some(idx), head)
239                }
240                Err(_) => (None, None),
241            };
242            let nc = page.txn_id() != txn_id;
243            (in_range, found_idx, overflow_head, nc)
244        };
245        if !in_range {
246            return Ok(None);
247        }
248        if found_idx.is_none() {
249            return Ok(Some((false, None)));
250        }
251
252        let mut cached_path = self.last_delete.take().unwrap().0;
253        let cow_id = if needs_cow {
254            cow_page(pages, alloc, cached_leaf, txn_id)
255        } else {
256            cached_leaf
257        };
258        {
259            let page = pages.get_mut(&cow_id).unwrap();
260            leaf_node::delete(page, key);
261        }
262
263        let leaf_empty = pages.get(&cow_id).unwrap().num_cells() == 0;
264
265        if !leaf_empty || cached_path.is_empty() {
266            if alloc.in_place() && cow_id == cached_leaf {
267                self.entry_count -= 1;
268                self.clear_lil_caches();
269                self.last_delete = Some((cached_path, cow_id));
270                return Ok(Some((true, overflow_head)));
271            }
272            if cow_id != cached_leaf {
273                self.root = propagate_cow_up(pages, alloc, txn_id, &mut cached_path, cow_id);
274            }
275            self.entry_count -= 1;
276            self.clear_lil_caches();
277            self.last_delete = Some((cached_path, cow_id));
278            return Ok(Some((true, overflow_head)));
279        }
280
281        alloc.free(cow_id);
282        pages.remove(&cow_id);
283        self.root = propagate_remove_up(pages, alloc, txn_id, &mut cached_path, &mut self.depth);
284        self.entry_count -= 1;
285        self.clear_lil_caches();
286        self.last_delete = None;
287        Ok(Some((true, overflow_head)))
288    }
289
290    /// Insert key-value. Returns `true` if new, `false` if updated existing.
291    pub fn insert(
292        &mut self,
293        pages: &mut FxHashMap<PageId, Page>,
294        alloc: &mut PageAllocator,
295        txn_id: TxnId,
296        key: &[u8],
297        val_type: ValueType,
298        value: &[u8],
299    ) -> Result<bool> {
300        // LIL cache: skip walk_to_leaf for sequential appends to the rightmost leaf.
301        if let Some((mut cached_path, cached_leaf)) = self.last_insert.take() {
302            let (hit, needs_cow) = {
303                let page = pages
304                    .get(&cached_leaf)
305                    .ok_or(Error::PageOutOfBounds(cached_leaf))?;
306                let n = page.num_cells();
307                let h = n > 0 && key > leaf_node::read_cell(page, n - 1).key;
308                let nc = page.txn_id() != txn_id;
309                (h, nc)
310            };
311            if hit {
312                let cow_id = if needs_cow {
313                    cow_page(pages, alloc, cached_leaf, txn_id)
314                } else {
315                    cached_leaf
316                };
317                let ok = {
318                    let page = pages.get_mut(&cow_id).unwrap();
319                    leaf_node::insert_direct(page, key, val_type, value)
320                };
321                if ok {
322                    if cow_id != cached_leaf {
323                        self.root =
324                            propagate_cow_up(pages, alloc, txn_id, &mut cached_path, cow_id);
325                    }
326                    self.entry_count += 1;
327                    self.last_delete = None;
328                    self.last_insert = Some((cached_path, cow_id));
329                    return Ok(true);
330                }
331                let (sep_key, right_id) =
332                    split_leaf_with_insert(pages, alloc, txn_id, cow_id, key, val_type, value);
333                self.root = propagate_split_up(
334                    pages,
335                    alloc,
336                    txn_id,
337                    &cached_path,
338                    cow_id,
339                    &sep_key,
340                    right_id,
341                    &mut self.depth,
342                );
343                self.clear_lil_caches();
344                self.entry_count += 1;
345                return Ok(true);
346            }
347        }
348
349        let (path, leaf_id) = self.walk_to_leaf(pages, key)?;
350        let (was_new, _replaced) =
351            self.insert_at_leaf(pages, alloc, txn_id, key, val_type, value, path, leaf_id)?;
352        Ok(was_new)
353    }
354
355    #[allow(clippy::too_many_arguments)]
356    #[inline]
357    pub fn insert_at_leaf(
358        &mut self,
359        pages: &mut FxHashMap<PageId, Page>,
360        alloc: &mut PageAllocator,
361        txn_id: TxnId,
362        key: &[u8],
363        val_type: ValueType,
364        value: &[u8],
365        path: Vec<(PageId, usize)>,
366        leaf_id: PageId,
367    ) -> Result<(bool, Option<PageId>)> {
368        let (key_exists, replaced_overflow) = {
369            let page = pages.get(&leaf_id).unwrap();
370            match leaf_node::search(page, key) {
371                Ok(idx) => {
372                    let cell = leaf_node::read_cell(page, idx);
373                    let head = if cell.val_type == ValueType::Overflow {
374                        Some(leaf_node::OverflowRef::from_bytes(cell.value).first_page)
375                    } else {
376                        None
377                    };
378                    (true, head)
379                }
380                Err(_) => (false, None),
381            }
382        };
383
384        let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
385
386        let leaf_ok = {
387            let page = pages.get_mut(&new_leaf_id).unwrap();
388            leaf_node::insert_direct(page, key, val_type, value)
389        };
390
391        if leaf_ok {
392            if alloc.in_place() && new_leaf_id == leaf_id {
393                let mut is_rightmost = true;
394                for &(ancestor_id, child_idx) in path.iter().rev() {
395                    let page = pages.get(&ancestor_id).unwrap();
396                    if child_idx != page.num_cells() as usize {
397                        is_rightmost = false;
398                        break;
399                    }
400                }
401                if is_rightmost {
402                    self.last_delete = None;
403                    self.last_insert = Some((path, new_leaf_id));
404                }
405                if !key_exists {
406                    self.entry_count += 1;
407                }
408                return Ok((!key_exists, replaced_overflow));
409            }
410            let mut child = new_leaf_id;
411            let mut is_rightmost = true;
412            let mut new_path = path;
413            for i in (0..new_path.len()).rev() {
414                let (ancestor_id, child_idx) = new_path[i];
415                let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
416                let page = pages.get_mut(&new_ancestor).unwrap();
417                update_branch_child(page, child_idx, child);
418                if child_idx != page.num_cells() as usize {
419                    is_rightmost = false;
420                }
421                new_path[i] = (new_ancestor, child_idx);
422                child = new_ancestor;
423            }
424            self.root = child;
425
426            if is_rightmost {
427                self.last_delete = None;
428                self.last_insert = Some((new_path, new_leaf_id));
429            }
430
431            if !key_exists {
432                self.entry_count += 1;
433            }
434            return Ok((!key_exists, replaced_overflow));
435        }
436
437        self.clear_lil_caches();
438        let (sep_key, right_id) =
439            split_leaf_with_insert(pages, alloc, txn_id, new_leaf_id, key, val_type, value);
440        self.root = propagate_split_up(
441            pages,
442            alloc,
443            txn_id,
444            &path,
445            new_leaf_id,
446            &sep_key,
447            right_id,
448            &mut self.depth,
449        );
450
451        if !key_exists {
452            self.entry_count += 1;
453        }
454        Ok((!key_exists, replaced_overflow))
455    }
456
457    pub fn insert_or_fetch(
458        &mut self,
459        pages: &mut FxHashMap<PageId, Page>,
460        alloc: &mut PageAllocator,
461        txn_id: TxnId,
462        key: &[u8],
463        val_type: ValueType,
464        value: &[u8],
465    ) -> Result<Option<Vec<u8>>> {
466        if let Some((mut cached_path, cached_leaf)) = self.last_insert.take() {
467            let (hit, needs_cow) = {
468                let page = pages
469                    .get(&cached_leaf)
470                    .ok_or(Error::PageOutOfBounds(cached_leaf))?;
471                let n = page.num_cells();
472                let h = n > 0 && key > leaf_node::read_cell(page, n - 1).key;
473                let nc = page.txn_id() != txn_id;
474                (h, nc)
475            };
476            if hit {
477                let cow_id = if needs_cow {
478                    cow_page(pages, alloc, cached_leaf, txn_id)
479                } else {
480                    cached_leaf
481                };
482                let ok = {
483                    let page = pages.get_mut(&cow_id).unwrap();
484                    leaf_node::insert_direct(page, key, val_type, value)
485                };
486                if ok {
487                    if cow_id != cached_leaf {
488                        self.root =
489                            propagate_cow_up(pages, alloc, txn_id, &mut cached_path, cow_id);
490                    }
491                    self.entry_count += 1;
492                    self.last_delete = None;
493                    self.last_insert = Some((cached_path, cow_id));
494                    return Ok(None);
495                }
496                let (sep_key, right_id) =
497                    split_leaf_with_insert(pages, alloc, txn_id, cow_id, key, val_type, value);
498                self.root = propagate_split_up(
499                    pages,
500                    alloc,
501                    txn_id,
502                    &cached_path,
503                    cow_id,
504                    &sep_key,
505                    right_id,
506                    &mut self.depth,
507                );
508                self.clear_lil_caches();
509                self.entry_count += 1;
510                return Ok(None);
511            }
512            self.last_insert = Some((cached_path, cached_leaf));
513        }
514
515        let (path, leaf_id) = self.walk_to_leaf(pages, key)?;
516
517        let existing_value = {
518            let page = pages.get(&leaf_id).unwrap();
519            match leaf_node::search(page, key) {
520                Ok(idx) => {
521                    let cell = leaf_node::read_cell(page, idx);
522                    if matches!(cell.val_type, ValueType::Tombstone) {
523                        None
524                    } else {
525                        Some(cell.value.to_vec())
526                    }
527                }
528                Err(_) => None,
529            }
530        };
531        if let Some(v) = existing_value {
532            return Ok(Some(v));
533        }
534
535        let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
536        let leaf_ok = {
537            let page = pages.get_mut(&new_leaf_id).unwrap();
538            leaf_node::insert_direct(page, key, val_type, value)
539        };
540
541        if leaf_ok {
542            if alloc.in_place() && new_leaf_id == leaf_id {
543                let mut is_rightmost = true;
544                for &(ancestor_id, child_idx) in path.iter().rev() {
545                    let page = pages.get(&ancestor_id).unwrap();
546                    if child_idx != page.num_cells() as usize {
547                        is_rightmost = false;
548                        break;
549                    }
550                }
551                if is_rightmost {
552                    self.last_delete = None;
553                    self.last_insert = Some((path, new_leaf_id));
554                }
555                self.entry_count += 1;
556                return Ok(None);
557            }
558            let mut child = new_leaf_id;
559            let mut is_rightmost = true;
560            let mut new_path = path;
561            for i in (0..new_path.len()).rev() {
562                let (ancestor_id, child_idx) = new_path[i];
563                let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
564                let page = pages.get_mut(&new_ancestor).unwrap();
565                update_branch_child(page, child_idx, child);
566                if child_idx != page.num_cells() as usize {
567                    is_rightmost = false;
568                }
569                new_path[i] = (new_ancestor, child_idx);
570                child = new_ancestor;
571            }
572            self.root = child;
573
574            if is_rightmost {
575                self.last_delete = None;
576                self.last_insert = Some((new_path, new_leaf_id));
577            }
578            self.entry_count += 1;
579            return Ok(None);
580        }
581
582        self.clear_lil_caches();
583        let (sep_key, right_id) =
584            split_leaf_with_insert(pages, alloc, txn_id, new_leaf_id, key, val_type, value);
585        self.root = propagate_split_up(
586            pages,
587            alloc,
588            txn_id,
589            &path,
590            new_leaf_id,
591            &sep_key,
592            right_id,
593            &mut self.depth,
594        );
595        self.entry_count += 1;
596        Ok(None)
597    }
598
599    #[inline]
600    pub fn insert_if_absent(
601        &mut self,
602        pages: &mut FxHashMap<PageId, Page>,
603        alloc: &mut PageAllocator,
604        txn_id: TxnId,
605        key: &[u8],
606        val_type: ValueType,
607        value: &[u8],
608    ) -> Result<bool> {
609        if let Some((mut cached_path, cached_leaf)) = self.last_insert.take() {
610            let (hit, needs_cow) = {
611                let page = pages
612                    .get(&cached_leaf)
613                    .ok_or(Error::PageOutOfBounds(cached_leaf))?;
614                let n = page.num_cells();
615                let h = n > 0 && key > leaf_node::read_cell(page, n - 1).key;
616                let nc = page.txn_id() != txn_id;
617                (h, nc)
618            };
619            if hit {
620                let cow_id = if needs_cow {
621                    cow_page(pages, alloc, cached_leaf, txn_id)
622                } else {
623                    cached_leaf
624                };
625                let ok = {
626                    let page = pages.get_mut(&cow_id).unwrap();
627                    leaf_node::insert_direct(page, key, val_type, value)
628                };
629                if ok {
630                    if cow_id != cached_leaf {
631                        self.root =
632                            propagate_cow_up(pages, alloc, txn_id, &mut cached_path, cow_id);
633                    }
634                    self.entry_count += 1;
635                    self.last_delete = None;
636                    self.last_insert = Some((cached_path, cow_id));
637                    return Ok(true);
638                }
639                let (sep_key, right_id) =
640                    split_leaf_with_insert(pages, alloc, txn_id, cow_id, key, val_type, value);
641                self.root = propagate_split_up(
642                    pages,
643                    alloc,
644                    txn_id,
645                    &cached_path,
646                    cow_id,
647                    &sep_key,
648                    right_id,
649                    &mut self.depth,
650                );
651                self.clear_lil_caches();
652                self.entry_count += 1;
653                return Ok(true);
654            }
655            self.last_insert = Some((cached_path, cached_leaf));
656        }
657
658        let (path, leaf_id) = self.walk_to_leaf(pages, key)?;
659        self.insert_if_absent_at_leaf(pages, alloc, txn_id, key, val_type, value, path, leaf_id)
660    }
661
662    #[allow(clippy::too_many_arguments)]
663    #[inline]
664    pub fn insert_if_absent_at_leaf(
665        &mut self,
666        pages: &mut FxHashMap<PageId, Page>,
667        alloc: &mut PageAllocator,
668        txn_id: TxnId,
669        key: &[u8],
670        val_type: ValueType,
671        value: &[u8],
672        path: Vec<(PageId, usize)>,
673        leaf_id: PageId,
674    ) -> Result<bool> {
675        let exists = {
676            let page = pages.get(&leaf_id).unwrap();
677            match leaf_node::search(page, key) {
678                Ok(idx) => {
679                    let cell = leaf_node::read_cell(page, idx);
680                    !matches!(cell.val_type, ValueType::Tombstone)
681                }
682                Err(_) => false,
683            }
684        };
685        if exists {
686            return Ok(false);
687        }
688
689        let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
690        let leaf_ok = {
691            let page = pages.get_mut(&new_leaf_id).unwrap();
692            leaf_node::insert_direct(page, key, val_type, value)
693        };
694
695        if leaf_ok {
696            if alloc.in_place() && new_leaf_id == leaf_id {
697                let mut is_rightmost = true;
698                for &(ancestor_id, child_idx) in path.iter().rev() {
699                    let page = pages.get(&ancestor_id).unwrap();
700                    if child_idx != page.num_cells() as usize {
701                        is_rightmost = false;
702                        break;
703                    }
704                }
705                if is_rightmost {
706                    self.last_delete = None;
707                    self.last_insert = Some((path, new_leaf_id));
708                }
709                self.entry_count += 1;
710                return Ok(true);
711            }
712            let mut child = new_leaf_id;
713            let mut is_rightmost = true;
714            let mut new_path = path;
715            for i in (0..new_path.len()).rev() {
716                let (ancestor_id, child_idx) = new_path[i];
717                let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
718                let page = pages.get_mut(&new_ancestor).unwrap();
719                update_branch_child(page, child_idx, child);
720                if child_idx != page.num_cells() as usize {
721                    is_rightmost = false;
722                }
723                new_path[i] = (new_ancestor, child_idx);
724                child = new_ancestor;
725            }
726            self.root = child;
727
728            if is_rightmost {
729                self.last_delete = None;
730                self.last_insert = Some((new_path, new_leaf_id));
731            }
732            self.entry_count += 1;
733            return Ok(true);
734        }
735
736        self.clear_lil_caches();
737        let (sep_key, right_id) =
738            split_leaf_with_insert(pages, alloc, txn_id, new_leaf_id, key, val_type, value);
739        self.root = propagate_split_up(
740            pages,
741            alloc,
742            txn_id,
743            &path,
744            new_leaf_id,
745            &sep_key,
746            right_id,
747            &mut self.depth,
748        );
749        self.entry_count += 1;
750        Ok(true)
751    }
752
753    #[allow(clippy::too_many_arguments)]
754    #[inline]
755    pub fn upsert_with<F, E>(
756        &mut self,
757        pages: &mut FxHashMap<PageId, Page>,
758        alloc: &mut PageAllocator,
759        txn_id: TxnId,
760        key: &[u8],
761        val_type: ValueType,
762        default_value: &[u8],
763        f: F,
764    ) -> std::result::Result<UpsertOutcome, E>
765    where
766        F: FnMut(&[u8]) -> std::result::Result<UpsertAction, E>,
767        E: From<Error>,
768    {
769        if let Some((mut cached_path, cached_leaf)) = self.last_insert.take() {
770            let (hit, needs_cow) = {
771                let page = pages
772                    .get(&cached_leaf)
773                    .ok_or(Error::PageOutOfBounds(cached_leaf))?;
774                let n = page.num_cells();
775                let h = n > 0 && key > leaf_node::read_cell(page, n - 1).key;
776                let nc = page.txn_id() != txn_id;
777                (h, nc)
778            };
779            if hit {
780                let cow_id = if needs_cow {
781                    cow_page(pages, alloc, cached_leaf, txn_id)
782                } else {
783                    cached_leaf
784                };
785                let ok = {
786                    let page = pages.get_mut(&cow_id).unwrap();
787                    leaf_node::insert_direct(page, key, val_type, default_value)
788                };
789                if ok {
790                    if cow_id != cached_leaf {
791                        self.root =
792                            propagate_cow_up(pages, alloc, txn_id, &mut cached_path, cow_id);
793                    }
794                    self.entry_count += 1;
795                    self.last_delete = None;
796                    self.last_insert = Some((cached_path, cow_id));
797                    return Ok(UpsertOutcome::Inserted);
798                }
799                let (sep_key, right_id) = split_leaf_with_insert(
800                    pages,
801                    alloc,
802                    txn_id,
803                    cow_id,
804                    key,
805                    val_type,
806                    default_value,
807                );
808                self.root = propagate_split_up(
809                    pages,
810                    alloc,
811                    txn_id,
812                    &cached_path,
813                    cow_id,
814                    &sep_key,
815                    right_id,
816                    &mut self.depth,
817                );
818                self.clear_lil_caches();
819                self.entry_count += 1;
820                return Ok(UpsertOutcome::Inserted);
821            }
822            self.last_insert = Some((cached_path, cached_leaf));
823        }
824
825        let (path, leaf_id) = self.walk_to_leaf(pages, key)?;
826        self.upsert_with_at_leaf(
827            pages,
828            alloc,
829            txn_id,
830            key,
831            val_type,
832            default_value,
833            path,
834            leaf_id,
835            f,
836        )
837    }
838
839    #[allow(clippy::too_many_arguments)]
840    #[inline]
841    pub fn upsert_with_at_leaf<F, E>(
842        &mut self,
843        pages: &mut FxHashMap<PageId, Page>,
844        alloc: &mut PageAllocator,
845        txn_id: TxnId,
846        key: &[u8],
847        val_type: ValueType,
848        default_value: &[u8],
849        path: Vec<(PageId, usize)>,
850        leaf_id: PageId,
851        mut f: F,
852    ) -> std::result::Result<UpsertOutcome, E>
853    where
854        F: FnMut(&[u8]) -> std::result::Result<UpsertAction, E>,
855        E: From<Error>,
856    {
857        let action = {
858            let page = pages.get(&leaf_id).unwrap();
859            match leaf_node::search(page, key) {
860                Ok(idx) => {
861                    let cell = leaf_node::read_cell(page, idx);
862                    if matches!(cell.val_type, ValueType::Tombstone) {
863                        None
864                    } else {
865                        Some(f(cell.value)?)
866                    }
867                }
868                Err(_) => None,
869            }
870        };
871
872        if let Some(act) = action {
873            match act {
874                UpsertAction::Skip => return Ok(UpsertOutcome::Skipped),
875                UpsertAction::Replace(new_bytes) => {
876                    let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
877                    let leaf_ok = {
878                        let page = pages.get_mut(&new_leaf_id).unwrap();
879                        leaf_node::insert_direct(page, key, val_type, &new_bytes)
880                    };
881                    if leaf_ok {
882                        if new_leaf_id != leaf_id {
883                            let mut new_path = path;
884                            self.root =
885                                propagate_cow_up(pages, alloc, txn_id, &mut new_path, new_leaf_id);
886                        }
887                        return Ok(UpsertOutcome::Updated);
888                    }
889                    self.clear_lil_caches();
890                    let (sep_key, right_id) = split_leaf_with_insert(
891                        pages,
892                        alloc,
893                        txn_id,
894                        new_leaf_id,
895                        key,
896                        val_type,
897                        &new_bytes,
898                    );
899                    self.root = propagate_split_up(
900                        pages,
901                        alloc,
902                        txn_id,
903                        &path,
904                        new_leaf_id,
905                        &sep_key,
906                        right_id,
907                        &mut self.depth,
908                    );
909                    return Ok(UpsertOutcome::Updated);
910                }
911            }
912        }
913
914        let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
915        let leaf_ok = {
916            let page = pages.get_mut(&new_leaf_id).unwrap();
917            leaf_node::insert_direct(page, key, val_type, default_value)
918        };
919
920        if leaf_ok {
921            if alloc.in_place() && new_leaf_id == leaf_id {
922                let mut is_rightmost = true;
923                for &(ancestor_id, child_idx) in path.iter().rev() {
924                    let page = pages.get(&ancestor_id).unwrap();
925                    if child_idx != page.num_cells() as usize {
926                        is_rightmost = false;
927                        break;
928                    }
929                }
930                if is_rightmost {
931                    self.last_delete = None;
932                    self.last_insert = Some((path, new_leaf_id));
933                }
934                self.entry_count += 1;
935                return Ok(UpsertOutcome::Inserted);
936            }
937            let mut child = new_leaf_id;
938            let mut is_rightmost = true;
939            let mut new_path = path;
940            for i in (0..new_path.len()).rev() {
941                let (ancestor_id, child_idx) = new_path[i];
942                let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
943                let page = pages.get_mut(&new_ancestor).unwrap();
944                update_branch_child(page, child_idx, child);
945                if child_idx != page.num_cells() as usize {
946                    is_rightmost = false;
947                }
948                new_path[i] = (new_ancestor, child_idx);
949                child = new_ancestor;
950            }
951            self.root = child;
952
953            if is_rightmost {
954                self.last_delete = None;
955                self.last_insert = Some((new_path, new_leaf_id));
956            }
957            self.entry_count += 1;
958            return Ok(UpsertOutcome::Inserted);
959        }
960
961        self.clear_lil_caches();
962        let (sep_key, right_id) = split_leaf_with_insert(
963            pages,
964            alloc,
965            txn_id,
966            new_leaf_id,
967            key,
968            val_type,
969            default_value,
970        );
971        self.root = propagate_split_up(
972            pages,
973            alloc,
974            txn_id,
975            &path,
976            new_leaf_id,
977            &sep_key,
978            right_id,
979            &mut self.depth,
980        );
981        self.entry_count += 1;
982        Ok(UpsertOutcome::Inserted)
983    }
984
985    /// Bulk-update existing keys. Keys must be sorted.
986    pub fn update_sorted(
987        &mut self,
988        pages: &mut FxHashMap<PageId, Page>,
989        alloc: &mut PageAllocator,
990        txn_id: TxnId,
991        pairs: &[(&[u8], &[u8])],
992    ) -> Result<u64> {
993        if pairs.is_empty() {
994            return Ok(0);
995        }
996        self.clear_lil_caches();
997
998        let (mut path, mut leaf_id) = self.walk_to_leaf(pages, pairs[0].0)?;
999        let mut cow_leaf = cow_page(pages, alloc, leaf_id, txn_id);
1000        if cow_leaf != leaf_id {
1001            self.root = propagate_cow_up(pages, alloc, txn_id, &mut path, cow_leaf);
1002        }
1003
1004        let mut count: u64 = 0;
1005        let mut hint: u16 = 0;
1006
1007        for &(key, value) in pairs {
1008            let past_leaf = {
1009                let page = pages.get(&cow_leaf).unwrap();
1010                let n = page.num_cells();
1011                n == 0 || key > leaf_node::read_cell(page, n - 1).key
1012            };
1013
1014            if past_leaf {
1015                let (new_path, new_leaf) = self.walk_to_leaf(pages, key)?;
1016                path = new_path;
1017                leaf_id = new_leaf;
1018                cow_leaf = cow_page(pages, alloc, leaf_id, txn_id);
1019                if cow_leaf != leaf_id {
1020                    self.root = propagate_cow_up(pages, alloc, txn_id, &mut path, cow_leaf);
1021                }
1022                hint = 0;
1023            }
1024
1025            let page = pages.get(&cow_leaf).unwrap();
1026            let n = page.num_cells();
1027            let idx = {
1028                let mut i = hint;
1029                loop {
1030                    if i >= n {
1031                        break None;
1032                    }
1033                    let cell = leaf_node::read_cell(page, i);
1034                    match key.cmp(cell.key) {
1035                        std::cmp::Ordering::Equal => break Some(i),
1036                        std::cmp::Ordering::Less => break None,
1037                        std::cmp::Ordering::Greater => i += 1,
1038                    }
1039                }
1040            };
1041
1042            if let Some(idx) = idx {
1043                hint = idx + 1;
1044                let page = pages.get_mut(&cow_leaf).unwrap();
1045                if !leaf_node::update_value_in_place(page, idx, ValueType::Inline, value) {
1046                    leaf_node::insert_direct(page, key, ValueType::Inline, value);
1047                }
1048                count += 1;
1049            }
1050        }
1051
1052        Ok(count)
1053    }
1054
1055    /// Delete a key. Returns `true` if the key was found and deleted.
1056    pub fn delete(
1057        &mut self,
1058        pages: &mut FxHashMap<PageId, Page>,
1059        alloc: &mut PageAllocator,
1060        txn_id: TxnId,
1061        key: &[u8],
1062    ) -> Result<bool> {
1063        let (mut path, leaf_id) = self.walk_to_leaf(pages, key)?;
1064        self.delete_at_leaf(pages, alloc, txn_id, key, &mut path, leaf_id)
1065    }
1066
1067    #[allow(clippy::ptr_arg)]
1068    pub fn delete_at_leaf(
1069        &mut self,
1070        pages: &mut FxHashMap<PageId, Page>,
1071        alloc: &mut PageAllocator,
1072        txn_id: TxnId,
1073        key: &[u8],
1074        path: &mut Vec<(PageId, usize)>,
1075        leaf_id: PageId,
1076    ) -> Result<bool> {
1077        self.clear_lil_caches();
1078
1079        let found = {
1080            let page = pages.get(&leaf_id).unwrap();
1081            leaf_node::search(page, key).is_ok()
1082        };
1083        if !found {
1084            return Ok(false);
1085        }
1086
1087        let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
1088        {
1089            let page = pages.get_mut(&new_leaf_id).unwrap();
1090            leaf_node::delete(page, key);
1091        }
1092
1093        let leaf_empty = pages.get(&new_leaf_id).unwrap().num_cells() == 0;
1094
1095        if !leaf_empty || path.is_empty() {
1096            if alloc.in_place() && new_leaf_id == leaf_id {
1097                self.entry_count -= 1;
1098                self.last_delete = Some((path.clone(), new_leaf_id));
1099                return Ok(true);
1100            }
1101            self.root = propagate_cow_up(pages, alloc, txn_id, path, new_leaf_id);
1102            self.entry_count -= 1;
1103            self.last_delete = Some((path.clone(), new_leaf_id));
1104            return Ok(true);
1105        }
1106
1107        alloc.free(new_leaf_id);
1108        pages.remove(&new_leaf_id);
1109
1110        self.root = propagate_remove_up(pages, alloc, txn_id, path, &mut self.depth);
1111        self.entry_count -= 1;
1112        self.last_delete = None;
1113        Ok(true)
1114    }
1115
1116    /// Walk root to leaf for `key`. Returns (path, leaf_page_id).
1117    pub fn walk_to_leaf(
1118        &self,
1119        pages: &FxHashMap<PageId, Page>,
1120        key: &[u8],
1121    ) -> Result<(Vec<(PageId, usize)>, PageId)> {
1122        let mut path = Vec::with_capacity(self.depth as usize);
1123        let mut current = self.root;
1124        loop {
1125            let page = pages.get(&current).ok_or(Error::PageOutOfBounds(current))?;
1126            match page.page_type() {
1127                Some(PageType::Leaf) => return Ok((path, current)),
1128                Some(PageType::Branch) => {
1129                    let child_idx = branch_node::search_child_index(page, key);
1130                    let child = branch_node::get_child(page, child_idx);
1131                    path.push((current, child_idx));
1132                    current = child;
1133                }
1134                _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
1135            }
1136        }
1137    }
1138}
1139
1140/// CoW a page. No-op if already owned by this txn. In-place mode reuses page ID.
1141pub fn cow_page(
1142    pages: &mut FxHashMap<PageId, Page>,
1143    alloc: &mut PageAllocator,
1144    old_id: PageId,
1145    txn_id: TxnId,
1146) -> PageId {
1147    if alloc.in_place() {
1148        let page = pages.get_mut(&old_id).unwrap();
1149        if page.txn_id() != txn_id {
1150            page.set_txn_id(txn_id);
1151        }
1152        return old_id;
1153    }
1154    let mut new_page = {
1155        let page = pages.get(&old_id).unwrap();
1156        if page.txn_id() == txn_id {
1157            return old_id;
1158        }
1159        page.clone()
1160    };
1161    let new_id = alloc.allocate();
1162    new_page.set_page_id(new_id);
1163    new_page.set_txn_id(txn_id);
1164    pages.insert(new_id, new_page);
1165    alloc.free(old_id);
1166    new_id
1167}
1168
1169/// Update a branch's child pointer at `child_idx` to point to `new_child`.
1170fn update_branch_child(page: &mut Page, child_idx: usize, new_child: PageId) {
1171    let n = page.num_cells() as usize;
1172    if child_idx < n {
1173        let offset = page.cell_offset(child_idx as u16) as usize;
1174        page.data[offset..offset + 4].copy_from_slice(&new_child.as_u32().to_le_bytes());
1175    } else {
1176        page.set_right_child(new_child);
1177    }
1178}
1179
1180pub fn propagate_cow_up(
1181    pages: &mut FxHashMap<PageId, Page>,
1182    alloc: &mut PageAllocator,
1183    txn_id: TxnId,
1184    path: &mut [(PageId, usize)],
1185    mut new_child: PageId,
1186) -> PageId {
1187    for i in (0..path.len()).rev() {
1188        let (ancestor_id, child_idx) = path[i];
1189        let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
1190        let page = pages.get_mut(&new_ancestor).unwrap();
1191        update_branch_child(page, child_idx, new_child);
1192        path[i] = (new_ancestor, child_idx);
1193        new_child = new_ancestor;
1194    }
1195    new_child
1196}
1197
1198/// Split full leaf and insert. Returns (separator_key, right_page_id).
1199fn split_leaf_with_insert(
1200    pages: &mut FxHashMap<PageId, Page>,
1201    alloc: &mut PageAllocator,
1202    txn_id: TxnId,
1203    leaf_id: PageId,
1204    key: &[u8],
1205    val_type: ValueType,
1206    value: &[u8],
1207) -> (Vec<u8>, PageId) {
1208    let mut cells: Vec<(Vec<u8>, Vec<u8>)> = {
1209        let page = pages.get(&leaf_id).unwrap();
1210        let n = page.num_cells() as usize;
1211        (0..n)
1212            .map(|i| {
1213                let cell = leaf_node::read_cell(page, i as u16);
1214                let raw = leaf_node::read_cell_bytes(page, i as u16);
1215                (cell.key.to_vec(), raw)
1216            })
1217            .collect()
1218    };
1219
1220    let new_raw = leaf_node::build_cell(key, val_type, value);
1221    match cells.binary_search_by(|(k, _)| k.as_slice().cmp(key)) {
1222        Ok(idx) => cells[idx] = (key.to_vec(), new_raw),
1223        Err(idx) => cells.insert(idx, (key.to_vec(), new_raw)),
1224    }
1225
1226    let total = cells.len();
1227
1228    let usable = citadel_core::constants::USABLE_SIZE;
1229    let mut cum: Vec<usize> = Vec::with_capacity(total + 1);
1230    cum.push(0);
1231    for (_, raw) in &cells {
1232        cum.push(cum.last().unwrap() + raw.len());
1233    }
1234    let left_fits = |sp: usize| cum[sp] + sp * 2 <= usable;
1235    let right_fits = |sp: usize| (cum[total] - cum[sp]) + (total - sp) * 2 <= usable;
1236
1237    let mut split_point = total / 2;
1238    if !left_fits(split_point) || !right_fits(split_point) {
1239        split_point = 1;
1240        for sp in 1..total {
1241            if left_fits(sp) && right_fits(sp) {
1242                split_point = sp;
1243                if sp >= total / 2 {
1244                    break;
1245                }
1246            }
1247        }
1248    }
1249
1250    let sep_key = cells[split_point].0.clone();
1251
1252    {
1253        let left_refs: Vec<&[u8]> = cells[..split_point]
1254            .iter()
1255            .map(|(_, raw)| raw.as_slice())
1256            .collect();
1257        let page = pages.get_mut(&leaf_id).unwrap();
1258        page.rebuild_cells(&left_refs);
1259    }
1260
1261    let right_id = alloc.allocate();
1262    {
1263        let mut right_page = Page::new(right_id, PageType::Leaf, txn_id);
1264        let right_refs: Vec<&[u8]> = cells[split_point..]
1265            .iter()
1266            .map(|(_, raw)| raw.as_slice())
1267            .collect();
1268        right_page.rebuild_cells(&right_refs);
1269        pages.insert(right_id, right_page);
1270    }
1271
1272    (sep_key, right_id)
1273}
1274
1275#[allow(clippy::too_many_arguments)]
1276fn propagate_split_up(
1277    pages: &mut FxHashMap<PageId, Page>,
1278    alloc: &mut PageAllocator,
1279    txn_id: TxnId,
1280    path: &[(PageId, usize)],
1281    mut left_child: PageId,
1282    initial_sep: &[u8],
1283    mut right_child: PageId,
1284    depth: &mut u16,
1285) -> PageId {
1286    let mut sep_key = initial_sep.to_vec();
1287    let mut pending_split = true;
1288
1289    for &(ancestor_id, child_idx) in path.iter().rev() {
1290        let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
1291
1292        if pending_split {
1293            let ok = {
1294                let page = pages.get_mut(&new_ancestor).unwrap();
1295                branch_node::insert_separator(page, child_idx, left_child, &sep_key, right_child)
1296            };
1297
1298            if ok {
1299                pending_split = false;
1300                left_child = new_ancestor;
1301            } else {
1302                let (new_sep, new_right) = split_branch_with_insert(
1303                    pages,
1304                    alloc,
1305                    txn_id,
1306                    new_ancestor,
1307                    child_idx,
1308                    left_child,
1309                    &sep_key,
1310                    right_child,
1311                );
1312                left_child = new_ancestor;
1313                sep_key = new_sep;
1314                right_child = new_right;
1315            }
1316        } else {
1317            let page = pages.get_mut(&new_ancestor).unwrap();
1318            update_branch_child(page, child_idx, left_child);
1319            left_child = new_ancestor;
1320        }
1321    }
1322
1323    if pending_split {
1324        let new_root_id = alloc.allocate();
1325        let mut new_root = Page::new(new_root_id, PageType::Branch, txn_id);
1326        let cell = branch_node::build_cell(left_child, &sep_key);
1327        new_root.write_cell(&cell).unwrap();
1328        new_root.set_right_child(right_child);
1329        pages.insert(new_root_id, new_root);
1330        *depth += 1;
1331        new_root_id
1332    } else {
1333        left_child
1334    }
1335}
1336
1337#[allow(clippy::too_many_arguments)]
1338fn split_branch_with_insert(
1339    pages: &mut FxHashMap<PageId, Page>,
1340    alloc: &mut PageAllocator,
1341    txn_id: TxnId,
1342    branch_id: PageId,
1343    child_idx: usize,
1344    new_left: PageId,
1345    sep_key: &[u8],
1346    new_right: PageId,
1347) -> (Vec<u8>, PageId) {
1348    let (new_cells, final_right_child) = {
1349        let page = pages.get(&branch_id).unwrap();
1350        let n = page.num_cells() as usize;
1351        let cells: Vec<(PageId, Vec<u8>)> = (0..n)
1352            .map(|i| {
1353                let cell = branch_node::read_cell(page, i as u16);
1354                (cell.child, cell.key.to_vec())
1355            })
1356            .collect();
1357        let old_rc = page.right_child();
1358
1359        let mut result = Vec::with_capacity(n + 1);
1360        let final_rc;
1361
1362        if child_idx < n {
1363            let old_key = cells[child_idx].1.clone();
1364            for (i, (child, key)) in cells.into_iter().enumerate() {
1365                if i == child_idx {
1366                    result.push((new_left, sep_key.to_vec()));
1367                    result.push((new_right, old_key.clone()));
1368                } else {
1369                    result.push((child, key));
1370                }
1371            }
1372            final_rc = old_rc;
1373        } else {
1374            result = cells;
1375            result.push((new_left, sep_key.to_vec()));
1376            final_rc = new_right;
1377        }
1378
1379        (result, final_rc)
1380    };
1381
1382    let total = new_cells.len();
1383    let usable = citadel_core::constants::USABLE_SIZE;
1384    let raw_sizes: Vec<usize> = new_cells.iter().map(|(_, key)| 6 + key.len()).collect();
1385    let mut cum: Vec<usize> = Vec::with_capacity(total + 1);
1386    cum.push(0);
1387    for &sz in &raw_sizes {
1388        cum.push(cum.last().unwrap() + sz);
1389    }
1390    let left_fits = |sp: usize| cum[sp] + sp * 2 <= usable;
1391    let right_fits = |sp: usize| {
1392        let right_count = total - sp - 1;
1393        (cum[total] - cum[sp + 1]) + right_count * 2 <= usable
1394    };
1395
1396    let mut split_point = total / 2;
1397    if !left_fits(split_point) || !right_fits(split_point) {
1398        split_point = 1;
1399        for sp in 1..total.saturating_sub(1) {
1400            if left_fits(sp) && right_fits(sp) {
1401                split_point = sp;
1402                if sp >= total / 2 {
1403                    break;
1404                }
1405            }
1406        }
1407    }
1408
1409    let promoted_sep = new_cells[split_point].1.clone();
1410    let promoted_child = new_cells[split_point].0;
1411
1412    {
1413        let left_raw: Vec<Vec<u8>> = new_cells[..split_point]
1414            .iter()
1415            .map(|(child, key)| branch_node::build_cell(*child, key))
1416            .collect();
1417        let left_refs: Vec<&[u8]> = left_raw.iter().map(|c| c.as_slice()).collect();
1418        let page = pages.get_mut(&branch_id).unwrap();
1419        page.rebuild_cells(&left_refs);
1420        page.set_right_child(promoted_child);
1421    }
1422
1423    let right_branch_id = alloc.allocate();
1424    {
1425        let mut right_page = Page::new(right_branch_id, PageType::Branch, txn_id);
1426        let right_raw: Vec<Vec<u8>> = new_cells[split_point + 1..]
1427            .iter()
1428            .map(|(child, key)| branch_node::build_cell(*child, key))
1429            .collect();
1430        let right_refs: Vec<&[u8]> = right_raw.iter().map(|c| c.as_slice()).collect();
1431        right_page.rebuild_cells(&right_refs);
1432        right_page.set_right_child(final_right_child);
1433        pages.insert(right_branch_id, right_page);
1434    }
1435
1436    (promoted_sep, right_branch_id)
1437}
1438
1439fn remove_child_from_branch(page: &mut Page, child_idx: usize) {
1440    let n = page.num_cells() as usize;
1441    if child_idx < n {
1442        let cell_sz = branch_node::get_cell_size(page, child_idx as u16);
1443        page.delete_cell_at(child_idx as u16, cell_sz);
1444    } else {
1445        assert!(n > 0, "cannot remove right_child from branch with 0 cells");
1446        let last_child = branch_node::read_cell(page, (n - 1) as u16).child;
1447        let cell_sz = branch_node::get_cell_size(page, (n - 1) as u16);
1448        page.delete_cell_at((n - 1) as u16, cell_sz);
1449        page.set_right_child(last_child);
1450    }
1451}
1452
1453fn propagate_remove_up(
1454    pages: &mut FxHashMap<PageId, Page>,
1455    alloc: &mut PageAllocator,
1456    txn_id: TxnId,
1457    path: &mut [(PageId, usize)],
1458    depth: &mut u16,
1459) -> PageId {
1460    let mut level = path.len();
1461    let mut need_remove_at_level = true;
1462    let mut new_child = PageId(0);
1463
1464    while level > 0 && need_remove_at_level {
1465        level -= 1;
1466        let (ancestor_id, child_idx) = path[level];
1467        let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
1468
1469        {
1470            let page = pages.get_mut(&new_ancestor).unwrap();
1471            remove_child_from_branch(page, child_idx);
1472        }
1473
1474        let num_cells = pages.get(&new_ancestor).unwrap().num_cells();
1475
1476        if num_cells > 0 || level == 0 {
1477            if num_cells == 0 && level == 0 {
1478                let only_child = pages.get(&new_ancestor).unwrap().right_child();
1479                alloc.free(new_ancestor);
1480                pages.remove(&new_ancestor);
1481                *depth -= 1;
1482                return only_child;
1483            }
1484            new_child = new_ancestor;
1485            need_remove_at_level = false;
1486        } else {
1487            let only_child = pages.get(&new_ancestor).unwrap().right_child();
1488            alloc.free(new_ancestor);
1489            pages.remove(&new_ancestor);
1490            *depth -= 1;
1491
1492            new_child = only_child;
1493            need_remove_at_level = false;
1494        }
1495    }
1496
1497    if level > 0 {
1498        let remaining_path = &mut path[..level];
1499        new_child = propagate_cow_up(pages, alloc, txn_id, remaining_path, new_child);
1500    }
1501
1502    new_child
1503}
1504
1505#[cfg(test)]
1506#[path = "btree_tests.rs"]
1507mod tests;