1use crate::allocator::PageAllocator;
4use citadel_core::types::{PageId, PageType, TxnId, ValueType};
5use citadel_core::{Error, Result};
6use citadel_page::page::Page;
7use citadel_page::{branch_node, leaf_node};
8use rustc_hash::FxHashMap;
9
10#[derive(Clone)]
12pub struct BTree {
13 pub root: PageId,
14 pub depth: u16,
15 pub entry_count: u64,
16 last_insert: Option<(Vec<(PageId, usize)>, PageId)>,
17}
18
19impl BTree {
20 pub fn new(
22 pages: &mut FxHashMap<PageId, Page>,
23 alloc: &mut PageAllocator,
24 txn_id: TxnId,
25 ) -> Self {
26 let root_id = alloc.allocate();
27 let root = Page::new(root_id, PageType::Leaf, txn_id);
28 pages.insert(root_id, root);
29 Self {
30 root: root_id,
31 depth: 1,
32 entry_count: 0,
33 last_insert: None,
34 }
35 }
36
37 pub fn from_existing(root: PageId, depth: u16, entry_count: u64) -> Self {
39 Self {
40 root,
41 depth,
42 entry_count,
43 last_insert: None,
44 }
45 }
46
47 pub fn search(
49 &self,
50 pages: &FxHashMap<PageId, Page>,
51 key: &[u8],
52 ) -> Result<Option<(ValueType, Vec<u8>)>> {
53 let mut current = self.root;
54 loop {
55 let page = pages.get(¤t).ok_or(Error::PageOutOfBounds(current))?;
56 match page.page_type() {
57 Some(PageType::Leaf) => {
58 return match leaf_node::search(page, key) {
59 Ok(idx) => {
60 let cell = leaf_node::read_cell(page, idx);
61 Ok(Some((cell.val_type, cell.value.to_vec())))
62 }
63 Err(_) => Ok(None),
64 };
65 }
66 Some(PageType::Branch) => {
67 let idx = branch_node::search_child_index(page, key);
68 current = branch_node::get_child(page, idx);
69 }
70 _ => {
71 return Err(Error::InvalidPageType(page.page_type_raw(), current));
72 }
73 }
74 }
75 }
76
77 pub fn lil_would_hit(&self, pages: &FxHashMap<PageId, Page>, key: &[u8]) -> bool {
78 if let Some((_, cached_leaf)) = &self.last_insert {
79 if let Some(page) = pages.get(cached_leaf) {
80 let n = page.num_cells();
81 return n > 0 && key > leaf_node::read_cell(page, n - 1).key;
82 }
83 }
84 false
85 }
86
87 pub fn insert(
89 &mut self,
90 pages: &mut FxHashMap<PageId, Page>,
91 alloc: &mut PageAllocator,
92 txn_id: TxnId,
93 key: &[u8],
94 val_type: ValueType,
95 value: &[u8],
96 ) -> Result<bool> {
97 if let Some((mut cached_path, cached_leaf)) = self.last_insert.take() {
99 let (hit, needs_cow) = {
100 let page = pages
101 .get(&cached_leaf)
102 .ok_or(Error::PageOutOfBounds(cached_leaf))?;
103 let n = page.num_cells();
104 let h = n > 0 && key > leaf_node::read_cell(page, n - 1).key;
105 let nc = page.txn_id() != txn_id;
106 (h, nc)
107 };
108 if hit {
109 let cow_id = if needs_cow {
110 cow_page(pages, alloc, cached_leaf, txn_id)
111 } else {
112 cached_leaf
113 };
114 let ok = {
115 let page = pages.get_mut(&cow_id).unwrap();
116 leaf_node::insert_direct(page, key, val_type, value)
117 };
118 if ok {
119 if cow_id != cached_leaf {
120 self.root =
121 propagate_cow_up(pages, alloc, txn_id, &mut cached_path, cow_id);
122 }
123 self.entry_count += 1;
124 self.last_insert = Some((cached_path, cow_id));
125 return Ok(true);
126 }
127 let (sep_key, right_id) =
128 split_leaf_with_insert(pages, alloc, txn_id, cow_id, key, val_type, value);
129 self.root = propagate_split_up(
130 pages,
131 alloc,
132 txn_id,
133 &cached_path,
134 cow_id,
135 &sep_key,
136 right_id,
137 &mut self.depth,
138 );
139 self.last_insert = None;
140 self.entry_count += 1;
141 return Ok(true);
142 }
143 }
144
145 let (path, leaf_id) = self.walk_to_leaf(pages, key)?;
146
147 let key_exists = {
148 let page = pages.get(&leaf_id).unwrap();
149 leaf_node::search(page, key).is_ok()
150 };
151
152 let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
153
154 let leaf_ok = {
155 let page = pages.get_mut(&new_leaf_id).unwrap();
156 leaf_node::insert_direct(page, key, val_type, value)
157 };
158
159 if leaf_ok {
160 if alloc.in_place() && new_leaf_id == leaf_id {
163 let mut is_rightmost = true;
164 for &(ancestor_id, child_idx) in path.iter().rev() {
165 let page = pages.get(&ancestor_id).unwrap();
166 if child_idx != page.num_cells() as usize {
167 is_rightmost = false;
168 break;
169 }
170 }
171 if is_rightmost {
172 self.last_insert = Some((path, new_leaf_id));
173 }
174 if !key_exists {
175 self.entry_count += 1;
176 }
177 return Ok(!key_exists);
178 }
179 let mut child = new_leaf_id;
180 let mut is_rightmost = true;
181 let mut new_path = path;
182 for i in (0..new_path.len()).rev() {
183 let (ancestor_id, child_idx) = new_path[i];
184 let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
185 let page = pages.get_mut(&new_ancestor).unwrap();
186 update_branch_child(page, child_idx, child);
187 if child_idx != page.num_cells() as usize {
188 is_rightmost = false;
189 }
190 new_path[i] = (new_ancestor, child_idx);
191 child = new_ancestor;
192 }
193 self.root = child;
194
195 if is_rightmost {
196 self.last_insert = Some((new_path, new_leaf_id));
197 }
198
199 if !key_exists {
200 self.entry_count += 1;
201 }
202 return Ok(!key_exists);
203 }
204
205 self.last_insert = None;
206 let (sep_key, right_id) =
207 split_leaf_with_insert(pages, alloc, txn_id, new_leaf_id, key, val_type, value);
208 self.root = propagate_split_up(
209 pages,
210 alloc,
211 txn_id,
212 &path,
213 new_leaf_id,
214 &sep_key,
215 right_id,
216 &mut self.depth,
217 );
218
219 if !key_exists {
220 self.entry_count += 1;
221 }
222 Ok(!key_exists)
223 }
224
225 pub fn update_sorted(
227 &mut self,
228 pages: &mut FxHashMap<PageId, Page>,
229 alloc: &mut PageAllocator,
230 txn_id: TxnId,
231 pairs: &[(&[u8], &[u8])],
232 ) -> Result<u64> {
233 if pairs.is_empty() {
234 return Ok(0);
235 }
236 self.last_insert = None;
237
238 let (mut path, mut leaf_id) = self.walk_to_leaf(pages, pairs[0].0)?;
239 let mut cow_leaf = cow_page(pages, alloc, leaf_id, txn_id);
240 if cow_leaf != leaf_id {
241 self.root = propagate_cow_up(pages, alloc, txn_id, &mut path, cow_leaf);
242 }
243
244 let mut count: u64 = 0;
245 let mut hint: u16 = 0;
246
247 for &(key, value) in pairs {
248 let past_leaf = {
249 let page = pages.get(&cow_leaf).unwrap();
250 let n = page.num_cells();
251 n == 0 || key > leaf_node::read_cell(page, n - 1).key
252 };
253
254 if past_leaf {
255 let (new_path, new_leaf) = self.walk_to_leaf(pages, key)?;
256 path = new_path;
257 leaf_id = new_leaf;
258 cow_leaf = cow_page(pages, alloc, leaf_id, txn_id);
259 if cow_leaf != leaf_id {
260 self.root = propagate_cow_up(pages, alloc, txn_id, &mut path, cow_leaf);
261 }
262 hint = 0;
263 }
264
265 let page = pages.get(&cow_leaf).unwrap();
266 let n = page.num_cells();
267 let idx = {
268 let mut i = hint;
269 loop {
270 if i >= n {
271 break None;
272 }
273 let cell = leaf_node::read_cell(page, i);
274 match key.cmp(cell.key) {
275 std::cmp::Ordering::Equal => break Some(i),
276 std::cmp::Ordering::Less => break None,
277 std::cmp::Ordering::Greater => i += 1,
278 }
279 }
280 };
281
282 if let Some(idx) = idx {
283 hint = idx + 1;
284 let page = pages.get_mut(&cow_leaf).unwrap();
285 if !leaf_node::update_value_in_place(page, idx, ValueType::Inline, value) {
286 leaf_node::insert_direct(page, key, ValueType::Inline, value);
287 }
288 count += 1;
289 }
290 }
291
292 Ok(count)
293 }
294
295 pub fn delete(
297 &mut self,
298 pages: &mut FxHashMap<PageId, Page>,
299 alloc: &mut PageAllocator,
300 txn_id: TxnId,
301 key: &[u8],
302 ) -> Result<bool> {
303 self.last_insert = None;
304 let (mut path, leaf_id) = self.walk_to_leaf(pages, key)?;
305
306 let found = {
307 let page = pages.get(&leaf_id).unwrap();
308 leaf_node::search(page, key).is_ok()
309 };
310 if !found {
311 return Ok(false);
312 }
313
314 let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
315 {
316 let page = pages.get_mut(&new_leaf_id).unwrap();
317 leaf_node::delete(page, key);
318 }
319
320 let leaf_empty = pages.get(&new_leaf_id).unwrap().num_cells() == 0;
321
322 if !leaf_empty || path.is_empty() {
323 if alloc.in_place() && new_leaf_id == leaf_id {
324 self.entry_count -= 1;
325 return Ok(true);
326 }
327 self.root = propagate_cow_up(pages, alloc, txn_id, &mut path, new_leaf_id);
328 self.entry_count -= 1;
329 return Ok(true);
330 }
331
332 alloc.free(new_leaf_id);
333 pages.remove(&new_leaf_id);
334
335 self.root = propagate_remove_up(pages, alloc, txn_id, &mut path, &mut self.depth);
336 self.entry_count -= 1;
337 Ok(true)
338 }
339
340 pub fn walk_to_leaf(
342 &self,
343 pages: &FxHashMap<PageId, Page>,
344 key: &[u8],
345 ) -> Result<(Vec<(PageId, usize)>, PageId)> {
346 let mut path = Vec::with_capacity(self.depth as usize);
347 let mut current = self.root;
348 loop {
349 let page = pages.get(¤t).ok_or(Error::PageOutOfBounds(current))?;
350 match page.page_type() {
351 Some(PageType::Leaf) => return Ok((path, current)),
352 Some(PageType::Branch) => {
353 let child_idx = branch_node::search_child_index(page, key);
354 let child = branch_node::get_child(page, child_idx);
355 path.push((current, child_idx));
356 current = child;
357 }
358 _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
359 }
360 }
361 }
362}
363
364pub fn cow_page(
366 pages: &mut FxHashMap<PageId, Page>,
367 alloc: &mut PageAllocator,
368 old_id: PageId,
369 txn_id: TxnId,
370) -> PageId {
371 if alloc.in_place() {
372 let page = pages.get_mut(&old_id).unwrap();
373 if page.txn_id() != txn_id {
374 page.set_txn_id(txn_id);
375 }
376 return old_id;
377 }
378 let mut new_page = {
379 let page = pages.get(&old_id).unwrap();
380 if page.txn_id() == txn_id {
381 return old_id;
382 }
383 page.clone()
384 };
385 let new_id = alloc.allocate();
386 new_page.set_page_id(new_id);
387 new_page.set_txn_id(txn_id);
388 pages.insert(new_id, new_page);
389 alloc.free(old_id);
390 new_id
391}
392
393fn update_branch_child(page: &mut Page, child_idx: usize, new_child: PageId) {
395 let n = page.num_cells() as usize;
396 if child_idx < n {
397 let offset = page.cell_offset(child_idx as u16) as usize;
398 page.data[offset..offset + 4].copy_from_slice(&new_child.as_u32().to_le_bytes());
399 } else {
400 page.set_right_child(new_child);
401 }
402}
403
404pub fn propagate_cow_up(
408 pages: &mut FxHashMap<PageId, Page>,
409 alloc: &mut PageAllocator,
410 txn_id: TxnId,
411 path: &mut [(PageId, usize)],
412 mut new_child: PageId,
413) -> PageId {
414 for i in (0..path.len()).rev() {
415 let (ancestor_id, child_idx) = path[i];
416 let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
417 let page = pages.get_mut(&new_ancestor).unwrap();
418 update_branch_child(page, child_idx, new_child);
419 path[i] = (new_ancestor, child_idx);
420 new_child = new_ancestor;
421 }
422 new_child
423}
424
425fn split_leaf_with_insert(
427 pages: &mut FxHashMap<PageId, Page>,
428 alloc: &mut PageAllocator,
429 txn_id: TxnId,
430 leaf_id: PageId,
431 key: &[u8],
432 val_type: ValueType,
433 value: &[u8],
434) -> (Vec<u8>, PageId) {
435 let mut cells: Vec<(Vec<u8>, Vec<u8>)> = {
436 let page = pages.get(&leaf_id).unwrap();
437 let n = page.num_cells() as usize;
438 (0..n)
439 .map(|i| {
440 let cell = leaf_node::read_cell(page, i as u16);
441 let raw = leaf_node::read_cell_bytes(page, i as u16);
442 (cell.key.to_vec(), raw)
443 })
444 .collect()
445 };
446
447 let new_raw = leaf_node::build_cell(key, val_type, value);
448 match cells.binary_search_by(|(k, _)| k.as_slice().cmp(key)) {
449 Ok(idx) => cells[idx] = (key.to_vec(), new_raw),
450 Err(idx) => cells.insert(idx, (key.to_vec(), new_raw)),
451 }
452
453 let total = cells.len();
454
455 let usable = citadel_core::constants::USABLE_SIZE;
456 let mut cum: Vec<usize> = Vec::with_capacity(total + 1);
457 cum.push(0);
458 for (_, raw) in &cells {
459 cum.push(cum.last().unwrap() + raw.len());
460 }
461 let left_fits = |sp: usize| cum[sp] + sp * 2 <= usable;
462 let right_fits = |sp: usize| (cum[total] - cum[sp]) + (total - sp) * 2 <= usable;
463
464 let mut split_point = total / 2;
465 if !left_fits(split_point) || !right_fits(split_point) {
466 split_point = 1;
467 for sp in 1..total {
468 if left_fits(sp) && right_fits(sp) {
469 split_point = sp;
470 if sp >= total / 2 {
471 break;
472 }
473 }
474 }
475 }
476
477 let sep_key = cells[split_point].0.clone();
478
479 {
480 let left_refs: Vec<&[u8]> = cells[..split_point]
481 .iter()
482 .map(|(_, raw)| raw.as_slice())
483 .collect();
484 let page = pages.get_mut(&leaf_id).unwrap();
485 page.rebuild_cells(&left_refs);
486 }
487
488 let right_id = alloc.allocate();
489 {
490 let mut right_page = Page::new(right_id, PageType::Leaf, txn_id);
491 let right_refs: Vec<&[u8]> = cells[split_point..]
492 .iter()
493 .map(|(_, raw)| raw.as_slice())
494 .collect();
495 right_page.rebuild_cells(&right_refs);
496 pages.insert(right_id, right_page);
497 }
498
499 (sep_key, right_id)
500}
501
502#[allow(clippy::too_many_arguments)]
503fn propagate_split_up(
504 pages: &mut FxHashMap<PageId, Page>,
505 alloc: &mut PageAllocator,
506 txn_id: TxnId,
507 path: &[(PageId, usize)],
508 mut left_child: PageId,
509 initial_sep: &[u8],
510 mut right_child: PageId,
511 depth: &mut u16,
512) -> PageId {
513 let mut sep_key = initial_sep.to_vec();
514 let mut pending_split = true;
515
516 for &(ancestor_id, child_idx) in path.iter().rev() {
517 let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
518
519 if pending_split {
520 let ok = {
521 let page = pages.get_mut(&new_ancestor).unwrap();
522 branch_node::insert_separator(page, child_idx, left_child, &sep_key, right_child)
523 };
524
525 if ok {
526 pending_split = false;
527 left_child = new_ancestor;
528 } else {
529 let (new_sep, new_right) = split_branch_with_insert(
530 pages,
531 alloc,
532 txn_id,
533 new_ancestor,
534 child_idx,
535 left_child,
536 &sep_key,
537 right_child,
538 );
539 left_child = new_ancestor;
540 sep_key = new_sep;
541 right_child = new_right;
542 }
543 } else {
544 let page = pages.get_mut(&new_ancestor).unwrap();
545 update_branch_child(page, child_idx, left_child);
546 left_child = new_ancestor;
547 }
548 }
549
550 if pending_split {
551 let new_root_id = alloc.allocate();
552 let mut new_root = Page::new(new_root_id, PageType::Branch, txn_id);
553 let cell = branch_node::build_cell(left_child, &sep_key);
554 new_root.write_cell(&cell).unwrap();
555 new_root.set_right_child(right_child);
556 pages.insert(new_root_id, new_root);
557 *depth += 1;
558 new_root_id
559 } else {
560 left_child
561 }
562}
563
564#[allow(clippy::too_many_arguments)]
565fn split_branch_with_insert(
566 pages: &mut FxHashMap<PageId, Page>,
567 alloc: &mut PageAllocator,
568 txn_id: TxnId,
569 branch_id: PageId,
570 child_idx: usize,
571 new_left: PageId,
572 sep_key: &[u8],
573 new_right: PageId,
574) -> (Vec<u8>, PageId) {
575 let (new_cells, final_right_child) = {
576 let page = pages.get(&branch_id).unwrap();
577 let n = page.num_cells() as usize;
578 let cells: Vec<(PageId, Vec<u8>)> = (0..n)
579 .map(|i| {
580 let cell = branch_node::read_cell(page, i as u16);
581 (cell.child, cell.key.to_vec())
582 })
583 .collect();
584 let old_rc = page.right_child();
585
586 let mut result = Vec::with_capacity(n + 1);
587 let final_rc;
588
589 if child_idx < n {
590 let old_key = cells[child_idx].1.clone();
591 for (i, (child, key)) in cells.into_iter().enumerate() {
592 if i == child_idx {
593 result.push((new_left, sep_key.to_vec()));
594 result.push((new_right, old_key.clone()));
595 } else {
596 result.push((child, key));
597 }
598 }
599 final_rc = old_rc;
600 } else {
601 result = cells;
602 result.push((new_left, sep_key.to_vec()));
603 final_rc = new_right;
604 }
605
606 (result, final_rc)
607 };
608
609 let total = new_cells.len();
610 let usable = citadel_core::constants::USABLE_SIZE;
611 let raw_sizes: Vec<usize> = new_cells.iter().map(|(_, key)| 6 + key.len()).collect();
612 let mut cum: Vec<usize> = Vec::with_capacity(total + 1);
613 cum.push(0);
614 for &sz in &raw_sizes {
615 cum.push(cum.last().unwrap() + sz);
616 }
617 let left_fits = |sp: usize| cum[sp] + sp * 2 <= usable;
618 let right_fits = |sp: usize| {
619 let right_count = total - sp - 1;
620 (cum[total] - cum[sp + 1]) + right_count * 2 <= usable
621 };
622
623 let mut split_point = total / 2;
624 if !left_fits(split_point) || !right_fits(split_point) {
625 split_point = 1;
626 for sp in 1..total.saturating_sub(1) {
627 if left_fits(sp) && right_fits(sp) {
628 split_point = sp;
629 if sp >= total / 2 {
630 break;
631 }
632 }
633 }
634 }
635
636 let promoted_sep = new_cells[split_point].1.clone();
637 let promoted_child = new_cells[split_point].0;
638
639 {
640 let left_raw: Vec<Vec<u8>> = new_cells[..split_point]
641 .iter()
642 .map(|(child, key)| branch_node::build_cell(*child, key))
643 .collect();
644 let left_refs: Vec<&[u8]> = left_raw.iter().map(|c| c.as_slice()).collect();
645 let page = pages.get_mut(&branch_id).unwrap();
646 page.rebuild_cells(&left_refs);
647 page.set_right_child(promoted_child);
648 }
649
650 let right_branch_id = alloc.allocate();
651 {
652 let mut right_page = Page::new(right_branch_id, PageType::Branch, txn_id);
653 let right_raw: Vec<Vec<u8>> = new_cells[split_point + 1..]
654 .iter()
655 .map(|(child, key)| branch_node::build_cell(*child, key))
656 .collect();
657 let right_refs: Vec<&[u8]> = right_raw.iter().map(|c| c.as_slice()).collect();
658 right_page.rebuild_cells(&right_refs);
659 right_page.set_right_child(final_right_child);
660 pages.insert(right_branch_id, right_page);
661 }
662
663 (promoted_sep, right_branch_id)
664}
665
666fn remove_child_from_branch(page: &mut Page, child_idx: usize) {
667 let n = page.num_cells() as usize;
668 if child_idx < n {
669 let cell_sz = branch_node::get_cell_size(page, child_idx as u16);
670 page.delete_cell_at(child_idx as u16, cell_sz);
671 } else {
672 assert!(n > 0, "cannot remove right_child from branch with 0 cells");
673 let last_child = branch_node::read_cell(page, (n - 1) as u16).child;
674 let cell_sz = branch_node::get_cell_size(page, (n - 1) as u16);
675 page.delete_cell_at((n - 1) as u16, cell_sz);
676 page.set_right_child(last_child);
677 }
678}
679
680fn propagate_remove_up(
681 pages: &mut FxHashMap<PageId, Page>,
682 alloc: &mut PageAllocator,
683 txn_id: TxnId,
684 path: &mut [(PageId, usize)],
685 depth: &mut u16,
686) -> PageId {
687 let mut level = path.len();
688 let mut need_remove_at_level = true;
689 let mut new_child = PageId(0);
690
691 while level > 0 && need_remove_at_level {
692 level -= 1;
693 let (ancestor_id, child_idx) = path[level];
694 let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
695
696 {
697 let page = pages.get_mut(&new_ancestor).unwrap();
698 remove_child_from_branch(page, child_idx);
699 }
700
701 let num_cells = pages.get(&new_ancestor).unwrap().num_cells();
702
703 if num_cells > 0 || level == 0 {
704 if num_cells == 0 && level == 0 {
705 let only_child = pages.get(&new_ancestor).unwrap().right_child();
707 alloc.free(new_ancestor);
708 pages.remove(&new_ancestor);
709 *depth -= 1;
710 return only_child;
711 }
712 new_child = new_ancestor;
714 need_remove_at_level = false;
715 } else {
716 let only_child = pages.get(&new_ancestor).unwrap().right_child();
718 alloc.free(new_ancestor);
719 pages.remove(&new_ancestor);
720 *depth -= 1;
721
722 new_child = only_child;
723 need_remove_at_level = false;
724 }
725 }
726
727 if level > 0 {
728 let remaining_path = &mut path[..level];
729 new_child = propagate_cow_up(pages, alloc, txn_id, remaining_path, new_child);
730 }
731
732 new_child
733}
734
735#[cfg(test)]
736mod tests {
737 use super::*;
738
739 fn new_tree() -> (FxHashMap<PageId, Page>, PageAllocator, BTree) {
740 let mut pages = FxHashMap::default();
741 let mut alloc = PageAllocator::new(0);
742 let tree = BTree::new(&mut pages, &mut alloc, TxnId(1));
743 (pages, alloc, tree)
744 }
745
746 #[test]
747 fn empty_tree_search() {
748 let (pages, _, tree) = new_tree();
749 assert_eq!(tree.search(&pages, b"anything").unwrap(), None);
750 }
751
752 #[test]
753 fn insert_and_search_single() {
754 let (mut pages, mut alloc, mut tree) = new_tree();
755 let is_new = tree
756 .insert(
757 &mut pages,
758 &mut alloc,
759 TxnId(1),
760 b"hello",
761 ValueType::Inline,
762 b"world",
763 )
764 .unwrap();
765 assert!(is_new);
766 assert_eq!(tree.entry_count, 1);
767
768 let result = tree.search(&pages, b"hello").unwrap();
769 assert_eq!(result, Some((ValueType::Inline, b"world".to_vec())));
770 }
771
772 #[test]
773 fn insert_update_existing() {
774 let (mut pages, mut alloc, mut tree) = new_tree();
775 tree.insert(
776 &mut pages,
777 &mut alloc,
778 TxnId(1),
779 b"key",
780 ValueType::Inline,
781 b"v1",
782 )
783 .unwrap();
784 let is_new = tree
785 .insert(
786 &mut pages,
787 &mut alloc,
788 TxnId(1),
789 b"key",
790 ValueType::Inline,
791 b"v2",
792 )
793 .unwrap();
794 assert!(!is_new);
795 assert_eq!(tree.entry_count, 1);
796
797 let result = tree.search(&pages, b"key").unwrap();
798 assert_eq!(result, Some((ValueType::Inline, b"v2".to_vec())));
799 }
800
801 #[test]
802 fn insert_multiple_sorted() {
803 let (mut pages, mut alloc, mut tree) = new_tree();
804 let keys = [b"dog", b"ant", b"cat", b"fox", b"bat", b"eel"];
805 for k in &keys {
806 tree.insert(&mut pages, &mut alloc, TxnId(1), *k, ValueType::Inline, *k)
807 .unwrap();
808 }
809 assert_eq!(tree.entry_count, 6);
810
811 for k in &keys {
812 let result = tree.search(&pages, *k).unwrap();
813 assert_eq!(result, Some((ValueType::Inline, k.to_vec())));
814 }
815
816 assert_eq!(tree.search(&pages, b"zebra").unwrap(), None);
817 }
818
819 #[test]
820 fn insert_triggers_leaf_split() {
821 let (mut pages, mut alloc, mut tree) = new_tree();
822
823 let count = 500;
825 for i in 0..count {
826 let key = format!("key-{i:05}");
827 let val = format!("val-{i:05}");
828 tree.insert(
829 &mut pages,
830 &mut alloc,
831 TxnId(1),
832 key.as_bytes(),
833 ValueType::Inline,
834 val.as_bytes(),
835 )
836 .unwrap();
837 }
838
839 assert_eq!(tree.entry_count, count);
840 assert!(
841 tree.depth >= 2,
842 "tree should have split (depth={})",
843 tree.depth
844 );
845
846 for i in 0..count {
847 let key = format!("key-{i:05}");
848 let val = format!("val-{i:05}");
849 let result = tree.search(&pages, key.as_bytes()).unwrap();
850 assert_eq!(result, Some((ValueType::Inline, val.into_bytes())));
851 }
852 }
853
854 #[test]
855 fn delete_existing_key() {
856 let (mut pages, mut alloc, mut tree) = new_tree();
857 tree.insert(
858 &mut pages,
859 &mut alloc,
860 TxnId(1),
861 b"a",
862 ValueType::Inline,
863 b"1",
864 )
865 .unwrap();
866 tree.insert(
867 &mut pages,
868 &mut alloc,
869 TxnId(1),
870 b"b",
871 ValueType::Inline,
872 b"2",
873 )
874 .unwrap();
875 tree.insert(
876 &mut pages,
877 &mut alloc,
878 TxnId(1),
879 b"c",
880 ValueType::Inline,
881 b"3",
882 )
883 .unwrap();
884
885 let found = tree.delete(&mut pages, &mut alloc, TxnId(1), b"b").unwrap();
886 assert!(found);
887 assert_eq!(tree.entry_count, 2);
888 assert_eq!(tree.search(&pages, b"b").unwrap(), None);
889 assert_eq!(
890 tree.search(&pages, b"a").unwrap(),
891 Some((ValueType::Inline, b"1".to_vec()))
892 );
893 assert_eq!(
894 tree.search(&pages, b"c").unwrap(),
895 Some((ValueType::Inline, b"3".to_vec()))
896 );
897 }
898
899 #[test]
900 fn delete_nonexistent_key() {
901 let (mut pages, mut alloc, mut tree) = new_tree();
902 tree.insert(
903 &mut pages,
904 &mut alloc,
905 TxnId(1),
906 b"a",
907 ValueType::Inline,
908 b"1",
909 )
910 .unwrap();
911 let found = tree.delete(&mut pages, &mut alloc, TxnId(1), b"z").unwrap();
912 assert!(!found);
913 assert_eq!(tree.entry_count, 1);
914 }
915
916 #[test]
917 fn delete_all_from_root_leaf() {
918 let (mut pages, mut alloc, mut tree) = new_tree();
919 tree.insert(
920 &mut pages,
921 &mut alloc,
922 TxnId(1),
923 b"x",
924 ValueType::Inline,
925 b"1",
926 )
927 .unwrap();
928 tree.delete(&mut pages, &mut alloc, TxnId(1), b"x").unwrap();
929 assert_eq!(tree.entry_count, 0);
930
931 let root = pages.get(&tree.root).unwrap();
932 assert_eq!(root.page_type(), Some(PageType::Leaf));
933 assert_eq!(root.num_cells(), 0);
934 }
935
936 #[test]
937 fn cow_produces_new_page_ids() {
938 let (mut pages, mut alloc, mut tree) = new_tree();
939 let root_before = tree.root;
940
941 tree.insert(
942 &mut pages,
943 &mut alloc,
944 TxnId(2),
945 b"key",
946 ValueType::Inline,
947 b"val",
948 )
949 .unwrap();
950 let root_after = tree.root;
951
952 assert_ne!(root_before, root_after);
953 assert!(alloc.freed_this_txn().contains(&root_before));
954 }
955
956 #[test]
957 fn insert_and_delete_many() {
958 let (mut pages, mut alloc, mut tree) = new_tree();
959 let count = 1000u64;
960
961 for i in 0..count {
962 let key = format!("k{i:06}");
963 let val = format!("v{i:06}");
964 tree.insert(
965 &mut pages,
966 &mut alloc,
967 TxnId(1),
968 key.as_bytes(),
969 ValueType::Inline,
970 val.as_bytes(),
971 )
972 .unwrap();
973 }
974 assert_eq!(tree.entry_count, count);
975
976 for i in (0..count).step_by(2) {
977 let key = format!("k{i:06}");
978 let found = tree
979 .delete(&mut pages, &mut alloc, TxnId(1), key.as_bytes())
980 .unwrap();
981 assert!(found);
982 }
983 assert_eq!(tree.entry_count, count / 2);
984
985 for i in 0..count {
986 let key = format!("k{i:06}");
987 let result = tree.search(&pages, key.as_bytes()).unwrap();
988 if i % 2 == 0 {
989 assert_eq!(result, None, "deleted key {key} should not be found");
990 } else {
991 let val = format!("v{i:06}");
992 assert_eq!(result, Some((ValueType::Inline, val.into_bytes())));
993 }
994 }
995 }
996
997 #[test]
998 fn deep_tree_insert_delete() {
999 let (mut pages, mut alloc, mut tree) = new_tree();
1000
1001 let count = 2000u64;
1002 for i in 0..count {
1003 let key = format!("{i:08}");
1004 tree.insert(
1005 &mut pages,
1006 &mut alloc,
1007 TxnId(1),
1008 key.as_bytes(),
1009 ValueType::Inline,
1010 b"v",
1011 )
1012 .unwrap();
1013 }
1014 assert!(tree.depth >= 2, "depth={} expected >= 2", tree.depth);
1015 assert_eq!(tree.entry_count, count);
1016
1017 for i in 0..count {
1018 let key = format!("{i:08}");
1019 let found = tree
1020 .delete(&mut pages, &mut alloc, TxnId(1), key.as_bytes())
1021 .unwrap();
1022 assert!(found, "key {key} should be deletable");
1023 }
1024 assert_eq!(tree.entry_count, 0);
1025 }
1026}