1use super::*;
2use crate::storage::engine::overflow::OverflowChain;
3
4impl BTree {
5 fn free_chain_if_pointer(&self, stored: &[u8]) -> BTreeResult<()> {
17 if let Some(head) = value_layout::pointer_head(stored) {
18 OverflowChain::new(&self.pager)
19 .free(head)
20 .map_err(value_layout::ValueLayoutError::from)?;
21 }
22 Ok(())
23 }
24 pub fn new(pager: Arc<Pager>) -> Self {
26 Self {
27 pager,
28 root_page_id: RwLock::new(0),
29 rightmost_leaf: RwLock::new(None),
30 }
31 }
32
33 pub fn with_root(pager: Arc<Pager>, root_page_id: u32) -> Self {
35 Self {
36 pager,
37 root_page_id: RwLock::new(root_page_id),
38 rightmost_leaf: RwLock::new(None),
39 }
40 }
41
42 pub fn root_page_id(&self) -> u32 {
44 *self
45 .root_page_id
46 .read()
47 .unwrap_or_else(|poisoned| poisoned.into_inner())
48 }
49
50 pub fn is_empty(&self) -> bool {
52 self.root_page_id() == 0
53 }
54
55 pub fn get(&self, key: &[u8]) -> BTreeResult<Option<Vec<u8>>> {
57 let root_id = self.root_page_id();
58 if root_id == 0 {
59 return Ok(None);
60 }
61
62 let (leaf_id, _) = self.find_leaf(root_id, key)?;
64
65 const MAX_RIGHTLINK_HOPS: usize = 32;
74 let mut current_id = leaf_id;
75 for _ in 0..MAX_RIGHTLINK_HOPS {
76 let page = self.pager.read_page(current_id)?;
77
78 match search_leaf(&page, key)? {
79 SearchResult::Found(pos) => {
80 let (_, stored) = read_leaf_cell(&page, pos)?;
81 let value = value_layout::decode(&self.pager, &stored)?;
82 return Ok(Some(value));
83 }
84 SearchResult::NotFound(_) => {
85 let right = leaf_right_sibling(&page);
87 if right != 0 {
88 if let Some(high_key) = leaf_high_key(&page)? {
89 if key > high_key.as_slice() {
90 current_id = right;
92 continue;
93 }
94 }
95 }
96 return Ok(None);
98 }
99 }
100 }
101
102 let (leaf_id, _) = self.find_leaf(self.root_page_id(), key)?;
104 let page = self.pager.read_page(leaf_id)?;
105 match search_leaf(&page, key)? {
106 SearchResult::Found(pos) => {
107 let (_, stored) = read_leaf_cell(&page, pos)?;
108 let value = value_layout::decode(&self.pager, &stored)?;
109 Ok(Some(value))
110 }
111 SearchResult::NotFound(_) => Ok(None),
112 }
113 }
114
115 pub fn insert(&self, key: &[u8], value: &[u8]) -> BTreeResult<()> {
123 if key.len() > MAX_KEY_SIZE {
124 return Err(BTreeError::KeyTooLarge(key.len()));
125 }
126 let stored = value_layout::encode(&self.pager, value)?;
127 self.insert_encoded(key, &stored)
128 }
129
130 pub(crate) fn insert_encoded(&self, key: &[u8], value: &[u8]) -> BTreeResult<()> {
138 let root_id = self.root_page_id();
139
140 if root_id == 0 {
142 let mut page = self.pager.allocate_page(PageType::BTreeLeaf)?;
143 clear_leaf_cells(&mut page);
144 insert_into_leaf(&mut page, key, value)?;
145 init_leaf_links(&mut page, 0, 0);
146 page.update_checksum();
147 let new_root = page.page_id();
148 self.pager.write_page(new_root, page)?;
149 *self.root_page_id.write().map_err(|e| {
150 BTreeError::LockPoisoned(format!("insert: root_page_id write lock: {e}"))
151 })? = new_root;
152 return Ok(());
153 }
154
155 let cached = {
160 self.rightmost_leaf
161 .read()
162 .map_err(|e| BTreeError::LockPoisoned(format!("rightmost_leaf read: {e}")))?
163 .clone()
164 };
165 if let Some((cached_page_id, ref high_key)) = cached {
166 if key > high_key.as_slice() {
167 let mut page = self.pager.read_page(cached_page_id)?;
168 let right_sibling = leaf_right_sibling(&page);
172 if right_sibling == 0 {
173 if can_insert_leaf(&page, key, value) {
175 insert_into_leaf(&mut page, key, value)?;
176 let page_id = page.page_id();
180 *self.rightmost_leaf.write().map_err(|e| {
182 BTreeError::LockPoisoned(format!("rightmost_leaf write: {e}"))
183 })? = Some((page_id, key.to_vec()));
184 self.pager.write_page(page_id, page)?;
185 return Ok(());
186 }
187 let (new_leaf, separator_key) = self.split_leaf(&mut page, key, value)?;
190 let new_leaf_id = new_leaf.page_id();
191 page.update_checksum();
192 let page_id = page.page_id();
193 self.pager.write_page(page_id, page.clone())?;
194 *self.rightmost_leaf.write().map_err(|e| {
196 BTreeError::LockPoisoned(format!("rightmost_leaf write: {e}"))
197 })? = Some((new_leaf_id, key.to_vec()));
198 let (_, path) = self.find_leaf(root_id, &separator_key)?;
201 self.insert_into_parent(path, cached_page_id, &separator_key, new_leaf_id)?;
202 return Ok(());
203 }
204 *self.rightmost_leaf.write().map_err(|e| {
208 BTreeError::LockPoisoned(format!("rightmost_leaf write: {e}"))
209 })? = None;
210 }
211 }
212
213 let (leaf_id, path) = self.find_leaf(root_id, key)?;
215 let mut page = self.pager.read_page(leaf_id)?;
216
217 if let SearchResult::Found(_) = search_leaf(&page, key)? {
219 return Err(BTreeError::DuplicateKey);
220 }
221
222 if can_insert_leaf(&page, key, value) {
224 insert_into_leaf(&mut page, key, value)?;
225 let page_id = page.page_id();
227 if leaf_right_sibling(&page) == 0 {
229 *self.rightmost_leaf.write().map_err(|e| {
230 BTreeError::LockPoisoned(format!("rightmost_leaf write: {e}"))
231 })? = Some((page_id, key.to_vec()));
232 }
233 self.pager.write_page(page_id, page)?;
234 return Ok(());
235 }
236
237 let (new_leaf, separator_key) = self.split_leaf(&mut page, key, value)?;
239 let new_leaf_id = new_leaf.page_id();
240 page.update_checksum();
241 let page_id = page.page_id();
242 self.pager.write_page(page_id, page.clone())?;
243 *self
245 .rightmost_leaf
246 .write()
247 .map_err(|e| BTreeError::LockPoisoned(format!("rightmost_leaf write: {e}")))? =
248 Some((new_leaf_id, key.to_vec()));
249
250 self.insert_into_parent(path, page_id, &separator_key, new_leaf_id)?;
252
253 Ok(())
254 }
255
256 pub fn upsert(&self, key: &[u8], value: &[u8]) -> BTreeResult<()> {
268 let root_id = self.root_page_id();
269 if root_id == 0 {
270 return self.insert(key, value);
271 }
272
273 let new_stored = value_layout::encode(&self.pager, value)?;
278
279 let (leaf_id, _path) = self.find_leaf(root_id, key)?;
280 let mut page = self.pager.read_page(leaf_id)?;
281
282 if let SearchResult::Found(pos) = search_leaf(&page, key)? {
283 let (_, old_value) = read_leaf_cell(&page, pos)?;
284 if new_stored.len() <= old_value.len() {
285 self.free_chain_if_pointer(&old_value)?;
290 overwrite_leaf_value(&mut page, pos, &new_stored)?;
291 let page_id = page.page_id();
292 self.pager.write_page(page_id, page)?;
293 return Ok(());
294 }
295 let _ = self.delete(key);
298 return self.insert_encoded(key, &new_stored);
299 }
300
301 self.insert_encoded(key, &new_stored)
304 }
305
306 pub fn upsert_batch_sorted(&self, items: &[(Vec<u8>, Vec<u8>)]) -> BTreeResult<()> {
317 if items.is_empty() {
318 return Ok(());
319 }
320 let root_id = self.root_page_id();
321 if root_id == 0 {
322 for (k, v) in items {
323 self.upsert(k, v)?;
324 }
325 return Ok(());
326 }
327
328 let mut current_leaf: Option<(u32, Page, bool)> = None; for (key, value) in items {
330 let (leaf_id, _path) = self.find_leaf(root_id, key)?;
333 if current_leaf.as_ref().map(|(id, _, _)| *id) != Some(leaf_id) {
334 if let Some((id, page, dirty)) = current_leaf.take() {
335 if dirty {
336 self.pager.write_page(id, page)?;
337 }
338 }
339 let page = self.pager.read_page(leaf_id)?;
340 current_leaf = Some((leaf_id, page, false));
341 }
342
343 let applied_in_place = {
349 let (_, page, dirty) = current_leaf.as_mut().expect("set above");
350 if let SearchResult::Found(pos) = search_leaf(page, key)? {
351 let (_, old_value) = read_leaf_cell(page, pos)?;
352 match value_layout::encode(&self.pager, value) {
353 Ok(new_stored) if new_stored.len() <= old_value.len() => {
354 self.free_chain_if_pointer(&old_value)?;
358 overwrite_leaf_value(page, pos, &new_stored)?;
359 *dirty = true;
360 true
361 }
362 _ => false,
363 }
364 } else {
365 false
366 }
367 };
368 if !applied_in_place {
369 if let Some((id, page, dirty)) = current_leaf.take() {
370 if dirty {
371 self.pager.write_page(id, page)?;
372 }
373 }
374 self.upsert(key, value)?;
375 }
376 }
377
378 if let Some((id, page, dirty)) = current_leaf.take() {
379 if dirty {
380 self.pager.write_page(id, page)?;
381 }
382 }
383 Ok(())
384 }
385
386 pub fn bulk_insert_sorted(&self, items: &[(Vec<u8>, Vec<u8>)]) -> BTreeResult<()> {
395 if items.is_empty() {
396 return Ok(());
397 }
398 for (key, _) in items {
404 if key.len() > MAX_KEY_SIZE {
405 return Err(BTreeError::KeyTooLarge(key.len()));
406 }
407 }
408
409 let mut encoded: Vec<(&[u8], Vec<u8>)> = Vec::with_capacity(items.len());
416 let mut first_oversize: Option<usize> = None;
417 for (key, value) in items {
418 match value_layout::encode(&self.pager, value) {
419 Ok(stored) => encoded.push((key.as_slice(), stored)),
420 Err(value_layout::ValueLayoutError::ValueTooLarge(n)) => {
421 if first_oversize.is_none() {
422 first_oversize = Some(n);
423 }
424 }
425 Err(other) => return Err(other.into()),
426 }
427 }
428
429 let items: Vec<(&[u8], &[u8])> = encoded.iter().map(|(k, v)| (*k, v.as_slice())).collect();
430
431 let mut last_filled_leaf: Option<u32> = None;
439 let mut i = 0;
440 while i < items.len() {
441 let root_id = self.root_page_id();
442 if root_id == 0 {
443 let (k, v) = items[i];
447 self.insert_encoded(k, v)?;
448 i += 1;
449 last_filled_leaf = None;
450 continue;
451 }
452
453 let hint_leaf_id: Option<u32> = if let Some(prev) = last_filled_leaf {
459 let prev_page = self.pager.read_page(prev)?;
460 let sibling = leaf_right_sibling(&prev_page);
461 if sibling != 0 {
462 let sib_page = self.pager.read_page(sibling)?;
463 match leaf_high_key(&sib_page)? {
464 None => Some(sibling), Some(hk) if items[i].0 > hk.as_slice() => Some(sibling),
466 Some(_) => None,
467 }
468 } else {
469 None
470 }
471 } else {
472 None
473 };
474
475 let (leaf_id, _path) = match hint_leaf_id {
476 Some(id) => (id, Vec::new()),
477 None => self.find_leaf(root_id, items[i].0)?,
478 };
479 let mut page = self.pager.read_page(leaf_id)?;
480
481 let mut last_key_in_leaf: Option<Vec<u8>> = {
486 let cell_count = page.cell_count() as usize;
487 if cell_count == 0 {
488 None
489 } else {
490 Some(read_leaf_cell(&page, cell_count - 1)?.0)
491 }
492 };
493
494 let mut inserted = 0usize;
500 while i + inserted < items.len() {
501 let (key, value) = items[i + inserted];
502
503 if let Some(lk) = &last_key_in_leaf {
506 match key.cmp(lk.as_slice()) {
507 Ordering::Greater => {}
508 Ordering::Equal => return Err(BTreeError::DuplicateKey),
509 Ordering::Less => break,
510 }
511 }
512
513 let cell_size = 4 + key.len() + value.len();
518 let slot_end_after =
519 LEAF_SLOT_ARRAY_OFFSET + (page.cell_count() as usize + inserted + 1) * 2;
520 let cells_start = leaf_cells_start(&page);
521 if slot_end_after + cell_size > cells_start {
522 break;
523 }
524
525 let cell_offset = cells_start - cell_size;
530 {
531 let data = page.as_bytes_mut();
532 data[cell_offset..cell_offset + 2]
533 .copy_from_slice(&(key.len() as u16).to_le_bytes());
534 data[cell_offset + 2..cell_offset + 4]
535 .copy_from_slice(&(value.len() as u16).to_le_bytes());
536 data[cell_offset + 4..cell_offset + 4 + key.len()].copy_from_slice(key);
537 data[cell_offset + 4 + key.len()..cell_offset + cell_size]
538 .copy_from_slice(value);
539 }
540 page.set_free_end(cell_offset as u16);
541
542 let new_slot_index = page.cell_count() as usize + inserted;
543 let slot_pos = leaf_slot_offset_for(new_slot_index);
544 {
545 let data = page.as_bytes_mut();
546 data[slot_pos..slot_pos + 2]
547 .copy_from_slice(&(cell_offset as u16).to_le_bytes());
548 }
549
550 last_key_in_leaf = Some(key.to_vec());
551 inserted += 1;
552 }
553
554 if inserted > 0 {
555 let new_count = page.cell_count() as usize + inserted;
556 page.set_cell_count(new_count as u16);
557 page.set_free_start((LEAF_SLOT_ARRAY_OFFSET + new_count * 2) as u16);
558 page.update_checksum();
559 self.pager.write_page(leaf_id, page)?;
560 i += inserted;
561 last_filled_leaf = Some(leaf_id);
564 } else {
565 let (k, v) = items[i];
572 self.insert_encoded(k, v)?;
573 i += 1;
574 last_filled_leaf = None;
575 }
576 }
577
578 if let Some(n) = first_oversize {
583 return Err(BTreeError::ValueTooLarge(n));
584 }
585
586 Ok(())
587 }
588
589 pub fn delete(&self, key: &[u8]) -> BTreeResult<bool> {
591 let root_id = self.root_page_id();
592 if root_id == 0 {
593 return Ok(false);
594 }
595
596 let (leaf_id, path) = self.find_leaf(root_id, key)?;
600 let mut current_id = leaf_id;
601 const MAX_RIGHTLINK_HOPS: usize = 32;
602 for _ in 0..MAX_RIGHTLINK_HOPS {
603 let mut page = self.pager.read_page(current_id)?;
604 match search_leaf(&page, key)? {
605 SearchResult::Found(pos) => {
606 let (_, stored) = read_leaf_cell(&page, pos)?;
611 self.free_chain_if_pointer(&stored)?;
612 delete_from_leaf(&mut page, pos)?;
613 page.update_checksum();
614 let page_id = page.page_id();
615 self.pager.write_page(page_id, page.clone())?;
616
617 *self
618 .rightmost_leaf
619 .write()
620 .unwrap_or_else(|e| e.into_inner()) = None;
621
622 if page.cell_count() == 0 && page_id == root_id {
623 self.pager.free_page(root_id)?;
624 *self.root_page_id.write().map_err(|e| {
625 BTreeError::LockPoisoned(format!(
626 "delete: root_page_id write lock: {e}"
627 ))
628 })? = 0;
629 } else {
630 self.rebalance_leaf(current_id, path)?;
631 }
632 return Ok(true);
633 }
634 SearchResult::NotFound(_) => {
635 let right = leaf_right_sibling(&page);
636 if right != 0 {
637 if let Some(high_key) = leaf_high_key(&page)? {
638 if key > high_key.as_slice() {
639 current_id = right;
640 continue;
641 }
642 }
643 }
644 return Ok(false);
645 }
646 }
647 }
648 Ok(false)
649 }
650
651 pub fn cursor_first(&self) -> BTreeResult<BTreeCursor> {
653 let root_id = self.root_page_id();
654 if root_id == 0 {
655 return Ok(BTreeCursor {
656 leaf_page_id: 0,
657 position: 0,
658 pager: self.pager.clone(),
659 prefetched_next: false,
660 });
661 }
662
663 let first_leaf = self.find_first_leaf(root_id)?;
665
666 Ok(BTreeCursor {
667 leaf_page_id: first_leaf,
668 position: 0,
669 pager: self.pager.clone(),
670 prefetched_next: false,
671 })
672 }
673
674 pub fn cursor_seek(&self, key: &[u8]) -> BTreeResult<BTreeCursor> {
676 let root_id = self.root_page_id();
677 if root_id == 0 {
678 return Ok(BTreeCursor {
679 leaf_page_id: 0,
680 position: 0,
681 pager: self.pager.clone(),
682 prefetched_next: false,
683 });
684 }
685
686 let (leaf_id, _) = self.find_leaf(root_id, key)?;
687 let page = self.pager.read_page(leaf_id)?;
688
689 let position = match search_leaf(&page, key)? {
690 SearchResult::Found(pos) => pos,
691 SearchResult::NotFound(pos) => pos,
692 };
693
694 Ok(BTreeCursor {
695 leaf_page_id: leaf_id,
696 position,
697 pager: self.pager.clone(),
698 prefetched_next: false,
699 })
700 }
701
702 pub fn range(&self, start_key: &[u8], end_key: &[u8]) -> BTreeResult<Vec<(Vec<u8>, Vec<u8>)>> {
704 let mut results = Vec::new();
705 let mut cursor = self.cursor_seek(start_key)?;
706
707 while let Some((key, value)) = cursor.next()? {
708 if key.as_slice() > end_key {
709 break;
710 }
711 results.push((key, value));
712 }
713
714 Ok(results)
715 }
716
717 pub fn count(&self) -> BTreeResult<usize> {
719 let root_id = self.root_page_id();
720 if root_id == 0 {
721 return Ok(0);
722 }
723
724 let mut count = 0;
725 let mut cursor = self.cursor_first()?;
726 while cursor.next()?.is_some() {
727 count += 1;
728 }
729
730 Ok(count)
731 }
732
733 fn find_leaf(&self, page_id: u32, key: &[u8]) -> BTreeResult<(u32, Vec<u32>)> {
737 let mut current_id = page_id;
738 let mut path = Vec::new();
739
740 loop {
741 let page = self.pager.read_page(current_id)?;
742
743 match page.page_type()? {
744 PageType::BTreeLeaf => {
745 return Ok((current_id, path));
746 }
747 PageType::BTreeInterior => {
748 path.push(current_id);
749 current_id = find_child(&page, key)?;
750 }
751 other => {
752 return Err(BTreeError::Corrupted(format!(
753 "Unexpected page type in B-tree: {:?}",
754 other
755 )));
756 }
757 }
758 }
759 }
760
761 pub(crate) fn find_first_leaf(&self, page_id: u32) -> BTreeResult<u32> {
763 let mut current_id = page_id;
764
765 loop {
766 let page = self.pager.read_page(current_id)?;
767
768 match page.page_type()? {
769 PageType::BTreeLeaf => return Ok(current_id),
770 PageType::BTreeInterior => {
771 current_id = find_first_child(&page)?;
773 }
774 _ => {
775 return Err(BTreeError::Corrupted("Invalid page type".into()));
776 }
777 }
778 }
779 }
780
781 fn split_leaf(
783 &self,
784 page: &mut Page,
785 new_key: &[u8],
786 new_value: &[u8],
787 ) -> BTreeResult<(Page, Vec<u8>)> {
788 let mut entries: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
790 let cell_count = page.cell_count() as usize;
791
792 for i in 0..cell_count {
793 entries.push(read_leaf_cell(page, i)?);
794 }
795
796 let insert_pos = entries.partition_point(|(k, _)| k.as_slice() < new_key);
798 entries.insert(insert_pos, (new_key.to_vec(), new_value.to_vec()));
799
800 let mid = entries.len() / 2;
802
803 let mut new_page = self.pager.allocate_page(PageType::BTreeLeaf)?;
805
806 let old_next = read_next_leaf(page);
808 init_leaf_links(&mut new_page, page.page_id(), old_next);
809 set_next_leaf(page, new_page.page_id());
810
811 write_leaf_entries(page, &entries[..mid])?;
814 write_leaf_entries(&mut new_page, &entries[mid..])?;
815
816 let separator = entries[mid].0.clone();
818
819 new_page.update_checksum();
820 let new_page_id = new_page.page_id();
821 self.pager.write_page(new_page_id, new_page.clone())?;
822
823 Ok((new_page, separator))
824 }
825
826 fn insert_into_parent(
828 &self,
829 mut path: Vec<u32>,
830 left_child: u32,
831 key: &[u8],
832 right_child: u32,
833 ) -> BTreeResult<()> {
834 if path.is_empty() {
836 let mut new_root = self.pager.allocate_page(PageType::BTreeInterior)?;
837 write_interior_entries(&mut new_root, &[key.to_vec()], &[left_child, right_child])?;
840
841 new_root.update_checksum();
842 let new_root_id = new_root.page_id();
843 self.pager.write_page(new_root_id, new_root)?;
844 *self.root_page_id.write().map_err(|e| {
845 BTreeError::LockPoisoned(format!(
846 "insert_into_parent: root_page_id write lock: {e}"
847 ))
848 })? = new_root_id;
849 return Ok(());
850 }
851
852 let parent_id = path.pop().ok_or_else(|| {
854 BTreeError::Corrupted("insert_into_parent: path unexpectedly empty".into())
855 })?;
856 let mut parent = self.pager.read_page(parent_id)?;
857
858 if can_insert_interior(&parent, key) {
860 insert_into_interior(&mut parent, key, left_child, right_child)?;
861 parent.update_checksum();
862 self.pager.write_page(parent_id, parent)?;
863 return Ok(());
864 }
865
866 let (new_interior, separator) =
868 self.split_interior(&mut parent, key, left_child, right_child)?;
869 parent.update_checksum();
870 self.pager.write_page(parent_id, parent.clone())?;
871
872 self.insert_into_parent(path, parent.page_id(), &separator, new_interior.page_id())
874 }
875
876 fn split_interior(
878 &self,
879 page: &mut Page,
880 new_key: &[u8],
881 left_child: u32,
882 right_child: u32,
883 ) -> BTreeResult<(Page, Vec<u8>)> {
884 let (mut keys, mut children) = read_interior_keys_children(page)?;
885 let key_insert_pos = keys.partition_point(|key| key.as_slice() < new_key);
886 let child_insert_pos = children
887 .iter()
888 .position(|child| *child == left_child)
889 .unwrap_or(key_insert_pos);
890
891 keys.insert(key_insert_pos, new_key.to_vec());
892 children.insert(child_insert_pos + 1, right_child);
893
894 let mid = keys.len() / 2;
897 let separator = keys[mid].clone();
898
899 let mut new_page = self.pager.allocate_page(PageType::BTreeInterior)?;
901
902 write_interior_entries(page, &keys[..mid], &children[..mid + 1])?;
903 write_interior_entries(&mut new_page, &keys[mid + 1..], &children[mid + 1..])?;
904
905 new_page.update_checksum();
906 let new_page_id = new_page.page_id();
907 self.pager.write_page(new_page_id, new_page.clone())?;
908
909 Ok((new_page, separator))
910 }
911
    /// Restores the fill invariant of leaf `leaf_id` after a deletion.
    ///
    /// `path` holds the leaf's ancestors, nearest parent last. Nothing is done
    /// when `path` is empty or the leaf is the root.
    ///
    /// Steps, in order:
    /// 1. If the leaf is not its parent's first child and the separator to its
    ///    left differs from the leaf's first key, the separator is updated.
    /// 2. If the leaf's entries already reach `leaf_min_bytes()`, stop.
    /// 3. Try to borrow entries from the left sibling, then from the right
    ///    sibling, never taking a sibling below the same threshold.
    /// 4. Otherwise merge with the left sibling (preferred) or the right
    ///    sibling, unlink and free the emptied page, remove its separator and
    ///    child slot from the parent, and continue with `rebalance_interior`
    ///    on the parent.
    ///
    /// Siblings are taken from the same parent's child list only.
    ///
    /// # Errors
    /// `BTreeError::Corrupted` if the parent does not list `leaf_id`; any
    /// pager/page-layout error.
    fn rebalance_leaf(&self, leaf_id: u32, path: Vec<u32>) -> BTreeResult<()> {
        if path.is_empty() {
            return Ok(());
        }

        let root_id = self.root_page_id();
        if leaf_id == root_id {
            return Ok(());
        }

        let mut leaf = self.pager.read_page(leaf_id)?;
        let mut leaf_entries = read_leaf_entries(&leaf)?;
        // Threshold compared against `leaf_entries_size`; presumably the
        // minimum fill in bytes — confirm against `leaf_min_bytes`.
        let min_bytes = leaf_min_bytes();

        let parent_id = *path.last().ok_or_else(|| {
            BTreeError::Corrupted("rebalance_leaf: path unexpectedly empty".into())
        })?;
        let mut parent = self.pager.read_page(parent_id)?;
        let (mut parent_keys, mut parent_children) = read_interior_keys_children(&parent)?;

        let child_index = parent_children
            .iter()
            .position(|&id| id == leaf_id)
            .ok_or_else(|| BTreeError::Corrupted("Leaf missing from parent".into()))?;

        // Step 1: keep the separator left of this leaf equal to its first key.
        if child_index > 0 {
            if let Some((first_key, _)) = leaf_entries.first() {
                if parent_keys.get(child_index - 1).map(|k| k.as_slice())
                    != Some(first_key.as_slice())
                {
                    parent_keys[child_index - 1] = first_key.clone();
                    write_interior_entries(&mut parent, &parent_keys, &parent_children)?;
                    parent.update_checksum();
                    self.pager.write_page(parent_id, parent.clone())?;
                }
            }
        }

        // Step 2: leaf is full enough, nothing more to do.
        if leaf_entries_size(&leaf_entries) >= min_bytes {
            return Ok(());
        }

        // Step 3a: borrow from the left sibling, moving its last entries to
        // the front of this leaf while the sibling stays at or above the
        // threshold.
        if child_index > 0 {
            let left_id = parent_children[child_index - 1];
            let mut left = self.pager.read_page(left_id)?;
            let mut left_entries = read_leaf_entries(&left)?;
            let mut borrowed = false;

            while leaf_entries_size(&leaf_entries) < min_bytes {
                let Some(entry) = left_entries.pop() else {
                    break;
                };
                if leaf_entries_size(&left_entries) < min_bytes {
                    // Taking this entry would underfill the sibling; put it back.
                    left_entries.push(entry);
                    break;
                }
                leaf_entries.insert(0, entry);
                borrowed = true;
            }

            if borrowed {
                write_leaf_entries(&mut left, &left_entries)?;
                left.update_checksum();
                self.pager.write_page(left_id, left)?;

                write_leaf_entries(&mut leaf, &leaf_entries)?;
                leaf.update_checksum();
                self.pager.write_page(leaf_id, leaf)?;

                // This leaf's first key changed, so its left separator does too.
                if let Some((first_key, _)) = leaf_entries.first() {
                    parent_keys[child_index - 1] = first_key.clone();
                    write_interior_entries(&mut parent, &parent_keys, &parent_children)?;
                    parent.update_checksum();
                    self.pager.write_page(parent_id, parent)?;
                }

                return Ok(());
            }
        }

        // Step 3b: borrow from the right sibling, moving its first entries to
        // the end of this leaf under the same rule.
        if child_index + 1 < parent_children.len() {
            let right_id = parent_children[child_index + 1];
            let mut right = self.pager.read_page(right_id)?;
            let mut right_entries = read_leaf_entries(&right)?;
            let mut borrowed = false;

            while leaf_entries_size(&leaf_entries) < min_bytes {
                if right_entries.is_empty() {
                    break;
                }
                let entry = right_entries.remove(0);
                if leaf_entries_size(&right_entries) < min_bytes {
                    // Taking this entry would underfill the sibling; put it back.
                    right_entries.insert(0, entry);
                    break;
                }
                leaf_entries.push(entry);
                borrowed = true;
            }

            if borrowed {
                write_leaf_entries(&mut right, &right_entries)?;
                right.update_checksum();
                self.pager.write_page(right_id, right)?;

                write_leaf_entries(&mut leaf, &leaf_entries)?;
                leaf.update_checksum();
                self.pager.write_page(leaf_id, leaf)?;

                // The right sibling's first key changed; update the separator
                // between this leaf and it.
                if let Some((first_key, _)) = right_entries.first() {
                    parent_keys[child_index] = first_key.clone();
                    write_interior_entries(&mut parent, &parent_keys, &parent_children)?;
                    parent.update_checksum();
                    self.pager.write_page(parent_id, parent)?;
                }

                return Ok(());
            }
        }

        // Step 4a: merge this leaf into its left sibling and free this leaf.
        if child_index > 0 {
            let left_id = parent_children[child_index - 1];
            let mut left = self.pager.read_page(left_id)?;
            let mut left_entries = read_leaf_entries(&left)?;

            left_entries.extend(leaf_entries);
            write_leaf_entries(&mut left, &left_entries)?;

            // Unlink this leaf from the leaf chain.
            let next_leaf = read_next_leaf(&leaf);
            set_next_leaf(&mut left, next_leaf);
            if next_leaf != 0 {
                let mut next = self.pager.read_page(next_leaf)?;
                set_prev_leaf(&mut next, left_id);
                next.update_checksum();
                self.pager.write_page(next_leaf, next)?;
            }

            left.update_checksum();
            self.pager.write_page(left_id, left)?;
            self.pager.free_page(leaf_id)?;

            // Drop the separator and child slot that referred to this leaf.
            parent_keys.remove(child_index - 1);
            parent_children.remove(child_index);
            write_interior_entries(&mut parent, &parent_keys, &parent_children)?;
            parent.update_checksum();
            self.pager.write_page(parent_id, parent)?;

            // The parent lost a key; continue one level up.
            let mut parent_path = path;
            parent_path.pop();
            return self.rebalance_interior(parent_id, parent_path);
        }

        // Step 4b: no left sibling; absorb the right sibling and free it.
        if child_index + 1 < parent_children.len() {
            let right_id = parent_children[child_index + 1];
            let right = self.pager.read_page(right_id)?;
            let right_entries = read_leaf_entries(&right)?;

            leaf_entries.extend(right_entries);
            write_leaf_entries(&mut leaf, &leaf_entries)?;

            // Unlink the right sibling from the leaf chain.
            let next_leaf = read_next_leaf(&right);
            set_next_leaf(&mut leaf, next_leaf);
            if next_leaf != 0 {
                let mut next = self.pager.read_page(next_leaf)?;
                set_prev_leaf(&mut next, leaf_id);
                next.update_checksum();
                self.pager.write_page(next_leaf, next)?;
            }

            leaf.update_checksum();
            self.pager.write_page(leaf_id, leaf)?;
            self.pager.free_page(right_id)?;

            // Drop the separator and child slot that referred to the sibling.
            parent_keys.remove(child_index);
            parent_children.remove(child_index + 1);
            write_interior_entries(&mut parent, &parent_keys, &parent_children)?;
            parent.update_checksum();
            self.pager.write_page(parent_id, parent)?;

            // The parent lost a key; continue one level up.
            let mut parent_path = path;
            parent_path.pop();
            return self.rebalance_interior(parent_id, parent_path);
        }

        Ok(())
    }
1097
    /// Restores the fill invariant of interior page `node_id` after it lost a
    /// key.
    ///
    /// `path` holds the node's ancestors, nearest parent last.
    ///
    /// * Root: if it has no keys left, it is freed and its first child (or 0
    ///   when it has none) becomes the new root. A root is never rebalanced
    ///   otherwise.
    /// * Non-root at or above `interior_min_bytes()`: nothing to do.
    /// * Otherwise: rotate one key through the parent from the left sibling,
    ///   then from the right sibling, if the donor stays at or above the
    ///   threshold. Failing that, merge with the left sibling (preferred) or
    ///   the right sibling, pulling the separating parent key down, and
    ///   recurse on the parent.
    ///
    /// # Errors
    /// `BTreeError::Corrupted` if the parent does not list `node_id`;
    /// `LockPoisoned` when replacing the root; any pager/page-layout error.
    fn rebalance_interior(&self, node_id: u32, mut path: Vec<u32>) -> BTreeResult<()> {
        let root_id = self.root_page_id();
        let mut node = self.pager.read_page(node_id)?;
        let (mut node_keys, mut node_children) = read_interior_keys_children(&node)?;
        // Threshold compared against `interior_entries_size`; presumably the
        // minimum fill in bytes — confirm against `interior_min_bytes`.
        let min_bytes = interior_min_bytes();

        if node_id == root_id {
            // A keyless root has a single child: shrink the tree by one level.
            if node_keys.is_empty() {
                let next_root = node_children.first().copied().unwrap_or(0);
                self.pager.free_page(node_id)?;
                *self.root_page_id.write().map_err(|e| {
                    BTreeError::LockPoisoned(format!(
                        "rebalance_interior: root_page_id write lock: {e}"
                    ))
                })? = next_root;
            }
            return Ok(());
        }

        if interior_entries_size(&node_keys) >= min_bytes {
            return Ok(());
        }

        let parent_id = match path.pop() {
            Some(id) => id,
            None => return Ok(()),
        };
        let mut parent = self.pager.read_page(parent_id)?;
        let (mut parent_keys, mut parent_children) = read_interior_keys_children(&parent)?;

        let child_index = parent_children
            .iter()
            .position(|&id| id == node_id)
            .ok_or_else(|| BTreeError::Corrupted("Interior missing from parent".into()))?;

        // Rotate from the left sibling: the parent separator moves down to the
        // front of this node, and the sibling's last key moves up to replace it.
        if child_index > 0 {
            let left_id = parent_children[child_index - 1];
            let mut left = self.pager.read_page(left_id)?;
            let (mut left_keys, mut left_children) = read_interior_keys_children(&left)?;

            if let Some(borrow_key) = left_keys.last().cloned() {
                let borrow_size = interior_key_size(&borrow_key);
                // Only borrow if the sibling stays at or above the threshold.
                if interior_entries_size(&left_keys).saturating_sub(borrow_size) >= min_bytes {
                    let parent_key = parent_keys[child_index - 1].clone();
                    let borrowed_key = left_keys.pop().ok_or_else(|| {
                        BTreeError::Corrupted(
                            "rebalance_interior: left_keys empty after check".into(),
                        )
                    })?;
                    let borrowed_child = left_children.pop().ok_or_else(|| {
                        BTreeError::Corrupted("rebalance_interior: left_children empty".into())
                    })?;

                    node_keys.insert(0, parent_key);
                    node_children.insert(0, borrowed_child);
                    parent_keys[child_index - 1] = borrowed_key;

                    write_interior_entries(&mut left, &left_keys, &left_children)?;
                    left.update_checksum();
                    self.pager.write_page(left_id, left)?;

                    write_interior_entries(&mut node, &node_keys, &node_children)?;
                    node.update_checksum();
                    self.pager.write_page(node_id, node)?;

                    write_interior_entries(&mut parent, &parent_keys, &parent_children)?;
                    parent.update_checksum();
                    self.pager.write_page(parent_id, parent)?;

                    return Ok(());
                }
            }
        }

        // Rotate from the right sibling: the parent separator moves down to the
        // end of this node, and the sibling's first key moves up to replace it.
        if child_index + 1 < parent_children.len() {
            let right_id = parent_children[child_index + 1];
            let mut right = self.pager.read_page(right_id)?;
            let (mut right_keys, mut right_children) = read_interior_keys_children(&right)?;

            if let Some(borrow_key) = right_keys.first().cloned() {
                let borrow_size = interior_key_size(&borrow_key);
                // Only borrow if the sibling stays at or above the threshold.
                if interior_entries_size(&right_keys).saturating_sub(borrow_size) >= min_bytes {
                    let parent_key = parent_keys[child_index].clone();
                    let new_parent_key = right_keys.remove(0);
                    let borrowed_child = right_children.remove(0);

                    node_keys.push(parent_key);
                    node_children.push(borrowed_child);
                    parent_keys[child_index] = new_parent_key;

                    write_interior_entries(&mut right, &right_keys, &right_children)?;
                    right.update_checksum();
                    self.pager.write_page(right_id, right)?;

                    write_interior_entries(&mut node, &node_keys, &node_children)?;
                    node.update_checksum();
                    self.pager.write_page(node_id, node)?;

                    write_interior_entries(&mut parent, &parent_keys, &parent_children)?;
                    parent.update_checksum();
                    self.pager.write_page(parent_id, parent)?;

                    return Ok(());
                }
            }
        }

        // Merge into the left sibling: left keys + parent separator + this
        // node's keys; this node is freed.
        if child_index > 0 {
            let left_id = parent_children[child_index - 1];
            let mut left = self.pager.read_page(left_id)?;
            let (mut left_keys, mut left_children) = read_interior_keys_children(&left)?;
            let parent_key = parent_keys.remove(child_index - 1);
            parent_children.remove(child_index);

            left_keys.push(parent_key);
            left_keys.extend(node_keys);
            left_children.extend(node_children);

            write_interior_entries(&mut left, &left_keys, &left_children)?;
            left.update_checksum();
            self.pager.write_page(left_id, left)?;
            self.pager.free_page(node_id)?;

            write_interior_entries(&mut parent, &parent_keys, &parent_children)?;
            parent.update_checksum();
            self.pager.write_page(parent_id, parent)?;

            // The parent lost a key; continue one level up.
            return self.rebalance_interior(parent_id, path);
        }

        // No left sibling: absorb the right sibling (this node's keys + parent
        // separator + right keys); the right sibling is freed.
        if child_index + 1 < parent_children.len() {
            let right_id = parent_children[child_index + 1];
            let right = self.pager.read_page(right_id)?;
            let (right_keys, right_children) = read_interior_keys_children(&right)?;
            let parent_key = parent_keys.remove(child_index);
            parent_children.remove(child_index + 1);

            node_keys.push(parent_key);
            node_keys.extend(right_keys);
            node_children.extend(right_children);

            write_interior_entries(&mut node, &node_keys, &node_children)?;
            node.update_checksum();
            self.pager.write_page(node_id, node)?;
            self.pager.free_page(right_id)?;

            write_interior_entries(&mut parent, &parent_keys, &parent_children)?;
            parent.update_checksum();
            self.pager.write_page(parent_id, parent)?;

            // The parent lost a key; continue one level up.
            return self.rebalance_interior(parent_id, path);
        }

        Ok(())
    }
1253}