1use crate::allocator::PageAllocator;
4use citadel_core::types::{PageId, PageType, TxnId, ValueType};
5use citadel_core::{Error, Result};
6use citadel_page::page::Page;
7use citadel_page::{branch_node, leaf_node};
8use rustc_hash::FxHashMap;
9
/// Handle to a B-tree whose pages are stored in a caller-owned
/// `FxHashMap<PageId, Page>`.
///
/// The handle only keeps the root id and bookkeeping counters; every
/// operation receives the page map (and, for writes, the allocator) as
/// arguments.
#[derive(Clone)]
pub struct BTree {
    /// Id of the current root page. Write operations reassign it whenever
    /// the root is copied, split, or collapsed.
    pub root: PageId,
    /// Level counter: 1 for a tree made of a single leaf, incremented when a
    /// split creates a new root and decremented when a branch is collapsed.
    pub depth: u16,
    /// Number of entries, maintained by the insert/delete operations.
    pub entry_count: u64,
    /// Append cache: `(path, leaf)` recorded by an insert that landed in the
    /// rightmost leaf. `path` holds `(ancestor page id, child index)` pairs
    /// from the root downwards. `None` when no cache is available.
    last_insert: Option<(Vec<(PageId, usize)>, PageId)>,
}
18
/// Result reported by `BTree::upsert_with` / `BTree::upsert_with_at_leaf`.
#[derive(Debug, Clone)]
pub enum UpsertOutcome {
    /// The key had no live value, so the default value was inserted.
    Inserted,
    /// The key existed and the callback asked for its value to be replaced.
    Updated,
    /// The key existed and the callback asked to leave it untouched.
    Skipped,
}
25
/// Decision returned by the callback passed to `BTree::upsert_with` when the
/// key already holds a live value.
#[derive(Debug, Clone)]
pub enum UpsertAction {
    /// Store these bytes as the new value for the key.
    Replace(Vec<u8>),
    /// Keep the existing value unchanged.
    Skip,
}
31
32impl BTree {
33 pub fn new(
35 pages: &mut FxHashMap<PageId, Page>,
36 alloc: &mut PageAllocator,
37 txn_id: TxnId,
38 ) -> Self {
39 let root_id = alloc.allocate();
40 let root = Page::new(root_id, PageType::Leaf, txn_id);
41 pages.insert(root_id, root);
42 Self {
43 root: root_id,
44 depth: 1,
45 entry_count: 0,
46 last_insert: None,
47 }
48 }
49
50 pub fn from_existing(root: PageId, depth: u16, entry_count: u64) -> Self {
52 Self {
53 root,
54 depth,
55 entry_count,
56 last_insert: None,
57 }
58 }
59
60 pub fn search(
62 &self,
63 pages: &FxHashMap<PageId, Page>,
64 key: &[u8],
65 ) -> Result<Option<(ValueType, Vec<u8>)>> {
66 let mut current = self.root;
67 loop {
68 let page = pages.get(¤t).ok_or(Error::PageOutOfBounds(current))?;
69 match page.page_type() {
70 Some(PageType::Leaf) => {
71 return match leaf_node::search(page, key) {
72 Ok(idx) => {
73 let cell = leaf_node::read_cell(page, idx);
74 Ok(Some((cell.val_type, cell.value.to_vec())))
75 }
76 Err(_) => Ok(None),
77 };
78 }
79 Some(PageType::Branch) => {
80 let idx = branch_node::search_child_index(page, key);
81 current = branch_node::get_child(page, idx);
82 }
83 _ => {
84 return Err(Error::InvalidPageType(page.page_type_raw(), current));
85 }
86 }
87 }
88 }
89
90 pub fn lil_would_hit(&self, pages: &FxHashMap<PageId, Page>, key: &[u8]) -> bool {
91 if let Some((_, cached_leaf)) = &self.last_insert {
92 if let Some(page) = pages.get(cached_leaf) {
93 let n = page.num_cells();
94 return n > 0 && key > leaf_node::read_cell(page, n - 1).key;
95 }
96 }
97 false
98 }
99
    /// Inserts `key` with `value`, overwriting any cell already stored under
    /// `key`.
    ///
    /// The append cache (`last_insert`) is tried first: when `key` sorts
    /// strictly after the last key of the cached leaf, the cell is added to
    /// that leaf without walking down from the root. Otherwise the target
    /// leaf is located with `walk_to_leaf` and the work is delegated to
    /// `insert_at_leaf`.
    ///
    /// Returns `Ok(true)` when a new key was added and `Ok(false)` when an
    /// existing key was overwritten.
    ///
    /// # Errors
    /// `Error::PageOutOfBounds` if the cached leaf is missing from `pages`,
    /// plus any error reported by `walk_to_leaf`.
    ///
    /// # Panics
    /// Panics if a page that is about to be modified is absent from `pages`
    /// (those lookups are unwrapped).
    pub fn insert(
        &mut self,
        pages: &mut FxHashMap<PageId, Page>,
        alloc: &mut PageAllocator,
        txn_id: TxnId,
        key: &[u8],
        val_type: ValueType,
        value: &[u8],
    ) -> Result<bool> {
        // `take()` empties the cache; it is only written back on a successful
        // fast-path insert. On a miss this method leaves it empty until
        // `insert_at_leaf` decides whether to repopulate it.
        if let Some((mut cached_path, cached_leaf)) = self.last_insert.take() {
            let (hit, needs_cow) = {
                let page = pages
                    .get(&cached_leaf)
                    .ok_or(Error::PageOutOfBounds(cached_leaf))?;
                let n = page.num_cells();
                // Hit: non-empty leaf and `key` is greater than its last key.
                let h = n > 0 && key > leaf_node::read_cell(page, n - 1).key;
                // The leaf was last stamped by a different transaction.
                let nc = page.txn_id() != txn_id;
                (h, nc)
            };
            if hit {
                let cow_id = if needs_cow {
                    cow_page(pages, alloc, cached_leaf, txn_id)
                } else {
                    cached_leaf
                };
                let ok = {
                    let page = pages.get_mut(&cow_id).unwrap();
                    leaf_node::insert_direct(page, key, val_type, value)
                };
                if ok {
                    // The leaf moved to a new id: repoint the ancestors.
                    if cow_id != cached_leaf {
                        self.root =
                            propagate_cow_up(pages, alloc, txn_id, &mut cached_path, cow_id);
                    }
                    // A hit implies `key` is greater than every key in the
                    // leaf, so this is counted as a new entry.
                    self.entry_count += 1;
                    self.last_insert = Some((cached_path, cow_id));
                    return Ok(true);
                }
                // `insert_direct` returned false: split the leaf instead.
                let (sep_key, right_id) =
                    split_leaf_with_insert(pages, alloc, txn_id, cow_id, key, val_type, value);
                self.root = propagate_split_up(
                    pages,
                    alloc,
                    txn_id,
                    &cached_path,
                    cow_id,
                    &sep_key,
                    right_id,
                    &mut self.depth,
                );
                // The cached path no longer describes the tree after a split.
                self.last_insert = None;
                self.entry_count += 1;
                return Ok(true);
            }
        }

        // Slow path: walk from the root.
        let (path, leaf_id) = self.walk_to_leaf(pages, key)?;
        self.insert_at_leaf(pages, alloc, txn_id, key, val_type, value, path, leaf_id)
    }
161
    /// Inserts (or overwrites) `key` in the leaf `leaf_id` reached via `path`.
    ///
    /// `path` is the list of `(ancestor page id, child index)` pairs from the
    /// root down to the leaf's parent, as produced by `walk_to_leaf`.
    ///
    /// The leaf is passed through `cow_page` first. If the cell is accepted by
    /// `insert_direct`, ancestors are repointed as needed and the append
    /// cache is refreshed when the leaf lies on the rightmost edge of the
    /// tree. If it is rejected, the leaf is split and the split is propagated
    /// upwards.
    ///
    /// Returns `Ok(true)` when the key was new and `Ok(false)` when an
    /// existing key was overwritten; `entry_count` only grows for new keys.
    ///
    /// # Panics
    /// Panics if `leaf_id` or any page on `path` is absent from `pages`.
    #[allow(clippy::too_many_arguments)]
    #[inline]
    pub fn insert_at_leaf(
        &mut self,
        pages: &mut FxHashMap<PageId, Page>,
        alloc: &mut PageAllocator,
        txn_id: TxnId,
        key: &[u8],
        val_type: ValueType,
        value: &[u8],
        path: Vec<(PageId, usize)>,
        leaf_id: PageId,
    ) -> Result<bool> {
        // Remember whether this is an overwrite before the leaf is modified.
        let key_exists = {
            let page = pages.get(&leaf_id).unwrap();
            leaf_node::search(page, key).is_ok()
        };

        let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);

        let leaf_ok = {
            let page = pages.get_mut(&new_leaf_id).unwrap();
            leaf_node::insert_direct(page, key, val_type, value)
        };

        if leaf_ok {
            if alloc.in_place() && new_leaf_id == leaf_id {
                // In-place mode: page ids did not change, so ancestors need no
                // update. Only check whether the leaf is on the rightmost edge
                // (every step of the path follows the right child).
                let mut is_rightmost = true;
                for &(ancestor_id, child_idx) in path.iter().rev() {
                    let page = pages.get(&ancestor_id).unwrap();
                    if child_idx != page.num_cells() as usize {
                        is_rightmost = false;
                        break;
                    }
                }
                if is_rightmost {
                    self.last_insert = Some((path, new_leaf_id));
                }
                if !key_exists {
                    self.entry_count += 1;
                }
                return Ok(!key_exists);
            }
            // Copy each ancestor from the leaf's parent up to the root,
            // repointing it at the (possibly new) child below it, and record
            // the resulting ids in the path.
            let mut child = new_leaf_id;
            let mut is_rightmost = true;
            let mut new_path = path;
            for i in (0..new_path.len()).rev() {
                let (ancestor_id, child_idx) = new_path[i];
                let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
                let page = pages.get_mut(&new_ancestor).unwrap();
                update_branch_child(page, child_idx, child);
                if child_idx != page.num_cells() as usize {
                    is_rightmost = false;
                }
                new_path[i] = (new_ancestor, child_idx);
                child = new_ancestor;
            }
            self.root = child;

            if is_rightmost {
                self.last_insert = Some((new_path, new_leaf_id));
            }

            if !key_exists {
                self.entry_count += 1;
            }
            return Ok(!key_exists);
        }

        // The cell was rejected by the leaf: split it. The cache is dropped
        // because the tree shape is about to change.
        self.last_insert = None;
        let (sep_key, right_id) =
            split_leaf_with_insert(pages, alloc, txn_id, new_leaf_id, key, val_type, value);
        self.root = propagate_split_up(
            pages,
            alloc,
            txn_id,
            &path,
            new_leaf_id,
            &sep_key,
            right_id,
            &mut self.depth,
        );

        if !key_exists {
            self.entry_count += 1;
        }
        Ok(!key_exists)
    }
250
    /// Inserts `key` unless it already holds a live value, in which case that
    /// value is returned instead.
    ///
    /// Returns `Ok(Some(existing_bytes))` without modifying the tree when the
    /// key is present and its value type is not `ValueType::Tombstone`.
    /// Returns `Ok(None)` after inserting `value` when the key is absent or
    /// present only as a tombstone.
    ///
    /// # Errors
    /// `Error::PageOutOfBounds` if the cached leaf is missing from `pages`,
    /// plus any error reported by `walk_to_leaf`.
    ///
    /// # Panics
    /// Panics if a page that is about to be read or modified on the slow path
    /// is absent from `pages`.
    pub fn insert_or_fetch(
        &mut self,
        pages: &mut FxHashMap<PageId, Page>,
        alloc: &mut PageAllocator,
        txn_id: TxnId,
        key: &[u8],
        val_type: ValueType,
        value: &[u8],
    ) -> Result<Option<Vec<u8>>> {
        // Fast path: append to the cached leaf when `key` sorts after its
        // last key. A hit implies the key is not in that leaf.
        if let Some((mut cached_path, cached_leaf)) = self.last_insert.take() {
            let (hit, needs_cow) = {
                let page = pages
                    .get(&cached_leaf)
                    .ok_or(Error::PageOutOfBounds(cached_leaf))?;
                let n = page.num_cells();
                let h = n > 0 && key > leaf_node::read_cell(page, n - 1).key;
                let nc = page.txn_id() != txn_id;
                (h, nc)
            };
            if hit {
                let cow_id = if needs_cow {
                    cow_page(pages, alloc, cached_leaf, txn_id)
                } else {
                    cached_leaf
                };
                let ok = {
                    let page = pages.get_mut(&cow_id).unwrap();
                    leaf_node::insert_direct(page, key, val_type, value)
                };
                if ok {
                    if cow_id != cached_leaf {
                        self.root =
                            propagate_cow_up(pages, alloc, txn_id, &mut cached_path, cow_id);
                    }
                    self.entry_count += 1;
                    self.last_insert = Some((cached_path, cow_id));
                    return Ok(None);
                }
                // Leaf rejected the cell: split and drop the cache.
                let (sep_key, right_id) =
                    split_leaf_with_insert(pages, alloc, txn_id, cow_id, key, val_type, value);
                self.root = propagate_split_up(
                    pages,
                    alloc,
                    txn_id,
                    &cached_path,
                    cow_id,
                    &sep_key,
                    right_id,
                    &mut self.depth,
                );
                self.last_insert = None;
                self.entry_count += 1;
                return Ok(None);
            }
            // Miss: put the cache back unchanged before taking the slow path.
            self.last_insert = Some((cached_path, cached_leaf));
        }

        let (path, leaf_id) = self.walk_to_leaf(pages, key)?;

        // A tombstone cell is treated like an absent key.
        let existing_value = {
            let page = pages.get(&leaf_id).unwrap();
            match leaf_node::search(page, key) {
                Ok(idx) => {
                    let cell = leaf_node::read_cell(page, idx);
                    if matches!(cell.val_type, ValueType::Tombstone) {
                        None
                    } else {
                        Some(cell.value.to_vec())
                    }
                }
                Err(_) => None,
            }
        };
        if let Some(v) = existing_value {
            return Ok(Some(v));
        }

        let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
        let leaf_ok = {
            let page = pages.get_mut(&new_leaf_id).unwrap();
            leaf_node::insert_direct(page, key, val_type, value)
        };

        if leaf_ok {
            if alloc.in_place() && new_leaf_id == leaf_id {
                // In-place mode: ids are unchanged, only refresh the cache if
                // the leaf sits on the rightmost edge of the tree.
                let mut is_rightmost = true;
                for &(ancestor_id, child_idx) in path.iter().rev() {
                    let page = pages.get(&ancestor_id).unwrap();
                    if child_idx != page.num_cells() as usize {
                        is_rightmost = false;
                        break;
                    }
                }
                if is_rightmost {
                    self.last_insert = Some((path, new_leaf_id));
                }
                self.entry_count += 1;
                return Ok(None);
            }
            // Repoint every ancestor at the child below it, bottom-up.
            let mut child = new_leaf_id;
            let mut is_rightmost = true;
            let mut new_path = path;
            for i in (0..new_path.len()).rev() {
                let (ancestor_id, child_idx) = new_path[i];
                let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
                let page = pages.get_mut(&new_ancestor).unwrap();
                update_branch_child(page, child_idx, child);
                if child_idx != page.num_cells() as usize {
                    is_rightmost = false;
                }
                new_path[i] = (new_ancestor, child_idx);
                child = new_ancestor;
            }
            self.root = child;

            if is_rightmost {
                self.last_insert = Some((new_path, new_leaf_id));
            }
            self.entry_count += 1;
            return Ok(None);
        }

        // Leaf rejected the cell: split it and propagate the separator.
        self.last_insert = None;
        let (sep_key, right_id) =
            split_leaf_with_insert(pages, alloc, txn_id, new_leaf_id, key, val_type, value);
        self.root = propagate_split_up(
            pages,
            alloc,
            txn_id,
            &path,
            new_leaf_id,
            &sep_key,
            right_id,
            &mut self.depth,
        );
        self.entry_count += 1;
        Ok(None)
    }
389
    /// Inserts `key` only when it does not already hold a live value.
    ///
    /// Tries the append cache first; on a miss the cache is restored and the
    /// work is delegated to `insert_if_absent_at_leaf` after a root-to-leaf
    /// walk.
    ///
    /// Returns `Ok(true)` when the value was inserted and `Ok(false)` when the
    /// key was already present with a non-tombstone value.
    ///
    /// # Errors
    /// `Error::PageOutOfBounds` if the cached leaf is missing from `pages`,
    /// plus any error reported by `walk_to_leaf`.
    #[inline]
    pub fn insert_if_absent(
        &mut self,
        pages: &mut FxHashMap<PageId, Page>,
        alloc: &mut PageAllocator,
        txn_id: TxnId,
        key: &[u8],
        val_type: ValueType,
        value: &[u8],
    ) -> Result<bool> {
        if let Some((mut cached_path, cached_leaf)) = self.last_insert.take() {
            let (hit, needs_cow) = {
                let page = pages
                    .get(&cached_leaf)
                    .ok_or(Error::PageOutOfBounds(cached_leaf))?;
                let n = page.num_cells();
                // Hit: `key` is greater than the last key of the cached leaf,
                // so it cannot already be stored there.
                let h = n > 0 && key > leaf_node::read_cell(page, n - 1).key;
                let nc = page.txn_id() != txn_id;
                (h, nc)
            };
            if hit {
                let cow_id = if needs_cow {
                    cow_page(pages, alloc, cached_leaf, txn_id)
                } else {
                    cached_leaf
                };
                let ok = {
                    let page = pages.get_mut(&cow_id).unwrap();
                    leaf_node::insert_direct(page, key, val_type, value)
                };
                if ok {
                    if cow_id != cached_leaf {
                        self.root =
                            propagate_cow_up(pages, alloc, txn_id, &mut cached_path, cow_id);
                    }
                    self.entry_count += 1;
                    self.last_insert = Some((cached_path, cow_id));
                    return Ok(true);
                }
                // Leaf rejected the cell: split and drop the cache.
                let (sep_key, right_id) =
                    split_leaf_with_insert(pages, alloc, txn_id, cow_id, key, val_type, value);
                self.root = propagate_split_up(
                    pages,
                    alloc,
                    txn_id,
                    &cached_path,
                    cow_id,
                    &sep_key,
                    right_id,
                    &mut self.depth,
                );
                self.last_insert = None;
                self.entry_count += 1;
                return Ok(true);
            }
            // Miss: restore the cache before walking from the root.
            self.last_insert = Some((cached_path, cached_leaf));
        }

        let (path, leaf_id) = self.walk_to_leaf(pages, key)?;
        self.insert_if_absent_at_leaf(pages, alloc, txn_id, key, val_type, value, path, leaf_id)
    }
451
    /// Inserts `key` into the leaf `leaf_id` (reached via `path`) unless the
    /// leaf already stores a non-tombstone value for it.
    ///
    /// `path` is the list of `(ancestor page id, child index)` pairs from the
    /// root down to the leaf's parent.
    ///
    /// Returns `Ok(false)` without touching the tree when a live value exists;
    /// otherwise inserts (replacing a tombstone cell if there is one),
    /// increments `entry_count`, and returns `Ok(true)`.
    ///
    /// # Panics
    /// Panics if `leaf_id` or any page on `path` is absent from `pages`.
    #[allow(clippy::too_many_arguments)]
    #[inline]
    pub fn insert_if_absent_at_leaf(
        &mut self,
        pages: &mut FxHashMap<PageId, Page>,
        alloc: &mut PageAllocator,
        txn_id: TxnId,
        key: &[u8],
        val_type: ValueType,
        value: &[u8],
        path: Vec<(PageId, usize)>,
        leaf_id: PageId,
    ) -> Result<bool> {
        // "Exists" means: a cell is stored under `key` and it is not a tombstone.
        let exists = {
            let page = pages.get(&leaf_id).unwrap();
            match leaf_node::search(page, key) {
                Ok(idx) => {
                    let cell = leaf_node::read_cell(page, idx);
                    !matches!(cell.val_type, ValueType::Tombstone)
                }
                Err(_) => false,
            }
        };
        if exists {
            return Ok(false);
        }

        let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
        let leaf_ok = {
            let page = pages.get_mut(&new_leaf_id).unwrap();
            leaf_node::insert_direct(page, key, val_type, value)
        };

        if leaf_ok {
            if alloc.in_place() && new_leaf_id == leaf_id {
                // In-place mode: ids are unchanged; refresh the append cache
                // only if every path step follows the right child.
                let mut is_rightmost = true;
                for &(ancestor_id, child_idx) in path.iter().rev() {
                    let page = pages.get(&ancestor_id).unwrap();
                    if child_idx != page.num_cells() as usize {
                        is_rightmost = false;
                        break;
                    }
                }
                if is_rightmost {
                    self.last_insert = Some((path, new_leaf_id));
                }
                self.entry_count += 1;
                return Ok(true);
            }
            // Repoint every ancestor at the child below it, bottom-up, and
            // record the resulting ids in the path.
            let mut child = new_leaf_id;
            let mut is_rightmost = true;
            let mut new_path = path;
            for i in (0..new_path.len()).rev() {
                let (ancestor_id, child_idx) = new_path[i];
                let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
                let page = pages.get_mut(&new_ancestor).unwrap();
                update_branch_child(page, child_idx, child);
                if child_idx != page.num_cells() as usize {
                    is_rightmost = false;
                }
                new_path[i] = (new_ancestor, child_idx);
                child = new_ancestor;
            }
            self.root = child;

            if is_rightmost {
                self.last_insert = Some((new_path, new_leaf_id));
            }
            self.entry_count += 1;
            return Ok(true);
        }

        // Leaf rejected the cell: split it and propagate the separator.
        self.last_insert = None;
        let (sep_key, right_id) =
            split_leaf_with_insert(pages, alloc, txn_id, new_leaf_id, key, val_type, value);
        self.root = propagate_split_up(
            pages,
            alloc,
            txn_id,
            &path,
            new_leaf_id,
            &sep_key,
            right_id,
            &mut self.depth,
        );
        self.entry_count += 1;
        Ok(true)
    }
540
    /// Inserts `default_value` for a missing key, or lets `f` decide what to
    /// do with the value that is already stored.
    ///
    /// The append cache is tried first: a hit means `key` sorts after every
    /// key of the cached leaf, so `default_value` is inserted and `f` is not
    /// called. On a miss the cache is restored and the work is delegated to
    /// `upsert_with_at_leaf`.
    ///
    /// # Errors
    /// Any `Error` raised here is converted into `E`; errors returned by `f`
    /// are propagated by `upsert_with_at_leaf`.
    #[allow(clippy::too_many_arguments)]
    #[inline]
    pub fn upsert_with<F, E>(
        &mut self,
        pages: &mut FxHashMap<PageId, Page>,
        alloc: &mut PageAllocator,
        txn_id: TxnId,
        key: &[u8],
        val_type: ValueType,
        default_value: &[u8],
        f: F,
    ) -> std::result::Result<UpsertOutcome, E>
    where
        F: FnMut(&[u8]) -> std::result::Result<UpsertAction, E>,
        E: From<Error>,
    {
        if let Some((mut cached_path, cached_leaf)) = self.last_insert.take() {
            let (hit, needs_cow) = {
                let page = pages
                    .get(&cached_leaf)
                    .ok_or(Error::PageOutOfBounds(cached_leaf))?;
                let n = page.num_cells();
                let h = n > 0 && key > leaf_node::read_cell(page, n - 1).key;
                let nc = page.txn_id() != txn_id;
                (h, nc)
            };
            if hit {
                let cow_id = if needs_cow {
                    cow_page(pages, alloc, cached_leaf, txn_id)
                } else {
                    cached_leaf
                };
                let ok = {
                    let page = pages.get_mut(&cow_id).unwrap();
                    leaf_node::insert_direct(page, key, val_type, default_value)
                };
                if ok {
                    if cow_id != cached_leaf {
                        self.root =
                            propagate_cow_up(pages, alloc, txn_id, &mut cached_path, cow_id);
                    }
                    self.entry_count += 1;
                    self.last_insert = Some((cached_path, cow_id));
                    return Ok(UpsertOutcome::Inserted);
                }
                // Leaf rejected the cell: split and drop the cache.
                let (sep_key, right_id) = split_leaf_with_insert(
                    pages,
                    alloc,
                    txn_id,
                    cow_id,
                    key,
                    val_type,
                    default_value,
                );
                self.root = propagate_split_up(
                    pages,
                    alloc,
                    txn_id,
                    &cached_path,
                    cow_id,
                    &sep_key,
                    right_id,
                    &mut self.depth,
                );
                self.last_insert = None;
                self.entry_count += 1;
                return Ok(UpsertOutcome::Inserted);
            }
            // Miss: restore the cache before walking from the root.
            self.last_insert = Some((cached_path, cached_leaf));
        }

        let (path, leaf_id) = self.walk_to_leaf(pages, key)?;
        self.upsert_with_at_leaf(
            pages,
            alloc,
            txn_id,
            key,
            val_type,
            default_value,
            path,
            leaf_id,
            f,
        )
    }
625
    /// Upserts `key` in the leaf `leaf_id` reached via `path`.
    ///
    /// * Key present with a non-tombstone value: `f` is called with the
    ///   stored bytes. `UpsertAction::Skip` leaves the tree untouched and
    ///   yields `UpsertOutcome::Skipped`; `UpsertAction::Replace(bytes)`
    ///   stores `bytes` (with `val_type`) and yields `UpsertOutcome::Updated`.
    ///   `entry_count` is not changed in either case.
    /// * Key absent or stored as a tombstone: `default_value` is inserted,
    ///   `entry_count` is incremented, and `UpsertOutcome::Inserted` is
    ///   returned. `f` is not called.
    ///
    /// # Errors
    /// Propagates any error returned by `f`.
    ///
    /// # Panics
    /// Panics if `leaf_id` or any page on `path` is absent from `pages`.
    #[allow(clippy::too_many_arguments)]
    #[inline]
    pub fn upsert_with_at_leaf<F, E>(
        &mut self,
        pages: &mut FxHashMap<PageId, Page>,
        alloc: &mut PageAllocator,
        txn_id: TxnId,
        key: &[u8],
        val_type: ValueType,
        default_value: &[u8],
        path: Vec<(PageId, usize)>,
        leaf_id: PageId,
        mut f: F,
    ) -> std::result::Result<UpsertOutcome, E>
    where
        F: FnMut(&[u8]) -> std::result::Result<UpsertAction, E>,
        E: From<Error>,
    {
        // Ask the callback only when a live (non-tombstone) value exists.
        let action = {
            let page = pages.get(&leaf_id).unwrap();
            match leaf_node::search(page, key) {
                Ok(idx) => {
                    let cell = leaf_node::read_cell(page, idx);
                    if matches!(cell.val_type, ValueType::Tombstone) {
                        None
                    } else {
                        Some(f(cell.value)?)
                    }
                }
                Err(_) => None,
            }
        };

        if let Some(act) = action {
            match act {
                UpsertAction::Skip => return Ok(UpsertOutcome::Skipped),
                UpsertAction::Replace(new_bytes) => {
                    let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
                    let leaf_ok = {
                        let page = pages.get_mut(&new_leaf_id).unwrap();
                        leaf_node::insert_direct(page, key, val_type, &new_bytes)
                    };
                    if leaf_ok {
                        // NOTE(review): `last_insert` is left as-is on this
                        // path even when the leaf/ancestors received new ids;
                        // confirm the cached path cannot go stale here.
                        if new_leaf_id != leaf_id {
                            let mut new_path = path;
                            self.root =
                                propagate_cow_up(pages, alloc, txn_id, &mut new_path, new_leaf_id);
                        }
                        return Ok(UpsertOutcome::Updated);
                    }
                    // Replacement rejected by the leaf: split it.
                    self.last_insert = None;
                    let (sep_key, right_id) = split_leaf_with_insert(
                        pages,
                        alloc,
                        txn_id,
                        new_leaf_id,
                        key,
                        val_type,
                        &new_bytes,
                    );
                    self.root = propagate_split_up(
                        pages,
                        alloc,
                        txn_id,
                        &path,
                        new_leaf_id,
                        &sep_key,
                        right_id,
                        &mut self.depth,
                    );
                    return Ok(UpsertOutcome::Updated);
                }
            }
        }

        // No live value: insert the default.
        let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
        let leaf_ok = {
            let page = pages.get_mut(&new_leaf_id).unwrap();
            leaf_node::insert_direct(page, key, val_type, default_value)
        };

        if leaf_ok {
            if alloc.in_place() && new_leaf_id == leaf_id {
                // In-place mode: ids are unchanged; refresh the append cache
                // only if every path step follows the right child.
                let mut is_rightmost = true;
                for &(ancestor_id, child_idx) in path.iter().rev() {
                    let page = pages.get(&ancestor_id).unwrap();
                    if child_idx != page.num_cells() as usize {
                        is_rightmost = false;
                        break;
                    }
                }
                if is_rightmost {
                    self.last_insert = Some((path, new_leaf_id));
                }
                self.entry_count += 1;
                return Ok(UpsertOutcome::Inserted);
            }
            // Repoint every ancestor at the child below it, bottom-up.
            let mut child = new_leaf_id;
            let mut is_rightmost = true;
            let mut new_path = path;
            for i in (0..new_path.len()).rev() {
                let (ancestor_id, child_idx) = new_path[i];
                let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
                let page = pages.get_mut(&new_ancestor).unwrap();
                update_branch_child(page, child_idx, child);
                if child_idx != page.num_cells() as usize {
                    is_rightmost = false;
                }
                new_path[i] = (new_ancestor, child_idx);
                child = new_ancestor;
            }
            self.root = child;

            if is_rightmost {
                self.last_insert = Some((new_path, new_leaf_id));
            }
            self.entry_count += 1;
            return Ok(UpsertOutcome::Inserted);
        }

        // Leaf rejected the cell: split it and propagate the separator.
        self.last_insert = None;
        let (sep_key, right_id) = split_leaf_with_insert(
            pages,
            alloc,
            txn_id,
            new_leaf_id,
            key,
            val_type,
            default_value,
        );
        self.root = propagate_split_up(
            pages,
            alloc,
            txn_id,
            &path,
            new_leaf_id,
            &sep_key,
            right_id,
            &mut self.depth,
        );
        self.entry_count += 1;
        Ok(UpsertOutcome::Inserted)
    }
769
770 pub fn update_sorted(
772 &mut self,
773 pages: &mut FxHashMap<PageId, Page>,
774 alloc: &mut PageAllocator,
775 txn_id: TxnId,
776 pairs: &[(&[u8], &[u8])],
777 ) -> Result<u64> {
778 if pairs.is_empty() {
779 return Ok(0);
780 }
781 self.last_insert = None;
782
783 let (mut path, mut leaf_id) = self.walk_to_leaf(pages, pairs[0].0)?;
784 let mut cow_leaf = cow_page(pages, alloc, leaf_id, txn_id);
785 if cow_leaf != leaf_id {
786 self.root = propagate_cow_up(pages, alloc, txn_id, &mut path, cow_leaf);
787 }
788
789 let mut count: u64 = 0;
790 let mut hint: u16 = 0;
791
792 for &(key, value) in pairs {
793 let past_leaf = {
794 let page = pages.get(&cow_leaf).unwrap();
795 let n = page.num_cells();
796 n == 0 || key > leaf_node::read_cell(page, n - 1).key
797 };
798
799 if past_leaf {
800 let (new_path, new_leaf) = self.walk_to_leaf(pages, key)?;
801 path = new_path;
802 leaf_id = new_leaf;
803 cow_leaf = cow_page(pages, alloc, leaf_id, txn_id);
804 if cow_leaf != leaf_id {
805 self.root = propagate_cow_up(pages, alloc, txn_id, &mut path, cow_leaf);
806 }
807 hint = 0;
808 }
809
810 let page = pages.get(&cow_leaf).unwrap();
811 let n = page.num_cells();
812 let idx = {
813 let mut i = hint;
814 loop {
815 if i >= n {
816 break None;
817 }
818 let cell = leaf_node::read_cell(page, i);
819 match key.cmp(cell.key) {
820 std::cmp::Ordering::Equal => break Some(i),
821 std::cmp::Ordering::Less => break None,
822 std::cmp::Ordering::Greater => i += 1,
823 }
824 }
825 };
826
827 if let Some(idx) = idx {
828 hint = idx + 1;
829 let page = pages.get_mut(&cow_leaf).unwrap();
830 if !leaf_node::update_value_in_place(page, idx, ValueType::Inline, value) {
831 leaf_node::insert_direct(page, key, ValueType::Inline, value);
832 }
833 count += 1;
834 }
835 }
836
837 Ok(count)
838 }
839
840 pub fn delete(
842 &mut self,
843 pages: &mut FxHashMap<PageId, Page>,
844 alloc: &mut PageAllocator,
845 txn_id: TxnId,
846 key: &[u8],
847 ) -> Result<bool> {
848 self.last_insert = None;
849 let (mut path, leaf_id) = self.walk_to_leaf(pages, key)?;
850
851 let found = {
852 let page = pages.get(&leaf_id).unwrap();
853 leaf_node::search(page, key).is_ok()
854 };
855 if !found {
856 return Ok(false);
857 }
858
859 let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
860 {
861 let page = pages.get_mut(&new_leaf_id).unwrap();
862 leaf_node::delete(page, key);
863 }
864
865 let leaf_empty = pages.get(&new_leaf_id).unwrap().num_cells() == 0;
866
867 if !leaf_empty || path.is_empty() {
868 if alloc.in_place() && new_leaf_id == leaf_id {
869 self.entry_count -= 1;
870 return Ok(true);
871 }
872 self.root = propagate_cow_up(pages, alloc, txn_id, &mut path, new_leaf_id);
873 self.entry_count -= 1;
874 return Ok(true);
875 }
876
877 alloc.free(new_leaf_id);
878 pages.remove(&new_leaf_id);
879
880 self.root = propagate_remove_up(pages, alloc, txn_id, &mut path, &mut self.depth);
881 self.entry_count -= 1;
882 Ok(true)
883 }
884
885 pub fn walk_to_leaf(
887 &self,
888 pages: &FxHashMap<PageId, Page>,
889 key: &[u8],
890 ) -> Result<(Vec<(PageId, usize)>, PageId)> {
891 let mut path = Vec::with_capacity(self.depth as usize);
892 let mut current = self.root;
893 loop {
894 let page = pages.get(¤t).ok_or(Error::PageOutOfBounds(current))?;
895 match page.page_type() {
896 Some(PageType::Leaf) => return Ok((path, current)),
897 Some(PageType::Branch) => {
898 let child_idx = branch_node::search_child_index(page, key);
899 let child = branch_node::get_child(page, child_idx);
900 path.push((current, child_idx));
901 current = child;
902 }
903 _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
904 }
905 }
906 }
907}
908
909pub fn cow_page(
911 pages: &mut FxHashMap<PageId, Page>,
912 alloc: &mut PageAllocator,
913 old_id: PageId,
914 txn_id: TxnId,
915) -> PageId {
916 if alloc.in_place() {
917 let page = pages.get_mut(&old_id).unwrap();
918 if page.txn_id() != txn_id {
919 page.set_txn_id(txn_id);
920 }
921 return old_id;
922 }
923 let mut new_page = {
924 let page = pages.get(&old_id).unwrap();
925 if page.txn_id() == txn_id {
926 return old_id;
927 }
928 page.clone()
929 };
930 let new_id = alloc.allocate();
931 new_page.set_page_id(new_id);
932 new_page.set_txn_id(txn_id);
933 pages.insert(new_id, new_page);
934 alloc.free(old_id);
935 new_id
936}
937
938fn update_branch_child(page: &mut Page, child_idx: usize, new_child: PageId) {
940 let n = page.num_cells() as usize;
941 if child_idx < n {
942 let offset = page.cell_offset(child_idx as u16) as usize;
943 page.data[offset..offset + 4].copy_from_slice(&new_child.as_u32().to_le_bytes());
944 } else {
945 page.set_right_child(new_child);
946 }
947}
948
949pub fn propagate_cow_up(
953 pages: &mut FxHashMap<PageId, Page>,
954 alloc: &mut PageAllocator,
955 txn_id: TxnId,
956 path: &mut [(PageId, usize)],
957 mut new_child: PageId,
958) -> PageId {
959 for i in (0..path.len()).rev() {
960 let (ancestor_id, child_idx) = path[i];
961 let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
962 let page = pages.get_mut(&new_ancestor).unwrap();
963 update_branch_child(page, child_idx, new_child);
964 path[i] = (new_ancestor, child_idx);
965 new_child = new_ancestor;
966 }
967 new_child
968}
969
970fn split_leaf_with_insert(
972 pages: &mut FxHashMap<PageId, Page>,
973 alloc: &mut PageAllocator,
974 txn_id: TxnId,
975 leaf_id: PageId,
976 key: &[u8],
977 val_type: ValueType,
978 value: &[u8],
979) -> (Vec<u8>, PageId) {
980 let mut cells: Vec<(Vec<u8>, Vec<u8>)> = {
981 let page = pages.get(&leaf_id).unwrap();
982 let n = page.num_cells() as usize;
983 (0..n)
984 .map(|i| {
985 let cell = leaf_node::read_cell(page, i as u16);
986 let raw = leaf_node::read_cell_bytes(page, i as u16);
987 (cell.key.to_vec(), raw)
988 })
989 .collect()
990 };
991
992 let new_raw = leaf_node::build_cell(key, val_type, value);
993 match cells.binary_search_by(|(k, _)| k.as_slice().cmp(key)) {
994 Ok(idx) => cells[idx] = (key.to_vec(), new_raw),
995 Err(idx) => cells.insert(idx, (key.to_vec(), new_raw)),
996 }
997
998 let total = cells.len();
999
1000 let usable = citadel_core::constants::USABLE_SIZE;
1001 let mut cum: Vec<usize> = Vec::with_capacity(total + 1);
1002 cum.push(0);
1003 for (_, raw) in &cells {
1004 cum.push(cum.last().unwrap() + raw.len());
1005 }
1006 let left_fits = |sp: usize| cum[sp] + sp * 2 <= usable;
1007 let right_fits = |sp: usize| (cum[total] - cum[sp]) + (total - sp) * 2 <= usable;
1008
1009 let mut split_point = total / 2;
1010 if !left_fits(split_point) || !right_fits(split_point) {
1011 split_point = 1;
1012 for sp in 1..total {
1013 if left_fits(sp) && right_fits(sp) {
1014 split_point = sp;
1015 if sp >= total / 2 {
1016 break;
1017 }
1018 }
1019 }
1020 }
1021
1022 let sep_key = cells[split_point].0.clone();
1023
1024 {
1025 let left_refs: Vec<&[u8]> = cells[..split_point]
1026 .iter()
1027 .map(|(_, raw)| raw.as_slice())
1028 .collect();
1029 let page = pages.get_mut(&leaf_id).unwrap();
1030 page.rebuild_cells(&left_refs);
1031 }
1032
1033 let right_id = alloc.allocate();
1034 {
1035 let mut right_page = Page::new(right_id, PageType::Leaf, txn_id);
1036 let right_refs: Vec<&[u8]> = cells[split_point..]
1037 .iter()
1038 .map(|(_, raw)| raw.as_slice())
1039 .collect();
1040 right_page.rebuild_cells(&right_refs);
1041 pages.insert(right_id, right_page);
1042 }
1043
1044 (sep_key, right_id)
1045}
1046
/// Propagates a child split up the ancestor `path` and returns the id of the
/// resulting root.
///
/// `left_child` / `right_child` are the two halves produced by the split and
/// `initial_sep` is the key separating them. Walking `path` from the deepest
/// ancestor upwards, each ancestor is made writable with `cow_page`, then:
///
/// * while a split is pending, the separator is inserted into the ancestor;
///   if the ancestor rejects it, the ancestor itself is split and the new
///   separator/right sibling are carried to the next level;
/// * once the separator has been absorbed, remaining ancestors are only
///   repointed at the child below them.
///
/// If the split is still pending after the last ancestor, a new branch root
/// holding the two halves is allocated and `*depth` is incremented.
///
/// # Panics
/// Panics if an ancestor is absent from `pages`, or if writing the single
/// cell into the new root fails.
#[allow(clippy::too_many_arguments)]
fn propagate_split_up(
    pages: &mut FxHashMap<PageId, Page>,
    alloc: &mut PageAllocator,
    txn_id: TxnId,
    path: &[(PageId, usize)],
    mut left_child: PageId,
    initial_sep: &[u8],
    mut right_child: PageId,
    depth: &mut u16,
) -> PageId {
    let mut sep_key = initial_sep.to_vec();
    // True while a (separator, right sibling) pair still has to be placed.
    let mut pending_split = true;

    for &(ancestor_id, child_idx) in path.iter().rev() {
        let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);

        if pending_split {
            let ok = {
                let page = pages.get_mut(&new_ancestor).unwrap();
                branch_node::insert_separator(page, child_idx, left_child, &sep_key, right_child)
            };

            if ok {
                // Separator absorbed at this level; levels above only need
                // their child pointer refreshed.
                pending_split = false;
                left_child = new_ancestor;
            } else {
                // This branch rejected the separator: split it too and carry
                // the promoted separator upwards.
                let (new_sep, new_right) = split_branch_with_insert(
                    pages,
                    alloc,
                    txn_id,
                    new_ancestor,
                    child_idx,
                    left_child,
                    &sep_key,
                    right_child,
                );
                left_child = new_ancestor;
                sep_key = new_sep;
                right_child = new_right;
            }
        } else {
            let page = pages.get_mut(&new_ancestor).unwrap();
            update_branch_child(page, child_idx, left_child);
            left_child = new_ancestor;
        }
    }

    if pending_split {
        // The split reached the top: grow the tree by one level.
        let new_root_id = alloc.allocate();
        let mut new_root = Page::new(new_root_id, PageType::Branch, txn_id);
        let cell = branch_node::build_cell(left_child, &sep_key);
        new_root.write_cell(&cell).unwrap();
        new_root.set_right_child(right_child);
        pages.insert(new_root_id, new_root);
        *depth += 1;
        new_root_id
    } else {
        left_child
    }
}
1108
/// Splits branch `branch_id` while inserting the separator that resulted from
/// a split of its child at `child_idx`.
///
/// `new_left` / `new_right` are the two halves of the split child and
/// `sep_key` separates them. The full cell list (including the new
/// separator) is built in memory, one cell is chosen as the promoted
/// separator, the cells before it stay in `branch_id`, and the cells after it
/// go into a newly allocated branch stamped with `txn_id`.
///
/// Returns `(promoted_separator_key, right_branch_id)`.
///
/// # Panics
/// Panics if `branch_id` is absent from `pages`.
#[allow(clippy::too_many_arguments)]
fn split_branch_with_insert(
    pages: &mut FxHashMap<PageId, Page>,
    alloc: &mut PageAllocator,
    txn_id: TxnId,
    branch_id: PageId,
    child_idx: usize,
    new_left: PageId,
    sep_key: &[u8],
    new_right: PageId,
) -> (Vec<u8>, PageId) {
    // Build the logical (child, key) list after the insertion, plus the
    // right child that terminates it.
    let (new_cells, final_right_child) = {
        let page = pages.get(&branch_id).unwrap();
        let n = page.num_cells() as usize;
        let cells: Vec<(PageId, Vec<u8>)> = (0..n)
            .map(|i| {
                let cell = branch_node::read_cell(page, i as u16);
                (cell.child, cell.key.to_vec())
            })
            .collect();
        let old_rc = page.right_child();

        let mut result = Vec::with_capacity(n + 1);
        let final_rc;

        if child_idx < n {
            // The split child was referenced by a cell: that cell becomes
            // (new_left, sep_key) followed by (new_right, original key).
            let old_key = cells[child_idx].1.clone();
            for (i, (child, key)) in cells.into_iter().enumerate() {
                if i == child_idx {
                    result.push((new_left, sep_key.to_vec()));
                    result.push((new_right, old_key.clone()));
                } else {
                    result.push((child, key));
                }
            }
            final_rc = old_rc;
        } else {
            // The split child was the right child: append (new_left, sep_key)
            // and make new_right the right child.
            result = cells;
            result.push((new_left, sep_key.to_vec()));
            final_rc = new_right;
        }

        (result, final_rc)
    };

    let total = new_cells.len();
    let usable = citadel_core::constants::USABLE_SIZE;
    // Per-cell size estimate: 6 bytes plus the key length (presumably the
    // fixed header written by `branch_node::build_cell` — TODO confirm).
    let raw_sizes: Vec<usize> = new_cells.iter().map(|(_, key)| 6 + key.len()).collect();
    // cum[i] = combined size of the first `i` cells.
    let mut cum: Vec<usize> = Vec::with_capacity(total + 1);
    cum.push(0);
    for &sz in &raw_sizes {
        cum.push(cum.last().unwrap() + sz);
    }
    // Each cell is charged 2 extra bytes (presumably its cell-pointer slot —
    // confirm against Page). The cell at `sp` is promoted, so it belongs to
    // neither side.
    let left_fits = |sp: usize| cum[sp] + sp * 2 <= usable;
    let right_fits = |sp: usize| {
        let right_count = total - sp - 1;
        (cum[total] - cum[sp + 1]) + right_count * 2 <= usable
    };

    // Prefer the middle; otherwise take the first fitting point at or past
    // the middle, or failing that the last fitting point before it
    // (defaulting to 1).
    let mut split_point = total / 2;
    if !left_fits(split_point) || !right_fits(split_point) {
        split_point = 1;
        for sp in 1..total.saturating_sub(1) {
            if left_fits(sp) && right_fits(sp) {
                split_point = sp;
                if sp >= total / 2 {
                    break;
                }
            }
        }
    }

    let promoted_sep = new_cells[split_point].1.clone();
    let promoted_child = new_cells[split_point].0;

    {
        // Left branch: cells before the promoted one; the promoted cell's
        // child becomes its right child.
        let left_raw: Vec<Vec<u8>> = new_cells[..split_point]
            .iter()
            .map(|(child, key)| branch_node::build_cell(*child, key))
            .collect();
        let left_refs: Vec<&[u8]> = left_raw.iter().map(|c| c.as_slice()).collect();
        let page = pages.get_mut(&branch_id).unwrap();
        page.rebuild_cells(&left_refs);
        page.set_right_child(promoted_child);
    }

    let right_branch_id = alloc.allocate();
    {
        // Right branch: cells after the promoted one, terminated by the
        // overall right child.
        let mut right_page = Page::new(right_branch_id, PageType::Branch, txn_id);
        let right_raw: Vec<Vec<u8>> = new_cells[split_point + 1..]
            .iter()
            .map(|(child, key)| branch_node::build_cell(*child, key))
            .collect();
        let right_refs: Vec<&[u8]> = right_raw.iter().map(|c| c.as_slice()).collect();
        right_page.rebuild_cells(&right_refs);
        right_page.set_right_child(final_right_child);
        pages.insert(right_branch_id, right_page);
    }

    (promoted_sep, right_branch_id)
}
1210
1211fn remove_child_from_branch(page: &mut Page, child_idx: usize) {
1212 let n = page.num_cells() as usize;
1213 if child_idx < n {
1214 let cell_sz = branch_node::get_cell_size(page, child_idx as u16);
1215 page.delete_cell_at(child_idx as u16, cell_sz);
1216 } else {
1217 assert!(n > 0, "cannot remove right_child from branch with 0 cells");
1218 let last_child = branch_node::read_cell(page, (n - 1) as u16).child;
1219 let cell_sz = branch_node::get_cell_size(page, (n - 1) as u16);
1220 page.delete_cell_at((n - 1) as u16, cell_sz);
1221 page.set_right_child(last_child);
1222 }
1223}
1224
1225fn propagate_remove_up(
1226 pages: &mut FxHashMap<PageId, Page>,
1227 alloc: &mut PageAllocator,
1228 txn_id: TxnId,
1229 path: &mut [(PageId, usize)],
1230 depth: &mut u16,
1231) -> PageId {
1232 let mut level = path.len();
1233 let mut need_remove_at_level = true;
1234 let mut new_child = PageId(0);
1235
1236 while level > 0 && need_remove_at_level {
1237 level -= 1;
1238 let (ancestor_id, child_idx) = path[level];
1239 let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
1240
1241 {
1242 let page = pages.get_mut(&new_ancestor).unwrap();
1243 remove_child_from_branch(page, child_idx);
1244 }
1245
1246 let num_cells = pages.get(&new_ancestor).unwrap().num_cells();
1247
1248 if num_cells > 0 || level == 0 {
1249 if num_cells == 0 && level == 0 {
1250 let only_child = pages.get(&new_ancestor).unwrap().right_child();
1252 alloc.free(new_ancestor);
1253 pages.remove(&new_ancestor);
1254 *depth -= 1;
1255 return only_child;
1256 }
1257 new_child = new_ancestor;
1259 need_remove_at_level = false;
1260 } else {
1261 let only_child = pages.get(&new_ancestor).unwrap().right_child();
1263 alloc.free(new_ancestor);
1264 pages.remove(&new_ancestor);
1265 *depth -= 1;
1266
1267 new_child = only_child;
1268 need_remove_at_level = false;
1269 }
1270 }
1271
1272 if level > 0 {
1273 let remaining_path = &mut path[..level];
1274 new_child = propagate_cow_up(pages, alloc, txn_id, remaining_path, new_child);
1275 }
1276
1277 new_child
1278}
1279
#[cfg(test)]
mod tests {
    use super::*;

    /// Test harness bundling a tree with the page map and allocator it lives in.
    struct Fixture {
        pages: FxHashMap<PageId, Page>,
        alloc: PageAllocator,
        tree: BTree,
    }

    impl Fixture {
        /// Builds a new tree via `BTree::new` under `TxnId(1)`.
        fn new() -> Self {
            let mut pages = FxHashMap::default();
            let mut alloc = PageAllocator::new(0);
            let tree = BTree::new(&mut pages, &mut alloc, TxnId(1));
            Self { pages, alloc, tree }
        }

        /// Inserts an inline value under `txn`, returning what `BTree::insert` reports.
        fn put_in(&mut self, txn: TxnId, key: &[u8], value: &[u8]) -> bool {
            self.tree
                .insert(
                    &mut self.pages,
                    &mut self.alloc,
                    txn,
                    key,
                    ValueType::Inline,
                    value,
                )
                .unwrap()
        }

        /// Inserts an inline value under `TxnId(1)`.
        fn put(&mut self, key: &[u8], value: &[u8]) -> bool {
            self.put_in(TxnId(1), key, value)
        }

        /// Deletes `key` under `TxnId(1)`, returning what `BTree::delete` reports.
        fn del(&mut self, key: &[u8]) -> bool {
            self.tree
                .delete(&mut self.pages, &mut self.alloc, TxnId(1), key)
                .unwrap()
        }

        /// Looks `key` up in the tree.
        fn get(&self, key: &[u8]) -> Option<(ValueType, Vec<u8>)> {
            self.tree.search(&self.pages, key).unwrap()
        }
    }

    /// The search result expected for a key holding inline `value`.
    fn inline(value: &[u8]) -> Option<(ValueType, Vec<u8>)> {
        Some((ValueType::Inline, value.to_vec()))
    }

    /// Searching a freshly created tree finds nothing.
    #[test]
    fn empty_tree_search() {
        let fx = Fixture::new();
        assert_eq!(fx.get(b"anything"), None);
    }

    /// A single insert is reported as new and can be read back.
    #[test]
    fn insert_and_search_single() {
        let mut fx = Fixture::new();
        assert!(fx.put(b"hello", b"world"));
        assert_eq!(fx.tree.entry_count, 1);
        assert_eq!(fx.get(b"hello"), inline(b"world"));
    }

    /// Re-inserting a key overwrites its value without growing the entry count.
    #[test]
    fn insert_update_existing() {
        let mut fx = Fixture::new();
        fx.put(b"key", b"v1");
        assert!(!fx.put(b"key", b"v2"));
        assert_eq!(fx.tree.entry_count, 1);
        assert_eq!(fx.get(b"key"), inline(b"v2"));
    }

    /// Keys inserted out of order are all retrievable; an absent key is not.
    #[test]
    fn insert_multiple_sorted() {
        let mut fx = Fixture::new();
        let keys: [&[u8]; 6] = [b"dog", b"ant", b"cat", b"fox", b"bat", b"eel"];
        for &k in &keys {
            fx.put(k, k);
        }
        assert_eq!(fx.tree.entry_count, 6);

        for &k in &keys {
            assert_eq!(fx.get(k), inline(k));
        }

        assert_eq!(fx.get(b"zebra"), None);
    }

    /// Enough inserts push the tree past a single leaf while keeping every entry readable.
    #[test]
    fn insert_triggers_leaf_split() {
        let mut fx = Fixture::new();
        let count: u64 = 500;
        let entry = |i: u64| (format!("key-{i:05}"), format!("val-{i:05}"));

        for i in 0..count {
            let (key, val) = entry(i);
            fx.put(key.as_bytes(), val.as_bytes());
        }

        assert_eq!(fx.tree.entry_count, count);
        assert!(
            fx.tree.depth >= 2,
            "tree should have split (depth={})",
            fx.tree.depth
        );

        for i in 0..count {
            let (key, val) = entry(i);
            assert_eq!(fx.get(key.as_bytes()), inline(val.as_bytes()));
        }
    }

    /// Deleting one key removes only that key.
    #[test]
    fn delete_existing_key() {
        let mut fx = Fixture::new();
        fx.put(b"a", b"1");
        fx.put(b"b", b"2");
        fx.put(b"c", b"3");

        assert!(fx.del(b"b"));
        assert_eq!(fx.tree.entry_count, 2);
        assert_eq!(fx.get(b"b"), None);
        assert_eq!(fx.get(b"a"), inline(b"1"));
        assert_eq!(fx.get(b"c"), inline(b"3"));
    }

    /// Deleting a missing key reports `false` and leaves the count alone.
    #[test]
    fn delete_nonexistent_key() {
        let mut fx = Fixture::new();
        fx.put(b"a", b"1");
        assert!(!fx.del(b"z"));
        assert_eq!(fx.tree.entry_count, 1);
    }

    /// Emptying a single-leaf tree leaves an empty leaf as the root.
    #[test]
    fn delete_all_from_root_leaf() {
        let mut fx = Fixture::new();
        fx.put(b"x", b"1");
        fx.del(b"x");
        assert_eq!(fx.tree.entry_count, 0);

        let root = fx.pages.get(&fx.tree.root).unwrap();
        assert_eq!(root.page_type(), Some(PageType::Leaf));
        assert_eq!(root.num_cells(), 0);
    }

    /// Inserting under a later transaction moves the root to a new page id and frees the old one.
    #[test]
    fn cow_produces_new_page_ids() {
        let mut fx = Fixture::new();
        let root_before = fx.tree.root;

        fx.put_in(TxnId(2), b"key", b"val");
        let root_after = fx.tree.root;

        assert_ne!(root_before, root_after);
        assert!(fx.alloc.freed_this_txn().contains(&root_before));
    }

    /// Deleting every other key out of many leaves exactly the remaining half readable.
    #[test]
    fn insert_and_delete_many() {
        let mut fx = Fixture::new();
        let count = 1000u64;

        for i in 0..count {
            fx.put(format!("k{i:06}").as_bytes(), format!("v{i:06}").as_bytes());
        }
        assert_eq!(fx.tree.entry_count, count);

        for i in (0..count).step_by(2) {
            assert!(fx.del(format!("k{i:06}").as_bytes()));
        }
        assert_eq!(fx.tree.entry_count, count / 2);

        for i in 0..count {
            let key = format!("k{i:06}");
            let found = fx.get(key.as_bytes());
            if i % 2 == 0 {
                assert_eq!(found, None, "deleted key {key} should not be found");
            } else {
                assert_eq!(found, inline(format!("v{i:06}").as_bytes()));
            }
        }
    }

    /// A multi-level tree can be filled and then drained key by key.
    #[test]
    fn deep_tree_insert_delete() {
        let mut fx = Fixture::new();
        let count = 2000u64;

        for i in 0..count {
            fx.put(format!("{i:08}").as_bytes(), b"v");
        }
        assert!(fx.tree.depth >= 2, "depth={} expected >= 2", fx.tree.depth);
        assert_eq!(fx.tree.entry_count, count);

        for i in 0..count {
            let key = format!("{i:08}");
            assert!(fx.del(key.as_bytes()), "key {key} should be deletable");
        }
        assert_eq!(fx.tree.entry_count, 0);
    }
}