use std::cell::UnsafeCell;
use std::collections::{HashMap, VecDeque};
use std::mem::{ManuallyDrop, MaybeUninit};
use std::ops::{Deref, DerefMut};
use std::panic;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
#[cfg(not(all(feature = "shuttle", test)))]
use rand::Rng;
#[cfg(all(feature = "shuttle", test))]
use shuttle::rand::Rng;
#[cfg(unix)]
use std::os::unix::fs::FileExt;
#[cfg(windows)]
use std::os::windows::fs::FileExt;
#[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
use thread_local::ThreadLocal;
use crate::{
circular_buffer::CircularBuffer,
error::ConfigError,
fs::VfsImpl,
mini_page_op::LeafOperations,
nodes::{leaf_node::MiniPageNextLevel, LeafNode, INVALID_DISK_OFFSET},
nodes::{InnerNode, InnerNodeBuilder, PageID, DISK_PAGE_SIZE, INNER_NODE_SIZE},
storage::{make_vfs, LeafStorage, PageLocation, PageTable},
sync::atomic::{AtomicBool, AtomicU64, Ordering},
sync::thread,
sync::RwLock,
utils::{get_rng, inner_lock::ReadGuard, BfsVisitor, NodeInfo},
wal::{LogEntry, LogEntryImpl, WriteAheadLog},
BfTree, Config, StorageBackend, WalConfig, WalReader,
};
/// Marker stored in the `magic_begin` field of the snapshot metadata record.
const BF_TREE_MAGIC_BEGIN: &[u8; 16] = b"BF-TREE-V0-BEGIN";
/// Marker stored in the `magic_end` field of the snapshot metadata record.
const BF_TREE_MAGIC_END: &[u8; 14] = b"BF-TREE-V0-END";
/// Offset in the snapshot file at which the metadata record is written.
const META_DATA_PAGE_OFFSET: usize = 0;

/// Slot id carried by a guard that was created without a snapshot manager.
const INVALID_SNAPSHOT_THREAD_ID: usize = usize::MAX;
/// Offset recorded for a mini page that has no bytes in the snapshot (size 0 / null location).
const NULL_PAGE_LOCATION_OFFSET: usize = usize::MAX;
/// Per-slot state value meaning "this slot currently holds no state".
const INVALID_SNAPSHOT_STATE: u64 = u64::MAX;
/// Version reported by a guard that was created without a snapshot manager.
pub const INVALID_SNAPSHOT_VERSION: u64 = u64::MAX >> 1;
/// Number of thread slots (and per-slot mapping vectors) the manager owns.
const DEFAULT_MAX_SNAPSHOT_THREAD_NUM: usize = 64;

/// Bit position at which the phase id starts inside a packed snapshot state.
const SNAPSHOT_STATE_PHASE_ID_SHIFT: usize = 61;
/// Number of valid phase ids; a raw phase id must be strictly below this.
const SNAPSHOT_STATE_PHASE_NUM: u64 = 4;
/// Selects the phase-id bits (the three bits starting at the shift) of a packed state.
const SNAPSHOT_STATE_PHASE_ID_MASK: u64 = 0b111 << SNAPSHOT_STATE_PHASE_ID_SHIFT;
/// Selects the version bits (everything below the shift) of a packed state.
const SNAPSHOT_STATE_VERSION_MASK: u64 = (1 << SNAPSHOT_STATE_PHASE_ID_SHIFT) - 1;
/// Phase of the snapshot state machine.
///
/// The raw encoding follows declaration order: `Rest` = 0, `Prepare` = 1,
/// `InProgress` = 2, `Sweep` = 3.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PhaseId {
    Rest,
    Prepare,
    InProgress,
    Sweep,
}

impl PhaseId {
    /// Decodes a raw phase id.
    ///
    /// # Panics
    /// Panics when `value` is not the encoding of any phase.
    fn from_raw(value: u64) -> Self {
        let phases = [Self::Rest, Self::Prepare, Self::InProgress, Self::Sweep];
        phases
            .iter()
            .copied()
            .find(|phase| phase.as_raw() == value)
            .unwrap_or_else(|| panic!("Invalid phase id: {}", value))
    }

    /// Encodes the phase as its raw id (the variant's discriminant).
    fn as_raw(self) -> u64 {
        self as u64
    }
}
/// Coordinates snapshots of a tree: tracks a global (phase, version) state, hands out
/// thread slots to guards, and collects the locations of pages written to the snapshot VFS.
pub struct CPRSnapShotMgr {
/// Packed global state: phase id in the bits at and above `SNAPSHOT_STATE_PHASE_ID_SHIFT`,
/// version in the bits below it (see `new_snapshot_state`).
global_state: AtomicU64,
/// One flag per thread slot; `true` while the slot is reserved by a guard.
thread_slots: [AtomicBool; DEFAULT_MAX_SNAPSHOT_THREAD_NUM],
/// Global state observed by each slot when it was reserved, or `INVALID_SNAPSHOT_STATE`
/// while the slot is not reserved.
thread_local_states: [AtomicU64; DEFAULT_MAX_SNAPSHOT_THREAD_NUM],
/// Per-slot `(inner node pointer, snapshot offset)` pairs pushed by `snapshot_inner_node`.
thread_local_inner_mappings:
UnsafeCell<[Vec<(*const InnerNode, usize)>; DEFAULT_MAX_SNAPSHOT_THREAD_NUM]>,
/// Per-slot `(page id, snapshot offset)` pairs pushed by `snapshot_base_page`.
thread_local_base_mappings: UnsafeCell<[Vec<(PageID, usize)>; DEFAULT_MAX_SNAPSHOT_THREAD_NUM]>,
/// Per-slot `(page id, snapshot offset)` pairs pushed by `snapshot_mini_page`; the offset is
/// `NULL_PAGE_LOCATION_OFFSET` when the mini page had size 0.
thread_local_mini_mappings: UnsafeCell<[Vec<(PageID, usize)>; DEFAULT_MAX_SNAPSHOT_THREAD_NUM]>,
/// Per-slot `(page id, size)` pairs pushed together with `thread_local_mini_mappings`, entry
/// for entry.
thread_local_mini_size_mappings:
UnsafeCell<[Vec<(PageID, usize)>; DEFAULT_MAX_SNAPSHOT_THREAD_NUM]>,
// `root_id`: raw id of the root page recorded by `snapshot_root_page`; 0 means none recorded.
// `pause_snapshot`: while `true`, `reserve_thread_slot` refuses to hand out slots.
root_id: AtomicU64, pause_snapshot: AtomicBool,
/// VFS that `snapshot_page` writes to; swapped by `snapshot` at the start and end of a run.
vfs: RwLock<Arc<dyn VfsImpl>>,
/// Set for the duration of `snapshot` so that only one snapshot runs at a time.
snapshot_in_progress: AtomicBool,
}
// NOTE(review): these impls are needed because of the `UnsafeCell` arrays and the raw
// `*const InnerNode` pointers. Soundness presumably relies on each per-slot vector being touched
// only by the thread that reserved that slot, and on `finalize`/`reset` running while no other
// thread writes to them — confirm against the callers.
unsafe impl Sync for CPRSnapShotMgr {}
unsafe impl Send for CPRSnapShotMgr {}
/// Holds a thread slot of a `CPRSnapShotMgr` for as long as the guard is alive; the slot is
/// released when the guard is dropped. A guard built without a manager holds no slot.
pub struct CPRSnapshotGuard {
/// Manager the slot was reserved from, or `None` for a guard that holds no slot.
snapshot_mgr: Option<Arc<CPRSnapShotMgr>>,
/// Index of the reserved slot, or `INVALID_SNAPSHOT_THREAD_ID` when there is no manager.
thread_slot_id: usize,
/// Global version observed when the slot was reserved, or `INVALID_SNAPSHOT_VERSION` when
/// there is no manager.
snapshot_version: u64,
/// Global phase observed when the slot was reserved (`PhaseId::Rest` when there is no manager).
phase_id: PhaseId,
}
impl CPRSnapshotGuard {
    /// Builds a guard.
    ///
    /// With `None` the guard holds no slot and carries the invalid slot id / version and the
    /// `Rest` phase. With a manager, a thread slot is reserved from it and the version and phase
    /// observed at reservation time are stored.
    ///
    /// # Errors
    /// Returns `Err(())` when the manager cannot hand out a slot.
    pub fn new(snapshot_mgr: Option<Arc<CPRSnapShotMgr>>) -> Result<Self, ()> {
        let Some(mgr) = snapshot_mgr else {
            return Ok(Self {
                snapshot_mgr: None,
                thread_slot_id: INVALID_SNAPSHOT_THREAD_ID,
                snapshot_version: INVALID_SNAPSHOT_VERSION,
                phase_id: PhaseId::Rest,
            });
        };

        let (thread_slot_id, snapshot_version, phase_id) = mgr.reserve_thread_slot()?;
        Ok(Self {
            snapshot_mgr: Some(mgr),
            thread_slot_id,
            snapshot_version,
            phase_id,
        })
    }

    /// Version observed when the guard was created.
    pub fn snapshot_version(&self) -> u64 {
        self.snapshot_version
    }

    /// Phase observed when the guard was created.
    pub fn get_local_phase_id(&self) -> PhaseId {
        self.phase_id
    }

    /// `true` when the guard holds a thread slot.
    pub fn is_protected(&self) -> bool {
        self.thread_slot_id != INVALID_SNAPSHOT_THREAD_ID
    }

    /// Forwards to the manager's `snapshot_base_page` under this guard's slot.
    ///
    /// # Panics
    /// Panics when the guard was created without a manager.
    pub fn snapshot_base_page(&self, id: PageID, ptr: &[u8], size: usize) {
        let mgr = self.snapshot_mgr.as_deref().unwrap();
        mgr.snapshot_base_page(id, ptr, size, self.thread_slot_id);
    }

    /// Forwards to the manager's `snapshot_mini_page` under this guard's slot.
    ///
    /// # Panics
    /// Panics when the guard was created without a manager.
    pub fn snapshot_mini_page(&self, id: PageID, ptr: &[u8], size: usize) {
        let mgr = self.snapshot_mgr.as_deref().unwrap();
        mgr.snapshot_mini_page(id, ptr, size, self.thread_slot_id);
    }

    /// Forwards to the manager's `snapshot_inner_node` under this guard's slot.
    ///
    /// # Panics
    /// Panics when the guard was created without a manager.
    pub fn snapshot_inner_node(&self, ptr: *const InnerNode) {
        let mgr = self.snapshot_mgr.as_deref().unwrap();
        mgr.snapshot_inner_node(ptr, self.thread_slot_id);
    }

    /// Forwards to the manager's `snapshot_root_page`.
    ///
    /// # Panics
    /// Panics when the guard was created without a manager.
    pub fn snapshot_root_page(&self, root_id: PageID) {
        let mgr = self.snapshot_mgr.as_deref().unwrap();
        mgr.snapshot_root_page(root_id);
    }
}
impl Drop for CPRSnapshotGuard {
fn drop(&mut self) {
if let Some(ref mgr) = self.snapshot_mgr {
mgr.release_thread_slot(self.thread_slot_id);
}
}
}
impl CPRSnapShotMgr {
/// Returns `true` only while a snapshot is running and the global phase is `Sweep` or `Rest`.
pub fn are_all_threads_in_next_version(&self) -> bool {
    if !self.snapshot_in_progress.load(Ordering::Acquire) {
        return false;
    }
    matches!(
        self.get_global_phase_id(),
        PhaseId::Sweep | PhaseId::Rest
    )
}
pub fn new(version: u64) -> Self {
let vfs: Arc<dyn VfsImpl> = make_vfs(&StorageBackend::Memory, ":memory:");
Self {
global_state: AtomicU64::new(Self::new_snapshot_state(0, version)), thread_slots: [const { AtomicBool::new(false) }; DEFAULT_MAX_SNAPSHOT_THREAD_NUM],
thread_local_states: [const { AtomicU64::new(INVALID_SNAPSHOT_STATE) };
DEFAULT_MAX_SNAPSHOT_THREAD_NUM],
thread_local_inner_mappings: UnsafeCell::new(
[const { Vec::new() }; DEFAULT_MAX_SNAPSHOT_THREAD_NUM],
),
thread_local_base_mappings: UnsafeCell::new(
[const { Vec::new() }; DEFAULT_MAX_SNAPSHOT_THREAD_NUM],
),
thread_local_mini_mappings: UnsafeCell::new(
[const { Vec::new() }; DEFAULT_MAX_SNAPSHOT_THREAD_NUM],
),
thread_local_mini_size_mappings: UnsafeCell::new(
[const { Vec::new() }; DEFAULT_MAX_SNAPSHOT_THREAD_NUM],
),
root_id: AtomicU64::new(0),
pause_snapshot: AtomicBool::new(false),
vfs: RwLock::new(vfs),
snapshot_in_progress: AtomicBool::new(false),
}
}
fn reset(&self) {
let local_inner_mappings = unsafe { &mut *self.thread_local_inner_mappings.get() };
let local_mini_mappings = unsafe { &mut *self.thread_local_mini_mappings.get() };
let local_base_mappings = unsafe { &mut *self.thread_local_base_mappings.get() };
let local_mini_size_mappings = unsafe { &mut *self.thread_local_mini_size_mappings.get() };
for thread_slot_id in 0..DEFAULT_MAX_SNAPSHOT_THREAD_NUM {
local_inner_mappings[thread_slot_id] = Vec::new();
local_mini_mappings[thread_slot_id] = Vec::new();
local_base_mappings[thread_slot_id] = Vec::new();
local_mini_size_mappings[thread_slot_id] = Vec::new();
}
self.root_id.store(0, Ordering::Release);
}
/// Packs a raw phase id and a version into one state word: the phase id is shifted up by
/// `SNAPSHOT_STATE_PHASE_ID_SHIFT` and the version occupies the bits below it.
///
/// # Panics
/// Panics when `phase_id` is not below `SNAPSHOT_STATE_PHASE_NUM` or when `version` does not
/// fit below the shift.
pub fn new_snapshot_state(phase_id: u64, version: u64) -> u64 {
    assert!(
        phase_id < SNAPSHOT_STATE_PHASE_NUM,
        "Phase id must be less than {}",
        SNAPSHOT_STATE_PHASE_NUM
    );
    assert!(
        version < (1 << SNAPSHOT_STATE_PHASE_ID_SHIFT),
        "Version must be less than 2^61"
    );

    let phase_bits = phase_id << SNAPSHOT_STATE_PHASE_ID_SHIFT;
    phase_bits | version
}
/// Reads the global state and returns its version bits.
fn get_global_version(&self) -> u64 {
    let state = self.global_state.load(Ordering::Acquire);
    state & SNAPSHOT_STATE_VERSION_MASK
}
/// Reads the global state and decodes its phase bits.
fn get_global_phase_id(&self) -> PhaseId {
    let state = self.global_state.load(Ordering::Acquire);
    let raw_phase = (state & SNAPSHOT_STATE_PHASE_ID_MASK) >> SNAPSHOT_STATE_PHASE_ID_SHIFT;
    PhaseId::from_raw(raw_phase)
}
/// Loads the state stored for the given thread slot.
fn get_local_state(&self, thread_slot_id: &usize) -> u64 {
    let slot_state = &self.thread_local_states[*thread_slot_id];
    slot_state.load(Ordering::Acquire)
}
/// Stores `state` for the given thread slot, skipping the store when the slot already holds it.
fn set_local_state(&self, thread_slot_id: &usize, state: u64) {
    if self.get_local_state(thread_slot_id) != state {
        self.thread_local_states[*thread_slot_id].store(state, Ordering::Release);
    }
}
/// Moves the global state to the next phase and returns the new packed state.
///
/// The cycle is `Rest -> Prepare -> InProgress -> Sweep -> Rest`; the version is incremented
/// only on the `Prepare -> InProgress` step.
fn advance_global_state(&self) -> u64 {
    let current_phase = self.get_global_phase_id();
    let current_version = self.get_global_version();

    let (next_phase, next_version) = match current_phase {
        PhaseId::Rest => (PhaseId::Prepare, current_version),
        PhaseId::Prepare => (PhaseId::InProgress, current_version + 1),
        PhaseId::InProgress => (PhaseId::Sweep, current_version),
        PhaseId::Sweep => (PhaseId::Rest, current_version),
    };

    let next_state = Self::new_snapshot_state(next_phase.as_raw(), next_version);
    self.global_state.store(next_state, Ordering::Release);
    next_state
}
/// Returns `true` when every thread slot either holds no state (`INVALID_SNAPSHOT_STATE`) or
/// holds exactly `target_state`. Stops at the first slot that fails the check.
fn check_if_phase_completed(&self, target_state: u64) -> bool {
    self.thread_local_states.iter().all(|slot_state| {
        let observed = slot_state.load(Ordering::Acquire);
        observed == INVALID_SNAPSHOT_STATE || observed == target_state
    })
}
/// Tries to reserve a free thread slot and returns `(slot id, version, phase)` as observed in
/// the global state at reservation time.
///
/// Probing starts at a random slot and wraps around. After claiming a slot the global state is
/// published as the slot's local state and then re-read: if it changed in between, or if
/// snapshots got paused, the slot is given back.
///
/// # Errors
/// Returns `Err(())` when snapshots are paused, when the claimed slot had to be given back,
/// or when no probed slot was free.
pub fn reserve_thread_slot(&self) -> Result<(usize, u64, PhaseId), ()> {
    if self.pause_snapshot.load(Ordering::Acquire) {
        return Err(());
    }

    let first_probe = get_rng().random_range(0..DEFAULT_MAX_SNAPSHOT_THREAD_NUM);
    let probe_limit = 2 * DEFAULT_MAX_SNAPSHOT_THREAD_NUM;
    for probe in first_probe..probe_limit {
        let tid = probe % DEFAULT_MAX_SNAPSHOT_THREAD_NUM;
        let claimed = self.thread_slots[tid]
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
            .is_ok();
        if !claimed {
            continue;
        }

        let global_state = self.global_state.load(Ordering::Acquire);
        self.set_local_state(&tid, global_state);

        // Re-validate after publishing: the global state must not have moved on, and
        // snapshots must not have been paused meanwhile.
        let state_unchanged =
            self.get_local_state(&tid) == self.global_state.load(Ordering::Acquire);
        if state_unchanged && !self.pause_snapshot.load(Ordering::Acquire) {
            let version = global_state & SNAPSHOT_STATE_VERSION_MASK;
            let raw_phase =
                (global_state & SNAPSHOT_STATE_PHASE_ID_MASK) >> SNAPSHOT_STATE_PHASE_ID_SHIFT;
            return Ok((tid, version, PhaseId::from_raw(raw_phase)));
        }

        // Give the slot back; the caller sees a failure.
        self.set_local_state(&tid, INVALID_SNAPSHOT_STATE);
        assert!(self.thread_slots[tid]
            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
            .is_ok());
        return Err(());
    }

    Err(())
}
/// Frees a thread slot: its local state is invalidated first, then the slot flag is cleared.
pub fn release_thread_slot(&self, thread_slot_id: usize) {
    let slot = thread_slot_id;
    self.set_local_state(&slot, INVALID_SNAPSHOT_STATE);
    let slot_flag = &self.thread_slots[slot];
    slot_flag.store(false, Ordering::Release);
}
/// Convenience constructor: builds a `CPRSnapshotGuard` for the given (optional) manager.
///
/// # Errors
/// Returns `Err(())` when the guard cannot be created (see `CPRSnapshotGuard::new`).
pub fn get_snapshot_guard(
    snapshot_mgr: Option<Arc<CPRSnapShotMgr>>,
) -> Result<CPRSnapshotGuard, ()> {
    let guard = CPRSnapshotGuard::new(snapshot_mgr)?;
    Ok(guard)
}
/// Allocates `size` bytes in the current snapshot VFS, writes `ptr` at the allocated offset,
/// and returns that offset.
///
/// # Panics
/// Panics when the VFS lock cannot be acquired (`unwrap` on the lock result).
pub fn snapshot_page(&self, ptr: &[u8], size: usize) -> usize {
    // Clone the handle so the lock is not held during the allocation and the write.
    let vfs = {
        let guard = self.vfs.read().unwrap();
        guard.clone()
    };
    let offset = vfs.alloc_offset(size);
    vfs.write(offset, ptr);
    offset
}
/// Writes the inner node behind `ptr` to the snapshot VFS and records `(ptr, offset)` in the
/// inner mapping of `thread_slot_id`.
///
/// NOTE(review): `ptr` is dereferenced, so it must point to a live `InnerNode`, and the
/// per-slot vector is mutated through an `UnsafeCell` — presumably only the thread holding this
/// slot calls this; confirm against the callers.
pub fn snapshot_inner_node(&self, ptr: *const InnerNode, thread_slot_id: usize) {
    let node_bytes = unsafe { (&*ptr).as_slice() };
    let offset = self.snapshot_page(node_bytes, INNER_NODE_SIZE);

    let per_slot = unsafe { &mut *self.thread_local_inner_mappings.get() };
    per_slot[thread_slot_id].push((ptr, offset));
}
/// Records a mini page for `thread_slot_id`.
///
/// A page of non-zero `size` is written to the snapshot VFS and its offset is recorded; a page
/// of size 0 is recorded with `NULL_PAGE_LOCATION_OFFSET` and nothing is written. The size is
/// recorded in the parallel size mapping in both cases.
pub fn snapshot_mini_page(&self, id: PageID, ptr: &[u8], size: usize, thread_slot_id: usize) {
    let offset = match size {
        0 => NULL_PAGE_LOCATION_OFFSET,
        _ => self.snapshot_page(ptr, size),
    };

    let mini_mappings = unsafe { &mut *self.thread_local_mini_mappings.get() };
    let mini_size_mappings = unsafe { &mut *self.thread_local_mini_size_mappings.get() };
    mini_mappings[thread_slot_id].push((id, offset));
    mini_size_mappings[thread_slot_id].push((id, size));

    // The two per-slot vectors must stay entry-for-entry parallel.
    assert!(mini_mappings[thread_slot_id].len() == mini_size_mappings[thread_slot_id].len());
}
/// Writes a base page to the snapshot VFS and records `(id, offset)` in the base mapping of
/// `thread_slot_id`.
pub fn snapshot_base_page(&self, id: PageID, ptr: &[u8], size: usize, thread_slot_id: usize) {
    let written_at = self.snapshot_page(ptr, size);
    let per_slot = unsafe { &mut *self.thread_local_base_mappings.get() };
    per_slot[thread_slot_id].push((id, written_at));
}
/// Records the raw id of the root page.
///
/// # Panics
/// Panics when a different non-zero root id has already been recorded.
pub fn snapshot_root_page(&self, root_id: PageID) {
    let incoming = root_id.raw();
    let cur_root_id = self.root_id.load(Ordering::Acquire);
    // 0 means "no root recorded yet"; anything else must agree with the incoming id.
    if cur_root_id != 0 {
        assert_eq!(cur_root_id, incoming);
    }
    self.root_id.store(incoming, Ordering::Release);
}
/// Writes to the snapshot VFS every page whose clean snapshot version is below `version`,
/// recording where each page went in the caller-provided vectors.
///
/// Steps, in order:
/// 1. Sets `pause_snapshot` (so `reserve_thread_slot` fails) and polls, sleeping one second
///    between polls, until every thread slot reports `INVALID_SNAPSHOT_STATE`.
/// 2. Handles the root page (leaf or inner node) and then every node yielded by
///    `BfsVisitor::new_inner_only`.
/// 3. Clears `pause_snapshot`, then walks `tree.storage.page_table` and handles each leaf
///    according to its `PageLocation`.
///
/// Returns the number of page-table entries visited in step 3.
fn sweep(
&self,
tree: &BfTree,
version: u64,
inner_mapping: &mut Vec<(*const InnerNode, usize)>,
mini_mapping: &mut Vec<(PageID, usize)>,
mini_size_mapping: &mut Vec<(PageID, usize)>,
base_mapping: &mut Vec<(PageID, usize)>,
) -> usize {
// From here on `reserve_thread_slot` returns `Err(())`.
self.pause_snapshot.store(true, Ordering::Release);
loop {
// True only when no slot holds a state at all (target == the invalid state).
if self.check_if_phase_completed(INVALID_SNAPSHOT_STATE) {
// Root page: retried from `get_root_page` whenever the inner-node read lock fails.
loop {
let root_id = tree.get_root_page();
let rid = root_id.0;
// `root_id.1` selects the leaf-root path (mapping-table lookup); otherwise the root is
// handled as an inner node pointer.
if root_id.1 {
let mut leaf = tree.mapping_table().get(&rid);
let page_loc = leaf.get_page_location();
match page_loc {
PageLocation::Base(offset) => {
let base_ref = leaf.load_base_page(*offset);
if base_ref.get_clean_snapshot_version() < version {
// View the page as raw bytes, `node_size` long.
let base_ptr = unsafe {
std::slice::from_raw_parts(
base_ref as *const LeafNode as *const u8,
base_ref.meta.node_size as usize,
)
};
let offset = self
.snapshot_page(base_ptr, base_ref.meta.node_size as usize);
base_mapping.push((rid, offset));
self.snapshot_root_page(rid);
}
}
PageLocation::Mini(ptr) => {
// A mini-page root is only accepted for cache-only trees.
assert!(tree.cache_only);
let mini_ref = leaf.load_cache_page(*ptr);
if mini_ref.get_clean_snapshot_version() < version {
let mini_ptr = unsafe {
std::slice::from_raw_parts(
mini_ref as *const LeafNode as *const u8,
mini_ref.meta.node_size as usize,
)
};
let offset = self
.snapshot_page(mini_ptr, mini_ref.meta.node_size as usize);
mini_mapping.push((rid, offset));
mini_size_mapping.push((rid, mini_ref.meta.node_size as usize));
self.snapshot_root_page(rid);
}
}
_ => {
panic!("Unexpected page location for root page: {:?}", page_loc);
}
}
break;
} else {
let ptr = rid.as_inner_node();
let inner = match ReadGuard::try_read(ptr) {
Ok(inner) => inner,
// Could not get the read guard: start over with a fresh root lookup.
Err(_) => continue,
};
if inner.as_ref().get_clean_snapshot_version() < version {
let offset =
unsafe { self.snapshot_page((&*ptr).as_slice(), INNER_NODE_SIZE) };
inner_mapping.push((ptr, offset));
self.snapshot_root_page(rid);
}
break;
}
}
// Remaining inner nodes, in the order the visitor yields them.
let visitor = BfsVisitor::new_inner_only(tree);
for node in visitor {
// Retry loop for the current node: spins until the read guard is obtained.
loop {
match node {
NodeInfo::Inner { ptr, .. } => {
let inner = match ReadGuard::try_read(ptr) {
Ok(inner) => inner,
Err(_) => continue,
};
if inner.as_ref().get_clean_snapshot_version() < version {
let offset = unsafe {
self.snapshot_page((&*ptr).as_slice(), INNER_NODE_SIZE)
};
inner_mapping.push((ptr, offset));
}
break;
}
NodeInfo::Leaf { level, .. } => {
// Leaves are not written here; only their level is checked.
assert_eq!(level, 0);
break;
}
}
}
}
break;
}
// Some slot still holds a state: wait before polling again.
thread::sleep(std::time::Duration::from_secs(1));
#[cfg(all(feature = "shuttle", test))]
shuttle::thread::yield_now();
}
// Slot reservations are allowed again while the leaves are processed.
self.pause_snapshot.store(false, Ordering::Release);
let page_table_iter = tree.storage.page_table.iter();
let mut enumerate_leaf_count = 0;
for (_, pid) in page_table_iter {
assert!(pid.is_id());
let mut leaf = tree.mapping_table().get(&pid);
let page_loc = leaf.get_page_location().clone();
// Counted whether or not the page ends up being written.
enumerate_leaf_count += 1;
match page_loc {
PageLocation::Base(offset) => {
let base_ref = leaf.load_base_page(offset);
if base_ref.get_clean_snapshot_version() < version {
let base_ptr = unsafe {
std::slice::from_raw_parts(
base_ref as *const LeafNode as *const u8,
base_ref.meta.node_size as usize,
)
};
let new_offset =
self.snapshot_page(base_ptr, base_ref.meta.node_size as usize);
base_mapping.push((pid, new_offset));
}
}
PageLocation::Full(ptr) => {
let full_ref = leaf.load_cache_page(ptr);
if full_ref.get_clean_snapshot_version() < version {
// The page is written with a null `next_level` and recorded as a base page; the
// original `next_level` is put back right after the write.
let next_level = full_ref.next_level;
let full_page = unsafe { &mut *ptr };
full_page.next_level = MiniPageNextLevel::new_null();
let full_ptr = unsafe {
std::slice::from_raw_parts(
full_ref as *const LeafNode as *const u8,
full_ref.meta.node_size as usize,
)
};
let offset = self.snapshot_page(full_ptr, full_ref.meta.node_size as usize);
full_page.next_level = next_level;
base_mapping.push((pid, offset));
}
}
PageLocation::Mini(ptr) => {
let mini_ref = leaf.load_cache_page(ptr);
if mini_ref.get_clean_snapshot_version() < version {
let mini_ptr = unsafe {
std::slice::from_raw_parts(
mini_ref as *const LeafNode as *const u8,
mini_ref.meta.node_size as usize,
)
};
let offset = self.snapshot_page(mini_ptr, mini_ref.meta.node_size as usize);
mini_mapping.push((pid, offset));
mini_size_mapping.push((pid, mini_ref.meta.node_size as usize));
// Unless the tree is cache-only, the base page at the mini page's `next_level` offset
// is written too; it is asserted to be older than `version` as well.
if !tree.cache_only {
let base_ref = leaf.load_base_page(mini_ref.next_level.as_offset());
assert!(base_ref.get_clean_snapshot_version() < version);
let base_ptr = unsafe {
std::slice::from_raw_parts(
base_ref as *const LeafNode as *const u8,
base_ref.meta.node_size as usize,
)
};
let offset =
self.snapshot_page(base_ptr, base_ref.meta.node_size as usize);
base_mapping.push((pid, offset));
}
}
}
PageLocation::Null => {
// Only accepted for cache-only trees: recorded with the null offset and size 0,
// nothing is written.
assert!(tree.cache_only);
mini_mapping.push((pid, NULL_PAGE_LOCATION_OFFSET)); mini_size_mapping.push((pid, 0));
}
}
}
enumerate_leaf_count
}
#[allow(clippy::too_many_arguments)]
fn finalize(
&self,
snapshot_version: u64,
inner_mapping: &mut [(*const InnerNode, usize)],
mini_mapping: &mut [(PageID, usize)],
mini_size_mapping: &mut [(PageID, usize)],
base_mapping: &mut [(PageID, usize)],
leaf_count_upper: usize,
config: Arc<Config>,
) {
let mut inner_mapping_unique: HashMap<*const InnerNode, usize> = HashMap::new();
let mut mini_mapping_unique: HashMap<PageID, usize> = HashMap::new();
let mut mini_size_mapping_unique: HashMap<PageID, usize> = HashMap::new();
let mut base_mapping_unique: HashMap<PageID, usize> = HashMap::new();
let local_inner_mappings = unsafe { &mut *self.thread_local_inner_mappings.get() };
let local_mini_mappings = unsafe { &mut *self.thread_local_mini_mappings.get() };
let local_mini_size_mappings = unsafe { &mut *self.thread_local_mini_size_mappings.get() };
let local_base_mappings = unsafe { &mut *self.thread_local_base_mappings.get() };
for thread_slot_id in 0..DEFAULT_MAX_SNAPSHOT_THREAD_NUM {
let entry_num = local_mini_mappings[thread_slot_id].len();
assert!(
local_mini_mappings[thread_slot_id].len()
== local_mini_size_mappings[thread_slot_id].len()
);
for i in 0..entry_num {
assert!(
local_mini_mappings[thread_slot_id][i].0
== local_mini_size_mappings[thread_slot_id][i].0
);
if local_mini_mappings[thread_slot_id][i].1 == NULL_PAGE_LOCATION_OFFSET {
assert_eq!(local_mini_size_mappings[thread_slot_id][i].1, 0);
} else {
assert!(local_mini_size_mappings[thread_slot_id][i].1 > 0);
}
if let std::collections::hash_map::Entry::Vacant(e) =
mini_mapping_unique.entry(local_mini_mappings[thread_slot_id][i].0)
{
e.insert(local_mini_mappings[thread_slot_id][i].1);
mini_size_mapping_unique.insert(
local_mini_size_mappings[thread_slot_id][i].0,
local_mini_size_mappings[thread_slot_id][i].1,
);
assert!(mini_mapping_unique.len() == mini_size_mapping_unique.len());
}
}
assert_eq!(local_mini_mappings[thread_slot_id].len(), entry_num);
assert_eq!(local_mini_size_mappings[thread_slot_id].len(), entry_num);
}
for thread_slot_id in 0..DEFAULT_MAX_SNAPSHOT_THREAD_NUM {
for m in local_inner_mappings[thread_slot_id].iter() {
inner_mapping_unique.entry(m.0).or_insert(m.1);
}
for m in local_base_mappings[thread_slot_id].iter() {
base_mapping_unique.entry(m.0).or_insert(m.1);
}
}
assert!(mini_mapping_unique.len() == mini_size_mapping_unique.len());
for (k, v) in mini_mapping_unique.iter() {
assert!(mini_size_mapping_unique.contains_key(k));
if *v == NULL_PAGE_LOCATION_OFFSET {
assert_eq!(mini_size_mapping_unique.get(k).copied().unwrap(), 0);
} else {
assert!(mini_size_mapping_unique.get(k).copied().unwrap() > 0);
}
}
for (k, v) in inner_mapping.iter() {
if !inner_mapping_unique.contains_key(k) {
inner_mapping_unique.insert(*k, *v);
}
}
for (k, v) in mini_mapping.iter() {
if !mini_mapping_unique.contains_key(k) {
mini_mapping_unique.insert(*k, *v);
}
}
for (k, v) in mini_size_mapping.iter() {
if !mini_size_mapping_unique.contains_key(k) {
mini_size_mapping_unique.insert(*k, *v);
} else {
if !config.cache_only {
assert!(*v == mini_size_mapping_unique.get(k).copied().unwrap());
}
}
}
for (k, v) in base_mapping.iter() {
if !base_mapping_unique.contains_key(k) {
base_mapping_unique.insert(*k, *v);
}
}
assert!(mini_mapping_unique.len() == mini_size_mapping_unique.len());
for (k, v) in mini_mapping_unique.iter() {
assert!(mini_size_mapping_unique.contains_key(k));
if *v == NULL_PAGE_LOCATION_OFFSET {
assert_eq!(mini_size_mapping_unique.get(k).copied().unwrap(), 0);
} else {
assert!(mini_size_mapping_unique.get(k).copied().unwrap() > 0);
}
}
if config.cache_only {
assert!(base_mapping_unique.is_empty());
} else {
assert!(mini_mapping_unique.len() <= base_mapping_unique.len());
}
let mut final_inner_mapping: Vec<(*const InnerNode, usize)> = Vec::new();
for (k, v) in inner_mapping_unique.into_iter() {
final_inner_mapping.push((k, v));
}
let mut sorted_base_mapping_uninit: Vec<MaybeUninit<(PageID, usize)>> =
Vec::with_capacity(base_mapping_unique.len());
unsafe {
sorted_base_mapping_uninit.set_len(base_mapping_unique.len());
}
let mut sorted_base_mapping_init = vec![false; base_mapping_unique.len()];
for (k, v) in base_mapping_unique.iter() {
assert!(k.is_id());
let offset = k.as_id();
assert!((offset as usize) < sorted_base_mapping_uninit.len());
sorted_base_mapping_init[offset as usize] = true;
sorted_base_mapping_uninit[offset as usize].write((*k, *v));
}
let final_sorted_base_mapping: Vec<(PageID, usize)> = if !config.cache_only {
assert_eq!(
base_mapping_unique.len(),
sorted_base_mapping_init.iter().filter(|&&b| b).count()
);
unsafe {
std::mem::transmute::<
std::vec::Vec<std::mem::MaybeUninit<(PageID, usize)>>,
std::vec::Vec<(PageID, usize)>,
>(sorted_base_mapping_uninit)
}
} else {
Vec::new()
};
let mut final_mini_mapping: Vec<(PageID, usize)> = Vec::new();
for (k, v) in mini_mapping_unique.into_iter() {
final_mini_mapping.push((k, v));
}
let mut final_mini_size_mapping: Vec<(PageID, usize)> = Vec::new();
for (k, v) in mini_size_mapping_unique.into_iter() {
final_mini_size_mapping.push((k, v));
}
let leaf_page_num = if config.cache_only {
leaf_count_upper
} else {
assert!(leaf_count_upper >= final_sorted_base_mapping.len());
final_sorted_base_mapping.len()
};
let mut file_size = std::mem::size_of::<BfTreeMeta>() as u64;
let (inner_offset, inner_size) =
serialize_vec_to_disk(&final_inner_mapping, &self.vfs.read().unwrap());
if inner_offset != 0 {
file_size = (inner_offset + align_to_sector_size(inner_size)) as u64;
}
let (mini_offset, mini_size) =
serialize_vec_to_disk(&final_mini_mapping, &self.vfs.read().unwrap());
if mini_offset != 0 {
file_size = (mini_offset + align_to_sector_size(mini_size)) as u64;
}
let (mini_size_offset, mini_size_size) =
serialize_vec_to_disk(&final_mini_size_mapping, &self.vfs.read().unwrap());
if mini_size_offset != 0 {
file_size = (mini_size_offset + align_to_sector_size(mini_size_size)) as u64;
}
let (base_offset, base_size) =
serialize_vec_to_disk(&final_sorted_base_mapping, &self.vfs.read().unwrap());
if base_offset != 0 {
file_size = (base_offset + align_to_sector_size(base_size)) as u64;
}
let metadata = BfTreeMeta {
magic_begin: *BF_TREE_MAGIC_BEGIN,
root_id: unsafe { PageID::from_raw(self.root_id.load(Ordering::Acquire)) },
inner_offset,
inner_size,
mini_offset,
mini_size,
mini_size_offset,
mini_size_size,
base_offset,
base_size,
file_size,
leaf_page_num,
snapshot_version,
cache_only: config.cache_only,
cb_size_byte: config.cb_size_byte,
read_promotion_rate: config.read_promotion_rate.load(Ordering::Relaxed),
scan_promotion_rate: config.scan_promotion_rate.load(Ordering::Relaxed),
cb_min_record_size: config.cb_min_record_size,
cb_max_record_size: config.cb_max_record_size,
leaf_page_size: config.leaf_page_size,
cb_max_key_len: config.cb_max_key_len,
max_fence_len: config.max_fence_len,
cb_copy_on_access_ratio: config.cb_copy_on_access_ratio,
read_record_cache: config.read_record_cache,
max_mini_page_size: config.max_mini_page_size,
mini_page_binary_search: config.mini_page_binary_search,
write_load_full_page: config.write_load_full_page,
magic_end: *BF_TREE_MAGIC_END,
};
let vfs = self.vfs.read().unwrap();
vfs.write(META_DATA_PAGE_OFFSET, metadata.as_slice());
vfs.flush();
self.reset();
}
/// Runs one complete snapshot of `tree` into `snapshot_file_path`.
///
/// If another snapshot is already running, prints a message and returns without doing
/// anything. Otherwise the manager's VFS is replaced by one created from
/// `tree.config.snapshot_backend`, and the global state is driven through
/// `Rest -> Prepare -> InProgress -> Sweep -> Rest`. Each transition waits until
/// `check_if_phase_completed` accepts the current global state, polling once per second.
/// `sweep` runs in the `Sweep` phase and `finalize` once `Rest` is reached again; afterwards
/// the VFS is swapped back to an in-memory one.
///
/// # Panics
/// Panics when the global phase is not `Rest` on entry, or when any of the phase/version
/// assertions along the way fails.
pub fn snapshot(&self, tree: &BfTree, snapshot_file_path: impl AsRef<Path>) {
// Only one snapshot at a time: the flag is claimed here and given back at the very end.
if self
.snapshot_in_progress
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
.is_err()
{
println!("Another snapshot is in progress, skipping this snapshot request.");
return;
}
// Point the manager at the snapshot target before any page can be written.
let mut vfs_guard = self.vfs.write().unwrap();
let snapshot_vfs = make_vfs(&tree.config.snapshot_backend, snapshot_file_path);
let old_vfs = std::mem::replace(&mut *vfs_guard, snapshot_vfs);
drop(old_vfs);
vfs_guard.reset();
drop(vfs_guard);
// Filled by `sweep`, consumed by `finalize`.
let mut sweep_inner_mapping: Vec<(*const InnerNode, usize)> = Vec::new();
let mut sweep_mini_mapping: Vec<(PageID, usize)> = Vec::new();
let mut sweep_mini_size_mapping: Vec<(PageID, usize)> = Vec::new();
let mut sweep_base_mapping: Vec<(PageID, usize)> = Vec::new();
let mut current_global_phase_id = self.get_global_phase_id();
assert_eq!(current_global_phase_id, PhaseId::Rest);
// Version before the snapshot; the global version becomes `snapshot_version + 1` once the
// `InProgress` phase is entered.
let snapshot_version = self.get_global_version();
let mut current_global_state = self.advance_global_state();
current_global_phase_id = self.get_global_phase_id();
assert_eq!(current_global_phase_id, PhaseId::Prepare);
assert_eq!(snapshot_version, self.get_global_version());
let mut leaf_node_count_upper_bound = 0;
loop {
// Proceed only when every occupied slot reports the current global state.
if self.check_if_phase_completed(current_global_state) {
match current_global_phase_id {
PhaseId::Rest => {
// Back at rest: write out the mappings and metadata, then detach from the snapshot
// file by swapping in an in-memory VFS.
self.finalize(
snapshot_version,
&mut sweep_inner_mapping,
&mut sweep_mini_mapping,
&mut sweep_mini_size_mapping,
&mut sweep_base_mapping,
leaf_node_count_upper_bound,
tree.config.clone(),
);
let mut vfs_guard = self.vfs.write().unwrap();
let snapshot_vfs = make_vfs(&StorageBackend::Memory, ":memory:");
let old_vfs = std::mem::replace(&mut *vfs_guard, snapshot_vfs);
drop(old_vfs);
drop(vfs_guard);
break;
}
PhaseId::Prepare => {
// This step is the one that increments the global version.
current_global_state = self.advance_global_state();
current_global_phase_id = self.get_global_phase_id();
assert_eq!(current_global_phase_id, PhaseId::InProgress);
assert_eq!(snapshot_version + 1, self.get_global_version());
}
PhaseId::InProgress => {
current_global_state = self.advance_global_state();
current_global_phase_id = self.get_global_phase_id();
assert_eq!(current_global_phase_id, PhaseId::Sweep);
assert_eq!(snapshot_version + 1, self.get_global_version());
}
PhaseId::Sweep => {
// `sweep` returns the number of page-table entries it visited.
leaf_node_count_upper_bound = self.sweep(
tree,
snapshot_version + 1,
&mut sweep_inner_mapping,
&mut sweep_mini_mapping,
&mut sweep_mini_size_mapping,
&mut sweep_base_mapping,
);
current_global_state = self.advance_global_state();
current_global_phase_id = self.get_global_phase_id();
assert_eq!(current_global_phase_id, PhaseId::Rest);
assert_eq!(snapshot_version + 1, self.get_global_version());
}
}
}
// Reached after every poll except the final one (which breaks out above), whether or
// not a transition happened.
thread::sleep(std::time::Duration::from_secs(1));
#[cfg(all(feature = "shuttle", test))]
shuttle::thread::yield_now();
}
assert!(self
.snapshot_in_progress
.compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
.is_ok());
}
pub fn new_from_snapshot(
recovery_snapshot_file_path: impl AsRef<Path>, use_snapshot: bool,
buffer_ptr: Option<*mut u8>,
buffer_size: Option<usize>, wal_config: Option<Arc<WalConfig>>,
) -> Result<BfTree, ConfigError> {
if !recovery_snapshot_file_path.as_ref().exists() {
return Err(ConfigError::SnapshotFileInvalid(
"Not found ".to_string() + recovery_snapshot_file_path.as_ref().to_str().unwrap(),
));
}
let wal = wal_config.as_ref().map(|s| WriteAheadLog::new(s.clone()));
let reader = std::fs::File::open(recovery_snapshot_file_path.as_ref()).unwrap();
let mut metadata = SectorAlignedVector::new_zeroed(DISK_PAGE_SIZE); #[cfg(unix)]
{
reader.read_at(&mut metadata, 0).unwrap();
}
#[cfg(windows)]
{
reader.seek_read(&mut metadata, 0).unwrap();
}
let bf_meta = unsafe { (metadata.as_ptr() as *const BfTreeMeta).read() };
bf_meta.check_magic();
assert_eq!(reader.metadata().unwrap().len(), bf_meta.file_size);
let mut bf_tree_config = Config::new_from_snapshot(&bf_meta);
let recovery_snapshot_file_backend = StorageBackend::Std; if !bf_tree_config.cache_only {
bf_tree_config.file_path(recovery_snapshot_file_path.as_ref());
bf_tree_config.storage_backend = recovery_snapshot_file_backend.clone();
} else {
bf_tree_config.storage_backend = StorageBackend::Memory;
}
bf_tree_config.use_snapshot = use_snapshot;
bf_tree_config.snapshot_backend = StorageBackend::Std;
let snapshot_mgr = if bf_tree_config.use_snapshot {
Some(Arc::new(CPRSnapShotMgr::new(
bf_tree_config.snapshot_version,
)))
} else {
None
};
if let Some(size) = buffer_size {
bf_tree_config.cb_size_byte = size
}
let size_classes = BfTree::create_mem_page_size_classes(
bf_tree_config.cb_min_record_size,
bf_tree_config.cb_max_record_size,
bf_tree_config.leaf_page_size,
bf_tree_config.max_fence_len,
bf_tree_config.cache_only,
);
bf_tree_config.write_ahead_log = wal_config.clone();
bf_tree_config.validate()?;
let config = Arc::new(bf_tree_config);
let recovery_snapshot_vfs = make_vfs(
&recovery_snapshot_file_backend,
recovery_snapshot_file_path.as_ref(),
);
let mut root_page_id = bf_meta.root_id;
let mut inner_node_page_buffer = SectorAlignedVector::new_zeroed(INNER_NODE_SIZE);
if root_page_id.is_inner_node_pointer() {
let inner_mapping: Vec<(*const InnerNode, usize)> = read_vec_from_offset(
bf_meta.inner_offset,
bf_meta.inner_size,
&recovery_snapshot_vfs,
);
let mut root_cnt = 0;
for (_ptr, offset) in &inner_mapping {
recovery_snapshot_vfs.read(*offset, &mut inner_node_page_buffer);
let inner_node = InnerNodeBuilder::new().build_from_slice(&inner_node_page_buffer);
if unsafe { (*inner_node).is_root() } {
root_cnt += 1;
}
InnerNode::free_node(inner_node);
}
assert_eq!(root_cnt, 1, "Root count in inner mapping: {}", root_cnt);
let mut inner_map = HashMap::new();
for m in inner_mapping {
inner_map.insert(m.0, m.1);
}
let offset = inner_map.get(&root_page_id.as_inner_node()).unwrap();
recovery_snapshot_vfs.read(*offset, &mut inner_node_page_buffer);
let root_page = InnerNodeBuilder::new().build_from_slice(&inner_node_page_buffer);
unsafe {
(*root_page).set_disk_offset(INVALID_DISK_OFFSET as u64);
}
unsafe {
(*root_page).set_root(true);
}
root_page_id = PageID::from_pointer(root_page);
let mut inner_resolve_queue = VecDeque::from([root_page]);
while !inner_resolve_queue.is_empty() {
let inner_ptr = inner_resolve_queue.pop_front().unwrap();
let mut inner = ReadGuard::try_read(inner_ptr).unwrap().upgrade().unwrap();
if inner.as_ref().meta.children_is_leaf() {
continue;
}
for (idx, c) in inner.as_ref().get_child_iter().enumerate() {
let offset = inner_map.get(&c.as_inner_node()).unwrap();
recovery_snapshot_vfs.read(*offset, &mut inner_node_page_buffer);
let inner_page =
InnerNodeBuilder::new().build_from_slice(&inner_node_page_buffer);
unsafe {
(*inner_page).set_disk_offset(INVALID_DISK_OFFSET as u64);
}
let inner_id = PageID::from_pointer(inner_page);
inner.as_mut().update_at_pos(idx, inner_id);
inner_resolve_queue.push_back(inner_page);
}
}
}
let raw_root_id = if root_page_id.is_id() {
root_page_id.raw() | BfTree::ROOT_IS_LEAF_MASK
} else {
root_page_id.raw()
};
if !bf_meta.cache_only {
let base_mapping: Vec<(PageID, usize)> = if bf_meta.base_size > 0 {
read_vec_from_offset(
bf_meta.base_offset,
bf_meta.base_size,
&recovery_snapshot_vfs,
)
} else {
Vec::new()
};
let base_page_loc_mapping = base_mapping.into_iter().map(|(pid, offset)| {
let loc = PageLocation::Base(offset);
(pid, loc)
});
let pt = PageTable::new_from_mapping(
base_page_loc_mapping,
recovery_snapshot_vfs.clone(),
config.clone(),
snapshot_mgr.clone(),
);
let circular_buffer = CircularBuffer::new(
config.cb_size_byte,
config.cb_copy_on_access_ratio,
config.cb_min_record_size,
config.cb_max_record_size,
config.leaf_page_size,
config.max_fence_len,
buffer_ptr,
config.cache_only,
);
let mini_mapping: Vec<(PageID, usize)> = if bf_meta.mini_size > 0 {
read_vec_from_offset(
bf_meta.mini_offset,
bf_meta.mini_size,
&recovery_snapshot_vfs,
)
} else {
Vec::new()
};
let mini_size_mapping: Vec<(PageID, usize)> = if bf_meta.mini_size_size > 0 {
read_vec_from_offset(
bf_meta.mini_size_offset,
bf_meta.mini_size_size,
&recovery_snapshot_vfs,
)
} else {
Vec::new()
};
let mut mini_size_mapping_unique: HashMap<PageID, usize> = HashMap::new();
for (pid, size) in mini_size_mapping {
mini_size_mapping_unique.insert(pid, size);
}
let storage =
LeafStorage::new_inner(config.clone(), pt, circular_buffer, recovery_snapshot_vfs);
for (pid, offset) in &mini_mapping {
let mini_size = *mini_size_mapping_unique.get(pid).unwrap();
let mini_page_guard = match storage.alloc_mini_page(mini_size) {
Ok(mini_page_ptr) => mini_page_ptr,
Err(_) => {
return Err(ConfigError::CircularBufferSize("buffer size set too small. Consider increasing it or not specifying at all".to_string()));
}
};
let mut page_buffer = SectorAlignedVector::new_zeroed(mini_size);
storage.vfs.read(*offset, &mut page_buffer);
unsafe {
std::ptr::copy_nonoverlapping(
page_buffer.as_ptr(),
mini_page_guard.as_ptr(),
mini_size,
);
}
let new_mini_ptr = mini_page_guard.as_ptr() as *mut LeafNode;
let mini_page = unsafe { &mut *new_mini_ptr };
let mut base_page = storage.page_table.get_mut(pid);
let page_loc = base_page.get_page_location().clone();
match page_loc {
PageLocation::Base(off) => {
mini_page.next_level = MiniPageNextLevel::new(off);
}
_ => {
panic!("Unexpected page location for base page");
}
}
let mini_loc = PageLocation::Mini(new_mini_ptr);
base_page.create_cache_page_loc(mini_loc);
}
Ok(BfTree {
storage,
root_page_id: AtomicU64::new(raw_root_id),
wal,
write_load_full_page: config.write_load_full_page,
cache_only: false,
mini_page_size_classes: size_classes,
snapshot_mgr,
config,
#[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
metrics_recorder: Some(Arc::new(ThreadLocal::new())),
})
} else {
let mini_mapping_unallocated: Vec<(PageID, PageLocation)> = (0..bf_meta.leaf_page_num)
.map(|pid| (PageID::from_id(pid as u64), PageLocation::Null))
.collect();
let mini_mapping: Vec<(PageID, usize)> = if bf_meta.mini_size > 0 {
read_vec_from_offset(
bf_meta.mini_offset,
bf_meta.mini_size,
&recovery_snapshot_vfs,
)
} else {
Vec::new()
};
let mini_size_mapping: Vec<(PageID, usize)> = if bf_meta.mini_size_size > 0 {
read_vec_from_offset(
bf_meta.mini_size_offset,
bf_meta.mini_size_size,
&recovery_snapshot_vfs,
)
} else {
Vec::new()
};
let mut mini_size_mapping_unique: HashMap<PageID, usize> = HashMap::new();
for (pid, size) in mini_size_mapping {
mini_size_mapping_unique.insert(pid, size);
}
let storage_vfs = make_vfs(&config.storage_backend, PathBuf::new());
let pt = PageTable::new_from_mapping(
mini_mapping_unallocated.into_iter(),
storage_vfs.clone(),
config.clone(),
snapshot_mgr.clone(),
);
let circular_buffer = CircularBuffer::new(
config.cb_size_byte,
config.cb_copy_on_access_ratio,
config.cb_min_record_size,
config.cb_max_record_size,
config.leaf_page_size,
config.max_fence_len,
buffer_ptr,
config.cache_only,
);
let storage = LeafStorage::new_inner(config.clone(), pt, circular_buffer, storage_vfs);
for (pid, offset) in &mini_mapping {
let mini_size = *mini_size_mapping_unique.get(pid).unwrap();
if *offset == NULL_PAGE_LOCATION_OFFSET {
continue;
}
let mini_page_guard = match storage.alloc_mini_page(mini_size) {
Ok(mini_page_ptr) => mini_page_ptr,
Err(_) => {
panic!("Please increase cb_size_byte in config");
}
};
let mut page_buffer = SectorAlignedVector::new_zeroed(mini_size);
recovery_snapshot_vfs.read(*offset, &mut page_buffer);
unsafe {
std::ptr::copy_nonoverlapping(
page_buffer.as_ptr(),
mini_page_guard.as_ptr(),
mini_size,
);
}
let mini_page_ptr = mini_page_guard.as_ptr() as *mut LeafNode;
let mini_page = unsafe { &mut *mini_page_ptr };
mini_page.next_level = MiniPageNextLevel::new_null();
let mut null_page = storage.page_table.get_mut(pid);
let page_loc = null_page.get_page_location().clone();
match page_loc {
PageLocation::Null => {
let mini_loc = PageLocation::Mini(mini_page_ptr);
null_page.create_cache_page_loc(mini_loc);
}
_ => {
panic!("Unexpected page location for null page");
}
}
}
Ok(BfTree {
storage,
root_page_id: AtomicU64::new(raw_root_id),
wal,
write_load_full_page: config.write_load_full_page,
cache_only: true,
mini_page_size_classes: size_classes,
snapshot_mgr,
config,
#[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
metrics_recorder: Some(Arc::new(ThreadLocal::new())),
})
}
}
}
impl BfTree {
    /// Loads a tree from a CPR snapshot and replays a write-ahead log on top of it.
    ///
    /// The snapshot is loaded through [`BfTree::new_from_cpr_snapshot`]; afterwards every
    /// `Write` entry read from `wal_file` is re-applied with `insert`. The rebuilt tree is a
    /// local of this function and is dropped when it returns.
    ///
    /// # Panics
    ///
    /// Panics if the snapshot cannot be loaded, and hits `todo!` on the first `Split` log
    /// entry because replaying splits is not implemented.
    pub fn recovery(
        recovery_snapshot_file_path: PathBuf,
        wal_file: impl AsRef<Path>,
        use_snapshot: bool,
        buffer_ptr: Option<*mut u8>,
        buffer_size: Option<usize>,
        wal: Option<Arc<WalConfig>>,
    ) {
        let recovered = Self::new_from_cpr_snapshot(
            recovery_snapshot_file_path,
            use_snapshot,
            buffer_ptr,
            buffer_size,
            wal,
        )
        .unwrap();

        let reader = WalReader::new(wal_file, 4096);
        for segment in reader.segment_iter() {
            for record in segment.entry_iter() {
                // The second tuple field carries the serialized log entry.
                match LogEntry::read_from_buffer(record.1) {
                    LogEntry::Write(write) => {
                        recovered.insert(write.key, write.value);
                    }
                    LogEntry::Split(_) => {
                        todo!("implement split op in wal!")
                    }
                }
            }
        }
    }

    /// Takes a CPR snapshot of this tree and writes it to `snapshot_file_path`.
    ///
    /// # Panics
    ///
    /// Panics if `use_snapshot` is disabled in the configuration or if the tree has no
    /// snapshot manager.
    pub fn cpr_snapshot(&self, snapshot_file_path: impl AsRef<Path>) {
        assert!(
            self.config.use_snapshot,
            "Snapshots are not enabled in the configuration"
        );
        self.snapshot_mgr
            .clone()
            .unwrap()
            .snapshot(self, snapshot_file_path);
    }

    /// Builds a tree from the snapshot file at `recovery_snapshot_file_path`.
    ///
    /// All arguments are forwarded unchanged to `CPRSnapShotMgr::new_from_snapshot`, whose
    /// result is returned as is.
    pub fn new_from_cpr_snapshot(
        recovery_snapshot_file_path: impl AsRef<Path>,
        use_snapshot: bool,
        buffer_ptr: Option<*mut u8>,
        buffer_size: Option<usize>,
        wal: Option<Arc<WalConfig>>,
    ) -> Result<BfTree, ConfigError> {
        CPRSnapShotMgr::new_from_snapshot(
            recovery_snapshot_file_path,
            use_snapshot,
            buffer_ptr,
            buffer_size,
            wal,
        )
    }

    /// Asks the snapshot manager whether all threads are in the next snapshot version.
    ///
    /// Returns `false` when the tree has no snapshot manager.
    pub fn are_all_threads_in_next_snapshot_version(&self) -> bool {
        match &self.snapshot_mgr {
            Some(manager) => manager.are_all_threads_in_next_version(),
            None => false,
        }
    }
}
/// Zero-initialised byte buffer whose heap allocation is aligned to `SECTOR_SIZE`.
///
/// The bytes are exposed through `Deref`/`DerefMut` as a `Vec<u8>`. The allocation is made
/// and released by hand with a `SECTOR_SIZE`-aligned layout, so the inner `Vec` is wrapped
/// in `ManuallyDrop` and must never free the memory itself.
///
/// NOTE(review): because `DerefMut` hands out `&mut Vec<u8>`, callers must not grow the
/// vector (`push`, `reserve`, ...); a reallocation would use the `Vec`'s own layout.
struct SectorAlignedVector {
    inner: ManuallyDrop<Vec<u8>>,
}

impl Drop for SectorAlignedVector {
    fn drop(&mut self) {
        let capacity = self.inner.capacity();
        if capacity == 0 {
            // `new_zeroed(0)` never allocates, so there is nothing to release.
            return;
        }
        let layout = std::alloc::Layout::from_size_align(capacity, SECTOR_SIZE).unwrap();
        let ptr = self.inner.as_mut_ptr();
        // SAFETY: a non-zero capacity means `ptr` came from `alloc_zeroed` in `new_zeroed`
        // with exactly this size and alignment.
        unsafe {
            std::alloc::dealloc(ptr, layout);
        }
    }
}

impl SectorAlignedVector {
    /// Creates a buffer of `capacity` zero bytes aligned to `SECTOR_SIZE`.
    ///
    /// A `capacity` of zero yields an empty buffer without touching the allocator, because
    /// requesting a zero-sized allocation from the global allocator is undefined behaviour.
    /// Allocation failure is reported through `std::alloc::handle_alloc_error` instead of
    /// wrapping a null pointer in a `Vec`.
    fn new_zeroed(capacity: usize) -> Self {
        if capacity == 0 {
            return Self {
                inner: ManuallyDrop::new(Vec::new()),
            };
        }
        let layout = std::alloc::Layout::from_size_align(capacity, SECTOR_SIZE).unwrap();
        // SAFETY: `layout` has a non-zero size.
        let ptr = unsafe { std::alloc::alloc_zeroed(layout) };
        if ptr.is_null() {
            std::alloc::handle_alloc_error(layout);
        }
        // SAFETY: `ptr` is non-null and points to `capacity` zero-initialised bytes. The
        // resulting `Vec` is never allowed to free or resize the allocation (see `Drop`).
        let inner = unsafe { Vec::from_raw_parts(ptr, capacity, capacity) };
        Self {
            inner: ManuallyDrop::new(inner),
        }
    }
}

impl Deref for SectorAlignedVector {
    type Target = Vec<u8>;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

impl DerefMut for SectorAlignedVector {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner
    }
}
/// Metadata record describing a snapshot file.
///
/// `repr(C)` keeps the fields in declaration order and `align(512)` makes the struct's size
/// a multiple of 512 bytes; `as_slice` exposes the record as raw bytes.
/// NOTE(review): presumably stored at `META_DATA_PAGE_OFFSET` of the snapshot file; confirm
/// against the snapshot writer.
#[repr(C, align(512))]
pub(crate) struct BfTreeMeta {
/// Leading marker; `check_magic` compares it with `BF_TREE_MAGIC_BEGIN`.
magic_begin: [u8; 16],
/// Page id of the tree root. NOTE(review): presumably the id recovery starts from; confirm.
root_id: PageID,
/// NOTE(review): presumably offset and byte length of the serialized inner nodes; confirm.
inner_offset: usize,
inner_size: usize,
/// Offset and byte length passed to `read_vec_from_offset` to load the `(PageID, usize)`
/// mini-page mapping during recovery; a size of 0 is treated as an empty mapping.
mini_offset: usize,
mini_size: usize,
/// Offset and byte length of the `(PageID, usize)` list giving each mini page's size,
/// loaded the same way during recovery.
mini_size_offset: usize,
mini_size_size: usize,
/// Offset and byte length of the `(PageID, usize)` base-page mapping, loaded the same way
/// on the non-cache-only recovery path.
base_offset: usize,
base_size: usize,
/// NOTE(review): presumably the size of the tree's data file at snapshot time; confirm.
file_size: u64,
/// Cache-only recovery creates this many `PageLocation::Null` page-table entries, with
/// ids `0..leaf_page_num`.
leaf_page_num: usize,
/// NOTE(review): the `cb_*` and other configuration-like fields below presumably mirror
/// the `Config` values in effect when the snapshot was taken; confirm.
pub(crate) cb_size_byte: usize,
pub(crate) snapshot_version: u64,
/// Selects between the disk-backed and the cache-only recovery path.
pub(crate) cache_only: bool,
pub(crate) read_promotion_rate: usize,
pub(crate) scan_promotion_rate: usize,
pub(crate) cb_min_record_size: usize,
pub(crate) cb_max_record_size: usize,
pub(crate) leaf_page_size: usize,
pub(crate) cb_max_key_len: usize,
pub(crate) max_fence_len: usize,
pub(crate) cb_copy_on_access_ratio: f64,
pub(crate) read_record_cache: bool,
pub(crate) max_mini_page_size: usize,
pub(crate) mini_page_binary_search: bool,
pub(crate) write_load_full_page: bool,
/// Trailing marker; `check_magic` compares it with `BF_TREE_MAGIC_END`.
magic_end: [u8; 14],
}
// Compile-time check that the metadata record fits within a single disk page.
const _: () = assert!(std::mem::size_of::<BfTreeMeta>() <= DISK_PAGE_SIZE);
impl BfTreeMeta {
    /// Views this record as its raw in-memory bytes (`size_of::<Self>()` of them).
    fn as_slice(&self) -> &[u8] {
        let byte_len = std::mem::size_of::<Self>();
        let start = (self as *const Self).cast::<u8>();
        // SAFETY: `self` is a live reference, so `start` is non-null and valid for reads of
        // `byte_len` bytes for as long as the returned borrow lasts.
        // NOTE(review): the struct contains padding bytes, which are exposed here as well.
        unsafe { std::slice::from_raw_parts(start, byte_len) }
    }

    /// Asserts that both magic markers hold the expected constants.
    ///
    /// # Panics
    ///
    /// Panics if `magic_begin` differs from `BF_TREE_MAGIC_BEGIN` or `magic_end` differs
    /// from `BF_TREE_MAGIC_END`.
    fn check_magic(&self) {
        let Self {
            magic_begin,
            magic_end,
            ..
        } = self;
        assert_eq!(*magic_begin, *BF_TREE_MAGIC_BEGIN);
        assert_eq!(*magic_end, *BF_TREE_MAGIC_END);
    }
}
/// Writes the raw bytes of `v` to `vfs` and returns `(offset, byte_len)`.
///
/// The bytes are first copied into a zero-filled scratch buffer aligned to `SECTOR_SIZE`,
/// whose length is the payload size rounded up by `align_to_sector_size`; that buffer is
/// handed to `serialize_u8_slice_to_disk`. The returned length is the unpadded payload size.
///
/// Returns `(0, 0)` without touching `vfs` when there are no bytes to write (an empty slice
/// or zero-sized elements).
fn serialize_vec_to_disk<T>(v: &[T], vfs: &Arc<dyn VfsImpl>) -> (usize, usize) {
    let unaligned_size = std::mem::size_of_val(v);
    if unaligned_size == 0 {
        // Covers both an empty slice and zero-sized `T`; a zero-sized allocation request
        // would be undefined behaviour.
        return (0, 0);
    }
    let unaligned_ptr = v.as_ptr() as *const u8;
    let aligned_size = align_to_sector_size(unaligned_size);
    let layout = std::alloc::Layout::from_size_align(aligned_size, SECTOR_SIZE).unwrap();
    // SAFETY: `layout` has a non-zero size; the allocation is checked for null before use,
    // is at least `unaligned_size` bytes long, cannot overlap `v`, and is released with the
    // same layout it was created with.
    unsafe {
        let aligned_ptr = std::alloc::alloc_zeroed(layout);
        if aligned_ptr.is_null() {
            std::alloc::handle_alloc_error(layout);
        }
        std::ptr::copy_nonoverlapping(unaligned_ptr, aligned_ptr, unaligned_size);
        let slice = std::slice::from_raw_parts(aligned_ptr, aligned_size);
        let offset = serialize_u8_slice_to_disk(slice, vfs);
        std::alloc::dealloc(aligned_ptr, layout);
        (offset, unaligned_size)
    }
}
/// Reads `size` bytes starting at `offset` from `vfs` and reinterprets them as a `Vec<T>`.
///
/// The result holds `size / size_of::<T>()` elements. The bytes are copied into a freshly
/// allocated `Vec<T>` rather than being viewed in place: the byte buffer returned by
/// `read_u8_slice_from_disk` is only guaranteed to be 1-byte aligned, so building a `&[T]`
/// over it directly would be undefined behaviour whenever `T` needs stricter alignment.
///
/// The stored bytes must be valid bit patterns for `T` (plain-data types such as the
/// `(PageID, usize)` pairs used by recovery).
///
/// # Panics
///
/// Panics if `size` is zero, if `T` is zero-sized, or if fewer bytes were read than needed.
fn read_vec_from_offset<T: Clone>(offset: usize, size: usize, vfs: &Arc<dyn VfsImpl>) -> Vec<T> {
    assert!(size > 0);
    let bytes = read_u8_slice_from_disk(offset, size, vfs);
    let elem_size = std::mem::size_of::<T>();
    let count = size / elem_size;
    let byte_len = count * elem_size;
    assert!(byte_len <= bytes.len());
    let mut items: Vec<T> = Vec::with_capacity(count);
    // SAFETY: `items` owns a properly aligned allocation with room for `count` elements,
    // `bytes` holds at least `byte_len` readable bytes (asserted above), and the two
    // allocations are distinct. After the copy the first `count` elements are initialised.
    unsafe {
        std::ptr::copy_nonoverlapping(bytes.as_ptr(), items.as_mut_ptr().cast::<u8>(), byte_len);
        items.set_len(count);
    }
    items
}
/// Reads at least `size` bytes from `vfs`, starting at `offset`, one disk page at a time.
///
/// Every read fetches a full `DISK_PAGE_SIZE` page, so the returned buffer's length is
/// `size` rounded up to a whole number of pages (and empty when `size` is zero).
fn read_u8_slice_from_disk(offset: usize, size: usize, vfs: &Arc<dyn VfsImpl>) -> Vec<u8> {
    let mut page = vec![0u8; DISK_PAGE_SIZE];
    let mut bytes = Vec::new();
    (0..size).step_by(DISK_PAGE_SIZE).for_each(|page_start| {
        vfs.read(offset + page_start, &mut page);
        bytes.extend_from_slice(&page);
    });
    bytes
}
/// Alignment, in bytes, used for sector-aligned I/O buffers.
const SECTOR_SIZE: usize = 512;

/// Rounds `n` up to the nearest multiple of `SECTOR_SIZE`.
///
/// Values that are already a multiple (including zero) are returned unchanged.
fn align_to_sector_size(n: usize) -> usize {
    match n % SECTOR_SIZE {
        0 => n,
        remainder => n + (SECTOR_SIZE - remainder),
    }
}
/// Writes `slice` to `vfs` in chunks of at most `DISK_PAGE_SIZE` bytes.
///
/// A fresh `DISK_PAGE_SIZE` region is requested from `vfs.alloc_offset` for every chunk and
/// the chunk is written there. Returns the offset allocated for the first chunk.
///
/// # Panics
///
/// Panics if `slice` is empty, since no offset is allocated in that case.
fn serialize_u8_slice_to_disk(slice: &[u8], vfs: &Arc<dyn VfsImpl>) -> usize {
    let mut first_offset: Option<usize> = None;
    for page in slice.chunks(DISK_PAGE_SIZE) {
        let page_offset = vfs.alloc_offset(DISK_PAGE_SIZE);
        // Only the first allocation is remembered; later ones leave it untouched.
        first_offset = first_offset.or(Some(page_offset));
        vfs.write(page_offset, page);
    }
    first_offset.unwrap()
}
#[cfg(test)]
mod tests {
use crate::{nodes::leaf_node::LeafReadResult, sync::thread, BfTree, Config};
use std::panic;
#[cfg(feature = "shuttle")]
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::atomic::Ordering;
use std::sync::{atomic::AtomicBool, Arc};
/// Disk-backed snapshot test: four writer threads insert continuously while ten snapshots
/// are taken five seconds apart; the last snapshot is then recovered and verified, with the
/// prefix check enabled.
#[test]
fn cpr_snapshot_disk() {
    // Debugging aid: print the panic, then trap so an attached debugger stops at the failure.
    panic::set_hook(Box::new(|info| {
        eprintln!("PANIC: {info}");
        // `int 3` is an x86 instruction; it is compiled only for x86 targets so that the
        // test module still builds on other architectures.
        #[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
        unsafe {
            std::arch::asm!("int 3")
        };
    }));
    let min_record_size: usize = 64;
    let max_record_size: usize = 2408;
    let leaf_page_size: usize = 8192;
    let snapshot_num: usize = 10;
    let num_threads: usize = 4;
    let file_path: String = "target/test_simple.bftree".to_string();
    let snapshot_file_path: String = "target/test_simple_snapshot.bftree".to_string();
    let tmp_file_path = std::path::PathBuf::from_str(&file_path).unwrap();
    let tmp_snapshot_file_path = std::path::PathBuf::from_str(&snapshot_file_path).unwrap();
    let mut config = Config::new(&tmp_file_path, 128 * 1024);
    config.storage_backend(crate::StorageBackend::Std);
    config.cb_min_record_size = min_record_size + 2 * std::mem::size_of::<usize>();
    config.cb_max_record_size = max_record_size;
    config.leaf_page_size = leaf_page_size;
    config.max_fence_len = min_record_size + 2 * std::mem::size_of::<usize>();
    config.use_snapshot(true);
    let bftree = Arc::new(BfTree::with_config(config.clone(), None).unwrap());
    let finish = Arc::new(AtomicBool::new(false));
    let handles: Vec<_> = (0..num_threads)
        .map(|i| {
            let finish_clone = finish.clone();
            let bftree_clone = bftree.clone();
            thread::spawn(move || {
                let key_len: usize = min_record_size / 2 + std::mem::size_of::<usize>();
                assert!(key_len * 2 <= max_record_size);
                let mut key_buffer = vec![0usize; key_len / std::mem::size_of::<usize>()];
                let mut r: usize = 0;
                // Each record uses the same bytes as key and value: word 0 is the thread
                // index, the remaining words are the record counter.
                while !finish_clone.load(Ordering::Relaxed) {
                    key_buffer.fill(r);
                    key_buffer[0] = i;
                    match bftree_clone.insert(
                        bytemuck::must_cast_slice::<usize, u8>(&key_buffer),
                        bytemuck::must_cast_slice::<usize, u8>(&key_buffer),
                    ) {
                        crate::LeafInsertResult::Success => {}
                        _ => {
                            panic!("Insert failed");
                        }
                    }
                    r += 1;
                }
                r
            })
        })
        .collect();
    thread::sleep(std::time::Duration::from_secs(5));
    for _ in 0..snapshot_num {
        // Each snapshot replaces the previous file; a missing file is not an error.
        let _ = std::fs::remove_file(&tmp_snapshot_file_path);
        bftree.cpr_snapshot(&tmp_snapshot_file_path);
        thread::sleep(std::time::Duration::from_secs(5));
    }
    let mut rs = vec![0usize; num_threads];
    finish.store(true, Ordering::Relaxed);
    for (i, h) in handles.into_iter().enumerate() {
        let r = h.join().unwrap();
        rs[i] = r;
    }
    verify_snapshot_recovery(
        &tmp_snapshot_file_path,
        num_threads,
        min_record_size,
        &rs,
        true,
    );
    std::fs::remove_file(tmp_file_path).unwrap();
    std::fs::remove_file(tmp_snapshot_file_path).unwrap();
}
/// Cache-only snapshot test: four writer threads insert continuously into an in-memory
/// tree, one snapshot is taken after five seconds, and the snapshot is then recovered and
/// verified without the prefix check.
#[test]
fn cpr_snapshot_cache_only() {
    let min_record_size: usize = 64;
    let max_record_size: usize = 2408;
    let leaf_page_size: usize = 8192;
    let num_threads: usize = 4;
    let tmp_snapshot_file_path =
        std::path::PathBuf::from_str("target/test_simple_cache_only_snapshot.bftree").unwrap();

    let mut config = Config::default();
    config.storage_backend(crate::StorageBackend::Memory);
    config.file_path(":memory:");
    config.cache_only = true;
    config.cb_size_byte(1024 * 1024 * 1024);
    config.cb_min_record_size = min_record_size;
    config.cb_max_record_size = max_record_size;
    config.leaf_page_size = leaf_page_size;
    config.max_fence_len = max_record_size;
    config.use_snapshot(true);

    let bftree = Arc::new(BfTree::with_config(config.clone(), None).unwrap());
    let finish = Arc::new(AtomicBool::new(false));

    let mut writers = Vec::with_capacity(num_threads);
    for thread_idx in 0..num_threads {
        let stop = finish.clone();
        let tree = bftree.clone();
        writers.push(thread::spawn(move || {
            let key_len: usize = min_record_size / 2 + std::mem::size_of::<usize>();
            assert!(key_len * 2 <= max_record_size);
            let mut key_words = vec![0usize; key_len / std::mem::size_of::<usize>()];
            let mut inserted: usize = 0;
            while !stop.load(Ordering::Relaxed) {
                // Word 0 is the thread index, the remaining words are the record counter;
                // the same bytes serve as both key and value.
                key_words.fill(inserted);
                key_words[0] = thread_idx;
                let outcome = tree.insert(
                    bytemuck::must_cast_slice::<usize, u8>(&key_words),
                    bytemuck::must_cast_slice::<usize, u8>(&key_words),
                );
                if !matches!(outcome, crate::LeafInsertResult::Success) {
                    panic!("Insert failed");
                }
                inserted += 1;
            }
            inserted
        }));
    }

    thread::sleep(std::time::Duration::from_secs(5));
    bftree.cpr_snapshot(&tmp_snapshot_file_path);
    thread::sleep(std::time::Duration::from_secs(5));

    finish.store(true, Ordering::Relaxed);
    // One insert count per writer, in spawn order.
    let rs: Vec<usize> = writers
        .into_iter()
        .map(|writer| writer.join().unwrap())
        .collect();

    verify_snapshot_recovery(
        &tmp_snapshot_file_path,
        num_threads,
        min_record_size,
        &rs,
        false,
    );
    std::fs::remove_file(tmp_snapshot_file_path).unwrap();
}
/// Recovers a tree from `snapshot_file` and checks the records written by the test writers.
///
/// For each of the first `num_threads` entries of `records_num_per_threads`, every record
/// index below that count is looked up with the key layout the writers use (word 0 is the
/// thread index, the remaining words are the record index). A found record must have the
/// expected length and contents. When `check_prefix` is set, finding a record after an
/// earlier one was missing is reported as a prefix violation.
///
/// # Panics
///
/// Panics if recovery fails, if `records_num_per_threads` has fewer than `num_threads`
/// entries, if a found record has the wrong length or contents, on an unexpected read
/// result, or on a prefix violation when `check_prefix` is set.
fn verify_snapshot_recovery(
    snapshot_file: impl AsRef<std::path::Path>,
    num_threads: usize,
    min_record_size: usize,
    records_num_per_threads: &[usize],
    check_prefix: bool,
) {
    let bftree = BfTree::new_from_cpr_snapshot(snapshot_file, false, None, None, None)
        .expect("fail to recover from snapshot");
    let key_len: usize = min_record_size / 2 + std::mem::size_of::<usize>();
    for (i, &record_num) in records_num_per_threads[..num_threads].iter().enumerate() {
        let mut key_buffer = vec![0usize; key_len / std::mem::size_of::<usize>()];
        let mut res_buffer = vec![0u8; key_len];
        // Number of this thread's records present in the snapshot.
        let mut captured: usize = 0;
        // Index of the first record that was missing, once one has been seen.
        let mut first_gap_record: Option<usize> = None;
        for r in 0..record_num {
            key_buffer.fill(r);
            key_buffer[0] = i;
            match bftree.read(
                bytemuck::must_cast_slice::<usize, u8>(&key_buffer),
                &mut res_buffer,
            ) {
                LeafReadResult::Found(v) => {
                    if let (true, Some(gap_start)) = (check_prefix, first_gap_record) {
                        // Collect a window of gaps and hits around the violation to make
                        // the failure message useful.
                        let mut gaps = Vec::new();
                        let mut found_after = Vec::new();
                        let scan_end = std::cmp::min(r + 50, record_num);
                        for scan_r in gap_start..scan_end {
                            key_buffer.fill(scan_r);
                            key_buffer[0] = i;
                            match bftree.read(
                                bytemuck::must_cast_slice::<usize, u8>(&key_buffer),
                                &mut res_buffer,
                            ) {
                                LeafReadResult::Found(_) => {
                                    found_after.push(scan_r);
                                }
                                LeafReadResult::NotFound => {
                                    gaps.push(scan_r);
                                }
                                _ => {}
                            }
                        }
                        panic!(
                            "PREFIX VIOLATION: thread={}, first_gap_at={}, found_record_after_gap={}, \
                             total_captured_before_gap={}, total_records={}\n\
                             Gaps in [{}, {}): {:?}\n\
                             Found in [{}, {}): {:?}",
                            i,
                            gap_start,
                            r,
                            captured,
                            record_num,
                            gap_start,
                            scan_end,
                            &gaps[..std::cmp::min(gaps.len(), 20)],
                            gap_start,
                            scan_end,
                            &found_after[..std::cmp::min(found_after.len(), 20)],
                        );
                    }
                    assert_eq!(v as usize, key_len);
                    assert_eq!(
                        &res_buffer,
                        bytemuck::must_cast_slice::<usize, u8>(&key_buffer)
                    );
                    captured += 1;
                }
                LeafReadResult::NotFound => {
                    if first_gap_record.is_none() {
                        first_gap_record = Some(r);
                    }
                }
                _ => {
                    panic!("Unexpected read result")
                }
            }
        }
        assert!(captured <= record_num);
        println!("Total inserted records for thread {}: {}", i, record_num);
        println!(
            "Hit ratio for thread {}: {}",
            i,
            captured as f64 / record_num as f64
        );
    }
}
/// One shuttle iteration of the cache-only snapshot scenario.
///
/// Runs two rounds. In each round four writer threads insert 1,000 records apiece while a
/// separate thread takes a snapshot; once all of them have been joined, the snapshot is
/// recovered and verified against the cumulative insert counts (no prefix check). `iter`
/// only serves to make the snapshot file name unique.
#[cfg(feature = "shuttle")]
fn shuttle_cpr_snapshot_cache_only_inner(iter: usize) {
    let min_record_size: usize = 64;
    let max_record_size: usize = 2408;
    let leaf_page_size: usize = 8192;
    let num_threads: usize = 4;
    let inserts_per_thread: usize = 1_000;
    let tmp_snapshot_file_path = std::path::PathBuf::from_str(&format!(
        "target/shuttle_cpr_snapshot_cache_only_{}_{}.bftree",
        std::process::id(),
        iter,
    ))
    .unwrap();

    let mut config = Config::default();
    config.storage_backend(crate::StorageBackend::Memory);
    config.file_path(":memory:");
    config.cache_only = true;
    config.cb_size_byte(1024 * 1024 * 1024);
    config.cb_min_record_size = min_record_size;
    config.cb_max_record_size = max_record_size;
    config.leaf_page_size = leaf_page_size;
    config.max_fence_len = max_record_size;
    config.use_snapshot(true);

    let bftree = Arc::new(BfTree::with_config(config.clone(), None).unwrap());
    // Cumulative number of records inserted by each writer across rounds.
    let mut rs = vec![0usize; num_threads];
    for round in 0..2 {
        let first_record = round * inserts_per_thread;
        let records = first_record..first_record + inserts_per_thread;

        let mut writers = Vec::with_capacity(num_threads);
        for thread_idx in 0..num_threads {
            let tree = bftree.clone();
            let records = records.clone();
            writers.push(thread::spawn(move || {
                let key_len: usize = min_record_size / 2 + std::mem::size_of::<usize>();
                assert!(key_len * 2 <= max_record_size);
                let mut key_words = vec![0usize; key_len / std::mem::size_of::<usize>()];
                for r in records {
                    key_words.fill(r);
                    key_words[0] = thread_idx;
                    let outcome = tree.insert(
                        bytemuck::must_cast_slice::<usize, u8>(&key_words),
                        bytemuck::must_cast_slice::<usize, u8>(&key_words),
                    );
                    if !matches!(outcome, crate::LeafInsertResult::Success) {
                        panic!("Insert failed");
                    }
                }
                inserts_per_thread
            }));
        }

        let snapshot_tree = bftree.clone();
        let snapshot_target = tmp_snapshot_file_path.clone();
        let snapshotter = thread::spawn(move || {
            snapshot_tree.cpr_snapshot(&snapshot_target);
        });

        for (total, writer) in rs.iter_mut().zip(writers) {
            *total += writer.join().unwrap();
        }
        snapshotter.join().unwrap();

        verify_snapshot_recovery(
            &tmp_snapshot_file_path,
            num_threads,
            min_record_size,
            &rs,
            false,
        );
    }
    let _ = std::fs::remove_file(tmp_snapshot_file_path);
}
/// Runs the cache-only snapshot scenario under a shuttle portfolio of PCT schedulers.
///
/// One scheduler is added per available core, capped at four. Each execution gets a fresh
/// iteration number from a static counter so that file names do not collide; failing
/// schedules are persisted under `target`.
#[cfg(feature = "shuttle")]
#[test]
fn shuttle_cpr_snapshot_cache_only() {
    use std::sync::atomic::AtomicUsize;
    static ITER: AtomicUsize = AtomicUsize::new(0);

    let mut shuttle_config = shuttle::Config::default();
    shuttle_config.max_steps = shuttle::MaxSteps::None;
    shuttle_config.stack_size = 1024 * 1024 * 1024;
    shuttle_config.failure_persistence =
        shuttle::FailurePersistence::File(Some(PathBuf::from_str("target").unwrap()));

    let mut runner = shuttle::PortfolioRunner::new(true, shuttle_config);
    let scheduler_count = std::thread::available_parallelism().unwrap().get().min(4);
    (0..scheduler_count).for_each(|_| {
        runner.add(shuttle::scheduler::PctScheduler::new(10, 1000));
    });

    runner.run(|| {
        let iter = ITER.fetch_add(1, Ordering::Relaxed);
        shuttle_cpr_snapshot_cache_only_inner(iter);
        eprintln!("Completed shuttle iteration {}", iter);
    });
}
/// One shuttle iteration of the disk-backed snapshot scenario.
///
/// Runs three rounds. In each round four writer threads insert 500 records apiece while a
/// separate thread takes a snapshot; once all of them have been joined, the snapshot is
/// recovered and verified against the cumulative insert counts with the prefix check
/// enabled. `iter` only serves to make the data and snapshot file names unique.
#[cfg(feature = "shuttle")]
fn shuttle_cpr_snapshot_disk_inner(iter: usize) {
    let min_record_size: usize = 64;
    let max_record_size: usize = 2408;
    let leaf_page_size: usize = 8192;
    let num_threads: usize = 4;
    let inserts_per_thread: usize = 500;
    let tmp_file_path = std::path::PathBuf::from_str(&format!(
        "target/shuttle_cpr_snapshot_disk_{}_{}.bftree",
        std::process::id(),
        iter,
    ))
    .unwrap();
    let tmp_snapshot_file_path = std::path::PathBuf::from_str(&format!(
        "target/shuttle_cpr_snapshot_disk_{}_{}_snap.bftree",
        std::process::id(),
        iter,
    ))
    .unwrap();

    let mut config = Config::new(&tmp_file_path, 128 * 1024);
    config.storage_backend(crate::StorageBackend::Std);
    config.cb_min_record_size = min_record_size + 2 * std::mem::size_of::<usize>();
    config.cb_max_record_size = max_record_size;
    config.leaf_page_size = leaf_page_size;
    config.max_fence_len = min_record_size + 2 * std::mem::size_of::<usize>();
    config.use_snapshot(true);

    let bftree = Arc::new(BfTree::with_config(config.clone(), None).unwrap());
    // Cumulative number of records inserted by each writer across rounds.
    let mut rs = vec![0usize; num_threads];
    for round in 0..3 {
        let first_record = round * inserts_per_thread;
        let records = first_record..first_record + inserts_per_thread;

        let mut writers = Vec::with_capacity(num_threads);
        for thread_idx in 0..num_threads {
            let tree = bftree.clone();
            let records = records.clone();
            writers.push(thread::spawn(move || {
                let key_len: usize = min_record_size / 2 + std::mem::size_of::<usize>();
                assert!(key_len * 2 <= max_record_size);
                let mut key_words = vec![0usize; key_len / std::mem::size_of::<usize>()];
                for r in records {
                    key_words.fill(r);
                    key_words[0] = thread_idx;
                    let outcome = tree.insert(
                        bytemuck::must_cast_slice::<usize, u8>(&key_words),
                        bytemuck::must_cast_slice::<usize, u8>(&key_words),
                    );
                    if !matches!(outcome, crate::LeafInsertResult::Success) {
                        panic!("Insert failed");
                    }
                }
                inserts_per_thread
            }));
        }

        let snapshot_tree = bftree.clone();
        let snapshot_target = tmp_snapshot_file_path.clone();
        let snapshotter = thread::spawn(move || {
            snapshot_tree.cpr_snapshot(&snapshot_target);
        });

        for (total, writer) in rs.iter_mut().zip(writers) {
            *total += writer.join().unwrap();
        }
        snapshotter.join().unwrap();

        verify_snapshot_recovery(
            &tmp_snapshot_file_path,
            num_threads,
            min_record_size,
            &rs,
            true,
        );
    }
    let _ = std::fs::remove_file(tmp_file_path);
    let _ = std::fs::remove_file(tmp_snapshot_file_path);
}
/// Runs the disk-backed snapshot scenario under a shuttle portfolio of PCT schedulers.
///
/// One scheduler is added per available core, capped at four. Each execution gets a fresh
/// iteration number from a static counter so that file names do not collide; failing
/// schedules are persisted under `target`.
#[cfg(feature = "shuttle")]
#[test]
fn shuttle_cpr_snapshot_disk() {
    use std::sync::atomic::AtomicUsize;
    static ITER: AtomicUsize = AtomicUsize::new(0);

    let mut shuttle_config = shuttle::Config::default();
    shuttle_config.max_steps = shuttle::MaxSteps::None;
    shuttle_config.stack_size = 4 * 1024 * 1024;
    shuttle_config.failure_persistence =
        shuttle::FailurePersistence::File(Some(PathBuf::from_str("target").unwrap()));

    let mut runner = shuttle::PortfolioRunner::new(true, shuttle_config);
    let scheduler_count = std::thread::available_parallelism().unwrap().get().min(4);
    (0..scheduler_count).for_each(|_| {
        runner.add(shuttle::scheduler::PctScheduler::new(10, 100));
    });

    runner.run(|| {
        let iter = ITER.fetch_add(1, Ordering::Relaxed);
        shuttle_cpr_snapshot_disk_inner(iter);
    });
}
/// Replays a persisted shuttle schedule against the disk snapshot scenario.
///
/// When `target/schedule000.txt` does not exist the test prints a hint and returns without
/// doing anything else. Otherwise it installs a `tracing_subscriber` formatter and replays
/// the schedule with iteration number 0.
#[cfg(feature = "shuttle")]
#[test]
fn shuttle_replay() {
    let schedule_path = "target/schedule000.txt";
    if std::path::Path::new(schedule_path).exists() {
        tracing_subscriber::fmt()
            .with_ansi(true)
            .with_thread_names(false)
            .with_target(false)
            .init();
        shuttle::replay_from_file(|| shuttle_cpr_snapshot_disk_inner(0), schedule_path);
    } else {
        eprintln!("No schedule file at {schedule_path}; run shuttle_cpr_snapshot_disk to generate one on failure.");
    }
}
}