use derive_builder::Builder;
use futures_lite::{Stream, StreamExt};
use hypercore::{AppendOutcome, Hypercore, HypercoreBuilder, Storage};
use prost::Message;
use crate::{
blocks::{Blocks, BlocksBuilder},
error::HyperbeeError,
messages::{header::Metadata, Header},
nearest_node,
traverse::{KeyDataResult, Traverse, TraverseConfig},
Node, Shared, PROTOCOL,
};
use std::{
fmt::Debug,
path::{Path, PathBuf},
sync::Arc,
};
use tokio::sync::{Mutex, RwLock};
/// A tree whose data is read from, and appended to, a shared [`Blocks`] handle.
///
/// The `Builder` derive generates a `TreeBuilder` (owned pattern) which the
/// constructors in this file use to assemble a `Tree`.
#[derive(Debug, Builder)]
#[builder(pattern = "owned", derive(Debug))]
pub struct Tree {
/// Shared, lock-protected handle to the blocks backing this tree.
pub blocks: Shared<Blocks>,
}
impl Tree {
    /// Returns the current version of the tree.
    ///
    /// This is the `length` reported by the underlying blocks' `info()`.
    pub async fn version(&self) -> u64 {
        self.blocks.read().await.info().await.length
    }

    /// Fetches the root node of the tree, if there is one.
    ///
    /// Returns `Ok(None)` when the version is `0` or `1`. When the version is
    /// `0` and `ensure_header` is `true`, a header block is written first (see
    /// [`Tree::create_header`]); the result is still `Ok(None)`.
    ///
    /// Otherwise the block at index `version - 1` is loaded and its tree node
    /// `0` is returned as the root.
    ///
    /// # Errors
    ///
    /// Propagates any error from writing the header, loading the block, or
    /// extracting the tree node.
    pub(crate) async fn get_root(
        &self,
        ensure_header: bool,
    ) -> Result<Option<Shared<Node>>, HyperbeeError> {
        let blocks = self.blocks.read().await;
        // Read the length through the guard we already hold. Calling
        // `self.version()` here would acquire a second read lock while the
        // first is still held; tokio's `RwLock` is write-preferring, so that
        // nested acquisition can deadlock if a writer queues up in between.
        let version = blocks.info().await.length;
        if version <= 1 {
            if version == 0 && ensure_header {
                // `ensure_header` locks `self.blocks` itself, so release our
                // guard first for the same reason as above.
                drop(blocks);
                self.ensure_header().await?;
            }
            return Ok(None);
        }
        let root = blocks
            .get(&(version - 1), self.blocks.clone())
            .await?
            .read()
            .await
            .get_tree_node(0)?;
        Ok(Some(root))
    }

    /// Returns the height of the tree, or `0` when there is no root node.
    ///
    /// # Errors
    ///
    /// Propagates errors from fetching the root or computing its height.
    #[cfg(feature = "debug")]
    pub async fn height(&self) -> Result<usize, HyperbeeError> {
        let Some(root) = self.get_root(false).await? else {
            return Ok(0);
        };
        let root = root.read().await;
        root.height().await
    }

    /// Looks up `key` in the tree.
    ///
    /// Returns `Ok(Some((seq, value)))` taken from the matching key-value
    /// entry, or `Ok(None)` when the tree has no root or the key is not found.
    ///
    /// # Errors
    ///
    /// Propagates errors from fetching the root, searching the tree, or
    /// loading the matched key-value entry.
    ///
    /// # Panics
    ///
    /// Panics if `nearest_node` reports a match but returns an empty path,
    /// which would be a broken invariant of that helper.
    pub async fn get(&self, key: &[u8]) -> Result<Option<(u64, Option<Vec<u8>>)>, HyperbeeError> {
        let Some(root) = self.get_root(false).await? else {
            return Ok(None);
        };
        let (matched, path) = nearest_node(root, key).await?;
        if matched.is_none() {
            return Ok(None);
        }
        // The last path entry is the node holding the match and the index of
        // the key within that node.
        let (node, key_index) = path
            .last()
            .expect("Since `matched` was true, there must be at least one node in `path`");
        let kv = node.read().await.get_key_value(*key_index).await?;
        Ok(Some((kv.seq, kv.value)))
    }

    /// Writes a header with no metadata unless one already exists.
    ///
    /// Returns `Ok(true)` when a header was created and `Ok(false)` when
    /// [`Tree::create_header`] reported `HeaderAlreadyExists`.
    ///
    /// # Errors
    ///
    /// Any other error from [`Tree::create_header`] is returned unchanged.
    async fn ensure_header(&self) -> Result<bool, HyperbeeError> {
        match self.create_header(None).await {
            Ok(_) => Ok(true),
            Err(HyperbeeError::HeaderAlreadyExists) => Ok(false),
            Err(other) => Err(other),
        }
    }

    /// Encodes a [`Header`] carrying `PROTOCOL` and the given `metadata`, and
    /// appends it to the blocks.
    ///
    /// # Errors
    ///
    /// * `HyperbeeError::HeaderAlreadyExists` if the blocks are not empty
    ///   (their reported length is non-zero).
    /// * `HyperbeeError::HeaderEncodingError` if encoding the header fails.
    /// * Any error returned by the append itself.
    pub async fn create_header(
        &self,
        metadata: Option<Metadata>,
    ) -> Result<AppendOutcome, HyperbeeError> {
        // Use a single guard for both the emptiness check and the append
        // rather than locking `self.blocks` twice.
        let blocks = self.blocks.read().await;
        if blocks.info().await.length != 0 {
            return Err(HyperbeeError::HeaderAlreadyExists);
        }
        let header = Header {
            protocol: PROTOCOL.to_string(),
            metadata,
        };
        let mut buf = Vec::with_capacity(header.encoded_len());
        header
            .encode(&mut buf)
            .map_err(HyperbeeError::HeaderEncodingError)?;
        blocks.append(&buf).await
    }

    /// Renders the tree as a string via `crate::traverse::print`.
    ///
    /// # Errors
    ///
    /// Returns `HyperbeeError::NoRootError` when the tree has no root, and
    /// propagates errors from fetching the root or printing.
    #[cfg(feature = "debug")]
    pub async fn print(&self) -> Result<String, HyperbeeError> {
        let root = self
            .get_root(false)
            .await?
            .ok_or(HyperbeeError::NoRootError)?;
        Ok(crate::traverse::print(root).await?)
    }

    /// Creates a stream over the tree, starting at the root and configured by
    /// `conf`.
    ///
    /// Each stream item is the first element of what the underlying
    /// [`Traverse`] yields.
    ///
    /// # Errors
    ///
    /// Returns `HyperbeeError::NoRootError` when the tree has no root, and
    /// propagates errors from fetching the root.
    pub async fn traverse<'a>(
        &self,
        conf: TraverseConfig,
    ) -> Result<impl Stream<Item = KeyDataResult> + 'a, HyperbeeError> {
        let root = self
            .get_root(false)
            .await?
            .ok_or(HyperbeeError::NoRootError)?;
        let stream = Traverse::new(root, conf);
        Ok(stream.map(move |kv_and_node| kv_and_node.0))
    }

    /// Opens a tree backed by on-disk hypercore storage located at
    /// `path_to_storage_dir`.
    ///
    /// # Errors
    ///
    /// Propagates errors from opening the storage, building the hypercore, or
    /// building the tree.
    pub async fn from_storage_dir<T: AsRef<Path>>(
        path_to_storage_dir: T,
    ) -> Result<Tree, HyperbeeError> {
        let p: PathBuf = path_to_storage_dir.as_ref().to_owned();
        // NOTE(review): the `false` flag is presumably hypercore's "overwrite"
        // option, i.e. existing data is kept -- confirm against hypercore docs.
        let storage = Storage::new_disk(&p, false).await?;
        let hc = Arc::new(Mutex::new(HypercoreBuilder::new(storage).build().await?));
        let blocks = BlocksBuilder::default().core(hc).build()?;
        Self::from_blocks(blocks)
    }

    /// Creates a tree backed by in-memory hypercore storage.
    ///
    /// # Errors
    ///
    /// Propagates errors from creating the storage, building the hypercore, or
    /// building the tree.
    pub async fn from_ram() -> Result<Tree, HyperbeeError> {
        let storage = Storage::new_memory().await?;
        let hc = Arc::new(Mutex::new(HypercoreBuilder::new(storage).build().await?));
        let blocks = BlocksBuilder::default().core(hc).build()?;
        Self::from_blocks(blocks)
    }

    /// Creates a tree on top of an already constructed [`Hypercore`].
    ///
    /// # Errors
    ///
    /// Propagates errors from building the blocks or the tree.
    pub fn from_hypercore(hypercore: Hypercore) -> Result<Self, HyperbeeError> {
        let hc = Arc::new(Mutex::new(hypercore));
        let blocks = BlocksBuilder::default().core(hc).build()?;
        Self::from_blocks(blocks)
    }

    /// Wraps `blocks` in a shared lock and builds the [`Tree`] around it.
    fn from_blocks(blocks: Blocks) -> Result<Self, HyperbeeError> {
        Ok(TreeBuilder::default()
            .blocks(Arc::new(RwLock::new(blocks)))
            .build()?)
    }
}
impl Clone for Tree {
    /// Produces another `Tree` whose `blocks` field is a clone of this one's
    /// shared handle.
    fn clone(&self) -> Self {
        let blocks = self.blocks.clone();
        Self { blocks }
    }
}
#[cfg(test)]
mod test {
    use super::*;

    /// Builds a tree on top of an in-memory hypercore, then checks that a value
    /// written with `put` is returned by `get`.
    #[tokio::test]
    async fn from_hc() -> Result<(), HyperbeeError> {
        let storage = Storage::new_memory().await?;
        let core = HypercoreBuilder::new(storage).build().await?;
        let tree = Tree::from_hypercore(core)?;

        let put_outcome = tree.put(b"hello", Some(b"world")).await?;
        assert_eq!((None, 1), put_outcome);

        let fetched = tree.get(b"hello").await?;
        assert_eq!(fetched, Some((1u64, Some(b"world".into()))));
        Ok(())
    }
}
// Named differently from the plain `test` module in this file: when both
// `cfg(test)` and the `debug` feature are active, two modules called `test`
// in the same scope would be a duplicate-definition compile error.
#[cfg(all(test, feature = "debug"))]
mod debug_test {
    use super::*;

    /// Checks `Tree::height` on an empty tree, after inserting one key, and
    /// after deleting that key again.
    #[tokio::test]
    async fn height_zero() -> Result<(), HyperbeeError> {
        let tree = Tree::from_ram().await?;
        // A brand-new tree has no root, so its height is zero.
        assert_eq!(tree.height().await?, 0);

        tree.put(b"foo", None).await?;
        assert_eq!(tree.height().await?, 1);

        // The test expects the height to stay at one after the only key is removed.
        tree.del(b"foo").await?;
        assert_eq!(tree.height().await?, 1);
        Ok(())
    }
}