Skip to main content

citadel_buffer/
btree.rs

1//! CoW B+ tree engine. Mutations clone pages; old pages go to pending-free list.
2
3use crate::allocator::PageAllocator;
4use citadel_core::types::{PageId, PageType, TxnId, ValueType};
5use citadel_core::{Error, Result};
6use citadel_page::page::Page;
7use citadel_page::{branch_node, leaf_node};
8use std::collections::HashMap;
9
10/// B+ tree metadata. Lightweight struct - pages are stored externally.
11#[derive(Clone)]
12pub struct BTree {
13    pub root: PageId,
14    pub depth: u16,
15    pub entry_count: u64,
16    last_insert: Option<(Vec<(PageId, usize)>, PageId)>,
17}
18
19impl BTree {
20    /// Create a new empty B+ tree with a single leaf root.
21    pub fn new(
22        pages: &mut HashMap<PageId, Page>,
23        alloc: &mut PageAllocator,
24        txn_id: TxnId,
25    ) -> Self {
26        let root_id = alloc.allocate();
27        let root = Page::new(root_id, PageType::Leaf, txn_id);
28        pages.insert(root_id, root);
29        Self {
30            root: root_id,
31            depth: 1,
32            entry_count: 0,
33            last_insert: None,
34        }
35    }
36
37    /// Create a BTree from existing metadata (e.g., loaded from commit slot).
38    pub fn from_existing(root: PageId, depth: u16, entry_count: u64) -> Self {
39        Self {
40            root,
41            depth,
42            entry_count,
43            last_insert: None,
44        }
45    }
46
47    /// Search for a key. Returns `Some((val_type, value))` if found, `None` otherwise.
48    pub fn search(
49        &self,
50        pages: &HashMap<PageId, Page>,
51        key: &[u8],
52    ) -> Result<Option<(ValueType, Vec<u8>)>> {
53        let mut current = self.root;
54        loop {
55            let page = pages.get(&current).ok_or(Error::PageOutOfBounds(current))?;
56            match page.page_type() {
57                Some(PageType::Leaf) => {
58                    return match leaf_node::search(page, key) {
59                        Ok(idx) => {
60                            let cell = leaf_node::read_cell(page, idx);
61                            Ok(Some((cell.val_type, cell.value.to_vec())))
62                        }
63                        Err(_) => Ok(None),
64                    };
65                }
66                Some(PageType::Branch) => {
67                    let idx = branch_node::search_child_index(page, key);
68                    current = branch_node::get_child(page, idx);
69                }
70                _ => {
71                    return Err(Error::InvalidPageType(page.page_type_raw(), current));
72                }
73            }
74        }
75    }
76
77    pub fn lil_would_hit(&self, pages: &HashMap<PageId, Page>, key: &[u8]) -> bool {
78        if let Some((_, cached_leaf)) = &self.last_insert {
79            if let Some(page) = pages.get(cached_leaf) {
80                let n = page.num_cells();
81                return n > 0 && key > leaf_node::read_cell(page, n - 1).key;
82            }
83        }
84        false
85    }
86
87    /// Insert key-value. Returns `true` if new, `false` if updated existing.
88    pub fn insert(
89        &mut self,
90        pages: &mut HashMap<PageId, Page>,
91        alloc: &mut PageAllocator,
92        txn_id: TxnId,
93        key: &[u8],
94        val_type: ValueType,
95        value: &[u8],
96    ) -> Result<bool> {
97        // LIL cache: skip walk_to_leaf for sequential appends to the rightmost leaf.
98        if let Some((mut cached_path, cached_leaf)) = self.last_insert.take() {
99            let hit = {
100                let page = pages
101                    .get(&cached_leaf)
102                    .ok_or(Error::PageOutOfBounds(cached_leaf))?;
103                let n = page.num_cells();
104                n > 0 && key > leaf_node::read_cell(page, n - 1).key
105            };
106            if hit {
107                let cow_id = cow_page(pages, alloc, cached_leaf, txn_id);
108                let ok = {
109                    let page = pages.get_mut(&cow_id).unwrap();
110                    leaf_node::insert_direct(page, key, val_type, value)
111                };
112                if ok {
113                    if cow_id != cached_leaf {
114                        self.root =
115                            propagate_cow_up(pages, alloc, txn_id, &mut cached_path, cow_id);
116                    }
117                    self.entry_count += 1;
118                    self.last_insert = Some((cached_path, cow_id));
119                    return Ok(true);
120                }
121                let (sep_key, right_id) =
122                    split_leaf_with_insert(pages, alloc, txn_id, cow_id, key, val_type, value);
123                self.root = propagate_split_up(
124                    pages,
125                    alloc,
126                    txn_id,
127                    &cached_path,
128                    cow_id,
129                    &sep_key,
130                    right_id,
131                    &mut self.depth,
132                );
133                self.last_insert = None;
134                self.entry_count += 1;
135                return Ok(true);
136            }
137        }
138
139        let (path, leaf_id) = self.walk_to_leaf(pages, key)?;
140
141        let key_exists = {
142            let page = pages.get(&leaf_id).unwrap();
143            leaf_node::search(page, key).is_ok()
144        };
145
146        let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
147
148        let leaf_ok = {
149            let page = pages.get_mut(&new_leaf_id).unwrap();
150            leaf_node::insert_direct(page, key, val_type, value)
151        };
152
153        if leaf_ok {
154            let mut child = new_leaf_id;
155            let mut is_rightmost = true;
156            let mut new_path = path;
157            for i in (0..new_path.len()).rev() {
158                let (ancestor_id, child_idx) = new_path[i];
159                let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
160                let page = pages.get_mut(&new_ancestor).unwrap();
161                update_branch_child(page, child_idx, child);
162                if child_idx != page.num_cells() as usize {
163                    is_rightmost = false;
164                }
165                new_path[i] = (new_ancestor, child_idx);
166                child = new_ancestor;
167            }
168            self.root = child;
169
170            if is_rightmost {
171                self.last_insert = Some((new_path, new_leaf_id));
172            }
173
174            if !key_exists {
175                self.entry_count += 1;
176            }
177            return Ok(!key_exists);
178        }
179
180        self.last_insert = None;
181        let (sep_key, right_id) =
182            split_leaf_with_insert(pages, alloc, txn_id, new_leaf_id, key, val_type, value);
183        self.root = propagate_split_up(
184            pages,
185            alloc,
186            txn_id,
187            &path,
188            new_leaf_id,
189            &sep_key,
190            right_id,
191            &mut self.depth,
192        );
193
194        if !key_exists {
195            self.entry_count += 1;
196        }
197        Ok(!key_exists)
198    }
199
200    /// Bulk-update existing keys (sorted). Single tree walk, in-place when sizes match.
201    pub fn update_sorted(
202        &mut self,
203        pages: &mut HashMap<PageId, Page>,
204        alloc: &mut PageAllocator,
205        txn_id: TxnId,
206        pairs: &[(&[u8], &[u8])],
207    ) -> Result<u64> {
208        if pairs.is_empty() {
209            return Ok(0);
210        }
211        self.last_insert = None;
212
213        let (mut path, mut leaf_id) = self.walk_to_leaf(pages, pairs[0].0)?;
214        let mut cow_leaf = cow_page(pages, alloc, leaf_id, txn_id);
215        if cow_leaf != leaf_id {
216            self.root = propagate_cow_up(pages, alloc, txn_id, &mut path, cow_leaf);
217        }
218
219        let mut count: u64 = 0;
220        let mut hint: u16 = 0;
221
222        for &(key, value) in pairs {
223            // Check if key is past the current leaf's range
224            let past_leaf = {
225                let page = pages.get(&cow_leaf).unwrap();
226                let n = page.num_cells();
227                n == 0 || key > leaf_node::read_cell(page, n - 1).key
228            };
229
230            if past_leaf {
231                let (new_path, new_leaf) = self.walk_to_leaf(pages, key)?;
232                path = new_path;
233                leaf_id = new_leaf;
234                cow_leaf = cow_page(pages, alloc, leaf_id, txn_id);
235                if cow_leaf != leaf_id {
236                    self.root = propagate_cow_up(pages, alloc, txn_id, &mut path, cow_leaf);
237                }
238                hint = 0;
239            }
240
241            // Search from hint position (keys are sorted → position increases)
242            let page = pages.get(&cow_leaf).unwrap();
243            let n = page.num_cells();
244            let idx = {
245                let mut i = hint;
246                loop {
247                    if i >= n {
248                        break None;
249                    }
250                    let cell = leaf_node::read_cell(page, i);
251                    match key.cmp(cell.key) {
252                        std::cmp::Ordering::Equal => break Some(i),
253                        std::cmp::Ordering::Less => break None,
254                        std::cmp::Ordering::Greater => i += 1,
255                    }
256                }
257            };
258
259            if let Some(idx) = idx {
260                hint = idx + 1;
261                // Try in-place value overwrite (same size = no cell movement)
262                let page = pages.get_mut(&cow_leaf).unwrap();
263                if !leaf_node::update_value_in_place(page, idx, ValueType::Inline, value) {
264                    // Different size: fall back to delete + insert
265                    leaf_node::insert_direct(page, key, ValueType::Inline, value);
266                }
267                count += 1;
268            }
269        }
270
271        Ok(count)
272    }
273
274    /// Delete a key. Returns `true` if the key was found and deleted.
275    pub fn delete(
276        &mut self,
277        pages: &mut HashMap<PageId, Page>,
278        alloc: &mut PageAllocator,
279        txn_id: TxnId,
280        key: &[u8],
281    ) -> Result<bool> {
282        self.last_insert = None;
283        let (mut path, leaf_id) = self.walk_to_leaf(pages, key)?;
284
285        let found = {
286            let page = pages.get(&leaf_id).unwrap();
287            leaf_node::search(page, key).is_ok()
288        };
289        if !found {
290            return Ok(false);
291        }
292
293        let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
294        {
295            let page = pages.get_mut(&new_leaf_id).unwrap();
296            leaf_node::delete(page, key);
297        }
298
299        let leaf_empty = pages.get(&new_leaf_id).unwrap().num_cells() == 0;
300
301        if !leaf_empty || path.is_empty() {
302            self.root = propagate_cow_up(pages, alloc, txn_id, &mut path, new_leaf_id);
303            self.entry_count -= 1;
304            return Ok(true);
305        }
306
307        alloc.free(new_leaf_id);
308        pages.remove(&new_leaf_id);
309
310        self.root = propagate_remove_up(pages, alloc, txn_id, &mut path, &mut self.depth);
311        self.entry_count -= 1;
312        Ok(true)
313    }
314
315    /// Walk root to leaf for `key`. Returns (path, leaf_page_id).
316    pub fn walk_to_leaf(
317        &self,
318        pages: &HashMap<PageId, Page>,
319        key: &[u8],
320    ) -> Result<(Vec<(PageId, usize)>, PageId)> {
321        let mut path = Vec::with_capacity(self.depth as usize);
322        let mut current = self.root;
323        loop {
324            let page = pages.get(&current).ok_or(Error::PageOutOfBounds(current))?;
325            match page.page_type() {
326                Some(PageType::Leaf) => return Ok((path, current)),
327                Some(PageType::Branch) => {
328                    let child_idx = branch_node::search_child_index(page, key);
329                    let child = branch_node::get_child(page, child_idx);
330                    path.push((current, child_idx));
331                    current = child;
332                }
333                _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
334            }
335        }
336    }
337}
338
339/// CoW a page. No-op if already owned by this txn. In-place mode reuses page ID.
340pub fn cow_page(
341    pages: &mut HashMap<PageId, Page>,
342    alloc: &mut PageAllocator,
343    old_id: PageId,
344    txn_id: TxnId,
345) -> PageId {
346    if pages.get(&old_id).unwrap().txn_id() == txn_id {
347        return old_id;
348    }
349    if alloc.in_place() {
350        let page = pages.get_mut(&old_id).unwrap();
351        page.set_txn_id(txn_id);
352        return old_id;
353    }
354    let new_id = alloc.allocate();
355    let mut new_page = pages.get(&old_id).unwrap().clone();
356    new_page.set_page_id(new_id);
357    new_page.set_txn_id(txn_id);
358    pages.insert(new_id, new_page);
359    alloc.free(old_id);
360    new_id
361}
362
363/// Update a branch's child pointer at `child_idx` to point to `new_child`.
364fn update_branch_child(page: &mut Page, child_idx: usize, new_child: PageId) {
365    let n = page.num_cells() as usize;
366    if child_idx < n {
367        let offset = page.cell_offset(child_idx as u16) as usize;
368        page.data[offset..offset + 4].copy_from_slice(&new_child.as_u32().to_le_bytes());
369    } else {
370        page.set_right_child(new_child);
371    }
372}
373
374/// Propagate CoW up through ancestors. Updates `path` in place so callers
375/// caching the path (e.g. LIL) reuse current PageIds after CoW — critical
376/// across SAVEPOINT boundaries where txn_id changes invalidate the cache.
377pub fn propagate_cow_up(
378    pages: &mut HashMap<PageId, Page>,
379    alloc: &mut PageAllocator,
380    txn_id: TxnId,
381    path: &mut [(PageId, usize)],
382    mut new_child: PageId,
383) -> PageId {
384    for i in (0..path.len()).rev() {
385        let (ancestor_id, child_idx) = path[i];
386        let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
387        let page = pages.get_mut(&new_ancestor).unwrap();
388        update_branch_child(page, child_idx, new_child);
389        path[i] = (new_ancestor, child_idx);
390        new_child = new_ancestor;
391    }
392    new_child
393}
394
395/// Split full leaf and insert. Returns (separator_key, right_page_id).
396fn split_leaf_with_insert(
397    pages: &mut HashMap<PageId, Page>,
398    alloc: &mut PageAllocator,
399    txn_id: TxnId,
400    leaf_id: PageId,
401    key: &[u8],
402    val_type: ValueType,
403    value: &[u8],
404) -> (Vec<u8>, PageId) {
405    let mut cells: Vec<(Vec<u8>, Vec<u8>)> = {
406        let page = pages.get(&leaf_id).unwrap();
407        let n = page.num_cells() as usize;
408        (0..n)
409            .map(|i| {
410                let cell = leaf_node::read_cell(page, i as u16);
411                let raw = leaf_node::read_cell_bytes(page, i as u16);
412                (cell.key.to_vec(), raw)
413            })
414            .collect()
415    };
416
417    let new_raw = leaf_node::build_cell(key, val_type, value);
418    match cells.binary_search_by(|(k, _)| k.as_slice().cmp(key)) {
419        Ok(idx) => cells[idx] = (key.to_vec(), new_raw),
420        Err(idx) => cells.insert(idx, (key.to_vec(), new_raw)),
421    }
422
423    let total = cells.len();
424
425    let usable = citadel_core::constants::USABLE_SIZE;
426    let mut cum: Vec<usize> = Vec::with_capacity(total + 1);
427    cum.push(0);
428    for (_, raw) in &cells {
429        cum.push(cum.last().unwrap() + raw.len());
430    }
431    let left_fits = |sp: usize| cum[sp] + sp * 2 <= usable;
432    let right_fits = |sp: usize| (cum[total] - cum[sp]) + (total - sp) * 2 <= usable;
433
434    let mut split_point = total / 2;
435    if !left_fits(split_point) || !right_fits(split_point) {
436        split_point = 1;
437        for sp in 1..total {
438            if left_fits(sp) && right_fits(sp) {
439                split_point = sp;
440                if sp >= total / 2 {
441                    break;
442                }
443            }
444        }
445    }
446
447    let sep_key = cells[split_point].0.clone();
448
449    {
450        let left_refs: Vec<&[u8]> = cells[..split_point]
451            .iter()
452            .map(|(_, raw)| raw.as_slice())
453            .collect();
454        let page = pages.get_mut(&leaf_id).unwrap();
455        page.rebuild_cells(&left_refs);
456    }
457
458    let right_id = alloc.allocate();
459    {
460        let mut right_page = Page::new(right_id, PageType::Leaf, txn_id);
461        let right_refs: Vec<&[u8]> = cells[split_point..]
462            .iter()
463            .map(|(_, raw)| raw.as_slice())
464            .collect();
465        right_page.rebuild_cells(&right_refs);
466        pages.insert(right_id, right_page);
467    }
468
469    (sep_key, right_id)
470}
471
472#[allow(clippy::too_many_arguments)]
473fn propagate_split_up(
474    pages: &mut HashMap<PageId, Page>,
475    alloc: &mut PageAllocator,
476    txn_id: TxnId,
477    path: &[(PageId, usize)],
478    mut left_child: PageId,
479    initial_sep: &[u8],
480    mut right_child: PageId,
481    depth: &mut u16,
482) -> PageId {
483    let mut sep_key = initial_sep.to_vec();
484    let mut pending_split = true;
485
486    for &(ancestor_id, child_idx) in path.iter().rev() {
487        let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
488
489        if pending_split {
490            let ok = {
491                let page = pages.get_mut(&new_ancestor).unwrap();
492                branch_node::insert_separator(page, child_idx, left_child, &sep_key, right_child)
493            };
494
495            if ok {
496                pending_split = false;
497                left_child = new_ancestor;
498            } else {
499                let (new_sep, new_right) = split_branch_with_insert(
500                    pages,
501                    alloc,
502                    txn_id,
503                    new_ancestor,
504                    child_idx,
505                    left_child,
506                    &sep_key,
507                    right_child,
508                );
509                left_child = new_ancestor;
510                sep_key = new_sep;
511                right_child = new_right;
512            }
513        } else {
514            let page = pages.get_mut(&new_ancestor).unwrap();
515            update_branch_child(page, child_idx, left_child);
516            left_child = new_ancestor;
517        }
518    }
519
520    if pending_split {
521        let new_root_id = alloc.allocate();
522        let mut new_root = Page::new(new_root_id, PageType::Branch, txn_id);
523        let cell = branch_node::build_cell(left_child, &sep_key);
524        new_root.write_cell(&cell).unwrap();
525        new_root.set_right_child(right_child);
526        pages.insert(new_root_id, new_root);
527        *depth += 1;
528        new_root_id
529    } else {
530        left_child
531    }
532}
533
534#[allow(clippy::too_many_arguments)]
535fn split_branch_with_insert(
536    pages: &mut HashMap<PageId, Page>,
537    alloc: &mut PageAllocator,
538    txn_id: TxnId,
539    branch_id: PageId,
540    child_idx: usize,
541    new_left: PageId,
542    sep_key: &[u8],
543    new_right: PageId,
544) -> (Vec<u8>, PageId) {
545    // Collect all cells and apply the separator insertion logically
546    let (new_cells, final_right_child) = {
547        let page = pages.get(&branch_id).unwrap();
548        let n = page.num_cells() as usize;
549        let cells: Vec<(PageId, Vec<u8>)> = (0..n)
550            .map(|i| {
551                let cell = branch_node::read_cell(page, i as u16);
552                (cell.child, cell.key.to_vec())
553            })
554            .collect();
555        let old_rc = page.right_child();
556
557        let mut result = Vec::with_capacity(n + 1);
558        let final_rc;
559
560        if child_idx < n {
561            let old_key = cells[child_idx].1.clone();
562            for (i, (child, key)) in cells.into_iter().enumerate() {
563                if i == child_idx {
564                    result.push((new_left, sep_key.to_vec()));
565                    result.push((new_right, old_key.clone()));
566                } else {
567                    result.push((child, key));
568                }
569            }
570            final_rc = old_rc;
571        } else {
572            result = cells;
573            result.push((new_left, sep_key.to_vec()));
574            final_rc = new_right;
575        }
576
577        (result, final_rc)
578    };
579
580    let total = new_cells.len();
581    let usable = citadel_core::constants::USABLE_SIZE;
582    let raw_sizes: Vec<usize> = new_cells.iter().map(|(_, key)| 6 + key.len()).collect();
583    let mut cum: Vec<usize> = Vec::with_capacity(total + 1);
584    cum.push(0);
585    for &sz in &raw_sizes {
586        cum.push(cum.last().unwrap() + sz);
587    }
588    let left_fits = |sp: usize| cum[sp] + sp * 2 <= usable;
589    let right_fits = |sp: usize| {
590        let right_count = total - sp - 1;
591        (cum[total] - cum[sp + 1]) + right_count * 2 <= usable
592    };
593
594    let mut split_point = total / 2;
595    if !left_fits(split_point) || !right_fits(split_point) {
596        split_point = 1;
597        for sp in 1..total.saturating_sub(1) {
598            if left_fits(sp) && right_fits(sp) {
599                split_point = sp;
600                if sp >= total / 2 {
601                    break;
602                }
603            }
604        }
605    }
606
607    let promoted_sep = new_cells[split_point].1.clone();
608    let promoted_child = new_cells[split_point].0;
609
610    {
611        let left_raw: Vec<Vec<u8>> = new_cells[..split_point]
612            .iter()
613            .map(|(child, key)| branch_node::build_cell(*child, key))
614            .collect();
615        let left_refs: Vec<&[u8]> = left_raw.iter().map(|c| c.as_slice()).collect();
616        let page = pages.get_mut(&branch_id).unwrap();
617        page.rebuild_cells(&left_refs);
618        page.set_right_child(promoted_child);
619    }
620
621    let right_branch_id = alloc.allocate();
622    {
623        let mut right_page = Page::new(right_branch_id, PageType::Branch, txn_id);
624        let right_raw: Vec<Vec<u8>> = new_cells[split_point + 1..]
625            .iter()
626            .map(|(child, key)| branch_node::build_cell(*child, key))
627            .collect();
628        let right_refs: Vec<&[u8]> = right_raw.iter().map(|c| c.as_slice()).collect();
629        right_page.rebuild_cells(&right_refs);
630        right_page.set_right_child(final_right_child);
631        pages.insert(right_branch_id, right_page);
632    }
633
634    (promoted_sep, right_branch_id)
635}
636
637fn remove_child_from_branch(page: &mut Page, child_idx: usize) {
638    let n = page.num_cells() as usize;
639    if child_idx < n {
640        let cell_sz = branch_node::get_cell_size(page, child_idx as u16);
641        page.delete_cell_at(child_idx as u16, cell_sz);
642    } else {
643        assert!(n > 0, "cannot remove right_child from branch with 0 cells");
644        let last_child = branch_node::read_cell(page, (n - 1) as u16).child;
645        let cell_sz = branch_node::get_cell_size(page, (n - 1) as u16);
646        page.delete_cell_at((n - 1) as u16, cell_sz);
647        page.set_right_child(last_child);
648    }
649}
650
651fn propagate_remove_up(
652    pages: &mut HashMap<PageId, Page>,
653    alloc: &mut PageAllocator,
654    txn_id: TxnId,
655    path: &mut [(PageId, usize)],
656    depth: &mut u16,
657) -> PageId {
658    // Process the bottom-most ancestor first (parent of the deleted leaf)
659    let mut level = path.len();
660
661    // Track whether parent needs child removal vs pointer update
662    let mut need_remove_at_level = true;
663
664    // Result: the page ID that should be propagated upward
665    let mut new_child = PageId(0); // placeholder, set below
666
667    while level > 0 && need_remove_at_level {
668        level -= 1;
669        let (ancestor_id, child_idx) = path[level];
670        let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
671
672        {
673            let page = pages.get_mut(&new_ancestor).unwrap();
674            remove_child_from_branch(page, child_idx);
675        }
676
677        let num_cells = pages.get(&new_ancestor).unwrap().num_cells();
678
679        if num_cells > 0 || level == 0 {
680            if num_cells == 0 && level == 0 {
681                // Root collapsed - replace with its only child
682                let only_child = pages.get(&new_ancestor).unwrap().right_child();
683                alloc.free(new_ancestor);
684                pages.remove(&new_ancestor);
685                *depth -= 1;
686                return only_child;
687            }
688            // Branch is non-empty, or it's the root with cells
689            new_child = new_ancestor;
690            need_remove_at_level = false;
691        } else {
692            // Branch became empty (0 cells) - collapse to its right_child
693            let only_child = pages.get(&new_ancestor).unwrap().right_child();
694            alloc.free(new_ancestor);
695            pages.remove(&new_ancestor);
696            *depth -= 1;
697
698            // Replace this branch with only_child in grandparent (stop cascading)
699            new_child = only_child;
700            need_remove_at_level = false;
701        }
702    }
703
704    // Propagate CoW for remaining path levels above
705    if level > 0 {
706        let remaining_path = &mut path[..level];
707        new_child = propagate_cow_up(pages, alloc, txn_id, remaining_path, new_child);
708    }
709
710    new_child
711}
712
713#[cfg(test)]
714mod tests {
715    use super::*;
716
717    fn new_tree() -> (HashMap<PageId, Page>, PageAllocator, BTree) {
718        let mut pages = HashMap::new();
719        let mut alloc = PageAllocator::new(0);
720        let tree = BTree::new(&mut pages, &mut alloc, TxnId(1));
721        (pages, alloc, tree)
722    }
723
724    #[test]
725    fn empty_tree_search() {
726        let (pages, _, tree) = new_tree();
727        assert_eq!(tree.search(&pages, b"anything").unwrap(), None);
728    }
729
730    #[test]
731    fn insert_and_search_single() {
732        let (mut pages, mut alloc, mut tree) = new_tree();
733        let is_new = tree
734            .insert(
735                &mut pages,
736                &mut alloc,
737                TxnId(1),
738                b"hello",
739                ValueType::Inline,
740                b"world",
741            )
742            .unwrap();
743        assert!(is_new);
744        assert_eq!(tree.entry_count, 1);
745
746        let result = tree.search(&pages, b"hello").unwrap();
747        assert_eq!(result, Some((ValueType::Inline, b"world".to_vec())));
748    }
749
750    #[test]
751    fn insert_update_existing() {
752        let (mut pages, mut alloc, mut tree) = new_tree();
753        tree.insert(
754            &mut pages,
755            &mut alloc,
756            TxnId(1),
757            b"key",
758            ValueType::Inline,
759            b"v1",
760        )
761        .unwrap();
762        let is_new = tree
763            .insert(
764                &mut pages,
765                &mut alloc,
766                TxnId(1),
767                b"key",
768                ValueType::Inline,
769                b"v2",
770            )
771            .unwrap();
772        assert!(!is_new);
773        assert_eq!(tree.entry_count, 1);
774
775        let result = tree.search(&pages, b"key").unwrap();
776        assert_eq!(result, Some((ValueType::Inline, b"v2".to_vec())));
777    }
778
779    #[test]
780    fn insert_multiple_sorted() {
781        let (mut pages, mut alloc, mut tree) = new_tree();
782        let keys = [b"dog", b"ant", b"cat", b"fox", b"bat", b"eel"];
783        for k in &keys {
784            tree.insert(&mut pages, &mut alloc, TxnId(1), *k, ValueType::Inline, *k)
785                .unwrap();
786        }
787        assert_eq!(tree.entry_count, 6);
788
789        // Verify all keys searchable
790        for k in &keys {
791            let result = tree.search(&pages, *k).unwrap();
792            assert_eq!(result, Some((ValueType::Inline, k.to_vec())));
793        }
794
795        // Verify non-existent key
796        assert_eq!(tree.search(&pages, b"zebra").unwrap(), None);
797    }
798
799    #[test]
800    fn insert_triggers_leaf_split() {
801        let (mut pages, mut alloc, mut tree) = new_tree();
802
803        // 500 entries > ~385 per leaf → triggers split
804        let count = 500;
805        for i in 0..count {
806            let key = format!("key-{i:05}");
807            let val = format!("val-{i:05}");
808            tree.insert(
809                &mut pages,
810                &mut alloc,
811                TxnId(1),
812                key.as_bytes(),
813                ValueType::Inline,
814                val.as_bytes(),
815            )
816            .unwrap();
817        }
818
819        assert_eq!(tree.entry_count, count);
820        assert!(
821            tree.depth >= 2,
822            "tree should have split (depth={})",
823            tree.depth
824        );
825
826        // Verify all keys present
827        for i in 0..count {
828            let key = format!("key-{i:05}");
829            let val = format!("val-{i:05}");
830            let result = tree.search(&pages, key.as_bytes()).unwrap();
831            assert_eq!(result, Some((ValueType::Inline, val.into_bytes())));
832        }
833    }
834
835    #[test]
836    fn delete_existing_key() {
837        let (mut pages, mut alloc, mut tree) = new_tree();
838        tree.insert(
839            &mut pages,
840            &mut alloc,
841            TxnId(1),
842            b"a",
843            ValueType::Inline,
844            b"1",
845        )
846        .unwrap();
847        tree.insert(
848            &mut pages,
849            &mut alloc,
850            TxnId(1),
851            b"b",
852            ValueType::Inline,
853            b"2",
854        )
855        .unwrap();
856        tree.insert(
857            &mut pages,
858            &mut alloc,
859            TxnId(1),
860            b"c",
861            ValueType::Inline,
862            b"3",
863        )
864        .unwrap();
865
866        let found = tree.delete(&mut pages, &mut alloc, TxnId(1), b"b").unwrap();
867        assert!(found);
868        assert_eq!(tree.entry_count, 2);
869        assert_eq!(tree.search(&pages, b"b").unwrap(), None);
870        assert_eq!(
871            tree.search(&pages, b"a").unwrap(),
872            Some((ValueType::Inline, b"1".to_vec()))
873        );
874        assert_eq!(
875            tree.search(&pages, b"c").unwrap(),
876            Some((ValueType::Inline, b"3".to_vec()))
877        );
878    }
879
880    #[test]
881    fn delete_nonexistent_key() {
882        let (mut pages, mut alloc, mut tree) = new_tree();
883        tree.insert(
884            &mut pages,
885            &mut alloc,
886            TxnId(1),
887            b"a",
888            ValueType::Inline,
889            b"1",
890        )
891        .unwrap();
892        let found = tree.delete(&mut pages, &mut alloc, TxnId(1), b"z").unwrap();
893        assert!(!found);
894        assert_eq!(tree.entry_count, 1);
895    }
896
897    #[test]
898    fn delete_all_from_root_leaf() {
899        let (mut pages, mut alloc, mut tree) = new_tree();
900        tree.insert(
901            &mut pages,
902            &mut alloc,
903            TxnId(1),
904            b"x",
905            ValueType::Inline,
906            b"1",
907        )
908        .unwrap();
909        tree.delete(&mut pages, &mut alloc, TxnId(1), b"x").unwrap();
910        assert_eq!(tree.entry_count, 0);
911
912        // Root is still a valid (empty) leaf
913        let root = pages.get(&tree.root).unwrap();
914        assert_eq!(root.page_type(), Some(PageType::Leaf));
915        assert_eq!(root.num_cells(), 0);
916    }
917
918    #[test]
919    fn cow_produces_new_page_ids() {
920        let (mut pages, mut alloc, mut tree) = new_tree();
921        let root_before = tree.root;
922
923        tree.insert(
924            &mut pages,
925            &mut alloc,
926            TxnId(2),
927            b"key",
928            ValueType::Inline,
929            b"val",
930        )
931        .unwrap();
932        let root_after = tree.root;
933
934        // Root should have changed (CoW)
935        assert_ne!(root_before, root_after);
936        // Old root should have been freed via allocator
937        assert!(alloc.freed_this_txn().contains(&root_before));
938    }
939
940    #[test]
941    fn insert_and_delete_many() {
942        let (mut pages, mut alloc, mut tree) = new_tree();
943        let count = 1000u64;
944
945        // Insert
946        for i in 0..count {
947            let key = format!("k{i:06}");
948            let val = format!("v{i:06}");
949            tree.insert(
950                &mut pages,
951                &mut alloc,
952                TxnId(1),
953                key.as_bytes(),
954                ValueType::Inline,
955                val.as_bytes(),
956            )
957            .unwrap();
958        }
959        assert_eq!(tree.entry_count, count);
960
961        // Delete every other key
962        for i in (0..count).step_by(2) {
963            let key = format!("k{i:06}");
964            let found = tree
965                .delete(&mut pages, &mut alloc, TxnId(1), key.as_bytes())
966                .unwrap();
967            assert!(found);
968        }
969        assert_eq!(tree.entry_count, count / 2);
970
971        // Verify remaining keys
972        for i in 0..count {
973            let key = format!("k{i:06}");
974            let result = tree.search(&pages, key.as_bytes()).unwrap();
975            if i % 2 == 0 {
976                assert_eq!(result, None, "deleted key {key} should not be found");
977            } else {
978                let val = format!("v{i:06}");
979                assert_eq!(result, Some((ValueType::Inline, val.into_bytes())));
980            }
981        }
982    }
983
984    #[test]
985    fn deep_tree_insert_delete() {
986        let (mut pages, mut alloc, mut tree) = new_tree();
987
988        // Insert enough to create depth >= 2
989        let count = 2000u64;
990        for i in 0..count {
991            let key = format!("{i:08}");
992            tree.insert(
993                &mut pages,
994                &mut alloc,
995                TxnId(1),
996                key.as_bytes(),
997                ValueType::Inline,
998                b"v",
999            )
1000            .unwrap();
1001        }
1002        assert!(tree.depth >= 2, "depth={} expected >= 2", tree.depth);
1003        assert_eq!(tree.entry_count, count);
1004
1005        // Delete all
1006        for i in 0..count {
1007            let key = format!("{i:08}");
1008            let found = tree
1009                .delete(&mut pages, &mut alloc, TxnId(1), key.as_bytes())
1010                .unwrap();
1011            assert!(found, "key {key} should be deletable");
1012        }
1013        assert_eq!(tree.entry_count, 0);
1014    }
1015}