/// Protobuf message types generated at build time by `prost-build`.
mod messages {
    include!(concat!(env!("OUT_DIR"), "/_.rs"));
}
mod blocks;
mod changes;
mod del;
mod keys;
mod put;
mod test;
pub mod traverse;
use std::{
fmt::Debug,
num::TryFromIntError,
ops::{Range, RangeBounds},
path::{Path, PathBuf},
string::FromUtf8Error,
sync::Arc,
};
use derive_builder::Builder;
use hypercore::{AppendOutcome, HypercoreBuilder, HypercoreError, Storage};
use prost::{bytes::Buf, DecodeError, EncodeError, Message};
use random_access_storage::RandomAccess;
use thiserror::Error;
use tokio::sync::RwLock;
use tracing::trace;
use blocks::{Blocks, BlocksBuilder, BlocksBuilderError};
use messages::{header::Metadata, yolo_index, Header, Node as NodeSchema, YoloIndex};
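/// Marker trait for storage backends a [`Hyperbee`] can be built on: any
/// [`RandomAccess`] storage that is also `Debug + Send`. Automatically
/// implemented for every such type.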
pub trait CoreMem: RandomAccess + Debug + Send {}
impl<T: RandomAccess + Debug + Send> CoreMem for T {}
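/// Protocol name written into the header block.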
static PROTOCOL: &str = "hyperbee";
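/// Maximum number of keys a tree node may hold.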
static MAX_KEYS: usize = 8;
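/// Minimum number of keys a node may hold: half of `max_keys`.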
fn min_keys(max_keys: usize) -> usize {
max_keys >> 1
}
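/// Errors that can occur while working with a [`Hyperbee`].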
#[derive(Error, Debug)]
pub enum HyperbeeError {
#[error("There was an error in the underlying Hypercore")]
HypercoreError(#[from] HypercoreError),
#[error("There was an error decoding Hypercore data")]
DecodeError(#[from] DecodeError),
#[error("No block at seq `{0}`")]
NoBlockAtSeqError(u64),
#[error("There was an error building `crate::Hyperbee` from `crate::HyperbeeBuilder`")]
HyperbeeBuilderError(#[from] HyperbeeBuilderError),
#[error(
"There was an error building `crate::blocks::Blocks` from `crate::blocks::BlocksBuilder`"
)]
BlocksBuilderError(#[from] BlocksBuilderError),
#[error("Converting a u64 value [{0}] to usize failed. This is possibly a 32bit platform. Got error {1}")]
U64ToUsizeConversionError(u64, TryFromIntError),
#[error("Could not traverse child node. Got error: {0}")]
GetChildInTraverseError(Box<dyn std::error::Error>),
#[error("There was an error encoding a messages::YoloIndex {0}")]
YoloIndexEncodingError(EncodeError),
#[error("There was an error encoding a messages::Header {0}")]
HeaderEncodingError(EncodeError),
#[error("There was an error encoding a messages::Node {0}")]
NodeEncodingError(EncodeError),
#[error("There was an error decoding a key")]
KeyFromUtf8Error(#[from] FromUtf8Error),
#[error("The tree has no root so this operation failed")]
NoRootError,
#[error("The tree already has a header")]
HeaderAlreadyExists,
}
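/// Pointer to a key/value pair. `seq` is the index of the Hypercore block the
/// pair was appended in; the key and value bytes are cached lazily once read
/// (the stored value may itself be `None`).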
#[derive(Clone, Debug)]
pub struct KeyValue {
seq: u64,
cached_key: Option<Vec<u8>>,
cached_value: Option<Option<Vec<u8>>>,
}
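/// Pointer from a node to one of its children. `seq` is the Hypercore block
/// holding the child and `offset` is its position among the nodes serialized
/// in that block; the decoded node is cached once loaded.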
#[derive(Debug)]
pub struct Child<M: CoreMem> {
pub seq: u64,
pub offset: u64,
cached_node: Option<SharedNode<M>>,
}
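/// A decoded Hypercore block: the tree nodes from its index plus the key and
/// (optional) value appended alongside them.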
#[derive(Clone, Debug)]
pub struct BlockEntry<M: CoreMem> {
nodes: Vec<SharedNode<M>>,
key: Vec<u8>,
value: Option<Vec<u8>>,
}
type Shared<T> = Arc<RwLock<T>>;
type SharedNode<T> = Shared<Node<T>>;
type NodePath<T> = Vec<(SharedNode<T>, usize)>;
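/// The children of a [`Node`], loaded from storage and cached on demand behind
/// an async lock.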
#[derive(Debug)]
struct Children<M: CoreMem> {
blocks: Shared<Blocks<M>>,
children: RwLock<Vec<Child<M>>>,
}
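/// A single B-tree node: its keys plus (lazily loaded) children.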
#[derive(Debug)]
pub struct Node<M: CoreMem> {
pub keys: Vec<KeyValue>,
children: Children<M>,
blocks: Shared<Blocks<M>>,
}
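/// An append-only B-tree built on top of a Hypercore.
///
/// A lightweight usage sketch (this assumes the crate is published as
/// `hyperbee` and that a tokio runtime is available; writes live in the `put`
/// module and are not shown here):
///
/// ```no_run
/// # async fn demo() -> Result<(), hyperbee::HyperbeeError> {
/// // Build an in-memory tree; a fresh tree has no entries, so lookups
/// // return `None`.
/// let mut hb = hyperbee::Hyperbee::from_ram().await?;
/// assert!(hb.get(b"hello").await?.is_none());
/// # Ok(())
/// # }
/// ```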
#[derive(Debug, Builder)]
#[builder(pattern = "owned", derive(Debug))]
pub struct Hyperbee<M: CoreMem> {
pub blocks: Shared<Blocks<M>>,
}
impl KeyValue {
fn new(seq: u64, keys_key: Option<Vec<u8>>, keys_value: Option<Option<Vec<u8>>>) -> Self {
KeyValue {
seq,
cached_key: keys_key,
cached_value: keys_value,
}
}
}
impl<M: CoreMem> Child<M> {
fn new(seq: u64, offset: u64, node: Option<SharedNode<M>>) -> Self {
Child {
seq,
offset,
cached_node: node,
}
}
}
impl<M: CoreMem> Clone for Child<M> {
fn clone(&self) -> Self {
Self::new(self.seq, self.offset, self.cached_node.clone())
}
}
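/// Decode a serialized [`YoloIndex`] into the tree nodes it describes. Each
/// level's children are encoded as a flat list of `[seq, offset]` pairs.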
fn make_node_vec<B: Buf, M: CoreMem>(
buf: B,
blocks: Shared<Blocks<M>>,
) -> Result<Vec<SharedNode<M>>, DecodeError> {
Ok(YoloIndex::decode(buf)?
.levels
.iter()
.map(|level| {
let keys = level
.keys
.iter()
.map(|k| KeyValue::new(*k, Option::None, Option::None))
.collect();
let mut children = vec![];
for i in (0..(level.children.len())).step_by(2) {
children.push(Child::new(
level.children[i],
level.children[i + 1],
Option::None,
));
}
Arc::new(RwLock::new(Node::new(keys, children, blocks.clone())))
})
.collect())
}
impl<M: CoreMem> Children<M> {
fn new(blocks: Shared<Blocks<M>>, children: Vec<Child<M>>) -> Self {
Self {
blocks,
children: RwLock::new(children),
}
}
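    /// Replace the child at `index` with `new_children`. Does nothing when
    /// `new_children` is empty.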
#[tracing::instrument(skip(self))]
async fn insert(&self, index: usize, new_children: Vec<Child<M>>) {
if new_children.is_empty() {
trace!("no children to insert, do nothing");
return;
}
        // `new_children` is non-empty here (we returned early above), so we
        // always replace exactly one existing child with the new ones.
        let replace_split_child = 1;
trace!(
"replacing child @ [{}] with [{}] children.",
index,
new_children.len()
);
self.children
.write()
.await
.splice(index..(index + replace_split_child), new_children);
}
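    /// Get the child at `index`, loading it from storage and caching it if it
    /// has not been loaded yet.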
#[tracing::instrument(skip(self))]
async fn get_child(&self, index: usize) -> Result<Shared<Node<M>>, HyperbeeError> {
let (seq, offset) = {
let child_ref = &self.children.read().await[index];
if let Some(node) = &child_ref.cached_node {
return Ok(node.clone());
}
(child_ref.seq, child_ref.offset)
};
let block = self
.blocks
.read()
.await
.get(&seq, self.blocks.clone())
.await?;
let node = block.read().await.get_tree_node(offset)?;
self.children.write().await[index].cached_node = Some(node.clone());
Ok(node)
}
async fn len(&self) -> usize {
self.children.read().await.len()
}
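    /// Splice `replace_with` over `range`, returning the removed children.
    /// Does nothing (and returns an empty `Vec`) when there are no children.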
async fn splice<R: RangeBounds<usize>, I: IntoIterator<Item = Child<M>>>(
&self,
range: R,
replace_with: I,
) -> Vec<Child<M>> {
if self.children.read().await.is_empty() {
return vec![];
}
self.children
.write()
.await
.splice(range, replace_with)
.collect()
}
}
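/// Descend from `node` toward `key`, binary-searching the keys at each level.
/// Returns whether an exact match was found and the path of `(node, index)`
/// pairs visited; on a match the final index points at the matching key,
/// otherwise at the slot where `key` would be inserted.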
#[tracing::instrument(skip(node))]
async fn nearest_node<M: CoreMem, T>(
node: SharedNode<M>,
key: &T,
) -> Result<(bool, NodePath<M>), HyperbeeError>
where
T: PartialOrd<[u8]> + Debug + ?Sized,
{
let mut current_node = node;
let mut out_path: NodePath<M> = vec![];
loop {
let next_node = {
let child_index: usize = 'found: {
let n_keys = current_node.read().await.keys.len();
if n_keys == 0 {
break 'found n_keys;
}
let mut low = 0;
let mut high = n_keys - 1;
while low <= high {
let mid = low + ((high - low) >> 1);
let val = current_node.write().await.get_key(mid).await?;
if key == &val[..] {
trace!("key {:?} == val {:?} at index {}", key, val, mid);
out_path.push((current_node.clone(), mid));
return Ok((true, out_path));
}
if key < &val[..] {
if mid == 0 {
break;
}
high = mid - 1;
} else {
low = mid + 1;
}
}
out_path.push((current_node.clone(), low));
break 'found low;
};
if current_node.read().await.is_leaf().await {
trace!("Reached leaf, we're done.");
return Ok((false, out_path));
}
current_node.read().await.get_child(child_index).await?
};
current_node = next_node;
}
}
impl<M: CoreMem> Node<M> {
fn new(keys: Vec<KeyValue>, children: Vec<Child<M>>, blocks: Shared<Blocks<M>>) -> Self {
Node {
keys,
children: Children::new(blocks.clone(), children),
blocks,
}
}
pub async fn n_children(&self) -> usize {
self.children.len().await
}
async fn is_leaf(&self) -> bool {
self.n_children().await == 0
}
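    /// Height of the subtree rooted at this node, counted by walking leftmost
    /// children; a leaf has height 1.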
pub async fn height(&self) -> Result<usize, HyperbeeError> {
if self.is_leaf().await {
Ok(1)
} else {
let mut out = 1;
let mut cur_child = self.get_child(0).await?;
loop {
out += 1;
if cur_child.read().await.n_children().await == 0 {
return Ok(out);
}
let next_child = cur_child.read().await.get_child(0).await?;
cur_child = next_child;
}
}
}
async fn to_level(&self) -> yolo_index::Level {
let mut children = vec![];
for c in self.children.children.read().await.iter() {
children.push(c.seq);
children.push(c.offset);
}
yolo_index::Level {
keys: self.keys.iter().map(|k| k.seq).collect(),
children,
}
}
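    /// Get the key at `index`, reading it from storage and caching it if it is
    /// not already cached.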
#[tracing::instrument(skip(self))]
async fn get_key(&mut self, index: usize) -> Result<Vec<u8>, HyperbeeError> {
let key = &mut self.keys[index];
if let Some(value) = &key.cached_key {
trace!("has cached value");
return Ok(value.clone());
}
trace!("no cached value");
let value = self
.blocks
.read()
.await
.get(&key.seq, self.blocks.clone())
.await?
.read()
.await
.key
.clone();
key.cached_key = Some(value.clone());
Ok(value)
}
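    /// Get `(seq, value)` for the key at `index`, reading the block from
    /// storage when the value is not cached.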
async fn get_value_of_key(
&self,
index: usize,
) -> Result<(u64, Option<Vec<u8>>), HyperbeeError> {
match &self.keys[index] {
KeyValue {
seq,
cached_value: Some(value),
..
} => Ok((*seq, value.clone())),
KeyValue {
seq,
cached_value: None,
..
} => Ok((
*seq,
self.blocks
.read()
.await
.get(seq, self.blocks.clone())
.await?
.read()
.await
.value
.clone(),
)),
}
}
async fn get_child(&self, index: usize) -> Result<Shared<Node<M>>, HyperbeeError> {
self.children.get_child(index).await
}
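    /// Replace the keys in `range` with `key_ref` and insert `children` at the
    /// start of the range.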
#[tracing::instrument(skip(self))]
async fn insert(&mut self, key_ref: KeyValue, children: Vec<Child<M>>, range: Range<usize>) {
trace!("inserting [{}] children", children.len());
self.keys.splice(range.clone(), vec![key_ref]);
self.children.insert(range.start, children).await;
}
}
impl<M: CoreMem> BlockEntry<M> {
fn new(entry: NodeSchema, blocks: Shared<Blocks<M>>) -> Result<Self, HyperbeeError> {
Ok(BlockEntry {
nodes: make_node_vec(&entry.index[..], blocks)?,
key: entry.key,
value: entry.value,
})
}
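    /// Get the tree node stored at `offset` within this block.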
fn get_tree_node(&self, offset: u64) -> Result<SharedNode<M>, HyperbeeError> {
Ok(self
.nodes
.get(
usize::try_from(offset)
.map_err(|e| HyperbeeError::U64ToUsizeConversionError(offset, e))?,
)
.expect("offset *should* always point to a real node")
.clone())
}
}
impl<M: CoreMem> Hyperbee<M> {
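    /// The version of the tree: the number of blocks in the underlying Hypercore.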
pub async fn version(&self) -> u64 {
self.blocks.read().await.info().await.length
}
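    /// Get the root of the tree. Returns `Ok(None)` when the core is empty; if
    /// `ensure_header` is set, a header block is written first in that case.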
pub async fn get_root(
&mut self,
ensure_header: bool,
) -> Result<Option<Shared<Node<M>>>, HyperbeeError> {
let blocks = self.blocks.read().await;
let version = self.version().await;
if version == 0 {
if ensure_header {
self.ensure_header().await?;
}
return Ok(None);
}
let root = blocks
.get(&(version - 1), self.blocks.clone())
.await?
.read()
.await
.get_tree_node(0)?;
Ok(Some(root))
}
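    /// Look up `key`. Returns `None` when the key is absent; otherwise returns
    /// `(seq, value)` where `seq` is the block the entry was written at and
    /// `value` is the stored value (which may itself be `None`).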
pub async fn get(
&mut self,
key: &[u8],
) -> Result<Option<(u64, Option<Vec<u8>>)>, HyperbeeError> {
let node = match self.get_root(false).await? {
None => return Ok(None),
Some(node) => node,
};
let (matched, path) = nearest_node(node, key).await?;
if matched {
let (node, key_index) = path
.last()
.expect("Since `matched` was true, there must be at least one node in `path`");
return Ok(Some(node.read().await.get_value_of_key(*key_index).await?));
}
Ok(None)
}
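    /// Write the header block if none exists yet. Returns `true` if a header
    /// was created, `false` if one already existed.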
async fn ensure_header(&self) -> Result<bool, HyperbeeError> {
match self.create_header(None).await {
Ok(_) => Ok(true),
Err(e) => match e {
HyperbeeError::HeaderAlreadyExists => Ok(false),
other_errors => Err(other_errors),
},
}
}
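    /// Append the Hyperbee header block (protocol name plus optional metadata).
    /// Fails with [`HyperbeeError::HeaderAlreadyExists`] if the core is not empty.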
pub async fn create_header(
&self,
metadata: Option<Metadata>,
) -> Result<AppendOutcome, HyperbeeError> {
if self.blocks.read().await.info().await.length != 0 {
return Err(HyperbeeError::HeaderAlreadyExists);
}
let header = Header {
protocol: PROTOCOL.to_string(),
metadata,
};
        let mut buf = Vec::with_capacity(header.encoded_len());
header
.encode(&mut buf)
.map_err(HyperbeeError::HeaderEncodingError)?;
self.blocks.read().await.append(&buf).await
}
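    /// Render the tree as a string via [`traverse::print`]. Fails with
    /// [`HyperbeeError::NoRootError`] when the tree has no root.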
pub async fn print(&mut self) -> Result<String, HyperbeeError> {
let root = self
.get_root(false)
.await?
.ok_or(HyperbeeError::NoRootError)?;
let out = traverse::print(root).await?;
Ok(out)
}
}
impl Hyperbee<random_access_disk::RandomAccessDisk> {
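    /// Create a disk-backed [`Hyperbee`] with its storage under `path_to_storage_dir`.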
pub async fn from_storage_dir<T: AsRef<Path>>(
path_to_storage_dir: T,
) -> Result<Hyperbee<random_access_disk::RandomAccessDisk>, HyperbeeError> {
let p: PathBuf = path_to_storage_dir.as_ref().to_owned();
let storage = Storage::new_disk(&p, false).await?;
let hc = Arc::new(RwLock::new(HypercoreBuilder::new(storage).build().await?));
let blocks = BlocksBuilder::default().core(hc).build()?;
Ok(HyperbeeBuilder::default()
.blocks(Arc::new(RwLock::new(blocks)))
.build()?)
}
}
impl Hyperbee<random_access_memory::RandomAccessMemory> {
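    /// Create a purely in-memory [`Hyperbee`], useful for tests and experiments.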
pub async fn from_ram(
) -> Result<Hyperbee<random_access_memory::RandomAccessMemory>, HyperbeeError> {
let hc = Arc::new(RwLock::new(
HypercoreBuilder::new(Storage::new_memory().await?)
.build()
.await?,
));
let blocks = BlocksBuilder::default().core(hc).build()?;
Ok(HyperbeeBuilder::default()
.blocks(Arc::new(RwLock::new(blocks)))
.build()?)
}
}
impl<M: CoreMem> Clone for Hyperbee<M> {
fn clone(&self) -> Self {
Self {
blocks: self.blocks.clone(),
}
}
}