1use std::cmp::Ordering;
33use std::sync::{Arc, RwLock};
34
35use super::page::{Page, PageType, HEADER_SIZE, PAGE_SIZE};
36use super::pager::{Pager, PagerError};
37
38pub const MAX_KEY_SIZE: usize = 1024;
40
41pub const MAX_VALUE_SIZE: usize = PAGE_SIZE / 4;
47
48pub const MIN_FILL_FACTOR: usize = 25;
50
51const LEAF_PREV_OFFSET: usize = HEADER_SIZE;
53
54const LEAF_NEXT_OFFSET: usize = HEADER_SIZE + 4;
56
57const LEAF_SLOT_ARRAY_OFFSET: usize = HEADER_SIZE + 8;
72
73const LEAF_DATA_OFFSET: usize = LEAF_SLOT_ARRAY_OFFSET;
76
77const INTERIOR_DATA_OFFSET: usize = HEADER_SIZE;
79
80#[derive(Debug, Clone)]
82pub enum BTreeError {
83 NotFound,
85 DuplicateKey,
87 KeyTooLarge(usize),
89 ValueTooLarge(usize),
91 Corrupted(String),
93 Pager(String),
95 LockPoisoned(String),
97}
98
99impl std::fmt::Display for BTreeError {
100 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
101 match self {
102 Self::NotFound => write!(f, "Key not found"),
103 Self::DuplicateKey => write!(f, "Key already exists"),
104 Self::KeyTooLarge(size) => {
105 write!(f, "Key too large: {} bytes (max {})", size, MAX_KEY_SIZE)
106 }
107 Self::ValueTooLarge(size) => write!(
108 f,
109 "Value too large: {} bytes (max {})",
110 size, MAX_VALUE_SIZE
111 ),
112 Self::Corrupted(msg) => write!(f, "B-tree corrupted: {}", msg),
113 Self::Pager(msg) => write!(f, "Pager error: {}", msg),
114 Self::LockPoisoned(msg) => write!(f, "Lock poisoned: {}", msg),
115 }
116 }
117}
118
119impl std::error::Error for BTreeError {}
120
121impl From<PagerError> for BTreeError {
122 fn from(e: PagerError) -> Self {
123 Self::Pager(e.to_string())
124 }
125}
126
127impl From<super::page::PageError> for BTreeError {
128 fn from(e: super::page::PageError) -> Self {
129 Self::Corrupted(e.to_string())
130 }
131}
132
133pub type BTreeResult<T> = Result<T, BTreeError>;
135
136pub struct BTreeCursor {
138 leaf_page_id: u32,
140 position: usize,
142 pager: Arc<Pager>,
144 prefetched_next: bool,
148}
149
150impl BTreeCursor {
151 pub fn next(&mut self) -> BTreeResult<Option<(Vec<u8>, Vec<u8>)>> {
153 if self.leaf_page_id == 0 {
154 return Ok(None);
155 }
156
157 let page = self.pager.read_page(self.leaf_page_id)?;
158 let cell_count = page.cell_count() as usize;
159
160 if !self.prefetched_next && self.position >= cell_count / 2 && cell_count > 0 {
166 let next_leaf = read_next_leaf(&page);
167 if next_leaf != 0 {
168 self.pager.prefetch_hint(next_leaf);
169 }
170 self.prefetched_next = true;
171 }
172
173 if self.position < cell_count {
175 let (key, value) = read_leaf_cell(&page, self.position)?;
176 self.position += 1;
177 return Ok(Some((key, value)));
178 }
179
180 let next_leaf = read_next_leaf(&page);
182 if next_leaf == 0 {
183 self.leaf_page_id = 0;
184 return Ok(None);
185 }
186
187 self.leaf_page_id = next_leaf;
188 self.position = 0;
189 self.prefetched_next = false; let page = self.pager.read_page(self.leaf_page_id)?;
193 if page.cell_count() == 0 {
194 return Ok(None);
195 }
196
197 let (key, value) = read_leaf_cell(&page, 0)?;
198 self.position = 1;
199 Ok(Some((key, value)))
200 }
201
202 pub fn peek(&self) -> BTreeResult<Option<(Vec<u8>, Vec<u8>)>> {
204 if self.leaf_page_id == 0 {
205 return Ok(None);
206 }
207
208 let page = self.pager.read_page(self.leaf_page_id)?;
209 let cell_count = page.cell_count() as usize;
210
211 if self.position >= cell_count {
212 return Ok(None);
213 }
214
215 let (key, value) = read_leaf_cell(&page, self.position)?;
216 Ok(Some((key, value)))
217 }
218}
219
220pub struct BTree {
222 pager: Arc<Pager>,
224 root_page_id: RwLock<u32>,
226 rightmost_leaf: RwLock<Option<(u32, Vec<u8>)>>,
235}
236
237#[path = "btree/impl.rs"]
238mod btree_impl;
239
240#[path = "btree/lehman_yao.rs"]
241pub mod lehman_yao;
242enum SearchResult {
245 Found(usize),
246 NotFound(usize),
247}
248
249fn search_leaf(page: &Page, key: &[u8]) -> BTreeResult<SearchResult> {
250 let cell_count = page.cell_count() as usize;
251
252 let mut low = 0;
254 let mut high = cell_count;
255
256 while low < high {
257 let mid = (low + high) / 2;
258 let (cell_key, _) = read_leaf_cell(page, mid)?;
259
260 match cell_key.as_slice().cmp(key) {
261 Ordering::Less => low = mid + 1,
262 Ordering::Greater => high = mid,
263 Ordering::Equal => return Ok(SearchResult::Found(mid)),
264 }
265 }
266
267 Ok(SearchResult::NotFound(low))
268}
269
270fn find_child(page: &Page, key: &[u8]) -> BTreeResult<u32> {
271 let cell_count = page.cell_count() as usize;
272
273 for i in 0..cell_count {
275 let (cell_key, child) = read_interior_cell(page, i)?;
276 if key < cell_key.as_slice() {
277 return Ok(child);
278 }
279 }
280
281 Ok(page.right_child())
283}
284
285fn find_first_child(page: &Page) -> BTreeResult<u32> {
286 if page.cell_count() == 0 {
287 return Ok(page.right_child());
288 }
289 let (_, child) = read_interior_cell(page, 0)?;
290 Ok(child)
291}
292
293fn leaf_min_bytes() -> usize {
294 (PAGE_SIZE - LEAF_DATA_OFFSET) * MIN_FILL_FACTOR / 100
295}
296
297fn interior_min_bytes() -> usize {
298 (PAGE_SIZE - INTERIOR_DATA_OFFSET) * MIN_FILL_FACTOR / 100
299}
300
301fn leaf_entry_size(entry: &(Vec<u8>, Vec<u8>)) -> usize {
305 2 + 4 + entry.0.len() + entry.1.len()
306}
307
308fn leaf_entries_size(entries: &[(Vec<u8>, Vec<u8>)]) -> usize {
309 entries.iter().map(leaf_entry_size).sum()
310}
311
312#[inline]
313fn leaf_slot_offset_for(index: usize) -> usize {
314 LEAF_SLOT_ARRAY_OFFSET + index * 2
315}
316
317#[inline]
318fn leaf_read_slot(page: &Page, index: usize) -> BTreeResult<usize> {
319 let data = page.as_bytes();
320 let slot_pos = leaf_slot_offset_for(index);
321 if slot_pos + 2 > PAGE_SIZE {
322 return Err(BTreeError::Corrupted("slot array overflows page".into()));
323 }
324 Ok(u16::from_le_bytes([data[slot_pos], data[slot_pos + 1]]) as usize)
325}
326
327#[inline]
328fn leaf_write_slot(page: &mut Page, index: usize, cell_offset: u16) -> BTreeResult<()> {
329 let data = page.as_bytes_mut();
330 let slot_pos = leaf_slot_offset_for(index);
331 if slot_pos + 2 > PAGE_SIZE {
332 return Err(BTreeError::Corrupted("slot array overflows page".into()));
333 }
334 data[slot_pos..slot_pos + 2].copy_from_slice(&cell_offset.to_le_bytes());
335 Ok(())
336}
337
338#[inline]
339fn leaf_slots_end(page: &Page) -> usize {
340 LEAF_SLOT_ARRAY_OFFSET + (page.cell_count() as usize) * 2
341}
342
343#[inline]
344fn leaf_cells_start(page: &Page) -> usize {
345 let end = page.free_end() as usize;
346 if end == 0 {
347 PAGE_SIZE
348 } else {
349 end
350 }
351}
352
353#[inline]
354fn leaf_free_bytes(page: &Page) -> usize {
355 let slot_end = leaf_slots_end(page);
356 let cells = leaf_cells_start(page);
357 cells.saturating_sub(slot_end)
358}
359
360fn interior_key_size(key: &[u8]) -> usize {
361 2 + key.len() + 4
362}
363
364fn interior_entries_size(keys: &[Vec<u8>]) -> usize {
365 keys.iter().map(|k| interior_key_size(k)).sum()
366}
367
368fn read_leaf_cell(page: &Page, index: usize) -> BTreeResult<(Vec<u8>, Vec<u8>)> {
374 let cell_count = page.cell_count() as usize;
375 if index >= cell_count {
376 return Err(BTreeError::Corrupted("Cell index out of range".into()));
377 }
378 let offset = leaf_read_slot(page, index)?;
379 let data = page.as_bytes();
380 if offset + 4 > PAGE_SIZE {
381 return Err(BTreeError::Corrupted("cell header out of range".into()));
382 }
383 let key_len = u16::from_le_bytes([data[offset], data[offset + 1]]) as usize;
384 let val_len = u16::from_le_bytes([data[offset + 2], data[offset + 3]]) as usize;
385 let end = offset + 4 + key_len + val_len;
386 if end > PAGE_SIZE {
387 return Err(BTreeError::Corrupted("cell body out of range".into()));
388 }
389 let key = data[offset + 4..offset + 4 + key_len].to_vec();
390 let value = data[offset + 4 + key_len..end].to_vec();
391 Ok((key, value))
392}
393
394fn read_leaf_entries(page: &Page) -> BTreeResult<Vec<(Vec<u8>, Vec<u8>)>> {
395 let cell_count = page.cell_count() as usize;
396 let mut entries = Vec::with_capacity(cell_count);
397 for i in 0..cell_count {
398 entries.push(read_leaf_cell(page, i)?);
399 }
400 Ok(entries)
401}
402
403fn write_leaf_entries(page: &mut Page, entries: &[(Vec<u8>, Vec<u8>)]) -> BTreeResult<()> {
409 clear_leaf_cells(page);
410 for (i, (k, v)) in entries.iter().enumerate() {
411 let cell_size = 4 + k.len() + v.len();
412 let cells_start = leaf_cells_start(page);
413 let slot_end = LEAF_SLOT_ARRAY_OFFSET + (i + 1) * 2;
414 if slot_end + cell_size > cells_start {
415 return Err(BTreeError::Corrupted("leaf rebuild overflowed page".into()));
416 }
417 let cell_offset = cells_start - cell_size;
418 {
419 let data = page.as_bytes_mut();
420 data[cell_offset..cell_offset + 2].copy_from_slice(&(k.len() as u16).to_le_bytes());
421 data[cell_offset + 2..cell_offset + 4].copy_from_slice(&(v.len() as u16).to_le_bytes());
422 data[cell_offset + 4..cell_offset + 4 + k.len()].copy_from_slice(k);
423 data[cell_offset + 4 + k.len()..cell_offset + 4 + k.len() + v.len()].copy_from_slice(v);
424 }
425 page.set_free_end(cell_offset as u16);
426 leaf_write_slot(page, i, cell_offset as u16)?;
427 }
428 page.set_cell_count(entries.len() as u16);
429 page.set_free_start((LEAF_SLOT_ARRAY_OFFSET + entries.len() * 2) as u16);
430 Ok(())
431}
432
433fn can_insert_leaf(page: &Page, key: &[u8], value: &[u8]) -> bool {
434 let needed = 2 + 4 + key.len() + value.len();
435 leaf_free_bytes(page) >= needed
436}
437
438pub(crate) fn overwrite_leaf_value(
454 page: &mut Page,
455 index: usize,
456 new_value: &[u8],
457) -> BTreeResult<()> {
458 let cell_offset = leaf_read_slot(page, index)?;
459 let data = page.as_bytes();
460 if cell_offset + 4 > data.len() {
461 return Err(BTreeError::Corrupted(
462 "leaf cell header out of bounds".into(),
463 ));
464 }
465 let k_len = u16::from_le_bytes([data[cell_offset], data[cell_offset + 1]]) as usize;
466 let old_v_len = u16::from_le_bytes([data[cell_offset + 2], data[cell_offset + 3]]) as usize;
467 if new_value.len() > old_v_len {
468 return Err(BTreeError::Corrupted(
469 "overwrite_leaf_value: new value larger than slot".into(),
470 ));
471 }
472 let value_start = cell_offset + 4 + k_len;
473 if value_start + new_value.len() > data.len() {
474 return Err(BTreeError::Corrupted(
475 "leaf cell value out of bounds".into(),
476 ));
477 }
478 let data = page.as_bytes_mut();
479 if new_value.len() != old_v_len {
482 let new_len = new_value.len() as u16;
483 data[cell_offset + 2..cell_offset + 4].copy_from_slice(&new_len.to_le_bytes());
484 }
485 data[value_start..value_start + new_value.len()].copy_from_slice(new_value);
486 Ok(())
487}
488
489fn insert_into_leaf(page: &mut Page, key: &[u8], value: &[u8]) -> BTreeResult<()> {
491 let cell_count = page.cell_count() as usize;
493 let mut low = 0;
494 let mut high = cell_count;
495 while low < high {
496 let mid = (low + high) / 2;
497 let (cell_key, _) = read_leaf_cell(page, mid)?;
498 match cell_key.as_slice().cmp(key) {
499 Ordering::Less => low = mid + 1,
500 Ordering::Greater => high = mid,
501 Ordering::Equal => {
502 low = mid + 1;
505 break;
506 }
507 }
508 }
509 let insert_pos = low;
510
511 let cell_size = 4 + key.len() + value.len();
513 let slot_end_after = LEAF_SLOT_ARRAY_OFFSET + (cell_count + 1) * 2;
514 let cells_start = leaf_cells_start(page);
515 if slot_end_after + cell_size > cells_start {
516 return Err(BTreeError::Corrupted("leaf page full".into()));
517 }
518 let cell_offset = cells_start - cell_size;
519 {
520 let data = page.as_bytes_mut();
521 data[cell_offset..cell_offset + 2].copy_from_slice(&(key.len() as u16).to_le_bytes());
522 data[cell_offset + 2..cell_offset + 4].copy_from_slice(&(value.len() as u16).to_le_bytes());
523 data[cell_offset + 4..cell_offset + 4 + key.len()].copy_from_slice(key);
524 data[cell_offset + 4 + key.len()..cell_offset + 4 + key.len() + value.len()]
525 .copy_from_slice(value);
526 }
527 page.set_free_end(cell_offset as u16);
528
529 {
533 let shift_from = leaf_slot_offset_for(insert_pos);
534 let shift_to = shift_from + 2;
535 let shift_len = (cell_count - insert_pos) * 2;
536 if shift_len > 0 {
537 let data = page.as_bytes_mut();
538 data.copy_within(shift_from..shift_from + shift_len, shift_to);
539 }
540 }
541 leaf_write_slot(page, insert_pos, cell_offset as u16)?;
542
543 page.set_cell_count((cell_count + 1) as u16);
545 page.set_free_start((LEAF_SLOT_ARRAY_OFFSET + (cell_count + 1) * 2) as u16);
546 Ok(())
547}
548
549fn delete_from_leaf(page: &mut Page, index: usize) -> BTreeResult<()> {
554 let cell_count = page.cell_count() as usize;
555 if index >= cell_count {
556 return Err(BTreeError::Corrupted("delete index out of range".into()));
557 }
558 if index + 1 < cell_count {
559 let shift_from = leaf_slot_offset_for(index + 1);
560 let shift_to = leaf_slot_offset_for(index);
561 let shift_len = (cell_count - index - 1) * 2;
562 let data = page.as_bytes_mut();
563 data.copy_within(shift_from..shift_from + shift_len, shift_to);
564 }
565 page.set_cell_count((cell_count - 1) as u16);
566 page.set_free_start((LEAF_SLOT_ARRAY_OFFSET + (cell_count - 1) * 2) as u16);
567 Ok(())
568}
569
570fn clear_leaf_cells(page: &mut Page) {
574 {
575 let data = page.as_bytes_mut();
576 for byte in &mut data[LEAF_SLOT_ARRAY_OFFSET..] {
577 *byte = 0;
578 }
579 }
580 page.set_cell_count(0);
581 page.set_free_start(LEAF_SLOT_ARRAY_OFFSET as u16);
582 page.set_free_end(PAGE_SIZE as u16);
583}
584
585fn init_leaf_links(page: &mut Page, prev: u32, next: u32) {
586 let data = page.as_bytes_mut();
587 data[LEAF_PREV_OFFSET..LEAF_PREV_OFFSET + 4].copy_from_slice(&prev.to_le_bytes());
588 data[LEAF_NEXT_OFFSET..LEAF_NEXT_OFFSET + 4].copy_from_slice(&next.to_le_bytes());
589}
590
591fn read_next_leaf(page: &Page) -> u32 {
592 let data = page.as_bytes();
593 u32::from_le_bytes([
594 data[LEAF_NEXT_OFFSET],
595 data[LEAF_NEXT_OFFSET + 1],
596 data[LEAF_NEXT_OFFSET + 2],
597 data[LEAF_NEXT_OFFSET + 3],
598 ])
599}
600
601#[inline]
604fn leaf_right_sibling(page: &Page) -> u32 {
605 read_next_leaf(page)
606}
607
608fn leaf_high_key(page: &Page) -> BTreeResult<Option<Vec<u8>>> {
612 let cell_count = page.cell_count() as usize;
613 if cell_count == 0 {
614 return Ok(None);
615 }
616 let (key, _) = read_leaf_cell(page, cell_count - 1)?;
617 Ok(Some(key))
618}
619
620fn set_prev_leaf(page: &mut Page, prev: u32) {
621 let data = page.as_bytes_mut();
622 data[LEAF_PREV_OFFSET..LEAF_PREV_OFFSET + 4].copy_from_slice(&prev.to_le_bytes());
623}
624
625fn set_next_leaf(page: &mut Page, next: u32) {
626 let data = page.as_bytes_mut();
627 data[LEAF_NEXT_OFFSET..LEAF_NEXT_OFFSET + 4].copy_from_slice(&next.to_le_bytes());
628}
629
630#[inline]
645fn interior_slot_offset_for(index: usize) -> usize {
646 INTERIOR_DATA_OFFSET + index * 2
647}
648
649#[inline]
650fn interior_read_slot(page: &Page, index: usize) -> BTreeResult<usize> {
651 let data = page.as_bytes();
652 let slot_pos = interior_slot_offset_for(index);
653 if slot_pos + 2 > PAGE_SIZE {
654 return Err(BTreeError::Corrupted(
655 "interior slot array overflows page".into(),
656 ));
657 }
658 Ok(u16::from_le_bytes([data[slot_pos], data[slot_pos + 1]]) as usize)
659}
660
661#[inline]
662fn interior_write_slot(page: &mut Page, index: usize, cell_offset: u16) -> BTreeResult<()> {
663 let data = page.as_bytes_mut();
664 let slot_pos = interior_slot_offset_for(index);
665 if slot_pos + 2 > PAGE_SIZE {
666 return Err(BTreeError::Corrupted(
667 "interior slot array overflows page".into(),
668 ));
669 }
670 data[slot_pos..slot_pos + 2].copy_from_slice(&cell_offset.to_le_bytes());
671 Ok(())
672}
673
674#[inline]
675fn interior_cells_start(page: &Page) -> usize {
676 let end = page.free_end() as usize;
677 if end == 0 {
678 PAGE_SIZE
679 } else {
680 end
681 }
682}
683
684#[inline]
685fn interior_free_bytes(page: &Page) -> usize {
686 let slot_end = INTERIOR_DATA_OFFSET + (page.cell_count() as usize) * 2;
687 interior_cells_start(page).saturating_sub(slot_end)
688}
689
690fn read_interior_cell(page: &Page, index: usize) -> BTreeResult<(Vec<u8>, u32)> {
691 let cell_count = page.cell_count() as usize;
692 if index >= cell_count {
693 return Err(BTreeError::Corrupted("Cell index out of range".into()));
694 }
695 let offset = interior_read_slot(page, index)?;
696 let data = page.as_bytes();
697 if offset + 2 > PAGE_SIZE {
698 return Err(BTreeError::Corrupted(
699 "interior cell header out of range".into(),
700 ));
701 }
702 let key_len = u16::from_le_bytes([data[offset], data[offset + 1]]) as usize;
703 let end = offset + 2 + key_len + 4;
704 if end > PAGE_SIZE {
705 return Err(BTreeError::Corrupted(
706 "interior cell body out of range".into(),
707 ));
708 }
709 let key = data[offset + 2..offset + 2 + key_len].to_vec();
710 let child = u32::from_le_bytes([
711 data[offset + 2 + key_len],
712 data[offset + 2 + key_len + 1],
713 data[offset + 2 + key_len + 2],
714 data[offset + 2 + key_len + 3],
715 ]);
716 Ok((key, child))
717}
718
719fn read_interior_keys_children(page: &Page) -> BTreeResult<(Vec<Vec<u8>>, Vec<u32>)> {
720 let cell_count = page.cell_count() as usize;
721 let mut keys = Vec::with_capacity(cell_count);
722 let mut children = Vec::with_capacity(cell_count + 1);
723
724 for i in 0..cell_count {
725 let (key, child) = read_interior_cell(page, i)?;
726 keys.push(key);
727 children.push(child);
728 }
729
730 if cell_count == 0 {
731 let right_child = page.right_child();
732 if right_child != 0 {
733 children.push(right_child);
734 }
735 } else {
736 children.push(page.right_child());
737 }
738
739 Ok((keys, children))
740}
741
742fn write_interior_entries(page: &mut Page, keys: &[Vec<u8>], children: &[u32]) -> BTreeResult<()> {
747 if !keys.is_empty() && children.len() != keys.len() + 1 {
748 return Err(BTreeError::Corrupted(
749 "Interior keys/children length mismatch".into(),
750 ));
751 }
752
753 clear_interior_cells(page);
754 if keys.is_empty() {
755 let right_child = children.first().copied().unwrap_or(0);
756 page.set_right_child(right_child);
757 return Ok(());
758 }
759
760 for (i, key) in keys.iter().enumerate() {
761 let cell_size = 2 + key.len() + 4;
762 let cells_start = interior_cells_start(page);
763 let slot_end = INTERIOR_DATA_OFFSET + (i + 1) * 2;
764 if slot_end + cell_size > cells_start {
765 return Err(BTreeError::Corrupted(
766 "interior rebuild overflowed page".into(),
767 ));
768 }
769 let cell_offset = cells_start - cell_size;
770 {
771 let data = page.as_bytes_mut();
772 data[cell_offset..cell_offset + 2].copy_from_slice(&(key.len() as u16).to_le_bytes());
773 data[cell_offset + 2..cell_offset + 2 + key.len()].copy_from_slice(key);
774 data[cell_offset + 2 + key.len()..cell_offset + 2 + key.len() + 4]
775 .copy_from_slice(&children[i].to_le_bytes());
776 }
777 page.set_free_end(cell_offset as u16);
778 interior_write_slot(page, i, cell_offset as u16)?;
779 }
780 page.set_cell_count(keys.len() as u16);
781 page.set_free_start((INTERIOR_DATA_OFFSET + keys.len() * 2) as u16);
782 page.set_right_child(*children.last().ok_or_else(|| {
783 BTreeError::Corrupted("write_interior_entries: children empty with non-empty keys".into())
784 })?);
785 Ok(())
786}
787
788fn can_insert_interior(page: &Page, key: &[u8]) -> bool {
789 let needed = 2 + 2 + key.len() + 4;
790 interior_free_bytes(page) >= needed
791}
792
793fn insert_into_interior(
798 page: &mut Page,
799 key: &[u8],
800 left_child: u32,
801 right_child: u32,
802) -> BTreeResult<()> {
803 let cell_count = page.cell_count() as usize;
805 let mut low = 0;
806 let mut high = cell_count;
807 while low < high {
808 let mid = (low + high) / 2;
809 let (cell_key, _) = read_interior_cell(page, mid)?;
810 match cell_key.as_slice().cmp(key) {
811 Ordering::Less => low = mid + 1,
812 Ordering::Greater | Ordering::Equal => high = mid,
813 }
814 }
815 let insert_pos = low;
816
817 if insert_pos < cell_count {
822 let old_offset = interior_read_slot(page, insert_pos)?;
823 let data = page.as_bytes();
824 let key_len = u16::from_le_bytes([data[old_offset], data[old_offset + 1]]) as usize;
825 let child_pos = old_offset + 2 + key_len;
826 let data = page.as_bytes_mut();
827 data[child_pos..child_pos + 4].copy_from_slice(&right_child.to_le_bytes());
828 } else {
829 page.set_right_child(right_child);
830 }
831
832 let cell_size = 2 + key.len() + 4;
834 let slot_end_after = INTERIOR_DATA_OFFSET + (cell_count + 1) * 2;
835 let cells_start = interior_cells_start(page);
836 if slot_end_after + cell_size > cells_start {
837 return Err(BTreeError::Corrupted("interior page full".into()));
838 }
839 let cell_offset = cells_start - cell_size;
840 {
841 let data = page.as_bytes_mut();
842 data[cell_offset..cell_offset + 2].copy_from_slice(&(key.len() as u16).to_le_bytes());
843 data[cell_offset + 2..cell_offset + 2 + key.len()].copy_from_slice(key);
844 data[cell_offset + 2 + key.len()..cell_offset + 2 + key.len() + 4]
845 .copy_from_slice(&left_child.to_le_bytes());
846 }
847 page.set_free_end(cell_offset as u16);
848
849 {
852 let shift_from = interior_slot_offset_for(insert_pos);
853 let shift_to = shift_from + 2;
854 let shift_len = (cell_count - insert_pos) * 2;
855 if shift_len > 0 {
856 let data = page.as_bytes_mut();
857 data.copy_within(shift_from..shift_from + shift_len, shift_to);
858 }
859 }
860 interior_write_slot(page, insert_pos, cell_offset as u16)?;
861 page.set_cell_count((cell_count + 1) as u16);
862 page.set_free_start((INTERIOR_DATA_OFFSET + (cell_count + 1) * 2) as u16);
863 Ok(())
864}
865
866fn clear_interior_cells(page: &mut Page) {
867 {
868 let data = page.as_bytes_mut();
869 for byte in &mut data[INTERIOR_DATA_OFFSET..] {
870 *byte = 0;
871 }
872 }
873 page.set_cell_count(0);
874 page.set_free_start(INTERIOR_DATA_OFFSET as u16);
875 page.set_free_end(PAGE_SIZE as u16);
876}
877
878#[cfg(test)]
879mod tests {
880 use super::*;
881 use std::path::PathBuf;
882
883 fn temp_db_path() -> PathBuf {
884 use std::sync::atomic::{AtomicU64, Ordering};
885 static COUNTER: AtomicU64 = AtomicU64::new(0);
886 let id = COUNTER.fetch_add(1, Ordering::Relaxed);
887 let mut path = std::env::temp_dir();
888 path.push(format!("reddb_btree_test_{}_{}.db", std::process::id(), id));
889 path
890 }
891
892 fn cleanup(path: &PathBuf) {
893 let _ = std::fs::remove_file(path);
894 }
895
896 #[test]
897 fn test_btree_empty() {
898 let path = temp_db_path();
899 cleanup(&path);
900
901 let pager = Arc::new(Pager::open_default(&path).unwrap());
902 let tree = BTree::new(pager);
903
904 assert!(tree.is_empty());
905 assert_eq!(tree.root_page_id(), 0);
906 assert_eq!(tree.get(b"key").unwrap(), None);
907 assert_eq!(tree.count().unwrap(), 0);
908
909 cleanup(&path);
910 }
911
912 #[test]
913 fn test_btree_single_insert() {
914 let path = temp_db_path();
915 cleanup(&path);
916
917 let pager = Arc::new(Pager::open_default(&path).unwrap());
918 let tree = BTree::new(pager);
919
920 tree.insert(b"hello", b"world").unwrap();
921
922 assert!(!tree.is_empty());
923 assert_eq!(tree.get(b"hello").unwrap(), Some(b"world".to_vec()));
924 assert_eq!(tree.get(b"other").unwrap(), None);
925 assert_eq!(tree.count().unwrap(), 1);
926
927 cleanup(&path);
928 }
929
930 #[test]
931 fn test_btree_multiple_inserts() {
932 let path = temp_db_path();
933 cleanup(&path);
934
935 let pager = Arc::new(Pager::open_default(&path).unwrap());
936 let tree = BTree::new(pager);
937
938 tree.insert(b"c", b"3").unwrap();
939 tree.insert(b"a", b"1").unwrap();
940 tree.insert(b"b", b"2").unwrap();
941
942 assert_eq!(tree.get(b"a").unwrap(), Some(b"1".to_vec()));
943 assert_eq!(tree.get(b"b").unwrap(), Some(b"2".to_vec()));
944 assert_eq!(tree.get(b"c").unwrap(), Some(b"3".to_vec()));
945 assert_eq!(tree.count().unwrap(), 3);
946
947 cleanup(&path);
948 }
949
950 #[test]
951 fn test_btree_duplicate_key() {
952 let path = temp_db_path();
953 cleanup(&path);
954
955 let pager = Arc::new(Pager::open_default(&path).unwrap());
956 let tree = BTree::new(pager);
957
958 tree.insert(b"key", b"value1").unwrap();
959 let result = tree.insert(b"key", b"value2");
960
961 assert!(matches!(result, Err(BTreeError::DuplicateKey)));
962 assert_eq!(tree.get(b"key").unwrap(), Some(b"value1".to_vec()));
963
964 cleanup(&path);
965 }
966
967 #[test]
968 fn test_btree_delete() {
969 let path = temp_db_path();
970 cleanup(&path);
971
972 let pager = Arc::new(Pager::open_default(&path).unwrap());
973 let tree = BTree::new(pager);
974
975 tree.insert(b"a", b"1").unwrap();
976 tree.insert(b"b", b"2").unwrap();
977 tree.insert(b"c", b"3").unwrap();
978
979 assert!(tree.delete(b"b").unwrap());
980 assert!(!tree.delete(b"d").unwrap());
981
982 assert_eq!(tree.get(b"a").unwrap(), Some(b"1".to_vec()));
983 assert_eq!(tree.get(b"b").unwrap(), None);
984 assert_eq!(tree.get(b"c").unwrap(), Some(b"3".to_vec()));
985 assert_eq!(tree.count().unwrap(), 2);
986
987 cleanup(&path);
988 }
989
990 #[test]
991 fn test_btree_delete_rebalance_removes_empty_leaf() {
992 let path = temp_db_path();
993 cleanup(&path);
994
995 let pager = Arc::new(Pager::open_default(&path).unwrap());
996 let tree = BTree::new(pager.clone());
997
998 let value = vec![b'v'; 200];
999 for i in 0..300u32 {
1000 let key = format!("key{:03}", i);
1001 tree.insert(key.as_bytes(), &value).unwrap();
1002 }
1003
1004 let root_id = tree.root_page_id();
1005 let first_leaf = tree.find_first_leaf(root_id).unwrap();
1006 let mut leaf_ids = Vec::new();
1007 let mut current = first_leaf;
1008 loop {
1009 leaf_ids.push(current);
1010 let page = pager.read_page(current).unwrap();
1011 let next = read_next_leaf(&page);
1012 if next == 0 {
1013 break;
1014 }
1015 current = next;
1016 }
1017
1018 assert!(leaf_ids.len() >= 3);
1019
1020 let target_leaf = leaf_ids[1];
1021 let page = pager.read_page(target_leaf).unwrap();
1022 let cell_count = page.cell_count() as usize;
1023 let mut keys = Vec::with_capacity(cell_count);
1024 for i in 0..cell_count {
1025 let (key, _) = read_leaf_cell(&page, i).unwrap();
1026 keys.push(key);
1027 }
1028
1029 for key in &keys {
1030 tree.delete(key).unwrap();
1031 }
1032
1033 let expected = 300 - keys.len();
1034 assert_eq!(tree.count().unwrap(), expected);
1035
1036 let mut cursor = tree.cursor_first().unwrap();
1037 let mut results = Vec::new();
1038 while let Some((key, _)) = cursor.next().unwrap() {
1039 results.push(key);
1040 }
1041
1042 assert_eq!(results.len(), expected);
1043 let last_key = format!("key{:03}", 299).into_bytes();
1044 assert_eq!(results.last(), Some(&last_key));
1045
1046 cleanup(&path);
1047 }
1048
1049 #[test]
1050 fn test_btree_cursor() {
1051 let path = temp_db_path();
1052 cleanup(&path);
1053
1054 let pager = Arc::new(Pager::open_default(&path).unwrap());
1055 let tree = BTree::new(pager);
1056
1057 tree.insert(b"c", b"3").unwrap();
1058 tree.insert(b"a", b"1").unwrap();
1059 tree.insert(b"b", b"2").unwrap();
1060
1061 let mut cursor = tree.cursor_first().unwrap();
1062 let mut results: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
1063
1064 while let Some(entry) = cursor.next().unwrap() {
1065 results.push(entry);
1066 }
1067
1068 assert_eq!(results.len(), 3);
1069 assert_eq!(results[0], (b"a".to_vec(), b"1".to_vec()));
1070 assert_eq!(results[1], (b"b".to_vec(), b"2".to_vec()));
1071 assert_eq!(results[2], (b"c".to_vec(), b"3".to_vec()));
1072
1073 cleanup(&path);
1074 }
1075
1076 #[test]
1077 fn test_btree_range() {
1078 let path = temp_db_path();
1079 cleanup(&path);
1080
1081 let pager = Arc::new(Pager::open_default(&path).unwrap());
1082 let tree = BTree::new(pager);
1083
1084 for i in 0..10u8 {
1085 let key = format!("key{:02}", i);
1086 let value = format!("val{:02}", i);
1087 tree.insert(key.as_bytes(), value.as_bytes()).unwrap();
1088 }
1089
1090 let results = tree.range(b"key03", b"key06").unwrap();
1091
1092 assert_eq!(results.len(), 4);
1093 assert_eq!(results[0].0, b"key03".to_vec());
1094 assert_eq!(results[3].0, b"key06".to_vec());
1095
1096 cleanup(&path);
1097 }
1098
1099 #[test]
1100 fn test_btree_large_keys() {
1101 let path = temp_db_path();
1102 cleanup(&path);
1103
1104 let pager = Arc::new(Pager::open_default(&path).unwrap());
1105 let tree = BTree::new(pager);
1106
1107 let key = vec![b'x'; 500];
1108 let value = vec![b'y'; 500];
1109
1110 tree.insert(&key, &value).unwrap();
1111 assert_eq!(tree.get(&key).unwrap(), Some(value));
1112
1113 cleanup(&path);
1114 }
1115
1116 #[test]
1117 fn test_btree_key_too_large() {
1118 let path = temp_db_path();
1119 cleanup(&path);
1120
1121 let pager = Arc::new(Pager::open_default(&path).unwrap());
1122 let tree = BTree::new(pager);
1123
1124 let key = vec![b'x'; MAX_KEY_SIZE + 1];
1125 let result = tree.insert(&key, b"value");
1126
1127 assert!(matches!(result, Err(BTreeError::KeyTooLarge(_))));
1128
1129 cleanup(&path);
1130 }
1131
1132 #[test]
1133 fn test_btree_many_inserts() {
1134 let path = temp_db_path();
1135 cleanup(&path);
1136
1137 let pager = Arc::new(Pager::open_default(&path).unwrap());
1138 let tree = BTree::new(pager);
1139
1140 for i in 0..100u32 {
1142 let key = format!("key{:08}", i);
1143 let value = format!("value{:08}", i);
1144 tree.insert(key.as_bytes(), value.as_bytes()).unwrap();
1145 }
1146
1147 for i in 0..100u32 {
1149 let key = format!("key{:08}", i);
1150 let expected = format!("value{:08}", i);
1151 assert_eq!(
1152 tree.get(key.as_bytes()).unwrap(),
1153 Some(expected.into_bytes())
1154 );
1155 }
1156
1157 assert_eq!(tree.count().unwrap(), 100);
1158
1159 cleanup(&path);
1160 }
1161
1162 #[test]
1163 fn test_btree_bulk_insert_sorted_preserves_leaf_layout() {
1164 let path = temp_db_path();
1165 cleanup(&path);
1166
1167 let pager = Arc::new(Pager::open_default(&path).unwrap());
1168 let tree = BTree::new(pager);
1169
1170 let items: Vec<(Vec<u8>, Vec<u8>)> = (0..240u32)
1171 .map(|i| {
1172 (
1173 format!("key{:08}", i).into_bytes(),
1174 vec![b'v'; 32 + (i as usize % 7)],
1175 )
1176 })
1177 .collect();
1178
1179 tree.bulk_insert_sorted(&items).unwrap();
1180
1181 assert_eq!(tree.count().unwrap(), items.len());
1182
1183 for (key, value) in &items {
1184 assert_eq!(tree.get(key).unwrap(), Some(value.clone()));
1185 }
1186
1187 cleanup(&path);
1188 }
1189
1190 mod prop_upsert_batch_sorted {
1200 use super::*;
1201 use proptest::prelude::*;
1202 use std::collections::BTreeMap;
1203
1204 fn populate_tree(seed: &[(Vec<u8>, Vec<u8>)]) -> (BTree, PathBuf, Arc<Pager>) {
1207 let path = temp_db_path();
1208 cleanup(&path);
1209 let pager = Arc::new(Pager::open_default(&path).unwrap());
1210 let tree = BTree::new(Arc::clone(&pager));
1211 for (k, v) in seed {
1212 tree.upsert(k, v).unwrap();
1213 }
1214 (tree, path, pager)
1215 }
1216
1217 fn key_strategy() -> impl Strategy<Value = Vec<u8>> {
1218 prop::collection::vec(0u8..16u8, 1..=4)
1222 }
1223
1224 fn value_strategy() -> impl Strategy<Value = Vec<u8>> {
1225 prop::collection::vec(0u8..255u8, 0..=8)
1229 }
1230
1231 fn pair_strategy() -> impl Strategy<Value = (Vec<u8>, Vec<u8>)> {
1232 (key_strategy(), value_strategy())
1233 }
1234
1235 proptest! {
1236 #![proptest_config(ProptestConfig {
1237 cases: 64,
1238 .. ProptestConfig::default()
1239 })]
1240
1241 #[test]
1242 fn batch_upsert_matches_loop_upsert(
1243 seed in prop::collection::vec(pair_strategy(), 0..50),
1244 batch in prop::collection::vec(pair_strategy(), 1..200),
1245 ) {
1246 let mut seed_map: BTreeMap<Vec<u8>, Vec<u8>> = BTreeMap::new();
1249 for (k, v) in seed {
1250 seed_map.insert(k, v);
1251 }
1252 let seed: Vec<(Vec<u8>, Vec<u8>)> = seed_map.into_iter().collect();
1253
1254 let (baseline, baseline_path, _baseline_pager) = populate_tree(&seed);
1256 for (k, v) in &batch {
1257 baseline.upsert(k, v).unwrap();
1258 }
1259
1260 let (candidate, candidate_path, _candidate_pager) = populate_tree(&seed);
1263 let mut sorted = batch.clone();
1264 sorted.sort_by(|a, b| a.0.cmp(&b.0));
1265 candidate.upsert_batch_sorted(&sorted).unwrap();
1266
1267 let mut expected: BTreeMap<Vec<u8>, Vec<u8>> = BTreeMap::new();
1270 for (k, v) in &seed {
1271 expected.insert(k.clone(), v.clone());
1272 }
1273 for (k, v) in &batch {
1274 expected.insert(k.clone(), v.clone());
1275 }
1276
1277 for (k, v) in &expected {
1279 let baseline_got = baseline.get(k).unwrap();
1280 let candidate_got = candidate.get(k).unwrap();
1281 prop_assert_eq!(baseline_got.as_ref(), Some(v));
1282 prop_assert_eq!(candidate_got.as_ref(), Some(v));
1283 }
1284
1285 let collect = |t: &BTree| -> Vec<(Vec<u8>, Vec<u8>)> {
1287 let mut out = Vec::new();
1288 let mut cur = t.cursor_first().unwrap();
1289 while let Some(entry) = cur.next().unwrap() {
1290 out.push(entry);
1291 }
1292 out
1293 };
1294 let baseline_contents = collect(&baseline);
1295 let candidate_contents = collect(&candidate);
1296 prop_assert_eq!(&baseline_contents, &candidate_contents);
1297
1298 let expected_contents: Vec<(Vec<u8>, Vec<u8>)> =
1300 expected.into_iter().collect();
1301 prop_assert_eq!(&candidate_contents, &expected_contents);
1302
1303 cleanup(&baseline_path);
1304 cleanup(&candidate_path);
1305 }
1306 }
1307 }
1308
1309 #[test]
1310 fn test_btree_sorted_iteration() {
1311 let path = temp_db_path();
1312 cleanup(&path);
1313
1314 let pager = Arc::new(Pager::open_default(&path).unwrap());
1315 let tree = BTree::new(pager);
1316
1317 let keys = vec![50, 25, 75, 10, 30, 60, 80, 5, 15, 27, 35, 55, 65, 77, 90];
1319 for k in &keys {
1320 let key = format!("{:03}", k);
1321 tree.insert(key.as_bytes(), key.as_bytes()).unwrap();
1322 }
1323
1324 let mut cursor = tree.cursor_first().unwrap();
1326 let mut prev: Option<Vec<u8>> = None;
1327
1328 while let Some((key, _)) = cursor.next().unwrap() {
1329 if let Some(p) = &prev {
1330 assert!(p < &key, "Keys not in sorted order");
1331 }
1332 prev = Some(key);
1333 }
1334
1335 cleanup(&path);
1336 }
1337}