1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
use std::{path::Path, sync::Arc};

use derive_builder::Builder;
use futures_lite::Stream;
use hypercore::AppendOutcome;
use tokio::sync::RwLock;

use crate::{
    error::HyperbeeError,
    messages::header::Metadata,
    prefixed::{Prefixed, PrefixedConfig},
    traverse::{KeyDataResult, TraverseConfig},
    tree, KeyValueData,
};

use super::{tree::Tree, CoreMem, Shared};

/// An append only B-Tree built on [`Hypercore`](hypercore::Hypercore). It provides a key-value
/// store API, with methods for [inserting](Hyperbee::put), [getting](Hyperbee::get), and
/// [deleting](Hyperbee::del) key-value pair. As well as creating [sorted
/// iterators](Hyperbee::traverse), and ["sub" B-Trees](Hyperbee::sub) for grouping related data.
#[derive(Debug, Builder)]
#[builder(pattern = "owned", derive(Debug))]
pub struct Hyperbee<M: CoreMem> {
    tree: Shared<Tree<M>>,
}

impl<M: CoreMem> Hyperbee<M> {
    /// The number of blocks in the hypercore.
    /// The first block is always the header block so:
    /// `version` would be the `seq` of the next block
    /// `version - 1` is most recent block
    pub async fn version(&self) -> u64 {
        self.tree.read().await.version().await
    }

    /// Create the header for the Hyperbee. This must be done before writing anything else to the
    /// tree.
    pub async fn create_header(
        &self,
        metadata: Option<Metadata>,
    ) -> Result<AppendOutcome, HyperbeeError> {
        self.tree.read().await.create_header(metadata).await
    }

    /// The number of levels in the tree
    pub async fn height(&self) -> Result<usize, HyperbeeError> {
        self.tree.read().await.height().await
    }

    /// Returs a string representing the structure of the tree showing the keys in each node
    pub async fn print(&self) -> Result<String, HyperbeeError> {
        self.tree.read().await.print().await
    }

    /// Get the value corresponding to the provided `key` from the Hyperbee
    /// # Errors
    /// When `Hyperbee.get_root` fails
    pub async fn get(&self, key: &[u8]) -> Result<Option<(u64, Option<Vec<u8>>)>, HyperbeeError> {
        self.tree.read().await.get(key).await
    }

    /// Insert the given key and value into the tree
    /// Returs the `seq` of the new key, and `Option<u64>` which contains the `seq` of the old key
    /// if it was replaced.
    #[tracing::instrument(level = "trace", skip(self), ret)]
    pub async fn put(
        &self,
        key: &[u8],
        value: Option<&[u8]>,
    ) -> Result<(Option<u64>, u64), HyperbeeError> {
        self.tree.read().await.put(key, value).await
    }

    /// Like [`Hyperbee::put`] but takes a `compare_and_swap` function.
    /// The `compared_and_swap` function is called with the old key (if present), and the new key.
    /// The new key is only inserted if `compare_and_swap` returns true.
    /// Returs two `Option<u64>`s. The first is the old key, the second is the new key.
    pub async fn put_compare_and_swap(
        &self,
        key: &[u8],
        value: Option<&[u8]>,
        cas: impl FnOnce(Option<&KeyValueData>, &KeyValueData) -> bool,
    ) -> Result<(Option<u64>, Option<u64>), HyperbeeError> {
        self.tree
            .read()
            .await
            .put_compare_and_swap(key, value, cas)
            .await
    }

    /// Delete the given key from the tree.
    /// Returns the `seq` from the key if it was deleted.
    pub async fn del(&self, key: &[u8]) -> Result<Option<u64>, HyperbeeError> {
        self.tree.read().await.del(key).await
    }

    /// Like [`Hyperbee::del`] but takes a `compare_and_swap` function.
    /// Before deleting the function is called with existing key's [`KeyValueData`].
    /// The key is only deleted if `compare_and_swap` returs true.
    /// Returns the `bool` representing the result of `compare_and_swap`, and the `seq` for the
    /// key.
    pub async fn del_compare_and_swap(
        &self,
        key: &[u8],
        cas: impl FnOnce(&KeyValueData) -> bool,
    ) -> Result<Option<(bool, u64)>, HyperbeeError> {
        self.tree.read().await.del_compare_and_swap(key, cas).await
    }

    /// Create a new tree with all it's operation's prefixed by the provided `prefix`.
    pub fn sub(&self, prefix: &[u8], config: PrefixedConfig) -> Prefixed<M> {
        Prefixed::new(prefix, self.tree.clone(), config)
    }

    /// Traverse the tree based on the given [`TraverseConfig`]
    pub async fn traverse<'a>(
        &self,
        conf: TraverseConfig,
    ) -> Result<impl Stream<Item = KeyDataResult> + 'a, HyperbeeError>
    where
        M: 'a,
    {
        self.tree.read().await.traverse(conf).await
    }
}

impl Hyperbee<random_access_disk::RandomAccessDisk> {
    /// Helper for creating a Hyperbee
    /// # Panics
    /// when storage path is incorrect
    /// when Hypercore failse to build
    /// when Blocks fails to build
    ///
    /// # Errors
    /// when Hyperbee fails to build
    pub async fn from_storage_dir<T: AsRef<Path>>(
        path_to_storage_dir: T,
    ) -> Result<Hyperbee<random_access_disk::RandomAccessDisk>, HyperbeeError> {
        let tree = tree::Tree::from_storage_dir(path_to_storage_dir).await?;
        Ok(HyperbeeBuilder::default()
            .tree(Arc::new(RwLock::new(tree)))
            .build()?)
    }
}

impl Hyperbee<random_access_memory::RandomAccessMemory> {
    /// Helper for creating a Hyperbee in RAM
    pub async fn from_ram(
    ) -> Result<Hyperbee<random_access_memory::RandomAccessMemory>, HyperbeeError> {
        let tree = tree::Tree::from_ram().await?;
        Ok(HyperbeeBuilder::default()
            .tree(Arc::new(RwLock::new(tree)))
            .build()?)
    }
}