1use crate::allocator::PageAllocator;
4use citadel_core::types::{PageId, PageType, TxnId, ValueType};
5use citadel_core::{Error, Result};
6use citadel_page::page::Page;
7use citadel_page::{branch_node, leaf_node};
8use std::collections::HashMap;
9
10#[derive(Clone)]
12pub struct BTree {
13 pub root: PageId,
14 pub depth: u16,
15 pub entry_count: u64,
16 last_insert: Option<(Vec<(PageId, usize)>, PageId)>,
17}
18
19impl BTree {
20 pub fn new(
22 pages: &mut HashMap<PageId, Page>,
23 alloc: &mut PageAllocator,
24 txn_id: TxnId,
25 ) -> Self {
26 let root_id = alloc.allocate();
27 let root = Page::new(root_id, PageType::Leaf, txn_id);
28 pages.insert(root_id, root);
29 Self {
30 root: root_id,
31 depth: 1,
32 entry_count: 0,
33 last_insert: None,
34 }
35 }
36
37 pub fn from_existing(root: PageId, depth: u16, entry_count: u64) -> Self {
39 Self {
40 root,
41 depth,
42 entry_count,
43 last_insert: None,
44 }
45 }
46
47 pub fn search(
49 &self,
50 pages: &HashMap<PageId, Page>,
51 key: &[u8],
52 ) -> Result<Option<(ValueType, Vec<u8>)>> {
53 let mut current = self.root;
54 loop {
55 let page = pages.get(¤t).ok_or(Error::PageOutOfBounds(current))?;
56 match page.page_type() {
57 Some(PageType::Leaf) => {
58 return match leaf_node::search(page, key) {
59 Ok(idx) => {
60 let cell = leaf_node::read_cell(page, idx);
61 Ok(Some((cell.val_type, cell.value.to_vec())))
62 }
63 Err(_) => Ok(None),
64 };
65 }
66 Some(PageType::Branch) => {
67 let idx = branch_node::search_child_index(page, key);
68 current = branch_node::get_child(page, idx);
69 }
70 _ => {
71 return Err(Error::InvalidPageType(page.page_type_raw(), current));
72 }
73 }
74 }
75 }
76
77 pub fn lil_would_hit(&self, pages: &HashMap<PageId, Page>, key: &[u8]) -> bool {
78 if let Some((_, cached_leaf)) = &self.last_insert {
79 if let Some(page) = pages.get(cached_leaf) {
80 let n = page.num_cells();
81 return n > 0 && key > leaf_node::read_cell(page, n - 1).key;
82 }
83 }
84 false
85 }
86
87 pub fn insert(
89 &mut self,
90 pages: &mut HashMap<PageId, Page>,
91 alloc: &mut PageAllocator,
92 txn_id: TxnId,
93 key: &[u8],
94 val_type: ValueType,
95 value: &[u8],
96 ) -> Result<bool> {
97 if let Some((cached_path, cached_leaf)) = self.last_insert.take() {
99 let hit = {
100 let page = pages
101 .get(&cached_leaf)
102 .ok_or(Error::PageOutOfBounds(cached_leaf))?;
103 let n = page.num_cells();
104 n > 0 && key > leaf_node::read_cell(page, n - 1).key
105 };
106 if hit {
107 let cow_id = cow_page(pages, alloc, cached_leaf, txn_id);
108 let ok = {
109 let page = pages.get_mut(&cow_id).unwrap();
110 leaf_node::insert_direct(page, key, val_type, value)
111 };
112 if ok {
113 if cow_id != cached_leaf {
114 self.root = propagate_cow_up(pages, alloc, txn_id, &cached_path, cow_id);
115 }
116 self.entry_count += 1;
117 self.last_insert = Some((cached_path, cow_id));
118 return Ok(true);
119 }
120 let (sep_key, right_id) =
121 split_leaf_with_insert(pages, alloc, txn_id, cow_id, key, val_type, value);
122 self.root = propagate_split_up(
123 pages,
124 alloc,
125 txn_id,
126 &cached_path,
127 cow_id,
128 &sep_key,
129 right_id,
130 &mut self.depth,
131 );
132 self.last_insert = None;
133 self.entry_count += 1;
134 return Ok(true);
135 }
136 }
137
138 let (path, leaf_id) = self.walk_to_leaf(pages, key)?;
139
140 let key_exists = {
141 let page = pages.get(&leaf_id).unwrap();
142 leaf_node::search(page, key).is_ok()
143 };
144
145 let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
146
147 let leaf_ok = {
148 let page = pages.get_mut(&new_leaf_id).unwrap();
149 leaf_node::insert_direct(page, key, val_type, value)
150 };
151
152 if leaf_ok {
153 let mut child = new_leaf_id;
154 let mut is_rightmost = true;
155 let mut new_path = path;
156 for i in (0..new_path.len()).rev() {
157 let (ancestor_id, child_idx) = new_path[i];
158 let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
159 let page = pages.get_mut(&new_ancestor).unwrap();
160 update_branch_child(page, child_idx, child);
161 if child_idx != page.num_cells() as usize {
162 is_rightmost = false;
163 }
164 new_path[i] = (new_ancestor, child_idx);
165 child = new_ancestor;
166 }
167 self.root = child;
168
169 if is_rightmost {
170 self.last_insert = Some((new_path, new_leaf_id));
171 }
172
173 if !key_exists {
174 self.entry_count += 1;
175 }
176 return Ok(!key_exists);
177 }
178
179 self.last_insert = None;
180 let (sep_key, right_id) =
181 split_leaf_with_insert(pages, alloc, txn_id, new_leaf_id, key, val_type, value);
182 self.root = propagate_split_up(
183 pages,
184 alloc,
185 txn_id,
186 &path,
187 new_leaf_id,
188 &sep_key,
189 right_id,
190 &mut self.depth,
191 );
192
193 if !key_exists {
194 self.entry_count += 1;
195 }
196 Ok(!key_exists)
197 }
198
199 pub fn update_sorted(
201 &mut self,
202 pages: &mut HashMap<PageId, Page>,
203 alloc: &mut PageAllocator,
204 txn_id: TxnId,
205 pairs: &[(&[u8], &[u8])],
206 ) -> Result<u64> {
207 if pairs.is_empty() {
208 return Ok(0);
209 }
210 self.last_insert = None;
211
212 let (mut path, mut leaf_id) = self.walk_to_leaf(pages, pairs[0].0)?;
213 let mut cow_leaf = cow_page(pages, alloc, leaf_id, txn_id);
214 if cow_leaf != leaf_id {
215 self.root = propagate_cow_up(pages, alloc, txn_id, &path, cow_leaf);
216 }
217
218 let mut count: u64 = 0;
219 let mut hint: u16 = 0;
220
221 for &(key, value) in pairs {
222 let past_leaf = {
224 let page = pages.get(&cow_leaf).unwrap();
225 let n = page.num_cells();
226 n == 0 || key > leaf_node::read_cell(page, n - 1).key
227 };
228
229 if past_leaf {
230 let (new_path, new_leaf) = self.walk_to_leaf(pages, key)?;
231 path = new_path;
232 leaf_id = new_leaf;
233 cow_leaf = cow_page(pages, alloc, leaf_id, txn_id);
234 if cow_leaf != leaf_id {
235 self.root = propagate_cow_up(pages, alloc, txn_id, &path, cow_leaf);
236 }
237 hint = 0;
238 }
239
240 let page = pages.get(&cow_leaf).unwrap();
242 let n = page.num_cells();
243 let idx = {
244 let mut i = hint;
245 loop {
246 if i >= n {
247 break None;
248 }
249 let cell = leaf_node::read_cell(page, i);
250 match key.cmp(cell.key) {
251 std::cmp::Ordering::Equal => break Some(i),
252 std::cmp::Ordering::Less => break None,
253 std::cmp::Ordering::Greater => i += 1,
254 }
255 }
256 };
257
258 if let Some(idx) = idx {
259 hint = idx + 1;
260 let page = pages.get_mut(&cow_leaf).unwrap();
262 if !leaf_node::update_value_in_place(page, idx, ValueType::Inline, value) {
263 leaf_node::insert_direct(page, key, ValueType::Inline, value);
265 }
266 count += 1;
267 }
268 }
269
270 Ok(count)
271 }
272
273 pub fn delete(
275 &mut self,
276 pages: &mut HashMap<PageId, Page>,
277 alloc: &mut PageAllocator,
278 txn_id: TxnId,
279 key: &[u8],
280 ) -> Result<bool> {
281 self.last_insert = None;
282 let (path, leaf_id) = self.walk_to_leaf(pages, key)?;
283
284 let found = {
285 let page = pages.get(&leaf_id).unwrap();
286 leaf_node::search(page, key).is_ok()
287 };
288 if !found {
289 return Ok(false);
290 }
291
292 let new_leaf_id = cow_page(pages, alloc, leaf_id, txn_id);
293 {
294 let page = pages.get_mut(&new_leaf_id).unwrap();
295 leaf_node::delete(page, key);
296 }
297
298 let leaf_empty = pages.get(&new_leaf_id).unwrap().num_cells() == 0;
299
300 if !leaf_empty || path.is_empty() {
301 self.root = propagate_cow_up(pages, alloc, txn_id, &path, new_leaf_id);
302 self.entry_count -= 1;
303 return Ok(true);
304 }
305
306 alloc.free(new_leaf_id);
307 pages.remove(&new_leaf_id);
308
309 self.root = propagate_remove_up(pages, alloc, txn_id, &path, &mut self.depth);
310 self.entry_count -= 1;
311 Ok(true)
312 }
313
314 pub fn walk_to_leaf(
316 &self,
317 pages: &HashMap<PageId, Page>,
318 key: &[u8],
319 ) -> Result<(Vec<(PageId, usize)>, PageId)> {
320 let mut path = Vec::with_capacity(self.depth as usize);
321 let mut current = self.root;
322 loop {
323 let page = pages.get(¤t).ok_or(Error::PageOutOfBounds(current))?;
324 match page.page_type() {
325 Some(PageType::Leaf) => return Ok((path, current)),
326 Some(PageType::Branch) => {
327 let child_idx = branch_node::search_child_index(page, key);
328 let child = branch_node::get_child(page, child_idx);
329 path.push((current, child_idx));
330 current = child;
331 }
332 _ => return Err(Error::InvalidPageType(page.page_type_raw(), current)),
333 }
334 }
335 }
336}
337
338pub fn cow_page(
340 pages: &mut HashMap<PageId, Page>,
341 alloc: &mut PageAllocator,
342 old_id: PageId,
343 txn_id: TxnId,
344) -> PageId {
345 if pages.get(&old_id).unwrap().txn_id() == txn_id {
346 return old_id;
347 }
348 if alloc.in_place() {
349 let page = pages.get_mut(&old_id).unwrap();
350 page.set_txn_id(txn_id);
351 return old_id;
352 }
353 let new_id = alloc.allocate();
354 let mut new_page = pages.get(&old_id).unwrap().clone();
355 new_page.set_page_id(new_id);
356 new_page.set_txn_id(txn_id);
357 pages.insert(new_id, new_page);
358 alloc.free(old_id);
359 new_id
360}
361
362fn update_branch_child(page: &mut Page, child_idx: usize, new_child: PageId) {
364 let n = page.num_cells() as usize;
365 if child_idx < n {
366 let offset = page.cell_offset(child_idx as u16) as usize;
367 page.data[offset..offset + 4].copy_from_slice(&new_child.as_u32().to_le_bytes());
368 } else {
369 page.set_right_child(new_child);
370 }
371}
372
373pub fn propagate_cow_up(
375 pages: &mut HashMap<PageId, Page>,
376 alloc: &mut PageAllocator,
377 txn_id: TxnId,
378 path: &[(PageId, usize)],
379 mut new_child: PageId,
380) -> PageId {
381 for &(ancestor_id, child_idx) in path.iter().rev() {
382 let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
383 let page = pages.get_mut(&new_ancestor).unwrap();
384 update_branch_child(page, child_idx, new_child);
385 new_child = new_ancestor;
386 }
387 new_child
388}
389
390fn split_leaf_with_insert(
392 pages: &mut HashMap<PageId, Page>,
393 alloc: &mut PageAllocator,
394 txn_id: TxnId,
395 leaf_id: PageId,
396 key: &[u8],
397 val_type: ValueType,
398 value: &[u8],
399) -> (Vec<u8>, PageId) {
400 let mut cells: Vec<(Vec<u8>, Vec<u8>)> = {
401 let page = pages.get(&leaf_id).unwrap();
402 let n = page.num_cells() as usize;
403 (0..n)
404 .map(|i| {
405 let cell = leaf_node::read_cell(page, i as u16);
406 let raw = leaf_node::read_cell_bytes(page, i as u16);
407 (cell.key.to_vec(), raw)
408 })
409 .collect()
410 };
411
412 let new_raw = leaf_node::build_cell(key, val_type, value);
413 match cells.binary_search_by(|(k, _)| k.as_slice().cmp(key)) {
414 Ok(idx) => cells[idx] = (key.to_vec(), new_raw),
415 Err(idx) => cells.insert(idx, (key.to_vec(), new_raw)),
416 }
417
418 let total = cells.len();
419
420 let usable = citadel_core::constants::USABLE_SIZE;
421 let mut cum: Vec<usize> = Vec::with_capacity(total + 1);
422 cum.push(0);
423 for (_, raw) in &cells {
424 cum.push(cum.last().unwrap() + raw.len());
425 }
426 let left_fits = |sp: usize| cum[sp] + sp * 2 <= usable;
427 let right_fits = |sp: usize| (cum[total] - cum[sp]) + (total - sp) * 2 <= usable;
428
429 let mut split_point = total / 2;
430 if !left_fits(split_point) || !right_fits(split_point) {
431 split_point = 1;
432 for sp in 1..total {
433 if left_fits(sp) && right_fits(sp) {
434 split_point = sp;
435 if sp >= total / 2 {
436 break;
437 }
438 }
439 }
440 }
441
442 let sep_key = cells[split_point].0.clone();
443
444 {
445 let left_refs: Vec<&[u8]> = cells[..split_point]
446 .iter()
447 .map(|(_, raw)| raw.as_slice())
448 .collect();
449 let page = pages.get_mut(&leaf_id).unwrap();
450 page.rebuild_cells(&left_refs);
451 }
452
453 let right_id = alloc.allocate();
454 {
455 let mut right_page = Page::new(right_id, PageType::Leaf, txn_id);
456 let right_refs: Vec<&[u8]> = cells[split_point..]
457 .iter()
458 .map(|(_, raw)| raw.as_slice())
459 .collect();
460 right_page.rebuild_cells(&right_refs);
461 pages.insert(right_id, right_page);
462 }
463
464 (sep_key, right_id)
465}
466
467#[allow(clippy::too_many_arguments)]
468fn propagate_split_up(
469 pages: &mut HashMap<PageId, Page>,
470 alloc: &mut PageAllocator,
471 txn_id: TxnId,
472 path: &[(PageId, usize)],
473 mut left_child: PageId,
474 initial_sep: &[u8],
475 mut right_child: PageId,
476 depth: &mut u16,
477) -> PageId {
478 let mut sep_key = initial_sep.to_vec();
479 let mut pending_split = true;
480
481 for &(ancestor_id, child_idx) in path.iter().rev() {
482 let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
483
484 if pending_split {
485 let ok = {
486 let page = pages.get_mut(&new_ancestor).unwrap();
487 branch_node::insert_separator(page, child_idx, left_child, &sep_key, right_child)
488 };
489
490 if ok {
491 pending_split = false;
492 left_child = new_ancestor;
493 } else {
494 let (new_sep, new_right) = split_branch_with_insert(
495 pages,
496 alloc,
497 txn_id,
498 new_ancestor,
499 child_idx,
500 left_child,
501 &sep_key,
502 right_child,
503 );
504 left_child = new_ancestor;
505 sep_key = new_sep;
506 right_child = new_right;
507 }
508 } else {
509 let page = pages.get_mut(&new_ancestor).unwrap();
510 update_branch_child(page, child_idx, left_child);
511 left_child = new_ancestor;
512 }
513 }
514
515 if pending_split {
516 let new_root_id = alloc.allocate();
517 let mut new_root = Page::new(new_root_id, PageType::Branch, txn_id);
518 let cell = branch_node::build_cell(left_child, &sep_key);
519 new_root.write_cell(&cell).unwrap();
520 new_root.set_right_child(right_child);
521 pages.insert(new_root_id, new_root);
522 *depth += 1;
523 new_root_id
524 } else {
525 left_child
526 }
527}
528
529#[allow(clippy::too_many_arguments)]
530fn split_branch_with_insert(
531 pages: &mut HashMap<PageId, Page>,
532 alloc: &mut PageAllocator,
533 txn_id: TxnId,
534 branch_id: PageId,
535 child_idx: usize,
536 new_left: PageId,
537 sep_key: &[u8],
538 new_right: PageId,
539) -> (Vec<u8>, PageId) {
540 let (new_cells, final_right_child) = {
542 let page = pages.get(&branch_id).unwrap();
543 let n = page.num_cells() as usize;
544 let cells: Vec<(PageId, Vec<u8>)> = (0..n)
545 .map(|i| {
546 let cell = branch_node::read_cell(page, i as u16);
547 (cell.child, cell.key.to_vec())
548 })
549 .collect();
550 let old_rc = page.right_child();
551
552 let mut result = Vec::with_capacity(n + 1);
553 let final_rc;
554
555 if child_idx < n {
556 let old_key = cells[child_idx].1.clone();
557 for (i, (child, key)) in cells.into_iter().enumerate() {
558 if i == child_idx {
559 result.push((new_left, sep_key.to_vec()));
560 result.push((new_right, old_key.clone()));
561 } else {
562 result.push((child, key));
563 }
564 }
565 final_rc = old_rc;
566 } else {
567 result = cells;
568 result.push((new_left, sep_key.to_vec()));
569 final_rc = new_right;
570 }
571
572 (result, final_rc)
573 };
574
575 let total = new_cells.len();
576 let usable = citadel_core::constants::USABLE_SIZE;
577 let raw_sizes: Vec<usize> = new_cells.iter().map(|(_, key)| 6 + key.len()).collect();
578 let mut cum: Vec<usize> = Vec::with_capacity(total + 1);
579 cum.push(0);
580 for &sz in &raw_sizes {
581 cum.push(cum.last().unwrap() + sz);
582 }
583 let left_fits = |sp: usize| cum[sp] + sp * 2 <= usable;
584 let right_fits = |sp: usize| {
585 let right_count = total - sp - 1;
586 (cum[total] - cum[sp + 1]) + right_count * 2 <= usable
587 };
588
589 let mut split_point = total / 2;
590 if !left_fits(split_point) || !right_fits(split_point) {
591 split_point = 1;
592 for sp in 1..total.saturating_sub(1) {
593 if left_fits(sp) && right_fits(sp) {
594 split_point = sp;
595 if sp >= total / 2 {
596 break;
597 }
598 }
599 }
600 }
601
602 let promoted_sep = new_cells[split_point].1.clone();
603 let promoted_child = new_cells[split_point].0;
604
605 {
606 let left_raw: Vec<Vec<u8>> = new_cells[..split_point]
607 .iter()
608 .map(|(child, key)| branch_node::build_cell(*child, key))
609 .collect();
610 let left_refs: Vec<&[u8]> = left_raw.iter().map(|c| c.as_slice()).collect();
611 let page = pages.get_mut(&branch_id).unwrap();
612 page.rebuild_cells(&left_refs);
613 page.set_right_child(promoted_child);
614 }
615
616 let right_branch_id = alloc.allocate();
617 {
618 let mut right_page = Page::new(right_branch_id, PageType::Branch, txn_id);
619 let right_raw: Vec<Vec<u8>> = new_cells[split_point + 1..]
620 .iter()
621 .map(|(child, key)| branch_node::build_cell(*child, key))
622 .collect();
623 let right_refs: Vec<&[u8]> = right_raw.iter().map(|c| c.as_slice()).collect();
624 right_page.rebuild_cells(&right_refs);
625 right_page.set_right_child(final_right_child);
626 pages.insert(right_branch_id, right_page);
627 }
628
629 (promoted_sep, right_branch_id)
630}
631
632fn remove_child_from_branch(page: &mut Page, child_idx: usize) {
633 let n = page.num_cells() as usize;
634 if child_idx < n {
635 let cell_sz = branch_node::get_cell_size(page, child_idx as u16);
636 page.delete_cell_at(child_idx as u16, cell_sz);
637 } else {
638 assert!(n > 0, "cannot remove right_child from branch with 0 cells");
639 let last_child = branch_node::read_cell(page, (n - 1) as u16).child;
640 let cell_sz = branch_node::get_cell_size(page, (n - 1) as u16);
641 page.delete_cell_at((n - 1) as u16, cell_sz);
642 page.set_right_child(last_child);
643 }
644}
645
646fn propagate_remove_up(
647 pages: &mut HashMap<PageId, Page>,
648 alloc: &mut PageAllocator,
649 txn_id: TxnId,
650 path: &[(PageId, usize)],
651 depth: &mut u16,
652) -> PageId {
653 let mut level = path.len();
655
656 let mut need_remove_at_level = true;
658
659 let mut new_child = PageId(0); while level > 0 && need_remove_at_level {
663 level -= 1;
664 let (ancestor_id, child_idx) = path[level];
665 let new_ancestor = cow_page(pages, alloc, ancestor_id, txn_id);
666
667 {
668 let page = pages.get_mut(&new_ancestor).unwrap();
669 remove_child_from_branch(page, child_idx);
670 }
671
672 let num_cells = pages.get(&new_ancestor).unwrap().num_cells();
673
674 if num_cells > 0 || level == 0 {
675 if num_cells == 0 && level == 0 {
676 let only_child = pages.get(&new_ancestor).unwrap().right_child();
678 alloc.free(new_ancestor);
679 pages.remove(&new_ancestor);
680 *depth -= 1;
681 return only_child;
682 }
683 new_child = new_ancestor;
685 need_remove_at_level = false;
686 } else {
687 let only_child = pages.get(&new_ancestor).unwrap().right_child();
689 alloc.free(new_ancestor);
690 pages.remove(&new_ancestor);
691 *depth -= 1;
692
693 new_child = only_child;
695 need_remove_at_level = false;
696 }
697 }
698
699 if level > 0 {
701 let remaining_path = &path[..level];
702 new_child = propagate_cow_up(pages, alloc, txn_id, remaining_path, new_child);
703 }
704
705 new_child
706}
707
708#[cfg(test)]
709mod tests {
710 use super::*;
711
712 fn new_tree() -> (HashMap<PageId, Page>, PageAllocator, BTree) {
713 let mut pages = HashMap::new();
714 let mut alloc = PageAllocator::new(0);
715 let tree = BTree::new(&mut pages, &mut alloc, TxnId(1));
716 (pages, alloc, tree)
717 }
718
719 #[test]
720 fn empty_tree_search() {
721 let (pages, _, tree) = new_tree();
722 assert_eq!(tree.search(&pages, b"anything").unwrap(), None);
723 }
724
725 #[test]
726 fn insert_and_search_single() {
727 let (mut pages, mut alloc, mut tree) = new_tree();
728 let is_new = tree
729 .insert(
730 &mut pages,
731 &mut alloc,
732 TxnId(1),
733 b"hello",
734 ValueType::Inline,
735 b"world",
736 )
737 .unwrap();
738 assert!(is_new);
739 assert_eq!(tree.entry_count, 1);
740
741 let result = tree.search(&pages, b"hello").unwrap();
742 assert_eq!(result, Some((ValueType::Inline, b"world".to_vec())));
743 }
744
745 #[test]
746 fn insert_update_existing() {
747 let (mut pages, mut alloc, mut tree) = new_tree();
748 tree.insert(
749 &mut pages,
750 &mut alloc,
751 TxnId(1),
752 b"key",
753 ValueType::Inline,
754 b"v1",
755 )
756 .unwrap();
757 let is_new = tree
758 .insert(
759 &mut pages,
760 &mut alloc,
761 TxnId(1),
762 b"key",
763 ValueType::Inline,
764 b"v2",
765 )
766 .unwrap();
767 assert!(!is_new);
768 assert_eq!(tree.entry_count, 1);
769
770 let result = tree.search(&pages, b"key").unwrap();
771 assert_eq!(result, Some((ValueType::Inline, b"v2".to_vec())));
772 }
773
774 #[test]
775 fn insert_multiple_sorted() {
776 let (mut pages, mut alloc, mut tree) = new_tree();
777 let keys = [b"dog", b"ant", b"cat", b"fox", b"bat", b"eel"];
778 for k in &keys {
779 tree.insert(&mut pages, &mut alloc, TxnId(1), *k, ValueType::Inline, *k)
780 .unwrap();
781 }
782 assert_eq!(tree.entry_count, 6);
783
784 for k in &keys {
786 let result = tree.search(&pages, *k).unwrap();
787 assert_eq!(result, Some((ValueType::Inline, k.to_vec())));
788 }
789
790 assert_eq!(tree.search(&pages, b"zebra").unwrap(), None);
792 }
793
794 #[test]
795 fn insert_triggers_leaf_split() {
796 let (mut pages, mut alloc, mut tree) = new_tree();
797
798 let count = 500;
800 for i in 0..count {
801 let key = format!("key-{i:05}");
802 let val = format!("val-{i:05}");
803 tree.insert(
804 &mut pages,
805 &mut alloc,
806 TxnId(1),
807 key.as_bytes(),
808 ValueType::Inline,
809 val.as_bytes(),
810 )
811 .unwrap();
812 }
813
814 assert_eq!(tree.entry_count, count);
815 assert!(
816 tree.depth >= 2,
817 "tree should have split (depth={})",
818 tree.depth
819 );
820
821 for i in 0..count {
823 let key = format!("key-{i:05}");
824 let val = format!("val-{i:05}");
825 let result = tree.search(&pages, key.as_bytes()).unwrap();
826 assert_eq!(result, Some((ValueType::Inline, val.into_bytes())));
827 }
828 }
829
830 #[test]
831 fn delete_existing_key() {
832 let (mut pages, mut alloc, mut tree) = new_tree();
833 tree.insert(
834 &mut pages,
835 &mut alloc,
836 TxnId(1),
837 b"a",
838 ValueType::Inline,
839 b"1",
840 )
841 .unwrap();
842 tree.insert(
843 &mut pages,
844 &mut alloc,
845 TxnId(1),
846 b"b",
847 ValueType::Inline,
848 b"2",
849 )
850 .unwrap();
851 tree.insert(
852 &mut pages,
853 &mut alloc,
854 TxnId(1),
855 b"c",
856 ValueType::Inline,
857 b"3",
858 )
859 .unwrap();
860
861 let found = tree.delete(&mut pages, &mut alloc, TxnId(1), b"b").unwrap();
862 assert!(found);
863 assert_eq!(tree.entry_count, 2);
864 assert_eq!(tree.search(&pages, b"b").unwrap(), None);
865 assert_eq!(
866 tree.search(&pages, b"a").unwrap(),
867 Some((ValueType::Inline, b"1".to_vec()))
868 );
869 assert_eq!(
870 tree.search(&pages, b"c").unwrap(),
871 Some((ValueType::Inline, b"3".to_vec()))
872 );
873 }
874
875 #[test]
876 fn delete_nonexistent_key() {
877 let (mut pages, mut alloc, mut tree) = new_tree();
878 tree.insert(
879 &mut pages,
880 &mut alloc,
881 TxnId(1),
882 b"a",
883 ValueType::Inline,
884 b"1",
885 )
886 .unwrap();
887 let found = tree.delete(&mut pages, &mut alloc, TxnId(1), b"z").unwrap();
888 assert!(!found);
889 assert_eq!(tree.entry_count, 1);
890 }
891
892 #[test]
893 fn delete_all_from_root_leaf() {
894 let (mut pages, mut alloc, mut tree) = new_tree();
895 tree.insert(
896 &mut pages,
897 &mut alloc,
898 TxnId(1),
899 b"x",
900 ValueType::Inline,
901 b"1",
902 )
903 .unwrap();
904 tree.delete(&mut pages, &mut alloc, TxnId(1), b"x").unwrap();
905 assert_eq!(tree.entry_count, 0);
906
907 let root = pages.get(&tree.root).unwrap();
909 assert_eq!(root.page_type(), Some(PageType::Leaf));
910 assert_eq!(root.num_cells(), 0);
911 }
912
913 #[test]
914 fn cow_produces_new_page_ids() {
915 let (mut pages, mut alloc, mut tree) = new_tree();
916 let root_before = tree.root;
917
918 tree.insert(
919 &mut pages,
920 &mut alloc,
921 TxnId(2),
922 b"key",
923 ValueType::Inline,
924 b"val",
925 )
926 .unwrap();
927 let root_after = tree.root;
928
929 assert_ne!(root_before, root_after);
931 assert!(alloc.freed_this_txn().contains(&root_before));
933 }
934
935 #[test]
936 fn insert_and_delete_many() {
937 let (mut pages, mut alloc, mut tree) = new_tree();
938 let count = 1000u64;
939
940 for i in 0..count {
942 let key = format!("k{i:06}");
943 let val = format!("v{i:06}");
944 tree.insert(
945 &mut pages,
946 &mut alloc,
947 TxnId(1),
948 key.as_bytes(),
949 ValueType::Inline,
950 val.as_bytes(),
951 )
952 .unwrap();
953 }
954 assert_eq!(tree.entry_count, count);
955
956 for i in (0..count).step_by(2) {
958 let key = format!("k{i:06}");
959 let found = tree
960 .delete(&mut pages, &mut alloc, TxnId(1), key.as_bytes())
961 .unwrap();
962 assert!(found);
963 }
964 assert_eq!(tree.entry_count, count / 2);
965
966 for i in 0..count {
968 let key = format!("k{i:06}");
969 let result = tree.search(&pages, key.as_bytes()).unwrap();
970 if i % 2 == 0 {
971 assert_eq!(result, None, "deleted key {key} should not be found");
972 } else {
973 let val = format!("v{i:06}");
974 assert_eq!(result, Some((ValueType::Inline, val.into_bytes())));
975 }
976 }
977 }
978
979 #[test]
980 fn deep_tree_insert_delete() {
981 let (mut pages, mut alloc, mut tree) = new_tree();
982
983 let count = 2000u64;
985 for i in 0..count {
986 let key = format!("{i:08}");
987 tree.insert(
988 &mut pages,
989 &mut alloc,
990 TxnId(1),
991 key.as_bytes(),
992 ValueType::Inline,
993 b"v",
994 )
995 .unwrap();
996 }
997 assert!(tree.depth >= 2, "depth={} expected >= 2", tree.depth);
998 assert_eq!(tree.entry_count, count);
999
1000 for i in 0..count {
1002 let key = format!("{i:08}");
1003 let found = tree
1004 .delete(&mut pages, &mut alloc, TxnId(1), key.as_bytes())
1005 .unwrap();
1006 assert!(found, "key {key} should be deletable");
1007 }
1008 assert_eq!(tree.entry_count, 0);
1009 }
1010}