1use crate::allocator::PageAllocator;
4use citadel_core::types::{PageId, PageType, TxnId, ValueType};
5use citadel_core::{Error, Result};
6use citadel_page::page::Page;
7use citadel_page::{branch_node, leaf_node};
8use rustc_hash::FxHashMap;
9
/// Handle to a B-tree whose pages are stored in an external
/// `FxHashMap<PageId, Page>` that is passed into every method.
///
/// The handle itself only keeps the root page id, two counters and two
/// optional cursor caches used by the fast insert/delete paths.
#[derive(Clone)]
pub struct BTree {
    /// Page id of the current root; mutating methods overwrite it whenever a
    /// new root id is produced.
    pub root: PageId,
    /// Tree depth; `new` starts a single-leaf tree at 1.
    pub depth: u16,
    /// Entry counter maintained by the insert and delete methods.
    pub entry_count: u64,
    // Cached `(path, leaf)` recorded by the insert paths. Each path element is
    // `(branch page id, child index followed inside that branch)`.
    last_insert: Option<(Vec<(PageId, usize)>, PageId)>,
    // Cached `(path, leaf)` recorded by the delete paths; same shape as
    // `last_insert`.
    last_delete: Option<(Vec<(PageId, usize)>, PageId)>,
}
19
/// Result reported by `upsert_with` / `upsert_with_at_leaf`.
#[derive(Debug, Clone)]
pub enum UpsertOutcome {
    /// The default value was written as a new entry.
    Inserted,
    /// The callback returned `UpsertAction::Replace` and the value was rewritten.
    Updated,
    /// The callback returned `UpsertAction::Skip`; nothing was written.
    Skipped,
}
26
/// Decision returned by the callback given to `upsert_with` when the key
/// already holds a non-tombstone value.
#[derive(Debug, Clone)]
pub enum UpsertAction {
    /// Store these bytes as the new value for the key.
    Replace(Vec<u8>),
    /// Leave the existing value untouched.
    Skip,
}
32
33impl BTree {
34 pub fn new(
36 pages: &mut FxHashMap<PageId, Page>,
37 alloc: &mut PageAllocator,
38 txn_id: TxnId,
39 ) -> Self {
40 let root_id = alloc.allocate();
41 let root = Page::new(root_id, PageType::Leaf, txn_id);
42 pages.insert(root_id, root);
43 Self {
44 root: root_id,
45 depth: 1,
46 entry_count: 0,
47 last_insert: None,
48 last_delete: None,
49 }
50 }
51
52 pub fn from_existing(root: PageId, depth: u16, entry_count: u64) -> Self {
54 Self {
55 root,
56 depth,
57 entry_count,
58 last_insert: None,
59 last_delete: None,
60 }
61 }
62
63 pub fn search_at_leaf(
64 pages: &FxHashMap<PageId, Page>,
65 leaf_id: PageId,
66 key: &[u8],
67 ) -> Result<Option<(ValueType, Vec<u8>)>> {
68 let page = pages.get(&leaf_id).ok_or(Error::PageOutOfBounds(leaf_id))?;
69 match leaf_node::search(page, key) {
70 Ok(idx) => {
71 let cell = leaf_node::read_cell(page, idx);
72 Ok(Some((cell.val_type, cell.value.to_vec())))
73 }
74 Err(_) => Ok(None),
75 }
76 }
77
78 pub fn search(
79 &self,
80 pages: &FxHashMap<PageId, Page>,
81 key: &[u8],
82 ) -> Result<Option<(ValueType, Vec<u8>)>> {
83 let mut current = self.root;
84 loop {
85 let page = pages.get(¤t).ok_or(Error::PageOutOfBounds(current))?;
86 match page.page_type() {
87 Some(PageType::Leaf) => {
88 return match leaf_node::search(page, key) {
89 Ok(idx) => {
90 let cell = leaf_node::read_cell(page, idx);
91 Ok(Some((cell.val_type, cell.value.to_vec())))
92 }
93 Err(_) => Ok(None),
94 };
95 }
96 Some(PageType::Branch) => {
97 let idx = branch_node::search_child_index(page, key);
98 current = branch_node::get_child(page, idx);
99 }
100 _ => {
101 return Err(Error::InvalidPageType(page.page_type_raw(), current));
102 }
103 }
104 }
105 }
106
107 #[inline]
111 fn clear_lil_caches(&mut self) {
112 self.last_insert = None;
113 self.last_delete = None;
114 }
115
116 pub fn debug_assert_lil_disjoint(&self) {
117 debug_assert!(
118 self.last_insert.is_none() || self.last_delete.is_none(),
119 "LIL caches must be mutually exclusive (both Some indicates a missed clear site)"
120 );
121 }
122
123 pub fn lil_would_hit(&self, pages: &FxHashMap<PageId, Page>, key: &[u8]) -> bool {
124 if let Some((_, cached_leaf)) = &self.last_insert {
125 if let Some(page) = pages.get(cached_leaf) {
126 let n = page.num_cells();
127 return n > 0 && key > leaf_node::read_cell(page, n - 1).key;
128 }
129 }
130 false
131 }
132
    /// Attempts an insert through the cached insert leaf only, never walking
    /// the tree.
    ///
    /// Returns `Ok(None)` when there is no cached leaf or when `key` does not
    /// sort strictly after the cached leaf's last key (the cache is left as it
    /// was). Otherwise the entry is inserted and `Ok(Some(true))` is returned.
    ///
    /// # Errors
    /// `Error::PageOutOfBounds` if the cached leaf id is missing from `pages`.
    pub fn try_lil_insert(
        &mut self,
        pages: &mut FxHashMap<PageId, Page>,
        alloc: &mut PageAllocator,
        txn_id: TxnId,
        key: &[u8],
        val_type: ValueType,
        value: &[u8],
    ) -> Result<Option<bool>> {
        // Peek at the cache without consuming it so a miss leaves it intact.
        let cached_leaf = match self.last_insert.as_ref() {
            Some((_, leaf)) => *leaf,
            None => return Ok(None),
        };
        let (hit, needs_cow) = {
            let page = pages
                .get(&cached_leaf)
                .ok_or(Error::PageOutOfBounds(cached_leaf))?;
            let n = page.num_cells();
            // Hit only when the key goes after every key already in the leaf.
            let h = n > 0 && key > leaf_node::read_cell(page, n - 1).key;
            // The leaf must be copied first if it is stamped with another txn.
            let nc = page.txn_id() != txn_id;
            (h, nc)
        };
        if !hit {
            return Ok(None);
        }
        // Committed to the fast path: take ownership of the cached path.
        let mut cached_path = self.last_insert.take().unwrap().0;
        let cow_id = if needs_cow {
            cow_page(pages, alloc, cached_leaf, txn_id)
        } else {
            cached_leaf
        };
        let ok = {
            let page = pages.get_mut(&cow_id).unwrap();
            leaf_node::insert_append_direct(page, key, val_type, value)
        };
        if ok {
            // Only re-link ancestors when the leaf actually got a new id.
            if cow_id != cached_leaf {
                self.root = propagate_cow_up(pages, alloc, txn_id, &mut cached_path, cow_id);
            }
            self.entry_count += 1;
            self.last_delete = None;
            self.last_insert = Some((cached_path, cow_id));
            return Ok(Some(true));
        }
        // The append did not succeed: split the leaf and push the separator up.
        let (sep_key, right_id) =
            split_leaf_with_insert(pages, alloc, txn_id, cow_id, key, val_type, value);
        self.root = propagate_split_up(
            pages,
            alloc,
            txn_id,
            &cached_path,
            cow_id,
            &sep_key,
            right_id,
            &mut self.depth,
        );
        // The cached path no longer describes the tree after a split.
        self.clear_lil_caches();
        self.entry_count += 1;
        Ok(Some(true))
    }
194
195 pub fn try_lil_delete(
200 &mut self,
201 pages: &mut FxHashMap<PageId, Page>,
202 alloc: &mut PageAllocator,
203 txn_id: TxnId,
204 key: &[u8],
205 ) -> Result<Option<(bool, Option<PageId>)>> {
206 let cached_leaf = match self.last_delete.as_ref() {
207 Some((_, leaf)) => *leaf,
208 None => return Ok(None),
209 };
210 let (in_range, found_idx, overflow_head, needs_cow) = {
211 let Some(page) = pages.get(&cached_leaf) else {
212 self.last_delete = None;
213 return Ok(None);
214 };
215 if !matches!(page.page_type(), Some(PageType::Leaf)) {
216 self.last_delete = None;
217 return Ok(None);
218 }
219 let n = page.num_cells();
220 if n == 0 {
221 self.last_delete = None;
222 return Ok(None);
223 }
224 let first = leaf_node::read_cell(page, 0).key;
225 let last = leaf_node::read_cell(page, n - 1).key;
226 let in_range = key >= first && key <= last;
227 if !in_range {
228 return Ok(None);
229 }
230 let (found_idx, overflow_head) = match leaf_node::search(page, key) {
231 Ok(idx) => {
232 let cell = leaf_node::read_cell(page, idx);
233 let head = if cell.val_type == ValueType::Overflow {
234 Some(leaf_node::OverflowRef::from_bytes(cell.value).first_page)
235 } else {
236 None
237 };
238 (Some(idx), head)
239 }
240 Err(_) => (None, None),
241 };
242 let nc = page.txn_id() != txn_id;
243 (in_range, found_idx, overflow_head, nc)
244 };
245 if !in_range {
246 return Ok(None);
247 }
248 if found_idx.is_none() {
249 return Ok(Some((false, None)));
250 }
251
252 let mut cached_path = self.last_delete.take().unwrap().0;
253 let cow_id = if needs_cow {
254 cow_page(pages, alloc, cached_leaf, txn_id)
255 } else {
256 cached_leaf
257 };
258 {
259 let page = pages.get_mut(&cow_id).unwrap();
260 leaf_node::delete(page, key);
261 }
262
263 let leaf_empty = pages.get(&cow_id).unwrap().num_cells() == 0;
264
265 if !leaf_empty || cached_path.is_empty() {
266 if alloc.in_place() && cow_id == cached_leaf {
267 self.entry_count -= 1;
268 self.clear_lil_caches();
269 self.last_delete = Some((cached_path, cow_id));
270 return Ok(Some((true, overflow_head)));
271 }
272 if cow_id != cached_leaf {
273 self.root = propagate_cow_up(pages, alloc, txn_id, &mut cached_path, cow_id);
274 }
275 self.entry_count -= 1;
276 self.clear_lil_caches();
277 self.last_delete = Some((cached_path, cow_id));
278 return Ok(Some((true, overflow_head)));
279 }
280
281 alloc.free(cow_id);
282 pages.remove(&cow_id);
283 self.root = propagate_remove_up(pages, alloc, txn_id, &mut cached_path, &mut self.depth);
284 self.entry_count -= 1;
285 self.clear_lil_caches();
286 self.last_delete = None;
287 Ok(Some((true, overflow_head)))
288 }
289
    /// Inserts or overwrites `key`.
    ///
    /// First tries the cached insert leaf (only when `key` sorts strictly
    /// after that leaf's last key); otherwise walks from the root and
    /// delegates to `insert_at_leaf`.
    ///
    /// Returns `Ok(true)` when a new entry was added and `Ok(false)` when an
    /// existing key was overwritten.
    ///
    /// # Errors
    /// `Error::PageOutOfBounds` if the cached leaf is missing from `pages`,
    /// plus any error from `walk_to_leaf`.
    pub fn insert(
        &mut self,
        pages: &mut FxHashMap<PageId, Page>,
        alloc: &mut PageAllocator,
        txn_id: TxnId,
        key: &[u8],
        val_type: ValueType,
        value: &[u8],
    ) -> Result<bool> {
        // NOTE(review): unlike `insert_or_fetch`, `insert_if_absent` and
        // `upsert_with`, the cache taken here is not put back on a miss —
        // confirm this difference is intentional.
        if let Some((mut cached_path, cached_leaf)) = self.last_insert.take() {
            let (hit, needs_cow) = {
                let page = pages
                    .get(&cached_leaf)
                    .ok_or(Error::PageOutOfBounds(cached_leaf))?;
                let n = page.num_cells();
                // Hit only when the key goes after every key already in the leaf.
                let h = n > 0 && key > leaf_node::read_cell(page, n - 1).key;
                let nc = page.txn_id() != txn_id;
                (h, nc)
            };
            if hit {
                let cow_id = if needs_cow {
                    cow_page(pages, alloc, cached_leaf, txn_id)
                } else {
                    cached_leaf
                };
                let ok = {
                    let page = pages.get_mut(&cow_id).unwrap();
                    leaf_node::insert_direct(page, key, val_type, value)
                };
                if ok {
                    // Only re-link ancestors when the leaf got a new id.
                    if cow_id != cached_leaf {
                        self.root =
                            propagate_cow_up(pages, alloc, txn_id, &mut cached_path, cow_id);
                    }
                    self.entry_count += 1;
                    self.last_delete = None;
                    self.last_insert = Some((cached_path, cow_id));
                    return Ok(true);
                }
                // The leaf could not take the cell: split and push the separator up.
                let (sep_key, right_id) =
                    split_leaf_with_insert(pages, alloc, txn_id, cow_id, key, val_type, value);
                self.root = propagate_split_up(
                    pages,
                    alloc,
                    txn_id,
                    &cached_path,
                    cow_id,
                    &sep_key,
                    right_id,
                    &mut self.depth,
                );
                self.clear_lil_caches();
                self.entry_count += 1;
                return Ok(true);
            }
        }

        // Slow path: locate the leaf from the root.
        let (path, leaf_id) = self.walk_to_leaf(pages, key)?;
        let (was_new, _replaced) =
            self.insert_at_leaf(pages, alloc, txn_id, key, val_type, value, path, leaf_id)?;
        Ok(was_new)
    }
354
    /// Inserts or overwrites `key` in the leaf `leaf_id`, reached through
    /// `path` (as returned by `walk_to_leaf`).
    ///
    /// Returns `(was_new, replaced_overflow)`: `was_new` is `true` when the
    /// key did not exist before; `replaced_overflow` is the first overflow
    /// page id of the previous value when that value had
    /// `ValueType::Overflow`, otherwise `None`.
    ///
    /// # Panics
    /// If `leaf_id` or a page id in `path` is missing from `pages`.
    #[allow(clippy::too_many_arguments)]
    #[inline]
    pub fn insert_at_leaf(
        &mut self,
        pages: &mut FxHashMap<PageId, Page>,
        alloc: &mut PageAllocator,
        txn_id: TxnId,
        key: &[u8],
        val_type: ValueType,
        value: &[u8],
        path: Vec<(PageId, usize)>,
        leaf_id: PageId,
    ) -> Result<(bool, Option<PageId>)> {
        // Inspect the current cell (if any) before the leaf is modified.
        let (key_exists, replaced_overflow) = {
            let page = pages.get(&leaf_id).unwrap();
            match leaf_node::search(page, key) {
                Ok(idx) => {
                    let cell = leaf_node::read_cell(page, idx);
                    let head = if cell.val_type == ValueType::Overflow {
                        Some(leaf_node::OverflowRef::from_bytes(cell.value).first_page)
                    } else {
                        None
                    };
                    (true, head)
                }
                Err(_) => (false, None),
            }
        };

        let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);

        let leaf_ok = {
            let page = pages.get_mut(&new_leaf_id).unwrap();
            leaf_node::insert_direct(page, key, val_type, value)
        };

        if leaf_ok {
            if alloc.in_place() && new_leaf_id == leaf_id {
                // In-place mode: ancestors are untouched, only decide whether
                // this leaf becomes the cached insert leaf.
                let mut is_rightmost = true;
                for &(ancestor_id, child_idx) in path.iter().rev() {
                    let page = pages.get(&ancestor_id).unwrap();
                    // A child index equal to the cell count is the right-child slot.
                    if child_idx != page.num_cells() as usize {
                        is_rightmost = false;
                        break;
                    }
                }
                if is_rightmost {
                    self.last_delete = None;
                    self.last_insert = Some((path, new_leaf_id));
                }
                if !key_exists {
                    self.entry_count += 1;
                }
                return Ok((!key_exists, replaced_overflow));
            }
            // Re-link every ancestor bottom-up, recording the ids actually
            // used so the path can be cached afterwards.
            let mut child = new_leaf_id;
            let mut is_rightmost = true;
            let mut new_path = path;
            for i in (0..new_path.len()).rev() {
                let (ancestor_id, child_idx) = new_path[i];
                let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
                let page = pages.get_mut(&new_ancestor).unwrap();
                update_branch_child(page, child_idx, child);
                if child_idx != page.num_cells() as usize {
                    is_rightmost = false;
                }
                new_path[i] = (new_ancestor, child_idx);
                child = new_ancestor;
            }
            self.root = child;

            if is_rightmost {
                self.last_delete = None;
                self.last_insert = Some((new_path, new_leaf_id));
            }

            if !key_exists {
                self.entry_count += 1;
            }
            return Ok((!key_exists, replaced_overflow));
        }

        // The leaf could not take the cell: split it. Cached paths are no
        // longer valid once the tree shape changes.
        self.clear_lil_caches();
        let (sep_key, right_id) =
            split_leaf_with_insert(pages, alloc, txn_id, new_leaf_id, key, val_type, value);
        self.root = propagate_split_up(
            pages,
            alloc,
            txn_id,
            &path,
            new_leaf_id,
            &sep_key,
            right_id,
            &mut self.depth,
        );

        if !key_exists {
            self.entry_count += 1;
        }
        Ok((!key_exists, replaced_overflow))
    }
456
    /// Inserts `key` unless it already holds a non-tombstone value.
    ///
    /// Returns `Ok(None)` when the entry was written and
    /// `Ok(Some(existing_bytes))` (a copy of the stored value) when the key
    /// was already present; in the latter case nothing is modified. A cell
    /// whose type is `ValueType::Tombstone` is treated as absent.
    ///
    /// # Errors
    /// `Error::PageOutOfBounds` if the cached leaf is missing from `pages`,
    /// plus any error from `walk_to_leaf`.
    pub fn insert_or_fetch(
        &mut self,
        pages: &mut FxHashMap<PageId, Page>,
        alloc: &mut PageAllocator,
        txn_id: TxnId,
        key: &[u8],
        val_type: ValueType,
        value: &[u8],
    ) -> Result<Option<Vec<u8>>> {
        // Fast path through the cached insert leaf.
        if let Some((mut cached_path, cached_leaf)) = self.last_insert.take() {
            let (hit, needs_cow) = {
                let page = pages
                    .get(&cached_leaf)
                    .ok_or(Error::PageOutOfBounds(cached_leaf))?;
                let n = page.num_cells();
                // Strictly greater than the last key, so the key cannot exist yet
                // in this leaf.
                let h = n > 0 && key > leaf_node::read_cell(page, n - 1).key;
                let nc = page.txn_id() != txn_id;
                (h, nc)
            };
            if hit {
                let cow_id = if needs_cow {
                    cow_page(pages, alloc, cached_leaf, txn_id)
                } else {
                    cached_leaf
                };
                let ok = {
                    let page = pages.get_mut(&cow_id).unwrap();
                    leaf_node::insert_direct(page, key, val_type, value)
                };
                if ok {
                    if cow_id != cached_leaf {
                        self.root =
                            propagate_cow_up(pages, alloc, txn_id, &mut cached_path, cow_id);
                    }
                    self.entry_count += 1;
                    self.last_delete = None;
                    self.last_insert = Some((cached_path, cow_id));
                    return Ok(None);
                }
                let (sep_key, right_id) =
                    split_leaf_with_insert(pages, alloc, txn_id, cow_id, key, val_type, value);
                self.root = propagate_split_up(
                    pages,
                    alloc,
                    txn_id,
                    &cached_path,
                    cow_id,
                    &sep_key,
                    right_id,
                    &mut self.depth,
                );
                self.clear_lil_caches();
                self.entry_count += 1;
                return Ok(None);
            }
            // Miss: put the cache back untouched.
            self.last_insert = Some((cached_path, cached_leaf));
        }

        let (path, leaf_id) = self.walk_to_leaf(pages, key)?;

        // Return the stored bytes when a live (non-tombstone) value exists.
        let existing_value = {
            let page = pages.get(&leaf_id).unwrap();
            match leaf_node::search(page, key) {
                Ok(idx) => {
                    let cell = leaf_node::read_cell(page, idx);
                    if matches!(cell.val_type, ValueType::Tombstone) {
                        None
                    } else {
                        Some(cell.value.to_vec())
                    }
                }
                Err(_) => None,
            }
        };
        if let Some(v) = existing_value {
            return Ok(Some(v));
        }

        let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
        let leaf_ok = {
            let page = pages.get_mut(&new_leaf_id).unwrap();
            leaf_node::insert_direct(page, key, val_type, value)
        };

        if leaf_ok {
            if alloc.in_place() && new_leaf_id == leaf_id {
                // In-place mode: ancestors are untouched, only decide whether
                // this leaf becomes the cached insert leaf.
                let mut is_rightmost = true;
                for &(ancestor_id, child_idx) in path.iter().rev() {
                    let page = pages.get(&ancestor_id).unwrap();
                    // A child index equal to the cell count is the right-child slot.
                    if child_idx != page.num_cells() as usize {
                        is_rightmost = false;
                        break;
                    }
                }
                if is_rightmost {
                    self.last_delete = None;
                    self.last_insert = Some((path, new_leaf_id));
                }
                // NOTE(review): also incremented when a tombstone cell is
                // overwritten — confirm tombstones are not counted elsewhere.
                self.entry_count += 1;
                return Ok(None);
            }
            // Re-link every ancestor bottom-up, recording the ids actually used.
            let mut child = new_leaf_id;
            let mut is_rightmost = true;
            let mut new_path = path;
            for i in (0..new_path.len()).rev() {
                let (ancestor_id, child_idx) = new_path[i];
                let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
                let page = pages.get_mut(&new_ancestor).unwrap();
                update_branch_child(page, child_idx, child);
                if child_idx != page.num_cells() as usize {
                    is_rightmost = false;
                }
                new_path[i] = (new_ancestor, child_idx);
                child = new_ancestor;
            }
            self.root = child;

            if is_rightmost {
                self.last_delete = None;
                self.last_insert = Some((new_path, new_leaf_id));
            }
            self.entry_count += 1;
            return Ok(None);
        }

        // The leaf could not take the cell: split it and drop cached paths.
        self.clear_lil_caches();
        let (sep_key, right_id) =
            split_leaf_with_insert(pages, alloc, txn_id, new_leaf_id, key, val_type, value);
        self.root = propagate_split_up(
            pages,
            alloc,
            txn_id,
            &path,
            new_leaf_id,
            &sep_key,
            right_id,
            &mut self.depth,
        );
        self.entry_count += 1;
        Ok(None)
    }
598
    /// Inserts `key` only when it has no live value yet.
    ///
    /// Tries the cached insert leaf first (only when `key` sorts strictly
    /// after that leaf's last key); otherwise walks from the root and
    /// delegates to `insert_if_absent_at_leaf`.
    ///
    /// Returns `Ok(true)` when the entry was written, `Ok(false)` when the key
    /// already held a non-tombstone value.
    ///
    /// # Errors
    /// `Error::PageOutOfBounds` if the cached leaf is missing from `pages`,
    /// plus any error from `walk_to_leaf`.
    #[inline]
    pub fn insert_if_absent(
        &mut self,
        pages: &mut FxHashMap<PageId, Page>,
        alloc: &mut PageAllocator,
        txn_id: TxnId,
        key: &[u8],
        val_type: ValueType,
        value: &[u8],
    ) -> Result<bool> {
        if let Some((mut cached_path, cached_leaf)) = self.last_insert.take() {
            let (hit, needs_cow) = {
                let page = pages
                    .get(&cached_leaf)
                    .ok_or(Error::PageOutOfBounds(cached_leaf))?;
                let n = page.num_cells();
                // Strictly greater than the last key, so the key cannot exist yet
                // in this leaf.
                let h = n > 0 && key > leaf_node::read_cell(page, n - 1).key;
                let nc = page.txn_id() != txn_id;
                (h, nc)
            };
            if hit {
                let cow_id = if needs_cow {
                    cow_page(pages, alloc, cached_leaf, txn_id)
                } else {
                    cached_leaf
                };
                let ok = {
                    let page = pages.get_mut(&cow_id).unwrap();
                    leaf_node::insert_direct(page, key, val_type, value)
                };
                if ok {
                    // Only re-link ancestors when the leaf got a new id.
                    if cow_id != cached_leaf {
                        self.root =
                            propagate_cow_up(pages, alloc, txn_id, &mut cached_path, cow_id);
                    }
                    self.entry_count += 1;
                    self.last_delete = None;
                    self.last_insert = Some((cached_path, cow_id));
                    return Ok(true);
                }
                // The leaf could not take the cell: split and push the separator up.
                let (sep_key, right_id) =
                    split_leaf_with_insert(pages, alloc, txn_id, cow_id, key, val_type, value);
                self.root = propagate_split_up(
                    pages,
                    alloc,
                    txn_id,
                    &cached_path,
                    cow_id,
                    &sep_key,
                    right_id,
                    &mut self.depth,
                );
                self.clear_lil_caches();
                self.entry_count += 1;
                return Ok(true);
            }
            // Miss: put the cache back untouched.
            self.last_insert = Some((cached_path, cached_leaf));
        }

        // Slow path: locate the leaf from the root.
        let (path, leaf_id) = self.walk_to_leaf(pages, key)?;
        self.insert_if_absent_at_leaf(pages, alloc, txn_id, key, val_type, value, path, leaf_id)
    }
661
    /// Inserts `key` into the leaf `leaf_id` (reached through `path`) unless
    /// the leaf already holds a non-tombstone value for it.
    ///
    /// Returns `Ok(false)` without modifying anything when a live value
    /// exists, `Ok(true)` after writing the entry. A cell whose type is
    /// `ValueType::Tombstone` is treated as absent.
    ///
    /// # Panics
    /// If `leaf_id` or a page id in `path` is missing from `pages`.
    #[allow(clippy::too_many_arguments)]
    #[inline]
    pub fn insert_if_absent_at_leaf(
        &mut self,
        pages: &mut FxHashMap<PageId, Page>,
        alloc: &mut PageAllocator,
        txn_id: TxnId,
        key: &[u8],
        val_type: ValueType,
        value: &[u8],
        path: Vec<(PageId, usize)>,
        leaf_id: PageId,
    ) -> Result<bool> {
        let exists = {
            let page = pages.get(&leaf_id).unwrap();
            match leaf_node::search(page, key) {
                Ok(idx) => {
                    let cell = leaf_node::read_cell(page, idx);
                    !matches!(cell.val_type, ValueType::Tombstone)
                }
                Err(_) => false,
            }
        };
        if exists {
            return Ok(false);
        }

        let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
        let leaf_ok = {
            let page = pages.get_mut(&new_leaf_id).unwrap();
            leaf_node::insert_direct(page, key, val_type, value)
        };

        if leaf_ok {
            if alloc.in_place() && new_leaf_id == leaf_id {
                // In-place mode: ancestors are untouched, only decide whether
                // this leaf becomes the cached insert leaf.
                let mut is_rightmost = true;
                for &(ancestor_id, child_idx) in path.iter().rev() {
                    let page = pages.get(&ancestor_id).unwrap();
                    // A child index equal to the cell count is the right-child slot.
                    if child_idx != page.num_cells() as usize {
                        is_rightmost = false;
                        break;
                    }
                }
                if is_rightmost {
                    self.last_delete = None;
                    self.last_insert = Some((path, new_leaf_id));
                }
                // NOTE(review): also incremented when a tombstone cell is
                // overwritten — confirm tombstones are not counted elsewhere.
                self.entry_count += 1;
                return Ok(true);
            }
            // Re-link every ancestor bottom-up, recording the ids actually used.
            let mut child = new_leaf_id;
            let mut is_rightmost = true;
            let mut new_path = path;
            for i in (0..new_path.len()).rev() {
                let (ancestor_id, child_idx) = new_path[i];
                let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
                let page = pages.get_mut(&new_ancestor).unwrap();
                update_branch_child(page, child_idx, child);
                if child_idx != page.num_cells() as usize {
                    is_rightmost = false;
                }
                new_path[i] = (new_ancestor, child_idx);
                child = new_ancestor;
            }
            self.root = child;

            if is_rightmost {
                self.last_delete = None;
                self.last_insert = Some((new_path, new_leaf_id));
            }
            self.entry_count += 1;
            return Ok(true);
        }

        // The leaf could not take the cell: split it and drop cached paths.
        self.clear_lil_caches();
        let (sep_key, right_id) =
            split_leaf_with_insert(pages, alloc, txn_id, new_leaf_id, key, val_type, value);
        self.root = propagate_split_up(
            pages,
            alloc,
            txn_id,
            &path,
            new_leaf_id,
            &sep_key,
            right_id,
            &mut self.depth,
        );
        self.entry_count += 1;
        Ok(true)
    }
752
    /// Inserts `default_value` when `key` has no live value, otherwise lets
    /// the callback `f` decide what to do with the existing value.
    ///
    /// Tries the cached insert leaf first (only when `key` sorts strictly
    /// after that leaf's last key, in which case `f` is never called and the
    /// default is inserted); otherwise walks from the root and delegates to
    /// `upsert_with_at_leaf`.
    ///
    /// # Errors
    /// `Error::PageOutOfBounds` (converted into `E`) if the cached leaf is
    /// missing from `pages`, any error from `walk_to_leaf`, and any error
    /// returned by `f`.
    #[allow(clippy::too_many_arguments)]
    #[inline]
    pub fn upsert_with<F, E>(
        &mut self,
        pages: &mut FxHashMap<PageId, Page>,
        alloc: &mut PageAllocator,
        txn_id: TxnId,
        key: &[u8],
        val_type: ValueType,
        default_value: &[u8],
        f: F,
    ) -> std::result::Result<UpsertOutcome, E>
    where
        F: FnMut(&[u8]) -> std::result::Result<UpsertAction, E>,
        E: From<Error>,
    {
        if let Some((mut cached_path, cached_leaf)) = self.last_insert.take() {
            let (hit, needs_cow) = {
                let page = pages
                    .get(&cached_leaf)
                    .ok_or(Error::PageOutOfBounds(cached_leaf))?;
                let n = page.num_cells();
                // Strictly greater than the last key, so the key cannot exist yet
                // in this leaf.
                let h = n > 0 && key > leaf_node::read_cell(page, n - 1).key;
                let nc = page.txn_id() != txn_id;
                (h, nc)
            };
            if hit {
                let cow_id = if needs_cow {
                    cow_page(pages, alloc, cached_leaf, txn_id)
                } else {
                    cached_leaf
                };
                let ok = {
                    let page = pages.get_mut(&cow_id).unwrap();
                    leaf_node::insert_direct(page, key, val_type, default_value)
                };
                if ok {
                    // Only re-link ancestors when the leaf got a new id.
                    if cow_id != cached_leaf {
                        self.root =
                            propagate_cow_up(pages, alloc, txn_id, &mut cached_path, cow_id);
                    }
                    self.entry_count += 1;
                    self.last_delete = None;
                    self.last_insert = Some((cached_path, cow_id));
                    return Ok(UpsertOutcome::Inserted);
                }
                // The leaf could not take the cell: split and push the separator up.
                let (sep_key, right_id) = split_leaf_with_insert(
                    pages,
                    alloc,
                    txn_id,
                    cow_id,
                    key,
                    val_type,
                    default_value,
                );
                self.root = propagate_split_up(
                    pages,
                    alloc,
                    txn_id,
                    &cached_path,
                    cow_id,
                    &sep_key,
                    right_id,
                    &mut self.depth,
                );
                self.clear_lil_caches();
                self.entry_count += 1;
                return Ok(UpsertOutcome::Inserted);
            }
            // Miss: put the cache back untouched.
            self.last_insert = Some((cached_path, cached_leaf));
        }

        // Slow path: locate the leaf from the root.
        let (path, leaf_id) = self.walk_to_leaf(pages, key)?;
        self.upsert_with_at_leaf(
            pages,
            alloc,
            txn_id,
            key,
            val_type,
            default_value,
            path,
            leaf_id,
            f,
        )
    }
838
    /// Upserts `key` in the leaf `leaf_id`, reached through `path`.
    ///
    /// * Key present with a non-tombstone value: `f` is called with the stored
    ///   bytes. `UpsertAction::Skip` returns `UpsertOutcome::Skipped` without
    ///   changes; `UpsertAction::Replace(bytes)` stores `bytes` and returns
    ///   `UpsertOutcome::Updated` (the entry count is unchanged).
    /// * Key absent or tombstoned: `default_value` is stored, the entry count
    ///   is incremented and `UpsertOutcome::Inserted` is returned.
    ///
    /// # Errors
    /// Any error returned by `f`.
    ///
    /// # Panics
    /// If `leaf_id` or a page id in `path` is missing from `pages`.
    #[allow(clippy::too_many_arguments)]
    #[inline]
    pub fn upsert_with_at_leaf<F, E>(
        &mut self,
        pages: &mut FxHashMap<PageId, Page>,
        alloc: &mut PageAllocator,
        txn_id: TxnId,
        key: &[u8],
        val_type: ValueType,
        default_value: &[u8],
        path: Vec<(PageId, usize)>,
        leaf_id: PageId,
        mut f: F,
    ) -> std::result::Result<UpsertOutcome, E>
    where
        F: FnMut(&[u8]) -> std::result::Result<UpsertAction, E>,
        E: From<Error>,
    {
        // Ask the callback only when a live value exists; `None` means
        // "insert the default".
        let action = {
            let page = pages.get(&leaf_id).unwrap();
            match leaf_node::search(page, key) {
                Ok(idx) => {
                    let cell = leaf_node::read_cell(page, idx);
                    if matches!(cell.val_type, ValueType::Tombstone) {
                        None
                    } else {
                        Some(f(cell.value)?)
                    }
                }
                Err(_) => None,
            }
        };

        if let Some(act) = action {
            match act {
                UpsertAction::Skip => return Ok(UpsertOutcome::Skipped),
                UpsertAction::Replace(new_bytes) => {
                    let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
                    let leaf_ok = {
                        let page = pages.get_mut(&new_leaf_id).unwrap();
                        leaf_node::insert_direct(page, key, val_type, &new_bytes)
                    };
                    if leaf_ok {
                        // Only re-link ancestors when the leaf got a new id.
                        if new_leaf_id != leaf_id {
                            let mut new_path = path;
                            self.root =
                                propagate_cow_up(pages, alloc, txn_id, &mut new_path, new_leaf_id);
                        }
                        return Ok(UpsertOutcome::Updated);
                    }
                    // The replacement did not fit: split the leaf.
                    self.clear_lil_caches();
                    let (sep_key, right_id) = split_leaf_with_insert(
                        pages,
                        alloc,
                        txn_id,
                        new_leaf_id,
                        key,
                        val_type,
                        &new_bytes,
                    );
                    self.root = propagate_split_up(
                        pages,
                        alloc,
                        txn_id,
                        &path,
                        new_leaf_id,
                        &sep_key,
                        right_id,
                        &mut self.depth,
                    );
                    return Ok(UpsertOutcome::Updated);
                }
            }
        }

        // No live value: insert the default.
        let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
        let leaf_ok = {
            let page = pages.get_mut(&new_leaf_id).unwrap();
            leaf_node::insert_direct(page, key, val_type, default_value)
        };

        if leaf_ok {
            if alloc.in_place() && new_leaf_id == leaf_id {
                // In-place mode: ancestors are untouched, only decide whether
                // this leaf becomes the cached insert leaf.
                let mut is_rightmost = true;
                for &(ancestor_id, child_idx) in path.iter().rev() {
                    let page = pages.get(&ancestor_id).unwrap();
                    // A child index equal to the cell count is the right-child slot.
                    if child_idx != page.num_cells() as usize {
                        is_rightmost = false;
                        break;
                    }
                }
                if is_rightmost {
                    self.last_delete = None;
                    self.last_insert = Some((path, new_leaf_id));
                }
                self.entry_count += 1;
                return Ok(UpsertOutcome::Inserted);
            }
            // Re-link every ancestor bottom-up, recording the ids actually used.
            let mut child = new_leaf_id;
            let mut is_rightmost = true;
            let mut new_path = path;
            for i in (0..new_path.len()).rev() {
                let (ancestor_id, child_idx) = new_path[i];
                let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
                let page = pages.get_mut(&new_ancestor).unwrap();
                update_branch_child(page, child_idx, child);
                if child_idx != page.num_cells() as usize {
                    is_rightmost = false;
                }
                new_path[i] = (new_ancestor, child_idx);
                child = new_ancestor;
            }
            self.root = child;

            if is_rightmost {
                self.last_delete = None;
                self.last_insert = Some((new_path, new_leaf_id));
            }
            self.entry_count += 1;
            return Ok(UpsertOutcome::Inserted);
        }

        // The leaf could not take the cell: split it and drop cached paths.
        self.clear_lil_caches();
        let (sep_key, right_id) = split_leaf_with_insert(
            pages,
            alloc,
            txn_id,
            new_leaf_id,
            key,
            val_type,
            default_value,
        );
        self.root = propagate_split_up(
            pages,
            alloc,
            txn_id,
            &path,
            new_leaf_id,
            &sep_key,
            right_id,
            &mut self.depth,
        );
        self.entry_count += 1;
        Ok(UpsertOutcome::Inserted)
    }
984
985 pub fn update_sorted(
987 &mut self,
988 pages: &mut FxHashMap<PageId, Page>,
989 alloc: &mut PageAllocator,
990 txn_id: TxnId,
991 pairs: &[(&[u8], &[u8])],
992 ) -> Result<u64> {
993 if pairs.is_empty() {
994 return Ok(0);
995 }
996 self.clear_lil_caches();
997
998 let (mut path, mut leaf_id) = self.walk_to_leaf(pages, pairs[0].0)?;
999 let mut cow_leaf = cow_page(pages, alloc, leaf_id, txn_id);
1000 if cow_leaf != leaf_id {
1001 self.root = propagate_cow_up(pages, alloc, txn_id, &mut path, cow_leaf);
1002 }
1003
1004 let mut count: u64 = 0;
1005 let mut hint: u16 = 0;
1006
1007 for &(key, value) in pairs {
1008 let past_leaf = {
1009 let page = pages.get(&cow_leaf).unwrap();
1010 let n = page.num_cells();
1011 n == 0 || key > leaf_node::read_cell(page, n - 1).key
1012 };
1013
1014 if past_leaf {
1015 let (new_path, new_leaf) = self.walk_to_leaf(pages, key)?;
1016 path = new_path;
1017 leaf_id = new_leaf;
1018 cow_leaf = cow_page(pages, alloc, leaf_id, txn_id);
1019 if cow_leaf != leaf_id {
1020 self.root = propagate_cow_up(pages, alloc, txn_id, &mut path, cow_leaf);
1021 }
1022 hint = 0;
1023 }
1024
1025 let page = pages.get(&cow_leaf).unwrap();
1026 let n = page.num_cells();
1027 let idx = {
1028 let mut i = hint;
1029 loop {
1030 if i >= n {
1031 break None;
1032 }
1033 let cell = leaf_node::read_cell(page, i);
1034 match key.cmp(cell.key) {
1035 std::cmp::Ordering::Equal => break Some(i),
1036 std::cmp::Ordering::Less => break None,
1037 std::cmp::Ordering::Greater => i += 1,
1038 }
1039 }
1040 };
1041
1042 if let Some(idx) = idx {
1043 hint = idx + 1;
1044 let page = pages.get_mut(&cow_leaf).unwrap();
1045 if !leaf_node::update_value_in_place(page, idx, ValueType::Inline, value) {
1046 leaf_node::insert_direct(page, key, ValueType::Inline, value);
1047 }
1048 count += 1;
1049 }
1050 }
1051
1052 Ok(count)
1053 }
1054
1055 pub fn delete(
1057 &mut self,
1058 pages: &mut FxHashMap<PageId, Page>,
1059 alloc: &mut PageAllocator,
1060 txn_id: TxnId,
1061 key: &[u8],
1062 ) -> Result<bool> {
1063 let (mut path, leaf_id) = self.walk_to_leaf(pages, key)?;
1064 self.delete_at_leaf(pages, alloc, txn_id, key, &mut path, leaf_id)
1065 }
1066
    /// Removes `key` from the leaf `leaf_id`, reached through `path`.
    ///
    /// Both cursor caches are cleared first. Returns `Ok(false)` when the leaf
    /// does not contain the key, `Ok(true)` after removing it. When the leaf
    /// stays in the tree it is recorded as the cached delete leaf; when a
    /// non-root leaf becomes empty it is freed and unlinked via
    /// `propagate_remove_up`.
    ///
    /// # Panics
    /// If `leaf_id` is missing from `pages`.
    #[allow(clippy::ptr_arg)]
    pub fn delete_at_leaf(
        &mut self,
        pages: &mut FxHashMap<PageId, Page>,
        alloc: &mut PageAllocator,
        txn_id: TxnId,
        key: &[u8],
        path: &mut Vec<(PageId, usize)>,
        leaf_id: PageId,
    ) -> Result<bool> {
        self.clear_lil_caches();

        let found = {
            let page = pages.get(&leaf_id).unwrap();
            leaf_node::search(page, key).is_ok()
        };
        if !found {
            return Ok(false);
        }

        let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
        {
            let page = pages.get_mut(&new_leaf_id).unwrap();
            leaf_node::delete(page, key);
        }

        let leaf_empty = pages.get(&new_leaf_id).unwrap().num_cells() == 0;

        // The leaf stays when it still has cells, or when it is the root
        // (an empty `path` means there are no ancestors).
        if !leaf_empty || path.is_empty() {
            if alloc.in_place() && new_leaf_id == leaf_id {
                // In-place mode: ancestors are untouched.
                self.entry_count -= 1;
                self.last_delete = Some((path.clone(), new_leaf_id));
                return Ok(true);
            }
            // `propagate_cow_up` rewrites `path` with the ids it used, so the
            // clone below caches the updated path.
            self.root = propagate_cow_up(pages, alloc, txn_id, path, new_leaf_id);
            self.entry_count -= 1;
            self.last_delete = Some((path.clone(), new_leaf_id));
            return Ok(true);
        }

        // A non-root leaf became empty: release it and unlink it from above.
        alloc.free(new_leaf_id);
        pages.remove(&new_leaf_id);

        self.root = propagate_remove_up(pages, alloc, txn_id, path, &mut self.depth);
        self.entry_count -= 1;
        self.last_delete = None;
        Ok(true)
    }
1115
1116 pub fn walk_to_leaf(
1118 &self,
1119 pages: &FxHashMap<PageId, Page>,
1120 key: &[u8],
1121 ) -> Result<(Vec<(PageId, usize)>, PageId)> {
1122 let mut path = Vec::with_capacity(self.depth as usize);
1123 let mut current = self.root;
1124 loop {
1125 let page = pages.get(¤t).ok_or(Error::PageOutOfBounds(current))?;
1126 match page.page_type() {
1127 Some(PageType::Leaf) => return Ok((path, current)),
1128 Some(PageType::Branch) => {
1129 let child_idx = branch_node::search_child_index(page, key);
1130 let child = branch_node::get_child(page, child_idx);
1131 path.push((current, child_idx));
1132 current = child;
1133 }
1134 _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
1135 }
1136 }
1137 }
1138}
1139
1140pub fn cow_page(
1142 pages: &mut FxHashMap<PageId, Page>,
1143 alloc: &mut PageAllocator,
1144 old_id: PageId,
1145 txn_id: TxnId,
1146) -> PageId {
1147 if alloc.in_place() {
1148 let page = pages.get_mut(&old_id).unwrap();
1149 if page.txn_id() != txn_id {
1150 page.set_txn_id(txn_id);
1151 }
1152 return old_id;
1153 }
1154 let mut new_page = {
1155 let page = pages.get(&old_id).unwrap();
1156 if page.txn_id() == txn_id {
1157 return old_id;
1158 }
1159 page.clone()
1160 };
1161 let new_id = alloc.allocate();
1162 new_page.set_page_id(new_id);
1163 new_page.set_txn_id(txn_id);
1164 pages.insert(new_id, new_page);
1165 alloc.free(old_id);
1166 new_id
1167}
1168
1169fn update_branch_child(page: &mut Page, child_idx: usize, new_child: PageId) {
1171 let n = page.num_cells() as usize;
1172 if child_idx < n {
1173 let offset = page.cell_offset(child_idx as u16) as usize;
1174 page.data[offset..offset + 4].copy_from_slice(&new_child.as_u32().to_le_bytes());
1175 } else {
1176 page.set_right_child(new_child);
1177 }
1178}
1179
1180pub fn propagate_cow_up(
1181 pages: &mut FxHashMap<PageId, Page>,
1182 alloc: &mut PageAllocator,
1183 txn_id: TxnId,
1184 path: &mut [(PageId, usize)],
1185 mut new_child: PageId,
1186) -> PageId {
1187 for i in (0..path.len()).rev() {
1188 let (ancestor_id, child_idx) = path[i];
1189 let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
1190 let page = pages.get_mut(&new_ancestor).unwrap();
1191 update_branch_child(page, child_idx, new_child);
1192 path[i] = (new_ancestor, child_idx);
1193 new_child = new_ancestor;
1194 }
1195 new_child
1196}
1197
/// Splits leaf `leaf_id` in two while adding (or replacing) `key`.
///
/// All existing cells plus the new one are collected in key order, a split
/// point is chosen so that both halves fit within `USABLE_SIZE`, the left half
/// is rebuilt in `leaf_id` and the right half is written to a newly allocated
/// leaf page.
///
/// Returns `(separator_key, right_page_id)`, where the separator is the first
/// key of the right half.
///
/// # Panics
/// If `leaf_id` is missing from `pages`.
fn split_leaf_with_insert(
    pages: &mut FxHashMap<PageId, Page>,
    alloc: &mut PageAllocator,
    txn_id: TxnId,
    leaf_id: PageId,
    key: &[u8],
    val_type: ValueType,
    value: &[u8],
) -> (Vec<u8>, PageId) {
    // Snapshot every cell as (key, raw cell bytes).
    let mut cells: Vec<(Vec<u8>, Vec<u8>)> = {
        let page = pages.get(&leaf_id).unwrap();
        let n = page.num_cells() as usize;
        (0..n)
            .map(|i| {
                let cell = leaf_node::read_cell(page, i as u16);
                let raw = leaf_node::read_cell_bytes(page, i as u16);
                (cell.key.to_vec(), raw)
            })
            .collect()
    };

    // Replace an existing cell for `key`, or insert at its sorted position.
    let new_raw = leaf_node::build_cell(key, val_type, value);
    match cells.binary_search_by(|(k, _)| k.as_slice().cmp(key)) {
        Ok(idx) => cells[idx] = (key.to_vec(), new_raw),
        Err(idx) => cells.insert(idx, (key.to_vec(), new_raw)),
    }

    let total = cells.len();

    let usable = citadel_core::constants::USABLE_SIZE;
    // cum[i] = total raw byte length of the first i cells.
    let mut cum: Vec<usize> = Vec::with_capacity(total + 1);
    cum.push(0);
    for (_, raw) in &cells {
        cum.push(cum.last().unwrap() + raw.len());
    }
    // Each cell is charged its raw bytes plus 2 extra bytes
    // (presumably the per-cell pointer slot — TODO confirm).
    let left_fits = |sp: usize| cum[sp] + sp * 2 <= usable;
    let right_fits = |sp: usize| (cum[total] - cum[sp]) + (total - sp) * 2 <= usable;

    // Prefer the middle; otherwise take the first fitting point at or past
    // the middle, falling back to the last fitting point before it.
    let mut split_point = total / 2;
    if !left_fits(split_point) || !right_fits(split_point) {
        split_point = 1;
        for sp in 1..total {
            if left_fits(sp) && right_fits(sp) {
                split_point = sp;
                if sp >= total / 2 {
                    break;
                }
            }
        }
    }

    let sep_key = cells[split_point].0.clone();

    {
        let left_refs: Vec<&[u8]> = cells[..split_point]
            .iter()
            .map(|(_, raw)| raw.as_slice())
            .collect();
        let page = pages.get_mut(&leaf_id).unwrap();
        page.rebuild_cells(&left_refs);
    }

    let right_id = alloc.allocate();
    {
        let mut right_page = Page::new(right_id, PageType::Leaf, txn_id);
        let right_refs: Vec<&[u8]> = cells[split_point..]
            .iter()
            .map(|(_, raw)| raw.as_slice())
            .collect();
        right_page.rebuild_cells(&right_refs);
        pages.insert(right_id, right_page);
    }

    (sep_key, right_id)
}
1274
1275#[allow(clippy::too_many_arguments)]
1276fn propagate_split_up(
1277 pages: &mut FxHashMap<PageId, Page>,
1278 alloc: &mut PageAllocator,
1279 txn_id: TxnId,
1280 path: &[(PageId, usize)],
1281 mut left_child: PageId,
1282 initial_sep: &[u8],
1283 mut right_child: PageId,
1284 depth: &mut u16,
1285) -> PageId {
1286 let mut sep_key = initial_sep.to_vec();
1287 let mut pending_split = true;
1288
1289 for &(ancestor_id, child_idx) in path.iter().rev() {
1290 let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
1291
1292 if pending_split {
1293 let ok = {
1294 let page = pages.get_mut(&new_ancestor).unwrap();
1295 branch_node::insert_separator(page, child_idx, left_child, &sep_key, right_child)
1296 };
1297
1298 if ok {
1299 pending_split = false;
1300 left_child = new_ancestor;
1301 } else {
1302 let (new_sep, new_right) = split_branch_with_insert(
1303 pages,
1304 alloc,
1305 txn_id,
1306 new_ancestor,
1307 child_idx,
1308 left_child,
1309 &sep_key,
1310 right_child,
1311 );
1312 left_child = new_ancestor;
1313 sep_key = new_sep;
1314 right_child = new_right;
1315 }
1316 } else {
1317 let page = pages.get_mut(&new_ancestor).unwrap();
1318 update_branch_child(page, child_idx, left_child);
1319 left_child = new_ancestor;
1320 }
1321 }
1322
1323 if pending_split {
1324 let new_root_id = alloc.allocate();
1325 let mut new_root = Page::new(new_root_id, PageType::Branch, txn_id);
1326 let cell = branch_node::build_cell(left_child, &sep_key);
1327 new_root.write_cell(&cell).unwrap();
1328 new_root.set_right_child(right_child);
1329 pages.insert(new_root_id, new_root);
1330 *depth += 1;
1331 new_root_id
1332 } else {
1333 left_child
1334 }
1335}
1336
1337#[allow(clippy::too_many_arguments)]
1338fn split_branch_with_insert(
1339 pages: &mut FxHashMap<PageId, Page>,
1340 alloc: &mut PageAllocator,
1341 txn_id: TxnId,
1342 branch_id: PageId,
1343 child_idx: usize,
1344 new_left: PageId,
1345 sep_key: &[u8],
1346 new_right: PageId,
1347) -> (Vec<u8>, PageId) {
1348 let (new_cells, final_right_child) = {
1349 let page = pages.get(&branch_id).unwrap();
1350 let n = page.num_cells() as usize;
1351 let cells: Vec<(PageId, Vec<u8>)> = (0..n)
1352 .map(|i| {
1353 let cell = branch_node::read_cell(page, i as u16);
1354 (cell.child, cell.key.to_vec())
1355 })
1356 .collect();
1357 let old_rc = page.right_child();
1358
1359 let mut result = Vec::with_capacity(n + 1);
1360 let final_rc;
1361
1362 if child_idx < n {
1363 let old_key = cells[child_idx].1.clone();
1364 for (i, (child, key)) in cells.into_iter().enumerate() {
1365 if i == child_idx {
1366 result.push((new_left, sep_key.to_vec()));
1367 result.push((new_right, old_key.clone()));
1368 } else {
1369 result.push((child, key));
1370 }
1371 }
1372 final_rc = old_rc;
1373 } else {
1374 result = cells;
1375 result.push((new_left, sep_key.to_vec()));
1376 final_rc = new_right;
1377 }
1378
1379 (result, final_rc)
1380 };
1381
1382 let total = new_cells.len();
1383 let usable = citadel_core::constants::USABLE_SIZE;
1384 let raw_sizes: Vec<usize> = new_cells.iter().map(|(_, key)| 6 + key.len()).collect();
1385 let mut cum: Vec<usize> = Vec::with_capacity(total + 1);
1386 cum.push(0);
1387 for &sz in &raw_sizes {
1388 cum.push(cum.last().unwrap() + sz);
1389 }
1390 let left_fits = |sp: usize| cum[sp] + sp * 2 <= usable;
1391 let right_fits = |sp: usize| {
1392 let right_count = total - sp - 1;
1393 (cum[total] - cum[sp + 1]) + right_count * 2 <= usable
1394 };
1395
1396 let mut split_point = total / 2;
1397 if !left_fits(split_point) || !right_fits(split_point) {
1398 split_point = 1;
1399 for sp in 1..total.saturating_sub(1) {
1400 if left_fits(sp) && right_fits(sp) {
1401 split_point = sp;
1402 if sp >= total / 2 {
1403 break;
1404 }
1405 }
1406 }
1407 }
1408
1409 let promoted_sep = new_cells[split_point].1.clone();
1410 let promoted_child = new_cells[split_point].0;
1411
1412 {
1413 let left_raw: Vec<Vec<u8>> = new_cells[..split_point]
1414 .iter()
1415 .map(|(child, key)| branch_node::build_cell(*child, key))
1416 .collect();
1417 let left_refs: Vec<&[u8]> = left_raw.iter().map(|c| c.as_slice()).collect();
1418 let page = pages.get_mut(&branch_id).unwrap();
1419 page.rebuild_cells(&left_refs);
1420 page.set_right_child(promoted_child);
1421 }
1422
1423 let right_branch_id = alloc.allocate();
1424 {
1425 let mut right_page = Page::new(right_branch_id, PageType::Branch, txn_id);
1426 let right_raw: Vec<Vec<u8>> = new_cells[split_point + 1..]
1427 .iter()
1428 .map(|(child, key)| branch_node::build_cell(*child, key))
1429 .collect();
1430 let right_refs: Vec<&[u8]> = right_raw.iter().map(|c| c.as_slice()).collect();
1431 right_page.rebuild_cells(&right_refs);
1432 right_page.set_right_child(final_right_child);
1433 pages.insert(right_branch_id, right_page);
1434 }
1435
1436 (promoted_sep, right_branch_id)
1437}
1438
1439fn remove_child_from_branch(page: &mut Page, child_idx: usize) {
1440 let n = page.num_cells() as usize;
1441 if child_idx < n {
1442 let cell_sz = branch_node::get_cell_size(page, child_idx as u16);
1443 page.delete_cell_at(child_idx as u16, cell_sz);
1444 } else {
1445 assert!(n > 0, "cannot remove right_child from branch with 0 cells");
1446 let last_child = branch_node::read_cell(page, (n - 1) as u16).child;
1447 let cell_sz = branch_node::get_cell_size(page, (n - 1) as u16);
1448 page.delete_cell_at((n - 1) as u16, cell_sz);
1449 page.set_right_child(last_child);
1450 }
1451}
1452
1453fn propagate_remove_up(
1454 pages: &mut FxHashMap<PageId, Page>,
1455 alloc: &mut PageAllocator,
1456 txn_id: TxnId,
1457 path: &mut [(PageId, usize)],
1458 depth: &mut u16,
1459) -> PageId {
1460 let mut level = path.len();
1461 let mut need_remove_at_level = true;
1462 let mut new_child = PageId(0);
1463
1464 while level > 0 && need_remove_at_level {
1465 level -= 1;
1466 let (ancestor_id, child_idx) = path[level];
1467 let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
1468
1469 {
1470 let page = pages.get_mut(&new_ancestor).unwrap();
1471 remove_child_from_branch(page, child_idx);
1472 }
1473
1474 let num_cells = pages.get(&new_ancestor).unwrap().num_cells();
1475
1476 if num_cells > 0 || level == 0 {
1477 if num_cells == 0 && level == 0 {
1478 let only_child = pages.get(&new_ancestor).unwrap().right_child();
1479 alloc.free(new_ancestor);
1480 pages.remove(&new_ancestor);
1481 *depth -= 1;
1482 return only_child;
1483 }
1484 new_child = new_ancestor;
1485 need_remove_at_level = false;
1486 } else {
1487 let only_child = pages.get(&new_ancestor).unwrap().right_child();
1488 alloc.free(new_ancestor);
1489 pages.remove(&new_ancestor);
1490 *depth -= 1;
1491
1492 new_child = only_child;
1493 need_remove_at_level = false;
1494 }
1495 }
1496
1497 if level > 0 {
1498 let remaining_path = &mut path[..level];
1499 new_child = propagate_cow_up(pages, alloc, txn_id, remaining_path, new_child);
1500 }
1501
1502 new_child
1503}
1504
1505#[cfg(test)]
1506#[path = "btree_tests.rs"]
1507mod tests;