1use super::*;
2
3impl BTree {
4 pub fn new(pager: Arc<Pager>) -> Self {
6 Self {
7 pager,
8 root_page_id: RwLock::new(0),
9 rightmost_leaf: RwLock::new(None),
10 }
11 }
12
13 pub fn with_root(pager: Arc<Pager>, root_page_id: u32) -> Self {
15 Self {
16 pager,
17 root_page_id: RwLock::new(root_page_id),
18 rightmost_leaf: RwLock::new(None),
19 }
20 }
21
22 pub fn root_page_id(&self) -> u32 {
24 *self
25 .root_page_id
26 .read()
27 .unwrap_or_else(|poisoned| poisoned.into_inner())
28 }
29
30 pub fn is_empty(&self) -> bool {
32 self.root_page_id() == 0
33 }
34
35 pub fn get(&self, key: &[u8]) -> BTreeResult<Option<Vec<u8>>> {
37 let root_id = self.root_page_id();
38 if root_id == 0 {
39 return Ok(None);
40 }
41
42 let (leaf_id, _) = self.find_leaf(root_id, key)?;
44
45 const MAX_RIGHTLINK_HOPS: usize = 32;
54 let mut current_id = leaf_id;
55 for _ in 0..MAX_RIGHTLINK_HOPS {
56 let page = self.pager.read_page(current_id)?;
57
58 match search_leaf(&page, key)? {
59 SearchResult::Found(pos) => {
60 let (_, value) = read_leaf_cell(&page, pos)?;
61 return Ok(Some(value));
62 }
63 SearchResult::NotFound(_) => {
64 let right = leaf_right_sibling(&page);
66 if right != 0 {
67 if let Some(high_key) = leaf_high_key(&page)? {
68 if key > high_key.as_slice() {
69 current_id = right;
71 continue;
72 }
73 }
74 }
75 return Ok(None);
77 }
78 }
79 }
80
81 let (leaf_id, _) = self.find_leaf(self.root_page_id(), key)?;
83 let page = self.pager.read_page(leaf_id)?;
84 match search_leaf(&page, key)? {
85 SearchResult::Found(pos) => {
86 let (_, value) = read_leaf_cell(&page, pos)?;
87 Ok(Some(value))
88 }
89 SearchResult::NotFound(_) => Ok(None),
90 }
91 }
92
93 pub fn insert(&self, key: &[u8], value: &[u8]) -> BTreeResult<()> {
95 if key.len() > MAX_KEY_SIZE {
97 return Err(BTreeError::KeyTooLarge(key.len()));
98 }
99 if value.len() > MAX_VALUE_SIZE {
100 return Err(BTreeError::ValueTooLarge(value.len()));
101 }
102
103 let root_id = self.root_page_id();
104
105 if root_id == 0 {
107 let mut page = self.pager.allocate_page(PageType::BTreeLeaf)?;
108 clear_leaf_cells(&mut page);
109 insert_into_leaf(&mut page, key, value)?;
110 init_leaf_links(&mut page, 0, 0);
111 page.update_checksum();
112 let new_root = page.page_id();
113 self.pager.write_page(new_root, page)?;
114 *self.root_page_id.write().map_err(|e| {
115 BTreeError::LockPoisoned(format!("insert: root_page_id write lock: {e}"))
116 })? = new_root;
117 return Ok(());
118 }
119
120 let cached = {
125 self.rightmost_leaf
126 .read()
127 .map_err(|e| BTreeError::LockPoisoned(format!("rightmost_leaf read: {e}")))?
128 .clone()
129 };
130 if let Some((cached_page_id, ref high_key)) = cached {
131 if key > high_key.as_slice() {
132 let mut page = self.pager.read_page(cached_page_id)?;
133 let right_sibling = leaf_right_sibling(&page);
137 if right_sibling == 0 {
138 if can_insert_leaf(&page, key, value) {
140 insert_into_leaf(&mut page, key, value)?;
141 let page_id = page.page_id();
145 *self.rightmost_leaf.write().map_err(|e| {
147 BTreeError::LockPoisoned(format!("rightmost_leaf write: {e}"))
148 })? = Some((page_id, key.to_vec()));
149 self.pager.write_page(page_id, page)?;
150 return Ok(());
151 }
152 let (new_leaf, separator_key) = self.split_leaf(&mut page, key, value)?;
155 let new_leaf_id = new_leaf.page_id();
156 page.update_checksum();
157 let page_id = page.page_id();
158 self.pager.write_page(page_id, page.clone())?;
159 *self.rightmost_leaf.write().map_err(|e| {
161 BTreeError::LockPoisoned(format!("rightmost_leaf write: {e}"))
162 })? = Some((new_leaf_id, key.to_vec()));
163 let (_, path) = self.find_leaf(root_id, &separator_key)?;
166 self.insert_into_parent(path, cached_page_id, &separator_key, new_leaf_id)?;
167 return Ok(());
168 }
169 *self.rightmost_leaf.write().map_err(|e| {
173 BTreeError::LockPoisoned(format!("rightmost_leaf write: {e}"))
174 })? = None;
175 }
176 }
177
178 let (leaf_id, path) = self.find_leaf(root_id, key)?;
180 let mut page = self.pager.read_page(leaf_id)?;
181
182 if let SearchResult::Found(_) = search_leaf(&page, key)? {
184 return Err(BTreeError::DuplicateKey);
185 }
186
187 if can_insert_leaf(&page, key, value) {
189 insert_into_leaf(&mut page, key, value)?;
190 let page_id = page.page_id();
192 if leaf_right_sibling(&page) == 0 {
194 *self.rightmost_leaf.write().map_err(|e| {
195 BTreeError::LockPoisoned(format!("rightmost_leaf write: {e}"))
196 })? = Some((page_id, key.to_vec()));
197 }
198 self.pager.write_page(page_id, page)?;
199 return Ok(());
200 }
201
202 let (new_leaf, separator_key) = self.split_leaf(&mut page, key, value)?;
204 let new_leaf_id = new_leaf.page_id();
205 page.update_checksum();
206 let page_id = page.page_id();
207 self.pager.write_page(page_id, page.clone())?;
208 *self
210 .rightmost_leaf
211 .write()
212 .map_err(|e| BTreeError::LockPoisoned(format!("rightmost_leaf write: {e}")))? =
213 Some((new_leaf_id, key.to_vec()));
214
215 self.insert_into_parent(path, page_id, &separator_key, new_leaf_id)?;
217
218 Ok(())
219 }
220
221 pub fn upsert(&self, key: &[u8], value: &[u8]) -> BTreeResult<()> {
233 let root_id = self.root_page_id();
234 if root_id == 0 {
235 return self.insert(key, value);
236 }
237
238 let (leaf_id, _path) = self.find_leaf(root_id, key)?;
239 let mut page = self.pager.read_page(leaf_id)?;
240
241 if let SearchResult::Found(pos) = search_leaf(&page, key)? {
242 let (_, old_value) = read_leaf_cell(&page, pos)?;
243 if value.len() <= old_value.len() {
244 overwrite_leaf_value(&mut page, pos, value)?;
250 let page_id = page.page_id();
251 self.pager.write_page(page_id, page)?;
252 return Ok(());
253 }
254 let _ = self.delete(key);
259 return self.insert(key, value);
260 }
261
262 self.insert(key, value)
264 }
265
266 pub fn upsert_batch_sorted(&self, items: &[(Vec<u8>, Vec<u8>)]) -> BTreeResult<()> {
277 if items.is_empty() {
278 return Ok(());
279 }
280 let root_id = self.root_page_id();
281 if root_id == 0 {
282 for (k, v) in items {
283 self.upsert(k, v)?;
284 }
285 return Ok(());
286 }
287
288 let mut current_leaf: Option<(u32, Page, bool)> = None; for (key, value) in items {
290 let (leaf_id, _path) = self.find_leaf(root_id, key)?;
293 if current_leaf.as_ref().map(|(id, _, _)| *id) != Some(leaf_id) {
294 if let Some((id, page, dirty)) = current_leaf.take() {
295 if dirty {
296 self.pager.write_page(id, page)?;
297 }
298 }
299 let page = self.pager.read_page(leaf_id)?;
300 current_leaf = Some((leaf_id, page, false));
301 }
302
303 let applied_in_place = {
304 let (_, page, dirty) = current_leaf.as_mut().expect("set above");
305 if let SearchResult::Found(pos) = search_leaf(page, key)? {
306 let (_, old_value) = read_leaf_cell(page, pos)?;
307 if value.len() <= old_value.len() {
308 overwrite_leaf_value(page, pos, value)?;
309 *dirty = true;
310 true
311 } else {
312 false
313 }
314 } else {
315 false
316 }
317 };
318 if !applied_in_place {
319 if let Some((id, page, dirty)) = current_leaf.take() {
320 if dirty {
321 self.pager.write_page(id, page)?;
322 }
323 }
324 self.upsert(key, value)?;
325 }
326 }
327
328 if let Some((id, page, dirty)) = current_leaf.take() {
329 if dirty {
330 self.pager.write_page(id, page)?;
331 }
332 }
333 Ok(())
334 }
335
336 pub fn bulk_insert_sorted(&self, items: &[(Vec<u8>, Vec<u8>)]) -> BTreeResult<()> {
345 if items.is_empty() {
346 return Ok(());
347 }
348 for (key, value) in items {
349 if key.len() > MAX_KEY_SIZE {
350 return Err(BTreeError::KeyTooLarge(key.len()));
351 }
352 if value.len() > MAX_VALUE_SIZE {
353 return Err(BTreeError::ValueTooLarge(value.len()));
354 }
355 }
356
357 let items: Vec<(&[u8], &[u8])> = items
367 .iter()
368 .map(|(k, v)| (k.as_slice(), v.as_slice()))
369 .collect();
370
371 let mut last_filled_leaf: Option<u32> = None;
379 let mut i = 0;
380 while i < items.len() {
381 let root_id = self.root_page_id();
382 if root_id == 0 {
383 let (k, v) = items[i];
385 self.insert(k, v)?;
386 i += 1;
387 last_filled_leaf = None;
388 continue;
389 }
390
391 let hint_leaf_id: Option<u32> = if let Some(prev) = last_filled_leaf {
397 let prev_page = self.pager.read_page(prev)?;
398 let sibling = leaf_right_sibling(&prev_page);
399 if sibling != 0 {
400 let sib_page = self.pager.read_page(sibling)?;
401 match leaf_high_key(&sib_page)? {
402 None => Some(sibling), Some(hk) if items[i].0 > hk.as_slice() => Some(sibling),
404 Some(_) => None,
405 }
406 } else {
407 None
408 }
409 } else {
410 None
411 };
412
413 let (leaf_id, _path) = match hint_leaf_id {
414 Some(id) => (id, Vec::new()),
415 None => self.find_leaf(root_id, items[i].0)?,
416 };
417 let mut page = self.pager.read_page(leaf_id)?;
418
419 let mut last_key_in_leaf: Option<Vec<u8>> = {
424 let cell_count = page.cell_count() as usize;
425 if cell_count == 0 {
426 None
427 } else {
428 Some(read_leaf_cell(&page, cell_count - 1)?.0)
429 }
430 };
431
432 let mut inserted = 0usize;
438 while i + inserted < items.len() {
439 let (key, value) = items[i + inserted];
440
441 if let Some(lk) = &last_key_in_leaf {
444 match key.cmp(lk.as_slice()) {
445 Ordering::Greater => {}
446 Ordering::Equal => return Err(BTreeError::DuplicateKey),
447 Ordering::Less => break,
448 }
449 }
450
451 let cell_size = 4 + key.len() + value.len();
456 let slot_end_after =
457 LEAF_SLOT_ARRAY_OFFSET + (page.cell_count() as usize + inserted + 1) * 2;
458 let cells_start = leaf_cells_start(&page);
459 if slot_end_after + cell_size > cells_start {
460 break;
461 }
462
463 let cell_offset = cells_start - cell_size;
468 {
469 let data = page.as_bytes_mut();
470 data[cell_offset..cell_offset + 2]
471 .copy_from_slice(&(key.len() as u16).to_le_bytes());
472 data[cell_offset + 2..cell_offset + 4]
473 .copy_from_slice(&(value.len() as u16).to_le_bytes());
474 data[cell_offset + 4..cell_offset + 4 + key.len()].copy_from_slice(key);
475 data[cell_offset + 4 + key.len()..cell_offset + cell_size]
476 .copy_from_slice(value);
477 }
478 page.set_free_end(cell_offset as u16);
479
480 let new_slot_index = page.cell_count() as usize + inserted;
481 let slot_pos = leaf_slot_offset_for(new_slot_index);
482 {
483 let data = page.as_bytes_mut();
484 data[slot_pos..slot_pos + 2]
485 .copy_from_slice(&(cell_offset as u16).to_le_bytes());
486 }
487
488 last_key_in_leaf = Some(key.to_vec());
489 inserted += 1;
490 }
491
492 if inserted > 0 {
493 let new_count = page.cell_count() as usize + inserted;
494 page.set_cell_count(new_count as u16);
495 page.set_free_start((LEAF_SLOT_ARRAY_OFFSET + new_count * 2) as u16);
496 page.update_checksum();
497 self.pager.write_page(leaf_id, page)?;
498 i += inserted;
499 last_filled_leaf = Some(leaf_id);
502 } else {
503 let (k, v) = items[i];
510 self.insert(k, v)?;
511 i += 1;
512 last_filled_leaf = None;
513 }
514 }
515
516 Ok(())
517 }
518
519 pub fn delete(&self, key: &[u8]) -> BTreeResult<bool> {
521 let root_id = self.root_page_id();
522 if root_id == 0 {
523 return Ok(false);
524 }
525
526 let (leaf_id, path) = self.find_leaf(root_id, key)?;
530 let mut current_id = leaf_id;
531 const MAX_RIGHTLINK_HOPS: usize = 32;
532 for _ in 0..MAX_RIGHTLINK_HOPS {
533 let mut page = self.pager.read_page(current_id)?;
534 match search_leaf(&page, key)? {
535 SearchResult::Found(pos) => {
536 delete_from_leaf(&mut page, pos)?;
538 page.update_checksum();
539 let page_id = page.page_id();
540 self.pager.write_page(page_id, page.clone())?;
541
542 *self
543 .rightmost_leaf
544 .write()
545 .unwrap_or_else(|e| e.into_inner()) = None;
546
547 if page.cell_count() == 0 && page_id == root_id {
548 self.pager.free_page(root_id)?;
549 *self.root_page_id.write().map_err(|e| {
550 BTreeError::LockPoisoned(format!(
551 "delete: root_page_id write lock: {e}"
552 ))
553 })? = 0;
554 } else {
555 self.rebalance_leaf(current_id, path)?;
556 }
557 return Ok(true);
558 }
559 SearchResult::NotFound(_) => {
560 let right = leaf_right_sibling(&page);
561 if right != 0 {
562 if let Some(high_key) = leaf_high_key(&page)? {
563 if key > high_key.as_slice() {
564 current_id = right;
565 continue;
566 }
567 }
568 }
569 return Ok(false);
570 }
571 }
572 }
573 Ok(false)
574 }
575
576 pub fn cursor_first(&self) -> BTreeResult<BTreeCursor> {
578 let root_id = self.root_page_id();
579 if root_id == 0 {
580 return Ok(BTreeCursor {
581 leaf_page_id: 0,
582 position: 0,
583 pager: self.pager.clone(),
584 prefetched_next: false,
585 });
586 }
587
588 let first_leaf = self.find_first_leaf(root_id)?;
590
591 Ok(BTreeCursor {
592 leaf_page_id: first_leaf,
593 position: 0,
594 pager: self.pager.clone(),
595 prefetched_next: false,
596 })
597 }
598
599 pub fn cursor_seek(&self, key: &[u8]) -> BTreeResult<BTreeCursor> {
601 let root_id = self.root_page_id();
602 if root_id == 0 {
603 return Ok(BTreeCursor {
604 leaf_page_id: 0,
605 position: 0,
606 pager: self.pager.clone(),
607 prefetched_next: false,
608 });
609 }
610
611 let (leaf_id, _) = self.find_leaf(root_id, key)?;
612 let page = self.pager.read_page(leaf_id)?;
613
614 let position = match search_leaf(&page, key)? {
615 SearchResult::Found(pos) => pos,
616 SearchResult::NotFound(pos) => pos,
617 };
618
619 Ok(BTreeCursor {
620 leaf_page_id: leaf_id,
621 position,
622 pager: self.pager.clone(),
623 prefetched_next: false,
624 })
625 }
626
627 pub fn range(&self, start_key: &[u8], end_key: &[u8]) -> BTreeResult<Vec<(Vec<u8>, Vec<u8>)>> {
629 let mut results = Vec::new();
630 let mut cursor = self.cursor_seek(start_key)?;
631
632 while let Some((key, value)) = cursor.next()? {
633 if key.as_slice() > end_key {
634 break;
635 }
636 results.push((key, value));
637 }
638
639 Ok(results)
640 }
641
642 pub fn count(&self) -> BTreeResult<usize> {
644 let root_id = self.root_page_id();
645 if root_id == 0 {
646 return Ok(0);
647 }
648
649 let mut count = 0;
650 let mut cursor = self.cursor_first()?;
651 while cursor.next()?.is_some() {
652 count += 1;
653 }
654
655 Ok(count)
656 }
657
658 fn find_leaf(&self, page_id: u32, key: &[u8]) -> BTreeResult<(u32, Vec<u32>)> {
662 let mut current_id = page_id;
663 let mut path = Vec::new();
664
665 loop {
666 let page = self.pager.read_page(current_id)?;
667
668 match page.page_type()? {
669 PageType::BTreeLeaf => {
670 return Ok((current_id, path));
671 }
672 PageType::BTreeInterior => {
673 path.push(current_id);
674 current_id = find_child(&page, key)?;
675 }
676 other => {
677 return Err(BTreeError::Corrupted(format!(
678 "Unexpected page type in B-tree: {:?}",
679 other
680 )));
681 }
682 }
683 }
684 }
685
686 pub(crate) fn find_first_leaf(&self, page_id: u32) -> BTreeResult<u32> {
688 let mut current_id = page_id;
689
690 loop {
691 let page = self.pager.read_page(current_id)?;
692
693 match page.page_type()? {
694 PageType::BTreeLeaf => return Ok(current_id),
695 PageType::BTreeInterior => {
696 current_id = find_first_child(&page)?;
698 }
699 _ => {
700 return Err(BTreeError::Corrupted("Invalid page type".into()));
701 }
702 }
703 }
704 }
705
706 fn split_leaf(
708 &self,
709 page: &mut Page,
710 new_key: &[u8],
711 new_value: &[u8],
712 ) -> BTreeResult<(Page, Vec<u8>)> {
713 let mut entries: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
715 let cell_count = page.cell_count() as usize;
716
717 for i in 0..cell_count {
718 entries.push(read_leaf_cell(page, i)?);
719 }
720
721 let insert_pos = entries.partition_point(|(k, _)| k.as_slice() < new_key);
723 entries.insert(insert_pos, (new_key.to_vec(), new_value.to_vec()));
724
725 let mid = entries.len() / 2;
727
728 let mut new_page = self.pager.allocate_page(PageType::BTreeLeaf)?;
730
731 let old_next = read_next_leaf(page);
733 init_leaf_links(&mut new_page, page.page_id(), old_next);
734 set_next_leaf(page, new_page.page_id());
735
736 write_leaf_entries(page, &entries[..mid])?;
739 write_leaf_entries(&mut new_page, &entries[mid..])?;
740
741 let separator = entries[mid].0.clone();
743
744 new_page.update_checksum();
745 let new_page_id = new_page.page_id();
746 self.pager.write_page(new_page_id, new_page.clone())?;
747
748 Ok((new_page, separator))
749 }
750
751 fn insert_into_parent(
753 &self,
754 mut path: Vec<u32>,
755 left_child: u32,
756 key: &[u8],
757 right_child: u32,
758 ) -> BTreeResult<()> {
759 if path.is_empty() {
761 let mut new_root = self.pager.allocate_page(PageType::BTreeInterior)?;
762 write_interior_entries(&mut new_root, &[key.to_vec()], &[left_child, right_child])?;
765
766 new_root.update_checksum();
767 let new_root_id = new_root.page_id();
768 self.pager.write_page(new_root_id, new_root)?;
769 *self.root_page_id.write().map_err(|e| {
770 BTreeError::LockPoisoned(format!(
771 "insert_into_parent: root_page_id write lock: {e}"
772 ))
773 })? = new_root_id;
774 return Ok(());
775 }
776
777 let parent_id = path.pop().ok_or_else(|| {
779 BTreeError::Corrupted("insert_into_parent: path unexpectedly empty".into())
780 })?;
781 let mut parent = self.pager.read_page(parent_id)?;
782
783 if can_insert_interior(&parent, key) {
785 insert_into_interior(&mut parent, key, left_child, right_child)?;
786 parent.update_checksum();
787 self.pager.write_page(parent_id, parent)?;
788 return Ok(());
789 }
790
791 let (new_interior, separator) =
793 self.split_interior(&mut parent, key, left_child, right_child)?;
794 parent.update_checksum();
795 self.pager.write_page(parent_id, parent.clone())?;
796
797 self.insert_into_parent(path, parent.page_id(), &separator, new_interior.page_id())
799 }
800
801 fn split_interior(
803 &self,
804 page: &mut Page,
805 new_key: &[u8],
806 left_child: u32,
807 right_child: u32,
808 ) -> BTreeResult<(Page, Vec<u8>)> {
809 let (mut keys, mut children) = read_interior_keys_children(page)?;
810 let key_insert_pos = keys.partition_point(|key| key.as_slice() < new_key);
811 let child_insert_pos = children
812 .iter()
813 .position(|child| *child == left_child)
814 .unwrap_or(key_insert_pos);
815
816 keys.insert(key_insert_pos, new_key.to_vec());
817 children.insert(child_insert_pos + 1, right_child);
818
819 let mid = keys.len() / 2;
822 let separator = keys[mid].clone();
823
824 let mut new_page = self.pager.allocate_page(PageType::BTreeInterior)?;
826
827 write_interior_entries(page, &keys[..mid], &children[..mid + 1])?;
828 write_interior_entries(&mut new_page, &keys[mid + 1..], &children[mid + 1..])?;
829
830 new_page.update_checksum();
831 let new_page_id = new_page.page_id();
832 self.pager.write_page(new_page_id, new_page.clone())?;
833
834 Ok((new_page, separator))
835 }
836
    /// Repairs a leaf after a deletion: refreshes its separator in the
    /// parent and, if the leaf's entries are below the minimum size, borrows
    /// from or merges with a sibling under the same parent.
    ///
    /// `path` lists the leaf's ancestors with the direct parent last. The
    /// steps, in order:
    /// 1. Nothing to do when `path` is empty or the leaf is the root.
    /// 2. If the leaf is not its parent's first child, make the parent key
    ///    to its left equal the leaf's first key.
    /// 3. Stop if `leaf_entries_size` is at least `leaf_min_bytes()`.
    /// 4. Try to borrow entries from the left sibling, then from the right
    ///    sibling, never taking a sibling below the minimum.
    /// 5. Otherwise merge with the left sibling (preferred) or the right
    ///    one, fix the leaf chain links, free the emptied page, remove the
    ///    separator from the parent and continue with `rebalance_interior`.
    ///
    /// # Errors
    /// `BTreeError::Corrupted` if the leaf is not listed in its parent;
    /// pager and page-encoding errors are propagated.
    fn rebalance_leaf(&self, leaf_id: u32, path: Vec<u32>) -> BTreeResult<()> {
        if path.is_empty() {
            return Ok(());
        }

        let root_id = self.root_page_id();
        if leaf_id == root_id {
            return Ok(());
        }

        let mut leaf = self.pager.read_page(leaf_id)?;
        let mut leaf_entries = read_leaf_entries(&leaf)?;
        let min_bytes = leaf_min_bytes();

        let parent_id = *path.last().ok_or_else(|| {
            BTreeError::Corrupted("rebalance_leaf: path unexpectedly empty".into())
        })?;
        let mut parent = self.pager.read_page(parent_id)?;
        let (mut parent_keys, mut parent_children) = read_interior_keys_children(&parent)?;

        let child_index = parent_children
            .iter()
            .position(|&id| id == leaf_id)
            .ok_or_else(|| BTreeError::Corrupted("Leaf missing from parent".into()))?;

        // Keep the separator left of this leaf equal to the leaf's current
        // first key (the deleted entry may have been the old first key).
        if child_index > 0 {
            if let Some((first_key, _)) = leaf_entries.first() {
                if parent_keys.get(child_index - 1).map(|k| k.as_slice())
                    != Some(first_key.as_slice())
                {
                    parent_keys[child_index - 1] = first_key.clone();
                    write_interior_entries(&mut parent, &parent_keys, &parent_children)?;
                    parent.update_checksum();
                    self.pager.write_page(parent_id, parent.clone())?;
                }
            }
        }

        // Leaf is still sufficiently full: no structural change needed.
        if leaf_entries_size(&leaf_entries) >= min_bytes {
            return Ok(());
        }

        // Attempt 1: borrow trailing entries from the left sibling.
        if child_index > 0 {
            let left_id = parent_children[child_index - 1];
            let mut left = self.pager.read_page(left_id)?;
            let mut left_entries = read_leaf_entries(&left)?;
            let mut borrowed = false;

            while leaf_entries_size(&leaf_entries) < min_bytes {
                let Some(entry) = left_entries.pop() else {
                    break;
                };
                // Give the entry back if taking it would leave the sibling
                // below the minimum.
                if leaf_entries_size(&left_entries) < min_bytes {
                    left_entries.push(entry);
                    break;
                }
                leaf_entries.insert(0, entry);
                borrowed = true;
            }

            if borrowed {
                write_leaf_entries(&mut left, &left_entries)?;
                left.update_checksum();
                self.pager.write_page(left_id, left)?;

                write_leaf_entries(&mut leaf, &leaf_entries)?;
                leaf.update_checksum();
                self.pager.write_page(leaf_id, leaf)?;

                // The leaf's first key changed, so its separator must too.
                if let Some((first_key, _)) = leaf_entries.first() {
                    parent_keys[child_index - 1] = first_key.clone();
                    write_interior_entries(&mut parent, &parent_keys, &parent_children)?;
                    parent.update_checksum();
                    self.pager.write_page(parent_id, parent)?;
                }

                return Ok(());
            }
        }

        // Attempt 2: borrow leading entries from the right sibling.
        if child_index + 1 < parent_children.len() {
            let right_id = parent_children[child_index + 1];
            let mut right = self.pager.read_page(right_id)?;
            let mut right_entries = read_leaf_entries(&right)?;
            let mut borrowed = false;

            while leaf_entries_size(&leaf_entries) < min_bytes {
                if right_entries.is_empty() {
                    break;
                }
                let entry = right_entries.remove(0);
                // Give the entry back if taking it would leave the sibling
                // below the minimum.
                if leaf_entries_size(&right_entries) < min_bytes {
                    right_entries.insert(0, entry);
                    break;
                }
                leaf_entries.push(entry);
                borrowed = true;
            }

            if borrowed {
                write_leaf_entries(&mut right, &right_entries)?;
                right.update_checksum();
                self.pager.write_page(right_id, right)?;

                write_leaf_entries(&mut leaf, &leaf_entries)?;
                leaf.update_checksum();
                self.pager.write_page(leaf_id, leaf)?;

                // The right sibling's first key changed; update the
                // separator between this leaf and the sibling.
                if let Some((first_key, _)) = right_entries.first() {
                    parent_keys[child_index] = first_key.clone();
                    write_interior_entries(&mut parent, &parent_keys, &parent_children)?;
                    parent.update_checksum();
                    self.pager.write_page(parent_id, parent)?;
                }

                return Ok(());
            }
        }

        // Attempt 3: merge this leaf into its left sibling.
        if child_index > 0 {
            let left_id = parent_children[child_index - 1];
            let mut left = self.pager.read_page(left_id)?;
            let mut left_entries = read_leaf_entries(&left)?;

            left_entries.extend(leaf_entries);
            write_leaf_entries(&mut left, &left_entries)?;

            // Unlink this leaf from the sibling chain.
            let next_leaf = read_next_leaf(&leaf);
            set_next_leaf(&mut left, next_leaf);
            if next_leaf != 0 {
                let mut next = self.pager.read_page(next_leaf)?;
                set_prev_leaf(&mut next, left_id);
                next.update_checksum();
                self.pager.write_page(next_leaf, next)?;
            }

            left.update_checksum();
            self.pager.write_page(left_id, left)?;
            self.pager.free_page(leaf_id)?;

            // Drop this leaf and the separator to its left from the parent.
            parent_keys.remove(child_index - 1);
            parent_children.remove(child_index);
            write_interior_entries(&mut parent, &parent_keys, &parent_children)?;
            parent.update_checksum();
            self.pager.write_page(parent_id, parent)?;

            // The parent lost a key and may itself need rebalancing.
            let mut parent_path = path;
            parent_path.pop();
            return self.rebalance_interior(parent_id, parent_path);
        }

        // Attempt 4: absorb the right sibling into this leaf.
        if child_index + 1 < parent_children.len() {
            let right_id = parent_children[child_index + 1];
            let right = self.pager.read_page(right_id)?;
            let right_entries = read_leaf_entries(&right)?;

            leaf_entries.extend(right_entries);
            write_leaf_entries(&mut leaf, &leaf_entries)?;

            // Unlink the right sibling from the sibling chain.
            let next_leaf = read_next_leaf(&right);
            set_next_leaf(&mut leaf, next_leaf);
            if next_leaf != 0 {
                let mut next = self.pager.read_page(next_leaf)?;
                set_prev_leaf(&mut next, leaf_id);
                next.update_checksum();
                self.pager.write_page(next_leaf, next)?;
            }

            leaf.update_checksum();
            self.pager.write_page(leaf_id, leaf)?;
            self.pager.free_page(right_id)?;

            // Drop the right sibling and the separator between the two.
            parent_keys.remove(child_index);
            parent_children.remove(child_index + 1);
            write_interior_entries(&mut parent, &parent_keys, &parent_children)?;
            parent.update_checksum();
            self.pager.write_page(parent_id, parent)?;

            // The parent lost a key and may itself need rebalancing.
            let mut parent_path = path;
            parent_path.pop();
            return self.rebalance_interior(parent_id, parent_path);
        }

        Ok(())
    }
1022
    /// Repairs an interior page after one of its keys was removed.
    ///
    /// `path` lists the ancestors of `node_id` with the direct parent last.
    /// The steps, in order:
    /// 1. If the node is the root: when it has no keys left, free it and make
    ///    its first child (or `0` if it has none) the new root; then stop.
    /// 2. Stop if `interior_entries_size` is at least `interior_min_bytes()`,
    ///    or if there is no parent in `path`.
    /// 3. Try to rotate one key/child through the parent from the left
    ///    sibling, then from the right sibling, provided the sibling stays at
    ///    or above the minimum.
    /// 4. Otherwise merge with the left sibling (preferred) or the right one,
    ///    pulling the separating parent key down, free the emptied page and
    ///    recurse on the parent.
    ///
    /// # Errors
    /// `BTreeError::Corrupted` if the node is not listed in its parent or a
    /// sibling is unexpectedly empty; `BTreeError::LockPoisoned` if the root
    /// lock is poisoned; pager and page-encoding errors are propagated.
    fn rebalance_interior(&self, node_id: u32, mut path: Vec<u32>) -> BTreeResult<()> {
        let root_id = self.root_page_id();
        let mut node = self.pager.read_page(node_id)?;
        let (mut node_keys, mut node_children) = read_interior_keys_children(&node)?;
        let min_bytes = interior_min_bytes();

        if node_id == root_id {
            // A root without keys has at most one child left: collapse a
            // level by promoting that child to root.
            if node_keys.is_empty() {
                let next_root = node_children.first().copied().unwrap_or(0);
                self.pager.free_page(node_id)?;
                *self.root_page_id.write().map_err(|e| {
                    BTreeError::LockPoisoned(format!(
                        "rebalance_interior: root_page_id write lock: {e}"
                    ))
                })? = next_root;
            }
            return Ok(());
        }

        if interior_entries_size(&node_keys) >= min_bytes {
            return Ok(());
        }

        let parent_id = match path.pop() {
            Some(id) => id,
            None => return Ok(()),
        };
        let mut parent = self.pager.read_page(parent_id)?;
        let (mut parent_keys, mut parent_children) = read_interior_keys_children(&parent)?;

        let child_index = parent_children
            .iter()
            .position(|&id| id == node_id)
            .ok_or_else(|| BTreeError::Corrupted("Interior missing from parent".into()))?;

        // Attempt 1: rotate right. The left sibling's last key moves up to
        // the parent, and the parent's separator moves down into this node.
        if child_index > 0 {
            let left_id = parent_children[child_index - 1];
            let mut left = self.pager.read_page(left_id)?;
            let (mut left_keys, mut left_children) = read_interior_keys_children(&left)?;

            if let Some(borrow_key) = left_keys.last().cloned() {
                let borrow_size = interior_key_size(&borrow_key);
                if interior_entries_size(&left_keys).saturating_sub(borrow_size) >= min_bytes {
                    let parent_key = parent_keys[child_index - 1].clone();
                    let borrowed_key = left_keys.pop().ok_or_else(|| {
                        BTreeError::Corrupted(
                            "rebalance_interior: left_keys empty after check".into(),
                        )
                    })?;
                    let borrowed_child = left_children.pop().ok_or_else(|| {
                        BTreeError::Corrupted("rebalance_interior: left_children empty".into())
                    })?;

                    node_keys.insert(0, parent_key);
                    node_children.insert(0, borrowed_child);
                    parent_keys[child_index - 1] = borrowed_key;

                    write_interior_entries(&mut left, &left_keys, &left_children)?;
                    left.update_checksum();
                    self.pager.write_page(left_id, left)?;

                    write_interior_entries(&mut node, &node_keys, &node_children)?;
                    node.update_checksum();
                    self.pager.write_page(node_id, node)?;

                    write_interior_entries(&mut parent, &parent_keys, &parent_children)?;
                    parent.update_checksum();
                    self.pager.write_page(parent_id, parent)?;

                    return Ok(());
                }
            }
        }

        // Attempt 2: rotate left. The right sibling's first key moves up to
        // the parent, and the parent's separator moves down into this node.
        if child_index + 1 < parent_children.len() {
            let right_id = parent_children[child_index + 1];
            let mut right = self.pager.read_page(right_id)?;
            let (mut right_keys, mut right_children) = read_interior_keys_children(&right)?;

            if let Some(borrow_key) = right_keys.first().cloned() {
                let borrow_size = interior_key_size(&borrow_key);
                if interior_entries_size(&right_keys).saturating_sub(borrow_size) >= min_bytes {
                    let parent_key = parent_keys[child_index].clone();
                    let new_parent_key = right_keys.remove(0);
                    let borrowed_child = right_children.remove(0);

                    node_keys.push(parent_key);
                    node_children.push(borrowed_child);
                    parent_keys[child_index] = new_parent_key;

                    write_interior_entries(&mut right, &right_keys, &right_children)?;
                    right.update_checksum();
                    self.pager.write_page(right_id, right)?;

                    write_interior_entries(&mut node, &node_keys, &node_children)?;
                    node.update_checksum();
                    self.pager.write_page(node_id, node)?;

                    write_interior_entries(&mut parent, &parent_keys, &parent_children)?;
                    parent.update_checksum();
                    self.pager.write_page(parent_id, parent)?;

                    return Ok(());
                }
            }
        }

        // Attempt 3: merge this node into its left sibling, pulling the
        // separating parent key down between the two key lists.
        if child_index > 0 {
            let left_id = parent_children[child_index - 1];
            let mut left = self.pager.read_page(left_id)?;
            let (mut left_keys, mut left_children) = read_interior_keys_children(&left)?;
            let parent_key = parent_keys.remove(child_index - 1);
            parent_children.remove(child_index);

            left_keys.push(parent_key);
            left_keys.extend(node_keys);
            left_children.extend(node_children);

            write_interior_entries(&mut left, &left_keys, &left_children)?;
            left.update_checksum();
            self.pager.write_page(left_id, left)?;
            self.pager.free_page(node_id)?;

            write_interior_entries(&mut parent, &parent_keys, &parent_children)?;
            parent.update_checksum();
            self.pager.write_page(parent_id, parent)?;

            // The parent lost a key; continue one level up.
            return self.rebalance_interior(parent_id, path);
        }

        // Attempt 4: absorb the right sibling into this node, pulling the
        // separating parent key down between the two key lists.
        if child_index + 1 < parent_children.len() {
            let right_id = parent_children[child_index + 1];
            let right = self.pager.read_page(right_id)?;
            let (right_keys, right_children) = read_interior_keys_children(&right)?;
            let parent_key = parent_keys.remove(child_index);
            parent_children.remove(child_index + 1);

            node_keys.push(parent_key);
            node_keys.extend(right_keys);
            node_children.extend(right_children);

            write_interior_entries(&mut node, &node_keys, &node_children)?;
            node.update_checksum();
            self.pager.write_page(node_id, node)?;
            self.pager.free_page(right_id)?;

            write_interior_entries(&mut parent, &parent_keys, &parent_children)?;
            parent.update_checksum();
            self.pager.write_page(parent_id, parent)?;

            // The parent lost a key; continue one level up.
            return self.rebalance_interior(parent_id, path);
        }

        Ok(())
    }
1178}