1use std::cmp::Ordering;
33use std::sync::{Arc, RwLock};
34
35use super::page::{Page, PageType, HEADER_SIZE, PAGE_SIZE};
36use super::pager::{Pager, PagerError};
37
/// Largest key, in bytes, reported as the limit in `BTreeError::KeyTooLarge`.
pub const MAX_KEY_SIZE: usize = 1024;

/// Largest value, in bytes, reported as the limit in `BTreeError::ValueTooLarge`.
pub const MAX_VALUE_SIZE: usize = 1024;

/// Minimum page fill expressed as a percentage; the `*_min_bytes` helpers
/// multiply the usable page area by this and divide by 100.
pub const MIN_FILL_FACTOR: usize = 25;

/// Byte offset of the 4-byte little-endian "previous leaf" page id in a leaf page.
const LEAF_PREV_OFFSET: usize = HEADER_SIZE;

/// Byte offset of the 4-byte little-endian "next leaf" page id in a leaf page.
const LEAF_NEXT_OFFSET: usize = HEADER_SIZE + 4;

/// Byte offset where the leaf slot array begins (right after the two sibling
/// links). Each slot is a 2-byte little-endian offset of a cell within the page.
const LEAF_SLOT_ARRAY_OFFSET: usize = HEADER_SIZE + 8;

/// Start of the leaf region counted as usable by `leaf_min_bytes`; identical to
/// the start of the slot array.
const LEAF_DATA_OFFSET: usize = LEAF_SLOT_ARRAY_OFFSET;

/// Byte offset where the interior-page slot array begins (directly after the
/// page header).
const INTERIOR_DATA_OFFSET: usize = HEADER_SIZE;
75
/// Errors produced by the B-tree layer.
///
/// Lower-level errors (`PagerError`, `PageError`) are flattened to their string
/// form by the `From` impls in this file, which is what allows the enum to
/// derive `Clone`.
#[derive(Debug, Clone)]
pub enum BTreeError {
    /// The requested key is not present.
    NotFound,
    /// The key being inserted already exists.
    DuplicateKey,
    /// The key is longer than `MAX_KEY_SIZE`; carries the offending size in bytes.
    KeyTooLarge(usize),
    /// The value is longer than `MAX_VALUE_SIZE`; carries the offending size in bytes.
    ValueTooLarge(usize),
    /// A page failed a structural check (bad slot, out-of-range cell, full
    /// page, ...); carries a human-readable description.
    Corrupted(String),
    /// A pager operation failed; carries the stringified `PagerError`.
    Pager(String),
    /// NOTE(review): presumably raised when one of the tree's `RwLock`s is
    /// poisoned — the producing code is not in this file, confirm.
    LockPoisoned(String),
}
94
95impl std::fmt::Display for BTreeError {
96 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97 match self {
98 Self::NotFound => write!(f, "Key not found"),
99 Self::DuplicateKey => write!(f, "Key already exists"),
100 Self::KeyTooLarge(size) => {
101 write!(f, "Key too large: {} bytes (max {})", size, MAX_KEY_SIZE)
102 }
103 Self::ValueTooLarge(size) => write!(
104 f,
105 "Value too large: {} bytes (max {})",
106 size, MAX_VALUE_SIZE
107 ),
108 Self::Corrupted(msg) => write!(f, "B-tree corrupted: {}", msg),
109 Self::Pager(msg) => write!(f, "Pager error: {}", msg),
110 Self::LockPoisoned(msg) => write!(f, "Lock poisoned: {}", msg),
111 }
112 }
113}
114
// Marker impl so `BTreeError` can be used wherever a `std::error::Error` is
// expected. No `source()` is provided: wrapped errors are stored as strings.
impl std::error::Error for BTreeError {}
116
117impl From<PagerError> for BTreeError {
118 fn from(e: PagerError) -> Self {
119 Self::Pager(e.to_string())
120 }
121}
122
123impl From<super::page::PageError> for BTreeError {
124 fn from(e: super::page::PageError) -> Self {
125 Self::Corrupted(e.to_string())
126 }
127}
128
/// Result alias used throughout the B-tree layer; the error is always `BTreeError`.
pub type BTreeResult<T> = Result<T, BTreeError>;
131
/// Forward cursor over leaf entries, following the leaves' "next" links.
pub struct BTreeCursor {
    /// Page id of the leaf currently being scanned; `0` means the cursor is exhausted.
    leaf_page_id: u32,
    /// Index, within the current leaf, of the next cell to return.
    position: usize,
    /// Pager used to read leaf pages and to issue prefetch hints.
    pager: Arc<Pager>,
    /// Whether a prefetch hint for the following leaf has already been issued
    /// while scanning the current leaf (reset when the cursor changes leaf).
    prefetched_next: bool,
}
145
146impl BTreeCursor {
147 pub fn next(&mut self) -> BTreeResult<Option<(Vec<u8>, Vec<u8>)>> {
149 if self.leaf_page_id == 0 {
150 return Ok(None);
151 }
152
153 let page = self.pager.read_page(self.leaf_page_id)?;
154 let cell_count = page.cell_count() as usize;
155
156 if !self.prefetched_next && self.position >= cell_count / 2 && cell_count > 0 {
162 let next_leaf = read_next_leaf(&page);
163 if next_leaf != 0 {
164 self.pager.prefetch_hint(next_leaf);
165 }
166 self.prefetched_next = true;
167 }
168
169 if self.position < cell_count {
171 let (key, value) = read_leaf_cell(&page, self.position)?;
172 self.position += 1;
173 return Ok(Some((key, value)));
174 }
175
176 let next_leaf = read_next_leaf(&page);
178 if next_leaf == 0 {
179 self.leaf_page_id = 0;
180 return Ok(None);
181 }
182
183 self.leaf_page_id = next_leaf;
184 self.position = 0;
185 self.prefetched_next = false; let page = self.pager.read_page(self.leaf_page_id)?;
189 if page.cell_count() == 0 {
190 return Ok(None);
191 }
192
193 let (key, value) = read_leaf_cell(&page, 0)?;
194 self.position = 1;
195 Ok(Some((key, value)))
196 }
197
198 pub fn peek(&self) -> BTreeResult<Option<(Vec<u8>, Vec<u8>)>> {
200 if self.leaf_page_id == 0 {
201 return Ok(None);
202 }
203
204 let page = self.pager.read_page(self.leaf_page_id)?;
205 let cell_count = page.cell_count() as usize;
206
207 if self.position >= cell_count {
208 return Ok(None);
209 }
210
211 let (key, value) = read_leaf_cell(&page, self.position)?;
212 Ok(Some((key, value)))
213 }
214}
215
/// Handle to a B-tree stored in pages managed by a shared `Pager`.
pub struct BTree {
    /// Pager that owns the tree's pages.
    pager: Arc<Pager>,
    /// Page id of the root, behind a lock. The tests in this file show an
    /// empty tree reporting a root page id of `0`.
    root_page_id: RwLock<u32>,
    /// NOTE(review): presumably a cached (page id, key) pair describing the
    /// rightmost leaf, used to speed up appends — the code that reads and
    /// writes it is not in this file; confirm.
    rightmost_leaf: RwLock<Option<(u32, Vec<u8>)>>,
}
232
// Private submodule loaded from `btree/impl.rs`.
// NOTE(review): presumably holds the `BTree` methods (`new`, `insert`, `get`,
// ...) that the tests below call but this file does not define — confirm.
#[path = "btree/impl.rs"]
mod btree_impl;

// Public submodule loaded from `btree/lehman_yao.rs`.
// NOTE(review): the name suggests Lehman-Yao style concurrent access support —
// confirm against that file.
#[path = "btree/lehman_yao.rs"]
pub mod lehman_yao;
/// Outcome of a binary search inside a leaf page (see `search_leaf`).
enum SearchResult {
    /// The key exists; carries the index of the matching cell.
    Found(usize),
    /// The key is absent; carries the index at which it would be inserted to
    /// keep the leaf sorted.
    NotFound(usize),
}
244
245fn search_leaf(page: &Page, key: &[u8]) -> BTreeResult<SearchResult> {
246 let cell_count = page.cell_count() as usize;
247
248 let mut low = 0;
250 let mut high = cell_count;
251
252 while low < high {
253 let mid = (low + high) / 2;
254 let (cell_key, _) = read_leaf_cell(page, mid)?;
255
256 match cell_key.as_slice().cmp(key) {
257 Ordering::Less => low = mid + 1,
258 Ordering::Greater => high = mid,
259 Ordering::Equal => return Ok(SearchResult::Found(mid)),
260 }
261 }
262
263 Ok(SearchResult::NotFound(low))
264}
265
266fn find_child(page: &Page, key: &[u8]) -> BTreeResult<u32> {
267 let cell_count = page.cell_count() as usize;
268
269 for i in 0..cell_count {
271 let (cell_key, child) = read_interior_cell(page, i)?;
272 if key < cell_key.as_slice() {
273 return Ok(child);
274 }
275 }
276
277 Ok(page.right_child())
279}
280
281fn find_first_child(page: &Page) -> BTreeResult<u32> {
282 if page.cell_count() == 0 {
283 return Ok(page.right_child());
284 }
285 let (_, child) = read_interior_cell(page, 0)?;
286 Ok(child)
287}
288
289fn leaf_min_bytes() -> usize {
290 (PAGE_SIZE - LEAF_DATA_OFFSET) * MIN_FILL_FACTOR / 100
291}
292
293fn interior_min_bytes() -> usize {
294 (PAGE_SIZE - INTERIOR_DATA_OFFSET) * MIN_FILL_FACTOR / 100
295}
296
/// Bytes a `(key, value)` entry occupies in a leaf page: a 2-byte slot, a
/// 4-byte cell header (key length + value length) and the two payloads.
fn leaf_entry_size(entry: &(Vec<u8>, Vec<u8>)) -> usize {
    let (key, value) = entry;
    let slot_bytes = 2;
    let header_bytes = 4;
    slot_bytes + header_bytes + key.len() + value.len()
}
303
304fn leaf_entries_size(entries: &[(Vec<u8>, Vec<u8>)]) -> usize {
305 entries.iter().map(leaf_entry_size).sum()
306}
307
308#[inline]
309fn leaf_slot_offset_for(index: usize) -> usize {
310 LEAF_SLOT_ARRAY_OFFSET + index * 2
311}
312
313#[inline]
314fn leaf_read_slot(page: &Page, index: usize) -> BTreeResult<usize> {
315 let data = page.as_bytes();
316 let slot_pos = leaf_slot_offset_for(index);
317 if slot_pos + 2 > PAGE_SIZE {
318 return Err(BTreeError::Corrupted("slot array overflows page".into()));
319 }
320 Ok(u16::from_le_bytes([data[slot_pos], data[slot_pos + 1]]) as usize)
321}
322
323#[inline]
324fn leaf_write_slot(page: &mut Page, index: usize, cell_offset: u16) -> BTreeResult<()> {
325 let data = page.as_bytes_mut();
326 let slot_pos = leaf_slot_offset_for(index);
327 if slot_pos + 2 > PAGE_SIZE {
328 return Err(BTreeError::Corrupted("slot array overflows page".into()));
329 }
330 data[slot_pos..slot_pos + 2].copy_from_slice(&cell_offset.to_le_bytes());
331 Ok(())
332}
333
334#[inline]
335fn leaf_slots_end(page: &Page) -> usize {
336 LEAF_SLOT_ARRAY_OFFSET + (page.cell_count() as usize) * 2
337}
338
339#[inline]
340fn leaf_cells_start(page: &Page) -> usize {
341 let end = page.free_end() as usize;
342 if end == 0 {
343 PAGE_SIZE
344 } else {
345 end
346 }
347}
348
349#[inline]
350fn leaf_free_bytes(page: &Page) -> usize {
351 let slot_end = leaf_slots_end(page);
352 let cells = leaf_cells_start(page);
353 cells.saturating_sub(slot_end)
354}
355
/// Bytes one separator entry occupies in a slotted interior page.
///
/// Counts everything an entry consumes on the page: the 2-byte slot-array
/// entry, the 2-byte key-length prefix, the key bytes and the 4-byte child page
/// id. The slot bytes must be included so that size accounting agrees with
/// `can_insert_interior` and with what `write_interior_entries` actually
/// writes; omitting them undercounts by 2 bytes per key and can let a rebuild
/// overflow the page.
fn interior_key_size(key: &[u8]) -> usize {
    const SLOT_BYTES: usize = 2;
    const KEY_LEN_BYTES: usize = 2;
    const CHILD_ID_BYTES: usize = 4;
    SLOT_BYTES + KEY_LEN_BYTES + key.len() + CHILD_ID_BYTES
}
359
360fn interior_entries_size(keys: &[Vec<u8>]) -> usize {
361 keys.iter().map(|k| interior_key_size(k)).sum()
362}
363
364fn read_leaf_cell(page: &Page, index: usize) -> BTreeResult<(Vec<u8>, Vec<u8>)> {
370 let cell_count = page.cell_count() as usize;
371 if index >= cell_count {
372 return Err(BTreeError::Corrupted("Cell index out of range".into()));
373 }
374 let offset = leaf_read_slot(page, index)?;
375 let data = page.as_bytes();
376 if offset + 4 > PAGE_SIZE {
377 return Err(BTreeError::Corrupted("cell header out of range".into()));
378 }
379 let key_len = u16::from_le_bytes([data[offset], data[offset + 1]]) as usize;
380 let val_len = u16::from_le_bytes([data[offset + 2], data[offset + 3]]) as usize;
381 let end = offset + 4 + key_len + val_len;
382 if end > PAGE_SIZE {
383 return Err(BTreeError::Corrupted("cell body out of range".into()));
384 }
385 let key = data[offset + 4..offset + 4 + key_len].to_vec();
386 let value = data[offset + 4 + key_len..end].to_vec();
387 Ok((key, value))
388}
389
390fn read_leaf_entries(page: &Page) -> BTreeResult<Vec<(Vec<u8>, Vec<u8>)>> {
391 let cell_count = page.cell_count() as usize;
392 let mut entries = Vec::with_capacity(cell_count);
393 for i in 0..cell_count {
394 entries.push(read_leaf_cell(page, i)?);
395 }
396 Ok(entries)
397}
398
399fn write_leaf_entries(page: &mut Page, entries: &[(Vec<u8>, Vec<u8>)]) -> BTreeResult<()> {
405 clear_leaf_cells(page);
406 for (i, (k, v)) in entries.iter().enumerate() {
407 let cell_size = 4 + k.len() + v.len();
408 let cells_start = leaf_cells_start(page);
409 let slot_end = LEAF_SLOT_ARRAY_OFFSET + (i + 1) * 2;
410 if slot_end + cell_size > cells_start {
411 return Err(BTreeError::Corrupted("leaf rebuild overflowed page".into()));
412 }
413 let cell_offset = cells_start - cell_size;
414 {
415 let data = page.as_bytes_mut();
416 data[cell_offset..cell_offset + 2].copy_from_slice(&(k.len() as u16).to_le_bytes());
417 data[cell_offset + 2..cell_offset + 4].copy_from_slice(&(v.len() as u16).to_le_bytes());
418 data[cell_offset + 4..cell_offset + 4 + k.len()].copy_from_slice(k);
419 data[cell_offset + 4 + k.len()..cell_offset + 4 + k.len() + v.len()].copy_from_slice(v);
420 }
421 page.set_free_end(cell_offset as u16);
422 leaf_write_slot(page, i, cell_offset as u16)?;
423 }
424 page.set_cell_count(entries.len() as u16);
425 page.set_free_start((LEAF_SLOT_ARRAY_OFFSET + entries.len() * 2) as u16);
426 Ok(())
427}
428
429fn can_insert_leaf(page: &Page, key: &[u8], value: &[u8]) -> bool {
430 let needed = 2 + 4 + key.len() + value.len();
431 leaf_free_bytes(page) >= needed
432}
433
434pub(crate) fn overwrite_leaf_value(
450 page: &mut Page,
451 index: usize,
452 new_value: &[u8],
453) -> BTreeResult<()> {
454 let cell_offset = leaf_read_slot(page, index)?;
455 let data = page.as_bytes();
456 if cell_offset + 4 > data.len() {
457 return Err(BTreeError::Corrupted(
458 "leaf cell header out of bounds".into(),
459 ));
460 }
461 let k_len = u16::from_le_bytes([data[cell_offset], data[cell_offset + 1]]) as usize;
462 let old_v_len = u16::from_le_bytes([data[cell_offset + 2], data[cell_offset + 3]]) as usize;
463 if new_value.len() > old_v_len {
464 return Err(BTreeError::Corrupted(
465 "overwrite_leaf_value: new value larger than slot".into(),
466 ));
467 }
468 let value_start = cell_offset + 4 + k_len;
469 if value_start + new_value.len() > data.len() {
470 return Err(BTreeError::Corrupted(
471 "leaf cell value out of bounds".into(),
472 ));
473 }
474 let data = page.as_bytes_mut();
475 if new_value.len() != old_v_len {
478 let new_len = new_value.len() as u16;
479 data[cell_offset + 2..cell_offset + 4].copy_from_slice(&new_len.to_le_bytes());
480 }
481 data[value_start..value_start + new_value.len()].copy_from_slice(new_value);
482 Ok(())
483}
484
485fn insert_into_leaf(page: &mut Page, key: &[u8], value: &[u8]) -> BTreeResult<()> {
487 let cell_count = page.cell_count() as usize;
489 let mut low = 0;
490 let mut high = cell_count;
491 while low < high {
492 let mid = (low + high) / 2;
493 let (cell_key, _) = read_leaf_cell(page, mid)?;
494 match cell_key.as_slice().cmp(key) {
495 Ordering::Less => low = mid + 1,
496 Ordering::Greater => high = mid,
497 Ordering::Equal => {
498 low = mid + 1;
501 break;
502 }
503 }
504 }
505 let insert_pos = low;
506
507 let cell_size = 4 + key.len() + value.len();
509 let slot_end_after = LEAF_SLOT_ARRAY_OFFSET + (cell_count + 1) * 2;
510 let cells_start = leaf_cells_start(page);
511 if slot_end_after + cell_size > cells_start {
512 return Err(BTreeError::Corrupted("leaf page full".into()));
513 }
514 let cell_offset = cells_start - cell_size;
515 {
516 let data = page.as_bytes_mut();
517 data[cell_offset..cell_offset + 2].copy_from_slice(&(key.len() as u16).to_le_bytes());
518 data[cell_offset + 2..cell_offset + 4].copy_from_slice(&(value.len() as u16).to_le_bytes());
519 data[cell_offset + 4..cell_offset + 4 + key.len()].copy_from_slice(key);
520 data[cell_offset + 4 + key.len()..cell_offset + 4 + key.len() + value.len()]
521 .copy_from_slice(value);
522 }
523 page.set_free_end(cell_offset as u16);
524
525 {
529 let shift_from = leaf_slot_offset_for(insert_pos);
530 let shift_to = shift_from + 2;
531 let shift_len = (cell_count - insert_pos) * 2;
532 if shift_len > 0 {
533 let data = page.as_bytes_mut();
534 data.copy_within(shift_from..shift_from + shift_len, shift_to);
535 }
536 }
537 leaf_write_slot(page, insert_pos, cell_offset as u16)?;
538
539 page.set_cell_count((cell_count + 1) as u16);
541 page.set_free_start((LEAF_SLOT_ARRAY_OFFSET + (cell_count + 1) * 2) as u16);
542 Ok(())
543}
544
545fn delete_from_leaf(page: &mut Page, index: usize) -> BTreeResult<()> {
550 let cell_count = page.cell_count() as usize;
551 if index >= cell_count {
552 return Err(BTreeError::Corrupted("delete index out of range".into()));
553 }
554 if index + 1 < cell_count {
555 let shift_from = leaf_slot_offset_for(index + 1);
556 let shift_to = leaf_slot_offset_for(index);
557 let shift_len = (cell_count - index - 1) * 2;
558 let data = page.as_bytes_mut();
559 data.copy_within(shift_from..shift_from + shift_len, shift_to);
560 }
561 page.set_cell_count((cell_count - 1) as u16);
562 page.set_free_start((LEAF_SLOT_ARRAY_OFFSET + (cell_count - 1) * 2) as u16);
563 Ok(())
564}
565
566fn clear_leaf_cells(page: &mut Page) {
570 {
571 let data = page.as_bytes_mut();
572 for byte in &mut data[LEAF_SLOT_ARRAY_OFFSET..] {
573 *byte = 0;
574 }
575 }
576 page.set_cell_count(0);
577 page.set_free_start(LEAF_SLOT_ARRAY_OFFSET as u16);
578 page.set_free_end(PAGE_SIZE as u16);
579}
580
581fn init_leaf_links(page: &mut Page, prev: u32, next: u32) {
582 let data = page.as_bytes_mut();
583 data[LEAF_PREV_OFFSET..LEAF_PREV_OFFSET + 4].copy_from_slice(&prev.to_le_bytes());
584 data[LEAF_NEXT_OFFSET..LEAF_NEXT_OFFSET + 4].copy_from_slice(&next.to_le_bytes());
585}
586
587fn read_next_leaf(page: &Page) -> u32 {
588 let data = page.as_bytes();
589 u32::from_le_bytes([
590 data[LEAF_NEXT_OFFSET],
591 data[LEAF_NEXT_OFFSET + 1],
592 data[LEAF_NEXT_OFFSET + 2],
593 data[LEAF_NEXT_OFFSET + 3],
594 ])
595}
596
/// Page id of the leaf's right sibling; an alias for `read_next_leaf`.
#[inline]
fn leaf_right_sibling(page: &Page) -> u32 {
    read_next_leaf(page)
}
603
604fn leaf_high_key(page: &Page) -> BTreeResult<Option<Vec<u8>>> {
608 let cell_count = page.cell_count() as usize;
609 if cell_count == 0 {
610 return Ok(None);
611 }
612 let (key, _) = read_leaf_cell(page, cell_count - 1)?;
613 Ok(Some(key))
614}
615
616fn set_prev_leaf(page: &mut Page, prev: u32) {
617 let data = page.as_bytes_mut();
618 data[LEAF_PREV_OFFSET..LEAF_PREV_OFFSET + 4].copy_from_slice(&prev.to_le_bytes());
619}
620
621fn set_next_leaf(page: &mut Page, next: u32) {
622 let data = page.as_bytes_mut();
623 data[LEAF_NEXT_OFFSET..LEAF_NEXT_OFFSET + 4].copy_from_slice(&next.to_le_bytes());
624}
625
626#[inline]
641fn interior_slot_offset_for(index: usize) -> usize {
642 INTERIOR_DATA_OFFSET + index * 2
643}
644
645#[inline]
646fn interior_read_slot(page: &Page, index: usize) -> BTreeResult<usize> {
647 let data = page.as_bytes();
648 let slot_pos = interior_slot_offset_for(index);
649 if slot_pos + 2 > PAGE_SIZE {
650 return Err(BTreeError::Corrupted(
651 "interior slot array overflows page".into(),
652 ));
653 }
654 Ok(u16::from_le_bytes([data[slot_pos], data[slot_pos + 1]]) as usize)
655}
656
657#[inline]
658fn interior_write_slot(page: &mut Page, index: usize, cell_offset: u16) -> BTreeResult<()> {
659 let data = page.as_bytes_mut();
660 let slot_pos = interior_slot_offset_for(index);
661 if slot_pos + 2 > PAGE_SIZE {
662 return Err(BTreeError::Corrupted(
663 "interior slot array overflows page".into(),
664 ));
665 }
666 data[slot_pos..slot_pos + 2].copy_from_slice(&cell_offset.to_le_bytes());
667 Ok(())
668}
669
670#[inline]
671fn interior_cells_start(page: &Page) -> usize {
672 let end = page.free_end() as usize;
673 if end == 0 {
674 PAGE_SIZE
675 } else {
676 end
677 }
678}
679
680#[inline]
681fn interior_free_bytes(page: &Page) -> usize {
682 let slot_end = INTERIOR_DATA_OFFSET + (page.cell_count() as usize) * 2;
683 interior_cells_start(page).saturating_sub(slot_end)
684}
685
686fn read_interior_cell(page: &Page, index: usize) -> BTreeResult<(Vec<u8>, u32)> {
687 let cell_count = page.cell_count() as usize;
688 if index >= cell_count {
689 return Err(BTreeError::Corrupted("Cell index out of range".into()));
690 }
691 let offset = interior_read_slot(page, index)?;
692 let data = page.as_bytes();
693 if offset + 2 > PAGE_SIZE {
694 return Err(BTreeError::Corrupted(
695 "interior cell header out of range".into(),
696 ));
697 }
698 let key_len = u16::from_le_bytes([data[offset], data[offset + 1]]) as usize;
699 let end = offset + 2 + key_len + 4;
700 if end > PAGE_SIZE {
701 return Err(BTreeError::Corrupted(
702 "interior cell body out of range".into(),
703 ));
704 }
705 let key = data[offset + 2..offset + 2 + key_len].to_vec();
706 let child = u32::from_le_bytes([
707 data[offset + 2 + key_len],
708 data[offset + 2 + key_len + 1],
709 data[offset + 2 + key_len + 2],
710 data[offset + 2 + key_len + 3],
711 ]);
712 Ok((key, child))
713}
714
715fn read_interior_keys_children(page: &Page) -> BTreeResult<(Vec<Vec<u8>>, Vec<u32>)> {
716 let cell_count = page.cell_count() as usize;
717 let mut keys = Vec::with_capacity(cell_count);
718 let mut children = Vec::with_capacity(cell_count + 1);
719
720 for i in 0..cell_count {
721 let (key, child) = read_interior_cell(page, i)?;
722 keys.push(key);
723 children.push(child);
724 }
725
726 if cell_count == 0 {
727 let right_child = page.right_child();
728 if right_child != 0 {
729 children.push(right_child);
730 }
731 } else {
732 children.push(page.right_child());
733 }
734
735 Ok((keys, children))
736}
737
738fn write_interior_entries(page: &mut Page, keys: &[Vec<u8>], children: &[u32]) -> BTreeResult<()> {
743 if !keys.is_empty() && children.len() != keys.len() + 1 {
744 return Err(BTreeError::Corrupted(
745 "Interior keys/children length mismatch".into(),
746 ));
747 }
748
749 clear_interior_cells(page);
750 if keys.is_empty() {
751 let right_child = children.first().copied().unwrap_or(0);
752 page.set_right_child(right_child);
753 return Ok(());
754 }
755
756 for (i, key) in keys.iter().enumerate() {
757 let cell_size = 2 + key.len() + 4;
758 let cells_start = interior_cells_start(page);
759 let slot_end = INTERIOR_DATA_OFFSET + (i + 1) * 2;
760 if slot_end + cell_size > cells_start {
761 return Err(BTreeError::Corrupted(
762 "interior rebuild overflowed page".into(),
763 ));
764 }
765 let cell_offset = cells_start - cell_size;
766 {
767 let data = page.as_bytes_mut();
768 data[cell_offset..cell_offset + 2].copy_from_slice(&(key.len() as u16).to_le_bytes());
769 data[cell_offset + 2..cell_offset + 2 + key.len()].copy_from_slice(key);
770 data[cell_offset + 2 + key.len()..cell_offset + 2 + key.len() + 4]
771 .copy_from_slice(&children[i].to_le_bytes());
772 }
773 page.set_free_end(cell_offset as u16);
774 interior_write_slot(page, i, cell_offset as u16)?;
775 }
776 page.set_cell_count(keys.len() as u16);
777 page.set_free_start((INTERIOR_DATA_OFFSET + keys.len() * 2) as u16);
778 page.set_right_child(*children.last().ok_or_else(|| {
779 BTreeError::Corrupted("write_interior_entries: children empty with non-empty keys".into())
780 })?);
781 Ok(())
782}
783
784fn can_insert_interior(page: &Page, key: &[u8]) -> bool {
785 let needed = 2 + 2 + key.len() + 4;
786 interior_free_bytes(page) >= needed
787}
788
789fn insert_into_interior(
794 page: &mut Page,
795 key: &[u8],
796 left_child: u32,
797 right_child: u32,
798) -> BTreeResult<()> {
799 let cell_count = page.cell_count() as usize;
801 let mut low = 0;
802 let mut high = cell_count;
803 while low < high {
804 let mid = (low + high) / 2;
805 let (cell_key, _) = read_interior_cell(page, mid)?;
806 match cell_key.as_slice().cmp(key) {
807 Ordering::Less => low = mid + 1,
808 Ordering::Greater | Ordering::Equal => high = mid,
809 }
810 }
811 let insert_pos = low;
812
813 if insert_pos < cell_count {
818 let old_offset = interior_read_slot(page, insert_pos)?;
819 let data = page.as_bytes();
820 let key_len = u16::from_le_bytes([data[old_offset], data[old_offset + 1]]) as usize;
821 let child_pos = old_offset + 2 + key_len;
822 let data = page.as_bytes_mut();
823 data[child_pos..child_pos + 4].copy_from_slice(&right_child.to_le_bytes());
824 } else {
825 page.set_right_child(right_child);
826 }
827
828 let cell_size = 2 + key.len() + 4;
830 let slot_end_after = INTERIOR_DATA_OFFSET + (cell_count + 1) * 2;
831 let cells_start = interior_cells_start(page);
832 if slot_end_after + cell_size > cells_start {
833 return Err(BTreeError::Corrupted("interior page full".into()));
834 }
835 let cell_offset = cells_start - cell_size;
836 {
837 let data = page.as_bytes_mut();
838 data[cell_offset..cell_offset + 2].copy_from_slice(&(key.len() as u16).to_le_bytes());
839 data[cell_offset + 2..cell_offset + 2 + key.len()].copy_from_slice(key);
840 data[cell_offset + 2 + key.len()..cell_offset + 2 + key.len() + 4]
841 .copy_from_slice(&left_child.to_le_bytes());
842 }
843 page.set_free_end(cell_offset as u16);
844
845 {
848 let shift_from = interior_slot_offset_for(insert_pos);
849 let shift_to = shift_from + 2;
850 let shift_len = (cell_count - insert_pos) * 2;
851 if shift_len > 0 {
852 let data = page.as_bytes_mut();
853 data.copy_within(shift_from..shift_from + shift_len, shift_to);
854 }
855 }
856 interior_write_slot(page, insert_pos, cell_offset as u16)?;
857 page.set_cell_count((cell_count + 1) as u16);
858 page.set_free_start((INTERIOR_DATA_OFFSET + (cell_count + 1) * 2) as u16);
859 Ok(())
860}
861
862fn clear_interior_cells(page: &mut Page) {
863 {
864 let data = page.as_bytes_mut();
865 for byte in &mut data[INTERIOR_DATA_OFFSET..] {
866 *byte = 0;
867 }
868 }
869 page.set_cell_count(0);
870 page.set_free_start(INTERIOR_DATA_OFFSET as u16);
871 page.set_free_end(PAGE_SIZE as u16);
872}
873
// Tests for the B-tree. Each test opens its own pager on a uniquely named file
// in the system temp directory and removes the file at the end. The cleanup
// call is skipped if an assertion panics first.
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Builds a temp-dir path that is unique per process and per call (process
    /// id plus a monotonically increasing counter).
    fn temp_db_path() -> PathBuf {
        use std::sync::atomic::{AtomicU64, Ordering};
        static COUNTER: AtomicU64 = AtomicU64::new(0);
        let id = COUNTER.fetch_add(1, Ordering::Relaxed);
        let mut path = std::env::temp_dir();
        path.push(format!("reddb_btree_test_{}_{}.db", std::process::id(), id));
        path
    }

    /// Best-effort removal of a test database file; errors (such as the file
    /// not existing) are ignored.
    fn cleanup(path: &PathBuf) {
        let _ = std::fs::remove_file(path);
    }

    /// A freshly created tree is empty, has root page id 0 and finds nothing.
    #[test]
    fn test_btree_empty() {
        let path = temp_db_path();
        cleanup(&path);

        let pager = Arc::new(Pager::open_default(&path).unwrap());
        let tree = BTree::new(pager);

        assert!(tree.is_empty());
        assert_eq!(tree.root_page_id(), 0);
        assert_eq!(tree.get(b"key").unwrap(), None);
        assert_eq!(tree.count().unwrap(), 0);

        cleanup(&path);
    }

    /// One inserted key can be read back; other keys are still absent.
    #[test]
    fn test_btree_single_insert() {
        let path = temp_db_path();
        cleanup(&path);

        let pager = Arc::new(Pager::open_default(&path).unwrap());
        let tree = BTree::new(pager);

        tree.insert(b"hello", b"world").unwrap();

        assert!(!tree.is_empty());
        assert_eq!(tree.get(b"hello").unwrap(), Some(b"world".to_vec()));
        assert_eq!(tree.get(b"other").unwrap(), None);
        assert_eq!(tree.count().unwrap(), 1);

        cleanup(&path);
    }

    /// Keys inserted out of order are all retrievable.
    #[test]
    fn test_btree_multiple_inserts() {
        let path = temp_db_path();
        cleanup(&path);

        let pager = Arc::new(Pager::open_default(&path).unwrap());
        let tree = BTree::new(pager);

        tree.insert(b"c", b"3").unwrap();
        tree.insert(b"a", b"1").unwrap();
        tree.insert(b"b", b"2").unwrap();

        assert_eq!(tree.get(b"a").unwrap(), Some(b"1".to_vec()));
        assert_eq!(tree.get(b"b").unwrap(), Some(b"2".to_vec()));
        assert_eq!(tree.get(b"c").unwrap(), Some(b"3".to_vec()));
        assert_eq!(tree.count().unwrap(), 3);

        cleanup(&path);
    }

    /// `insert` rejects an existing key with `DuplicateKey` and keeps the
    /// original value.
    #[test]
    fn test_btree_duplicate_key() {
        let path = temp_db_path();
        cleanup(&path);

        let pager = Arc::new(Pager::open_default(&path).unwrap());
        let tree = BTree::new(pager);

        tree.insert(b"key", b"value1").unwrap();
        let result = tree.insert(b"key", b"value2");

        assert!(matches!(result, Err(BTreeError::DuplicateKey)));
        assert_eq!(tree.get(b"key").unwrap(), Some(b"value1".to_vec()));

        cleanup(&path);
    }

    /// `delete` returns true for a present key, false for an absent one, and
    /// leaves the other keys intact.
    #[test]
    fn test_btree_delete() {
        let path = temp_db_path();
        cleanup(&path);

        let pager = Arc::new(Pager::open_default(&path).unwrap());
        let tree = BTree::new(pager);

        tree.insert(b"a", b"1").unwrap();
        tree.insert(b"b", b"2").unwrap();
        tree.insert(b"c", b"3").unwrap();

        assert!(tree.delete(b"b").unwrap());
        assert!(!tree.delete(b"d").unwrap());

        assert_eq!(tree.get(b"a").unwrap(), Some(b"1".to_vec()));
        assert_eq!(tree.get(b"b").unwrap(), None);
        assert_eq!(tree.get(b"c").unwrap(), Some(b"3".to_vec()));
        assert_eq!(tree.count().unwrap(), 2);

        cleanup(&path);
    }

    /// Deleting every key of a middle leaf must not break the leaf chain: the
    /// count and a full cursor scan still see all remaining keys, up to the
    /// very last one.
    #[test]
    fn test_btree_delete_rebalance_removes_empty_leaf() {
        let path = temp_db_path();
        cleanup(&path);

        let pager = Arc::new(Pager::open_default(&path).unwrap());
        let tree = BTree::new(pager.clone());

        // 200-byte values so that 60 entries spread over several leaves.
        let value = vec![b'v'; 200];
        for i in 0..60u32 {
            let key = format!("key{:03}", i);
            tree.insert(key.as_bytes(), &value).unwrap();
        }

        // Walk the leaf chain from the first leaf and record every page id.
        let root_id = tree.root_page_id();
        let first_leaf = tree.find_first_leaf(root_id).unwrap();
        let mut leaf_ids = Vec::new();
        let mut current = first_leaf;
        loop {
            leaf_ids.push(current);
            let page = pager.read_page(current).unwrap();
            let next = read_next_leaf(&page);
            if next == 0 {
                break;
            }
            current = next;
        }

        assert!(leaf_ids.len() >= 3);

        // Collect the keys of the second leaf (one with siblings on both sides).
        let target_leaf = leaf_ids[1];
        let page = pager.read_page(target_leaf).unwrap();
        let cell_count = page.cell_count() as usize;
        let mut keys = Vec::with_capacity(cell_count);
        for i in 0..cell_count {
            let (key, _) = read_leaf_cell(&page, i).unwrap();
            keys.push(key);
        }

        // Empty that leaf completely.
        for key in &keys {
            tree.delete(key).unwrap();
        }

        let expected = 60 - keys.len();
        assert_eq!(tree.count().unwrap(), expected);

        let mut cursor = tree.cursor_first().unwrap();
        let mut results = Vec::new();
        while let Some((key, _)) = cursor.next().unwrap() {
            results.push(key);
        }

        assert_eq!(results.len(), expected);
        let last_key = format!("key{:03}", 59).into_bytes();
        assert_eq!(results.last(), Some(&last_key));

        cleanup(&path);
    }

    /// A cursor started at the first entry yields all entries in key order.
    #[test]
    fn test_btree_cursor() {
        let path = temp_db_path();
        cleanup(&path);

        let pager = Arc::new(Pager::open_default(&path).unwrap());
        let tree = BTree::new(pager);

        tree.insert(b"c", b"3").unwrap();
        tree.insert(b"a", b"1").unwrap();
        tree.insert(b"b", b"2").unwrap();

        let mut cursor = tree.cursor_first().unwrap();
        let mut results: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();

        while let Some(entry) = cursor.next().unwrap() {
            results.push(entry);
        }

        assert_eq!(results.len(), 3);
        assert_eq!(results[0], (b"a".to_vec(), b"1".to_vec()));
        assert_eq!(results[1], (b"b".to_vec(), b"2".to_vec()));
        assert_eq!(results[2], (b"c".to_vec(), b"3".to_vec()));

        cleanup(&path);
    }

    /// `range` includes both bounds: key03..=key06 yields four entries.
    #[test]
    fn test_btree_range() {
        let path = temp_db_path();
        cleanup(&path);

        let pager = Arc::new(Pager::open_default(&path).unwrap());
        let tree = BTree::new(pager);

        for i in 0..10u8 {
            let key = format!("key{:02}", i);
            let value = format!("val{:02}", i);
            tree.insert(key.as_bytes(), value.as_bytes()).unwrap();
        }

        let results = tree.range(b"key03", b"key06").unwrap();

        assert_eq!(results.len(), 4);
        assert_eq!(results[0].0, b"key03".to_vec());
        assert_eq!(results[3].0, b"key06".to_vec());

        cleanup(&path);
    }

    /// A 500-byte key with a 500-byte value round-trips.
    #[test]
    fn test_btree_large_keys() {
        let path = temp_db_path();
        cleanup(&path);

        let pager = Arc::new(Pager::open_default(&path).unwrap());
        let tree = BTree::new(pager);

        let key = vec![b'x'; 500];
        let value = vec![b'y'; 500];

        tree.insert(&key, &value).unwrap();
        assert_eq!(tree.get(&key).unwrap(), Some(value));

        cleanup(&path);
    }

    /// A key one byte over `MAX_KEY_SIZE` is rejected with `KeyTooLarge`.
    #[test]
    fn test_btree_key_too_large() {
        let path = temp_db_path();
        cleanup(&path);

        let pager = Arc::new(Pager::open_default(&path).unwrap());
        let tree = BTree::new(pager);

        let key = vec![b'x'; MAX_KEY_SIZE + 1];
        let result = tree.insert(&key, b"value");

        assert!(matches!(result, Err(BTreeError::KeyTooLarge(_))));

        cleanup(&path);
    }

    /// 100 sequential inserts are all retrievable and counted.
    #[test]
    fn test_btree_many_inserts() {
        let path = temp_db_path();
        cleanup(&path);

        let pager = Arc::new(Pager::open_default(&path).unwrap());
        let tree = BTree::new(pager);

        // Insert phase.
        for i in 0..100u32 {
            let key = format!("key{:08}", i);
            let value = format!("value{:08}", i);
            tree.insert(key.as_bytes(), value.as_bytes()).unwrap();
        }

        // Verification phase.
        for i in 0..100u32 {
            let key = format!("key{:08}", i);
            let expected = format!("value{:08}", i);
            assert_eq!(
                tree.get(key.as_bytes()).unwrap(),
                Some(expected.into_bytes())
            );
        }

        assert_eq!(tree.count().unwrap(), 100);

        cleanup(&path);
    }

    /// `bulk_insert_sorted` with 240 pre-sorted items of varying value length
    /// stores every item retrievably.
    #[test]
    fn test_btree_bulk_insert_sorted_preserves_leaf_layout() {
        let path = temp_db_path();
        cleanup(&path);

        let pager = Arc::new(Pager::open_default(&path).unwrap());
        let tree = BTree::new(pager);

        // Zero-padded keys are already in byte order; values are 32..=38 bytes.
        let items: Vec<(Vec<u8>, Vec<u8>)> = (0..240u32)
            .map(|i| {
                (
                    format!("key{:08}", i).into_bytes(),
                    vec![b'v'; 32 + (i as usize % 7)],
                )
            })
            .collect();

        tree.bulk_insert_sorted(&items).unwrap();

        assert_eq!(tree.count().unwrap(), items.len());

        for (key, value) in &items {
            assert_eq!(tree.get(key).unwrap(), Some(value.clone()));
        }

        cleanup(&path);
    }

    // Property test: `upsert_batch_sorted` must leave the tree in the same
    // state as applying the same pairs one by one with `upsert`.
    mod prop_upsert_batch_sorted {
        use super::*;
        use proptest::prelude::*;
        use std::collections::BTreeMap;

        /// Creates a fresh tree on its own temp file and upserts `seed` into
        /// it. The pager is returned alongside the tree and path.
        fn populate_tree(seed: &[(Vec<u8>, Vec<u8>)]) -> (BTree, PathBuf, Arc<Pager>) {
            let path = temp_db_path();
            cleanup(&path);
            let pager = Arc::new(Pager::open_default(&path).unwrap());
            let tree = BTree::new(Arc::clone(&pager));
            for (k, v) in seed {
                tree.upsert(k, v).unwrap();
            }
            (tree, path, pager)
        }

        /// Keys of 1..=4 bytes drawn from a 16-symbol alphabet, which makes
        /// duplicate keys likely.
        fn key_strategy() -> impl Strategy<Value = Vec<u8>> {
            prop::collection::vec(0u8..16u8, 1..=4)
        }

        /// Values of 0..=8 bytes, each byte in 0..255 (255 itself is excluded).
        fn value_strategy() -> impl Strategy<Value = Vec<u8>> {
            prop::collection::vec(0u8..255u8, 0..=8)
        }

        /// A random `(key, value)` pair.
        fn pair_strategy() -> impl Strategy<Value = (Vec<u8>, Vec<u8>)> {
            (key_strategy(), value_strategy())
        }

        proptest! {
            #![proptest_config(ProptestConfig {
                cases: 64,
                .. ProptestConfig::default()
            })]

            #[test]
            fn batch_upsert_matches_loop_upsert(
                seed in prop::collection::vec(pair_strategy(), 0..50),
                batch in prop::collection::vec(pair_strategy(), 1..200),
            ) {
                // Deduplicate the seed (last write wins) and sort it by key.
                let mut seed_map: BTreeMap<Vec<u8>, Vec<u8>> = BTreeMap::new();
                for (k, v) in seed {
                    seed_map.insert(k, v);
                }
                let seed: Vec<(Vec<u8>, Vec<u8>)> = seed_map.into_iter().collect();

                // Baseline: apply the batch one pair at a time, in input order.
                let (baseline, baseline_path, _baseline_pager) = populate_tree(&seed);
                for (k, v) in &batch {
                    baseline.upsert(k, v).unwrap();
                }

                // Candidate: apply the batch in one call after a stable sort by
                // key (duplicates keep their input order).
                let (candidate, candidate_path, _candidate_pager) = populate_tree(&seed);
                let mut sorted = batch.clone();
                sorted.sort_by(|a, b| a.0.cmp(&b.0));
                candidate.upsert_batch_sorted(&sorted).unwrap();

                // Model: seed then batch, last write per key wins.
                let mut expected: BTreeMap<Vec<u8>, Vec<u8>> = BTreeMap::new();
                for (k, v) in &seed {
                    expected.insert(k.clone(), v.clone());
                }
                for (k, v) in &batch {
                    expected.insert(k.clone(), v.clone());
                }

                // Point lookups agree with the model on both trees.
                for (k, v) in &expected {
                    let baseline_got = baseline.get(k).unwrap();
                    let candidate_got = candidate.get(k).unwrap();
                    prop_assert_eq!(baseline_got.as_ref(), Some(v));
                    prop_assert_eq!(candidate_got.as_ref(), Some(v));
                }

                // Full scans agree with each other and with the model.
                let collect = |t: &BTree| -> Vec<(Vec<u8>, Vec<u8>)> {
                    let mut out = Vec::new();
                    let mut cur = t.cursor_first().unwrap();
                    while let Some(entry) = cur.next().unwrap() {
                        out.push(entry);
                    }
                    out
                };
                let baseline_contents = collect(&baseline);
                let candidate_contents = collect(&candidate);
                prop_assert_eq!(&baseline_contents, &candidate_contents);

                let expected_contents: Vec<(Vec<u8>, Vec<u8>)> =
                    expected.into_iter().collect();
                prop_assert_eq!(&candidate_contents, &expected_contents);

                cleanup(&baseline_path);
                cleanup(&candidate_path);
            }
        }
    }

    /// A cursor scan yields strictly increasing keys regardless of insertion
    /// order.
    #[test]
    fn test_btree_sorted_iteration() {
        let path = temp_db_path();
        cleanup(&path);

        let pager = Arc::new(Pager::open_default(&path).unwrap());
        let tree = BTree::new(pager);

        let keys = vec![50, 25, 75, 10, 30, 60, 80, 5, 15, 27, 35, 55, 65, 77, 90];
        // Zero-pad so that byte order matches numeric order.
        for k in &keys {
            let key = format!("{:03}", k);
            tree.insert(key.as_bytes(), key.as_bytes()).unwrap();
        }

        let mut cursor = tree.cursor_first().unwrap();
        // Previous key seen by the scan, for the ordering check.
        let mut prev: Option<Vec<u8>> = None;

        while let Some((key, _)) = cursor.next().unwrap() {
            if let Some(p) = &prev {
                assert!(p < &key, "Keys not in sorted order");
            }
            prev = Some(key);
        }

        cleanup(&path);
    }
}