Skip to main content

citadel_buffer/
btree.rs

1//! CoW B+ tree engine. Mutations clone pages; old pages go to pending-free list.
2
3use crate::allocator::PageAllocator;
4use citadel_core::types::{PageId, PageType, TxnId, ValueType};
5use citadel_core::{Error, Result};
6use citadel_page::page::Page;
7use citadel_page::{branch_node, leaf_node};
8use rustc_hash::FxHashMap;
9
10/// B+ tree metadata. Pages stored externally.
11#[derive(Clone)]
12pub struct BTree {
13    pub root: PageId,
14    pub depth: u16,
15    pub entry_count: u64,
16    last_insert: Option<(Vec<(PageId, usize)>, PageId)>,
17}
18
/// What an upsert did to the key.
///
/// Derives `PartialEq`/`Eq` so callers and tests can compare outcomes with
/// `==` / `assert_eq!` instead of having to pattern-match.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum UpsertOutcome {
    /// No live value existed for the key; the default value was stored.
    Inserted,
    /// A live value existed and was replaced with the callback's bytes.
    Updated,
    /// A live value existed and the callback chose to leave it untouched.
    Skipped,
}
25
/// Decision returned by an upsert callback when the key already has a value.
///
/// Derives `PartialEq`/`Eq` so decisions can be compared directly in callers
/// and tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum UpsertAction {
    /// Overwrite the existing value with these bytes.
    Replace(Vec<u8>),
    /// Keep the existing value as it is.
    Skip,
}
31
32impl BTree {
33    /// Create a new empty B+ tree with a single leaf root.
34    pub fn new(
35        pages: &mut FxHashMap<PageId, Page>,
36        alloc: &mut PageAllocator,
37        txn_id: TxnId,
38    ) -> Self {
39        let root_id = alloc.allocate();
40        let root = Page::new(root_id, PageType::Leaf, txn_id);
41        pages.insert(root_id, root);
42        Self {
43            root: root_id,
44            depth: 1,
45            entry_count: 0,
46            last_insert: None,
47        }
48    }
49
50    /// Create a BTree from existing metadata (e.g., loaded from commit slot).
51    pub fn from_existing(root: PageId, depth: u16, entry_count: u64) -> Self {
52        Self {
53            root,
54            depth,
55            entry_count,
56            last_insert: None,
57        }
58    }
59
60    /// Search for a key. Returns `Some((val_type, value))` if found, `None` otherwise.
61    pub fn search(
62        &self,
63        pages: &FxHashMap<PageId, Page>,
64        key: &[u8],
65    ) -> Result<Option<(ValueType, Vec<u8>)>> {
66        let mut current = self.root;
67        loop {
68            let page = pages.get(&current).ok_or(Error::PageOutOfBounds(current))?;
69            match page.page_type() {
70                Some(PageType::Leaf) => {
71                    return match leaf_node::search(page, key) {
72                        Ok(idx) => {
73                            let cell = leaf_node::read_cell(page, idx);
74                            Ok(Some((cell.val_type, cell.value.to_vec())))
75                        }
76                        Err(_) => Ok(None),
77                    };
78                }
79                Some(PageType::Branch) => {
80                    let idx = branch_node::search_child_index(page, key);
81                    current = branch_node::get_child(page, idx);
82                }
83                _ => {
84                    return Err(Error::InvalidPageType(page.page_type_raw(), current));
85                }
86            }
87        }
88    }
89
90    pub fn lil_would_hit(&self, pages: &FxHashMap<PageId, Page>, key: &[u8]) -> bool {
91        if let Some((_, cached_leaf)) = &self.last_insert {
92            if let Some(page) = pages.get(cached_leaf) {
93                let n = page.num_cells();
94                return n > 0 && key > leaf_node::read_cell(page, n - 1).key;
95            }
96        }
97        false
98    }
99
100    /// Combined LIL check + insert. Returns `Some(was_new)` on hit, `None` on miss.
101    pub fn try_lil_insert(
102        &mut self,
103        pages: &mut FxHashMap<PageId, Page>,
104        alloc: &mut PageAllocator,
105        txn_id: TxnId,
106        key: &[u8],
107        val_type: ValueType,
108        value: &[u8],
109    ) -> Result<Option<bool>> {
110        let cached_leaf = match self.last_insert.as_ref() {
111            Some((_, leaf)) => *leaf,
112            None => return Ok(None),
113        };
114        let (hit, needs_cow) = {
115            let page = pages
116                .get(&cached_leaf)
117                .ok_or(Error::PageOutOfBounds(cached_leaf))?;
118            let n = page.num_cells();
119            let h = n > 0 && key > leaf_node::read_cell(page, n - 1).key;
120            let nc = page.txn_id() != txn_id;
121            (h, nc)
122        };
123        if !hit {
124            return Ok(None);
125        }
126        let mut cached_path = self.last_insert.take().unwrap().0;
127        let cow_id = if needs_cow {
128            cow_page(pages, alloc, cached_leaf, txn_id)
129        } else {
130            cached_leaf
131        };
132        let ok = {
133            let page = pages.get_mut(&cow_id).unwrap();
134            leaf_node::insert_append_direct(page, key, val_type, value)
135        };
136        if ok {
137            if cow_id != cached_leaf {
138                self.root = propagate_cow_up(pages, alloc, txn_id, &mut cached_path, cow_id);
139            }
140            self.entry_count += 1;
141            self.last_insert = Some((cached_path, cow_id));
142            return Ok(Some(true));
143        }
144        let (sep_key, right_id) =
145            split_leaf_with_insert(pages, alloc, txn_id, cow_id, key, val_type, value);
146        self.root = propagate_split_up(
147            pages,
148            alloc,
149            txn_id,
150            &cached_path,
151            cow_id,
152            &sep_key,
153            right_id,
154            &mut self.depth,
155        );
156        self.last_insert = None;
157        self.entry_count += 1;
158        Ok(Some(true))
159    }
160
161    /// Insert key-value. Returns `true` if new, `false` if updated existing.
162    pub fn insert(
163        &mut self,
164        pages: &mut FxHashMap<PageId, Page>,
165        alloc: &mut PageAllocator,
166        txn_id: TxnId,
167        key: &[u8],
168        val_type: ValueType,
169        value: &[u8],
170    ) -> Result<bool> {
171        // LIL cache: skip walk_to_leaf for sequential appends to the rightmost leaf.
172        if let Some((mut cached_path, cached_leaf)) = self.last_insert.take() {
173            let (hit, needs_cow) = {
174                let page = pages
175                    .get(&cached_leaf)
176                    .ok_or(Error::PageOutOfBounds(cached_leaf))?;
177                let n = page.num_cells();
178                let h = n > 0 && key > leaf_node::read_cell(page, n - 1).key;
179                let nc = page.txn_id() != txn_id;
180                (h, nc)
181            };
182            if hit {
183                let cow_id = if needs_cow {
184                    cow_page(pages, alloc, cached_leaf, txn_id)
185                } else {
186                    cached_leaf
187                };
188                let ok = {
189                    let page = pages.get_mut(&cow_id).unwrap();
190                    leaf_node::insert_direct(page, key, val_type, value)
191                };
192                if ok {
193                    if cow_id != cached_leaf {
194                        self.root =
195                            propagate_cow_up(pages, alloc, txn_id, &mut cached_path, cow_id);
196                    }
197                    self.entry_count += 1;
198                    self.last_insert = Some((cached_path, cow_id));
199                    return Ok(true);
200                }
201                let (sep_key, right_id) =
202                    split_leaf_with_insert(pages, alloc, txn_id, cow_id, key, val_type, value);
203                self.root = propagate_split_up(
204                    pages,
205                    alloc,
206                    txn_id,
207                    &cached_path,
208                    cow_id,
209                    &sep_key,
210                    right_id,
211                    &mut self.depth,
212                );
213                self.last_insert = None;
214                self.entry_count += 1;
215                return Ok(true);
216            }
217        }
218
219        let (path, leaf_id) = self.walk_to_leaf(pages, key)?;
220        self.insert_at_leaf(pages, alloc, txn_id, key, val_type, value, path, leaf_id)
221    }
222
223    #[allow(clippy::too_many_arguments)]
224    #[inline]
225    pub fn insert_at_leaf(
226        &mut self,
227        pages: &mut FxHashMap<PageId, Page>,
228        alloc: &mut PageAllocator,
229        txn_id: TxnId,
230        key: &[u8],
231        val_type: ValueType,
232        value: &[u8],
233        path: Vec<(PageId, usize)>,
234        leaf_id: PageId,
235    ) -> Result<bool> {
236        let key_exists = {
237            let page = pages.get(&leaf_id).unwrap();
238            leaf_node::search(page, key).is_ok()
239        };
240
241        let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
242
243        let leaf_ok = {
244            let page = pages.get_mut(&new_leaf_id).unwrap();
245            leaf_node::insert_direct(page, key, val_type, value)
246        };
247
248        if leaf_ok {
249            if alloc.in_place() && new_leaf_id == leaf_id {
250                let mut is_rightmost = true;
251                for &(ancestor_id, child_idx) in path.iter().rev() {
252                    let page = pages.get(&ancestor_id).unwrap();
253                    if child_idx != page.num_cells() as usize {
254                        is_rightmost = false;
255                        break;
256                    }
257                }
258                if is_rightmost {
259                    self.last_insert = Some((path, new_leaf_id));
260                }
261                if !key_exists {
262                    self.entry_count += 1;
263                }
264                return Ok(!key_exists);
265            }
266            let mut child = new_leaf_id;
267            let mut is_rightmost = true;
268            let mut new_path = path;
269            for i in (0..new_path.len()).rev() {
270                let (ancestor_id, child_idx) = new_path[i];
271                let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
272                let page = pages.get_mut(&new_ancestor).unwrap();
273                update_branch_child(page, child_idx, child);
274                if child_idx != page.num_cells() as usize {
275                    is_rightmost = false;
276                }
277                new_path[i] = (new_ancestor, child_idx);
278                child = new_ancestor;
279            }
280            self.root = child;
281
282            if is_rightmost {
283                self.last_insert = Some((new_path, new_leaf_id));
284            }
285
286            if !key_exists {
287                self.entry_count += 1;
288            }
289            return Ok(!key_exists);
290        }
291
292        self.last_insert = None;
293        let (sep_key, right_id) =
294            split_leaf_with_insert(pages, alloc, txn_id, new_leaf_id, key, val_type, value);
295        self.root = propagate_split_up(
296            pages,
297            alloc,
298            txn_id,
299            &path,
300            new_leaf_id,
301            &sep_key,
302            right_id,
303            &mut self.depth,
304        );
305
306        if !key_exists {
307            self.entry_count += 1;
308        }
309        Ok(!key_exists)
310    }
311
312    pub fn insert_or_fetch(
313        &mut self,
314        pages: &mut FxHashMap<PageId, Page>,
315        alloc: &mut PageAllocator,
316        txn_id: TxnId,
317        key: &[u8],
318        val_type: ValueType,
319        value: &[u8],
320    ) -> Result<Option<Vec<u8>>> {
321        if let Some((mut cached_path, cached_leaf)) = self.last_insert.take() {
322            let (hit, needs_cow) = {
323                let page = pages
324                    .get(&cached_leaf)
325                    .ok_or(Error::PageOutOfBounds(cached_leaf))?;
326                let n = page.num_cells();
327                let h = n > 0 && key > leaf_node::read_cell(page, n - 1).key;
328                let nc = page.txn_id() != txn_id;
329                (h, nc)
330            };
331            if hit {
332                let cow_id = if needs_cow {
333                    cow_page(pages, alloc, cached_leaf, txn_id)
334                } else {
335                    cached_leaf
336                };
337                let ok = {
338                    let page = pages.get_mut(&cow_id).unwrap();
339                    leaf_node::insert_direct(page, key, val_type, value)
340                };
341                if ok {
342                    if cow_id != cached_leaf {
343                        self.root =
344                            propagate_cow_up(pages, alloc, txn_id, &mut cached_path, cow_id);
345                    }
346                    self.entry_count += 1;
347                    self.last_insert = Some((cached_path, cow_id));
348                    return Ok(None);
349                }
350                let (sep_key, right_id) =
351                    split_leaf_with_insert(pages, alloc, txn_id, cow_id, key, val_type, value);
352                self.root = propagate_split_up(
353                    pages,
354                    alloc,
355                    txn_id,
356                    &cached_path,
357                    cow_id,
358                    &sep_key,
359                    right_id,
360                    &mut self.depth,
361                );
362                self.last_insert = None;
363                self.entry_count += 1;
364                return Ok(None);
365            }
366            self.last_insert = Some((cached_path, cached_leaf));
367        }
368
369        let (path, leaf_id) = self.walk_to_leaf(pages, key)?;
370
371        let existing_value = {
372            let page = pages.get(&leaf_id).unwrap();
373            match leaf_node::search(page, key) {
374                Ok(idx) => {
375                    let cell = leaf_node::read_cell(page, idx);
376                    if matches!(cell.val_type, ValueType::Tombstone) {
377                        None
378                    } else {
379                        Some(cell.value.to_vec())
380                    }
381                }
382                Err(_) => None,
383            }
384        };
385        if let Some(v) = existing_value {
386            return Ok(Some(v));
387        }
388
389        let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
390        let leaf_ok = {
391            let page = pages.get_mut(&new_leaf_id).unwrap();
392            leaf_node::insert_direct(page, key, val_type, value)
393        };
394
395        if leaf_ok {
396            if alloc.in_place() && new_leaf_id == leaf_id {
397                let mut is_rightmost = true;
398                for &(ancestor_id, child_idx) in path.iter().rev() {
399                    let page = pages.get(&ancestor_id).unwrap();
400                    if child_idx != page.num_cells() as usize {
401                        is_rightmost = false;
402                        break;
403                    }
404                }
405                if is_rightmost {
406                    self.last_insert = Some((path, new_leaf_id));
407                }
408                self.entry_count += 1;
409                return Ok(None);
410            }
411            let mut child = new_leaf_id;
412            let mut is_rightmost = true;
413            let mut new_path = path;
414            for i in (0..new_path.len()).rev() {
415                let (ancestor_id, child_idx) = new_path[i];
416                let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
417                let page = pages.get_mut(&new_ancestor).unwrap();
418                update_branch_child(page, child_idx, child);
419                if child_idx != page.num_cells() as usize {
420                    is_rightmost = false;
421                }
422                new_path[i] = (new_ancestor, child_idx);
423                child = new_ancestor;
424            }
425            self.root = child;
426
427            if is_rightmost {
428                self.last_insert = Some((new_path, new_leaf_id));
429            }
430            self.entry_count += 1;
431            return Ok(None);
432        }
433
434        self.last_insert = None;
435        let (sep_key, right_id) =
436            split_leaf_with_insert(pages, alloc, txn_id, new_leaf_id, key, val_type, value);
437        self.root = propagate_split_up(
438            pages,
439            alloc,
440            txn_id,
441            &path,
442            new_leaf_id,
443            &sep_key,
444            right_id,
445            &mut self.depth,
446        );
447        self.entry_count += 1;
448        Ok(None)
449    }
450
451    #[inline]
452    pub fn insert_if_absent(
453        &mut self,
454        pages: &mut FxHashMap<PageId, Page>,
455        alloc: &mut PageAllocator,
456        txn_id: TxnId,
457        key: &[u8],
458        val_type: ValueType,
459        value: &[u8],
460    ) -> Result<bool> {
461        if let Some((mut cached_path, cached_leaf)) = self.last_insert.take() {
462            let (hit, needs_cow) = {
463                let page = pages
464                    .get(&cached_leaf)
465                    .ok_or(Error::PageOutOfBounds(cached_leaf))?;
466                let n = page.num_cells();
467                let h = n > 0 && key > leaf_node::read_cell(page, n - 1).key;
468                let nc = page.txn_id() != txn_id;
469                (h, nc)
470            };
471            if hit {
472                let cow_id = if needs_cow {
473                    cow_page(pages, alloc, cached_leaf, txn_id)
474                } else {
475                    cached_leaf
476                };
477                let ok = {
478                    let page = pages.get_mut(&cow_id).unwrap();
479                    leaf_node::insert_direct(page, key, val_type, value)
480                };
481                if ok {
482                    if cow_id != cached_leaf {
483                        self.root =
484                            propagate_cow_up(pages, alloc, txn_id, &mut cached_path, cow_id);
485                    }
486                    self.entry_count += 1;
487                    self.last_insert = Some((cached_path, cow_id));
488                    return Ok(true);
489                }
490                let (sep_key, right_id) =
491                    split_leaf_with_insert(pages, alloc, txn_id, cow_id, key, val_type, value);
492                self.root = propagate_split_up(
493                    pages,
494                    alloc,
495                    txn_id,
496                    &cached_path,
497                    cow_id,
498                    &sep_key,
499                    right_id,
500                    &mut self.depth,
501                );
502                self.last_insert = None;
503                self.entry_count += 1;
504                return Ok(true);
505            }
506            self.last_insert = Some((cached_path, cached_leaf));
507        }
508
509        let (path, leaf_id) = self.walk_to_leaf(pages, key)?;
510        self.insert_if_absent_at_leaf(pages, alloc, txn_id, key, val_type, value, path, leaf_id)
511    }
512
513    #[allow(clippy::too_many_arguments)]
514    #[inline]
515    pub fn insert_if_absent_at_leaf(
516        &mut self,
517        pages: &mut FxHashMap<PageId, Page>,
518        alloc: &mut PageAllocator,
519        txn_id: TxnId,
520        key: &[u8],
521        val_type: ValueType,
522        value: &[u8],
523        path: Vec<(PageId, usize)>,
524        leaf_id: PageId,
525    ) -> Result<bool> {
526        let exists = {
527            let page = pages.get(&leaf_id).unwrap();
528            match leaf_node::search(page, key) {
529                Ok(idx) => {
530                    let cell = leaf_node::read_cell(page, idx);
531                    !matches!(cell.val_type, ValueType::Tombstone)
532                }
533                Err(_) => false,
534            }
535        };
536        if exists {
537            return Ok(false);
538        }
539
540        let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
541        let leaf_ok = {
542            let page = pages.get_mut(&new_leaf_id).unwrap();
543            leaf_node::insert_direct(page, key, val_type, value)
544        };
545
546        if leaf_ok {
547            if alloc.in_place() && new_leaf_id == leaf_id {
548                let mut is_rightmost = true;
549                for &(ancestor_id, child_idx) in path.iter().rev() {
550                    let page = pages.get(&ancestor_id).unwrap();
551                    if child_idx != page.num_cells() as usize {
552                        is_rightmost = false;
553                        break;
554                    }
555                }
556                if is_rightmost {
557                    self.last_insert = Some((path, new_leaf_id));
558                }
559                self.entry_count += 1;
560                return Ok(true);
561            }
562            let mut child = new_leaf_id;
563            let mut is_rightmost = true;
564            let mut new_path = path;
565            for i in (0..new_path.len()).rev() {
566                let (ancestor_id, child_idx) = new_path[i];
567                let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
568                let page = pages.get_mut(&new_ancestor).unwrap();
569                update_branch_child(page, child_idx, child);
570                if child_idx != page.num_cells() as usize {
571                    is_rightmost = false;
572                }
573                new_path[i] = (new_ancestor, child_idx);
574                child = new_ancestor;
575            }
576            self.root = child;
577
578            if is_rightmost {
579                self.last_insert = Some((new_path, new_leaf_id));
580            }
581            self.entry_count += 1;
582            return Ok(true);
583        }
584
585        self.last_insert = None;
586        let (sep_key, right_id) =
587            split_leaf_with_insert(pages, alloc, txn_id, new_leaf_id, key, val_type, value);
588        self.root = propagate_split_up(
589            pages,
590            alloc,
591            txn_id,
592            &path,
593            new_leaf_id,
594            &sep_key,
595            right_id,
596            &mut self.depth,
597        );
598        self.entry_count += 1;
599        Ok(true)
600    }
601
602    #[allow(clippy::too_many_arguments)]
603    #[inline]
604    pub fn upsert_with<F, E>(
605        &mut self,
606        pages: &mut FxHashMap<PageId, Page>,
607        alloc: &mut PageAllocator,
608        txn_id: TxnId,
609        key: &[u8],
610        val_type: ValueType,
611        default_value: &[u8],
612        f: F,
613    ) -> std::result::Result<UpsertOutcome, E>
614    where
615        F: FnMut(&[u8]) -> std::result::Result<UpsertAction, E>,
616        E: From<Error>,
617    {
618        if let Some((mut cached_path, cached_leaf)) = self.last_insert.take() {
619            let (hit, needs_cow) = {
620                let page = pages
621                    .get(&cached_leaf)
622                    .ok_or(Error::PageOutOfBounds(cached_leaf))?;
623                let n = page.num_cells();
624                let h = n > 0 && key > leaf_node::read_cell(page, n - 1).key;
625                let nc = page.txn_id() != txn_id;
626                (h, nc)
627            };
628            if hit {
629                let cow_id = if needs_cow {
630                    cow_page(pages, alloc, cached_leaf, txn_id)
631                } else {
632                    cached_leaf
633                };
634                let ok = {
635                    let page = pages.get_mut(&cow_id).unwrap();
636                    leaf_node::insert_direct(page, key, val_type, default_value)
637                };
638                if ok {
639                    if cow_id != cached_leaf {
640                        self.root =
641                            propagate_cow_up(pages, alloc, txn_id, &mut cached_path, cow_id);
642                    }
643                    self.entry_count += 1;
644                    self.last_insert = Some((cached_path, cow_id));
645                    return Ok(UpsertOutcome::Inserted);
646                }
647                let (sep_key, right_id) = split_leaf_with_insert(
648                    pages,
649                    alloc,
650                    txn_id,
651                    cow_id,
652                    key,
653                    val_type,
654                    default_value,
655                );
656                self.root = propagate_split_up(
657                    pages,
658                    alloc,
659                    txn_id,
660                    &cached_path,
661                    cow_id,
662                    &sep_key,
663                    right_id,
664                    &mut self.depth,
665                );
666                self.last_insert = None;
667                self.entry_count += 1;
668                return Ok(UpsertOutcome::Inserted);
669            }
670            self.last_insert = Some((cached_path, cached_leaf));
671        }
672
673        let (path, leaf_id) = self.walk_to_leaf(pages, key)?;
674        self.upsert_with_at_leaf(
675            pages,
676            alloc,
677            txn_id,
678            key,
679            val_type,
680            default_value,
681            path,
682            leaf_id,
683            f,
684        )
685    }
686
687    #[allow(clippy::too_many_arguments)]
688    #[inline]
689    pub fn upsert_with_at_leaf<F, E>(
690        &mut self,
691        pages: &mut FxHashMap<PageId, Page>,
692        alloc: &mut PageAllocator,
693        txn_id: TxnId,
694        key: &[u8],
695        val_type: ValueType,
696        default_value: &[u8],
697        path: Vec<(PageId, usize)>,
698        leaf_id: PageId,
699        mut f: F,
700    ) -> std::result::Result<UpsertOutcome, E>
701    where
702        F: FnMut(&[u8]) -> std::result::Result<UpsertAction, E>,
703        E: From<Error>,
704    {
705        let action = {
706            let page = pages.get(&leaf_id).unwrap();
707            match leaf_node::search(page, key) {
708                Ok(idx) => {
709                    let cell = leaf_node::read_cell(page, idx);
710                    if matches!(cell.val_type, ValueType::Tombstone) {
711                        None
712                    } else {
713                        Some(f(cell.value)?)
714                    }
715                }
716                Err(_) => None,
717            }
718        };
719
720        if let Some(act) = action {
721            match act {
722                UpsertAction::Skip => return Ok(UpsertOutcome::Skipped),
723                UpsertAction::Replace(new_bytes) => {
724                    let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
725                    let leaf_ok = {
726                        let page = pages.get_mut(&new_leaf_id).unwrap();
727                        leaf_node::insert_direct(page, key, val_type, &new_bytes)
728                    };
729                    if leaf_ok {
730                        if new_leaf_id != leaf_id {
731                            let mut new_path = path;
732                            self.root =
733                                propagate_cow_up(pages, alloc, txn_id, &mut new_path, new_leaf_id);
734                        }
735                        return Ok(UpsertOutcome::Updated);
736                    }
737                    self.last_insert = None;
738                    let (sep_key, right_id) = split_leaf_with_insert(
739                        pages,
740                        alloc,
741                        txn_id,
742                        new_leaf_id,
743                        key,
744                        val_type,
745                        &new_bytes,
746                    );
747                    self.root = propagate_split_up(
748                        pages,
749                        alloc,
750                        txn_id,
751                        &path,
752                        new_leaf_id,
753                        &sep_key,
754                        right_id,
755                        &mut self.depth,
756                    );
757                    return Ok(UpsertOutcome::Updated);
758                }
759            }
760        }
761
762        let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
763        let leaf_ok = {
764            let page = pages.get_mut(&new_leaf_id).unwrap();
765            leaf_node::insert_direct(page, key, val_type, default_value)
766        };
767
768        if leaf_ok {
769            if alloc.in_place() && new_leaf_id == leaf_id {
770                let mut is_rightmost = true;
771                for &(ancestor_id, child_idx) in path.iter().rev() {
772                    let page = pages.get(&ancestor_id).unwrap();
773                    if child_idx != page.num_cells() as usize {
774                        is_rightmost = false;
775                        break;
776                    }
777                }
778                if is_rightmost {
779                    self.last_insert = Some((path, new_leaf_id));
780                }
781                self.entry_count += 1;
782                return Ok(UpsertOutcome::Inserted);
783            }
784            let mut child = new_leaf_id;
785            let mut is_rightmost = true;
786            let mut new_path = path;
787            for i in (0..new_path.len()).rev() {
788                let (ancestor_id, child_idx) = new_path[i];
789                let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
790                let page = pages.get_mut(&new_ancestor).unwrap();
791                update_branch_child(page, child_idx, child);
792                if child_idx != page.num_cells() as usize {
793                    is_rightmost = false;
794                }
795                new_path[i] = (new_ancestor, child_idx);
796                child = new_ancestor;
797            }
798            self.root = child;
799
800            if is_rightmost {
801                self.last_insert = Some((new_path, new_leaf_id));
802            }
803            self.entry_count += 1;
804            return Ok(UpsertOutcome::Inserted);
805        }
806
807        self.last_insert = None;
808        let (sep_key, right_id) = split_leaf_with_insert(
809            pages,
810            alloc,
811            txn_id,
812            new_leaf_id,
813            key,
814            val_type,
815            default_value,
816        );
817        self.root = propagate_split_up(
818            pages,
819            alloc,
820            txn_id,
821            &path,
822            new_leaf_id,
823            &sep_key,
824            right_id,
825            &mut self.depth,
826        );
827        self.entry_count += 1;
828        Ok(UpsertOutcome::Inserted)
829    }
830
831    /// Bulk-update existing keys. Keys must be sorted.
832    pub fn update_sorted(
833        &mut self,
834        pages: &mut FxHashMap<PageId, Page>,
835        alloc: &mut PageAllocator,
836        txn_id: TxnId,
837        pairs: &[(&[u8], &[u8])],
838    ) -> Result<u64> {
839        if pairs.is_empty() {
840            return Ok(0);
841        }
842        self.last_insert = None;
843
844        let (mut path, mut leaf_id) = self.walk_to_leaf(pages, pairs[0].0)?;
845        let mut cow_leaf = cow_page(pages, alloc, leaf_id, txn_id);
846        if cow_leaf != leaf_id {
847            self.root = propagate_cow_up(pages, alloc, txn_id, &mut path, cow_leaf);
848        }
849
850        let mut count: u64 = 0;
851        let mut hint: u16 = 0;
852
853        for &(key, value) in pairs {
854            let past_leaf = {
855                let page = pages.get(&cow_leaf).unwrap();
856                let n = page.num_cells();
857                n == 0 || key > leaf_node::read_cell(page, n - 1).key
858            };
859
860            if past_leaf {
861                let (new_path, new_leaf) = self.walk_to_leaf(pages, key)?;
862                path = new_path;
863                leaf_id = new_leaf;
864                cow_leaf = cow_page(pages, alloc, leaf_id, txn_id);
865                if cow_leaf != leaf_id {
866                    self.root = propagate_cow_up(pages, alloc, txn_id, &mut path, cow_leaf);
867                }
868                hint = 0;
869            }
870
871            let page = pages.get(&cow_leaf).unwrap();
872            let n = page.num_cells();
873            let idx = {
874                let mut i = hint;
875                loop {
876                    if i >= n {
877                        break None;
878                    }
879                    let cell = leaf_node::read_cell(page, i);
880                    match key.cmp(cell.key) {
881                        std::cmp::Ordering::Equal => break Some(i),
882                        std::cmp::Ordering::Less => break None,
883                        std::cmp::Ordering::Greater => i += 1,
884                    }
885                }
886            };
887
888            if let Some(idx) = idx {
889                hint = idx + 1;
890                let page = pages.get_mut(&cow_leaf).unwrap();
891                if !leaf_node::update_value_in_place(page, idx, ValueType::Inline, value) {
892                    leaf_node::insert_direct(page, key, ValueType::Inline, value);
893                }
894                count += 1;
895            }
896        }
897
898        Ok(count)
899    }
900
901    /// Delete a key. Returns `true` if the key was found and deleted.
902    pub fn delete(
903        &mut self,
904        pages: &mut FxHashMap<PageId, Page>,
905        alloc: &mut PageAllocator,
906        txn_id: TxnId,
907        key: &[u8],
908    ) -> Result<bool> {
909        self.last_insert = None;
910        let (mut path, leaf_id) = self.walk_to_leaf(pages, key)?;
911
912        let found = {
913            let page = pages.get(&leaf_id).unwrap();
914            leaf_node::search(page, key).is_ok()
915        };
916        if !found {
917            return Ok(false);
918        }
919
920        let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
921        {
922            let page = pages.get_mut(&new_leaf_id).unwrap();
923            leaf_node::delete(page, key);
924        }
925
926        let leaf_empty = pages.get(&new_leaf_id).unwrap().num_cells() == 0;
927
928        if !leaf_empty || path.is_empty() {
929            if alloc.in_place() && new_leaf_id == leaf_id {
930                self.entry_count -= 1;
931                return Ok(true);
932            }
933            self.root = propagate_cow_up(pages, alloc, txn_id, &mut path, new_leaf_id);
934            self.entry_count -= 1;
935            return Ok(true);
936        }
937
938        alloc.free(new_leaf_id);
939        pages.remove(&new_leaf_id);
940
941        self.root = propagate_remove_up(pages, alloc, txn_id, &mut path, &mut self.depth);
942        self.entry_count -= 1;
943        Ok(true)
944    }
945
946    /// Walk root to leaf for `key`. Returns (path, leaf_page_id).
947    pub fn walk_to_leaf(
948        &self,
949        pages: &FxHashMap<PageId, Page>,
950        key: &[u8],
951    ) -> Result<(Vec<(PageId, usize)>, PageId)> {
952        let mut path = Vec::with_capacity(self.depth as usize);
953        let mut current = self.root;
954        loop {
955            let page = pages.get(&current).ok_or(Error::PageOutOfBounds(current))?;
956            match page.page_type() {
957                Some(PageType::Leaf) => return Ok((path, current)),
958                Some(PageType::Branch) => {
959                    let child_idx = branch_node::search_child_index(page, key);
960                    let child = branch_node::get_child(page, child_idx);
961                    path.push((current, child_idx));
962                    current = child;
963                }
964                _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
965            }
966        }
967    }
968}
969
970/// CoW a page. No-op if already owned by this txn. In-place mode reuses page ID.
971pub fn cow_page(
972    pages: &mut FxHashMap<PageId, Page>,
973    alloc: &mut PageAllocator,
974    old_id: PageId,
975    txn_id: TxnId,
976) -> PageId {
977    if alloc.in_place() {
978        let page = pages.get_mut(&old_id).unwrap();
979        if page.txn_id() != txn_id {
980            page.set_txn_id(txn_id);
981        }
982        return old_id;
983    }
984    let mut new_page = {
985        let page = pages.get(&old_id).unwrap();
986        if page.txn_id() == txn_id {
987            return old_id;
988        }
989        page.clone()
990    };
991    let new_id = alloc.allocate();
992    new_page.set_page_id(new_id);
993    new_page.set_txn_id(txn_id);
994    pages.insert(new_id, new_page);
995    alloc.free(old_id);
996    new_id
997}
998
999/// Update a branch's child pointer at `child_idx` to point to `new_child`.
1000fn update_branch_child(page: &mut Page, child_idx: usize, new_child: PageId) {
1001    let n = page.num_cells() as usize;
1002    if child_idx < n {
1003        let offset = page.cell_offset(child_idx as u16) as usize;
1004        page.data[offset..offset + 4].copy_from_slice(&new_child.as_u32().to_le_bytes());
1005    } else {
1006        page.set_right_child(new_child);
1007    }
1008}
1009
1010/// Propagate CoW up through ancestors. Updates `path` in place so callers
1011/// caching the path (e.g. LIL) reuse current PageIds after CoW — critical
1012/// across SAVEPOINT boundaries where txn_id changes invalidate the cache.
1013pub fn propagate_cow_up(
1014    pages: &mut FxHashMap<PageId, Page>,
1015    alloc: &mut PageAllocator,
1016    txn_id: TxnId,
1017    path: &mut [(PageId, usize)],
1018    mut new_child: PageId,
1019) -> PageId {
1020    for i in (0..path.len()).rev() {
1021        let (ancestor_id, child_idx) = path[i];
1022        let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
1023        let page = pages.get_mut(&new_ancestor).unwrap();
1024        update_branch_child(page, child_idx, new_child);
1025        path[i] = (new_ancestor, child_idx);
1026        new_child = new_ancestor;
1027    }
1028    new_child
1029}
1030
1031/// Split full leaf and insert. Returns (separator_key, right_page_id).
1032fn split_leaf_with_insert(
1033    pages: &mut FxHashMap<PageId, Page>,
1034    alloc: &mut PageAllocator,
1035    txn_id: TxnId,
1036    leaf_id: PageId,
1037    key: &[u8],
1038    val_type: ValueType,
1039    value: &[u8],
1040) -> (Vec<u8>, PageId) {
1041    let mut cells: Vec<(Vec<u8>, Vec<u8>)> = {
1042        let page = pages.get(&leaf_id).unwrap();
1043        let n = page.num_cells() as usize;
1044        (0..n)
1045            .map(|i| {
1046                let cell = leaf_node::read_cell(page, i as u16);
1047                let raw = leaf_node::read_cell_bytes(page, i as u16);
1048                (cell.key.to_vec(), raw)
1049            })
1050            .collect()
1051    };
1052
1053    let new_raw = leaf_node::build_cell(key, val_type, value);
1054    match cells.binary_search_by(|(k, _)| k.as_slice().cmp(key)) {
1055        Ok(idx) => cells[idx] = (key.to_vec(), new_raw),
1056        Err(idx) => cells.insert(idx, (key.to_vec(), new_raw)),
1057    }
1058
1059    let total = cells.len();
1060
1061    let usable = citadel_core::constants::USABLE_SIZE;
1062    let mut cum: Vec<usize> = Vec::with_capacity(total + 1);
1063    cum.push(0);
1064    for (_, raw) in &cells {
1065        cum.push(cum.last().unwrap() + raw.len());
1066    }
1067    let left_fits = |sp: usize| cum[sp] + sp * 2 <= usable;
1068    let right_fits = |sp: usize| (cum[total] - cum[sp]) + (total - sp) * 2 <= usable;
1069
1070    let mut split_point = total / 2;
1071    if !left_fits(split_point) || !right_fits(split_point) {
1072        split_point = 1;
1073        for sp in 1..total {
1074            if left_fits(sp) && right_fits(sp) {
1075                split_point = sp;
1076                if sp >= total / 2 {
1077                    break;
1078                }
1079            }
1080        }
1081    }
1082
1083    let sep_key = cells[split_point].0.clone();
1084
1085    {
1086        let left_refs: Vec<&[u8]> = cells[..split_point]
1087            .iter()
1088            .map(|(_, raw)| raw.as_slice())
1089            .collect();
1090        let page = pages.get_mut(&leaf_id).unwrap();
1091        page.rebuild_cells(&left_refs);
1092    }
1093
1094    let right_id = alloc.allocate();
1095    {
1096        let mut right_page = Page::new(right_id, PageType::Leaf, txn_id);
1097        let right_refs: Vec<&[u8]> = cells[split_point..]
1098            .iter()
1099            .map(|(_, raw)| raw.as_slice())
1100            .collect();
1101        right_page.rebuild_cells(&right_refs);
1102        pages.insert(right_id, right_page);
1103    }
1104
1105    (sep_key, right_id)
1106}
1107
1108#[allow(clippy::too_many_arguments)]
1109fn propagate_split_up(
1110    pages: &mut FxHashMap<PageId, Page>,
1111    alloc: &mut PageAllocator,
1112    txn_id: TxnId,
1113    path: &[(PageId, usize)],
1114    mut left_child: PageId,
1115    initial_sep: &[u8],
1116    mut right_child: PageId,
1117    depth: &mut u16,
1118) -> PageId {
1119    let mut sep_key = initial_sep.to_vec();
1120    let mut pending_split = true;
1121
1122    for &(ancestor_id, child_idx) in path.iter().rev() {
1123        let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
1124
1125        if pending_split {
1126            let ok = {
1127                let page = pages.get_mut(&new_ancestor).unwrap();
1128                branch_node::insert_separator(page, child_idx, left_child, &sep_key, right_child)
1129            };
1130
1131            if ok {
1132                pending_split = false;
1133                left_child = new_ancestor;
1134            } else {
1135                let (new_sep, new_right) = split_branch_with_insert(
1136                    pages,
1137                    alloc,
1138                    txn_id,
1139                    new_ancestor,
1140                    child_idx,
1141                    left_child,
1142                    &sep_key,
1143                    right_child,
1144                );
1145                left_child = new_ancestor;
1146                sep_key = new_sep;
1147                right_child = new_right;
1148            }
1149        } else {
1150            let page = pages.get_mut(&new_ancestor).unwrap();
1151            update_branch_child(page, child_idx, left_child);
1152            left_child = new_ancestor;
1153        }
1154    }
1155
1156    if pending_split {
1157        let new_root_id = alloc.allocate();
1158        let mut new_root = Page::new(new_root_id, PageType::Branch, txn_id);
1159        let cell = branch_node::build_cell(left_child, &sep_key);
1160        new_root.write_cell(&cell).unwrap();
1161        new_root.set_right_child(right_child);
1162        pages.insert(new_root_id, new_root);
1163        *depth += 1;
1164        new_root_id
1165    } else {
1166        left_child
1167    }
1168}
1169
1170#[allow(clippy::too_many_arguments)]
1171fn split_branch_with_insert(
1172    pages: &mut FxHashMap<PageId, Page>,
1173    alloc: &mut PageAllocator,
1174    txn_id: TxnId,
1175    branch_id: PageId,
1176    child_idx: usize,
1177    new_left: PageId,
1178    sep_key: &[u8],
1179    new_right: PageId,
1180) -> (Vec<u8>, PageId) {
1181    let (new_cells, final_right_child) = {
1182        let page = pages.get(&branch_id).unwrap();
1183        let n = page.num_cells() as usize;
1184        let cells: Vec<(PageId, Vec<u8>)> = (0..n)
1185            .map(|i| {
1186                let cell = branch_node::read_cell(page, i as u16);
1187                (cell.child, cell.key.to_vec())
1188            })
1189            .collect();
1190        let old_rc = page.right_child();
1191
1192        let mut result = Vec::with_capacity(n + 1);
1193        let final_rc;
1194
1195        if child_idx < n {
1196            let old_key = cells[child_idx].1.clone();
1197            for (i, (child, key)) in cells.into_iter().enumerate() {
1198                if i == child_idx {
1199                    result.push((new_left, sep_key.to_vec()));
1200                    result.push((new_right, old_key.clone()));
1201                } else {
1202                    result.push((child, key));
1203                }
1204            }
1205            final_rc = old_rc;
1206        } else {
1207            result = cells;
1208            result.push((new_left, sep_key.to_vec()));
1209            final_rc = new_right;
1210        }
1211
1212        (result, final_rc)
1213    };
1214
1215    let total = new_cells.len();
1216    let usable = citadel_core::constants::USABLE_SIZE;
1217    let raw_sizes: Vec<usize> = new_cells.iter().map(|(_, key)| 6 + key.len()).collect();
1218    let mut cum: Vec<usize> = Vec::with_capacity(total + 1);
1219    cum.push(0);
1220    for &sz in &raw_sizes {
1221        cum.push(cum.last().unwrap() + sz);
1222    }
1223    let left_fits = |sp: usize| cum[sp] + sp * 2 <= usable;
1224    let right_fits = |sp: usize| {
1225        let right_count = total - sp - 1;
1226        (cum[total] - cum[sp + 1]) + right_count * 2 <= usable
1227    };
1228
1229    let mut split_point = total / 2;
1230    if !left_fits(split_point) || !right_fits(split_point) {
1231        split_point = 1;
1232        for sp in 1..total.saturating_sub(1) {
1233            if left_fits(sp) && right_fits(sp) {
1234                split_point = sp;
1235                if sp >= total / 2 {
1236                    break;
1237                }
1238            }
1239        }
1240    }
1241
1242    let promoted_sep = new_cells[split_point].1.clone();
1243    let promoted_child = new_cells[split_point].0;
1244
1245    {
1246        let left_raw: Vec<Vec<u8>> = new_cells[..split_point]
1247            .iter()
1248            .map(|(child, key)| branch_node::build_cell(*child, key))
1249            .collect();
1250        let left_refs: Vec<&[u8]> = left_raw.iter().map(|c| c.as_slice()).collect();
1251        let page = pages.get_mut(&branch_id).unwrap();
1252        page.rebuild_cells(&left_refs);
1253        page.set_right_child(promoted_child);
1254    }
1255
1256    let right_branch_id = alloc.allocate();
1257    {
1258        let mut right_page = Page::new(right_branch_id, PageType::Branch, txn_id);
1259        let right_raw: Vec<Vec<u8>> = new_cells[split_point + 1..]
1260            .iter()
1261            .map(|(child, key)| branch_node::build_cell(*child, key))
1262            .collect();
1263        let right_refs: Vec<&[u8]> = right_raw.iter().map(|c| c.as_slice()).collect();
1264        right_page.rebuild_cells(&right_refs);
1265        right_page.set_right_child(final_right_child);
1266        pages.insert(right_branch_id, right_page);
1267    }
1268
1269    (promoted_sep, right_branch_id)
1270}
1271
1272fn remove_child_from_branch(page: &mut Page, child_idx: usize) {
1273    let n = page.num_cells() as usize;
1274    if child_idx < n {
1275        let cell_sz = branch_node::get_cell_size(page, child_idx as u16);
1276        page.delete_cell_at(child_idx as u16, cell_sz);
1277    } else {
1278        assert!(n > 0, "cannot remove right_child from branch with 0 cells");
1279        let last_child = branch_node::read_cell(page, (n - 1) as u16).child;
1280        let cell_sz = branch_node::get_cell_size(page, (n - 1) as u16);
1281        page.delete_cell_at((n - 1) as u16, cell_sz);
1282        page.set_right_child(last_child);
1283    }
1284}
1285
1286fn propagate_remove_up(
1287    pages: &mut FxHashMap<PageId, Page>,
1288    alloc: &mut PageAllocator,
1289    txn_id: TxnId,
1290    path: &mut [(PageId, usize)],
1291    depth: &mut u16,
1292) -> PageId {
1293    let mut level = path.len();
1294    let mut need_remove_at_level = true;
1295    let mut new_child = PageId(0);
1296
1297    while level > 0 && need_remove_at_level {
1298        level -= 1;
1299        let (ancestor_id, child_idx) = path[level];
1300        let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
1301
1302        {
1303            let page = pages.get_mut(&new_ancestor).unwrap();
1304            remove_child_from_branch(page, child_idx);
1305        }
1306
1307        let num_cells = pages.get(&new_ancestor).unwrap().num_cells();
1308
1309        if num_cells > 0 || level == 0 {
1310            if num_cells == 0 && level == 0 {
1311                // Root collapsed - replace with its only child
1312                let only_child = pages.get(&new_ancestor).unwrap().right_child();
1313                alloc.free(new_ancestor);
1314                pages.remove(&new_ancestor);
1315                *depth -= 1;
1316                return only_child;
1317            }
1318            // Branch is non-empty, or it's the root with cells
1319            new_child = new_ancestor;
1320            need_remove_at_level = false;
1321        } else {
1322            // Branch became empty (0 cells) - collapse to its right_child
1323            let only_child = pages.get(&new_ancestor).unwrap().right_child();
1324            alloc.free(new_ancestor);
1325            pages.remove(&new_ancestor);
1326            *depth -= 1;
1327
1328            new_child = only_child;
1329            need_remove_at_level = false;
1330        }
1331    }
1332
1333    if level > 0 {
1334        let remaining_path = &mut path[..level];
1335        new_child = propagate_cow_up(pages, alloc, txn_id, remaining_path, new_child);
1336    }
1337
1338    new_child
1339}
1340
1341#[cfg(test)]
1342#[path = "btree_tests.rs"]
1343mod tests;