1use std::cmp::Ordering;
33use std::sync::{Arc, RwLock};
34
35use super::page::{Page, PageType, HEADER_SIZE, PAGE_SIZE};
36use super::pager::{Pager, PagerError};
37
38pub const MAX_KEY_SIZE: usize = 1024;
40
41pub const MAX_VALUE_SIZE: usize = value_layout::MAX_VALUE_SIZE;
54
55pub const MIN_FILL_FACTOR: usize = 25;
57
58const LEAF_PREV_OFFSET: usize = HEADER_SIZE;
60
61const LEAF_NEXT_OFFSET: usize = HEADER_SIZE + 4;
63
64const LEAF_SLOT_ARRAY_OFFSET: usize = HEADER_SIZE + 8;
79
80const LEAF_DATA_OFFSET: usize = LEAF_SLOT_ARRAY_OFFSET;
83
84const INTERIOR_DATA_OFFSET: usize = HEADER_SIZE;
86
87#[derive(Debug, Clone)]
89pub enum BTreeError {
90 NotFound,
92 DuplicateKey,
94 KeyTooLarge(usize),
96 ValueTooLarge(usize),
98 Corrupted(String),
100 Pager(String),
102 LockPoisoned(String),
104 ValueLayout(String),
106}
107
108impl std::fmt::Display for BTreeError {
109 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
110 match self {
111 Self::NotFound => write!(f, "Key not found"),
112 Self::DuplicateKey => write!(f, "Key already exists"),
113 Self::KeyTooLarge(size) => {
114 write!(f, "Key too large: {} bytes (max {})", size, MAX_KEY_SIZE)
115 }
116 Self::ValueTooLarge(size) => write!(
117 f,
118 "Value too large: {} bytes (max {})",
119 size, MAX_VALUE_SIZE
120 ),
121 Self::Corrupted(msg) => write!(f, "B-tree corrupted: {}", msg),
122 Self::Pager(msg) => write!(f, "Pager error: {}", msg),
123 Self::LockPoisoned(msg) => write!(f, "Lock poisoned: {}", msg),
124 Self::ValueLayout(msg) => write!(f, "Value layout: {}", msg),
125 }
126 }
127}
128
129impl From<value_layout::ValueLayoutError> for BTreeError {
130 fn from(e: value_layout::ValueLayoutError) -> Self {
131 match e {
132 value_layout::ValueLayoutError::ValueTooLarge(n) => Self::ValueTooLarge(n),
133 other => Self::ValueLayout(other.to_string()),
134 }
135 }
136}
137
138impl std::error::Error for BTreeError {}
139
140impl From<PagerError> for BTreeError {
141 fn from(e: PagerError) -> Self {
142 Self::Pager(e.to_string())
143 }
144}
145
146impl From<super::page::PageError> for BTreeError {
147 fn from(e: super::page::PageError) -> Self {
148 Self::Corrupted(e.to_string())
149 }
150}
151
152pub type BTreeResult<T> = Result<T, BTreeError>;
154
155pub struct BTreeCursor {
157 leaf_page_id: u32,
159 position: usize,
161 pager: Arc<Pager>,
163 prefetched_next: bool,
167}
168
169impl BTreeCursor {
170 pub fn next(&mut self) -> BTreeResult<Option<(Vec<u8>, Vec<u8>)>> {
172 if self.leaf_page_id == 0 {
173 return Ok(None);
174 }
175
176 let page = self.pager.read_page(self.leaf_page_id)?;
177 let cell_count = page.cell_count() as usize;
178
179 if !self.prefetched_next && self.position >= cell_count / 2 && cell_count > 0 {
185 let next_leaf = read_next_leaf(&page);
186 if next_leaf != 0 {
187 self.pager.prefetch_hint(next_leaf);
188 }
189 self.prefetched_next = true;
190 }
191
192 if self.position < cell_count {
194 let (key, stored) = read_leaf_cell(&page, self.position)?;
195 let value = value_layout::decode(&self.pager, &stored)?;
196 self.position += 1;
197 return Ok(Some((key, value)));
198 }
199
200 let next_leaf = read_next_leaf(&page);
202 if next_leaf == 0 {
203 self.leaf_page_id = 0;
204 return Ok(None);
205 }
206
207 self.leaf_page_id = next_leaf;
208 self.position = 0;
209 self.prefetched_next = false; let page = self.pager.read_page(self.leaf_page_id)?;
213 if page.cell_count() == 0 {
214 return Ok(None);
215 }
216
217 let (key, stored) = read_leaf_cell(&page, 0)?;
218 let value = value_layout::decode(&self.pager, &stored)?;
219 self.position = 1;
220 Ok(Some((key, value)))
221 }
222
223 pub fn peek(&self) -> BTreeResult<Option<(Vec<u8>, Vec<u8>)>> {
225 if self.leaf_page_id == 0 {
226 return Ok(None);
227 }
228
229 let page = self.pager.read_page(self.leaf_page_id)?;
230 let cell_count = page.cell_count() as usize;
231
232 if self.position >= cell_count {
233 return Ok(None);
234 }
235
236 let (key, stored) = read_leaf_cell(&page, self.position)?;
237 let value = value_layout::decode(&self.pager, &stored)?;
238 Ok(Some((key, value)))
239 }
240}
241
242pub struct BTree {
244 pager: Arc<Pager>,
246 root_page_id: RwLock<u32>,
248 rightmost_leaf: RwLock<Option<(u32, Vec<u8>)>>,
257}
258
259#[path = "btree/impl.rs"]
260mod btree_impl;
261
262#[path = "btree/lehman_yao.rs"]
263pub mod lehman_yao;
264
265#[path = "btree/value_layout.rs"]
266pub mod value_layout;
267enum SearchResult {
270 Found(usize),
271 NotFound(usize),
272}
273
274fn search_leaf(page: &Page, key: &[u8]) -> BTreeResult<SearchResult> {
275 let cell_count = page.cell_count() as usize;
276
277 let mut low = 0;
279 let mut high = cell_count;
280
281 while low < high {
282 let mid = (low + high) / 2;
283 let (cell_key, _) = read_leaf_cell(page, mid)?;
284
285 match cell_key.as_slice().cmp(key) {
286 Ordering::Less => low = mid + 1,
287 Ordering::Greater => high = mid,
288 Ordering::Equal => return Ok(SearchResult::Found(mid)),
289 }
290 }
291
292 Ok(SearchResult::NotFound(low))
293}
294
295fn find_child(page: &Page, key: &[u8]) -> BTreeResult<u32> {
296 let cell_count = page.cell_count() as usize;
297
298 for i in 0..cell_count {
300 let (cell_key, child) = read_interior_cell(page, i)?;
301 if key < cell_key.as_slice() {
302 return Ok(child);
303 }
304 }
305
306 Ok(page.right_child())
308}
309
310fn find_first_child(page: &Page) -> BTreeResult<u32> {
311 if page.cell_count() == 0 {
312 return Ok(page.right_child());
313 }
314 let (_, child) = read_interior_cell(page, 0)?;
315 Ok(child)
316}
317
318fn leaf_min_bytes() -> usize {
319 (PAGE_SIZE - LEAF_DATA_OFFSET) * MIN_FILL_FACTOR / 100
320}
321
322fn interior_min_bytes() -> usize {
323 (PAGE_SIZE - INTERIOR_DATA_OFFSET) * MIN_FILL_FACTOR / 100
324}
325
326fn leaf_entry_size(entry: &(Vec<u8>, Vec<u8>)) -> usize {
330 2 + 4 + entry.0.len() + entry.1.len()
331}
332
333fn leaf_entries_size(entries: &[(Vec<u8>, Vec<u8>)]) -> usize {
334 entries.iter().map(leaf_entry_size).sum()
335}
336
337#[inline]
338fn leaf_slot_offset_for(index: usize) -> usize {
339 LEAF_SLOT_ARRAY_OFFSET + index * 2
340}
341
342#[inline]
343fn leaf_read_slot(page: &Page, index: usize) -> BTreeResult<usize> {
344 let data = page.as_bytes();
345 let slot_pos = leaf_slot_offset_for(index);
346 if slot_pos + 2 > PAGE_SIZE {
347 return Err(BTreeError::Corrupted("slot array overflows page".into()));
348 }
349 Ok(u16::from_le_bytes([data[slot_pos], data[slot_pos + 1]]) as usize)
350}
351
352#[inline]
353fn leaf_write_slot(page: &mut Page, index: usize, cell_offset: u16) -> BTreeResult<()> {
354 let data = page.as_bytes_mut();
355 let slot_pos = leaf_slot_offset_for(index);
356 if slot_pos + 2 > PAGE_SIZE {
357 return Err(BTreeError::Corrupted("slot array overflows page".into()));
358 }
359 data[slot_pos..slot_pos + 2].copy_from_slice(&cell_offset.to_le_bytes());
360 Ok(())
361}
362
363#[inline]
364fn leaf_slots_end(page: &Page) -> usize {
365 LEAF_SLOT_ARRAY_OFFSET + (page.cell_count() as usize) * 2
366}
367
368#[inline]
369fn leaf_cells_start(page: &Page) -> usize {
370 let end = page.free_end() as usize;
371 if end == 0 {
372 PAGE_SIZE
373 } else {
374 end
375 }
376}
377
378#[inline]
379fn leaf_free_bytes(page: &Page) -> usize {
380 let slot_end = leaf_slots_end(page);
381 let cells = leaf_cells_start(page);
382 cells.saturating_sub(slot_end)
383}
384
385fn interior_key_size(key: &[u8]) -> usize {
386 2 + key.len() + 4
387}
388
389fn interior_entries_size(keys: &[Vec<u8>]) -> usize {
390 keys.iter().map(|k| interior_key_size(k)).sum()
391}
392
393fn read_leaf_cell(page: &Page, index: usize) -> BTreeResult<(Vec<u8>, Vec<u8>)> {
399 let cell_count = page.cell_count() as usize;
400 if index >= cell_count {
401 return Err(BTreeError::Corrupted("Cell index out of range".into()));
402 }
403 let offset = leaf_read_slot(page, index)?;
404 let data = page.as_bytes();
405 if offset + 4 > PAGE_SIZE {
406 return Err(BTreeError::Corrupted("cell header out of range".into()));
407 }
408 let key_len = u16::from_le_bytes([data[offset], data[offset + 1]]) as usize;
409 let val_len = u16::from_le_bytes([data[offset + 2], data[offset + 3]]) as usize;
410 let end = offset + 4 + key_len + val_len;
411 if end > PAGE_SIZE {
412 return Err(BTreeError::Corrupted("cell body out of range".into()));
413 }
414 let key = data[offset + 4..offset + 4 + key_len].to_vec();
415 let value = data[offset + 4 + key_len..end].to_vec();
416 Ok((key, value))
417}
418
419fn read_leaf_entries(page: &Page) -> BTreeResult<Vec<(Vec<u8>, Vec<u8>)>> {
420 let cell_count = page.cell_count() as usize;
421 let mut entries = Vec::with_capacity(cell_count);
422 for i in 0..cell_count {
423 entries.push(read_leaf_cell(page, i)?);
424 }
425 Ok(entries)
426}
427
428fn write_leaf_entries(page: &mut Page, entries: &[(Vec<u8>, Vec<u8>)]) -> BTreeResult<()> {
434 clear_leaf_cells(page);
435 for (i, (k, v)) in entries.iter().enumerate() {
436 let cell_size = 4 + k.len() + v.len();
437 let cells_start = leaf_cells_start(page);
438 let slot_end = LEAF_SLOT_ARRAY_OFFSET + (i + 1) * 2;
439 if slot_end + cell_size > cells_start {
440 return Err(BTreeError::Corrupted("leaf rebuild overflowed page".into()));
441 }
442 let cell_offset = cells_start - cell_size;
443 {
444 let data = page.as_bytes_mut();
445 data[cell_offset..cell_offset + 2].copy_from_slice(&(k.len() as u16).to_le_bytes());
446 data[cell_offset + 2..cell_offset + 4].copy_from_slice(&(v.len() as u16).to_le_bytes());
447 data[cell_offset + 4..cell_offset + 4 + k.len()].copy_from_slice(k);
448 data[cell_offset + 4 + k.len()..cell_offset + 4 + k.len() + v.len()].copy_from_slice(v);
449 }
450 page.set_free_end(cell_offset as u16);
451 leaf_write_slot(page, i, cell_offset as u16)?;
452 }
453 page.set_cell_count(entries.len() as u16);
454 page.set_free_start((LEAF_SLOT_ARRAY_OFFSET + entries.len() * 2) as u16);
455 Ok(())
456}
457
458fn can_insert_leaf(page: &Page, key: &[u8], value: &[u8]) -> bool {
459 let needed = 2 + 4 + key.len() + value.len();
460 leaf_free_bytes(page) >= needed
461}
462
463pub(crate) fn overwrite_leaf_value(
479 page: &mut Page,
480 index: usize,
481 new_value: &[u8],
482) -> BTreeResult<()> {
483 let cell_offset = leaf_read_slot(page, index)?;
484 let data = page.as_bytes();
485 if cell_offset + 4 > data.len() {
486 return Err(BTreeError::Corrupted(
487 "leaf cell header out of bounds".into(),
488 ));
489 }
490 let k_len = u16::from_le_bytes([data[cell_offset], data[cell_offset + 1]]) as usize;
491 let old_v_len = u16::from_le_bytes([data[cell_offset + 2], data[cell_offset + 3]]) as usize;
492 if new_value.len() > old_v_len {
493 return Err(BTreeError::Corrupted(
494 "overwrite_leaf_value: new value larger than slot".into(),
495 ));
496 }
497 let value_start = cell_offset + 4 + k_len;
498 if value_start + new_value.len() > data.len() {
499 return Err(BTreeError::Corrupted(
500 "leaf cell value out of bounds".into(),
501 ));
502 }
503 let data = page.as_bytes_mut();
504 if new_value.len() != old_v_len {
507 let new_len = new_value.len() as u16;
508 data[cell_offset + 2..cell_offset + 4].copy_from_slice(&new_len.to_le_bytes());
509 }
510 data[value_start..value_start + new_value.len()].copy_from_slice(new_value);
511 Ok(())
512}
513
514fn insert_into_leaf(page: &mut Page, key: &[u8], value: &[u8]) -> BTreeResult<()> {
516 let cell_count = page.cell_count() as usize;
518 let mut low = 0;
519 let mut high = cell_count;
520 while low < high {
521 let mid = (low + high) / 2;
522 let (cell_key, _) = read_leaf_cell(page, mid)?;
523 match cell_key.as_slice().cmp(key) {
524 Ordering::Less => low = mid + 1,
525 Ordering::Greater => high = mid,
526 Ordering::Equal => {
527 low = mid + 1;
530 break;
531 }
532 }
533 }
534 let insert_pos = low;
535
536 let cell_size = 4 + key.len() + value.len();
538 let slot_end_after = LEAF_SLOT_ARRAY_OFFSET + (cell_count + 1) * 2;
539 let cells_start = leaf_cells_start(page);
540 if slot_end_after + cell_size > cells_start {
541 return Err(BTreeError::Corrupted("leaf page full".into()));
542 }
543 let cell_offset = cells_start - cell_size;
544 {
545 let data = page.as_bytes_mut();
546 data[cell_offset..cell_offset + 2].copy_from_slice(&(key.len() as u16).to_le_bytes());
547 data[cell_offset + 2..cell_offset + 4].copy_from_slice(&(value.len() as u16).to_le_bytes());
548 data[cell_offset + 4..cell_offset + 4 + key.len()].copy_from_slice(key);
549 data[cell_offset + 4 + key.len()..cell_offset + 4 + key.len() + value.len()]
550 .copy_from_slice(value);
551 }
552 page.set_free_end(cell_offset as u16);
553
554 {
558 let shift_from = leaf_slot_offset_for(insert_pos);
559 let shift_to = shift_from + 2;
560 let shift_len = (cell_count - insert_pos) * 2;
561 if shift_len > 0 {
562 let data = page.as_bytes_mut();
563 data.copy_within(shift_from..shift_from + shift_len, shift_to);
564 }
565 }
566 leaf_write_slot(page, insert_pos, cell_offset as u16)?;
567
568 page.set_cell_count((cell_count + 1) as u16);
570 page.set_free_start((LEAF_SLOT_ARRAY_OFFSET + (cell_count + 1) * 2) as u16);
571 Ok(())
572}
573
574fn delete_from_leaf(page: &mut Page, index: usize) -> BTreeResult<()> {
579 let cell_count = page.cell_count() as usize;
580 if index >= cell_count {
581 return Err(BTreeError::Corrupted("delete index out of range".into()));
582 }
583 if index + 1 < cell_count {
584 let shift_from = leaf_slot_offset_for(index + 1);
585 let shift_to = leaf_slot_offset_for(index);
586 let shift_len = (cell_count - index - 1) * 2;
587 let data = page.as_bytes_mut();
588 data.copy_within(shift_from..shift_from + shift_len, shift_to);
589 }
590 page.set_cell_count((cell_count - 1) as u16);
591 page.set_free_start((LEAF_SLOT_ARRAY_OFFSET + (cell_count - 1) * 2) as u16);
592 Ok(())
593}
594
595fn clear_leaf_cells(page: &mut Page) {
599 {
600 let data = page.as_bytes_mut();
601 for byte in &mut data[LEAF_SLOT_ARRAY_OFFSET..] {
602 *byte = 0;
603 }
604 }
605 page.set_cell_count(0);
606 page.set_free_start(LEAF_SLOT_ARRAY_OFFSET as u16);
607 page.set_free_end(PAGE_SIZE as u16);
608}
609
610fn init_leaf_links(page: &mut Page, prev: u32, next: u32) {
611 let data = page.as_bytes_mut();
612 data[LEAF_PREV_OFFSET..LEAF_PREV_OFFSET + 4].copy_from_slice(&prev.to_le_bytes());
613 data[LEAF_NEXT_OFFSET..LEAF_NEXT_OFFSET + 4].copy_from_slice(&next.to_le_bytes());
614}
615
616fn read_next_leaf(page: &Page) -> u32 {
617 let data = page.as_bytes();
618 u32::from_le_bytes([
619 data[LEAF_NEXT_OFFSET],
620 data[LEAF_NEXT_OFFSET + 1],
621 data[LEAF_NEXT_OFFSET + 2],
622 data[LEAF_NEXT_OFFSET + 3],
623 ])
624}
625
626#[inline]
629fn leaf_right_sibling(page: &Page) -> u32 {
630 read_next_leaf(page)
631}
632
633fn leaf_high_key(page: &Page) -> BTreeResult<Option<Vec<u8>>> {
637 let cell_count = page.cell_count() as usize;
638 if cell_count == 0 {
639 return Ok(None);
640 }
641 let (key, _) = read_leaf_cell(page, cell_count - 1)?;
642 Ok(Some(key))
643}
644
645fn set_prev_leaf(page: &mut Page, prev: u32) {
646 let data = page.as_bytes_mut();
647 data[LEAF_PREV_OFFSET..LEAF_PREV_OFFSET + 4].copy_from_slice(&prev.to_le_bytes());
648}
649
650fn set_next_leaf(page: &mut Page, next: u32) {
651 let data = page.as_bytes_mut();
652 data[LEAF_NEXT_OFFSET..LEAF_NEXT_OFFSET + 4].copy_from_slice(&next.to_le_bytes());
653}
654
655#[inline]
670fn interior_slot_offset_for(index: usize) -> usize {
671 INTERIOR_DATA_OFFSET + index * 2
672}
673
674#[inline]
675fn interior_read_slot(page: &Page, index: usize) -> BTreeResult<usize> {
676 let data = page.as_bytes();
677 let slot_pos = interior_slot_offset_for(index);
678 if slot_pos + 2 > PAGE_SIZE {
679 return Err(BTreeError::Corrupted(
680 "interior slot array overflows page".into(),
681 ));
682 }
683 Ok(u16::from_le_bytes([data[slot_pos], data[slot_pos + 1]]) as usize)
684}
685
686#[inline]
687fn interior_write_slot(page: &mut Page, index: usize, cell_offset: u16) -> BTreeResult<()> {
688 let data = page.as_bytes_mut();
689 let slot_pos = interior_slot_offset_for(index);
690 if slot_pos + 2 > PAGE_SIZE {
691 return Err(BTreeError::Corrupted(
692 "interior slot array overflows page".into(),
693 ));
694 }
695 data[slot_pos..slot_pos + 2].copy_from_slice(&cell_offset.to_le_bytes());
696 Ok(())
697}
698
699#[inline]
700fn interior_cells_start(page: &Page) -> usize {
701 let end = page.free_end() as usize;
702 if end == 0 {
703 PAGE_SIZE
704 } else {
705 end
706 }
707}
708
709#[inline]
710fn interior_free_bytes(page: &Page) -> usize {
711 let slot_end = INTERIOR_DATA_OFFSET + (page.cell_count() as usize) * 2;
712 interior_cells_start(page).saturating_sub(slot_end)
713}
714
715fn read_interior_cell(page: &Page, index: usize) -> BTreeResult<(Vec<u8>, u32)> {
716 let cell_count = page.cell_count() as usize;
717 if index >= cell_count {
718 return Err(BTreeError::Corrupted("Cell index out of range".into()));
719 }
720 let offset = interior_read_slot(page, index)?;
721 let data = page.as_bytes();
722 if offset + 2 > PAGE_SIZE {
723 return Err(BTreeError::Corrupted(
724 "interior cell header out of range".into(),
725 ));
726 }
727 let key_len = u16::from_le_bytes([data[offset], data[offset + 1]]) as usize;
728 let end = offset + 2 + key_len + 4;
729 if end > PAGE_SIZE {
730 return Err(BTreeError::Corrupted(
731 "interior cell body out of range".into(),
732 ));
733 }
734 let key = data[offset + 2..offset + 2 + key_len].to_vec();
735 let child = u32::from_le_bytes([
736 data[offset + 2 + key_len],
737 data[offset + 2 + key_len + 1],
738 data[offset + 2 + key_len + 2],
739 data[offset + 2 + key_len + 3],
740 ]);
741 Ok((key, child))
742}
743
744fn read_interior_keys_children(page: &Page) -> BTreeResult<(Vec<Vec<u8>>, Vec<u32>)> {
745 let cell_count = page.cell_count() as usize;
746 let mut keys = Vec::with_capacity(cell_count);
747 let mut children = Vec::with_capacity(cell_count + 1);
748
749 for i in 0..cell_count {
750 let (key, child) = read_interior_cell(page, i)?;
751 keys.push(key);
752 children.push(child);
753 }
754
755 if cell_count == 0 {
756 let right_child = page.right_child();
757 if right_child != 0 {
758 children.push(right_child);
759 }
760 } else {
761 children.push(page.right_child());
762 }
763
764 Ok((keys, children))
765}
766
767fn write_interior_entries(page: &mut Page, keys: &[Vec<u8>], children: &[u32]) -> BTreeResult<()> {
772 if !keys.is_empty() && children.len() != keys.len() + 1 {
773 return Err(BTreeError::Corrupted(
774 "Interior keys/children length mismatch".into(),
775 ));
776 }
777
778 clear_interior_cells(page);
779 if keys.is_empty() {
780 let right_child = children.first().copied().unwrap_or(0);
781 page.set_right_child(right_child);
782 return Ok(());
783 }
784
785 for (i, key) in keys.iter().enumerate() {
786 let cell_size = 2 + key.len() + 4;
787 let cells_start = interior_cells_start(page);
788 let slot_end = INTERIOR_DATA_OFFSET + (i + 1) * 2;
789 if slot_end + cell_size > cells_start {
790 return Err(BTreeError::Corrupted(
791 "interior rebuild overflowed page".into(),
792 ));
793 }
794 let cell_offset = cells_start - cell_size;
795 {
796 let data = page.as_bytes_mut();
797 data[cell_offset..cell_offset + 2].copy_from_slice(&(key.len() as u16).to_le_bytes());
798 data[cell_offset + 2..cell_offset + 2 + key.len()].copy_from_slice(key);
799 data[cell_offset + 2 + key.len()..cell_offset + 2 + key.len() + 4]
800 .copy_from_slice(&children[i].to_le_bytes());
801 }
802 page.set_free_end(cell_offset as u16);
803 interior_write_slot(page, i, cell_offset as u16)?;
804 }
805 page.set_cell_count(keys.len() as u16);
806 page.set_free_start((INTERIOR_DATA_OFFSET + keys.len() * 2) as u16);
807 page.set_right_child(*children.last().ok_or_else(|| {
808 BTreeError::Corrupted("write_interior_entries: children empty with non-empty keys".into())
809 })?);
810 Ok(())
811}
812
813fn can_insert_interior(page: &Page, key: &[u8]) -> bool {
814 let needed = 2 + 2 + key.len() + 4;
815 interior_free_bytes(page) >= needed
816}
817
818fn insert_into_interior(
823 page: &mut Page,
824 key: &[u8],
825 left_child: u32,
826 right_child: u32,
827) -> BTreeResult<()> {
828 let cell_count = page.cell_count() as usize;
830 let mut low = 0;
831 let mut high = cell_count;
832 while low < high {
833 let mid = (low + high) / 2;
834 let (cell_key, _) = read_interior_cell(page, mid)?;
835 match cell_key.as_slice().cmp(key) {
836 Ordering::Less => low = mid + 1,
837 Ordering::Greater | Ordering::Equal => high = mid,
838 }
839 }
840 let insert_pos = low;
841
842 if insert_pos < cell_count {
847 let old_offset = interior_read_slot(page, insert_pos)?;
848 let data = page.as_bytes();
849 let key_len = u16::from_le_bytes([data[old_offset], data[old_offset + 1]]) as usize;
850 let child_pos = old_offset + 2 + key_len;
851 let data = page.as_bytes_mut();
852 data[child_pos..child_pos + 4].copy_from_slice(&right_child.to_le_bytes());
853 } else {
854 page.set_right_child(right_child);
855 }
856
857 let cell_size = 2 + key.len() + 4;
859 let slot_end_after = INTERIOR_DATA_OFFSET + (cell_count + 1) * 2;
860 let cells_start = interior_cells_start(page);
861 if slot_end_after + cell_size > cells_start {
862 return Err(BTreeError::Corrupted("interior page full".into()));
863 }
864 let cell_offset = cells_start - cell_size;
865 {
866 let data = page.as_bytes_mut();
867 data[cell_offset..cell_offset + 2].copy_from_slice(&(key.len() as u16).to_le_bytes());
868 data[cell_offset + 2..cell_offset + 2 + key.len()].copy_from_slice(key);
869 data[cell_offset + 2 + key.len()..cell_offset + 2 + key.len() + 4]
870 .copy_from_slice(&left_child.to_le_bytes());
871 }
872 page.set_free_end(cell_offset as u16);
873
874 {
877 let shift_from = interior_slot_offset_for(insert_pos);
878 let shift_to = shift_from + 2;
879 let shift_len = (cell_count - insert_pos) * 2;
880 if shift_len > 0 {
881 let data = page.as_bytes_mut();
882 data.copy_within(shift_from..shift_from + shift_len, shift_to);
883 }
884 }
885 interior_write_slot(page, insert_pos, cell_offset as u16)?;
886 page.set_cell_count((cell_count + 1) as u16);
887 page.set_free_start((INTERIOR_DATA_OFFSET + (cell_count + 1) * 2) as u16);
888 Ok(())
889}
890
891fn clear_interior_cells(page: &mut Page) {
892 {
893 let data = page.as_bytes_mut();
894 for byte in &mut data[INTERIOR_DATA_OFFSET..] {
895 *byte = 0;
896 }
897 }
898 page.set_cell_count(0);
899 page.set_free_start(INTERIOR_DATA_OFFSET as u16);
900 page.set_free_end(PAGE_SIZE as u16);
901}
902
903#[cfg(test)]
904mod tests {
905 use super::*;
906 use std::path::PathBuf;
907
908 fn temp_db_path() -> PathBuf {
909 use std::sync::atomic::{AtomicU64, Ordering};
910 static COUNTER: AtomicU64 = AtomicU64::new(0);
911 let id = COUNTER.fetch_add(1, Ordering::Relaxed);
912 let mut path = std::env::temp_dir();
913 path.push(format!("reddb_btree_test_{}_{}.db", std::process::id(), id));
914 path
915 }
916
917 fn cleanup(path: &PathBuf) {
918 let _ = std::fs::remove_file(path);
919 }
920
921 #[test]
922 fn test_btree_empty() {
923 let path = temp_db_path();
924 cleanup(&path);
925
926 let pager = Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
927 let tree = BTree::new(pager);
928
929 assert!(tree.is_empty());
930 assert_eq!(tree.root_page_id(), 0);
931 assert_eq!(tree.get(b"key").expect("get() should succeed"), None);
932 assert_eq!(tree.count().expect("count() should succeed"), 0);
933
934 cleanup(&path);
935 }
936
937 #[test]
938 fn test_btree_single_insert() {
939 let path = temp_db_path();
940 cleanup(&path);
941
942 let pager = Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
943 let tree = BTree::new(pager);
944
945 tree.insert(b"hello", b"world")
946 .expect("insert() should succeed");
947
948 assert!(!tree.is_empty());
949 assert_eq!(
950 tree.get(b"hello").expect("get() should succeed"),
951 Some(b"world".to_vec())
952 );
953 assert_eq!(tree.get(b"other").expect("get() should succeed"), None);
954 assert_eq!(tree.count().expect("count() should succeed"), 1);
955
956 cleanup(&path);
957 }
958
959 #[test]
960 fn test_btree_multiple_inserts() {
961 let path = temp_db_path();
962 cleanup(&path);
963
964 let pager = Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
965 let tree = BTree::new(pager);
966
967 tree.insert(b"c", b"3").expect("insert() should succeed");
968 tree.insert(b"a", b"1").expect("insert() should succeed");
969 tree.insert(b"b", b"2").expect("insert() should succeed");
970
971 assert_eq!(
972 tree.get(b"a").expect("get() should succeed"),
973 Some(b"1".to_vec())
974 );
975 assert_eq!(
976 tree.get(b"b").expect("get() should succeed"),
977 Some(b"2".to_vec())
978 );
979 assert_eq!(
980 tree.get(b"c").expect("get() should succeed"),
981 Some(b"3".to_vec())
982 );
983 assert_eq!(tree.count().expect("count() should succeed"), 3);
984
985 cleanup(&path);
986 }
987
988 #[test]
989 fn test_btree_duplicate_key() {
990 let path = temp_db_path();
991 cleanup(&path);
992
993 let pager = Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
994 let tree = BTree::new(pager);
995
996 tree.insert(b"key", b"value1")
997 .expect("insert() should succeed");
998 let result = tree.insert(b"key", b"value2");
999
1000 assert!(matches!(result, Err(BTreeError::DuplicateKey)));
1001 assert_eq!(
1002 tree.get(b"key").expect("get() should succeed"),
1003 Some(b"value1".to_vec())
1004 );
1005
1006 cleanup(&path);
1007 }
1008
1009 #[test]
1010 fn test_btree_delete() {
1011 let path = temp_db_path();
1012 cleanup(&path);
1013
1014 let pager = Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1015 let tree = BTree::new(pager);
1016
1017 tree.insert(b"a", b"1").expect("insert() should succeed");
1018 tree.insert(b"b", b"2").expect("insert() should succeed");
1019 tree.insert(b"c", b"3").expect("insert() should succeed");
1020
1021 assert!(tree.delete(b"b").expect("delete() should succeed"));
1022 assert!(!tree.delete(b"d").expect("delete() should succeed"));
1023
1024 assert_eq!(
1025 tree.get(b"a").expect("get() should succeed"),
1026 Some(b"1".to_vec())
1027 );
1028 assert_eq!(tree.get(b"b").expect("get() should succeed"), None);
1029 assert_eq!(
1030 tree.get(b"c").expect("get() should succeed"),
1031 Some(b"3".to_vec())
1032 );
1033 assert_eq!(tree.count().expect("count() should succeed"), 2);
1034
1035 cleanup(&path);
1036 }
1037
1038 #[test]
1039 fn test_btree_delete_rebalance_removes_empty_leaf() {
1040 let path = temp_db_path();
1041 cleanup(&path);
1042
1043 let pager = Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1044 let tree = BTree::new(pager.clone());
1045
1046 let value = vec![b'v'; 200];
1047 for i in 0..300u32 {
1048 let key = format!("key{:03}", i);
1049 tree.insert(key.as_bytes(), &value)
1050 .expect("insert() should succeed");
1051 }
1052
1053 let root_id = tree.root_page_id();
1054 let first_leaf = tree
1055 .find_first_leaf(root_id)
1056 .expect("find_first_leaf() should succeed");
1057 let mut leaf_ids = Vec::new();
1058 let mut current = first_leaf;
1059 loop {
1060 leaf_ids.push(current);
1061 let page = pager
1062 .read_page(current)
1063 .expect("read_page() should succeed");
1064 let next = read_next_leaf(&page);
1065 if next == 0 {
1066 break;
1067 }
1068 current = next;
1069 }
1070
1071 assert!(leaf_ids.len() >= 3);
1072
1073 let target_leaf = leaf_ids[1];
1074 let page = pager
1075 .read_page(target_leaf)
1076 .expect("read_page() should succeed");
1077 let cell_count = page.cell_count() as usize;
1078 let mut keys = Vec::with_capacity(cell_count);
1079 for i in 0..cell_count {
1080 let (key, _) = read_leaf_cell(&page, i).expect("read_leaf_cell() should succeed");
1081 keys.push(key);
1082 }
1083
1084 for key in &keys {
1085 tree.delete(key).expect("delete() should succeed");
1086 }
1087
1088 let expected = 300 - keys.len();
1089 assert_eq!(tree.count().expect("count() should succeed"), expected);
1090
1091 let mut cursor = tree.cursor_first().expect("cursor_first() should succeed");
1092 let mut results = Vec::new();
1093 while let Some((key, _)) = cursor.next().expect("next() should succeed") {
1094 results.push(key);
1095 }
1096
1097 assert_eq!(results.len(), expected);
1098 let last_key = format!("key{:03}", 299).into_bytes();
1099 assert_eq!(results.last(), Some(&last_key));
1100
1101 cleanup(&path);
1102 }
1103
1104 #[test]
1105 fn test_btree_cursor() {
1106 let path = temp_db_path();
1107 cleanup(&path);
1108
1109 let pager = Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1110 let tree = BTree::new(pager);
1111
1112 tree.insert(b"c", b"3").expect("insert() should succeed");
1113 tree.insert(b"a", b"1").expect("insert() should succeed");
1114 tree.insert(b"b", b"2").expect("insert() should succeed");
1115
1116 let mut cursor = tree.cursor_first().expect("cursor_first() should succeed");
1117 let mut results: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
1118
1119 while let Some(entry) = cursor.next().expect("next() should succeed") {
1120 results.push(entry);
1121 }
1122
1123 assert_eq!(results.len(), 3);
1124 assert_eq!(results[0], (b"a".to_vec(), b"1".to_vec()));
1125 assert_eq!(results[1], (b"b".to_vec(), b"2".to_vec()));
1126 assert_eq!(results[2], (b"c".to_vec(), b"3".to_vec()));
1127
1128 cleanup(&path);
1129 }
1130
1131 #[test]
1132 fn test_btree_range() {
1133 let path = temp_db_path();
1134 cleanup(&path);
1135
1136 let pager = Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1137 let tree = BTree::new(pager);
1138
1139 for i in 0..10u8 {
1140 let key = format!("key{:02}", i);
1141 let value = format!("val{:02}", i);
1142 tree.insert(key.as_bytes(), value.as_bytes())
1143 .expect("insert() should succeed");
1144 }
1145
1146 let results = tree
1147 .range(b"key03", b"key06")
1148 .expect("range() should succeed");
1149
1150 assert_eq!(results.len(), 4);
1151 assert_eq!(results[0].0, b"key03".to_vec());
1152 assert_eq!(results[3].0, b"key06".to_vec());
1153
1154 cleanup(&path);
1155 }
1156
1157 #[test]
1158 fn test_btree_large_keys() {
1159 let path = temp_db_path();
1160 cleanup(&path);
1161
1162 let pager = Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1163 let tree = BTree::new(pager);
1164
1165 let key = vec![b'x'; 500];
1166 let value = vec![b'y'; 500];
1167
1168 tree.insert(&key, &value).expect("insert() should succeed");
1169 assert_eq!(tree.get(&key).expect("get() should succeed"), Some(value));
1170
1171 cleanup(&path);
1172 }
1173
1174 #[test]
1175 fn test_btree_key_too_large() {
1176 let path = temp_db_path();
1177 cleanup(&path);
1178
1179 let pager = Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1180 let tree = BTree::new(pager);
1181
1182 let key = vec![b'x'; MAX_KEY_SIZE + 1];
1183 let result = tree.insert(&key, b"value");
1184
1185 assert!(matches!(result, Err(BTreeError::KeyTooLarge(_))));
1186
1187 cleanup(&path);
1188 }
1189
1190 #[test]
1191 fn test_btree_many_inserts() {
1192 let path = temp_db_path();
1193 cleanup(&path);
1194
1195 let pager = Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1196 let tree = BTree::new(pager);
1197
1198 for i in 0..100u32 {
1200 let key = format!("key{:08}", i);
1201 let value = format!("value{:08}", i);
1202 tree.insert(key.as_bytes(), value.as_bytes())
1203 .expect("insert() should succeed");
1204 }
1205
1206 for i in 0..100u32 {
1208 let key = format!("key{:08}", i);
1209 let expected = format!("value{:08}", i);
1210 assert_eq!(
1211 tree.get(key.as_bytes()).expect("get() should succeed"),
1212 Some(expected.into_bytes())
1213 );
1214 }
1215
1216 assert_eq!(tree.count().expect("count() should succeed"), 100);
1217
1218 cleanup(&path);
1219 }
1220
1221 #[test]
1222 fn test_btree_bulk_insert_sorted_preserves_leaf_layout() {
1223 let path = temp_db_path();
1224 cleanup(&path);
1225
1226 let pager = Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1227 let tree = BTree::new(pager);
1228
1229 let items: Vec<(Vec<u8>, Vec<u8>)> = (0..240u32)
1230 .map(|i| {
1231 (
1232 format!("key{:08}", i).into_bytes(),
1233 vec![b'v'; 32 + (i as usize % 7)],
1234 )
1235 })
1236 .collect();
1237
1238 tree.bulk_insert_sorted(&items)
1239 .expect("bulk_insert_sorted() should succeed");
1240
1241 assert_eq!(tree.count().expect("count() should succeed"), items.len());
1242
1243 for (key, value) in &items {
1244 assert_eq!(
1245 tree.get(key).expect("get() should succeed"),
1246 Some(value.clone())
1247 );
1248 }
1249
1250 cleanup(&path);
1251 }
1252
1253 mod prop_upsert_batch_sorted {
1263 use super::*;
1264 use proptest::prelude::*;
1265 use std::collections::BTreeMap;
1266
1267 fn populate_tree(seed: &[(Vec<u8>, Vec<u8>)]) -> (BTree, PathBuf, Arc<Pager>) {
1270 let path = temp_db_path();
1271 cleanup(&path);
1272 let pager =
1273 Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1274 let tree = BTree::new(Arc::clone(&pager));
1275 for (k, v) in seed {
1276 tree.upsert(k, v).expect("upsert() should succeed");
1277 }
1278 (tree, path, pager)
1279 }
1280
1281 fn key_strategy() -> impl Strategy<Value = Vec<u8>> {
1282 prop::collection::vec(0u8..16u8, 1..=4)
1286 }
1287
1288 fn value_strategy() -> impl Strategy<Value = Vec<u8>> {
1289 prop::collection::vec(0u8..255u8, 0..=8)
1293 }
1294
1295 fn pair_strategy() -> impl Strategy<Value = (Vec<u8>, Vec<u8>)> {
1296 (key_strategy(), value_strategy())
1297 }
1298
1299 proptest! {
1300 #![proptest_config(ProptestConfig {
1301 cases: 64,
1302 .. ProptestConfig::default()
1303 })]
1304
1305 #[test]
1306 fn batch_upsert_matches_loop_upsert(
1307 seed in prop::collection::vec(pair_strategy(), 0..50),
1308 batch in prop::collection::vec(pair_strategy(), 1..200),
1309 ) {
1310 let mut seed_map: BTreeMap<Vec<u8>, Vec<u8>> = BTreeMap::new();
1313 for (k, v) in seed {
1314 seed_map.insert(k, v);
1315 }
1316 let seed: Vec<(Vec<u8>, Vec<u8>)> = seed_map.into_iter().collect();
1317
1318 let (baseline, baseline_path, _baseline_pager) = populate_tree(&seed);
1320 for (k, v) in &batch {
1321 baseline.upsert(k, v).expect("upsert() should succeed");
1322 }
1323
1324 let (candidate, candidate_path, _candidate_pager) = populate_tree(&seed);
1327 let mut sorted = batch.clone();
1328 sorted.sort_by(|a, b| a.0.cmp(&b.0));
1329 candidate.upsert_batch_sorted(&sorted).expect("upsert_batch_sorted() should succeed");
1330
1331 let mut expected: BTreeMap<Vec<u8>, Vec<u8>> = BTreeMap::new();
1334 for (k, v) in &seed {
1335 expected.insert(k.clone(), v.clone());
1336 }
1337 for (k, v) in &batch {
1338 expected.insert(k.clone(), v.clone());
1339 }
1340
1341 for (k, v) in &expected {
1343 let baseline_got = baseline.get(k).expect("get() should succeed");
1344 let candidate_got = candidate.get(k).expect("get() should succeed");
1345 prop_assert_eq!(baseline_got.as_ref(), Some(v));
1346 prop_assert_eq!(candidate_got.as_ref(), Some(v));
1347 }
1348
1349 let collect = |t: &BTree| -> Vec<(Vec<u8>, Vec<u8>)> {
1351 let mut out = Vec::new();
1352 let mut cur = t.cursor_first().expect("cursor_first() should succeed");
1353 while let Some(entry) = cur.next().expect("next() should succeed") {
1354 out.push(entry);
1355 }
1356 out
1357 };
1358 let baseline_contents = collect(&baseline);
1359 let candidate_contents = collect(&candidate);
1360 prop_assert_eq!(&baseline_contents, &candidate_contents);
1361
1362 let expected_contents: Vec<(Vec<u8>, Vec<u8>)> =
1364 expected.into_iter().collect();
1365 prop_assert_eq!(&candidate_contents, &expected_contents);
1366
1367 cleanup(&baseline_path);
1368 cleanup(&candidate_path);
1369 }
1370 }
1371 }
1372
1373 #[test]
1374 fn test_btree_sorted_iteration() {
1375 let path = temp_db_path();
1376 cleanup(&path);
1377
1378 let pager = Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1379 let tree = BTree::new(pager);
1380
1381 let keys = vec![50, 25, 75, 10, 30, 60, 80, 5, 15, 27, 35, 55, 65, 77, 90];
1383 for k in &keys {
1384 let key = format!("{:03}", k);
1385 tree.insert(key.as_bytes(), key.as_bytes())
1386 .expect("insert() should succeed");
1387 }
1388
1389 let mut cursor = tree.cursor_first().expect("cursor_first() should succeed");
1391 let mut prev: Option<Vec<u8>> = None;
1392
1393 while let Some((key, _)) = cursor.next().expect("next() should succeed") {
1394 if let Some(p) = &prev {
1395 assert!(p < &key, "Keys not in sorted order");
1396 }
1397 prev = Some(key);
1398 }
1399
1400 cleanup(&path);
1401 }
1402}
1403
1404#[cfg(test)]
1409mod overflow_pipeline_tests {
1410 use super::value_layout::{MAX_VALUE_SIZE, OVERFLOW_THRESHOLD};
1411 use super::*;
1412 use std::path::PathBuf;
1413
1414 fn temp_db_path(tag: &str) -> PathBuf {
1415 use std::sync::atomic::{AtomicU64, Ordering};
1416 static COUNTER: AtomicU64 = AtomicU64::new(0);
1417 let id = COUNTER.fetch_add(1, Ordering::Relaxed);
1418 let mut path = std::env::temp_dir();
1419 path.push(format!(
1420 "reddb_btree_spill_{}_{}_{}.db",
1421 tag,
1422 std::process::id(),
1423 id
1424 ));
1425 path
1426 }
1427
1428 fn cleanup(path: &std::path::Path) {
1429 let _ = std::fs::remove_file(path);
1430 for sidecar in reddb_file::layout::pager_shadow_sidecar_paths(path) {
1431 let _ = std::fs::remove_file(sidecar);
1432 }
1433 }
1434
1435 #[test]
1440 fn inline_path_allocates_no_overflow_pages() {
1441 let path = temp_db_path("inline_no_overflow");
1442 cleanup(&path);
1443 {
1444 let pager =
1445 Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1446 let tree = BTree::new(pager.clone());
1447 let before = pager.page_count().expect("page_count() should succeed");
1448
1449 let value = vec![0x5Au8; OVERFLOW_THRESHOLD - 1];
1450 tree.insert(b"k", &value).expect("insert() should succeed");
1451
1452 let after = pager.page_count().expect("page_count() should succeed");
1453 assert_eq!(
1456 after - before,
1457 1,
1458 "inline value must allocate only the root leaf"
1459 );
1460 assert_eq!(tree.get(b"k").expect("get() should succeed"), Some(value));
1461 }
1462 cleanup(&path);
1463 }
1464
1465 #[test]
1469 fn compressible_44kb_round_trips() {
1470 let path = temp_db_path("compressible_44kb");
1471 cleanup(&path);
1472 {
1473 let pager =
1474 Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1475 let tree = BTree::new(pager);
1476
1477 let snippet = "# heading\n- bullet line that says the same thing over and over\n";
1479 let mut value = Vec::with_capacity(44 * 1024);
1480 while value.len() < 44 * 1024 {
1481 value.extend_from_slice(snippet.as_bytes());
1482 }
1483 assert!(value.len() > OVERFLOW_THRESHOLD);
1484
1485 tree.insert(b"doc", &value)
1486 .expect("insert() should succeed");
1487 assert_eq!(tree.get(b"doc").expect("get() should succeed"), Some(value));
1488 }
1489 cleanup(&path);
1490 }
1491
1492 #[test]
1495 fn incompressible_5mb_spills_and_round_trips() {
1496 let path = temp_db_path("incompressible_5mb");
1497 cleanup(&path);
1498 {
1499 let pager =
1500 Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1501 let tree = BTree::new(pager.clone());
1502
1503 let mut state: u64 = 0xC0FFEE_DEAD_F00D;
1506 let value: Vec<u8> = (0..5 * 1024 * 1024)
1507 .map(|_| {
1508 state ^= state << 13;
1509 state ^= state >> 7;
1510 state ^= state << 17;
1511 state as u8
1512 })
1513 .collect();
1514
1515 let before = pager.page_count().expect("page_count() should succeed");
1516 tree.insert(b"blob", &value)
1517 .expect("insert() should succeed");
1518 let after = pager.page_count().expect("page_count() should succeed");
1519 assert!(
1522 after - before > 2,
1523 "5 MB spill must allocate overflow pages (alloc'd {} pages)",
1524 after - before
1525 );
1526
1527 assert_eq!(
1528 tree.get(b"blob").expect("get() should succeed").as_deref(),
1529 Some(value.as_slice())
1530 );
1531 }
1532 cleanup(&path);
1533 }
1534
1535 #[test]
1539 fn value_above_ceiling_rejected_without_allocation() {
1540 let path = temp_db_path("ceiling_reject");
1541 cleanup(&path);
1542 {
1543 let pager =
1544 Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1545 let tree = BTree::new(pager.clone());
1546 let before = pager.page_count().expect("page_count() should succeed");
1547
1548 let value = vec![0u8; MAX_VALUE_SIZE + 1];
1549 let err = tree.insert(b"too_big", &value).unwrap_err();
1550 assert!(matches!(err, BTreeError::ValueTooLarge(_)));
1551
1552 assert_eq!(
1553 pager.page_count().expect("page_count() should succeed"),
1554 before,
1555 "rejected value must not allocate pages"
1556 );
1557 assert!(tree.is_empty());
1559 }
1560 cleanup(&path);
1561 }
1562
1563 #[test]
1568 fn bulk_insert_skips_oversized_row_keeps_rest() {
1569 let path = temp_db_path("bulk_mixed");
1570 cleanup(&path);
1571 {
1572 let pager =
1573 Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1574 let tree = BTree::new(pager);
1575
1576 let mut items: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
1577 for i in 0..5u32 {
1579 items.push((format!("k{:04}", i).into_bytes(), vec![b'a' + i as u8; 64]));
1580 }
1581 items.push((b"k0005_huge".to_vec(), vec![0u8; MAX_VALUE_SIZE + 1]));
1583 for i in 6..11u32 {
1585 items.push((format!("k{:04}", i).into_bytes(), vec![b'z' - i as u8; 32]));
1586 }
1587
1588 let res = tree.bulk_insert_sorted(&items);
1589 assert!(matches!(res, Err(BTreeError::ValueTooLarge(_))));
1590
1591 for (k, v) in items.iter().filter(|(_, v)| v.len() <= MAX_VALUE_SIZE) {
1593 assert_eq!(
1594 tree.get(k).expect("get() should succeed").as_deref(),
1595 Some(v.as_slice()),
1596 "valid row {:?} must land",
1597 k
1598 );
1599 }
1600 assert_eq!(tree.get(b"k0005_huge").expect("get() should succeed"), None);
1602 }
1603 cleanup(&path);
1604 }
1605
1606 #[test]
1610 fn large_values_persist_transparently() {
1611 let path = temp_db_path("large_transparent");
1612 cleanup(&path);
1613 {
1614 let pager =
1615 Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1616 let tree = BTree::new(pager);
1617
1618 for (i, size) in [
1623 OVERFLOW_THRESHOLD + 1,
1624 OVERFLOW_THRESHOLD * 2,
1625 64 * 1024,
1626 1 * 1024 * 1024,
1627 ]
1628 .iter()
1629 .enumerate()
1630 {
1631 let value: Vec<u8> = (0..*size).map(|n| ((n + i) & 0xff) as u8).collect();
1632 let key = format!("size{:08}", size).into_bytes();
1633 tree.insert(&key, &value).expect("insert() should succeed");
1634 assert_eq!(
1635 tree.get(&key).expect("get() should succeed").as_deref(),
1636 Some(value.as_slice()),
1637 "size {} must round-trip",
1638 size
1639 );
1640 }
1641 }
1642 cleanup(&path);
1643 }
1644}
1645
1646#[cfg(test)]
1649mod overflow_free_tests {
1650 use super::value_layout::OVERFLOW_THRESHOLD;
1651 use super::*;
1652 use std::path::PathBuf;
1653
1654 fn temp_db_path(tag: &str) -> PathBuf {
1655 use std::sync::atomic::{AtomicU64, Ordering};
1656 static COUNTER: AtomicU64 = AtomicU64::new(0);
1657 let id = COUNTER.fetch_add(1, Ordering::Relaxed);
1658 let mut path = std::env::temp_dir();
1659 path.push(format!(
1660 "reddb_btree_free_{}_{}_{}.db",
1661 tag,
1662 std::process::id(),
1663 id
1664 ));
1665 path
1666 }
1667
1668 fn cleanup(path: &std::path::Path) {
1669 let _ = std::fs::remove_file(path);
1670 for sidecar in reddb_file::layout::pager_shadow_sidecar_paths(path) {
1671 let _ = std::fs::remove_file(sidecar);
1672 }
1673 }
1674
1675 fn incompressible(seed: u64, len: usize) -> Vec<u8> {
1676 let mut state = seed | 1;
1677 (0..len)
1678 .map(|_| {
1679 state ^= state << 13;
1680 state ^= state >> 7;
1681 state ^= state << 17;
1682 state as u8
1683 })
1684 .collect()
1685 }
1686
1687 #[test]
1693 fn delete_spilled_value_frees_overflow_chain() {
1694 let path = temp_db_path("delete_frees");
1695 cleanup(&path);
1696 {
1697 let pager =
1698 Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1699 let tree = BTree::new(pager.clone());
1700
1701 let v1 = incompressible(0xA11CE, 2 * 1024 * 1024);
1703 tree.insert(b"k", &v1).expect("insert() should succeed");
1704 let after_first = pager.page_count().expect("page_count() should succeed");
1705
1706 assert!(tree.delete(b"k").expect("delete() should succeed"));
1708
1709 let v2 = incompressible(0xB0B, 2 * 1024 * 1024);
1712 tree.insert(b"k", &v2).expect("insert() should succeed");
1713 let after_second = pager.page_count().expect("page_count() should succeed");
1714
1715 assert_eq!(
1716 after_second, after_first,
1717 "second spill must reuse freed pages, not grow the file"
1718 );
1719 assert_eq!(
1720 tree.get(b"k").expect("get() should succeed").as_deref(),
1721 Some(v2.as_slice())
1722 );
1723 }
1724 cleanup(&path);
1725 }
1726
1727 #[test]
1732 fn shrinking_update_to_inline_frees_chain() {
1733 let path = temp_db_path("shrink_to_inline");
1734 cleanup(&path);
1735 {
1736 let pager =
1737 Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1738 let tree = BTree::new(pager.clone());
1739
1740 let big = incompressible(0xDEAD, 2 * 1024 * 1024);
1741 tree.insert(b"k", &big).expect("insert() should succeed");
1742 let hwm = pager.page_count().expect("page_count() should succeed");
1743
1744 let small = b"tiny".to_vec();
1746 tree.upsert(b"k", &small).expect("upsert() should succeed");
1747 assert_eq!(
1748 tree.get(b"k").expect("get() should succeed").as_deref(),
1749 Some(small.as_slice())
1750 );
1751
1752 let big2 = incompressible(0xBEEF, 2 * 1024 * 1024);
1756 tree.insert(b"k2", &big2).expect("insert() should succeed");
1757 assert_eq!(
1758 pager.page_count().expect("page_count() should succeed"),
1759 hwm,
1760 "shrinking update must free the chain — replacement spill should reuse"
1761 );
1762 assert_eq!(
1763 tree.get(b"k2").expect("get() should succeed").as_deref(),
1764 Some(big2.as_slice())
1765 );
1766 }
1767 cleanup(&path);
1768 }
1769
1770 #[test]
1775 fn shrinking_update_to_shorter_spill_frees_old_chain() {
1776 let path = temp_db_path("shrink_spill_to_spill");
1777 cleanup(&path);
1778 {
1779 let pager =
1780 Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1781 let tree = BTree::new(pager.clone());
1782
1783 let big = incompressible(0xF00D, 3 * 1024 * 1024);
1787 tree.insert(b"k", &big).expect("insert() should succeed");
1788 let hwm_after_big = pager.page_count().expect("page_count() should succeed");
1789
1790 let smaller = incompressible(0xCAFE, 1 * 1024 * 1024);
1801 tree.upsert(b"k", &smaller)
1802 .expect("upsert() should succeed");
1803 assert_eq!(
1804 tree.get(b"k").expect("get() should succeed").as_deref(),
1805 Some(smaller.as_slice())
1806 );
1807 let hwm_after_upsert = pager.page_count().expect("page_count() should succeed");
1808
1809 assert!(tree.delete(b"k").expect("delete() should succeed"));
1810
1811 let again = incompressible(0x1234, 3 * 1024 * 1024);
1812 tree.insert(b"k", &again).expect("insert() should succeed");
1813 assert!(
1814 pager.page_count().expect("page_count() should succeed") <= hwm_after_upsert,
1815 "after delete+insert of original size, file must not grow past the upsert transient \
1816 (page_count = {}, transient high-water = {}, original = {}) — the upsert must have \
1817 freed the old chain so the re-insert reuses freed pages",
1818 pager.page_count().expect("page_count() should succeed"),
1819 hwm_after_upsert,
1820 hwm_after_big,
1821 );
1822 assert_eq!(
1823 tree.get(b"k").expect("get() should succeed").as_deref(),
1824 Some(again.as_slice())
1825 );
1826 }
1827 cleanup(&path);
1828 }
1829
1830 #[test]
1833 fn insert_delete_reinsert_does_not_grow_file() {
1834 let path = temp_db_path("reinsert_no_grow");
1835 cleanup(&path);
1836 {
1837 let pager =
1838 Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1839 let tree = BTree::new(pager.clone());
1840
1841 let v1 = incompressible(0x5EED, 4 * 1024 * 1024);
1842 tree.insert(b"row", &v1).expect("insert() should succeed");
1843 let pages_after_first = pager.page_count().expect("page_count() should succeed");
1844
1845 assert!(tree.delete(b"row").expect("delete() should succeed"));
1846
1847 let v2 = incompressible(0xACE, 4 * 1024 * 1024);
1848 tree.insert(b"row", &v2).expect("insert() should succeed");
1849 let pages_after_second = pager.page_count().expect("page_count() should succeed");
1850
1851 assert_eq!(
1852 pages_after_second, pages_after_first,
1853 "re-inserting at the same size must reuse the freed chain"
1854 );
1855 assert_eq!(
1856 tree.get(b"row").expect("get() should succeed").as_deref(),
1857 Some(v2.as_slice())
1858 );
1859 }
1860 cleanup(&path);
1861 }
1862
1863 #[test]
1868 fn upsert_batch_in_place_shrink_frees_chain() {
1869 let path = temp_db_path("batch_shrink");
1870 cleanup(&path);
1871 {
1872 let pager =
1873 Arc::new(Pager::open_default(&path).expect("open_default() should succeed"));
1874 let tree = BTree::new(pager.clone());
1875
1876 let big_a = incompressible(0x1111, 2 * 1024 * 1024);
1879 let big_b = incompressible(0x2222, 2 * 1024 * 1024);
1880 tree.insert(b"a", &big_a).expect("insert() should succeed");
1881 tree.insert(b"b", &big_b).expect("insert() should succeed");
1882 let hwm = pager.page_count().expect("page_count() should succeed");
1883
1884 let updates = vec![
1885 (b"a".to_vec(), b"shrunk-a".to_vec()),
1886 (b"b".to_vec(), b"shrunk-b".to_vec()),
1887 ];
1888 tree.upsert_batch_sorted(&updates)
1889 .expect("upsert_batch_sorted() should succeed");
1890 assert_eq!(
1891 tree.get(b"a").expect("get() should succeed").as_deref(),
1892 Some(b"shrunk-a".as_slice())
1893 );
1894 assert_eq!(
1895 tree.get(b"b").expect("get() should succeed").as_deref(),
1896 Some(b"shrunk-b".as_slice())
1897 );
1898
1899 let again_a = incompressible(0xAAAA, 2 * 1024 * 1024);
1902 let again_b = incompressible(0xBBBB, 2 * 1024 * 1024);
1903 tree.insert(b"c", &again_a)
1904 .expect("insert() should succeed");
1905 tree.insert(b"d", &again_b)
1906 .expect("insert() should succeed");
1907 assert!(
1908 pager.page_count().expect("page_count() should succeed") <= hwm,
1909 "batch in-place shrink must free chains — replacement spills should reuse"
1910 );
1911 }
1912 cleanup(&path);
1913 }
1914}