1use crate::allocator::PageAllocator;
4use citadel_core::types::{PageId, PageType, TxnId, ValueType};
5use citadel_core::{Error, Result};
6use citadel_page::page::Page;
7use citadel_page::{branch_node, leaf_node};
8use rustc_hash::FxHashMap;
9
10#[derive(Clone)]
12pub struct BTree {
13 pub root: PageId,
14 pub depth: u16,
15 pub entry_count: u64,
16 last_insert: Option<(Vec<(PageId, usize)>, PageId)>,
17}
18
/// What an upsert call ended up doing.
///
/// All variants are field-less, so the type is `Copy` and comparable; callers
/// can write `outcome == UpsertOutcome::Inserted` instead of a `matches!`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UpsertOutcome {
    /// The key had no live value and the default value was stored.
    Inserted,
    /// The callback asked for a replacement and the new bytes were stored.
    Updated,
    /// The callback returned `UpsertAction::Skip`; nothing was written.
    Skipped,
}
25
/// Decision returned by an upsert callback after it has seen the current
/// value of an existing key.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum UpsertAction {
    /// Store these bytes in place of the current value.
    Replace(Vec<u8>),
    /// Leave the current value untouched.
    Skip,
}
31
32impl BTree {
33 pub fn new(
35 pages: &mut FxHashMap<PageId, Page>,
36 alloc: &mut PageAllocator,
37 txn_id: TxnId,
38 ) -> Self {
39 let root_id = alloc.allocate();
40 let root = Page::new(root_id, PageType::Leaf, txn_id);
41 pages.insert(root_id, root);
42 Self {
43 root: root_id,
44 depth: 1,
45 entry_count: 0,
46 last_insert: None,
47 }
48 }
49
50 pub fn from_existing(root: PageId, depth: u16, entry_count: u64) -> Self {
52 Self {
53 root,
54 depth,
55 entry_count,
56 last_insert: None,
57 }
58 }
59
60 pub fn search_at_leaf(
61 pages: &FxHashMap<PageId, Page>,
62 leaf_id: PageId,
63 key: &[u8],
64 ) -> Result<Option<(ValueType, Vec<u8>)>> {
65 let page = pages.get(&leaf_id).ok_or(Error::PageOutOfBounds(leaf_id))?;
66 match leaf_node::search(page, key) {
67 Ok(idx) => {
68 let cell = leaf_node::read_cell(page, idx);
69 Ok(Some((cell.val_type, cell.value.to_vec())))
70 }
71 Err(_) => Ok(None),
72 }
73 }
74
75 pub fn search(
76 &self,
77 pages: &FxHashMap<PageId, Page>,
78 key: &[u8],
79 ) -> Result<Option<(ValueType, Vec<u8>)>> {
80 let mut current = self.root;
81 loop {
82 let page = pages.get(¤t).ok_or(Error::PageOutOfBounds(current))?;
83 match page.page_type() {
84 Some(PageType::Leaf) => {
85 return match leaf_node::search(page, key) {
86 Ok(idx) => {
87 let cell = leaf_node::read_cell(page, idx);
88 Ok(Some((cell.val_type, cell.value.to_vec())))
89 }
90 Err(_) => Ok(None),
91 };
92 }
93 Some(PageType::Branch) => {
94 let idx = branch_node::search_child_index(page, key);
95 current = branch_node::get_child(page, idx);
96 }
97 _ => {
98 return Err(Error::InvalidPageType(page.page_type_raw(), current));
99 }
100 }
101 }
102 }
103
104 pub fn lil_would_hit(&self, pages: &FxHashMap<PageId, Page>, key: &[u8]) -> bool {
105 if let Some((_, cached_leaf)) = &self.last_insert {
106 if let Some(page) = pages.get(cached_leaf) {
107 let n = page.num_cells();
108 return n > 0 && key > leaf_node::read_cell(page, n - 1).key;
109 }
110 }
111 false
112 }
113
114 pub fn try_lil_insert(
116 &mut self,
117 pages: &mut FxHashMap<PageId, Page>,
118 alloc: &mut PageAllocator,
119 txn_id: TxnId,
120 key: &[u8],
121 val_type: ValueType,
122 value: &[u8],
123 ) -> Result<Option<bool>> {
124 let cached_leaf = match self.last_insert.as_ref() {
125 Some((_, leaf)) => *leaf,
126 None => return Ok(None),
127 };
128 let (hit, needs_cow) = {
129 let page = pages
130 .get(&cached_leaf)
131 .ok_or(Error::PageOutOfBounds(cached_leaf))?;
132 let n = page.num_cells();
133 let h = n > 0 && key > leaf_node::read_cell(page, n - 1).key;
134 let nc = page.txn_id() != txn_id;
135 (h, nc)
136 };
137 if !hit {
138 return Ok(None);
139 }
140 let mut cached_path = self.last_insert.take().unwrap().0;
141 let cow_id = if needs_cow {
142 cow_page(pages, alloc, cached_leaf, txn_id)
143 } else {
144 cached_leaf
145 };
146 let ok = {
147 let page = pages.get_mut(&cow_id).unwrap();
148 leaf_node::insert_append_direct(page, key, val_type, value)
149 };
150 if ok {
151 if cow_id != cached_leaf {
152 self.root = propagate_cow_up(pages, alloc, txn_id, &mut cached_path, cow_id);
153 }
154 self.entry_count += 1;
155 self.last_insert = Some((cached_path, cow_id));
156 return Ok(Some(true));
157 }
158 let (sep_key, right_id) =
159 split_leaf_with_insert(pages, alloc, txn_id, cow_id, key, val_type, value);
160 self.root = propagate_split_up(
161 pages,
162 alloc,
163 txn_id,
164 &cached_path,
165 cow_id,
166 &sep_key,
167 right_id,
168 &mut self.depth,
169 );
170 self.last_insert = None;
171 self.entry_count += 1;
172 Ok(Some(true))
173 }
174
175 pub fn insert(
177 &mut self,
178 pages: &mut FxHashMap<PageId, Page>,
179 alloc: &mut PageAllocator,
180 txn_id: TxnId,
181 key: &[u8],
182 val_type: ValueType,
183 value: &[u8],
184 ) -> Result<bool> {
185 if let Some((mut cached_path, cached_leaf)) = self.last_insert.take() {
187 let (hit, needs_cow) = {
188 let page = pages
189 .get(&cached_leaf)
190 .ok_or(Error::PageOutOfBounds(cached_leaf))?;
191 let n = page.num_cells();
192 let h = n > 0 && key > leaf_node::read_cell(page, n - 1).key;
193 let nc = page.txn_id() != txn_id;
194 (h, nc)
195 };
196 if hit {
197 let cow_id = if needs_cow {
198 cow_page(pages, alloc, cached_leaf, txn_id)
199 } else {
200 cached_leaf
201 };
202 let ok = {
203 let page = pages.get_mut(&cow_id).unwrap();
204 leaf_node::insert_direct(page, key, val_type, value)
205 };
206 if ok {
207 if cow_id != cached_leaf {
208 self.root =
209 propagate_cow_up(pages, alloc, txn_id, &mut cached_path, cow_id);
210 }
211 self.entry_count += 1;
212 self.last_insert = Some((cached_path, cow_id));
213 return Ok(true);
214 }
215 let (sep_key, right_id) =
216 split_leaf_with_insert(pages, alloc, txn_id, cow_id, key, val_type, value);
217 self.root = propagate_split_up(
218 pages,
219 alloc,
220 txn_id,
221 &cached_path,
222 cow_id,
223 &sep_key,
224 right_id,
225 &mut self.depth,
226 );
227 self.last_insert = None;
228 self.entry_count += 1;
229 return Ok(true);
230 }
231 }
232
233 let (path, leaf_id) = self.walk_to_leaf(pages, key)?;
234 self.insert_at_leaf(pages, alloc, txn_id, key, val_type, value, path, leaf_id)
235 }
236
237 #[allow(clippy::too_many_arguments)]
238 #[inline]
239 pub fn insert_at_leaf(
240 &mut self,
241 pages: &mut FxHashMap<PageId, Page>,
242 alloc: &mut PageAllocator,
243 txn_id: TxnId,
244 key: &[u8],
245 val_type: ValueType,
246 value: &[u8],
247 path: Vec<(PageId, usize)>,
248 leaf_id: PageId,
249 ) -> Result<bool> {
250 let key_exists = {
251 let page = pages.get(&leaf_id).unwrap();
252 leaf_node::search(page, key).is_ok()
253 };
254
255 let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
256
257 let leaf_ok = {
258 let page = pages.get_mut(&new_leaf_id).unwrap();
259 leaf_node::insert_direct(page, key, val_type, value)
260 };
261
262 if leaf_ok {
263 if alloc.in_place() && new_leaf_id == leaf_id {
264 let mut is_rightmost = true;
265 for &(ancestor_id, child_idx) in path.iter().rev() {
266 let page = pages.get(&ancestor_id).unwrap();
267 if child_idx != page.num_cells() as usize {
268 is_rightmost = false;
269 break;
270 }
271 }
272 if is_rightmost {
273 self.last_insert = Some((path, new_leaf_id));
274 }
275 if !key_exists {
276 self.entry_count += 1;
277 }
278 return Ok(!key_exists);
279 }
280 let mut child = new_leaf_id;
281 let mut is_rightmost = true;
282 let mut new_path = path;
283 for i in (0..new_path.len()).rev() {
284 let (ancestor_id, child_idx) = new_path[i];
285 let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
286 let page = pages.get_mut(&new_ancestor).unwrap();
287 update_branch_child(page, child_idx, child);
288 if child_idx != page.num_cells() as usize {
289 is_rightmost = false;
290 }
291 new_path[i] = (new_ancestor, child_idx);
292 child = new_ancestor;
293 }
294 self.root = child;
295
296 if is_rightmost {
297 self.last_insert = Some((new_path, new_leaf_id));
298 }
299
300 if !key_exists {
301 self.entry_count += 1;
302 }
303 return Ok(!key_exists);
304 }
305
306 self.last_insert = None;
307 let (sep_key, right_id) =
308 split_leaf_with_insert(pages, alloc, txn_id, new_leaf_id, key, val_type, value);
309 self.root = propagate_split_up(
310 pages,
311 alloc,
312 txn_id,
313 &path,
314 new_leaf_id,
315 &sep_key,
316 right_id,
317 &mut self.depth,
318 );
319
320 if !key_exists {
321 self.entry_count += 1;
322 }
323 Ok(!key_exists)
324 }
325
326 pub fn insert_or_fetch(
327 &mut self,
328 pages: &mut FxHashMap<PageId, Page>,
329 alloc: &mut PageAllocator,
330 txn_id: TxnId,
331 key: &[u8],
332 val_type: ValueType,
333 value: &[u8],
334 ) -> Result<Option<Vec<u8>>> {
335 if let Some((mut cached_path, cached_leaf)) = self.last_insert.take() {
336 let (hit, needs_cow) = {
337 let page = pages
338 .get(&cached_leaf)
339 .ok_or(Error::PageOutOfBounds(cached_leaf))?;
340 let n = page.num_cells();
341 let h = n > 0 && key > leaf_node::read_cell(page, n - 1).key;
342 let nc = page.txn_id() != txn_id;
343 (h, nc)
344 };
345 if hit {
346 let cow_id = if needs_cow {
347 cow_page(pages, alloc, cached_leaf, txn_id)
348 } else {
349 cached_leaf
350 };
351 let ok = {
352 let page = pages.get_mut(&cow_id).unwrap();
353 leaf_node::insert_direct(page, key, val_type, value)
354 };
355 if ok {
356 if cow_id != cached_leaf {
357 self.root =
358 propagate_cow_up(pages, alloc, txn_id, &mut cached_path, cow_id);
359 }
360 self.entry_count += 1;
361 self.last_insert = Some((cached_path, cow_id));
362 return Ok(None);
363 }
364 let (sep_key, right_id) =
365 split_leaf_with_insert(pages, alloc, txn_id, cow_id, key, val_type, value);
366 self.root = propagate_split_up(
367 pages,
368 alloc,
369 txn_id,
370 &cached_path,
371 cow_id,
372 &sep_key,
373 right_id,
374 &mut self.depth,
375 );
376 self.last_insert = None;
377 self.entry_count += 1;
378 return Ok(None);
379 }
380 self.last_insert = Some((cached_path, cached_leaf));
381 }
382
383 let (path, leaf_id) = self.walk_to_leaf(pages, key)?;
384
385 let existing_value = {
386 let page = pages.get(&leaf_id).unwrap();
387 match leaf_node::search(page, key) {
388 Ok(idx) => {
389 let cell = leaf_node::read_cell(page, idx);
390 if matches!(cell.val_type, ValueType::Tombstone) {
391 None
392 } else {
393 Some(cell.value.to_vec())
394 }
395 }
396 Err(_) => None,
397 }
398 };
399 if let Some(v) = existing_value {
400 return Ok(Some(v));
401 }
402
403 let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
404 let leaf_ok = {
405 let page = pages.get_mut(&new_leaf_id).unwrap();
406 leaf_node::insert_direct(page, key, val_type, value)
407 };
408
409 if leaf_ok {
410 if alloc.in_place() && new_leaf_id == leaf_id {
411 let mut is_rightmost = true;
412 for &(ancestor_id, child_idx) in path.iter().rev() {
413 let page = pages.get(&ancestor_id).unwrap();
414 if child_idx != page.num_cells() as usize {
415 is_rightmost = false;
416 break;
417 }
418 }
419 if is_rightmost {
420 self.last_insert = Some((path, new_leaf_id));
421 }
422 self.entry_count += 1;
423 return Ok(None);
424 }
425 let mut child = new_leaf_id;
426 let mut is_rightmost = true;
427 let mut new_path = path;
428 for i in (0..new_path.len()).rev() {
429 let (ancestor_id, child_idx) = new_path[i];
430 let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
431 let page = pages.get_mut(&new_ancestor).unwrap();
432 update_branch_child(page, child_idx, child);
433 if child_idx != page.num_cells() as usize {
434 is_rightmost = false;
435 }
436 new_path[i] = (new_ancestor, child_idx);
437 child = new_ancestor;
438 }
439 self.root = child;
440
441 if is_rightmost {
442 self.last_insert = Some((new_path, new_leaf_id));
443 }
444 self.entry_count += 1;
445 return Ok(None);
446 }
447
448 self.last_insert = None;
449 let (sep_key, right_id) =
450 split_leaf_with_insert(pages, alloc, txn_id, new_leaf_id, key, val_type, value);
451 self.root = propagate_split_up(
452 pages,
453 alloc,
454 txn_id,
455 &path,
456 new_leaf_id,
457 &sep_key,
458 right_id,
459 &mut self.depth,
460 );
461 self.entry_count += 1;
462 Ok(None)
463 }
464
465 #[inline]
466 pub fn insert_if_absent(
467 &mut self,
468 pages: &mut FxHashMap<PageId, Page>,
469 alloc: &mut PageAllocator,
470 txn_id: TxnId,
471 key: &[u8],
472 val_type: ValueType,
473 value: &[u8],
474 ) -> Result<bool> {
475 if let Some((mut cached_path, cached_leaf)) = self.last_insert.take() {
476 let (hit, needs_cow) = {
477 let page = pages
478 .get(&cached_leaf)
479 .ok_or(Error::PageOutOfBounds(cached_leaf))?;
480 let n = page.num_cells();
481 let h = n > 0 && key > leaf_node::read_cell(page, n - 1).key;
482 let nc = page.txn_id() != txn_id;
483 (h, nc)
484 };
485 if hit {
486 let cow_id = if needs_cow {
487 cow_page(pages, alloc, cached_leaf, txn_id)
488 } else {
489 cached_leaf
490 };
491 let ok = {
492 let page = pages.get_mut(&cow_id).unwrap();
493 leaf_node::insert_direct(page, key, val_type, value)
494 };
495 if ok {
496 if cow_id != cached_leaf {
497 self.root =
498 propagate_cow_up(pages, alloc, txn_id, &mut cached_path, cow_id);
499 }
500 self.entry_count += 1;
501 self.last_insert = Some((cached_path, cow_id));
502 return Ok(true);
503 }
504 let (sep_key, right_id) =
505 split_leaf_with_insert(pages, alloc, txn_id, cow_id, key, val_type, value);
506 self.root = propagate_split_up(
507 pages,
508 alloc,
509 txn_id,
510 &cached_path,
511 cow_id,
512 &sep_key,
513 right_id,
514 &mut self.depth,
515 );
516 self.last_insert = None;
517 self.entry_count += 1;
518 return Ok(true);
519 }
520 self.last_insert = Some((cached_path, cached_leaf));
521 }
522
523 let (path, leaf_id) = self.walk_to_leaf(pages, key)?;
524 self.insert_if_absent_at_leaf(pages, alloc, txn_id, key, val_type, value, path, leaf_id)
525 }
526
527 #[allow(clippy::too_many_arguments)]
528 #[inline]
529 pub fn insert_if_absent_at_leaf(
530 &mut self,
531 pages: &mut FxHashMap<PageId, Page>,
532 alloc: &mut PageAllocator,
533 txn_id: TxnId,
534 key: &[u8],
535 val_type: ValueType,
536 value: &[u8],
537 path: Vec<(PageId, usize)>,
538 leaf_id: PageId,
539 ) -> Result<bool> {
540 let exists = {
541 let page = pages.get(&leaf_id).unwrap();
542 match leaf_node::search(page, key) {
543 Ok(idx) => {
544 let cell = leaf_node::read_cell(page, idx);
545 !matches!(cell.val_type, ValueType::Tombstone)
546 }
547 Err(_) => false,
548 }
549 };
550 if exists {
551 return Ok(false);
552 }
553
554 let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
555 let leaf_ok = {
556 let page = pages.get_mut(&new_leaf_id).unwrap();
557 leaf_node::insert_direct(page, key, val_type, value)
558 };
559
560 if leaf_ok {
561 if alloc.in_place() && new_leaf_id == leaf_id {
562 let mut is_rightmost = true;
563 for &(ancestor_id, child_idx) in path.iter().rev() {
564 let page = pages.get(&ancestor_id).unwrap();
565 if child_idx != page.num_cells() as usize {
566 is_rightmost = false;
567 break;
568 }
569 }
570 if is_rightmost {
571 self.last_insert = Some((path, new_leaf_id));
572 }
573 self.entry_count += 1;
574 return Ok(true);
575 }
576 let mut child = new_leaf_id;
577 let mut is_rightmost = true;
578 let mut new_path = path;
579 for i in (0..new_path.len()).rev() {
580 let (ancestor_id, child_idx) = new_path[i];
581 let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
582 let page = pages.get_mut(&new_ancestor).unwrap();
583 update_branch_child(page, child_idx, child);
584 if child_idx != page.num_cells() as usize {
585 is_rightmost = false;
586 }
587 new_path[i] = (new_ancestor, child_idx);
588 child = new_ancestor;
589 }
590 self.root = child;
591
592 if is_rightmost {
593 self.last_insert = Some((new_path, new_leaf_id));
594 }
595 self.entry_count += 1;
596 return Ok(true);
597 }
598
599 self.last_insert = None;
600 let (sep_key, right_id) =
601 split_leaf_with_insert(pages, alloc, txn_id, new_leaf_id, key, val_type, value);
602 self.root = propagate_split_up(
603 pages,
604 alloc,
605 txn_id,
606 &path,
607 new_leaf_id,
608 &sep_key,
609 right_id,
610 &mut self.depth,
611 );
612 self.entry_count += 1;
613 Ok(true)
614 }
615
616 #[allow(clippy::too_many_arguments)]
617 #[inline]
618 pub fn upsert_with<F, E>(
619 &mut self,
620 pages: &mut FxHashMap<PageId, Page>,
621 alloc: &mut PageAllocator,
622 txn_id: TxnId,
623 key: &[u8],
624 val_type: ValueType,
625 default_value: &[u8],
626 f: F,
627 ) -> std::result::Result<UpsertOutcome, E>
628 where
629 F: FnMut(&[u8]) -> std::result::Result<UpsertAction, E>,
630 E: From<Error>,
631 {
632 if let Some((mut cached_path, cached_leaf)) = self.last_insert.take() {
633 let (hit, needs_cow) = {
634 let page = pages
635 .get(&cached_leaf)
636 .ok_or(Error::PageOutOfBounds(cached_leaf))?;
637 let n = page.num_cells();
638 let h = n > 0 && key > leaf_node::read_cell(page, n - 1).key;
639 let nc = page.txn_id() != txn_id;
640 (h, nc)
641 };
642 if hit {
643 let cow_id = if needs_cow {
644 cow_page(pages, alloc, cached_leaf, txn_id)
645 } else {
646 cached_leaf
647 };
648 let ok = {
649 let page = pages.get_mut(&cow_id).unwrap();
650 leaf_node::insert_direct(page, key, val_type, default_value)
651 };
652 if ok {
653 if cow_id != cached_leaf {
654 self.root =
655 propagate_cow_up(pages, alloc, txn_id, &mut cached_path, cow_id);
656 }
657 self.entry_count += 1;
658 self.last_insert = Some((cached_path, cow_id));
659 return Ok(UpsertOutcome::Inserted);
660 }
661 let (sep_key, right_id) = split_leaf_with_insert(
662 pages,
663 alloc,
664 txn_id,
665 cow_id,
666 key,
667 val_type,
668 default_value,
669 );
670 self.root = propagate_split_up(
671 pages,
672 alloc,
673 txn_id,
674 &cached_path,
675 cow_id,
676 &sep_key,
677 right_id,
678 &mut self.depth,
679 );
680 self.last_insert = None;
681 self.entry_count += 1;
682 return Ok(UpsertOutcome::Inserted);
683 }
684 self.last_insert = Some((cached_path, cached_leaf));
685 }
686
687 let (path, leaf_id) = self.walk_to_leaf(pages, key)?;
688 self.upsert_with_at_leaf(
689 pages,
690 alloc,
691 txn_id,
692 key,
693 val_type,
694 default_value,
695 path,
696 leaf_id,
697 f,
698 )
699 }
700
701 #[allow(clippy::too_many_arguments)]
702 #[inline]
703 pub fn upsert_with_at_leaf<F, E>(
704 &mut self,
705 pages: &mut FxHashMap<PageId, Page>,
706 alloc: &mut PageAllocator,
707 txn_id: TxnId,
708 key: &[u8],
709 val_type: ValueType,
710 default_value: &[u8],
711 path: Vec<(PageId, usize)>,
712 leaf_id: PageId,
713 mut f: F,
714 ) -> std::result::Result<UpsertOutcome, E>
715 where
716 F: FnMut(&[u8]) -> std::result::Result<UpsertAction, E>,
717 E: From<Error>,
718 {
719 let action = {
720 let page = pages.get(&leaf_id).unwrap();
721 match leaf_node::search(page, key) {
722 Ok(idx) => {
723 let cell = leaf_node::read_cell(page, idx);
724 if matches!(cell.val_type, ValueType::Tombstone) {
725 None
726 } else {
727 Some(f(cell.value)?)
728 }
729 }
730 Err(_) => None,
731 }
732 };
733
734 if let Some(act) = action {
735 match act {
736 UpsertAction::Skip => return Ok(UpsertOutcome::Skipped),
737 UpsertAction::Replace(new_bytes) => {
738 let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
739 let leaf_ok = {
740 let page = pages.get_mut(&new_leaf_id).unwrap();
741 leaf_node::insert_direct(page, key, val_type, &new_bytes)
742 };
743 if leaf_ok {
744 if new_leaf_id != leaf_id {
745 let mut new_path = path;
746 self.root =
747 propagate_cow_up(pages, alloc, txn_id, &mut new_path, new_leaf_id);
748 }
749 return Ok(UpsertOutcome::Updated);
750 }
751 self.last_insert = None;
752 let (sep_key, right_id) = split_leaf_with_insert(
753 pages,
754 alloc,
755 txn_id,
756 new_leaf_id,
757 key,
758 val_type,
759 &new_bytes,
760 );
761 self.root = propagate_split_up(
762 pages,
763 alloc,
764 txn_id,
765 &path,
766 new_leaf_id,
767 &sep_key,
768 right_id,
769 &mut self.depth,
770 );
771 return Ok(UpsertOutcome::Updated);
772 }
773 }
774 }
775
776 let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
777 let leaf_ok = {
778 let page = pages.get_mut(&new_leaf_id).unwrap();
779 leaf_node::insert_direct(page, key, val_type, default_value)
780 };
781
782 if leaf_ok {
783 if alloc.in_place() && new_leaf_id == leaf_id {
784 let mut is_rightmost = true;
785 for &(ancestor_id, child_idx) in path.iter().rev() {
786 let page = pages.get(&ancestor_id).unwrap();
787 if child_idx != page.num_cells() as usize {
788 is_rightmost = false;
789 break;
790 }
791 }
792 if is_rightmost {
793 self.last_insert = Some((path, new_leaf_id));
794 }
795 self.entry_count += 1;
796 return Ok(UpsertOutcome::Inserted);
797 }
798 let mut child = new_leaf_id;
799 let mut is_rightmost = true;
800 let mut new_path = path;
801 for i in (0..new_path.len()).rev() {
802 let (ancestor_id, child_idx) = new_path[i];
803 let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
804 let page = pages.get_mut(&new_ancestor).unwrap();
805 update_branch_child(page, child_idx, child);
806 if child_idx != page.num_cells() as usize {
807 is_rightmost = false;
808 }
809 new_path[i] = (new_ancestor, child_idx);
810 child = new_ancestor;
811 }
812 self.root = child;
813
814 if is_rightmost {
815 self.last_insert = Some((new_path, new_leaf_id));
816 }
817 self.entry_count += 1;
818 return Ok(UpsertOutcome::Inserted);
819 }
820
821 self.last_insert = None;
822 let (sep_key, right_id) = split_leaf_with_insert(
823 pages,
824 alloc,
825 txn_id,
826 new_leaf_id,
827 key,
828 val_type,
829 default_value,
830 );
831 self.root = propagate_split_up(
832 pages,
833 alloc,
834 txn_id,
835 &path,
836 new_leaf_id,
837 &sep_key,
838 right_id,
839 &mut self.depth,
840 );
841 self.entry_count += 1;
842 Ok(UpsertOutcome::Inserted)
843 }
844
845 pub fn update_sorted(
847 &mut self,
848 pages: &mut FxHashMap<PageId, Page>,
849 alloc: &mut PageAllocator,
850 txn_id: TxnId,
851 pairs: &[(&[u8], &[u8])],
852 ) -> Result<u64> {
853 if pairs.is_empty() {
854 return Ok(0);
855 }
856 self.last_insert = None;
857
858 let (mut path, mut leaf_id) = self.walk_to_leaf(pages, pairs[0].0)?;
859 let mut cow_leaf = cow_page(pages, alloc, leaf_id, txn_id);
860 if cow_leaf != leaf_id {
861 self.root = propagate_cow_up(pages, alloc, txn_id, &mut path, cow_leaf);
862 }
863
864 let mut count: u64 = 0;
865 let mut hint: u16 = 0;
866
867 for &(key, value) in pairs {
868 let past_leaf = {
869 let page = pages.get(&cow_leaf).unwrap();
870 let n = page.num_cells();
871 n == 0 || key > leaf_node::read_cell(page, n - 1).key
872 };
873
874 if past_leaf {
875 let (new_path, new_leaf) = self.walk_to_leaf(pages, key)?;
876 path = new_path;
877 leaf_id = new_leaf;
878 cow_leaf = cow_page(pages, alloc, leaf_id, txn_id);
879 if cow_leaf != leaf_id {
880 self.root = propagate_cow_up(pages, alloc, txn_id, &mut path, cow_leaf);
881 }
882 hint = 0;
883 }
884
885 let page = pages.get(&cow_leaf).unwrap();
886 let n = page.num_cells();
887 let idx = {
888 let mut i = hint;
889 loop {
890 if i >= n {
891 break None;
892 }
893 let cell = leaf_node::read_cell(page, i);
894 match key.cmp(cell.key) {
895 std::cmp::Ordering::Equal => break Some(i),
896 std::cmp::Ordering::Less => break None,
897 std::cmp::Ordering::Greater => i += 1,
898 }
899 }
900 };
901
902 if let Some(idx) = idx {
903 hint = idx + 1;
904 let page = pages.get_mut(&cow_leaf).unwrap();
905 if !leaf_node::update_value_in_place(page, idx, ValueType::Inline, value) {
906 leaf_node::insert_direct(page, key, ValueType::Inline, value);
907 }
908 count += 1;
909 }
910 }
911
912 Ok(count)
913 }
914
915 pub fn delete(
917 &mut self,
918 pages: &mut FxHashMap<PageId, Page>,
919 alloc: &mut PageAllocator,
920 txn_id: TxnId,
921 key: &[u8],
922 ) -> Result<bool> {
923 let (mut path, leaf_id) = self.walk_to_leaf(pages, key)?;
924 self.delete_at_leaf(pages, alloc, txn_id, key, &mut path, leaf_id)
925 }
926
927 #[allow(clippy::ptr_arg)]
928 pub fn delete_at_leaf(
929 &mut self,
930 pages: &mut FxHashMap<PageId, Page>,
931 alloc: &mut PageAllocator,
932 txn_id: TxnId,
933 key: &[u8],
934 path: &mut Vec<(PageId, usize)>,
935 leaf_id: PageId,
936 ) -> Result<bool> {
937 self.last_insert = None;
938
939 let found = {
940 let page = pages.get(&leaf_id).unwrap();
941 leaf_node::search(page, key).is_ok()
942 };
943 if !found {
944 return Ok(false);
945 }
946
947 let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
948 {
949 let page = pages.get_mut(&new_leaf_id).unwrap();
950 leaf_node::delete(page, key);
951 }
952
953 let leaf_empty = pages.get(&new_leaf_id).unwrap().num_cells() == 0;
954
955 if !leaf_empty || path.is_empty() {
956 if alloc.in_place() && new_leaf_id == leaf_id {
957 self.entry_count -= 1;
958 return Ok(true);
959 }
960 self.root = propagate_cow_up(pages, alloc, txn_id, path, new_leaf_id);
961 self.entry_count -= 1;
962 return Ok(true);
963 }
964
965 alloc.free(new_leaf_id);
966 pages.remove(&new_leaf_id);
967
968 self.root = propagate_remove_up(pages, alloc, txn_id, path, &mut self.depth);
969 self.entry_count -= 1;
970 Ok(true)
971 }
972
973 pub fn walk_to_leaf(
975 &self,
976 pages: &FxHashMap<PageId, Page>,
977 key: &[u8],
978 ) -> Result<(Vec<(PageId, usize)>, PageId)> {
979 let mut path = Vec::with_capacity(self.depth as usize);
980 let mut current = self.root;
981 loop {
982 let page = pages.get(¤t).ok_or(Error::PageOutOfBounds(current))?;
983 match page.page_type() {
984 Some(PageType::Leaf) => return Ok((path, current)),
985 Some(PageType::Branch) => {
986 let child_idx = branch_node::search_child_index(page, key);
987 let child = branch_node::get_child(page, child_idx);
988 path.push((current, child_idx));
989 current = child;
990 }
991 _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
992 }
993 }
994 }
995}
996
997pub fn cow_page(
999 pages: &mut FxHashMap<PageId, Page>,
1000 alloc: &mut PageAllocator,
1001 old_id: PageId,
1002 txn_id: TxnId,
1003) -> PageId {
1004 if alloc.in_place() {
1005 let page = pages.get_mut(&old_id).unwrap();
1006 if page.txn_id() != txn_id {
1007 page.set_txn_id(txn_id);
1008 }
1009 return old_id;
1010 }
1011 let mut new_page = {
1012 let page = pages.get(&old_id).unwrap();
1013 if page.txn_id() == txn_id {
1014 return old_id;
1015 }
1016 page.clone()
1017 };
1018 let new_id = alloc.allocate();
1019 new_page.set_page_id(new_id);
1020 new_page.set_txn_id(txn_id);
1021 pages.insert(new_id, new_page);
1022 alloc.free(old_id);
1023 new_id
1024}
1025
1026fn update_branch_child(page: &mut Page, child_idx: usize, new_child: PageId) {
1028 let n = page.num_cells() as usize;
1029 if child_idx < n {
1030 let offset = page.cell_offset(child_idx as u16) as usize;
1031 page.data[offset..offset + 4].copy_from_slice(&new_child.as_u32().to_le_bytes());
1032 } else {
1033 page.set_right_child(new_child);
1034 }
1035}
1036
1037pub fn propagate_cow_up(
1038 pages: &mut FxHashMap<PageId, Page>,
1039 alloc: &mut PageAllocator,
1040 txn_id: TxnId,
1041 path: &mut [(PageId, usize)],
1042 mut new_child: PageId,
1043) -> PageId {
1044 for i in (0..path.len()).rev() {
1045 let (ancestor_id, child_idx) = path[i];
1046 let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
1047 let page = pages.get_mut(&new_ancestor).unwrap();
1048 update_branch_child(page, child_idx, new_child);
1049 path[i] = (new_ancestor, child_idx);
1050 new_child = new_ancestor;
1051 }
1052 new_child
1053}
1054
1055fn split_leaf_with_insert(
1057 pages: &mut FxHashMap<PageId, Page>,
1058 alloc: &mut PageAllocator,
1059 txn_id: TxnId,
1060 leaf_id: PageId,
1061 key: &[u8],
1062 val_type: ValueType,
1063 value: &[u8],
1064) -> (Vec<u8>, PageId) {
1065 let mut cells: Vec<(Vec<u8>, Vec<u8>)> = {
1066 let page = pages.get(&leaf_id).unwrap();
1067 let n = page.num_cells() as usize;
1068 (0..n)
1069 .map(|i| {
1070 let cell = leaf_node::read_cell(page, i as u16);
1071 let raw = leaf_node::read_cell_bytes(page, i as u16);
1072 (cell.key.to_vec(), raw)
1073 })
1074 .collect()
1075 };
1076
1077 let new_raw = leaf_node::build_cell(key, val_type, value);
1078 match cells.binary_search_by(|(k, _)| k.as_slice().cmp(key)) {
1079 Ok(idx) => cells[idx] = (key.to_vec(), new_raw),
1080 Err(idx) => cells.insert(idx, (key.to_vec(), new_raw)),
1081 }
1082
1083 let total = cells.len();
1084
1085 let usable = citadel_core::constants::USABLE_SIZE;
1086 let mut cum: Vec<usize> = Vec::with_capacity(total + 1);
1087 cum.push(0);
1088 for (_, raw) in &cells {
1089 cum.push(cum.last().unwrap() + raw.len());
1090 }
1091 let left_fits = |sp: usize| cum[sp] + sp * 2 <= usable;
1092 let right_fits = |sp: usize| (cum[total] - cum[sp]) + (total - sp) * 2 <= usable;
1093
1094 let mut split_point = total / 2;
1095 if !left_fits(split_point) || !right_fits(split_point) {
1096 split_point = 1;
1097 for sp in 1..total {
1098 if left_fits(sp) && right_fits(sp) {
1099 split_point = sp;
1100 if sp >= total / 2 {
1101 break;
1102 }
1103 }
1104 }
1105 }
1106
1107 let sep_key = cells[split_point].0.clone();
1108
1109 {
1110 let left_refs: Vec<&[u8]> = cells[..split_point]
1111 .iter()
1112 .map(|(_, raw)| raw.as_slice())
1113 .collect();
1114 let page = pages.get_mut(&leaf_id).unwrap();
1115 page.rebuild_cells(&left_refs);
1116 }
1117
1118 let right_id = alloc.allocate();
1119 {
1120 let mut right_page = Page::new(right_id, PageType::Leaf, txn_id);
1121 let right_refs: Vec<&[u8]> = cells[split_point..]
1122 .iter()
1123 .map(|(_, raw)| raw.as_slice())
1124 .collect();
1125 right_page.rebuild_cells(&right_refs);
1126 pages.insert(right_id, right_page);
1127 }
1128
1129 (sep_key, right_id)
1130}
1131
1132#[allow(clippy::too_many_arguments)]
1133fn propagate_split_up(
1134 pages: &mut FxHashMap<PageId, Page>,
1135 alloc: &mut PageAllocator,
1136 txn_id: TxnId,
1137 path: &[(PageId, usize)],
1138 mut left_child: PageId,
1139 initial_sep: &[u8],
1140 mut right_child: PageId,
1141 depth: &mut u16,
1142) -> PageId {
1143 let mut sep_key = initial_sep.to_vec();
1144 let mut pending_split = true;
1145
1146 for &(ancestor_id, child_idx) in path.iter().rev() {
1147 let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
1148
1149 if pending_split {
1150 let ok = {
1151 let page = pages.get_mut(&new_ancestor).unwrap();
1152 branch_node::insert_separator(page, child_idx, left_child, &sep_key, right_child)
1153 };
1154
1155 if ok {
1156 pending_split = false;
1157 left_child = new_ancestor;
1158 } else {
1159 let (new_sep, new_right) = split_branch_with_insert(
1160 pages,
1161 alloc,
1162 txn_id,
1163 new_ancestor,
1164 child_idx,
1165 left_child,
1166 &sep_key,
1167 right_child,
1168 );
1169 left_child = new_ancestor;
1170 sep_key = new_sep;
1171 right_child = new_right;
1172 }
1173 } else {
1174 let page = pages.get_mut(&new_ancestor).unwrap();
1175 update_branch_child(page, child_idx, left_child);
1176 left_child = new_ancestor;
1177 }
1178 }
1179
1180 if pending_split {
1181 let new_root_id = alloc.allocate();
1182 let mut new_root = Page::new(new_root_id, PageType::Branch, txn_id);
1183 let cell = branch_node::build_cell(left_child, &sep_key);
1184 new_root.write_cell(&cell).unwrap();
1185 new_root.set_right_child(right_child);
1186 pages.insert(new_root_id, new_root);
1187 *depth += 1;
1188 new_root_id
1189 } else {
1190 left_child
1191 }
1192}
1193
1194#[allow(clippy::too_many_arguments)]
1195fn split_branch_with_insert(
1196 pages: &mut FxHashMap<PageId, Page>,
1197 alloc: &mut PageAllocator,
1198 txn_id: TxnId,
1199 branch_id: PageId,
1200 child_idx: usize,
1201 new_left: PageId,
1202 sep_key: &[u8],
1203 new_right: PageId,
1204) -> (Vec<u8>, PageId) {
1205 let (new_cells, final_right_child) = {
1206 let page = pages.get(&branch_id).unwrap();
1207 let n = page.num_cells() as usize;
1208 let cells: Vec<(PageId, Vec<u8>)> = (0..n)
1209 .map(|i| {
1210 let cell = branch_node::read_cell(page, i as u16);
1211 (cell.child, cell.key.to_vec())
1212 })
1213 .collect();
1214 let old_rc = page.right_child();
1215
1216 let mut result = Vec::with_capacity(n + 1);
1217 let final_rc;
1218
1219 if child_idx < n {
1220 let old_key = cells[child_idx].1.clone();
1221 for (i, (child, key)) in cells.into_iter().enumerate() {
1222 if i == child_idx {
1223 result.push((new_left, sep_key.to_vec()));
1224 result.push((new_right, old_key.clone()));
1225 } else {
1226 result.push((child, key));
1227 }
1228 }
1229 final_rc = old_rc;
1230 } else {
1231 result = cells;
1232 result.push((new_left, sep_key.to_vec()));
1233 final_rc = new_right;
1234 }
1235
1236 (result, final_rc)
1237 };
1238
1239 let total = new_cells.len();
1240 let usable = citadel_core::constants::USABLE_SIZE;
1241 let raw_sizes: Vec<usize> = new_cells.iter().map(|(_, key)| 6 + key.len()).collect();
1242 let mut cum: Vec<usize> = Vec::with_capacity(total + 1);
1243 cum.push(0);
1244 for &sz in &raw_sizes {
1245 cum.push(cum.last().unwrap() + sz);
1246 }
1247 let left_fits = |sp: usize| cum[sp] + sp * 2 <= usable;
1248 let right_fits = |sp: usize| {
1249 let right_count = total - sp - 1;
1250 (cum[total] - cum[sp + 1]) + right_count * 2 <= usable
1251 };
1252
1253 let mut split_point = total / 2;
1254 if !left_fits(split_point) || !right_fits(split_point) {
1255 split_point = 1;
1256 for sp in 1..total.saturating_sub(1) {
1257 if left_fits(sp) && right_fits(sp) {
1258 split_point = sp;
1259 if sp >= total / 2 {
1260 break;
1261 }
1262 }
1263 }
1264 }
1265
1266 let promoted_sep = new_cells[split_point].1.clone();
1267 let promoted_child = new_cells[split_point].0;
1268
1269 {
1270 let left_raw: Vec<Vec<u8>> = new_cells[..split_point]
1271 .iter()
1272 .map(|(child, key)| branch_node::build_cell(*child, key))
1273 .collect();
1274 let left_refs: Vec<&[u8]> = left_raw.iter().map(|c| c.as_slice()).collect();
1275 let page = pages.get_mut(&branch_id).unwrap();
1276 page.rebuild_cells(&left_refs);
1277 page.set_right_child(promoted_child);
1278 }
1279
1280 let right_branch_id = alloc.allocate();
1281 {
1282 let mut right_page = Page::new(right_branch_id, PageType::Branch, txn_id);
1283 let right_raw: Vec<Vec<u8>> = new_cells[split_point + 1..]
1284 .iter()
1285 .map(|(child, key)| branch_node::build_cell(*child, key))
1286 .collect();
1287 let right_refs: Vec<&[u8]> = right_raw.iter().map(|c| c.as_slice()).collect();
1288 right_page.rebuild_cells(&right_refs);
1289 right_page.set_right_child(final_right_child);
1290 pages.insert(right_branch_id, right_page);
1291 }
1292
1293 (promoted_sep, right_branch_id)
1294}
1295
1296fn remove_child_from_branch(page: &mut Page, child_idx: usize) {
1297 let n = page.num_cells() as usize;
1298 if child_idx < n {
1299 let cell_sz = branch_node::get_cell_size(page, child_idx as u16);
1300 page.delete_cell_at(child_idx as u16, cell_sz);
1301 } else {
1302 assert!(n > 0, "cannot remove right_child from branch with 0 cells");
1303 let last_child = branch_node::read_cell(page, (n - 1) as u16).child;
1304 let cell_sz = branch_node::get_cell_size(page, (n - 1) as u16);
1305 page.delete_cell_at((n - 1) as u16, cell_sz);
1306 page.set_right_child(last_child);
1307 }
1308}
1309
1310fn propagate_remove_up(
1311 pages: &mut FxHashMap<PageId, Page>,
1312 alloc: &mut PageAllocator,
1313 txn_id: TxnId,
1314 path: &mut [(PageId, usize)],
1315 depth: &mut u16,
1316) -> PageId {
1317 let mut level = path.len();
1318 let mut need_remove_at_level = true;
1319 let mut new_child = PageId(0);
1320
1321 while level > 0 && need_remove_at_level {
1322 level -= 1;
1323 let (ancestor_id, child_idx) = path[level];
1324 let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
1325
1326 {
1327 let page = pages.get_mut(&new_ancestor).unwrap();
1328 remove_child_from_branch(page, child_idx);
1329 }
1330
1331 let num_cells = pages.get(&new_ancestor).unwrap().num_cells();
1332
1333 if num_cells > 0 || level == 0 {
1334 if num_cells == 0 && level == 0 {
1335 let only_child = pages.get(&new_ancestor).unwrap().right_child();
1336 alloc.free(new_ancestor);
1337 pages.remove(&new_ancestor);
1338 *depth -= 1;
1339 return only_child;
1340 }
1341 new_child = new_ancestor;
1342 need_remove_at_level = false;
1343 } else {
1344 let only_child = pages.get(&new_ancestor).unwrap().right_child();
1345 alloc.free(new_ancestor);
1346 pages.remove(&new_ancestor);
1347 *depth -= 1;
1348
1349 new_child = only_child;
1350 need_remove_at_level = false;
1351 }
1352 }
1353
1354 if level > 0 {
1355 let remaining_path = &mut path[..level];
1356 new_child = propagate_cow_up(pages, alloc, txn_id, remaining_path, new_child);
1357 }
1358
1359 new_child
1360}
1361
1362#[cfg(test)]
1363#[path = "btree_tests.rs"]
1364mod tests;