use std::cmp::Ordering;
use std::fmt;
#[cfg(debug_assertions)]
use std::iter::FromIterator;
use std::marker::PhantomData;
use std::ops::Deref;
use std::pin::Pin;
use std::sync::Arc;
use async_trait::async_trait;
use collate::Collate;
use destream::{de, en};
use futures::future::{self, Future, TryFutureExt};
use futures::join;
use futures::stream::{self, FuturesOrdered, FuturesUnordered, TryStreamExt};
use log::debug;
use uuid::Uuid;
use tc_error::*;
use tc_transact::fs::*;
use tc_transact::lock::TxnLock;
use tc_transact::{Transact, Transaction, TxnId};
use tc_value::{Value, ValueCollator};
use tcgeneric::{Instance, TCBoxTryFuture, TCBoxTryStream, Tuple};
use super::{BTree, BTreeInstance, BTreeSlice, BTreeType, BTreeWrite, Key, Range, RowSchema};
type Selection<'a> = FuturesOrdered<
Pin<Box<dyn Future<Output = TCResult<TCBoxTryStream<'a, Key>>> + Send + Unpin + 'a>>,
>;
const DEFAULT_BLOCK_SIZE: usize = 4_000;
const BLOCK_ID_SIZE: usize = 128;
type NodeId = BlockId;
#[derive(Clone, Eq, PartialEq)]
struct NodeKey {
deleted: bool,
value: Vec<Value>,
}
impl NodeKey {
fn new(value: Vec<Value>) -> Self {
Self {
deleted: false,
value,
}
}
}
impl AsRef<[Value]> for NodeKey {
fn as_ref(&self) -> &[Value] {
&self.value
}
}
#[async_trait]
impl de::FromStream for NodeKey {
type Context = ();
async fn from_stream<D: de::Decoder>(cxt: (), decoder: &mut D) -> Result<Self, D::Error> {
de::FromStream::from_stream(cxt, decoder)
.map_ok(|(deleted, value)| Self { deleted, value })
.map_err(|e| de::Error::custom(format!("error decoding BTree node key: {}", e)))
.await
}
}
impl<'en> en::ToStream<'en> for NodeKey {
fn to_stream<E: en::Encoder<'en>>(&'en self, encoder: E) -> Result<E::Ok, E::Error> {
en::IntoStream::into_stream((&self.deleted, &self.value), encoder)
}
}
impl<'en> en::IntoStream<'en> for NodeKey {
fn into_stream<E: en::Encoder<'en>>(self, encoder: E) -> Result<E::Ok, E::Error> {
en::IntoStream::into_stream((self.deleted, self.value), encoder)
}
}
#[cfg(debug_assertions)]
impl fmt::Display for NodeKey {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"BTree node key: {}{}",
Value::from_iter(self.value.to_vec()),
if self.deleted { " (DELETED)" } else { "" }
)
}
}
#[derive(Clone, Eq, PartialEq)]
pub struct Node {
leaf: bool,
keys: Vec<NodeKey>,
parent: Option<NodeId>,
children: Vec<NodeId>,
rebalance: bool, }
impl Node {
fn new(leaf: bool, parent: Option<NodeId>) -> Node {
Node {
leaf,
keys: vec![],
parent,
children: vec![],
rebalance: false,
}
}
#[cfg(debug_assertions)]
fn validate(&self, schema: &[super::Column], range: &Range) {
debug!("validate {}", self);
debug!("range: {:?}", range);
for key in &self.keys {
debug!("key: {}", key);
assert_eq!(key.value.len(), schema.len());
assert!(range.prefix().len() <= key.value.len());
assert!(range.len() <= key.value.len());
}
}
}
impl BlockData for Node {
fn ext() -> &'static str {
"node"
}
}
#[async_trait]
impl de::FromStream for Node {
type Context = ();
async fn from_stream<D: de::Decoder>(cxt: (), decoder: &mut D) -> Result<Self, D::Error> {
de::FromStream::from_stream(cxt, decoder)
.map_ok(|(leaf, keys, parent, children, rebalance)| Self {
leaf,
keys,
parent,
children,
rebalance,
})
.map_err(|e| de::Error::custom(format!("error decoding BTree node: {}", e)))
.await
}
}
impl<'en> en::ToStream<'en> for Node {
fn to_stream<E: en::Encoder<'en>>(&'en self, encoder: E) -> Result<E::Ok, E::Error> {
en::IntoStream::into_stream(
(
&self.leaf,
&self.keys,
&self.parent,
&self.children,
&self.rebalance,
),
encoder,
)
}
}
impl<'en> en::IntoStream<'en> for Node {
fn into_stream<E: en::Encoder<'en>>(self, encoder: E) -> Result<E::Ok, E::Error> {
en::IntoStream::into_stream(
(
self.leaf,
self.keys,
self.parent,
self.children,
self.rebalance,
),
encoder,
)
}
}
#[cfg(debug_assertions)]
impl fmt::Debug for Node {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Display::fmt(self, f)
}
}
#[cfg(debug_assertions)]
impl fmt::Display for Node {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
if self.leaf {
writeln!(f, "leaf node:")?;
} else {
writeln!(f, "non-leaf node:")?;
}
write!(
f,
"\tkeys: {}",
Tuple::<NodeKey>::from_iter(self.keys.iter().cloned())
)?;
write!(f, "\t {} children", self.children.len())
}
}
struct Inner<F, D, T> {
file: F,
schema: RowSchema,
order: usize,
collator: ValueCollator,
root: TxnLock<NodeId>,
dir: PhantomData<D>,
txn: PhantomData<T>,
}
#[derive(Clone)]
pub struct BTreeFile<F, D, T> {
inner: Arc<Inner<F, D, T>>,
}
impl<F: File<Node>, D: Dir, T: Transaction<D>> BTreeFile<F, D, T>
where
Self: Clone,
{
fn new(file: F, schema: RowSchema, order: usize, root: NodeId) -> Self {
BTreeFile {
inner: Arc::new(Inner {
file,
schema,
order,
collator: ValueCollator::default(),
root: TxnLock::new("BTree root", root.into()),
dir: PhantomData,
txn: PhantomData,
}),
}
}
pub async fn create(file: F, schema: RowSchema, txn_id: TxnId) -> TCResult<Self> {
if !file.is_empty(txn_id).await? {
return Err(TCError::internal(
"Tried to create a new BTree without a new File",
));
}
let order = validate_schema(&schema)?;
let root: BlockId = Uuid::new_v4().into();
let node = Node::new(true, None);
file.clone()
.create_block(txn_id, root.clone(), node, DEFAULT_BLOCK_SIZE)
.await?;
Ok(BTreeFile::new(file, schema, order, root))
}
fn _delete_range<'a>(
&'a self,
txn_id: TxnId,
node_id: NodeId,
range: &'a Range,
) -> TCBoxTryFuture<'a, ()> {
debug_assert!(range.len() <= self.inner.schema.len());
Box::pin(async move {
let collator = &self.inner.collator;
let file = &self.inner.file;
let mut node = file.write_block(txn_id, node_id).await?;
#[cfg(debug_assertions)]
node.validate(&self.inner.schema, range);
let (l, r) = collator.bisect(&node.keys, range);
#[cfg(debug_assertions)]
debug!("delete from {} [{}..{}] ({:?})", *node, l, r, range);
if node.leaf {
for i in l..r {
node.keys[i].deleted = true;
}
node.rebalance = true;
Ok(())
} else if r > l {
node.rebalance = true;
for i in l..r {
node.keys[i].deleted = true;
}
let deletes: FuturesUnordered<_> = node.children[l..(r + 1)]
.iter()
.cloned()
.map(|child_id| self._delete_range(txn_id, child_id, range))
.collect();
deletes.try_fold((), |(), ()| future::ready(Ok(()))).await
} else {
let child_id = node.children[r].clone();
self._delete_range(txn_id, child_id, range).await
}
})
}
fn _insert(&self, txn_id: TxnId, mut node: F::Write, key: Key) -> TCBoxTryFuture<()> {
Box::pin(async move {
let collator = &self.inner.collator;
let file = &self.inner.file;
let order = self.inner.order;
let i = collator.bisect_left(&node.keys, &key);
#[cfg(debug_assertions)]
debug!("insert at index {} into {}", i, *node);
if node.leaf {
let key = NodeKey::new(key);
if i == node.keys.len() {
node.keys.insert(i, key);
return Ok(());
}
#[cfg(debug_assertions)]
debug!("insert key {} into {} at {}", key, *node, i);
match collator.compare_slice(&key, &node.keys[i]) {
Ordering::Less => node.keys.insert(i, key),
Ordering::Equal => {
#[cfg(debug_assertions)]
debug!("un-delete key at {}: {}", i, key);
node.keys[i].deleted = false
}
Ordering::Greater => panic!("error in Collate::bisect_left"),
}
Ok(())
} else {
let child_id = node.children[i].clone();
let child = file.write_block(txn_id, child_id).await?;
if child.keys.len() == (2 * order) - 1 {
let child_id = node.children[i].clone();
let mut node = self.split_child(txn_id, node, child_id, child, i).await?;
match collator.compare_slice(&key, &node.keys[i]) {
Ordering::Less => self._insert(txn_id, node, key).await,
Ordering::Equal => {
if node.keys[i].deleted {
node.keys[i].deleted = false;
}
return Ok(());
}
Ordering::Greater => {
let child_id = node.children[i + 1].clone();
let child = file.write_block(txn_id, child_id).await?;
self._insert(txn_id, child, key).await
}
}
} else {
self._insert(txn_id, child, key).await
}
}
})
}
fn _slice<'a, B: Deref<Target = Node>>(
self,
txn_id: TxnId,
node: B,
range: Range,
) -> TCResult<TCBoxTryStream<'a, Key>>
where
Self: 'a,
{
let (l, r) = self.inner.collator.bisect(&node.keys[..], &range);
#[cfg(debug_assertions)]
debug!("_slice {} from {} to {} ({:?})", *node, l, r, range);
if node.leaf {
let keys = node.keys[l..r]
.iter()
.filter(|k| !k.deleted)
.map(|k| k.value.to_vec())
.map(TCResult::Ok)
.collect::<Vec<TCResult<Key>>>();
Ok(Box::pin(stream::iter(keys)))
} else {
let mut selected: Selection<'a> = FuturesOrdered::new();
for i in l..r {
let child_id = node.children[i].clone();
let range_clone = range.clone();
let this = self.clone();
let selection = Box::pin(async move {
let node = this.inner.file.read_block(txn_id, child_id).await?;
this._slice(txn_id, node, range_clone)
});
selected.push(Box::pin(selection));
if !node.keys[i].deleted {
let key_at_i = TCResult::Ok(node.keys[i].value.to_vec());
let key_at_i: TCBoxTryStream<Key> =
Box::pin(stream::once(future::ready(key_at_i)));
selected.push(Box::pin(future::ready(Ok(key_at_i))));
}
}
let last_child_id = node.children[r].clone();
let selection = Box::pin(async move {
let node = self.inner.file.read_block(txn_id, last_child_id).await?;
self._slice(txn_id, node, range)
});
selected.push(Box::pin(selection));
Ok(Box::pin(selected.try_flatten()))
}
}
fn _slice_reverse<'a, B: Deref<Target = Node>>(
self,
txn_id: TxnId,
node: B,
range: Range,
) -> TCResult<TCBoxTryStream<'a, Key>>
where
Self: 'a,
{
let (l, r) = self.inner.collator.bisect(&node.keys, &range);
#[cfg(debug_assertions)]
debug!("_slice_reverse {} from {} to {} ({:?})", *node, r, l, range);
if node.leaf {
let keys = node.keys[l..r]
.iter()
.filter(|k| !k.deleted)
.rev()
.map(|k| k.value.to_vec())
.map(TCResult::Ok)
.collect::<Vec<TCResult<Key>>>();
Ok(Box::pin(stream::iter(keys)))
} else {
let mut selected: Selection<'a> = FuturesOrdered::new();
let last_child = node.children[r].clone();
let range_clone = range.clone();
let this = self.clone();
let selection = Box::pin(async move {
let node = this.inner.file.read_block(txn_id, last_child).await?;
this._slice_reverse(txn_id, node, range_clone)
});
selected.push(Box::pin(selection));
for i in (l..r).rev() {
let child_id = node.children[i].clone();
let range_clone = range.clone();
let this = self.clone();
let selection = Box::pin(async move {
let node = this.inner.file.read_block(txn_id, child_id).await?;
this._slice_reverse(txn_id, node, range_clone)
});
if !node.keys[i].deleted {
let key_at_i = TCResult::Ok(node.keys[i].value.to_vec());
let key_at_i: TCBoxTryStream<Key> =
Box::pin(stream::once(future::ready(key_at_i)));
selected.push(Box::pin(future::ready(Ok(key_at_i))));
}
selected.push(Box::pin(selection));
}
Ok(Box::pin(selected.try_flatten()))
}
}
pub(super) async fn rows_in_range<'a>(
self,
txn_id: TxnId,
range: Range,
reverse: bool,
) -> TCResult<TCBoxTryStream<'a, Key>>
where
Self: 'a,
{
let root_id = self.inner.root.read(txn_id).await?;
let root = self
.inner
.file
.read_block(txn_id, (*root_id).clone())
.await?;
if reverse {
self._slice_reverse(txn_id, root, range)
} else {
self._slice(txn_id, root, range)
}
}
async fn split_child(
&self,
txn_id: TxnId,
mut node: F::Write,
node_id: NodeId,
mut child: F::Write,
i: usize,
) -> TCResult<F::Write> {
debug!("BTree::split_child");
let file = &self.inner.file;
let order = self.inner.order;
assert_eq!(node_id, node.children[i].clone());
debug!(
"child to split has {} keys and {} children",
child.keys.len(),
child.children.len()
);
let new_node = Node::new(child.leaf, Some(node_id));
let (new_node_id, mut new_node) = file
.create_block_unique(txn_id, new_node, DEFAULT_BLOCK_SIZE)
.await?;
debug!("BTree::split_child created new node {}", new_node_id);
node.children.insert(i + 1, new_node_id.clone());
node.keys.insert(i, child.keys.remove(order - 1));
new_node.keys = child.keys.drain((order - 1)..).collect();
if child.leaf {
debug!("child is a leaf node");
} else {
new_node.children = child.children.drain(order..).collect();
}
Ok(node)
}
}
impl<F, D, T> Instance for BTreeFile<F, D, T>
where
Self: Send + Sync,
{
type Class = BTreeType;
fn class(&self) -> Self::Class {
BTreeType::File
}
}
#[async_trait]
impl<F: File<Node>, D: Dir, T: Transaction<D>> BTreeInstance for BTreeFile<F, D, T>
where
Self: Clone,
BTreeSlice<F, D, T>: 'static,
{
type Slice = BTreeSlice<F, D, T>;
fn collator(&'_ self) -> &'_ ValueCollator {
&self.inner.collator
}
fn schema(&'_ self) -> &'_ RowSchema {
&self.inner.schema
}
fn slice(self, range: Range, reverse: bool) -> TCResult<Self::Slice> {
BTreeSlice::new(BTree::File(self), range, reverse)
}
async fn is_empty(&self, txn_id: TxnId) -> TCResult<bool> {
let root_id = self.inner.root.read(txn_id).await?;
let root = self
.inner
.file
.read_block(txn_id, (*root_id).clone())
.await?;
Ok(root.keys.is_empty())
}
async fn keys<'a>(self, txn_id: TxnId) -> TCResult<TCBoxTryStream<'a, Key>> {
self.rows_in_range(txn_id, Range::default(), false).await
}
fn validate_key(&self, key: Key) -> TCResult<Key> {
if key.len() != self.inner.schema.len() {
return Err(TCError::bad_request("invalid key length", Tuple::from(key)));
}
key.into_iter()
.zip(&self.inner.schema)
.map(|(val, col)| {
val.into_type(col.dtype)
.ok_or_else(|| TCError::bad_request("invalid value for column", &col.name))
})
.collect()
}
}
#[async_trait]
impl<F: File<Node>, D: Dir, T: Transaction<D>> BTreeWrite for BTreeFile<F, D, T>
where
Self: Clone,
BTreeSlice<F, D, T>: 'static,
{
async fn delete(&self, txn_id: TxnId, range: Range) -> TCResult<()> {
if range == Range::default() {
let mut root = self.inner.root.write(txn_id).await?;
self.inner.file.truncate(txn_id).await?;
*root = Uuid::new_v4().into();
let node = Node::new(true, None);
self.inner
.file
.create_block(txn_id, (*root).clone(), node, DEFAULT_BLOCK_SIZE)
.await?;
return Ok(());
}
let root_id = self.inner.root.read(txn_id).await?;
self._delete_range(txn_id, (*root_id).clone(), &range).await
}
async fn insert(&self, txn_id: TxnId, key: Key) -> TCResult<()> {
let key = self.validate_key(key)?;
let file = &self.inner.file;
let order = self.inner.order;
let mut root_id = self.inner.root.write(txn_id).await?;
debug!("insert into BTree with root node ID {}", *root_id);
let root = file.write_block(txn_id, (*root_id).clone()).await?;
#[cfg(debug_assertions)]
debug!(
"insert {} into BTree, root node {} has {} keys and {} children (order is {})",
<Tuple<Value> as std::iter::FromIterator<Value>>::from_iter(key.to_vec()),
*root_id,
root.keys.len(),
root.children.len(),
order
);
#[cfg(debug_assertions)]
debug!("root node {} is {}", *root_id, *root);
assert_eq!(root.children.is_empty(), root.leaf);
if root.keys.len() == (2 * order) - 1 {
debug!("split root node");
let old_root_id = (*root_id).clone();
let mut new_root = Node::new(false, None);
new_root.children.push(old_root_id.clone());
let (new_root_id, new_root) = file
.create_block_unique(txn_id, new_root, DEFAULT_BLOCK_SIZE)
.await?;
(*root_id) = new_root_id;
let new_root = self
.split_child(txn_id, new_root, old_root_id, root, 0)
.await?;
self._insert(txn_id, new_root, key).await
} else {
self._insert(txn_id, root, key).await
}
}
}
#[async_trait]
impl<F: File<Node> + Transact, D: Dir, T: Transaction<D>> Transact for BTreeFile<F, D, T> {
async fn commit(&self, txn_id: &TxnId) {
join!(
self.inner.file.commit(txn_id),
self.inner.root.commit(txn_id)
);
}
async fn finalize(&self, txn_id: &TxnId) {
join!(
self.inner.file.finalize(txn_id),
self.inner.root.finalize(txn_id)
);
}
}
#[async_trait]
impl<F: File<Node>, D: Dir, T: Transaction<D>> Persist<D> for BTreeFile<F, D, T> {
type Schema = RowSchema;
type Store = F;
type Txn = T;
fn schema(&self) -> &Self::Schema {
&self.inner.schema
}
async fn load(txn: &T, schema: RowSchema, file: F) -> TCResult<Self> {
debug!("BTreeFile::load {:?}", schema);
let order = validate_schema(&schema)?;
let txn_id = *txn.id();
let mut root = None;
for block_id in file.block_ids(txn_id).await? {
debug!("BTreeFile::load block {}", block_id);
let block = file.read_block(txn_id, block_id.clone()).await?;
debug!("BTreeFile::loaded block {}", block_id);
if block.parent.is_none() {
root = Some(block_id);
break;
}
}
let root = root.ok_or_else(|| TCError::internal("BTree corrupted (missing root block)"))?;
Ok(BTreeFile::new(file, schema, order, root))
}
}
#[async_trait]
impl<F: File<Node>, D: Dir, T: Transaction<D>> Restore<D> for BTreeFile<F, D, T> {
async fn restore(&self, backup: &Self, txn_id: TxnId) -> TCResult<()> {
if self.inner.schema != backup.inner.schema {
return Err(TCError::unsupported(
"cannot restore a BTree from a backup with a different schema",
));
}
let mut root_id = self.inner.root.write(txn_id).await?;
self.inner.file.truncate(txn_id).await?;
*root_id = backup.inner.root.read(txn_id).await?.clone();
self.inner.file.copy_from(&backup.inner.file, txn_id).await
}
}
#[async_trait]
impl<F: File<Node>, D: Dir, T: Transaction<D>, I: BTreeInstance + 'static> CopyFrom<D, I>
for BTreeFile<F, D, T>
{
async fn copy_from(source: I, file: F, txn: &T) -> TCResult<Self> {
let txn_id = *txn.id();
let schema = source.schema().clone();
let dest = Self::create(file, schema, txn_id).await?;
let keys = source.keys(txn_id).await?;
dest.try_insert_from(txn_id, keys).await?;
Ok(dest)
}
}
impl<F, D, T> fmt::Display for BTreeFile<F, D, T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str("a BTree")
}
}
fn validate_schema(schema: &RowSchema) -> TCResult<usize> {
let mut key_size = 0;
for col in schema {
if let Some(size) = col.dtype().size() {
key_size += size;
if col.max_len().is_some() {
return Err(TCError::bad_request(
"Maximum length is not applicable to",
col.dtype(),
));
}
} else if let Some(size) = col.max_len() {
key_size += size;
} else {
return Err(TCError::bad_request(
"Type requires a maximum length",
col.dtype(),
));
}
}
key_size += schema.len() * 2;
key_size += 4;
let order = if DEFAULT_BLOCK_SIZE > (key_size * 2) + (BLOCK_ID_SIZE * 3) {
(DEFAULT_BLOCK_SIZE - BLOCK_ID_SIZE) / (key_size + BLOCK_ID_SIZE)
} else {
2
};
Ok(order)
}