Skip to main content

citadel_buffer/
btree.rs

1//! CoW B+ tree engine. Mutations clone pages; old pages go to pending-free list.
2
3use crate::allocator::PageAllocator;
4use citadel_core::types::{PageId, PageType, TxnId, ValueType};
5use citadel_core::{Error, Result};
6use citadel_page::page::Page;
7use citadel_page::{branch_node, leaf_node};
8use rustc_hash::FxHashMap;
9
10/// B+ tree metadata. Lightweight struct - pages are stored externally.
11#[derive(Clone)]
12pub struct BTree {
13    pub root: PageId,
14    pub depth: u16,
15    pub entry_count: u64,
16    last_insert: Option<(Vec<(PageId, usize)>, PageId)>,
17}
18
19impl BTree {
20    /// Create a new empty B+ tree with a single leaf root.
21    pub fn new(
22        pages: &mut FxHashMap<PageId, Page>,
23        alloc: &mut PageAllocator,
24        txn_id: TxnId,
25    ) -> Self {
26        let root_id = alloc.allocate();
27        let root = Page::new(root_id, PageType::Leaf, txn_id);
28        pages.insert(root_id, root);
29        Self {
30            root: root_id,
31            depth: 1,
32            entry_count: 0,
33            last_insert: None,
34        }
35    }
36
37    /// Create a BTree from existing metadata (e.g., loaded from commit slot).
38    pub fn from_existing(root: PageId, depth: u16, entry_count: u64) -> Self {
39        Self {
40            root,
41            depth,
42            entry_count,
43            last_insert: None,
44        }
45    }
46
47    /// Search for a key. Returns `Some((val_type, value))` if found, `None` otherwise.
48    pub fn search(
49        &self,
50        pages: &FxHashMap<PageId, Page>,
51        key: &[u8],
52    ) -> Result<Option<(ValueType, Vec<u8>)>> {
53        let mut current = self.root;
54        loop {
55            let page = pages.get(&current).ok_or(Error::PageOutOfBounds(current))?;
56            match page.page_type() {
57                Some(PageType::Leaf) => {
58                    return match leaf_node::search(page, key) {
59                        Ok(idx) => {
60                            let cell = leaf_node::read_cell(page, idx);
61                            Ok(Some((cell.val_type, cell.value.to_vec())))
62                        }
63                        Err(_) => Ok(None),
64                    };
65                }
66                Some(PageType::Branch) => {
67                    let idx = branch_node::search_child_index(page, key);
68                    current = branch_node::get_child(page, idx);
69                }
70                _ => {
71                    return Err(Error::InvalidPageType(page.page_type_raw(), current));
72                }
73            }
74        }
75    }
76
77    pub fn lil_would_hit(&self, pages: &FxHashMap<PageId, Page>, key: &[u8]) -> bool {
78        if let Some((_, cached_leaf)) = &self.last_insert {
79            if let Some(page) = pages.get(cached_leaf) {
80                let n = page.num_cells();
81                return n > 0 && key > leaf_node::read_cell(page, n - 1).key;
82            }
83        }
84        false
85    }
86
87    /// Insert key-value. Returns `true` if new, `false` if updated existing.
88    pub fn insert(
89        &mut self,
90        pages: &mut FxHashMap<PageId, Page>,
91        alloc: &mut PageAllocator,
92        txn_id: TxnId,
93        key: &[u8],
94        val_type: ValueType,
95        value: &[u8],
96    ) -> Result<bool> {
97        // LIL cache: skip walk_to_leaf for sequential appends to the rightmost leaf.
98        if let Some((mut cached_path, cached_leaf)) = self.last_insert.take() {
99            let (hit, needs_cow) = {
100                let page = pages
101                    .get(&cached_leaf)
102                    .ok_or(Error::PageOutOfBounds(cached_leaf))?;
103                let n = page.num_cells();
104                let h = n > 0 && key > leaf_node::read_cell(page, n - 1).key;
105                let nc = page.txn_id() != txn_id;
106                (h, nc)
107            };
108            if hit {
109                let cow_id = if needs_cow {
110                    cow_page(pages, alloc, cached_leaf, txn_id)
111                } else {
112                    cached_leaf
113                };
114                let ok = {
115                    let page = pages.get_mut(&cow_id).unwrap();
116                    leaf_node::insert_direct(page, key, val_type, value)
117                };
118                if ok {
119                    if cow_id != cached_leaf {
120                        self.root =
121                            propagate_cow_up(pages, alloc, txn_id, &mut cached_path, cow_id);
122                    }
123                    self.entry_count += 1;
124                    self.last_insert = Some((cached_path, cow_id));
125                    return Ok(true);
126                }
127                let (sep_key, right_id) =
128                    split_leaf_with_insert(pages, alloc, txn_id, cow_id, key, val_type, value);
129                self.root = propagate_split_up(
130                    pages,
131                    alloc,
132                    txn_id,
133                    &cached_path,
134                    cow_id,
135                    &sep_key,
136                    right_id,
137                    &mut self.depth,
138                );
139                self.last_insert = None;
140                self.entry_count += 1;
141                return Ok(true);
142            }
143        }
144
145        let (path, leaf_id) = self.walk_to_leaf(pages, key)?;
146
147        let key_exists = {
148            let page = pages.get(&leaf_id).unwrap();
149            leaf_node::search(page, key).is_ok()
150        };
151
152        let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
153
154        let leaf_ok = {
155            let page = pages.get_mut(&new_leaf_id).unwrap();
156            leaf_node::insert_direct(page, key, val_type, value)
157        };
158
159        if leaf_ok {
160            // In-place mode: leaf PageId unchanged, no need to CoW ancestors.
161            // Only walk path to compute is_rightmost for LIL cache.
162            if alloc.in_place() && new_leaf_id == leaf_id {
163                let mut is_rightmost = true;
164                for &(ancestor_id, child_idx) in path.iter().rev() {
165                    let page = pages.get(&ancestor_id).unwrap();
166                    if child_idx != page.num_cells() as usize {
167                        is_rightmost = false;
168                        break;
169                    }
170                }
171                if is_rightmost {
172                    self.last_insert = Some((path, new_leaf_id));
173                }
174                if !key_exists {
175                    self.entry_count += 1;
176                }
177                return Ok(!key_exists);
178            }
179            let mut child = new_leaf_id;
180            let mut is_rightmost = true;
181            let mut new_path = path;
182            for i in (0..new_path.len()).rev() {
183                let (ancestor_id, child_idx) = new_path[i];
184                let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
185                let page = pages.get_mut(&new_ancestor).unwrap();
186                update_branch_child(page, child_idx, child);
187                if child_idx != page.num_cells() as usize {
188                    is_rightmost = false;
189                }
190                new_path[i] = (new_ancestor, child_idx);
191                child = new_ancestor;
192            }
193            self.root = child;
194
195            if is_rightmost {
196                self.last_insert = Some((new_path, new_leaf_id));
197            }
198
199            if !key_exists {
200                self.entry_count += 1;
201            }
202            return Ok(!key_exists);
203        }
204
205        self.last_insert = None;
206        let (sep_key, right_id) =
207            split_leaf_with_insert(pages, alloc, txn_id, new_leaf_id, key, val_type, value);
208        self.root = propagate_split_up(
209            pages,
210            alloc,
211            txn_id,
212            &path,
213            new_leaf_id,
214            &sep_key,
215            right_id,
216            &mut self.depth,
217        );
218
219        if !key_exists {
220            self.entry_count += 1;
221        }
222        Ok(!key_exists)
223    }
224
225    /// Bulk-update existing keys. Keys must be sorted.
226    pub fn update_sorted(
227        &mut self,
228        pages: &mut FxHashMap<PageId, Page>,
229        alloc: &mut PageAllocator,
230        txn_id: TxnId,
231        pairs: &[(&[u8], &[u8])],
232    ) -> Result<u64> {
233        if pairs.is_empty() {
234            return Ok(0);
235        }
236        self.last_insert = None;
237
238        let (mut path, mut leaf_id) = self.walk_to_leaf(pages, pairs[0].0)?;
239        let mut cow_leaf = cow_page(pages, alloc, leaf_id, txn_id);
240        if cow_leaf != leaf_id {
241            self.root = propagate_cow_up(pages, alloc, txn_id, &mut path, cow_leaf);
242        }
243
244        let mut count: u64 = 0;
245        let mut hint: u16 = 0;
246
247        for &(key, value) in pairs {
248            let past_leaf = {
249                let page = pages.get(&cow_leaf).unwrap();
250                let n = page.num_cells();
251                n == 0 || key > leaf_node::read_cell(page, n - 1).key
252            };
253
254            if past_leaf {
255                let (new_path, new_leaf) = self.walk_to_leaf(pages, key)?;
256                path = new_path;
257                leaf_id = new_leaf;
258                cow_leaf = cow_page(pages, alloc, leaf_id, txn_id);
259                if cow_leaf != leaf_id {
260                    self.root = propagate_cow_up(pages, alloc, txn_id, &mut path, cow_leaf);
261                }
262                hint = 0;
263            }
264
265            let page = pages.get(&cow_leaf).unwrap();
266            let n = page.num_cells();
267            let idx = {
268                let mut i = hint;
269                loop {
270                    if i >= n {
271                        break None;
272                    }
273                    let cell = leaf_node::read_cell(page, i);
274                    match key.cmp(cell.key) {
275                        std::cmp::Ordering::Equal => break Some(i),
276                        std::cmp::Ordering::Less => break None,
277                        std::cmp::Ordering::Greater => i += 1,
278                    }
279                }
280            };
281
282            if let Some(idx) = idx {
283                hint = idx + 1;
284                let page = pages.get_mut(&cow_leaf).unwrap();
285                if !leaf_node::update_value_in_place(page, idx, ValueType::Inline, value) {
286                    leaf_node::insert_direct(page, key, ValueType::Inline, value);
287                }
288                count += 1;
289            }
290        }
291
292        Ok(count)
293    }
294
295    /// Delete a key. Returns `true` if the key was found and deleted.
296    pub fn delete(
297        &mut self,
298        pages: &mut FxHashMap<PageId, Page>,
299        alloc: &mut PageAllocator,
300        txn_id: TxnId,
301        key: &[u8],
302    ) -> Result<bool> {
303        self.last_insert = None;
304        let (mut path, leaf_id) = self.walk_to_leaf(pages, key)?;
305
306        let found = {
307            let page = pages.get(&leaf_id).unwrap();
308            leaf_node::search(page, key).is_ok()
309        };
310        if !found {
311            return Ok(false);
312        }
313
314        let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
315        {
316            let page = pages.get_mut(&new_leaf_id).unwrap();
317            leaf_node::delete(page, key);
318        }
319
320        let leaf_empty = pages.get(&new_leaf_id).unwrap().num_cells() == 0;
321
322        if !leaf_empty || path.is_empty() {
323            if alloc.in_place() && new_leaf_id == leaf_id {
324                self.entry_count -= 1;
325                return Ok(true);
326            }
327            self.root = propagate_cow_up(pages, alloc, txn_id, &mut path, new_leaf_id);
328            self.entry_count -= 1;
329            return Ok(true);
330        }
331
332        alloc.free(new_leaf_id);
333        pages.remove(&new_leaf_id);
334
335        self.root = propagate_remove_up(pages, alloc, txn_id, &mut path, &mut self.depth);
336        self.entry_count -= 1;
337        Ok(true)
338    }
339
340    /// Walk root to leaf for `key`. Returns (path, leaf_page_id).
341    pub fn walk_to_leaf(
342        &self,
343        pages: &FxHashMap<PageId, Page>,
344        key: &[u8],
345    ) -> Result<(Vec<(PageId, usize)>, PageId)> {
346        let mut path = Vec::with_capacity(self.depth as usize);
347        let mut current = self.root;
348        loop {
349            let page = pages.get(&current).ok_or(Error::PageOutOfBounds(current))?;
350            match page.page_type() {
351                Some(PageType::Leaf) => return Ok((path, current)),
352                Some(PageType::Branch) => {
353                    let child_idx = branch_node::search_child_index(page, key);
354                    let child = branch_node::get_child(page, child_idx);
355                    path.push((current, child_idx));
356                    current = child;
357                }
358                _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
359            }
360        }
361    }
362}
363
364/// CoW a page. No-op if already owned by this txn. In-place mode reuses page ID.
365pub fn cow_page(
366    pages: &mut FxHashMap<PageId, Page>,
367    alloc: &mut PageAllocator,
368    old_id: PageId,
369    txn_id: TxnId,
370) -> PageId {
371    if alloc.in_place() {
372        let page = pages.get_mut(&old_id).unwrap();
373        if page.txn_id() != txn_id {
374            page.set_txn_id(txn_id);
375        }
376        return old_id;
377    }
378    let mut new_page = {
379        let page = pages.get(&old_id).unwrap();
380        if page.txn_id() == txn_id {
381            return old_id;
382        }
383        page.clone()
384    };
385    let new_id = alloc.allocate();
386    new_page.set_page_id(new_id);
387    new_page.set_txn_id(txn_id);
388    pages.insert(new_id, new_page);
389    alloc.free(old_id);
390    new_id
391}
392
393/// Update a branch's child pointer at `child_idx` to point to `new_child`.
394fn update_branch_child(page: &mut Page, child_idx: usize, new_child: PageId) {
395    let n = page.num_cells() as usize;
396    if child_idx < n {
397        let offset = page.cell_offset(child_idx as u16) as usize;
398        page.data[offset..offset + 4].copy_from_slice(&new_child.as_u32().to_le_bytes());
399    } else {
400        page.set_right_child(new_child);
401    }
402}
403
404/// Propagate CoW up through ancestors. Updates `path` in place so callers
405/// caching the path (e.g. LIL) reuse current PageIds after CoW — critical
406/// across SAVEPOINT boundaries where txn_id changes invalidate the cache.
407pub fn propagate_cow_up(
408    pages: &mut FxHashMap<PageId, Page>,
409    alloc: &mut PageAllocator,
410    txn_id: TxnId,
411    path: &mut [(PageId, usize)],
412    mut new_child: PageId,
413) -> PageId {
414    for i in (0..path.len()).rev() {
415        let (ancestor_id, child_idx) = path[i];
416        let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
417        let page = pages.get_mut(&new_ancestor).unwrap();
418        update_branch_child(page, child_idx, new_child);
419        path[i] = (new_ancestor, child_idx);
420        new_child = new_ancestor;
421    }
422    new_child
423}
424
425/// Split full leaf and insert. Returns (separator_key, right_page_id).
426fn split_leaf_with_insert(
427    pages: &mut FxHashMap<PageId, Page>,
428    alloc: &mut PageAllocator,
429    txn_id: TxnId,
430    leaf_id: PageId,
431    key: &[u8],
432    val_type: ValueType,
433    value: &[u8],
434) -> (Vec<u8>, PageId) {
435    let mut cells: Vec<(Vec<u8>, Vec<u8>)> = {
436        let page = pages.get(&leaf_id).unwrap();
437        let n = page.num_cells() as usize;
438        (0..n)
439            .map(|i| {
440                let cell = leaf_node::read_cell(page, i as u16);
441                let raw = leaf_node::read_cell_bytes(page, i as u16);
442                (cell.key.to_vec(), raw)
443            })
444            .collect()
445    };
446
447    let new_raw = leaf_node::build_cell(key, val_type, value);
448    match cells.binary_search_by(|(k, _)| k.as_slice().cmp(key)) {
449        Ok(idx) => cells[idx] = (key.to_vec(), new_raw),
450        Err(idx) => cells.insert(idx, (key.to_vec(), new_raw)),
451    }
452
453    let total = cells.len();
454
455    let usable = citadel_core::constants::USABLE_SIZE;
456    let mut cum: Vec<usize> = Vec::with_capacity(total + 1);
457    cum.push(0);
458    for (_, raw) in &cells {
459        cum.push(cum.last().unwrap() + raw.len());
460    }
461    let left_fits = |sp: usize| cum[sp] + sp * 2 <= usable;
462    let right_fits = |sp: usize| (cum[total] - cum[sp]) + (total - sp) * 2 <= usable;
463
464    let mut split_point = total / 2;
465    if !left_fits(split_point) || !right_fits(split_point) {
466        split_point = 1;
467        for sp in 1..total {
468            if left_fits(sp) && right_fits(sp) {
469                split_point = sp;
470                if sp >= total / 2 {
471                    break;
472                }
473            }
474        }
475    }
476
477    let sep_key = cells[split_point].0.clone();
478
479    {
480        let left_refs: Vec<&[u8]> = cells[..split_point]
481            .iter()
482            .map(|(_, raw)| raw.as_slice())
483            .collect();
484        let page = pages.get_mut(&leaf_id).unwrap();
485        page.rebuild_cells(&left_refs);
486    }
487
488    let right_id = alloc.allocate();
489    {
490        let mut right_page = Page::new(right_id, PageType::Leaf, txn_id);
491        let right_refs: Vec<&[u8]> = cells[split_point..]
492            .iter()
493            .map(|(_, raw)| raw.as_slice())
494            .collect();
495        right_page.rebuild_cells(&right_refs);
496        pages.insert(right_id, right_page);
497    }
498
499    (sep_key, right_id)
500}
501
502#[allow(clippy::too_many_arguments)]
503fn propagate_split_up(
504    pages: &mut FxHashMap<PageId, Page>,
505    alloc: &mut PageAllocator,
506    txn_id: TxnId,
507    path: &[(PageId, usize)],
508    mut left_child: PageId,
509    initial_sep: &[u8],
510    mut right_child: PageId,
511    depth: &mut u16,
512) -> PageId {
513    let mut sep_key = initial_sep.to_vec();
514    let mut pending_split = true;
515
516    for &(ancestor_id, child_idx) in path.iter().rev() {
517        let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
518
519        if pending_split {
520            let ok = {
521                let page = pages.get_mut(&new_ancestor).unwrap();
522                branch_node::insert_separator(page, child_idx, left_child, &sep_key, right_child)
523            };
524
525            if ok {
526                pending_split = false;
527                left_child = new_ancestor;
528            } else {
529                let (new_sep, new_right) = split_branch_with_insert(
530                    pages,
531                    alloc,
532                    txn_id,
533                    new_ancestor,
534                    child_idx,
535                    left_child,
536                    &sep_key,
537                    right_child,
538                );
539                left_child = new_ancestor;
540                sep_key = new_sep;
541                right_child = new_right;
542            }
543        } else {
544            let page = pages.get_mut(&new_ancestor).unwrap();
545            update_branch_child(page, child_idx, left_child);
546            left_child = new_ancestor;
547        }
548    }
549
550    if pending_split {
551        let new_root_id = alloc.allocate();
552        let mut new_root = Page::new(new_root_id, PageType::Branch, txn_id);
553        let cell = branch_node::build_cell(left_child, &sep_key);
554        new_root.write_cell(&cell).unwrap();
555        new_root.set_right_child(right_child);
556        pages.insert(new_root_id, new_root);
557        *depth += 1;
558        new_root_id
559    } else {
560        left_child
561    }
562}
563
564#[allow(clippy::too_many_arguments)]
565fn split_branch_with_insert(
566    pages: &mut FxHashMap<PageId, Page>,
567    alloc: &mut PageAllocator,
568    txn_id: TxnId,
569    branch_id: PageId,
570    child_idx: usize,
571    new_left: PageId,
572    sep_key: &[u8],
573    new_right: PageId,
574) -> (Vec<u8>, PageId) {
575    let (new_cells, final_right_child) = {
576        let page = pages.get(&branch_id).unwrap();
577        let n = page.num_cells() as usize;
578        let cells: Vec<(PageId, Vec<u8>)> = (0..n)
579            .map(|i| {
580                let cell = branch_node::read_cell(page, i as u16);
581                (cell.child, cell.key.to_vec())
582            })
583            .collect();
584        let old_rc = page.right_child();
585
586        let mut result = Vec::with_capacity(n + 1);
587        let final_rc;
588
589        if child_idx < n {
590            let old_key = cells[child_idx].1.clone();
591            for (i, (child, key)) in cells.into_iter().enumerate() {
592                if i == child_idx {
593                    result.push((new_left, sep_key.to_vec()));
594                    result.push((new_right, old_key.clone()));
595                } else {
596                    result.push((child, key));
597                }
598            }
599            final_rc = old_rc;
600        } else {
601            result = cells;
602            result.push((new_left, sep_key.to_vec()));
603            final_rc = new_right;
604        }
605
606        (result, final_rc)
607    };
608
609    let total = new_cells.len();
610    let usable = citadel_core::constants::USABLE_SIZE;
611    let raw_sizes: Vec<usize> = new_cells.iter().map(|(_, key)| 6 + key.len()).collect();
612    let mut cum: Vec<usize> = Vec::with_capacity(total + 1);
613    cum.push(0);
614    for &sz in &raw_sizes {
615        cum.push(cum.last().unwrap() + sz);
616    }
617    let left_fits = |sp: usize| cum[sp] + sp * 2 <= usable;
618    let right_fits = |sp: usize| {
619        let right_count = total - sp - 1;
620        (cum[total] - cum[sp + 1]) + right_count * 2 <= usable
621    };
622
623    let mut split_point = total / 2;
624    if !left_fits(split_point) || !right_fits(split_point) {
625        split_point = 1;
626        for sp in 1..total.saturating_sub(1) {
627            if left_fits(sp) && right_fits(sp) {
628                split_point = sp;
629                if sp >= total / 2 {
630                    break;
631                }
632            }
633        }
634    }
635
636    let promoted_sep = new_cells[split_point].1.clone();
637    let promoted_child = new_cells[split_point].0;
638
639    {
640        let left_raw: Vec<Vec<u8>> = new_cells[..split_point]
641            .iter()
642            .map(|(child, key)| branch_node::build_cell(*child, key))
643            .collect();
644        let left_refs: Vec<&[u8]> = left_raw.iter().map(|c| c.as_slice()).collect();
645        let page = pages.get_mut(&branch_id).unwrap();
646        page.rebuild_cells(&left_refs);
647        page.set_right_child(promoted_child);
648    }
649
650    let right_branch_id = alloc.allocate();
651    {
652        let mut right_page = Page::new(right_branch_id, PageType::Branch, txn_id);
653        let right_raw: Vec<Vec<u8>> = new_cells[split_point + 1..]
654            .iter()
655            .map(|(child, key)| branch_node::build_cell(*child, key))
656            .collect();
657        let right_refs: Vec<&[u8]> = right_raw.iter().map(|c| c.as_slice()).collect();
658        right_page.rebuild_cells(&right_refs);
659        right_page.set_right_child(final_right_child);
660        pages.insert(right_branch_id, right_page);
661    }
662
663    (promoted_sep, right_branch_id)
664}
665
666fn remove_child_from_branch(page: &mut Page, child_idx: usize) {
667    let n = page.num_cells() as usize;
668    if child_idx < n {
669        let cell_sz = branch_node::get_cell_size(page, child_idx as u16);
670        page.delete_cell_at(child_idx as u16, cell_sz);
671    } else {
672        assert!(n > 0, "cannot remove right_child from branch with 0 cells");
673        let last_child = branch_node::read_cell(page, (n - 1) as u16).child;
674        let cell_sz = branch_node::get_cell_size(page, (n - 1) as u16);
675        page.delete_cell_at((n - 1) as u16, cell_sz);
676        page.set_right_child(last_child);
677    }
678}
679
680fn propagate_remove_up(
681    pages: &mut FxHashMap<PageId, Page>,
682    alloc: &mut PageAllocator,
683    txn_id: TxnId,
684    path: &mut [(PageId, usize)],
685    depth: &mut u16,
686) -> PageId {
687    let mut level = path.len();
688    let mut need_remove_at_level = true;
689    let mut new_child = PageId(0);
690
691    while level > 0 && need_remove_at_level {
692        level -= 1;
693        let (ancestor_id, child_idx) = path[level];
694        let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
695
696        {
697            let page = pages.get_mut(&new_ancestor).unwrap();
698            remove_child_from_branch(page, child_idx);
699        }
700
701        let num_cells = pages.get(&new_ancestor).unwrap().num_cells();
702
703        if num_cells > 0 || level == 0 {
704            if num_cells == 0 && level == 0 {
705                // Root collapsed - replace with its only child
706                let only_child = pages.get(&new_ancestor).unwrap().right_child();
707                alloc.free(new_ancestor);
708                pages.remove(&new_ancestor);
709                *depth -= 1;
710                return only_child;
711            }
712            // Branch is non-empty, or it's the root with cells
713            new_child = new_ancestor;
714            need_remove_at_level = false;
715        } else {
716            // Branch became empty (0 cells) - collapse to its right_child
717            let only_child = pages.get(&new_ancestor).unwrap().right_child();
718            alloc.free(new_ancestor);
719            pages.remove(&new_ancestor);
720            *depth -= 1;
721
722            new_child = only_child;
723            need_remove_at_level = false;
724        }
725    }
726
727    if level > 0 {
728        let remaining_path = &mut path[..level];
729        new_child = propagate_cow_up(pages, alloc, txn_id, remaining_path, new_child);
730    }
731
732    new_child
733}
734
#[cfg(test)]
mod tests {
    use super::*;

    /// Page map, allocator and tree bundled together so each test drives a single
    /// value. Everything is created under `TxnId(1)`.
    struct Harness {
        pages: FxHashMap<PageId, Page>,
        alloc: PageAllocator,
        tree: BTree,
    }

    impl Harness {
        fn new() -> Self {
            let mut pages = FxHashMap::default();
            let mut alloc = PageAllocator::new(0);
            let tree = BTree::new(&mut pages, &mut alloc, TxnId(1));
            Harness { pages, alloc, tree }
        }

        /// Insert an inline value under the given transaction id.
        fn put_in(&mut self, txn_id: TxnId, key: &[u8], value: &[u8]) -> bool {
            self.tree
                .insert(
                    &mut self.pages,
                    &mut self.alloc,
                    txn_id,
                    key,
                    ValueType::Inline,
                    value,
                )
                .unwrap()
        }

        /// Insert an inline value under `TxnId(1)`.
        fn put(&mut self, key: &[u8], value: &[u8]) -> bool {
            self.put_in(TxnId(1), key, value)
        }

        fn get(&self, key: &[u8]) -> Option<(ValueType, Vec<u8>)> {
            self.tree.search(&self.pages, key).unwrap()
        }

        /// Delete under `TxnId(1)`.
        fn remove(&mut self, key: &[u8]) -> bool {
            self.tree
                .delete(&mut self.pages, &mut self.alloc, TxnId(1), key)
                .unwrap()
        }
    }

    /// Expected search result for an inline value.
    fn inline(bytes: &[u8]) -> Option<(ValueType, Vec<u8>)> {
        Some((ValueType::Inline, bytes.to_vec()))
    }

    #[test]
    fn empty_tree_search() {
        let h = Harness::new();
        assert_eq!(h.get(b"anything"), None);
    }

    #[test]
    fn insert_and_search_single() {
        let mut h = Harness::new();
        let is_new = h.put(b"hello", b"world");
        assert!(is_new);
        assert_eq!(h.tree.entry_count, 1);

        assert_eq!(h.get(b"hello"), inline(b"world"));
    }

    #[test]
    fn insert_update_existing() {
        let mut h = Harness::new();
        h.put(b"key", b"v1");
        let is_new = h.put(b"key", b"v2");
        assert!(!is_new);
        assert_eq!(h.tree.entry_count, 1);

        assert_eq!(h.get(b"key"), inline(b"v2"));
    }

    #[test]
    fn insert_multiple_sorted() {
        let mut h = Harness::new();
        let keys = [b"dog", b"ant", b"cat", b"fox", b"bat", b"eel"];
        for k in &keys {
            h.put(*k, *k);
        }
        assert_eq!(h.tree.entry_count, 6);

        for k in &keys {
            assert_eq!(h.get(*k), inline(*k));
        }

        assert_eq!(h.get(b"zebra"), None);
    }

    #[test]
    fn insert_triggers_leaf_split() {
        let mut h = Harness::new();

        // 500 entries exceeds per-leaf capacity and forces a split.
        let count = 500;
        for i in 0..count {
            let key = format!("key-{i:05}");
            let val = format!("val-{i:05}");
            h.put(key.as_bytes(), val.as_bytes());
        }

        assert_eq!(h.tree.entry_count, count);
        assert!(
            h.tree.depth >= 2,
            "tree should have split (depth={})",
            h.tree.depth
        );

        for i in 0..count {
            let key = format!("key-{i:05}");
            let val = format!("val-{i:05}");
            assert_eq!(h.get(key.as_bytes()), inline(val.as_bytes()));
        }
    }

    #[test]
    fn delete_existing_key() {
        let mut h = Harness::new();
        h.put(b"a", b"1");
        h.put(b"b", b"2");
        h.put(b"c", b"3");

        let found = h.remove(b"b");
        assert!(found);
        assert_eq!(h.tree.entry_count, 2);
        assert_eq!(h.get(b"b"), None);
        assert_eq!(h.get(b"a"), inline(b"1"));
        assert_eq!(h.get(b"c"), inline(b"3"));
    }

    #[test]
    fn delete_nonexistent_key() {
        let mut h = Harness::new();
        h.put(b"a", b"1");
        let found = h.remove(b"z");
        assert!(!found);
        assert_eq!(h.tree.entry_count, 1);
    }

    #[test]
    fn delete_all_from_root_leaf() {
        let mut h = Harness::new();
        h.put(b"x", b"1");
        h.remove(b"x");
        assert_eq!(h.tree.entry_count, 0);

        let root = h.pages.get(&h.tree.root).unwrap();
        assert_eq!(root.page_type(), Some(PageType::Leaf));
        assert_eq!(root.num_cells(), 0);
    }

    #[test]
    fn cow_produces_new_page_ids() {
        let mut h = Harness::new();
        let root_before = h.tree.root;

        // A different transaction id forces the root leaf to be copied.
        h.put_in(TxnId(2), b"key", b"val");
        let root_after = h.tree.root;

        assert_ne!(root_before, root_after);
        assert!(h.alloc.freed_this_txn().contains(&root_before));
    }

    #[test]
    fn insert_and_delete_many() {
        let mut h = Harness::new();
        let count = 1000u64;

        for i in 0..count {
            let key = format!("k{i:06}");
            let val = format!("v{i:06}");
            h.put(key.as_bytes(), val.as_bytes());
        }
        assert_eq!(h.tree.entry_count, count);

        // Remove every even-numbered key.
        for i in (0..count).step_by(2) {
            let key = format!("k{i:06}");
            let found = h.remove(key.as_bytes());
            assert!(found);
        }
        assert_eq!(h.tree.entry_count, count / 2);

        for i in 0..count {
            let key = format!("k{i:06}");
            let result = h.get(key.as_bytes());
            if i % 2 == 0 {
                assert_eq!(result, None, "deleted key {key} should not be found");
            } else {
                let val = format!("v{i:06}");
                assert_eq!(result, inline(val.as_bytes()));
            }
        }
    }

    #[test]
    fn deep_tree_insert_delete() {
        let mut h = Harness::new();

        let count = 2000u64;
        for i in 0..count {
            let key = format!("{i:08}");
            h.put(key.as_bytes(), b"v");
        }
        assert!(h.tree.depth >= 2, "depth={} expected >= 2", h.tree.depth);
        assert_eq!(h.tree.entry_count, count);

        for i in 0..count {
            let key = format!("{i:08}");
            let found = h.remove(key.as_bytes());
            assert!(found, "key {key} should be deletable");
        }
        assert_eq!(h.tree.entry_count, 0);
    }
}
1026}