Skip to main content

citadel_buffer/
btree.rs

1//! CoW B+ tree engine. Mutations clone pages; old pages go to pending-free list.
2
3use crate::allocator::PageAllocator;
4use citadel_core::types::{PageId, PageType, TxnId, ValueType};
5use citadel_core::{Error, Result};
6use citadel_page::page::Page;
7use citadel_page::{branch_node, leaf_node};
8use rustc_hash::FxHashMap;
9
10/// B+ tree metadata. Pages stored externally.
11#[derive(Clone)]
12pub struct BTree {
13    pub root: PageId,
14    pub depth: u16,
15    pub entry_count: u64,
16    last_insert: Option<(Vec<(PageId, usize)>, PageId)>,
17}
18
19#[derive(Debug, Clone)]
20pub enum UpsertOutcome {
21    Inserted,
22    Updated,
23    Skipped,
24}
25
26#[derive(Debug, Clone)]
27pub enum UpsertAction {
28    Replace(Vec<u8>),
29    Skip,
30}
31
32impl BTree {
33    /// Create a new empty B+ tree with a single leaf root.
34    pub fn new(
35        pages: &mut FxHashMap<PageId, Page>,
36        alloc: &mut PageAllocator,
37        txn_id: TxnId,
38    ) -> Self {
39        let root_id = alloc.allocate();
40        let root = Page::new(root_id, PageType::Leaf, txn_id);
41        pages.insert(root_id, root);
42        Self {
43            root: root_id,
44            depth: 1,
45            entry_count: 0,
46            last_insert: None,
47        }
48    }
49
50    /// Create a BTree from existing metadata (e.g., loaded from commit slot).
51    pub fn from_existing(root: PageId, depth: u16, entry_count: u64) -> Self {
52        Self {
53            root,
54            depth,
55            entry_count,
56            last_insert: None,
57        }
58    }
59
60    pub fn search_at_leaf(
61        pages: &FxHashMap<PageId, Page>,
62        leaf_id: PageId,
63        key: &[u8],
64    ) -> Result<Option<(ValueType, Vec<u8>)>> {
65        let page = pages.get(&leaf_id).ok_or(Error::PageOutOfBounds(leaf_id))?;
66        match leaf_node::search(page, key) {
67            Ok(idx) => {
68                let cell = leaf_node::read_cell(page, idx);
69                Ok(Some((cell.val_type, cell.value.to_vec())))
70            }
71            Err(_) => Ok(None),
72        }
73    }
74
75    pub fn search(
76        &self,
77        pages: &FxHashMap<PageId, Page>,
78        key: &[u8],
79    ) -> Result<Option<(ValueType, Vec<u8>)>> {
80        let mut current = self.root;
81        loop {
82            let page = pages.get(&current).ok_or(Error::PageOutOfBounds(current))?;
83            match page.page_type() {
84                Some(PageType::Leaf) => {
85                    return match leaf_node::search(page, key) {
86                        Ok(idx) => {
87                            let cell = leaf_node::read_cell(page, idx);
88                            Ok(Some((cell.val_type, cell.value.to_vec())))
89                        }
90                        Err(_) => Ok(None),
91                    };
92                }
93                Some(PageType::Branch) => {
94                    let idx = branch_node::search_child_index(page, key);
95                    current = branch_node::get_child(page, idx);
96                }
97                _ => {
98                    return Err(Error::InvalidPageType(page.page_type_raw(), current));
99                }
100            }
101        }
102    }
103
104    pub fn lil_would_hit(&self, pages: &FxHashMap<PageId, Page>, key: &[u8]) -> bool {
105        if let Some((_, cached_leaf)) = &self.last_insert {
106            if let Some(page) = pages.get(cached_leaf) {
107                let n = page.num_cells();
108                return n > 0 && key > leaf_node::read_cell(page, n - 1).key;
109            }
110        }
111        false
112    }
113
114    /// Combined LIL check + insert. Returns `Some(was_new)` on hit, `None` on miss.
115    pub fn try_lil_insert(
116        &mut self,
117        pages: &mut FxHashMap<PageId, Page>,
118        alloc: &mut PageAllocator,
119        txn_id: TxnId,
120        key: &[u8],
121        val_type: ValueType,
122        value: &[u8],
123    ) -> Result<Option<bool>> {
124        let cached_leaf = match self.last_insert.as_ref() {
125            Some((_, leaf)) => *leaf,
126            None => return Ok(None),
127        };
128        let (hit, needs_cow) = {
129            let page = pages
130                .get(&cached_leaf)
131                .ok_or(Error::PageOutOfBounds(cached_leaf))?;
132            let n = page.num_cells();
133            let h = n > 0 && key > leaf_node::read_cell(page, n - 1).key;
134            let nc = page.txn_id() != txn_id;
135            (h, nc)
136        };
137        if !hit {
138            return Ok(None);
139        }
140        let mut cached_path = self.last_insert.take().unwrap().0;
141        let cow_id = if needs_cow {
142            cow_page(pages, alloc, cached_leaf, txn_id)
143        } else {
144            cached_leaf
145        };
146        let ok = {
147            let page = pages.get_mut(&cow_id).unwrap();
148            leaf_node::insert_append_direct(page, key, val_type, value)
149        };
150        if ok {
151            if cow_id != cached_leaf {
152                self.root = propagate_cow_up(pages, alloc, txn_id, &mut cached_path, cow_id);
153            }
154            self.entry_count += 1;
155            self.last_insert = Some((cached_path, cow_id));
156            return Ok(Some(true));
157        }
158        let (sep_key, right_id) =
159            split_leaf_with_insert(pages, alloc, txn_id, cow_id, key, val_type, value);
160        self.root = propagate_split_up(
161            pages,
162            alloc,
163            txn_id,
164            &cached_path,
165            cow_id,
166            &sep_key,
167            right_id,
168            &mut self.depth,
169        );
170        self.last_insert = None;
171        self.entry_count += 1;
172        Ok(Some(true))
173    }
174
175    /// Insert key-value. Returns `true` if new, `false` if updated existing.
176    pub fn insert(
177        &mut self,
178        pages: &mut FxHashMap<PageId, Page>,
179        alloc: &mut PageAllocator,
180        txn_id: TxnId,
181        key: &[u8],
182        val_type: ValueType,
183        value: &[u8],
184    ) -> Result<bool> {
185        // LIL cache: skip walk_to_leaf for sequential appends to the rightmost leaf.
186        if let Some((mut cached_path, cached_leaf)) = self.last_insert.take() {
187            let (hit, needs_cow) = {
188                let page = pages
189                    .get(&cached_leaf)
190                    .ok_or(Error::PageOutOfBounds(cached_leaf))?;
191                let n = page.num_cells();
192                let h = n > 0 && key > leaf_node::read_cell(page, n - 1).key;
193                let nc = page.txn_id() != txn_id;
194                (h, nc)
195            };
196            if hit {
197                let cow_id = if needs_cow {
198                    cow_page(pages, alloc, cached_leaf, txn_id)
199                } else {
200                    cached_leaf
201                };
202                let ok = {
203                    let page = pages.get_mut(&cow_id).unwrap();
204                    leaf_node::insert_direct(page, key, val_type, value)
205                };
206                if ok {
207                    if cow_id != cached_leaf {
208                        self.root =
209                            propagate_cow_up(pages, alloc, txn_id, &mut cached_path, cow_id);
210                    }
211                    self.entry_count += 1;
212                    self.last_insert = Some((cached_path, cow_id));
213                    return Ok(true);
214                }
215                let (sep_key, right_id) =
216                    split_leaf_with_insert(pages, alloc, txn_id, cow_id, key, val_type, value);
217                self.root = propagate_split_up(
218                    pages,
219                    alloc,
220                    txn_id,
221                    &cached_path,
222                    cow_id,
223                    &sep_key,
224                    right_id,
225                    &mut self.depth,
226                );
227                self.last_insert = None;
228                self.entry_count += 1;
229                return Ok(true);
230            }
231        }
232
233        let (path, leaf_id) = self.walk_to_leaf(pages, key)?;
234        self.insert_at_leaf(pages, alloc, txn_id, key, val_type, value, path, leaf_id)
235    }
236
237    #[allow(clippy::too_many_arguments)]
238    #[inline]
239    pub fn insert_at_leaf(
240        &mut self,
241        pages: &mut FxHashMap<PageId, Page>,
242        alloc: &mut PageAllocator,
243        txn_id: TxnId,
244        key: &[u8],
245        val_type: ValueType,
246        value: &[u8],
247        path: Vec<(PageId, usize)>,
248        leaf_id: PageId,
249    ) -> Result<bool> {
250        let key_exists = {
251            let page = pages.get(&leaf_id).unwrap();
252            leaf_node::search(page, key).is_ok()
253        };
254
255        let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
256
257        let leaf_ok = {
258            let page = pages.get_mut(&new_leaf_id).unwrap();
259            leaf_node::insert_direct(page, key, val_type, value)
260        };
261
262        if leaf_ok {
263            if alloc.in_place() && new_leaf_id == leaf_id {
264                let mut is_rightmost = true;
265                for &(ancestor_id, child_idx) in path.iter().rev() {
266                    let page = pages.get(&ancestor_id).unwrap();
267                    if child_idx != page.num_cells() as usize {
268                        is_rightmost = false;
269                        break;
270                    }
271                }
272                if is_rightmost {
273                    self.last_insert = Some((path, new_leaf_id));
274                }
275                if !key_exists {
276                    self.entry_count += 1;
277                }
278                return Ok(!key_exists);
279            }
280            let mut child = new_leaf_id;
281            let mut is_rightmost = true;
282            let mut new_path = path;
283            for i in (0..new_path.len()).rev() {
284                let (ancestor_id, child_idx) = new_path[i];
285                let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
286                let page = pages.get_mut(&new_ancestor).unwrap();
287                update_branch_child(page, child_idx, child);
288                if child_idx != page.num_cells() as usize {
289                    is_rightmost = false;
290                }
291                new_path[i] = (new_ancestor, child_idx);
292                child = new_ancestor;
293            }
294            self.root = child;
295
296            if is_rightmost {
297                self.last_insert = Some((new_path, new_leaf_id));
298            }
299
300            if !key_exists {
301                self.entry_count += 1;
302            }
303            return Ok(!key_exists);
304        }
305
306        self.last_insert = None;
307        let (sep_key, right_id) =
308            split_leaf_with_insert(pages, alloc, txn_id, new_leaf_id, key, val_type, value);
309        self.root = propagate_split_up(
310            pages,
311            alloc,
312            txn_id,
313            &path,
314            new_leaf_id,
315            &sep_key,
316            right_id,
317            &mut self.depth,
318        );
319
320        if !key_exists {
321            self.entry_count += 1;
322        }
323        Ok(!key_exists)
324    }
325
326    pub fn insert_or_fetch(
327        &mut self,
328        pages: &mut FxHashMap<PageId, Page>,
329        alloc: &mut PageAllocator,
330        txn_id: TxnId,
331        key: &[u8],
332        val_type: ValueType,
333        value: &[u8],
334    ) -> Result<Option<Vec<u8>>> {
335        if let Some((mut cached_path, cached_leaf)) = self.last_insert.take() {
336            let (hit, needs_cow) = {
337                let page = pages
338                    .get(&cached_leaf)
339                    .ok_or(Error::PageOutOfBounds(cached_leaf))?;
340                let n = page.num_cells();
341                let h = n > 0 && key > leaf_node::read_cell(page, n - 1).key;
342                let nc = page.txn_id() != txn_id;
343                (h, nc)
344            };
345            if hit {
346                let cow_id = if needs_cow {
347                    cow_page(pages, alloc, cached_leaf, txn_id)
348                } else {
349                    cached_leaf
350                };
351                let ok = {
352                    let page = pages.get_mut(&cow_id).unwrap();
353                    leaf_node::insert_direct(page, key, val_type, value)
354                };
355                if ok {
356                    if cow_id != cached_leaf {
357                        self.root =
358                            propagate_cow_up(pages, alloc, txn_id, &mut cached_path, cow_id);
359                    }
360                    self.entry_count += 1;
361                    self.last_insert = Some((cached_path, cow_id));
362                    return Ok(None);
363                }
364                let (sep_key, right_id) =
365                    split_leaf_with_insert(pages, alloc, txn_id, cow_id, key, val_type, value);
366                self.root = propagate_split_up(
367                    pages,
368                    alloc,
369                    txn_id,
370                    &cached_path,
371                    cow_id,
372                    &sep_key,
373                    right_id,
374                    &mut self.depth,
375                );
376                self.last_insert = None;
377                self.entry_count += 1;
378                return Ok(None);
379            }
380            self.last_insert = Some((cached_path, cached_leaf));
381        }
382
383        let (path, leaf_id) = self.walk_to_leaf(pages, key)?;
384
385        let existing_value = {
386            let page = pages.get(&leaf_id).unwrap();
387            match leaf_node::search(page, key) {
388                Ok(idx) => {
389                    let cell = leaf_node::read_cell(page, idx);
390                    if matches!(cell.val_type, ValueType::Tombstone) {
391                        None
392                    } else {
393                        Some(cell.value.to_vec())
394                    }
395                }
396                Err(_) => None,
397            }
398        };
399        if let Some(v) = existing_value {
400            return Ok(Some(v));
401        }
402
403        let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
404        let leaf_ok = {
405            let page = pages.get_mut(&new_leaf_id).unwrap();
406            leaf_node::insert_direct(page, key, val_type, value)
407        };
408
409        if leaf_ok {
410            if alloc.in_place() && new_leaf_id == leaf_id {
411                let mut is_rightmost = true;
412                for &(ancestor_id, child_idx) in path.iter().rev() {
413                    let page = pages.get(&ancestor_id).unwrap();
414                    if child_idx != page.num_cells() as usize {
415                        is_rightmost = false;
416                        break;
417                    }
418                }
419                if is_rightmost {
420                    self.last_insert = Some((path, new_leaf_id));
421                }
422                self.entry_count += 1;
423                return Ok(None);
424            }
425            let mut child = new_leaf_id;
426            let mut is_rightmost = true;
427            let mut new_path = path;
428            for i in (0..new_path.len()).rev() {
429                let (ancestor_id, child_idx) = new_path[i];
430                let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
431                let page = pages.get_mut(&new_ancestor).unwrap();
432                update_branch_child(page, child_idx, child);
433                if child_idx != page.num_cells() as usize {
434                    is_rightmost = false;
435                }
436                new_path[i] = (new_ancestor, child_idx);
437                child = new_ancestor;
438            }
439            self.root = child;
440
441            if is_rightmost {
442                self.last_insert = Some((new_path, new_leaf_id));
443            }
444            self.entry_count += 1;
445            return Ok(None);
446        }
447
448        self.last_insert = None;
449        let (sep_key, right_id) =
450            split_leaf_with_insert(pages, alloc, txn_id, new_leaf_id, key, val_type, value);
451        self.root = propagate_split_up(
452            pages,
453            alloc,
454            txn_id,
455            &path,
456            new_leaf_id,
457            &sep_key,
458            right_id,
459            &mut self.depth,
460        );
461        self.entry_count += 1;
462        Ok(None)
463    }
464
465    #[inline]
466    pub fn insert_if_absent(
467        &mut self,
468        pages: &mut FxHashMap<PageId, Page>,
469        alloc: &mut PageAllocator,
470        txn_id: TxnId,
471        key: &[u8],
472        val_type: ValueType,
473        value: &[u8],
474    ) -> Result<bool> {
475        if let Some((mut cached_path, cached_leaf)) = self.last_insert.take() {
476            let (hit, needs_cow) = {
477                let page = pages
478                    .get(&cached_leaf)
479                    .ok_or(Error::PageOutOfBounds(cached_leaf))?;
480                let n = page.num_cells();
481                let h = n > 0 && key > leaf_node::read_cell(page, n - 1).key;
482                let nc = page.txn_id() != txn_id;
483                (h, nc)
484            };
485            if hit {
486                let cow_id = if needs_cow {
487                    cow_page(pages, alloc, cached_leaf, txn_id)
488                } else {
489                    cached_leaf
490                };
491                let ok = {
492                    let page = pages.get_mut(&cow_id).unwrap();
493                    leaf_node::insert_direct(page, key, val_type, value)
494                };
495                if ok {
496                    if cow_id != cached_leaf {
497                        self.root =
498                            propagate_cow_up(pages, alloc, txn_id, &mut cached_path, cow_id);
499                    }
500                    self.entry_count += 1;
501                    self.last_insert = Some((cached_path, cow_id));
502                    return Ok(true);
503                }
504                let (sep_key, right_id) =
505                    split_leaf_with_insert(pages, alloc, txn_id, cow_id, key, val_type, value);
506                self.root = propagate_split_up(
507                    pages,
508                    alloc,
509                    txn_id,
510                    &cached_path,
511                    cow_id,
512                    &sep_key,
513                    right_id,
514                    &mut self.depth,
515                );
516                self.last_insert = None;
517                self.entry_count += 1;
518                return Ok(true);
519            }
520            self.last_insert = Some((cached_path, cached_leaf));
521        }
522
523        let (path, leaf_id) = self.walk_to_leaf(pages, key)?;
524        self.insert_if_absent_at_leaf(pages, alloc, txn_id, key, val_type, value, path, leaf_id)
525    }
526
527    #[allow(clippy::too_many_arguments)]
528    #[inline]
529    pub fn insert_if_absent_at_leaf(
530        &mut self,
531        pages: &mut FxHashMap<PageId, Page>,
532        alloc: &mut PageAllocator,
533        txn_id: TxnId,
534        key: &[u8],
535        val_type: ValueType,
536        value: &[u8],
537        path: Vec<(PageId, usize)>,
538        leaf_id: PageId,
539    ) -> Result<bool> {
540        let exists = {
541            let page = pages.get(&leaf_id).unwrap();
542            match leaf_node::search(page, key) {
543                Ok(idx) => {
544                    let cell = leaf_node::read_cell(page, idx);
545                    !matches!(cell.val_type, ValueType::Tombstone)
546                }
547                Err(_) => false,
548            }
549        };
550        if exists {
551            return Ok(false);
552        }
553
554        let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
555        let leaf_ok = {
556            let page = pages.get_mut(&new_leaf_id).unwrap();
557            leaf_node::insert_direct(page, key, val_type, value)
558        };
559
560        if leaf_ok {
561            if alloc.in_place() && new_leaf_id == leaf_id {
562                let mut is_rightmost = true;
563                for &(ancestor_id, child_idx) in path.iter().rev() {
564                    let page = pages.get(&ancestor_id).unwrap();
565                    if child_idx != page.num_cells() as usize {
566                        is_rightmost = false;
567                        break;
568                    }
569                }
570                if is_rightmost {
571                    self.last_insert = Some((path, new_leaf_id));
572                }
573                self.entry_count += 1;
574                return Ok(true);
575            }
576            let mut child = new_leaf_id;
577            let mut is_rightmost = true;
578            let mut new_path = path;
579            for i in (0..new_path.len()).rev() {
580                let (ancestor_id, child_idx) = new_path[i];
581                let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
582                let page = pages.get_mut(&new_ancestor).unwrap();
583                update_branch_child(page, child_idx, child);
584                if child_idx != page.num_cells() as usize {
585                    is_rightmost = false;
586                }
587                new_path[i] = (new_ancestor, child_idx);
588                child = new_ancestor;
589            }
590            self.root = child;
591
592            if is_rightmost {
593                self.last_insert = Some((new_path, new_leaf_id));
594            }
595            self.entry_count += 1;
596            return Ok(true);
597        }
598
599        self.last_insert = None;
600        let (sep_key, right_id) =
601            split_leaf_with_insert(pages, alloc, txn_id, new_leaf_id, key, val_type, value);
602        self.root = propagate_split_up(
603            pages,
604            alloc,
605            txn_id,
606            &path,
607            new_leaf_id,
608            &sep_key,
609            right_id,
610            &mut self.depth,
611        );
612        self.entry_count += 1;
613        Ok(true)
614    }
615
616    #[allow(clippy::too_many_arguments)]
617    #[inline]
618    pub fn upsert_with<F, E>(
619        &mut self,
620        pages: &mut FxHashMap<PageId, Page>,
621        alloc: &mut PageAllocator,
622        txn_id: TxnId,
623        key: &[u8],
624        val_type: ValueType,
625        default_value: &[u8],
626        f: F,
627    ) -> std::result::Result<UpsertOutcome, E>
628    where
629        F: FnMut(&[u8]) -> std::result::Result<UpsertAction, E>,
630        E: From<Error>,
631    {
632        if let Some((mut cached_path, cached_leaf)) = self.last_insert.take() {
633            let (hit, needs_cow) = {
634                let page = pages
635                    .get(&cached_leaf)
636                    .ok_or(Error::PageOutOfBounds(cached_leaf))?;
637                let n = page.num_cells();
638                let h = n > 0 && key > leaf_node::read_cell(page, n - 1).key;
639                let nc = page.txn_id() != txn_id;
640                (h, nc)
641            };
642            if hit {
643                let cow_id = if needs_cow {
644                    cow_page(pages, alloc, cached_leaf, txn_id)
645                } else {
646                    cached_leaf
647                };
648                let ok = {
649                    let page = pages.get_mut(&cow_id).unwrap();
650                    leaf_node::insert_direct(page, key, val_type, default_value)
651                };
652                if ok {
653                    if cow_id != cached_leaf {
654                        self.root =
655                            propagate_cow_up(pages, alloc, txn_id, &mut cached_path, cow_id);
656                    }
657                    self.entry_count += 1;
658                    self.last_insert = Some((cached_path, cow_id));
659                    return Ok(UpsertOutcome::Inserted);
660                }
661                let (sep_key, right_id) = split_leaf_with_insert(
662                    pages,
663                    alloc,
664                    txn_id,
665                    cow_id,
666                    key,
667                    val_type,
668                    default_value,
669                );
670                self.root = propagate_split_up(
671                    pages,
672                    alloc,
673                    txn_id,
674                    &cached_path,
675                    cow_id,
676                    &sep_key,
677                    right_id,
678                    &mut self.depth,
679                );
680                self.last_insert = None;
681                self.entry_count += 1;
682                return Ok(UpsertOutcome::Inserted);
683            }
684            self.last_insert = Some((cached_path, cached_leaf));
685        }
686
687        let (path, leaf_id) = self.walk_to_leaf(pages, key)?;
688        self.upsert_with_at_leaf(
689            pages,
690            alloc,
691            txn_id,
692            key,
693            val_type,
694            default_value,
695            path,
696            leaf_id,
697            f,
698        )
699    }
700
701    #[allow(clippy::too_many_arguments)]
702    #[inline]
703    pub fn upsert_with_at_leaf<F, E>(
704        &mut self,
705        pages: &mut FxHashMap<PageId, Page>,
706        alloc: &mut PageAllocator,
707        txn_id: TxnId,
708        key: &[u8],
709        val_type: ValueType,
710        default_value: &[u8],
711        path: Vec<(PageId, usize)>,
712        leaf_id: PageId,
713        mut f: F,
714    ) -> std::result::Result<UpsertOutcome, E>
715    where
716        F: FnMut(&[u8]) -> std::result::Result<UpsertAction, E>,
717        E: From<Error>,
718    {
719        let action = {
720            let page = pages.get(&leaf_id).unwrap();
721            match leaf_node::search(page, key) {
722                Ok(idx) => {
723                    let cell = leaf_node::read_cell(page, idx);
724                    if matches!(cell.val_type, ValueType::Tombstone) {
725                        None
726                    } else {
727                        Some(f(cell.value)?)
728                    }
729                }
730                Err(_) => None,
731            }
732        };
733
734        if let Some(act) = action {
735            match act {
736                UpsertAction::Skip => return Ok(UpsertOutcome::Skipped),
737                UpsertAction::Replace(new_bytes) => {
738                    let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
739                    let leaf_ok = {
740                        let page = pages.get_mut(&new_leaf_id).unwrap();
741                        leaf_node::insert_direct(page, key, val_type, &new_bytes)
742                    };
743                    if leaf_ok {
744                        if new_leaf_id != leaf_id {
745                            let mut new_path = path;
746                            self.root =
747                                propagate_cow_up(pages, alloc, txn_id, &mut new_path, new_leaf_id);
748                        }
749                        return Ok(UpsertOutcome::Updated);
750                    }
751                    self.last_insert = None;
752                    let (sep_key, right_id) = split_leaf_with_insert(
753                        pages,
754                        alloc,
755                        txn_id,
756                        new_leaf_id,
757                        key,
758                        val_type,
759                        &new_bytes,
760                    );
761                    self.root = propagate_split_up(
762                        pages,
763                        alloc,
764                        txn_id,
765                        &path,
766                        new_leaf_id,
767                        &sep_key,
768                        right_id,
769                        &mut self.depth,
770                    );
771                    return Ok(UpsertOutcome::Updated);
772                }
773            }
774        }
775
776        let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
777        let leaf_ok = {
778            let page = pages.get_mut(&new_leaf_id).unwrap();
779            leaf_node::insert_direct(page, key, val_type, default_value)
780        };
781
782        if leaf_ok {
783            if alloc.in_place() && new_leaf_id == leaf_id {
784                let mut is_rightmost = true;
785                for &(ancestor_id, child_idx) in path.iter().rev() {
786                    let page = pages.get(&ancestor_id).unwrap();
787                    if child_idx != page.num_cells() as usize {
788                        is_rightmost = false;
789                        break;
790                    }
791                }
792                if is_rightmost {
793                    self.last_insert = Some((path, new_leaf_id));
794                }
795                self.entry_count += 1;
796                return Ok(UpsertOutcome::Inserted);
797            }
798            let mut child = new_leaf_id;
799            let mut is_rightmost = true;
800            let mut new_path = path;
801            for i in (0..new_path.len()).rev() {
802                let (ancestor_id, child_idx) = new_path[i];
803                let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
804                let page = pages.get_mut(&new_ancestor).unwrap();
805                update_branch_child(page, child_idx, child);
806                if child_idx != page.num_cells() as usize {
807                    is_rightmost = false;
808                }
809                new_path[i] = (new_ancestor, child_idx);
810                child = new_ancestor;
811            }
812            self.root = child;
813
814            if is_rightmost {
815                self.last_insert = Some((new_path, new_leaf_id));
816            }
817            self.entry_count += 1;
818            return Ok(UpsertOutcome::Inserted);
819        }
820
821        self.last_insert = None;
822        let (sep_key, right_id) = split_leaf_with_insert(
823            pages,
824            alloc,
825            txn_id,
826            new_leaf_id,
827            key,
828            val_type,
829            default_value,
830        );
831        self.root = propagate_split_up(
832            pages,
833            alloc,
834            txn_id,
835            &path,
836            new_leaf_id,
837            &sep_key,
838            right_id,
839            &mut self.depth,
840        );
841        self.entry_count += 1;
842        Ok(UpsertOutcome::Inserted)
843    }
844
845    /// Bulk-update existing keys. Keys must be sorted.
846    pub fn update_sorted(
847        &mut self,
848        pages: &mut FxHashMap<PageId, Page>,
849        alloc: &mut PageAllocator,
850        txn_id: TxnId,
851        pairs: &[(&[u8], &[u8])],
852    ) -> Result<u64> {
853        if pairs.is_empty() {
854            return Ok(0);
855        }
856        self.last_insert = None;
857
858        let (mut path, mut leaf_id) = self.walk_to_leaf(pages, pairs[0].0)?;
859        let mut cow_leaf = cow_page(pages, alloc, leaf_id, txn_id);
860        if cow_leaf != leaf_id {
861            self.root = propagate_cow_up(pages, alloc, txn_id, &mut path, cow_leaf);
862        }
863
864        let mut count: u64 = 0;
865        let mut hint: u16 = 0;
866
867        for &(key, value) in pairs {
868            let past_leaf = {
869                let page = pages.get(&cow_leaf).unwrap();
870                let n = page.num_cells();
871                n == 0 || key > leaf_node::read_cell(page, n - 1).key
872            };
873
874            if past_leaf {
875                let (new_path, new_leaf) = self.walk_to_leaf(pages, key)?;
876                path = new_path;
877                leaf_id = new_leaf;
878                cow_leaf = cow_page(pages, alloc, leaf_id, txn_id);
879                if cow_leaf != leaf_id {
880                    self.root = propagate_cow_up(pages, alloc, txn_id, &mut path, cow_leaf);
881                }
882                hint = 0;
883            }
884
885            let page = pages.get(&cow_leaf).unwrap();
886            let n = page.num_cells();
887            let idx = {
888                let mut i = hint;
889                loop {
890                    if i >= n {
891                        break None;
892                    }
893                    let cell = leaf_node::read_cell(page, i);
894                    match key.cmp(cell.key) {
895                        std::cmp::Ordering::Equal => break Some(i),
896                        std::cmp::Ordering::Less => break None,
897                        std::cmp::Ordering::Greater => i += 1,
898                    }
899                }
900            };
901
902            if let Some(idx) = idx {
903                hint = idx + 1;
904                let page = pages.get_mut(&cow_leaf).unwrap();
905                if !leaf_node::update_value_in_place(page, idx, ValueType::Inline, value) {
906                    leaf_node::insert_direct(page, key, ValueType::Inline, value);
907                }
908                count += 1;
909            }
910        }
911
912        Ok(count)
913    }
914
915    /// Delete a key. Returns `true` if the key was found and deleted.
916    pub fn delete(
917        &mut self,
918        pages: &mut FxHashMap<PageId, Page>,
919        alloc: &mut PageAllocator,
920        txn_id: TxnId,
921        key: &[u8],
922    ) -> Result<bool> {
923        let (mut path, leaf_id) = self.walk_to_leaf(pages, key)?;
924        self.delete_at_leaf(pages, alloc, txn_id, key, &mut path, leaf_id)
925    }
926
927    #[allow(clippy::ptr_arg)]
928    pub fn delete_at_leaf(
929        &mut self,
930        pages: &mut FxHashMap<PageId, Page>,
931        alloc: &mut PageAllocator,
932        txn_id: TxnId,
933        key: &[u8],
934        path: &mut Vec<(PageId, usize)>,
935        leaf_id: PageId,
936    ) -> Result<bool> {
937        self.last_insert = None;
938
939        let found = {
940            let page = pages.get(&leaf_id).unwrap();
941            leaf_node::search(page, key).is_ok()
942        };
943        if !found {
944            return Ok(false);
945        }
946
947        let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
948        {
949            let page = pages.get_mut(&new_leaf_id).unwrap();
950            leaf_node::delete(page, key);
951        }
952
953        let leaf_empty = pages.get(&new_leaf_id).unwrap().num_cells() == 0;
954
955        if !leaf_empty || path.is_empty() {
956            if alloc.in_place() && new_leaf_id == leaf_id {
957                self.entry_count -= 1;
958                return Ok(true);
959            }
960            self.root = propagate_cow_up(pages, alloc, txn_id, path, new_leaf_id);
961            self.entry_count -= 1;
962            return Ok(true);
963        }
964
965        alloc.free(new_leaf_id);
966        pages.remove(&new_leaf_id);
967
968        self.root = propagate_remove_up(pages, alloc, txn_id, path, &mut self.depth);
969        self.entry_count -= 1;
970        Ok(true)
971    }
972
973    /// Walk root to leaf for `key`. Returns (path, leaf_page_id).
974    pub fn walk_to_leaf(
975        &self,
976        pages: &FxHashMap<PageId, Page>,
977        key: &[u8],
978    ) -> Result<(Vec<(PageId, usize)>, PageId)> {
979        let mut path = Vec::with_capacity(self.depth as usize);
980        let mut current = self.root;
981        loop {
982            let page = pages.get(&current).ok_or(Error::PageOutOfBounds(current))?;
983            match page.page_type() {
984                Some(PageType::Leaf) => return Ok((path, current)),
985                Some(PageType::Branch) => {
986                    let child_idx = branch_node::search_child_index(page, key);
987                    let child = branch_node::get_child(page, child_idx);
988                    path.push((current, child_idx));
989                    current = child;
990                }
991                _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
992            }
993        }
994    }
995}
996
997/// CoW a page. No-op if already owned by this txn. In-place mode reuses page ID.
998pub fn cow_page(
999    pages: &mut FxHashMap<PageId, Page>,
1000    alloc: &mut PageAllocator,
1001    old_id: PageId,
1002    txn_id: TxnId,
1003) -> PageId {
1004    if alloc.in_place() {
1005        let page = pages.get_mut(&old_id).unwrap();
1006        if page.txn_id() != txn_id {
1007            page.set_txn_id(txn_id);
1008        }
1009        return old_id;
1010    }
1011    let mut new_page = {
1012        let page = pages.get(&old_id).unwrap();
1013        if page.txn_id() == txn_id {
1014            return old_id;
1015        }
1016        page.clone()
1017    };
1018    let new_id = alloc.allocate();
1019    new_page.set_page_id(new_id);
1020    new_page.set_txn_id(txn_id);
1021    pages.insert(new_id, new_page);
1022    alloc.free(old_id);
1023    new_id
1024}
1025
1026/// Update a branch's child pointer at `child_idx` to point to `new_child`.
1027fn update_branch_child(page: &mut Page, child_idx: usize, new_child: PageId) {
1028    let n = page.num_cells() as usize;
1029    if child_idx < n {
1030        let offset = page.cell_offset(child_idx as u16) as usize;
1031        page.data[offset..offset + 4].copy_from_slice(&new_child.as_u32().to_le_bytes());
1032    } else {
1033        page.set_right_child(new_child);
1034    }
1035}
1036
1037pub fn propagate_cow_up(
1038    pages: &mut FxHashMap<PageId, Page>,
1039    alloc: &mut PageAllocator,
1040    txn_id: TxnId,
1041    path: &mut [(PageId, usize)],
1042    mut new_child: PageId,
1043) -> PageId {
1044    for i in (0..path.len()).rev() {
1045        let (ancestor_id, child_idx) = path[i];
1046        let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
1047        let page = pages.get_mut(&new_ancestor).unwrap();
1048        update_branch_child(page, child_idx, new_child);
1049        path[i] = (new_ancestor, child_idx);
1050        new_child = new_ancestor;
1051    }
1052    new_child
1053}
1054
1055/// Split full leaf and insert. Returns (separator_key, right_page_id).
1056fn split_leaf_with_insert(
1057    pages: &mut FxHashMap<PageId, Page>,
1058    alloc: &mut PageAllocator,
1059    txn_id: TxnId,
1060    leaf_id: PageId,
1061    key: &[u8],
1062    val_type: ValueType,
1063    value: &[u8],
1064) -> (Vec<u8>, PageId) {
1065    let mut cells: Vec<(Vec<u8>, Vec<u8>)> = {
1066        let page = pages.get(&leaf_id).unwrap();
1067        let n = page.num_cells() as usize;
1068        (0..n)
1069            .map(|i| {
1070                let cell = leaf_node::read_cell(page, i as u16);
1071                let raw = leaf_node::read_cell_bytes(page, i as u16);
1072                (cell.key.to_vec(), raw)
1073            })
1074            .collect()
1075    };
1076
1077    let new_raw = leaf_node::build_cell(key, val_type, value);
1078    match cells.binary_search_by(|(k, _)| k.as_slice().cmp(key)) {
1079        Ok(idx) => cells[idx] = (key.to_vec(), new_raw),
1080        Err(idx) => cells.insert(idx, (key.to_vec(), new_raw)),
1081    }
1082
1083    let total = cells.len();
1084
1085    let usable = citadel_core::constants::USABLE_SIZE;
1086    let mut cum: Vec<usize> = Vec::with_capacity(total + 1);
1087    cum.push(0);
1088    for (_, raw) in &cells {
1089        cum.push(cum.last().unwrap() + raw.len());
1090    }
1091    let left_fits = |sp: usize| cum[sp] + sp * 2 <= usable;
1092    let right_fits = |sp: usize| (cum[total] - cum[sp]) + (total - sp) * 2 <= usable;
1093
1094    let mut split_point = total / 2;
1095    if !left_fits(split_point) || !right_fits(split_point) {
1096        split_point = 1;
1097        for sp in 1..total {
1098            if left_fits(sp) && right_fits(sp) {
1099                split_point = sp;
1100                if sp >= total / 2 {
1101                    break;
1102                }
1103            }
1104        }
1105    }
1106
1107    let sep_key = cells[split_point].0.clone();
1108
1109    {
1110        let left_refs: Vec<&[u8]> = cells[..split_point]
1111            .iter()
1112            .map(|(_, raw)| raw.as_slice())
1113            .collect();
1114        let page = pages.get_mut(&leaf_id).unwrap();
1115        page.rebuild_cells(&left_refs);
1116    }
1117
1118    let right_id = alloc.allocate();
1119    {
1120        let mut right_page = Page::new(right_id, PageType::Leaf, txn_id);
1121        let right_refs: Vec<&[u8]> = cells[split_point..]
1122            .iter()
1123            .map(|(_, raw)| raw.as_slice())
1124            .collect();
1125        right_page.rebuild_cells(&right_refs);
1126        pages.insert(right_id, right_page);
1127    }
1128
1129    (sep_key, right_id)
1130}
1131
1132#[allow(clippy::too_many_arguments)]
1133fn propagate_split_up(
1134    pages: &mut FxHashMap<PageId, Page>,
1135    alloc: &mut PageAllocator,
1136    txn_id: TxnId,
1137    path: &[(PageId, usize)],
1138    mut left_child: PageId,
1139    initial_sep: &[u8],
1140    mut right_child: PageId,
1141    depth: &mut u16,
1142) -> PageId {
1143    let mut sep_key = initial_sep.to_vec();
1144    let mut pending_split = true;
1145
1146    for &(ancestor_id, child_idx) in path.iter().rev() {
1147        let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
1148
1149        if pending_split {
1150            let ok = {
1151                let page = pages.get_mut(&new_ancestor).unwrap();
1152                branch_node::insert_separator(page, child_idx, left_child, &sep_key, right_child)
1153            };
1154
1155            if ok {
1156                pending_split = false;
1157                left_child = new_ancestor;
1158            } else {
1159                let (new_sep, new_right) = split_branch_with_insert(
1160                    pages,
1161                    alloc,
1162                    txn_id,
1163                    new_ancestor,
1164                    child_idx,
1165                    left_child,
1166                    &sep_key,
1167                    right_child,
1168                );
1169                left_child = new_ancestor;
1170                sep_key = new_sep;
1171                right_child = new_right;
1172            }
1173        } else {
1174            let page = pages.get_mut(&new_ancestor).unwrap();
1175            update_branch_child(page, child_idx, left_child);
1176            left_child = new_ancestor;
1177        }
1178    }
1179
1180    if pending_split {
1181        let new_root_id = alloc.allocate();
1182        let mut new_root = Page::new(new_root_id, PageType::Branch, txn_id);
1183        let cell = branch_node::build_cell(left_child, &sep_key);
1184        new_root.write_cell(&cell).unwrap();
1185        new_root.set_right_child(right_child);
1186        pages.insert(new_root_id, new_root);
1187        *depth += 1;
1188        new_root_id
1189    } else {
1190        left_child
1191    }
1192}
1193
1194#[allow(clippy::too_many_arguments)]
1195fn split_branch_with_insert(
1196    pages: &mut FxHashMap<PageId, Page>,
1197    alloc: &mut PageAllocator,
1198    txn_id: TxnId,
1199    branch_id: PageId,
1200    child_idx: usize,
1201    new_left: PageId,
1202    sep_key: &[u8],
1203    new_right: PageId,
1204) -> (Vec<u8>, PageId) {
1205    let (new_cells, final_right_child) = {
1206        let page = pages.get(&branch_id).unwrap();
1207        let n = page.num_cells() as usize;
1208        let cells: Vec<(PageId, Vec<u8>)> = (0..n)
1209            .map(|i| {
1210                let cell = branch_node::read_cell(page, i as u16);
1211                (cell.child, cell.key.to_vec())
1212            })
1213            .collect();
1214        let old_rc = page.right_child();
1215
1216        let mut result = Vec::with_capacity(n + 1);
1217        let final_rc;
1218
1219        if child_idx < n {
1220            let old_key = cells[child_idx].1.clone();
1221            for (i, (child, key)) in cells.into_iter().enumerate() {
1222                if i == child_idx {
1223                    result.push((new_left, sep_key.to_vec()));
1224                    result.push((new_right, old_key.clone()));
1225                } else {
1226                    result.push((child, key));
1227                }
1228            }
1229            final_rc = old_rc;
1230        } else {
1231            result = cells;
1232            result.push((new_left, sep_key.to_vec()));
1233            final_rc = new_right;
1234        }
1235
1236        (result, final_rc)
1237    };
1238
1239    let total = new_cells.len();
1240    let usable = citadel_core::constants::USABLE_SIZE;
1241    let raw_sizes: Vec<usize> = new_cells.iter().map(|(_, key)| 6 + key.len()).collect();
1242    let mut cum: Vec<usize> = Vec::with_capacity(total + 1);
1243    cum.push(0);
1244    for &sz in &raw_sizes {
1245        cum.push(cum.last().unwrap() + sz);
1246    }
1247    let left_fits = |sp: usize| cum[sp] + sp * 2 <= usable;
1248    let right_fits = |sp: usize| {
1249        let right_count = total - sp - 1;
1250        (cum[total] - cum[sp + 1]) + right_count * 2 <= usable
1251    };
1252
1253    let mut split_point = total / 2;
1254    if !left_fits(split_point) || !right_fits(split_point) {
1255        split_point = 1;
1256        for sp in 1..total.saturating_sub(1) {
1257            if left_fits(sp) && right_fits(sp) {
1258                split_point = sp;
1259                if sp >= total / 2 {
1260                    break;
1261                }
1262            }
1263        }
1264    }
1265
1266    let promoted_sep = new_cells[split_point].1.clone();
1267    let promoted_child = new_cells[split_point].0;
1268
1269    {
1270        let left_raw: Vec<Vec<u8>> = new_cells[..split_point]
1271            .iter()
1272            .map(|(child, key)| branch_node::build_cell(*child, key))
1273            .collect();
1274        let left_refs: Vec<&[u8]> = left_raw.iter().map(|c| c.as_slice()).collect();
1275        let page = pages.get_mut(&branch_id).unwrap();
1276        page.rebuild_cells(&left_refs);
1277        page.set_right_child(promoted_child);
1278    }
1279
1280    let right_branch_id = alloc.allocate();
1281    {
1282        let mut right_page = Page::new(right_branch_id, PageType::Branch, txn_id);
1283        let right_raw: Vec<Vec<u8>> = new_cells[split_point + 1..]
1284            .iter()
1285            .map(|(child, key)| branch_node::build_cell(*child, key))
1286            .collect();
1287        let right_refs: Vec<&[u8]> = right_raw.iter().map(|c| c.as_slice()).collect();
1288        right_page.rebuild_cells(&right_refs);
1289        right_page.set_right_child(final_right_child);
1290        pages.insert(right_branch_id, right_page);
1291    }
1292
1293    (promoted_sep, right_branch_id)
1294}
1295
1296fn remove_child_from_branch(page: &mut Page, child_idx: usize) {
1297    let n = page.num_cells() as usize;
1298    if child_idx < n {
1299        let cell_sz = branch_node::get_cell_size(page, child_idx as u16);
1300        page.delete_cell_at(child_idx as u16, cell_sz);
1301    } else {
1302        assert!(n > 0, "cannot remove right_child from branch with 0 cells");
1303        let last_child = branch_node::read_cell(page, (n - 1) as u16).child;
1304        let cell_sz = branch_node::get_cell_size(page, (n - 1) as u16);
1305        page.delete_cell_at((n - 1) as u16, cell_sz);
1306        page.set_right_child(last_child);
1307    }
1308}
1309
1310fn propagate_remove_up(
1311    pages: &mut FxHashMap<PageId, Page>,
1312    alloc: &mut PageAllocator,
1313    txn_id: TxnId,
1314    path: &mut [(PageId, usize)],
1315    depth: &mut u16,
1316) -> PageId {
1317    let mut level = path.len();
1318    let mut need_remove_at_level = true;
1319    let mut new_child = PageId(0);
1320
1321    while level > 0 && need_remove_at_level {
1322        level -= 1;
1323        let (ancestor_id, child_idx) = path[level];
1324        let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
1325
1326        {
1327            let page = pages.get_mut(&new_ancestor).unwrap();
1328            remove_child_from_branch(page, child_idx);
1329        }
1330
1331        let num_cells = pages.get(&new_ancestor).unwrap().num_cells();
1332
1333        if num_cells > 0 || level == 0 {
1334            if num_cells == 0 && level == 0 {
1335                let only_child = pages.get(&new_ancestor).unwrap().right_child();
1336                alloc.free(new_ancestor);
1337                pages.remove(&new_ancestor);
1338                *depth -= 1;
1339                return only_child;
1340            }
1341            new_child = new_ancestor;
1342            need_remove_at_level = false;
1343        } else {
1344            let only_child = pages.get(&new_ancestor).unwrap().right_child();
1345            alloc.free(new_ancestor);
1346            pages.remove(&new_ancestor);
1347            *depth -= 1;
1348
1349            new_child = only_child;
1350            need_remove_at_level = false;
1351        }
1352    }
1353
1354    if level > 0 {
1355        let remaining_path = &mut path[..level];
1356        new_child = propagate_cow_up(pages, alloc, txn_id, remaining_path, new_child);
1357    }
1358
1359    new_child
1360}
1361
1362#[cfg(test)]
1363#[path = "btree_tests.rs"]
1364mod tests;