1use std::cmp::Ordering;
33use std::sync::{Arc, RwLock};
34
35use super::page::{Page, PageType, HEADER_SIZE, PAGE_SIZE};
36use super::pager::{Pager, PagerError};
37
38pub const MAX_KEY_SIZE: usize = 1024;
40
41pub const MAX_VALUE_SIZE: usize = value_layout::MAX_VALUE_SIZE;
54
55pub const MIN_FILL_FACTOR: usize = 25;
57
58const LEAF_PREV_OFFSET: usize = HEADER_SIZE;
60
61const LEAF_NEXT_OFFSET: usize = HEADER_SIZE + 4;
63
64const LEAF_SLOT_ARRAY_OFFSET: usize = HEADER_SIZE + 8;
79
80const LEAF_DATA_OFFSET: usize = LEAF_SLOT_ARRAY_OFFSET;
83
84const INTERIOR_DATA_OFFSET: usize = HEADER_SIZE;
86
87#[derive(Debug, Clone)]
89pub enum BTreeError {
90 NotFound,
92 DuplicateKey,
94 KeyTooLarge(usize),
96 ValueTooLarge(usize),
98 Corrupted(String),
100 Pager(String),
102 LockPoisoned(String),
104 ValueLayout(String),
106}
107
108impl std::fmt::Display for BTreeError {
109 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
110 match self {
111 Self::NotFound => write!(f, "Key not found"),
112 Self::DuplicateKey => write!(f, "Key already exists"),
113 Self::KeyTooLarge(size) => {
114 write!(f, "Key too large: {} bytes (max {})", size, MAX_KEY_SIZE)
115 }
116 Self::ValueTooLarge(size) => write!(
117 f,
118 "Value too large: {} bytes (max {})",
119 size, MAX_VALUE_SIZE
120 ),
121 Self::Corrupted(msg) => write!(f, "B-tree corrupted: {}", msg),
122 Self::Pager(msg) => write!(f, "Pager error: {}", msg),
123 Self::LockPoisoned(msg) => write!(f, "Lock poisoned: {}", msg),
124 Self::ValueLayout(msg) => write!(f, "Value layout: {}", msg),
125 }
126 }
127}
128
129impl From<value_layout::ValueLayoutError> for BTreeError {
130 fn from(e: value_layout::ValueLayoutError) -> Self {
131 match e {
132 value_layout::ValueLayoutError::ValueTooLarge(n) => Self::ValueTooLarge(n),
133 other => Self::ValueLayout(other.to_string()),
134 }
135 }
136}
137
138impl std::error::Error for BTreeError {}
139
140impl From<PagerError> for BTreeError {
141 fn from(e: PagerError) -> Self {
142 Self::Pager(e.to_string())
143 }
144}
145
146impl From<super::page::PageError> for BTreeError {
147 fn from(e: super::page::PageError) -> Self {
148 Self::Corrupted(e.to_string())
149 }
150}
151
152pub type BTreeResult<T> = Result<T, BTreeError>;
154
155pub struct BTreeCursor {
157 leaf_page_id: u32,
159 position: usize,
161 pager: Arc<Pager>,
163 prefetched_next: bool,
167}
168
169impl BTreeCursor {
170 pub fn next(&mut self) -> BTreeResult<Option<(Vec<u8>, Vec<u8>)>> {
172 if self.leaf_page_id == 0 {
173 return Ok(None);
174 }
175
176 let page = self.pager.read_page(self.leaf_page_id)?;
177 let cell_count = page.cell_count() as usize;
178
179 if !self.prefetched_next && self.position >= cell_count / 2 && cell_count > 0 {
185 let next_leaf = read_next_leaf(&page);
186 if next_leaf != 0 {
187 self.pager.prefetch_hint(next_leaf);
188 }
189 self.prefetched_next = true;
190 }
191
192 if self.position < cell_count {
194 let (key, stored) = read_leaf_cell(&page, self.position)?;
195 let value = value_layout::decode(&self.pager, &stored)?;
196 self.position += 1;
197 return Ok(Some((key, value)));
198 }
199
200 let next_leaf = read_next_leaf(&page);
202 if next_leaf == 0 {
203 self.leaf_page_id = 0;
204 return Ok(None);
205 }
206
207 self.leaf_page_id = next_leaf;
208 self.position = 0;
209 self.prefetched_next = false; let page = self.pager.read_page(self.leaf_page_id)?;
213 if page.cell_count() == 0 {
214 return Ok(None);
215 }
216
217 let (key, stored) = read_leaf_cell(&page, 0)?;
218 let value = value_layout::decode(&self.pager, &stored)?;
219 self.position = 1;
220 Ok(Some((key, value)))
221 }
222
223 pub fn peek(&self) -> BTreeResult<Option<(Vec<u8>, Vec<u8>)>> {
225 if self.leaf_page_id == 0 {
226 return Ok(None);
227 }
228
229 let page = self.pager.read_page(self.leaf_page_id)?;
230 let cell_count = page.cell_count() as usize;
231
232 if self.position >= cell_count {
233 return Ok(None);
234 }
235
236 let (key, stored) = read_leaf_cell(&page, self.position)?;
237 let value = value_layout::decode(&self.pager, &stored)?;
238 Ok(Some((key, value)))
239 }
240}
241
242pub struct BTree {
244 pager: Arc<Pager>,
246 root_page_id: RwLock<u32>,
248 rightmost_leaf: RwLock<Option<(u32, Vec<u8>)>>,
257}
258
259#[path = "btree/impl.rs"]
260mod btree_impl;
261
262#[path = "btree/lehman_yao.rs"]
263pub mod lehman_yao;
264
265#[path = "btree/value_layout.rs"]
266pub mod value_layout;
267enum SearchResult {
270 Found(usize),
271 NotFound(usize),
272}
273
274fn search_leaf(page: &Page, key: &[u8]) -> BTreeResult<SearchResult> {
275 let cell_count = page.cell_count() as usize;
276
277 let mut low = 0;
279 let mut high = cell_count;
280
281 while low < high {
282 let mid = (low + high) / 2;
283 let (cell_key, _) = read_leaf_cell(page, mid)?;
284
285 match cell_key.as_slice().cmp(key) {
286 Ordering::Less => low = mid + 1,
287 Ordering::Greater => high = mid,
288 Ordering::Equal => return Ok(SearchResult::Found(mid)),
289 }
290 }
291
292 Ok(SearchResult::NotFound(low))
293}
294
295fn find_child(page: &Page, key: &[u8]) -> BTreeResult<u32> {
296 let cell_count = page.cell_count() as usize;
297
298 for i in 0..cell_count {
300 let (cell_key, child) = read_interior_cell(page, i)?;
301 if key < cell_key.as_slice() {
302 return Ok(child);
303 }
304 }
305
306 Ok(page.right_child())
308}
309
310fn find_first_child(page: &Page) -> BTreeResult<u32> {
311 if page.cell_count() == 0 {
312 return Ok(page.right_child());
313 }
314 let (_, child) = read_interior_cell(page, 0)?;
315 Ok(child)
316}
317
318fn leaf_min_bytes() -> usize {
319 (PAGE_SIZE - LEAF_DATA_OFFSET) * MIN_FILL_FACTOR / 100
320}
321
322fn interior_min_bytes() -> usize {
323 (PAGE_SIZE - INTERIOR_DATA_OFFSET) * MIN_FILL_FACTOR / 100
324}
325
326fn leaf_entry_size(entry: &(Vec<u8>, Vec<u8>)) -> usize {
330 2 + 4 + entry.0.len() + entry.1.len()
331}
332
333fn leaf_entries_size(entries: &[(Vec<u8>, Vec<u8>)]) -> usize {
334 entries.iter().map(leaf_entry_size).sum()
335}
336
337#[inline]
338fn leaf_slot_offset_for(index: usize) -> usize {
339 LEAF_SLOT_ARRAY_OFFSET + index * 2
340}
341
342#[inline]
343fn leaf_read_slot(page: &Page, index: usize) -> BTreeResult<usize> {
344 let data = page.as_bytes();
345 let slot_pos = leaf_slot_offset_for(index);
346 if slot_pos + 2 > PAGE_SIZE {
347 return Err(BTreeError::Corrupted("slot array overflows page".into()));
348 }
349 Ok(u16::from_le_bytes([data[slot_pos], data[slot_pos + 1]]) as usize)
350}
351
352#[inline]
353fn leaf_write_slot(page: &mut Page, index: usize, cell_offset: u16) -> BTreeResult<()> {
354 let data = page.as_bytes_mut();
355 let slot_pos = leaf_slot_offset_for(index);
356 if slot_pos + 2 > PAGE_SIZE {
357 return Err(BTreeError::Corrupted("slot array overflows page".into()));
358 }
359 data[slot_pos..slot_pos + 2].copy_from_slice(&cell_offset.to_le_bytes());
360 Ok(())
361}
362
363#[inline]
364fn leaf_slots_end(page: &Page) -> usize {
365 LEAF_SLOT_ARRAY_OFFSET + (page.cell_count() as usize) * 2
366}
367
368#[inline]
369fn leaf_cells_start(page: &Page) -> usize {
370 let end = page.free_end() as usize;
371 if end == 0 {
372 PAGE_SIZE
373 } else {
374 end
375 }
376}
377
378#[inline]
379fn leaf_free_bytes(page: &Page) -> usize {
380 let slot_end = leaf_slots_end(page);
381 let cells = leaf_cells_start(page);
382 cells.saturating_sub(slot_end)
383}
384
385fn interior_key_size(key: &[u8]) -> usize {
386 2 + key.len() + 4
387}
388
389fn interior_entries_size(keys: &[Vec<u8>]) -> usize {
390 keys.iter().map(|k| interior_key_size(k)).sum()
391}
392
393fn read_leaf_cell(page: &Page, index: usize) -> BTreeResult<(Vec<u8>, Vec<u8>)> {
399 let cell_count = page.cell_count() as usize;
400 if index >= cell_count {
401 return Err(BTreeError::Corrupted("Cell index out of range".into()));
402 }
403 let offset = leaf_read_slot(page, index)?;
404 let data = page.as_bytes();
405 if offset + 4 > PAGE_SIZE {
406 return Err(BTreeError::Corrupted("cell header out of range".into()));
407 }
408 let key_len = u16::from_le_bytes([data[offset], data[offset + 1]]) as usize;
409 let val_len = u16::from_le_bytes([data[offset + 2], data[offset + 3]]) as usize;
410 let end = offset + 4 + key_len + val_len;
411 if end > PAGE_SIZE {
412 return Err(BTreeError::Corrupted("cell body out of range".into()));
413 }
414 let key = data[offset + 4..offset + 4 + key_len].to_vec();
415 let value = data[offset + 4 + key_len..end].to_vec();
416 Ok((key, value))
417}
418
419fn read_leaf_entries(page: &Page) -> BTreeResult<Vec<(Vec<u8>, Vec<u8>)>> {
420 let cell_count = page.cell_count() as usize;
421 let mut entries = Vec::with_capacity(cell_count);
422 for i in 0..cell_count {
423 entries.push(read_leaf_cell(page, i)?);
424 }
425 Ok(entries)
426}
427
428fn write_leaf_entries(page: &mut Page, entries: &[(Vec<u8>, Vec<u8>)]) -> BTreeResult<()> {
434 clear_leaf_cells(page);
435 for (i, (k, v)) in entries.iter().enumerate() {
436 let cell_size = 4 + k.len() + v.len();
437 let cells_start = leaf_cells_start(page);
438 let slot_end = LEAF_SLOT_ARRAY_OFFSET + (i + 1) * 2;
439 if slot_end + cell_size > cells_start {
440 return Err(BTreeError::Corrupted("leaf rebuild overflowed page".into()));
441 }
442 let cell_offset = cells_start - cell_size;
443 {
444 let data = page.as_bytes_mut();
445 data[cell_offset..cell_offset + 2].copy_from_slice(&(k.len() as u16).to_le_bytes());
446 data[cell_offset + 2..cell_offset + 4].copy_from_slice(&(v.len() as u16).to_le_bytes());
447 data[cell_offset + 4..cell_offset + 4 + k.len()].copy_from_slice(k);
448 data[cell_offset + 4 + k.len()..cell_offset + 4 + k.len() + v.len()].copy_from_slice(v);
449 }
450 page.set_free_end(cell_offset as u16);
451 leaf_write_slot(page, i, cell_offset as u16)?;
452 }
453 page.set_cell_count(entries.len() as u16);
454 page.set_free_start((LEAF_SLOT_ARRAY_OFFSET + entries.len() * 2) as u16);
455 Ok(())
456}
457
458fn can_insert_leaf(page: &Page, key: &[u8], value: &[u8]) -> bool {
459 let needed = 2 + 4 + key.len() + value.len();
460 leaf_free_bytes(page) >= needed
461}
462
463pub(crate) fn overwrite_leaf_value(
479 page: &mut Page,
480 index: usize,
481 new_value: &[u8],
482) -> BTreeResult<()> {
483 let cell_offset = leaf_read_slot(page, index)?;
484 let data = page.as_bytes();
485 if cell_offset + 4 > data.len() {
486 return Err(BTreeError::Corrupted(
487 "leaf cell header out of bounds".into(),
488 ));
489 }
490 let k_len = u16::from_le_bytes([data[cell_offset], data[cell_offset + 1]]) as usize;
491 let old_v_len = u16::from_le_bytes([data[cell_offset + 2], data[cell_offset + 3]]) as usize;
492 if new_value.len() > old_v_len {
493 return Err(BTreeError::Corrupted(
494 "overwrite_leaf_value: new value larger than slot".into(),
495 ));
496 }
497 let value_start = cell_offset + 4 + k_len;
498 if value_start + new_value.len() > data.len() {
499 return Err(BTreeError::Corrupted(
500 "leaf cell value out of bounds".into(),
501 ));
502 }
503 let data = page.as_bytes_mut();
504 if new_value.len() != old_v_len {
507 let new_len = new_value.len() as u16;
508 data[cell_offset + 2..cell_offset + 4].copy_from_slice(&new_len.to_le_bytes());
509 }
510 data[value_start..value_start + new_value.len()].copy_from_slice(new_value);
511 Ok(())
512}
513
514fn insert_into_leaf(page: &mut Page, key: &[u8], value: &[u8]) -> BTreeResult<()> {
516 let cell_count = page.cell_count() as usize;
518 let mut low = 0;
519 let mut high = cell_count;
520 while low < high {
521 let mid = (low + high) / 2;
522 let (cell_key, _) = read_leaf_cell(page, mid)?;
523 match cell_key.as_slice().cmp(key) {
524 Ordering::Less => low = mid + 1,
525 Ordering::Greater => high = mid,
526 Ordering::Equal => {
527 low = mid + 1;
530 break;
531 }
532 }
533 }
534 let insert_pos = low;
535
536 let cell_size = 4 + key.len() + value.len();
538 let slot_end_after = LEAF_SLOT_ARRAY_OFFSET + (cell_count + 1) * 2;
539 let cells_start = leaf_cells_start(page);
540 if slot_end_after + cell_size > cells_start {
541 return Err(BTreeError::Corrupted("leaf page full".into()));
542 }
543 let cell_offset = cells_start - cell_size;
544 {
545 let data = page.as_bytes_mut();
546 data[cell_offset..cell_offset + 2].copy_from_slice(&(key.len() as u16).to_le_bytes());
547 data[cell_offset + 2..cell_offset + 4].copy_from_slice(&(value.len() as u16).to_le_bytes());
548 data[cell_offset + 4..cell_offset + 4 + key.len()].copy_from_slice(key);
549 data[cell_offset + 4 + key.len()..cell_offset + 4 + key.len() + value.len()]
550 .copy_from_slice(value);
551 }
552 page.set_free_end(cell_offset as u16);
553
554 {
558 let shift_from = leaf_slot_offset_for(insert_pos);
559 let shift_to = shift_from + 2;
560 let shift_len = (cell_count - insert_pos) * 2;
561 if shift_len > 0 {
562 let data = page.as_bytes_mut();
563 data.copy_within(shift_from..shift_from + shift_len, shift_to);
564 }
565 }
566 leaf_write_slot(page, insert_pos, cell_offset as u16)?;
567
568 page.set_cell_count((cell_count + 1) as u16);
570 page.set_free_start((LEAF_SLOT_ARRAY_OFFSET + (cell_count + 1) * 2) as u16);
571 Ok(())
572}
573
574fn delete_from_leaf(page: &mut Page, index: usize) -> BTreeResult<()> {
579 let cell_count = page.cell_count() as usize;
580 if index >= cell_count {
581 return Err(BTreeError::Corrupted("delete index out of range".into()));
582 }
583 if index + 1 < cell_count {
584 let shift_from = leaf_slot_offset_for(index + 1);
585 let shift_to = leaf_slot_offset_for(index);
586 let shift_len = (cell_count - index - 1) * 2;
587 let data = page.as_bytes_mut();
588 data.copy_within(shift_from..shift_from + shift_len, shift_to);
589 }
590 page.set_cell_count((cell_count - 1) as u16);
591 page.set_free_start((LEAF_SLOT_ARRAY_OFFSET + (cell_count - 1) * 2) as u16);
592 Ok(())
593}
594
595fn clear_leaf_cells(page: &mut Page) {
599 {
600 let data = page.as_bytes_mut();
601 for byte in &mut data[LEAF_SLOT_ARRAY_OFFSET..] {
602 *byte = 0;
603 }
604 }
605 page.set_cell_count(0);
606 page.set_free_start(LEAF_SLOT_ARRAY_OFFSET as u16);
607 page.set_free_end(PAGE_SIZE as u16);
608}
609
610fn init_leaf_links(page: &mut Page, prev: u32, next: u32) {
611 let data = page.as_bytes_mut();
612 data[LEAF_PREV_OFFSET..LEAF_PREV_OFFSET + 4].copy_from_slice(&prev.to_le_bytes());
613 data[LEAF_NEXT_OFFSET..LEAF_NEXT_OFFSET + 4].copy_from_slice(&next.to_le_bytes());
614}
615
616fn read_next_leaf(page: &Page) -> u32 {
617 let data = page.as_bytes();
618 u32::from_le_bytes([
619 data[LEAF_NEXT_OFFSET],
620 data[LEAF_NEXT_OFFSET + 1],
621 data[LEAF_NEXT_OFFSET + 2],
622 data[LEAF_NEXT_OFFSET + 3],
623 ])
624}
625
626#[inline]
629fn leaf_right_sibling(page: &Page) -> u32 {
630 read_next_leaf(page)
631}
632
633fn leaf_high_key(page: &Page) -> BTreeResult<Option<Vec<u8>>> {
637 let cell_count = page.cell_count() as usize;
638 if cell_count == 0 {
639 return Ok(None);
640 }
641 let (key, _) = read_leaf_cell(page, cell_count - 1)?;
642 Ok(Some(key))
643}
644
645fn set_prev_leaf(page: &mut Page, prev: u32) {
646 let data = page.as_bytes_mut();
647 data[LEAF_PREV_OFFSET..LEAF_PREV_OFFSET + 4].copy_from_slice(&prev.to_le_bytes());
648}
649
650fn set_next_leaf(page: &mut Page, next: u32) {
651 let data = page.as_bytes_mut();
652 data[LEAF_NEXT_OFFSET..LEAF_NEXT_OFFSET + 4].copy_from_slice(&next.to_le_bytes());
653}
654
655#[inline]
670fn interior_slot_offset_for(index: usize) -> usize {
671 INTERIOR_DATA_OFFSET + index * 2
672}
673
674#[inline]
675fn interior_read_slot(page: &Page, index: usize) -> BTreeResult<usize> {
676 let data = page.as_bytes();
677 let slot_pos = interior_slot_offset_for(index);
678 if slot_pos + 2 > PAGE_SIZE {
679 return Err(BTreeError::Corrupted(
680 "interior slot array overflows page".into(),
681 ));
682 }
683 Ok(u16::from_le_bytes([data[slot_pos], data[slot_pos + 1]]) as usize)
684}
685
686#[inline]
687fn interior_write_slot(page: &mut Page, index: usize, cell_offset: u16) -> BTreeResult<()> {
688 let data = page.as_bytes_mut();
689 let slot_pos = interior_slot_offset_for(index);
690 if slot_pos + 2 > PAGE_SIZE {
691 return Err(BTreeError::Corrupted(
692 "interior slot array overflows page".into(),
693 ));
694 }
695 data[slot_pos..slot_pos + 2].copy_from_slice(&cell_offset.to_le_bytes());
696 Ok(())
697}
698
699#[inline]
700fn interior_cells_start(page: &Page) -> usize {
701 let end = page.free_end() as usize;
702 if end == 0 {
703 PAGE_SIZE
704 } else {
705 end
706 }
707}
708
709#[inline]
710fn interior_free_bytes(page: &Page) -> usize {
711 let slot_end = INTERIOR_DATA_OFFSET + (page.cell_count() as usize) * 2;
712 interior_cells_start(page).saturating_sub(slot_end)
713}
714
715fn read_interior_cell(page: &Page, index: usize) -> BTreeResult<(Vec<u8>, u32)> {
716 let cell_count = page.cell_count() as usize;
717 if index >= cell_count {
718 return Err(BTreeError::Corrupted("Cell index out of range".into()));
719 }
720 let offset = interior_read_slot(page, index)?;
721 let data = page.as_bytes();
722 if offset + 2 > PAGE_SIZE {
723 return Err(BTreeError::Corrupted(
724 "interior cell header out of range".into(),
725 ));
726 }
727 let key_len = u16::from_le_bytes([data[offset], data[offset + 1]]) as usize;
728 let end = offset + 2 + key_len + 4;
729 if end > PAGE_SIZE {
730 return Err(BTreeError::Corrupted(
731 "interior cell body out of range".into(),
732 ));
733 }
734 let key = data[offset + 2..offset + 2 + key_len].to_vec();
735 let child = u32::from_le_bytes([
736 data[offset + 2 + key_len],
737 data[offset + 2 + key_len + 1],
738 data[offset + 2 + key_len + 2],
739 data[offset + 2 + key_len + 3],
740 ]);
741 Ok((key, child))
742}
743
744fn read_interior_keys_children(page: &Page) -> BTreeResult<(Vec<Vec<u8>>, Vec<u32>)> {
745 let cell_count = page.cell_count() as usize;
746 let mut keys = Vec::with_capacity(cell_count);
747 let mut children = Vec::with_capacity(cell_count + 1);
748
749 for i in 0..cell_count {
750 let (key, child) = read_interior_cell(page, i)?;
751 keys.push(key);
752 children.push(child);
753 }
754
755 if cell_count == 0 {
756 let right_child = page.right_child();
757 if right_child != 0 {
758 children.push(right_child);
759 }
760 } else {
761 children.push(page.right_child());
762 }
763
764 Ok((keys, children))
765}
766
767fn write_interior_entries(page: &mut Page, keys: &[Vec<u8>], children: &[u32]) -> BTreeResult<()> {
772 if !keys.is_empty() && children.len() != keys.len() + 1 {
773 return Err(BTreeError::Corrupted(
774 "Interior keys/children length mismatch".into(),
775 ));
776 }
777
778 clear_interior_cells(page);
779 if keys.is_empty() {
780 let right_child = children.first().copied().unwrap_or(0);
781 page.set_right_child(right_child);
782 return Ok(());
783 }
784
785 for (i, key) in keys.iter().enumerate() {
786 let cell_size = 2 + key.len() + 4;
787 let cells_start = interior_cells_start(page);
788 let slot_end = INTERIOR_DATA_OFFSET + (i + 1) * 2;
789 if slot_end + cell_size > cells_start {
790 return Err(BTreeError::Corrupted(
791 "interior rebuild overflowed page".into(),
792 ));
793 }
794 let cell_offset = cells_start - cell_size;
795 {
796 let data = page.as_bytes_mut();
797 data[cell_offset..cell_offset + 2].copy_from_slice(&(key.len() as u16).to_le_bytes());
798 data[cell_offset + 2..cell_offset + 2 + key.len()].copy_from_slice(key);
799 data[cell_offset + 2 + key.len()..cell_offset + 2 + key.len() + 4]
800 .copy_from_slice(&children[i].to_le_bytes());
801 }
802 page.set_free_end(cell_offset as u16);
803 interior_write_slot(page, i, cell_offset as u16)?;
804 }
805 page.set_cell_count(keys.len() as u16);
806 page.set_free_start((INTERIOR_DATA_OFFSET + keys.len() * 2) as u16);
807 page.set_right_child(*children.last().ok_or_else(|| {
808 BTreeError::Corrupted("write_interior_entries: children empty with non-empty keys".into())
809 })?);
810 Ok(())
811}
812
813fn can_insert_interior(page: &Page, key: &[u8]) -> bool {
814 let needed = 2 + 2 + key.len() + 4;
815 interior_free_bytes(page) >= needed
816}
817
818fn insert_into_interior(
823 page: &mut Page,
824 key: &[u8],
825 left_child: u32,
826 right_child: u32,
827) -> BTreeResult<()> {
828 let cell_count = page.cell_count() as usize;
830 let mut low = 0;
831 let mut high = cell_count;
832 while low < high {
833 let mid = (low + high) / 2;
834 let (cell_key, _) = read_interior_cell(page, mid)?;
835 match cell_key.as_slice().cmp(key) {
836 Ordering::Less => low = mid + 1,
837 Ordering::Greater | Ordering::Equal => high = mid,
838 }
839 }
840 let insert_pos = low;
841
842 if insert_pos < cell_count {
847 let old_offset = interior_read_slot(page, insert_pos)?;
848 let data = page.as_bytes();
849 let key_len = u16::from_le_bytes([data[old_offset], data[old_offset + 1]]) as usize;
850 let child_pos = old_offset + 2 + key_len;
851 let data = page.as_bytes_mut();
852 data[child_pos..child_pos + 4].copy_from_slice(&right_child.to_le_bytes());
853 } else {
854 page.set_right_child(right_child);
855 }
856
857 let cell_size = 2 + key.len() + 4;
859 let slot_end_after = INTERIOR_DATA_OFFSET + (cell_count + 1) * 2;
860 let cells_start = interior_cells_start(page);
861 if slot_end_after + cell_size > cells_start {
862 return Err(BTreeError::Corrupted("interior page full".into()));
863 }
864 let cell_offset = cells_start - cell_size;
865 {
866 let data = page.as_bytes_mut();
867 data[cell_offset..cell_offset + 2].copy_from_slice(&(key.len() as u16).to_le_bytes());
868 data[cell_offset + 2..cell_offset + 2 + key.len()].copy_from_slice(key);
869 data[cell_offset + 2 + key.len()..cell_offset + 2 + key.len() + 4]
870 .copy_from_slice(&left_child.to_le_bytes());
871 }
872 page.set_free_end(cell_offset as u16);
873
874 {
877 let shift_from = interior_slot_offset_for(insert_pos);
878 let shift_to = shift_from + 2;
879 let shift_len = (cell_count - insert_pos) * 2;
880 if shift_len > 0 {
881 let data = page.as_bytes_mut();
882 data.copy_within(shift_from..shift_from + shift_len, shift_to);
883 }
884 }
885 interior_write_slot(page, insert_pos, cell_offset as u16)?;
886 page.set_cell_count((cell_count + 1) as u16);
887 page.set_free_start((INTERIOR_DATA_OFFSET + (cell_count + 1) * 2) as u16);
888 Ok(())
889}
890
891fn clear_interior_cells(page: &mut Page) {
892 {
893 let data = page.as_bytes_mut();
894 for byte in &mut data[INTERIOR_DATA_OFFSET..] {
895 *byte = 0;
896 }
897 }
898 page.set_cell_count(0);
899 page.set_free_start(INTERIOR_DATA_OFFSET as u16);
900 page.set_free_end(PAGE_SIZE as u16);
901}
902
903#[cfg(test)]
904mod tests {
905 use super::*;
906 use std::path::PathBuf;
907
908 struct TempDbPath(PathBuf);
915 impl std::ops::Deref for TempDbPath {
916 type Target = PathBuf;
917 fn deref(&self) -> &PathBuf {
918 &self.0
919 }
920 }
921 impl AsRef<std::path::Path> for TempDbPath {
922 fn as_ref(&self) -> &std::path::Path {
923 self.0.as_ref()
924 }
925 }
926 impl Drop for TempDbPath {
927 fn drop(&mut self) {
928 cleanup(&self.0);
929 }
930 }
931
932 fn temp_db_path() -> TempDbPath {
933 use std::sync::atomic::{AtomicU64, Ordering};
934 static COUNTER: AtomicU64 = AtomicU64::new(0);
935 let id = COUNTER.fetch_add(1, Ordering::Relaxed);
936 let mut path = std::env::temp_dir();
937 path.push(format!("reddb_btree_test_{}_{}.db", std::process::id(), id));
938 TempDbPath(path)
939 }
940
941 fn cleanup(path: &PathBuf) {
942 if let (Some(dir), Some(name)) = (
946 path.parent(),
947 path.file_name().and_then(|name| name.to_str()),
948 ) {
949 if let Ok(entries) = std::fs::read_dir(dir) {
950 for entry in entries.flatten() {
951 if entry
952 .file_name()
953 .to_str()
954 .is_some_and(|entry_name| entry_name.starts_with(name))
955 {
956 let _ = std::fs::remove_file(entry.path());
957 }
958 }
959 }
960 }
961 }
962
963 #[test]
964 fn test_btree_empty() {
965 let path = temp_db_path();
966 cleanup(&path);
967
968 let pager = Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
969 let tree = BTree::new(pager);
970
971 assert!(tree.is_empty());
972 assert_eq!(tree.root_page_id(), 0);
973 assert_eq!(tree.get(b"key").expect("get() should succeed"), None);
974 assert_eq!(tree.count().expect("count() should succeed"), 0);
975
976 cleanup(&path);
977 }
978
979 #[test]
980 fn test_btree_single_insert() {
981 let path = temp_db_path();
982 cleanup(&path);
983
984 let pager = Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
985 let tree = BTree::new(pager);
986
987 tree.insert(b"hello", b"world")
988 .expect("insert() should succeed");
989
990 assert!(!tree.is_empty());
991 assert_eq!(
992 tree.get(b"hello").expect("get() should succeed"),
993 Some(b"world".to_vec())
994 );
995 assert_eq!(tree.get(b"other").expect("get() should succeed"), None);
996 assert_eq!(tree.count().expect("count() should succeed"), 1);
997
998 cleanup(&path);
999 }
1000
1001 #[test]
1002 fn test_btree_multiple_inserts() {
1003 let path = temp_db_path();
1004 cleanup(&path);
1005
1006 let pager = Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1007 let tree = BTree::new(pager);
1008
1009 tree.insert(b"c", b"3").expect("insert() should succeed");
1010 tree.insert(b"a", b"1").expect("insert() should succeed");
1011 tree.insert(b"b", b"2").expect("insert() should succeed");
1012
1013 assert_eq!(
1014 tree.get(b"a").expect("get() should succeed"),
1015 Some(b"1".to_vec())
1016 );
1017 assert_eq!(
1018 tree.get(b"b").expect("get() should succeed"),
1019 Some(b"2".to_vec())
1020 );
1021 assert_eq!(
1022 tree.get(b"c").expect("get() should succeed"),
1023 Some(b"3".to_vec())
1024 );
1025 assert_eq!(tree.count().expect("count() should succeed"), 3);
1026
1027 cleanup(&path);
1028 }
1029
1030 #[test]
1031 fn test_btree_duplicate_key() {
1032 let path = temp_db_path();
1033 cleanup(&path);
1034
1035 let pager = Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1036 let tree = BTree::new(pager);
1037
1038 tree.insert(b"key", b"value1")
1039 .expect("insert() should succeed");
1040 let result = tree.insert(b"key", b"value2");
1041
1042 assert!(matches!(result, Err(BTreeError::DuplicateKey)));
1043 assert_eq!(
1044 tree.get(b"key").expect("get() should succeed"),
1045 Some(b"value1".to_vec())
1046 );
1047
1048 cleanup(&path);
1049 }
1050
1051 #[test]
1052 fn test_btree_delete() {
1053 let path = temp_db_path();
1054 cleanup(&path);
1055
1056 let pager = Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1057 let tree = BTree::new(pager);
1058
1059 tree.insert(b"a", b"1").expect("insert() should succeed");
1060 tree.insert(b"b", b"2").expect("insert() should succeed");
1061 tree.insert(b"c", b"3").expect("insert() should succeed");
1062
1063 assert!(tree.delete(b"b").expect("delete() should succeed"));
1064 assert!(!tree.delete(b"d").expect("delete() should succeed"));
1065
1066 assert_eq!(
1067 tree.get(b"a").expect("get() should succeed"),
1068 Some(b"1".to_vec())
1069 );
1070 assert_eq!(tree.get(b"b").expect("get() should succeed"), None);
1071 assert_eq!(
1072 tree.get(b"c").expect("get() should succeed"),
1073 Some(b"3".to_vec())
1074 );
1075 assert_eq!(tree.count().expect("count() should succeed"), 2);
1076
1077 cleanup(&path);
1078 }
1079
1080 #[test]
1081 fn test_btree_delete_rebalance_removes_empty_leaf() {
1082 let path = temp_db_path();
1083 cleanup(&path);
1084
1085 let pager = Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1086 let tree = BTree::new(pager.clone());
1087
1088 let value = vec![b'v'; 200];
1089 for i in 0..300u32 {
1090 let key = format!("key{:03}", i);
1091 tree.insert(key.as_bytes(), &value)
1092 .expect("insert() should succeed");
1093 }
1094
1095 let root_id = tree.root_page_id();
1096 let first_leaf = tree
1097 .find_first_leaf(root_id)
1098 .expect("find_first_leaf() should succeed");
1099 let mut leaf_ids = Vec::new();
1100 let mut current = first_leaf;
1101 loop {
1102 leaf_ids.push(current);
1103 let page = pager
1104 .read_page(current)
1105 .expect("read_page() should succeed");
1106 let next = read_next_leaf(&page);
1107 if next == 0 {
1108 break;
1109 }
1110 current = next;
1111 }
1112
1113 assert!(leaf_ids.len() >= 3);
1114
1115 let target_leaf = leaf_ids[1];
1116 let page = pager
1117 .read_page(target_leaf)
1118 .expect("read_page() should succeed");
1119 let cell_count = page.cell_count() as usize;
1120 let mut keys = Vec::with_capacity(cell_count);
1121 for i in 0..cell_count {
1122 let (key, _) = read_leaf_cell(&page, i).expect("read_leaf_cell() should succeed");
1123 keys.push(key);
1124 }
1125
1126 for key in &keys {
1127 tree.delete(key).expect("delete() should succeed");
1128 }
1129
1130 let expected = 300 - keys.len();
1131 assert_eq!(tree.count().expect("count() should succeed"), expected);
1132
1133 let mut cursor = tree.cursor_first().expect("cursor_first() should succeed");
1134 let mut results = Vec::new();
1135 while let Some((key, _)) = cursor.next().expect("next() should succeed") {
1136 results.push(key);
1137 }
1138
1139 assert_eq!(results.len(), expected);
1140 let last_key = format!("key{:03}", 299).into_bytes();
1141 assert_eq!(results.last(), Some(&last_key));
1142
1143 cleanup(&path);
1144 }
1145
1146 #[test]
1147 fn test_btree_cursor() {
1148 let path = temp_db_path();
1149 cleanup(&path);
1150
1151 let pager = Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1152 let tree = BTree::new(pager);
1153
1154 tree.insert(b"c", b"3").expect("insert() should succeed");
1155 tree.insert(b"a", b"1").expect("insert() should succeed");
1156 tree.insert(b"b", b"2").expect("insert() should succeed");
1157
1158 let mut cursor = tree.cursor_first().expect("cursor_first() should succeed");
1159 let mut results: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
1160
1161 while let Some(entry) = cursor.next().expect("next() should succeed") {
1162 results.push(entry);
1163 }
1164
1165 assert_eq!(results.len(), 3);
1166 assert_eq!(results[0], (b"a".to_vec(), b"1".to_vec()));
1167 assert_eq!(results[1], (b"b".to_vec(), b"2".to_vec()));
1168 assert_eq!(results[2], (b"c".to_vec(), b"3".to_vec()));
1169
1170 cleanup(&path);
1171 }
1172
1173 #[test]
1174 fn test_btree_range() {
1175 let path = temp_db_path();
1176 cleanup(&path);
1177
1178 let pager = Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1179 let tree = BTree::new(pager);
1180
1181 for i in 0..10u8 {
1182 let key = format!("key{:02}", i);
1183 let value = format!("val{:02}", i);
1184 tree.insert(key.as_bytes(), value.as_bytes())
1185 .expect("insert() should succeed");
1186 }
1187
1188 let results = tree
1189 .range(b"key03", b"key06")
1190 .expect("range() should succeed");
1191
1192 assert_eq!(results.len(), 4);
1193 assert_eq!(results[0].0, b"key03".to_vec());
1194 assert_eq!(results[3].0, b"key06".to_vec());
1195
1196 cleanup(&path);
1197 }
1198
1199 #[test]
1200 fn test_btree_large_keys() {
1201 let path = temp_db_path();
1202 cleanup(&path);
1203
1204 let pager = Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1205 let tree = BTree::new(pager);
1206
1207 let key = vec![b'x'; 500];
1208 let value = vec![b'y'; 500];
1209
1210 tree.insert(&key, &value).expect("insert() should succeed");
1211 assert_eq!(tree.get(&key).expect("get() should succeed"), Some(value));
1212
1213 cleanup(&path);
1214 }
1215
1216 #[test]
1217 fn test_btree_key_too_large() {
1218 let path = temp_db_path();
1219 cleanup(&path);
1220
1221 let pager = Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1222 let tree = BTree::new(pager);
1223
1224 let key = vec![b'x'; MAX_KEY_SIZE + 1];
1225 let result = tree.insert(&key, b"value");
1226
1227 assert!(matches!(result, Err(BTreeError::KeyTooLarge(_))));
1228
1229 cleanup(&path);
1230 }
1231
1232 #[test]
1233 fn test_btree_many_inserts() {
1234 let path = temp_db_path();
1235 cleanup(&path);
1236
1237 let pager = Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1238 let tree = BTree::new(pager);
1239
1240 for i in 0..100u32 {
1242 let key = format!("key{:08}", i);
1243 let value = format!("value{:08}", i);
1244 tree.insert(key.as_bytes(), value.as_bytes())
1245 .expect("insert() should succeed");
1246 }
1247
1248 for i in 0..100u32 {
1250 let key = format!("key{:08}", i);
1251 let expected = format!("value{:08}", i);
1252 assert_eq!(
1253 tree.get(key.as_bytes()).expect("get() should succeed"),
1254 Some(expected.into_bytes())
1255 );
1256 }
1257
1258 assert_eq!(tree.count().expect("count() should succeed"), 100);
1259
1260 cleanup(&path);
1261 }
1262
1263 #[test]
1264 fn test_btree_bulk_insert_sorted_preserves_leaf_layout() {
1265 let path = temp_db_path();
1266 cleanup(&path);
1267
1268 let pager = Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1269 let tree = BTree::new(pager);
1270
1271 let items: Vec<(Vec<u8>, Vec<u8>)> = (0..240u32)
1272 .map(|i| {
1273 (
1274 format!("key{:08}", i).into_bytes(),
1275 vec![b'v'; 32 + (i as usize % 7)],
1276 )
1277 })
1278 .collect();
1279
1280 tree.bulk_insert_sorted(&items)
1281 .expect("bulk_insert_sorted() should succeed");
1282
1283 assert_eq!(tree.count().expect("count() should succeed"), items.len());
1284
1285 for (key, value) in &items {
1286 assert_eq!(
1287 tree.get(key).expect("get() should succeed"),
1288 Some(value.clone())
1289 );
1290 }
1291
1292 cleanup(&path);
1293 }
1294
1295 mod prop_upsert_batch_sorted {
1305 use super::*;
1306 use proptest::prelude::*;
1307 use std::collections::BTreeMap;
1308
1309 fn populate_tree(seed: &[(Vec<u8>, Vec<u8>)]) -> (BTree, TempDbPath, Arc<Pager>) {
1312 let path = temp_db_path();
1313 cleanup(&path);
1314 let pager =
1315 Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1316 let tree = BTree::new(Arc::clone(&pager));
1317 for (k, v) in seed {
1318 tree.upsert(k, v).expect("upsert() should succeed");
1319 }
1320 (tree, path, pager)
1321 }
1322
1323 fn key_strategy() -> impl Strategy<Value = Vec<u8>> {
1324 prop::collection::vec(0u8..16u8, 1..=4)
1328 }
1329
1330 fn value_strategy() -> impl Strategy<Value = Vec<u8>> {
1331 prop::collection::vec(0u8..255u8, 0..=8)
1335 }
1336
1337 fn pair_strategy() -> impl Strategy<Value = (Vec<u8>, Vec<u8>)> {
1338 (key_strategy(), value_strategy())
1339 }
1340
1341 proptest! {
1342 #![proptest_config(ProptestConfig {
1343 cases: 64,
1344 .. ProptestConfig::default()
1345 })]
1346
1347 #[test]
1348 fn batch_upsert_matches_loop_upsert(
1349 seed in prop::collection::vec(pair_strategy(), 0..50),
1350 batch in prop::collection::vec(pair_strategy(), 1..200),
1351 ) {
1352 let mut seed_map: BTreeMap<Vec<u8>, Vec<u8>> = BTreeMap::new();
1355 for (k, v) in seed {
1356 seed_map.insert(k, v);
1357 }
1358 let seed: Vec<(Vec<u8>, Vec<u8>)> = seed_map.into_iter().collect();
1359
1360 let (baseline, baseline_path, _baseline_pager) = populate_tree(&seed);
1362 for (k, v) in &batch {
1363 baseline.upsert(k, v).expect("upsert() should succeed");
1364 }
1365
1366 let (candidate, candidate_path, _candidate_pager) = populate_tree(&seed);
1369 let mut sorted = batch.clone();
1370 sorted.sort_by(|a, b| a.0.cmp(&b.0));
1371 candidate.upsert_batch_sorted(&sorted).expect("upsert_batch_sorted() should succeed");
1372
1373 let mut expected: BTreeMap<Vec<u8>, Vec<u8>> = BTreeMap::new();
1376 for (k, v) in &seed {
1377 expected.insert(k.clone(), v.clone());
1378 }
1379 for (k, v) in &batch {
1380 expected.insert(k.clone(), v.clone());
1381 }
1382
1383 for (k, v) in &expected {
1385 let baseline_got = baseline.get(k).expect("get() should succeed");
1386 let candidate_got = candidate.get(k).expect("get() should succeed");
1387 prop_assert_eq!(baseline_got.as_ref(), Some(v));
1388 prop_assert_eq!(candidate_got.as_ref(), Some(v));
1389 }
1390
1391 let collect = |t: &BTree| -> Vec<(Vec<u8>, Vec<u8>)> {
1393 let mut out = Vec::new();
1394 let mut cur = t.cursor_first().expect("cursor_first() should succeed");
1395 while let Some(entry) = cur.next().expect("next() should succeed") {
1396 out.push(entry);
1397 }
1398 out
1399 };
1400 let baseline_contents = collect(&baseline);
1401 let candidate_contents = collect(&candidate);
1402 prop_assert_eq!(&baseline_contents, &candidate_contents);
1403
1404 let expected_contents: Vec<(Vec<u8>, Vec<u8>)> =
1406 expected.into_iter().collect();
1407 prop_assert_eq!(&candidate_contents, &expected_contents);
1408
1409 cleanup(&baseline_path);
1410 cleanup(&candidate_path);
1411 }
1412 }
1413 }
1414
1415 #[test]
1416 fn test_btree_sorted_iteration() {
1417 let path = temp_db_path();
1418 cleanup(&path);
1419
1420 let pager = Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1421 let tree = BTree::new(pager);
1422
1423 let keys = vec![50, 25, 75, 10, 30, 60, 80, 5, 15, 27, 35, 55, 65, 77, 90];
1425 for k in &keys {
1426 let key = format!("{:03}", k);
1427 tree.insert(key.as_bytes(), key.as_bytes())
1428 .expect("insert() should succeed");
1429 }
1430
1431 let mut cursor = tree.cursor_first().expect("cursor_first() should succeed");
1433 let mut prev: Option<Vec<u8>> = None;
1434
1435 while let Some((key, _)) = cursor.next().expect("next() should succeed") {
1436 if let Some(p) = &prev {
1437 assert!(p < &key, "Keys not in sorted order");
1438 }
1439 prev = Some(key);
1440 }
1441
1442 cleanup(&path);
1443 }
1444}
1445
1446#[cfg(test)]
1451mod overflow_pipeline_tests {
1452 use super::value_layout::{MAX_VALUE_SIZE, OVERFLOW_THRESHOLD};
1453 use super::*;
1454 use std::path::PathBuf;
1455
1456 fn temp_db_path(tag: &str) -> PathBuf {
1457 use std::sync::atomic::{AtomicU64, Ordering};
1458 static COUNTER: AtomicU64 = AtomicU64::new(0);
1459 let id = COUNTER.fetch_add(1, Ordering::Relaxed);
1460 let mut path = std::env::temp_dir();
1461 path.push(format!(
1462 "reddb_btree_spill_{}_{}_{}.db",
1463 tag,
1464 std::process::id(),
1465 id
1466 ));
1467 path
1468 }
1469
1470 fn cleanup(path: &std::path::Path) {
1471 let _ = std::fs::remove_file(path);
1472 for sidecar in reddb_file::layout::pager_shadow_sidecar_paths(path) {
1473 let _ = std::fs::remove_file(sidecar);
1474 }
1475 }
1476
1477 #[test]
1482 fn inline_path_allocates_no_overflow_pages() {
1483 let path = temp_db_path("inline_no_overflow");
1484 cleanup(&path);
1485 {
1486 let pager =
1487 Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1488 let tree = BTree::new(pager.clone());
1489 let before = pager.page_count().expect("page_count() should succeed");
1490
1491 let value = vec![0x5Au8; OVERFLOW_THRESHOLD - 1];
1492 tree.insert(b"k", &value).expect("insert() should succeed");
1493
1494 let after = pager.page_count().expect("page_count() should succeed");
1495 assert_eq!(
1498 after - before,
1499 1,
1500 "inline value must allocate only the root leaf"
1501 );
1502 assert_eq!(tree.get(b"k").expect("get() should succeed"), Some(value));
1503 }
1504 cleanup(&path);
1505 }
1506
1507 #[test]
1511 fn compressible_44kb_round_trips() {
1512 let path = temp_db_path("compressible_44kb");
1513 cleanup(&path);
1514 {
1515 let pager =
1516 Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1517 let tree = BTree::new(pager);
1518
1519 let snippet = "# heading\n- bullet line that says the same thing over and over\n";
1521 let mut value = Vec::with_capacity(44 * 1024);
1522 while value.len() < 44 * 1024 {
1523 value.extend_from_slice(snippet.as_bytes());
1524 }
1525 assert!(value.len() > OVERFLOW_THRESHOLD);
1526
1527 tree.insert(b"doc", &value)
1528 .expect("insert() should succeed");
1529 assert_eq!(tree.get(b"doc").expect("get() should succeed"), Some(value));
1530 }
1531 cleanup(&path);
1532 }
1533
1534 #[test]
1537 fn incompressible_5mb_spills_and_round_trips() {
1538 let path = temp_db_path("incompressible_5mb");
1539 cleanup(&path);
1540 {
1541 let pager =
1542 Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1543 let tree = BTree::new(pager.clone());
1544
1545 let mut state: u64 = 0xC0FFEE_DEAD_F00D;
1548 let value: Vec<u8> = (0..5 * 1024 * 1024)
1549 .map(|_| {
1550 state ^= state << 13;
1551 state ^= state >> 7;
1552 state ^= state << 17;
1553 state as u8
1554 })
1555 .collect();
1556
1557 let before = pager.page_count().expect("page_count() should succeed");
1558 tree.insert(b"blob", &value)
1559 .expect("insert() should succeed");
1560 let after = pager.page_count().expect("page_count() should succeed");
1561 assert!(
1564 after - before > 2,
1565 "5 MB spill must allocate overflow pages (alloc'd {} pages)",
1566 after - before
1567 );
1568
1569 assert_eq!(
1570 tree.get(b"blob").expect("get() should succeed").as_deref(),
1571 Some(value.as_slice())
1572 );
1573 }
1574 cleanup(&path);
1575 }
1576
1577 #[test]
1581 fn value_above_ceiling_rejected_without_allocation() {
1582 let path = temp_db_path("ceiling_reject");
1583 cleanup(&path);
1584 {
1585 let pager =
1586 Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1587 let tree = BTree::new(pager.clone());
1588 let before = pager.page_count().expect("page_count() should succeed");
1589
1590 let value = vec![0u8; MAX_VALUE_SIZE + 1];
1591 let err = tree.insert(b"too_big", &value).unwrap_err();
1592 assert!(matches!(err, BTreeError::ValueTooLarge(_)));
1593
1594 assert_eq!(
1595 pager.page_count().expect("page_count() should succeed"),
1596 before,
1597 "rejected value must not allocate pages"
1598 );
1599 assert!(tree.is_empty());
1601 }
1602 cleanup(&path);
1603 }
1604
1605 #[test]
1610 fn bulk_insert_skips_oversized_row_keeps_rest() {
1611 let path = temp_db_path("bulk_mixed");
1612 cleanup(&path);
1613 {
1614 let pager =
1615 Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1616 let tree = BTree::new(pager);
1617
1618 let mut items: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
1619 for i in 0..5u32 {
1621 items.push((format!("k{:04}", i).into_bytes(), vec![b'a' + i as u8; 64]));
1622 }
1623 items.push((b"k0005_huge".to_vec(), vec![0u8; MAX_VALUE_SIZE + 1]));
1625 for i in 6..11u32 {
1627 items.push((format!("k{:04}", i).into_bytes(), vec![b'z' - i as u8; 32]));
1628 }
1629
1630 let res = tree.bulk_insert_sorted(&items);
1631 assert!(matches!(res, Err(BTreeError::ValueTooLarge(_))));
1632
1633 for (k, v) in items.iter().filter(|(_, v)| v.len() <= MAX_VALUE_SIZE) {
1635 assert_eq!(
1636 tree.get(k).expect("get() should succeed").as_deref(),
1637 Some(v.as_slice()),
1638 "valid row {:?} must land",
1639 k
1640 );
1641 }
1642 assert_eq!(tree.get(b"k0005_huge").expect("get() should succeed"), None);
1644 }
1645 cleanup(&path);
1646 }
1647
1648 #[test]
1652 fn large_values_persist_transparently() {
1653 let path = temp_db_path("large_transparent");
1654 cleanup(&path);
1655 {
1656 let pager =
1657 Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1658 let tree = BTree::new(pager);
1659
1660 for (i, size) in [
1665 OVERFLOW_THRESHOLD + 1,
1666 OVERFLOW_THRESHOLD * 2,
1667 64 * 1024,
1668 1 * 1024 * 1024,
1669 ]
1670 .iter()
1671 .enumerate()
1672 {
1673 let value: Vec<u8> = (0..*size).map(|n| ((n + i) & 0xff) as u8).collect();
1674 let key = format!("size{:08}", size).into_bytes();
1675 tree.insert(&key, &value).expect("insert() should succeed");
1676 assert_eq!(
1677 tree.get(&key).expect("get() should succeed").as_deref(),
1678 Some(value.as_slice()),
1679 "size {} must round-trip",
1680 size
1681 );
1682 }
1683 }
1684 cleanup(&path);
1685 }
1686}
1687
1688#[cfg(test)]
1691mod overflow_free_tests {
1692 use super::value_layout::OVERFLOW_THRESHOLD;
1693 use super::*;
1694 use std::path::PathBuf;
1695
1696 fn temp_db_path(tag: &str) -> PathBuf {
1697 use std::sync::atomic::{AtomicU64, Ordering};
1698 static COUNTER: AtomicU64 = AtomicU64::new(0);
1699 let id = COUNTER.fetch_add(1, Ordering::Relaxed);
1700 let mut path = std::env::temp_dir();
1701 path.push(format!(
1702 "reddb_btree_free_{}_{}_{}.db",
1703 tag,
1704 std::process::id(),
1705 id
1706 ));
1707 path
1708 }
1709
1710 fn cleanup(path: &std::path::Path) {
1711 let _ = std::fs::remove_file(path);
1712 for sidecar in reddb_file::layout::pager_shadow_sidecar_paths(path) {
1713 let _ = std::fs::remove_file(sidecar);
1714 }
1715 }
1716
1717 fn incompressible(seed: u64, len: usize) -> Vec<u8> {
1718 let mut state = seed | 1;
1719 (0..len)
1720 .map(|_| {
1721 state ^= state << 13;
1722 state ^= state >> 7;
1723 state ^= state << 17;
1724 state as u8
1725 })
1726 .collect()
1727 }
1728
1729 #[test]
1735 fn delete_spilled_value_frees_overflow_chain() {
1736 let path = temp_db_path("delete_frees");
1737 cleanup(&path);
1738 {
1739 let pager =
1740 Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1741 let tree = BTree::new(pager.clone());
1742
1743 let v1 = incompressible(0xA11CE, 2 * 1024 * 1024);
1745 tree.insert(b"k", &v1).expect("insert() should succeed");
1746 let after_first = pager.page_count().expect("page_count() should succeed");
1747
1748 assert!(tree.delete(b"k").expect("delete() should succeed"));
1750
1751 let v2 = incompressible(0xB0B, 2 * 1024 * 1024);
1754 tree.insert(b"k", &v2).expect("insert() should succeed");
1755 let after_second = pager.page_count().expect("page_count() should succeed");
1756
1757 assert_eq!(
1758 after_second, after_first,
1759 "second spill must reuse freed pages, not grow the file"
1760 );
1761 assert_eq!(
1762 tree.get(b"k").expect("get() should succeed").as_deref(),
1763 Some(v2.as_slice())
1764 );
1765 }
1766 cleanup(&path);
1767 }
1768
1769 #[test]
1774 fn shrinking_update_to_inline_frees_chain() {
1775 let path = temp_db_path("shrink_to_inline");
1776 cleanup(&path);
1777 {
1778 let pager =
1779 Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1780 let tree = BTree::new(pager.clone());
1781
1782 let big = incompressible(0xDEAD, 2 * 1024 * 1024);
1783 tree.insert(b"k", &big).expect("insert() should succeed");
1784 let hwm = pager.page_count().expect("page_count() should succeed");
1785
1786 let small = b"tiny".to_vec();
1788 tree.upsert(b"k", &small).expect("upsert() should succeed");
1789 assert_eq!(
1790 tree.get(b"k").expect("get() should succeed").as_deref(),
1791 Some(small.as_slice())
1792 );
1793
1794 let big2 = incompressible(0xBEEF, 2 * 1024 * 1024);
1798 tree.insert(b"k2", &big2).expect("insert() should succeed");
1799 assert_eq!(
1800 pager.page_count().expect("page_count() should succeed"),
1801 hwm,
1802 "shrinking update must free the chain — replacement spill should reuse"
1803 );
1804 assert_eq!(
1805 tree.get(b"k2").expect("get() should succeed").as_deref(),
1806 Some(big2.as_slice())
1807 );
1808 }
1809 cleanup(&path);
1810 }
1811
1812 #[test]
1817 fn shrinking_update_to_shorter_spill_frees_old_chain() {
1818 let path = temp_db_path("shrink_spill_to_spill");
1819 cleanup(&path);
1820 {
1821 let pager =
1822 Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1823 let tree = BTree::new(pager.clone());
1824
1825 let big = incompressible(0xF00D, 3 * 1024 * 1024);
1829 tree.insert(b"k", &big).expect("insert() should succeed");
1830 let hwm_after_big = pager.page_count().expect("page_count() should succeed");
1831
1832 let smaller = incompressible(0xCAFE, 1 * 1024 * 1024);
1843 tree.upsert(b"k", &smaller)
1844 .expect("upsert() should succeed");
1845 assert_eq!(
1846 tree.get(b"k").expect("get() should succeed").as_deref(),
1847 Some(smaller.as_slice())
1848 );
1849 let hwm_after_upsert = pager.page_count().expect("page_count() should succeed");
1850
1851 assert!(tree.delete(b"k").expect("delete() should succeed"));
1852
1853 let again = incompressible(0x1234, 3 * 1024 * 1024);
1854 tree.insert(b"k", &again).expect("insert() should succeed");
1855 assert!(
1856 pager.page_count().expect("page_count() should succeed") <= hwm_after_upsert,
1857 "after delete+insert of original size, file must not grow past the upsert transient \
1858 (page_count = {}, transient high-water = {}, original = {}) — the upsert must have \
1859 freed the old chain so the re-insert reuses freed pages",
1860 pager.page_count().expect("page_count() should succeed"),
1861 hwm_after_upsert,
1862 hwm_after_big,
1863 );
1864 assert_eq!(
1865 tree.get(b"k").expect("get() should succeed").as_deref(),
1866 Some(again.as_slice())
1867 );
1868 }
1869 cleanup(&path);
1870 }
1871
1872 #[test]
1875 fn insert_delete_reinsert_does_not_grow_file() {
1876 let path = temp_db_path("reinsert_no_grow");
1877 cleanup(&path);
1878 {
1879 let pager =
1880 Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1881 let tree = BTree::new(pager.clone());
1882
1883 let v1 = incompressible(0x5EED, 4 * 1024 * 1024);
1884 tree.insert(b"row", &v1).expect("insert() should succeed");
1885 let pages_after_first = pager.page_count().expect("page_count() should succeed");
1886
1887 assert!(tree.delete(b"row").expect("delete() should succeed"));
1888
1889 let v2 = incompressible(0xACE, 4 * 1024 * 1024);
1890 tree.insert(b"row", &v2).expect("insert() should succeed");
1891 let pages_after_second = pager.page_count().expect("page_count() should succeed");
1892
1893 assert_eq!(
1894 pages_after_second, pages_after_first,
1895 "re-inserting at the same size must reuse the freed chain"
1896 );
1897 assert_eq!(
1898 tree.get(b"row").expect("get() should succeed").as_deref(),
1899 Some(v2.as_slice())
1900 );
1901 }
1902 cleanup(&path);
1903 }
1904
1905 #[test]
1910 fn upsert_batch_in_place_shrink_frees_chain() {
1911 let path = temp_db_path("batch_shrink");
1912 cleanup(&path);
1913 {
1914 let pager =
1915 Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1916 let tree = BTree::new(pager.clone());
1917
1918 let big_a = incompressible(0x1111, 2 * 1024 * 1024);
1921 let big_b = incompressible(0x2222, 2 * 1024 * 1024);
1922 tree.insert(b"a", &big_a).expect("insert() should succeed");
1923 tree.insert(b"b", &big_b).expect("insert() should succeed");
1924 let hwm = pager.page_count().expect("page_count() should succeed");
1925
1926 let updates = vec![
1927 (b"a".to_vec(), b"shrunk-a".to_vec()),
1928 (b"b".to_vec(), b"shrunk-b".to_vec()),
1929 ];
1930 tree.upsert_batch_sorted(&updates)
1931 .expect("upsert_batch_sorted() should succeed");
1932 assert_eq!(
1933 tree.get(b"a").expect("get() should succeed").as_deref(),
1934 Some(b"shrunk-a".as_slice())
1935 );
1936 assert_eq!(
1937 tree.get(b"b").expect("get() should succeed").as_deref(),
1938 Some(b"shrunk-b".as_slice())
1939 );
1940
1941 let again_a = incompressible(0xAAAA, 2 * 1024 * 1024);
1944 let again_b = incompressible(0xBBBB, 2 * 1024 * 1024);
1945 tree.insert(b"c", &again_a)
1946 .expect("insert() should succeed");
1947 tree.insert(b"d", &again_b)
1948 .expect("insert() should succeed");
1949 assert!(
1950 pager.page_count().expect("page_count() should succeed") <= hwm,
1951 "batch in-place shrink must free chains — replacement spills should reuse"
1952 );
1953 }
1954 cleanup(&path);
1955 }
1956}