Skip to main content

citadel_buffer/
btree.rs

1//! CoW B+ tree engine. Mutations clone pages; old pages go to pending-free list.
2
3use crate::allocator::PageAllocator;
4use citadel_core::types::{PageId, PageType, TxnId, ValueType};
5use citadel_core::{Error, Result};
6use citadel_page::page::Page;
7use citadel_page::{branch_node, leaf_node};
8use rustc_hash::FxHashMap;
9
/// B+ tree metadata. Lightweight struct - pages are stored externally.
///
/// Every operation receives the page map (and, for mutations, the allocator and
/// transaction id) as arguments; this struct only tracks the root and counters.
#[derive(Clone)]
pub struct BTree {
    /// Page id of the current root; reassigned whenever a mutation produces a new root.
    pub root: PageId,
    /// Tree height; a tree consisting of a single leaf root has depth 1.
    pub depth: u16,
    /// Running entry counter: incremented when an insert adds a new key and
    /// decremented when `delete` removes one.
    pub entry_count: u64,
    /// Last-insert cache: `(path, leaf)` remembered after an insert landed in the
    /// rightmost leaf, so later inserts of larger keys can skip the root-to-leaf walk.
    /// `path` holds `(ancestor page id, child index)` pairs as built by `walk_to_leaf`.
    last_insert: Option<(Vec<(PageId, usize)>, PageId)>,
}
18
/// Result of an upsert (`BTree::upsert_with` / `BTree::upsert_with_at_leaf`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum UpsertOutcome {
    /// No live value existed for the key; the default value was stored.
    Inserted,
    /// A live value existed and the callback asked for it to be replaced.
    Updated,
    /// A live value existed and the callback asked to leave it untouched.
    Skipped,
}
25
/// Decision returned by the callback passed to `BTree::upsert_with` when the key
/// already holds a live value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum UpsertAction {
    /// Overwrite the existing value with these bytes.
    Replace(Vec<u8>),
    /// Keep the existing value as it is.
    Skip,
}
31
32impl BTree {
33    /// Create a new empty B+ tree with a single leaf root.
34    pub fn new(
35        pages: &mut FxHashMap<PageId, Page>,
36        alloc: &mut PageAllocator,
37        txn_id: TxnId,
38    ) -> Self {
39        let root_id = alloc.allocate();
40        let root = Page::new(root_id, PageType::Leaf, txn_id);
41        pages.insert(root_id, root);
42        Self {
43            root: root_id,
44            depth: 1,
45            entry_count: 0,
46            last_insert: None,
47        }
48    }
49
50    /// Create a BTree from existing metadata (e.g., loaded from commit slot).
51    pub fn from_existing(root: PageId, depth: u16, entry_count: u64) -> Self {
52        Self {
53            root,
54            depth,
55            entry_count,
56            last_insert: None,
57        }
58    }
59
60    /// Search for a key. Returns `Some((val_type, value))` if found, `None` otherwise.
61    pub fn search(
62        &self,
63        pages: &FxHashMap<PageId, Page>,
64        key: &[u8],
65    ) -> Result<Option<(ValueType, Vec<u8>)>> {
66        let mut current = self.root;
67        loop {
68            let page = pages.get(&current).ok_or(Error::PageOutOfBounds(current))?;
69            match page.page_type() {
70                Some(PageType::Leaf) => {
71                    return match leaf_node::search(page, key) {
72                        Ok(idx) => {
73                            let cell = leaf_node::read_cell(page, idx);
74                            Ok(Some((cell.val_type, cell.value.to_vec())))
75                        }
76                        Err(_) => Ok(None),
77                    };
78                }
79                Some(PageType::Branch) => {
80                    let idx = branch_node::search_child_index(page, key);
81                    current = branch_node::get_child(page, idx);
82                }
83                _ => {
84                    return Err(Error::InvalidPageType(page.page_type_raw(), current));
85                }
86            }
87        }
88    }
89
90    pub fn lil_would_hit(&self, pages: &FxHashMap<PageId, Page>, key: &[u8]) -> bool {
91        if let Some((_, cached_leaf)) = &self.last_insert {
92            if let Some(page) = pages.get(cached_leaf) {
93                let n = page.num_cells();
94                return n > 0 && key > leaf_node::read_cell(page, n - 1).key;
95            }
96        }
97        false
98    }
99
100    /// Insert key-value. Returns `true` if new, `false` if updated existing.
101    pub fn insert(
102        &mut self,
103        pages: &mut FxHashMap<PageId, Page>,
104        alloc: &mut PageAllocator,
105        txn_id: TxnId,
106        key: &[u8],
107        val_type: ValueType,
108        value: &[u8],
109    ) -> Result<bool> {
110        // LIL cache: skip walk_to_leaf for sequential appends to the rightmost leaf.
111        if let Some((mut cached_path, cached_leaf)) = self.last_insert.take() {
112            let (hit, needs_cow) = {
113                let page = pages
114                    .get(&cached_leaf)
115                    .ok_or(Error::PageOutOfBounds(cached_leaf))?;
116                let n = page.num_cells();
117                let h = n > 0 && key > leaf_node::read_cell(page, n - 1).key;
118                let nc = page.txn_id() != txn_id;
119                (h, nc)
120            };
121            if hit {
122                let cow_id = if needs_cow {
123                    cow_page(pages, alloc, cached_leaf, txn_id)
124                } else {
125                    cached_leaf
126                };
127                let ok = {
128                    let page = pages.get_mut(&cow_id).unwrap();
129                    leaf_node::insert_direct(page, key, val_type, value)
130                };
131                if ok {
132                    if cow_id != cached_leaf {
133                        self.root =
134                            propagate_cow_up(pages, alloc, txn_id, &mut cached_path, cow_id);
135                    }
136                    self.entry_count += 1;
137                    self.last_insert = Some((cached_path, cow_id));
138                    return Ok(true);
139                }
140                let (sep_key, right_id) =
141                    split_leaf_with_insert(pages, alloc, txn_id, cow_id, key, val_type, value);
142                self.root = propagate_split_up(
143                    pages,
144                    alloc,
145                    txn_id,
146                    &cached_path,
147                    cow_id,
148                    &sep_key,
149                    right_id,
150                    &mut self.depth,
151                );
152                self.last_insert = None;
153                self.entry_count += 1;
154                return Ok(true);
155            }
156        }
157
158        let (path, leaf_id) = self.walk_to_leaf(pages, key)?;
159        self.insert_at_leaf(pages, alloc, txn_id, key, val_type, value, path, leaf_id)
160    }
161
162    #[allow(clippy::too_many_arguments)]
163    #[inline]
164    pub fn insert_at_leaf(
165        &mut self,
166        pages: &mut FxHashMap<PageId, Page>,
167        alloc: &mut PageAllocator,
168        txn_id: TxnId,
169        key: &[u8],
170        val_type: ValueType,
171        value: &[u8],
172        path: Vec<(PageId, usize)>,
173        leaf_id: PageId,
174    ) -> Result<bool> {
175        let key_exists = {
176            let page = pages.get(&leaf_id).unwrap();
177            leaf_node::search(page, key).is_ok()
178        };
179
180        let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
181
182        let leaf_ok = {
183            let page = pages.get_mut(&new_leaf_id).unwrap();
184            leaf_node::insert_direct(page, key, val_type, value)
185        };
186
187        if leaf_ok {
188            if alloc.in_place() && new_leaf_id == leaf_id {
189                let mut is_rightmost = true;
190                for &(ancestor_id, child_idx) in path.iter().rev() {
191                    let page = pages.get(&ancestor_id).unwrap();
192                    if child_idx != page.num_cells() as usize {
193                        is_rightmost = false;
194                        break;
195                    }
196                }
197                if is_rightmost {
198                    self.last_insert = Some((path, new_leaf_id));
199                }
200                if !key_exists {
201                    self.entry_count += 1;
202                }
203                return Ok(!key_exists);
204            }
205            let mut child = new_leaf_id;
206            let mut is_rightmost = true;
207            let mut new_path = path;
208            for i in (0..new_path.len()).rev() {
209                let (ancestor_id, child_idx) = new_path[i];
210                let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
211                let page = pages.get_mut(&new_ancestor).unwrap();
212                update_branch_child(page, child_idx, child);
213                if child_idx != page.num_cells() as usize {
214                    is_rightmost = false;
215                }
216                new_path[i] = (new_ancestor, child_idx);
217                child = new_ancestor;
218            }
219            self.root = child;
220
221            if is_rightmost {
222                self.last_insert = Some((new_path, new_leaf_id));
223            }
224
225            if !key_exists {
226                self.entry_count += 1;
227            }
228            return Ok(!key_exists);
229        }
230
231        self.last_insert = None;
232        let (sep_key, right_id) =
233            split_leaf_with_insert(pages, alloc, txn_id, new_leaf_id, key, val_type, value);
234        self.root = propagate_split_up(
235            pages,
236            alloc,
237            txn_id,
238            &path,
239            new_leaf_id,
240            &sep_key,
241            right_id,
242            &mut self.depth,
243        );
244
245        if !key_exists {
246            self.entry_count += 1;
247        }
248        Ok(!key_exists)
249    }
250
251    pub fn insert_or_fetch(
252        &mut self,
253        pages: &mut FxHashMap<PageId, Page>,
254        alloc: &mut PageAllocator,
255        txn_id: TxnId,
256        key: &[u8],
257        val_type: ValueType,
258        value: &[u8],
259    ) -> Result<Option<Vec<u8>>> {
260        if let Some((mut cached_path, cached_leaf)) = self.last_insert.take() {
261            let (hit, needs_cow) = {
262                let page = pages
263                    .get(&cached_leaf)
264                    .ok_or(Error::PageOutOfBounds(cached_leaf))?;
265                let n = page.num_cells();
266                let h = n > 0 && key > leaf_node::read_cell(page, n - 1).key;
267                let nc = page.txn_id() != txn_id;
268                (h, nc)
269            };
270            if hit {
271                let cow_id = if needs_cow {
272                    cow_page(pages, alloc, cached_leaf, txn_id)
273                } else {
274                    cached_leaf
275                };
276                let ok = {
277                    let page = pages.get_mut(&cow_id).unwrap();
278                    leaf_node::insert_direct(page, key, val_type, value)
279                };
280                if ok {
281                    if cow_id != cached_leaf {
282                        self.root =
283                            propagate_cow_up(pages, alloc, txn_id, &mut cached_path, cow_id);
284                    }
285                    self.entry_count += 1;
286                    self.last_insert = Some((cached_path, cow_id));
287                    return Ok(None);
288                }
289                let (sep_key, right_id) =
290                    split_leaf_with_insert(pages, alloc, txn_id, cow_id, key, val_type, value);
291                self.root = propagate_split_up(
292                    pages,
293                    alloc,
294                    txn_id,
295                    &cached_path,
296                    cow_id,
297                    &sep_key,
298                    right_id,
299                    &mut self.depth,
300                );
301                self.last_insert = None;
302                self.entry_count += 1;
303                return Ok(None);
304            }
305            self.last_insert = Some((cached_path, cached_leaf));
306        }
307
308        let (path, leaf_id) = self.walk_to_leaf(pages, key)?;
309
310        let existing_value = {
311            let page = pages.get(&leaf_id).unwrap();
312            match leaf_node::search(page, key) {
313                Ok(idx) => {
314                    let cell = leaf_node::read_cell(page, idx);
315                    if matches!(cell.val_type, ValueType::Tombstone) {
316                        None
317                    } else {
318                        Some(cell.value.to_vec())
319                    }
320                }
321                Err(_) => None,
322            }
323        };
324        if let Some(v) = existing_value {
325            return Ok(Some(v));
326        }
327
328        let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
329        let leaf_ok = {
330            let page = pages.get_mut(&new_leaf_id).unwrap();
331            leaf_node::insert_direct(page, key, val_type, value)
332        };
333
334        if leaf_ok {
335            if alloc.in_place() && new_leaf_id == leaf_id {
336                let mut is_rightmost = true;
337                for &(ancestor_id, child_idx) in path.iter().rev() {
338                    let page = pages.get(&ancestor_id).unwrap();
339                    if child_idx != page.num_cells() as usize {
340                        is_rightmost = false;
341                        break;
342                    }
343                }
344                if is_rightmost {
345                    self.last_insert = Some((path, new_leaf_id));
346                }
347                self.entry_count += 1;
348                return Ok(None);
349            }
350            let mut child = new_leaf_id;
351            let mut is_rightmost = true;
352            let mut new_path = path;
353            for i in (0..new_path.len()).rev() {
354                let (ancestor_id, child_idx) = new_path[i];
355                let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
356                let page = pages.get_mut(&new_ancestor).unwrap();
357                update_branch_child(page, child_idx, child);
358                if child_idx != page.num_cells() as usize {
359                    is_rightmost = false;
360                }
361                new_path[i] = (new_ancestor, child_idx);
362                child = new_ancestor;
363            }
364            self.root = child;
365
366            if is_rightmost {
367                self.last_insert = Some((new_path, new_leaf_id));
368            }
369            self.entry_count += 1;
370            return Ok(None);
371        }
372
373        self.last_insert = None;
374        let (sep_key, right_id) =
375            split_leaf_with_insert(pages, alloc, txn_id, new_leaf_id, key, val_type, value);
376        self.root = propagate_split_up(
377            pages,
378            alloc,
379            txn_id,
380            &path,
381            new_leaf_id,
382            &sep_key,
383            right_id,
384            &mut self.depth,
385        );
386        self.entry_count += 1;
387        Ok(None)
388    }
389
390    #[inline]
391    pub fn insert_if_absent(
392        &mut self,
393        pages: &mut FxHashMap<PageId, Page>,
394        alloc: &mut PageAllocator,
395        txn_id: TxnId,
396        key: &[u8],
397        val_type: ValueType,
398        value: &[u8],
399    ) -> Result<bool> {
400        if let Some((mut cached_path, cached_leaf)) = self.last_insert.take() {
401            let (hit, needs_cow) = {
402                let page = pages
403                    .get(&cached_leaf)
404                    .ok_or(Error::PageOutOfBounds(cached_leaf))?;
405                let n = page.num_cells();
406                let h = n > 0 && key > leaf_node::read_cell(page, n - 1).key;
407                let nc = page.txn_id() != txn_id;
408                (h, nc)
409            };
410            if hit {
411                let cow_id = if needs_cow {
412                    cow_page(pages, alloc, cached_leaf, txn_id)
413                } else {
414                    cached_leaf
415                };
416                let ok = {
417                    let page = pages.get_mut(&cow_id).unwrap();
418                    leaf_node::insert_direct(page, key, val_type, value)
419                };
420                if ok {
421                    if cow_id != cached_leaf {
422                        self.root =
423                            propagate_cow_up(pages, alloc, txn_id, &mut cached_path, cow_id);
424                    }
425                    self.entry_count += 1;
426                    self.last_insert = Some((cached_path, cow_id));
427                    return Ok(true);
428                }
429                let (sep_key, right_id) =
430                    split_leaf_with_insert(pages, alloc, txn_id, cow_id, key, val_type, value);
431                self.root = propagate_split_up(
432                    pages,
433                    alloc,
434                    txn_id,
435                    &cached_path,
436                    cow_id,
437                    &sep_key,
438                    right_id,
439                    &mut self.depth,
440                );
441                self.last_insert = None;
442                self.entry_count += 1;
443                return Ok(true);
444            }
445            self.last_insert = Some((cached_path, cached_leaf));
446        }
447
448        let (path, leaf_id) = self.walk_to_leaf(pages, key)?;
449        self.insert_if_absent_at_leaf(pages, alloc, txn_id, key, val_type, value, path, leaf_id)
450    }
451
452    #[allow(clippy::too_many_arguments)]
453    #[inline]
454    pub fn insert_if_absent_at_leaf(
455        &mut self,
456        pages: &mut FxHashMap<PageId, Page>,
457        alloc: &mut PageAllocator,
458        txn_id: TxnId,
459        key: &[u8],
460        val_type: ValueType,
461        value: &[u8],
462        path: Vec<(PageId, usize)>,
463        leaf_id: PageId,
464    ) -> Result<bool> {
465        let exists = {
466            let page = pages.get(&leaf_id).unwrap();
467            match leaf_node::search(page, key) {
468                Ok(idx) => {
469                    let cell = leaf_node::read_cell(page, idx);
470                    !matches!(cell.val_type, ValueType::Tombstone)
471                }
472                Err(_) => false,
473            }
474        };
475        if exists {
476            return Ok(false);
477        }
478
479        let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
480        let leaf_ok = {
481            let page = pages.get_mut(&new_leaf_id).unwrap();
482            leaf_node::insert_direct(page, key, val_type, value)
483        };
484
485        if leaf_ok {
486            if alloc.in_place() && new_leaf_id == leaf_id {
487                let mut is_rightmost = true;
488                for &(ancestor_id, child_idx) in path.iter().rev() {
489                    let page = pages.get(&ancestor_id).unwrap();
490                    if child_idx != page.num_cells() as usize {
491                        is_rightmost = false;
492                        break;
493                    }
494                }
495                if is_rightmost {
496                    self.last_insert = Some((path, new_leaf_id));
497                }
498                self.entry_count += 1;
499                return Ok(true);
500            }
501            let mut child = new_leaf_id;
502            let mut is_rightmost = true;
503            let mut new_path = path;
504            for i in (0..new_path.len()).rev() {
505                let (ancestor_id, child_idx) = new_path[i];
506                let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
507                let page = pages.get_mut(&new_ancestor).unwrap();
508                update_branch_child(page, child_idx, child);
509                if child_idx != page.num_cells() as usize {
510                    is_rightmost = false;
511                }
512                new_path[i] = (new_ancestor, child_idx);
513                child = new_ancestor;
514            }
515            self.root = child;
516
517            if is_rightmost {
518                self.last_insert = Some((new_path, new_leaf_id));
519            }
520            self.entry_count += 1;
521            return Ok(true);
522        }
523
524        self.last_insert = None;
525        let (sep_key, right_id) =
526            split_leaf_with_insert(pages, alloc, txn_id, new_leaf_id, key, val_type, value);
527        self.root = propagate_split_up(
528            pages,
529            alloc,
530            txn_id,
531            &path,
532            new_leaf_id,
533            &sep_key,
534            right_id,
535            &mut self.depth,
536        );
537        self.entry_count += 1;
538        Ok(true)
539    }
540
541    #[allow(clippy::too_many_arguments)]
542    #[inline]
543    pub fn upsert_with<F, E>(
544        &mut self,
545        pages: &mut FxHashMap<PageId, Page>,
546        alloc: &mut PageAllocator,
547        txn_id: TxnId,
548        key: &[u8],
549        val_type: ValueType,
550        default_value: &[u8],
551        f: F,
552    ) -> std::result::Result<UpsertOutcome, E>
553    where
554        F: FnMut(&[u8]) -> std::result::Result<UpsertAction, E>,
555        E: From<Error>,
556    {
557        if let Some((mut cached_path, cached_leaf)) = self.last_insert.take() {
558            let (hit, needs_cow) = {
559                let page = pages
560                    .get(&cached_leaf)
561                    .ok_or(Error::PageOutOfBounds(cached_leaf))?;
562                let n = page.num_cells();
563                let h = n > 0 && key > leaf_node::read_cell(page, n - 1).key;
564                let nc = page.txn_id() != txn_id;
565                (h, nc)
566            };
567            if hit {
568                let cow_id = if needs_cow {
569                    cow_page(pages, alloc, cached_leaf, txn_id)
570                } else {
571                    cached_leaf
572                };
573                let ok = {
574                    let page = pages.get_mut(&cow_id).unwrap();
575                    leaf_node::insert_direct(page, key, val_type, default_value)
576                };
577                if ok {
578                    if cow_id != cached_leaf {
579                        self.root =
580                            propagate_cow_up(pages, alloc, txn_id, &mut cached_path, cow_id);
581                    }
582                    self.entry_count += 1;
583                    self.last_insert = Some((cached_path, cow_id));
584                    return Ok(UpsertOutcome::Inserted);
585                }
586                let (sep_key, right_id) = split_leaf_with_insert(
587                    pages,
588                    alloc,
589                    txn_id,
590                    cow_id,
591                    key,
592                    val_type,
593                    default_value,
594                );
595                self.root = propagate_split_up(
596                    pages,
597                    alloc,
598                    txn_id,
599                    &cached_path,
600                    cow_id,
601                    &sep_key,
602                    right_id,
603                    &mut self.depth,
604                );
605                self.last_insert = None;
606                self.entry_count += 1;
607                return Ok(UpsertOutcome::Inserted);
608            }
609            self.last_insert = Some((cached_path, cached_leaf));
610        }
611
612        let (path, leaf_id) = self.walk_to_leaf(pages, key)?;
613        self.upsert_with_at_leaf(
614            pages,
615            alloc,
616            txn_id,
617            key,
618            val_type,
619            default_value,
620            path,
621            leaf_id,
622            f,
623        )
624    }
625
626    #[allow(clippy::too_many_arguments)]
627    #[inline]
628    pub fn upsert_with_at_leaf<F, E>(
629        &mut self,
630        pages: &mut FxHashMap<PageId, Page>,
631        alloc: &mut PageAllocator,
632        txn_id: TxnId,
633        key: &[u8],
634        val_type: ValueType,
635        default_value: &[u8],
636        path: Vec<(PageId, usize)>,
637        leaf_id: PageId,
638        mut f: F,
639    ) -> std::result::Result<UpsertOutcome, E>
640    where
641        F: FnMut(&[u8]) -> std::result::Result<UpsertAction, E>,
642        E: From<Error>,
643    {
644        let action = {
645            let page = pages.get(&leaf_id).unwrap();
646            match leaf_node::search(page, key) {
647                Ok(idx) => {
648                    let cell = leaf_node::read_cell(page, idx);
649                    if matches!(cell.val_type, ValueType::Tombstone) {
650                        None
651                    } else {
652                        Some(f(cell.value)?)
653                    }
654                }
655                Err(_) => None,
656            }
657        };
658
659        if let Some(act) = action {
660            match act {
661                UpsertAction::Skip => return Ok(UpsertOutcome::Skipped),
662                UpsertAction::Replace(new_bytes) => {
663                    let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
664                    let leaf_ok = {
665                        let page = pages.get_mut(&new_leaf_id).unwrap();
666                        leaf_node::insert_direct(page, key, val_type, &new_bytes)
667                    };
668                    if leaf_ok {
669                        if new_leaf_id != leaf_id {
670                            let mut new_path = path;
671                            self.root =
672                                propagate_cow_up(pages, alloc, txn_id, &mut new_path, new_leaf_id);
673                        }
674                        return Ok(UpsertOutcome::Updated);
675                    }
676                    self.last_insert = None;
677                    let (sep_key, right_id) = split_leaf_with_insert(
678                        pages,
679                        alloc,
680                        txn_id,
681                        new_leaf_id,
682                        key,
683                        val_type,
684                        &new_bytes,
685                    );
686                    self.root = propagate_split_up(
687                        pages,
688                        alloc,
689                        txn_id,
690                        &path,
691                        new_leaf_id,
692                        &sep_key,
693                        right_id,
694                        &mut self.depth,
695                    );
696                    return Ok(UpsertOutcome::Updated);
697                }
698            }
699        }
700
701        let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
702        let leaf_ok = {
703            let page = pages.get_mut(&new_leaf_id).unwrap();
704            leaf_node::insert_direct(page, key, val_type, default_value)
705        };
706
707        if leaf_ok {
708            if alloc.in_place() && new_leaf_id == leaf_id {
709                let mut is_rightmost = true;
710                for &(ancestor_id, child_idx) in path.iter().rev() {
711                    let page = pages.get(&ancestor_id).unwrap();
712                    if child_idx != page.num_cells() as usize {
713                        is_rightmost = false;
714                        break;
715                    }
716                }
717                if is_rightmost {
718                    self.last_insert = Some((path, new_leaf_id));
719                }
720                self.entry_count += 1;
721                return Ok(UpsertOutcome::Inserted);
722            }
723            let mut child = new_leaf_id;
724            let mut is_rightmost = true;
725            let mut new_path = path;
726            for i in (0..new_path.len()).rev() {
727                let (ancestor_id, child_idx) = new_path[i];
728                let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
729                let page = pages.get_mut(&new_ancestor).unwrap();
730                update_branch_child(page, child_idx, child);
731                if child_idx != page.num_cells() as usize {
732                    is_rightmost = false;
733                }
734                new_path[i] = (new_ancestor, child_idx);
735                child = new_ancestor;
736            }
737            self.root = child;
738
739            if is_rightmost {
740                self.last_insert = Some((new_path, new_leaf_id));
741            }
742            self.entry_count += 1;
743            return Ok(UpsertOutcome::Inserted);
744        }
745
746        self.last_insert = None;
747        let (sep_key, right_id) = split_leaf_with_insert(
748            pages,
749            alloc,
750            txn_id,
751            new_leaf_id,
752            key,
753            val_type,
754            default_value,
755        );
756        self.root = propagate_split_up(
757            pages,
758            alloc,
759            txn_id,
760            &path,
761            new_leaf_id,
762            &sep_key,
763            right_id,
764            &mut self.depth,
765        );
766        self.entry_count += 1;
767        Ok(UpsertOutcome::Inserted)
768    }
769
770    /// Bulk-update existing keys. Keys must be sorted.
771    pub fn update_sorted(
772        &mut self,
773        pages: &mut FxHashMap<PageId, Page>,
774        alloc: &mut PageAllocator,
775        txn_id: TxnId,
776        pairs: &[(&[u8], &[u8])],
777    ) -> Result<u64> {
778        if pairs.is_empty() {
779            return Ok(0);
780        }
781        self.last_insert = None;
782
783        let (mut path, mut leaf_id) = self.walk_to_leaf(pages, pairs[0].0)?;
784        let mut cow_leaf = cow_page(pages, alloc, leaf_id, txn_id);
785        if cow_leaf != leaf_id {
786            self.root = propagate_cow_up(pages, alloc, txn_id, &mut path, cow_leaf);
787        }
788
789        let mut count: u64 = 0;
790        let mut hint: u16 = 0;
791
792        for &(key, value) in pairs {
793            let past_leaf = {
794                let page = pages.get(&cow_leaf).unwrap();
795                let n = page.num_cells();
796                n == 0 || key > leaf_node::read_cell(page, n - 1).key
797            };
798
799            if past_leaf {
800                let (new_path, new_leaf) = self.walk_to_leaf(pages, key)?;
801                path = new_path;
802                leaf_id = new_leaf;
803                cow_leaf = cow_page(pages, alloc, leaf_id, txn_id);
804                if cow_leaf != leaf_id {
805                    self.root = propagate_cow_up(pages, alloc, txn_id, &mut path, cow_leaf);
806                }
807                hint = 0;
808            }
809
810            let page = pages.get(&cow_leaf).unwrap();
811            let n = page.num_cells();
812            let idx = {
813                let mut i = hint;
814                loop {
815                    if i >= n {
816                        break None;
817                    }
818                    let cell = leaf_node::read_cell(page, i);
819                    match key.cmp(cell.key) {
820                        std::cmp::Ordering::Equal => break Some(i),
821                        std::cmp::Ordering::Less => break None,
822                        std::cmp::Ordering::Greater => i += 1,
823                    }
824                }
825            };
826
827            if let Some(idx) = idx {
828                hint = idx + 1;
829                let page = pages.get_mut(&cow_leaf).unwrap();
830                if !leaf_node::update_value_in_place(page, idx, ValueType::Inline, value) {
831                    leaf_node::insert_direct(page, key, ValueType::Inline, value);
832                }
833                count += 1;
834            }
835        }
836
837        Ok(count)
838    }
839
840    /// Delete a key. Returns `true` if the key was found and deleted.
841    pub fn delete(
842        &mut self,
843        pages: &mut FxHashMap<PageId, Page>,
844        alloc: &mut PageAllocator,
845        txn_id: TxnId,
846        key: &[u8],
847    ) -> Result<bool> {
848        self.last_insert = None;
849        let (mut path, leaf_id) = self.walk_to_leaf(pages, key)?;
850
851        let found = {
852            let page = pages.get(&leaf_id).unwrap();
853            leaf_node::search(page, key).is_ok()
854        };
855        if !found {
856            return Ok(false);
857        }
858
859        let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
860        {
861            let page = pages.get_mut(&new_leaf_id).unwrap();
862            leaf_node::delete(page, key);
863        }
864
865        let leaf_empty = pages.get(&new_leaf_id).unwrap().num_cells() == 0;
866
867        if !leaf_empty || path.is_empty() {
868            if alloc.in_place() && new_leaf_id == leaf_id {
869                self.entry_count -= 1;
870                return Ok(true);
871            }
872            self.root = propagate_cow_up(pages, alloc, txn_id, &mut path, new_leaf_id);
873            self.entry_count -= 1;
874            return Ok(true);
875        }
876
877        alloc.free(new_leaf_id);
878        pages.remove(&new_leaf_id);
879
880        self.root = propagate_remove_up(pages, alloc, txn_id, &mut path, &mut self.depth);
881        self.entry_count -= 1;
882        Ok(true)
883    }
884
885    /// Walk root to leaf for `key`. Returns (path, leaf_page_id).
886    pub fn walk_to_leaf(
887        &self,
888        pages: &FxHashMap<PageId, Page>,
889        key: &[u8],
890    ) -> Result<(Vec<(PageId, usize)>, PageId)> {
891        let mut path = Vec::with_capacity(self.depth as usize);
892        let mut current = self.root;
893        loop {
894            let page = pages.get(&current).ok_or(Error::PageOutOfBounds(current))?;
895            match page.page_type() {
896                Some(PageType::Leaf) => return Ok((path, current)),
897                Some(PageType::Branch) => {
898                    let child_idx = branch_node::search_child_index(page, key);
899                    let child = branch_node::get_child(page, child_idx);
900                    path.push((current, child_idx));
901                    current = child;
902                }
903                _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
904            }
905        }
906    }
907}
908
909/// CoW a page. No-op if already owned by this txn. In-place mode reuses page ID.
910pub fn cow_page(
911    pages: &mut FxHashMap<PageId, Page>,
912    alloc: &mut PageAllocator,
913    old_id: PageId,
914    txn_id: TxnId,
915) -> PageId {
916    if alloc.in_place() {
917        let page = pages.get_mut(&old_id).unwrap();
918        if page.txn_id() != txn_id {
919            page.set_txn_id(txn_id);
920        }
921        return old_id;
922    }
923    let mut new_page = {
924        let page = pages.get(&old_id).unwrap();
925        if page.txn_id() == txn_id {
926            return old_id;
927        }
928        page.clone()
929    };
930    let new_id = alloc.allocate();
931    new_page.set_page_id(new_id);
932    new_page.set_txn_id(txn_id);
933    pages.insert(new_id, new_page);
934    alloc.free(old_id);
935    new_id
936}
937
938/// Update a branch's child pointer at `child_idx` to point to `new_child`.
939fn update_branch_child(page: &mut Page, child_idx: usize, new_child: PageId) {
940    let n = page.num_cells() as usize;
941    if child_idx < n {
942        let offset = page.cell_offset(child_idx as u16) as usize;
943        page.data[offset..offset + 4].copy_from_slice(&new_child.as_u32().to_le_bytes());
944    } else {
945        page.set_right_child(new_child);
946    }
947}
948
949/// Propagate CoW up through ancestors. Updates `path` in place so callers
950/// caching the path (e.g. LIL) reuse current PageIds after CoW — critical
951/// across SAVEPOINT boundaries where txn_id changes invalidate the cache.
952pub fn propagate_cow_up(
953    pages: &mut FxHashMap<PageId, Page>,
954    alloc: &mut PageAllocator,
955    txn_id: TxnId,
956    path: &mut [(PageId, usize)],
957    mut new_child: PageId,
958) -> PageId {
959    for i in (0..path.len()).rev() {
960        let (ancestor_id, child_idx) = path[i];
961        let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
962        let page = pages.get_mut(&new_ancestor).unwrap();
963        update_branch_child(page, child_idx, new_child);
964        path[i] = (new_ancestor, child_idx);
965        new_child = new_ancestor;
966    }
967    new_child
968}
969
970/// Split full leaf and insert. Returns (separator_key, right_page_id).
971fn split_leaf_with_insert(
972    pages: &mut FxHashMap<PageId, Page>,
973    alloc: &mut PageAllocator,
974    txn_id: TxnId,
975    leaf_id: PageId,
976    key: &[u8],
977    val_type: ValueType,
978    value: &[u8],
979) -> (Vec<u8>, PageId) {
980    let mut cells: Vec<(Vec<u8>, Vec<u8>)> = {
981        let page = pages.get(&leaf_id).unwrap();
982        let n = page.num_cells() as usize;
983        (0..n)
984            .map(|i| {
985                let cell = leaf_node::read_cell(page, i as u16);
986                let raw = leaf_node::read_cell_bytes(page, i as u16);
987                (cell.key.to_vec(), raw)
988            })
989            .collect()
990    };
991
992    let new_raw = leaf_node::build_cell(key, val_type, value);
993    match cells.binary_search_by(|(k, _)| k.as_slice().cmp(key)) {
994        Ok(idx) => cells[idx] = (key.to_vec(), new_raw),
995        Err(idx) => cells.insert(idx, (key.to_vec(), new_raw)),
996    }
997
998    let total = cells.len();
999
1000    let usable = citadel_core::constants::USABLE_SIZE;
1001    let mut cum: Vec<usize> = Vec::with_capacity(total + 1);
1002    cum.push(0);
1003    for (_, raw) in &cells {
1004        cum.push(cum.last().unwrap() + raw.len());
1005    }
1006    let left_fits = |sp: usize| cum[sp] + sp * 2 <= usable;
1007    let right_fits = |sp: usize| (cum[total] - cum[sp]) + (total - sp) * 2 <= usable;
1008
1009    let mut split_point = total / 2;
1010    if !left_fits(split_point) || !right_fits(split_point) {
1011        split_point = 1;
1012        for sp in 1..total {
1013            if left_fits(sp) && right_fits(sp) {
1014                split_point = sp;
1015                if sp >= total / 2 {
1016                    break;
1017                }
1018            }
1019        }
1020    }
1021
1022    let sep_key = cells[split_point].0.clone();
1023
1024    {
1025        let left_refs: Vec<&[u8]> = cells[..split_point]
1026            .iter()
1027            .map(|(_, raw)| raw.as_slice())
1028            .collect();
1029        let page = pages.get_mut(&leaf_id).unwrap();
1030        page.rebuild_cells(&left_refs);
1031    }
1032
1033    let right_id = alloc.allocate();
1034    {
1035        let mut right_page = Page::new(right_id, PageType::Leaf, txn_id);
1036        let right_refs: Vec<&[u8]> = cells[split_point..]
1037            .iter()
1038            .map(|(_, raw)| raw.as_slice())
1039            .collect();
1040        right_page.rebuild_cells(&right_refs);
1041        pages.insert(right_id, right_page);
1042    }
1043
1044    (sep_key, right_id)
1045}
1046
1047#[allow(clippy::too_many_arguments)]
1048fn propagate_split_up(
1049    pages: &mut FxHashMap<PageId, Page>,
1050    alloc: &mut PageAllocator,
1051    txn_id: TxnId,
1052    path: &[(PageId, usize)],
1053    mut left_child: PageId,
1054    initial_sep: &[u8],
1055    mut right_child: PageId,
1056    depth: &mut u16,
1057) -> PageId {
1058    let mut sep_key = initial_sep.to_vec();
1059    let mut pending_split = true;
1060
1061    for &(ancestor_id, child_idx) in path.iter().rev() {
1062        let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
1063
1064        if pending_split {
1065            let ok = {
1066                let page = pages.get_mut(&new_ancestor).unwrap();
1067                branch_node::insert_separator(page, child_idx, left_child, &sep_key, right_child)
1068            };
1069
1070            if ok {
1071                pending_split = false;
1072                left_child = new_ancestor;
1073            } else {
1074                let (new_sep, new_right) = split_branch_with_insert(
1075                    pages,
1076                    alloc,
1077                    txn_id,
1078                    new_ancestor,
1079                    child_idx,
1080                    left_child,
1081                    &sep_key,
1082                    right_child,
1083                );
1084                left_child = new_ancestor;
1085                sep_key = new_sep;
1086                right_child = new_right;
1087            }
1088        } else {
1089            let page = pages.get_mut(&new_ancestor).unwrap();
1090            update_branch_child(page, child_idx, left_child);
1091            left_child = new_ancestor;
1092        }
1093    }
1094
1095    if pending_split {
1096        let new_root_id = alloc.allocate();
1097        let mut new_root = Page::new(new_root_id, PageType::Branch, txn_id);
1098        let cell = branch_node::build_cell(left_child, &sep_key);
1099        new_root.write_cell(&cell).unwrap();
1100        new_root.set_right_child(right_child);
1101        pages.insert(new_root_id, new_root);
1102        *depth += 1;
1103        new_root_id
1104    } else {
1105        left_child
1106    }
1107}
1108
1109#[allow(clippy::too_many_arguments)]
1110fn split_branch_with_insert(
1111    pages: &mut FxHashMap<PageId, Page>,
1112    alloc: &mut PageAllocator,
1113    txn_id: TxnId,
1114    branch_id: PageId,
1115    child_idx: usize,
1116    new_left: PageId,
1117    sep_key: &[u8],
1118    new_right: PageId,
1119) -> (Vec<u8>, PageId) {
1120    let (new_cells, final_right_child) = {
1121        let page = pages.get(&branch_id).unwrap();
1122        let n = page.num_cells() as usize;
1123        let cells: Vec<(PageId, Vec<u8>)> = (0..n)
1124            .map(|i| {
1125                let cell = branch_node::read_cell(page, i as u16);
1126                (cell.child, cell.key.to_vec())
1127            })
1128            .collect();
1129        let old_rc = page.right_child();
1130
1131        let mut result = Vec::with_capacity(n + 1);
1132        let final_rc;
1133
1134        if child_idx < n {
1135            let old_key = cells[child_idx].1.clone();
1136            for (i, (child, key)) in cells.into_iter().enumerate() {
1137                if i == child_idx {
1138                    result.push((new_left, sep_key.to_vec()));
1139                    result.push((new_right, old_key.clone()));
1140                } else {
1141                    result.push((child, key));
1142                }
1143            }
1144            final_rc = old_rc;
1145        } else {
1146            result = cells;
1147            result.push((new_left, sep_key.to_vec()));
1148            final_rc = new_right;
1149        }
1150
1151        (result, final_rc)
1152    };
1153
1154    let total = new_cells.len();
1155    let usable = citadel_core::constants::USABLE_SIZE;
1156    let raw_sizes: Vec<usize> = new_cells.iter().map(|(_, key)| 6 + key.len()).collect();
1157    let mut cum: Vec<usize> = Vec::with_capacity(total + 1);
1158    cum.push(0);
1159    for &sz in &raw_sizes {
1160        cum.push(cum.last().unwrap() + sz);
1161    }
1162    let left_fits = |sp: usize| cum[sp] + sp * 2 <= usable;
1163    let right_fits = |sp: usize| {
1164        let right_count = total - sp - 1;
1165        (cum[total] - cum[sp + 1]) + right_count * 2 <= usable
1166    };
1167
1168    let mut split_point = total / 2;
1169    if !left_fits(split_point) || !right_fits(split_point) {
1170        split_point = 1;
1171        for sp in 1..total.saturating_sub(1) {
1172            if left_fits(sp) && right_fits(sp) {
1173                split_point = sp;
1174                if sp >= total / 2 {
1175                    break;
1176                }
1177            }
1178        }
1179    }
1180
1181    let promoted_sep = new_cells[split_point].1.clone();
1182    let promoted_child = new_cells[split_point].0;
1183
1184    {
1185        let left_raw: Vec<Vec<u8>> = new_cells[..split_point]
1186            .iter()
1187            .map(|(child, key)| branch_node::build_cell(*child, key))
1188            .collect();
1189        let left_refs: Vec<&[u8]> = left_raw.iter().map(|c| c.as_slice()).collect();
1190        let page = pages.get_mut(&branch_id).unwrap();
1191        page.rebuild_cells(&left_refs);
1192        page.set_right_child(promoted_child);
1193    }
1194
1195    let right_branch_id = alloc.allocate();
1196    {
1197        let mut right_page = Page::new(right_branch_id, PageType::Branch, txn_id);
1198        let right_raw: Vec<Vec<u8>> = new_cells[split_point + 1..]
1199            .iter()
1200            .map(|(child, key)| branch_node::build_cell(*child, key))
1201            .collect();
1202        let right_refs: Vec<&[u8]> = right_raw.iter().map(|c| c.as_slice()).collect();
1203        right_page.rebuild_cells(&right_refs);
1204        right_page.set_right_child(final_right_child);
1205        pages.insert(right_branch_id, right_page);
1206    }
1207
1208    (promoted_sep, right_branch_id)
1209}
1210
1211fn remove_child_from_branch(page: &mut Page, child_idx: usize) {
1212    let n = page.num_cells() as usize;
1213    if child_idx < n {
1214        let cell_sz = branch_node::get_cell_size(page, child_idx as u16);
1215        page.delete_cell_at(child_idx as u16, cell_sz);
1216    } else {
1217        assert!(n > 0, "cannot remove right_child from branch with 0 cells");
1218        let last_child = branch_node::read_cell(page, (n - 1) as u16).child;
1219        let cell_sz = branch_node::get_cell_size(page, (n - 1) as u16);
1220        page.delete_cell_at((n - 1) as u16, cell_sz);
1221        page.set_right_child(last_child);
1222    }
1223}
1224
1225fn propagate_remove_up(
1226    pages: &mut FxHashMap<PageId, Page>,
1227    alloc: &mut PageAllocator,
1228    txn_id: TxnId,
1229    path: &mut [(PageId, usize)],
1230    depth: &mut u16,
1231) -> PageId {
1232    let mut level = path.len();
1233    let mut need_remove_at_level = true;
1234    let mut new_child = PageId(0);
1235
1236    while level > 0 && need_remove_at_level {
1237        level -= 1;
1238        let (ancestor_id, child_idx) = path[level];
1239        let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
1240
1241        {
1242            let page = pages.get_mut(&new_ancestor).unwrap();
1243            remove_child_from_branch(page, child_idx);
1244        }
1245
1246        let num_cells = pages.get(&new_ancestor).unwrap().num_cells();
1247
1248        if num_cells > 0 || level == 0 {
1249            if num_cells == 0 && level == 0 {
1250                // Root collapsed - replace with its only child
1251                let only_child = pages.get(&new_ancestor).unwrap().right_child();
1252                alloc.free(new_ancestor);
1253                pages.remove(&new_ancestor);
1254                *depth -= 1;
1255                return only_child;
1256            }
1257            // Branch is non-empty, or it's the root with cells
1258            new_child = new_ancestor;
1259            need_remove_at_level = false;
1260        } else {
1261            // Branch became empty (0 cells) - collapse to its right_child
1262            let only_child = pages.get(&new_ancestor).unwrap().right_child();
1263            alloc.free(new_ancestor);
1264            pages.remove(&new_ancestor);
1265            *depth -= 1;
1266
1267            new_child = only_child;
1268            need_remove_at_level = false;
1269        }
1270    }
1271
1272    if level > 0 {
1273        let remaining_path = &mut path[..level];
1274        new_child = propagate_cow_up(pages, alloc, txn_id, remaining_path, new_child);
1275    }
1276
1277    new_child
1278}
1279
#[cfg(test)]
mod tests {
    use super::*;

    type Pages = FxHashMap<PageId, Page>;

    /// Fresh single-leaf tree created under transaction 1.
    fn new_tree() -> (Pages, PageAllocator, BTree) {
        let mut pages = FxHashMap::default();
        let mut alloc = PageAllocator::new(0);
        let tree = BTree::new(&mut pages, &mut alloc, TxnId(1));
        (pages, alloc, tree)
    }

    /// Insert an inline value under `txn`; returns whether the key was new.
    fn put(
        tree: &mut BTree,
        pages: &mut Pages,
        alloc: &mut PageAllocator,
        txn: TxnId,
        key: &[u8],
        value: &[u8],
    ) -> bool {
        tree.insert(pages, alloc, txn, key, ValueType::Inline, value)
            .unwrap()
    }

    /// Delete `key` under transaction 1; returns whether it was found.
    fn del(tree: &mut BTree, pages: &mut Pages, alloc: &mut PageAllocator, key: &[u8]) -> bool {
        tree.delete(pages, alloc, TxnId(1), key).unwrap()
    }

    /// Expected search result for an inline value.
    fn inline(bytes: &[u8]) -> Option<(ValueType, Vec<u8>)> {
        Some((ValueType::Inline, bytes.to_vec()))
    }

    #[test]
    fn empty_tree_search() {
        let (pages, _, tree) = new_tree();
        assert_eq!(tree.search(&pages, b"anything").unwrap(), None);
    }

    #[test]
    fn insert_and_search_single() {
        let (mut pages, mut alloc, mut tree) = new_tree();
        let is_new = put(&mut tree, &mut pages, &mut alloc, TxnId(1), b"hello", b"world");
        assert!(is_new);
        assert_eq!(tree.entry_count, 1);

        assert_eq!(tree.search(&pages, b"hello").unwrap(), inline(b"world"));
    }

    #[test]
    fn insert_update_existing() {
        let (mut pages, mut alloc, mut tree) = new_tree();
        put(&mut tree, &mut pages, &mut alloc, TxnId(1), b"key", b"v1");
        let is_new = put(&mut tree, &mut pages, &mut alloc, TxnId(1), b"key", b"v2");
        assert!(!is_new);
        assert_eq!(tree.entry_count, 1);

        assert_eq!(tree.search(&pages, b"key").unwrap(), inline(b"v2"));
    }

    #[test]
    fn insert_multiple_sorted() {
        let (mut pages, mut alloc, mut tree) = new_tree();
        let keys: [&[u8]; 6] = [b"dog", b"ant", b"cat", b"fox", b"bat", b"eel"];
        for &k in keys.iter() {
            put(&mut tree, &mut pages, &mut alloc, TxnId(1), k, k);
        }
        assert_eq!(tree.entry_count, 6);

        for &k in keys.iter() {
            assert_eq!(tree.search(&pages, k).unwrap(), inline(k));
        }

        assert_eq!(tree.search(&pages, b"zebra").unwrap(), None);
    }

    #[test]
    fn insert_triggers_leaf_split() {
        let (mut pages, mut alloc, mut tree) = new_tree();

        // 500 entries exceeds per-leaf capacity and forces a split.
        let count = 500u64;
        for i in 0..count {
            let key = format!("key-{i:05}");
            let val = format!("val-{i:05}");
            put(
                &mut tree,
                &mut pages,
                &mut alloc,
                TxnId(1),
                key.as_bytes(),
                val.as_bytes(),
            );
        }

        assert_eq!(tree.entry_count, count);
        assert!(
            tree.depth >= 2,
            "tree should have split (depth={})",
            tree.depth
        );

        for i in 0..count {
            let key = format!("key-{i:05}");
            let val = format!("val-{i:05}");
            let result = tree.search(&pages, key.as_bytes()).unwrap();
            assert_eq!(result, Some((ValueType::Inline, val.into_bytes())));
        }
    }

    #[test]
    fn delete_existing_key() {
        let (mut pages, mut alloc, mut tree) = new_tree();
        let seed: [(&[u8], &[u8]); 3] = [(b"a", b"1"), (b"b", b"2"), (b"c", b"3")];
        for &(k, v) in seed.iter() {
            put(&mut tree, &mut pages, &mut alloc, TxnId(1), k, v);
        }

        let found = del(&mut tree, &mut pages, &mut alloc, b"b");
        assert!(found);
        assert_eq!(tree.entry_count, 2);
        assert_eq!(tree.search(&pages, b"b").unwrap(), None);
        assert_eq!(tree.search(&pages, b"a").unwrap(), inline(b"1"));
        assert_eq!(tree.search(&pages, b"c").unwrap(), inline(b"3"));
    }

    #[test]
    fn delete_nonexistent_key() {
        let (mut pages, mut alloc, mut tree) = new_tree();
        put(&mut tree, &mut pages, &mut alloc, TxnId(1), b"a", b"1");
        let found = del(&mut tree, &mut pages, &mut alloc, b"z");
        assert!(!found);
        assert_eq!(tree.entry_count, 1);
    }

    #[test]
    fn delete_all_from_root_leaf() {
        let (mut pages, mut alloc, mut tree) = new_tree();
        put(&mut tree, &mut pages, &mut alloc, TxnId(1), b"x", b"1");
        del(&mut tree, &mut pages, &mut alloc, b"x");
        assert_eq!(tree.entry_count, 0);

        // The root must survive as an empty leaf.
        let root = pages.get(&tree.root).unwrap();
        assert_eq!(root.page_type(), Some(PageType::Leaf));
        assert_eq!(root.num_cells(), 0);
    }

    #[test]
    fn cow_produces_new_page_ids() {
        let (mut pages, mut alloc, mut tree) = new_tree();
        let root_before = tree.root;

        // A different transaction id forces the root to be copied.
        put(&mut tree, &mut pages, &mut alloc, TxnId(2), b"key", b"val");
        let root_after = tree.root;

        assert_ne!(root_before, root_after);
        assert!(alloc.freed_this_txn().contains(&root_before));
    }

    #[test]
    fn insert_and_delete_many() {
        let (mut pages, mut alloc, mut tree) = new_tree();
        let count = 1000u64;

        for i in 0..count {
            let key = format!("k{i:06}");
            let val = format!("v{i:06}");
            put(
                &mut tree,
                &mut pages,
                &mut alloc,
                TxnId(1),
                key.as_bytes(),
                val.as_bytes(),
            );
        }
        assert_eq!(tree.entry_count, count);

        // Remove every even-numbered key.
        for i in (0..count).step_by(2) {
            let key = format!("k{i:06}");
            let found = del(&mut tree, &mut pages, &mut alloc, key.as_bytes());
            assert!(found);
        }
        assert_eq!(tree.entry_count, count / 2);

        for i in 0..count {
            let key = format!("k{i:06}");
            let result = tree.search(&pages, key.as_bytes()).unwrap();
            if i % 2 == 0 {
                assert_eq!(result, None, "deleted key {key} should not be found");
            } else {
                let val = format!("v{i:06}");
                assert_eq!(result, Some((ValueType::Inline, val.into_bytes())));
            }
        }
    }

    #[test]
    fn deep_tree_insert_delete() {
        let (mut pages, mut alloc, mut tree) = new_tree();

        let count = 2000u64;
        for i in 0..count {
            let key = format!("{i:08}");
            put(&mut tree, &mut pages, &mut alloc, TxnId(1), key.as_bytes(), b"v");
        }
        assert!(tree.depth >= 2, "depth={} expected >= 2", tree.depth);
        assert_eq!(tree.entry_count, count);

        for i in 0..count {
            let key = format!("{i:08}");
            let found = del(&mut tree, &mut pages, &mut alloc, key.as_bytes());
            assert!(found, "key {key} should be deletable");
        }
        assert_eq!(tree.entry_count, 0);
    }
}