pub use sled::{open, Config};
use transaction::TransactionalTree;
#[cfg(feature = "convert")]
pub mod convert;
#[cfg(feature = "key-generating")]
pub mod key_generating;
#[cfg(feature = "search")]
pub mod search;
pub mod transaction;
pub mod custom_serde;
use core::fmt;
use core::iter::{DoubleEndedIterator, Iterator};
use core::ops::{Bound, RangeBounds};
use serde::Serialize;
use sled::{
transaction::{ConflictableTransactionResult, TransactionResult},
IVec, Result,
};
use std::marker::PhantomData;
#[derive(Debug)]
pub struct Tree<K, V> {
inner: sled::Tree,
_key: PhantomData<fn() -> K>,
_value: PhantomData<fn() -> V>,
}
impl<K, V> Clone for Tree<K, V> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
_key: PhantomData,
_value: PhantomData,
}
}
}
pub trait KV: serde::de::DeserializeOwned + Serialize {}
impl<T: serde::de::DeserializeOwned + Serialize> KV for T {}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CompareAndSwapError<V> {
pub current: Option<V>,
pub proposed: Option<V>,
}
impl<V> fmt::Display for CompareAndSwapError<V> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Compare and swap conflict")
}
}
impl<V: std::fmt::Debug> std::error::Error for CompareAndSwapError<V> {}
impl<K, V> Tree<K, V> {
pub fn open<T: AsRef<str>>(db: &sled::Db, id: T) -> Self {
Self {
inner: db.open_tree(id.as_ref()).unwrap(),
_key: PhantomData,
_value: PhantomData,
}
}
pub fn insert(&self, key: &K, value: &V) -> Result<Option<V>>
where
K: KV,
V: KV,
{
self.inner
.insert(serialize(key), serialize(value))
.map(|opt| opt.map(|old_value| deserialize(&old_value)))
}
pub fn transaction<F, A, E>(&self, f: F) -> TransactionResult<A, E>
where
F: Fn(&TransactionalTree<K, V>) -> ConflictableTransactionResult<A, E>,
{
self.inner.transaction(|sled_transactional_tree| {
f(&TransactionalTree::new(sled_transactional_tree))
})
}
pub fn apply_batch(&self, batch: Batch<K, V>) -> Result<()> {
self.inner.apply_batch(batch.inner)
}
pub fn get(&self, key: &K) -> Result<Option<V>>
where
K: KV,
V: KV,
{
self.inner
.get(serialize(key))
.map(|opt| opt.map(|v| deserialize(&v)))
}
pub fn get_from_raw<B: AsRef<[u8]>>(&self, key_bytes: B) -> Result<Option<V>>
where
K: KV,
V: KV,
{
self.inner
.get(key_bytes.as_ref())
.map(|opt| opt.map(|v| deserialize(&v)))
}
pub fn get_kv_from_raw<B: AsRef<[u8]>>(&self, key_bytes: B) -> Result<Option<(K, V)>>
where
K: KV,
V: KV,
{
self.inner
.get(key_bytes.as_ref())
.map(|opt| opt.map(|v| (deserialize(key_bytes.as_ref()), deserialize(&v))))
}
pub fn remove(&self, key: &K) -> Result<Option<V>>
where
K: KV,
V: KV,
{
self.inner
.remove(serialize(key))
.map(|opt| opt.map(|v| deserialize(&v)))
}
pub fn compare_and_swap(
&self,
key: &K,
old: Option<&V>,
new: Option<&V>,
) -> Result<core::result::Result<(), CompareAndSwapError<V>>>
where
K: KV,
V: KV,
{
self.inner
.compare_and_swap(
serialize(key),
old.map(|old| serialize(old)),
new.map(|new| serialize(new)),
)
.map(|cas_res| {
cas_res.map_err(|cas_err| CompareAndSwapError {
current: cas_err.current.as_ref().map(|b| deserialize(b)),
proposed: cas_err.proposed.as_ref().map(|b| deserialize(b)),
})
})
}
pub fn update_and_fetch<F>(&self, key: &K, mut f: F) -> Result<Option<V>>
where
K: KV,
V: KV,
F: FnMut(Option<V>) -> Option<V>,
{
self.inner
.update_and_fetch(serialize(&key), |opt_value| {
f(opt_value.map(|v| deserialize(v))).map(|v| serialize(&v))
})
.map(|res| res.map(|v| deserialize(&v)))
}
pub fn fetch_and_update<F>(&self, key: &K, mut f: F) -> Result<Option<V>>
where
K: KV,
V: KV,
F: FnMut(Option<V>) -> Option<V>,
{
self.inner
.fetch_and_update(serialize(key), |opt_value| {
f(opt_value.map(|v| deserialize(v))).map(|v| serialize(&v))
})
.map(|res| res.map(|v| deserialize(&v)))
}
pub fn watch_prefix(&self, prefix: &K) -> Subscriber<K, V>
where
K: KV,
{
Subscriber::from_sled(self.inner.watch_prefix(serialize(prefix)))
}
pub fn watch_all(&self) -> Subscriber<K, V>
where
K: KV,
{
Subscriber::from_sled(self.inner.watch_prefix(vec![]))
}
pub fn flush(&self) -> Result<usize> {
self.inner.flush()
}
pub async fn flush_async(&self) -> Result<usize> {
self.inner.flush_async().await
}
pub fn contains_key(&self, key: &K) -> Result<bool>
where
K: KV,
{
self.inner.contains_key(serialize(key))
}
pub fn get_lt(&self, key: &K) -> Result<Option<(K, V)>>
where
K: KV,
V: KV,
{
self.inner
.get_lt(serialize(key))
.map(|res| res.map(|(k, v)| (deserialize(&k), deserialize(&v))))
}
pub fn get_gt(&self, key: &K) -> Result<Option<(K, V)>>
where
K: KV,
V: KV,
{
self.inner
.get_gt(serialize(key))
.map(|res| res.map(|(k, v)| (deserialize(&k), deserialize(&v))))
}
pub fn merge(&self, key: &K, value: &V) -> Result<Option<V>>
where
K: KV,
V: KV,
{
self.inner
.merge(serialize(key), serialize(value))
.map(|res| res.map(|old_v| deserialize(&old_v)))
}
pub fn set_merge_operator(&self, merge_operator: impl MergeOperator<K, V> + 'static)
where
K: KV,
V: KV,
{
self.inner
.set_merge_operator(move |key: &[u8], old_v: Option<&[u8]>, value: &[u8]| {
let opt_v = merge_operator(
deserialize(key),
old_v.map(|v| deserialize(v)),
deserialize(value),
);
opt_v.map(|v| serialize(&v))
});
}
pub fn iter(&self) -> Iter<K, V> {
Iter::from_sled(self.inner.iter())
}
pub fn range<R: RangeBounds<K>>(&self, range: R) -> Iter<K, V>
where
K: KV + std::fmt::Debug,
{
match (range.start_bound(), range.end_bound()) {
(Bound::Unbounded, Bound::Unbounded) => {
Iter::from_sled(self.inner.range::<&[u8], _>(..))
}
(Bound::Unbounded, Bound::Excluded(b)) => {
Iter::from_sled(self.inner.range(..serialize(b)))
}
(Bound::Unbounded, Bound::Included(b)) => {
Iter::from_sled(self.inner.range(..=serialize(b)))
}
(Bound::Excluded(b), Bound::Unbounded) => {
Iter::from_sled(self.inner.range(serialize(b)..))
}
(Bound::Excluded(b), Bound::Excluded(bb)) => {
Iter::from_sled(self.inner.range(serialize(b)..serialize(bb)))
}
(Bound::Excluded(b), Bound::Included(bb)) => {
Iter::from_sled(self.inner.range(serialize(b)..=serialize(bb)))
}
(Bound::Included(b), Bound::Unbounded) => {
Iter::from_sled(self.inner.range(serialize(b)..))
}
(Bound::Included(b), Bound::Excluded(bb)) => {
Iter::from_sled(self.inner.range(serialize(b)..serialize(bb)))
}
(Bound::Included(b), Bound::Included(bb)) => {
Iter::from_sled(self.inner.range(serialize(b)..=serialize(bb)))
}
}
}
pub fn scan_prefix(&self, prefix: &K) -> Iter<K, V>
where
K: KV,
{
Iter::from_sled(self.inner.scan_prefix(serialize(prefix)))
}
pub fn first(&self) -> Result<Option<(K, V)>>
where
K: KV,
V: KV,
{
self.inner
.first()
.map(|res| res.map(|(k, v)| (deserialize(&k), deserialize(&v))))
}
pub fn last(&self) -> Result<Option<(K, V)>>
where
K: KV,
V: KV,
{
self.inner
.last()
.map(|res| res.map(|(k, v)| (deserialize(&k), deserialize(&v))))
}
pub fn pop_max(&self) -> Result<Option<(K, V)>>
where
K: KV,
V: KV,
{
self.inner
.pop_max()
.map(|res| res.map(|(k, v)| (deserialize(&k), deserialize(&v))))
}
pub fn pop_min(&self) -> Result<Option<(K, V)>>
where
K: KV,
V: KV,
{
self.inner
.pop_min()
.map(|res| res.map(|(k, v)| (deserialize(&k), deserialize(&v))))
}
pub fn len(&self) -> usize {
self.inner.len()
}
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
pub fn clear(&self) -> Result<()> {
self.inner.clear()
}
pub fn name(&self) -> IVec {
self.inner.name()
}
pub fn checksum(&self) -> Result<u32> {
self.inner.checksum()
}
}
pub trait MergeOperator<K, V>: Fn(K, Option<V>, V) -> Option<V> {}
impl<K, V, F> MergeOperator<K, V> for F where F: Fn(K, Option<V>, V) -> Option<V> {}
pub struct Iter<K, V> {
inner: sled::Iter,
_key: PhantomData<fn() -> K>,
_value: PhantomData<fn() -> V>,
}
impl<K: KV, V: KV> Iterator for Iter<K, V> {
type Item = Result<(K, V)>;
fn next(&mut self) -> Option<Self::Item> {
self.inner
.next()
.map(|res| res.map(|(k, v)| (deserialize(&k), deserialize(&v))))
}
fn last(mut self) -> Option<Self::Item> {
self.inner
.next_back()
.map(|res| res.map(|(k, v)| (deserialize(&k), deserialize(&v))))
}
}
impl<K: KV, V: KV> DoubleEndedIterator for Iter<K, V> {
fn next_back(&mut self) -> Option<Self::Item> {
self.inner
.next_back()
.map(|res| res.map(|(k, v)| (deserialize(&k), deserialize(&v))))
}
}
impl<K, V> Iter<K, V> {
pub fn from_sled(iter: sled::Iter) -> Self {
Iter {
inner: iter,
_key: PhantomData,
_value: PhantomData,
}
}
pub fn keys(self) -> impl DoubleEndedIterator<Item = Result<K>> + Send + Sync
where
K: KV + Send + Sync,
V: KV + Send + Sync,
{
self.map(|r| r.map(|(k, _v)| k))
}
pub fn values(self) -> impl DoubleEndedIterator<Item = Result<V>> + Send + Sync
where
K: KV + Send + Sync,
V: KV + Send + Sync,
{
self.map(|r| r.map(|(_k, v)| v))
}
}
#[derive(Clone, Debug)]
pub struct Batch<K, V> {
inner: sled::Batch,
_key: PhantomData<fn() -> K>,
_value: PhantomData<fn() -> V>,
}
impl<K, V> Batch<K, V> {
pub fn insert(&mut self, key: &K, value: &V)
where
K: KV,
V: KV,
{
self.inner.insert(serialize(key), serialize(value));
}
pub fn remove(&mut self, key: &K)
where
K: KV,
{
self.inner.remove(serialize(key))
}
}
impl<K, V> Default for Batch<K, V> {
fn default() -> Self {
Self {
inner: Default::default(),
_key: PhantomData,
_value: PhantomData,
}
}
}
use pin_project::pin_project;
#[pin_project]
pub struct Subscriber<K, V> {
#[pin]
inner: sled::Subscriber,
_key: PhantomData<fn() -> K>,
_value: PhantomData<fn() -> V>,
}
impl<K, V> Subscriber<K, V> {
pub fn next_timeout(
&mut self,
timeout: core::time::Duration,
) -> core::result::Result<Event<K, V>, std::sync::mpsc::RecvTimeoutError>
where
K: KV,
V: KV,
{
self.inner
.next_timeout(timeout)
.map(|e| Event::from_sled(&e))
}
pub fn from_sled(subscriber: sled::Subscriber) -> Self {
Self {
inner: subscriber,
_key: PhantomData,
_value: PhantomData,
}
}
}
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};
impl<K: KV + Unpin, V: KV + Unpin> Future for Subscriber<K, V> {
type Output = Option<Event<K, V>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project()
.inner
.poll(cx)
.map(|opt| opt.map(|e| Event::from_sled(&e)))
}
}
impl<K: KV, V: KV> Iterator for Subscriber<K, V> {
type Item = Event<K, V>;
fn next(&mut self) -> Option<Event<K, V>> {
self.inner.next().map(|e| Event::from_sled(&e))
}
}
pub enum Event<K, V> {
Insert { key: K, value: V },
Remove { key: K },
}
impl<K, V> Event<K, V> {
pub fn key(&self) -> &K
where
K: KV,
{
match self {
Self::Insert { key, .. } | Self::Remove { key } => key,
}
}
pub fn from_sled(event: &sled::Event) -> Self
where
K: KV,
V: KV,
{
match event {
sled::Event::Insert { key, value } => Self::Insert {
key: deserialize(key),
value: deserialize(value),
},
sled::Event::Remove { key } => Self::Remove {
key: deserialize(key),
},
}
}
}
pub fn deserialize<'a, T>(bytes: &'a [u8]) -> T
where
T: serde::de::Deserialize<'a>,
{
bincode::deserialize(bytes).expect("deserialization failed, did the type serialized change?")
}
pub fn serialize<T>(value: &T) -> Vec<u8>
where
T: serde::Serialize,
{
bincode::serialize(value).expect("serialization failed, did the type serialized change?")
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_range() {
let config = sled::Config::new().temporary(true);
let db = config.open().unwrap();
let tree: Tree<u32, u32> = Tree::open(&db, "test_tree");
tree.insert(&1, &2).unwrap();
tree.insert(&3, &4).unwrap();
tree.insert(&6, &2).unwrap();
tree.insert(&10, &2).unwrap();
tree.insert(&15, &2).unwrap();
tree.flush().unwrap();
let expect_results = [(6, 2), (10, 2)];
for (i, result) in tree.range(6..11).enumerate() {
assert_eq!(result.unwrap(), expect_results[i]);
}
}
#[test]
fn test_cas() {
let config = sled::Config::new().temporary(true);
let db = config.open().unwrap();
let tree: Tree<u32, u32> = Tree::open(&db, "test_tree");
let current = 2;
tree.insert(&1, ¤t).unwrap();
let expected = 3;
let proposed = 4;
let res = tree
.compare_and_swap(&1, Some(&expected), Some(&proposed))
.expect("db failure");
assert_eq!(
res,
Err(CompareAndSwapError {
current: Some(current),
proposed: Some(proposed),
}),
);
}
}