1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
//! Implementation of [`Prefixed`] a "sub" [`Hyperbee`](crate::Hyperbee) used for grouping data.

use derive_builder::Builder;
use futures_lite::{Stream, StreamExt};

use crate::{
    error::HyperbeeError,
    traverse::{
        KeyDataResult,
        LimitValue::{Finite, Infinite},
        TraverseConfig,
    },
    CoreMem, KeyValueData, Shared, Tree,
};

pub static DEFAULT_PREFIXED_SEPERATOR: &[u8; 1] = b"\0";
#[derive(Builder, Debug, Clone)]
#[builder(derive(Debug))]
pub struct PrefixedConfig {
    #[builder(default = "DEFAULT_PREFIXED_SEPERATOR.to_vec()")]
    /// The seperator between the prefix and the key. The default is the NULL byte `b"\0"` which is
    /// the same as the [JavaScript
    /// implementation](https://docs.pears.com/building-blocks/hyperbee#const-sub-db.sub-sub-prefix-options).
    pub seperator: Vec<u8>,
}
impl Default for PrefixedConfig {
    fn default() -> Self {
        Self {
            seperator: DEFAULT_PREFIXED_SEPERATOR.to_vec(),
        }
    }
}

/// A "sub" [`Hyperbee`](crate::Hyperbee), which can be used for grouping data. [`get`](Self::get), [`put`](Self::put), [`del`](Self::del), [`traverse`](Self::traverse) operations are automatically prefixed
/// with [`Prefixed::prefix`] + [`PrefixedConfig::seperator`] where appropriate.
pub struct Prefixed<M: CoreMem> {
    /// All keys inserted with [`Prefixed::put`] are prefixed with this value
    pub prefix: Vec<u8>,
    tree: Shared<Tree<M>>,
    conf: PrefixedConfig,
}

// We use this to DRY the code for getting a prefixed key.
// The prefixed key is a slice, so we can't build it within a func and return it
// (because we cant return a reference to a value made within a function)
macro_rules! with_key_prefix {
    ($self:ident, $key:expr) => {
        &[&$self.prefix, &$self.conf.seperator, $key].concat()
    };
}
impl<M: CoreMem> Prefixed<M> {
    pub(crate) fn new(prefix: &[u8], tree: Shared<Tree<M>>, conf: PrefixedConfig) -> Self {
        Self {
            prefix: prefix.to_vec(),
            tree,
            conf,
        }
    }

    /// Get the value corresponding to the provided `key` from the Hyperbee
    /// # Errors
    /// When `Hyperbee.get_root` fails
    pub async fn get(&self, key: &[u8]) -> Result<Option<(u64, Option<Vec<u8>>)>, HyperbeeError> {
        let prefixed_key: &[u8] = with_key_prefix!(self, key);
        self.tree.read().await.get(prefixed_key).await
    }

    /// Insert the given key and value into the tree
    /// Returs the `seq` of the new key, and `Option<u64>` which contains the `seq` of the old key
    /// if it was replaced.
    #[tracing::instrument(level = "trace", skip(self), ret)]
    pub async fn put(
        &self,
        key: &[u8],
        value: Option<&[u8]>,
    ) -> Result<(Option<u64>, u64), HyperbeeError> {
        let prefixed_key: &[u8] = with_key_prefix!(self, key);
        self.tree.read().await.put(prefixed_key, value).await
    }

    /// Like [`Prefixed::put`] but takes a `compare_and_swap` function.
    /// The `compared_and_swap` function is called with the old key (if present), and the new key.
    /// The new key is only inserted if `compare_and_swap` returns true.
    /// Returs two `Option<u64>`s. The first is the old key, the second is the new key.
    pub async fn put_compare_and_swap(
        &self,
        key: &[u8],
        value: Option<&[u8]>,
        cas: impl FnOnce(Option<&KeyValueData>, &KeyValueData) -> bool,
    ) -> Result<(Option<u64>, Option<u64>), HyperbeeError> {
        let prefixed_key: &[u8] = with_key_prefix!(self, key);
        self.tree
            .read()
            .await
            .put_compare_and_swap(prefixed_key, value, cas)
            .await
    }

    /// Delete the given key from the tree
    /// Returns the `seq` from the key if it was deleted.
    pub async fn del(&self, key: &[u8]) -> Result<Option<u64>, HyperbeeError> {
        let prefixed_key: &[u8] = with_key_prefix!(self, key);
        self.tree.read().await.del(prefixed_key).await
    }

    /// Like [`Prefixed::del`] but takes a `compare_and_swap` function.
    /// Before deleting the function is called with existing key's [`KeyValueData`].
    /// The key is only deleted if `compare_and_swap` returs true.
    /// Returns the `bool` representing the result of `compare_and_swap`, and the `seq` for the
    /// key.
    pub async fn del_compare_and_swap(
        &self,
        key: &[u8],
        cas: impl FnOnce(&KeyValueData) -> bool,
    ) -> Result<Option<(bool, u64)>, HyperbeeError> {
        let prefixed_key: &[u8] = with_key_prefix!(self, key);
        self.tree
            .read()
            .await
            .del_compare_and_swap(prefixed_key, cas)
            .await
    }

    /// Travese prefixed keys. If you provide [`TraverseConfig::min_value`] or
    /// [`TraverseConfig::max_value`] it should not include the prefix.
    /// Note that the key that is yielded has [`Self::prefix`] + [`PrefixedConfig::seperator`]
    /// stripped (which is the same behavior as JavaScript [Hyperbee's](https://docs.pears.com/building-blocks/hyperbee) method [`.sub`](https://docs.pears.com/building-blocks/hyperbee#const-sub-db.sub-sub-prefix-options)
    pub async fn traverse<'a>(
        &self,
        conf: &TraverseConfig,
    ) -> Result<impl Stream<Item = KeyDataResult> + 'a, HyperbeeError>
    where
        M: 'a,
    {
        let end_of_prefix = increment_bytes(&self.prefix);

        let (min_value, min_inclusive) = match &conf.min_value {
            Infinite(_) => (Finite(self.prefix.clone()), true),
            Finite(key) => (
                Finite(with_key_prefix!(self, key.as_slice()).to_vec()),
                conf.min_inclusive,
            ),
        };

        let (max_value, max_inclusive) = match &conf.max_value {
            // inclusive = false because we don't want to include end_of_prefix if it is a key
            Infinite(_) => (Finite(end_of_prefix.clone()), false),
            Finite(key) => (
                Finite(with_key_prefix!(self, key.as_slice()).to_vec()),
                conf.max_inclusive,
            ),
        };

        let bounded_conf = TraverseConfig {
            min_value,
            min_inclusive,
            max_value,
            max_inclusive,
            reversed: conf.reversed,
        };

        let stream = self.tree.read().await.traverse(bounded_conf).await?;
        let len_drain = self.prefix.len() + self.conf.seperator.len();
        // strip `self.prefix + self.conf.seperator` from the beggining of the key
        Ok(stream.map(move |res| {
            let len_drain = len_drain;
            let stripped_kv = res.map(|mut x| {
                x.key.drain(..len_drain);
                x
            });
            stripped_kv
        }))
    }
}

fn increment_bytes(pref: &[u8]) -> Vec<u8> {
    let len = pref.len();
    if len == 0 {
        return vec![0];
    }

    let mut out = pref.to_vec();

    let last_byte = &mut out[len - 1];

    if *last_byte == u8::MAX {
        out.push(0u8);
        return out;
    }
    *last_byte += 1;
    out
}
#[cfg(test)]
mod test {
    use super::{
        increment_bytes, Finite, PrefixedConfig, PrefixedConfigBuilder, DEFAULT_PREFIXED_SEPERATOR,
    };
    use crate::{
        traverse::{KeyDataResult, TraverseConfig, TraverseConfigBuilder},
        Hyperbee,
    };

    #[test]
    fn prefixed_conf() -> Result<(), Box<dyn std::error::Error>> {
        let p = PrefixedConfigBuilder::default().build()?;
        assert_eq!(p.seperator, b"\0");
        let p = PrefixedConfig::default();
        assert_eq!(p.seperator, b"\0");
        Ok(())
    }

    #[tokio::test]
    async fn test_increment_bytes() -> Result<(), Box<dyn std::error::Error>> {
        let orig: Vec<u8> = "foo".into();
        let res = increment_bytes(&orig);
        assert_eq!(res, vec![102, 111, 112]);

        let orig: Vec<u8> = vec![];
        let res = increment_bytes(&orig);
        assert_eq!(res, vec![0]);

        let orig: Vec<u8> = vec![1, 2, u8::MAX];
        let res = increment_bytes(&orig);
        assert_eq!(res, vec![1, 2, u8::MAX, 0]);
        Ok(())
    }

    #[tokio::test]
    async fn prefix() -> Result<(), Box<dyn std::error::Error>> {
        let hb = Hyperbee::from_ram().await?;

        let prefix = b"my_prefix";
        let key = b"hello";

        let prefixed_hb = hb.sub(prefix, Default::default());

        hb.put(key, Some(b"no prefix")).await?;

        prefixed_hb.put(key, Some(b"with prefix")).await?;

        // regular no prefix
        let Some((_, Some(res))) = hb.get(key).await? else {
            panic!("could not get key")
        };
        assert_eq!(res, b"no prefix");
        // with prefix
        let Some((_, Some(res))) = prefixed_hb.get(key).await? else {
            panic!("could not get key")
        };
        assert_eq!(res, b"with prefix");
        // regular no prefix
        // with prefix
        let manually_prefixed_key = [
            prefix.to_vec(),
            DEFAULT_PREFIXED_SEPERATOR.to_vec(),
            key.to_vec(),
        ]
        .concat();

        // with prefix from regular
        let Some((_, Some(res))) = hb.get(&manually_prefixed_key).await? else {
            panic!("could not get key")
        };
        assert_eq!(res, b"with prefix");

        // reg delete does not delete prefixed
        assert!(hb.del(key).await?.is_some());
        // reg is gone
        assert!(hb.get(key).await?.is_none());
        // prefixed still there, accessible by reg hb
        assert!(hb.get(&manually_prefixed_key).await?.is_some());
        // prefixed hb still gets key
        assert!(prefixed_hb.get(key).await?.is_some());
        // prefixed hb delete works
        assert!(prefixed_hb.del(key).await?.is_some());
        // it's gone now
        assert!(prefixed_hb.get(key).await?.is_none());
        Ok(())
    }

    use futures_lite::Stream;
    use tokio_stream::StreamExt;
    #[tokio::test]
    async fn prefixed_traverse_basic() -> Result<(), Box<dyn std::error::Error>> {
        let hb = Hyperbee::from_ram().await?;

        let prefix = b"p:";

        let prefixed_hb = hb.sub(prefix, Default::default());

        hb.put(b"a", None).await?;
        hb.put(b"b", None).await?;

        prefixed_hb.put(b"c", None).await?;
        prefixed_hb.put(b"a", None).await?;
        prefixed_hb.put(b"b", None).await?;
        prefixed_hb.put(b"e", None).await?;
        prefixed_hb.put(b"f", None).await?;

        hb.put(b"d", None).await?;
        hb.put(b"e", None).await?;
        // prefix + 1
        hb.put(&increment_bytes(prefix), None).await?;

        let mut expected: Vec<Vec<u8>> = vec![b"a", b"b", b"c", b"e", b"f"]
            .into_iter()
            .map(|x| x.to_vec())
            .collect();

        async fn collect(x: impl Stream<Item = KeyDataResult>) -> Vec<Vec<u8>> {
            x.collect::<Vec<KeyDataResult>>()
                .await
                .into_iter()
                .map(|x| x.unwrap().key)
                .collect()
        }

        let stream = prefixed_hb.traverse(&TraverseConfig::default()).await?;
        let res = collect(stream).await;
        assert_eq!(res, expected);

        // with lower bound
        let conf = TraverseConfigBuilder::default()
            .min_value(Finite(b"b".into()))
            .build()?;
        let stream = prefixed_hb.traverse(&conf).await?;
        let res = collect(stream).await;
        assert_eq!(res, expected[1..]);

        // with lower bound exclusive
        let conf = TraverseConfigBuilder::default()
            .min_value(Finite(b"a".into()))
            .min_inclusive(false)
            .build()?;
        let stream = prefixed_hb.traverse(&conf).await?;
        let res = collect(stream).await;
        assert_eq!(res, expected[1..]);

        // with upper bound
        let conf = TraverseConfigBuilder::default()
            .max_value(Finite(b"e".into()))
            .build()?;
        let stream = prefixed_hb.traverse(&conf).await?;
        let res = collect(stream).await;
        assert_eq!(res, expected[..4]);

        // exclusive upper bound
        let conf = TraverseConfigBuilder::default()
            .max_value(Finite(b"f".into()))
            .max_inclusive(false)
            .build()?;
        let stream = prefixed_hb.traverse(&conf).await?;
        let res = collect(stream).await;
        assert_eq!(res, expected[..4]);

        expected.reverse();
        // reversed
        let conf = TraverseConfigBuilder::default().reversed(true).build()?;
        let stream = prefixed_hb.traverse(&conf).await?;
        let res = collect(stream).await;
        assert_eq!(res, expected);

        // with lower bound
        let conf = TraverseConfigBuilder::default()
            .reversed(true)
            .min_value(Finite(b"b".into()))
            .build()?;
        let stream = prefixed_hb.traverse(&conf).await?;
        let res = collect(stream).await;
        assert_eq!(res, expected[..4]);

        // with lower bound exclusive
        let conf = TraverseConfigBuilder::default()
            .reversed(true)
            .min_value(Finite(b"a".into()))
            .min_inclusive(false)
            .build()?;
        let stream = prefixed_hb.traverse(&conf).await?;
        let res = collect(stream).await;
        assert_eq!(res, expected[..4]);

        // with upper bound
        let conf = TraverseConfigBuilder::default()
            .reversed(true)
            .max_value(Finite(b"e".into()))
            .build()?;
        let stream = prefixed_hb.traverse(&conf).await?;
        let res = collect(stream).await;
        assert_eq!(res, expected[1..]);

        // exclusive upper bound
        let conf = TraverseConfigBuilder::default()
            .reversed(true)
            .max_value(Finite(b"f".into()))
            .max_inclusive(false)
            .build()?;
        let stream = prefixed_hb.traverse(&conf).await?;
        let res = collect(stream).await;
        assert_eq!(res, expected[1..]);
        Ok(())
    }
}