Skip to main content

citadeldb_buffer/
btree.rs

1//! CoW B+ tree engine.
2//!
3//! All mutations use Copy-on-Write: the old page is cloned to a new page ID,
4//! modified, and ancestors are updated to point to the new page. Old pages
5//! are freed via the allocator's pending-free list.
6//!
7//! The tree uses `HashMap<PageId, Page>` as the in-memory page store.
8
9use std::collections::HashMap;
10use citadel_core::{Error, Result};
11use citadel_core::types::{PageId, PageType, TxnId, ValueType};
12use citadel_page::page::Page;
13use citadel_page::{branch_node, leaf_node};
14use crate::allocator::PageAllocator;
15
16/// B+ tree metadata. Lightweight struct — pages are stored externally.
17#[derive(Clone)]
18pub struct BTree {
19    pub root: PageId,
20    pub depth: u16,
21    pub entry_count: u64,
22}
23
24impl BTree {
25    /// Create a new empty B+ tree with a single leaf root.
26    pub fn new(
27        pages: &mut HashMap<PageId, Page>,
28        alloc: &mut PageAllocator,
29        txn_id: TxnId,
30    ) -> Self {
31        let root_id = alloc.allocate();
32        let root = Page::new(root_id, PageType::Leaf, txn_id);
33        pages.insert(root_id, root);
34        Self {
35            root: root_id,
36            depth: 1,
37            entry_count: 0,
38        }
39    }
40
41    /// Create a BTree from existing metadata (e.g., loaded from commit slot).
42    pub fn from_existing(root: PageId, depth: u16, entry_count: u64) -> Self {
43        Self { root, depth, entry_count }
44    }
45
46    /// Search for a key. Returns `Some((val_type, value))` if found, `None` otherwise.
47    pub fn search(
48        &self,
49        pages: &HashMap<PageId, Page>,
50        key: &[u8],
51    ) -> Result<Option<(ValueType, Vec<u8>)>> {
52        let mut current = self.root;
53        loop {
54            let page = pages.get(&current)
55                .ok_or(Error::PageOutOfBounds(current))?;
56            match page.page_type() {
57                Some(PageType::Leaf) => {
58                    return match leaf_node::search(page, key) {
59                        Ok(idx) => {
60                            let cell = leaf_node::read_cell(page, idx);
61                            Ok(Some((cell.val_type, cell.value.to_vec())))
62                        }
63                        Err(_) => Ok(None),
64                    };
65                }
66                Some(PageType::Branch) => {
67                    current = branch_node::search(page, key);
68                }
69                _ => {
70                    return Err(Error::InvalidPageType(page.page_type_raw(), current));
71                }
72            }
73        }
74    }
75
76    /// Insert a key-value pair. Returns `true` if a new entry was added,
77    /// `false` if an existing key was updated.
78    pub fn insert(
79        &mut self,
80        pages: &mut HashMap<PageId, Page>,
81        alloc: &mut PageAllocator,
82        txn_id: TxnId,
83        key: &[u8],
84        val_type: ValueType,
85        value: &[u8],
86    ) -> Result<bool> {
87        // Walk root to leaf, recording path
88        let (path, leaf_id) = self.walk_to_leaf(pages, key)?;
89
90        // Check if key already exists
91        let key_exists = {
92            let page = pages.get(&leaf_id).unwrap();
93            leaf_node::search(page, key).is_ok()
94        };
95
96        // CoW the leaf
97        let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
98
99        // Try to insert into CoW'd leaf
100        let leaf_ok = {
101            let page = pages.get_mut(&new_leaf_id).unwrap();
102            leaf_node::insert(page, key, val_type, value)
103        };
104
105        if leaf_ok {
106            // No split needed — propagate CoW up
107            self.root = propagate_cow_up(pages, alloc, txn_id, &path, new_leaf_id);
108            if !key_exists {
109                self.entry_count += 1;
110            }
111            return Ok(!key_exists);
112        }
113
114        // Leaf is full — split
115        let (sep_key, right_id) = split_leaf_with_insert(
116            pages, alloc, txn_id, new_leaf_id, key, val_type, value,
117        );
118
119        // Propagate split up through ancestors
120        self.root = propagate_split_up(
121            pages, alloc, txn_id, &path, new_leaf_id, &sep_key, right_id,
122            &mut self.depth,
123        );
124
125        if !key_exists {
126            self.entry_count += 1;
127        }
128        Ok(!key_exists)
129    }
130
131    /// Delete a key. Returns `true` if the key was found and deleted.
132    pub fn delete(
133        &mut self,
134        pages: &mut HashMap<PageId, Page>,
135        alloc: &mut PageAllocator,
136        txn_id: TxnId,
137        key: &[u8],
138    ) -> Result<bool> {
139        // Walk root to leaf
140        let (path, leaf_id) = self.walk_to_leaf(pages, key)?;
141
142        // Check if key exists
143        let found = {
144            let page = pages.get(&leaf_id).unwrap();
145            leaf_node::search(page, key).is_ok()
146        };
147        if !found {
148            return Ok(false);
149        }
150
151        // CoW the leaf and delete
152        let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
153        {
154            let page = pages.get_mut(&new_leaf_id).unwrap();
155            leaf_node::delete(page, key);
156        }
157
158        // Check if leaf became empty
159        let leaf_empty = pages.get(&new_leaf_id).unwrap().num_cells() == 0;
160
161        if !leaf_empty || path.is_empty() {
162            // Leaf is non-empty, or it's the root (root can be empty)
163            self.root = propagate_cow_up(pages, alloc, txn_id, &path, new_leaf_id);
164            self.entry_count -= 1;
165            return Ok(true);
166        }
167
168        // Empty leaf — remove from tree
169        alloc.free(new_leaf_id);
170        pages.remove(&new_leaf_id);
171
172        // Walk up, handling the removal
173        self.root = propagate_remove_up(
174            pages, alloc, txn_id, &path, &mut self.depth,
175        );
176        self.entry_count -= 1;
177        Ok(true)
178    }
179
180    /// Walk from root to the leaf that should contain `key`.
181    /// Returns (path, leaf_page_id) where path is Vec<(ancestor_id, child_idx)>.
182    fn walk_to_leaf(
183        &self,
184        pages: &HashMap<PageId, Page>,
185        key: &[u8],
186    ) -> Result<(Vec<(PageId, usize)>, PageId)> {
187        let mut path = Vec::with_capacity(self.depth as usize);
188        let mut current = self.root;
189        loop {
190            let page = pages.get(&current)
191                .ok_or(Error::PageOutOfBounds(current))?;
192            match page.page_type() {
193                Some(PageType::Leaf) => return Ok((path, current)),
194                Some(PageType::Branch) => {
195                    let child_idx = branch_node::search_child_index(page, key);
196                    let child = branch_node::get_child(page, child_idx);
197                    path.push((current, child_idx));
198                    current = child;
199                }
200                _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
201            }
202        }
203    }
204}
205
206/// Copy-on-Write: clone a page to a new page ID, free the old one.
207fn cow_page(
208    pages: &mut HashMap<PageId, Page>,
209    alloc: &mut PageAllocator,
210    old_id: PageId,
211    txn_id: TxnId,
212) -> PageId {
213    let new_id = alloc.allocate();
214    let mut new_page = pages.get(&old_id).unwrap().clone();
215    new_page.set_page_id(new_id);
216    new_page.set_txn_id(txn_id);
217    pages.insert(new_id, new_page);
218    alloc.free(old_id);
219    new_id
220}
221
222/// Update a branch's child pointer at `child_idx` to point to `new_child`.
223fn update_branch_child(page: &mut Page, child_idx: usize, new_child: PageId) {
224    let n = page.num_cells() as usize;
225    if child_idx < n {
226        let offset = page.cell_offset(child_idx as u16) as usize;
227        page.data[offset..offset + 4].copy_from_slice(&new_child.as_u32().to_le_bytes());
228    } else {
229        page.set_right_child(new_child);
230    }
231}
232
233/// Propagate CoW up through ancestors (no split, just update child pointers).
234fn propagate_cow_up(
235    pages: &mut HashMap<PageId, Page>,
236    alloc: &mut PageAllocator,
237    txn_id: TxnId,
238    path: &[(PageId, usize)],
239    mut new_child: PageId,
240) -> PageId {
241    for &(ancestor_id, child_idx) in path.iter().rev() {
242        let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
243        let page = pages.get_mut(&new_ancestor).unwrap();
244        update_branch_child(page, child_idx, new_child);
245        new_child = new_ancestor;
246    }
247    new_child
248}
249
250/// Split a full leaf and insert the new key-value pair.
251/// Returns (separator_key, right_page_id). The left page is `leaf_id` (rebuilt in place).
252fn split_leaf_with_insert(
253    pages: &mut HashMap<PageId, Page>,
254    alloc: &mut PageAllocator,
255    txn_id: TxnId,
256    leaf_id: PageId,
257    key: &[u8],
258    val_type: ValueType,
259    value: &[u8],
260) -> (Vec<u8>, PageId) {
261    // Collect all existing cells + the new cell, sorted
262    let mut cells: Vec<(Vec<u8>, Vec<u8>)> = {
263        let page = pages.get(&leaf_id).unwrap();
264        let n = page.num_cells() as usize;
265        (0..n).map(|i| {
266            let cell = leaf_node::read_cell(page, i as u16);
267            let raw = leaf_node::read_cell_bytes(page, i as u16);
268            (cell.key.to_vec(), raw)
269        }).collect()
270    };
271
272    // Insert or update the new cell in sorted position
273    let new_raw = leaf_node::build_cell(key, val_type, value);
274    match cells.binary_search_by(|(k, _)| k.as_slice().cmp(key)) {
275        Ok(idx) => cells[idx] = (key.to_vec(), new_raw),
276        Err(idx) => cells.insert(idx, (key.to_vec(), new_raw)),
277    }
278
279    let total = cells.len();
280
281    // Size-aware split: ensure both halves fit within USABLE_SIZE.
282    // Simple midpoint-by-count fails when cell sizes vary significantly.
283    let usable = citadel_core::constants::USABLE_SIZE;
284    let mut cum: Vec<usize> = Vec::with_capacity(total + 1);
285    cum.push(0);
286    for (_, raw) in &cells {
287        cum.push(cum.last().unwrap() + raw.len());
288    }
289    let left_fits = |sp: usize| cum[sp] + sp * 2 <= usable;
290    let right_fits = |sp: usize| (cum[total] - cum[sp]) + (total - sp) * 2 <= usable;
291
292    let mut split_point = total / 2;
293    if !left_fits(split_point) || !right_fits(split_point) {
294        split_point = 1;
295        for sp in 1..total {
296            if left_fits(sp) && right_fits(sp) {
297                split_point = sp;
298                if sp >= total / 2 {
299                    break;
300                }
301            }
302        }
303    }
304
305    let sep_key = cells[split_point].0.clone();
306
307    // Rebuild left page with cells [0..split_point]
308    {
309        let left_refs: Vec<&[u8]> = cells[..split_point].iter()
310            .map(|(_, raw)| raw.as_slice()).collect();
311        let page = pages.get_mut(&leaf_id).unwrap();
312        page.rebuild_cells(&left_refs);
313    }
314
315    // Create right page with cells [split_point..total]
316    let right_id = alloc.allocate();
317    {
318        let mut right_page = Page::new(right_id, PageType::Leaf, txn_id);
319        let right_refs: Vec<&[u8]> = cells[split_point..].iter()
320            .map(|(_, raw)| raw.as_slice()).collect();
321        right_page.rebuild_cells(&right_refs);
322        pages.insert(right_id, right_page);
323    }
324
325    (sep_key, right_id)
326}
327
328/// Propagate a split upward through the ancestor chain.
329/// Returns the new root page ID.
330fn propagate_split_up(
331    pages: &mut HashMap<PageId, Page>,
332    alloc: &mut PageAllocator,
333    txn_id: TxnId,
334    path: &[(PageId, usize)],
335    mut left_child: PageId,
336    initial_sep: &[u8],
337    mut right_child: PageId,
338    depth: &mut u16,
339) -> PageId {
340    let mut sep_key = initial_sep.to_vec();
341    let mut pending_split = true;
342
343    for &(ancestor_id, child_idx) in path.iter().rev() {
344        let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
345
346        if pending_split {
347            let ok = {
348                let page = pages.get_mut(&new_ancestor).unwrap();
349                branch_node::insert_separator(
350                    page, child_idx, left_child, &sep_key, right_child,
351                )
352            };
353
354            if ok {
355                pending_split = false;
356                left_child = new_ancestor;
357            } else {
358                // Branch also full — split it
359                let (new_sep, new_right) = split_branch_with_insert(
360                    pages, alloc, txn_id, new_ancestor, child_idx,
361                    left_child, &sep_key, right_child,
362                );
363                left_child = new_ancestor;
364                sep_key = new_sep;
365                right_child = new_right;
366            }
367        } else {
368            let page = pages.get_mut(&new_ancestor).unwrap();
369            update_branch_child(page, child_idx, left_child);
370            left_child = new_ancestor;
371        }
372    }
373
374    if pending_split {
375        // Create a new root
376        let new_root_id = alloc.allocate();
377        let mut new_root = Page::new(new_root_id, PageType::Branch, txn_id);
378        let cell = branch_node::build_cell(left_child, &sep_key);
379        new_root.write_cell(&cell).unwrap();
380        new_root.set_right_child(right_child);
381        pages.insert(new_root_id, new_root);
382        *depth += 1;
383        new_root_id
384    } else {
385        left_child
386    }
387}
388
389/// Split a full branch and insert a separator.
390/// Returns (promoted_separator_key, right_branch_page_id).
391/// The left branch is `branch_id` (rebuilt in place).
392fn split_branch_with_insert(
393    pages: &mut HashMap<PageId, Page>,
394    alloc: &mut PageAllocator,
395    txn_id: TxnId,
396    branch_id: PageId,
397    child_idx: usize,
398    new_left: PageId,
399    sep_key: &[u8],
400    new_right: PageId,
401) -> (Vec<u8>, PageId) {
402    // Collect all cells and apply the separator insertion logically
403    let (new_cells, final_right_child) = {
404        let page = pages.get(&branch_id).unwrap();
405        let n = page.num_cells() as usize;
406        let cells: Vec<(PageId, Vec<u8>)> = (0..n).map(|i| {
407            let cell = branch_node::read_cell(page, i as u16);
408            (cell.child, cell.key.to_vec())
409        }).collect();
410        let old_rc = page.right_child();
411
412        let mut result = Vec::with_capacity(n + 1);
413        let final_rc;
414
415        if child_idx < n {
416            let old_key = cells[child_idx].1.clone();
417            for (i, (child, key)) in cells.into_iter().enumerate() {
418                if i == child_idx {
419                    result.push((new_left, sep_key.to_vec()));
420                    result.push((new_right, old_key.clone()));
421                } else {
422                    result.push((child, key));
423                }
424            }
425            final_rc = old_rc;
426        } else {
427            result = cells;
428            result.push((new_left, sep_key.to_vec()));
429            final_rc = new_right;
430        }
431
432        (result, final_rc)
433    };
434
435    // Size-aware split — the middle key is promoted.
436    // Left = [0..split_point], promoted = [split_point], right = [split_point+1..total].
437    let total = new_cells.len();
438    let usable = citadel_core::constants::USABLE_SIZE;
439    let raw_sizes: Vec<usize> = new_cells.iter().map(|(_, key)| 6 + key.len()).collect();
440    let mut cum: Vec<usize> = Vec::with_capacity(total + 1);
441    cum.push(0);
442    for &sz in &raw_sizes {
443        cum.push(cum.last().unwrap() + sz);
444    }
445    let left_fits = |sp: usize| cum[sp] + sp * 2 <= usable;
446    let right_fits = |sp: usize| {
447        let right_count = total - sp - 1;
448        (cum[total] - cum[sp + 1]) + right_count * 2 <= usable
449    };
450
451    let mut split_point = total / 2;
452    if !left_fits(split_point) || !right_fits(split_point) {
453        split_point = 1;
454        for sp in 1..total.saturating_sub(1) {
455            if left_fits(sp) && right_fits(sp) {
456                split_point = sp;
457                if sp >= total / 2 {
458                    break;
459                }
460            }
461        }
462    }
463
464    let promoted_sep = new_cells[split_point].1.clone();
465    let promoted_child = new_cells[split_point].0;
466
467    // Rebuild left branch with cells [0..split_point], right_child = promoted_child
468    {
469        let left_raw: Vec<Vec<u8>> = new_cells[..split_point].iter()
470            .map(|(child, key)| branch_node::build_cell(*child, key))
471            .collect();
472        let left_refs: Vec<&[u8]> = left_raw.iter().map(|c| c.as_slice()).collect();
473        let page = pages.get_mut(&branch_id).unwrap();
474        page.rebuild_cells(&left_refs);
475        page.set_right_child(promoted_child);
476    }
477
478    // Create right branch with cells [split_point+1..total], right_child = final_right_child
479    let right_branch_id = alloc.allocate();
480    {
481        let mut right_page = Page::new(right_branch_id, PageType::Branch, txn_id);
482        let right_raw: Vec<Vec<u8>> = new_cells[split_point + 1..].iter()
483            .map(|(child, key)| branch_node::build_cell(*child, key))
484            .collect();
485        let right_refs: Vec<&[u8]> = right_raw.iter().map(|c| c.as_slice()).collect();
486        right_page.rebuild_cells(&right_refs);
487        right_page.set_right_child(final_right_child);
488        pages.insert(right_branch_id, right_page);
489    }
490
491    (promoted_sep, right_branch_id)
492}
493
494/// Remove a child from a branch page at the given child index.
495fn remove_child_from_branch(page: &mut Page, child_idx: usize) {
496    let n = page.num_cells() as usize;
497    if child_idx < n {
498        let cell_sz = branch_node::get_cell_size(page, child_idx as u16);
499        page.delete_cell_at(child_idx as u16, cell_sz);
500    } else {
501        // Removing right_child: promote last cell's child
502        assert!(n > 0, "cannot remove right_child from branch with 0 cells");
503        let last_child = branch_node::read_cell(page, (n - 1) as u16).child;
504        let cell_sz = branch_node::get_cell_size(page, (n - 1) as u16);
505        page.delete_cell_at((n - 1) as u16, cell_sz);
506        page.set_right_child(last_child);
507    }
508}
509
510/// Propagate child removal upward through the ancestor chain.
511/// Handles cascading collapses when branches become empty.
512fn propagate_remove_up(
513    pages: &mut HashMap<PageId, Page>,
514    alloc: &mut PageAllocator,
515    txn_id: TxnId,
516    path: &[(PageId, usize)],
517    depth: &mut u16,
518) -> PageId {
519    // Process the bottom-most ancestor first (parent of the deleted leaf)
520    let mut level = path.len();
521
522    // Track what needs to replace the removed child's slot in the parent above
523    // Initially: the child was removed entirely, so we need to remove it from parent
524    let mut need_remove_at_level = true;
525
526    // Result: the page ID that should be propagated upward
527    let mut new_child = PageId(0); // placeholder, set below
528
529    while level > 0 && need_remove_at_level {
530        level -= 1;
531        let (ancestor_id, child_idx) = path[level];
532        let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
533
534        {
535            let page = pages.get_mut(&new_ancestor).unwrap();
536            remove_child_from_branch(page, child_idx);
537        }
538
539        let num_cells = pages.get(&new_ancestor).unwrap().num_cells();
540
541        if num_cells > 0 || level == 0 {
542            if num_cells == 0 && level == 0 {
543                // Root collapsed — replace with its only child
544                let only_child = pages.get(&new_ancestor).unwrap().right_child();
545                alloc.free(new_ancestor);
546                pages.remove(&new_ancestor);
547                *depth -= 1;
548                return only_child;
549            }
550            // Branch is non-empty, or it's the root with cells
551            new_child = new_ancestor;
552            need_remove_at_level = false;
553        } else {
554            // Branch became empty (0 cells) — collapse to its right_child
555            let only_child = pages.get(&new_ancestor).unwrap().right_child();
556            alloc.free(new_ancestor);
557            pages.remove(&new_ancestor);
558            *depth -= 1;
559
560            // The only_child replaces this branch in the grandparent
561            // This is a pointer update (not a removal), so we stop cascading
562            new_child = only_child;
563            need_remove_at_level = false;
564        }
565    }
566
567    // Propagate CoW for remaining path levels above
568    if level > 0 {
569        let remaining_path = &path[..level];
570        new_child = propagate_cow_up(pages, alloc, txn_id, remaining_path, new_child);
571    }
572
573    new_child
574}
575
#[cfg(test)]
mod tests {
    use super::*;

    /// Page store, allocator and tree bundled together, with thin helpers
    /// that run every operation as inline values under `TxnId(1)`.
    struct Fixture {
        pages: HashMap<PageId, Page>,
        alloc: PageAllocator,
        tree: BTree,
    }

    impl Fixture {
        /// Fresh empty tree created under `TxnId(1)`.
        fn new() -> Self {
            let mut pages = HashMap::new();
            let mut alloc = PageAllocator::new(0);
            let tree = BTree::new(&mut pages, &mut alloc, TxnId(1));
            Fixture { pages, alloc, tree }
        }

        /// Inserts an inline value under the given transaction; `true` means a new entry.
        fn put_in(&mut self, txn: TxnId, key: &[u8], value: &[u8]) -> bool {
            self.tree
                .insert(&mut self.pages, &mut self.alloc, txn, key, ValueType::Inline, value)
                .unwrap()
        }

        /// Inserts an inline value under `TxnId(1)`.
        fn put(&mut self, key: &[u8], value: &[u8]) -> bool {
            self.put_in(TxnId(1), key, value)
        }

        /// Deletes a key under `TxnId(1)`; `true` means it was present.
        fn del(&mut self, key: &[u8]) -> bool {
            self.tree
                .delete(&mut self.pages, &mut self.alloc, TxnId(1), key)
                .unwrap()
        }

        /// Looks a key up.
        fn get(&self, key: &[u8]) -> Option<(ValueType, Vec<u8>)> {
            self.tree.search(&self.pages, key).unwrap()
        }
    }

    /// The search result expected for an inline value.
    fn inline(bytes: &[u8]) -> Option<(ValueType, Vec<u8>)> {
        Some((ValueType::Inline, bytes.to_vec()))
    }

    #[test]
    fn empty_tree_search() {
        let fx = Fixture::new();
        assert_eq!(fx.get(b"anything"), None);
    }

    #[test]
    fn insert_and_search_single() {
        let mut fx = Fixture::new();
        assert!(fx.put(b"hello", b"world"));
        assert_eq!(fx.tree.entry_count, 1);
        assert_eq!(fx.get(b"hello"), inline(b"world"));
    }

    #[test]
    fn insert_update_existing() {
        let mut fx = Fixture::new();
        fx.put(b"key", b"v1");
        // Second write to the same key is an update, not a new entry.
        assert!(!fx.put(b"key", b"v2"));
        assert_eq!(fx.tree.entry_count, 1);
        assert_eq!(fx.get(b"key"), inline(b"v2"));
    }

    #[test]
    fn insert_multiple_sorted() {
        let mut fx = Fixture::new();
        let keys = [b"dog", b"ant", b"cat", b"fox", b"bat", b"eel"];
        for k in keys.iter() {
            fx.put(*k, *k);
        }
        assert_eq!(fx.tree.entry_count, 6);

        // Every inserted key maps to itself.
        for k in keys.iter() {
            assert_eq!(fx.get(*k), inline(*k));
        }

        // A key that was never inserted is absent.
        assert_eq!(fx.get(b"zebra"), None);
    }

    #[test]
    fn insert_triggers_leaf_split() {
        let mut fx = Fixture::new();

        // 500 entries with 9-byte keys and 9-byte values are expected to
        // overflow a single leaf; the depth assertion below confirms it.
        let count = 500u64;
        for i in 0..count {
            let key = format!("key-{i:05}");
            let val = format!("val-{i:05}");
            fx.put(key.as_bytes(), val.as_bytes());
        }

        assert_eq!(fx.tree.entry_count, count);
        assert!(fx.tree.depth >= 2, "tree should have split (depth={})", fx.tree.depth);

        // Nothing was lost across the splits.
        for i in 0..count {
            let key = format!("key-{i:05}");
            let val = format!("val-{i:05}");
            assert_eq!(fx.get(key.as_bytes()), inline(val.as_bytes()));
        }
    }

    #[test]
    fn delete_existing_key() {
        let mut fx = Fixture::new();
        fx.put(b"a", b"1");
        fx.put(b"b", b"2");
        fx.put(b"c", b"3");

        assert!(fx.del(b"b"));
        assert_eq!(fx.tree.entry_count, 2);
        assert_eq!(fx.get(b"b"), None);
        assert_eq!(fx.get(b"a"), inline(b"1"));
        assert_eq!(fx.get(b"c"), inline(b"3"));
    }

    #[test]
    fn delete_nonexistent_key() {
        let mut fx = Fixture::new();
        fx.put(b"a", b"1");
        assert!(!fx.del(b"z"));
        assert_eq!(fx.tree.entry_count, 1);
    }

    #[test]
    fn delete_all_from_root_leaf() {
        let mut fx = Fixture::new();
        fx.put(b"x", b"1");
        fx.del(b"x");
        assert_eq!(fx.tree.entry_count, 0);

        // The root survives as a valid, empty leaf.
        let root = fx.pages.get(&fx.tree.root).unwrap();
        assert_eq!(root.page_type(), Some(PageType::Leaf));
        assert_eq!(root.num_cells(), 0);
    }

    #[test]
    fn cow_produces_new_page_ids() {
        let mut fx = Fixture::new();
        let root_before = fx.tree.root;

        fx.put_in(TxnId(2), b"key", b"val");
        let root_after = fx.tree.root;

        // The insert must have written a new root page...
        assert_ne!(root_before, root_after);
        // ...and reported the previous one to the allocator as freed.
        assert!(fx.alloc.freed_this_txn().contains(&root_before));
    }

    #[test]
    fn insert_and_delete_many() {
        let mut fx = Fixture::new();
        let count = 1000u64;

        for i in 0..count {
            let key = format!("k{i:06}");
            let val = format!("v{i:06}");
            fx.put(key.as_bytes(), val.as_bytes());
        }
        assert_eq!(fx.tree.entry_count, count);

        // Remove the even-numbered keys.
        for i in (0..count).step_by(2) {
            let key = format!("k{i:06}");
            assert!(fx.del(key.as_bytes()));
        }
        assert_eq!(fx.tree.entry_count, count / 2);

        // Even keys are gone, odd keys still carry their values.
        for i in 0..count {
            let key = format!("k{i:06}");
            let result = fx.get(key.as_bytes());
            if i % 2 == 0 {
                assert_eq!(result, None, "deleted key {key} should not be found");
            } else {
                let val = format!("v{i:06}");
                assert_eq!(result, inline(val.as_bytes()));
            }
        }
    }

    #[test]
    fn deep_tree_insert_delete() {
        let mut fx = Fixture::new();

        // Enough entries that the root can no longer be a leaf.
        let count = 2000u64;
        for i in 0..count {
            let key = format!("{i:08}");
            fx.put(key.as_bytes(), b"v");
        }
        assert!(fx.tree.depth >= 2, "depth={} expected >= 2", fx.tree.depth);
        assert_eq!(fx.tree.entry_count, count);

        // Drain the tree completely.
        for i in 0..count {
            let key = format!("{i:08}");
            assert!(fx.del(key.as_bytes()), "key {key} should be deletable");
        }
        assert_eq!(fx.tree.entry_count, 0);
    }
}