use std::collections::{HashMap, VecDeque};
use std::mem::ManuallyDrop;
use std::ops::{Deref, DerefMut};
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
#[cfg(unix)]
use std::os::unix::fs::FileExt;
#[cfg(windows)]
use std::os::windows::fs::FileExt;
#[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
use thread_local::ThreadLocal;
use crate::bf_tree::{
BfTree, Config, StorageBackend, WalReader,
circular_buffer::{CircularBuffer, TombstoneHandle},
error::{BfTreeError, IoErrorKind},
fs::VfsImpl,
nodes::{DISK_PAGE_SIZE, INNER_NODE_SIZE, InnerNode, InnerNodeBuilder, PageID},
range_scan::ScanReturnField,
storage::{LeafStorage, PageLocation, PageTable, make_vfs},
sync::atomic::AtomicU64,
tree::eviction_callback,
utils::{BfsVisitor, NodeInfo, inner_lock::ReadGuard},
wal::{LogEntry, LogEntryImpl, WriteAheadLog},
};
// Markers stored at the start and end of the snapshot header (`BfTreeMeta`). A header whose
// markers do not match is treated as "no snapshot present". These bytes are part of the
// on-disk format and must not change without a format version bump.
const BF_TREE_MAGIC_BEGIN: &[u8; 16] = b"BF-TREE-V0-BEGIN";
const BF_TREE_MAGIC_END: &[u8; 14] = b"BF-TREE-V0-END";
// File offset at which `snapshot` writes the header page (the loaders read from offset 0).
const META_DATA_PAGE_OFFSET: usize = 0;
/// Zero-initialized byte buffer whose allocation is aligned to `SECTOR_SIZE` bytes.
///
/// It serves as the source/destination buffer for the file and vfs I/O in this module. The
/// bytes are exposed through a `Vec<u8>` view so callers can use the usual slice APIs.
///
/// The `Vec` is wrapped in `ManuallyDrop` because its memory was not allocated with `u8`'s
/// alignment. It must therefore never free or reallocate that memory itself. Callers must not
/// grow the vector (`push`, `extend`, `reserve`, ...) through the `DerefMut` view; only in-place
/// reads and writes are sound.
struct SectorAlignedVector {
    inner: ManuallyDrop<Vec<u8>>,
}

impl Drop for SectorAlignedVector {
    fn drop(&mut self) {
        let capacity = self.inner.capacity();
        // A zero-capacity buffer never owned a heap allocation (see `new_zeroed`).
        if capacity == 0 {
            return;
        }
        let layout = std::alloc::Layout::from_size_align(capacity, SECTOR_SIZE)
            .expect("layout was valid when the buffer was allocated");
        // SAFETY: for a non-zero capacity the pointer came from `alloc_zeroed` with exactly
        // this layout in `new_zeroed`, and the inner `Vec` never frees it (ManuallyDrop).
        unsafe { std::alloc::dealloc(self.inner.as_mut_ptr(), layout) };
    }
}

impl SectorAlignedVector {
    /// Allocates `capacity` zeroed bytes aligned to `SECTOR_SIZE`; the length equals `capacity`.
    ///
    /// A `capacity` of zero yields an empty buffer without touching the allocator.
    ///
    /// # Panics
    /// Panics if `capacity` rounded up to `SECTOR_SIZE` overflows `isize`. Allocation failure
    /// is reported through `std::alloc::handle_alloc_error`, which aborts by default.
    fn new_zeroed(capacity: usize) -> Self {
        if capacity == 0 {
            // `alloc_zeroed` must never be called with a zero-sized layout.
            return Self {
                inner: ManuallyDrop::new(Vec::new()),
            };
        }
        let layout = std::alloc::Layout::from_size_align(capacity, SECTOR_SIZE)
            .expect("sector-aligned buffer size overflows isize");
        // SAFETY: `layout` has a non-zero size (checked above).
        let ptr = unsafe { std::alloc::alloc_zeroed(layout) };
        if ptr.is_null() {
            std::alloc::handle_alloc_error(layout);
        }
        // SAFETY: `ptr` is non-null and points to `capacity` initialized (zeroed) bytes. The
        // allocation's alignment differs from `u8`'s, so the Vec must never free or grow it;
        // `ManuallyDrop` plus the custom `Drop` above enforce the former, and the type-level
        // contract documents the latter.
        let inner = unsafe { Vec::from_raw_parts(ptr, capacity, capacity) };
        Self {
            inner: ManuallyDrop::new(inner),
        }
    }
}

impl Deref for SectorAlignedVector {
    type Target = Vec<u8>;

    /// Read-only view of the aligned bytes.
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

impl DerefMut for SectorAlignedVector {
    /// Mutable view of the aligned bytes; see the type docs for why it must not be grown.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner
    }
}
impl BfTree {
    /// Rebuilds a tree from the snapshot named in `config_file`, replays the WAL at
    /// `wal_file` on top of it, and then takes a fresh snapshot.
    ///
    /// When the snapshot header records a non-zero LSN, WAL entries at or below it are
    /// skipped. Only `Write` entries are applied. `Split` entries are ignored
    /// (NOTE(review): presumably re-inserting the writes recreates any splits — confirm).
    ///
    /// # Errors
    /// Propagates errors from reading the snapshot header, loading the snapshot,
    /// opening or iterating the WAL, and writing the final snapshot.
    pub fn recovery(
        config_file: impl AsRef<Path>,
        wal_file: impl AsRef<Path>,
        wal_segment_size: usize,
        buffer_ptr: Option<*mut u8>,
    ) -> Result<Self, BfTreeError> {
        let bf_tree_config = Config::new_with_config_file(config_file);
        let snapshot_lsn = Self::read_snapshot_lsn(&bf_tree_config.file_path)?;
        let bf_tree = BfTree::new_from_snapshot(bf_tree_config, buffer_ptr)?;
        let wal_reader = WalReader::new(wal_file, wal_segment_size)?;
        for seg in wal_reader.segment_iter() {
            let seg = seg.map_err(BfTreeError::Io)?;
            for entry in seg.entry_iter() {
                let header = &entry.0;
                // Entries already reflected in the snapshot must not be applied twice.
                if snapshot_lsn > 0 && header.lsn <= snapshot_lsn {
                    continue;
                }
                match LogEntry::read_from_buffer(entry.1) {
                    LogEntry::Write(op) => {
                        bf_tree.insert(op.key, op.value);
                    }
                    LogEntry::Split(_op) => {}
                }
            }
        }
        bf_tree.snapshot()?;
        Ok(bf_tree)
    }

    /// Returns the WAL LSN recorded in the snapshot header at `path`.
    ///
    /// Returns `0` when the file does not exist or its header magic does not match.
    fn read_snapshot_lsn(path: &Path) -> Result<u64, BfTreeError> {
        if !path.exists() {
            return Ok(0);
        }
        let reader = std::fs::File::open(path).map_err(|_| IoErrorKind::SnapshotRead)?;
        let meta = Self::read_snapshot_meta(&reader)?;
        if !Self::snapshot_meta_is_valid(&meta) {
            return Ok(0);
        }
        Ok(meta.snapshot_lsn)
    }

    /// Reads the first 4096 bytes of `reader` into a sector-aligned buffer and reinterprets
    /// their prefix as a `BfTreeMeta`.
    ///
    /// A short read leaves the remaining bytes zeroed, so a truncated file simply fails the
    /// magic check. The returned header is not validated; see `snapshot_meta_is_valid`.
    fn read_snapshot_meta(reader: &std::fs::File) -> Result<BfTreeMeta, BfTreeError> {
        const META_READ_LEN: usize = 4096;
        const _: () = assert!(std::mem::size_of::<BfTreeMeta>() <= META_READ_LEN);
        let mut buf = SectorAlignedVector::new_zeroed(META_READ_LEN);
        #[cfg(unix)]
        {
            reader
                .read_at(&mut buf, 0)
                .map_err(|_| IoErrorKind::SnapshotRead)?;
        }
        #[cfg(windows)]
        {
            reader
                .seek_read(&mut buf, 0)
                .map_err(|_| IoErrorKind::SnapshotRead)?;
        }
        // SAFETY: `buf` holds META_READ_LEN >= size_of::<BfTreeMeta>() initialized bytes and is
        // aligned to SECTOR_SIZE (512), which matches BfTreeMeta's `align(512)`.
        // NOTE(review): this assumes every bit pattern is a valid `PageID`.
        Ok(unsafe { (buf.as_ptr() as *const BfTreeMeta).read() })
    }

    /// True when both magic markers of `meta` match the expected constants.
    fn snapshot_meta_is_valid(meta: &BfTreeMeta) -> bool {
        meta.magic_begin == *BF_TREE_MAGIC_BEGIN && meta.magic_end == *BF_TREE_MAGIC_END
    }

    /// Opens the snapshot at `bf_tree_config.file_path`.
    ///
    /// Falls back to `BfTree::with_config` when the file is missing or its header magic does
    /// not match.
    ///
    /// Loading works in two parts:
    /// - Inner nodes are rebuilt from their on-disk images, and every child slot is rewritten
    ///   to point at the newly allocated node.
    /// - The page table is seeded with the leaf mapping as `PageLocation::Base` entries.
    ///
    /// # Errors
    /// - `BfTreeError::Config` if the configuration fails validation.
    /// - `IoErrorKind::SnapshotRead` if the file cannot be opened, read, or stat'ed.
    /// - `IoErrorKind::Corruption` in either of these cases:
    ///   - the file is shorter than the recorded size;
    ///   - the inner or leaf mappings are missing or inconsistent.
    /// - Errors from opening the WAL or the storage backend.
    pub fn new_from_snapshot(
        bf_tree_config: Config,
        buffer_ptr: Option<*mut u8>,
    ) -> Result<Self, BfTreeError> {
        if !bf_tree_config.file_path.exists() {
            return BfTree::with_config(bf_tree_config, buffer_ptr);
        }
        bf_tree_config.validate().map_err(BfTreeError::Config)?;
        let reader = std::fs::File::open(&bf_tree_config.file_path)
            .map_err(|_| IoErrorKind::SnapshotRead)?;
        let bf_meta = Self::read_snapshot_meta(&reader)?;
        if !Self::snapshot_meta_is_valid(&bf_meta) {
            return BfTree::with_config(bf_tree_config, buffer_ptr);
        }
        let actual_size = reader
            .metadata()
            .map_err(|_| IoErrorKind::SnapshotRead)?
            .len();
        if actual_size < bf_meta.file_size {
            return Err(BfTreeError::Io(IoErrorKind::Corruption));
        }
        let config = Arc::new(bf_tree_config);
        let wal = match config.write_ahead_log.as_ref() {
            Some(s) => Some(WriteAheadLog::new(s.clone()).map_err(BfTreeError::Io)?),
            None => None,
        };
        let vfs = make_vfs(&config.storage_backend, &config.file_path).map_err(BfTreeError::Io)?;

        let mut root_page_id = bf_meta.root_id;
        if root_page_id.is_inner_node_pointer() {
            // The mapping associates each inner node's pointer in the process that wrote the
            // snapshot with the file offset of its image. The stored child slots still hold
            // those old pointers, so they serve as lookup keys here.
            let inner_mapping: Vec<(*const InnerNode, usize)> =
                read_vec_from_offset(bf_meta.inner_offset, bf_meta.inner_size, &vfs)?;
            let inner_map: HashMap<*const InnerNode, usize> = inner_mapping.into_iter().collect();
            let mut page_buffer = SectorAlignedVector::new_zeroed(INNER_NODE_SIZE);

            let root_offset = inner_map
                .get(&root_page_id.as_inner_node())
                .ok_or(BfTreeError::Io(IoErrorKind::Corruption))?;
            vfs.read(*root_offset, &mut page_buffer)
                .map_err(BfTreeError::Io)?;
            let root_page = InnerNodeBuilder::new().build_from_slice(&page_buffer);
            root_page_id = PageID::from_pointer(root_page);

            // Breadth-first: load each child of a rebuilt node and patch the parent's slot
            // with the id of the newly built child.
            let mut inner_resolve_queue = VecDeque::from([root_page]);
            while let Some(inner_ptr) = inner_resolve_queue.pop_front() {
                let mut inner = ReadGuard::try_read(inner_ptr).unwrap().upgrade().unwrap();
                if inner.as_ref().meta.children_is_leaf() {
                    continue;
                }
                for (idx, child) in inner.as_ref().get_child_iter().enumerate() {
                    let child_offset = inner_map
                        .get(&child.as_inner_node())
                        .ok_or(BfTreeError::Io(IoErrorKind::Corruption))?;
                    vfs.read(*child_offset, &mut page_buffer)
                        .map_err(BfTreeError::Io)?;
                    let inner_page = InnerNodeBuilder::new().build_from_slice(&page_buffer);
                    inner
                        .as_mut()
                        .update_at_pos(idx, PageID::from_pointer(inner_page));
                    inner_resolve_queue.push_back(inner_page);
                }
            }
        }

        let leaf_mapping: Vec<(PageID, usize)> =
            read_vec_from_offset(bf_meta.leaf_offset, bf_meta.leaf_size, &vfs)?;
        let leaf_mapping = leaf_mapping
            .into_iter()
            .map(|(pid, offset)| (pid, PageLocation::Base(offset)));
        let pt = PageTable::new_from_mapping(leaf_mapping, vfs.clone(), config.clone());
        let circular_buffer = CircularBuffer::new(
            config.cb_size_byte,
            config.cb_copy_on_access_ratio,
            config.cb_min_record_size,
            config.cb_max_record_size,
            config.leaf_page_size,
            config.max_fence_len,
            buffer_ptr,
            config.cache_only,
        );
        let storage = LeafStorage::new_inner(config.clone(), pt, circular_buffer, vfs);
        // A leaf root is tagged with ROOT_IS_LEAF_MASK; an inner root is stored as-is.
        let raw_root_id = if root_page_id.is_id() {
            root_page_id.raw() | Self::ROOT_IS_LEAF_MASK
        } else {
            root_page_id.raw()
        };
        let size_classes = Self::create_mem_page_size_classes(
            config.cb_min_record_size,
            config.cb_max_record_size,
            config.leaf_page_size,
            config.max_fence_len,
            config.cache_only,
        );
        Ok(BfTree {
            storage,
            root_page_id: AtomicU64::new(raw_root_id),
            wal,
            write_load_full_page: true,
            // NOTE(review): `config.cache_only` is forwarded to the circular buffer and size
            // classes above, but this field is hard-coded; confirm that is intended.
            cache_only: false,
            mini_page_size_classes: size_classes,
            config,
            #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
            metrics_recorder: Some(Arc::new(ThreadLocal::new())),
        })
    }

    /// Persists the tree to `config.file_path` and returns that path.
    ///
    /// Steps, in order:
    /// 1. Drain the circular buffer through the eviction callback.
    /// 2. Write every inner node that has a valid disk offset back to that offset.
    /// 3. Append the inner-node and leaf-page mappings.
    /// 4. Flush.
    /// 5. Write and flush the header page at `META_DATA_PAGE_OFFSET`.
    /// 6. Move the page table's copy-on-write boundary to the end of the snapshot.
    ///
    /// # Panics
    /// Panics in any of these cases:
    /// - an inner node or page-table entry cannot be read-locked;
    /// - the visitor reports a leaf above level 0;
    /// - the page table contains a non-id or `Null` entry.
    ///
    /// # Errors
    /// Propagates any vfs write or flush error.
    pub fn snapshot(&self) -> Result<PathBuf, BfTreeError> {
        let callback = |h| -> Result<TombstoneHandle, TombstoneHandle> {
            match eviction_callback(&h, self) {
                Ok(_) => Ok(h),
                Err(_e) => Err(h),
            }
        };
        self.storage.circular_buffer.drain(callback);
        let root_id = self.get_root_page();

        // Copy the inner-node images out first, then write them. `inner_mapping` records
        // the in-memory pointer -> file offset pairs that `new_from_snapshot` resolves.
        let mut inner_mapping: Vec<(*const InnerNode, usize)> = Vec::new();
        let mut batched_writes: Vec<(usize, Vec<u8>)> = Vec::new();
        for node in BfsVisitor::new_inner_only(self) {
            match node {
                NodeInfo::Inner { ptr, .. } => {
                    let inner = ReadGuard::try_read(ptr).unwrap();
                    if inner.as_ref().is_valid_disk_offset() {
                        let offset = inner.as_ref().disk_offset as usize;
                        batched_writes.push((offset, inner.as_ref().as_slice().to_vec()));
                        inner_mapping.push((ptr, offset));
                    }
                }
                NodeInfo::Leaf { level, .. } => {
                    assert_eq!(level, 0);
                }
            }
        }
        for (offset, data) in &batched_writes {
            self.storage
                .vfs
                .write(*offset, data)
                .map_err(BfTreeError::Io)?;
        }
        let (inner_offset, inner_size) = serialize_vec_to_disk(&inner_mapping, &self.storage.vfs)?;

        let mut leaf_mapping = Vec::new();
        for (entry, pid) in self.storage.page_table.iter() {
            assert!(pid.is_id());
            match entry.try_read().unwrap().as_ref() {
                PageLocation::Base(base) => leaf_mapping.push((pid, *base)),
                // NOTE(review): pages still resident as full/mini pages after the drain are not
                // recorded in the snapshot; confirm the drain always leaves only Base pages.
                PageLocation::Full(_) | PageLocation::Mini(_) => continue,
                PageLocation::Null => panic!("Snapshot of Null page"),
            }
        }
        let (leaf_offset, leaf_size) = serialize_vec_to_disk(&leaf_mapping, &self.storage.vfs)?;
        let file_size = (leaf_offset + align_to_sector_size(leaf_size)) as u64;
        #[cfg(feature = "std")]
        let snapshot_lsn = self.wal.as_ref().map(|w| w.get_flushed_lsn()).unwrap_or(0);
        #[cfg(not(feature = "std"))]
        let snapshot_lsn = 0u64;

        // Flush the node images and mappings before writing the header that references them.
        // Otherwise the header can be persisted ahead of that data, and a crash would leave
        // it pointing at unwritten pages. This assumes `flush` is a durability barrier for
        // the backend.
        self.storage.vfs.flush().map_err(BfTreeError::Io)?;

        let metadata = BfTreeMeta {
            magic_begin: *BF_TREE_MAGIC_BEGIN,
            root_id: root_id.0,
            inner_offset,
            inner_size,
            leaf_offset,
            leaf_size,
            file_size,
            snapshot_lsn,
            magic_end: *BF_TREE_MAGIC_END,
        };
        self.storage
            .vfs
            .write(META_DATA_PAGE_OFFSET, metadata.as_slice())
            .map_err(BfTreeError::Io)?;
        self.storage.vfs.flush().map_err(BfTreeError::Io)?;
        self.storage.page_table.set_cow_boundary(file_size as usize);
        Ok(self.config.file_path.clone())
    }

    /// Copies every record of this tree into a new `Std`-backed tree at `snapshot_path`,
    /// snapshots it, and returns the snapshot path.
    ///
    /// # Panics
    /// Panics if `snapshot_path` already exists or if this tree is cache-only.
    ///
    /// # Errors
    /// Propagates errors from creating the disk tree or taking its snapshot.
    pub fn snapshot_memory_to_disk(
        &self,
        snapshot_path: impl AsRef<Path>,
    ) -> Result<PathBuf, BfTreeError> {
        let snapshot_path = snapshot_path.as_ref();
        assert!(
            !snapshot_path.exists(),
            "snapshot_memory_to_disk: target file already exists: {:?}",
            snapshot_path
        );
        // Reject unsupported trees before `with_config` runs, so a rejected call does not
        // first create the target file (presumably done by `with_config` for `Std`).
        if self.cache_only {
            panic!("snapshot_memory_to_disk does not support cache_only trees");
        }
        let mut disk_config = self.config.as_ref().clone();
        disk_config.storage_backend(StorageBackend::Std);
        disk_config.cache_only(false);
        disk_config.file_path(snapshot_path);
        let disk_tree = BfTree::with_config(disk_config, None)?;
        Self::copy_records_via_scan(self, &disk_tree);
        disk_tree.snapshot()
    }

    /// Inserts every record produced by a key-and-value scan of `src`, starting at key
    /// `[0u8]`, into `dst`.
    ///
    /// This is best-effort:
    /// - If the scan cannot be started, nothing is copied.
    /// - Iteration stops at the first scan error.
    ///
    /// Each key + value must fit in `src.config.leaf_page_size` bytes.
    fn copy_records_via_scan(src: &BfTree, dst: &BfTree) {
        let buf_size = src.config.leaf_page_size;
        let mut scan_buf = vec![0u8; buf_size];
        let start_key: &[u8] = &[0u8];
        let mut scan_iter =
            match src.scan_with_count(start_key, usize::MAX, ScanReturnField::KeyAndValue) {
                Ok(iter) => iter,
                Err(_) => return,
            };
        while let Ok(Some((key_len, value_len))) = scan_iter.next(&mut scan_buf) {
            let key = &scan_buf[..key_len];
            let value = &scan_buf[key_len..key_len + value_len];
            dst.insert(key, value);
        }
    }

    /// Loads the on-disk snapshot at `snapshot_path` and copies its records into a new
    /// `Memory`-backed tree built from `memory_config`.
    ///
    /// NOTE(review): if the file exists but its header magic does not match,
    /// `new_from_snapshot` falls back to `with_config` on that path.
    ///
    /// # Panics
    /// Panics if `snapshot_path` does not exist.
    ///
    /// # Errors
    /// Propagates errors from loading the snapshot or creating the memory tree.
    pub fn new_from_snapshot_disk_to_memory(
        snapshot_path: impl AsRef<Path>,
        memory_config: Config,
    ) -> Result<Self, BfTreeError> {
        let snapshot_path = snapshot_path.as_ref();
        assert!(
            snapshot_path.exists(),
            "new_from_snapshot_disk_to_memory: snapshot file does not exist: {:?}",
            snapshot_path
        );
        let mut disk_config = memory_config.clone();
        disk_config.storage_backend(StorageBackend::Std);
        disk_config.cache_only(false);
        disk_config.file_path(snapshot_path);
        let disk_tree = BfTree::new_from_snapshot(disk_config, None)?;
        let mut mem_config = memory_config;
        mem_config.storage_backend(StorageBackend::Memory);
        mem_config.cache_only(false);
        let mem_tree = BfTree::with_config(mem_config, None)?;
        Self::copy_records_via_scan(&disk_tree, &mem_tree);
        Ok(mem_tree)
    }
}
/// Snapshot header stored in the first sector of a snapshot file.
///
/// Field order and `repr(C, align(512))` define the on-disk format. `snapshot` writes the raw
/// bytes of this struct, and the loaders read them back with a pointer cast. Fields must not be
/// reordered or resized without a format version bump. The `usize` fields and native byte
/// order make the file format specific to the platform that wrote it.
#[repr(C, align(512))]
struct BfTreeMeta {
    /// Must equal `BF_TREE_MAGIC_BEGIN` for the header to be accepted.
    magic_begin: [u8; 16],
    /// Root at snapshot time: either an inner-node pointer from the writing process (resolved
    /// through the inner mapping on load) or a leaf page id.
    root_id: PageID,
    /// File offset of the serialized `(*const InnerNode, usize)` inner mapping.
    inner_offset: usize,
    /// Unpadded byte length of the inner mapping (0 when it is empty).
    inner_size: usize,
    /// File offset of the serialized `(PageID, usize)` leaf mapping.
    leaf_offset: usize,
    /// Unpadded byte length of the leaf mapping (0 when it is empty).
    leaf_size: usize,
    /// Minimum file length this snapshot requires; shorter files are reported as corrupt.
    file_size: u64,
    /// WAL LSN covered by the snapshot; when non-zero, recovery skips entries at or below it.
    snapshot_lsn: u64,
    /// Must equal `BF_TREE_MAGIC_END` for the header to be accepted.
    magic_end: [u8; 14],
}
// The header must fit within a single disk page.
const _: () = assert!(std::mem::size_of::<BfTreeMeta>() <= DISK_PAGE_SIZE);
impl BfTreeMeta {
    /// Views the header as its raw in-memory bytes (all `size_of::<BfTreeMeta>()` of them);
    /// this is exactly what `snapshot` writes to the metadata page.
    ///
    /// NOTE(review): the struct contains padding up to its 512-byte alignment, and those
    /// padding bytes are exposed here without being guaranteed initialized.
    fn as_slice(&self) -> &[u8] {
        let byte_len = std::mem::size_of_val(self);
        let start = (self as *const Self).cast::<u8>();
        // SAFETY: `start` points to `self`, which spans `byte_len` bytes and outlives the
        // returned borrow.
        unsafe { std::slice::from_raw_parts(start, byte_len) }
    }

    /// Panics unless both magic markers equal the expected constants.
    fn check_magic(&self) {
        let expected_begin = *BF_TREE_MAGIC_BEGIN;
        let expected_end = *BF_TREE_MAGIC_END;
        assert_eq!(self.magic_begin, expected_begin);
        assert_eq!(self.magic_end, expected_end);
    }
}
fn serialize_vec_to_disk<T>(
v: &[T],
vfs: &Arc<dyn VfsImpl>,
) -> Result<(usize, usize), BfTreeError> {
if v.is_empty() {
return Ok((0, 0));
}
let unaligned_ptr = v.as_ptr() as *const u8;
let unaligned_size = std::mem::size_of_val(v);
let aligned_size = align_to_sector_size(unaligned_size);
let layout = std::alloc::Layout::from_size_align(aligned_size, SECTOR_SIZE).unwrap();
unsafe {
let aligned_ptr = std::alloc::alloc_zeroed(layout);
std::ptr::copy_nonoverlapping(unaligned_ptr, aligned_ptr, unaligned_size);
let slice = std::slice::from_raw_parts(aligned_ptr, aligned_size);
let offset = serialize_u8_slice_to_disk(slice, vfs)?;
std::alloc::dealloc(aligned_ptr, layout);
Ok((offset, unaligned_size))
}
}
/// Reads `size` bytes starting at `offset` and decodes them as `size / size_of::<T>()` values
/// of `T`. This is the inverse of `serialize_vec_to_disk`.
///
/// `T` must be plain data for which the bytes written by `serialize_vec_to_disk` form valid
/// values; elements are copied out bitwise. The on-disk bytes carry no alignment guarantee
/// for `T`, so each element is read with `read_unaligned`.
///
/// # Errors
/// Returns `IoErrorKind::Corruption` in any of these cases:
/// - `size` is zero;
/// - `T` is zero-sized;
/// - `size` is not a whole number of `T`s;
/// - fewer than `size` bytes were read.
///
/// Propagates errors from the underlying read.
fn read_vec_from_offset<T: Clone>(
    offset: usize,
    size: usize,
    vfs: &Arc<dyn VfsImpl>,
) -> Result<Vec<T>, BfTreeError> {
    let elem_size = std::mem::size_of::<T>();
    if size == 0 || elem_size == 0 || size % elem_size != 0 {
        return Err(BfTreeError::Io(IoErrorKind::Corruption));
    }
    let bytes = read_u8_slice_from_disk(offset, size, vfs)?;
    if bytes.len() < size {
        return Err(BfTreeError::Io(IoErrorKind::Corruption));
    }
    let base = bytes.as_ptr().cast::<T>();
    let items = (0..size / elem_size)
        // SAFETY: element `i` spans bytes `[i * elem_size, (i + 1) * elem_size)`, which lie
        // inside `bytes` because `(i + 1) * elem_size <= size <= bytes.len()`. The byte buffer
        // only guarantees `u8` alignment, hence `read_unaligned`.
        .map(|i| unsafe { base.add(i).read_unaligned() })
        .collect();
    Ok(items)
}
fn read_u8_slice_from_disk(
offset: usize,
size: usize,
vfs: &Arc<dyn VfsImpl>,
) -> Result<Vec<u8>, BfTreeError> {
let mut res = Vec::new();
let mut buffer = vec![0; DISK_PAGE_SIZE];
for i in (0..size).step_by(DISK_PAGE_SIZE) {
vfs.read(offset + i, &mut buffer)
.map_err(|e| BfTreeError::Io(e))?;
res.extend_from_slice(&buffer);
}
Ok(res)
}
/// Alignment, in bytes, of the I/O buffers in this module; serialized regions are also
/// padded to a multiple of it.
const SECTOR_SIZE: usize = 512;

/// Rounds `n` up to the nearest multiple of `SECTOR_SIZE`; `0` stays `0`.
fn align_to_sector_size(n: usize) -> usize {
    n.next_multiple_of(SECTOR_SIZE)
}
/// Reserves a region in the file behind `vfs` and writes `slice` at its start.
///
/// The region is `slice.len()` rounded up to whole `DISK_PAGE_SIZE` pages; the rounding
/// assumes `DISK_PAGE_SIZE` is a power of two. Returns the region's start offset.
///
/// # Errors
/// - `IoErrorKind::SnapshotWrite` if `slice` is empty.
/// - Propagates the vfs write error.
fn serialize_u8_slice_to_disk(slice: &[u8], vfs: &Arc<dyn VfsImpl>) -> Result<usize, BfTreeError> {
    if slice.is_empty() {
        return Err(BfTreeError::Io(IoErrorKind::SnapshotWrite));
    }
    let page_mask = DISK_PAGE_SIZE - 1;
    let reserved_bytes = (slice.len() + page_mask) & !page_mask;
    let region_start = vfs.alloc_offset(reserved_bytes);
    vfs.write(region_start, slice)
        .map(|_| region_start)
        .map_err(BfTreeError::Io)
}
#[cfg(all(test, not(feature = "shuttle")))]
mod tests {
    // Round-trip tests covering:
    // - snapshot creation and loading;
    // - memory <-> disk conversion;
    // - WAL-based recovery.
    // Each test uses its own file (or per-process directory) under `target/` and removes any
    // stale copy before it starts.
    use crate::bf_tree::{
        BfTree, Config, nodes::leaf_node::LeafReadResult, range_scan::ScanReturnField,
        utils::test_util::install_value_to_buffer,
    };
    use rstest::rstest;
    use std::str::FromStr;

    /// Inserts `record_cnt` key == value records into a disk-backed tree, snapshots it,
    /// reloads it with `new_from_snapshot`, and checks that every key reads back intact.
    #[rstest]
    #[case(64, 2408, 8192, 500, "target/test_simple_1.bftree")]
    #[case(64, 2048, 16384, 500, "target/test_simple_2.bftree")]
    #[case(3072, 3072, 16384, 500, "target/test_simple_3.bftree")]
    fn persist_roundtrip_simple(
        #[case] min_record_size: usize,
        #[case] max_record_size: usize,
        #[case] leaf_page_size: usize,
        #[case] record_cnt: usize,
        #[case] snapshot_file_path: String,
    ) {
        let tmp_file_path = std::path::PathBuf::from_str(&snapshot_file_path).unwrap();
        // Start from a clean file, like the other tests, so a leftover file from an aborted
        // run cannot affect this one.
        let _ = std::fs::remove_file(&tmp_file_path);
        let mut config = Config::new(&tmp_file_path, leaf_page_size * 16);
        config.storage_backend(crate::bf_tree::StorageBackend::Std);
        config.cb_min_record_size = min_record_size;
        config.cb_max_record_size = max_record_size;
        config.leaf_page_size = leaf_page_size;
        config.max_fence_len = max_record_size;
        let bftree = BfTree::with_config(config.clone(), None).unwrap();
        let key_len: usize = min_record_size / 2;
        let mut key_buffer = vec![0; key_len / 8];
        for r in 0..record_cnt {
            let key = install_value_to_buffer(&mut key_buffer, r);
            bftree.insert(key, key);
        }
        bftree.snapshot().unwrap();
        drop(bftree);
        let bftree = BfTree::new_from_snapshot(config.clone(), None).unwrap();
        let mut out_buffer = vec![0; key_len];
        for r in 0..record_cnt {
            let key = install_value_to_buffer(&mut key_buffer, r);
            let bytes_read = bftree.read(key, &mut out_buffer);
            match bytes_read {
                LeafReadResult::Found(v) => {
                    assert_eq!(v as usize, key_len);
                    assert_eq!(&out_buffer, key);
                }
                _ => {
                    panic!("Key not found");
                }
            }
        }
        std::fs::remove_file(tmp_file_path).unwrap();
    }

    /// Copies an in-memory tree to disk with `snapshot_memory_to_disk`, reloads it as a
    /// disk-backed tree, and checks every record.
    #[test]
    fn snapshot_memory_to_disk_roundtrip() {
        let snapshot_path = std::path::PathBuf::from_str("target/test_mem_to_disk.bftree").unwrap();
        let _ = std::fs::remove_file(&snapshot_path);
        let min_record_size: usize = 64;
        let max_record_size: usize = 2048;
        let leaf_page_size: usize = 8192;
        let mut config = Config::new(":memory:", leaf_page_size * 16);
        config.cb_min_record_size = min_record_size;
        config.cb_max_record_size = max_record_size;
        config.leaf_page_size = leaf_page_size;
        config.max_fence_len = max_record_size;
        let bftree = BfTree::with_config(config.clone(), None).unwrap();
        let key_len: usize = min_record_size / 2;
        let record_cnt = 200;
        let mut key_buffer = vec![0usize; key_len / 8];
        for r in 0..record_cnt {
            let key = install_value_to_buffer(&mut key_buffer, r);
            bftree.insert(key, key);
        }
        let path = bftree.snapshot_memory_to_disk(&snapshot_path).unwrap();
        assert_eq!(path, snapshot_path);
        assert!(snapshot_path.exists());
        drop(bftree);
        let mut disk_config = Config::new(&snapshot_path, leaf_page_size * 16);
        disk_config.storage_backend(crate::bf_tree::StorageBackend::Std);
        disk_config.cb_min_record_size = min_record_size;
        disk_config.cb_max_record_size = max_record_size;
        disk_config.leaf_page_size = leaf_page_size;
        disk_config.max_fence_len = max_record_size;
        let loaded = BfTree::new_from_snapshot(disk_config, None).unwrap();
        let mut out_buffer = vec![0u8; key_len];
        for r in 0..record_cnt {
            let key = install_value_to_buffer(&mut key_buffer, r);
            let result = loaded.read(key, &mut out_buffer);
            match result {
                LeafReadResult::Found(v) => {
                    assert_eq!(v as usize, key_len);
                    assert_eq!(&out_buffer[..key_len], key);
                }
                other => {
                    panic!("Key {r} not found, got: {:?}", other);
                }
            }
        }
        std::fs::remove_file(snapshot_path).unwrap();
    }

    /// Snapshots a disk-backed tree, loads it into memory with
    /// `new_from_snapshot_disk_to_memory`, and checks every record.
    #[test]
    fn snapshot_disk_to_memory_roundtrip() {
        let snapshot_path = std::path::PathBuf::from_str("target/test_disk_to_mem.bftree").unwrap();
        let _ = std::fs::remove_file(&snapshot_path);
        let min_record_size: usize = 64;
        let max_record_size: usize = 2048;
        let leaf_page_size: usize = 8192;
        let record_cnt: usize = 500;
        {
            let mut config = Config::new(&snapshot_path, leaf_page_size * 16);
            config.storage_backend(crate::bf_tree::StorageBackend::Std);
            config.cb_min_record_size = min_record_size;
            config.cb_max_record_size = max_record_size;
            config.leaf_page_size = leaf_page_size;
            config.max_fence_len = max_record_size;
            let tree = BfTree::with_config(config, None).unwrap();
            let key_len = min_record_size / 2;
            let mut key_buffer = vec![0usize; key_len / 8];
            for r in 0..record_cnt {
                let key = install_value_to_buffer(&mut key_buffer, r);
                tree.insert(key, key);
            }
            tree.snapshot().unwrap();
        }
        let mut mem_config = Config::new(":memory:", leaf_page_size * 16);
        mem_config.cb_min_record_size = min_record_size;
        mem_config.cb_max_record_size = max_record_size;
        mem_config.leaf_page_size = leaf_page_size;
        mem_config.max_fence_len = max_record_size;
        let mem_tree =
            BfTree::new_from_snapshot_disk_to_memory(&snapshot_path, mem_config).unwrap();
        let key_len = min_record_size / 2;
        let mut key_buffer = vec![0usize; key_len / 8];
        let mut out_buffer = vec![0u8; key_len];
        for r in 0..record_cnt {
            let key = install_value_to_buffer(&mut key_buffer, r);
            let result = mem_tree.read(key, &mut out_buffer);
            match result {
                LeafReadResult::Found(v) => {
                    assert_eq!(v as usize, key_len);
                    assert_eq!(&out_buffer[..key_len], key);
                }
                other => {
                    panic!("Key {r} not found, got: {:?}", other);
                }
            }
        }
        std::fs::remove_file(snapshot_path).unwrap();
    }

    /// Reloads a snapshot, then checks both point reads and a full ordered scan return
    /// exactly the records written before the snapshot.
    #[test]
    fn snapshot_recovery_preserves_pre_snapshot_data() {
        let snapshot_path =
            std::path::PathBuf::from_str("target/test_recovery_pre_snapshot.bftree").unwrap();
        let _ = std::fs::remove_file(&snapshot_path);
        let min_record_size: usize = 64;
        let max_record_size: usize = 2048;
        let leaf_page_size: usize = 8192;
        let record_cnt: usize = 1000;
        let make_config = || {
            let mut config = Config::new(&snapshot_path, leaf_page_size * 16);
            config.storage_backend(crate::bf_tree::StorageBackend::Std);
            config.cb_min_record_size = min_record_size;
            config.cb_max_record_size = max_record_size;
            config.leaf_page_size = leaf_page_size;
            config.max_fence_len = max_record_size;
            config
        };
        let key_len: usize = min_record_size / 2;
        let mut key_buffer = vec![0usize; key_len / 8];
        {
            let tree = BfTree::with_config(make_config(), None).unwrap();
            for r in 0..record_cnt {
                let key = install_value_to_buffer(&mut key_buffer, r);
                tree.insert(key, key);
            }
            tree.snapshot().unwrap();
        }
        let tree = BfTree::new_from_snapshot(make_config(), None).unwrap();
        let mut out_buffer = vec![0u8; key_len];
        let mut found = 0usize;
        for r in 0..record_cnt {
            let key = install_value_to_buffer(&mut key_buffer, r);
            match tree.read(key, &mut out_buffer) {
                LeafReadResult::Found(v) => {
                    assert_eq!(v as usize, key_len);
                    assert_eq!(&out_buffer[..key_len], key);
                    found += 1;
                }
                other => panic!("key {r} not found after recovery: {other:?}"),
            }
        }
        assert_eq!(found, record_cnt, "not all keys recovered");
        {
            let scan = tree
                .scan_with_count(
                    &[0u8],
                    record_cnt + 10,
                    crate::bf_tree::range_scan::ScanReturnField::Key,
                )
                .unwrap();
            let mut scan_buf = vec![0u8; key_len + max_record_size];
            let mut prev: Option<Vec<u8>> = None;
            let mut scan_count = 0;
            let mut scan_ref = scan;
            while let Ok(Some((kl, _vl))) = scan_ref.next(&mut scan_buf) {
                let key = scan_buf[..kl].to_vec();
                if let Some(ref p) = prev {
                    assert!(key > *p, "scan order violated at entry {scan_count}");
                }
                prev = Some(key);
                scan_count += 1;
            }
            assert_eq!(
                scan_count, record_cnt,
                "scan returned wrong number of entries"
            );
        }
        drop(tree);
        std::fs::remove_file(snapshot_path).unwrap();
    }

    /// Enough records to force leaf splits; every key must still be readable after reload.
    #[test]
    fn recovery_after_splits_produces_correct_tree() {
        let snapshot_path =
            std::path::PathBuf::from_str("target/test_recovery_splits.bftree").unwrap();
        let _ = std::fs::remove_file(&snapshot_path);
        let min_record_size: usize = 64;
        let max_record_size: usize = 2048;
        let leaf_page_size: usize = 8192;
        let record_cnt: usize = 2000;
        let make_config = || {
            let mut config = Config::new(&snapshot_path, leaf_page_size * 16);
            config.storage_backend(crate::bf_tree::StorageBackend::Std);
            config.cb_min_record_size = min_record_size;
            config.cb_max_record_size = max_record_size;
            config.leaf_page_size = leaf_page_size;
            config.max_fence_len = max_record_size;
            config
        };
        let key_len: usize = min_record_size / 2;
        let mut key_buffer = vec![0usize; key_len / 8];
        {
            let tree = BfTree::with_config(make_config(), None).unwrap();
            for r in 0..record_cnt {
                let key = install_value_to_buffer(&mut key_buffer, r);
                tree.insert(key, key);
            }
            tree.snapshot().unwrap();
        }
        let tree = BfTree::new_from_snapshot(make_config(), None).unwrap();
        let mut out_buffer = vec![0u8; key_len];
        let mut missing = 0usize;
        for r in 0..record_cnt {
            let key = install_value_to_buffer(&mut key_buffer, r);
            match tree.read(key, &mut out_buffer) {
                LeafReadResult::Found(v) => {
                    assert_eq!(v as usize, key_len);
                }
                _ => missing += 1,
            }
        }
        assert_eq!(
            missing, 0,
            "{missing} keys missing after recovery with splits"
        );
        drop(tree);
        std::fs::remove_file(snapshot_path).unwrap();
    }

    /// Same round trip as above with `verify_checksums` enabled on both write and reload.
    #[test]
    fn recovery_with_checksums_enabled() {
        let snapshot_path =
            std::path::PathBuf::from_str("target/test_recovery_checksums.bftree").unwrap();
        let _ = std::fs::remove_file(&snapshot_path);
        let min_record_size: usize = 64;
        let max_record_size: usize = 2048;
        let leaf_page_size: usize = 8192;
        let record_cnt: usize = 500;
        let make_config = || {
            let mut config = Config::new(&snapshot_path, leaf_page_size * 16);
            config.storage_backend(crate::bf_tree::StorageBackend::Std);
            config.cb_min_record_size = min_record_size;
            config.cb_max_record_size = max_record_size;
            config.leaf_page_size = leaf_page_size;
            config.max_fence_len = max_record_size;
            config.verify_checksums = true;
            config
        };
        let key_len: usize = min_record_size / 2;
        let mut key_buffer = vec![0usize; key_len / 8];
        {
            let tree = BfTree::with_config(make_config(), None).unwrap();
            for r in 0..record_cnt {
                let key = install_value_to_buffer(&mut key_buffer, r);
                tree.insert(key, key);
            }
            tree.snapshot().unwrap();
        }
        let tree = BfTree::new_from_snapshot(make_config(), None).unwrap();
        let mut out_buffer = vec![0u8; key_len];
        for r in 0..record_cnt {
            let key = install_value_to_buffer(&mut key_buffer, r);
            match tree.read(key, &mut out_buffer) {
                LeafReadResult::Found(v) => {
                    assert_eq!(v as usize, key_len);
                }
                other => panic!("checksum-verified read failed for key {r}: {other:?}"),
            }
        }
        drop(tree);
        std::fs::remove_file(snapshot_path).unwrap();
    }

    /// Writes `pre_count` records, snapshots, then writes more with the WAL enabled.
    /// Checks that:
    /// - the snapshot alone holds the pre-snapshot keys;
    /// - `BfTree::recovery` restores all keys.
    #[test]
    fn wal_replay_recovers_post_snapshot_entries() {
        let pid = std::process::id();
        let test_dir = std::path::PathBuf::from(format!("target/test_wal_replay_{pid}"));
        let _ = std::fs::remove_dir_all(&test_dir);
        std::fs::create_dir_all(&test_dir).unwrap();
        let snapshot_path = test_dir.join("data.bftree");
        let wal_path = test_dir.join("wal.log");
        let config_path = test_dir.join("config.toml");
        let min_record_size: usize = 64;
        let max_record_size: usize = 2048;
        let leaf_page_size: usize = 8192;
        let cb_size: usize = leaf_page_size * 64;
        let wal_segment_size: usize = 1024 * 1024;
        let pre_count: usize = 500;
        let total_count: usize = 800;
        let key_len: usize = min_record_size / 2;
        let mut key_buffer = vec![0usize; key_len / 8];
        let make_config = || {
            let mut config = Config::new(&snapshot_path, cb_size);
            config.storage_backend(crate::bf_tree::StorageBackend::Std);
            config.cb_min_record_size = min_record_size;
            config.cb_max_record_size = max_record_size;
            config.leaf_page_size = leaf_page_size;
            config.max_fence_len = max_record_size;
            let mut wal_config = crate::bf_tree::config::WalConfig::new(&wal_path);
            wal_config.segment_size(wal_segment_size);
            wal_config.flush_interval(std::time::Duration::from_micros(1));
            config.enable_write_ahead_log(std::sync::Arc::new(wal_config));
            config
        };
        {
            let tree = BfTree::with_config(make_config(), None).unwrap();
            for r in 0..pre_count {
                let key = install_value_to_buffer(&mut key_buffer, r);
                tree.insert(key, key);
            }
            tree.snapshot().unwrap();
            let mut snap_buf = vec![0u8; key_len];
            for r in 0..pre_count {
                let key = install_value_to_buffer(&mut key_buffer, r);
                match tree.read(key, &mut snap_buf) {
                    LeafReadResult::Found(_) => {}
                    other => panic!("live tree missing key {r} right after snapshot: {other:?}"),
                }
            }
            for r in pre_count..total_count {
                let key = install_value_to_buffer(&mut key_buffer, r);
                tree.insert(key, key);
            }
        }
        let max_key_len = max_record_size / 2;
        let config_toml = format!(
            "cb_size_byte = {cb_size}\n\
             cb_min_record_size = {min_record_size}\n\
             cb_max_record_size = {max_record_size}\n\
             cb_max_key_len = {max_key_len}\n\
             leaf_page_size = {leaf_page_size}\n\
             index_file_path = \"{}\"\n\
             backend_storage = \"disk\"\n\
             read_promotion_rate = 50\n\
             write_load_full_page = true\n\
             cache_only = false\n",
            snapshot_path.to_string_lossy().replace('\\', "\\\\"),
        );
        std::fs::write(&config_path, &config_toml).unwrap();
        {
            let snap_tree =
                BfTree::new_from_snapshot(Config::new_with_config_file(&config_path), None)
                    .expect("snapshot load failed");
            let mut snap_buf = vec![0u8; key_len];
            let mut missing_keys = Vec::new();
            for r in 0..pre_count {
                let key = install_value_to_buffer(&mut key_buffer, r);
                match snap_tree.read(key, &mut snap_buf) {
                    LeafReadResult::Found(_) => {}
                    _ => missing_keys.push(r),
                }
            }
            if !missing_keys.is_empty() {
                panic!(
                    "snapshot alone is missing {} pre-snapshot keys (first 10: {:?})",
                    missing_keys.len(),
                    &missing_keys[..missing_keys.len().min(10)]
                );
            }
        }
        let tree = BfTree::recovery(&config_path, &wal_path, wal_segment_size, None)
            .expect("WAL recovery failed");
        let mut out_buffer = vec![0u8; key_len];
        for r in 0..total_count {
            let key = install_value_to_buffer(&mut key_buffer, r);
            match tree.read(key, &mut out_buffer) {
                LeafReadResult::Found(v) => {
                    assert_eq!(v as usize, key_len, "wrong value size for key {r}");
                    assert_eq!(&out_buffer[..key_len], key, "wrong value for key {r}");
                }
                other => panic!(
                    "key {r} not found after WAL recovery: {other:?} \
                     (pre_count={pre_count}, total={total_count})"
                ),
            }
        }
        drop(tree);
        let _ = std::fs::remove_dir_all(&test_dir);
    }

    /// Like the previous test, but most records (which cause splits) arrive only through the
    /// WAL. Checks point reads and a full ordered scan after `BfTree::recovery`.
    #[test]
    fn wal_replay_with_splits_produces_correct_tree() {
        let pid = std::process::id();
        let test_dir = std::path::PathBuf::from(format!("target/test_wal_replay_splits_{pid}"));
        let _ = std::fs::remove_dir_all(&test_dir);
        std::fs::create_dir_all(&test_dir).unwrap();
        let snapshot_path = test_dir.join("data.bftree");
        let wal_path = test_dir.join("wal.log");
        let config_path = test_dir.join("config.toml");
        let min_record_size: usize = 64;
        let max_record_size: usize = 2048;
        let leaf_page_size: usize = 8192;
        let cb_size: usize = leaf_page_size * 64;
        let wal_segment_size: usize = 1024 * 1024;
        let pre_count: usize = 100;
        let total_count: usize = 2000;
        let key_len: usize = min_record_size / 2;
        let mut key_buffer = vec![0usize; key_len / 8];
        let make_config = || {
            let mut config = Config::new(&snapshot_path, cb_size);
            config.storage_backend(crate::bf_tree::StorageBackend::Std);
            config.cb_min_record_size = min_record_size;
            config.cb_max_record_size = max_record_size;
            config.leaf_page_size = leaf_page_size;
            config.max_fence_len = max_record_size;
            let mut wal_config = crate::bf_tree::config::WalConfig::new(&wal_path);
            wal_config.segment_size(wal_segment_size);
            wal_config.flush_interval(std::time::Duration::from_micros(1));
            config.enable_write_ahead_log(std::sync::Arc::new(wal_config));
            config
        };
        {
            let tree = BfTree::with_config(make_config(), None).unwrap();
            for r in 0..pre_count {
                let key = install_value_to_buffer(&mut key_buffer, r);
                tree.insert(key, key);
            }
            tree.snapshot().unwrap();
            for r in pre_count..total_count {
                let key = install_value_to_buffer(&mut key_buffer, r);
                tree.insert(key, key);
            }
        }
        let max_key_len = max_record_size / 2;
        let config_toml = format!(
            "cb_size_byte = {cb_size}\n\
             cb_min_record_size = {min_record_size}\n\
             cb_max_record_size = {max_record_size}\n\
             cb_max_key_len = {max_key_len}\n\
             leaf_page_size = {leaf_page_size}\n\
             index_file_path = \"{}\"\n\
             backend_storage = \"disk\"\n\
             read_promotion_rate = 50\n\
             write_load_full_page = true\n\
             cache_only = false\n",
            snapshot_path.to_string_lossy().replace('\\', "\\\\"),
        );
        std::fs::write(&config_path, &config_toml).unwrap();
        let tree = BfTree::recovery(&config_path, &wal_path, wal_segment_size, None)
            .expect("WAL+split recovery failed");
        let mut out_buffer = vec![0u8; key_len];
        for r in 0..total_count {
            let key = install_value_to_buffer(&mut key_buffer, r);
            match tree.read(key, &mut out_buffer) {
                LeafReadResult::Found(v) => {
                    assert_eq!(v as usize, key_len);
                }
                other => panic!("key {r} not found after WAL+split recovery: {other:?}"),
            }
        }
        {
            let scan = tree
                .scan_with_count(
                    &[0u8],
                    total_count + 10,
                    crate::bf_tree::range_scan::ScanReturnField::Key,
                )
                .unwrap();
            let mut scan_buf = vec![0u8; key_len + max_record_size];
            let mut prev: Option<Vec<u8>> = None;
            let mut scan_count = 0;
            let mut scan_ref = scan;
            while let Ok(Some((kl, _vl))) = scan_ref.next(&mut scan_buf) {
                let key = scan_buf[..kl].to_vec();
                if let Some(ref p) = prev {
                    assert!(key > *p, "scan order violated at entry {scan_count}");
                }
                prev = Some(key);
                scan_count += 1;
            }
            assert_eq!(
                scan_count, total_count,
                "scan returned wrong count after WAL+split recovery"
            );
        }
        drop(tree);
        let _ = std::fs::remove_dir_all(&test_dir);
    }

    /// After reload, `Value`-only and `KeyAndValue` scans must report correct sizes, counts,
    /// and (for `KeyAndValue`) key == value in ascending key order.
    #[test]
    fn scan_return_field_variants_after_recovery() {
        let snapshot_path =
            std::path::PathBuf::from_str("target/test_scan_return_fields.bftree").unwrap();
        let _ = std::fs::remove_file(&snapshot_path);
        let min_record_size: usize = 64;
        let max_record_size: usize = 2048;
        let leaf_page_size: usize = 8192;
        let record_cnt: usize = 200;
        let make_config = || {
            let mut config = Config::new(&snapshot_path, leaf_page_size * 16);
            config.storage_backend(crate::bf_tree::StorageBackend::Std);
            config.cb_min_record_size = min_record_size;
            config.cb_max_record_size = max_record_size;
            config.leaf_page_size = leaf_page_size;
            config.max_fence_len = max_record_size;
            config
        };
        let key_len: usize = min_record_size / 2;
        let mut key_buffer = vec![0usize; key_len / 8];
        {
            let tree = BfTree::with_config(make_config(), None).unwrap();
            for r in 0..record_cnt {
                let key = install_value_to_buffer(&mut key_buffer, r);
                tree.insert(key, key);
            }
            tree.snapshot().unwrap();
        }
        let tree = BfTree::new_from_snapshot(make_config(), None).unwrap();
        let buf_size = key_len + max_record_size;
        {
            let scan = tree
                .scan_with_count(&[0u8], record_cnt + 10, ScanReturnField::Value)
                .unwrap();
            let mut scan_buf = vec![0u8; buf_size];
            let mut count = 0;
            let mut scan_ref = scan;
            while let Ok(Some((_kl, vl))) = scan_ref.next(&mut scan_buf) {
                assert_eq!(vl, key_len, "value size mismatch at entry {count}");
                count += 1;
            }
            assert_eq!(count, record_cnt, "Value scan returned wrong count");
        }
        {
            let scan = tree
                .scan_with_count(&[0u8], record_cnt + 10, ScanReturnField::KeyAndValue)
                .unwrap();
            let mut scan_buf = vec![0u8; buf_size];
            let mut prev: Option<Vec<u8>> = None;
            let mut count = 0;
            let mut scan_ref = scan;
            while let Ok(Some((kl, vl))) = scan_ref.next(&mut scan_buf) {
                assert_eq!(kl, key_len, "key size mismatch at entry {count}");
                assert_eq!(vl, key_len, "value size mismatch at entry {count}");
                assert_eq!(
                    &scan_buf[..kl],
                    &scan_buf[kl..kl + vl],
                    "key != value at entry {count}"
                );
                let key = scan_buf[..kl].to_vec();
                if let Some(ref p) = prev {
                    assert!(key > *p, "KeyAndValue scan order violated at entry {count}");
                }
                prev = Some(key);
                count += 1;
            }
            assert_eq!(count, record_cnt, "KeyAndValue scan returned wrong count");
        }
        std::fs::remove_file(snapshot_path).unwrap();
    }

    /// After reload, a bounded `scan_with_end_key` must return exactly the keys in
    /// `[start_key, end_key]`, in ascending order.
    #[test]
    fn scan_with_end_key_after_recovery() {
        let snapshot_path =
            std::path::PathBuf::from_str("target/test_scan_end_key.bftree").unwrap();
        let _ = std::fs::remove_file(&snapshot_path);
        let min_record_size: usize = 64;
        let max_record_size: usize = 2048;
        let leaf_page_size: usize = 8192;
        let record_cnt: usize = 500;
        let make_config = || {
            let mut config = Config::new(&snapshot_path, leaf_page_size * 16);
            config.storage_backend(crate::bf_tree::StorageBackend::Std);
            config.cb_min_record_size = min_record_size;
            config.cb_max_record_size = max_record_size;
            config.leaf_page_size = leaf_page_size;
            config.max_fence_len = max_record_size;
            config
        };
        let key_len: usize = min_record_size / 2;
        let mut key_buffer = vec![0usize; key_len / 8];
        let mut all_keys: Vec<Vec<u8>> = (0..record_cnt)
            .map(|r| {
                let key = install_value_to_buffer(&mut key_buffer, r);
                key.to_vec()
            })
            .collect();
        all_keys.sort();
        {
            let tree = BfTree::with_config(make_config(), None).unwrap();
            for r in 0..record_cnt {
                let key = install_value_to_buffer(&mut key_buffer, r);
                tree.insert(key, key);
            }
            tree.snapshot().unwrap();
        }
        let tree = BfTree::new_from_snapshot(make_config(), None).unwrap();
        let buf_size = key_len + max_record_size;
        let start_idx = record_cnt / 4;
        let end_idx = (record_cnt * 3) / 4;
        let start_key = &all_keys[start_idx];
        let end_key = &all_keys[end_idx];
        let expected = all_keys
            .iter()
            .filter(|k| k.as_slice() >= start_key.as_slice() && k.as_slice() <= end_key.as_slice())
            .count();
        let scan = tree
            .scan_with_end_key(start_key, end_key, ScanReturnField::Key)
            .unwrap();
        let mut scan_buf = vec![0u8; buf_size];
        let mut prev: Option<Vec<u8>> = None;
        let mut count = 0;
        let mut scan_ref = scan;
        while let Ok(Some((kl, _vl))) = scan_ref.next(&mut scan_buf) {
            let key = scan_buf[..kl].to_vec();
            assert!(
                key.as_slice() >= start_key.as_slice(),
                "scan returned key below start_key at entry {count}"
            );
            assert!(
                key.as_slice() <= end_key.as_slice(),
                "scan returned key > end_key at entry {count}"
            );
            if let Some(ref p) = prev {
                assert!(key > *p, "end_key scan order violated at entry {count}");
            }
            prev = Some(key);
            count += 1;
        }
        assert_eq!(
            count, expected,
            "end_key scan returned {count} entries, expected {expected}"
        );
        std::fs::remove_file(snapshot_path).unwrap();
    }
}