use std::collections::{HashMap, VecDeque};
use std::mem::ManuallyDrop;
use std::ops::{Deref, DerefMut};
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
#[cfg(unix)]
use std::os::unix::fs::FileExt;
#[cfg(windows)]
use std::os::windows::fs::FileExt;
#[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
use thread_local::ThreadLocal;
use crate::{
circular_buffer::{CircularBuffer, TombstoneHandle},
error::ConfigError,
fs::VfsImpl,
nodes::{InnerNode, InnerNodeBuilder, PageID, DISK_PAGE_SIZE, INNER_NODE_SIZE},
range_scan::ScanReturnField,
storage::{make_vfs, LeafStorage, PageLocation, PageTable},
sync::atomic::AtomicU64,
tree::eviction_callback,
utils::{inner_lock::ReadGuard, BfsVisitor, NodeInfo},
wal::{LogEntry, LogEntryImpl, WriteAheadLog},
BfTree, Config, StorageBackend, WalReader,
};
const BF_TREE_MAGIC_BEGIN: &[u8; 16] = b"BF-TREE-V0-BEGIN";
const BF_TREE_MAGIC_END: &[u8; 14] = b"BF-TREE-V0-END";
const META_DATA_PAGE_OFFSET: usize = 0;
struct SectorAlignedVector {
inner: ManuallyDrop<Vec<u8>>,
}
impl Drop for SectorAlignedVector {
fn drop(&mut self) {
let layout =
std::alloc::Layout::from_size_align(self.inner.capacity(), SECTOR_SIZE).unwrap();
let ptr = self.inner.as_mut_ptr();
unsafe {
std::alloc::dealloc(ptr, layout);
}
}
}
impl SectorAlignedVector {
fn new_zeroed(capacity: usize) -> Self {
let layout = std::alloc::Layout::from_size_align(capacity, SECTOR_SIZE).unwrap();
let ptr = unsafe { std::alloc::alloc_zeroed(layout) };
let inner = unsafe { Vec::from_raw_parts(ptr, capacity, capacity) };
Self {
inner: ManuallyDrop::new(inner),
}
}
}
impl Deref for SectorAlignedVector {
type Target = Vec<u8>;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl DerefMut for SectorAlignedVector {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}
impl BfTree {
pub fn recovery(
config_file: impl AsRef<Path>,
wal_file: impl AsRef<Path>,
buffer_ptr: Option<*mut u8>,
) {
let bf_tree_config = Config::new_with_config_file(config_file);
let bf_tree = BfTree::new_from_snapshot(bf_tree_config, buffer_ptr).unwrap();
let wal_reader = WalReader::new(wal_file, 4096);
for seg in wal_reader.segment_iter() {
for entry in seg.entry_iter() {
let log_entry = LogEntry::read_from_buffer(entry.1);
match log_entry {
LogEntry::Write(op) => {
bf_tree.insert(op.key, op.value);
}
LogEntry::Split(_op) => {
todo!("implement split op in wal!")
}
}
}
}
}
pub fn new_from_snapshot(
bf_tree_config: Config,
buffer_ptr: Option<*mut u8>,
) -> Result<Self, ConfigError> {
if !bf_tree_config.file_path.exists() {
return BfTree::with_config(bf_tree_config.clone(), buffer_ptr);
}
bf_tree_config.validate()?;
let reader = std::fs::File::open(bf_tree_config.file_path.clone()).unwrap();
let mut metadata = SectorAlignedVector::new_zeroed(4096);
#[cfg(unix)]
{
reader.read_at(&mut metadata, 0).unwrap();
}
#[cfg(windows)]
{
reader.seek_read(&mut metadata, 0).unwrap();
}
let bf_meta = unsafe { (metadata.as_ptr() as *const BfTreeMeta).read() };
bf_meta.check_magic();
assert_eq!(reader.metadata().unwrap().len(), bf_meta.file_size);
let config = Arc::new(bf_tree_config);
let wal = config
.write_ahead_log
.as_ref()
.map(|s| WriteAheadLog::new(s.clone()));
let vfs = make_vfs(&config.storage_backend, &config.file_path);
let mut page_buffer = SectorAlignedVector::new_zeroed(INNER_NODE_SIZE);
let mut root_page_id = bf_meta.root_id;
if root_page_id.is_inner_node_pointer() {
let inner_mapping: Vec<(*const InnerNode, usize)> =
read_vec_from_offset(bf_meta.inner_offset, bf_meta.inner_size, &vfs);
let mut inner_map = HashMap::new();
for m in inner_mapping {
inner_map.insert(m.0, m.1);
}
let offset = inner_map.get(&root_page_id.as_inner_node()).unwrap();
vfs.read(*offset, &mut page_buffer);
let root_page = InnerNodeBuilder::new().build_from_slice(&page_buffer);
root_page_id = PageID::from_pointer(root_page);
let mut inner_resolve_queue = VecDeque::from([root_page]);
while !inner_resolve_queue.is_empty() {
let inner_ptr = inner_resolve_queue.pop_front().unwrap();
let mut inner = ReadGuard::try_read(inner_ptr).unwrap().upgrade().unwrap();
if inner.as_ref().meta.children_is_leaf() {
continue;
}
for (idx, c) in inner.as_ref().get_child_iter().enumerate() {
let offset = inner_map.get(&c.as_inner_node()).unwrap();
vfs.read(*offset, &mut page_buffer);
let inner_page = InnerNodeBuilder::new().build_from_slice(&page_buffer);
let inner_id = PageID::from_pointer(inner_page);
inner.as_mut().update_at_pos(idx, inner_id);
inner_resolve_queue.push_back(inner_page);
}
}
}
let leaf_mapping: Vec<(PageID, usize)> =
read_vec_from_offset(bf_meta.leaf_offset, bf_meta.leaf_size, &vfs);
let leaf_mapping = leaf_mapping.into_iter().map(|(pid, offset)| {
let loc = PageLocation::Base(offset);
(pid, loc)
});
let pt = PageTable::new_from_mapping(leaf_mapping, vfs.clone(), config.clone());
let circular_buffer = CircularBuffer::new(
config.cb_size_byte,
config.cb_copy_on_access_ratio,
config.cb_min_record_size,
config.cb_max_record_size,
config.leaf_page_size,
config.max_fence_len,
buffer_ptr,
config.cache_only,
);
let storage = LeafStorage::new_inner(config.clone(), pt, circular_buffer, vfs);
let raw_root_id = if root_page_id.is_id() {
root_page_id.raw() | Self::ROOT_IS_LEAF_MASK
} else {
root_page_id.raw()
};
let size_classes = Self::create_mem_page_size_classes(
config.cb_min_record_size,
config.cb_max_record_size,
config.leaf_page_size,
config.max_fence_len,
config.cache_only,
);
Ok(BfTree {
storage,
root_page_id: AtomicU64::new(raw_root_id),
wal,
write_load_full_page: true,
cache_only: false,
mini_page_size_classes: size_classes,
config,
#[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
metrics_recorder: Some(Arc::new(ThreadLocal::new())),
})
}
pub fn snapshot(&self) -> PathBuf {
let callback = |h| -> Result<TombstoneHandle, TombstoneHandle> {
match eviction_callback(&h, self) {
Ok(_) => Ok(h),
Err(_e) => Err(h),
}
};
self.storage.circular_buffer.drain(callback);
let root_id = self.get_root_page();
let mut inner_mapping: Vec<(*const InnerNode, usize)> = Vec::new();
let visitor = BfsVisitor::new_inner_only(self);
for node in visitor {
match node {
NodeInfo::Inner { ptr, .. } => {
let inner = ReadGuard::try_read(ptr).unwrap();
if inner.as_ref().is_valid_disk_offset() {
let offset = inner.as_ref().disk_offset as usize;
self.storage.vfs.write(offset, inner.as_ref().as_slice());
inner_mapping.push((ptr, offset));
}
}
NodeInfo::Leaf { level, .. } => {
assert_eq!(level, 0);
}
}
}
let (inner_offset, inner_size) = serialize_vec_to_disk(&inner_mapping, &self.storage.vfs);
let mut leaf_mapping = Vec::new();
let page_table_iter = self.storage.page_table.iter();
for (entry, pid) in page_table_iter {
assert!(pid.is_id());
match entry.try_read().unwrap().as_ref() {
PageLocation::Base(base) => leaf_mapping.push((pid, *base)),
PageLocation::Full(_) | PageLocation::Mini(_) => {
unreachable!("Circular buffer should already be drained!")
}
PageLocation::Null => panic!("Snapshot of Null page"),
}
}
let (leaf_offset, leaf_size) = serialize_vec_to_disk(&leaf_mapping, &self.storage.vfs);
let file_size = (leaf_offset + align_to_sector_size(leaf_size)) as u64;
let metadata = BfTreeMeta {
magic_begin: *BF_TREE_MAGIC_BEGIN,
root_id: root_id.0,
inner_offset,
inner_size,
leaf_offset,
leaf_size,
file_size,
magic_end: *BF_TREE_MAGIC_END,
};
self.storage
.vfs
.write(META_DATA_PAGE_OFFSET, metadata.as_slice());
self.storage.vfs.flush();
self.config.file_path.clone()
}
pub fn snapshot_memory_to_disk(&self, snapshot_path: impl AsRef<Path>) -> PathBuf {
let snapshot_path = snapshot_path.as_ref();
assert!(
!snapshot_path.exists(),
"snapshot_memory_to_disk: target file already exists: {:?}",
snapshot_path
);
let mut disk_config = self.config.as_ref().clone();
disk_config.storage_backend(StorageBackend::Std);
disk_config.cache_only(false);
disk_config.file_path(snapshot_path);
let disk_tree = BfTree::with_config(disk_config, None)
.expect("Failed to create disk-backed BfTree for snapshot");
if self.cache_only {
panic!("snapshot_memory_to_disk does not support cache_only trees");
} else {
Self::copy_records_via_scan(self, &disk_tree);
}
disk_tree.snapshot()
}
fn copy_records_via_scan(src: &BfTree, dst: &BfTree) {
let buf_size = src.config.leaf_page_size;
let mut scan_buf = vec![0u8; buf_size];
let start_key: &[u8] = &[0u8];
let mut scan_iter =
match src.scan_with_count(start_key, usize::MAX, ScanReturnField::KeyAndValue) {
Ok(iter) => iter,
Err(_) => return, };
while let Some((key_len, value_len)) = scan_iter.next(&mut scan_buf) {
let key = &scan_buf[..key_len];
let value = &scan_buf[key_len..key_len + value_len];
dst.insert(key, value);
}
}
pub fn new_from_snapshot_disk_to_memory(
snapshot_path: impl AsRef<Path>,
memory_config: Config,
) -> Result<Self, ConfigError> {
let snapshot_path = snapshot_path.as_ref();
assert!(
snapshot_path.exists(),
"new_from_snapshot_disk_to_memory: snapshot file does not exist: {:?}",
snapshot_path
);
let mut disk_config = memory_config.clone();
disk_config.storage_backend(StorageBackend::Std);
disk_config.cache_only(false);
disk_config.file_path(snapshot_path);
let disk_tree = BfTree::new_from_snapshot(disk_config, None)?;
let mut mem_config = memory_config;
mem_config.storage_backend(StorageBackend::Memory);
mem_config.cache_only(false);
let mem_tree = BfTree::with_config(mem_config, None)?;
Self::copy_records_via_scan(&disk_tree, &mem_tree);
Ok(mem_tree)
}
}
#[repr(C, align(512))]
struct BfTreeMeta {
magic_begin: [u8; 16],
root_id: PageID,
inner_offset: usize,
inner_size: usize,
leaf_offset: usize,
leaf_size: usize,
file_size: u64,
magic_end: [u8; 14],
}
const _: () = assert!(std::mem::size_of::<BfTreeMeta>() <= DISK_PAGE_SIZE);
impl BfTreeMeta {
fn as_slice(&self) -> &[u8] {
let ptr = self as *const Self as *const u8;
let size = std::mem::size_of::<Self>();
unsafe { std::slice::from_raw_parts(ptr, size) }
}
fn check_magic(&self) {
assert_eq!(self.magic_begin, *BF_TREE_MAGIC_BEGIN);
assert_eq!(self.magic_end, *BF_TREE_MAGIC_END);
}
}
fn serialize_vec_to_disk<T>(v: &[T], vfs: &Arc<dyn VfsImpl>) -> (usize, usize) {
if v.is_empty() {
return (0, 0);
}
let unaligned_ptr = v.as_ptr() as *const u8;
let unaligned_size = std::mem::size_of_val(v);
let aligned_size = align_to_sector_size(unaligned_size);
let layout = std::alloc::Layout::from_size_align(aligned_size, SECTOR_SIZE).unwrap();
unsafe {
let aligned_ptr = std::alloc::alloc_zeroed(layout);
std::ptr::copy_nonoverlapping(unaligned_ptr, aligned_ptr, unaligned_size);
let slice = std::slice::from_raw_parts(aligned_ptr, aligned_size);
let offset = serialize_u8_slice_to_disk(slice, vfs);
std::alloc::dealloc(aligned_ptr, layout);
(offset, unaligned_size)
}
}
fn read_vec_from_offset<T: Clone>(offset: usize, size: usize, vfs: &Arc<dyn VfsImpl>) -> Vec<T> {
assert!(size > 0);
let slice = read_u8_slice_from_disk(offset, size, vfs);
let ptr = slice.as_ptr() as *const T;
let size = size / std::mem::size_of::<T>();
let slice = unsafe { std::slice::from_raw_parts(ptr, size) };
slice.to_vec()
}
fn read_u8_slice_from_disk(offset: usize, size: usize, vfs: &Arc<dyn VfsImpl>) -> Vec<u8> {
let mut res = Vec::new();
let mut buffer = vec![0; DISK_PAGE_SIZE];
for i in (0..size).step_by(DISK_PAGE_SIZE) {
vfs.read(offset + i, &mut buffer); res.extend_from_slice(&buffer);
}
res
}
const SECTOR_SIZE: usize = 512;
fn align_to_sector_size(n: usize) -> usize {
(n + SECTOR_SIZE - 1) & !(SECTOR_SIZE - 1)
}
fn serialize_u8_slice_to_disk(slice: &[u8], vfs: &Arc<dyn VfsImpl>) -> usize {
let mut start_offset = None;
for chunk in slice.chunks(DISK_PAGE_SIZE) {
let offset = vfs.alloc_offset(DISK_PAGE_SIZE); if start_offset.is_none() {
start_offset = Some(offset);
}
vfs.write(offset, chunk);
}
start_offset.unwrap()
}
#[cfg(test)]
mod tests {
use crate::{
nodes::leaf_node::LeafReadResult, utils::test_util::install_value_to_buffer, BfTree, Config,
};
use rstest::rstest;
use std::str::FromStr;
#[rstest]
#[case(64, 2408, 8192, 500, "target/test_simple_1.bftree")] #[case(64, 2048, 16384, 500, "target/test_simple_2.bftree")] #[case(3072, 3072, 16384, 500, "target/test_simple_3.bftree")] fn persist_roundtrip_simple(
#[case] min_record_size: usize,
#[case] max_record_size: usize,
#[case] leaf_page_size: usize,
#[case] record_cnt: usize,
#[case] snapshot_file_path: String,
) {
let tmp_file_path = std::path::PathBuf::from_str(&snapshot_file_path).unwrap();
let mut config = Config::new(&tmp_file_path, leaf_page_size * 16); config.storage_backend(crate::StorageBackend::Std);
config.cb_min_record_size = min_record_size;
config.cb_max_record_size = max_record_size;
config.leaf_page_size = leaf_page_size;
config.max_fence_len = max_record_size;
let bftree = BfTree::with_config(config.clone(), None).unwrap();
let key_len: usize = min_record_size / 2;
let mut key_buffer = vec![0; key_len / 8];
for r in 0..record_cnt {
let key = install_value_to_buffer(&mut key_buffer, r);
bftree.insert(key, key);
}
bftree.snapshot();
drop(bftree);
let bftree = BfTree::new_from_snapshot(config.clone(), None).unwrap();
let mut out_buffer = vec![0; key_len];
for r in 0..record_cnt {
let key = install_value_to_buffer(&mut key_buffer, r);
let bytes_read = bftree.read(key, &mut out_buffer);
match bytes_read {
LeafReadResult::Found(v) => {
assert_eq!(v as usize, key_len);
assert_eq!(&out_buffer, key);
}
_ => {
panic!("Key not found");
}
}
}
std::fs::remove_file(tmp_file_path).unwrap();
}
#[test]
fn snapshot_memory_to_disk_roundtrip() {
let snapshot_path = std::path::PathBuf::from_str("target/test_mem_to_disk.bftree").unwrap();
let _ = std::fs::remove_file(&snapshot_path);
let min_record_size: usize = 64;
let max_record_size: usize = 2048;
let leaf_page_size: usize = 8192;
let mut config = Config::new(":memory:", leaf_page_size * 16);
config.cb_min_record_size = min_record_size;
config.cb_max_record_size = max_record_size;
config.leaf_page_size = leaf_page_size;
config.max_fence_len = max_record_size;
let bftree = BfTree::with_config(config.clone(), None).unwrap();
let key_len: usize = min_record_size / 2;
let record_cnt = 200;
let mut key_buffer = vec![0usize; key_len / 8];
for r in 0..record_cnt {
let key = install_value_to_buffer(&mut key_buffer, r);
bftree.insert(key, key);
}
let path = bftree.snapshot_memory_to_disk(&snapshot_path);
assert_eq!(path, snapshot_path);
assert!(snapshot_path.exists());
drop(bftree);
let mut disk_config = Config::new(&snapshot_path, leaf_page_size * 16);
disk_config.storage_backend(crate::StorageBackend::Std);
disk_config.cb_min_record_size = min_record_size;
disk_config.cb_max_record_size = max_record_size;
disk_config.leaf_page_size = leaf_page_size;
disk_config.max_fence_len = max_record_size;
let loaded = BfTree::new_from_snapshot(disk_config, None).unwrap();
let mut out_buffer = vec![0u8; key_len];
for r in 0..record_cnt {
let key = install_value_to_buffer(&mut key_buffer, r);
let result = loaded.read(key, &mut out_buffer);
match result {
LeafReadResult::Found(v) => {
assert_eq!(v as usize, key_len);
assert_eq!(&out_buffer[..key_len], key);
}
other => {
panic!("Key {r} not found, got: {:?}", other);
}
}
}
std::fs::remove_file(snapshot_path).unwrap();
}
#[test]
fn snapshot_disk_to_memory_roundtrip() {
let snapshot_path = std::path::PathBuf::from_str("target/test_disk_to_mem.bftree").unwrap();
let _ = std::fs::remove_file(&snapshot_path);
let min_record_size: usize = 64;
let max_record_size: usize = 2048;
let leaf_page_size: usize = 8192;
let record_cnt: usize = 500;
{
let mut config = Config::new(&snapshot_path, leaf_page_size * 16);
config.storage_backend(crate::StorageBackend::Std);
config.cb_min_record_size = min_record_size;
config.cb_max_record_size = max_record_size;
config.leaf_page_size = leaf_page_size;
config.max_fence_len = max_record_size;
let tree = BfTree::with_config(config, None).unwrap();
let key_len = min_record_size / 2;
let mut key_buffer = vec![0usize; key_len / 8];
for r in 0..record_cnt {
let key = install_value_to_buffer(&mut key_buffer, r);
tree.insert(key, key);
}
tree.snapshot();
}
let mut mem_config = Config::new(":memory:", leaf_page_size * 16);
mem_config.cb_min_record_size = min_record_size;
mem_config.cb_max_record_size = max_record_size;
mem_config.leaf_page_size = leaf_page_size;
mem_config.max_fence_len = max_record_size;
let mem_tree =
BfTree::new_from_snapshot_disk_to_memory(&snapshot_path, mem_config).unwrap();
let key_len = min_record_size / 2;
let mut key_buffer = vec![0usize; key_len / 8];
let mut out_buffer = vec![0u8; key_len];
for r in 0..record_cnt {
let key = install_value_to_buffer(&mut key_buffer, r);
let result = mem_tree.read(key, &mut out_buffer);
match result {
LeafReadResult::Found(v) => {
assert_eq!(v as usize, key_len);
assert_eq!(&out_buffer[..key_len], key);
}
other => {
panic!("Key {r} not found, got: {:?}", other);
}
}
}
std::fs::remove_file(snapshot_path).unwrap();
}
}