hyperbee/
hb.rs

1use std::{fmt::Debug, path::Path, sync::Arc};
2
3use derive_builder::Builder;
4use futures_lite::Stream;
5use hypercore::{AppendOutcome, Hypercore};
6use tokio::sync::RwLock;
7
8use crate::{
9    error::HyperbeeError,
10    messages::header::Metadata,
11    prefixed::{Prefixed, PrefixedConfig},
12    traverse::{KeyDataResult, TraverseConfig},
13    tree::Tree,
14    KeyValueData, Shared,
15};
16
17/// An append only B-Tree built on [`Hypercore`](hypercore::Hypercore). It provides a key-value
18/// store API, with methods for [inserting](Hyperbee::put), [getting](Hyperbee::get), and
19/// [deleting](Hyperbee::del) key-value pair. As well as creating [sorted
20/// iterators](Hyperbee::traverse), and ["sub" B-Trees](Hyperbee::sub) for grouping related data.
21#[derive(Debug, Builder)]
22#[builder(pattern = "owned", derive(Debug))]
23pub struct Hyperbee {
24    tree: Shared<Tree>,
25}
26
27impl Hyperbee {
28    /// The number of blocks in the hypercore.
29    /// The first block is always the header block so:
30    /// `version` would be the `seq` of the next block
31    /// `version - 1` is most recent block
32    pub async fn version(&self) -> u64 {
33        self.tree.read().await.version().await
34    }
35
36    /// Create the header for the Hyperbee. This must be done before writing anything else to the
37    /// tree.
38    pub async fn create_header(
39        &self,
40        metadata: Option<Metadata>,
41    ) -> Result<AppendOutcome, HyperbeeError> {
42        self.tree.read().await.create_header(metadata).await
43    }
44
45    #[cfg(feature = "debug")]
46    /// The number of levels in the tree
47    pub async fn height(&self) -> Result<usize, HyperbeeError> {
48        self.tree.read().await.height().await
49    }
50
51    #[cfg(feature = "debug")]
52    /// Returs a string representing the structure of the tree showing the keys in each node
53    pub async fn print(&self) -> Result<String, HyperbeeError> {
54        self.tree.read().await.print().await
55    }
56
57    /// Get the value corresponding to the provided `key` from the Hyperbee
58    /// # Errors
59    /// When `Hyperbee.get_root` fails
60    pub async fn get(&self, key: &[u8]) -> Result<Option<(u64, Option<Vec<u8>>)>, HyperbeeError> {
61        self.tree.read().await.get(key).await
62    }
63
64    /// Insert the given key and value into the tree
65    /// Returs the `seq` of the new key, and `Option<u64>` which contains the `seq` of the old key
66    /// if it was replaced.
67    #[tracing::instrument(level = "trace", skip(self), ret)]
68    pub async fn put(
69        &self,
70        key: &[u8],
71        value: Option<&[u8]>,
72    ) -> Result<(Option<u64>, u64), HyperbeeError> {
73        self.tree.read().await.put(key, value).await
74    }
75
76    /// Like [`Hyperbee::put`] but takes a `compare_and_swap` function.
77    /// The `compared_and_swap` function is called with the old key (if present), and the new key.
78    /// The new key is only inserted if `compare_and_swap` returns true.
79    /// Returs two `Option<u64>`s. The first is the old key, the second is the new key.
80    pub async fn put_compare_and_swap(
81        &self,
82        key: &[u8],
83        value: Option<&[u8]>,
84        cas: impl FnOnce(Option<&KeyValueData>, &KeyValueData) -> bool,
85    ) -> Result<(Option<u64>, Option<u64>), HyperbeeError> {
86        self.tree
87            .read()
88            .await
89            .put_compare_and_swap(key, value, cas)
90            .await
91    }
92
93    /// Delete the given key from the tree.
94    /// Returns the `seq` from the key if it was deleted.
95    pub async fn del(&self, key: &[u8]) -> Result<Option<u64>, HyperbeeError> {
96        self.tree.read().await.del(key).await
97    }
98
99    /// Like [`Hyperbee::del`] but takes a `compare_and_swap` function.
100    /// Before deleting the function is called with existing key's [`KeyValueData`].
101    /// The key is only deleted if `compare_and_swap` returs true.
102    /// Returns the `bool` representing the result of `compare_and_swap`, and the `seq` for the
103    /// key.
104    pub async fn del_compare_and_swap(
105        &self,
106        key: &[u8],
107        cas: impl FnOnce(&KeyValueData) -> bool,
108    ) -> Result<Option<(bool, u64)>, HyperbeeError> {
109        self.tree.read().await.del_compare_and_swap(key, cas).await
110    }
111
112    /// Create a new tree with all it's operation's prefixed by the provided `prefix`.
113    pub fn sub(&self, prefix: &[u8], config: PrefixedConfig) -> Prefixed {
114        Prefixed::new(prefix, self.tree.clone(), config)
115    }
116
117    /// Traverse the tree based on the given [`TraverseConfig`]
118    pub async fn traverse<'a>(
119        &self,
120        conf: TraverseConfig,
121    ) -> Result<impl Stream<Item = KeyDataResult> + 'a, HyperbeeError> {
122        self.tree.read().await.traverse(conf).await
123    }
124
125    /// Create a [`Hyperbee`] from a storage directory
126    /// # Panics
127    /// when storage path is incorrect
128    /// when Hypercore fails to build
129    /// when Blocks fails to build
130    ///
131    /// # Errors
132    /// when Hyperbee fails to build
133    pub async fn from_storage_dir<T: AsRef<Path>>(
134        path_to_storage_dir: T,
135    ) -> Result<Hyperbee, HyperbeeError> {
136        Self::from_tree(Tree::from_storage_dir(path_to_storage_dir).await?)
137    }
138
139    /// Create a [`Hyperbee`] in RAM
140    pub async fn from_ram() -> Result<Hyperbee, HyperbeeError> {
141        Self::from_tree(Tree::from_ram().await?)
142    }
143
144    /// Helper for creating a [`Hyperbee`] from a [`Hypercore`]
145    pub fn from_hypercore(hypercore: Hypercore) -> Result<Self, HyperbeeError> {
146        Self::from_tree(Tree::from_hypercore(hypercore)?)
147    }
148
149    fn from_tree(tree: Tree) -> Result<Self, HyperbeeError> {
150        Ok(HyperbeeBuilder::default()
151            .tree(Arc::new(RwLock::new(tree)))
152            .build()?)
153    }
154}