Skip to main content

citadel_buffer/
btree.rs

1//! CoW B+ tree engine.
2//!
3//! All mutations use Copy-on-Write: the old page is cloned to a new page ID,
4//! modified, and ancestors are updated to point to the new page. Old pages
5//! are freed via the allocator's pending-free list.
6//!
7//! The tree uses `HashMap<PageId, Page>` as the in-memory page store.
8
9use crate::allocator::PageAllocator;
10use citadel_core::types::{PageId, PageType, TxnId, ValueType};
11use citadel_core::{Error, Result};
12use citadel_page::page::Page;
13use citadel_page::{branch_node, leaf_node};
14use std::collections::HashMap;
15
16/// B+ tree metadata. Lightweight struct — pages are stored externally.
17#[derive(Clone)]
18pub struct BTree {
19    pub root: PageId,
20    pub depth: u16,
21    pub entry_count: u64,
22}
23
24impl BTree {
25    /// Create a new empty B+ tree with a single leaf root.
26    pub fn new(
27        pages: &mut HashMap<PageId, Page>,
28        alloc: &mut PageAllocator,
29        txn_id: TxnId,
30    ) -> Self {
31        let root_id = alloc.allocate();
32        let root = Page::new(root_id, PageType::Leaf, txn_id);
33        pages.insert(root_id, root);
34        Self {
35            root: root_id,
36            depth: 1,
37            entry_count: 0,
38        }
39    }
40
41    /// Create a BTree from existing metadata (e.g., loaded from commit slot).
42    pub fn from_existing(root: PageId, depth: u16, entry_count: u64) -> Self {
43        Self {
44            root,
45            depth,
46            entry_count,
47        }
48    }
49
50    /// Search for a key. Returns `Some((val_type, value))` if found, `None` otherwise.
51    pub fn search(
52        &self,
53        pages: &HashMap<PageId, Page>,
54        key: &[u8],
55    ) -> Result<Option<(ValueType, Vec<u8>)>> {
56        let mut current = self.root;
57        loop {
58            let page = pages.get(&current).ok_or(Error::PageOutOfBounds(current))?;
59            match page.page_type() {
60                Some(PageType::Leaf) => {
61                    return match leaf_node::search(page, key) {
62                        Ok(idx) => {
63                            let cell = leaf_node::read_cell(page, idx);
64                            Ok(Some((cell.val_type, cell.value.to_vec())))
65                        }
66                        Err(_) => Ok(None),
67                    };
68                }
69                Some(PageType::Branch) => {
70                    current = branch_node::search(page, key);
71                }
72                _ => {
73                    return Err(Error::InvalidPageType(page.page_type_raw(), current));
74                }
75            }
76        }
77    }
78
79    /// Insert a key-value pair. Returns `true` if a new entry was added,
80    /// `false` if an existing key was updated.
81    pub fn insert(
82        &mut self,
83        pages: &mut HashMap<PageId, Page>,
84        alloc: &mut PageAllocator,
85        txn_id: TxnId,
86        key: &[u8],
87        val_type: ValueType,
88        value: &[u8],
89    ) -> Result<bool> {
90        // Walk root to leaf, recording path
91        let (path, leaf_id) = self.walk_to_leaf(pages, key)?;
92
93        // Check if key already exists
94        let key_exists = {
95            let page = pages.get(&leaf_id).unwrap();
96            leaf_node::search(page, key).is_ok()
97        };
98
99        // CoW the leaf
100        let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
101
102        // Try to insert into CoW'd leaf
103        let leaf_ok = {
104            let page = pages.get_mut(&new_leaf_id).unwrap();
105            leaf_node::insert(page, key, val_type, value)
106        };
107
108        if leaf_ok {
109            // No split needed — propagate CoW up
110            self.root = propagate_cow_up(pages, alloc, txn_id, &path, new_leaf_id);
111            if !key_exists {
112                self.entry_count += 1;
113            }
114            return Ok(!key_exists);
115        }
116
117        // Leaf is full — split
118        let (sep_key, right_id) =
119            split_leaf_with_insert(pages, alloc, txn_id, new_leaf_id, key, val_type, value);
120
121        // Propagate split up through ancestors
122        self.root = propagate_split_up(
123            pages,
124            alloc,
125            txn_id,
126            &path,
127            new_leaf_id,
128            &sep_key,
129            right_id,
130            &mut self.depth,
131        );
132
133        if !key_exists {
134            self.entry_count += 1;
135        }
136        Ok(!key_exists)
137    }
138
139    /// Delete a key. Returns `true` if the key was found and deleted.
140    pub fn delete(
141        &mut self,
142        pages: &mut HashMap<PageId, Page>,
143        alloc: &mut PageAllocator,
144        txn_id: TxnId,
145        key: &[u8],
146    ) -> Result<bool> {
147        // Walk root to leaf
148        let (path, leaf_id) = self.walk_to_leaf(pages, key)?;
149
150        // Check if key exists
151        let found = {
152            let page = pages.get(&leaf_id).unwrap();
153            leaf_node::search(page, key).is_ok()
154        };
155        if !found {
156            return Ok(false);
157        }
158
159        // CoW the leaf and delete
160        let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
161        {
162            let page = pages.get_mut(&new_leaf_id).unwrap();
163            leaf_node::delete(page, key);
164        }
165
166        // Check if leaf became empty
167        let leaf_empty = pages.get(&new_leaf_id).unwrap().num_cells() == 0;
168
169        if !leaf_empty || path.is_empty() {
170            // Leaf is non-empty, or it's the root (root can be empty)
171            self.root = propagate_cow_up(pages, alloc, txn_id, &path, new_leaf_id);
172            self.entry_count -= 1;
173            return Ok(true);
174        }
175
176        // Empty leaf — remove from tree
177        alloc.free(new_leaf_id);
178        pages.remove(&new_leaf_id);
179
180        // Walk up, handling the removal
181        self.root = propagate_remove_up(pages, alloc, txn_id, &path, &mut self.depth);
182        self.entry_count -= 1;
183        Ok(true)
184    }
185
186    /// Walk from root to the leaf that should contain `key`.
187    /// Returns (path, leaf_page_id) where path is Vec<(ancestor_id, child_idx)>.
188    fn walk_to_leaf(
189        &self,
190        pages: &HashMap<PageId, Page>,
191        key: &[u8],
192    ) -> Result<(Vec<(PageId, usize)>, PageId)> {
193        let mut path = Vec::with_capacity(self.depth as usize);
194        let mut current = self.root;
195        loop {
196            let page = pages.get(&current).ok_or(Error::PageOutOfBounds(current))?;
197            match page.page_type() {
198                Some(PageType::Leaf) => return Ok((path, current)),
199                Some(PageType::Branch) => {
200                    let child_idx = branch_node::search_child_index(page, key);
201                    let child = branch_node::get_child(page, child_idx);
202                    path.push((current, child_idx));
203                    current = child;
204                }
205                _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
206            }
207        }
208    }
209}
210
211/// Copy-on-Write: clone a page to a new page ID, free the old one.
212fn cow_page(
213    pages: &mut HashMap<PageId, Page>,
214    alloc: &mut PageAllocator,
215    old_id: PageId,
216    txn_id: TxnId,
217) -> PageId {
218    let new_id = alloc.allocate();
219    let mut new_page = pages.get(&old_id).unwrap().clone();
220    new_page.set_page_id(new_id);
221    new_page.set_txn_id(txn_id);
222    pages.insert(new_id, new_page);
223    alloc.free(old_id);
224    new_id
225}
226
227/// Update a branch's child pointer at `child_idx` to point to `new_child`.
228fn update_branch_child(page: &mut Page, child_idx: usize, new_child: PageId) {
229    let n = page.num_cells() as usize;
230    if child_idx < n {
231        let offset = page.cell_offset(child_idx as u16) as usize;
232        page.data[offset..offset + 4].copy_from_slice(&new_child.as_u32().to_le_bytes());
233    } else {
234        page.set_right_child(new_child);
235    }
236}
237
238/// Propagate CoW up through ancestors (no split, just update child pointers).
239fn propagate_cow_up(
240    pages: &mut HashMap<PageId, Page>,
241    alloc: &mut PageAllocator,
242    txn_id: TxnId,
243    path: &[(PageId, usize)],
244    mut new_child: PageId,
245) -> PageId {
246    for &(ancestor_id, child_idx) in path.iter().rev() {
247        let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
248        let page = pages.get_mut(&new_ancestor).unwrap();
249        update_branch_child(page, child_idx, new_child);
250        new_child = new_ancestor;
251    }
252    new_child
253}
254
255/// Split a full leaf and insert the new key-value pair.
256/// Returns (separator_key, right_page_id). The left page is `leaf_id` (rebuilt in place).
257fn split_leaf_with_insert(
258    pages: &mut HashMap<PageId, Page>,
259    alloc: &mut PageAllocator,
260    txn_id: TxnId,
261    leaf_id: PageId,
262    key: &[u8],
263    val_type: ValueType,
264    value: &[u8],
265) -> (Vec<u8>, PageId) {
266    // Collect all existing cells + the new cell, sorted
267    let mut cells: Vec<(Vec<u8>, Vec<u8>)> = {
268        let page = pages.get(&leaf_id).unwrap();
269        let n = page.num_cells() as usize;
270        (0..n)
271            .map(|i| {
272                let cell = leaf_node::read_cell(page, i as u16);
273                let raw = leaf_node::read_cell_bytes(page, i as u16);
274                (cell.key.to_vec(), raw)
275            })
276            .collect()
277    };
278
279    // Insert or update the new cell in sorted position
280    let new_raw = leaf_node::build_cell(key, val_type, value);
281    match cells.binary_search_by(|(k, _)| k.as_slice().cmp(key)) {
282        Ok(idx) => cells[idx] = (key.to_vec(), new_raw),
283        Err(idx) => cells.insert(idx, (key.to_vec(), new_raw)),
284    }
285
286    let total = cells.len();
287
288    // Size-aware split: ensure both halves fit within USABLE_SIZE.
289    // Simple midpoint-by-count fails when cell sizes vary significantly.
290    let usable = citadel_core::constants::USABLE_SIZE;
291    let mut cum: Vec<usize> = Vec::with_capacity(total + 1);
292    cum.push(0);
293    for (_, raw) in &cells {
294        cum.push(cum.last().unwrap() + raw.len());
295    }
296    let left_fits = |sp: usize| cum[sp] + sp * 2 <= usable;
297    let right_fits = |sp: usize| (cum[total] - cum[sp]) + (total - sp) * 2 <= usable;
298
299    let mut split_point = total / 2;
300    if !left_fits(split_point) || !right_fits(split_point) {
301        split_point = 1;
302        for sp in 1..total {
303            if left_fits(sp) && right_fits(sp) {
304                split_point = sp;
305                if sp >= total / 2 {
306                    break;
307                }
308            }
309        }
310    }
311
312    let sep_key = cells[split_point].0.clone();
313
314    // Rebuild left page with cells [0..split_point]
315    {
316        let left_refs: Vec<&[u8]> = cells[..split_point]
317            .iter()
318            .map(|(_, raw)| raw.as_slice())
319            .collect();
320        let page = pages.get_mut(&leaf_id).unwrap();
321        page.rebuild_cells(&left_refs);
322    }
323
324    // Create right page with cells [split_point..total]
325    let right_id = alloc.allocate();
326    {
327        let mut right_page = Page::new(right_id, PageType::Leaf, txn_id);
328        let right_refs: Vec<&[u8]> = cells[split_point..]
329            .iter()
330            .map(|(_, raw)| raw.as_slice())
331            .collect();
332        right_page.rebuild_cells(&right_refs);
333        pages.insert(right_id, right_page);
334    }
335
336    (sep_key, right_id)
337}
338
339/// Propagate a split upward through the ancestor chain.
340/// Returns the new root page ID.
341#[allow(clippy::too_many_arguments)]
342fn propagate_split_up(
343    pages: &mut HashMap<PageId, Page>,
344    alloc: &mut PageAllocator,
345    txn_id: TxnId,
346    path: &[(PageId, usize)],
347    mut left_child: PageId,
348    initial_sep: &[u8],
349    mut right_child: PageId,
350    depth: &mut u16,
351) -> PageId {
352    let mut sep_key = initial_sep.to_vec();
353    let mut pending_split = true;
354
355    for &(ancestor_id, child_idx) in path.iter().rev() {
356        let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
357
358        if pending_split {
359            let ok = {
360                let page = pages.get_mut(&new_ancestor).unwrap();
361                branch_node::insert_separator(page, child_idx, left_child, &sep_key, right_child)
362            };
363
364            if ok {
365                pending_split = false;
366                left_child = new_ancestor;
367            } else {
368                // Branch also full — split it
369                let (new_sep, new_right) = split_branch_with_insert(
370                    pages,
371                    alloc,
372                    txn_id,
373                    new_ancestor,
374                    child_idx,
375                    left_child,
376                    &sep_key,
377                    right_child,
378                );
379                left_child = new_ancestor;
380                sep_key = new_sep;
381                right_child = new_right;
382            }
383        } else {
384            let page = pages.get_mut(&new_ancestor).unwrap();
385            update_branch_child(page, child_idx, left_child);
386            left_child = new_ancestor;
387        }
388    }
389
390    if pending_split {
391        // Create a new root
392        let new_root_id = alloc.allocate();
393        let mut new_root = Page::new(new_root_id, PageType::Branch, txn_id);
394        let cell = branch_node::build_cell(left_child, &sep_key);
395        new_root.write_cell(&cell).unwrap();
396        new_root.set_right_child(right_child);
397        pages.insert(new_root_id, new_root);
398        *depth += 1;
399        new_root_id
400    } else {
401        left_child
402    }
403}
404
405/// Split a full branch and insert a separator.
406/// Returns (promoted_separator_key, right_branch_page_id).
407/// The left branch is `branch_id` (rebuilt in place).
408#[allow(clippy::too_many_arguments)]
409fn split_branch_with_insert(
410    pages: &mut HashMap<PageId, Page>,
411    alloc: &mut PageAllocator,
412    txn_id: TxnId,
413    branch_id: PageId,
414    child_idx: usize,
415    new_left: PageId,
416    sep_key: &[u8],
417    new_right: PageId,
418) -> (Vec<u8>, PageId) {
419    // Collect all cells and apply the separator insertion logically
420    let (new_cells, final_right_child) = {
421        let page = pages.get(&branch_id).unwrap();
422        let n = page.num_cells() as usize;
423        let cells: Vec<(PageId, Vec<u8>)> = (0..n)
424            .map(|i| {
425                let cell = branch_node::read_cell(page, i as u16);
426                (cell.child, cell.key.to_vec())
427            })
428            .collect();
429        let old_rc = page.right_child();
430
431        let mut result = Vec::with_capacity(n + 1);
432        let final_rc;
433
434        if child_idx < n {
435            let old_key = cells[child_idx].1.clone();
436            for (i, (child, key)) in cells.into_iter().enumerate() {
437                if i == child_idx {
438                    result.push((new_left, sep_key.to_vec()));
439                    result.push((new_right, old_key.clone()));
440                } else {
441                    result.push((child, key));
442                }
443            }
444            final_rc = old_rc;
445        } else {
446            result = cells;
447            result.push((new_left, sep_key.to_vec()));
448            final_rc = new_right;
449        }
450
451        (result, final_rc)
452    };
453
454    // Size-aware split — the middle key is promoted.
455    // Left = [0..split_point], promoted = [split_point], right = [split_point+1..total].
456    let total = new_cells.len();
457    let usable = citadel_core::constants::USABLE_SIZE;
458    let raw_sizes: Vec<usize> = new_cells.iter().map(|(_, key)| 6 + key.len()).collect();
459    let mut cum: Vec<usize> = Vec::with_capacity(total + 1);
460    cum.push(0);
461    for &sz in &raw_sizes {
462        cum.push(cum.last().unwrap() + sz);
463    }
464    let left_fits = |sp: usize| cum[sp] + sp * 2 <= usable;
465    let right_fits = |sp: usize| {
466        let right_count = total - sp - 1;
467        (cum[total] - cum[sp + 1]) + right_count * 2 <= usable
468    };
469
470    let mut split_point = total / 2;
471    if !left_fits(split_point) || !right_fits(split_point) {
472        split_point = 1;
473        for sp in 1..total.saturating_sub(1) {
474            if left_fits(sp) && right_fits(sp) {
475                split_point = sp;
476                if sp >= total / 2 {
477                    break;
478                }
479            }
480        }
481    }
482
483    let promoted_sep = new_cells[split_point].1.clone();
484    let promoted_child = new_cells[split_point].0;
485
486    // Rebuild left branch with cells [0..split_point], right_child = promoted_child
487    {
488        let left_raw: Vec<Vec<u8>> = new_cells[..split_point]
489            .iter()
490            .map(|(child, key)| branch_node::build_cell(*child, key))
491            .collect();
492        let left_refs: Vec<&[u8]> = left_raw.iter().map(|c| c.as_slice()).collect();
493        let page = pages.get_mut(&branch_id).unwrap();
494        page.rebuild_cells(&left_refs);
495        page.set_right_child(promoted_child);
496    }
497
498    // Create right branch with cells [split_point+1..total], right_child = final_right_child
499    let right_branch_id = alloc.allocate();
500    {
501        let mut right_page = Page::new(right_branch_id, PageType::Branch, txn_id);
502        let right_raw: Vec<Vec<u8>> = new_cells[split_point + 1..]
503            .iter()
504            .map(|(child, key)| branch_node::build_cell(*child, key))
505            .collect();
506        let right_refs: Vec<&[u8]> = right_raw.iter().map(|c| c.as_slice()).collect();
507        right_page.rebuild_cells(&right_refs);
508        right_page.set_right_child(final_right_child);
509        pages.insert(right_branch_id, right_page);
510    }
511
512    (promoted_sep, right_branch_id)
513}
514
515/// Remove a child from a branch page at the given child index.
516fn remove_child_from_branch(page: &mut Page, child_idx: usize) {
517    let n = page.num_cells() as usize;
518    if child_idx < n {
519        let cell_sz = branch_node::get_cell_size(page, child_idx as u16);
520        page.delete_cell_at(child_idx as u16, cell_sz);
521    } else {
522        // Removing right_child: promote last cell's child
523        assert!(n > 0, "cannot remove right_child from branch with 0 cells");
524        let last_child = branch_node::read_cell(page, (n - 1) as u16).child;
525        let cell_sz = branch_node::get_cell_size(page, (n - 1) as u16);
526        page.delete_cell_at((n - 1) as u16, cell_sz);
527        page.set_right_child(last_child);
528    }
529}
530
531/// Propagate child removal upward through the ancestor chain.
532/// Handles cascading collapses when branches become empty.
533fn propagate_remove_up(
534    pages: &mut HashMap<PageId, Page>,
535    alloc: &mut PageAllocator,
536    txn_id: TxnId,
537    path: &[(PageId, usize)],
538    depth: &mut u16,
539) -> PageId {
540    // Process the bottom-most ancestor first (parent of the deleted leaf)
541    let mut level = path.len();
542
543    // Track what needs to replace the removed child's slot in the parent above
544    // Initially: the child was removed entirely, so we need to remove it from parent
545    let mut need_remove_at_level = true;
546
547    // Result: the page ID that should be propagated upward
548    let mut new_child = PageId(0); // placeholder, set below
549
550    while level > 0 && need_remove_at_level {
551        level -= 1;
552        let (ancestor_id, child_idx) = path[level];
553        let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
554
555        {
556            let page = pages.get_mut(&new_ancestor).unwrap();
557            remove_child_from_branch(page, child_idx);
558        }
559
560        let num_cells = pages.get(&new_ancestor).unwrap().num_cells();
561
562        if num_cells > 0 || level == 0 {
563            if num_cells == 0 && level == 0 {
564                // Root collapsed — replace with its only child
565                let only_child = pages.get(&new_ancestor).unwrap().right_child();
566                alloc.free(new_ancestor);
567                pages.remove(&new_ancestor);
568                *depth -= 1;
569                return only_child;
570            }
571            // Branch is non-empty, or it's the root with cells
572            new_child = new_ancestor;
573            need_remove_at_level = false;
574        } else {
575            // Branch became empty (0 cells) — collapse to its right_child
576            let only_child = pages.get(&new_ancestor).unwrap().right_child();
577            alloc.free(new_ancestor);
578            pages.remove(&new_ancestor);
579            *depth -= 1;
580
581            // The only_child replaces this branch in the grandparent
582            // This is a pointer update (not a removal), so we stop cascading
583            new_child = only_child;
584            need_remove_at_level = false;
585        }
586    }
587
588    // Propagate CoW for remaining path levels above
589    if level > 0 {
590        let remaining_path = &path[..level];
591        new_child = propagate_cow_up(pages, alloc, txn_id, remaining_path, new_child);
592    }
593
594    new_child
595}
596
#[cfg(test)]
mod tests {
    use super::*;

    /// In-memory page store shared by all tests.
    type Pages = HashMap<PageId, Page>;

    /// Fresh page store and allocator holding one empty tree created under `TxnId(1)`.
    fn new_tree() -> (Pages, PageAllocator, BTree) {
        let mut pages = HashMap::new();
        let mut alloc = PageAllocator::new(0);
        let tree = BTree::new(&mut pages, &mut alloc, TxnId(1));
        (pages, alloc, tree)
    }

    /// Insert an inline value under `TxnId(1)`; returns whether the key was new.
    fn put(
        tree: &mut BTree,
        pages: &mut Pages,
        alloc: &mut PageAllocator,
        key: &[u8],
        value: &[u8],
    ) -> bool {
        tree.insert(pages, alloc, TxnId(1), key, ValueType::Inline, value)
            .unwrap()
    }

    /// Delete a key under `TxnId(1)`; returns whether the key was found.
    fn remove(tree: &mut BTree, pages: &mut Pages, alloc: &mut PageAllocator, key: &[u8]) -> bool {
        tree.delete(pages, alloc, TxnId(1), key).unwrap()
    }

    /// Expected search result for an inline value.
    fn inline(value: &[u8]) -> Option<(ValueType, Vec<u8>)> {
        Some((ValueType::Inline, value.to_vec()))
    }

    #[test]
    fn empty_tree_search() {
        let (pages, _, tree) = new_tree();
        assert_eq!(tree.search(&pages, b"anything").unwrap(), None);
    }

    #[test]
    fn insert_and_search_single() {
        let (mut pages, mut alloc, mut tree) = new_tree();

        assert!(put(&mut tree, &mut pages, &mut alloc, b"hello", b"world"));
        assert_eq!(tree.entry_count, 1);
        assert_eq!(tree.search(&pages, b"hello").unwrap(), inline(b"world"));
    }

    #[test]
    fn insert_update_existing() {
        let (mut pages, mut alloc, mut tree) = new_tree();

        put(&mut tree, &mut pages, &mut alloc, b"key", b"v1");
        let is_new = put(&mut tree, &mut pages, &mut alloc, b"key", b"v2");

        // Overwriting must not count as a new entry.
        assert!(!is_new);
        assert_eq!(tree.entry_count, 1);
        assert_eq!(tree.search(&pages, b"key").unwrap(), inline(b"v2"));
    }

    #[test]
    fn insert_multiple_sorted() {
        let (mut pages, mut alloc, mut tree) = new_tree();
        let keys: [&[u8]; 6] = [b"dog", b"ant", b"cat", b"fox", b"bat", b"eel"];

        // Each key stores itself as the value.
        for &k in &keys {
            put(&mut tree, &mut pages, &mut alloc, k, k);
        }
        assert_eq!(tree.entry_count, 6);

        // Every inserted key is retrievable regardless of insertion order.
        for &k in &keys {
            assert_eq!(tree.search(&pages, k).unwrap(), inline(k));
        }

        // A key that was never inserted is reported as absent.
        assert_eq!(tree.search(&pages, b"zebra").unwrap(), None);
    }

    #[test]
    fn insert_triggers_leaf_split() {
        let (mut pages, mut alloc, mut tree) = new_tree();

        // 500 entries with 9-byte keys ("key-NNNNN") and 9-byte values
        // ("val-NNNNN") are expected to exceed what one leaf page can hold;
        // the depth assertion below checks that a split really happened.
        let count = 500u64;
        for i in 0..count {
            let key = format!("key-{i:05}");
            let val = format!("val-{i:05}");
            put(
                &mut tree,
                &mut pages,
                &mut alloc,
                key.as_bytes(),
                val.as_bytes(),
            );
        }

        assert_eq!(tree.entry_count, count);
        assert!(
            tree.depth >= 2,
            "tree should have split (depth={})",
            tree.depth
        );

        // Every key must still be reachable after the split(s).
        for i in 0..count {
            let key = format!("key-{i:05}");
            let val = format!("val-{i:05}");
            assert_eq!(
                tree.search(&pages, key.as_bytes()).unwrap(),
                inline(val.as_bytes())
            );
        }
    }

    #[test]
    fn delete_existing_key() {
        let (mut pages, mut alloc, mut tree) = new_tree();
        let seed: [(&[u8], &[u8]); 3] = [(b"a", b"1"), (b"b", b"2"), (b"c", b"3")];
        for &(k, v) in &seed {
            put(&mut tree, &mut pages, &mut alloc, k, v);
        }

        assert!(remove(&mut tree, &mut pages, &mut alloc, b"b"));
        assert_eq!(tree.entry_count, 2);

        // Only the deleted key disappears; its neighbours are untouched.
        assert_eq!(tree.search(&pages, b"b").unwrap(), None);
        assert_eq!(tree.search(&pages, b"a").unwrap(), inline(b"1"));
        assert_eq!(tree.search(&pages, b"c").unwrap(), inline(b"3"));
    }

    #[test]
    fn delete_nonexistent_key() {
        let (mut pages, mut alloc, mut tree) = new_tree();
        put(&mut tree, &mut pages, &mut alloc, b"a", b"1");

        assert!(!remove(&mut tree, &mut pages, &mut alloc, b"z"));
        assert_eq!(tree.entry_count, 1);
    }

    #[test]
    fn delete_all_from_root_leaf() {
        let (mut pages, mut alloc, mut tree) = new_tree();
        put(&mut tree, &mut pages, &mut alloc, b"x", b"1");
        remove(&mut tree, &mut pages, &mut alloc, b"x");
        assert_eq!(tree.entry_count, 0);

        // The root survives as a valid, empty leaf.
        let root = pages.get(&tree.root).unwrap();
        assert_eq!(root.page_type(), Some(PageType::Leaf));
        assert_eq!(root.num_cells(), 0);
    }

    #[test]
    fn cow_produces_new_page_ids() {
        let (mut pages, mut alloc, mut tree) = new_tree();
        let root_before = tree.root;

        // Deliberately uses a different transaction ID than the tree's creation.
        tree.insert(
            &mut pages,
            &mut alloc,
            TxnId(2),
            b"key",
            ValueType::Inline,
            b"val",
        )
        .unwrap();

        // The write must have moved the root to a new page ID...
        assert_ne!(root_before, tree.root);
        // ...and reported the old root to the allocator as freed.
        assert!(alloc.freed_this_txn().contains(&root_before));
    }

    #[test]
    fn insert_and_delete_many() {
        let (mut pages, mut alloc, mut tree) = new_tree();
        let count = 1000u64;

        for i in 0..count {
            let key = format!("k{i:06}");
            let val = format!("v{i:06}");
            put(
                &mut tree,
                &mut pages,
                &mut alloc,
                key.as_bytes(),
                val.as_bytes(),
            );
        }
        assert_eq!(tree.entry_count, count);

        // Remove the even-numbered keys.
        for i in (0..count).step_by(2) {
            let key = format!("k{i:06}");
            assert!(remove(&mut tree, &mut pages, &mut alloc, key.as_bytes()));
        }
        assert_eq!(tree.entry_count, count / 2);

        // Even keys are gone, odd keys keep their values.
        for i in 0..count {
            let key = format!("k{i:06}");
            let result = tree.search(&pages, key.as_bytes()).unwrap();
            if i % 2 == 0 {
                assert_eq!(result, None, "deleted key {key} should not be found");
            } else {
                let val = format!("v{i:06}");
                assert_eq!(result, inline(val.as_bytes()));
            }
        }
    }

    #[test]
    fn deep_tree_insert_delete() {
        let (mut pages, mut alloc, mut tree) = new_tree();

        // Enough entries to force at least one level of branches.
        let count = 2000u64;
        for i in 0..count {
            let key = format!("{i:08}");
            put(&mut tree, &mut pages, &mut alloc, key.as_bytes(), b"v");
        }
        assert!(tree.depth >= 2, "depth={} expected >= 2", tree.depth);
        assert_eq!(tree.entry_count, count);

        // Drain the tree in insertion order; every key must be found.
        for i in 0..count {
            let key = format!("{i:08}");
            let found = remove(&mut tree, &mut pages, &mut alloc, key.as_bytes());
            assert!(found, "key {key} should be deletable");
        }
        assert_eq!(tree.entry_count, 0);
    }
}
903}