use std::{
collections::HashMap,
convert::Infallible,
fmt::{Debug, Display},
marker::PhantomData,
ops::{Bound, RangeBounds},
};
use byteorder::{ReadBytesExt, WriteBytesExt};
use super::{
interior::Interior,
key_entry::KeyEntry,
modify::{Modification, Operation},
serialization::BinarySerialization,
versioned::ChangeResult,
KeyRange, PagedWriter,
};
use crate::{
chunk_cache::CacheEntry,
error::Error,
io::File,
tree::{key_entry::ValueIndex, read_chunk, versioned::Children, ScanEvaluation},
vault::AnyVault,
AbortError, ArcBytes, ChunkCache, ErrorKind,
};
#[derive(Clone, Debug)]
pub struct BTreeEntry<Index, ReducedIndex> {
pub dirty: bool,
pub node: BTreeNode<Index, ReducedIndex>,
}
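/// A B-Tree node: either a leaf containing key entries or an interior node
/// containing pointers to child nodes. `Uninitialized` is a transient state
/// used while a node is being split.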
#[derive(Clone, Debug)]
pub enum BTreeNode<Index, ReducedIndex> {
Uninitialized,
Leaf(Vec<KeyEntry<Index>>),
Interior(Vec<Interior<Index, ReducedIndex>>),
}
impl<Index, ReducedIndex> From<BTreeNode<Index, ReducedIndex>> for BTreeEntry<Index, ReducedIndex> {
fn from(node: BTreeNode<Index, ReducedIndex>) -> Self {
Self { node, dirty: true }
}
}
impl<Index, ReducedIndex> Default for BTreeEntry<Index, ReducedIndex> {
fn default() -> Self {
Self::from(BTreeNode::Leaf(Vec::new()))
}
}
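/// Reduces one or more indexes (or already-reduced values) into a single
/// value. Interior nodes store the reduced value of their children so
/// aggregate statistics can be read without visiting every leaf.
///
/// A minimal sketch of a reducer that counts entries (`EntryCount` is a
/// hypothetical type, not part of this crate):
///
/// ```ignore
/// struct EntryCount(usize);
///
/// impl<Index> Reducer<Index> for EntryCount {
///     fn reduce<'a, Indexes, IndexesIter>(indexes: Indexes) -> Self
///     where
///         Index: 'a,
///         Indexes: IntoIterator<Item = &'a Index, IntoIter = IndexesIter> + ExactSizeIterator,
///         IndexesIter: Iterator<Item = &'a Index> + ExactSizeIterator + Clone,
///     {
///         // Count the indexes covered by a leaf.
///         Self(indexes.into_iter().len())
///     }
///
///     fn rereduce<'a, ReducedIndexes, ReducedIndexesIter>(values: ReducedIndexes) -> Self
///     where
///         Self: 'a,
///         ReducedIndexes:
///             IntoIterator<Item = &'a Self, IntoIter = ReducedIndexesIter> + ExactSizeIterator,
///         ReducedIndexesIter: Iterator<Item = &'a Self> + ExactSizeIterator + Clone,
///     {
///         // Sum the counts already computed for child nodes.
///         Self(values.into_iter().map(|value| value.0).sum())
///     }
/// }
/// ```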
pub trait Reducer<Index> {
fn reduce<'a, Indexes, IndexesIter>(indexes: Indexes) -> Self
where
Index: 'a,
Indexes: IntoIterator<Item = &'a Index, IntoIter = IndexesIter> + ExactSizeIterator,
IndexesIter: Iterator<Item = &'a Index> + ExactSizeIterator + Clone;
fn rereduce<'a, ReducedIndexes, ReducedIndexesIter>(values: ReducedIndexes) -> Self
where
Self: 'a,
ReducedIndexes:
IntoIterator<Item = &'a Self, IntoIter = ReducedIndexesIter> + ExactSizeIterator,
ReducedIndexesIter: Iterator<Item = &'a Self> + ExactSizeIterator + Clone;
}
impl<Index> Reducer<Index> for () {
fn reduce<'a, Indexes, IndexesIter>(_indexes: Indexes) -> Self
where
Index: 'a,
Indexes: IntoIterator<Item = &'a Index, IntoIter = IndexesIter> + ExactSizeIterator,
IndexesIter: Iterator<Item = &'a Index> + ExactSizeIterator + Clone,
{
}
fn rereduce<'a, ReducedIndexes, ReducedIndexesIter>(_values: ReducedIndexes) -> Self
where
Self: 'a,
ReducedIndexes:
IntoIterator<Item = &'a Self, IntoIter = ReducedIndexesIter> + ExactSizeIterator,
ReducedIndexesIter: Iterator<Item = &'a Self> + ExactSizeIterator + Clone,
{
}
}
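/// Parameters and callbacks shared by a single modification pass: the order
/// (maximum children per node), the minimum child count before a node is
/// merged into a sibling, an `indexer` that produces the `Index` to store for
/// a key, and a `loader` that retrieves the stored value for an existing
/// `Index`.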
pub struct ModificationContext<IndexedType, Index, Context, Indexer, Loader>
where
Indexer: Fn(
&ArcBytes<'_>,
Option<&IndexedType>,
Option<&Index>,
&mut Context,
&mut PagedWriter<'_>,
) -> Result<KeyOperation<Index>, Error>,
Loader: Fn(&Index, &mut PagedWriter<'_>) -> Result<Option<IndexedType>, Error>,
{
pub current_order: usize,
pub minimum_children: usize,
pub indexer: Indexer,
pub loader: Loader,
pub _phantom: PhantomData<(IndexedType, Index, Context)>,
}
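// In debug or paranoid builds, verify that a node's children remain sorted by
// key after every mutation; in release builds this check compiles to nothing.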
#[cfg(any(debug_assertions, feature = "paranoid"))]
macro_rules! assert_children_order {
($children:expr) => {
assert_eq!(
$children
.windows(2)
.find_map(|w| (w[0].key > w[1].key).then(|| (&w[0].key, &w[1].key))),
None
);
};
}
#[cfg(not(any(debug_assertions, feature = "paranoid")))]
macro_rules! assert_children_order {
($children:expr) => {};
}
impl<Index, ReducedIndex> BTreeEntry<Index, ReducedIndex>
where
Index: ValueIndex + Clone + BinarySerialization + Debug + 'static,
ReducedIndex: Clone + Reducer<Index> + BinarySerialization + Debug + 'static,
{
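    /// Applies `modification` to this node, recursing into children as
    /// needed, and reports how the node changed (unchanged, changed, needs
    /// splitting, should be absorbed into a sibling, or should be removed).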
pub(crate) fn modify<IndexedType, Context, Indexer, Loader>(
&mut self,
modification: &mut Modification<'_, IndexedType>,
context: &ModificationContext<IndexedType, Index, Context, Indexer, Loader>,
max_key: Option<&ArcBytes<'_>>,
changes: &mut Context,
writer: &mut PagedWriter<'_>,
) -> Result<ChangeResult, Error>
where
Indexer: Fn(
&ArcBytes<'_>,
Option<&IndexedType>,
Option<&Index>,
&mut Context,
&mut PagedWriter<'_>,
) -> Result<KeyOperation<Index>, Error>,
Loader: Fn(&Index, &mut PagedWriter<'_>) -> Result<Option<IndexedType>, Error>,
{
match &mut self.node {
BTreeNode::Leaf(children) => {
if Self::modify_leaf(children, modification, context, max_key, changes, writer)? {
self.dirty = true;
Ok(Self::clean_up_leaf(
children,
context.current_order,
context.minimum_children,
))
} else {
Ok(ChangeResult::Unchanged)
}
}
BTreeNode::Interior(children) => {
match Self::modify_interior(
children,
modification,
context,
max_key,
changes,
writer,
)? {
ChangeResult::Changed => {
self.dirty = true;
Ok(Self::clean_up_interior(
children,
context.current_order,
context.minimum_children,
))
}
other => Ok(other),
}
}
BTreeNode::Uninitialized => unreachable!(),
}
}
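    /// Inspects a leaf after modification: too many children means the node
    /// must split, zero children means it should be removed, and fewer than
    /// `minimum_children` means it should be absorbed into a sibling.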
fn clean_up_leaf(
children: &mut [KeyEntry<Index>],
current_order: usize,
minimum_children: usize,
) -> ChangeResult {
let child_count = children.len();
assert_children_order!(children);
if child_count > current_order {
ChangeResult::Split
} else if child_count == 0 {
ChangeResult::Remove
} else if child_count < minimum_children {
ChangeResult::Absorb
} else {
ChangeResult::Changed
}
}
fn clean_up_interior(
children: &mut [Interior<Index, ReducedIndex>],
current_order: usize,
minimum_children: usize,
) -> ChangeResult {
let child_count = children.len();
assert_children_order!(children);
if child_count > current_order {
ChangeResult::Split
} else if child_count == 0 {
ChangeResult::Remove
} else if child_count < minimum_children {
ChangeResult::Absorb
} else {
ChangeResult::Changed
}
}
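    /// Applies the modification's pending keys to a leaf node, inserting,
    /// replacing, or removing entries in place. Returns whether any entry
    /// changed.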
    #[allow(clippy::too_many_lines)]
    fn modify_leaf<IndexedType, Context, Indexer, Loader>(
children: &mut Vec<KeyEntry<Index>>,
modification: &mut Modification<'_, IndexedType>,
context: &ModificationContext<IndexedType, Index, Context, Indexer, Loader>,
max_key: Option<&ArcBytes<'_>>,
changes: &mut Context,
writer: &mut PagedWriter<'_>,
) -> Result<bool, Error>
where
Indexer: Fn(
&ArcBytes<'_>,
Option<&IndexedType>,
Option<&Index>,
&mut Context,
&mut PagedWriter<'_>,
) -> Result<KeyOperation<Index>, Error>,
Loader: Fn(&Index, &mut PagedWriter<'_>) -> Result<Option<IndexedType>, Error>,
{
let mut last_index = 0;
let mut any_changes = false;
let max_len = children.len().max(context.current_order);
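        // Apply keys until none remain, a key would exceed `max_key`, or this
        // node has grown past the larger of its original length and the
        // current order.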
while !modification.keys.is_empty() && children.len() <= max_len {
let key = modification.keys.last().unwrap();
if max_key.map(|max_key| key > max_key).unwrap_or_default() {
break;
}
let search_result = children[last_index..].binary_search_by(|child| child.key.cmp(key));
match search_result {
Ok(matching_index) => {
let key = modification.keys.pop().unwrap();
last_index += matching_index;
let index = match &mut modification.operation {
Operation::Set(value) => (context.indexer)(
&key,
Some(value),
Some(&children[last_index].index),
changes,
writer,
)?,
Operation::SetEach(values) => (context.indexer)(
&key,
Some(&values.pop().ok_or_else(|| {
ErrorKind::message("need the same number of keys as values")
})?),
Some(&children[last_index].index),
changes,
writer,
)?,
Operation::Remove => (context.indexer)(
&key,
None,
Some(&children[last_index].index),
changes,
writer,
)?,
Operation::CompareSwap(callback) => {
let current_index = &children[last_index].index;
let existing_value = (context.loader)(current_index, writer)?;
match callback(&key, existing_value) {
KeyOperation::Skip => KeyOperation::Skip,
KeyOperation::Set(new_value) => (context.indexer)(
&key,
Some(&new_value),
Some(current_index),
changes,
writer,
)?,
KeyOperation::Remove => (context.indexer)(
&key,
None,
Some(current_index),
changes,
writer,
)?,
}
}
};
match index {
KeyOperation::Skip => {}
KeyOperation::Set(index) => {
children[last_index] = KeyEntry {
key: key.into_owned(),
index,
};
any_changes = true;
}
KeyOperation::Remove => {
children.remove(last_index);
any_changes = true;
}
}
}
Err(insert_at) => {
last_index += insert_at;
let key = modification.keys.pop().unwrap();
let operation = match &mut modification.operation {
Operation::Set(new_value) => {
(context.indexer)(&key, Some(new_value), None, changes, writer)?
}
Operation::SetEach(new_values) => (context.indexer)(
&key,
Some(&new_values.pop().ok_or_else(|| {
ErrorKind::message("need the same number of keys as values")
})?),
None,
changes,
writer,
)?,
                        // The key was not found, so there is nothing to remove.
                        Operation::Remove => KeyOperation::Remove,
Operation::CompareSwap(callback) => match callback(&key, None) {
KeyOperation::Skip => KeyOperation::Skip,
KeyOperation::Set(new_value) => {
(context.indexer)(&key, Some(&new_value), None, changes, writer)?
}
KeyOperation::Remove => {
(context.indexer)(&key, None, None, changes, writer)?
}
},
};
match operation {
KeyOperation::Set(index) => {
if children.capacity() < children.len() + 1
&& context.current_order > children.len()
{
children.reserve(context.current_order - children.len());
}
children.insert(
last_index,
KeyEntry {
key: key.into_owned(),
index,
},
);
any_changes = true;
}
KeyOperation::Skip | KeyOperation::Remove => {}
}
}
}
assert_children_order!(children);
}
Ok(any_changes)
}
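    /// Routes the modification's pending keys into the appropriate child
    /// nodes, loading each child lazily and cleaning up after any splits,
    /// removals, or absorptions the recursion reports.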
fn modify_interior<IndexedType, Context, Indexer, Loader>(
children: &mut Vec<Interior<Index, ReducedIndex>>,
modification: &mut Modification<'_, IndexedType>,
context: &ModificationContext<IndexedType, Index, Context, Indexer, Loader>,
max_key: Option<&ArcBytes<'_>>,
changes: &mut Context,
writer: &mut PagedWriter<'_>,
) -> Result<ChangeResult, Error>
where
Indexer: Fn(
&ArcBytes<'_>,
Option<&IndexedType>,
Option<&Index>,
&mut Context,
&mut PagedWriter<'_>,
) -> Result<KeyOperation<Index>, Error>,
Loader: Fn(&Index, &mut PagedWriter<'_>) -> Result<Option<IndexedType>, Error>,
{
let mut last_index = 0;
let mut any_changes = false;
while let Some(key) = modification.keys.last().cloned() {
if last_index >= children.len()
|| max_key.map(|max_key| &key > max_key).unwrap_or_default()
{
break;
}
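            // Find the child whose range contains `key`. If the key is larger
            // than every existing child's max key, route it into the last
            // child (`pushing_end`) so the right-most subtree grows.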
let (containing_node_index, pushing_end) = children[last_index..]
.binary_search_by(|child| child.key.cmp(&key))
.map_or_else(
|not_found| {
if not_found > 0 && not_found + last_index == children.len() {
(not_found - 1, true)
} else {
(not_found, false)
}
},
|found| (found, false),
);
last_index += containing_node_index;
let child = &mut children[last_index];
child.position.load(
writer.file,
false,
writer.vault,
writer.cache,
Some(context.current_order),
)?;
let child_entry = child.position.get_mut().unwrap();
let max_key_for_modification = if pushing_end {
key
} else if let Some(max_key) = max_key {
child.key.clone().min(max_key.clone())
} else {
child.key.clone()
};
let (change_result, should_backup) = Self::process_interior_change_result(
child_entry.modify(
modification,
context,
Some(&max_key_for_modification),
changes,
writer,
)?,
last_index,
children,
context,
writer,
)?;
assert_children_order!(children);
match change_result {
ChangeResult::Unchanged => {}
ChangeResult::Split => unreachable!(),
ChangeResult::Absorb => {
return Ok(ChangeResult::Absorb);
}
ChangeResult::Remove => {
any_changes = true;
}
ChangeResult::Changed => any_changes = true,
}
if should_backup && last_index > 0 {
last_index -= 1;
}
}
Ok(if any_changes {
ChangeResult::Changed
} else {
ChangeResult::Unchanged
})
}
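    /// Applies a child's `ChangeResult` to this interior node's child list,
    /// updating keys and stats, splitting, absorbing, or removing the child
    /// as required. The returned flag indicates whether the caller should
    /// back up one child index before continuing.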
fn process_interior_change_result<IndexedType, Context, Indexer, Loader>(
result: ChangeResult,
child_index: usize,
children: &mut Vec<Interior<Index, ReducedIndex>>,
context: &ModificationContext<IndexedType, Index, Context, Indexer, Loader>,
writer: &mut PagedWriter<'_>,
) -> Result<(ChangeResult, bool), Error>
where
Indexer: Fn(
&ArcBytes<'_>,
Option<&IndexedType>,
Option<&Index>,
&mut Context,
&mut PagedWriter<'_>,
) -> Result<KeyOperation<Index>, Error>,
Loader: Fn(&Index, &mut PagedWriter<'_>) -> Result<Option<IndexedType>, Error>,
{
let can_absorb = children.len() > 1;
match (result, can_absorb) {
(ChangeResult::Unchanged, _) => Ok((ChangeResult::Unchanged, false)),
(ChangeResult::Changed, _) | (ChangeResult::Absorb, false) => {
let child = &mut children[child_index];
let child_entry = child.position.get_mut().unwrap();
child.key = child_entry.max_key().clone();
child.stats = child_entry.stats();
                Ok((ChangeResult::Changed, matches!(result, ChangeResult::Absorb)))
}
(ChangeResult::Split, _) => {
Self::process_interior_split(child_index, children, context, writer)
}
(ChangeResult::Absorb, true) => {
Self::process_absorb(child_index, children, context, writer)
}
(ChangeResult::Remove, _) => {
children.remove(child_index);
Ok((ChangeResult::Changed, true))
}
}
}
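    /// Removes an underfull child and merges its entries into an adjacent
    /// sibling: the previous sibling when one exists (appending at the top),
    /// otherwise the next sibling (prepending at the front).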
fn process_absorb<IndexedType, Context, Indexer, Loader>(
child_index: usize,
children: &mut Vec<Interior<Index, ReducedIndex>>,
context: &ModificationContext<IndexedType, Index, Context, Indexer, Loader>,
writer: &mut PagedWriter<'_>,
) -> Result<(ChangeResult, bool), Error>
where
Indexer: Fn(
&ArcBytes<'_>,
Option<&IndexedType>,
Option<&Index>,
&mut Context,
&mut PagedWriter<'_>,
) -> Result<KeyOperation<Index>, Error>,
Loader: Fn(&Index, &mut PagedWriter<'_>) -> Result<Option<IndexedType>, Error>,
{
let (insert_on_top, sponge_index) = if child_index > 0 {
(true, child_index - 1)
} else {
(false, child_index)
};
if sponge_index < children.len() - 1 {
let mut removed_child = children.remove(child_index);
let sponge = children.get_mut(sponge_index).unwrap();
let removed_child = removed_child.position.get_mut().unwrap();
let leaves = match &mut removed_child.node {
BTreeNode::Leaf(leaves) => Children::Leaves(std::mem::take(leaves)),
BTreeNode::Interior(interiors) => Children::Interiors(std::mem::take(interiors)),
BTreeNode::Uninitialized => unreachable!(),
};
sponge.position.load(
writer.file,
false,
writer.vault,
writer.cache,
Some(context.current_order),
)?;
let sponge_entry = sponge.position.get_mut().unwrap();
match Self::process_interior_change_result(
sponge_entry.absorb(
leaves,
insert_on_top,
context.current_order,
context.minimum_children,
writer,
)?,
sponge_index,
children,
context,
writer,
)? {
(ChangeResult::Absorb | ChangeResult::Split | ChangeResult::Unchanged, _) => {
unreachable!()
}
(ChangeResult::Remove, _) => Ok((ChangeResult::Remove, true)),
(ChangeResult::Changed, _) => Ok((ChangeResult::Changed, true)),
}
} else {
Ok((ChangeResult::Unchanged, false))
}
}
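    /// Handles a child that reported it has grown too large: first tries to
    /// rebalance by moving entries into the previous or next sibling, and
    /// only splits the child into two nodes if neither sibling has room.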
fn process_interior_split<IndexedType, Context, Indexer, Loader>(
child_index: usize,
children: &mut Vec<Interior<Index, ReducedIndex>>,
context: &ModificationContext<IndexedType, Index, Context, Indexer, Loader>,
writer: &mut PagedWriter<'_>,
) -> Result<(ChangeResult, bool), Error>
where
Indexer: Fn(
&ArcBytes<'_>,
Option<&IndexedType>,
Option<&Index>,
&mut Context,
&mut PagedWriter<'_>,
) -> Result<KeyOperation<Index>, Error>,
Loader: Fn(&Index, &mut PagedWriter<'_>) -> Result<Option<IndexedType>, Error>,
{
let mut should_backup = false;
if child_index > 0 {
match Self::steal_children_from_start(child_index, children, context, writer)? {
(ChangeResult::Unchanged, steal_results_in_should_backup) => {
should_backup = steal_results_in_should_backup;
}
(ChangeResult::Changed, should_backup) => {
return Ok((ChangeResult::Changed, should_backup))
}
_ => unreachable!(),
}
}
if child_index + 1 < children.len() {
match Self::steal_children_from_end(child_index, children, context, writer)? {
ChangeResult::Unchanged => {}
ChangeResult::Changed => return Ok((ChangeResult::Changed, should_backup)),
_ => unreachable!(),
}
}
let child = children[child_index].position.get_mut().unwrap();
child.dirty = true;
let next_node = match &mut child.node {
BTreeNode::Leaf(children) => {
let upper = children
.splice((children.len() + 1) / 2.., std::iter::empty())
.collect::<Vec<_>>();
Self::from(BTreeNode::Leaf(upper))
}
BTreeNode::Interior(children) => {
let upper = children
.splice((children.len() + 1) / 2.., std::iter::empty())
.collect::<Vec<_>>();
Self::from(BTreeNode::Interior(upper))
}
            BTreeNode::Uninitialized => unreachable!(),
};
let (max_key, stats) = { (child.max_key().clone(), child.stats()) };
children[child_index].key = max_key;
children[child_index].stats = stats;
children.insert(child_index + 1, Interior::from(next_node));
Ok((ChangeResult::Changed, should_backup))
}
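    /// Moves entries from the front of the oversized child into the previous
    /// sibling, limited by the sibling's free space and by how many entries
    /// the child can give up while keeping `minimum_children`.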
fn steal_children_from_start<IndexedType, Context, Indexer, Loader>(
child_index: usize,
children: &mut [Interior<Index, ReducedIndex>],
context: &ModificationContext<IndexedType, Index, Context, Indexer, Loader>,
writer: &mut PagedWriter<'_>,
) -> Result<(ChangeResult, bool), Error>
where
Indexer: Fn(
&ArcBytes<'_>,
Option<&IndexedType>,
Option<&Index>,
&mut Context,
&mut PagedWriter<'_>,
) -> Result<KeyOperation<Index>, Error>,
Loader: Fn(&Index, &mut PagedWriter<'_>) -> Result<Option<IndexedType>, Error>,
{
let mut should_backup = false;
children[child_index - 1].position.load(
writer.file,
false,
writer.vault,
writer.cache,
Some(context.current_order),
)?;
let previous_child_count = children[child_index - 1].position.get().unwrap().count();
if let Some(free_space) = context.current_order.checked_sub(previous_child_count) {
if free_space > 0 {
should_backup = true;
let stolen_children =
match &mut children[child_index].position.get_mut().unwrap().node {
BTreeNode::Leaf(children) => {
let eligible_amount =
children.len().saturating_sub(context.minimum_children);
let amount_to_steal = free_space.min(eligible_amount);
Children::Leaves(
children
.splice(0..amount_to_steal, std::iter::empty())
.collect(),
)
}
BTreeNode::Interior(children) => {
let eligible_amount =
children.len().saturating_sub(context.minimum_children);
                        let amount_to_steal = free_space.min(eligible_amount);
Children::Interiors(
children
.splice(0..amount_to_steal, std::iter::empty())
.collect(),
)
}
BTreeNode::Uninitialized => unreachable!(),
};
let previous_child = children[child_index - 1].position.get_mut().unwrap();
match (&mut previous_child.node, stolen_children) {
(BTreeNode::Leaf(children), Children::Leaves(new_entries)) => {
children.extend(new_entries);
}
(BTreeNode::Interior(children), Children::Interiors(new_entries)) => {
children.extend(new_entries);
}
_ => unreachable!(),
}
previous_child.dirty = true;
let (max_key, stats) =
{ (previous_child.max_key().clone(), previous_child.stats()) };
children[child_index - 1].key = max_key;
children[child_index - 1].stats = stats;
let child = children[child_index].position.get_mut().unwrap();
child.dirty = true;
let (max_key, stats) = { (child.max_key().clone(), child.stats()) };
children[child_index].key = max_key;
children[child_index].stats = stats;
if child.count() <= context.current_order {
return Ok((ChangeResult::Changed, should_backup));
}
}
}
Ok((ChangeResult::Unchanged, should_backup))
}
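    /// Moves entries from the back of the oversized child into the next
    /// sibling, mirroring `steal_children_from_start`.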
fn steal_children_from_end<IndexedType, Context, Indexer, Loader>(
child_index: usize,
children: &mut [Interior<Index, ReducedIndex>],
context: &ModificationContext<IndexedType, Index, Context, Indexer, Loader>,
writer: &mut PagedWriter<'_>,
) -> Result<ChangeResult, Error>
where
Indexer: Fn(
&ArcBytes<'_>,
Option<&IndexedType>,
Option<&Index>,
&mut Context,
&mut PagedWriter<'_>,
) -> Result<KeyOperation<Index>, Error>,
Loader: Fn(&Index, &mut PagedWriter<'_>) -> Result<Option<IndexedType>, Error>,
{
children[child_index + 1].position.load(
writer.file,
false,
writer.vault,
writer.cache,
Some(context.current_order),
)?;
let next_child_count = children[child_index + 1].position.get().unwrap().count();
if let Some(free_space) = context.current_order.checked_sub(next_child_count) {
if free_space > 0 {
let stolen_children =
match &mut children[child_index].position.get_mut().unwrap().node {
BTreeNode::Leaf(children) => {
let eligible_amount =
children.len().saturating_sub(context.minimum_children);
let amount_to_steal = free_space.min(eligible_amount);
Children::Leaves(
children
.splice(children.len() - amount_to_steal.., std::iter::empty())
.collect(),
)
}
BTreeNode::Interior(children) => {
let eligible_amount =
children.len().saturating_sub(context.minimum_children);
                        let amount_to_steal = free_space.min(eligible_amount);
Children::Interiors(
children
.splice(children.len() - amount_to_steal.., std::iter::empty())
.collect(),
)
}
BTreeNode::Uninitialized => unreachable!(),
};
let next_child = children[child_index + 1].position.get_mut().unwrap();
match (&mut next_child.node, stolen_children) {
(BTreeNode::Leaf(children), Children::Leaves(new_entries)) => {
children.splice(0..0, new_entries);
}
(BTreeNode::Interior(children), Children::Interiors(new_entries)) => {
children.splice(0..0, new_entries);
}
_ => unreachable!(),
}
next_child.dirty = true;
let (max_key, stats) = { (next_child.max_key().clone(), next_child.stats()) };
children[child_index + 1].key = max_key;
children[child_index + 1].stats = stats;
let child = children[child_index].position.get_mut().unwrap();
child.dirty = true;
let (max_key, stats) = { (child.max_key().clone(), child.stats()) };
children[child_index].key = max_key;
children[child_index].stats = stats;
if child.count() <= context.current_order {
return Ok(ChangeResult::Changed);
}
}
}
Ok(ChangeResult::Unchanged)
}
fn count(&self) -> usize {
match &self.node {
BTreeNode::Uninitialized => unreachable!(),
BTreeNode::Leaf(children) => children.len(),
BTreeNode::Interior(children) => children.len(),
}
}
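    /// Merges `children` taken from a removed sibling into this node,
    /// appending or prepending based on `insert_at_top`, and recursing into
    /// the nearest edge child when leaves are absorbed into an interior node.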
fn absorb(
&mut self,
children: Children<Index, ReducedIndex>,
insert_at_top: bool,
current_order: usize,
minimum_children: usize,
writer: &mut PagedWriter<'_>,
) -> Result<ChangeResult, Error> {
self.dirty = true;
match (&mut self.node, children) {
(BTreeNode::Leaf(existing_children), Children::Leaves(leaves)) => {
if insert_at_top {
existing_children.extend(leaves);
} else {
existing_children.splice(0..0, leaves);
}
Ok(Self::clean_up_leaf(
existing_children,
current_order,
minimum_children,
))
}
(BTreeNode::Interior(existing_children), Children::Leaves(leaves)) => {
let sponge = if insert_at_top {
existing_children.last_mut().unwrap()
} else {
existing_children.first_mut().unwrap()
};
sponge.position.load(
writer.file,
false,
writer.vault,
writer.cache,
Some(current_order),
)?;
let sponge = sponge.position.get_mut().unwrap();
sponge.absorb(
Children::Leaves(leaves),
insert_at_top,
current_order,
minimum_children,
writer,
)
}
(BTreeNode::Interior(existing_children), Children::Interiors(interiors)) => {
if insert_at_top {
existing_children.extend(interiors);
} else {
existing_children.splice(0..0, interiors);
}
Ok(Self::clean_up_interior(
existing_children,
current_order,
minimum_children,
))
}
(BTreeNode::Leaf(_), Children::Interiors(_)) | (BTreeNode::Uninitialized, _) => {
unreachable!()
}
}
}
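    /// Splits this node in half, returning interior pointers to the lower and
    /// upper halves. The upper half receives the latter half of the children;
    /// `self` is left uninitialized and must be replaced by the caller.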
pub(crate) fn split(
&mut self,
) -> (Interior<Index, ReducedIndex>, Interior<Index, ReducedIndex>) {
let mut old_node = Self::from(BTreeNode::Uninitialized);
std::mem::swap(self, &mut old_node);
match old_node.node {
BTreeNode::Leaf(mut children) => {
let upper = children
.splice((children.len() + 1) / 2.., std::iter::empty())
.collect::<Vec<_>>();
let lower = Self::from(BTreeNode::Leaf(children));
let upper = Self::from(BTreeNode::Leaf(upper));
(Interior::from(lower), Interior::from(upper))
}
BTreeNode::Interior(mut children) => {
let upper = children
.splice((children.len() + 1) / 2.., std::iter::empty())
.collect::<Vec<_>>();
let lower = Self::from(BTreeNode::Interior(children));
let upper = Self::from(BTreeNode::Interior(upper));
(Interior::from(lower), Interior::from(upper))
}
BTreeNode::Uninitialized => unreachable!(),
}
}
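    /// Splits the root node, replacing it with an interior node that points
    /// at the two halves.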
pub(crate) fn split_root(&mut self) {
let (lower, upper) = self.split();
self.node = BTreeNode::Interior(vec![lower, upper]);
}
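    /// Computes the reduced index for this node by reducing leaf indexes or
    /// re-reducing the children's stored stats.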
#[must_use]
pub fn stats(&self) -> ReducedIndex {
match &self.node {
BTreeNode::Leaf(children) => ReducedIndex::reduce(children.iter().map(|c| &c.index)),
BTreeNode::Interior(children) => {
ReducedIndex::rereduce(children.iter().map(|c| &c.stats))
}
BTreeNode::Uninitialized => unreachable!(),
}
}
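    /// Returns the largest key stored in or below this node.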
#[must_use]
#[allow(clippy::missing_panics_doc)]
pub fn max_key(&self) -> &ArcBytes<'static> {
match &self.node {
BTreeNode::Leaf(children) => &children.last().unwrap().key,
BTreeNode::Interior(children) => &children.last().unwrap().key,
BTreeNode::Uninitialized => unreachable!(),
}
}
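    /// Scans `range` in the direction indicated by `args.forwards`,
    /// consulting the node and key evaluators to decide whether to descend,
    /// skip, read data, or stop. Returns `false` if scanning stopped early.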
#[cfg_attr(
feature = "tracing",
tracing::instrument(skip(self, args, file, vault, cache))
)]
pub(crate) fn scan<
'k,
'keys,
CallerError: Display + Debug,
KeyRangeBounds,
NodeEvaluator,
KeyEvaluator,
ScanDataCallback,
>(
&self,
range: &'keys KeyRangeBounds,
args: &mut ScanArgs<
Index,
ReducedIndex,
CallerError,
NodeEvaluator,
KeyEvaluator,
ScanDataCallback,
>,
file: &mut dyn File,
vault: Option<&dyn AnyVault>,
cache: Option<&ChunkCache>,
current_depth: usize,
) -> Result<bool, AbortError<CallerError>>
where
NodeEvaluator: FnMut(&ArcBytes<'static>, &ReducedIndex, usize) -> ScanEvaluation,
KeyEvaluator: FnMut(&ArcBytes<'static>, &Index) -> ScanEvaluation,
KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + ?Sized,
ScanDataCallback: FnMut(
ArcBytes<'static>,
&Index,
ArcBytes<'static>,
) -> Result<(), AbortError<CallerError>>,
{
match &self.node {
BTreeNode::Leaf(children) => {
for child in DirectionalSliceIterator::new(args.forwards, children) {
if range.contains(&child.key.as_slice()) {
match (args.key_evaluator)(&child.key, &child.index) {
ScanEvaluation::ReadData => {
if child.index.position() > 0 {
let data = match read_chunk(
child.index.position(),
false,
file,
vault,
cache,
)? {
CacheEntry::ArcBytes(contents) => contents,
CacheEntry::Decoded(_) => unreachable!(),
};
(args.data_callback)(child.key.clone(), &child.index, data)?;
}
}
ScanEvaluation::Skip => {}
ScanEvaluation::Stop => return Ok(false),
};
}
}
}
BTreeNode::Interior(children) => {
for (index, child) in
DirectionalSliceIterator::new(args.forwards, children).enumerate()
{
let start_bound = range.start_bound();
let end_bound = range.end_bound();
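                    // Each interior child covers keys up to and including its
                    // own key; once the previous child's key passes the end
                    // bound, the remaining children are out of range.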
if args.forwards {
if index > 0 {
let previous_entry = &children[index - 1];
match end_bound {
Bound::Included(key) => {
if previous_entry.key > **key {
break;
}
}
Bound::Excluded(key) => {
if &previous_entry.key >= key {
break;
}
}
Bound::Unbounded => {}
}
}
                    }
match start_bound {
Bound::Included(key) => {
if child.key < **key {
continue;
}
}
Bound::Excluded(key) => {
if &child.key <= key {
continue;
}
}
Bound::Unbounded => {}
}
let keep_scanning =
match (args.node_evaluator)(&child.key, &child.stats, current_depth) {
ScanEvaluation::Stop => false,
ScanEvaluation::ReadData => child.position.map_loaded_entry(
file,
vault,
cache,
Some(children.len()),
|entry, file| {
entry.scan(range, args, file, vault, cache, current_depth + 1)
},
)?,
ScanEvaluation::Skip => true,
};
if !keep_scanning {
return Ok(false);
}
}
}
BTreeNode::Uninitialized => unreachable!(),
}
Ok(true)
}
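    /// Looks up `keys`, which are expected in ascending order, within this
    /// subtree, invoking `key_evaluator` and `key_reader` for each match.
    /// Returns `false` if an evaluator requested an early stop.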
#[cfg_attr(
feature = "tracing",
tracing::instrument(skip(self, key_evaluator, keys, key_reader, file, vault, cache))
)]
pub(crate) fn get<'keys, KeyEvaluator, KeyReader, Keys>(
&self,
keys: &mut KeyRange<'keys, Keys>,
key_evaluator: &mut KeyEvaluator,
key_reader: &mut KeyReader,
file: &mut dyn File,
vault: Option<&dyn AnyVault>,
cache: Option<&ChunkCache>,
) -> Result<bool, Error>
where
KeyEvaluator: FnMut(&ArcBytes<'static>, &Index) -> ScanEvaluation,
KeyReader: FnMut(ArcBytes<'static>, &Index) -> Result<(), AbortError<Infallible>>,
Keys: Iterator<Item = &'keys [u8]>,
{
match &self.node {
BTreeNode::Leaf(children) => {
let mut last_index = 0;
let mut took_one_key = false;
while let Some(key) = keys.current_key() {
match children[last_index..].binary_search_by(|child| (&*child.key).cmp(key)) {
Ok(matching) => {
took_one_key = true;
keys.next();
last_index += matching;
let entry = &children[last_index];
match key_evaluator(&entry.key, &entry.index) {
ScanEvaluation::ReadData => {
key_reader(entry.key.clone(), &entry.index)?;
}
ScanEvaluation::Skip => {}
ScanEvaluation::Stop => return Ok(false),
}
}
Err(location) => {
last_index += location;
if last_index == children.len() && took_one_key {
break;
}
took_one_key = true;
keys.next();
}
}
}
}
BTreeNode::Interior(children) => {
let mut last_index = 0;
while let Some(key) = keys.current_key() {
let containing_node_index = children[last_index..]
.binary_search_by(|child| (&*child.key).cmp(key))
.unwrap_or_else(|not_found| not_found);
last_index += containing_node_index;
if let Some(child) = children.get(last_index) {
let keep_scanning = child.position.map_loaded_entry(
file,
vault,
cache,
Some(children.len()),
|entry, file| {
entry
.get(keys, key_evaluator, key_reader, file, vault, cache)
.map_err(AbortError::Nebari)
},
)?;
if !keep_scanning {
break;
}
last_index += 1;
} else {
break;
}
}
}
BTreeNode::Uninitialized => unreachable!(),
}
Ok(true)
}
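    /// Recursively copies the data referenced by this node into `writer`,
    /// tracking already-copied chunks in `copied_chunks` and letting
    /// `index_callback` update each index. Returns whether anything changed.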
#[allow(clippy::too_many_arguments)]
pub(crate) fn copy_data_to<Callback>(
&mut self,
include_nodes: NodeInclusion,
file: &mut dyn File,
copied_chunks: &mut HashMap<u64, u64>,
writer: &mut PagedWriter<'_>,
vault: Option<&dyn AnyVault>,
scratch: &mut Vec<u8>,
index_callback: &mut Callback,
) -> Result<bool, Error>
where
Callback: FnMut(
&ArcBytes<'static>,
&mut Index,
&mut dyn File,
&mut HashMap<u64, u64>,
&mut PagedWriter<'_>,
Option<&dyn AnyVault>,
) -> Result<bool, Error>,
{
let mut any_changes = false;
match &mut self.node {
BTreeNode::Leaf(children) => {
for child in children {
any_changes =
child.copy_data_to(file, copied_chunks, writer, vault, index_callback)?
|| any_changes;
}
}
BTreeNode::Interior(children) => {
for child in children {
any_changes = child.copy_data_to(
include_nodes.next(),
file,
copied_chunks,
writer,
vault,
scratch,
index_callback,
)? || any_changes;
}
}
BTreeNode::Uninitialized => unreachable!(),
}
        self.dirty = true;
Ok(any_changes)
}
}
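/// Controls whether a node's own data is included while copying:
/// `IncludeNext` excludes the current node but includes everything below it.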
#[derive(Clone, Debug, Copy, Eq, PartialEq)]
pub enum NodeInclusion {
Exclude,
IncludeNext,
Include,
}
impl NodeInclusion {
pub const fn next(self) -> Self {
match self {
Self::Exclude => Self::Exclude,
Self::IncludeNext | Self::Include => Self::Include,
}
}
pub const fn should_include(self) -> bool {
matches!(self, Self::Include)
}
}
impl<
Index: Clone + BinarySerialization + Debug + 'static,
ReducedIndex: Reducer<Index> + Clone + BinarySerialization + Debug + 'static,
> BinarySerialization for BTreeEntry<Index, ReducedIndex>
{
fn serialize_to(
&mut self,
writer: &mut Vec<u8>,
paged_writer: &mut PagedWriter<'_>,
) -> Result<usize, Error> {
self.dirty = false;
let mut bytes_written = 0;
match &mut self.node {
            BTreeNode::Leaf(leaves) => {
                assert_children_order!(leaves);
                writer.write_u8(1)?;
                bytes_written += 1;
                for leaf in leaves {
                    bytes_written += leaf.serialize_to(writer, paged_writer)?;
                }
}
BTreeNode::Interior(interiors) => {
assert_children_order!(interiors);
writer.write_u8(0)?;
bytes_written += 1;
for interior in interiors {
bytes_written += interior.serialize_to(writer, paged_writer)?;
}
}
BTreeNode::Uninitialized => unreachable!(),
}
Ok(bytes_written)
}
fn deserialize_from(
reader: &mut ArcBytes<'_>,
current_order: Option<usize>,
) -> Result<Self, Error> {
let node_header = reader.read_u8()?;
match node_header {
0 => {
let mut nodes = Vec::new();
if let Some(current_order) = current_order {
nodes.reserve(current_order);
}
while !reader.is_empty() {
nodes.push(Interior::deserialize_from(reader, current_order)?);
}
Ok(Self {
node: BTreeNode::Interior(nodes),
dirty: false,
})
}
1 => {
let mut nodes = Vec::new();
if let Some(current_order) = current_order {
nodes.reserve(current_order);
}
while !reader.is_empty() {
nodes.push(KeyEntry::deserialize_from(reader, current_order)?);
}
Ok(Self {
node: BTreeNode::Leaf(nodes),
dirty: false,
})
}
_ => Err(Error::data_integrity("invalid node header")),
}
}
}
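/// The outcome of indexing a single key: leave it untouched, store a new
/// index value, or remove the entry.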
#[derive(Debug)]
pub enum KeyOperation<T> {
Skip,
Set(T),
Remove,
}
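/// Iterates over a slice either front-to-back or back-to-front, depending on
/// `forwards`.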
struct DirectionalSliceIterator<'a, I> {
forwards: bool,
index: usize,
contents: &'a [I],
}
impl<'a, I> DirectionalSliceIterator<'a, I> {
pub const fn new(forwards: bool, contents: &'a [I]) -> Self {
Self {
forwards,
contents,
index: if forwards { 0 } else { contents.len() },
}
}
}
impl<'a, I> Iterator for DirectionalSliceIterator<'a, I> {
type Item = &'a I;
fn next(&mut self) -> Option<Self::Item> {
if self.forwards && self.index < self.contents.len() {
let element = &self.contents[self.index];
self.index += 1;
Some(element)
} else if !self.forwards && self.index > 0 {
self.index -= 1;
Some(&self.contents[self.index])
} else {
None
}
}
}
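/// Callbacks and state for a scan: `node_evaluator` decides whether to
/// descend into an interior node, `key_evaluator` decides whether to read a
/// matching key's data, and `data_callback` receives the data that is read.
///
/// A minimal sketch of arguments that read every key and value, assuming
/// `Infallible` as the caller error type:
///
/// ```ignore
/// let args = ScanArgs::new(
///     true,
///     |_max_key, _stats, _depth| ScanEvaluation::ReadData,
///     |_key, _index| ScanEvaluation::ReadData,
///     |key, _index, value| -> Result<(), AbortError<Infallible>> {
///         println!("{key:?} = {value:?}");
///         Ok(())
///     },
/// );
/// ```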
pub struct ScanArgs<
Index,
ReducedIndex,
CallerError: Display + Debug,
NodeEvaluator,
KeyEvaluator,
DataCallback,
> where
NodeEvaluator: FnMut(&ArcBytes<'static>, &ReducedIndex, usize) -> ScanEvaluation,
KeyEvaluator: FnMut(&ArcBytes<'static>, &Index) -> ScanEvaluation,
DataCallback:
FnMut(ArcBytes<'static>, &Index, ArcBytes<'static>) -> Result<(), AbortError<CallerError>>,
{
pub forwards: bool,
pub node_evaluator: NodeEvaluator,
pub key_evaluator: KeyEvaluator,
pub data_callback: DataCallback,
_phantom: PhantomData<(Index, ReducedIndex, CallerError)>,
}
impl<
Index,
ReducedIndex,
CallerError: Display + Debug,
NodeEvaluator,
KeyEvaluator,
DataCallback,
> ScanArgs<Index, ReducedIndex, CallerError, NodeEvaluator, KeyEvaluator, DataCallback>
where
NodeEvaluator: FnMut(&ArcBytes<'static>, &ReducedIndex, usize) -> ScanEvaluation,
KeyEvaluator: FnMut(&ArcBytes<'static>, &Index) -> ScanEvaluation,
DataCallback:
FnMut(ArcBytes<'static>, &Index, ArcBytes<'static>) -> Result<(), AbortError<CallerError>>,
{
pub fn new(
forwards: bool,
node_evaluator: NodeEvaluator,
key_evaluator: KeyEvaluator,
data_callback: DataCallback,
) -> Self {
Self {
forwards,
node_evaluator,
key_evaluator,
data_callback,
_phantom: PhantomData,
}
}
}