Skip to main content

citadel_buffer/
btree.rs

//! Copy-on-write (CoW) B+ tree engine. Mutations clone pages; old pages go to the pending-free list.
2
3use crate::allocator::PageAllocator;
4use citadel_core::types::{PageId, PageType, TxnId, ValueType};
5use citadel_core::{Error, Result};
6use citadel_page::page::Page;
7use citadel_page::{branch_node, leaf_node};
8use std::collections::HashMap;
9
10/// B+ tree metadata. Lightweight struct - pages are stored externally.
11#[derive(Clone)]
12pub struct BTree {
13    pub root: PageId,
14    pub depth: u16,
15    pub entry_count: u64,
16    last_insert: Option<(Vec<(PageId, usize)>, PageId)>,
17}
18
19impl BTree {
20    /// Create a new empty B+ tree with a single leaf root.
21    pub fn new(
22        pages: &mut HashMap<PageId, Page>,
23        alloc: &mut PageAllocator,
24        txn_id: TxnId,
25    ) -> Self {
26        let root_id = alloc.allocate();
27        let root = Page::new(root_id, PageType::Leaf, txn_id);
28        pages.insert(root_id, root);
29        Self {
30            root: root_id,
31            depth: 1,
32            entry_count: 0,
33            last_insert: None,
34        }
35    }
36
37    /// Create a BTree from existing metadata (e.g., loaded from commit slot).
38    pub fn from_existing(root: PageId, depth: u16, entry_count: u64) -> Self {
39        Self {
40            root,
41            depth,
42            entry_count,
43            last_insert: None,
44        }
45    }
46
47    /// Search for a key. Returns `Some((val_type, value))` if found, `None` otherwise.
48    pub fn search(
49        &self,
50        pages: &HashMap<PageId, Page>,
51        key: &[u8],
52    ) -> Result<Option<(ValueType, Vec<u8>)>> {
53        let mut current = self.root;
54        loop {
55            let page = pages.get(&current).ok_or(Error::PageOutOfBounds(current))?;
56            match page.page_type() {
57                Some(PageType::Leaf) => {
58                    return match leaf_node::search(page, key) {
59                        Ok(idx) => {
60                            let cell = leaf_node::read_cell(page, idx);
61                            Ok(Some((cell.val_type, cell.value.to_vec())))
62                        }
63                        Err(_) => Ok(None),
64                    };
65                }
66                Some(PageType::Branch) => {
67                    let idx = branch_node::search_child_index(page, key);
68                    current = branch_node::get_child(page, idx);
69                }
70                _ => {
71                    return Err(Error::InvalidPageType(page.page_type_raw(), current));
72                }
73            }
74        }
75    }
76
77    pub fn lil_would_hit(&self, pages: &HashMap<PageId, Page>, key: &[u8]) -> bool {
78        if let Some((_, cached_leaf)) = &self.last_insert {
79            if let Some(page) = pages.get(cached_leaf) {
80                let n = page.num_cells();
81                return n > 0 && key > leaf_node::read_cell(page, n - 1).key;
82            }
83        }
84        false
85    }
86
87    /// Insert key-value. Returns `true` if new, `false` if updated existing.
88    pub fn insert(
89        &mut self,
90        pages: &mut HashMap<PageId, Page>,
91        alloc: &mut PageAllocator,
92        txn_id: TxnId,
93        key: &[u8],
94        val_type: ValueType,
95        value: &[u8],
96    ) -> Result<bool> {
97        // LIL cache: skip walk_to_leaf for sequential appends to the rightmost leaf.
98        if let Some((cached_path, cached_leaf)) = self.last_insert.take() {
99            let hit = {
100                let page = pages
101                    .get(&cached_leaf)
102                    .ok_or(Error::PageOutOfBounds(cached_leaf))?;
103                let n = page.num_cells();
104                n > 0 && key > leaf_node::read_cell(page, n - 1).key
105            };
106            if hit {
107                let cow_id = cow_page(pages, alloc, cached_leaf, txn_id);
108                let ok = {
109                    let page = pages.get_mut(&cow_id).unwrap();
110                    leaf_node::insert_direct(page, key, val_type, value)
111                };
112                if ok {
113                    if cow_id != cached_leaf {
114                        self.root = propagate_cow_up(pages, alloc, txn_id, &cached_path, cow_id);
115                    }
116                    self.entry_count += 1;
117                    self.last_insert = Some((cached_path, cow_id));
118                    return Ok(true);
119                }
120                let (sep_key, right_id) =
121                    split_leaf_with_insert(pages, alloc, txn_id, cow_id, key, val_type, value);
122                self.root = propagate_split_up(
123                    pages,
124                    alloc,
125                    txn_id,
126                    &cached_path,
127                    cow_id,
128                    &sep_key,
129                    right_id,
130                    &mut self.depth,
131                );
132                self.last_insert = None;
133                self.entry_count += 1;
134                return Ok(true);
135            }
136        }
137
138        let (path, leaf_id) = self.walk_to_leaf(pages, key)?;
139
140        let key_exists = {
141            let page = pages.get(&leaf_id).unwrap();
142            leaf_node::search(page, key).is_ok()
143        };
144
145        let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
146
147        let leaf_ok = {
148            let page = pages.get_mut(&new_leaf_id).unwrap();
149            leaf_node::insert_direct(page, key, val_type, value)
150        };
151
152        if leaf_ok {
153            let mut child = new_leaf_id;
154            let mut is_rightmost = true;
155            let mut new_path = path;
156            for i in (0..new_path.len()).rev() {
157                let (ancestor_id, child_idx) = new_path[i];
158                let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
159                let page = pages.get_mut(&new_ancestor).unwrap();
160                update_branch_child(page, child_idx, child);
161                if child_idx != page.num_cells() as usize {
162                    is_rightmost = false;
163                }
164                new_path[i] = (new_ancestor, child_idx);
165                child = new_ancestor;
166            }
167            self.root = child;
168
169            if is_rightmost {
170                self.last_insert = Some((new_path, new_leaf_id));
171            }
172
173            if !key_exists {
174                self.entry_count += 1;
175            }
176            return Ok(!key_exists);
177        }
178
179        self.last_insert = None;
180        let (sep_key, right_id) =
181            split_leaf_with_insert(pages, alloc, txn_id, new_leaf_id, key, val_type, value);
182        self.root = propagate_split_up(
183            pages,
184            alloc,
185            txn_id,
186            &path,
187            new_leaf_id,
188            &sep_key,
189            right_id,
190            &mut self.depth,
191        );
192
193        if !key_exists {
194            self.entry_count += 1;
195        }
196        Ok(!key_exists)
197    }
198
199    /// Bulk-update existing keys (sorted). Single tree walk, in-place when sizes match.
200    pub fn update_sorted(
201        &mut self,
202        pages: &mut HashMap<PageId, Page>,
203        alloc: &mut PageAllocator,
204        txn_id: TxnId,
205        pairs: &[(&[u8], &[u8])],
206    ) -> Result<u64> {
207        if pairs.is_empty() {
208            return Ok(0);
209        }
210        self.last_insert = None;
211
212        let (mut path, mut leaf_id) = self.walk_to_leaf(pages, pairs[0].0)?;
213        let mut cow_leaf = cow_page(pages, alloc, leaf_id, txn_id);
214        if cow_leaf != leaf_id {
215            self.root = propagate_cow_up(pages, alloc, txn_id, &path, cow_leaf);
216        }
217
218        let mut count: u64 = 0;
219        let mut hint: u16 = 0;
220
221        for &(key, value) in pairs {
222            // Check if key is past the current leaf's range
223            let past_leaf = {
224                let page = pages.get(&cow_leaf).unwrap();
225                let n = page.num_cells();
226                n == 0 || key > leaf_node::read_cell(page, n - 1).key
227            };
228
229            if past_leaf {
230                let (new_path, new_leaf) = self.walk_to_leaf(pages, key)?;
231                path = new_path;
232                leaf_id = new_leaf;
233                cow_leaf = cow_page(pages, alloc, leaf_id, txn_id);
234                if cow_leaf != leaf_id {
235                    self.root = propagate_cow_up(pages, alloc, txn_id, &path, cow_leaf);
236                }
237                hint = 0;
238            }
239
240            // Search from hint position (keys are sorted → position increases)
241            let page = pages.get(&cow_leaf).unwrap();
242            let n = page.num_cells();
243            let idx = {
244                let mut i = hint;
245                loop {
246                    if i >= n {
247                        break None;
248                    }
249                    let cell = leaf_node::read_cell(page, i);
250                    match key.cmp(cell.key) {
251                        std::cmp::Ordering::Equal => break Some(i),
252                        std::cmp::Ordering::Less => break None,
253                        std::cmp::Ordering::Greater => i += 1,
254                    }
255                }
256            };
257
258            if let Some(idx) = idx {
259                hint = idx + 1;
260                // Try in-place value overwrite (same size = no cell movement)
261                let page = pages.get_mut(&cow_leaf).unwrap();
262                if !leaf_node::update_value_in_place(page, idx, ValueType::Inline, value) {
263                    // Different size: fall back to delete + insert
264                    leaf_node::insert_direct(page, key, ValueType::Inline, value);
265                }
266                count += 1;
267            }
268        }
269
270        Ok(count)
271    }
272
273    /// Delete a key. Returns `true` if the key was found and deleted.
274    pub fn delete(
275        &mut self,
276        pages: &mut HashMap<PageId, Page>,
277        alloc: &mut PageAllocator,
278        txn_id: TxnId,
279        key: &[u8],
280    ) -> Result<bool> {
281        self.last_insert = None;
282        let (path, leaf_id) = self.walk_to_leaf(pages, key)?;
283
284        let found = {
285            let page = pages.get(&leaf_id).unwrap();
286            leaf_node::search(page, key).is_ok()
287        };
288        if !found {
289            return Ok(false);
290        }
291
292        let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
293        {
294            let page = pages.get_mut(&new_leaf_id).unwrap();
295            leaf_node::delete(page, key);
296        }
297
298        let leaf_empty = pages.get(&new_leaf_id).unwrap().num_cells() == 0;
299
300        if !leaf_empty || path.is_empty() {
301            self.root = propagate_cow_up(pages, alloc, txn_id, &path, new_leaf_id);
302            self.entry_count -= 1;
303            return Ok(true);
304        }
305
306        alloc.free(new_leaf_id);
307        pages.remove(&new_leaf_id);
308
309        self.root = propagate_remove_up(pages, alloc, txn_id, &path, &mut self.depth);
310        self.entry_count -= 1;
311        Ok(true)
312    }
313
314    /// Walk root to leaf for `key`. Returns (path, leaf_page_id).
315    pub fn walk_to_leaf(
316        &self,
317        pages: &HashMap<PageId, Page>,
318        key: &[u8],
319    ) -> Result<(Vec<(PageId, usize)>, PageId)> {
320        let mut path = Vec::with_capacity(self.depth as usize);
321        let mut current = self.root;
322        loop {
323            let page = pages.get(&current).ok_or(Error::PageOutOfBounds(current))?;
324            match page.page_type() {
325                Some(PageType::Leaf) => return Ok((path, current)),
326                Some(PageType::Branch) => {
327                    let child_idx = branch_node::search_child_index(page, key);
328                    let child = branch_node::get_child(page, child_idx);
329                    path.push((current, child_idx));
330                    current = child;
331                }
332                _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
333            }
334        }
335    }
336}
337
338/// CoW a page. No-op if already owned by this txn. In-place mode reuses page ID.
339pub fn cow_page(
340    pages: &mut HashMap<PageId, Page>,
341    alloc: &mut PageAllocator,
342    old_id: PageId,
343    txn_id: TxnId,
344) -> PageId {
345    if pages.get(&old_id).unwrap().txn_id() == txn_id {
346        return old_id;
347    }
348    if alloc.in_place() {
349        let page = pages.get_mut(&old_id).unwrap();
350        page.set_txn_id(txn_id);
351        return old_id;
352    }
353    let new_id = alloc.allocate();
354    let mut new_page = pages.get(&old_id).unwrap().clone();
355    new_page.set_page_id(new_id);
356    new_page.set_txn_id(txn_id);
357    pages.insert(new_id, new_page);
358    alloc.free(old_id);
359    new_id
360}
361
362/// Update a branch's child pointer at `child_idx` to point to `new_child`.
363fn update_branch_child(page: &mut Page, child_idx: usize, new_child: PageId) {
364    let n = page.num_cells() as usize;
365    if child_idx < n {
366        let offset = page.cell_offset(child_idx as u16) as usize;
367        page.data[offset..offset + 4].copy_from_slice(&new_child.as_u32().to_le_bytes());
368    } else {
369        page.set_right_child(new_child);
370    }
371}
372
373/// Propagate CoW up through ancestors (no split, just update child pointers).
374pub fn propagate_cow_up(
375    pages: &mut HashMap<PageId, Page>,
376    alloc: &mut PageAllocator,
377    txn_id: TxnId,
378    path: &[(PageId, usize)],
379    mut new_child: PageId,
380) -> PageId {
381    for &(ancestor_id, child_idx) in path.iter().rev() {
382        let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
383        let page = pages.get_mut(&new_ancestor).unwrap();
384        update_branch_child(page, child_idx, new_child);
385        new_child = new_ancestor;
386    }
387    new_child
388}
389
390/// Split full leaf and insert. Returns (separator_key, right_page_id).
391fn split_leaf_with_insert(
392    pages: &mut HashMap<PageId, Page>,
393    alloc: &mut PageAllocator,
394    txn_id: TxnId,
395    leaf_id: PageId,
396    key: &[u8],
397    val_type: ValueType,
398    value: &[u8],
399) -> (Vec<u8>, PageId) {
400    let mut cells: Vec<(Vec<u8>, Vec<u8>)> = {
401        let page = pages.get(&leaf_id).unwrap();
402        let n = page.num_cells() as usize;
403        (0..n)
404            .map(|i| {
405                let cell = leaf_node::read_cell(page, i as u16);
406                let raw = leaf_node::read_cell_bytes(page, i as u16);
407                (cell.key.to_vec(), raw)
408            })
409            .collect()
410    };
411
412    let new_raw = leaf_node::build_cell(key, val_type, value);
413    match cells.binary_search_by(|(k, _)| k.as_slice().cmp(key)) {
414        Ok(idx) => cells[idx] = (key.to_vec(), new_raw),
415        Err(idx) => cells.insert(idx, (key.to_vec(), new_raw)),
416    }
417
418    let total = cells.len();
419
420    let usable = citadel_core::constants::USABLE_SIZE;
421    let mut cum: Vec<usize> = Vec::with_capacity(total + 1);
422    cum.push(0);
423    for (_, raw) in &cells {
424        cum.push(cum.last().unwrap() + raw.len());
425    }
426    let left_fits = |sp: usize| cum[sp] + sp * 2 <= usable;
427    let right_fits = |sp: usize| (cum[total] - cum[sp]) + (total - sp) * 2 <= usable;
428
429    let mut split_point = total / 2;
430    if !left_fits(split_point) || !right_fits(split_point) {
431        split_point = 1;
432        for sp in 1..total {
433            if left_fits(sp) && right_fits(sp) {
434                split_point = sp;
435                if sp >= total / 2 {
436                    break;
437                }
438            }
439        }
440    }
441
442    let sep_key = cells[split_point].0.clone();
443
444    {
445        let left_refs: Vec<&[u8]> = cells[..split_point]
446            .iter()
447            .map(|(_, raw)| raw.as_slice())
448            .collect();
449        let page = pages.get_mut(&leaf_id).unwrap();
450        page.rebuild_cells(&left_refs);
451    }
452
453    let right_id = alloc.allocate();
454    {
455        let mut right_page = Page::new(right_id, PageType::Leaf, txn_id);
456        let right_refs: Vec<&[u8]> = cells[split_point..]
457            .iter()
458            .map(|(_, raw)| raw.as_slice())
459            .collect();
460        right_page.rebuild_cells(&right_refs);
461        pages.insert(right_id, right_page);
462    }
463
464    (sep_key, right_id)
465}
466
467#[allow(clippy::too_many_arguments)]
468fn propagate_split_up(
469    pages: &mut HashMap<PageId, Page>,
470    alloc: &mut PageAllocator,
471    txn_id: TxnId,
472    path: &[(PageId, usize)],
473    mut left_child: PageId,
474    initial_sep: &[u8],
475    mut right_child: PageId,
476    depth: &mut u16,
477) -> PageId {
478    let mut sep_key = initial_sep.to_vec();
479    let mut pending_split = true;
480
481    for &(ancestor_id, child_idx) in path.iter().rev() {
482        let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
483
484        if pending_split {
485            let ok = {
486                let page = pages.get_mut(&new_ancestor).unwrap();
487                branch_node::insert_separator(page, child_idx, left_child, &sep_key, right_child)
488            };
489
490            if ok {
491                pending_split = false;
492                left_child = new_ancestor;
493            } else {
494                let (new_sep, new_right) = split_branch_with_insert(
495                    pages,
496                    alloc,
497                    txn_id,
498                    new_ancestor,
499                    child_idx,
500                    left_child,
501                    &sep_key,
502                    right_child,
503                );
504                left_child = new_ancestor;
505                sep_key = new_sep;
506                right_child = new_right;
507            }
508        } else {
509            let page = pages.get_mut(&new_ancestor).unwrap();
510            update_branch_child(page, child_idx, left_child);
511            left_child = new_ancestor;
512        }
513    }
514
515    if pending_split {
516        let new_root_id = alloc.allocate();
517        let mut new_root = Page::new(new_root_id, PageType::Branch, txn_id);
518        let cell = branch_node::build_cell(left_child, &sep_key);
519        new_root.write_cell(&cell).unwrap();
520        new_root.set_right_child(right_child);
521        pages.insert(new_root_id, new_root);
522        *depth += 1;
523        new_root_id
524    } else {
525        left_child
526    }
527}
528
529#[allow(clippy::too_many_arguments)]
530fn split_branch_with_insert(
531    pages: &mut HashMap<PageId, Page>,
532    alloc: &mut PageAllocator,
533    txn_id: TxnId,
534    branch_id: PageId,
535    child_idx: usize,
536    new_left: PageId,
537    sep_key: &[u8],
538    new_right: PageId,
539) -> (Vec<u8>, PageId) {
540    // Collect all cells and apply the separator insertion logically
541    let (new_cells, final_right_child) = {
542        let page = pages.get(&branch_id).unwrap();
543        let n = page.num_cells() as usize;
544        let cells: Vec<(PageId, Vec<u8>)> = (0..n)
545            .map(|i| {
546                let cell = branch_node::read_cell(page, i as u16);
547                (cell.child, cell.key.to_vec())
548            })
549            .collect();
550        let old_rc = page.right_child();
551
552        let mut result = Vec::with_capacity(n + 1);
553        let final_rc;
554
555        if child_idx < n {
556            let old_key = cells[child_idx].1.clone();
557            for (i, (child, key)) in cells.into_iter().enumerate() {
558                if i == child_idx {
559                    result.push((new_left, sep_key.to_vec()));
560                    result.push((new_right, old_key.clone()));
561                } else {
562                    result.push((child, key));
563                }
564            }
565            final_rc = old_rc;
566        } else {
567            result = cells;
568            result.push((new_left, sep_key.to_vec()));
569            final_rc = new_right;
570        }
571
572        (result, final_rc)
573    };
574
575    let total = new_cells.len();
576    let usable = citadel_core::constants::USABLE_SIZE;
577    let raw_sizes: Vec<usize> = new_cells.iter().map(|(_, key)| 6 + key.len()).collect();
578    let mut cum: Vec<usize> = Vec::with_capacity(total + 1);
579    cum.push(0);
580    for &sz in &raw_sizes {
581        cum.push(cum.last().unwrap() + sz);
582    }
583    let left_fits = |sp: usize| cum[sp] + sp * 2 <= usable;
584    let right_fits = |sp: usize| {
585        let right_count = total - sp - 1;
586        (cum[total] - cum[sp + 1]) + right_count * 2 <= usable
587    };
588
589    let mut split_point = total / 2;
590    if !left_fits(split_point) || !right_fits(split_point) {
591        split_point = 1;
592        for sp in 1..total.saturating_sub(1) {
593            if left_fits(sp) && right_fits(sp) {
594                split_point = sp;
595                if sp >= total / 2 {
596                    break;
597                }
598            }
599        }
600    }
601
602    let promoted_sep = new_cells[split_point].1.clone();
603    let promoted_child = new_cells[split_point].0;
604
605    {
606        let left_raw: Vec<Vec<u8>> = new_cells[..split_point]
607            .iter()
608            .map(|(child, key)| branch_node::build_cell(*child, key))
609            .collect();
610        let left_refs: Vec<&[u8]> = left_raw.iter().map(|c| c.as_slice()).collect();
611        let page = pages.get_mut(&branch_id).unwrap();
612        page.rebuild_cells(&left_refs);
613        page.set_right_child(promoted_child);
614    }
615
616    let right_branch_id = alloc.allocate();
617    {
618        let mut right_page = Page::new(right_branch_id, PageType::Branch, txn_id);
619        let right_raw: Vec<Vec<u8>> = new_cells[split_point + 1..]
620            .iter()
621            .map(|(child, key)| branch_node::build_cell(*child, key))
622            .collect();
623        let right_refs: Vec<&[u8]> = right_raw.iter().map(|c| c.as_slice()).collect();
624        right_page.rebuild_cells(&right_refs);
625        right_page.set_right_child(final_right_child);
626        pages.insert(right_branch_id, right_page);
627    }
628
629    (promoted_sep, right_branch_id)
630}
631
632fn remove_child_from_branch(page: &mut Page, child_idx: usize) {
633    let n = page.num_cells() as usize;
634    if child_idx < n {
635        let cell_sz = branch_node::get_cell_size(page, child_idx as u16);
636        page.delete_cell_at(child_idx as u16, cell_sz);
637    } else {
638        assert!(n > 0, "cannot remove right_child from branch with 0 cells");
639        let last_child = branch_node::read_cell(page, (n - 1) as u16).child;
640        let cell_sz = branch_node::get_cell_size(page, (n - 1) as u16);
641        page.delete_cell_at((n - 1) as u16, cell_sz);
642        page.set_right_child(last_child);
643    }
644}
645
646fn propagate_remove_up(
647    pages: &mut HashMap<PageId, Page>,
648    alloc: &mut PageAllocator,
649    txn_id: TxnId,
650    path: &[(PageId, usize)],
651    depth: &mut u16,
652) -> PageId {
653    // Process the bottom-most ancestor first (parent of the deleted leaf)
654    let mut level = path.len();
655
656    // Track whether parent needs child removal vs pointer update
657    let mut need_remove_at_level = true;
658
659    // Result: the page ID that should be propagated upward
660    let mut new_child = PageId(0); // placeholder, set below
661
662    while level > 0 && need_remove_at_level {
663        level -= 1;
664        let (ancestor_id, child_idx) = path[level];
665        let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
666
667        {
668            let page = pages.get_mut(&new_ancestor).unwrap();
669            remove_child_from_branch(page, child_idx);
670        }
671
672        let num_cells = pages.get(&new_ancestor).unwrap().num_cells();
673
674        if num_cells > 0 || level == 0 {
675            if num_cells == 0 && level == 0 {
676                // Root collapsed - replace with its only child
677                let only_child = pages.get(&new_ancestor).unwrap().right_child();
678                alloc.free(new_ancestor);
679                pages.remove(&new_ancestor);
680                *depth -= 1;
681                return only_child;
682            }
683            // Branch is non-empty, or it's the root with cells
684            new_child = new_ancestor;
685            need_remove_at_level = false;
686        } else {
687            // Branch became empty (0 cells) - collapse to its right_child
688            let only_child = pages.get(&new_ancestor).unwrap().right_child();
689            alloc.free(new_ancestor);
690            pages.remove(&new_ancestor);
691            *depth -= 1;
692
693            // Replace this branch with only_child in grandparent (stop cascading)
694            new_child = only_child;
695            need_remove_at_level = false;
696        }
697    }
698
699    // Propagate CoW for remaining path levels above
700    if level > 0 {
701        let remaining_path = &path[..level];
702        new_child = propagate_cow_up(pages, alloc, txn_id, remaining_path, new_child);
703    }
704
705    new_child
706}
707
#[cfg(test)]
mod tests {
    use super::*;

    type Pages = HashMap<PageId, Page>;

    /// Fresh page map, allocator and empty tree created under txn 1.
    fn new_tree() -> (Pages, PageAllocator, BTree) {
        let mut pages = HashMap::new();
        let mut alloc = PageAllocator::new(0);
        let tree = BTree::new(&mut pages, &mut alloc, TxnId(1));
        (pages, alloc, tree)
    }

    /// Insert an inline value and return the "was new" flag.
    fn put(
        pages: &mut Pages,
        alloc: &mut PageAllocator,
        tree: &mut BTree,
        txn_id: TxnId,
        key: &[u8],
        value: &[u8],
    ) -> bool {
        tree.insert(pages, alloc, txn_id, key, ValueType::Inline, value)
            .unwrap()
    }

    /// Expected search result for an inline value.
    fn inline(bytes: &[u8]) -> Option<(ValueType, Vec<u8>)> {
        Some((ValueType::Inline, bytes.to_vec()))
    }

    #[test]
    fn empty_tree_search() {
        let (pages, _, tree) = new_tree();
        assert_eq!(tree.search(&pages, b"anything").unwrap(), None);
    }

    #[test]
    fn insert_and_search_single() {
        let (mut pages, mut alloc, mut tree) = new_tree();
        let is_new = put(&mut pages, &mut alloc, &mut tree, TxnId(1), b"hello", b"world");
        assert!(is_new);
        assert_eq!(tree.entry_count, 1);

        assert_eq!(tree.search(&pages, b"hello").unwrap(), inline(b"world"));
    }

    #[test]
    fn insert_update_existing() {
        let (mut pages, mut alloc, mut tree) = new_tree();
        put(&mut pages, &mut alloc, &mut tree, TxnId(1), b"key", b"v1");
        let is_new = put(&mut pages, &mut alloc, &mut tree, TxnId(1), b"key", b"v2");
        assert!(!is_new);
        assert_eq!(tree.entry_count, 1);

        assert_eq!(tree.search(&pages, b"key").unwrap(), inline(b"v2"));
    }

    #[test]
    fn insert_multiple_sorted() {
        let (mut pages, mut alloc, mut tree) = new_tree();
        let keys = [b"dog", b"ant", b"cat", b"fox", b"bat", b"eel"];
        for k in &keys {
            put(&mut pages, &mut alloc, &mut tree, TxnId(1), *k, *k);
        }
        assert_eq!(tree.entry_count, 6);

        // Every inserted key must be searchable.
        for k in &keys {
            let result = tree.search(&pages, *k).unwrap();
            assert_eq!(result, Some((ValueType::Inline, k.to_vec())));
        }

        // A key that was never inserted is absent.
        assert_eq!(tree.search(&pages, b"zebra").unwrap(), None);
    }

    #[test]
    fn insert_triggers_leaf_split() {
        let (mut pages, mut alloc, mut tree) = new_tree();

        // 500 entries > ~385 per leaf → triggers split
        let count = 500;
        for i in 0..count {
            let key = format!("key-{i:05}");
            let val = format!("val-{i:05}");
            put(
                &mut pages,
                &mut alloc,
                &mut tree,
                TxnId(1),
                key.as_bytes(),
                val.as_bytes(),
            );
        }

        assert_eq!(tree.entry_count, count);
        assert!(
            tree.depth >= 2,
            "tree should have split (depth={})",
            tree.depth
        );

        // All keys are still present after the split.
        for i in 0..count {
            let key = format!("key-{i:05}");
            let val = format!("val-{i:05}");
            let result = tree.search(&pages, key.as_bytes()).unwrap();
            assert_eq!(result, Some((ValueType::Inline, val.into_bytes())));
        }
    }

    #[test]
    fn delete_existing_key() {
        let (mut pages, mut alloc, mut tree) = new_tree();
        let seed: [(&[u8], &[u8]); 3] = [(b"a", b"1"), (b"b", b"2"), (b"c", b"3")];
        for (k, v) in seed {
            put(&mut pages, &mut alloc, &mut tree, TxnId(1), k, v);
        }

        let found = tree.delete(&mut pages, &mut alloc, TxnId(1), b"b").unwrap();
        assert!(found);
        assert_eq!(tree.entry_count, 2);
        assert_eq!(tree.search(&pages, b"b").unwrap(), None);
        assert_eq!(tree.search(&pages, b"a").unwrap(), inline(b"1"));
        assert_eq!(tree.search(&pages, b"c").unwrap(), inline(b"3"));
    }

    #[test]
    fn delete_nonexistent_key() {
        let (mut pages, mut alloc, mut tree) = new_tree();
        put(&mut pages, &mut alloc, &mut tree, TxnId(1), b"a", b"1");
        let found = tree.delete(&mut pages, &mut alloc, TxnId(1), b"z").unwrap();
        assert!(!found);
        assert_eq!(tree.entry_count, 1);
    }

    #[test]
    fn delete_all_from_root_leaf() {
        let (mut pages, mut alloc, mut tree) = new_tree();
        put(&mut pages, &mut alloc, &mut tree, TxnId(1), b"x", b"1");
        tree.delete(&mut pages, &mut alloc, TxnId(1), b"x").unwrap();
        assert_eq!(tree.entry_count, 0);

        // Root is still a valid (empty) leaf
        let root = pages.get(&tree.root).unwrap();
        assert_eq!(root.page_type(), Some(PageType::Leaf));
        assert_eq!(root.num_cells(), 0);
    }

    #[test]
    fn cow_produces_new_page_ids() {
        let (mut pages, mut alloc, mut tree) = new_tree();
        let root_before = tree.root;

        // A different txn than the one that created the root forces a copy.
        put(&mut pages, &mut alloc, &mut tree, TxnId(2), b"key", b"val");
        let root_after = tree.root;

        // Root should have changed (CoW)
        assert_ne!(root_before, root_after);
        // Old root should have been freed via allocator
        assert!(alloc.freed_this_txn().contains(&root_before));
    }

    #[test]
    fn insert_and_delete_many() {
        let (mut pages, mut alloc, mut tree) = new_tree();
        let count = 1000u64;

        for i in 0..count {
            let key = format!("k{i:06}");
            let val = format!("v{i:06}");
            put(
                &mut pages,
                &mut alloc,
                &mut tree,
                TxnId(1),
                key.as_bytes(),
                val.as_bytes(),
            );
        }
        assert_eq!(tree.entry_count, count);

        // Delete every other key
        for i in (0..count).step_by(2) {
            let key = format!("k{i:06}");
            let found = tree
                .delete(&mut pages, &mut alloc, TxnId(1), key.as_bytes())
                .unwrap();
            assert!(found);
        }
        assert_eq!(tree.entry_count, count / 2);

        // Even keys are gone, odd keys keep their values.
        for i in 0..count {
            let key = format!("k{i:06}");
            let result = tree.search(&pages, key.as_bytes()).unwrap();
            if i % 2 == 0 {
                assert_eq!(result, None, "deleted key {key} should not be found");
            } else {
                let val = format!("v{i:06}");
                assert_eq!(result, Some((ValueType::Inline, val.into_bytes())));
            }
        }
    }

    #[test]
    fn deep_tree_insert_delete() {
        let (mut pages, mut alloc, mut tree) = new_tree();

        // Insert enough to create depth >= 2
        let count = 2000u64;
        for i in 0..count {
            let key = format!("{i:08}");
            put(&mut pages, &mut alloc, &mut tree, TxnId(1), key.as_bytes(), b"v");
        }
        assert!(tree.depth >= 2, "depth={} expected >= 2", tree.depth);
        assert_eq!(tree.entry_count, count);

        // Delete all
        for i in 0..count {
            let key = format!("{i:08}");
            let found = tree
                .delete(&mut pages, &mut alloc, TxnId(1), key.as_bytes())
                .unwrap();
            assert!(found, "key {key} should be deletable");
        }
        assert_eq!(tree.entry_count, 0);
    }
}
1010}