use crate::compat::{HashMap, Mutex};
#[cfg(feature = "std")]
use crate::db::CorruptPageInfo;
/// Maximum recursion depth accepted by the depth-checked tree walkers in this file; a walk
/// that would go deeper is reported as corruption instead of recursing further.
pub(crate) const MAX_TREE_DEPTH: u32 = 64;
use crate::db::TransactionGuard;
use crate::tree_store::btree_base::{
AccessGuardMut, BRANCH, BranchAccessor, BranchMutator, BtreeHeader, Checksum, DEFERRED, LEAF,
LeafAccessor, branch_checksum, leaf_checksum,
};
use crate::tree_store::btree_iters::BtreeExtractIf;
use crate::tree_store::btree_mutator::MutateHelper;
use crate::tree_store::page_store::compression::CompressionConfig;
use crate::tree_store::page_store::{Page, PageImpl, PageMut, TransactionalMemory};
use crate::tree_store::{
AccessGuardMutInPlace, AllPageNumbersBtreeIter, BtreeRangeIter, PageHint, PageNumber,
PageTrackerPolicy,
};
use crate::types::{Key, MutInPlaceValue, Value};
use crate::{AccessGuard, Result, StorageError};
use alloc::format;
use alloc::string::String;
#[cfg(feature = "std")]
use alloc::string::ToString;
use alloc::sync::Arc;
use alloc::vec;
use alloc::vec::Vec;
use core::borrow::Borrow;
use core::cmp::max;
use core::marker::PhantomData;
use core::ops::RangeBounds;
#[cfg(feature = "logging")]
use log::trace;
/// Aggregate statistics about a B-tree, returned by the `stats` methods in this file via
/// `btree_stats`.
///
/// NOTE(review): the exact definition of each counter lives in `btree_stats`, which is not
/// visible here; the field names are the only description available in this file.
pub(crate) struct BtreeStats {
pub(crate) tree_height: u32,
pub(crate) leaf_pages: u64,
pub(crate) branch_pages: u64,
pub(crate) stored_leaf_bytes: u64,
pub(crate) metadata_bytes: u64,
pub(crate) fragmented_bytes: u64,
}
/// A chain of page numbers leading down to one particular page.
///
/// The last element is the page the path refers to; every element before it is an ancestor.
/// All constructors produce at least one element, so the accessors never see an empty path.
#[derive(Clone)]
pub(crate) struct PagePath {
    path: Vec<PageNumber>,
}

impl PagePath {
    /// Creates a path that consists only of `page_number`.
    pub(crate) fn new_root(page_number: PageNumber) -> Self {
        let mut path = Vec::with_capacity(1);
        path.push(page_number);
        Self { path }
    }

    /// Returns a new path equal to this one with `page_number` appended.
    pub(crate) fn with_child(&self, page_number: PageNumber) -> Self {
        let path = self
            .path
            .iter()
            .copied()
            .chain(core::iter::once(page_number))
            .collect();
        Self { path }
    }

    /// Returns a new path equal to this one followed by every element of `other`.
    pub(crate) fn with_subpath(&self, other: &Self) -> Self {
        let path = self.path.iter().chain(other.path.iter()).copied().collect();
        Self { path }
    }

    /// All elements of the path except the last one.
    pub(crate) fn parents(&self) -> &[PageNumber] {
        let ancestor_count = self.path.len() - 1;
        &self.path[..ancestor_count]
    }

    /// The last element of the path, i.e. the page this path points at.
    pub(crate) fn page_number(&self) -> PageNumber {
        let last_index = self.path.len() - 1;
        self.path[last_index]
    }
}
pub(crate) struct UntypedBtree {
mem: Arc<TransactionalMemory>,
root: Option<BtreeHeader>,
key_width: Option<usize>,
_value_width: Option<usize>,
}
impl UntypedBtree {
pub(crate) fn new(
root: Option<BtreeHeader>,
mem: Arc<TransactionalMemory>,
key_width: Option<usize>,
value_width: Option<usize>,
) -> Self {
Self {
mem,
root,
key_width,
_value_width: value_width,
}
}
pub(crate) fn visit_all_pages<F>(&self, mut visitor: F) -> Result
where
F: FnMut(&PagePath) -> Result,
{
if let Some(page_number) = self.root.map(|x| x.root) {
self.visit_pages_helper(PagePath::new_root(page_number), &mut visitor, 0)?;
}
Ok(())
}
fn visit_pages_helper<F>(&self, path: PagePath, visitor: &mut F, depth: u32) -> Result
where
F: FnMut(&PagePath) -> Result,
{
if depth > MAX_TREE_DEPTH {
return Err(StorageError::Corrupted(format!(
"B-tree depth {depth} exceeds maximum {MAX_TREE_DEPTH}"
)));
}
visitor(&path)?;
let page = self.mem.get_page(path.page_number())?;
match page.memory()[0] {
LEAF => {
}
BRANCH => {
let accessor = BranchAccessor::new(&page, self.key_width)?;
for i in 0..accessor.count_children() {
let child_page = accessor.child_page(i).ok_or_else(|| {
StorageError::invalid_child_pointer(page.get_page_number(), i)
})?;
let child_path = path.with_child(child_page);
self.visit_pages_helper(child_path, visitor, depth + 1)?;
}
}
x => {
return Err(StorageError::invalid_page_type(page.get_page_number(), x));
}
}
Ok(())
}
}
/// Best-effort extraction of every key/value pair reachable from `root`.
///
/// Recovered pairs are appended to `pairs`, problems encountered along the way are appended
/// to `corruptions`, and the number of recovered pairs is returned.
#[cfg(feature = "std")]
pub(crate) fn salvage_tree_leaves(
    root: BtreeHeader,
    mem: &TransactionalMemory,
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
    pairs: &mut Vec<(Vec<u8>, Vec<u8>)>,
    corruptions: &mut Vec<CorruptPageInfo>,
) -> u64 {
    let mut recovered_pairs: u64 = 0;
    // The walk starts at the root page, which is depth zero.
    let start_depth = 0;
    salvage_node(
        root.root,
        mem,
        fixed_key_size,
        fixed_value_size,
        pairs,
        corruptions,
        &mut recovered_pairs,
        start_depth,
    );
    recovered_pairs
}
/// Recursively salvages key/value pairs from the subtree rooted at `page_number`.
///
/// This is a best-effort walk: it never returns an error. Every problem (unreadable page,
/// empty page, unparsable leaf/branch, unknown page type, excessive depth) is recorded in
/// `corruptions` and the walk continues with whatever else is reachable.
///
/// * `pairs` receives the recovered `(key, value)` byte pairs.
/// * `recovered` is incremented by the number of pairs recovered.
/// * `depth` is the number of levels already descended from the root.
// The argument list mirrors the recursion state; bundling it into a struct would only move
// the same fields elsewhere.
#[allow(clippy::too_many_arguments)]
#[cfg(feature = "std")]
fn salvage_node(
    page_number: PageNumber,
    mem: &TransactionalMemory,
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
    pairs: &mut Vec<(Vec<u8>, Vec<u8>)>,
    corruptions: &mut Vec<CorruptPageInfo>,
    recovered: &mut u64,
    depth: u32,
) {
    // Builds the corruption record for `page_number` with the given description.
    fn corruption(page_number: PageNumber, description: String) -> CorruptPageInfo {
        CorruptPageInfo {
            page_number: u64::from_le_bytes(page_number.to_le_bytes()),
            table_name: None,
            description,
        }
    }

    if depth > MAX_TREE_DEPTH {
        corruptions.push(corruption(
            page_number,
            format!("B-tree depth {depth} exceeds maximum {MAX_TREE_DEPTH}"),
        ));
        return;
    }
    let Ok(page) = mem.get_page(page_number) else {
        corruptions.push(corruption(page_number, "failed to read page".to_string()));
        return;
    };
    let node_mem = page.memory();
    if node_mem.is_empty() {
        corruptions.push(corruption(page_number, "empty page".to_string()));
        return;
    }
    match node_mem[0] {
        LEAF => {
            // A constructor error is an expected outcome on a damaged page, so it is handled
            // as a value (`None`) instead of being turned into a panic. `catch_unwind`
            // remains only as a safety net for panics inside the accessor itself.
            let parsed = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                let accessor =
                    LeafAccessor::new(node_mem, fixed_key_size, fixed_value_size).ok()?;
                let mut leaf_pairs = Vec::new();
                for i in 0..accessor.num_pairs() {
                    if let Some(entry) = accessor.entry(i) {
                        leaf_pairs.push((entry.key().to_vec(), entry.value().to_vec()));
                    }
                }
                Some(leaf_pairs)
            }));
            match parsed {
                Ok(Some(leaf_pairs)) => {
                    *recovered += leaf_pairs.len() as u64;
                    pairs.extend(leaf_pairs);
                }
                // The description is kept identical for both failure modes so that the
                // reported output does not change.
                Ok(None) | Err(_) => {
                    corruptions.push(corruption(
                        page_number,
                        "leaf page data corrupt (parse panic)".to_string(),
                    ));
                }
            }
        }
        BRANCH => {
            // Same approach as for leaves: an invalid branch header yields no children
            // rather than a panic.
            let children: Vec<PageNumber> =
                std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                    let accessor = BranchAccessor::new(&page, fixed_key_size).ok()?;
                    let mut kids = Vec::new();
                    for i in 0..accessor.count_children() {
                        if let Some(child) = accessor.child_page(i) {
                            kids.push(child);
                        }
                    }
                    Some(kids)
                }))
                .ok()
                .flatten()
                .unwrap_or_default();
            if children.is_empty() && node_mem.len() > 4 {
                corruptions.push(corruption(
                    page_number,
                    "branch page corrupt or empty".to_string(),
                ));
            }
            for child in children {
                salvage_node(
                    child,
                    mem,
                    fixed_key_size,
                    fixed_value_size,
                    pairs,
                    corruptions,
                    recovered,
                    depth + 1,
                );
            }
        }
        other => {
            corruptions.push(corruption(
                page_number,
                format!("unknown page type: {other}"),
            ));
        }
    }
}
/// Mutable, type-erased handle on a B-tree, used in this file for checksum finalization,
/// visiting dirty leaves and page relocation.
pub(crate) struct UntypedBtreeMut {
mem: Arc<TransactionalMemory>,
root: Option<BtreeHeader>,
// Shared list that `relocate_helper` appends to when a page could not be freed directly.
freed_pages: Arc<Mutex<Vec<PageNumber>>>,
key_width: Option<usize>,
value_width: Option<usize>,
}
impl UntypedBtreeMut {
/// Creates a handle for the tree rooted at `root` (an empty tree when `None`).
///
/// `key_width` / `value_width` are the fixed key and value widths, or `None` when the
/// respective side is variable-width.
pub(crate) fn new(
root: Option<BtreeHeader>,
mem: Arc<TransactionalMemory>,
freed_pages: Arc<Mutex<Vec<PageNumber>>>,
key_width: Option<usize>,
value_width: Option<usize>,
) -> Self {
Self {
mem,
root,
freed_pages,
key_width,
value_width,
}
}
/// Returns a copy of the current root header, or `None` for an empty tree.
pub(crate) fn get_root(&self) -> Option<BtreeHeader> {
self.root
}
/// Recomputes the checksums of all uncommitted pages and stores the resulting root
/// checksum in the root header.
///
/// Returns the (possibly updated) root header. An empty tree, or a tree whose root page is
/// not uncommitted, is returned unchanged.
pub(crate) fn finalize_dirty_checksums(&mut self) -> Result<Option<BtreeHeader>> {
    let Some(mut header) = self.root else {
        return Ok(None);
    };
    if !self.mem.uncommitted(header.root) {
        // Nothing below a committed root can be dirty.
        return Ok(Some(header));
    }
    header.checksum = self.finalize_dirty_checksums_helper(header.root)?;
    self.root = Some(header);
    Ok(self.root)
}
/// Recomputes checksums for the uncommitted subtree rooted at `page_number` and returns the
/// checksum of that page.
///
/// Returns an error if `page_number` is not uncommitted, if a branch child pointer is
/// missing, or if the first byte of the page is neither `LEAF` nor `BRANCH`.
fn finalize_dirty_checksums_helper(&mut self, page_number: PageNumber) -> Result<Checksum> {
if !self.mem.uncommitted(page_number) {
return Err(StorageError::page_corrupted(
page_number,
"finalize_dirty_checksums_helper called on committed page",
));
}
let mut page = self.mem.get_page_mut(page_number)?;
match page.memory()[0] {
LEAF => leaf_checksum(&page, self.key_width, self.value_width),
BRANCH => {
let accessor = BranchAccessor::new(&page, self.key_width)?;
// First pass: recurse into uncommitted children and remember their new checksums.
// Children that are not uncommitted contribute `None` and are left untouched.
let mut new_children = vec![];
for i in 0..accessor.count_children() {
let child_page = accessor
.child_page(i)
.ok_or_else(|| StorageError::invalid_child_pointer(page_number, i))?;
if self.mem.uncommitted(child_page) {
let new_checksum = self.finalize_dirty_checksums_helper(child_page)?;
new_children.push(Some((i, child_page, new_checksum)));
} else {
new_children.push(None);
}
}
// Second pass: write the collected checksums into this branch page, then checksum it.
let mut mutator = BranchMutator::new(page.memory_mut()?);
for (child_index, child_page, child_checksum) in new_children.into_iter().flatten()
{
mutator.write_child_page(child_index, child_page, child_checksum);
}
branch_checksum(&page, self.key_width)
}
x => Err(StorageError::invalid_page_type(page.get_page_number(), x)),
}
}
pub(crate) fn dirty_leaf_visitor<F>(&mut self, visitor: F) -> Result
where
F: Fn(PageMut) -> Result,
{
if let Some(page_number) = self.root.map(|x| x.root) {
if !self.mem.uncommitted(page_number) {
return Ok(());
}
let page = self.mem.get_page_mut(page_number)?;
match page.memory()[0] {
LEAF => {
visitor(page)?;
}
BRANCH => {
drop(page);
self.dirty_leaf_visitor_helper(page_number, &visitor, 0)?;
}
x => {
return Err(StorageError::invalid_page_type(page.get_page_number(), x));
}
}
}
Ok(())
}
fn dirty_leaf_visitor_helper<F>(
&mut self,
page_number: PageNumber,
visitor: &F,
depth: u32,
) -> Result
where
F: Fn(PageMut) -> Result,
{
if depth > MAX_TREE_DEPTH {
return Err(StorageError::Corrupted(format!(
"B-tree depth {depth} exceeds maximum {MAX_TREE_DEPTH}"
)));
}
if !self.mem.uncommitted(page_number) {
return Err(StorageError::page_corrupted(
page_number,
"dirty_leaf_visitor_helper called on committed page",
));
}
let page = self.mem.get_page_mut(page_number)?;
match page.memory()[0] {
LEAF => {
visitor(page)?;
}
BRANCH => {
let accessor = BranchAccessor::new(&page, self.key_width)?;
for i in 0..accessor.count_children() {
let child_page = accessor
.child_page(i)
.ok_or_else(|| StorageError::invalid_child_pointer(page_number, i))?;
if self.mem.uncommitted(child_page) {
self.dirty_leaf_visitor_helper(child_page, visitor, depth + 1)?;
}
}
}
x => {
return Err(StorageError::invalid_page_type(page.get_page_number(), x));
}
}
Ok(())
}
/// Relocates pages of this tree according to `relocation_map` (old page -> new page).
///
/// Returns `Ok(true)` and updates the root header when the root page was relocated,
/// `Ok(false)` when the tree is empty or the root page is not part of the map.
pub(crate) fn relocate(
    &mut self,
    relocation_map: &HashMap<PageNumber, PageNumber>,
) -> Result<bool> {
    let Some(root) = self.get_root() else {
        return Ok(false);
    };
    let Some((new_root, new_checksum)) = self.relocate_helper(root.root, relocation_map, 0)?
    else {
        return Ok(false);
    };
    self.root = Some(BtreeHeader::new(new_root, new_checksum, root.length));
    Ok(true)
}
/// Relocates `page_number` and, recursively, its children according to `relocation_map`.
///
/// Returns `Ok(None)` when `page_number` has no entry in the map; in that case its children
/// are not visited. Otherwise the page contents are copied to the mapped page, child
/// pointers of relocated children are rewritten, the old page is freed (or queued in
/// `freed_pages`), and the new page number is returned with a `DEFERRED` checksum.
fn relocate_helper(
&mut self,
page_number: PageNumber,
relocation_map: &HashMap<PageNumber, PageNumber>,
depth: u32,
) -> Result<Option<(PageNumber, Checksum)>> {
if depth > MAX_TREE_DEPTH {
return Err(StorageError::Corrupted(format!(
"B-tree depth {depth} exceeds maximum {MAX_TREE_DEPTH} during relocation"
)));
}
let old_page = self.mem.get_page(page_number)?;
let mut new_page = if let Some(new_page_number) = relocation_map.get(&page_number) {
self.mem.get_page_mut(*new_page_number)?
} else {
return Ok(None);
};
// `copy_from_slice` requires both pages to have the same length.
new_page.memory_mut()?.copy_from_slice(old_page.memory());
let node_mem = old_page.memory();
match node_mem[0] {
LEAF => {
}
BRANCH => {
// Children are read from the old page while pointers are rewritten in the new copy.
let accessor = BranchAccessor::new(&old_page, self.key_width)?;
let mut mutator = BranchMutator::new(new_page.memory_mut()?);
for i in 0..accessor.count_children() {
let child = accessor
.child_page(i)
.ok_or_else(|| StorageError::invalid_child_pointer(page_number, i))?;
if let Some((new_child, new_checksum)) =
self.relocate_helper(child, relocation_map, depth + 1)?
{
mutator.write_child_page(i, new_child, new_checksum);
}
}
}
x => {
return Err(StorageError::invalid_page_type(
old_page.get_page_number(),
x,
));
}
}
let mut freed_pages = self.freed_pages.lock();
let mut ignore = PageTrackerPolicy::Ignore;
// If the old page could not be freed directly, record it in the shared freed list.
if !self.mem.free_if_uncommitted(page_number, &mut ignore)? {
freed_pages.push(page_number);
}
Ok(Some((new_page.get_page_number(), DEFERRED)))
}
}
/// Typed, mutable handle on a B-tree with keys of type `K` and values of type `V`.
pub(crate) struct BtreeMut<'a, K: Key + 'static, V: Value + 'static> {
mem: Arc<TransactionalMemory>,
transaction_guard: Arc<TransactionGuard>,
root: Option<BtreeHeader>,
// Pages released by mutations that could not be freed immediately.
freed_pages: Arc<Mutex<Vec<PageNumber>>>,
allocated_pages: Arc<Mutex<PageTrackerPolicy>>,
// When `Some`, used instead of the compression setting reported by `mem`.
compression_override: Option<CompressionConfig>,
_key_type: PhantomData<K>,
_value_type: PhantomData<V>,
_lifetime: PhantomData<&'a ()>,
}
impl<K: Key + 'static, V: Value + 'static> BtreeMut<'_, K, V> {
/// Creates a handle for the tree rooted at `root` that uses the compression setting
/// reported by `mem` (no override).
pub(crate) fn new(
root: Option<BtreeHeader>,
guard: Arc<TransactionGuard>,
mem: Arc<TransactionalMemory>,
freed_pages: Arc<Mutex<Vec<PageNumber>>>,
allocated_pages: Arc<Mutex<PageTrackerPolicy>>,
) -> Self {
Self {
mem,
transaction_guard: guard,
root,
freed_pages,
allocated_pages,
compression_override: None,
_key_type: Default::default(),
_value_type: Default::default(),
_lifetime: Default::default(),
}
}
/// Like `new`, but forces `CompressionConfig::None` regardless of what `mem` reports.
pub(crate) fn new_uncompressed(
root: Option<BtreeHeader>,
guard: Arc<TransactionGuard>,
mem: Arc<TransactionalMemory>,
freed_pages: Arc<Mutex<Vec<PageNumber>>>,
allocated_pages: Arc<Mutex<PageTrackerPolicy>>,
) -> Self {
Self {
mem,
transaction_guard: guard,
root,
freed_pages,
allocated_pages,
compression_override: Some(CompressionConfig::None),
_key_type: Default::default(),
_value_type: Default::default(),
_lifetime: Default::default(),
}
}
/// Effective compression setting: the override if one was set, else the one from `mem`.
fn compression(&self) -> CompressionConfig {
    match self.compression_override {
        Some(forced) => forced,
        None => self.mem.compression(),
    }
}
/// Fixed value width to use when decoding pages: `None` whenever compression is enabled,
/// otherwise whatever `V::fixed_width()` reports.
fn value_width(&self) -> Option<usize> {
    let compressed = self.compression().is_enabled();
    if compressed {
        return None;
    }
    V::fixed_width()
}
/// Recomputes checksums of all uncommitted pages via `UntypedBtreeMut` and stores the
/// resulting root header, which is also returned.
pub(crate) fn finalize_dirty_checksums(&mut self) -> Result<Option<BtreeHeader>> {
    let mut untyped = UntypedBtreeMut::new(
        self.get_root(),
        self.mem.clone(),
        self.freed_pages.clone(),
        K::fixed_width(),
        self.value_width(),
    );
    let finalized = untyped.finalize_dirty_checksums()?;
    self.root = finalized;
    Ok(finalized)
}
/// Returns an iterator over all page numbers of the tree, or `None` for an empty tree.
#[allow(dead_code)]
pub(crate) fn all_pages_iter(&self) -> Result<Option<AllPageNumbersBtreeIter>> {
    let Some(header) = self.root else {
        return Ok(None);
    };
    let iter = AllPageNumbersBtreeIter::new(
        header.root,
        K::fixed_width(),
        self.value_width(),
        self.mem.clone(),
    )?;
    Ok(Some(iter))
}
/// Calls `visitor` for every page of the tree by delegating to a read-only view of it.
pub(crate) fn visit_all_pages<F>(&self, visitor: F) -> Result
where
    F: FnMut(&PagePath) -> Result,
{
    let tree = self.read_tree()?;
    tree.visit_all_pages(visitor)
}
/// Returns a copy of the current root header, or `None` for an empty tree.
pub(crate) fn get_root(&self) -> Option<BtreeHeader> {
self.root
}
/// Replaces the root header; `None` makes the tree empty from this handle's point of view.
pub(crate) fn set_root(&mut self, root: Option<BtreeHeader>) {
self.root = root;
}
/// Relocates pages according to `relocation_map` via `UntypedBtreeMut`.
///
/// Returns `Ok(true)` and adopts the new root header when the root was relocated,
/// `Ok(false)` otherwise.
pub(crate) fn relocate(
    &mut self,
    relocation_map: &HashMap<PageNumber, PageNumber>,
) -> Result<bool> {
    let mut untyped = UntypedBtreeMut::new(
        self.get_root(),
        self.mem.clone(),
        self.freed_pages.clone(),
        K::fixed_width(),
        self.value_width(),
    );
    let relocated = untyped.relocate(relocation_map)?;
    if relocated {
        self.root = untyped.get_root();
    }
    Ok(relocated)
}
/// Inserts `key` with `value` and returns the first element of what
/// `MutateHelper::insert` yields (bound as `old_value` below).
///
/// NOTE(review): presumably this is the previously stored value for `key`, if any — confirm
/// against `MutateHelper::insert`.
pub(crate) fn insert(
&mut self,
key: &K::SelfType<'_>,
value: &V::SelfType<'_>,
) -> Result<Option<AccessGuard<'_, V>>> {
#[cfg(feature = "logging")]
trace!(
"Btree(root={:?}): Inserting {:?} with value of length {}",
&self.root,
key,
V::as_bytes(value).as_ref().len()
);
let compression = self.compression();
// The freed-page list stays locked for the duration of the mutation.
let mut freed_pages = self.freed_pages.lock();
let mut operation: MutateHelper<'_, '_, K, V> = MutateHelper::new(
&mut self.root,
self.mem.clone(),
freed_pages.as_mut(),
self.allocated_pages.clone(),
compression,
);
// The second tuple element (a guard for the new value) is not needed here.
let (old_value, _) = operation.insert(key, value)?;
Ok(old_value)
}
/// Inserts `key`/`value` through `MutateHelper::insert_inplace`.
///
/// The helper is given a throw-away freed-page list and a `Closed` allocation tracker; if
/// the operation nevertheless freed pages, an `Internal` error is returned.
pub(crate) fn insert_inplace(
&mut self,
key: &K::SelfType<'_>,
value: &V::SelfType<'_>,
) -> Result<()> {
let compression = self.compression();
let mut fake_freed_pages = vec![];
let fake_allocated_pages = Arc::new(Mutex::new(PageTrackerPolicy::Closed));
let mut operation = MutateHelper::<K, V>::new(
&mut self.root,
self.mem.clone(),
fake_freed_pages.as_mut(),
fake_allocated_pages,
compression,
);
operation.insert_inplace(key, value)?;
// An in-place insert is expected not to release any page.
if !fake_freed_pages.is_empty() {
return Err(StorageError::Internal(format!(
"insert_inplace unexpectedly freed {} pages",
fake_freed_pages.len()
)));
}
Ok(())
}
/// Deletes `key` via `MutateHelper::delete` and returns its result.
///
/// NOTE(review): presumably the removed value, or `None` if the key was absent — confirm
/// against `MutateHelper::delete`.
pub(crate) fn remove(&mut self, key: &K::SelfType<'_>) -> Result<Option<AccessGuard<'_, V>>> {
#[cfg(feature = "logging")]
trace!("Btree(root={:?}): Deleting {:?}", &self.root, key);
let compression = self.compression();
let mut freed_pages = self.freed_pages.lock();
let mut operation: MutateHelper<'_, '_, K, V> = MutateHelper::new(
&mut self.root,
self.mem.clone(),
freed_pages.as_mut(),
self.allocated_pages.clone(),
compression,
);
let result = operation.delete(key)?;
Ok(result)
}
/// Removes the entry returned by `MutateHelper::delete_first` and hands it back as a pair
/// of guards; the key guard owns its bytes. Returns `None` when nothing was removed.
pub(crate) fn pop_first(
    &mut self,
) -> Result<Option<(AccessGuard<'static, K>, AccessGuard<'_, V>)>> {
    let compression = self.compression();
    let mut freed_pages = self.freed_pages.lock();
    let mut operation: MutateHelper<'_, '_, K, V> = MutateHelper::new(
        &mut self.root,
        self.mem.clone(),
        freed_pages.as_mut(),
        self.allocated_pages.clone(),
        compression,
    );
    let removed = operation.delete_first()?;
    Ok(removed.map(|(key_bytes, value_guard)| {
        (AccessGuard::with_owned_value(key_bytes), value_guard)
    }))
}
/// Removes the entry returned by `MutateHelper::delete_last` and hands it back as a pair
/// of guards; the key guard owns its bytes. Returns `None` when nothing was removed.
pub(crate) fn pop_last(
    &mut self,
) -> Result<Option<(AccessGuard<'static, K>, AccessGuard<'_, V>)>> {
    let compression = self.compression();
    let mut freed_pages = self.freed_pages.lock();
    let mut operation: MutateHelper<'_, '_, K, V> = MutateHelper::new(
        &mut self.root,
        self.mem.clone(),
        freed_pages.as_mut(),
        self.allocated_pages.clone(),
        compression,
    );
    let removed = operation.delete_last()?;
    Ok(removed.map(|(key_bytes, value_guard)| {
        (AccessGuard::with_owned_value(key_bytes), value_guard)
    }))
}
/// Debug helper: delegates to `Btree::print_debug` on a read-only view of this tree.
#[allow(dead_code)]
#[cfg(feature = "std")]
pub(crate) fn print_debug(&self, include_values: bool) -> Result {
self.read_tree()?.print_debug(include_values)
}
/// Computes statistics for this tree via `btree_stats`, using the effective value width.
pub(crate) fn stats(&self) -> Result<BtreeStats> {
btree_stats(
self.get_root().map(|x| x.root),
&self.mem,
K::fixed_width(),
self.value_width(),
)
}
/// Builds a read-only `Btree` over the current root, carrying over the "uncompressed"
/// override if this handle was created with it.
fn read_tree(&self) -> Result<Btree<K, V>> {
    let root = self.get_root();
    let guard = self.transaction_guard.clone();
    let mem = self.mem.clone();
    if self.compression_override == Some(CompressionConfig::None) {
        return Btree::new_uncompressed(root, PageHint::None, guard, mem);
    }
    Btree::new(root, PageHint::None, guard, mem)
}
/// Looks up `key` through a read-only view of the tree.
pub(crate) fn get(&self, key: &K::SelfType<'_>) -> Result<Option<AccessGuard<'_, V>>> {
self.read_tree()?.get(key)
}
/// Returns a mutable guard for the value stored under `key`, or `None` if the tree is empty
/// or the key is not found.
///
/// If the root page is not uncommitted, it is first copied into a newly allocated page: the
/// old root is pushed onto `freed_pages`, and the root header is pointed at the copy with a
/// `DEFERRED` checksum. The descent itself is done by `get_mut_helper`.
pub(crate) fn get_mut(
&mut self,
key: &K::SelfType<'_>,
) -> Result<Option<AccessGuardMut<'_, V>>> {
if let Some(ref mut root) = self.root {
let key_bytes = K::as_bytes(key);
let query = key_bytes.as_ref();
let page_mut = if self.mem.uncommitted(root.root) {
self.mem.get_page_mut(root.root)?
} else {
// Lock order here is freed_pages first, then allocated_pages.
let mut freed_pages = self.freed_pages.lock();
let mut allocated = self.allocated_pages.lock();
let page_size: u32 = self.mem.get_page_size().try_into().map_err(|_| {
StorageError::Corrupted(format!(
"page size {} exceeds u32 range",
self.mem.get_page_size()
))
})?;
// Size of the existing root page, so that the copy below has a matching length.
let required: usize =
root.root
.page_size_bytes(page_size)
.try_into()
.map_err(|_| {
StorageError::page_corrupted(
root.root,
"page size bytes overflow usize",
)
})?;
let mut new_page = self.mem.allocate(required, &mut allocated)?;
let old_page = self.mem.get_page(root.root)?;
new_page.memory_mut()?.copy_from_slice(old_page.memory());
drop(old_page);
freed_pages.push(root.root);
root.root = new_page.get_page_number();
root.checksum = DEFERRED;
new_page
};
self.get_mut_helper(None, page_mut, query)
} else {
Ok(None)
}
}
/// Recursive descent for `get_mut`.
///
/// * `parent` - the parent branch page and the index of `page` within it, or `None` when
///   `page` is the root.
/// * `page` - the mutable page currently being examined.
/// * `query` - the serialized key being looked up.
///
/// On a leaf, returns a guard for the matching entry or `None` if the key is not present.
/// On a branch, selects the child for `query`; a child that is not uncommitted is first
/// copied into a newly allocated page, the old child is pushed onto `freed_pages`, and the
/// branch's child pointer is rewritten with a `DEFERRED` checksum.
// NOTE(review): unlike the other recursive walkers in this file, this descent has no
// MAX_TREE_DEPTH check.
fn get_mut_helper(
&mut self,
parent: Option<(PageMut, usize)>,
mut page: PageMut,
query: &[u8],
) -> Result<Option<AccessGuardMut<'_, V>>> {
let node_mem = page.memory();
match node_mem[0] {
LEAF => {
let value_width = self.value_width();
let compression = self.compression();
let accessor = LeafAccessor::new(page.memory(), K::fixed_width(), value_width)?;
if let Some(entry_index) = accessor.find_key::<K>(query) {
let (start, end) = accessor.value_range(entry_index).ok_or_else(|| {
StorageError::invalid_entry_index(page.get_page_number(), entry_index)
})?;
let root_header = self.root.as_mut().ok_or_else(|| {
StorageError::Corrupted(String::from(
"btree root missing during mutable access",
))
})?;
// The guard takes ownership of the leaf page and, if present, its parent.
let guard = AccessGuardMut::new(
page,
start,
end - start,
entry_index,
parent,
self.mem.clone(),
self.allocated_pages.clone(),
root_header,
K::fixed_width(),
value_width,
compression,
)?;
Ok(Some(guard))
} else {
Ok(None)
}
}
BRANCH => {
// Scoped so the accessor's borrow of `page` ends before `page` is mutated below.
let (child_index, child_page) = {
let accessor = BranchAccessor::new(&page, K::fixed_width())?;
accessor.child_for_key::<K>(query)?
};
let child_page_mut = if self.mem.uncommitted(child_page) {
self.mem.get_page_mut(child_page)?
} else {
let mut freed_pages = self.freed_pages.lock();
let mut allocated = self.allocated_pages.lock();
let page_size: u32 = self.mem.get_page_size().try_into().map_err(|_| {
StorageError::Corrupted(format!(
"page size {} exceeds u32 range",
self.mem.get_page_size()
))
})?;
let required: usize = child_page
.page_size_bytes(page_size)
.try_into()
.map_err(|_| {
StorageError::page_corrupted(
child_page,
"page size bytes overflow usize",
)
})?;
let mut new_page = self.mem.allocate(required, &mut allocated)?;
let old_child_page = self.mem.get_page(child_page)?;
new_page
.memory_mut()?
.copy_from_slice(old_child_page.memory());
drop(old_child_page);
freed_pages.push(child_page);
// Point this branch at the copy; its checksum is left as DEFERRED.
let mut mutator = BranchMutator::new(page.memory_mut()?);
mutator.write_child_page(child_index, new_page.get_page_number(), DEFERRED);
new_page
};
self.get_mut_helper(Some((page, child_index)), child_page_mut, query)
}
x => Err(StorageError::invalid_page_type(page.get_page_number(), x)),
}
}
/// Delegates to `Btree::first` on a read-only view of this tree.
pub(crate) fn first(
&self,
) -> Result<Option<(AccessGuard<'static, K>, AccessGuard<'static, V>)>> {
self.read_tree()?.first()
}
/// Delegates to `Btree::last` on a read-only view of this tree.
pub(crate) fn last(
&self,
) -> Result<Option<(AccessGuard<'static, K>, AccessGuard<'static, V>)>> {
self.read_tree()?.last()
}
/// Returns an iterator over the entries within `range`, delegating to `Btree::range` on a
/// read-only view of this tree.
pub(crate) fn range<'a0, T: RangeBounds<KR> + 'a0, KR: Borrow<K::SelfType<'a0>> + 'a0>(
&self,
range: &'_ T,
) -> Result<BtreeRangeIter<K, V>>
where
K: 'a0,
{
self.read_tree()?.range(range)
}
/// Builds a `BtreeExtractIf` over the entries within `range`, driven by `predicate`.
///
/// The range iterator is created first from a read-only view; the extractor then receives
/// mutable access to the root together with the shared page-tracking state.
pub(crate) fn extract_from_if<
'a,
'a0,
T: RangeBounds<KR> + 'a0,
KR: Borrow<K::SelfType<'a0>> + 'a0,
F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool,
>(
&'a mut self,
range: &'_ T,
predicate: F,
) -> Result<BtreeExtractIf<'a, K, V, F>>
where
K: 'a0,
{
let iter = self.range(range)?;
let compression = self.compression();
let result = BtreeExtractIf::new(
&mut self.root,
iter,
predicate,
self.freed_pages.clone(),
self.allocated_pages.clone(),
self.mem.clone(),
compression,
);
Ok(result)
}
/// Deletes every entry within `range` for which `predicate` returns `false`.
///
/// Entries are enumerated with an iterator created before any deletion happens. Pages
/// released by the deletions are collected locally and only afterwards either freed
/// directly or appended to `freed_pages`.
///
/// Returns an `Internal` error if deleting a key that the iterator yielded reports that
/// nothing was deleted.
pub(crate) fn retain_in<'a, KR, F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool>(
&mut self,
mut predicate: F,
range: impl RangeBounds<KR> + 'a,
) -> Result
where
KR: Borrow<K::SelfType<'a>> + 'a,
{
let iter = self.range(&range)?;
let mut freed = vec![];
let compression = self.compression();
// NOTE(review): `new_do_not_modify` presumably keeps existing pages intact while the
// iterator above is still reading them — confirm against `MutateHelper`.
let mut operation: MutateHelper<'_, '_, K, V> = MutateHelper::new_do_not_modify(
&mut self.root,
self.mem.clone(),
&mut freed,
self.allocated_pages.clone(),
compression,
);
for entry in iter {
let entry = entry?;
if !predicate(entry.key(), entry.value()) && operation.delete(&entry.key())?.is_none() {
return Err(StorageError::Internal(String::from(
"retain_in: delete returned None for existing key",
)));
}
}
// Lock order: freed_pages first, then allocated_pages.
let mut freed_pages = self.freed_pages.lock();
let mut allocated_pages = self.allocated_pages.lock();
for page in freed {
if !self.mem.free_if_uncommitted(page, &mut allocated_pages)? {
freed_pages.push(page);
}
}
Ok(())
}
/// Number of entries in the tree, as reported by `Btree::len` on a read-only view.
pub(crate) fn len(&self) -> Result<u64> {
self.read_tree()?.len()
}
}
/// Operations that are only available when the value type can be mutated in place.
impl<'a, K: Key + 'a, V: MutInPlaceValue + 'a> BtreeMut<'a, K, V> {
/// Inserts `key` with a placeholder value of `value_length` bytes and returns a guard
/// through which the caller can write the real value.
///
/// The placeholder is a zero-filled buffer that has been passed through `V::initialize`.
pub(crate) fn insert_reserve(
&mut self,
key: &K::SelfType<'_>,
value_length: usize,
) -> Result<AccessGuardMutInPlace<'_, V>> {
#[cfg(feature = "logging")]
trace!(
"Btree(root={:?}): Inserting {:?} with {} reserved bytes for the value",
&self.root, key, value_length
);
let compression = self.compression();
let mut freed_pages = self.freed_pages.lock();
let mut value = vec![0u8; value_length];
V::initialize(&mut value);
let mut operation = MutateHelper::<K, V>::new(
&mut self.root,
self.mem.clone(),
freed_pages.as_mut(),
self.allocated_pages.clone(),
compression,
);
// Only the guard for the newly inserted value is of interest here.
let (_, guard) = operation.insert(key, &V::from_bytes(&value))?;
Ok(guard)
}
}
/// Comparison function over two serialized keys, used by `RawBtree::verify_structure` to
/// check that keys within a leaf are strictly ascending.
#[cfg(feature = "std")]
type KeyComparator = fn(&[u8], &[u8]) -> core::cmp::Ordering;
/// Read-only view of a B-tree that works on raw bytes; it only knows the fixed key and
/// value sizes (if any), not the key/value types.
pub(crate) struct RawBtree {
mem: Arc<TransactionalMemory>,
root: Option<BtreeHeader>,
fixed_key_size: Option<usize>,
fixed_value_size: Option<usize>,
}
impl RawBtree {
/// Creates a raw view of the tree rooted at `root` (an empty tree when `None`).
pub(crate) fn new(
root: Option<BtreeHeader>,
fixed_key_size: Option<usize>,
fixed_value_size: Option<usize>,
mem: Arc<TransactionalMemory>,
) -> Self {
Self {
mem,
root,
fixed_key_size,
fixed_value_size,
}
}
/// Returns a copy of the root header, or `None` for an empty tree.
pub(crate) fn get_root(&self) -> Option<BtreeHeader> {
self.root
}
/// Computes statistics for this tree via `btree_stats`.
pub(crate) fn stats(&self) -> Result<BtreeStats> {
btree_stats(
self.root.map(|x| x.root),
&self.mem,
self.fixed_key_size,
self.fixed_value_size,
)
}
/// Entry count taken from the root header's `length` field; zero for an empty tree.
pub(crate) fn len(&self) -> Result<u64> {
    let count = match self.root {
        Some(header) => header.length,
        None => 0,
    };
    Ok(count)
}
/// Creates a `RawEntryIter` over this tree (starting from the root page, if any).
pub(crate) fn raw_iter(&self) -> Result<crate::tree_store::btree_iters::RawEntryIter> {
crate::tree_store::btree_iters::RawEntryIter::new(
self.root.map(|h| h.root),
self.fixed_key_size,
self.fixed_value_size,
self.mem.clone(),
)
}
/// Verifies the checksums of the whole tree starting from the root header.
///
/// Returns `Ok(true)` for an empty tree or when every page matches, `Ok(false)` on the
/// first mismatch.
pub(crate) fn verify_checksum(&self) -> Result<bool> {
    match self.root {
        Some(header) => self.verify_checksum_helper(header.root, header.checksum),
        None => Ok(true),
    }
}
fn verify_checksum_helper(
&self,
page_number: PageNumber,
expected_checksum: Checksum,
) -> Result<bool> {
let page = self.mem.get_page(page_number)?;
let node_mem = page.memory();
Ok(match node_mem[0] {
LEAF => {
if let Ok(computed) =
leaf_checksum(&page, self.fixed_key_size, self.fixed_value_size)
{
expected_checksum == computed
} else {
false
}
}
BRANCH => {
if let Ok(computed) = branch_checksum(&page, self.fixed_key_size) {
if expected_checksum != computed {
return Ok(false);
}
} else {
return Ok(false);
}
let accessor = BranchAccessor::new(&page, self.fixed_key_size)?;
for i in 0..accessor.count_children() {
let child = accessor
.child_page(i)
.ok_or_else(|| StorageError::invalid_child_pointer(page_number, i))?;
let checksum = accessor
.child_checksum(i)
.ok_or_else(|| StorageError::invalid_child_checksum(page_number, i))?;
if !self.verify_checksum_helper(child, checksum)? {
return Ok(false);
}
}
true
}
_ => false,
})
}
/// Verifies the checksums of every reachable page and reports each problem individually.
///
/// Returns the number of pages examined together with the list of detected corruptions.
/// Unlike `verify_checksum`, the walk continues past mismatching pages.
#[cfg(feature = "std")]
pub(crate) fn verify_checksum_detailed(&self) -> Result<(u64, Vec<CorruptPageInfo>)> {
    let mut pages_checked = 0u64;
    let mut corruptions = Vec::new();
    if let Some(header) = self.root {
        self.verify_checksum_detailed_helper(
            header.root,
            header.checksum,
            &mut pages_checked,
            &mut corruptions,
            0,
        )?;
    }
    Ok((pages_checked, corruptions))
}

/// Recursive part of `verify_checksum_detailed`.
///
/// `depth` is the number of levels already descended from the root. Because the walk keeps
/// descending below branch pages whose checksum did not match, the child pointers it follows
/// are untrusted; the depth limit prevents unbounded recursion (e.g. on a pointer cycle).
/// Exceeding the limit is recorded as a corruption and stops the descent at that point.
#[cfg(feature = "std")]
fn verify_checksum_detailed_helper(
    &self,
    page_number: PageNumber,
    expected_checksum: Checksum,
    pages_checked: &mut u64,
    corruptions: &mut Vec<CorruptPageInfo>,
    depth: u32,
) -> Result {
    if depth > MAX_TREE_DEPTH {
        corruptions.push(CorruptPageInfo {
            page_number: u64::from_le_bytes(page_number.to_le_bytes()),
            table_name: None,
            description: format!("B-tree depth {depth} exceeds maximum {MAX_TREE_DEPTH}"),
        });
        return Ok(());
    }
    let page = self.mem.get_page(page_number)?;
    let node_mem = page.memory();
    *pages_checked += 1;
    match node_mem[0] {
        LEAF => {
            let valid = if let Ok(computed) =
                leaf_checksum(&page, self.fixed_key_size, self.fixed_value_size)
            {
                expected_checksum == computed
            } else {
                false
            };
            if !valid {
                corruptions.push(CorruptPageInfo {
                    page_number: u64::from_le_bytes(page_number.to_le_bytes()),
                    table_name: None,
                    description: "leaf page checksum mismatch".to_string(),
                });
            }
        }
        BRANCH => {
            let valid = if let Ok(computed) = branch_checksum(&page, self.fixed_key_size) {
                expected_checksum == computed
            } else {
                false
            };
            if !valid {
                corruptions.push(CorruptPageInfo {
                    page_number: u64::from_le_bytes(page_number.to_le_bytes()),
                    table_name: None,
                    description: "branch page checksum mismatch".to_string(),
                });
            }
            // Children are still examined after a mismatch so that the report is complete.
            let accessor = BranchAccessor::new(&page, self.fixed_key_size)?;
            for i in 0..accessor.count_children() {
                let child = accessor.child_page(i).ok_or_else(|| {
                    StorageError::Corrupted(format!(
                        "invalid child page pointer at index {i} on page {page_number:?}"
                    ))
                })?;
                let checksum = accessor.child_checksum(i).ok_or_else(|| {
                    StorageError::Corrupted(format!(
                        "invalid child checksum at index {i} on page {page_number:?}"
                    ))
                })?;
                self.verify_checksum_detailed_helper(
                    child,
                    checksum,
                    pages_checked,
                    corruptions,
                    depth + 1,
                )?;
            }
        }
        other => {
            corruptions.push(CorruptPageInfo {
                page_number: u64::from_le_bytes(page_number.to_le_bytes()),
                table_name: None,
                description: format!("unknown page type tag: {other}"),
            });
        }
    }
    Ok(())
}
/// Checks structural invariants of the tree and returns the list of violations found.
///
/// The expected leaf depth is measured along the first-child path; every leaf is then
/// required to sit at that depth. When `key_cmp` is given, keys inside each leaf must be
/// strictly ascending according to it. An empty tree yields an empty list.
#[cfg(feature = "std")]
pub(crate) fn verify_structure(
&self,
key_cmp: Option<KeyComparator>,
) -> Result<Vec<CorruptPageInfo>> {
let mut corruptions = Vec::new();
if let Some(header) = self.root {
let expected_depth = self.measure_depth(header.root)?;
self.verify_structure_helper(
header.root,
0,
expected_depth,
key_cmp,
&mut corruptions,
)?;
}
Ok(corruptions)
}
/// Measures the depth of the tree by following the first child of every branch page,
/// starting at `page_number`. A tree consisting of a single leaf has depth 0.
///
/// The descent stops at the first page that is not a branch, or at a branch whose first
/// child pointer is missing.
///
/// # Errors
///
/// Returns `StorageError::Corrupted` if the descent would exceed `MAX_TREE_DEPTH`. This
/// function runs on unverified pages, so without the limit a cycle among the first-child
/// pointers would never terminate. Page read and branch decoding errors are propagated.
#[cfg(feature = "std")]
fn measure_depth(&self, page_number: PageNumber) -> Result<u32> {
    let mut current = page_number;
    let mut depth = 0u32;
    loop {
        let page = self.mem.get_page(current)?;
        if page.memory()[0] != BRANCH {
            return Ok(depth);
        }
        let accessor = BranchAccessor::new(&page, self.fixed_key_size)?;
        let Some(child) = accessor.child_page(0) else {
            return Ok(depth);
        };
        let next_depth = depth + 1;
        if next_depth > MAX_TREE_DEPTH {
            return Err(StorageError::Corrupted(format!(
                "B-tree depth {next_depth} exceeds maximum {MAX_TREE_DEPTH}"
            )));
        }
        depth = next_depth;
        current = child;
    }
}
/// Recursive part of `verify_structure`.
///
/// * `current_depth` - depth of `page_number` (the root is 0).
/// * `expected_depth` - depth at which every leaf is expected.
/// * `key_cmp` - optional comparator used to check key ordering inside leaves.
///
/// Violations are appended to `corruptions`; only page access and decoding failures are
/// returned as errors.
#[cfg(feature = "std")]
fn verify_structure_helper(
&self,
page_number: PageNumber,
current_depth: u32,
expected_depth: u32,
key_cmp: Option<KeyComparator>,
corruptions: &mut Vec<CorruptPageInfo>,
) -> Result {
let page = self.mem.get_page(page_number)?;
let node_mem = page.memory();
match node_mem[0] {
LEAF => {
if current_depth != expected_depth {
corruptions.push(CorruptPageInfo {
page_number: u64::from_le_bytes(page_number.to_le_bytes()),
table_name: None,
description: format!(
"leaf at depth {current_depth}, expected {expected_depth}"
),
});
}
if let Some(cmp) = key_cmp {
let accessor =
LeafAccessor::new(node_mem, self.fixed_key_size, self.fixed_value_size)?;
let num = accessor.num_pairs();
// Adjacent keys must be strictly ascending; only the first violation per leaf is
// reported.
for i in 1..num {
if let (Some(prev), Some(curr)) = (accessor.entry(i - 1), accessor.entry(i))
&& cmp(prev.key(), curr.key()) != core::cmp::Ordering::Less
{
corruptions.push(CorruptPageInfo {
page_number: u64::from_le_bytes(page_number.to_le_bytes()),
table_name: None,
description: format!(
"leaf keys not sorted at index {i} (key[{}] >= key[{i}])",
i - 1
),
});
break;
}
}
}
}
BRANCH => {
// A branch at or below the expected leaf depth is reported and not descended into,
// which also bounds the recursion depth by `expected_depth`.
if current_depth >= expected_depth {
corruptions.push(CorruptPageInfo {
page_number: u64::from_le_bytes(page_number.to_le_bytes()),
table_name: None,
description: format!(
"branch at depth {current_depth}, expected leaf at depth {expected_depth}"
),
});
return Ok(());
}
let accessor = BranchAccessor::new(&page, self.fixed_key_size)?;
for i in 0..accessor.count_children() {
if let Some(child) = accessor.child_page(i) {
self.verify_structure_helper(
child,
current_depth + 1,
expected_depth,
key_cmp,
corruptions,
)?;
} else {
corruptions.push(CorruptPageInfo {
page_number: u64::from_le_bytes(page_number.to_le_bytes()),
table_name: None,
description: format!("branch child {i} has invalid page pointer"),
});
}
}
}
other => {
corruptions.push(CorruptPageInfo {
page_number: u64::from_le_bytes(page_number.to_le_bytes()),
table_name: None,
description: format!("unknown page type tag: {other}"),
});
}
}
Ok(())
}
}
/// Typed, read-only handle on a B-tree with keys of type `K` and values of type `V`.
pub(crate) struct Btree<K: Key + 'static, V: Value + 'static> {
mem: Arc<TransactionalMemory>,
// The root page, loaded once when the handle is constructed; `None` for an empty tree.
cached_root: Option<PageImpl>,
transaction_guard: Arc<TransactionGuard>,
root: Option<BtreeHeader>,
hint: PageHint,
// When `Some`, used instead of the compression setting reported by `mem`.
compression_override: Option<CompressionConfig>,
_key_type: PhantomData<K>,
_value_type: PhantomData<V>,
}
impl<K: Key, V: Value> Btree<K, V> {
pub(crate) fn new(
root: Option<BtreeHeader>,
hint: PageHint,
guard: Arc<TransactionGuard>,
mem: Arc<TransactionalMemory>,
) -> Result<Self> {
let cached_root = if let Some(header) = root {
Some(mem.get_page_extended(header.root, hint)?)
} else {
None
};
Ok(Self {
mem,
transaction_guard: guard,
cached_root,
root,
hint,
compression_override: None,
_key_type: Default::default(),
_value_type: Default::default(),
})
}
pub(crate) fn new_uncompressed(
root: Option<BtreeHeader>,
hint: PageHint,
guard: Arc<TransactionGuard>,
mem: Arc<TransactionalMemory>,
) -> Result<Self> {
let cached_root = if let Some(header) = root {
Some(mem.get_page_extended(header.root, hint)?)
} else {
None
};
Ok(Self {
mem,
transaction_guard: guard,
cached_root,
root,
hint,
compression_override: Some(CompressionConfig::None),
_key_type: Default::default(),
_value_type: Default::default(),
})
}
/// Effective compression setting: the override if one was set, else the one from `mem`.
fn compression(&self) -> CompressionConfig {
    match self.compression_override {
        Some(forced) => forced,
        None => self.mem.compression(),
    }
}
/// Fixed value width to hand to page accessors.
///
/// Returns `None` whenever compression is enabled, and `V::fixed_width()` otherwise.
fn value_width(&self) -> Option<usize> {
    let compressed = self.compression().is_enabled();
    if !compressed {
        return V::fixed_width();
    }
    // NOTE(review): presumably compressed values are stored with variable length — confirm.
    None
}
fn maybe_verify_page(&self, page: &PageImpl, expected_checksum: Checksum) -> Result {
if expected_checksum == DEFERRED || !self.mem.should_verify_read() {
return Ok(());
}
let node_mem = page.memory();
let computed = match node_mem[0] {
LEAF => leaf_checksum(page, K::fixed_width(), self.value_width())?,
BRANCH => branch_checksum(page, K::fixed_width())?,
_ => {
return Err(crate::StorageError::Corrupted(alloc::string::String::from(
"Read verification: unknown page type",
)));
}
};
if computed != expected_checksum {
self.mem.on_verification_failure(page.get_page_number())?;
}
Ok(())
}
/// Returns a reference to the transaction guard this tree was constructed with.
pub(crate) fn transaction_guard(&self) -> &Arc<TransactionGuard> {
&self.transaction_guard
}
/// Returns a copy of the tree header, or `None` when the tree has no root.
pub(crate) fn get_root(&self) -> Option<BtreeHeader> {
self.root
}
/// Runs `RawBtree::verify_checksum` over this tree, using this tree's key and
/// value widths.
///
/// NOTE(review): the boolean presumably means "all checksums matched" — confirm
/// against `RawBtree::verify_checksum`.
pub(crate) fn verify_checksum(&self) -> Result<bool> {
    let header = self.get_root();
    let key_width = K::fixed_width();
    let value_width = self.value_width();
    RawBtree::new(header, key_width, value_width, self.mem.clone()).verify_checksum()
}
/// Runs `RawBtree::verify_checksum_detailed` over this tree and returns its
/// result unchanged: a `u64` together with the list of corrupt pages found.
///
/// NOTE(review): the `u64` is presumably the number of pages checked — confirm.
#[cfg(feature = "std")]
pub(crate) fn verify_checksum_detailed(&self) -> Result<(u64, Vec<CorruptPageInfo>)> {
    let header = self.get_root();
    let key_width = K::fixed_width();
    let value_width = self.value_width();
    RawBtree::new(header, key_width, value_width, self.mem.clone())
        .verify_checksum_detailed()
}
/// Runs `RawBtree::verify_structure` over this tree, supplying `K::compare` as the
/// key comparator, and returns the corruptions it reports.
#[cfg(feature = "std")]
pub(crate) fn verify_structure(&self) -> Result<Vec<CorruptPageInfo>> {
    let header = self.get_root();
    let key_width = K::fixed_width();
    let value_width = self.value_width();
    RawBtree::new(header, key_width, value_width, self.mem.clone())
        .verify_structure(Some(K::compare))
}
/// Invokes `visitor` through `UntypedBtree::visit_all_pages` on an untyped view of
/// this tree; the visitor receives a `PagePath` per call.
///
/// # Errors
/// Returns whatever the untyped traversal returns.
pub(crate) fn visit_all_pages<F>(&self, visitor: F) -> Result
where
    F: FnMut(&PagePath) -> Result,
{
    UntypedBtree::new(
        self.root,
        self.mem.clone(),
        K::fixed_width(),
        self.value_width(),
    )
    .visit_all_pages(visitor)
}
pub(crate) fn get(&self, key: &K::SelfType<'_>) -> Result<Option<AccessGuard<'static, V>>> {
if let Some(ref root_page) = self.cached_root {
let expected = self
.root
.ok_or_else(|| {
StorageError::Corrupted(String::from(
"cached_root present but btree header is None",
))
})?
.checksum;
self.get_helper(root_page.clone(), K::as_bytes(key).as_ref(), expected)
} else {
Ok(None)
}
}
fn get_helper(
&self,
page: PageImpl,
query: &[u8],
expected_checksum: Checksum,
) -> Result<Option<AccessGuard<'static, V>>> {
let node_mem = page.memory();
match node_mem[0] {
LEAF => {
self.maybe_verify_page(&page, expected_checksum)?;
let accessor =
LeafAccessor::new(page.memory(), K::fixed_width(), self.value_width())?;
if let Some(entry_index) = accessor.find_key::<K>(query) {
let (start, end) = accessor.value_range(entry_index).ok_or_else(|| {
StorageError::invalid_entry_index(page.get_page_number(), entry_index)
})?;
let guard = if self.compression().is_enabled() {
AccessGuard::with_page_decompress(page, start..end)?
} else {
AccessGuard::with_page(page, start..end)
};
Ok(Some(guard))
} else {
Ok(None)
}
}
BRANCH => {
self.maybe_verify_page(&page, expected_checksum)?;
let accessor = BranchAccessor::new(&page, K::fixed_width())?;
let (child_index, child_page_num) = accessor.child_for_key::<K>(query)?;
let child_checksum = accessor.child_checksum(child_index).ok_or_else(|| {
StorageError::invalid_child_checksum(page.get_page_number(), child_index)
})?;
self.get_helper(
self.mem.get_page_extended(child_page_num, self.hint)?,
query,
child_checksum,
)
}
x => Err(StorageError::invalid_page_type(page.get_page_number(), x)),
}
}
pub(crate) fn first(
&self,
) -> Result<Option<(AccessGuard<'static, K>, AccessGuard<'static, V>)>> {
if let Some(ref root) = self.cached_root {
let expected = self
.root
.ok_or_else(|| {
StorageError::Corrupted(String::from(
"cached_root present but btree header is None",
))
})?
.checksum;
self.first_helper(root.clone(), expected)
} else {
Ok(None)
}
}
fn first_helper(
&self,
page: PageImpl,
expected_checksum: Checksum,
) -> Result<Option<(AccessGuard<'static, K>, AccessGuard<'static, V>)>> {
let node_mem = page.memory();
match node_mem[0] {
LEAF => {
self.maybe_verify_page(&page, expected_checksum)?;
let accessor =
LeafAccessor::new(page.memory(), K::fixed_width(), self.value_width())?;
let (key_range, value_range) = accessor
.entry_ranges(0)
.ok_or_else(|| StorageError::invalid_entry_index(page.get_page_number(), 0))?;
let key_guard = AccessGuard::with_page(page.clone(), key_range);
let value_guard = if self.compression().is_enabled() {
AccessGuard::with_page_decompress(page, value_range)?
} else {
AccessGuard::with_page(page, value_range)
};
Ok(Some((key_guard, value_guard)))
}
BRANCH => {
self.maybe_verify_page(&page, expected_checksum)?;
let accessor = BranchAccessor::new(&page, K::fixed_width())?;
let child_page = accessor.child_page(0).ok_or_else(|| {
StorageError::invalid_child_pointer(page.get_page_number(), 0)
})?;
let child_checksum = accessor.child_checksum(0).ok_or_else(|| {
StorageError::invalid_child_checksum(page.get_page_number(), 0)
})?;
self.first_helper(
self.mem.get_page_extended(child_page, self.hint)?,
child_checksum,
)
}
x => Err(StorageError::invalid_page_type(page.get_page_number(), x)),
}
}
pub(crate) fn last(
&self,
) -> Result<Option<(AccessGuard<'static, K>, AccessGuard<'static, V>)>> {
if let Some(ref root) = self.cached_root {
let expected = self
.root
.ok_or_else(|| {
StorageError::Corrupted(String::from(
"cached_root present but btree header is None",
))
})?
.checksum;
self.last_helper(root.clone(), expected)
} else {
Ok(None)
}
}
/// Recursively follows the last child from `page` down to a leaf and returns guards
/// over that leaf's last entry.
///
/// `expected_checksum` is the checksum recorded for `page` by its parent (or the
/// header) and is passed to `maybe_verify_page`.
///
/// A page that reports zero entries (leaf) or zero children (branch) is reported
/// as an error rather than underflowing the index computation; this matches the
/// errors `first_helper` produces when index 0 cannot be resolved.
///
/// # Errors
/// Returns an invalid-page-type error for an unknown type byte, an invalid-entry /
/// invalid-child error when the last index cannot be resolved, and propagates
/// verification, decompression and page-fetch errors.
fn last_helper(
    &self,
    page: PageImpl,
    expected_checksum: Checksum,
) -> Result<Option<(AccessGuard<'static, K>, AccessGuard<'static, V>)>> {
    let node_mem = page.memory();
    match node_mem[0] {
        LEAF => {
            self.maybe_verify_page(&page, expected_checksum)?;
            let accessor =
                LeafAccessor::new(page.memory(), K::fixed_width(), self.value_width())?;
            // `num_pairs() - 1` would underflow on a page claiming zero pairs
            // (panic in debug builds, wrap-around in release builds).
            let last_index = accessor.num_pairs().checked_sub(1).ok_or_else(|| {
                StorageError::invalid_entry_index(page.get_page_number(), 0)
            })?;
            let (key_range, value_range) =
                accessor.entry_ranges(last_index).ok_or_else(|| {
                    StorageError::invalid_entry_index(page.get_page_number(), last_index)
                })?;
            // Keys are wrapped without decompression; only values may need it.
            let key_guard = AccessGuard::with_page(page.clone(), key_range);
            let value_guard = if self.compression().is_enabled() {
                AccessGuard::with_page_decompress(page, value_range)?
            } else {
                AccessGuard::with_page(page, value_range)
            };
            Ok(Some((key_guard, value_guard)))
        }
        BRANCH => {
            self.maybe_verify_page(&page, expected_checksum)?;
            let accessor = BranchAccessor::new(&page, K::fixed_width())?;
            // Same underflow hazard as above for a branch claiming zero children.
            let last_child = accessor.count_children().checked_sub(1).ok_or_else(|| {
                StorageError::invalid_child_pointer(page.get_page_number(), 0)
            })?;
            let child_page = accessor.child_page(last_child).ok_or_else(|| {
                StorageError::invalid_child_pointer(page.get_page_number(), last_child)
            })?;
            let child_checksum = accessor.child_checksum(last_child).ok_or_else(|| {
                StorageError::invalid_child_checksum(page.get_page_number(), last_child)
            })?;
            self.last_helper(
                self.mem.get_page_extended(child_page, self.hint)?,
                child_checksum,
            )
        }
        x => Err(StorageError::invalid_page_type(page.get_page_number(), x)),
    }
}
/// Builds a `BtreeRangeIter` over the keys selected by `range`.
///
/// The iterator is given the root page number and root checksum (both `None` for an
/// empty tree), the effective value width, a handle to the page store, and whether
/// compression is enabled.
pub(crate) fn range<'a0, T: RangeBounds<KR>, KR: Borrow<K::SelfType<'a0>>>(
    &self,
    range: &'_ T,
) -> Result<BtreeRangeIter<K, V>> {
    let root_page = self.root.map(|header| header.root);
    let root_checksum = self.root.map(|header| header.checksum);
    let value_width = self.value_width();
    let mem = self.mem.clone();
    let compressed = self.compression().is_enabled();
    BtreeRangeIter::new(
        range,
        root_page,
        root_checksum,
        value_width,
        mem,
        compressed,
    )
}
/// Returns the `length` recorded in the tree header, or 0 when there is no header.
pub(crate) fn len(&self) -> Result<u64> {
    let entries = match self.root {
        Some(header) => header.length,
        None => 0,
    };
    Ok(entries)
}
/// Computes page and byte statistics for this tree by delegating to `btree_stats`
/// with this tree's root page and key/value widths.
pub(crate) fn stats(&self) -> Result<BtreeStats> {
    let root_page = self.root.map(|header| header.root);
    let key_width = K::fixed_width();
    let value_width = self.value_width();
    btree_stats(root_page, &self.mem, key_width, value_width)
}
/// Dumps the tree to stderr, one line per tree level, visiting pages breadth-first.
///
/// `include_values` is forwarded to the leaf printer. Does nothing for an empty tree.
///
/// # Errors
/// `StorageError::Corrupted` on an invalid child pointer or an unknown page type
/// byte; page-fetch and accessor errors are propagated.
#[allow(dead_code)]
#[cfg(feature = "std")]
pub(crate) fn print_debug(&self, include_values: bool) -> Result {
    let root_page = match self.root {
        Some(header) => header.root,
        None => return Ok(()),
    };
    let mut level = vec![self.mem.get_page(root_page)?];
    while !level.is_empty() {
        // Children of every branch on this level, printed on the next pass.
        let mut next_level = Vec::new();
        for page in level {
            match page.memory()[0] {
                LEAF => {
                    eprint!("Leaf[ (page={:?})", page.get_page_number());
                    let leaf =
                        LeafAccessor::new(page.memory(), K::fixed_width(), self.value_width())?;
                    leaf.print_node::<K, V>(include_values);
                    eprint!("]");
                }
                BRANCH => {
                    let branch = BranchAccessor::new(&page, K::fixed_width())?;
                    for i in 0..branch.count_children() {
                        let child = match branch.child_page(i) {
                            Some(child) => child,
                            None => {
                                return Err(StorageError::Corrupted(format!(
                                    "invalid child page pointer at index {} on page {:?}",
                                    i,
                                    page.get_page_number()
                                )));
                            }
                        };
                        next_level.push(self.mem.get_page(child)?);
                    }
                    branch.print_node::<K>();
                }
                x => {
                    return Err(StorageError::Corrupted(format!(
                        "Invalid page type byte {} on page {:?}, expected LEAF ({}) or BRANCH ({})",
                        x,
                        page.get_page_number(),
                        LEAF,
                        BRANCH
                    )));
                }
            }
            eprint!(" ");
        }
        eprintln!();
        level = next_level;
    }
    Ok(())
}
}
impl<K: Key, V: Value> Drop for Btree<K, V> {
    /// Releases the cached root page before any field is dropped.
    ///
    /// Fields are dropped in declaration order after this runs, which would
    /// otherwise release `mem` ahead of `cached_root`.
    // NOTE(review): presumably the page must not outlive the page store handle — confirm.
    fn drop(&mut self) {
        drop(self.cached_root.take());
    }
}
/// Computes statistics for the tree rooted at `root`.
///
/// With no root, every counter (including `tree_height`) is zero; otherwise the
/// work is delegated to `stats_helper`, starting at depth 0.
pub(crate) fn btree_stats(
    root: Option<PageNumber>,
    mem: &TransactionalMemory,
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
) -> Result<BtreeStats> {
    match root {
        Some(root_page) => stats_helper(root_page, mem, fixed_key_size, fixed_value_size, 0),
        None => Ok(BtreeStats {
            tree_height: 0,
            leaf_pages: 0,
            branch_pages: 0,
            stored_leaf_bytes: 0,
            metadata_bytes: 0,
            fragmented_bytes: 0,
        }),
    }
}
/// Recursively gathers statistics for the subtree rooted at `page_number`.
///
/// `depth` is the distance from the tree root (0 for the root itself).
///
/// * Leaf: counts one leaf page; pair bytes go to `stored_leaf_bytes`, the rest of
///   the accessor's total length to `metadata_bytes`, and the remainder of the page
///   to `fragmented_bytes`.
/// * Branch: counts one branch page, adds its own total length to `metadata_bytes`
///   and its unused remainder to `fragmented_bytes`, then sums the statistics of
///   every child whose page pointer resolves. Children whose pointer does not
///   resolve are skipped.
///
/// # Errors
/// `StorageError::Corrupted` when `depth` exceeds `MAX_TREE_DEPTH`, an
/// invalid-page-type error for an unknown type byte, and any page-fetch or
/// accessor error.
fn stats_helper(
    page_number: PageNumber,
    mem: &TransactionalMemory,
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
    depth: u32,
) -> Result<BtreeStats> {
    // Guard against runaway recursion on a corrupt (e.g. cyclic) tree.
    if depth > MAX_TREE_DEPTH {
        return Err(StorageError::Corrupted(format!(
            "B-tree depth {depth} exceeds maximum {MAX_TREE_DEPTH}"
        )));
    }
    let page = mem.get_page(page_number)?;
    let page_len = page.memory().len();
    match page.memory()[0] {
        LEAF => {
            let leaf = LeafAccessor::new(page.memory(), fixed_key_size, fixed_value_size)?;
            let used = leaf.total_length();
            let pair_bytes = leaf.length_of_pairs(0, leaf.num_pairs());
            let overhead = used.saturating_sub(pair_bytes);
            let fragmented_bytes = page_len.saturating_sub(used) as u64;
            let stored_leaf_bytes = u64::try_from(pair_bytes).map_err(|_| {
                StorageError::page_corrupted(page_number, "leaf bytes overflows u64")
            })?;
            let metadata_bytes = u64::try_from(overhead).map_err(|_| {
                StorageError::page_corrupted(page_number, "overhead bytes overflows u64")
            })?;
            Ok(BtreeStats {
                tree_height: 1,
                leaf_pages: 1,
                branch_pages: 0,
                stored_leaf_bytes,
                metadata_bytes,
                fragmented_bytes,
            })
        }
        BRANCH => {
            let branch = BranchAccessor::new(&page, fixed_key_size)?;
            let used = branch.total_length();
            // Start with this branch page's own contribution; `tree_height` holds
            // the tallest child seen so far until the final `+ 1` below.
            let mut totals = BtreeStats {
                tree_height: 0,
                leaf_pages: 0,
                branch_pages: 1,
                stored_leaf_bytes: 0,
                metadata_bytes: used as u64,
                fragmented_bytes: page_len.saturating_sub(used) as u64,
            };
            let children = (0..branch.count_children()).filter_map(|i| branch.child_page(i));
            for child in children {
                let sub = stats_helper(child, mem, fixed_key_size, fixed_value_size, depth + 1)?;
                totals.tree_height = max(totals.tree_height, sub.tree_height);
                totals.leaf_pages += sub.leaf_pages;
                totals.branch_pages += sub.branch_pages;
                totals.stored_leaf_bytes += sub.stored_leaf_bytes;
                totals.metadata_bytes += sub.metadata_bytes;
                totals.fragmented_bytes += sub.fragmented_bytes;
            }
            totals.tree_height += 1;
            Ok(totals)
        }
        x => Err(StorageError::invalid_page_type(page.get_page_number(), x)),
    }
}