Skip to main content

citadel_buffer/
btree.rs

1//! CoW B+ tree engine.
2//!
3//! All mutations use Copy-on-Write: the old page is cloned to a new page ID,
4//! modified, and ancestors are updated to point to the new page. Old pages
5//! are freed via the allocator's pending-free list.
6//!
7//! The tree uses `HashMap<PageId, Page>` as the in-memory page store.
8
9use crate::allocator::PageAllocator;
10use citadel_core::types::{PageId, PageType, TxnId, ValueType};
11use citadel_core::{Error, Result};
12use citadel_page::page::Page;
13use citadel_page::{branch_node, leaf_node};
14use std::collections::HashMap;
15
16/// B+ tree metadata. Lightweight struct — pages are stored externally.
17#[derive(Clone)]
18pub struct BTree {
19    pub root: PageId,
20    pub depth: u16,
21    pub entry_count: u64,
22}
23
24impl BTree {
25    /// Create a new empty B+ tree with a single leaf root.
26    pub fn new(
27        pages: &mut HashMap<PageId, Page>,
28        alloc: &mut PageAllocator,
29        txn_id: TxnId,
30    ) -> Self {
31        let root_id = alloc.allocate();
32        let root = Page::new(root_id, PageType::Leaf, txn_id);
33        pages.insert(root_id, root);
34        Self {
35            root: root_id,
36            depth: 1,
37            entry_count: 0,
38        }
39    }
40
41    /// Create a BTree from existing metadata (e.g., loaded from commit slot).
42    pub fn from_existing(root: PageId, depth: u16, entry_count: u64) -> Self {
43        Self {
44            root,
45            depth,
46            entry_count,
47        }
48    }
49
50    /// Search for a key. Returns `Some((val_type, value))` if found, `None` otherwise.
51    pub fn search(
52        &self,
53        pages: &HashMap<PageId, Page>,
54        key: &[u8],
55    ) -> Result<Option<(ValueType, Vec<u8>)>> {
56        let mut current = self.root;
57        loop {
58            let page = pages.get(&current).ok_or(Error::PageOutOfBounds(current))?;
59            match page.page_type() {
60                Some(PageType::Leaf) => {
61                    return match leaf_node::search(page, key) {
62                        Ok(idx) => {
63                            let cell = leaf_node::read_cell(page, idx);
64                            Ok(Some((cell.val_type, cell.value.to_vec())))
65                        }
66                        Err(_) => Ok(None),
67                    };
68                }
69                Some(PageType::Branch) => {
70                    let idx = branch_node::search_child_index(page, key);
71                    current = branch_node::get_child(page, idx);
72                }
73                _ => {
74                    return Err(Error::InvalidPageType(page.page_type_raw(), current));
75                }
76            }
77        }
78    }
79
80    /// Insert a key-value pair. Returns `true` if a new entry was added,
81    /// `false` if an existing key was updated.
82    pub fn insert(
83        &mut self,
84        pages: &mut HashMap<PageId, Page>,
85        alloc: &mut PageAllocator,
86        txn_id: TxnId,
87        key: &[u8],
88        val_type: ValueType,
89        value: &[u8],
90    ) -> Result<bool> {
91        // Walk root to leaf, recording path
92        let (path, leaf_id) = self.walk_to_leaf(pages, key)?;
93
94        // Check if key already exists
95        let key_exists = {
96            let page = pages.get(&leaf_id).unwrap();
97            leaf_node::search(page, key).is_ok()
98        };
99
100        // CoW the leaf
101        let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
102
103        // Try to insert into CoW'd leaf
104        let leaf_ok = {
105            let page = pages.get_mut(&new_leaf_id).unwrap();
106            leaf_node::insert(page, key, val_type, value)
107        };
108
109        if leaf_ok {
110            // No split needed — propagate CoW up
111            self.root = propagate_cow_up(pages, alloc, txn_id, &path, new_leaf_id);
112            if !key_exists {
113                self.entry_count += 1;
114            }
115            return Ok(!key_exists);
116        }
117
118        // Leaf is full — split
119        let (sep_key, right_id) =
120            split_leaf_with_insert(pages, alloc, txn_id, new_leaf_id, key, val_type, value);
121
122        // Propagate split up through ancestors
123        self.root = propagate_split_up(
124            pages,
125            alloc,
126            txn_id,
127            &path,
128            new_leaf_id,
129            &sep_key,
130            right_id,
131            &mut self.depth,
132        );
133
134        if !key_exists {
135            self.entry_count += 1;
136        }
137        Ok(!key_exists)
138    }
139
140    /// Delete a key. Returns `true` if the key was found and deleted.
141    pub fn delete(
142        &mut self,
143        pages: &mut HashMap<PageId, Page>,
144        alloc: &mut PageAllocator,
145        txn_id: TxnId,
146        key: &[u8],
147    ) -> Result<bool> {
148        // Walk root to leaf
149        let (path, leaf_id) = self.walk_to_leaf(pages, key)?;
150
151        // Check if key exists
152        let found = {
153            let page = pages.get(&leaf_id).unwrap();
154            leaf_node::search(page, key).is_ok()
155        };
156        if !found {
157            return Ok(false);
158        }
159
160        // CoW the leaf and delete
161        let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
162        {
163            let page = pages.get_mut(&new_leaf_id).unwrap();
164            leaf_node::delete(page, key);
165        }
166
167        // Check if leaf became empty
168        let leaf_empty = pages.get(&new_leaf_id).unwrap().num_cells() == 0;
169
170        if !leaf_empty || path.is_empty() {
171            // Leaf is non-empty, or it's the root (root can be empty)
172            self.root = propagate_cow_up(pages, alloc, txn_id, &path, new_leaf_id);
173            self.entry_count -= 1;
174            return Ok(true);
175        }
176
177        // Empty leaf — remove from tree
178        alloc.free(new_leaf_id);
179        pages.remove(&new_leaf_id);
180
181        // Walk up, handling the removal
182        self.root = propagate_remove_up(pages, alloc, txn_id, &path, &mut self.depth);
183        self.entry_count -= 1;
184        Ok(true)
185    }
186
187    /// Walk from root to the leaf that should contain `key`.
188    /// Returns (path, leaf_page_id) where path is Vec<(ancestor_id, child_idx)>.
189    fn walk_to_leaf(
190        &self,
191        pages: &HashMap<PageId, Page>,
192        key: &[u8],
193    ) -> Result<(Vec<(PageId, usize)>, PageId)> {
194        let mut path = Vec::with_capacity(self.depth as usize);
195        let mut current = self.root;
196        loop {
197            let page = pages.get(&current).ok_or(Error::PageOutOfBounds(current))?;
198            match page.page_type() {
199                Some(PageType::Leaf) => return Ok((path, current)),
200                Some(PageType::Branch) => {
201                    let child_idx = branch_node::search_child_index(page, key);
202                    let child = branch_node::get_child(page, child_idx);
203                    path.push((current, child_idx));
204                    current = child;
205                }
206                _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
207            }
208        }
209    }
210}
211
212/// Copy-on-Write: clone a page to a new page ID, free the old one.
213fn cow_page(
214    pages: &mut HashMap<PageId, Page>,
215    alloc: &mut PageAllocator,
216    old_id: PageId,
217    txn_id: TxnId,
218) -> PageId {
219    let new_id = alloc.allocate();
220    let mut new_page = pages.get(&old_id).unwrap().clone();
221    new_page.set_page_id(new_id);
222    new_page.set_txn_id(txn_id);
223    pages.insert(new_id, new_page);
224    alloc.free(old_id);
225    new_id
226}
227
228/// Update a branch's child pointer at `child_idx` to point to `new_child`.
229fn update_branch_child(page: &mut Page, child_idx: usize, new_child: PageId) {
230    let n = page.num_cells() as usize;
231    if child_idx < n {
232        let offset = page.cell_offset(child_idx as u16) as usize;
233        page.data[offset..offset + 4].copy_from_slice(&new_child.as_u32().to_le_bytes());
234    } else {
235        page.set_right_child(new_child);
236    }
237}
238
239/// Propagate CoW up through ancestors (no split, just update child pointers).
240fn propagate_cow_up(
241    pages: &mut HashMap<PageId, Page>,
242    alloc: &mut PageAllocator,
243    txn_id: TxnId,
244    path: &[(PageId, usize)],
245    mut new_child: PageId,
246) -> PageId {
247    for &(ancestor_id, child_idx) in path.iter().rev() {
248        let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
249        let page = pages.get_mut(&new_ancestor).unwrap();
250        update_branch_child(page, child_idx, new_child);
251        new_child = new_ancestor;
252    }
253    new_child
254}
255
256/// Split a full leaf and insert the new key-value pair.
257/// Returns (separator_key, right_page_id). The left page is `leaf_id` (rebuilt in place).
258fn split_leaf_with_insert(
259    pages: &mut HashMap<PageId, Page>,
260    alloc: &mut PageAllocator,
261    txn_id: TxnId,
262    leaf_id: PageId,
263    key: &[u8],
264    val_type: ValueType,
265    value: &[u8],
266) -> (Vec<u8>, PageId) {
267    // Collect all existing cells + the new cell, sorted
268    let mut cells: Vec<(Vec<u8>, Vec<u8>)> = {
269        let page = pages.get(&leaf_id).unwrap();
270        let n = page.num_cells() as usize;
271        (0..n)
272            .map(|i| {
273                let cell = leaf_node::read_cell(page, i as u16);
274                let raw = leaf_node::read_cell_bytes(page, i as u16);
275                (cell.key.to_vec(), raw)
276            })
277            .collect()
278    };
279
280    // Insert or update the new cell in sorted position
281    let new_raw = leaf_node::build_cell(key, val_type, value);
282    match cells.binary_search_by(|(k, _)| k.as_slice().cmp(key)) {
283        Ok(idx) => cells[idx] = (key.to_vec(), new_raw),
284        Err(idx) => cells.insert(idx, (key.to_vec(), new_raw)),
285    }
286
287    let total = cells.len();
288
289    // Size-aware split: ensure both halves fit within USABLE_SIZE.
290    // Simple midpoint-by-count fails when cell sizes vary significantly.
291    let usable = citadel_core::constants::USABLE_SIZE;
292    let mut cum: Vec<usize> = Vec::with_capacity(total + 1);
293    cum.push(0);
294    for (_, raw) in &cells {
295        cum.push(cum.last().unwrap() + raw.len());
296    }
297    let left_fits = |sp: usize| cum[sp] + sp * 2 <= usable;
298    let right_fits = |sp: usize| (cum[total] - cum[sp]) + (total - sp) * 2 <= usable;
299
300    let mut split_point = total / 2;
301    if !left_fits(split_point) || !right_fits(split_point) {
302        split_point = 1;
303        for sp in 1..total {
304            if left_fits(sp) && right_fits(sp) {
305                split_point = sp;
306                if sp >= total / 2 {
307                    break;
308                }
309            }
310        }
311    }
312
313    let sep_key = cells[split_point].0.clone();
314
315    // Rebuild left page with cells [0..split_point]
316    {
317        let left_refs: Vec<&[u8]> = cells[..split_point]
318            .iter()
319            .map(|(_, raw)| raw.as_slice())
320            .collect();
321        let page = pages.get_mut(&leaf_id).unwrap();
322        page.rebuild_cells(&left_refs);
323    }
324
325    // Create right page with cells [split_point..total]
326    let right_id = alloc.allocate();
327    {
328        let mut right_page = Page::new(right_id, PageType::Leaf, txn_id);
329        let right_refs: Vec<&[u8]> = cells[split_point..]
330            .iter()
331            .map(|(_, raw)| raw.as_slice())
332            .collect();
333        right_page.rebuild_cells(&right_refs);
334        pages.insert(right_id, right_page);
335    }
336
337    (sep_key, right_id)
338}
339
340/// Propagate a split upward through the ancestor chain.
341/// Returns the new root page ID.
342#[allow(clippy::too_many_arguments)]
343fn propagate_split_up(
344    pages: &mut HashMap<PageId, Page>,
345    alloc: &mut PageAllocator,
346    txn_id: TxnId,
347    path: &[(PageId, usize)],
348    mut left_child: PageId,
349    initial_sep: &[u8],
350    mut right_child: PageId,
351    depth: &mut u16,
352) -> PageId {
353    let mut sep_key = initial_sep.to_vec();
354    let mut pending_split = true;
355
356    for &(ancestor_id, child_idx) in path.iter().rev() {
357        let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
358
359        if pending_split {
360            let ok = {
361                let page = pages.get_mut(&new_ancestor).unwrap();
362                branch_node::insert_separator(page, child_idx, left_child, &sep_key, right_child)
363            };
364
365            if ok {
366                pending_split = false;
367                left_child = new_ancestor;
368            } else {
369                // Branch also full — split it
370                let (new_sep, new_right) = split_branch_with_insert(
371                    pages,
372                    alloc,
373                    txn_id,
374                    new_ancestor,
375                    child_idx,
376                    left_child,
377                    &sep_key,
378                    right_child,
379                );
380                left_child = new_ancestor;
381                sep_key = new_sep;
382                right_child = new_right;
383            }
384        } else {
385            let page = pages.get_mut(&new_ancestor).unwrap();
386            update_branch_child(page, child_idx, left_child);
387            left_child = new_ancestor;
388        }
389    }
390
391    if pending_split {
392        // Create a new root
393        let new_root_id = alloc.allocate();
394        let mut new_root = Page::new(new_root_id, PageType::Branch, txn_id);
395        let cell = branch_node::build_cell(left_child, &sep_key);
396        new_root.write_cell(&cell).unwrap();
397        new_root.set_right_child(right_child);
398        pages.insert(new_root_id, new_root);
399        *depth += 1;
400        new_root_id
401    } else {
402        left_child
403    }
404}
405
406/// Split a full branch and insert a separator.
407/// Returns (promoted_separator_key, right_branch_page_id).
408/// The left branch is `branch_id` (rebuilt in place).
409#[allow(clippy::too_many_arguments)]
410fn split_branch_with_insert(
411    pages: &mut HashMap<PageId, Page>,
412    alloc: &mut PageAllocator,
413    txn_id: TxnId,
414    branch_id: PageId,
415    child_idx: usize,
416    new_left: PageId,
417    sep_key: &[u8],
418    new_right: PageId,
419) -> (Vec<u8>, PageId) {
420    // Collect all cells and apply the separator insertion logically
421    let (new_cells, final_right_child) = {
422        let page = pages.get(&branch_id).unwrap();
423        let n = page.num_cells() as usize;
424        let cells: Vec<(PageId, Vec<u8>)> = (0..n)
425            .map(|i| {
426                let cell = branch_node::read_cell(page, i as u16);
427                (cell.child, cell.key.to_vec())
428            })
429            .collect();
430        let old_rc = page.right_child();
431
432        let mut result = Vec::with_capacity(n + 1);
433        let final_rc;
434
435        if child_idx < n {
436            let old_key = cells[child_idx].1.clone();
437            for (i, (child, key)) in cells.into_iter().enumerate() {
438                if i == child_idx {
439                    result.push((new_left, sep_key.to_vec()));
440                    result.push((new_right, old_key.clone()));
441                } else {
442                    result.push((child, key));
443                }
444            }
445            final_rc = old_rc;
446        } else {
447            result = cells;
448            result.push((new_left, sep_key.to_vec()));
449            final_rc = new_right;
450        }
451
452        (result, final_rc)
453    };
454
455    // Size-aware split — the middle key is promoted.
456    // Left = [0..split_point], promoted = [split_point], right = [split_point+1..total].
457    let total = new_cells.len();
458    let usable = citadel_core::constants::USABLE_SIZE;
459    let raw_sizes: Vec<usize> = new_cells.iter().map(|(_, key)| 6 + key.len()).collect();
460    let mut cum: Vec<usize> = Vec::with_capacity(total + 1);
461    cum.push(0);
462    for &sz in &raw_sizes {
463        cum.push(cum.last().unwrap() + sz);
464    }
465    let left_fits = |sp: usize| cum[sp] + sp * 2 <= usable;
466    let right_fits = |sp: usize| {
467        let right_count = total - sp - 1;
468        (cum[total] - cum[sp + 1]) + right_count * 2 <= usable
469    };
470
471    let mut split_point = total / 2;
472    if !left_fits(split_point) || !right_fits(split_point) {
473        split_point = 1;
474        for sp in 1..total.saturating_sub(1) {
475            if left_fits(sp) && right_fits(sp) {
476                split_point = sp;
477                if sp >= total / 2 {
478                    break;
479                }
480            }
481        }
482    }
483
484    let promoted_sep = new_cells[split_point].1.clone();
485    let promoted_child = new_cells[split_point].0;
486
487    // Rebuild left branch with cells [0..split_point], right_child = promoted_child
488    {
489        let left_raw: Vec<Vec<u8>> = new_cells[..split_point]
490            .iter()
491            .map(|(child, key)| branch_node::build_cell(*child, key))
492            .collect();
493        let left_refs: Vec<&[u8]> = left_raw.iter().map(|c| c.as_slice()).collect();
494        let page = pages.get_mut(&branch_id).unwrap();
495        page.rebuild_cells(&left_refs);
496        page.set_right_child(promoted_child);
497    }
498
499    // Create right branch with cells [split_point+1..total], right_child = final_right_child
500    let right_branch_id = alloc.allocate();
501    {
502        let mut right_page = Page::new(right_branch_id, PageType::Branch, txn_id);
503        let right_raw: Vec<Vec<u8>> = new_cells[split_point + 1..]
504            .iter()
505            .map(|(child, key)| branch_node::build_cell(*child, key))
506            .collect();
507        let right_refs: Vec<&[u8]> = right_raw.iter().map(|c| c.as_slice()).collect();
508        right_page.rebuild_cells(&right_refs);
509        right_page.set_right_child(final_right_child);
510        pages.insert(right_branch_id, right_page);
511    }
512
513    (promoted_sep, right_branch_id)
514}
515
516/// Remove a child from a branch page at the given child index.
517fn remove_child_from_branch(page: &mut Page, child_idx: usize) {
518    let n = page.num_cells() as usize;
519    if child_idx < n {
520        let cell_sz = branch_node::get_cell_size(page, child_idx as u16);
521        page.delete_cell_at(child_idx as u16, cell_sz);
522    } else {
523        // Removing right_child: promote last cell's child
524        assert!(n > 0, "cannot remove right_child from branch with 0 cells");
525        let last_child = branch_node::read_cell(page, (n - 1) as u16).child;
526        let cell_sz = branch_node::get_cell_size(page, (n - 1) as u16);
527        page.delete_cell_at((n - 1) as u16, cell_sz);
528        page.set_right_child(last_child);
529    }
530}
531
532/// Propagate child removal upward through the ancestor chain.
533/// Handles cascading collapses when branches become empty.
534fn propagate_remove_up(
535    pages: &mut HashMap<PageId, Page>,
536    alloc: &mut PageAllocator,
537    txn_id: TxnId,
538    path: &[(PageId, usize)],
539    depth: &mut u16,
540) -> PageId {
541    // Process the bottom-most ancestor first (parent of the deleted leaf)
542    let mut level = path.len();
543
544    // Track what needs to replace the removed child's slot in the parent above
545    // Initially: the child was removed entirely, so we need to remove it from parent
546    let mut need_remove_at_level = true;
547
548    // Result: the page ID that should be propagated upward
549    let mut new_child = PageId(0); // placeholder, set below
550
551    while level > 0 && need_remove_at_level {
552        level -= 1;
553        let (ancestor_id, child_idx) = path[level];
554        let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
555
556        {
557            let page = pages.get_mut(&new_ancestor).unwrap();
558            remove_child_from_branch(page, child_idx);
559        }
560
561        let num_cells = pages.get(&new_ancestor).unwrap().num_cells();
562
563        if num_cells > 0 || level == 0 {
564            if num_cells == 0 && level == 0 {
565                // Root collapsed — replace with its only child
566                let only_child = pages.get(&new_ancestor).unwrap().right_child();
567                alloc.free(new_ancestor);
568                pages.remove(&new_ancestor);
569                *depth -= 1;
570                return only_child;
571            }
572            // Branch is non-empty, or it's the root with cells
573            new_child = new_ancestor;
574            need_remove_at_level = false;
575        } else {
576            // Branch became empty (0 cells) — collapse to its right_child
577            let only_child = pages.get(&new_ancestor).unwrap().right_child();
578            alloc.free(new_ancestor);
579            pages.remove(&new_ancestor);
580            *depth -= 1;
581
582            // The only_child replaces this branch in the grandparent
583            // This is a pointer update (not a removal), so we stop cascading
584            new_child = only_child;
585            need_remove_at_level = false;
586        }
587    }
588
589    // Propagate CoW for remaining path levels above
590    if level > 0 {
591        let remaining_path = &path[..level];
592        new_child = propagate_cow_up(pages, alloc, txn_id, remaining_path, new_child);
593    }
594
595    new_child
596}
597
#[cfg(test)]
mod tests {
    use super::*;

    type PageStore = HashMap<PageId, Page>;

    /// Fresh page store, allocator and empty tree, created under transaction 1.
    fn new_tree() -> (PageStore, PageAllocator, BTree) {
        let mut pages = HashMap::new();
        let mut alloc = PageAllocator::new(0);
        let tree = BTree::new(&mut pages, &mut alloc, TxnId(1));
        (pages, alloc, tree)
    }

    /// Insert an inline value, panicking on error. Returns whether the key was new.
    fn put(
        tree: &mut BTree,
        pages: &mut PageStore,
        alloc: &mut PageAllocator,
        txn_id: TxnId,
        key: &[u8],
        value: &[u8],
    ) -> bool {
        tree.insert(pages, alloc, txn_id, key, ValueType::Inline, value)
            .unwrap()
    }

    /// The search result expected for a stored inline value.
    fn inline(value: &[u8]) -> Option<(ValueType, Vec<u8>)> {
        Some((ValueType::Inline, value.to_vec()))
    }

    #[test]
    fn empty_tree_search() {
        let (pages, _, tree) = new_tree();
        assert_eq!(tree.search(&pages, b"anything").unwrap(), None);
    }

    #[test]
    fn insert_and_search_single() {
        let (mut pages, mut alloc, mut tree) = new_tree();

        let is_new = put(&mut tree, &mut pages, &mut alloc, TxnId(1), b"hello", b"world");
        assert!(is_new);
        assert_eq!(tree.entry_count, 1);

        assert_eq!(tree.search(&pages, b"hello").unwrap(), inline(b"world"));
    }

    #[test]
    fn insert_update_existing() {
        let (mut pages, mut alloc, mut tree) = new_tree();

        put(&mut tree, &mut pages, &mut alloc, TxnId(1), b"key", b"v1");
        let is_new = put(&mut tree, &mut pages, &mut alloc, TxnId(1), b"key", b"v2");
        assert!(!is_new);
        assert_eq!(tree.entry_count, 1);

        assert_eq!(tree.search(&pages, b"key").unwrap(), inline(b"v2"));
    }

    #[test]
    fn insert_multiple_sorted() {
        let (mut pages, mut alloc, mut tree) = new_tree();
        let keys = [b"dog", b"ant", b"cat", b"fox", b"bat", b"eel"];

        // Each key doubles as its own value.
        for k in &keys {
            put(&mut tree, &mut pages, &mut alloc, TxnId(1), *k, *k);
        }
        assert_eq!(tree.entry_count, 6);

        // Every inserted key is searchable.
        for k in &keys {
            assert_eq!(tree.search(&pages, *k).unwrap(), inline(*k));
        }

        // A key that was never inserted is not found.
        assert_eq!(tree.search(&pages, b"zebra").unwrap(), None);
    }

    #[test]
    fn insert_triggers_leaf_split() {
        let (mut pages, mut alloc, mut tree) = new_tree();

        // Keys ("key-NNNNN") and values ("val-NNNNN") are 9 bytes each.
        // 500 such entries are expected to overflow a single leaf page, which
        // the depth assertion below verifies.
        let count = 500;
        for i in 0..count {
            let key = format!("key-{i:05}");
            let val = format!("val-{i:05}");
            put(
                &mut tree,
                &mut pages,
                &mut alloc,
                TxnId(1),
                key.as_bytes(),
                val.as_bytes(),
            );
        }

        assert_eq!(tree.entry_count, count);
        assert!(
            tree.depth >= 2,
            "tree should have split (depth={})",
            tree.depth
        );

        // Every key survives the split(s).
        for i in 0..count {
            let key = format!("key-{i:05}");
            let val = format!("val-{i:05}");
            let result = tree.search(&pages, key.as_bytes()).unwrap();
            assert_eq!(result, inline(val.as_bytes()));
        }
    }

    #[test]
    fn delete_existing_key() {
        let (mut pages, mut alloc, mut tree) = new_tree();
        let entries: [(&[u8], &[u8]); 3] = [(b"a", b"1"), (b"b", b"2"), (b"c", b"3")];
        for (key, value) in entries {
            put(&mut tree, &mut pages, &mut alloc, TxnId(1), key, value);
        }

        let found = tree.delete(&mut pages, &mut alloc, TxnId(1), b"b").unwrap();
        assert!(found);
        assert_eq!(tree.entry_count, 2);
        assert_eq!(tree.search(&pages, b"b").unwrap(), None);
        assert_eq!(tree.search(&pages, b"a").unwrap(), inline(b"1"));
        assert_eq!(tree.search(&pages, b"c").unwrap(), inline(b"3"));
    }

    #[test]
    fn delete_nonexistent_key() {
        let (mut pages, mut alloc, mut tree) = new_tree();
        put(&mut tree, &mut pages, &mut alloc, TxnId(1), b"a", b"1");

        let found = tree.delete(&mut pages, &mut alloc, TxnId(1), b"z").unwrap();
        assert!(!found);
        assert_eq!(tree.entry_count, 1);
    }

    #[test]
    fn delete_all_from_root_leaf() {
        let (mut pages, mut alloc, mut tree) = new_tree();
        put(&mut tree, &mut pages, &mut alloc, TxnId(1), b"x", b"1");
        tree.delete(&mut pages, &mut alloc, TxnId(1), b"x").unwrap();
        assert_eq!(tree.entry_count, 0);

        // The root remains a valid, empty leaf.
        let root = pages.get(&tree.root).unwrap();
        assert_eq!(root.page_type(), Some(PageType::Leaf));
        assert_eq!(root.num_cells(), 0);
    }

    #[test]
    fn cow_produces_new_page_ids() {
        let (mut pages, mut alloc, mut tree) = new_tree();
        let root_before = tree.root;

        put(&mut tree, &mut pages, &mut alloc, TxnId(2), b"key", b"val");
        let root_after = tree.root;

        // The root was copied to a new page ID...
        assert_ne!(root_before, root_after);
        // ...and the old root was handed back to the allocator.
        assert!(alloc.freed_this_txn().contains(&root_before));
    }

    #[test]
    fn insert_and_delete_many() {
        let (mut pages, mut alloc, mut tree) = new_tree();
        let count = 1000u64;

        for i in 0..count {
            let key = format!("k{i:06}");
            let val = format!("v{i:06}");
            put(
                &mut tree,
                &mut pages,
                &mut alloc,
                TxnId(1),
                key.as_bytes(),
                val.as_bytes(),
            );
        }
        assert_eq!(tree.entry_count, count);

        // Delete the even-numbered keys.
        for i in (0..count).step_by(2) {
            let key = format!("k{i:06}");
            let found = tree
                .delete(&mut pages, &mut alloc, TxnId(1), key.as_bytes())
                .unwrap();
            assert!(found);
        }
        assert_eq!(tree.entry_count, count / 2);

        // Even keys are gone, odd keys keep their values.
        for i in 0..count {
            let key = format!("k{i:06}");
            let result = tree.search(&pages, key.as_bytes()).unwrap();
            if i % 2 == 0 {
                assert_eq!(result, None, "deleted key {key} should not be found");
            } else {
                let val = format!("v{i:06}");
                assert_eq!(result, inline(val.as_bytes()));
            }
        }
    }

    #[test]
    fn deep_tree_insert_delete() {
        let (mut pages, mut alloc, mut tree) = new_tree();

        // Enough entries to force at least one split.
        let count = 2000u64;
        for i in 0..count {
            let key = format!("{i:08}");
            put(&mut tree, &mut pages, &mut alloc, TxnId(1), key.as_bytes(), b"v");
        }
        assert!(tree.depth >= 2, "depth={} expected >= 2", tree.depth);
        assert_eq!(tree.entry_count, count);

        // Remove everything again, in insertion order.
        for i in 0..count {
            let key = format!("{i:08}");
            let found = tree
                .delete(&mut pages, &mut alloc, TxnId(1), key.as_bytes())
                .unwrap();
            assert!(found, "key {key} should be deletable");
        }
        assert_eq!(tree.entry_count, 0);
    }
}
904}