Skip to main content

citadel_buffer/
btree.rs

1//! CoW B+ tree engine.
2//!
3//! All mutations use Copy-on-Write: the old page is cloned to a new page ID,
4//! modified, and ancestors are updated to point to the new page. Old pages
5//! are freed via the allocator's pending-free list.
6//!
7//! The tree uses `HashMap<PageId, Page>` as the in-memory page store.
8
9use crate::allocator::PageAllocator;
10use citadel_core::types::{PageId, PageType, TxnId, ValueType};
11use citadel_core::{Error, Result};
12use citadel_page::page::Page;
13use citadel_page::{branch_node, leaf_node};
14use std::collections::HashMap;
15
16/// B+ tree metadata. Lightweight struct - pages are stored externally.
17#[derive(Clone)]
18pub struct BTree {
19    pub root: PageId,
20    pub depth: u16,
21    pub entry_count: u64,
22    last_insert: Option<(Vec<(PageId, usize)>, PageId)>,
23}
24
25impl BTree {
26    /// Create a new empty B+ tree with a single leaf root.
27    pub fn new(
28        pages: &mut HashMap<PageId, Page>,
29        alloc: &mut PageAllocator,
30        txn_id: TxnId,
31    ) -> Self {
32        let root_id = alloc.allocate();
33        let root = Page::new(root_id, PageType::Leaf, txn_id);
34        pages.insert(root_id, root);
35        Self {
36            root: root_id,
37            depth: 1,
38            entry_count: 0,
39            last_insert: None,
40        }
41    }
42
43    /// Create a BTree from existing metadata (e.g., loaded from commit slot).
44    pub fn from_existing(root: PageId, depth: u16, entry_count: u64) -> Self {
45        Self {
46            root,
47            depth,
48            entry_count,
49            last_insert: None,
50        }
51    }
52
53    /// Search for a key. Returns `Some((val_type, value))` if found, `None` otherwise.
54    pub fn search(
55        &self,
56        pages: &HashMap<PageId, Page>,
57        key: &[u8],
58    ) -> Result<Option<(ValueType, Vec<u8>)>> {
59        let mut current = self.root;
60        loop {
61            let page = pages.get(&current).ok_or(Error::PageOutOfBounds(current))?;
62            match page.page_type() {
63                Some(PageType::Leaf) => {
64                    return match leaf_node::search(page, key) {
65                        Ok(idx) => {
66                            let cell = leaf_node::read_cell(page, idx);
67                            Ok(Some((cell.val_type, cell.value.to_vec())))
68                        }
69                        Err(_) => Ok(None),
70                    };
71                }
72                Some(PageType::Branch) => {
73                    let idx = branch_node::search_child_index(page, key);
74                    current = branch_node::get_child(page, idx);
75                }
76                _ => {
77                    return Err(Error::InvalidPageType(page.page_type_raw(), current));
78                }
79            }
80        }
81    }
82
83    pub fn lil_would_hit(&self, pages: &HashMap<PageId, Page>, key: &[u8]) -> bool {
84        if let Some((_, cached_leaf)) = &self.last_insert {
85            if let Some(page) = pages.get(cached_leaf) {
86                let n = page.num_cells();
87                return n > 0 && key > leaf_node::read_cell(page, n - 1).key;
88            }
89        }
90        false
91    }
92
93    /// Insert a key-value pair. Returns `true` if a new entry was added,
94    /// `false` if an existing key was updated.
95    pub fn insert(
96        &mut self,
97        pages: &mut HashMap<PageId, Page>,
98        alloc: &mut PageAllocator,
99        txn_id: TxnId,
100        key: &[u8],
101        val_type: ValueType,
102        value: &[u8],
103    ) -> Result<bool> {
104        // LIL cache: skip walk_to_leaf for sequential appends to the rightmost leaf.
105        if let Some((cached_path, cached_leaf)) = self.last_insert.take() {
106            let hit = {
107                let page = pages
108                    .get(&cached_leaf)
109                    .ok_or(Error::PageOutOfBounds(cached_leaf))?;
110                let n = page.num_cells();
111                n > 0 && key > leaf_node::read_cell(page, n - 1).key
112            };
113            if hit {
114                let cow_id = cow_page(pages, alloc, cached_leaf, txn_id);
115                let ok = {
116                    let page = pages.get_mut(&cow_id).unwrap();
117                    leaf_node::insert_direct(page, key, val_type, value)
118                };
119                if ok {
120                    if cow_id != cached_leaf {
121                        self.root = propagate_cow_up(pages, alloc, txn_id, &cached_path, cow_id);
122                    }
123                    self.entry_count += 1;
124                    self.last_insert = Some((cached_path, cow_id));
125                    return Ok(true);
126                }
127                let (sep_key, right_id) =
128                    split_leaf_with_insert(pages, alloc, txn_id, cow_id, key, val_type, value);
129                self.root = propagate_split_up(
130                    pages,
131                    alloc,
132                    txn_id,
133                    &cached_path,
134                    cow_id,
135                    &sep_key,
136                    right_id,
137                    &mut self.depth,
138                );
139                self.last_insert = None;
140                self.entry_count += 1;
141                return Ok(true);
142            }
143        }
144
145        let (path, leaf_id) = self.walk_to_leaf(pages, key)?;
146
147        let key_exists = {
148            let page = pages.get(&leaf_id).unwrap();
149            leaf_node::search(page, key).is_ok()
150        };
151
152        let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
153
154        let leaf_ok = {
155            let page = pages.get_mut(&new_leaf_id).unwrap();
156            leaf_node::insert_direct(page, key, val_type, value)
157        };
158
159        if leaf_ok {
160            let mut child = new_leaf_id;
161            let mut is_rightmost = true;
162            let mut new_path = path;
163            for i in (0..new_path.len()).rev() {
164                let (ancestor_id, child_idx) = new_path[i];
165                let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
166                let page = pages.get_mut(&new_ancestor).unwrap();
167                update_branch_child(page, child_idx, child);
168                if child_idx != page.num_cells() as usize {
169                    is_rightmost = false;
170                }
171                new_path[i] = (new_ancestor, child_idx);
172                child = new_ancestor;
173            }
174            self.root = child;
175
176            if is_rightmost {
177                self.last_insert = Some((new_path, new_leaf_id));
178            }
179
180            if !key_exists {
181                self.entry_count += 1;
182            }
183            return Ok(!key_exists);
184        }
185
186        self.last_insert = None;
187        let (sep_key, right_id) =
188            split_leaf_with_insert(pages, alloc, txn_id, new_leaf_id, key, val_type, value);
189        self.root = propagate_split_up(
190            pages,
191            alloc,
192            txn_id,
193            &path,
194            new_leaf_id,
195            &sep_key,
196            right_id,
197            &mut self.depth,
198        );
199
200        if !key_exists {
201            self.entry_count += 1;
202        }
203        Ok(!key_exists)
204    }
205
206    /// Delete a key. Returns `true` if the key was found and deleted.
207    pub fn delete(
208        &mut self,
209        pages: &mut HashMap<PageId, Page>,
210        alloc: &mut PageAllocator,
211        txn_id: TxnId,
212        key: &[u8],
213    ) -> Result<bool> {
214        self.last_insert = None;
215        let (path, leaf_id) = self.walk_to_leaf(pages, key)?;
216
217        let found = {
218            let page = pages.get(&leaf_id).unwrap();
219            leaf_node::search(page, key).is_ok()
220        };
221        if !found {
222            return Ok(false);
223        }
224
225        let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
226        {
227            let page = pages.get_mut(&new_leaf_id).unwrap();
228            leaf_node::delete(page, key);
229        }
230
231        let leaf_empty = pages.get(&new_leaf_id).unwrap().num_cells() == 0;
232
233        if !leaf_empty || path.is_empty() {
234            self.root = propagate_cow_up(pages, alloc, txn_id, &path, new_leaf_id);
235            self.entry_count -= 1;
236            return Ok(true);
237        }
238
239        alloc.free(new_leaf_id);
240        pages.remove(&new_leaf_id);
241
242        self.root = propagate_remove_up(pages, alloc, txn_id, &path, &mut self.depth);
243        self.entry_count -= 1;
244        Ok(true)
245    }
246
247    /// Walk from root to the leaf that should contain `key`.
248    /// Returns (path, leaf_page_id) where path is Vec<(ancestor_id, child_idx)>.
249    fn walk_to_leaf(
250        &self,
251        pages: &HashMap<PageId, Page>,
252        key: &[u8],
253    ) -> Result<(Vec<(PageId, usize)>, PageId)> {
254        let mut path = Vec::with_capacity(self.depth as usize);
255        let mut current = self.root;
256        loop {
257            let page = pages.get(&current).ok_or(Error::PageOutOfBounds(current))?;
258            match page.page_type() {
259                Some(PageType::Leaf) => return Ok((path, current)),
260                Some(PageType::Branch) => {
261                    let child_idx = branch_node::search_child_index(page, key);
262                    let child = branch_node::get_child(page, child_idx);
263                    path.push((current, child_idx));
264                    current = child;
265                }
266                _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
267            }
268        }
269    }
270}
271
272/// Copy-on-Write: clone a page to a new page ID, free the old one.
273/// If the page already belongs to this transaction, return it as-is.
274fn cow_page(
275    pages: &mut HashMap<PageId, Page>,
276    alloc: &mut PageAllocator,
277    old_id: PageId,
278    txn_id: TxnId,
279) -> PageId {
280    if pages.get(&old_id).unwrap().txn_id() == txn_id {
281        return old_id;
282    }
283    let new_id = alloc.allocate();
284    let mut new_page = pages.get(&old_id).unwrap().clone();
285    new_page.set_page_id(new_id);
286    new_page.set_txn_id(txn_id);
287    pages.insert(new_id, new_page);
288    alloc.free(old_id);
289    new_id
290}
291
292/// Update a branch's child pointer at `child_idx` to point to `new_child`.
293fn update_branch_child(page: &mut Page, child_idx: usize, new_child: PageId) {
294    let n = page.num_cells() as usize;
295    if child_idx < n {
296        let offset = page.cell_offset(child_idx as u16) as usize;
297        page.data[offset..offset + 4].copy_from_slice(&new_child.as_u32().to_le_bytes());
298    } else {
299        page.set_right_child(new_child);
300    }
301}
302
303/// Propagate CoW up through ancestors (no split, just update child pointers).
304fn propagate_cow_up(
305    pages: &mut HashMap<PageId, Page>,
306    alloc: &mut PageAllocator,
307    txn_id: TxnId,
308    path: &[(PageId, usize)],
309    mut new_child: PageId,
310) -> PageId {
311    for &(ancestor_id, child_idx) in path.iter().rev() {
312        let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
313        let page = pages.get_mut(&new_ancestor).unwrap();
314        update_branch_child(page, child_idx, new_child);
315        new_child = new_ancestor;
316    }
317    new_child
318}
319
320/// Split a full leaf and insert the new key-value pair.
321/// Returns (separator_key, right_page_id). The left page is `leaf_id` (rebuilt in place).
322fn split_leaf_with_insert(
323    pages: &mut HashMap<PageId, Page>,
324    alloc: &mut PageAllocator,
325    txn_id: TxnId,
326    leaf_id: PageId,
327    key: &[u8],
328    val_type: ValueType,
329    value: &[u8],
330) -> (Vec<u8>, PageId) {
331    let mut cells: Vec<(Vec<u8>, Vec<u8>)> = {
332        let page = pages.get(&leaf_id).unwrap();
333        let n = page.num_cells() as usize;
334        (0..n)
335            .map(|i| {
336                let cell = leaf_node::read_cell(page, i as u16);
337                let raw = leaf_node::read_cell_bytes(page, i as u16);
338                (cell.key.to_vec(), raw)
339            })
340            .collect()
341    };
342
343    let new_raw = leaf_node::build_cell(key, val_type, value);
344    match cells.binary_search_by(|(k, _)| k.as_slice().cmp(key)) {
345        Ok(idx) => cells[idx] = (key.to_vec(), new_raw),
346        Err(idx) => cells.insert(idx, (key.to_vec(), new_raw)),
347    }
348
349    let total = cells.len();
350
351    let usable = citadel_core::constants::USABLE_SIZE;
352    let mut cum: Vec<usize> = Vec::with_capacity(total + 1);
353    cum.push(0);
354    for (_, raw) in &cells {
355        cum.push(cum.last().unwrap() + raw.len());
356    }
357    let left_fits = |sp: usize| cum[sp] + sp * 2 <= usable;
358    let right_fits = |sp: usize| (cum[total] - cum[sp]) + (total - sp) * 2 <= usable;
359
360    let mut split_point = total / 2;
361    if !left_fits(split_point) || !right_fits(split_point) {
362        split_point = 1;
363        for sp in 1..total {
364            if left_fits(sp) && right_fits(sp) {
365                split_point = sp;
366                if sp >= total / 2 {
367                    break;
368                }
369            }
370        }
371    }
372
373    let sep_key = cells[split_point].0.clone();
374
375    {
376        let left_refs: Vec<&[u8]> = cells[..split_point]
377            .iter()
378            .map(|(_, raw)| raw.as_slice())
379            .collect();
380        let page = pages.get_mut(&leaf_id).unwrap();
381        page.rebuild_cells(&left_refs);
382    }
383
384    let right_id = alloc.allocate();
385    {
386        let mut right_page = Page::new(right_id, PageType::Leaf, txn_id);
387        let right_refs: Vec<&[u8]> = cells[split_point..]
388            .iter()
389            .map(|(_, raw)| raw.as_slice())
390            .collect();
391        right_page.rebuild_cells(&right_refs);
392        pages.insert(right_id, right_page);
393    }
394
395    (sep_key, right_id)
396}
397
398#[allow(clippy::too_many_arguments)]
399fn propagate_split_up(
400    pages: &mut HashMap<PageId, Page>,
401    alloc: &mut PageAllocator,
402    txn_id: TxnId,
403    path: &[(PageId, usize)],
404    mut left_child: PageId,
405    initial_sep: &[u8],
406    mut right_child: PageId,
407    depth: &mut u16,
408) -> PageId {
409    let mut sep_key = initial_sep.to_vec();
410    let mut pending_split = true;
411
412    for &(ancestor_id, child_idx) in path.iter().rev() {
413        let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
414
415        if pending_split {
416            let ok = {
417                let page = pages.get_mut(&new_ancestor).unwrap();
418                branch_node::insert_separator(page, child_idx, left_child, &sep_key, right_child)
419            };
420
421            if ok {
422                pending_split = false;
423                left_child = new_ancestor;
424            } else {
425                let (new_sep, new_right) = split_branch_with_insert(
426                    pages,
427                    alloc,
428                    txn_id,
429                    new_ancestor,
430                    child_idx,
431                    left_child,
432                    &sep_key,
433                    right_child,
434                );
435                left_child = new_ancestor;
436                sep_key = new_sep;
437                right_child = new_right;
438            }
439        } else {
440            let page = pages.get_mut(&new_ancestor).unwrap();
441            update_branch_child(page, child_idx, left_child);
442            left_child = new_ancestor;
443        }
444    }
445
446    if pending_split {
447        let new_root_id = alloc.allocate();
448        let mut new_root = Page::new(new_root_id, PageType::Branch, txn_id);
449        let cell = branch_node::build_cell(left_child, &sep_key);
450        new_root.write_cell(&cell).unwrap();
451        new_root.set_right_child(right_child);
452        pages.insert(new_root_id, new_root);
453        *depth += 1;
454        new_root_id
455    } else {
456        left_child
457    }
458}
459
460#[allow(clippy::too_many_arguments)]
461fn split_branch_with_insert(
462    pages: &mut HashMap<PageId, Page>,
463    alloc: &mut PageAllocator,
464    txn_id: TxnId,
465    branch_id: PageId,
466    child_idx: usize,
467    new_left: PageId,
468    sep_key: &[u8],
469    new_right: PageId,
470) -> (Vec<u8>, PageId) {
471    // Collect all cells and apply the separator insertion logically
472    let (new_cells, final_right_child) = {
473        let page = pages.get(&branch_id).unwrap();
474        let n = page.num_cells() as usize;
475        let cells: Vec<(PageId, Vec<u8>)> = (0..n)
476            .map(|i| {
477                let cell = branch_node::read_cell(page, i as u16);
478                (cell.child, cell.key.to_vec())
479            })
480            .collect();
481        let old_rc = page.right_child();
482
483        let mut result = Vec::with_capacity(n + 1);
484        let final_rc;
485
486        if child_idx < n {
487            let old_key = cells[child_idx].1.clone();
488            for (i, (child, key)) in cells.into_iter().enumerate() {
489                if i == child_idx {
490                    result.push((new_left, sep_key.to_vec()));
491                    result.push((new_right, old_key.clone()));
492                } else {
493                    result.push((child, key));
494                }
495            }
496            final_rc = old_rc;
497        } else {
498            result = cells;
499            result.push((new_left, sep_key.to_vec()));
500            final_rc = new_right;
501        }
502
503        (result, final_rc)
504    };
505
506    let total = new_cells.len();
507    let usable = citadel_core::constants::USABLE_SIZE;
508    let raw_sizes: Vec<usize> = new_cells.iter().map(|(_, key)| 6 + key.len()).collect();
509    let mut cum: Vec<usize> = Vec::with_capacity(total + 1);
510    cum.push(0);
511    for &sz in &raw_sizes {
512        cum.push(cum.last().unwrap() + sz);
513    }
514    let left_fits = |sp: usize| cum[sp] + sp * 2 <= usable;
515    let right_fits = |sp: usize| {
516        let right_count = total - sp - 1;
517        (cum[total] - cum[sp + 1]) + right_count * 2 <= usable
518    };
519
520    let mut split_point = total / 2;
521    if !left_fits(split_point) || !right_fits(split_point) {
522        split_point = 1;
523        for sp in 1..total.saturating_sub(1) {
524            if left_fits(sp) && right_fits(sp) {
525                split_point = sp;
526                if sp >= total / 2 {
527                    break;
528                }
529            }
530        }
531    }
532
533    let promoted_sep = new_cells[split_point].1.clone();
534    let promoted_child = new_cells[split_point].0;
535
536    {
537        let left_raw: Vec<Vec<u8>> = new_cells[..split_point]
538            .iter()
539            .map(|(child, key)| branch_node::build_cell(*child, key))
540            .collect();
541        let left_refs: Vec<&[u8]> = left_raw.iter().map(|c| c.as_slice()).collect();
542        let page = pages.get_mut(&branch_id).unwrap();
543        page.rebuild_cells(&left_refs);
544        page.set_right_child(promoted_child);
545    }
546
547    let right_branch_id = alloc.allocate();
548    {
549        let mut right_page = Page::new(right_branch_id, PageType::Branch, txn_id);
550        let right_raw: Vec<Vec<u8>> = new_cells[split_point + 1..]
551            .iter()
552            .map(|(child, key)| branch_node::build_cell(*child, key))
553            .collect();
554        let right_refs: Vec<&[u8]> = right_raw.iter().map(|c| c.as_slice()).collect();
555        right_page.rebuild_cells(&right_refs);
556        right_page.set_right_child(final_right_child);
557        pages.insert(right_branch_id, right_page);
558    }
559
560    (promoted_sep, right_branch_id)
561}
562
563fn remove_child_from_branch(page: &mut Page, child_idx: usize) {
564    let n = page.num_cells() as usize;
565    if child_idx < n {
566        let cell_sz = branch_node::get_cell_size(page, child_idx as u16);
567        page.delete_cell_at(child_idx as u16, cell_sz);
568    } else {
569        assert!(n > 0, "cannot remove right_child from branch with 0 cells");
570        let last_child = branch_node::read_cell(page, (n - 1) as u16).child;
571        let cell_sz = branch_node::get_cell_size(page, (n - 1) as u16);
572        page.delete_cell_at((n - 1) as u16, cell_sz);
573        page.set_right_child(last_child);
574    }
575}
576
577fn propagate_remove_up(
578    pages: &mut HashMap<PageId, Page>,
579    alloc: &mut PageAllocator,
580    txn_id: TxnId,
581    path: &[(PageId, usize)],
582    depth: &mut u16,
583) -> PageId {
584    // Process the bottom-most ancestor first (parent of the deleted leaf)
585    let mut level = path.len();
586
587    // Track what needs to replace the removed child's slot in the parent above
588    // Initially: the child was removed entirely, so we need to remove it from parent
589    let mut need_remove_at_level = true;
590
591    // Result: the page ID that should be propagated upward
592    let mut new_child = PageId(0); // placeholder, set below
593
594    while level > 0 && need_remove_at_level {
595        level -= 1;
596        let (ancestor_id, child_idx) = path[level];
597        let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
598
599        {
600            let page = pages.get_mut(&new_ancestor).unwrap();
601            remove_child_from_branch(page, child_idx);
602        }
603
604        let num_cells = pages.get(&new_ancestor).unwrap().num_cells();
605
606        if num_cells > 0 || level == 0 {
607            if num_cells == 0 && level == 0 {
608                // Root collapsed - replace with its only child
609                let only_child = pages.get(&new_ancestor).unwrap().right_child();
610                alloc.free(new_ancestor);
611                pages.remove(&new_ancestor);
612                *depth -= 1;
613                return only_child;
614            }
615            // Branch is non-empty, or it's the root with cells
616            new_child = new_ancestor;
617            need_remove_at_level = false;
618        } else {
619            // Branch became empty (0 cells) - collapse to its right_child
620            let only_child = pages.get(&new_ancestor).unwrap().right_child();
621            alloc.free(new_ancestor);
622            pages.remove(&new_ancestor);
623            *depth -= 1;
624
625            // The only_child replaces this branch in the grandparent
626            // This is a pointer update (not a removal), so we stop cascading
627            new_child = only_child;
628            need_remove_at_level = false;
629        }
630    }
631
632    // Propagate CoW for remaining path levels above
633    if level > 0 {
634        let remaining_path = &path[..level];
635        new_child = propagate_cow_up(pages, alloc, txn_id, remaining_path, new_child);
636    }
637
638    new_child
639}
640
641#[cfg(test)]
642mod tests {
643    use super::*;
644
645    fn new_tree() -> (HashMap<PageId, Page>, PageAllocator, BTree) {
646        let mut pages = HashMap::new();
647        let mut alloc = PageAllocator::new(0);
648        let tree = BTree::new(&mut pages, &mut alloc, TxnId(1));
649        (pages, alloc, tree)
650    }
651
652    #[test]
653    fn empty_tree_search() {
654        let (pages, _, tree) = new_tree();
655        assert_eq!(tree.search(&pages, b"anything").unwrap(), None);
656    }
657
658    #[test]
659    fn insert_and_search_single() {
660        let (mut pages, mut alloc, mut tree) = new_tree();
661        let is_new = tree
662            .insert(
663                &mut pages,
664                &mut alloc,
665                TxnId(1),
666                b"hello",
667                ValueType::Inline,
668                b"world",
669            )
670            .unwrap();
671        assert!(is_new);
672        assert_eq!(tree.entry_count, 1);
673
674        let result = tree.search(&pages, b"hello").unwrap();
675        assert_eq!(result, Some((ValueType::Inline, b"world".to_vec())));
676    }
677
678    #[test]
679    fn insert_update_existing() {
680        let (mut pages, mut alloc, mut tree) = new_tree();
681        tree.insert(
682            &mut pages,
683            &mut alloc,
684            TxnId(1),
685            b"key",
686            ValueType::Inline,
687            b"v1",
688        )
689        .unwrap();
690        let is_new = tree
691            .insert(
692                &mut pages,
693                &mut alloc,
694                TxnId(1),
695                b"key",
696                ValueType::Inline,
697                b"v2",
698            )
699            .unwrap();
700        assert!(!is_new);
701        assert_eq!(tree.entry_count, 1);
702
703        let result = tree.search(&pages, b"key").unwrap();
704        assert_eq!(result, Some((ValueType::Inline, b"v2".to_vec())));
705    }
706
707    #[test]
708    fn insert_multiple_sorted() {
709        let (mut pages, mut alloc, mut tree) = new_tree();
710        let keys = [b"dog", b"ant", b"cat", b"fox", b"bat", b"eel"];
711        for k in &keys {
712            tree.insert(&mut pages, &mut alloc, TxnId(1), *k, ValueType::Inline, *k)
713                .unwrap();
714        }
715        assert_eq!(tree.entry_count, 6);
716
717        // Verify all keys searchable
718        for k in &keys {
719            let result = tree.search(&pages, *k).unwrap();
720            assert_eq!(result, Some((ValueType::Inline, k.to_vec())));
721        }
722
723        // Verify non-existent key
724        assert_eq!(tree.search(&pages, b"zebra").unwrap(), None);
725    }
726
727    #[test]
728    fn insert_triggers_leaf_split() {
729        let (mut pages, mut alloc, mut tree) = new_tree();
730
731        // Insert enough keys to trigger at least one leaf split.
732        // Each leaf cell: 7 + key_len + value_len bytes + 2 bytes pointer.
733        // With 4-byte keys and 8-byte values: 7 + 4 + 8 = 19 bytes + 2 = 21 bytes per entry.
734        // Page usable space: 8096 bytes. Fits ~385 entries per leaf.
735        // We need > 385 entries to trigger a split.
736        let count = 500;
737        for i in 0..count {
738            let key = format!("key-{i:05}");
739            let val = format!("val-{i:05}");
740            tree.insert(
741                &mut pages,
742                &mut alloc,
743                TxnId(1),
744                key.as_bytes(),
745                ValueType::Inline,
746                val.as_bytes(),
747            )
748            .unwrap();
749        }
750
751        assert_eq!(tree.entry_count, count);
752        assert!(
753            tree.depth >= 2,
754            "tree should have split (depth={})",
755            tree.depth
756        );
757
758        // Verify all keys present
759        for i in 0..count {
760            let key = format!("key-{i:05}");
761            let val = format!("val-{i:05}");
762            let result = tree.search(&pages, key.as_bytes()).unwrap();
763            assert_eq!(result, Some((ValueType::Inline, val.into_bytes())));
764        }
765    }
766
767    #[test]
768    fn delete_existing_key() {
769        let (mut pages, mut alloc, mut tree) = new_tree();
770        tree.insert(
771            &mut pages,
772            &mut alloc,
773            TxnId(1),
774            b"a",
775            ValueType::Inline,
776            b"1",
777        )
778        .unwrap();
779        tree.insert(
780            &mut pages,
781            &mut alloc,
782            TxnId(1),
783            b"b",
784            ValueType::Inline,
785            b"2",
786        )
787        .unwrap();
788        tree.insert(
789            &mut pages,
790            &mut alloc,
791            TxnId(1),
792            b"c",
793            ValueType::Inline,
794            b"3",
795        )
796        .unwrap();
797
798        let found = tree.delete(&mut pages, &mut alloc, TxnId(1), b"b").unwrap();
799        assert!(found);
800        assert_eq!(tree.entry_count, 2);
801        assert_eq!(tree.search(&pages, b"b").unwrap(), None);
802        assert_eq!(
803            tree.search(&pages, b"a").unwrap(),
804            Some((ValueType::Inline, b"1".to_vec()))
805        );
806        assert_eq!(
807            tree.search(&pages, b"c").unwrap(),
808            Some((ValueType::Inline, b"3".to_vec()))
809        );
810    }
811
812    #[test]
813    fn delete_nonexistent_key() {
814        let (mut pages, mut alloc, mut tree) = new_tree();
815        tree.insert(
816            &mut pages,
817            &mut alloc,
818            TxnId(1),
819            b"a",
820            ValueType::Inline,
821            b"1",
822        )
823        .unwrap();
824        let found = tree.delete(&mut pages, &mut alloc, TxnId(1), b"z").unwrap();
825        assert!(!found);
826        assert_eq!(tree.entry_count, 1);
827    }
828
829    #[test]
830    fn delete_all_from_root_leaf() {
831        let (mut pages, mut alloc, mut tree) = new_tree();
832        tree.insert(
833            &mut pages,
834            &mut alloc,
835            TxnId(1),
836            b"x",
837            ValueType::Inline,
838            b"1",
839        )
840        .unwrap();
841        tree.delete(&mut pages, &mut alloc, TxnId(1), b"x").unwrap();
842        assert_eq!(tree.entry_count, 0);
843
844        // Root is still a valid (empty) leaf
845        let root = pages.get(&tree.root).unwrap();
846        assert_eq!(root.page_type(), Some(PageType::Leaf));
847        assert_eq!(root.num_cells(), 0);
848    }
849
850    #[test]
851    fn cow_produces_new_page_ids() {
852        let (mut pages, mut alloc, mut tree) = new_tree();
853        let root_before = tree.root;
854
855        tree.insert(
856            &mut pages,
857            &mut alloc,
858            TxnId(2),
859            b"key",
860            ValueType::Inline,
861            b"val",
862        )
863        .unwrap();
864        let root_after = tree.root;
865
866        // Root should have changed (CoW)
867        assert_ne!(root_before, root_after);
868        // Old root should have been freed via allocator
869        assert!(alloc.freed_this_txn().contains(&root_before));
870    }
871
872    #[test]
873    fn insert_and_delete_many() {
874        let (mut pages, mut alloc, mut tree) = new_tree();
875        let count = 1000u64;
876
877        // Insert
878        for i in 0..count {
879            let key = format!("k{i:06}");
880            let val = format!("v{i:06}");
881            tree.insert(
882                &mut pages,
883                &mut alloc,
884                TxnId(1),
885                key.as_bytes(),
886                ValueType::Inline,
887                val.as_bytes(),
888            )
889            .unwrap();
890        }
891        assert_eq!(tree.entry_count, count);
892
893        // Delete every other key
894        for i in (0..count).step_by(2) {
895            let key = format!("k{i:06}");
896            let found = tree
897                .delete(&mut pages, &mut alloc, TxnId(1), key.as_bytes())
898                .unwrap();
899            assert!(found);
900        }
901        assert_eq!(tree.entry_count, count / 2);
902
903        // Verify remaining keys
904        for i in 0..count {
905            let key = format!("k{i:06}");
906            let result = tree.search(&pages, key.as_bytes()).unwrap();
907            if i % 2 == 0 {
908                assert_eq!(result, None, "deleted key {key} should not be found");
909            } else {
910                let val = format!("v{i:06}");
911                assert_eq!(result, Some((ValueType::Inline, val.into_bytes())));
912            }
913        }
914    }
915
916    #[test]
917    fn deep_tree_insert_delete() {
918        let (mut pages, mut alloc, mut tree) = new_tree();
919
920        // Insert enough to create depth >= 2
921        let count = 2000u64;
922        for i in 0..count {
923            let key = format!("{i:08}");
924            tree.insert(
925                &mut pages,
926                &mut alloc,
927                TxnId(1),
928                key.as_bytes(),
929                ValueType::Inline,
930                b"v",
931            )
932            .unwrap();
933        }
934        assert!(tree.depth >= 2, "depth={} expected >= 2", tree.depth);
935        assert_eq!(tree.entry_count, count);
936
937        // Delete all
938        for i in 0..count {
939            let key = format!("{i:08}");
940            let found = tree
941                .delete(&mut pages, &mut alloc, TxnId(1), key.as_bytes())
942                .unwrap();
943            assert!(found, "key {key} should be deletable");
944        }
945        assert_eq!(tree.entry_count, 0);
946    }
947}