1use crate::allocator::PageAllocator;
4use citadel_core::types::{PageId, PageType, TxnId, ValueType};
5use citadel_core::{Error, Result};
6use citadel_page::page::Page;
7use citadel_page::{branch_node, leaf_node};
8use rustc_hash::FxHashMap;
9
10#[derive(Clone)]
12pub struct BTree {
13 pub root: PageId,
14 pub depth: u16,
15 pub entry_count: u64,
16 last_insert: Option<(Vec<(PageId, usize)>, PageId)>,
17}
18
19#[derive(Debug, Clone)]
20pub enum UpsertOutcome {
21 Inserted,
22 Updated,
23 Skipped,
24}
25
26#[derive(Debug, Clone)]
27pub enum UpsertAction {
28 Replace(Vec<u8>),
29 Skip,
30}
31
32impl BTree {
33 pub fn new(
35 pages: &mut FxHashMap<PageId, Page>,
36 alloc: &mut PageAllocator,
37 txn_id: TxnId,
38 ) -> Self {
39 let root_id = alloc.allocate();
40 let root = Page::new(root_id, PageType::Leaf, txn_id);
41 pages.insert(root_id, root);
42 Self {
43 root: root_id,
44 depth: 1,
45 entry_count: 0,
46 last_insert: None,
47 }
48 }
49
50 pub fn from_existing(root: PageId, depth: u16, entry_count: u64) -> Self {
52 Self {
53 root,
54 depth,
55 entry_count,
56 last_insert: None,
57 }
58 }
59
60 pub fn search(
62 &self,
63 pages: &FxHashMap<PageId, Page>,
64 key: &[u8],
65 ) -> Result<Option<(ValueType, Vec<u8>)>> {
66 let mut current = self.root;
67 loop {
68 let page = pages.get(¤t).ok_or(Error::PageOutOfBounds(current))?;
69 match page.page_type() {
70 Some(PageType::Leaf) => {
71 return match leaf_node::search(page, key) {
72 Ok(idx) => {
73 let cell = leaf_node::read_cell(page, idx);
74 Ok(Some((cell.val_type, cell.value.to_vec())))
75 }
76 Err(_) => Ok(None),
77 };
78 }
79 Some(PageType::Branch) => {
80 let idx = branch_node::search_child_index(page, key);
81 current = branch_node::get_child(page, idx);
82 }
83 _ => {
84 return Err(Error::InvalidPageType(page.page_type_raw(), current));
85 }
86 }
87 }
88 }
89
90 pub fn lil_would_hit(&self, pages: &FxHashMap<PageId, Page>, key: &[u8]) -> bool {
91 if let Some((_, cached_leaf)) = &self.last_insert {
92 if let Some(page) = pages.get(cached_leaf) {
93 let n = page.num_cells();
94 return n > 0 && key > leaf_node::read_cell(page, n - 1).key;
95 }
96 }
97 false
98 }
99
100 pub fn try_lil_insert(
102 &mut self,
103 pages: &mut FxHashMap<PageId, Page>,
104 alloc: &mut PageAllocator,
105 txn_id: TxnId,
106 key: &[u8],
107 val_type: ValueType,
108 value: &[u8],
109 ) -> Result<Option<bool>> {
110 let cached_leaf = match self.last_insert.as_ref() {
111 Some((_, leaf)) => *leaf,
112 None => return Ok(None),
113 };
114 let (hit, needs_cow) = {
115 let page = pages
116 .get(&cached_leaf)
117 .ok_or(Error::PageOutOfBounds(cached_leaf))?;
118 let n = page.num_cells();
119 let h = n > 0 && key > leaf_node::read_cell(page, n - 1).key;
120 let nc = page.txn_id() != txn_id;
121 (h, nc)
122 };
123 if !hit {
124 return Ok(None);
125 }
126 let mut cached_path = self.last_insert.take().unwrap().0;
127 let cow_id = if needs_cow {
128 cow_page(pages, alloc, cached_leaf, txn_id)
129 } else {
130 cached_leaf
131 };
132 let ok = {
133 let page = pages.get_mut(&cow_id).unwrap();
134 leaf_node::insert_append_direct(page, key, val_type, value)
135 };
136 if ok {
137 if cow_id != cached_leaf {
138 self.root = propagate_cow_up(pages, alloc, txn_id, &mut cached_path, cow_id);
139 }
140 self.entry_count += 1;
141 self.last_insert = Some((cached_path, cow_id));
142 return Ok(Some(true));
143 }
144 let (sep_key, right_id) =
145 split_leaf_with_insert(pages, alloc, txn_id, cow_id, key, val_type, value);
146 self.root = propagate_split_up(
147 pages,
148 alloc,
149 txn_id,
150 &cached_path,
151 cow_id,
152 &sep_key,
153 right_id,
154 &mut self.depth,
155 );
156 self.last_insert = None;
157 self.entry_count += 1;
158 Ok(Some(true))
159 }
160
161 pub fn insert(
163 &mut self,
164 pages: &mut FxHashMap<PageId, Page>,
165 alloc: &mut PageAllocator,
166 txn_id: TxnId,
167 key: &[u8],
168 val_type: ValueType,
169 value: &[u8],
170 ) -> Result<bool> {
171 if let Some((mut cached_path, cached_leaf)) = self.last_insert.take() {
173 let (hit, needs_cow) = {
174 let page = pages
175 .get(&cached_leaf)
176 .ok_or(Error::PageOutOfBounds(cached_leaf))?;
177 let n = page.num_cells();
178 let h = n > 0 && key > leaf_node::read_cell(page, n - 1).key;
179 let nc = page.txn_id() != txn_id;
180 (h, nc)
181 };
182 if hit {
183 let cow_id = if needs_cow {
184 cow_page(pages, alloc, cached_leaf, txn_id)
185 } else {
186 cached_leaf
187 };
188 let ok = {
189 let page = pages.get_mut(&cow_id).unwrap();
190 leaf_node::insert_direct(page, key, val_type, value)
191 };
192 if ok {
193 if cow_id != cached_leaf {
194 self.root =
195 propagate_cow_up(pages, alloc, txn_id, &mut cached_path, cow_id);
196 }
197 self.entry_count += 1;
198 self.last_insert = Some((cached_path, cow_id));
199 return Ok(true);
200 }
201 let (sep_key, right_id) =
202 split_leaf_with_insert(pages, alloc, txn_id, cow_id, key, val_type, value);
203 self.root = propagate_split_up(
204 pages,
205 alloc,
206 txn_id,
207 &cached_path,
208 cow_id,
209 &sep_key,
210 right_id,
211 &mut self.depth,
212 );
213 self.last_insert = None;
214 self.entry_count += 1;
215 return Ok(true);
216 }
217 }
218
219 let (path, leaf_id) = self.walk_to_leaf(pages, key)?;
220 self.insert_at_leaf(pages, alloc, txn_id, key, val_type, value, path, leaf_id)
221 }
222
223 #[allow(clippy::too_many_arguments)]
224 #[inline]
225 pub fn insert_at_leaf(
226 &mut self,
227 pages: &mut FxHashMap<PageId, Page>,
228 alloc: &mut PageAllocator,
229 txn_id: TxnId,
230 key: &[u8],
231 val_type: ValueType,
232 value: &[u8],
233 path: Vec<(PageId, usize)>,
234 leaf_id: PageId,
235 ) -> Result<bool> {
236 let key_exists = {
237 let page = pages.get(&leaf_id).unwrap();
238 leaf_node::search(page, key).is_ok()
239 };
240
241 let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
242
243 let leaf_ok = {
244 let page = pages.get_mut(&new_leaf_id).unwrap();
245 leaf_node::insert_direct(page, key, val_type, value)
246 };
247
248 if leaf_ok {
249 if alloc.in_place() && new_leaf_id == leaf_id {
250 let mut is_rightmost = true;
251 for &(ancestor_id, child_idx) in path.iter().rev() {
252 let page = pages.get(&ancestor_id).unwrap();
253 if child_idx != page.num_cells() as usize {
254 is_rightmost = false;
255 break;
256 }
257 }
258 if is_rightmost {
259 self.last_insert = Some((path, new_leaf_id));
260 }
261 if !key_exists {
262 self.entry_count += 1;
263 }
264 return Ok(!key_exists);
265 }
266 let mut child = new_leaf_id;
267 let mut is_rightmost = true;
268 let mut new_path = path;
269 for i in (0..new_path.len()).rev() {
270 let (ancestor_id, child_idx) = new_path[i];
271 let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
272 let page = pages.get_mut(&new_ancestor).unwrap();
273 update_branch_child(page, child_idx, child);
274 if child_idx != page.num_cells() as usize {
275 is_rightmost = false;
276 }
277 new_path[i] = (new_ancestor, child_idx);
278 child = new_ancestor;
279 }
280 self.root = child;
281
282 if is_rightmost {
283 self.last_insert = Some((new_path, new_leaf_id));
284 }
285
286 if !key_exists {
287 self.entry_count += 1;
288 }
289 return Ok(!key_exists);
290 }
291
292 self.last_insert = None;
293 let (sep_key, right_id) =
294 split_leaf_with_insert(pages, alloc, txn_id, new_leaf_id, key, val_type, value);
295 self.root = propagate_split_up(
296 pages,
297 alloc,
298 txn_id,
299 &path,
300 new_leaf_id,
301 &sep_key,
302 right_id,
303 &mut self.depth,
304 );
305
306 if !key_exists {
307 self.entry_count += 1;
308 }
309 Ok(!key_exists)
310 }
311
312 pub fn insert_or_fetch(
313 &mut self,
314 pages: &mut FxHashMap<PageId, Page>,
315 alloc: &mut PageAllocator,
316 txn_id: TxnId,
317 key: &[u8],
318 val_type: ValueType,
319 value: &[u8],
320 ) -> Result<Option<Vec<u8>>> {
321 if let Some((mut cached_path, cached_leaf)) = self.last_insert.take() {
322 let (hit, needs_cow) = {
323 let page = pages
324 .get(&cached_leaf)
325 .ok_or(Error::PageOutOfBounds(cached_leaf))?;
326 let n = page.num_cells();
327 let h = n > 0 && key > leaf_node::read_cell(page, n - 1).key;
328 let nc = page.txn_id() != txn_id;
329 (h, nc)
330 };
331 if hit {
332 let cow_id = if needs_cow {
333 cow_page(pages, alloc, cached_leaf, txn_id)
334 } else {
335 cached_leaf
336 };
337 let ok = {
338 let page = pages.get_mut(&cow_id).unwrap();
339 leaf_node::insert_direct(page, key, val_type, value)
340 };
341 if ok {
342 if cow_id != cached_leaf {
343 self.root =
344 propagate_cow_up(pages, alloc, txn_id, &mut cached_path, cow_id);
345 }
346 self.entry_count += 1;
347 self.last_insert = Some((cached_path, cow_id));
348 return Ok(None);
349 }
350 let (sep_key, right_id) =
351 split_leaf_with_insert(pages, alloc, txn_id, cow_id, key, val_type, value);
352 self.root = propagate_split_up(
353 pages,
354 alloc,
355 txn_id,
356 &cached_path,
357 cow_id,
358 &sep_key,
359 right_id,
360 &mut self.depth,
361 );
362 self.last_insert = None;
363 self.entry_count += 1;
364 return Ok(None);
365 }
366 self.last_insert = Some((cached_path, cached_leaf));
367 }
368
369 let (path, leaf_id) = self.walk_to_leaf(pages, key)?;
370
371 let existing_value = {
372 let page = pages.get(&leaf_id).unwrap();
373 match leaf_node::search(page, key) {
374 Ok(idx) => {
375 let cell = leaf_node::read_cell(page, idx);
376 if matches!(cell.val_type, ValueType::Tombstone) {
377 None
378 } else {
379 Some(cell.value.to_vec())
380 }
381 }
382 Err(_) => None,
383 }
384 };
385 if let Some(v) = existing_value {
386 return Ok(Some(v));
387 }
388
389 let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
390 let leaf_ok = {
391 let page = pages.get_mut(&new_leaf_id).unwrap();
392 leaf_node::insert_direct(page, key, val_type, value)
393 };
394
395 if leaf_ok {
396 if alloc.in_place() && new_leaf_id == leaf_id {
397 let mut is_rightmost = true;
398 for &(ancestor_id, child_idx) in path.iter().rev() {
399 let page = pages.get(&ancestor_id).unwrap();
400 if child_idx != page.num_cells() as usize {
401 is_rightmost = false;
402 break;
403 }
404 }
405 if is_rightmost {
406 self.last_insert = Some((path, new_leaf_id));
407 }
408 self.entry_count += 1;
409 return Ok(None);
410 }
411 let mut child = new_leaf_id;
412 let mut is_rightmost = true;
413 let mut new_path = path;
414 for i in (0..new_path.len()).rev() {
415 let (ancestor_id, child_idx) = new_path[i];
416 let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
417 let page = pages.get_mut(&new_ancestor).unwrap();
418 update_branch_child(page, child_idx, child);
419 if child_idx != page.num_cells() as usize {
420 is_rightmost = false;
421 }
422 new_path[i] = (new_ancestor, child_idx);
423 child = new_ancestor;
424 }
425 self.root = child;
426
427 if is_rightmost {
428 self.last_insert = Some((new_path, new_leaf_id));
429 }
430 self.entry_count += 1;
431 return Ok(None);
432 }
433
434 self.last_insert = None;
435 let (sep_key, right_id) =
436 split_leaf_with_insert(pages, alloc, txn_id, new_leaf_id, key, val_type, value);
437 self.root = propagate_split_up(
438 pages,
439 alloc,
440 txn_id,
441 &path,
442 new_leaf_id,
443 &sep_key,
444 right_id,
445 &mut self.depth,
446 );
447 self.entry_count += 1;
448 Ok(None)
449 }
450
451 #[inline]
452 pub fn insert_if_absent(
453 &mut self,
454 pages: &mut FxHashMap<PageId, Page>,
455 alloc: &mut PageAllocator,
456 txn_id: TxnId,
457 key: &[u8],
458 val_type: ValueType,
459 value: &[u8],
460 ) -> Result<bool> {
461 if let Some((mut cached_path, cached_leaf)) = self.last_insert.take() {
462 let (hit, needs_cow) = {
463 let page = pages
464 .get(&cached_leaf)
465 .ok_or(Error::PageOutOfBounds(cached_leaf))?;
466 let n = page.num_cells();
467 let h = n > 0 && key > leaf_node::read_cell(page, n - 1).key;
468 let nc = page.txn_id() != txn_id;
469 (h, nc)
470 };
471 if hit {
472 let cow_id = if needs_cow {
473 cow_page(pages, alloc, cached_leaf, txn_id)
474 } else {
475 cached_leaf
476 };
477 let ok = {
478 let page = pages.get_mut(&cow_id).unwrap();
479 leaf_node::insert_direct(page, key, val_type, value)
480 };
481 if ok {
482 if cow_id != cached_leaf {
483 self.root =
484 propagate_cow_up(pages, alloc, txn_id, &mut cached_path, cow_id);
485 }
486 self.entry_count += 1;
487 self.last_insert = Some((cached_path, cow_id));
488 return Ok(true);
489 }
490 let (sep_key, right_id) =
491 split_leaf_with_insert(pages, alloc, txn_id, cow_id, key, val_type, value);
492 self.root = propagate_split_up(
493 pages,
494 alloc,
495 txn_id,
496 &cached_path,
497 cow_id,
498 &sep_key,
499 right_id,
500 &mut self.depth,
501 );
502 self.last_insert = None;
503 self.entry_count += 1;
504 return Ok(true);
505 }
506 self.last_insert = Some((cached_path, cached_leaf));
507 }
508
509 let (path, leaf_id) = self.walk_to_leaf(pages, key)?;
510 self.insert_if_absent_at_leaf(pages, alloc, txn_id, key, val_type, value, path, leaf_id)
511 }
512
513 #[allow(clippy::too_many_arguments)]
514 #[inline]
515 pub fn insert_if_absent_at_leaf(
516 &mut self,
517 pages: &mut FxHashMap<PageId, Page>,
518 alloc: &mut PageAllocator,
519 txn_id: TxnId,
520 key: &[u8],
521 val_type: ValueType,
522 value: &[u8],
523 path: Vec<(PageId, usize)>,
524 leaf_id: PageId,
525 ) -> Result<bool> {
526 let exists = {
527 let page = pages.get(&leaf_id).unwrap();
528 match leaf_node::search(page, key) {
529 Ok(idx) => {
530 let cell = leaf_node::read_cell(page, idx);
531 !matches!(cell.val_type, ValueType::Tombstone)
532 }
533 Err(_) => false,
534 }
535 };
536 if exists {
537 return Ok(false);
538 }
539
540 let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
541 let leaf_ok = {
542 let page = pages.get_mut(&new_leaf_id).unwrap();
543 leaf_node::insert_direct(page, key, val_type, value)
544 };
545
546 if leaf_ok {
547 if alloc.in_place() && new_leaf_id == leaf_id {
548 let mut is_rightmost = true;
549 for &(ancestor_id, child_idx) in path.iter().rev() {
550 let page = pages.get(&ancestor_id).unwrap();
551 if child_idx != page.num_cells() as usize {
552 is_rightmost = false;
553 break;
554 }
555 }
556 if is_rightmost {
557 self.last_insert = Some((path, new_leaf_id));
558 }
559 self.entry_count += 1;
560 return Ok(true);
561 }
562 let mut child = new_leaf_id;
563 let mut is_rightmost = true;
564 let mut new_path = path;
565 for i in (0..new_path.len()).rev() {
566 let (ancestor_id, child_idx) = new_path[i];
567 let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
568 let page = pages.get_mut(&new_ancestor).unwrap();
569 update_branch_child(page, child_idx, child);
570 if child_idx != page.num_cells() as usize {
571 is_rightmost = false;
572 }
573 new_path[i] = (new_ancestor, child_idx);
574 child = new_ancestor;
575 }
576 self.root = child;
577
578 if is_rightmost {
579 self.last_insert = Some((new_path, new_leaf_id));
580 }
581 self.entry_count += 1;
582 return Ok(true);
583 }
584
585 self.last_insert = None;
586 let (sep_key, right_id) =
587 split_leaf_with_insert(pages, alloc, txn_id, new_leaf_id, key, val_type, value);
588 self.root = propagate_split_up(
589 pages,
590 alloc,
591 txn_id,
592 &path,
593 new_leaf_id,
594 &sep_key,
595 right_id,
596 &mut self.depth,
597 );
598 self.entry_count += 1;
599 Ok(true)
600 }
601
602 #[allow(clippy::too_many_arguments)]
603 #[inline]
604 pub fn upsert_with<F, E>(
605 &mut self,
606 pages: &mut FxHashMap<PageId, Page>,
607 alloc: &mut PageAllocator,
608 txn_id: TxnId,
609 key: &[u8],
610 val_type: ValueType,
611 default_value: &[u8],
612 f: F,
613 ) -> std::result::Result<UpsertOutcome, E>
614 where
615 F: FnMut(&[u8]) -> std::result::Result<UpsertAction, E>,
616 E: From<Error>,
617 {
618 if let Some((mut cached_path, cached_leaf)) = self.last_insert.take() {
619 let (hit, needs_cow) = {
620 let page = pages
621 .get(&cached_leaf)
622 .ok_or(Error::PageOutOfBounds(cached_leaf))?;
623 let n = page.num_cells();
624 let h = n > 0 && key > leaf_node::read_cell(page, n - 1).key;
625 let nc = page.txn_id() != txn_id;
626 (h, nc)
627 };
628 if hit {
629 let cow_id = if needs_cow {
630 cow_page(pages, alloc, cached_leaf, txn_id)
631 } else {
632 cached_leaf
633 };
634 let ok = {
635 let page = pages.get_mut(&cow_id).unwrap();
636 leaf_node::insert_direct(page, key, val_type, default_value)
637 };
638 if ok {
639 if cow_id != cached_leaf {
640 self.root =
641 propagate_cow_up(pages, alloc, txn_id, &mut cached_path, cow_id);
642 }
643 self.entry_count += 1;
644 self.last_insert = Some((cached_path, cow_id));
645 return Ok(UpsertOutcome::Inserted);
646 }
647 let (sep_key, right_id) = split_leaf_with_insert(
648 pages,
649 alloc,
650 txn_id,
651 cow_id,
652 key,
653 val_type,
654 default_value,
655 );
656 self.root = propagate_split_up(
657 pages,
658 alloc,
659 txn_id,
660 &cached_path,
661 cow_id,
662 &sep_key,
663 right_id,
664 &mut self.depth,
665 );
666 self.last_insert = None;
667 self.entry_count += 1;
668 return Ok(UpsertOutcome::Inserted);
669 }
670 self.last_insert = Some((cached_path, cached_leaf));
671 }
672
673 let (path, leaf_id) = self.walk_to_leaf(pages, key)?;
674 self.upsert_with_at_leaf(
675 pages,
676 alloc,
677 txn_id,
678 key,
679 val_type,
680 default_value,
681 path,
682 leaf_id,
683 f,
684 )
685 }
686
687 #[allow(clippy::too_many_arguments)]
688 #[inline]
689 pub fn upsert_with_at_leaf<F, E>(
690 &mut self,
691 pages: &mut FxHashMap<PageId, Page>,
692 alloc: &mut PageAllocator,
693 txn_id: TxnId,
694 key: &[u8],
695 val_type: ValueType,
696 default_value: &[u8],
697 path: Vec<(PageId, usize)>,
698 leaf_id: PageId,
699 mut f: F,
700 ) -> std::result::Result<UpsertOutcome, E>
701 where
702 F: FnMut(&[u8]) -> std::result::Result<UpsertAction, E>,
703 E: From<Error>,
704 {
705 let action = {
706 let page = pages.get(&leaf_id).unwrap();
707 match leaf_node::search(page, key) {
708 Ok(idx) => {
709 let cell = leaf_node::read_cell(page, idx);
710 if matches!(cell.val_type, ValueType::Tombstone) {
711 None
712 } else {
713 Some(f(cell.value)?)
714 }
715 }
716 Err(_) => None,
717 }
718 };
719
720 if let Some(act) = action {
721 match act {
722 UpsertAction::Skip => return Ok(UpsertOutcome::Skipped),
723 UpsertAction::Replace(new_bytes) => {
724 let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
725 let leaf_ok = {
726 let page = pages.get_mut(&new_leaf_id).unwrap();
727 leaf_node::insert_direct(page, key, val_type, &new_bytes)
728 };
729 if leaf_ok {
730 if new_leaf_id != leaf_id {
731 let mut new_path = path;
732 self.root =
733 propagate_cow_up(pages, alloc, txn_id, &mut new_path, new_leaf_id);
734 }
735 return Ok(UpsertOutcome::Updated);
736 }
737 self.last_insert = None;
738 let (sep_key, right_id) = split_leaf_with_insert(
739 pages,
740 alloc,
741 txn_id,
742 new_leaf_id,
743 key,
744 val_type,
745 &new_bytes,
746 );
747 self.root = propagate_split_up(
748 pages,
749 alloc,
750 txn_id,
751 &path,
752 new_leaf_id,
753 &sep_key,
754 right_id,
755 &mut self.depth,
756 );
757 return Ok(UpsertOutcome::Updated);
758 }
759 }
760 }
761
762 let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
763 let leaf_ok = {
764 let page = pages.get_mut(&new_leaf_id).unwrap();
765 leaf_node::insert_direct(page, key, val_type, default_value)
766 };
767
768 if leaf_ok {
769 if alloc.in_place() && new_leaf_id == leaf_id {
770 let mut is_rightmost = true;
771 for &(ancestor_id, child_idx) in path.iter().rev() {
772 let page = pages.get(&ancestor_id).unwrap();
773 if child_idx != page.num_cells() as usize {
774 is_rightmost = false;
775 break;
776 }
777 }
778 if is_rightmost {
779 self.last_insert = Some((path, new_leaf_id));
780 }
781 self.entry_count += 1;
782 return Ok(UpsertOutcome::Inserted);
783 }
784 let mut child = new_leaf_id;
785 let mut is_rightmost = true;
786 let mut new_path = path;
787 for i in (0..new_path.len()).rev() {
788 let (ancestor_id, child_idx) = new_path[i];
789 let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
790 let page = pages.get_mut(&new_ancestor).unwrap();
791 update_branch_child(page, child_idx, child);
792 if child_idx != page.num_cells() as usize {
793 is_rightmost = false;
794 }
795 new_path[i] = (new_ancestor, child_idx);
796 child = new_ancestor;
797 }
798 self.root = child;
799
800 if is_rightmost {
801 self.last_insert = Some((new_path, new_leaf_id));
802 }
803 self.entry_count += 1;
804 return Ok(UpsertOutcome::Inserted);
805 }
806
807 self.last_insert = None;
808 let (sep_key, right_id) = split_leaf_with_insert(
809 pages,
810 alloc,
811 txn_id,
812 new_leaf_id,
813 key,
814 val_type,
815 default_value,
816 );
817 self.root = propagate_split_up(
818 pages,
819 alloc,
820 txn_id,
821 &path,
822 new_leaf_id,
823 &sep_key,
824 right_id,
825 &mut self.depth,
826 );
827 self.entry_count += 1;
828 Ok(UpsertOutcome::Inserted)
829 }
830
831 pub fn update_sorted(
833 &mut self,
834 pages: &mut FxHashMap<PageId, Page>,
835 alloc: &mut PageAllocator,
836 txn_id: TxnId,
837 pairs: &[(&[u8], &[u8])],
838 ) -> Result<u64> {
839 if pairs.is_empty() {
840 return Ok(0);
841 }
842 self.last_insert = None;
843
844 let (mut path, mut leaf_id) = self.walk_to_leaf(pages, pairs[0].0)?;
845 let mut cow_leaf = cow_page(pages, alloc, leaf_id, txn_id);
846 if cow_leaf != leaf_id {
847 self.root = propagate_cow_up(pages, alloc, txn_id, &mut path, cow_leaf);
848 }
849
850 let mut count: u64 = 0;
851 let mut hint: u16 = 0;
852
853 for &(key, value) in pairs {
854 let past_leaf = {
855 let page = pages.get(&cow_leaf).unwrap();
856 let n = page.num_cells();
857 n == 0 || key > leaf_node::read_cell(page, n - 1).key
858 };
859
860 if past_leaf {
861 let (new_path, new_leaf) = self.walk_to_leaf(pages, key)?;
862 path = new_path;
863 leaf_id = new_leaf;
864 cow_leaf = cow_page(pages, alloc, leaf_id, txn_id);
865 if cow_leaf != leaf_id {
866 self.root = propagate_cow_up(pages, alloc, txn_id, &mut path, cow_leaf);
867 }
868 hint = 0;
869 }
870
871 let page = pages.get(&cow_leaf).unwrap();
872 let n = page.num_cells();
873 let idx = {
874 let mut i = hint;
875 loop {
876 if i >= n {
877 break None;
878 }
879 let cell = leaf_node::read_cell(page, i);
880 match key.cmp(cell.key) {
881 std::cmp::Ordering::Equal => break Some(i),
882 std::cmp::Ordering::Less => break None,
883 std::cmp::Ordering::Greater => i += 1,
884 }
885 }
886 };
887
888 if let Some(idx) = idx {
889 hint = idx + 1;
890 let page = pages.get_mut(&cow_leaf).unwrap();
891 if !leaf_node::update_value_in_place(page, idx, ValueType::Inline, value) {
892 leaf_node::insert_direct(page, key, ValueType::Inline, value);
893 }
894 count += 1;
895 }
896 }
897
898 Ok(count)
899 }
900
901 pub fn delete(
903 &mut self,
904 pages: &mut FxHashMap<PageId, Page>,
905 alloc: &mut PageAllocator,
906 txn_id: TxnId,
907 key: &[u8],
908 ) -> Result<bool> {
909 self.last_insert = None;
910 let (mut path, leaf_id) = self.walk_to_leaf(pages, key)?;
911
912 let found = {
913 let page = pages.get(&leaf_id).unwrap();
914 leaf_node::search(page, key).is_ok()
915 };
916 if !found {
917 return Ok(false);
918 }
919
920 let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
921 {
922 let page = pages.get_mut(&new_leaf_id).unwrap();
923 leaf_node::delete(page, key);
924 }
925
926 let leaf_empty = pages.get(&new_leaf_id).unwrap().num_cells() == 0;
927
928 if !leaf_empty || path.is_empty() {
929 if alloc.in_place() && new_leaf_id == leaf_id {
930 self.entry_count -= 1;
931 return Ok(true);
932 }
933 self.root = propagate_cow_up(pages, alloc, txn_id, &mut path, new_leaf_id);
934 self.entry_count -= 1;
935 return Ok(true);
936 }
937
938 alloc.free(new_leaf_id);
939 pages.remove(&new_leaf_id);
940
941 self.root = propagate_remove_up(pages, alloc, txn_id, &mut path, &mut self.depth);
942 self.entry_count -= 1;
943 Ok(true)
944 }
945
946 pub fn walk_to_leaf(
948 &self,
949 pages: &FxHashMap<PageId, Page>,
950 key: &[u8],
951 ) -> Result<(Vec<(PageId, usize)>, PageId)> {
952 let mut path = Vec::with_capacity(self.depth as usize);
953 let mut current = self.root;
954 loop {
955 let page = pages.get(¤t).ok_or(Error::PageOutOfBounds(current))?;
956 match page.page_type() {
957 Some(PageType::Leaf) => return Ok((path, current)),
958 Some(PageType::Branch) => {
959 let child_idx = branch_node::search_child_index(page, key);
960 let child = branch_node::get_child(page, child_idx);
961 path.push((current, child_idx));
962 current = child;
963 }
964 _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
965 }
966 }
967 }
968}
969
970pub fn cow_page(
972 pages: &mut FxHashMap<PageId, Page>,
973 alloc: &mut PageAllocator,
974 old_id: PageId,
975 txn_id: TxnId,
976) -> PageId {
977 if alloc.in_place() {
978 let page = pages.get_mut(&old_id).unwrap();
979 if page.txn_id() != txn_id {
980 page.set_txn_id(txn_id);
981 }
982 return old_id;
983 }
984 let mut new_page = {
985 let page = pages.get(&old_id).unwrap();
986 if page.txn_id() == txn_id {
987 return old_id;
988 }
989 page.clone()
990 };
991 let new_id = alloc.allocate();
992 new_page.set_page_id(new_id);
993 new_page.set_txn_id(txn_id);
994 pages.insert(new_id, new_page);
995 alloc.free(old_id);
996 new_id
997}
998
999fn update_branch_child(page: &mut Page, child_idx: usize, new_child: PageId) {
1001 let n = page.num_cells() as usize;
1002 if child_idx < n {
1003 let offset = page.cell_offset(child_idx as u16) as usize;
1004 page.data[offset..offset + 4].copy_from_slice(&new_child.as_u32().to_le_bytes());
1005 } else {
1006 page.set_right_child(new_child);
1007 }
1008}
1009
1010pub fn propagate_cow_up(
1014 pages: &mut FxHashMap<PageId, Page>,
1015 alloc: &mut PageAllocator,
1016 txn_id: TxnId,
1017 path: &mut [(PageId, usize)],
1018 mut new_child: PageId,
1019) -> PageId {
1020 for i in (0..path.len()).rev() {
1021 let (ancestor_id, child_idx) = path[i];
1022 let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
1023 let page = pages.get_mut(&new_ancestor).unwrap();
1024 update_branch_child(page, child_idx, new_child);
1025 path[i] = (new_ancestor, child_idx);
1026 new_child = new_ancestor;
1027 }
1028 new_child
1029}
1030
1031fn split_leaf_with_insert(
1033 pages: &mut FxHashMap<PageId, Page>,
1034 alloc: &mut PageAllocator,
1035 txn_id: TxnId,
1036 leaf_id: PageId,
1037 key: &[u8],
1038 val_type: ValueType,
1039 value: &[u8],
1040) -> (Vec<u8>, PageId) {
1041 let mut cells: Vec<(Vec<u8>, Vec<u8>)> = {
1042 let page = pages.get(&leaf_id).unwrap();
1043 let n = page.num_cells() as usize;
1044 (0..n)
1045 .map(|i| {
1046 let cell = leaf_node::read_cell(page, i as u16);
1047 let raw = leaf_node::read_cell_bytes(page, i as u16);
1048 (cell.key.to_vec(), raw)
1049 })
1050 .collect()
1051 };
1052
1053 let new_raw = leaf_node::build_cell(key, val_type, value);
1054 match cells.binary_search_by(|(k, _)| k.as_slice().cmp(key)) {
1055 Ok(idx) => cells[idx] = (key.to_vec(), new_raw),
1056 Err(idx) => cells.insert(idx, (key.to_vec(), new_raw)),
1057 }
1058
1059 let total = cells.len();
1060
1061 let usable = citadel_core::constants::USABLE_SIZE;
1062 let mut cum: Vec<usize> = Vec::with_capacity(total + 1);
1063 cum.push(0);
1064 for (_, raw) in &cells {
1065 cum.push(cum.last().unwrap() + raw.len());
1066 }
1067 let left_fits = |sp: usize| cum[sp] + sp * 2 <= usable;
1068 let right_fits = |sp: usize| (cum[total] - cum[sp]) + (total - sp) * 2 <= usable;
1069
1070 let mut split_point = total / 2;
1071 if !left_fits(split_point) || !right_fits(split_point) {
1072 split_point = 1;
1073 for sp in 1..total {
1074 if left_fits(sp) && right_fits(sp) {
1075 split_point = sp;
1076 if sp >= total / 2 {
1077 break;
1078 }
1079 }
1080 }
1081 }
1082
1083 let sep_key = cells[split_point].0.clone();
1084
1085 {
1086 let left_refs: Vec<&[u8]> = cells[..split_point]
1087 .iter()
1088 .map(|(_, raw)| raw.as_slice())
1089 .collect();
1090 let page = pages.get_mut(&leaf_id).unwrap();
1091 page.rebuild_cells(&left_refs);
1092 }
1093
1094 let right_id = alloc.allocate();
1095 {
1096 let mut right_page = Page::new(right_id, PageType::Leaf, txn_id);
1097 let right_refs: Vec<&[u8]> = cells[split_point..]
1098 .iter()
1099 .map(|(_, raw)| raw.as_slice())
1100 .collect();
1101 right_page.rebuild_cells(&right_refs);
1102 pages.insert(right_id, right_page);
1103 }
1104
1105 (sep_key, right_id)
1106}
1107
1108#[allow(clippy::too_many_arguments)]
1109fn propagate_split_up(
1110 pages: &mut FxHashMap<PageId, Page>,
1111 alloc: &mut PageAllocator,
1112 txn_id: TxnId,
1113 path: &[(PageId, usize)],
1114 mut left_child: PageId,
1115 initial_sep: &[u8],
1116 mut right_child: PageId,
1117 depth: &mut u16,
1118) -> PageId {
1119 let mut sep_key = initial_sep.to_vec();
1120 let mut pending_split = true;
1121
1122 for &(ancestor_id, child_idx) in path.iter().rev() {
1123 let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
1124
1125 if pending_split {
1126 let ok = {
1127 let page = pages.get_mut(&new_ancestor).unwrap();
1128 branch_node::insert_separator(page, child_idx, left_child, &sep_key, right_child)
1129 };
1130
1131 if ok {
1132 pending_split = false;
1133 left_child = new_ancestor;
1134 } else {
1135 let (new_sep, new_right) = split_branch_with_insert(
1136 pages,
1137 alloc,
1138 txn_id,
1139 new_ancestor,
1140 child_idx,
1141 left_child,
1142 &sep_key,
1143 right_child,
1144 );
1145 left_child = new_ancestor;
1146 sep_key = new_sep;
1147 right_child = new_right;
1148 }
1149 } else {
1150 let page = pages.get_mut(&new_ancestor).unwrap();
1151 update_branch_child(page, child_idx, left_child);
1152 left_child = new_ancestor;
1153 }
1154 }
1155
1156 if pending_split {
1157 let new_root_id = alloc.allocate();
1158 let mut new_root = Page::new(new_root_id, PageType::Branch, txn_id);
1159 let cell = branch_node::build_cell(left_child, &sep_key);
1160 new_root.write_cell(&cell).unwrap();
1161 new_root.set_right_child(right_child);
1162 pages.insert(new_root_id, new_root);
1163 *depth += 1;
1164 new_root_id
1165 } else {
1166 left_child
1167 }
1168}
1169
1170#[allow(clippy::too_many_arguments)]
1171fn split_branch_with_insert(
1172 pages: &mut FxHashMap<PageId, Page>,
1173 alloc: &mut PageAllocator,
1174 txn_id: TxnId,
1175 branch_id: PageId,
1176 child_idx: usize,
1177 new_left: PageId,
1178 sep_key: &[u8],
1179 new_right: PageId,
1180) -> (Vec<u8>, PageId) {
1181 let (new_cells, final_right_child) = {
1182 let page = pages.get(&branch_id).unwrap();
1183 let n = page.num_cells() as usize;
1184 let cells: Vec<(PageId, Vec<u8>)> = (0..n)
1185 .map(|i| {
1186 let cell = branch_node::read_cell(page, i as u16);
1187 (cell.child, cell.key.to_vec())
1188 })
1189 .collect();
1190 let old_rc = page.right_child();
1191
1192 let mut result = Vec::with_capacity(n + 1);
1193 let final_rc;
1194
1195 if child_idx < n {
1196 let old_key = cells[child_idx].1.clone();
1197 for (i, (child, key)) in cells.into_iter().enumerate() {
1198 if i == child_idx {
1199 result.push((new_left, sep_key.to_vec()));
1200 result.push((new_right, old_key.clone()));
1201 } else {
1202 result.push((child, key));
1203 }
1204 }
1205 final_rc = old_rc;
1206 } else {
1207 result = cells;
1208 result.push((new_left, sep_key.to_vec()));
1209 final_rc = new_right;
1210 }
1211
1212 (result, final_rc)
1213 };
1214
1215 let total = new_cells.len();
1216 let usable = citadel_core::constants::USABLE_SIZE;
1217 let raw_sizes: Vec<usize> = new_cells.iter().map(|(_, key)| 6 + key.len()).collect();
1218 let mut cum: Vec<usize> = Vec::with_capacity(total + 1);
1219 cum.push(0);
1220 for &sz in &raw_sizes {
1221 cum.push(cum.last().unwrap() + sz);
1222 }
1223 let left_fits = |sp: usize| cum[sp] + sp * 2 <= usable;
1224 let right_fits = |sp: usize| {
1225 let right_count = total - sp - 1;
1226 (cum[total] - cum[sp + 1]) + right_count * 2 <= usable
1227 };
1228
1229 let mut split_point = total / 2;
1230 if !left_fits(split_point) || !right_fits(split_point) {
1231 split_point = 1;
1232 for sp in 1..total.saturating_sub(1) {
1233 if left_fits(sp) && right_fits(sp) {
1234 split_point = sp;
1235 if sp >= total / 2 {
1236 break;
1237 }
1238 }
1239 }
1240 }
1241
1242 let promoted_sep = new_cells[split_point].1.clone();
1243 let promoted_child = new_cells[split_point].0;
1244
1245 {
1246 let left_raw: Vec<Vec<u8>> = new_cells[..split_point]
1247 .iter()
1248 .map(|(child, key)| branch_node::build_cell(*child, key))
1249 .collect();
1250 let left_refs: Vec<&[u8]> = left_raw.iter().map(|c| c.as_slice()).collect();
1251 let page = pages.get_mut(&branch_id).unwrap();
1252 page.rebuild_cells(&left_refs);
1253 page.set_right_child(promoted_child);
1254 }
1255
1256 let right_branch_id = alloc.allocate();
1257 {
1258 let mut right_page = Page::new(right_branch_id, PageType::Branch, txn_id);
1259 let right_raw: Vec<Vec<u8>> = new_cells[split_point + 1..]
1260 .iter()
1261 .map(|(child, key)| branch_node::build_cell(*child, key))
1262 .collect();
1263 let right_refs: Vec<&[u8]> = right_raw.iter().map(|c| c.as_slice()).collect();
1264 right_page.rebuild_cells(&right_refs);
1265 right_page.set_right_child(final_right_child);
1266 pages.insert(right_branch_id, right_page);
1267 }
1268
1269 (promoted_sep, right_branch_id)
1270}
1271
1272fn remove_child_from_branch(page: &mut Page, child_idx: usize) {
1273 let n = page.num_cells() as usize;
1274 if child_idx < n {
1275 let cell_sz = branch_node::get_cell_size(page, child_idx as u16);
1276 page.delete_cell_at(child_idx as u16, cell_sz);
1277 } else {
1278 assert!(n > 0, "cannot remove right_child from branch with 0 cells");
1279 let last_child = branch_node::read_cell(page, (n - 1) as u16).child;
1280 let cell_sz = branch_node::get_cell_size(page, (n - 1) as u16);
1281 page.delete_cell_at((n - 1) as u16, cell_sz);
1282 page.set_right_child(last_child);
1283 }
1284}
1285
1286fn propagate_remove_up(
1287 pages: &mut FxHashMap<PageId, Page>,
1288 alloc: &mut PageAllocator,
1289 txn_id: TxnId,
1290 path: &mut [(PageId, usize)],
1291 depth: &mut u16,
1292) -> PageId {
1293 let mut level = path.len();
1294 let mut need_remove_at_level = true;
1295 let mut new_child = PageId(0);
1296
1297 while level > 0 && need_remove_at_level {
1298 level -= 1;
1299 let (ancestor_id, child_idx) = path[level];
1300 let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
1301
1302 {
1303 let page = pages.get_mut(&new_ancestor).unwrap();
1304 remove_child_from_branch(page, child_idx);
1305 }
1306
1307 let num_cells = pages.get(&new_ancestor).unwrap().num_cells();
1308
1309 if num_cells > 0 || level == 0 {
1310 if num_cells == 0 && level == 0 {
1311 let only_child = pages.get(&new_ancestor).unwrap().right_child();
1313 alloc.free(new_ancestor);
1314 pages.remove(&new_ancestor);
1315 *depth -= 1;
1316 return only_child;
1317 }
1318 new_child = new_ancestor;
1320 need_remove_at_level = false;
1321 } else {
1322 let only_child = pages.get(&new_ancestor).unwrap().right_child();
1324 alloc.free(new_ancestor);
1325 pages.remove(&new_ancestor);
1326 *depth -= 1;
1327
1328 new_child = only_child;
1329 need_remove_at_level = false;
1330 }
1331 }
1332
1333 if level > 0 {
1334 let remaining_path = &mut path[..level];
1335 new_child = propagate_cow_up(pages, alloc, txn_id, remaining_path, new_child);
1336 }
1337
1338 new_child
1339}
1340
1341#[cfg(test)]
1342#[path = "btree_tests.rs"]
1343mod tests;