Skip to main content

citadel_buffer/
btree.rs

//! Copy-on-write (CoW) B+ tree engine.
//!
//! All mutations use copy-on-write: the old page is cloned to a new page ID,
//! the clone is modified, and ancestors are updated to point to the new page.
//! Old pages are freed via the allocator's pending-free list.
//!
//! The tree uses `HashMap<PageId, Page>` as the in-memory page store.
8
9use crate::allocator::PageAllocator;
10use citadel_core::types::{PageId, PageType, TxnId, ValueType};
11use citadel_core::{Error, Result};
12use citadel_page::page::Page;
13use citadel_page::{branch_node, leaf_node};
14use std::collections::HashMap;
15
16/// B+ tree metadata. Lightweight struct — pages are stored externally.
17#[derive(Clone)]
18pub struct BTree {
19    pub root: PageId,
20    pub depth: u16,
21    pub entry_count: u64,
22    last_insert: Option<(Vec<(PageId, usize)>, PageId)>,
23}
24
25impl BTree {
26    /// Create a new empty B+ tree with a single leaf root.
27    pub fn new(
28        pages: &mut HashMap<PageId, Page>,
29        alloc: &mut PageAllocator,
30        txn_id: TxnId,
31    ) -> Self {
32        let root_id = alloc.allocate();
33        let root = Page::new(root_id, PageType::Leaf, txn_id);
34        pages.insert(root_id, root);
35        Self {
36            root: root_id,
37            depth: 1,
38            entry_count: 0,
39            last_insert: None,
40        }
41    }
42
43    /// Create a BTree from existing metadata (e.g., loaded from commit slot).
44    pub fn from_existing(root: PageId, depth: u16, entry_count: u64) -> Self {
45        Self {
46            root,
47            depth,
48            entry_count,
49            last_insert: None,
50        }
51    }
52
53    /// Search for a key. Returns `Some((val_type, value))` if found, `None` otherwise.
54    pub fn search(
55        &self,
56        pages: &HashMap<PageId, Page>,
57        key: &[u8],
58    ) -> Result<Option<(ValueType, Vec<u8>)>> {
59        let mut current = self.root;
60        loop {
61            let page = pages.get(&current).ok_or(Error::PageOutOfBounds(current))?;
62            match page.page_type() {
63                Some(PageType::Leaf) => {
64                    return match leaf_node::search(page, key) {
65                        Ok(idx) => {
66                            let cell = leaf_node::read_cell(page, idx);
67                            Ok(Some((cell.val_type, cell.value.to_vec())))
68                        }
69                        Err(_) => Ok(None),
70                    };
71                }
72                Some(PageType::Branch) => {
73                    let idx = branch_node::search_child_index(page, key);
74                    current = branch_node::get_child(page, idx);
75                }
76                _ => {
77                    return Err(Error::InvalidPageType(page.page_type_raw(), current));
78                }
79            }
80        }
81    }
82
83    pub fn lil_would_hit(&self, pages: &HashMap<PageId, Page>, key: &[u8]) -> bool {
84        if let Some((_, cached_leaf)) = &self.last_insert {
85            if let Some(page) = pages.get(cached_leaf) {
86                let n = page.num_cells();
87                return n > 0 && key > leaf_node::read_cell(page, n - 1).key;
88            }
89        }
90        false
91    }
92
93    /// Insert a key-value pair. Returns `true` if a new entry was added,
94    /// `false` if an existing key was updated.
95    pub fn insert(
96        &mut self,
97        pages: &mut HashMap<PageId, Page>,
98        alloc: &mut PageAllocator,
99        txn_id: TxnId,
100        key: &[u8],
101        val_type: ValueType,
102        value: &[u8],
103    ) -> Result<bool> {
104        // LIL cache: skip walk_to_leaf for sequential appends to the rightmost leaf.
105        if let Some((cached_path, cached_leaf)) = self.last_insert.take() {
106            let hit = {
107                let page = pages
108                    .get(&cached_leaf)
109                    .ok_or(Error::PageOutOfBounds(cached_leaf))?;
110                let n = page.num_cells();
111                n > 0 && key > leaf_node::read_cell(page, n - 1).key
112            };
113            if hit {
114                let cow_id = cow_page(pages, alloc, cached_leaf, txn_id);
115                let ok = {
116                    let page = pages.get_mut(&cow_id).unwrap();
117                    leaf_node::insert_direct(page, key, val_type, value)
118                };
119                if ok {
120                    if cow_id != cached_leaf {
121                        self.root = propagate_cow_up(pages, alloc, txn_id, &cached_path, cow_id);
122                    }
123                    self.entry_count += 1;
124                    self.last_insert = Some((cached_path, cow_id));
125                    return Ok(true);
126                }
127                let (sep_key, right_id) =
128                    split_leaf_with_insert(pages, alloc, txn_id, cow_id, key, val_type, value);
129                self.root = propagate_split_up(
130                    pages,
131                    alloc,
132                    txn_id,
133                    &cached_path,
134                    cow_id,
135                    &sep_key,
136                    right_id,
137                    &mut self.depth,
138                );
139                self.last_insert = None;
140                self.entry_count += 1;
141                return Ok(true);
142            }
143        }
144
145        let (path, leaf_id) = self.walk_to_leaf(pages, key)?;
146
147        let key_exists = {
148            let page = pages.get(&leaf_id).unwrap();
149            leaf_node::search(page, key).is_ok()
150        };
151
152        let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
153
154        let leaf_ok = {
155            let page = pages.get_mut(&new_leaf_id).unwrap();
156            leaf_node::insert_direct(page, key, val_type, value)
157        };
158
159        if leaf_ok {
160            let mut child = new_leaf_id;
161            let mut is_rightmost = true;
162            let mut new_path = path;
163            for i in (0..new_path.len()).rev() {
164                let (ancestor_id, child_idx) = new_path[i];
165                let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
166                let page = pages.get_mut(&new_ancestor).unwrap();
167                update_branch_child(page, child_idx, child);
168                if child_idx != page.num_cells() as usize {
169                    is_rightmost = false;
170                }
171                new_path[i] = (new_ancestor, child_idx);
172                child = new_ancestor;
173            }
174            self.root = child;
175
176            if is_rightmost {
177                self.last_insert = Some((new_path, new_leaf_id));
178            }
179
180            if !key_exists {
181                self.entry_count += 1;
182            }
183            return Ok(!key_exists);
184        }
185
186        self.last_insert = None;
187        let (sep_key, right_id) =
188            split_leaf_with_insert(pages, alloc, txn_id, new_leaf_id, key, val_type, value);
189        self.root = propagate_split_up(
190            pages,
191            alloc,
192            txn_id,
193            &path,
194            new_leaf_id,
195            &sep_key,
196            right_id,
197            &mut self.depth,
198        );
199
200        if !key_exists {
201            self.entry_count += 1;
202        }
203        Ok(!key_exists)
204    }
205
206    /// Delete a key. Returns `true` if the key was found and deleted.
207    pub fn delete(
208        &mut self,
209        pages: &mut HashMap<PageId, Page>,
210        alloc: &mut PageAllocator,
211        txn_id: TxnId,
212        key: &[u8],
213    ) -> Result<bool> {
214        self.last_insert = None;
215        let (path, leaf_id) = self.walk_to_leaf(pages, key)?;
216
217        // Check if key exists
218        let found = {
219            let page = pages.get(&leaf_id).unwrap();
220            leaf_node::search(page, key).is_ok()
221        };
222        if !found {
223            return Ok(false);
224        }
225
226        // CoW the leaf and delete
227        let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
228        {
229            let page = pages.get_mut(&new_leaf_id).unwrap();
230            leaf_node::delete(page, key);
231        }
232
233        // Check if leaf became empty
234        let leaf_empty = pages.get(&new_leaf_id).unwrap().num_cells() == 0;
235
236        if !leaf_empty || path.is_empty() {
237            // Leaf is non-empty, or it's the root (root can be empty)
238            self.root = propagate_cow_up(pages, alloc, txn_id, &path, new_leaf_id);
239            self.entry_count -= 1;
240            return Ok(true);
241        }
242
243        // Empty leaf — remove from tree
244        alloc.free(new_leaf_id);
245        pages.remove(&new_leaf_id);
246
247        // Walk up, handling the removal
248        self.root = propagate_remove_up(pages, alloc, txn_id, &path, &mut self.depth);
249        self.entry_count -= 1;
250        Ok(true)
251    }
252
253    /// Walk from root to the leaf that should contain `key`.
254    /// Returns (path, leaf_page_id) where path is Vec<(ancestor_id, child_idx)>.
255    fn walk_to_leaf(
256        &self,
257        pages: &HashMap<PageId, Page>,
258        key: &[u8],
259    ) -> Result<(Vec<(PageId, usize)>, PageId)> {
260        let mut path = Vec::with_capacity(self.depth as usize);
261        let mut current = self.root;
262        loop {
263            let page = pages.get(&current).ok_or(Error::PageOutOfBounds(current))?;
264            match page.page_type() {
265                Some(PageType::Leaf) => return Ok((path, current)),
266                Some(PageType::Branch) => {
267                    let child_idx = branch_node::search_child_index(page, key);
268                    let child = branch_node::get_child(page, child_idx);
269                    path.push((current, child_idx));
270                    current = child;
271                }
272                _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
273            }
274        }
275    }
276}
277
278/// Copy-on-Write: clone a page to a new page ID, free the old one.
279/// If the page already belongs to this transaction, return it as-is.
280fn cow_page(
281    pages: &mut HashMap<PageId, Page>,
282    alloc: &mut PageAllocator,
283    old_id: PageId,
284    txn_id: TxnId,
285) -> PageId {
286    if pages.get(&old_id).unwrap().txn_id() == txn_id {
287        return old_id;
288    }
289    let new_id = alloc.allocate();
290    let mut new_page = pages.get(&old_id).unwrap().clone();
291    new_page.set_page_id(new_id);
292    new_page.set_txn_id(txn_id);
293    pages.insert(new_id, new_page);
294    alloc.free(old_id);
295    new_id
296}
297
298/// Update a branch's child pointer at `child_idx` to point to `new_child`.
299fn update_branch_child(page: &mut Page, child_idx: usize, new_child: PageId) {
300    let n = page.num_cells() as usize;
301    if child_idx < n {
302        let offset = page.cell_offset(child_idx as u16) as usize;
303        page.data[offset..offset + 4].copy_from_slice(&new_child.as_u32().to_le_bytes());
304    } else {
305        page.set_right_child(new_child);
306    }
307}
308
309/// Propagate CoW up through ancestors (no split, just update child pointers).
310fn propagate_cow_up(
311    pages: &mut HashMap<PageId, Page>,
312    alloc: &mut PageAllocator,
313    txn_id: TxnId,
314    path: &[(PageId, usize)],
315    mut new_child: PageId,
316) -> PageId {
317    for &(ancestor_id, child_idx) in path.iter().rev() {
318        let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
319        let page = pages.get_mut(&new_ancestor).unwrap();
320        update_branch_child(page, child_idx, new_child);
321        new_child = new_ancestor;
322    }
323    new_child
324}
325
326/// Split a full leaf and insert the new key-value pair.
327/// Returns (separator_key, right_page_id). The left page is `leaf_id` (rebuilt in place).
328fn split_leaf_with_insert(
329    pages: &mut HashMap<PageId, Page>,
330    alloc: &mut PageAllocator,
331    txn_id: TxnId,
332    leaf_id: PageId,
333    key: &[u8],
334    val_type: ValueType,
335    value: &[u8],
336) -> (Vec<u8>, PageId) {
337    // Collect all existing cells + the new cell, sorted
338    let mut cells: Vec<(Vec<u8>, Vec<u8>)> = {
339        let page = pages.get(&leaf_id).unwrap();
340        let n = page.num_cells() as usize;
341        (0..n)
342            .map(|i| {
343                let cell = leaf_node::read_cell(page, i as u16);
344                let raw = leaf_node::read_cell_bytes(page, i as u16);
345                (cell.key.to_vec(), raw)
346            })
347            .collect()
348    };
349
350    // Insert or update the new cell in sorted position
351    let new_raw = leaf_node::build_cell(key, val_type, value);
352    match cells.binary_search_by(|(k, _)| k.as_slice().cmp(key)) {
353        Ok(idx) => cells[idx] = (key.to_vec(), new_raw),
354        Err(idx) => cells.insert(idx, (key.to_vec(), new_raw)),
355    }
356
357    let total = cells.len();
358
359    // Size-aware split: ensure both halves fit within USABLE_SIZE.
360    // Simple midpoint-by-count fails when cell sizes vary significantly.
361    let usable = citadel_core::constants::USABLE_SIZE;
362    let mut cum: Vec<usize> = Vec::with_capacity(total + 1);
363    cum.push(0);
364    for (_, raw) in &cells {
365        cum.push(cum.last().unwrap() + raw.len());
366    }
367    let left_fits = |sp: usize| cum[sp] + sp * 2 <= usable;
368    let right_fits = |sp: usize| (cum[total] - cum[sp]) + (total - sp) * 2 <= usable;
369
370    let mut split_point = total / 2;
371    if !left_fits(split_point) || !right_fits(split_point) {
372        split_point = 1;
373        for sp in 1..total {
374            if left_fits(sp) && right_fits(sp) {
375                split_point = sp;
376                if sp >= total / 2 {
377                    break;
378                }
379            }
380        }
381    }
382
383    let sep_key = cells[split_point].0.clone();
384
385    // Rebuild left page with cells [0..split_point]
386    {
387        let left_refs: Vec<&[u8]> = cells[..split_point]
388            .iter()
389            .map(|(_, raw)| raw.as_slice())
390            .collect();
391        let page = pages.get_mut(&leaf_id).unwrap();
392        page.rebuild_cells(&left_refs);
393    }
394
395    // Create right page with cells [split_point..total]
396    let right_id = alloc.allocate();
397    {
398        let mut right_page = Page::new(right_id, PageType::Leaf, txn_id);
399        let right_refs: Vec<&[u8]> = cells[split_point..]
400            .iter()
401            .map(|(_, raw)| raw.as_slice())
402            .collect();
403        right_page.rebuild_cells(&right_refs);
404        pages.insert(right_id, right_page);
405    }
406
407    (sep_key, right_id)
408}
409
410/// Propagate a split upward through the ancestor chain.
411/// Returns the new root page ID.
412#[allow(clippy::too_many_arguments)]
413fn propagate_split_up(
414    pages: &mut HashMap<PageId, Page>,
415    alloc: &mut PageAllocator,
416    txn_id: TxnId,
417    path: &[(PageId, usize)],
418    mut left_child: PageId,
419    initial_sep: &[u8],
420    mut right_child: PageId,
421    depth: &mut u16,
422) -> PageId {
423    let mut sep_key = initial_sep.to_vec();
424    let mut pending_split = true;
425
426    for &(ancestor_id, child_idx) in path.iter().rev() {
427        let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
428
429        if pending_split {
430            let ok = {
431                let page = pages.get_mut(&new_ancestor).unwrap();
432                branch_node::insert_separator(page, child_idx, left_child, &sep_key, right_child)
433            };
434
435            if ok {
436                pending_split = false;
437                left_child = new_ancestor;
438            } else {
439                // Branch also full — split it
440                let (new_sep, new_right) = split_branch_with_insert(
441                    pages,
442                    alloc,
443                    txn_id,
444                    new_ancestor,
445                    child_idx,
446                    left_child,
447                    &sep_key,
448                    right_child,
449                );
450                left_child = new_ancestor;
451                sep_key = new_sep;
452                right_child = new_right;
453            }
454        } else {
455            let page = pages.get_mut(&new_ancestor).unwrap();
456            update_branch_child(page, child_idx, left_child);
457            left_child = new_ancestor;
458        }
459    }
460
461    if pending_split {
462        // Create a new root
463        let new_root_id = alloc.allocate();
464        let mut new_root = Page::new(new_root_id, PageType::Branch, txn_id);
465        let cell = branch_node::build_cell(left_child, &sep_key);
466        new_root.write_cell(&cell).unwrap();
467        new_root.set_right_child(right_child);
468        pages.insert(new_root_id, new_root);
469        *depth += 1;
470        new_root_id
471    } else {
472        left_child
473    }
474}
475
476/// Split a full branch and insert a separator.
477/// Returns (promoted_separator_key, right_branch_page_id).
478/// The left branch is `branch_id` (rebuilt in place).
479#[allow(clippy::too_many_arguments)]
480fn split_branch_with_insert(
481    pages: &mut HashMap<PageId, Page>,
482    alloc: &mut PageAllocator,
483    txn_id: TxnId,
484    branch_id: PageId,
485    child_idx: usize,
486    new_left: PageId,
487    sep_key: &[u8],
488    new_right: PageId,
489) -> (Vec<u8>, PageId) {
490    // Collect all cells and apply the separator insertion logically
491    let (new_cells, final_right_child) = {
492        let page = pages.get(&branch_id).unwrap();
493        let n = page.num_cells() as usize;
494        let cells: Vec<(PageId, Vec<u8>)> = (0..n)
495            .map(|i| {
496                let cell = branch_node::read_cell(page, i as u16);
497                (cell.child, cell.key.to_vec())
498            })
499            .collect();
500        let old_rc = page.right_child();
501
502        let mut result = Vec::with_capacity(n + 1);
503        let final_rc;
504
505        if child_idx < n {
506            let old_key = cells[child_idx].1.clone();
507            for (i, (child, key)) in cells.into_iter().enumerate() {
508                if i == child_idx {
509                    result.push((new_left, sep_key.to_vec()));
510                    result.push((new_right, old_key.clone()));
511                } else {
512                    result.push((child, key));
513                }
514            }
515            final_rc = old_rc;
516        } else {
517            result = cells;
518            result.push((new_left, sep_key.to_vec()));
519            final_rc = new_right;
520        }
521
522        (result, final_rc)
523    };
524
525    // Size-aware split — the middle key is promoted.
526    // Left = [0..split_point], promoted = [split_point], right = [split_point+1..total].
527    let total = new_cells.len();
528    let usable = citadel_core::constants::USABLE_SIZE;
529    let raw_sizes: Vec<usize> = new_cells.iter().map(|(_, key)| 6 + key.len()).collect();
530    let mut cum: Vec<usize> = Vec::with_capacity(total + 1);
531    cum.push(0);
532    for &sz in &raw_sizes {
533        cum.push(cum.last().unwrap() + sz);
534    }
535    let left_fits = |sp: usize| cum[sp] + sp * 2 <= usable;
536    let right_fits = |sp: usize| {
537        let right_count = total - sp - 1;
538        (cum[total] - cum[sp + 1]) + right_count * 2 <= usable
539    };
540
541    let mut split_point = total / 2;
542    if !left_fits(split_point) || !right_fits(split_point) {
543        split_point = 1;
544        for sp in 1..total.saturating_sub(1) {
545            if left_fits(sp) && right_fits(sp) {
546                split_point = sp;
547                if sp >= total / 2 {
548                    break;
549                }
550            }
551        }
552    }
553
554    let promoted_sep = new_cells[split_point].1.clone();
555    let promoted_child = new_cells[split_point].0;
556
557    // Rebuild left branch with cells [0..split_point], right_child = promoted_child
558    {
559        let left_raw: Vec<Vec<u8>> = new_cells[..split_point]
560            .iter()
561            .map(|(child, key)| branch_node::build_cell(*child, key))
562            .collect();
563        let left_refs: Vec<&[u8]> = left_raw.iter().map(|c| c.as_slice()).collect();
564        let page = pages.get_mut(&branch_id).unwrap();
565        page.rebuild_cells(&left_refs);
566        page.set_right_child(promoted_child);
567    }
568
569    // Create right branch with cells [split_point+1..total], right_child = final_right_child
570    let right_branch_id = alloc.allocate();
571    {
572        let mut right_page = Page::new(right_branch_id, PageType::Branch, txn_id);
573        let right_raw: Vec<Vec<u8>> = new_cells[split_point + 1..]
574            .iter()
575            .map(|(child, key)| branch_node::build_cell(*child, key))
576            .collect();
577        let right_refs: Vec<&[u8]> = right_raw.iter().map(|c| c.as_slice()).collect();
578        right_page.rebuild_cells(&right_refs);
579        right_page.set_right_child(final_right_child);
580        pages.insert(right_branch_id, right_page);
581    }
582
583    (promoted_sep, right_branch_id)
584}
585
586/// Remove a child from a branch page at the given child index.
587fn remove_child_from_branch(page: &mut Page, child_idx: usize) {
588    let n = page.num_cells() as usize;
589    if child_idx < n {
590        let cell_sz = branch_node::get_cell_size(page, child_idx as u16);
591        page.delete_cell_at(child_idx as u16, cell_sz);
592    } else {
593        // Removing right_child: promote last cell's child
594        assert!(n > 0, "cannot remove right_child from branch with 0 cells");
595        let last_child = branch_node::read_cell(page, (n - 1) as u16).child;
596        let cell_sz = branch_node::get_cell_size(page, (n - 1) as u16);
597        page.delete_cell_at((n - 1) as u16, cell_sz);
598        page.set_right_child(last_child);
599    }
600}
601
602/// Propagate child removal upward through the ancestor chain.
603/// Handles cascading collapses when branches become empty.
604fn propagate_remove_up(
605    pages: &mut HashMap<PageId, Page>,
606    alloc: &mut PageAllocator,
607    txn_id: TxnId,
608    path: &[(PageId, usize)],
609    depth: &mut u16,
610) -> PageId {
611    // Process the bottom-most ancestor first (parent of the deleted leaf)
612    let mut level = path.len();
613
614    // Track what needs to replace the removed child's slot in the parent above
615    // Initially: the child was removed entirely, so we need to remove it from parent
616    let mut need_remove_at_level = true;
617
618    // Result: the page ID that should be propagated upward
619    let mut new_child = PageId(0); // placeholder, set below
620
621    while level > 0 && need_remove_at_level {
622        level -= 1;
623        let (ancestor_id, child_idx) = path[level];
624        let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
625
626        {
627            let page = pages.get_mut(&new_ancestor).unwrap();
628            remove_child_from_branch(page, child_idx);
629        }
630
631        let num_cells = pages.get(&new_ancestor).unwrap().num_cells();
632
633        if num_cells > 0 || level == 0 {
634            if num_cells == 0 && level == 0 {
635                // Root collapsed — replace with its only child
636                let only_child = pages.get(&new_ancestor).unwrap().right_child();
637                alloc.free(new_ancestor);
638                pages.remove(&new_ancestor);
639                *depth -= 1;
640                return only_child;
641            }
642            // Branch is non-empty, or it's the root with cells
643            new_child = new_ancestor;
644            need_remove_at_level = false;
645        } else {
646            // Branch became empty (0 cells) — collapse to its right_child
647            let only_child = pages.get(&new_ancestor).unwrap().right_child();
648            alloc.free(new_ancestor);
649            pages.remove(&new_ancestor);
650            *depth -= 1;
651
652            // The only_child replaces this branch in the grandparent
653            // This is a pointer update (not a removal), so we stop cascading
654            new_child = only_child;
655            need_remove_at_level = false;
656        }
657    }
658
659    // Propagate CoW for remaining path levels above
660    if level > 0 {
661        let remaining_path = &path[..level];
662        new_child = propagate_cow_up(pages, alloc, txn_id, remaining_path, new_child);
663    }
664
665    new_child
666}
667
#[cfg(test)]
mod tests {
    use super::*;

    type Store = HashMap<PageId, Page>;

    /// Fresh page store, allocator and empty tree created under transaction 1.
    fn new_tree() -> (Store, PageAllocator, BTree) {
        let mut pages = HashMap::new();
        let mut alloc = PageAllocator::new(0);
        let tree = BTree::new(&mut pages, &mut alloc, TxnId(1));
        (pages, alloc, tree)
    }

    /// Inserts an inline value and returns whether the key was new.
    fn put(
        tree: &mut BTree,
        pages: &mut Store,
        alloc: &mut PageAllocator,
        txn: TxnId,
        key: &[u8],
        value: &[u8],
    ) -> bool {
        tree.insert(pages, alloc, txn, key, ValueType::Inline, value)
            .unwrap()
    }

    /// Deletes `key` and returns whether it was present.
    fn del(
        tree: &mut BTree,
        pages: &mut Store,
        alloc: &mut PageAllocator,
        txn: TxnId,
        key: &[u8],
    ) -> bool {
        tree.delete(pages, alloc, txn, key).unwrap()
    }

    /// The search result expected for a stored inline value.
    fn inline(value: &[u8]) -> Option<(ValueType, Vec<u8>)> {
        Some((ValueType::Inline, value.to_vec()))
    }

    #[test]
    fn empty_tree_search() {
        let (pages, _, tree) = new_tree();
        assert_eq!(tree.search(&pages, b"anything").unwrap(), None);
    }

    #[test]
    fn insert_and_search_single() {
        let (mut pages, mut alloc, mut tree) = new_tree();
        let is_new = put(&mut tree, &mut pages, &mut alloc, TxnId(1), b"hello", b"world");
        assert!(is_new);
        assert_eq!(tree.entry_count, 1);

        let result = tree.search(&pages, b"hello").unwrap();
        assert_eq!(result, inline(b"world"));
    }

    #[test]
    fn insert_update_existing() {
        let (mut pages, mut alloc, mut tree) = new_tree();
        put(&mut tree, &mut pages, &mut alloc, TxnId(1), b"key", b"v1");
        let is_new = put(&mut tree, &mut pages, &mut alloc, TxnId(1), b"key", b"v2");
        assert!(!is_new);
        assert_eq!(tree.entry_count, 1);

        let result = tree.search(&pages, b"key").unwrap();
        assert_eq!(result, inline(b"v2"));
    }

    #[test]
    fn insert_multiple_sorted() {
        let (mut pages, mut alloc, mut tree) = new_tree();
        let keys = [b"dog", b"ant", b"cat", b"fox", b"bat", b"eel"];
        for k in &keys {
            put(&mut tree, &mut pages, &mut alloc, TxnId(1), *k, *k);
        }
        assert_eq!(tree.entry_count, 6);

        // Every inserted key maps to itself.
        for k in &keys {
            let result = tree.search(&pages, *k).unwrap();
            assert_eq!(result, inline(*k));
        }

        // A key that was never inserted is absent.
        assert_eq!(tree.search(&pages, b"zebra").unwrap(), None);
    }

    #[test]
    fn insert_triggers_leaf_split() {
        let (mut pages, mut alloc, mut tree) = new_tree();

        // 500 entries with 9-byte keys and 9-byte values. This is expected to
        // overflow a single leaf; the depth assertion below checks that a
        // split actually happened.
        let count = 500;
        for i in 0..count {
            let key = format!("key-{i:05}");
            let val = format!("val-{i:05}");
            put(
                &mut tree,
                &mut pages,
                &mut alloc,
                TxnId(1),
                key.as_bytes(),
                val.as_bytes(),
            );
        }

        assert_eq!(tree.entry_count, count);
        assert!(
            tree.depth >= 2,
            "tree should have split (depth={})",
            tree.depth
        );

        // Every key survives the splits.
        for i in 0..count {
            let key = format!("key-{i:05}");
            let val = format!("val-{i:05}");
            let result = tree.search(&pages, key.as_bytes()).unwrap();
            assert_eq!(result, Some((ValueType::Inline, val.into_bytes())));
        }
    }

    #[test]
    fn delete_existing_key() {
        let (mut pages, mut alloc, mut tree) = new_tree();
        let seed: [(&[u8], &[u8]); 3] = [(b"a", b"1"), (b"b", b"2"), (b"c", b"3")];
        for (k, v) in seed {
            put(&mut tree, &mut pages, &mut alloc, TxnId(1), k, v);
        }

        let found = del(&mut tree, &mut pages, &mut alloc, TxnId(1), b"b");
        assert!(found);
        assert_eq!(tree.entry_count, 2);
        assert_eq!(tree.search(&pages, b"b").unwrap(), None);
        assert_eq!(tree.search(&pages, b"a").unwrap(), inline(b"1"));
        assert_eq!(tree.search(&pages, b"c").unwrap(), inline(b"3"));
    }

    #[test]
    fn delete_nonexistent_key() {
        let (mut pages, mut alloc, mut tree) = new_tree();
        put(&mut tree, &mut pages, &mut alloc, TxnId(1), b"a", b"1");
        let found = del(&mut tree, &mut pages, &mut alloc, TxnId(1), b"z");
        assert!(!found);
        assert_eq!(tree.entry_count, 1);
    }

    #[test]
    fn delete_all_from_root_leaf() {
        let (mut pages, mut alloc, mut tree) = new_tree();
        put(&mut tree, &mut pages, &mut alloc, TxnId(1), b"x", b"1");
        del(&mut tree, &mut pages, &mut alloc, TxnId(1), b"x");
        assert_eq!(tree.entry_count, 0);

        // Root is still a valid (empty) leaf
        let root = pages.get(&tree.root).unwrap();
        assert_eq!(root.page_type(), Some(PageType::Leaf));
        assert_eq!(root.num_cells(), 0);
    }

    #[test]
    fn cow_produces_new_page_ids() {
        let (mut pages, mut alloc, mut tree) = new_tree();
        let root_before = tree.root;

        // Writing under a different transaction must copy the root.
        put(&mut tree, &mut pages, &mut alloc, TxnId(2), b"key", b"val");
        let root_after = tree.root;

        // Root should have changed (CoW)
        assert_ne!(root_before, root_after);
        // Old root should have been freed via allocator
        assert!(alloc.freed_this_txn().contains(&root_before));
    }

    #[test]
    fn insert_and_delete_many() {
        let (mut pages, mut alloc, mut tree) = new_tree();
        let count = 1000u64;

        // Insert
        for i in 0..count {
            let key = format!("k{i:06}");
            let val = format!("v{i:06}");
            put(
                &mut tree,
                &mut pages,
                &mut alloc,
                TxnId(1),
                key.as_bytes(),
                val.as_bytes(),
            );
        }
        assert_eq!(tree.entry_count, count);

        // Delete every other key
        for i in (0..count).step_by(2) {
            let key = format!("k{i:06}");
            let found = del(&mut tree, &mut pages, &mut alloc, TxnId(1), key.as_bytes());
            assert!(found);
        }
        assert_eq!(tree.entry_count, count / 2);

        // Verify remaining keys
        for i in 0..count {
            let key = format!("k{i:06}");
            let result = tree.search(&pages, key.as_bytes()).unwrap();
            if i % 2 == 0 {
                assert_eq!(result, None, "deleted key {key} should not be found");
            } else {
                let val = format!("v{i:06}");
                assert_eq!(result, Some((ValueType::Inline, val.into_bytes())));
            }
        }
    }

    #[test]
    fn deep_tree_insert_delete() {
        let (mut pages, mut alloc, mut tree) = new_tree();

        // Insert enough to create depth >= 2
        let count = 2000u64;
        for i in 0..count {
            let key = format!("{i:08}");
            put(&mut tree, &mut pages, &mut alloc, TxnId(1), key.as_bytes(), b"v");
        }
        assert!(tree.depth >= 2, "depth={} expected >= 2", tree.depth);
        assert_eq!(tree.entry_count, count);

        // Delete all
        for i in 0..count {
            let key = format!("{i:08}");
            let found = del(&mut tree, &mut pages, &mut alloc, TxnId(1), key.as_bytes());
            assert!(found, "key {key} should be deletable");
        }
        assert_eq!(tree.entry_count, 0);
    }
}
974}