1use std::cmp::Ordering;
33use std::sync::{Arc, RwLock};
34
35use super::page::{Page, PageType, HEADER_SIZE, PAGE_SIZE};
36use super::pager::{Pager, PagerError};
37
/// Maximum size of a key, in bytes.
pub const MAX_KEY_SIZE: usize = 1024;

/// Maximum size of a value, in bytes. Taken from `value_layout` so both
/// modules report the same limit.
pub const MAX_VALUE_SIZE: usize = value_layout::MAX_VALUE_SIZE;

/// Minimum page fill, expressed as a percentage of the usable bytes of a
/// page (see `leaf_min_bytes` and `interior_min_bytes`).
pub const MIN_FILL_FACTOR: usize = 25;
57
/// Byte offset of the 4-byte little-endian previous-leaf page id in a leaf.
const LEAF_PREV_OFFSET: usize = HEADER_SIZE;

/// Byte offset of the 4-byte little-endian next-leaf page id in a leaf.
const LEAF_NEXT_OFFSET: usize = HEADER_SIZE + 4;

/// Byte offset of the leaf slot array: one 2-byte little-endian cell offset
/// per cell, placed right after the two sibling links.
const LEAF_SLOT_ARRAY_OFFSET: usize = HEADER_SIZE + 8;

/// First byte of a leaf page available for slots and cells; used to compute
/// the usable capacity of a leaf.
const LEAF_DATA_OFFSET: usize = LEAF_SLOT_ARRAY_OFFSET;

/// First byte of an interior page available for slots and cells; the
/// interior slot array starts here.
const INTERIOR_DATA_OFFSET: usize = HEADER_SIZE;
86
/// Errors reported by the B-tree layer.
#[derive(Debug, Clone)]
pub enum BTreeError {
    /// The requested key is not present.
    NotFound,
    /// The key already exists.
    DuplicateKey,
    /// The key exceeds `MAX_KEY_SIZE`; carries the offending size in bytes.
    KeyTooLarge(usize),
    /// The value exceeds `MAX_VALUE_SIZE`; carries the offending size in bytes.
    ValueTooLarge(usize),
    /// A page did not have the expected structure; carries a description.
    /// Page-level errors (`PageError`) are also mapped to this variant.
    Corrupted(String),
    /// A pager operation failed; carries the rendered `PagerError`.
    Pager(String),
    /// A lock was poisoned; carries a description.
    LockPoisoned(String),
    /// A `value_layout` failure other than "value too large"; carries the
    /// rendered error.
    ValueLayout(String),
}
107
108impl std::fmt::Display for BTreeError {
109 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
110 match self {
111 Self::NotFound => write!(f, "Key not found"),
112 Self::DuplicateKey => write!(f, "Key already exists"),
113 Self::KeyTooLarge(size) => {
114 write!(f, "Key too large: {} bytes (max {})", size, MAX_KEY_SIZE)
115 }
116 Self::ValueTooLarge(size) => write!(
117 f,
118 "Value too large: {} bytes (max {})",
119 size, MAX_VALUE_SIZE
120 ),
121 Self::Corrupted(msg) => write!(f, "B-tree corrupted: {}", msg),
122 Self::Pager(msg) => write!(f, "Pager error: {}", msg),
123 Self::LockPoisoned(msg) => write!(f, "Lock poisoned: {}", msg),
124 Self::ValueLayout(msg) => write!(f, "Value layout: {}", msg),
125 }
126 }
127}
128
129impl From<value_layout::ValueLayoutError> for BTreeError {
130 fn from(e: value_layout::ValueLayoutError) -> Self {
131 match e {
132 value_layout::ValueLayoutError::ValueTooLarge(n) => Self::ValueTooLarge(n),
133 other => Self::ValueLayout(other.to_string()),
134 }
135 }
136}
137
// Marker impl: `Debug` and `Display` supply everything `Error` requires, and
// no source error is exposed because wrapped errors are stored as strings.
impl std::error::Error for BTreeError {}
139
140impl From<PagerError> for BTreeError {
141 fn from(e: PagerError) -> Self {
142 Self::Pager(e.to_string())
143 }
144}
145
146impl From<super::page::PageError> for BTreeError {
147 fn from(e: super::page::PageError) -> Self {
148 Self::Corrupted(e.to_string())
149 }
150}
151
/// Result alias used throughout the B-tree code; the error is [`BTreeError`].
pub type BTreeResult<T> = Result<T, BTreeError>;
154
/// Forward cursor over the leaf chain of a B-tree.
pub struct BTreeCursor {
    /// Page id of the leaf currently being read; `0` means the cursor is
    /// exhausted.
    leaf_page_id: u32,
    /// Index of the next cell to return within the current leaf.
    position: usize,
    /// Pager used to read leaf pages and to decode stored values.
    pager: Arc<Pager>,
    /// Whether the prefetch hint for the current leaf's right sibling has
    /// already been issued.
    prefetched_next: bool,
}
168
169impl BTreeCursor {
170 pub fn next(&mut self) -> BTreeResult<Option<(Vec<u8>, Vec<u8>)>> {
172 if self.leaf_page_id == 0 {
173 return Ok(None);
174 }
175
176 let page = self.pager.read_page(self.leaf_page_id)?;
177 let cell_count = page.cell_count() as usize;
178
179 if !self.prefetched_next && self.position >= cell_count / 2 && cell_count > 0 {
185 let next_leaf = read_next_leaf(&page);
186 if next_leaf != 0 {
187 self.pager.prefetch_hint(next_leaf);
188 }
189 self.prefetched_next = true;
190 }
191
192 if self.position < cell_count {
194 let (key, stored) = read_leaf_cell(&page, self.position)?;
195 let value = value_layout::decode(&self.pager, &stored)?;
196 self.position += 1;
197 return Ok(Some((key, value)));
198 }
199
200 let next_leaf = read_next_leaf(&page);
202 if next_leaf == 0 {
203 self.leaf_page_id = 0;
204 return Ok(None);
205 }
206
207 self.leaf_page_id = next_leaf;
208 self.position = 0;
209 self.prefetched_next = false; let page = self.pager.read_page(self.leaf_page_id)?;
213 if page.cell_count() == 0 {
214 return Ok(None);
215 }
216
217 let (key, stored) = read_leaf_cell(&page, 0)?;
218 let value = value_layout::decode(&self.pager, &stored)?;
219 self.position = 1;
220 Ok(Some((key, value)))
221 }
222
223 pub fn peek(&self) -> BTreeResult<Option<(Vec<u8>, Vec<u8>)>> {
225 if self.leaf_page_id == 0 {
226 return Ok(None);
227 }
228
229 let page = self.pager.read_page(self.leaf_page_id)?;
230 let cell_count = page.cell_count() as usize;
231
232 if self.position >= cell_count {
233 return Ok(None);
234 }
235
236 let (key, stored) = read_leaf_cell(&page, self.position)?;
237 let value = value_layout::decode(&self.pager, &stored)?;
238 Ok(Some((key, value)))
239 }
240}
241
/// A B-tree stored in pages managed by a [`Pager`].
pub struct BTree {
    /// Pager that owns the pages of this tree.
    pager: Arc<Pager>,
    /// Page id of the root; the tests in this file expect `0` for an empty
    /// tree.
    root_page_id: RwLock<u32>,
    /// NOTE(review): presumably a cached (page id, key) pair describing the
    /// rightmost leaf, used to speed up appends; it is not used in this
    /// file, so confirm against `btree/impl.rs`.
    rightmost_leaf: RwLock<Option<(u32, Vec<u8>)>>,
}
258
// Submodules live under `btree/` and are attached with explicit `#[path]`
// attributes.

// NOTE(review): presumably holds the `BTree` method implementations, since
// none are defined in this file; confirm.
#[path = "btree/impl.rs"]
mod btree_impl;

// NOTE(review): named after the Lehman-Yao B-link technique; its contents are
// not visible from this file, so confirm.
#[path = "btree/lehman_yao.rs"]
pub mod lehman_yao;

// Value encoding helpers used by this file: `MAX_VALUE_SIZE`,
// `ValueLayoutError` and `decode`.
#[path = "btree/value_layout.rs"]
pub mod value_layout;
/// Outcome of searching a leaf page for a key (see `search_leaf`).
enum SearchResult {
    /// The key is stored in the cell at this index.
    Found(usize),
    /// The key is absent; this is the index at which it would be inserted to
    /// keep the cells ordered.
    NotFound(usize),
}
273
274fn search_leaf(page: &Page, key: &[u8]) -> BTreeResult<SearchResult> {
275 let cell_count = page.cell_count() as usize;
276
277 let mut low = 0;
279 let mut high = cell_count;
280
281 while low < high {
282 let mid = (low + high) / 2;
283 let (cell_key, _) = read_leaf_cell(page, mid)?;
284
285 match cell_key.as_slice().cmp(key) {
286 Ordering::Less => low = mid + 1,
287 Ordering::Greater => high = mid,
288 Ordering::Equal => return Ok(SearchResult::Found(mid)),
289 }
290 }
291
292 Ok(SearchResult::NotFound(low))
293}
294
295fn find_child(page: &Page, key: &[u8]) -> BTreeResult<u32> {
296 let cell_count = page.cell_count() as usize;
297
298 for i in 0..cell_count {
300 let (cell_key, child) = read_interior_cell(page, i)?;
301 if key < cell_key.as_slice() {
302 return Ok(child);
303 }
304 }
305
306 Ok(page.right_child())
308}
309
310fn find_first_child(page: &Page) -> BTreeResult<u32> {
311 if page.cell_count() == 0 {
312 return Ok(page.right_child());
313 }
314 let (_, child) = read_interior_cell(page, 0)?;
315 Ok(child)
316}
317
318fn leaf_min_bytes() -> usize {
319 (PAGE_SIZE - LEAF_DATA_OFFSET) * MIN_FILL_FACTOR / 100
320}
321
322fn interior_min_bytes() -> usize {
323 (PAGE_SIZE - INTERIOR_DATA_OFFSET) * MIN_FILL_FACTOR / 100
324}
325
/// Number of page bytes one leaf entry occupies: its 2-byte slot, the 4-byte
/// cell header (key length + value length) and the key and value bytes.
fn leaf_entry_size(entry: &(Vec<u8>, Vec<u8>)) -> usize {
    const SLOT_BYTES: usize = 2;
    const CELL_HEADER_BYTES: usize = 4;

    let (key, value) = entry;
    SLOT_BYTES + CELL_HEADER_BYTES + key.len() + value.len()
}
332
333fn leaf_entries_size(entries: &[(Vec<u8>, Vec<u8>)]) -> usize {
334 entries.iter().map(leaf_entry_size).sum()
335}
336
337#[inline]
338fn leaf_slot_offset_for(index: usize) -> usize {
339 LEAF_SLOT_ARRAY_OFFSET + index * 2
340}
341
342#[inline]
343fn leaf_read_slot(page: &Page, index: usize) -> BTreeResult<usize> {
344 let data = page.as_bytes();
345 let slot_pos = leaf_slot_offset_for(index);
346 if slot_pos + 2 > PAGE_SIZE {
347 return Err(BTreeError::Corrupted("slot array overflows page".into()));
348 }
349 Ok(u16::from_le_bytes([data[slot_pos], data[slot_pos + 1]]) as usize)
350}
351
352#[inline]
353fn leaf_write_slot(page: &mut Page, index: usize, cell_offset: u16) -> BTreeResult<()> {
354 let data = page.as_bytes_mut();
355 let slot_pos = leaf_slot_offset_for(index);
356 if slot_pos + 2 > PAGE_SIZE {
357 return Err(BTreeError::Corrupted("slot array overflows page".into()));
358 }
359 data[slot_pos..slot_pos + 2].copy_from_slice(&cell_offset.to_le_bytes());
360 Ok(())
361}
362
363#[inline]
364fn leaf_slots_end(page: &Page) -> usize {
365 LEAF_SLOT_ARRAY_OFFSET + (page.cell_count() as usize) * 2
366}
367
368#[inline]
369fn leaf_cells_start(page: &Page) -> usize {
370 let end = page.free_end() as usize;
371 if end == 0 {
372 PAGE_SIZE
373 } else {
374 end
375 }
376}
377
378#[inline]
379fn leaf_free_bytes(page: &Page) -> usize {
380 let slot_end = leaf_slots_end(page);
381 let cells = leaf_cells_start(page);
382 cells.saturating_sub(slot_end)
383}
384
/// Number of page bytes one interior entry occupies: its 2-byte slot, the
/// 2-byte key length, the key bytes and the 4-byte child pointer.
///
/// The slot is included so that this agrees with `can_insert_interior` and
/// with `leaf_entry_size`; leaving it out under-estimates the space an
/// interior page needs by two bytes per key.
fn interior_key_size(key: &[u8]) -> usize {
    const SLOT_BYTES: usize = 2;
    const KEY_LEN_BYTES: usize = 2;
    const CHILD_BYTES: usize = 4;

    SLOT_BYTES + KEY_LEN_BYTES + key.len() + CHILD_BYTES
}
388
389fn interior_entries_size(keys: &[Vec<u8>]) -> usize {
390 keys.iter().map(|k| interior_key_size(k)).sum()
391}
392
393fn read_leaf_cell(page: &Page, index: usize) -> BTreeResult<(Vec<u8>, Vec<u8>)> {
399 let cell_count = page.cell_count() as usize;
400 if index >= cell_count {
401 return Err(BTreeError::Corrupted("Cell index out of range".into()));
402 }
403 let offset = leaf_read_slot(page, index)?;
404 let data = page.as_bytes();
405 if offset + 4 > PAGE_SIZE {
406 return Err(BTreeError::Corrupted("cell header out of range".into()));
407 }
408 let key_len = u16::from_le_bytes([data[offset], data[offset + 1]]) as usize;
409 let val_len = u16::from_le_bytes([data[offset + 2], data[offset + 3]]) as usize;
410 let end = offset + 4 + key_len + val_len;
411 if end > PAGE_SIZE {
412 return Err(BTreeError::Corrupted("cell body out of range".into()));
413 }
414 let key = data[offset + 4..offset + 4 + key_len].to_vec();
415 let value = data[offset + 4 + key_len..end].to_vec();
416 Ok((key, value))
417}
418
419fn read_leaf_entries(page: &Page) -> BTreeResult<Vec<(Vec<u8>, Vec<u8>)>> {
420 let cell_count = page.cell_count() as usize;
421 let mut entries = Vec::with_capacity(cell_count);
422 for i in 0..cell_count {
423 entries.push(read_leaf_cell(page, i)?);
424 }
425 Ok(entries)
426}
427
428fn write_leaf_entries(page: &mut Page, entries: &[(Vec<u8>, Vec<u8>)]) -> BTreeResult<()> {
434 clear_leaf_cells(page);
435 for (i, (k, v)) in entries.iter().enumerate() {
436 let cell_size = 4 + k.len() + v.len();
437 let cells_start = leaf_cells_start(page);
438 let slot_end = LEAF_SLOT_ARRAY_OFFSET + (i + 1) * 2;
439 if slot_end + cell_size > cells_start {
440 return Err(BTreeError::Corrupted("leaf rebuild overflowed page".into()));
441 }
442 let cell_offset = cells_start - cell_size;
443 {
444 let data = page.as_bytes_mut();
445 data[cell_offset..cell_offset + 2].copy_from_slice(&(k.len() as u16).to_le_bytes());
446 data[cell_offset + 2..cell_offset + 4].copy_from_slice(&(v.len() as u16).to_le_bytes());
447 data[cell_offset + 4..cell_offset + 4 + k.len()].copy_from_slice(k);
448 data[cell_offset + 4 + k.len()..cell_offset + 4 + k.len() + v.len()].copy_from_slice(v);
449 }
450 page.set_free_end(cell_offset as u16);
451 leaf_write_slot(page, i, cell_offset as u16)?;
452 }
453 page.set_cell_count(entries.len() as u16);
454 page.set_free_start((LEAF_SLOT_ARRAY_OFFSET + entries.len() * 2) as u16);
455 Ok(())
456}
457
458fn can_insert_leaf(page: &Page, key: &[u8], value: &[u8]) -> bool {
459 let needed = 2 + 4 + key.len() + value.len();
460 leaf_free_bytes(page) >= needed
461}
462
463pub(crate) fn overwrite_leaf_value(
479 page: &mut Page,
480 index: usize,
481 new_value: &[u8],
482) -> BTreeResult<()> {
483 let cell_offset = leaf_read_slot(page, index)?;
484 let data = page.as_bytes();
485 if cell_offset + 4 > data.len() {
486 return Err(BTreeError::Corrupted(
487 "leaf cell header out of bounds".into(),
488 ));
489 }
490 let k_len = u16::from_le_bytes([data[cell_offset], data[cell_offset + 1]]) as usize;
491 let old_v_len = u16::from_le_bytes([data[cell_offset + 2], data[cell_offset + 3]]) as usize;
492 if new_value.len() > old_v_len {
493 return Err(BTreeError::Corrupted(
494 "overwrite_leaf_value: new value larger than slot".into(),
495 ));
496 }
497 let value_start = cell_offset + 4 + k_len;
498 if value_start + new_value.len() > data.len() {
499 return Err(BTreeError::Corrupted(
500 "leaf cell value out of bounds".into(),
501 ));
502 }
503 let data = page.as_bytes_mut();
504 if new_value.len() != old_v_len {
507 let new_len = new_value.len() as u16;
508 data[cell_offset + 2..cell_offset + 4].copy_from_slice(&new_len.to_le_bytes());
509 }
510 data[value_start..value_start + new_value.len()].copy_from_slice(new_value);
511 Ok(())
512}
513
514fn insert_into_leaf(page: &mut Page, key: &[u8], value: &[u8]) -> BTreeResult<()> {
516 let cell_count = page.cell_count() as usize;
518 let mut low = 0;
519 let mut high = cell_count;
520 while low < high {
521 let mid = (low + high) / 2;
522 let (cell_key, _) = read_leaf_cell(page, mid)?;
523 match cell_key.as_slice().cmp(key) {
524 Ordering::Less => low = mid + 1,
525 Ordering::Greater => high = mid,
526 Ordering::Equal => {
527 low = mid + 1;
530 break;
531 }
532 }
533 }
534 let insert_pos = low;
535
536 let cell_size = 4 + key.len() + value.len();
538 let slot_end_after = LEAF_SLOT_ARRAY_OFFSET + (cell_count + 1) * 2;
539 let cells_start = leaf_cells_start(page);
540 if slot_end_after + cell_size > cells_start {
541 return Err(BTreeError::Corrupted("leaf page full".into()));
542 }
543 let cell_offset = cells_start - cell_size;
544 {
545 let data = page.as_bytes_mut();
546 data[cell_offset..cell_offset + 2].copy_from_slice(&(key.len() as u16).to_le_bytes());
547 data[cell_offset + 2..cell_offset + 4].copy_from_slice(&(value.len() as u16).to_le_bytes());
548 data[cell_offset + 4..cell_offset + 4 + key.len()].copy_from_slice(key);
549 data[cell_offset + 4 + key.len()..cell_offset + 4 + key.len() + value.len()]
550 .copy_from_slice(value);
551 }
552 page.set_free_end(cell_offset as u16);
553
554 {
558 let shift_from = leaf_slot_offset_for(insert_pos);
559 let shift_to = shift_from + 2;
560 let shift_len = (cell_count - insert_pos) * 2;
561 if shift_len > 0 {
562 let data = page.as_bytes_mut();
563 data.copy_within(shift_from..shift_from + shift_len, shift_to);
564 }
565 }
566 leaf_write_slot(page, insert_pos, cell_offset as u16)?;
567
568 page.set_cell_count((cell_count + 1) as u16);
570 page.set_free_start((LEAF_SLOT_ARRAY_OFFSET + (cell_count + 1) * 2) as u16);
571 Ok(())
572}
573
574fn delete_from_leaf(page: &mut Page, index: usize) -> BTreeResult<()> {
579 let cell_count = page.cell_count() as usize;
580 if index >= cell_count {
581 return Err(BTreeError::Corrupted("delete index out of range".into()));
582 }
583 if index + 1 < cell_count {
584 let shift_from = leaf_slot_offset_for(index + 1);
585 let shift_to = leaf_slot_offset_for(index);
586 let shift_len = (cell_count - index - 1) * 2;
587 let data = page.as_bytes_mut();
588 data.copy_within(shift_from..shift_from + shift_len, shift_to);
589 }
590 page.set_cell_count((cell_count - 1) as u16);
591 page.set_free_start((LEAF_SLOT_ARRAY_OFFSET + (cell_count - 1) * 2) as u16);
592 Ok(())
593}
594
595fn clear_leaf_cells(page: &mut Page) {
599 {
600 let data = page.as_bytes_mut();
601 for byte in &mut data[LEAF_SLOT_ARRAY_OFFSET..] {
602 *byte = 0;
603 }
604 }
605 page.set_cell_count(0);
606 page.set_free_start(LEAF_SLOT_ARRAY_OFFSET as u16);
607 page.set_free_end(PAGE_SIZE as u16);
608}
609
610fn init_leaf_links(page: &mut Page, prev: u32, next: u32) {
611 let data = page.as_bytes_mut();
612 data[LEAF_PREV_OFFSET..LEAF_PREV_OFFSET + 4].copy_from_slice(&prev.to_le_bytes());
613 data[LEAF_NEXT_OFFSET..LEAF_NEXT_OFFSET + 4].copy_from_slice(&next.to_le_bytes());
614}
615
616fn read_next_leaf(page: &Page) -> u32 {
617 let data = page.as_bytes();
618 u32::from_le_bytes([
619 data[LEAF_NEXT_OFFSET],
620 data[LEAF_NEXT_OFFSET + 1],
621 data[LEAF_NEXT_OFFSET + 2],
622 data[LEAF_NEXT_OFFSET + 3],
623 ])
624}
625
/// Page id of the leaf's right sibling. This is an alias for
/// `read_next_leaf`: the right sibling is the page stored in the next-leaf
/// link.
#[inline]
fn leaf_right_sibling(page: &Page) -> u32 {
    read_next_leaf(page)
}
632
633fn leaf_high_key(page: &Page) -> BTreeResult<Option<Vec<u8>>> {
637 let cell_count = page.cell_count() as usize;
638 if cell_count == 0 {
639 return Ok(None);
640 }
641 let (key, _) = read_leaf_cell(page, cell_count - 1)?;
642 Ok(Some(key))
643}
644
645fn set_prev_leaf(page: &mut Page, prev: u32) {
646 let data = page.as_bytes_mut();
647 data[LEAF_PREV_OFFSET..LEAF_PREV_OFFSET + 4].copy_from_slice(&prev.to_le_bytes());
648}
649
650fn set_next_leaf(page: &mut Page, next: u32) {
651 let data = page.as_bytes_mut();
652 data[LEAF_NEXT_OFFSET..LEAF_NEXT_OFFSET + 4].copy_from_slice(&next.to_le_bytes());
653}
654
655#[inline]
670fn interior_slot_offset_for(index: usize) -> usize {
671 INTERIOR_DATA_OFFSET + index * 2
672}
673
674#[inline]
675fn interior_read_slot(page: &Page, index: usize) -> BTreeResult<usize> {
676 let data = page.as_bytes();
677 let slot_pos = interior_slot_offset_for(index);
678 if slot_pos + 2 > PAGE_SIZE {
679 return Err(BTreeError::Corrupted(
680 "interior slot array overflows page".into(),
681 ));
682 }
683 Ok(u16::from_le_bytes([data[slot_pos], data[slot_pos + 1]]) as usize)
684}
685
686#[inline]
687fn interior_write_slot(page: &mut Page, index: usize, cell_offset: u16) -> BTreeResult<()> {
688 let data = page.as_bytes_mut();
689 let slot_pos = interior_slot_offset_for(index);
690 if slot_pos + 2 > PAGE_SIZE {
691 return Err(BTreeError::Corrupted(
692 "interior slot array overflows page".into(),
693 ));
694 }
695 data[slot_pos..slot_pos + 2].copy_from_slice(&cell_offset.to_le_bytes());
696 Ok(())
697}
698
699#[inline]
700fn interior_cells_start(page: &Page) -> usize {
701 let end = page.free_end() as usize;
702 if end == 0 {
703 PAGE_SIZE
704 } else {
705 end
706 }
707}
708
709#[inline]
710fn interior_free_bytes(page: &Page) -> usize {
711 let slot_end = INTERIOR_DATA_OFFSET + (page.cell_count() as usize) * 2;
712 interior_cells_start(page).saturating_sub(slot_end)
713}
714
715fn read_interior_cell(page: &Page, index: usize) -> BTreeResult<(Vec<u8>, u32)> {
716 let cell_count = page.cell_count() as usize;
717 if index >= cell_count {
718 return Err(BTreeError::Corrupted("Cell index out of range".into()));
719 }
720 let offset = interior_read_slot(page, index)?;
721 let data = page.as_bytes();
722 if offset + 2 > PAGE_SIZE {
723 return Err(BTreeError::Corrupted(
724 "interior cell header out of range".into(),
725 ));
726 }
727 let key_len = u16::from_le_bytes([data[offset], data[offset + 1]]) as usize;
728 let end = offset + 2 + key_len + 4;
729 if end > PAGE_SIZE {
730 return Err(BTreeError::Corrupted(
731 "interior cell body out of range".into(),
732 ));
733 }
734 let key = data[offset + 2..offset + 2 + key_len].to_vec();
735 let child = u32::from_le_bytes([
736 data[offset + 2 + key_len],
737 data[offset + 2 + key_len + 1],
738 data[offset + 2 + key_len + 2],
739 data[offset + 2 + key_len + 3],
740 ]);
741 Ok((key, child))
742}
743
744fn read_interior_keys_children(page: &Page) -> BTreeResult<(Vec<Vec<u8>>, Vec<u32>)> {
745 let cell_count = page.cell_count() as usize;
746 let mut keys = Vec::with_capacity(cell_count);
747 let mut children = Vec::with_capacity(cell_count + 1);
748
749 for i in 0..cell_count {
750 let (key, child) = read_interior_cell(page, i)?;
751 keys.push(key);
752 children.push(child);
753 }
754
755 if cell_count == 0 {
756 let right_child = page.right_child();
757 if right_child != 0 {
758 children.push(right_child);
759 }
760 } else {
761 children.push(page.right_child());
762 }
763
764 Ok((keys, children))
765}
766
767fn write_interior_entries(page: &mut Page, keys: &[Vec<u8>], children: &[u32]) -> BTreeResult<()> {
772 if !keys.is_empty() && children.len() != keys.len() + 1 {
773 return Err(BTreeError::Corrupted(
774 "Interior keys/children length mismatch".into(),
775 ));
776 }
777
778 clear_interior_cells(page);
779 if keys.is_empty() {
780 let right_child = children.first().copied().unwrap_or(0);
781 page.set_right_child(right_child);
782 return Ok(());
783 }
784
785 for (i, key) in keys.iter().enumerate() {
786 let cell_size = 2 + key.len() + 4;
787 let cells_start = interior_cells_start(page);
788 let slot_end = INTERIOR_DATA_OFFSET + (i + 1) * 2;
789 if slot_end + cell_size > cells_start {
790 return Err(BTreeError::Corrupted(
791 "interior rebuild overflowed page".into(),
792 ));
793 }
794 let cell_offset = cells_start - cell_size;
795 {
796 let data = page.as_bytes_mut();
797 data[cell_offset..cell_offset + 2].copy_from_slice(&(key.len() as u16).to_le_bytes());
798 data[cell_offset + 2..cell_offset + 2 + key.len()].copy_from_slice(key);
799 data[cell_offset + 2 + key.len()..cell_offset + 2 + key.len() + 4]
800 .copy_from_slice(&children[i].to_le_bytes());
801 }
802 page.set_free_end(cell_offset as u16);
803 interior_write_slot(page, i, cell_offset as u16)?;
804 }
805 page.set_cell_count(keys.len() as u16);
806 page.set_free_start((INTERIOR_DATA_OFFSET + keys.len() * 2) as u16);
807 page.set_right_child(*children.last().ok_or_else(|| {
808 BTreeError::Corrupted("write_interior_entries: children empty with non-empty keys".into())
809 })?);
810 Ok(())
811}
812
813fn can_insert_interior(page: &Page, key: &[u8]) -> bool {
814 let needed = 2 + 2 + key.len() + 4;
815 interior_free_bytes(page) >= needed
816}
817
818fn insert_into_interior(
823 page: &mut Page,
824 key: &[u8],
825 left_child: u32,
826 right_child: u32,
827) -> BTreeResult<()> {
828 let cell_count = page.cell_count() as usize;
830 let mut low = 0;
831 let mut high = cell_count;
832 while low < high {
833 let mid = (low + high) / 2;
834 let (cell_key, _) = read_interior_cell(page, mid)?;
835 match cell_key.as_slice().cmp(key) {
836 Ordering::Less => low = mid + 1,
837 Ordering::Greater | Ordering::Equal => high = mid,
838 }
839 }
840 let insert_pos = low;
841
842 if insert_pos < cell_count {
847 let old_offset = interior_read_slot(page, insert_pos)?;
848 let data = page.as_bytes();
849 let key_len = u16::from_le_bytes([data[old_offset], data[old_offset + 1]]) as usize;
850 let child_pos = old_offset + 2 + key_len;
851 let data = page.as_bytes_mut();
852 data[child_pos..child_pos + 4].copy_from_slice(&right_child.to_le_bytes());
853 } else {
854 page.set_right_child(right_child);
855 }
856
857 let cell_size = 2 + key.len() + 4;
859 let slot_end_after = INTERIOR_DATA_OFFSET + (cell_count + 1) * 2;
860 let cells_start = interior_cells_start(page);
861 if slot_end_after + cell_size > cells_start {
862 return Err(BTreeError::Corrupted("interior page full".into()));
863 }
864 let cell_offset = cells_start - cell_size;
865 {
866 let data = page.as_bytes_mut();
867 data[cell_offset..cell_offset + 2].copy_from_slice(&(key.len() as u16).to_le_bytes());
868 data[cell_offset + 2..cell_offset + 2 + key.len()].copy_from_slice(key);
869 data[cell_offset + 2 + key.len()..cell_offset + 2 + key.len() + 4]
870 .copy_from_slice(&left_child.to_le_bytes());
871 }
872 page.set_free_end(cell_offset as u16);
873
874 {
877 let shift_from = interior_slot_offset_for(insert_pos);
878 let shift_to = shift_from + 2;
879 let shift_len = (cell_count - insert_pos) * 2;
880 if shift_len > 0 {
881 let data = page.as_bytes_mut();
882 data.copy_within(shift_from..shift_from + shift_len, shift_to);
883 }
884 }
885 interior_write_slot(page, insert_pos, cell_offset as u16)?;
886 page.set_cell_count((cell_count + 1) as u16);
887 page.set_free_start((INTERIOR_DATA_OFFSET + (cell_count + 1) * 2) as u16);
888 Ok(())
889}
890
891fn clear_interior_cells(page: &mut Page) {
892 {
893 let data = page.as_bytes_mut();
894 for byte in &mut data[INTERIOR_DATA_OFFSET..] {
895 *byte = 0;
896 }
897 }
898 page.set_cell_count(0);
899 page.set_free_start(INTERIOR_DATA_OFFSET as u16);
900 page.set_free_end(PAGE_SIZE as u16);
901}
902
903#[cfg(test)]
904mod tests {
905 use super::*;
906 use std::path::PathBuf;
907
908 fn temp_db_path() -> PathBuf {
909 use std::sync::atomic::{AtomicU64, Ordering};
910 static COUNTER: AtomicU64 = AtomicU64::new(0);
911 let id = COUNTER.fetch_add(1, Ordering::Relaxed);
912 let mut path = std::env::temp_dir();
913 path.push(format!("reddb_btree_test_{}_{}.db", std::process::id(), id));
914 path
915 }
916
917 fn cleanup(path: &PathBuf) {
918 let _ = std::fs::remove_file(path);
919 }
920
921 #[test]
922 fn test_btree_empty() {
923 let path = temp_db_path();
924 cleanup(&path);
925
926 let pager = Arc::new(Pager::open_default(&path).unwrap());
927 let tree = BTree::new(pager);
928
929 assert!(tree.is_empty());
930 assert_eq!(tree.root_page_id(), 0);
931 assert_eq!(tree.get(b"key").unwrap(), None);
932 assert_eq!(tree.count().unwrap(), 0);
933
934 cleanup(&path);
935 }
936
937 #[test]
938 fn test_btree_single_insert() {
939 let path = temp_db_path();
940 cleanup(&path);
941
942 let pager = Arc::new(Pager::open_default(&path).unwrap());
943 let tree = BTree::new(pager);
944
945 tree.insert(b"hello", b"world").unwrap();
946
947 assert!(!tree.is_empty());
948 assert_eq!(tree.get(b"hello").unwrap(), Some(b"world".to_vec()));
949 assert_eq!(tree.get(b"other").unwrap(), None);
950 assert_eq!(tree.count().unwrap(), 1);
951
952 cleanup(&path);
953 }
954
955 #[test]
956 fn test_btree_multiple_inserts() {
957 let path = temp_db_path();
958 cleanup(&path);
959
960 let pager = Arc::new(Pager::open_default(&path).unwrap());
961 let tree = BTree::new(pager);
962
963 tree.insert(b"c", b"3").unwrap();
964 tree.insert(b"a", b"1").unwrap();
965 tree.insert(b"b", b"2").unwrap();
966
967 assert_eq!(tree.get(b"a").unwrap(), Some(b"1".to_vec()));
968 assert_eq!(tree.get(b"b").unwrap(), Some(b"2".to_vec()));
969 assert_eq!(tree.get(b"c").unwrap(), Some(b"3".to_vec()));
970 assert_eq!(tree.count().unwrap(), 3);
971
972 cleanup(&path);
973 }
974
975 #[test]
976 fn test_btree_duplicate_key() {
977 let path = temp_db_path();
978 cleanup(&path);
979
980 let pager = Arc::new(Pager::open_default(&path).unwrap());
981 let tree = BTree::new(pager);
982
983 tree.insert(b"key", b"value1").unwrap();
984 let result = tree.insert(b"key", b"value2");
985
986 assert!(matches!(result, Err(BTreeError::DuplicateKey)));
987 assert_eq!(tree.get(b"key").unwrap(), Some(b"value1".to_vec()));
988
989 cleanup(&path);
990 }
991
992 #[test]
993 fn test_btree_delete() {
994 let path = temp_db_path();
995 cleanup(&path);
996
997 let pager = Arc::new(Pager::open_default(&path).unwrap());
998 let tree = BTree::new(pager);
999
1000 tree.insert(b"a", b"1").unwrap();
1001 tree.insert(b"b", b"2").unwrap();
1002 tree.insert(b"c", b"3").unwrap();
1003
1004 assert!(tree.delete(b"b").unwrap());
1005 assert!(!tree.delete(b"d").unwrap());
1006
1007 assert_eq!(tree.get(b"a").unwrap(), Some(b"1".to_vec()));
1008 assert_eq!(tree.get(b"b").unwrap(), None);
1009 assert_eq!(tree.get(b"c").unwrap(), Some(b"3".to_vec()));
1010 assert_eq!(tree.count().unwrap(), 2);
1011
1012 cleanup(&path);
1013 }
1014
1015 #[test]
1016 fn test_btree_delete_rebalance_removes_empty_leaf() {
1017 let path = temp_db_path();
1018 cleanup(&path);
1019
1020 let pager = Arc::new(Pager::open_default(&path).unwrap());
1021 let tree = BTree::new(pager.clone());
1022
1023 let value = vec![b'v'; 200];
1024 for i in 0..300u32 {
1025 let key = format!("key{:03}", i);
1026 tree.insert(key.as_bytes(), &value).unwrap();
1027 }
1028
1029 let root_id = tree.root_page_id();
1030 let first_leaf = tree.find_first_leaf(root_id).unwrap();
1031 let mut leaf_ids = Vec::new();
1032 let mut current = first_leaf;
1033 loop {
1034 leaf_ids.push(current);
1035 let page = pager.read_page(current).unwrap();
1036 let next = read_next_leaf(&page);
1037 if next == 0 {
1038 break;
1039 }
1040 current = next;
1041 }
1042
1043 assert!(leaf_ids.len() >= 3);
1044
1045 let target_leaf = leaf_ids[1];
1046 let page = pager.read_page(target_leaf).unwrap();
1047 let cell_count = page.cell_count() as usize;
1048 let mut keys = Vec::with_capacity(cell_count);
1049 for i in 0..cell_count {
1050 let (key, _) = read_leaf_cell(&page, i).unwrap();
1051 keys.push(key);
1052 }
1053
1054 for key in &keys {
1055 tree.delete(key).unwrap();
1056 }
1057
1058 let expected = 300 - keys.len();
1059 assert_eq!(tree.count().unwrap(), expected);
1060
1061 let mut cursor = tree.cursor_first().unwrap();
1062 let mut results = Vec::new();
1063 while let Some((key, _)) = cursor.next().unwrap() {
1064 results.push(key);
1065 }
1066
1067 assert_eq!(results.len(), expected);
1068 let last_key = format!("key{:03}", 299).into_bytes();
1069 assert_eq!(results.last(), Some(&last_key));
1070
1071 cleanup(&path);
1072 }
1073
1074 #[test]
1075 fn test_btree_cursor() {
1076 let path = temp_db_path();
1077 cleanup(&path);
1078
1079 let pager = Arc::new(Pager::open_default(&path).unwrap());
1080 let tree = BTree::new(pager);
1081
1082 tree.insert(b"c", b"3").unwrap();
1083 tree.insert(b"a", b"1").unwrap();
1084 tree.insert(b"b", b"2").unwrap();
1085
1086 let mut cursor = tree.cursor_first().unwrap();
1087 let mut results: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
1088
1089 while let Some(entry) = cursor.next().unwrap() {
1090 results.push(entry);
1091 }
1092
1093 assert_eq!(results.len(), 3);
1094 assert_eq!(results[0], (b"a".to_vec(), b"1".to_vec()));
1095 assert_eq!(results[1], (b"b".to_vec(), b"2".to_vec()));
1096 assert_eq!(results[2], (b"c".to_vec(), b"3".to_vec()));
1097
1098 cleanup(&path);
1099 }
1100
1101 #[test]
1102 fn test_btree_range() {
1103 let path = temp_db_path();
1104 cleanup(&path);
1105
1106 let pager = Arc::new(Pager::open_default(&path).unwrap());
1107 let tree = BTree::new(pager);
1108
1109 for i in 0..10u8 {
1110 let key = format!("key{:02}", i);
1111 let value = format!("val{:02}", i);
1112 tree.insert(key.as_bytes(), value.as_bytes()).unwrap();
1113 }
1114
1115 let results = tree.range(b"key03", b"key06").unwrap();
1116
1117 assert_eq!(results.len(), 4);
1118 assert_eq!(results[0].0, b"key03".to_vec());
1119 assert_eq!(results[3].0, b"key06".to_vec());
1120
1121 cleanup(&path);
1122 }
1123
1124 #[test]
1125 fn test_btree_large_keys() {
1126 let path = temp_db_path();
1127 cleanup(&path);
1128
1129 let pager = Arc::new(Pager::open_default(&path).unwrap());
1130 let tree = BTree::new(pager);
1131
1132 let key = vec![b'x'; 500];
1133 let value = vec![b'y'; 500];
1134
1135 tree.insert(&key, &value).unwrap();
1136 assert_eq!(tree.get(&key).unwrap(), Some(value));
1137
1138 cleanup(&path);
1139 }
1140
1141 #[test]
1142 fn test_btree_key_too_large() {
1143 let path = temp_db_path();
1144 cleanup(&path);
1145
1146 let pager = Arc::new(Pager::open_default(&path).unwrap());
1147 let tree = BTree::new(pager);
1148
1149 let key = vec![b'x'; MAX_KEY_SIZE + 1];
1150 let result = tree.insert(&key, b"value");
1151
1152 assert!(matches!(result, Err(BTreeError::KeyTooLarge(_))));
1153
1154 cleanup(&path);
1155 }
1156
1157 #[test]
1158 fn test_btree_many_inserts() {
1159 let path = temp_db_path();
1160 cleanup(&path);
1161
1162 let pager = Arc::new(Pager::open_default(&path).unwrap());
1163 let tree = BTree::new(pager);
1164
1165 for i in 0..100u32 {
1167 let key = format!("key{:08}", i);
1168 let value = format!("value{:08}", i);
1169 tree.insert(key.as_bytes(), value.as_bytes()).unwrap();
1170 }
1171
1172 for i in 0..100u32 {
1174 let key = format!("key{:08}", i);
1175 let expected = format!("value{:08}", i);
1176 assert_eq!(
1177 tree.get(key.as_bytes()).unwrap(),
1178 Some(expected.into_bytes())
1179 );
1180 }
1181
1182 assert_eq!(tree.count().unwrap(), 100);
1183
1184 cleanup(&path);
1185 }
1186
1187 #[test]
1188 fn test_btree_bulk_insert_sorted_preserves_leaf_layout() {
1189 let path = temp_db_path();
1190 cleanup(&path);
1191
1192 let pager = Arc::new(Pager::open_default(&path).unwrap());
1193 let tree = BTree::new(pager);
1194
1195 let items: Vec<(Vec<u8>, Vec<u8>)> = (0..240u32)
1196 .map(|i| {
1197 (
1198 format!("key{:08}", i).into_bytes(),
1199 vec![b'v'; 32 + (i as usize % 7)],
1200 )
1201 })
1202 .collect();
1203
1204 tree.bulk_insert_sorted(&items).unwrap();
1205
1206 assert_eq!(tree.count().unwrap(), items.len());
1207
1208 for (key, value) in &items {
1209 assert_eq!(tree.get(key).unwrap(), Some(value.clone()));
1210 }
1211
1212 cleanup(&path);
1213 }
1214
1215 mod prop_upsert_batch_sorted {
1225 use super::*;
1226 use proptest::prelude::*;
1227 use std::collections::BTreeMap;
1228
1229 fn populate_tree(seed: &[(Vec<u8>, Vec<u8>)]) -> (BTree, PathBuf, Arc<Pager>) {
1232 let path = temp_db_path();
1233 cleanup(&path);
1234 let pager = Arc::new(Pager::open_default(&path).unwrap());
1235 let tree = BTree::new(Arc::clone(&pager));
1236 for (k, v) in seed {
1237 tree.upsert(k, v).unwrap();
1238 }
1239 (tree, path, pager)
1240 }
1241
1242 fn key_strategy() -> impl Strategy<Value = Vec<u8>> {
1243 prop::collection::vec(0u8..16u8, 1..=4)
1247 }
1248
1249 fn value_strategy() -> impl Strategy<Value = Vec<u8>> {
1250 prop::collection::vec(0u8..255u8, 0..=8)
1254 }
1255
1256 fn pair_strategy() -> impl Strategy<Value = (Vec<u8>, Vec<u8>)> {
1257 (key_strategy(), value_strategy())
1258 }
1259
1260 proptest! {
1261 #![proptest_config(ProptestConfig {
1262 cases: 64,
1263 .. ProptestConfig::default()
1264 })]
1265
1266 #[test]
1267 fn batch_upsert_matches_loop_upsert(
1268 seed in prop::collection::vec(pair_strategy(), 0..50),
1269 batch in prop::collection::vec(pair_strategy(), 1..200),
1270 ) {
1271 let mut seed_map: BTreeMap<Vec<u8>, Vec<u8>> = BTreeMap::new();
1274 for (k, v) in seed {
1275 seed_map.insert(k, v);
1276 }
1277 let seed: Vec<(Vec<u8>, Vec<u8>)> = seed_map.into_iter().collect();
1278
1279 let (baseline, baseline_path, _baseline_pager) = populate_tree(&seed);
1281 for (k, v) in &batch {
1282 baseline.upsert(k, v).unwrap();
1283 }
1284
1285 let (candidate, candidate_path, _candidate_pager) = populate_tree(&seed);
1288 let mut sorted = batch.clone();
1289 sorted.sort_by(|a, b| a.0.cmp(&b.0));
1290 candidate.upsert_batch_sorted(&sorted).unwrap();
1291
1292 let mut expected: BTreeMap<Vec<u8>, Vec<u8>> = BTreeMap::new();
1295 for (k, v) in &seed {
1296 expected.insert(k.clone(), v.clone());
1297 }
1298 for (k, v) in &batch {
1299 expected.insert(k.clone(), v.clone());
1300 }
1301
1302 for (k, v) in &expected {
1304 let baseline_got = baseline.get(k).unwrap();
1305 let candidate_got = candidate.get(k).unwrap();
1306 prop_assert_eq!(baseline_got.as_ref(), Some(v));
1307 prop_assert_eq!(candidate_got.as_ref(), Some(v));
1308 }
1309
1310 let collect = |t: &BTree| -> Vec<(Vec<u8>, Vec<u8>)> {
1312 let mut out = Vec::new();
1313 let mut cur = t.cursor_first().unwrap();
1314 while let Some(entry) = cur.next().unwrap() {
1315 out.push(entry);
1316 }
1317 out
1318 };
1319 let baseline_contents = collect(&baseline);
1320 let candidate_contents = collect(&candidate);
1321 prop_assert_eq!(&baseline_contents, &candidate_contents);
1322
1323 let expected_contents: Vec<(Vec<u8>, Vec<u8>)> =
1325 expected.into_iter().collect();
1326 prop_assert_eq!(&candidate_contents, &expected_contents);
1327
1328 cleanup(&baseline_path);
1329 cleanup(&candidate_path);
1330 }
1331 }
1332 }
1333
1334 #[test]
1335 fn test_btree_sorted_iteration() {
1336 let path = temp_db_path();
1337 cleanup(&path);
1338
1339 let pager = Arc::new(Pager::open_default(&path).unwrap());
1340 let tree = BTree::new(pager);
1341
1342 let keys = vec![50, 25, 75, 10, 30, 60, 80, 5, 15, 27, 35, 55, 65, 77, 90];
1344 for k in &keys {
1345 let key = format!("{:03}", k);
1346 tree.insert(key.as_bytes(), key.as_bytes()).unwrap();
1347 }
1348
1349 let mut cursor = tree.cursor_first().unwrap();
1351 let mut prev: Option<Vec<u8>> = None;
1352
1353 while let Some((key, _)) = cursor.next().unwrap() {
1354 if let Some(p) = &prev {
1355 assert!(p < &key, "Keys not in sorted order");
1356 }
1357 prev = Some(key);
1358 }
1359
1360 cleanup(&path);
1361 }
1362}
1363
1364#[cfg(test)]
1369mod overflow_pipeline_tests {
1370 use super::value_layout::{MAX_VALUE_SIZE, OVERFLOW_THRESHOLD};
1371 use super::*;
1372 use std::path::PathBuf;
1373
1374 fn temp_db_path(tag: &str) -> PathBuf {
1375 use std::sync::atomic::{AtomicU64, Ordering};
1376 static COUNTER: AtomicU64 = AtomicU64::new(0);
1377 let id = COUNTER.fetch_add(1, Ordering::Relaxed);
1378 let mut path = std::env::temp_dir();
1379 path.push(format!(
1380 "reddb_btree_spill_{}_{}_{}.db",
1381 tag,
1382 std::process::id(),
1383 id
1384 ));
1385 path
1386 }
1387
1388 fn cleanup(path: &std::path::Path) {
1389 let _ = std::fs::remove_file(path);
1390 for suffix in ["-hdr", "-meta", "-dwb"] {
1391 let mut p = path.to_path_buf().into_os_string();
1392 p.push(suffix);
1393 let _ = std::fs::remove_file(&p);
1394 }
1395 }
1396
1397 #[test]
1402 fn inline_path_allocates_no_overflow_pages() {
1403 let path = temp_db_path("inline_no_overflow");
1404 cleanup(&path);
1405 {
1406 let pager = Arc::new(Pager::open_default(&path).unwrap());
1407 let tree = BTree::new(pager.clone());
1408 let before = pager.page_count().unwrap();
1409
1410 let value = vec![0x5Au8; OVERFLOW_THRESHOLD - 1];
1411 tree.insert(b"k", &value).unwrap();
1412
1413 let after = pager.page_count().unwrap();
1414 assert_eq!(
1417 after - before,
1418 1,
1419 "inline value must allocate only the root leaf"
1420 );
1421 assert_eq!(tree.get(b"k").unwrap(), Some(value));
1422 }
1423 cleanup(&path);
1424 }
1425
1426 #[test]
1430 fn compressible_44kb_round_trips() {
1431 let path = temp_db_path("compressible_44kb");
1432 cleanup(&path);
1433 {
1434 let pager = Arc::new(Pager::open_default(&path).unwrap());
1435 let tree = BTree::new(pager);
1436
1437 let snippet = "# heading\n- bullet line that says the same thing over and over\n";
1439 let mut value = Vec::with_capacity(44 * 1024);
1440 while value.len() < 44 * 1024 {
1441 value.extend_from_slice(snippet.as_bytes());
1442 }
1443 assert!(value.len() > OVERFLOW_THRESHOLD);
1444
1445 tree.insert(b"doc", &value).unwrap();
1446 assert_eq!(tree.get(b"doc").unwrap(), Some(value));
1447 }
1448 cleanup(&path);
1449 }
1450
1451 #[test]
1454 fn incompressible_5mb_spills_and_round_trips() {
1455 let path = temp_db_path("incompressible_5mb");
1456 cleanup(&path);
1457 {
1458 let pager = Arc::new(Pager::open_default(&path).unwrap());
1459 let tree = BTree::new(pager.clone());
1460
1461 let mut state: u64 = 0xC0FFEE_DEAD_F00D;
1464 let value: Vec<u8> = (0..5 * 1024 * 1024)
1465 .map(|_| {
1466 state ^= state << 13;
1467 state ^= state >> 7;
1468 state ^= state << 17;
1469 state as u8
1470 })
1471 .collect();
1472
1473 let before = pager.page_count().unwrap();
1474 tree.insert(b"blob", &value).unwrap();
1475 let after = pager.page_count().unwrap();
1476 assert!(
1479 after - before > 2,
1480 "5 MB spill must allocate overflow pages (alloc'd {} pages)",
1481 after - before
1482 );
1483
1484 assert_eq!(
1485 tree.get(b"blob").unwrap().as_deref(),
1486 Some(value.as_slice())
1487 );
1488 }
1489 cleanup(&path);
1490 }
1491
1492 #[test]
1496 fn value_above_ceiling_rejected_without_allocation() {
1497 let path = temp_db_path("ceiling_reject");
1498 cleanup(&path);
1499 {
1500 let pager = Arc::new(Pager::open_default(&path).unwrap());
1501 let tree = BTree::new(pager.clone());
1502 let before = pager.page_count().unwrap();
1503
1504 let value = vec![0u8; MAX_VALUE_SIZE + 1];
1505 let err = tree.insert(b"too_big", &value).unwrap_err();
1506 assert!(matches!(err, BTreeError::ValueTooLarge(_)));
1507
1508 assert_eq!(
1509 pager.page_count().unwrap(),
1510 before,
1511 "rejected value must not allocate pages"
1512 );
1513 assert!(tree.is_empty());
1515 }
1516 cleanup(&path);
1517 }
1518
1519 #[test]
1524 fn bulk_insert_skips_oversized_row_keeps_rest() {
1525 let path = temp_db_path("bulk_mixed");
1526 cleanup(&path);
1527 {
1528 let pager = Arc::new(Pager::open_default(&path).unwrap());
1529 let tree = BTree::new(pager);
1530
1531 let mut items: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
1532 for i in 0..5u32 {
1534 items.push((format!("k{:04}", i).into_bytes(), vec![b'a' + i as u8; 64]));
1535 }
1536 items.push((b"k0005_huge".to_vec(), vec![0u8; MAX_VALUE_SIZE + 1]));
1538 for i in 6..11u32 {
1540 items.push((format!("k{:04}", i).into_bytes(), vec![b'z' - i as u8; 32]));
1541 }
1542
1543 let res = tree.bulk_insert_sorted(&items);
1544 assert!(matches!(res, Err(BTreeError::ValueTooLarge(_))));
1545
1546 for (k, v) in items.iter().filter(|(_, v)| v.len() <= MAX_VALUE_SIZE) {
1548 assert_eq!(
1549 tree.get(k).unwrap().as_deref(),
1550 Some(v.as_slice()),
1551 "valid row {:?} must land",
1552 k
1553 );
1554 }
1555 assert_eq!(tree.get(b"k0005_huge").unwrap(), None);
1557 }
1558 cleanup(&path);
1559 }
1560
1561 #[test]
1565 fn large_values_persist_transparently() {
1566 let path = temp_db_path("large_transparent");
1567 cleanup(&path);
1568 {
1569 let pager = Arc::new(Pager::open_default(&path).unwrap());
1570 let tree = BTree::new(pager);
1571
1572 for (i, size) in [
1577 OVERFLOW_THRESHOLD + 1,
1578 OVERFLOW_THRESHOLD * 2,
1579 64 * 1024,
1580 1 * 1024 * 1024,
1581 ]
1582 .iter()
1583 .enumerate()
1584 {
1585 let value: Vec<u8> = (0..*size).map(|n| ((n + i) & 0xff) as u8).collect();
1586 let key = format!("size{:08}", size).into_bytes();
1587 tree.insert(&key, &value).unwrap();
1588 assert_eq!(
1589 tree.get(&key).unwrap().as_deref(),
1590 Some(value.as_slice()),
1591 "size {} must round-trip",
1592 size
1593 );
1594 }
1595 }
1596 cleanup(&path);
1597 }
1598}
1599
1600#[cfg(test)]
1603mod overflow_free_tests {
1604 use super::value_layout::OVERFLOW_THRESHOLD;
1605 use super::*;
1606 use std::path::PathBuf;
1607
1608 fn temp_db_path(tag: &str) -> PathBuf {
1609 use std::sync::atomic::{AtomicU64, Ordering};
1610 static COUNTER: AtomicU64 = AtomicU64::new(0);
1611 let id = COUNTER.fetch_add(1, Ordering::Relaxed);
1612 let mut path = std::env::temp_dir();
1613 path.push(format!(
1614 "reddb_btree_free_{}_{}_{}.db",
1615 tag,
1616 std::process::id(),
1617 id
1618 ));
1619 path
1620 }
1621
1622 fn cleanup(path: &std::path::Path) {
1623 let _ = std::fs::remove_file(path);
1624 for suffix in ["-hdr", "-meta", "-dwb"] {
1625 let mut p = path.to_path_buf().into_os_string();
1626 p.push(suffix);
1627 let _ = std::fs::remove_file(&p);
1628 }
1629 }
1630
1631 fn incompressible(seed: u64, len: usize) -> Vec<u8> {
1632 let mut state = seed | 1;
1633 (0..len)
1634 .map(|_| {
1635 state ^= state << 13;
1636 state ^= state >> 7;
1637 state ^= state << 17;
1638 state as u8
1639 })
1640 .collect()
1641 }
1642
1643 #[test]
1649 fn delete_spilled_value_frees_overflow_chain() {
1650 let path = temp_db_path("delete_frees");
1651 cleanup(&path);
1652 {
1653 let pager = Arc::new(Pager::open_default(&path).unwrap());
1654 let tree = BTree::new(pager.clone());
1655
1656 let v1 = incompressible(0xA11CE, 2 * 1024 * 1024);
1658 tree.insert(b"k", &v1).unwrap();
1659 let after_first = pager.page_count().unwrap();
1660
1661 assert!(tree.delete(b"k").unwrap());
1663
1664 let v2 = incompressible(0xB0B, 2 * 1024 * 1024);
1667 tree.insert(b"k", &v2).unwrap();
1668 let after_second = pager.page_count().unwrap();
1669
1670 assert_eq!(
1671 after_second, after_first,
1672 "second spill must reuse freed pages, not grow the file"
1673 );
1674 assert_eq!(tree.get(b"k").unwrap().as_deref(), Some(v2.as_slice()));
1675 }
1676 cleanup(&path);
1677 }
1678
1679 #[test]
1684 fn shrinking_update_to_inline_frees_chain() {
1685 let path = temp_db_path("shrink_to_inline");
1686 cleanup(&path);
1687 {
1688 let pager = Arc::new(Pager::open_default(&path).unwrap());
1689 let tree = BTree::new(pager.clone());
1690
1691 let big = incompressible(0xDEAD, 2 * 1024 * 1024);
1692 tree.insert(b"k", &big).unwrap();
1693 let hwm = pager.page_count().unwrap();
1694
1695 let small = b"tiny".to_vec();
1697 tree.upsert(b"k", &small).unwrap();
1698 assert_eq!(tree.get(b"k").unwrap().as_deref(), Some(small.as_slice()));
1699
1700 let big2 = incompressible(0xBEEF, 2 * 1024 * 1024);
1704 tree.insert(b"k2", &big2).unwrap();
1705 assert_eq!(
1706 pager.page_count().unwrap(),
1707 hwm,
1708 "shrinking update must free the chain — replacement spill should reuse"
1709 );
1710 assert_eq!(tree.get(b"k2").unwrap().as_deref(), Some(big2.as_slice()));
1711 }
1712 cleanup(&path);
1713 }
1714
1715 #[test]
1720 fn shrinking_update_to_shorter_spill_frees_old_chain() {
1721 let path = temp_db_path("shrink_spill_to_spill");
1722 cleanup(&path);
1723 {
1724 let pager = Arc::new(Pager::open_default(&path).unwrap());
1725 let tree = BTree::new(pager.clone());
1726
1727 let big = incompressible(0xF00D, 3 * 1024 * 1024);
1731 tree.insert(b"k", &big).unwrap();
1732 let hwm_after_big = pager.page_count().unwrap();
1733
1734 let smaller = incompressible(0xCAFE, 1 * 1024 * 1024);
1745 tree.upsert(b"k", &smaller).unwrap();
1746 assert_eq!(tree.get(b"k").unwrap().as_deref(), Some(smaller.as_slice()));
1747 let hwm_after_upsert = pager.page_count().unwrap();
1748
1749 assert!(tree.delete(b"k").unwrap());
1750
1751 let again = incompressible(0x1234, 3 * 1024 * 1024);
1752 tree.insert(b"k", &again).unwrap();
1753 assert!(
1754 pager.page_count().unwrap() <= hwm_after_upsert,
1755 "after delete+insert of original size, file must not grow past the upsert transient \
1756 (page_count = {}, transient high-water = {}, original = {}) — the upsert must have \
1757 freed the old chain so the re-insert reuses freed pages",
1758 pager.page_count().unwrap(),
1759 hwm_after_upsert,
1760 hwm_after_big,
1761 );
1762 assert_eq!(tree.get(b"k").unwrap().as_deref(), Some(again.as_slice()));
1763 }
1764 cleanup(&path);
1765 }
1766
1767 #[test]
1770 fn insert_delete_reinsert_does_not_grow_file() {
1771 let path = temp_db_path("reinsert_no_grow");
1772 cleanup(&path);
1773 {
1774 let pager = Arc::new(Pager::open_default(&path).unwrap());
1775 let tree = BTree::new(pager.clone());
1776
1777 let v1 = incompressible(0x5EED, 4 * 1024 * 1024);
1778 tree.insert(b"row", &v1).unwrap();
1779 let pages_after_first = pager.page_count().unwrap();
1780
1781 assert!(tree.delete(b"row").unwrap());
1782
1783 let v2 = incompressible(0xACE, 4 * 1024 * 1024);
1784 tree.insert(b"row", &v2).unwrap();
1785 let pages_after_second = pager.page_count().unwrap();
1786
1787 assert_eq!(
1788 pages_after_second, pages_after_first,
1789 "re-inserting at the same size must reuse the freed chain"
1790 );
1791 assert_eq!(tree.get(b"row").unwrap().as_deref(), Some(v2.as_slice()));
1792 }
1793 cleanup(&path);
1794 }
1795
1796 #[test]
1801 fn upsert_batch_in_place_shrink_frees_chain() {
1802 let path = temp_db_path("batch_shrink");
1803 cleanup(&path);
1804 {
1805 let pager = Arc::new(Pager::open_default(&path).unwrap());
1806 let tree = BTree::new(pager.clone());
1807
1808 let big_a = incompressible(0x1111, 2 * 1024 * 1024);
1811 let big_b = incompressible(0x2222, 2 * 1024 * 1024);
1812 tree.insert(b"a", &big_a).unwrap();
1813 tree.insert(b"b", &big_b).unwrap();
1814 let hwm = pager.page_count().unwrap();
1815
1816 let updates = vec![
1817 (b"a".to_vec(), b"shrunk-a".to_vec()),
1818 (b"b".to_vec(), b"shrunk-b".to_vec()),
1819 ];
1820 tree.upsert_batch_sorted(&updates).unwrap();
1821 assert_eq!(
1822 tree.get(b"a").unwrap().as_deref(),
1823 Some(b"shrunk-a".as_slice())
1824 );
1825 assert_eq!(
1826 tree.get(b"b").unwrap().as_deref(),
1827 Some(b"shrunk-b".as_slice())
1828 );
1829
1830 let again_a = incompressible(0xAAAA, 2 * 1024 * 1024);
1833 let again_b = incompressible(0xBBBB, 2 * 1024 * 1024);
1834 tree.insert(b"c", &again_a).unwrap();
1835 tree.insert(b"d", &again_b).unwrap();
1836 assert!(
1837 pager.page_count().unwrap() <= hwm,
1838 "batch in-place shrink must free chains — replacement spills should reuse"
1839 );
1840 }
1841 cleanup(&path);
1842 }
1843}