typed_sled/custom_serde/
mod.rs

1#![allow(clippy::type_complexity)]
2//! Support for custom (de)serialization.
3//!
4//! This module exports the same types as the root module, however the types take
5//! an additional generic parameter called `SerDe`. The `SerDe` type
6//! must implement the trait [SerDe][crate::custom_serde::serialize::SerDe] which defines how (de)serialization takes place.
7//!
8//! The [Tree<K, V>][crate::Tree] is equivalent to
9//! [Tree<K, V, BincodeSerDe>][crate::custom_serde::Tree] from this module.
10//!
11//! The following features are supported for the custom (de)serialization Tree:
12//! * [key_generating][self::key_generating]: Create `Tree`s with automatically generated keys.
13//! * [convert][self::convert]: Convert any `Tree` into another `Tree` with different key and value types.
14//!
15//! # Example
16//! ```
17//! use serde::{Deserialize, Serialize};
18//! use typed_sled::custom_serde::{serialize::BincodeSerDeLazy, Tree};
19//!
20//! #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
21//! struct SomeValue<'a>(&'a str);
22//!
23//! fn main() -> Result<(), Box<dyn std::error::Error>> {
24//!     // Creating a temporary sled database.
25//!     // If you want to persist the data use sled::open instead.
26//!     let db = sled::Config::new().temporary(true).open().unwrap();
27//!
28//!     // Notice that we are using &str, and SomeValue<'a> here which do not implement
29//!     // serde::DeserializeOwned and thus could not be used with typed_sled::Tree.
30//!     // However our custom lazy Deserializer contained in BincodeSerDeLazy allows us
31//!     // to perform the deserialization lazily and only requires serde::Deserialize<'a>
32//!     // for the lazy deserialization.
33//!     let tree = Tree::<&str, SomeValue, BincodeSerDeLazy>::open(&db, "unique_id");
34//!
35//!     tree.insert(&"some_key", &SomeValue("some_value"))?;
36//!
37//!     assert_eq!(
38//!         tree.get(&"some_key")?.unwrap().deserialize(),
39//!         SomeValue("some_value")
40//!     );
41//!     Ok(())
42//! }
43//! ```
44//!
45//! [sled]: https://docs.rs/sled/latest/sled/
46use crate::custom_serde::serialize::{Deserializer, Key, Serializer, Value};
47use core::fmt;
48use core::iter::{DoubleEndedIterator, Iterator};
49use core::ops::{Bound, RangeBounds};
50use sled::{
51    transaction::{ConflictableTransactionResult, TransactionResult},
52    IVec, Result,
53};
54use std::marker::PhantomData;
55
56pub mod serialize;
57
58#[cfg(feature = "convert")]
59pub mod convert;
60
61#[cfg(feature = "key-generating")]
62pub mod key_generating;
63
64// #[cfg(feature = "search")]
65// pub mod search;
66
67// pub trait Bin = DeserializeOwned + Serialize + Clone + Send + Sync;
68
69/// A flash-sympathetic persistent lock-free B+ tree.
70///
71/// A `Tree` represents a single logical keyspace / namespace / bucket.
72///
73/// # Example
74/// ```
75/// use serde::{Deserialize, Serialize};
76/// use typed_sled::custom_serde::{serialize::BincodeSerDeLazy, Tree};
77///
78/// #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
79/// struct SomeValue<'a>(&'a str);
80///
81/// fn main() -> Result<(), Box<dyn std::error::Error>> {
82///     // Creating a temporary sled database.
83///     // If you want to persist the data use sled::open instead.
84///     let db = sled::Config::new().temporary(true).open().unwrap();
85///
86///     // Notice that we are using &str, and SomeValue<'a> here which do not implement
87///     // serde::DeserializeOwned and thus could not be used with typed_sled::Tree.
88///     // However our custom lazy Deserializer contained in BincodeSerDeLazy allows us
89///     // to perform the deserialization lazily and only requires serde::Deserialize<'a>
90///     // for the lazy deserialization.
91///     let tree = Tree::<&str, SomeValue, BincodeSerDeLazy>::open(&db, "unique_id");
92///
93///     tree.insert(&"some_key", &SomeValue("some_value"))?;
94///
95///     assert_eq!(
96///         tree.get(&"some_key")?.unwrap().deserialize(),
97///         SomeValue("some_value")
98///     );
99///     Ok(())
100/// }
101/// ```
102#[derive(Debug)]
103pub struct Tree<K, V, SerDe> {
104    inner: sled::Tree,
105    _key: PhantomData<fn() -> K>,
106    _value: PhantomData<fn() -> V>,
107    _serde: PhantomData<fn(SerDe)>,
108}
109
110// Manual implementation to make Clone behave better.
111// With derive(Clone) calling clone on a reference returns a reference.
112impl<K, V, SerDe> Clone for Tree<K, V, SerDe> {
113    fn clone(&self) -> Self {
114        Self {
115            inner: self.inner.clone(),
116            _key: PhantomData,
117            _value: PhantomData,
118            _serde: PhantomData,
119        }
120    }
121}
122
123// // implemented like this in the sled source
124// // impl<V: std::fmt::Debug> std::error::Error for CompareAndSwapError<V> {}
125
// Trait bounds are specified on the individual methods rather than on this impl block.
127impl<K, V, SerDe> Tree<K, V, SerDe> {
128    /// Initialize a typed tree. The id identifies the tree to be opened from the db.
129    /// # Example
130    ///
131    /// ```
132    /// use serde::{Deserialize, Serialize};
133    /// use typed_sled::custom_serde::{serialize::BincodeSerDeLazy, Tree};
134    ///
135    /// #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
136    /// struct SomeValue<'a>(&'a str);
137    ///
138    /// fn main() -> Result<(), Box<dyn std::error::Error>> {
139    ///     // Creating a temporary sled database.
140    ///     // If you want to persist the data use sled::open instead.
141    ///     let db = sled::Config::new().temporary(true).open().unwrap();
142    ///
143    ///     // Notice that we are using &str, and SomeValue<'a> here which do not implement
144    ///     // serde::DeserializeOwned and thus could not be used with typed_sled::Tree.
145    ///     // However our custom lazy Deserializer contained in BincodeSerDeLazy allows us
146    ///     // to perform the deserialization lazily and only requires serde::Deserialize<'a>
147    ///     // for the lazy deserialization.
148    ///     let tree = Tree::<&str, SomeValue, BincodeSerDeLazy>::open(&db, "unique_id");
149    ///
150    ///     tree.insert(&"some_key", &SomeValue("some_value"))?;
151    ///
152    ///     assert_eq!(
153    ///         tree.get(&"some_key")?.unwrap().deserialize(),
154    ///         SomeValue("some_value")
155    ///     );
156    ///     Ok(())
157    /// }
158    //     /// ```
159    pub fn open<T: AsRef<str>>(db: &sled::Db, id: T) -> Self {
160        Self {
161            inner: db.open_tree(id.as_ref()).unwrap(),
162            _key: PhantomData,
163            _value: PhantomData,
164            _serde: PhantomData,
165        }
166    }
167
168    /// Insert a key to a new value, returning the last value if it was set.
169    pub fn insert(&self, key: &K, value: &V) -> Result<Option<Value<K, V, SerDe>>>
170    where
171        SerDe: serialize::SerDe<K, V>,
172    {
173        self.inner
174            .insert(
175                SerDe::SK::serialize(key).as_ref(),
176                SerDe::SV::serialize(value).as_ref(),
177            )
178            .map(|opt| opt.map(|old_value| SerDe::DV::deserialize(old_value)))
179    }
180
181    /// Perform a multi-key serializable transaction.
182    pub fn transaction<F, A, E>(&self, f: F) -> TransactionResult<A, E>
183    where
184        F: Fn(&TransactionalTree<K, V, SerDe>) -> ConflictableTransactionResult<A, E>,
185    {
186        self.inner.transaction(|sled_transactional_tree| {
187            f(&TransactionalTree {
188                inner: sled_transactional_tree,
189                _key: PhantomData,
190                _value: PhantomData,
191                _serde: PhantomData,
192            })
193        })
194    }
195
196    /// Create a new batched update that can be atomically applied.
197    ///
198    /// It is possible to apply a Batch in a transaction as well, which is the way you can apply a Batch to multiple Trees atomically.
199    pub fn apply_batch(&self, batch: Batch<K, V, SerDe>) -> Result<()> {
200        self.inner.apply_batch(batch.inner)
201    }
202
203    /// Retrieve a value from the Tree if it exists.
204    pub fn get(&self, key: &K) -> Result<Option<Value<K, V, SerDe>>>
205    where
206        SerDe: serialize::SerDe<K, V>,
207    {
208        self.inner
209            .get(SerDe::SK::serialize(key))
210            .map(|opt| opt.map(|v| SerDe::DV::deserialize(v)))
211    }
212
213    /// Retrieve a value from the Tree if it exists. The key must be in serialized form.
214    pub fn get_from_raw<B: AsRef<[u8]>>(&self, key_bytes: B) -> Result<Option<Value<K, V, SerDe>>>
215    where
216        SerDe: serialize::SerDe<K, V>,
217    {
218        self.inner
219            .get(key_bytes.as_ref())
220            .map(|opt| opt.map(|v| SerDe::DV::deserialize(v)))
221    }
222
223    /// Deserialize a key and retrieve it's value from the Tree if it exists.
224    /// The deserialization is only done if a value was retrieved successfully.
225    pub fn get_kv_from_raw<B: AsRef<[u8]>>(
226        &self,
227        key_bytes: B,
228    ) -> Result<Option<(Key<K, V, SerDe>, Value<K, V, SerDe>)>>
229    where
230        SerDe: serialize::SerDe<K, V>,
231    {
232        self.inner.get(key_bytes.as_ref()).map(|opt| {
233            opt.map(|v| {
234                (
235                    SerDe::DK::deserialize(sled::IVec::from(key_bytes.as_ref())),
236                    SerDe::DV::deserialize(v),
237                )
238            })
239        })
240    }
241
242    /// Delete a value, returning the old value if it existed.
243    pub fn remove(&self, key: &K) -> Result<Option<Value<K, V, SerDe>>>
244    where
245        SerDe: serialize::SerDe<K, V>,
246    {
247        self.inner
248            .remove(SerDe::SK::serialize(key).as_ref())
249            .map(|opt| opt.map(|v| SerDe::DV::deserialize(v)))
250    }
251
252    /// Compare and swap. Capable of unique creation, conditional modification, or deletion. If old is None, this will only set the value if it doesn't exist yet. If new is None, will delete the value if old is correct. If both old and new are Some, will modify the value if old is correct.
253    ///
254    /// It returns Ok(Ok(())) if operation finishes successfully.
255    ///
256    /// If it fails it returns: - Ok(Err(CompareAndSwapError(current, proposed))) if operation failed to setup a new value. CompareAndSwapError contains current and proposed values. - Err(Error::Unsupported) if the database is opened in read-only mode.
257    pub fn compare_and_swap(
258        &self,
259        key: &K,
260        old: Option<&V>,
261        new: Option<&V>,
262    ) -> Result<core::result::Result<(), CompareAndSwapError<Value<K, V, SerDe>>>>
263    where
264        SerDe: serialize::SerDe<K, V>,
265    {
266        self.inner
267            .compare_and_swap(
268                SerDe::SK::serialize(key),
269                old.map(|old| SerDe::SV::serialize(old))
270                    .as_ref()
271                    .map(|old| old.as_ref()),
272                new.map(|new| SerDe::SV::serialize(new))
273                    .as_ref()
274                    .map(|new| new.as_ref()),
275            )
276            .map(|cas_res| {
277                cas_res.map_err(|cas_err| CompareAndSwapError {
278                    current: cas_err.current.map(|b| SerDe::DV::deserialize(b)),
279                    proposed: cas_err.proposed.map(|b| SerDe::DV::deserialize(b)),
280                })
281            })
282    }
283
284    /// Fetch the value, apply a function to it and return the result.
285    pub fn update_and_fetch<F>(&self, key: &K, mut f: F) -> Result<Option<Value<K, V, SerDe>>>
286    where
287        SerDe: serialize::SerDe<K, V>,
288        F: FnMut(Option<Value<K, V, SerDe>>) -> Option<V>,
289    {
290        self.inner
291            .update_and_fetch(SerDe::SK::serialize(key), |opt_value| {
292                f(opt_value.map(|v| SerDe::DV::deserialize(sled::IVec::from(v)))).map(|value| {
293                    // TODO: Maybe add Into<IVec> to SerDe::SV::Bytes
294                    let bytes = SerDe::SV::serialize(&value);
295                    let mut v = Vec::with_capacity(bytes.as_ref().len());
296                    v.extend(bytes.as_ref());
297                    v
298                })
299            })
300            .map(|res| res.map(|v| SerDe::DV::deserialize(v)))
301    }
302
303    /// Fetch the value, apply a function to it and return the previous value.
304    // not sure if implemented correctly (different trait bound for F)
305    pub fn fetch_and_update<F>(&self, key: &K, mut f: F) -> Result<Option<Value<K, V, SerDe>>>
306    where
307        SerDe: serialize::SerDe<K, V>,
308        F: FnMut(Option<Value<K, V, SerDe>>) -> Option<V>,
309    {
310        self.inner
311            .fetch_and_update(SerDe::SK::serialize(key), |opt_value| {
312                f(opt_value.map(|v| SerDe::DV::deserialize(sled::IVec::from(v)))).map(|value| {
313                    // TODO: Maybe add Into<IVec> to SerDe::SV::Bytes
314                    let bytes = SerDe::SV::serialize(&value);
315                    let mut v = Vec::with_capacity(bytes.as_ref().len());
316                    v.extend(bytes.as_ref());
317                    v
318                })
319            })
320            .map(|res| res.map(|v| SerDe::DV::deserialize(v)))
321    }
322
323    /// Subscribe to `Event`s that happen to keys that have
324    /// the specified prefix. Events for particular keys are
325    /// guaranteed to be witnessed in the same order by all
326    /// threads, but threads may witness different interleavings
327    /// of `Event`s across different keys. If subscribers don't
328    /// keep up with new writes, they will cause new writes
329    /// to block. There is a buffer of 1024 items per
330    ///  `Subscriber`. This can be used to build reactive
331    /// and replicated systems.
332    pub fn watch_prefix(&self, prefix: &K) -> Subscriber<K, V, SerDe>
333    where
334        SerDe: serialize::SerDe<K, V>,
335    {
336        Subscriber::from_sled(self.inner.watch_prefix(SerDe::SK::serialize(prefix)))
337    }
338
339    /// Subscribe to  all`Event`s. Events for particular keys are
340    /// guaranteed to be witnessed in the same order by all
341    /// threads, but threads may witness different interleavings
342    /// of `Event`s across different keys. If subscribers don't
343    /// keep up with new writes, they will cause new writes
344    /// to block. There is a buffer of 1024 items per
345    /// `Subscriber`. This can be used to build reactive
346    /// and replicated systems.
347    pub fn watch_all(&self) -> Subscriber<K, V, SerDe>
348    where
349        SerDe: serialize::SerDe<K, V>,
350    {
351        Subscriber::from_sled(self.inner.watch_prefix(vec![]))
352    }
353
354    /// Synchronously flushes all dirty IO buffers and calls
355    /// fsync. If this succeeds, it is guaranteed that all
356    /// previous writes will be recovered if the system
357    /// crashes. Returns the number of bytes flushed during
358    /// this call.
359    ///
360    /// Flushing can take quite a lot of time, and you should
361    /// measure the performance impact of using it on
362    /// realistic sustained workloads running on realistic
363    /// hardware.
364    pub fn flush(&self) -> Result<usize> {
365        self.inner.flush()
366    }
367
368    /// Asynchronously flushes all dirty IO buffers
369    /// and calls fsync. If this succeeds, it is
370    /// guaranteed that all previous writes will
371    /// be recovered if the system crashes. Returns
372    /// the number of bytes flushed during this call.
373    ///
374    /// Flushing can take quite a lot of time, and you
375    /// should measure the performance impact of
376    /// using it on realistic sustained workloads
377    /// running on realistic hardware.
378    pub async fn flush_async(&self) -> Result<usize> {
379        self.inner.flush_async().await
380    }
381
382    /// Returns `true` if the `Tree` contains a value for
383    /// the specified key.
384    pub fn contains_key(&self, key: &K) -> Result<bool>
385    where
386        SerDe: serialize::SerDe<K, V>,
387    {
388        self.inner.contains_key(SerDe::SK::serialize(key))
389    }
390
391    /// Retrieve the key and value before the provided key,
392    /// if one exists.
393    pub fn get_lt(&self, key: &K) -> Result<Option<(Key<K, V, SerDe>, Value<K, V, SerDe>)>>
394    where
395        SerDe: serialize::SerDe<K, V>,
396    {
397        self.inner
398            .get_lt(SerDe::SK::serialize(key))
399            .map(|res| res.map(|(k, v)| (SerDe::DK::deserialize(k), SerDe::DV::deserialize(v))))
400    }
401
402    /// Retrieve the next key and value from the `Tree` after the
403    /// provided key.
404    pub fn get_gt(&self, key: &K) -> Result<Option<(Key<K, V, SerDe>, Value<K, V, SerDe>)>>
405    where
406        SerDe: serialize::SerDe<K, V>,
407    {
408        self.inner
409            .get_gt(SerDe::SK::serialize(key))
410            .map(|res| res.map(|(k, v)| (SerDe::DK::deserialize(k), SerDe::DV::deserialize(v))))
411    }
412
413    /// Merge state directly into a given key's value using the
414    /// configured merge operator. This allows state to be written
415    /// into a value directly, without any read-modify-write steps.
416    /// Merge operators can be used to implement arbitrary data
417    /// structures.
418    ///
419    /// Calling `merge` will return an `Unsupported` error if it
420    /// is called without first setting a merge operator function.
421    ///
422    /// Merge operators are shared by all instances of a particular
423    /// `Tree`. Different merge operators may be set on different
424    /// `Tree`s.
425    pub fn merge(&self, key: &K, value: &V) -> Result<Option<Value<K, V, SerDe>>>
426    where
427        SerDe: serialize::SerDe<K, V>,
428    {
429        self.inner
430            .merge(SerDe::SK::serialize(key), SerDe::SV::serialize(value))
431            .map(|res| res.map(|old_v| SerDe::DV::deserialize(old_v)))
432    }
433
434    /// For now this maps directly to sled::Tree::set_merge_operator,
435    /// meaning you will have to handle (de)serialization yourself.
436    ///
437    /// Sets a merge operator for use with the `merge` function.
438    ///
439    /// Merge state directly into a given key's value using the
440    /// configured merge operator. This allows state to be written
441    /// into a value directly, without any read-modify-write steps.
442    /// Merge operators can be used to implement arbitrary data
443    /// structures.
444    ///
445    /// # Panics
446    ///
447    /// Calling `merge` will panic if no merge operator has been
448    /// configured.
449    pub fn set_merge_operator(&self, merge_operator: impl sled::MergeOperator + 'static) {
450        self.inner.set_merge_operator(merge_operator);
451    }
452
453    /// Create a double-ended iterator over the tuples of keys and
454    /// values in this tree.
455    pub fn iter(&self) -> Iter<K, V, SerDe> {
456        Iter::from_sled(self.inner.iter())
457    }
458
459    /// Create a double-ended iterator over tuples of keys and values,
460    /// where the keys fall within the specified range.
461    pub fn range<R: RangeBounds<K>>(&self, range: R) -> Iter<K, V, SerDe>
462    where
463        SerDe: serialize::SerDe<K, V>,
464    {
465        match (range.start_bound(), range.end_bound()) {
466            (Bound::Unbounded, Bound::Unbounded) => {
467                Iter::from_sled(self.inner.range::<&[u8], _>(..))
468            }
469            (Bound::Unbounded, Bound::Excluded(b)) => {
470                Iter::from_sled(self.inner.range(..SerDe::SK::serialize(b)))
471            }
472            (Bound::Unbounded, Bound::Included(b)) => {
473                Iter::from_sled(self.inner.range(..=SerDe::SK::serialize(b)))
474            }
475            // FIX: This is not excluding lower bound.
476            (Bound::Excluded(b), Bound::Unbounded) => {
477                Iter::from_sled(self.inner.range(SerDe::SK::serialize(b)..))
478            }
479            (Bound::Excluded(b), Bound::Excluded(bb)) => Iter::from_sled(
480                self.inner
481                    .range(SerDe::SK::serialize(b)..SerDe::SK::serialize(bb)),
482            ),
483            (Bound::Excluded(b), Bound::Included(bb)) => Iter::from_sled(
484                self.inner
485                    .range(SerDe::SK::serialize(b)..=SerDe::SK::serialize(bb)),
486            ),
487            (Bound::Included(b), Bound::Unbounded) => {
488                Iter::from_sled(self.inner.range(SerDe::SK::serialize(b)..))
489            }
490            (Bound::Included(b), Bound::Excluded(bb)) => Iter::from_sled(
491                self.inner
492                    .range(SerDe::SK::serialize(b)..SerDe::SK::serialize(bb)),
493            ),
494            (Bound::Included(b), Bound::Included(bb)) => Iter::from_sled(
495                self.inner
496                    .range(SerDe::SK::serialize(b)..=SerDe::SK::serialize(bb)),
497            ),
498        }
499    }
500
501    /// Create an iterator over tuples of keys and values,
502    /// where the all the keys starts with the given prefix.
503    pub fn scan_prefix(&self, prefix: &K) -> Iter<K, V, SerDe>
504    where
505        SerDe: serialize::SerDe<K, V>,
506    {
507        Iter::from_sled(self.inner.scan_prefix(SerDe::SK::serialize(prefix)))
508    }
509
510    /// Returns the first key and value in the `Tree`, or
511    /// `None` if the `Tree` is empty.
512    pub fn first(&self) -> Result<Option<(Key<K, V, SerDe>, Value<K, V, SerDe>)>>
513    where
514        SerDe: serialize::SerDe<K, V>,
515    {
516        self.inner
517            .first()
518            .map(|res| res.map(|(k, v)| (SerDe::DK::deserialize(k), SerDe::DV::deserialize(v))))
519    }
520
521    /// Returns the last key and value in the `Tree`, or
522    /// `None` if the `Tree` is empty.
523    pub fn last(&self) -> Result<Option<(Key<K, V, SerDe>, Value<K, V, SerDe>)>>
524    where
525        SerDe: serialize::SerDe<K, V>,
526    {
527        self.inner
528            .last()
529            .map(|res| res.map(|(k, v)| (SerDe::DK::deserialize(k), SerDe::DV::deserialize(v))))
530    }
531
532    /// Atomically removes the maximum item in the `Tree` instance.
533    pub fn pop_max(&self) -> Result<Option<(Key<K, V, SerDe>, Value<K, V, SerDe>)>>
534    where
535        SerDe: serialize::SerDe<K, V>,
536    {
537        self.inner
538            .pop_max()
539            .map(|res| res.map(|(k, v)| (SerDe::DK::deserialize(k), SerDe::DV::deserialize(v))))
540    }
541
542    /// Atomically removes the minimum item in the `Tree` instance.
543    pub fn pop_min(&self) -> Result<Option<(Key<K, V, SerDe>, Value<K, V, SerDe>)>>
544    where
545        SerDe: serialize::SerDe<K, V>,
546    {
547        self.inner
548            .pop_min()
549            .map(|res| res.map(|(k, v)| (SerDe::DK::deserialize(k), SerDe::DV::deserialize(v))))
550    }
551
552    /// Returns the number of elements in this tree.
553    pub fn len(&self) -> usize {
554        self.inner.len()
555    }
556
557    /// Returns `true` if the `Tree` contains no elements.
558    pub fn is_empty(&self) -> bool {
559        self.inner.is_empty()
560    }
561
562    /// Clears the `Tree`, removing all values.
563    ///
564    /// Note that this is not atomic.
565    pub fn clear(&self) -> Result<()> {
566        self.inner.clear()
567    }
568
569    /// Returns the name of the tree.
570    pub fn name(&self) -> IVec {
571        self.inner.name()
572    }
573
574    /// Returns the CRC32 of all keys and values
575    /// in this Tree.
576    ///
577    /// This is O(N) and locks the underlying tree
578    /// for the duration of the entire scan.
579    pub fn checksum(&self) -> Result<u32> {
580        self.inner.checksum()
581    }
582}
583
584pub struct TransactionalTree<'a, K, V, SerDe> {
585    inner: &'a sled::transaction::TransactionalTree,
586    _key: PhantomData<fn() -> K>,
587    _value: PhantomData<fn() -> V>,
588    _serde: PhantomData<fn(SerDe)>,
589}
590
591impl<'a, K, V, SerDe> TransactionalTree<'a, K, V, SerDe> {
592    pub fn insert(
593        &self,
594        key: &K,
595        value: &V,
596    ) -> std::result::Result<
597        Option<Value<K, V, SerDe>>,
598        sled::transaction::UnabortableTransactionError,
599    >
600    where
601        SerDe: serialize::SerDe<K, V>,
602    {
603        self.inner
604            .insert(
605                SerDe::SK::serialize(key).as_ref(),
606                SerDe::SV::serialize(value).as_ref(),
607            )
608            .map(|opt| opt.map(|v| SerDe::DV::deserialize(v)))
609    }
610
611    pub fn remove(
612        &self,
613        key: &K,
614    ) -> std::result::Result<
615        Option<Value<K, V, SerDe>>,
616        sled::transaction::UnabortableTransactionError,
617    >
618    where
619        SerDe: serialize::SerDe<K, V>,
620    {
621        self.inner
622            .remove(SerDe::SK::serialize(key).as_ref())
623            .map(|opt| opt.map(|v| SerDe::DV::deserialize(v)))
624    }
625
626    pub fn get(
627        &self,
628        key: &K,
629    ) -> std::result::Result<
630        Option<Value<K, V, SerDe>>,
631        sled::transaction::UnabortableTransactionError,
632    >
633    where
634        SerDe: serialize::SerDe<K, V>,
635    {
636        self.inner
637            .get(SerDe::SK::serialize(key))
638            .map(|opt| opt.map(|v| SerDe::DV::deserialize(v)))
639    }
640
641    pub fn apply_batch(
642        &self,
643        batch: &Batch<K, V, SerDe>,
644    ) -> std::result::Result<(), sled::transaction::UnabortableTransactionError> {
645        self.inner.apply_batch(&batch.inner)
646    }
647
648    pub fn flush(&self) {
649        self.inner.flush()
650    }
651
652    pub fn generate_id(&self) -> Result<u64> {
653        self.inner.generate_id()
654    }
655}
656
657pub struct Iter<K, V, SerDe> {
658    inner: sled::Iter,
659    _key: PhantomData<fn() -> K>,
660    _value: PhantomData<fn() -> V>,
661    _serde: PhantomData<fn(SerDe)>,
662}
663
664impl<K, V, SerDe: serialize::SerDe<K, V>> Iterator for Iter<K, V, SerDe> {
665    type Item = Result<(Key<K, V, SerDe>, Value<K, V, SerDe>)>;
666
667    fn next(&mut self) -> Option<Self::Item> {
668        self.inner
669            .next()
670            .map(|res| res.map(|(k, v)| (SerDe::DK::deserialize(k), SerDe::DV::deserialize(v))))
671    }
672
673    fn last(mut self) -> Option<Self::Item> {
674        self.inner
675            .next_back()
676            .map(|res| res.map(|(k, v)| (SerDe::DK::deserialize(k), SerDe::DV::deserialize(v))))
677    }
678}
679
680impl<K, V, SerDe: serialize::SerDe<K, V>> DoubleEndedIterator for Iter<K, V, SerDe> {
681    fn next_back(&mut self) -> Option<Self::Item> {
682        self.inner
683            .next_back()
684            .map(|res| res.map(|(k, v)| (SerDe::DK::deserialize(k), SerDe::DV::deserialize(v))))
685    }
686}
687
688impl<K, V, SerDe> Iter<K, V, SerDe> {
689    pub fn from_sled(iter: sled::Iter) -> Self {
690        Iter {
691            inner: iter,
692            _key: PhantomData,
693            _value: PhantomData,
694            _serde: PhantomData,
695        }
696    }
697
698    pub fn keys(self) -> impl DoubleEndedIterator<Item = Result<Key<K, V, SerDe>>> + Send + Sync
699    where
700        SerDe: serialize::SerDe<K, V>,
701        K: Sync + Send,
702        V: Sync + Send,
703    {
704        self.map(|r| r.map(|(k, _v)| k))
705    }
706
707    /// Iterate over the values of this Tree
708    pub fn values(self) -> impl DoubleEndedIterator<Item = Result<Value<K, V, SerDe>>> + Send + Sync
709    where
710        SerDe: serialize::SerDe<K, V>,
711        K: Sync + Send,
712        V: Sync + Send,
713    {
714        self.map(|r| r.map(|(_k, v)| v))
715    }
716}
717
718#[derive(Clone, Debug)]
719pub struct Batch<K, V, SerDe> {
720    inner: sled::Batch,
721    _key: PhantomData<fn() -> K>,
722    _value: PhantomData<fn() -> V>,
723    _serde: PhantomData<fn(SerDe)>,
724}
725
726impl<K, V, SerDe> Batch<K, V, SerDe> {
727    pub fn insert(&mut self, key: &K, value: &V)
728    where
729        SerDe: serialize::SerDe<K, V>,
730    {
731        self.inner.insert(
732            SerDe::SK::serialize(key).as_ref(),
733            SerDe::SV::serialize(value).as_ref(),
734        );
735    }
736
737    pub fn remove(&mut self, key: &K)
738    where
739        SerDe: serialize::SerDe<K, V>,
740    {
741        self.inner.remove(SerDe::SK::serialize(key).as_ref())
742    }
743}
744
745// Implementing Default manually to not require K, V and SerDe to implement Default.
746impl<K, V, SerDe> Default for Batch<K, V, SerDe> {
747    fn default() -> Self {
748        Self {
749            inner: Default::default(),
750            _key: PhantomData,
751            _value: PhantomData,
752            _serde: PhantomData,
753        }
754    }
755}
756
757use pin_project::pin_project;
758#[pin_project]
759pub struct Subscriber<K, V, SerDe> {
760    #[pin]
761    inner: sled::Subscriber,
762    _key: PhantomData<fn() -> K>,
763    _value: PhantomData<fn() -> V>,
764    _serde: PhantomData<fn(SerDe)>,
765}
766
767impl<K, V, SerDe> Subscriber<K, V, SerDe> {
768    pub fn next_timeout(
769        &mut self,
770        timeout: core::time::Duration,
771    ) -> core::result::Result<Event<K, V, SerDe>, std::sync::mpsc::RecvTimeoutError>
772    where
773        SerDe: serialize::SerDe<K, V>,
774    {
775        self.inner
776            .next_timeout(timeout)
777            .map(|e| Event::from_sled(e))
778    }
779
780    pub fn from_sled(subscriber: sled::Subscriber) -> Self {
781        Self {
782            inner: subscriber,
783            _key: PhantomData,
784            _value: PhantomData,
785            _serde: PhantomData,
786        }
787    }
788}
789
790use core::future::Future;
791use core::pin::Pin;
792use core::task::{Context, Poll};
793impl<K: Unpin, V: Unpin, SerDe: serialize::SerDe<K, V>> Future for Subscriber<K, V, SerDe> {
794    type Output = Option<Event<K, V, SerDe>>;
795
796    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
797        self.project()
798            .inner
799            .poll(cx)
800            .map(|opt| opt.map(|e| Event::from_sled(e)))
801    }
802}
803
804impl<K, V, SerDe: serialize::SerDe<K, V>> Iterator for Subscriber<K, V, SerDe> {
805    type Item = Event<K, V, SerDe>;
806
807    fn next(&mut self) -> Option<Event<K, V, SerDe>> {
808        self.inner.next().map(|e| Event::from_sled(e))
809    }
810}
811
812pub enum Event<K, V, SerDe: serialize::SerDe<K, V>> {
813    Insert {
814        key: Key<K, V, SerDe>,
815        value: Value<K, V, SerDe>,
816    },
817    Remove {
818        key: Key<K, V, SerDe>,
819    },
820}
821
822impl<K, V, SerDe: serialize::SerDe<K, V>> Event<K, V, SerDe> {
823    pub fn key(&self) -> &Key<K, V, SerDe> {
824        match self {
825            Self::Insert { key, .. } | Self::Remove { key } => key,
826        }
827    }
828
829    pub fn from_sled(event: sled::Event) -> Self {
830        match event {
831            sled::Event::Insert { key, value } => Self::Insert {
832                key: SerDe::DK::deserialize(key),
833                value: SerDe::DV::deserialize(value),
834            },
835            sled::Event::Remove { key } => Self::Remove {
836                key: SerDe::DK::deserialize(key),
837            },
838        }
839    }
840}
841
/// Compare and swap error.
///
/// Returned inside the `Ok` variant of `Tree::compare_and_swap` when the
/// value stored in the tree did not match the expected one.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CompareAndSwapError<V> {
    /// The current value which caused your CAS to fail.
    pub current: Option<V>,
    /// Returned value that was proposed unsuccessfully.
    pub proposed: Option<V>,
}

impl<V> fmt::Display for CompareAndSwapError<V> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Compare and swap conflict")
    }
}

// Implementing `Error` lets the conflict be boxed as `dyn Error` and
// propagated with `?` like any other error value. `Debug` on `V` is required
// because `Error` has `Debug` as a supertrait.
impl<V: fmt::Debug> std::error::Error for CompareAndSwapError<V> {}
856
#[cfg(test)]
mod tests {
    use super::*;
    use crate::custom_serde::serialize::BincodeSerDe;

    /// A half-open range must yield exactly the entries whose keys lie in it.
    #[test]
    fn test_range() {
        let config = sled::Config::new().temporary(true);
        let db = config.open().unwrap();

        let tree: Tree<u32, u32, BincodeSerDe> = Tree::open(&db, "test_tree");

        tree.insert(&1, &2).unwrap();
        tree.insert(&3, &4).unwrap();
        tree.insert(&6, &2).unwrap();
        tree.insert(&10, &2).unwrap();
        tree.insert(&15, &2).unwrap();
        tree.flush().unwrap();

        let expect_results: Vec<(u32, u32)> = vec![(6, 2), (10, 2)];

        // Compare the whole collected result: checking only the yielded items
        // one by one would pass vacuously if the iterator came back empty or
        // too short.
        let results: Vec<(u32, u32)> = tree
            .range(6..11)
            .map(|entry| entry.unwrap())
            .collect();

        assert_eq!(results, expect_results);
    }

    /// A compare-and-swap with a wrong expected value must report both the
    /// current and the proposed value.
    #[test]
    fn test_cas() {
        let config = sled::Config::new().temporary(true);
        let db = config.open().unwrap();

        let tree: Tree<u32, u32, BincodeSerDe> = Tree::open(&db, "test_tree");

        let current = 2;
        tree.insert(&1, &current).unwrap();
        let expected = 3;
        let proposed = 4;
        let res = tree
            .compare_and_swap(&1, Some(&expected), Some(&proposed))
            .expect("db failure");

        assert_eq!(
            res,
            Err(CompareAndSwapError {
                current: Some(current),
                proposed: Some(proposed),
            }),
        );
    }
}