hyperbee/
lib.rs

1//! Rust version of [hyperbee](https://github.com/holepunchto/hyperbee)
2//! A [B-tree](https://en.wikipedia.org/wiki/B-tree) built on top of [Hypercore](https://docs.pears.com/building-blocks/hypercore).
3
// The contents of this module are pulled in verbatim from the file `_.rs`
// inside the build's `OUT_DIR`.
// NOTE(review): presumably generated by the build script — confirm in build.rs.
mod messages {
    include!(concat!(env!("OUT_DIR"), "/_.rs"));
}
7mod blocks;
8mod changes;
9mod del;
10mod error;
11#[cfg(feature = "ffi")]
12pub mod ffi;
13mod hb;
14mod keys;
15pub mod prefixed;
16mod put;
17mod test;
18pub mod traverse;
19mod tree;
20
21#[cfg(feature = "clib")]
22mod external;
23
24use std::{
25    fmt::Debug,
26    ops::{Deref, Range},
27    sync::Arc,
28};
29
30use tokio::sync::RwLock;
31use tracing::trace;
32
33use blocks::Blocks;
34use messages::yolo_index;
35
36use tree::Tree;
37
38pub use error::HyperbeeError;
39pub use hb::Hyperbee;
40pub use messages::header::Metadata;
41
/// A value shared through an [`Arc`] and guarded by an async [`RwLock`].
type Shared<T> = Arc<RwLock<T>>;
/// A [`Node`] wrapped in [`Shared`].
type SharedNode = Shared<Node>;
/// A sequence of `(node, index)` steps; see [`nearest_node`] for how it is filled in.
type NodePath = Vec<(SharedNode, usize)>;
45
/// Same value as JS hyperbee:
/// <https://github.com/holepunchto/hyperbee/blob/e1b398f5afef707b73e62f575f2b166bcef1fa34/index.js#L663>
static PROTOCOL: &str = "hyperbee";
/// Same value as JS hyperbee:
/// <https://github.com/holepunchto/hyperbee/blob/e1b398f5afef707b73e62f575f2b166bcef1fa34/index.js#L16-L18>
static MAX_KEYS: usize = 8;
50
/// Half of `max_keys`, rounded down.
fn min_keys(max_keys: usize) -> usize {
    max_keys / 2
}
54
/// Reference used within a [`Node`] to the [Hypercore](hypercore::Hypercore) block where a
/// key-value pair is stored.
#[derive(Clone, Debug)]
struct KeyValue {
    /// Index of the key-value pair within the [`hypercore::Hypercore`].
    seq: u64,
}

impl KeyValue {
    /// Build a reference to the key-value pair stored at `seq`.
    fn new(seq: u64) -> Self {
        Self { seq }
    }
}
68
#[cfg_attr(feature = "ffi", derive(uniffi::Record))]
#[derive(Clone, Debug)]
/// Data related to a key-value pair within the [`Hyperbee`].
///
/// With the `ffi` feature enabled this type also derives `uniffi::Record`.
pub struct KeyValueData {
    /// The index of the block within the [`Hypercore`](hypercore::Hypercore) where this data is stored.
    pub seq: u64,
    /// The key. The data by which the [`Hyperbee`] is ordered.
    pub key: Vec<u8>,
    /// The value; optional, so it may be `None`.
    pub value: Option<Vec<u8>>,
}
80
/// Pointer used within a [`Node`] to reference its child nodes.
#[derive(Clone, Debug)]
struct Child {
    /// Index of the [`BlockEntry`] within the [`hypercore::Hypercore`] that contains the [`Node`]
    pub seq: u64,
    /// Index of the `Node` within the [`BlockEntry`] referenced by [`Child::seq`]
    pub offset: u64,
}

impl Child {
    /// Build a pointer to the node at `offset` inside the block at `seq`.
    fn new(seq: u64, offset: u64) -> Self {
        Self { seq, offset }
    }
}
95
96struct Children {
97    blocks: Shared<Blocks>,
98    children: RwLock<Vec<Child>>,
99}
100
101impl Children {
102    fn new(blocks: Shared<Blocks>, children: Vec<Child>) -> Self {
103        Self {
104            blocks,
105            children: RwLock::new(children),
106        }
107    }
108
109    #[tracing::instrument(skip(self, new_children))]
110    async fn insert(&self, index: usize, new_children: Vec<Child>) {
111        if new_children.is_empty() {
112            trace!("no children to insert, do nothing");
113            return;
114        }
115
116        let replace_split_child = match new_children.is_empty() {
117            true => 0,
118            false => 1,
119        };
120        trace!(
121            "replacing child @ [{}] with [{}] children.",
122            index,
123            new_children.len()
124        );
125        self.children
126            .write()
127            .await
128            .splice(index..(index + replace_split_child), new_children);
129    }
130
131    #[tracing::instrument(skip(self))]
132    async fn get_child(&self, index: usize) -> Result<Shared<Node>, HyperbeeError> {
133        let (seq, offset) = {
134            let child_ref = &self.children.read().await[index].clone();
135            (child_ref.seq, child_ref.offset)
136        };
137        let block = self
138            .blocks
139            .read()
140            .await
141            .get(&seq, self.blocks.clone())
142            .await?;
143        let node = block.read().await.get_tree_node(offset)?;
144        Ok(node)
145    }
146
147    async fn len(&self) -> usize {
148        self.children.read().await.len()
149    }
150}
151
152impl Debug for Children {
153    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
154        match self.children.try_read() {
155            Ok(children) => {
156                let mut dl = f.debug_list();
157                for child in children.iter() {
158                    dl.entry(&format_args!("({}, {})", child.seq, child.offset));
159                }
160                dl.finish()
161            }
162            Err(_) => write!(f, "<locked>"),
163        }
164    }
165}
166
/// Expands to a write guard on the child list of `$node`.
///
/// The expansion first takes the node's read lock, then the write lock on its
/// `children.children` list. Both steps use `.await`, so the macro can only be used
/// inside an async context.
macro_rules! wchildren {
    ($node:expr) => {
        $node.read().await.children.children.write().await
    };
}
// Make the macro nameable by path from elsewhere in the crate.
pub(crate) use wchildren;
173
174/// A node of the B-Tree within the [`Hyperbee`]
175struct Node {
176    keys: Vec<KeyValue>,
177    children: Children,
178    blocks: Shared<Blocks>,
179}
180
181impl Node {
182    fn new(keys: Vec<KeyValue>, children: Vec<Child>, blocks: Shared<Blocks>) -> Self {
183        Node {
184            keys,
185            children: Children::new(blocks.clone(), children),
186            blocks,
187        }
188    }
189
190    pub async fn n_children(&self) -> usize {
191        self.children.len().await
192    }
193
194    async fn is_leaf(&self) -> bool {
195        self.n_children().await == 0
196    }
197
198    #[cfg(feature = "debug")]
199    /// The number of children between this node and a leaf + 1
200    pub async fn height(&self) -> Result<usize, HyperbeeError> {
201        if self.is_leaf().await {
202            Ok(1)
203        } else {
204            let mut out = 1;
205            let mut cur_child = self.get_child(0).await?;
206            loop {
207                out += 1;
208                if cur_child.read().await.n_children().await == 0 {
209                    return Ok(out);
210                }
211                let next_child = cur_child.read().await.get_child(0).await?;
212                cur_child = next_child;
213            }
214        }
215    }
216
217    /// Serialize this node
218    async fn to_level(&self) -> yolo_index::Level {
219        let mut children = vec![];
220        for c in self.children.children.read().await.iter() {
221            children.push(c.seq);
222            children.push(c.offset);
223        }
224        yolo_index::Level {
225            keys: self.keys.iter().map(|k| k.seq).collect(),
226            children,
227        }
228    }
229
230    #[tracing::instrument(skip(self))]
231    async fn get_key_value(&self, index: usize) -> Result<KeyValueData, HyperbeeError> {
232        let KeyValue { seq, .. } = self.keys[index];
233        let key = self
234            .blocks
235            .read()
236            .await
237            .get(&seq, self.blocks.clone())
238            .await?
239            .read()
240            .await
241            .key
242            .clone();
243        let value = self
244            .blocks
245            .read()
246            .await
247            .get(&seq, self.blocks.clone())
248            .await?
249            .read()
250            .await
251            .value
252            .clone();
253        Ok(KeyValueData { seq, key, value })
254    }
255
256    /// Get the child at the provided index
257    async fn get_child(&self, index: usize) -> Result<Shared<Node>, HyperbeeError> {
258        self.children.get_child(index).await
259    }
260
261    /// Insert a key and it's children into [`self`].
262    #[tracing::instrument(skip(self, key_ref, children, range))]
263    async fn insert(&mut self, key_ref: KeyValue, children: Vec<Child>, range: Range<usize>) {
264        trace!("inserting [{}] children", children.len());
265        self.keys.splice(range.clone(), vec![key_ref]);
266        self.children.insert(range.start, children).await;
267    }
268}
269
270/// custom debug because the struct is recursive
271impl Debug for Node {
272    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
273        node_debug(self, f)
274    }
275}
276
277fn node_debug<T: Deref<Target = Node>>(
278    node: T,
279    f: &mut std::fmt::Formatter<'_>,
280) -> std::fmt::Result {
281    f.debug_struct("Node")
282        .field(
283            "keys",
284            &format_args!("{:?}", node.keys.iter().map(|k| k.seq).collect::<Vec<_>>()),
285        )
286        .field("children", &node.children)
287        .finish()
288}
289
290#[tracing::instrument(skip(node))]
291/// Find the `key` in the `node` with a binary search
292///
293/// # Returns (`matched`, `index`)
294///
295/// `matched` is Some means we found the `key`.
296///
297/// if `matched` is None:
298///     if `node` is not a leaf:
299///         index of the child within the `node` where the `key` could be
300///     if `node` is a leaf:
301///         the index within this `node`'s keys where the `key` wolud be inserted
302/// if `matched` is Some:
303///     the index within this `node`'s keys of the `key`
304async fn get_index_of_key<T>(
305    node: SharedNode,
306    key: &T,
307) -> Result<(Option<u64>, usize), HyperbeeError>
308where
309    T: PartialOrd<[u8]> + Debug + ?Sized,
310{
311    let child_index: usize = 'found: {
312        // Binary search current node for matching key, or index of next child
313        let n_keys = node.read().await.keys.len();
314        if n_keys == 0 {
315            break 'found n_keys;
316        }
317        let mut low = 0;
318        let mut high = n_keys - 1;
319
320        while low <= high {
321            let mid = low + ((high - low) >> 1);
322            let KeyValueData {
323                seq,
324                key: other_key,
325                ..
326            } = node.read().await.get_key_value(mid).await?;
327
328            // if matching key, we are done!
329            if key == &other_key[..] {
330                trace!(
331                    "key {:?} == other_key {:?} at index {}",
332                    key,
333                    other_key,
334                    mid
335                );
336                return Ok((Some(seq), mid));
337            }
338
339            if key < &other_key[..] {
340                if mid == 0 {
341                    break;
342                }
343                // look lower
344                high = mid - 1;
345            } else {
346                // look higher
347                low = mid + 1;
348            }
349        }
350        break 'found low;
351    };
352    Ok((None, child_index))
353}
354
355/// Descend through tree to the node nearest (or matching) the provided key
356/// Return value describes the path to the key. It looks like:
357/// `(matched, path: Vec<(node, index)>)`
358///
359/// Here `matched` is a bool that indicates if the key was matched.
360/// The `path` is a `Vec` that describes the path to the key. Each item is a tuple `(node, inde)`.
361/// `path[0]` is the root of tree, and the last element would be final node,
362/// which is always a leaf if `matched == false`.
363/// In the `path` the `node` is a referenece to the node we passed through.
364/// The `index` is the child index to the next node in the path.
365/// In a leaf node, the `index` could be thought of as the gap between the node's keys where the provided
366/// `key` would be ineserted. Or for `matched = true` the index of the matched key in the nodes's
367/// keys.
368#[tracing::instrument(skip(node))]
369async fn nearest_node<T>(
370    node: SharedNode,
371    key: &T,
372) -> Result<(Option<u64>, NodePath), HyperbeeError>
373where
374    T: PartialOrd<[u8]> + Debug + ?Sized,
375{
376    let mut current_node = node;
377    let mut out_path: NodePath = vec![];
378    loop {
379        let next_node = {
380            let (matched, child_index) = get_index_of_key(current_node.clone(), key).await?;
381            out_path.push((current_node.clone(), child_index));
382
383            // found match or reached leaf
384            if matched.is_some() || current_node.read().await.is_leaf().await {
385                return Ok((matched, out_path));
386            }
387
388            // continue to next node
389            current_node.read().await.get_child(child_index).await?
390        };
391        current_node = next_node;
392    }
393}
394
// Only compiled with the `ffi` feature: `uniffi::setup_scaffolding!()` is expanded inside a
// private module, and everything that module exposes is re-exported from the crate root.
#[cfg(feature = "ffi")]
mod uniffi_scaffolding {
    uniffi::setup_scaffolding!();
}
#[cfg(feature = "ffi")]
pub use uniffi_scaffolding::*;