use derive_builder::Builder;
use futures_lite::{Stream, StreamExt};
use crate::{
error::HyperbeeError,
traverse::{
KeyDataResult,
LimitValue::{Finite, Infinite},
TraverseConfig,
},
CoreMem, KeyValueData, Shared, Tree,
};
pub static DEFAULT_PREFIXED_SEPERATOR: &[u8; 1] = b"\0";
#[derive(Builder, Debug, Clone)]
#[builder(derive(Debug))]
pub struct PrefixedConfig {
#[builder(default = "DEFAULT_PREFIXED_SEPERATOR.to_vec()")]
pub seperator: Vec<u8>,
}
impl Default for PrefixedConfig {
fn default() -> Self {
Self {
seperator: DEFAULT_PREFIXED_SEPERATOR.to_vec(),
}
}
}
pub struct Prefixed<M: CoreMem> {
pub prefix: Vec<u8>,
tree: Shared<Tree<M>>,
conf: PrefixedConfig,
}
macro_rules! with_key_prefix {
($self:ident, $key:expr) => {
&[&$self.prefix, &$self.conf.seperator, $key].concat()
};
}
impl<M: CoreMem> Prefixed<M> {
pub(crate) fn new(prefix: &[u8], tree: Shared<Tree<M>>, conf: PrefixedConfig) -> Self {
Self {
prefix: prefix.to_vec(),
tree,
conf,
}
}
pub async fn get(&self, key: &[u8]) -> Result<Option<(u64, Option<Vec<u8>>)>, HyperbeeError> {
let prefixed_key: &[u8] = with_key_prefix!(self, key);
self.tree.read().await.get(prefixed_key).await
}
#[tracing::instrument(level = "trace", skip(self), ret)]
pub async fn put(
&self,
key: &[u8],
value: Option<&[u8]>,
) -> Result<(Option<u64>, u64), HyperbeeError> {
let prefixed_key: &[u8] = with_key_prefix!(self, key);
self.tree.read().await.put(prefixed_key, value).await
}
pub async fn put_compare_and_swap(
&self,
key: &[u8],
value: Option<&[u8]>,
cas: impl FnOnce(Option<&KeyValueData>, &KeyValueData) -> bool,
) -> Result<(Option<u64>, Option<u64>), HyperbeeError> {
let prefixed_key: &[u8] = with_key_prefix!(self, key);
self.tree
.read()
.await
.put_compare_and_swap(prefixed_key, value, cas)
.await
}
pub async fn del(&self, key: &[u8]) -> Result<Option<u64>, HyperbeeError> {
let prefixed_key: &[u8] = with_key_prefix!(self, key);
self.tree.read().await.del(prefixed_key).await
}
pub async fn del_compare_and_swap(
&self,
key: &[u8],
cas: impl FnOnce(&KeyValueData) -> bool,
) -> Result<Option<(bool, u64)>, HyperbeeError> {
let prefixed_key: &[u8] = with_key_prefix!(self, key);
self.tree
.read()
.await
.del_compare_and_swap(prefixed_key, cas)
.await
}
pub async fn traverse<'a>(
&self,
conf: &TraverseConfig,
) -> Result<impl Stream<Item = KeyDataResult> + 'a, HyperbeeError>
where
M: 'a,
{
let end_of_prefix = increment_bytes(&self.prefix);
let (min_value, min_inclusive) = match &conf.min_value {
Infinite(_) => (Finite(self.prefix.clone()), true),
Finite(key) => (
Finite(with_key_prefix!(self, key.as_slice()).to_vec()),
conf.min_inclusive,
),
};
let (max_value, max_inclusive) = match &conf.max_value {
Infinite(_) => (Finite(end_of_prefix.clone()), false),
Finite(key) => (
Finite(with_key_prefix!(self, key.as_slice()).to_vec()),
conf.max_inclusive,
),
};
let bounded_conf = TraverseConfig {
min_value,
min_inclusive,
max_value,
max_inclusive,
reversed: conf.reversed,
};
let stream = self.tree.read().await.traverse(bounded_conf).await?;
let len_drain = self.prefix.len() + self.conf.seperator.len();
Ok(stream.map(move |res| {
let len_drain = len_drain;
let stripped_kv = res.map(|mut x| {
x.key.drain(..len_drain);
x
});
stripped_kv
}))
}
}
fn increment_bytes(pref: &[u8]) -> Vec<u8> {
let len = pref.len();
if len == 0 {
return vec![0];
}
let mut out = pref.to_vec();
let last_byte = &mut out[len - 1];
if *last_byte == u8::MAX {
out.push(0u8);
return out;
}
*last_byte += 1;
out
}
#[cfg(test)]
mod test {
use super::{
increment_bytes, Finite, PrefixedConfig, PrefixedConfigBuilder, DEFAULT_PREFIXED_SEPERATOR,
};
use crate::{
traverse::{KeyDataResult, TraverseConfig, TraverseConfigBuilder},
Hyperbee,
};
#[test]
fn prefixed_conf() -> Result<(), Box<dyn std::error::Error>> {
let p = PrefixedConfigBuilder::default().build()?;
assert_eq!(p.seperator, b"\0");
let p = PrefixedConfig::default();
assert_eq!(p.seperator, b"\0");
Ok(())
}
#[tokio::test]
async fn test_increment_bytes() -> Result<(), Box<dyn std::error::Error>> {
let orig: Vec<u8> = "foo".into();
let res = increment_bytes(&orig);
assert_eq!(res, vec![102, 111, 112]);
let orig: Vec<u8> = vec![];
let res = increment_bytes(&orig);
assert_eq!(res, vec![0]);
let orig: Vec<u8> = vec![1, 2, u8::MAX];
let res = increment_bytes(&orig);
assert_eq!(res, vec![1, 2, u8::MAX, 0]);
Ok(())
}
#[tokio::test]
async fn prefix() -> Result<(), Box<dyn std::error::Error>> {
let hb = Hyperbee::from_ram().await?;
let prefix = b"my_prefix";
let key = b"hello";
let prefixed_hb = hb.sub(prefix, Default::default());
hb.put(key, Some(b"no prefix")).await?;
prefixed_hb.put(key, Some(b"with prefix")).await?;
let Some((_, Some(res))) = hb.get(key).await? else {
panic!("could not get key")
};
assert_eq!(res, b"no prefix");
let Some((_, Some(res))) = prefixed_hb.get(key).await? else {
panic!("could not get key")
};
assert_eq!(res, b"with prefix");
let manually_prefixed_key = [
prefix.to_vec(),
DEFAULT_PREFIXED_SEPERATOR.to_vec(),
key.to_vec(),
]
.concat();
let Some((_, Some(res))) = hb.get(&manually_prefixed_key).await? else {
panic!("could not get key")
};
assert_eq!(res, b"with prefix");
assert!(hb.del(key).await?.is_some());
assert!(hb.get(key).await?.is_none());
assert!(hb.get(&manually_prefixed_key).await?.is_some());
assert!(prefixed_hb.get(key).await?.is_some());
assert!(prefixed_hb.del(key).await?.is_some());
assert!(prefixed_hb.get(key).await?.is_none());
Ok(())
}
use futures_lite::Stream;
use tokio_stream::StreamExt;
#[tokio::test]
async fn prefixed_traverse_basic() -> Result<(), Box<dyn std::error::Error>> {
let hb = Hyperbee::from_ram().await?;
let prefix = b"p:";
let prefixed_hb = hb.sub(prefix, Default::default());
hb.put(b"a", None).await?;
hb.put(b"b", None).await?;
prefixed_hb.put(b"c", None).await?;
prefixed_hb.put(b"a", None).await?;
prefixed_hb.put(b"b", None).await?;
prefixed_hb.put(b"e", None).await?;
prefixed_hb.put(b"f", None).await?;
hb.put(b"d", None).await?;
hb.put(b"e", None).await?;
hb.put(&increment_bytes(prefix), None).await?;
let mut expected: Vec<Vec<u8>> = vec![b"a", b"b", b"c", b"e", b"f"]
.into_iter()
.map(|x| x.to_vec())
.collect();
async fn collect(x: impl Stream<Item = KeyDataResult>) -> Vec<Vec<u8>> {
x.collect::<Vec<KeyDataResult>>()
.await
.into_iter()
.map(|x| x.unwrap().key)
.collect()
}
let stream = prefixed_hb.traverse(&TraverseConfig::default()).await?;
let res = collect(stream).await;
assert_eq!(res, expected);
let conf = TraverseConfigBuilder::default()
.min_value(Finite(b"b".into()))
.build()?;
let stream = prefixed_hb.traverse(&conf).await?;
let res = collect(stream).await;
assert_eq!(res, expected[1..]);
let conf = TraverseConfigBuilder::default()
.min_value(Finite(b"a".into()))
.min_inclusive(false)
.build()?;
let stream = prefixed_hb.traverse(&conf).await?;
let res = collect(stream).await;
assert_eq!(res, expected[1..]);
let conf = TraverseConfigBuilder::default()
.max_value(Finite(b"e".into()))
.build()?;
let stream = prefixed_hb.traverse(&conf).await?;
let res = collect(stream).await;
assert_eq!(res, expected[..4]);
let conf = TraverseConfigBuilder::default()
.max_value(Finite(b"f".into()))
.max_inclusive(false)
.build()?;
let stream = prefixed_hb.traverse(&conf).await?;
let res = collect(stream).await;
assert_eq!(res, expected[..4]);
expected.reverse();
let conf = TraverseConfigBuilder::default().reversed(true).build()?;
let stream = prefixed_hb.traverse(&conf).await?;
let res = collect(stream).await;
assert_eq!(res, expected);
let conf = TraverseConfigBuilder::default()
.reversed(true)
.min_value(Finite(b"b".into()))
.build()?;
let stream = prefixed_hb.traverse(&conf).await?;
let res = collect(stream).await;
assert_eq!(res, expected[..4]);
let conf = TraverseConfigBuilder::default()
.reversed(true)
.min_value(Finite(b"a".into()))
.min_inclusive(false)
.build()?;
let stream = prefixed_hb.traverse(&conf).await?;
let res = collect(stream).await;
assert_eq!(res, expected[..4]);
let conf = TraverseConfigBuilder::default()
.reversed(true)
.max_value(Finite(b"e".into()))
.build()?;
let stream = prefixed_hb.traverse(&conf).await?;
let res = collect(stream).await;
assert_eq!(res, expected[1..]);
let conf = TraverseConfigBuilder::default()
.reversed(true)
.max_value(Finite(b"f".into()))
.max_inclusive(false)
.build()?;
let stream = prefixed_hb.traverse(&conf).await?;
let res = collect(stream).await;
assert_eq!(res, expected[1..]);
Ok(())
}
}