use std::cmp::Ordering;
use std::sync::{Arc, RwLock};
use super::page::{Page, PageType, HEADER_SIZE, PAGE_SIZE};
use super::pager::{Pager, PagerError};
pub const MAX_KEY_SIZE: usize = 1024;
pub const MAX_VALUE_SIZE: usize = value_layout::MAX_VALUE_SIZE;
pub const MIN_FILL_FACTOR: usize = 25;
const LEAF_PREV_OFFSET: usize = HEADER_SIZE;
const LEAF_NEXT_OFFSET: usize = HEADER_SIZE + 4;
const LEAF_SLOT_ARRAY_OFFSET: usize = HEADER_SIZE + 8;
const LEAF_DATA_OFFSET: usize = LEAF_SLOT_ARRAY_OFFSET;
const INTERIOR_DATA_OFFSET: usize = HEADER_SIZE;
#[derive(Debug, Clone)]
pub enum BTreeError {
NotFound,
DuplicateKey,
KeyTooLarge(usize),
ValueTooLarge(usize),
Corrupted(String),
Pager(String),
LockPoisoned(String),
ValueLayout(String),
}
impl std::fmt::Display for BTreeError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::NotFound => write!(f, "Key not found"),
Self::DuplicateKey => write!(f, "Key already exists"),
Self::KeyTooLarge(size) => {
write!(f, "Key too large: {} bytes (max {})", size, MAX_KEY_SIZE)
}
Self::ValueTooLarge(size) => write!(
f,
"Value too large: {} bytes (max {})",
size, MAX_VALUE_SIZE
),
Self::Corrupted(msg) => write!(f, "B-tree corrupted: {}", msg),
Self::Pager(msg) => write!(f, "Pager error: {}", msg),
Self::LockPoisoned(msg) => write!(f, "Lock poisoned: {}", msg),
Self::ValueLayout(msg) => write!(f, "Value layout: {}", msg),
}
}
}
impl From<value_layout::ValueLayoutError> for BTreeError {
fn from(e: value_layout::ValueLayoutError) -> Self {
match e {
value_layout::ValueLayoutError::ValueTooLarge(n) => Self::ValueTooLarge(n),
other => Self::ValueLayout(other.to_string()),
}
}
}
impl std::error::Error for BTreeError {}
impl From<PagerError> for BTreeError {
fn from(e: PagerError) -> Self {
Self::Pager(e.to_string())
}
}
impl From<super::page::PageError> for BTreeError {
fn from(e: super::page::PageError) -> Self {
Self::Corrupted(e.to_string())
}
}
pub type BTreeResult<T> = Result<T, BTreeError>;
pub struct BTreeCursor {
leaf_page_id: u32,
position: usize,
pager: Arc<Pager>,
prefetched_next: bool,
}
impl BTreeCursor {
pub fn next(&mut self) -> BTreeResult<Option<(Vec<u8>, Vec<u8>)>> {
if self.leaf_page_id == 0 {
return Ok(None);
}
let page = self.pager.read_page(self.leaf_page_id)?;
let cell_count = page.cell_count() as usize;
if !self.prefetched_next && self.position >= cell_count / 2 && cell_count > 0 {
let next_leaf = read_next_leaf(&page);
if next_leaf != 0 {
self.pager.prefetch_hint(next_leaf);
}
self.prefetched_next = true;
}
if self.position < cell_count {
let (key, stored) = read_leaf_cell(&page, self.position)?;
let value = value_layout::decode(&self.pager, &stored)?;
self.position += 1;
return Ok(Some((key, value)));
}
let next_leaf = read_next_leaf(&page);
if next_leaf == 0 {
self.leaf_page_id = 0;
return Ok(None);
}
self.leaf_page_id = next_leaf;
self.position = 0;
self.prefetched_next = false;
let page = self.pager.read_page(self.leaf_page_id)?;
if page.cell_count() == 0 {
return Ok(None);
}
let (key, stored) = read_leaf_cell(&page, 0)?;
let value = value_layout::decode(&self.pager, &stored)?;
self.position = 1;
Ok(Some((key, value)))
}
pub fn peek(&self) -> BTreeResult<Option<(Vec<u8>, Vec<u8>)>> {
if self.leaf_page_id == 0 {
return Ok(None);
}
let page = self.pager.read_page(self.leaf_page_id)?;
let cell_count = page.cell_count() as usize;
if self.position >= cell_count {
return Ok(None);
}
let (key, stored) = read_leaf_cell(&page, self.position)?;
let value = value_layout::decode(&self.pager, &stored)?;
Ok(Some((key, value)))
}
}
pub struct BTree {
pager: Arc<Pager>,
root_page_id: RwLock<u32>,
rightmost_leaf: RwLock<Option<(u32, Vec<u8>)>>,
}
#[path = "btree/impl.rs"]
mod btree_impl;
#[path = "btree/lehman_yao.rs"]
pub mod lehman_yao;
#[path = "btree/value_layout.rs"]
pub mod value_layout;
enum SearchResult {
Found(usize),
NotFound(usize),
}
fn search_leaf(page: &Page, key: &[u8]) -> BTreeResult<SearchResult> {
let cell_count = page.cell_count() as usize;
let mut low = 0;
let mut high = cell_count;
while low < high {
let mid = (low + high) / 2;
let (cell_key, _) = read_leaf_cell(page, mid)?;
match cell_key.as_slice().cmp(key) {
Ordering::Less => low = mid + 1,
Ordering::Greater => high = mid,
Ordering::Equal => return Ok(SearchResult::Found(mid)),
}
}
Ok(SearchResult::NotFound(low))
}
fn find_child(page: &Page, key: &[u8]) -> BTreeResult<u32> {
let cell_count = page.cell_count() as usize;
for i in 0..cell_count {
let (cell_key, child) = read_interior_cell(page, i)?;
if key < cell_key.as_slice() {
return Ok(child);
}
}
Ok(page.right_child())
}
fn find_first_child(page: &Page) -> BTreeResult<u32> {
if page.cell_count() == 0 {
return Ok(page.right_child());
}
let (_, child) = read_interior_cell(page, 0)?;
Ok(child)
}
fn leaf_min_bytes() -> usize {
(PAGE_SIZE - LEAF_DATA_OFFSET) * MIN_FILL_FACTOR / 100
}
fn interior_min_bytes() -> usize {
(PAGE_SIZE - INTERIOR_DATA_OFFSET) * MIN_FILL_FACTOR / 100
}
fn leaf_entry_size(entry: &(Vec<u8>, Vec<u8>)) -> usize {
2 + 4 + entry.0.len() + entry.1.len()
}
fn leaf_entries_size(entries: &[(Vec<u8>, Vec<u8>)]) -> usize {
entries.iter().map(leaf_entry_size).sum()
}
#[inline]
fn leaf_slot_offset_for(index: usize) -> usize {
LEAF_SLOT_ARRAY_OFFSET + index * 2
}
#[inline]
fn leaf_read_slot(page: &Page, index: usize) -> BTreeResult<usize> {
let data = page.as_bytes();
let slot_pos = leaf_slot_offset_for(index);
if slot_pos + 2 > PAGE_SIZE {
return Err(BTreeError::Corrupted("slot array overflows page".into()));
}
Ok(u16::from_le_bytes([data[slot_pos], data[slot_pos + 1]]) as usize)
}
#[inline]
fn leaf_write_slot(page: &mut Page, index: usize, cell_offset: u16) -> BTreeResult<()> {
let data = page.as_bytes_mut();
let slot_pos = leaf_slot_offset_for(index);
if slot_pos + 2 > PAGE_SIZE {
return Err(BTreeError::Corrupted("slot array overflows page".into()));
}
data[slot_pos..slot_pos + 2].copy_from_slice(&cell_offset.to_le_bytes());
Ok(())
}
#[inline]
fn leaf_slots_end(page: &Page) -> usize {
LEAF_SLOT_ARRAY_OFFSET + (page.cell_count() as usize) * 2
}
#[inline]
fn leaf_cells_start(page: &Page) -> usize {
let end = page.free_end() as usize;
if end == 0 {
PAGE_SIZE
} else {
end
}
}
#[inline]
fn leaf_free_bytes(page: &Page) -> usize {
let slot_end = leaf_slots_end(page);
let cells = leaf_cells_start(page);
cells.saturating_sub(slot_end)
}
fn interior_key_size(key: &[u8]) -> usize {
2 + key.len() + 4
}
fn interior_entries_size(keys: &[Vec<u8>]) -> usize {
keys.iter().map(|k| interior_key_size(k)).sum()
}
fn read_leaf_cell(page: &Page, index: usize) -> BTreeResult<(Vec<u8>, Vec<u8>)> {
let cell_count = page.cell_count() as usize;
if index >= cell_count {
return Err(BTreeError::Corrupted("Cell index out of range".into()));
}
let offset = leaf_read_slot(page, index)?;
let data = page.as_bytes();
if offset + 4 > PAGE_SIZE {
return Err(BTreeError::Corrupted("cell header out of range".into()));
}
let key_len = u16::from_le_bytes([data[offset], data[offset + 1]]) as usize;
let val_len = u16::from_le_bytes([data[offset + 2], data[offset + 3]]) as usize;
let end = offset + 4 + key_len + val_len;
if end > PAGE_SIZE {
return Err(BTreeError::Corrupted("cell body out of range".into()));
}
let key = data[offset + 4..offset + 4 + key_len].to_vec();
let value = data[offset + 4 + key_len..end].to_vec();
Ok((key, value))
}
fn read_leaf_entries(page: &Page) -> BTreeResult<Vec<(Vec<u8>, Vec<u8>)>> {
let cell_count = page.cell_count() as usize;
let mut entries = Vec::with_capacity(cell_count);
for i in 0..cell_count {
entries.push(read_leaf_cell(page, i)?);
}
Ok(entries)
}
fn write_leaf_entries(page: &mut Page, entries: &[(Vec<u8>, Vec<u8>)]) -> BTreeResult<()> {
clear_leaf_cells(page);
for (i, (k, v)) in entries.iter().enumerate() {
let cell_size = 4 + k.len() + v.len();
let cells_start = leaf_cells_start(page);
let slot_end = LEAF_SLOT_ARRAY_OFFSET + (i + 1) * 2;
if slot_end + cell_size > cells_start {
return Err(BTreeError::Corrupted("leaf rebuild overflowed page".into()));
}
let cell_offset = cells_start - cell_size;
{
let data = page.as_bytes_mut();
data[cell_offset..cell_offset + 2].copy_from_slice(&(k.len() as u16).to_le_bytes());
data[cell_offset + 2..cell_offset + 4].copy_from_slice(&(v.len() as u16).to_le_bytes());
data[cell_offset + 4..cell_offset + 4 + k.len()].copy_from_slice(k);
data[cell_offset + 4 + k.len()..cell_offset + 4 + k.len() + v.len()].copy_from_slice(v);
}
page.set_free_end(cell_offset as u16);
leaf_write_slot(page, i, cell_offset as u16)?;
}
page.set_cell_count(entries.len() as u16);
page.set_free_start((LEAF_SLOT_ARRAY_OFFSET + entries.len() * 2) as u16);
Ok(())
}
fn can_insert_leaf(page: &Page, key: &[u8], value: &[u8]) -> bool {
let needed = 2 + 4 + key.len() + value.len();
leaf_free_bytes(page) >= needed
}
pub(crate) fn overwrite_leaf_value(
page: &mut Page,
index: usize,
new_value: &[u8],
) -> BTreeResult<()> {
let cell_offset = leaf_read_slot(page, index)?;
let data = page.as_bytes();
if cell_offset + 4 > data.len() {
return Err(BTreeError::Corrupted(
"leaf cell header out of bounds".into(),
));
}
let k_len = u16::from_le_bytes([data[cell_offset], data[cell_offset + 1]]) as usize;
let old_v_len = u16::from_le_bytes([data[cell_offset + 2], data[cell_offset + 3]]) as usize;
if new_value.len() > old_v_len {
return Err(BTreeError::Corrupted(
"overwrite_leaf_value: new value larger than slot".into(),
));
}
let value_start = cell_offset + 4 + k_len;
if value_start + new_value.len() > data.len() {
return Err(BTreeError::Corrupted(
"leaf cell value out of bounds".into(),
));
}
let data = page.as_bytes_mut();
if new_value.len() != old_v_len {
let new_len = new_value.len() as u16;
data[cell_offset + 2..cell_offset + 4].copy_from_slice(&new_len.to_le_bytes());
}
data[value_start..value_start + new_value.len()].copy_from_slice(new_value);
Ok(())
}
fn insert_into_leaf(page: &mut Page, key: &[u8], value: &[u8]) -> BTreeResult<()> {
let cell_count = page.cell_count() as usize;
let mut low = 0;
let mut high = cell_count;
while low < high {
let mid = (low + high) / 2;
let (cell_key, _) = read_leaf_cell(page, mid)?;
match cell_key.as_slice().cmp(key) {
Ordering::Less => low = mid + 1,
Ordering::Greater => high = mid,
Ordering::Equal => {
low = mid + 1;
break;
}
}
}
let insert_pos = low;
let cell_size = 4 + key.len() + value.len();
let slot_end_after = LEAF_SLOT_ARRAY_OFFSET + (cell_count + 1) * 2;
let cells_start = leaf_cells_start(page);
if slot_end_after + cell_size > cells_start {
return Err(BTreeError::Corrupted("leaf page full".into()));
}
let cell_offset = cells_start - cell_size;
{
let data = page.as_bytes_mut();
data[cell_offset..cell_offset + 2].copy_from_slice(&(key.len() as u16).to_le_bytes());
data[cell_offset + 2..cell_offset + 4].copy_from_slice(&(value.len() as u16).to_le_bytes());
data[cell_offset + 4..cell_offset + 4 + key.len()].copy_from_slice(key);
data[cell_offset + 4 + key.len()..cell_offset + 4 + key.len() + value.len()]
.copy_from_slice(value);
}
page.set_free_end(cell_offset as u16);
{
let shift_from = leaf_slot_offset_for(insert_pos);
let shift_to = shift_from + 2;
let shift_len = (cell_count - insert_pos) * 2;
if shift_len > 0 {
let data = page.as_bytes_mut();
data.copy_within(shift_from..shift_from + shift_len, shift_to);
}
}
leaf_write_slot(page, insert_pos, cell_offset as u16)?;
page.set_cell_count((cell_count + 1) as u16);
page.set_free_start((LEAF_SLOT_ARRAY_OFFSET + (cell_count + 1) * 2) as u16);
Ok(())
}
fn delete_from_leaf(page: &mut Page, index: usize) -> BTreeResult<()> {
let cell_count = page.cell_count() as usize;
if index >= cell_count {
return Err(BTreeError::Corrupted("delete index out of range".into()));
}
if index + 1 < cell_count {
let shift_from = leaf_slot_offset_for(index + 1);
let shift_to = leaf_slot_offset_for(index);
let shift_len = (cell_count - index - 1) * 2;
let data = page.as_bytes_mut();
data.copy_within(shift_from..shift_from + shift_len, shift_to);
}
page.set_cell_count((cell_count - 1) as u16);
page.set_free_start((LEAF_SLOT_ARRAY_OFFSET + (cell_count - 1) * 2) as u16);
Ok(())
}
fn clear_leaf_cells(page: &mut Page) {
{
let data = page.as_bytes_mut();
for byte in &mut data[LEAF_SLOT_ARRAY_OFFSET..] {
*byte = 0;
}
}
page.set_cell_count(0);
page.set_free_start(LEAF_SLOT_ARRAY_OFFSET as u16);
page.set_free_end(PAGE_SIZE as u16);
}
fn init_leaf_links(page: &mut Page, prev: u32, next: u32) {
let data = page.as_bytes_mut();
data[LEAF_PREV_OFFSET..LEAF_PREV_OFFSET + 4].copy_from_slice(&prev.to_le_bytes());
data[LEAF_NEXT_OFFSET..LEAF_NEXT_OFFSET + 4].copy_from_slice(&next.to_le_bytes());
}
fn read_next_leaf(page: &Page) -> u32 {
let data = page.as_bytes();
u32::from_le_bytes([
data[LEAF_NEXT_OFFSET],
data[LEAF_NEXT_OFFSET + 1],
data[LEAF_NEXT_OFFSET + 2],
data[LEAF_NEXT_OFFSET + 3],
])
}
#[inline]
fn leaf_right_sibling(page: &Page) -> u32 {
read_next_leaf(page)
}
fn leaf_high_key(page: &Page) -> BTreeResult<Option<Vec<u8>>> {
let cell_count = page.cell_count() as usize;
if cell_count == 0 {
return Ok(None);
}
let (key, _) = read_leaf_cell(page, cell_count - 1)?;
Ok(Some(key))
}
fn set_prev_leaf(page: &mut Page, prev: u32) {
let data = page.as_bytes_mut();
data[LEAF_PREV_OFFSET..LEAF_PREV_OFFSET + 4].copy_from_slice(&prev.to_le_bytes());
}
fn set_next_leaf(page: &mut Page, next: u32) {
let data = page.as_bytes_mut();
data[LEAF_NEXT_OFFSET..LEAF_NEXT_OFFSET + 4].copy_from_slice(&next.to_le_bytes());
}
#[inline]
fn interior_slot_offset_for(index: usize) -> usize {
INTERIOR_DATA_OFFSET + index * 2
}
#[inline]
fn interior_read_slot(page: &Page, index: usize) -> BTreeResult<usize> {
let data = page.as_bytes();
let slot_pos = interior_slot_offset_for(index);
if slot_pos + 2 > PAGE_SIZE {
return Err(BTreeError::Corrupted(
"interior slot array overflows page".into(),
));
}
Ok(u16::from_le_bytes([data[slot_pos], data[slot_pos + 1]]) as usize)
}
#[inline]
fn interior_write_slot(page: &mut Page, index: usize, cell_offset: u16) -> BTreeResult<()> {
let data = page.as_bytes_mut();
let slot_pos = interior_slot_offset_for(index);
if slot_pos + 2 > PAGE_SIZE {
return Err(BTreeError::Corrupted(
"interior slot array overflows page".into(),
));
}
data[slot_pos..slot_pos + 2].copy_from_slice(&cell_offset.to_le_bytes());
Ok(())
}
#[inline]
fn interior_cells_start(page: &Page) -> usize {
let end = page.free_end() as usize;
if end == 0 {
PAGE_SIZE
} else {
end
}
}
#[inline]
fn interior_free_bytes(page: &Page) -> usize {
let slot_end = INTERIOR_DATA_OFFSET + (page.cell_count() as usize) * 2;
interior_cells_start(page).saturating_sub(slot_end)
}
fn read_interior_cell(page: &Page, index: usize) -> BTreeResult<(Vec<u8>, u32)> {
let cell_count = page.cell_count() as usize;
if index >= cell_count {
return Err(BTreeError::Corrupted("Cell index out of range".into()));
}
let offset = interior_read_slot(page, index)?;
let data = page.as_bytes();
if offset + 2 > PAGE_SIZE {
return Err(BTreeError::Corrupted(
"interior cell header out of range".into(),
));
}
let key_len = u16::from_le_bytes([data[offset], data[offset + 1]]) as usize;
let end = offset + 2 + key_len + 4;
if end > PAGE_SIZE {
return Err(BTreeError::Corrupted(
"interior cell body out of range".into(),
));
}
let key = data[offset + 2..offset + 2 + key_len].to_vec();
let child = u32::from_le_bytes([
data[offset + 2 + key_len],
data[offset + 2 + key_len + 1],
data[offset + 2 + key_len + 2],
data[offset + 2 + key_len + 3],
]);
Ok((key, child))
}
fn read_interior_keys_children(page: &Page) -> BTreeResult<(Vec<Vec<u8>>, Vec<u32>)> {
let cell_count = page.cell_count() as usize;
let mut keys = Vec::with_capacity(cell_count);
let mut children = Vec::with_capacity(cell_count + 1);
for i in 0..cell_count {
let (key, child) = read_interior_cell(page, i)?;
keys.push(key);
children.push(child);
}
if cell_count == 0 {
let right_child = page.right_child();
if right_child != 0 {
children.push(right_child);
}
} else {
children.push(page.right_child());
}
Ok((keys, children))
}
fn write_interior_entries(page: &mut Page, keys: &[Vec<u8>], children: &[u32]) -> BTreeResult<()> {
if !keys.is_empty() && children.len() != keys.len() + 1 {
return Err(BTreeError::Corrupted(
"Interior keys/children length mismatch".into(),
));
}
clear_interior_cells(page);
if keys.is_empty() {
let right_child = children.first().copied().unwrap_or(0);
page.set_right_child(right_child);
return Ok(());
}
for (i, key) in keys.iter().enumerate() {
let cell_size = 2 + key.len() + 4;
let cells_start = interior_cells_start(page);
let slot_end = INTERIOR_DATA_OFFSET + (i + 1) * 2;
if slot_end + cell_size > cells_start {
return Err(BTreeError::Corrupted(
"interior rebuild overflowed page".into(),
));
}
let cell_offset = cells_start - cell_size;
{
let data = page.as_bytes_mut();
data[cell_offset..cell_offset + 2].copy_from_slice(&(key.len() as u16).to_le_bytes());
data[cell_offset + 2..cell_offset + 2 + key.len()].copy_from_slice(key);
data[cell_offset + 2 + key.len()..cell_offset + 2 + key.len() + 4]
.copy_from_slice(&children[i].to_le_bytes());
}
page.set_free_end(cell_offset as u16);
interior_write_slot(page, i, cell_offset as u16)?;
}
page.set_cell_count(keys.len() as u16);
page.set_free_start((INTERIOR_DATA_OFFSET + keys.len() * 2) as u16);
page.set_right_child(*children.last().ok_or_else(|| {
BTreeError::Corrupted("write_interior_entries: children empty with non-empty keys".into())
})?);
Ok(())
}
fn can_insert_interior(page: &Page, key: &[u8]) -> bool {
let needed = 2 + 2 + key.len() + 4;
interior_free_bytes(page) >= needed
}
fn insert_into_interior(
page: &mut Page,
key: &[u8],
left_child: u32,
right_child: u32,
) -> BTreeResult<()> {
let cell_count = page.cell_count() as usize;
let mut low = 0;
let mut high = cell_count;
while low < high {
let mid = (low + high) / 2;
let (cell_key, _) = read_interior_cell(page, mid)?;
match cell_key.as_slice().cmp(key) {
Ordering::Less => low = mid + 1,
Ordering::Greater | Ordering::Equal => high = mid,
}
}
let insert_pos = low;
if insert_pos < cell_count {
let old_offset = interior_read_slot(page, insert_pos)?;
let data = page.as_bytes();
let key_len = u16::from_le_bytes([data[old_offset], data[old_offset + 1]]) as usize;
let child_pos = old_offset + 2 + key_len;
let data = page.as_bytes_mut();
data[child_pos..child_pos + 4].copy_from_slice(&right_child.to_le_bytes());
} else {
page.set_right_child(right_child);
}
let cell_size = 2 + key.len() + 4;
let slot_end_after = INTERIOR_DATA_OFFSET + (cell_count + 1) * 2;
let cells_start = interior_cells_start(page);
if slot_end_after + cell_size > cells_start {
return Err(BTreeError::Corrupted("interior page full".into()));
}
let cell_offset = cells_start - cell_size;
{
let data = page.as_bytes_mut();
data[cell_offset..cell_offset + 2].copy_from_slice(&(key.len() as u16).to_le_bytes());
data[cell_offset + 2..cell_offset + 2 + key.len()].copy_from_slice(key);
data[cell_offset + 2 + key.len()..cell_offset + 2 + key.len() + 4]
.copy_from_slice(&left_child.to_le_bytes());
}
page.set_free_end(cell_offset as u16);
{
let shift_from = interior_slot_offset_for(insert_pos);
let shift_to = shift_from + 2;
let shift_len = (cell_count - insert_pos) * 2;
if shift_len > 0 {
let data = page.as_bytes_mut();
data.copy_within(shift_from..shift_from + shift_len, shift_to);
}
}
interior_write_slot(page, insert_pos, cell_offset as u16)?;
page.set_cell_count((cell_count + 1) as u16);
page.set_free_start((INTERIOR_DATA_OFFSET + (cell_count + 1) * 2) as u16);
Ok(())
}
fn clear_interior_cells(page: &mut Page) {
{
let data = page.as_bytes_mut();
for byte in &mut data[INTERIOR_DATA_OFFSET..] {
*byte = 0;
}
}
page.set_cell_count(0);
page.set_free_start(INTERIOR_DATA_OFFSET as u16);
page.set_free_end(PAGE_SIZE as u16);
}
#[cfg(test)]
mod tests {
use super::*;
use std::path::PathBuf;
fn temp_db_path() -> PathBuf {
use std::sync::atomic::{AtomicU64, Ordering};
static COUNTER: AtomicU64 = AtomicU64::new(0);
let id = COUNTER.fetch_add(1, Ordering::Relaxed);
let mut path = std::env::temp_dir();
path.push(format!("reddb_btree_test_{}_{}.db", std::process::id(), id));
path
}
fn cleanup(path: &PathBuf) {
let _ = std::fs::remove_file(path);
}
#[test]
fn test_btree_empty() {
let path = temp_db_path();
cleanup(&path);
let pager = Arc::new(Pager::open_default(&path).unwrap());
let tree = BTree::new(pager);
assert!(tree.is_empty());
assert_eq!(tree.root_page_id(), 0);
assert_eq!(tree.get(b"key").unwrap(), None);
assert_eq!(tree.count().unwrap(), 0);
cleanup(&path);
}
#[test]
fn test_btree_single_insert() {
let path = temp_db_path();
cleanup(&path);
let pager = Arc::new(Pager::open_default(&path).unwrap());
let tree = BTree::new(pager);
tree.insert(b"hello", b"world").unwrap();
assert!(!tree.is_empty());
assert_eq!(tree.get(b"hello").unwrap(), Some(b"world".to_vec()));
assert_eq!(tree.get(b"other").unwrap(), None);
assert_eq!(tree.count().unwrap(), 1);
cleanup(&path);
}
#[test]
fn test_btree_multiple_inserts() {
let path = temp_db_path();
cleanup(&path);
let pager = Arc::new(Pager::open_default(&path).unwrap());
let tree = BTree::new(pager);
tree.insert(b"c", b"3").unwrap();
tree.insert(b"a", b"1").unwrap();
tree.insert(b"b", b"2").unwrap();
assert_eq!(tree.get(b"a").unwrap(), Some(b"1".to_vec()));
assert_eq!(tree.get(b"b").unwrap(), Some(b"2".to_vec()));
assert_eq!(tree.get(b"c").unwrap(), Some(b"3".to_vec()));
assert_eq!(tree.count().unwrap(), 3);
cleanup(&path);
}
#[test]
fn test_btree_duplicate_key() {
let path = temp_db_path();
cleanup(&path);
let pager = Arc::new(Pager::open_default(&path).unwrap());
let tree = BTree::new(pager);
tree.insert(b"key", b"value1").unwrap();
let result = tree.insert(b"key", b"value2");
assert!(matches!(result, Err(BTreeError::DuplicateKey)));
assert_eq!(tree.get(b"key").unwrap(), Some(b"value1".to_vec()));
cleanup(&path);
}
#[test]
fn test_btree_delete() {
let path = temp_db_path();
cleanup(&path);
let pager = Arc::new(Pager::open_default(&path).unwrap());
let tree = BTree::new(pager);
tree.insert(b"a", b"1").unwrap();
tree.insert(b"b", b"2").unwrap();
tree.insert(b"c", b"3").unwrap();
assert!(tree.delete(b"b").unwrap());
assert!(!tree.delete(b"d").unwrap());
assert_eq!(tree.get(b"a").unwrap(), Some(b"1".to_vec()));
assert_eq!(tree.get(b"b").unwrap(), None);
assert_eq!(tree.get(b"c").unwrap(), Some(b"3".to_vec()));
assert_eq!(tree.count().unwrap(), 2);
cleanup(&path);
}
#[test]
fn test_btree_delete_rebalance_removes_empty_leaf() {
let path = temp_db_path();
cleanup(&path);
let pager = Arc::new(Pager::open_default(&path).unwrap());
let tree = BTree::new(pager.clone());
let value = vec![b'v'; 200];
for i in 0..300u32 {
let key = format!("key{:03}", i);
tree.insert(key.as_bytes(), &value).unwrap();
}
let root_id = tree.root_page_id();
let first_leaf = tree.find_first_leaf(root_id).unwrap();
let mut leaf_ids = Vec::new();
let mut current = first_leaf;
loop {
leaf_ids.push(current);
let page = pager.read_page(current).unwrap();
let next = read_next_leaf(&page);
if next == 0 {
break;
}
current = next;
}
assert!(leaf_ids.len() >= 3);
let target_leaf = leaf_ids[1];
let page = pager.read_page(target_leaf).unwrap();
let cell_count = page.cell_count() as usize;
let mut keys = Vec::with_capacity(cell_count);
for i in 0..cell_count {
let (key, _) = read_leaf_cell(&page, i).unwrap();
keys.push(key);
}
for key in &keys {
tree.delete(key).unwrap();
}
let expected = 300 - keys.len();
assert_eq!(tree.count().unwrap(), expected);
let mut cursor = tree.cursor_first().unwrap();
let mut results = Vec::new();
while let Some((key, _)) = cursor.next().unwrap() {
results.push(key);
}
assert_eq!(results.len(), expected);
let last_key = format!("key{:03}", 299).into_bytes();
assert_eq!(results.last(), Some(&last_key));
cleanup(&path);
}
#[test]
fn test_btree_cursor() {
let path = temp_db_path();
cleanup(&path);
let pager = Arc::new(Pager::open_default(&path).unwrap());
let tree = BTree::new(pager);
tree.insert(b"c", b"3").unwrap();
tree.insert(b"a", b"1").unwrap();
tree.insert(b"b", b"2").unwrap();
let mut cursor = tree.cursor_first().unwrap();
let mut results: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
while let Some(entry) = cursor.next().unwrap() {
results.push(entry);
}
assert_eq!(results.len(), 3);
assert_eq!(results[0], (b"a".to_vec(), b"1".to_vec()));
assert_eq!(results[1], (b"b".to_vec(), b"2".to_vec()));
assert_eq!(results[2], (b"c".to_vec(), b"3".to_vec()));
cleanup(&path);
}
#[test]
fn test_btree_range() {
let path = temp_db_path();
cleanup(&path);
let pager = Arc::new(Pager::open_default(&path).unwrap());
let tree = BTree::new(pager);
for i in 0..10u8 {
let key = format!("key{:02}", i);
let value = format!("val{:02}", i);
tree.insert(key.as_bytes(), value.as_bytes()).unwrap();
}
let results = tree.range(b"key03", b"key06").unwrap();
assert_eq!(results.len(), 4);
assert_eq!(results[0].0, b"key03".to_vec());
assert_eq!(results[3].0, b"key06".to_vec());
cleanup(&path);
}
#[test]
fn test_btree_large_keys() {
let path = temp_db_path();
cleanup(&path);
let pager = Arc::new(Pager::open_default(&path).unwrap());
let tree = BTree::new(pager);
let key = vec![b'x'; 500];
let value = vec![b'y'; 500];
tree.insert(&key, &value).unwrap();
assert_eq!(tree.get(&key).unwrap(), Some(value));
cleanup(&path);
}
#[test]
fn test_btree_key_too_large() {
let path = temp_db_path();
cleanup(&path);
let pager = Arc::new(Pager::open_default(&path).unwrap());
let tree = BTree::new(pager);
let key = vec![b'x'; MAX_KEY_SIZE + 1];
let result = tree.insert(&key, b"value");
assert!(matches!(result, Err(BTreeError::KeyTooLarge(_))));
cleanup(&path);
}
#[test]
fn test_btree_many_inserts() {
let path = temp_db_path();
cleanup(&path);
let pager = Arc::new(Pager::open_default(&path).unwrap());
let tree = BTree::new(pager);
for i in 0..100u32 {
let key = format!("key{:08}", i);
let value = format!("value{:08}", i);
tree.insert(key.as_bytes(), value.as_bytes()).unwrap();
}
for i in 0..100u32 {
let key = format!("key{:08}", i);
let expected = format!("value{:08}", i);
assert_eq!(
tree.get(key.as_bytes()).unwrap(),
Some(expected.into_bytes())
);
}
assert_eq!(tree.count().unwrap(), 100);
cleanup(&path);
}
#[test]
fn test_btree_bulk_insert_sorted_preserves_leaf_layout() {
let path = temp_db_path();
cleanup(&path);
let pager = Arc::new(Pager::open_default(&path).unwrap());
let tree = BTree::new(pager);
let items: Vec<(Vec<u8>, Vec<u8>)> = (0..240u32)
.map(|i| {
(
format!("key{:08}", i).into_bytes(),
vec![b'v'; 32 + (i as usize % 7)],
)
})
.collect();
tree.bulk_insert_sorted(&items).unwrap();
assert_eq!(tree.count().unwrap(), items.len());
for (key, value) in &items {
assert_eq!(tree.get(key).unwrap(), Some(value.clone()));
}
cleanup(&path);
}
mod prop_upsert_batch_sorted {
use super::*;
use proptest::prelude::*;
use std::collections::BTreeMap;
fn populate_tree(seed: &[(Vec<u8>, Vec<u8>)]) -> (BTree, PathBuf, Arc<Pager>) {
let path = temp_db_path();
cleanup(&path);
let pager = Arc::new(Pager::open_default(&path).unwrap());
let tree = BTree::new(Arc::clone(&pager));
for (k, v) in seed {
tree.upsert(k, v).unwrap();
}
(tree, path, pager)
}
fn key_strategy() -> impl Strategy<Value = Vec<u8>> {
prop::collection::vec(0u8..16u8, 1..=4)
}
fn value_strategy() -> impl Strategy<Value = Vec<u8>> {
prop::collection::vec(0u8..255u8, 0..=8)
}
fn pair_strategy() -> impl Strategy<Value = (Vec<u8>, Vec<u8>)> {
(key_strategy(), value_strategy())
}
proptest! {
#![proptest_config(ProptestConfig {
cases: 64,
.. ProptestConfig::default()
})]
#[test]
fn batch_upsert_matches_loop_upsert(
seed in prop::collection::vec(pair_strategy(), 0..50),
batch in prop::collection::vec(pair_strategy(), 1..200),
) {
let mut seed_map: BTreeMap<Vec<u8>, Vec<u8>> = BTreeMap::new();
for (k, v) in seed {
seed_map.insert(k, v);
}
let seed: Vec<(Vec<u8>, Vec<u8>)> = seed_map.into_iter().collect();
let (baseline, baseline_path, _baseline_pager) = populate_tree(&seed);
for (k, v) in &batch {
baseline.upsert(k, v).unwrap();
}
let (candidate, candidate_path, _candidate_pager) = populate_tree(&seed);
let mut sorted = batch.clone();
sorted.sort_by(|a, b| a.0.cmp(&b.0));
candidate.upsert_batch_sorted(&sorted).unwrap();
let mut expected: BTreeMap<Vec<u8>, Vec<u8>> = BTreeMap::new();
for (k, v) in &seed {
expected.insert(k.clone(), v.clone());
}
for (k, v) in &batch {
expected.insert(k.clone(), v.clone());
}
for (k, v) in &expected {
let baseline_got = baseline.get(k).unwrap();
let candidate_got = candidate.get(k).unwrap();
prop_assert_eq!(baseline_got.as_ref(), Some(v));
prop_assert_eq!(candidate_got.as_ref(), Some(v));
}
let collect = |t: &BTree| -> Vec<(Vec<u8>, Vec<u8>)> {
let mut out = Vec::new();
let mut cur = t.cursor_first().unwrap();
while let Some(entry) = cur.next().unwrap() {
out.push(entry);
}
out
};
let baseline_contents = collect(&baseline);
let candidate_contents = collect(&candidate);
prop_assert_eq!(&baseline_contents, &candidate_contents);
let expected_contents: Vec<(Vec<u8>, Vec<u8>)> =
expected.into_iter().collect();
prop_assert_eq!(&candidate_contents, &expected_contents);
cleanup(&baseline_path);
cleanup(&candidate_path);
}
}
}
#[test]
fn test_btree_sorted_iteration() {
let path = temp_db_path();
cleanup(&path);
let pager = Arc::new(Pager::open_default(&path).unwrap());
let tree = BTree::new(pager);
let keys = vec![50, 25, 75, 10, 30, 60, 80, 5, 15, 27, 35, 55, 65, 77, 90];
for k in &keys {
let key = format!("{:03}", k);
tree.insert(key.as_bytes(), key.as_bytes()).unwrap();
}
let mut cursor = tree.cursor_first().unwrap();
let mut prev: Option<Vec<u8>> = None;
while let Some((key, _)) = cursor.next().unwrap() {
if let Some(p) = &prev {
assert!(p < &key, "Keys not in sorted order");
}
prev = Some(key);
}
cleanup(&path);
}
}
#[cfg(test)]
mod overflow_pipeline_tests {
use super::value_layout::{MAX_VALUE_SIZE, OVERFLOW_THRESHOLD};
use super::*;
use std::path::PathBuf;
fn temp_db_path(tag: &str) -> PathBuf {
use std::sync::atomic::{AtomicU64, Ordering};
static COUNTER: AtomicU64 = AtomicU64::new(0);
let id = COUNTER.fetch_add(1, Ordering::Relaxed);
let mut path = std::env::temp_dir();
path.push(format!(
"reddb_btree_spill_{}_{}_{}.db",
tag,
std::process::id(),
id
));
path
}
fn cleanup(path: &std::path::Path) {
let _ = std::fs::remove_file(path);
for suffix in ["-hdr", "-meta", "-dwb"] {
let mut p = path.to_path_buf().into_os_string();
p.push(suffix);
let _ = std::fs::remove_file(&p);
}
}
#[test]
fn inline_path_allocates_no_overflow_pages() {
let path = temp_db_path("inline_no_overflow");
cleanup(&path);
{
let pager = Arc::new(Pager::open_default(&path).unwrap());
let tree = BTree::new(pager.clone());
let before = pager.page_count().unwrap();
let value = vec![0x5Au8; OVERFLOW_THRESHOLD - 1];
tree.insert(b"k", &value).unwrap();
let after = pager.page_count().unwrap();
assert_eq!(
after - before,
1,
"inline value must allocate only the root leaf"
);
assert_eq!(tree.get(b"k").unwrap(), Some(value));
}
cleanup(&path);
}
#[test]
fn compressible_44kb_round_trips() {
let path = temp_db_path("compressible_44kb");
cleanup(&path);
{
let pager = Arc::new(Pager::open_default(&path).unwrap());
let tree = BTree::new(pager);
let snippet = "# heading\n- bullet line that says the same thing over and over\n";
let mut value = Vec::with_capacity(44 * 1024);
while value.len() < 44 * 1024 {
value.extend_from_slice(snippet.as_bytes());
}
assert!(value.len() > OVERFLOW_THRESHOLD);
tree.insert(b"doc", &value).unwrap();
assert_eq!(tree.get(b"doc").unwrap(), Some(value));
}
cleanup(&path);
}
#[test]
fn incompressible_5mb_spills_and_round_trips() {
let path = temp_db_path("incompressible_5mb");
cleanup(&path);
{
let pager = Arc::new(Pager::open_default(&path).unwrap());
let tree = BTree::new(pager.clone());
let mut state: u64 = 0xC0FFEE_DEAD_F00D;
let value: Vec<u8> = (0..5 * 1024 * 1024)
.map(|_| {
state ^= state << 13;
state ^= state >> 7;
state ^= state << 17;
state as u8
})
.collect();
let before = pager.page_count().unwrap();
tree.insert(b"blob", &value).unwrap();
let after = pager.page_count().unwrap();
assert!(
after - before > 2,
"5 MB spill must allocate overflow pages (alloc'd {} pages)",
after - before
);
assert_eq!(
tree.get(b"blob").unwrap().as_deref(),
Some(value.as_slice())
);
}
cleanup(&path);
}
#[test]
fn value_above_ceiling_rejected_without_allocation() {
let path = temp_db_path("ceiling_reject");
cleanup(&path);
{
let pager = Arc::new(Pager::open_default(&path).unwrap());
let tree = BTree::new(pager.clone());
let before = pager.page_count().unwrap();
let value = vec![0u8; MAX_VALUE_SIZE + 1];
let err = tree.insert(b"too_big", &value).unwrap_err();
assert!(matches!(err, BTreeError::ValueTooLarge(_)));
assert_eq!(
pager.page_count().unwrap(),
before,
"rejected value must not allocate pages"
);
assert!(tree.is_empty());
}
cleanup(&path);
}
#[test]
fn bulk_insert_skips_oversized_row_keeps_rest() {
let path = temp_db_path("bulk_mixed");
cleanup(&path);
{
let pager = Arc::new(Pager::open_default(&path).unwrap());
let tree = BTree::new(pager);
let mut items: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
for i in 0..5u32 {
items.push((format!("k{:04}", i).into_bytes(), vec![b'a' + i as u8; 64]));
}
items.push((b"k0005_huge".to_vec(), vec![0u8; MAX_VALUE_SIZE + 1]));
for i in 6..11u32 {
items.push((format!("k{:04}", i).into_bytes(), vec![b'z' - i as u8; 32]));
}
let res = tree.bulk_insert_sorted(&items);
assert!(matches!(res, Err(BTreeError::ValueTooLarge(_))));
for (k, v) in items.iter().filter(|(_, v)| v.len() <= MAX_VALUE_SIZE) {
assert_eq!(
tree.get(k).unwrap().as_deref(),
Some(v.as_slice()),
"valid row {:?} must land",
k
);
}
assert_eq!(tree.get(b"k0005_huge").unwrap(), None);
}
cleanup(&path);
}
#[test]
fn large_values_persist_transparently() {
let path = temp_db_path("large_transparent");
cleanup(&path);
{
let pager = Arc::new(Pager::open_default(&path).unwrap());
let tree = BTree::new(pager);
for (i, size) in [
OVERFLOW_THRESHOLD + 1,
OVERFLOW_THRESHOLD * 2,
64 * 1024,
1 * 1024 * 1024,
]
.iter()
.enumerate()
{
let value: Vec<u8> = (0..*size).map(|n| ((n + i) & 0xff) as u8).collect();
let key = format!("size{:08}", size).into_bytes();
tree.insert(&key, &value).unwrap();
assert_eq!(
tree.get(&key).unwrap().as_deref(),
Some(value.as_slice()),
"size {} must round-trip",
size
);
}
}
cleanup(&path);
}
}
#[cfg(test)]
mod overflow_free_tests {
use super::value_layout::OVERFLOW_THRESHOLD;
use super::*;
use std::path::PathBuf;
fn temp_db_path(tag: &str) -> PathBuf {
use std::sync::atomic::{AtomicU64, Ordering};
static COUNTER: AtomicU64 = AtomicU64::new(0);
let id = COUNTER.fetch_add(1, Ordering::Relaxed);
let mut path = std::env::temp_dir();
path.push(format!(
"reddb_btree_free_{}_{}_{}.db",
tag,
std::process::id(),
id
));
path
}
fn cleanup(path: &std::path::Path) {
let _ = std::fs::remove_file(path);
for suffix in ["-hdr", "-meta", "-dwb"] {
let mut p = path.to_path_buf().into_os_string();
p.push(suffix);
let _ = std::fs::remove_file(&p);
}
}
fn incompressible(seed: u64, len: usize) -> Vec<u8> {
let mut state = seed | 1;
(0..len)
.map(|_| {
state ^= state << 13;
state ^= state >> 7;
state ^= state << 17;
state as u8
})
.collect()
}
#[test]
fn delete_spilled_value_frees_overflow_chain() {
let path = temp_db_path("delete_frees");
cleanup(&path);
{
let pager = Arc::new(Pager::open_default(&path).unwrap());
let tree = BTree::new(pager.clone());
let v1 = incompressible(0xA11CE, 2 * 1024 * 1024);
tree.insert(b"k", &v1).unwrap();
let after_first = pager.page_count().unwrap();
assert!(tree.delete(b"k").unwrap());
let v2 = incompressible(0xB0B, 2 * 1024 * 1024);
tree.insert(b"k", &v2).unwrap();
let after_second = pager.page_count().unwrap();
assert_eq!(
after_second, after_first,
"second spill must reuse freed pages, not grow the file"
);
assert_eq!(tree.get(b"k").unwrap().as_deref(), Some(v2.as_slice()));
}
cleanup(&path);
}
#[test]
fn shrinking_update_to_inline_frees_chain() {
let path = temp_db_path("shrink_to_inline");
cleanup(&path);
{
let pager = Arc::new(Pager::open_default(&path).unwrap());
let tree = BTree::new(pager.clone());
let big = incompressible(0xDEAD, 2 * 1024 * 1024);
tree.insert(b"k", &big).unwrap();
let hwm = pager.page_count().unwrap();
let small = b"tiny".to_vec();
tree.upsert(b"k", &small).unwrap();
assert_eq!(tree.get(b"k").unwrap().as_deref(), Some(small.as_slice()));
let big2 = incompressible(0xBEEF, 2 * 1024 * 1024);
tree.insert(b"k2", &big2).unwrap();
assert_eq!(
pager.page_count().unwrap(),
hwm,
"shrinking update must free the chain — replacement spill should reuse"
);
assert_eq!(tree.get(b"k2").unwrap().as_deref(), Some(big2.as_slice()));
}
cleanup(&path);
}
#[test]
fn shrinking_update_to_shorter_spill_frees_old_chain() {
let path = temp_db_path("shrink_spill_to_spill");
cleanup(&path);
{
let pager = Arc::new(Pager::open_default(&path).unwrap());
let tree = BTree::new(pager.clone());
let big = incompressible(0xF00D, 3 * 1024 * 1024);
tree.insert(b"k", &big).unwrap();
let hwm_after_big = pager.page_count().unwrap();
let smaller = incompressible(0xCAFE, 1 * 1024 * 1024);
tree.upsert(b"k", &smaller).unwrap();
assert_eq!(tree.get(b"k").unwrap().as_deref(), Some(smaller.as_slice()));
let hwm_after_upsert = pager.page_count().unwrap();
assert!(tree.delete(b"k").unwrap());
let again = incompressible(0x1234, 3 * 1024 * 1024);
tree.insert(b"k", &again).unwrap();
assert!(
pager.page_count().unwrap() <= hwm_after_upsert,
"after delete+insert of original size, file must not grow past the upsert transient \
(page_count = {}, transient high-water = {}, original = {}) — the upsert must have \
freed the old chain so the re-insert reuses freed pages",
pager.page_count().unwrap(),
hwm_after_upsert,
hwm_after_big,
);
assert_eq!(tree.get(b"k").unwrap().as_deref(), Some(again.as_slice()));
}
cleanup(&path);
}
#[test]
fn insert_delete_reinsert_does_not_grow_file() {
let path = temp_db_path("reinsert_no_grow");
cleanup(&path);
{
let pager = Arc::new(Pager::open_default(&path).unwrap());
let tree = BTree::new(pager.clone());
let v1 = incompressible(0x5EED, 4 * 1024 * 1024);
tree.insert(b"row", &v1).unwrap();
let pages_after_first = pager.page_count().unwrap();
assert!(tree.delete(b"row").unwrap());
let v2 = incompressible(0xACE, 4 * 1024 * 1024);
tree.insert(b"row", &v2).unwrap();
let pages_after_second = pager.page_count().unwrap();
assert_eq!(
pages_after_second, pages_after_first,
"re-inserting at the same size must reuse the freed chain"
);
assert_eq!(tree.get(b"row").unwrap().as_deref(), Some(v2.as_slice()));
}
cleanup(&path);
}
#[test]
fn upsert_batch_in_place_shrink_frees_chain() {
let path = temp_db_path("batch_shrink");
cleanup(&path);
{
let pager = Arc::new(Pager::open_default(&path).unwrap());
let tree = BTree::new(pager.clone());
let big_a = incompressible(0x1111, 2 * 1024 * 1024);
let big_b = incompressible(0x2222, 2 * 1024 * 1024);
tree.insert(b"a", &big_a).unwrap();
tree.insert(b"b", &big_b).unwrap();
let hwm = pager.page_count().unwrap();
let updates = vec![
(b"a".to_vec(), b"shrunk-a".to_vec()),
(b"b".to_vec(), b"shrunk-b".to_vec()),
];
tree.upsert_batch_sorted(&updates).unwrap();
assert_eq!(
tree.get(b"a").unwrap().as_deref(),
Some(b"shrunk-a".as_slice())
);
assert_eq!(
tree.get(b"b").unwrap().as_deref(),
Some(b"shrunk-b".as_slice())
);
let again_a = incompressible(0xAAAA, 2 * 1024 * 1024);
let again_b = incompressible(0xBBBB, 2 * 1024 * 1024);
tree.insert(b"c", &again_a).unwrap();
tree.insert(b"d", &again_b).unwrap();
assert!(
pager.page_count().unwrap() <= hwm,
"batch in-place shrink must free chains — replacement spills should reuse"
);
}
cleanup(&path);
}
}