bincode_sled/
lib.rs

1//! typed-sled - a database build on top of sled.
2//!
3//! sled is a high-performance embedded database with an API that is similar to a `BTreeMap<[u8], [u8]>`.  
4//! typed-sled builds on top of sled and offers an API that is similar to a `BTreeMap<K, V>`, where
5//! K and V are user defined types which implement [Deserialize][serde::Deserialize] and [Serialize][serde::Serialize].
6//!
7//! # features
8//! Multiple features for common use cases are also available:
9//! * [search]: `SearchEngine` on top of a `Tree`.
10//! * [key_generating]: Create `Tree`s with automatically generated keys.
11//! * [convert]: Convert any `Tree` into another `Tree` with different key and value types.
12//!
13//! [sled]: https://docs.rs/sled/latest/sled/
14
15use bincode::config::{Fixint, LittleEndian, NoLimit};
16use bincode::{Decode, Encode};
17pub use sled::{open, Config};
18use transaction::TransactionalTree;
19
20#[cfg(feature = "convert")]
21pub mod convert;
22#[cfg(feature = "key-generating")]
23pub mod key_generating;
24#[cfg(feature = "search")]
25pub mod search;
26pub mod transaction;
27
28use core::fmt;
29use core::iter::{DoubleEndedIterator, Iterator};
30use core::ops::{Bound, RangeBounds};
31use sled::{
32    transaction::{ConflictableTransactionResult, TransactionResult},
33    IVec, Result,
34};
35use std::marker::PhantomData;
36
37pub const DEFAULT_CONF: bincode::config::Configuration<LittleEndian, Fixint, NoLimit> =
38    bincode::config::standard()
39        .with_little_endian()
40        .with_fixed_int_encoding()
41        .with_no_limit();
42type BinConfT = bincode::config::Configuration<LittleEndian, Fixint, NoLimit>;
43
44// pub trait Bin = DeserializeOwned + Serialize + Clone + Send + Sync;
45
46/// A flash-sympathetic persistent lock-free B+ tree.
47///
48/// A `Tree` represents a single logical keyspace / namespace / bucket.
49///
50/// # Example
51/// ```
52/// use bincode::{Encode, Decode};
53///
54/// #[derive(Debug, Clone, Encode, Decode, PartialEq)]
55/// struct SomeValue(u32);
56///
57/// fn main() -> Result<(), Box<dyn std::error::Error>> {
58///     // Creating a temporary sled database.
59///     // If you want to persist the data use sled::open instead.
60///     let db = sled::Config::new().temporary(true).open().unwrap();
61///
62///     // The id is used by sled to identify which Tree in the database (db) to open.
63///     let tree = bincode_sled::Tree::<String, SomeValue>::open(&db, "unique_id");
64///
65///     tree.insert(&"some_key".to_owned(), &SomeValue(10))?;
66///
67///     assert_eq!(tree.get(&"some_key".to_owned())?, Some(SomeValue(10)));
68///     Ok(())
69/// }
70/// ```
71#[derive(Debug)]
72pub struct Tree<K, V>
73where
74    K: Key,
75    V: Value,
76{
77    inner: sled::Tree,
78    _key: PhantomData<fn() -> K>,
79    _value: PhantomData<fn() -> V>,
80}
81
82// Manual implementation to make ToOwned behave better.
83// With derive(Clone) to_owned() on a reference returns a reference.
84impl<K, V> Clone for Tree<K, V>
85where
86    K: Key,
87    V: Value,
88{
89    fn clone(&self) -> Self {
90        Self {
91            inner: self.inner.clone(),
92            _key: PhantomData,
93            _value: PhantomData,
94        }
95    }
96}
97
98/// Trait alias for bounds required on keys and values.
99/// For now only types that implement DeserializeOwned
100/// are supported.
101// [specilization] might make
102// supporting any type that implements Deserialize<'a>
103// possible without much overhead. Otherwise the branch
104// custom_de_serialization introduces custom (de)serialization
105// for each `Tree` which might also make it possible.
106//
107// [specialization]: https://github.com/rust-lang/rust/issues/31844
108pub trait Key: Encode + Decode + Send {}
109
110impl<T: Encode + Decode + Send + Sync> Key for T {}
111
112pub trait Value: Encode + Decode + Send {}
113impl<T: Encode + Decode + Send + Sync> Value for T {}
114
115/// Compare and swap error.
116#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
117pub struct CompareAndSwapError<V> {
118    /// The current value which caused your CAS to fail.
119    pub current: Option<V>,
120    /// Returned value that was proposed unsuccessfully.
121    pub proposed: Option<V>,
122}
123
124impl<V> fmt::Display for CompareAndSwapError<V> {
125    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
126        write!(f, "Compare and swap conflict")
127    }
128}
129
130// implemented like this in the sled source
131impl<V: std::fmt::Debug> std::error::Error for CompareAndSwapError<V> {}
132
133// These Trait bounds should probably be specified on the functions themselves, but too lazy.
134impl<K, V> Tree<K, V>
135where
136    K: Key,
137    V: Value,
138{
139    /// Initialize a typed tree. The id identifies the tree to be opened from the db.
140    /// # Example
141    ///
142    /// ```
143    /// use bincode::{Encode, Decode};
144    ///
145    /// #[derive(Debug, Clone, Encode, Decode, PartialEq)]
146    /// struct SomeValue(u32);
147    ///
148    /// fn main() -> Result<(), Box<dyn std::error::Error>> {
149    ///     // Creating a temporary sled database.
150    ///     // If you want to persist the data use sled::open instead.
151    ///     let db = sled::Config::new().temporary(true).open().unwrap();
152    ///
153    ///     // The id is used by sled to identify which Tree in the database (db) to open.
154    ///     let tree = bincode_sled::Tree::<String, SomeValue>::open(&db, "unique_id");
155    ///
156    ///     tree.insert(&"some_key".to_owned(), &SomeValue(10))?;
157    ///
158    ///     assert_eq!(tree.get(&"some_key".to_owned())?, Some(SomeValue(10)));
159    ///     Ok(())
160    /// }
161    /// ```
162    pub fn open<T: AsRef<str>>(db: &sled::Db, id: T) -> Self {
163        Self {
164            inner: db.open_tree(id.as_ref()).unwrap(),
165            _key: PhantomData,
166            _value: PhantomData,
167        }
168    }
169
170    /// Insert a key to a new value, returning the last value if it was set.
171    pub fn insert(&self, key: &K, value: &V) -> Result<Option<V>> {
172        self.inner
173            .insert(serialize(key), serialize(value))
174            .map(|opt| opt.map(|old_value| deserialize(&old_value)))
175    }
176
177    /// Perform a multi-key serializable transaction.
178    pub fn transaction<F, A, E>(&self, f: F) -> TransactionResult<A, E>
179    where
180        F: Fn(&TransactionalTree<K, V>) -> ConflictableTransactionResult<A, E>,
181    {
182        self.inner.transaction(|sled_transactional_tree| {
183            f(&TransactionalTree::new(sled_transactional_tree))
184        })
185    }
186
187    /// Create a new batched update that can be atomically applied.
188    ///
189    /// It is possible to apply a Batch in a transaction as well, which is the way you can apply a Batch to multiple Trees atomically.
190    pub fn apply_batch(&self, batch: Batch<K, V>) -> Result<()> {
191        self.inner.apply_batch(batch.inner)
192    }
193
194    /// Retrieve a value from the Tree if it exists.
195    pub fn get(&self, key: &K) -> Result<Option<V>> {
196        self.inner
197            .get(serialize(key))
198            .map(|opt| opt.map(|v| deserialize(&v)))
199    }
200
201    /// Retrieve a value from the Tree if it exists. The key must be in serialized form.
202    pub fn get_from_raw<B: AsRef<[u8]>>(&self, key_bytes: B) -> Result<Option<V>> {
203        self.inner
204            .get(key_bytes.as_ref())
205            .map(|opt| opt.map(|v| deserialize(&v)))
206    }
207
208    /// Deserialize a key and retrieve it's value from the Tree if it exists.
209    /// The deserialization is only done if a value was retrieved successfully.
210    pub fn get_kv_from_raw<B: AsRef<[u8]>>(&self, key_bytes: B) -> Result<Option<(K, V)>> {
211        self.inner
212            .get(key_bytes.as_ref())
213            .map(|opt| opt.map(|v| (deserialize(key_bytes.as_ref()), deserialize(&v))))
214    }
215
216    /// Delete a value, returning the old value if it existed.
217    pub fn remove(&self, key: &K) -> Result<Option<V>> {
218        self.inner
219            .remove(serialize(key))
220            .map(|opt| opt.map(|v| deserialize(&v)))
221    }
222
223    /// Compare and swap. Capable of unique creation, conditional modification, or deletion. If old is None, this will only set the value if it doesn't exist yet. If new is None, will delete the value if old is correct. If both old and new are Some, will modify the value if old is correct.
224    ///
225    /// It returns Ok(Ok(())) if operation finishes successfully.
226    ///
227    /// If it fails it returns: - Ok(Err(CompareAndSwapError(current, proposed))) if operation failed to setup a new value. CompareAndSwapError contains current and proposed values. - Err(Error::Unsupported) if the database is opened in read-only mode.
228    pub fn compare_and_swap(
229        &self,
230        key: &K,
231        old: Option<&V>,
232        new: Option<&V>,
233    ) -> Result<core::result::Result<(), CompareAndSwapError<V>>> {
234        self.inner
235            .compare_and_swap(
236                serialize(key),
237                old.map(|old| serialize(old)),
238                new.map(|new| serialize(new)),
239            )
240            .map(|cas_res| {
241                cas_res.map_err(|cas_err| CompareAndSwapError {
242                    current: cas_err.current.as_ref().map(|b| deserialize(b)),
243                    proposed: cas_err.proposed.as_ref().map(|b| deserialize(b)),
244                })
245            })
246    }
247
248    /// Fetch the value, apply a function to it and return the result.
249    // not sure if implemented correctly (different trait bound for F)
250    pub fn update_and_fetch<F>(&self, key: &K, mut f: F) -> Result<Option<V>>
251    where
252        F: FnMut(Option<V>) -> Option<V>,
253    {
254        self.inner
255            .update_and_fetch(serialize(&key), |opt_value| {
256                f(opt_value.map(|v| deserialize(v))).map(|v| serialize(&v))
257            })
258            .map(|res| res.map(|v| deserialize(&v)))
259    }
260
261    /// Fetch the value, apply a function to it and return the previous value.
262    // not sure if implemented correctly (different trait bound for F)
263    pub fn fetch_and_update<F>(&self, key: &K, mut f: F) -> Result<Option<V>>
264    where
265        F: FnMut(Option<V>) -> Option<V>,
266    {
267        self.inner
268            .fetch_and_update(serialize(key), |opt_value| {
269                f(opt_value.map(|v| deserialize(v))).map(|v| serialize(&v))
270            })
271            .map(|res| res.map(|v| deserialize(&v)))
272    }
273
274    /// Subscribe to `Event`s that happen to keys that have
275    /// the specified prefix. Events for particular keys are
276    /// guaranteed to be witnessed in the same order by all
277    /// threads, but threads may witness different interleavings
278    /// of `Event`s across different keys. If subscribers don't
279    /// keep up with new writes, they will cause new writes
280    /// to block. There is a buffer of 1024 items per
281    /// `Subscriber`. This can be used to build reactive
282    /// and replicated systems.
283    pub fn watch_prefix(&self, prefix: &K) -> Subscriber<K, V> {
284        Subscriber::from_sled(self.inner.watch_prefix(serialize(prefix)))
285    }
286
287    /// Subscribe to  all`Event`s. Events for particular keys are
288    /// guaranteed to be witnessed in the same order by all
289    /// threads, but threads may witness different interleavings
290    /// of `Event`s across different keys. If subscribers don't
291    /// keep up with new writes, they will cause new writes
292    /// to block. There is a buffer of 1024 items per
293    /// `Subscriber`. This can be used to build reactive
294    /// and replicated systems.
295    pub fn watch_all(&self) -> Subscriber<K, V> {
296        Subscriber::from_sled(self.inner.watch_prefix(vec![]))
297    }
298
299    /// Synchronously flushes all dirty IO buffers and calls
300    /// fsync. If this succeeds, it is guaranteed that all
301    /// previous writes will be recovered if the system
302    /// crashes. Returns the number of bytes flushed during
303    /// this call.
304    ///
305    /// Flushing can take quite a lot of time, and you should
306    /// measure the performance impact of using it on
307    /// realistic sustained workloads running on realistic
308    /// hardware.
309    pub fn flush(&self) -> Result<usize> {
310        self.inner.flush()
311    }
312
313    /// Asynchronously flushes all dirty IO buffers
314    /// and calls fsync. If this succeeds, it is
315    /// guaranteed that all previous writes will
316    /// be recovered if the system crashes. Returns
317    /// the number of bytes flushed during this call.
318    ///
319    /// Flushing can take quite a lot of time, and you
320    /// should measure the performance impact of
321    /// using it on realistic sustained workloads
322    /// running on realistic hardware.
323    pub async fn flush_async(&self) -> Result<usize> {
324        self.inner.flush_async().await
325    }
326
327    /// Returns `true` if the `Tree` contains a value for
328    /// the specified key.
329    pub fn contains_key(&self, key: &K) -> Result<bool> {
330        self.inner.contains_key(serialize(key))
331    }
332
333    /// Retrieve the key and value before the provided key,
334    /// if one exists.
335    pub fn get_lt(&self, key: &K) -> Result<Option<(K, V)>> {
336        self.inner
337            .get_lt(serialize(key))
338            .map(|res| res.map(|(k, v)| (deserialize(&k), deserialize(&v))))
339    }
340
341    /// Retrieve the next key and value from the `Tree` after the
342    /// provided key.
343    pub fn get_gt(&self, key: &K) -> Result<Option<(K, V)>> {
344        self.inner
345            .get_gt(serialize(key))
346            .map(|res| res.map(|(k, v)| (deserialize(&k), deserialize(&v))))
347    }
348
349    /// Merge state directly into a given key's value using the
350    /// configured merge operator. This allows state to be written
351    /// into a value directly, without any read-modify-write steps.
352    /// Merge operators can be used to implement arbitrary data
353    /// structures.
354    ///
355    /// Calling `merge` will return an `Unsupported` error if it
356    /// is called without first setting a merge operator function.
357    ///
358    /// Merge operators are shared by all instances of a particular
359    /// `Tree`. Different merge operators may be set on different
360    /// `Tree`s.
361    pub fn merge(&self, key: &K, value: &V) -> Result<Option<V>> {
362        self.inner
363            .merge(serialize(key), serialize(value))
364            .map(|res| res.map(|old_v| deserialize(&old_v)))
365    }
366
367    // TODO: implement using own MergeOperator trait
368    /// Sets a merge operator for use with the `merge` function.
369    ///
370    /// Merge state directly into a given key's value using the
371    /// configured merge operator. This allows state to be written
372    /// into a value directly, without any read-modify-write steps.
373    /// Merge operators can be used to implement arbitrary data
374    /// structures.
375    ///
376    /// # Panics
377    ///
378    /// Calling `merge` will panic if no merge operator has been
379    /// configured.
380    pub fn set_merge_operator(&self, merge_operator: impl MergeOperator<K, V> + 'static) {
381        self.inner
382            .set_merge_operator(move |key: &[u8], old_v: Option<&[u8]>, value: &[u8]| {
383                let opt_v = merge_operator(
384                    deserialize(key),
385                    old_v.map(|v| deserialize(v)),
386                    deserialize(value),
387                );
388                opt_v.map(|v| serialize(&v))
389            });
390    }
391
392    /// Create a double-ended iterator over the tuples of keys and
393    /// values in this tree.
394    pub fn iter(&self) -> Iter<K, V> {
395        Iter::from_sled(self.inner.iter())
396    }
397
398    /// Create a double-ended iterator over tuples of keys and values,
399    /// where the keys fall within the specified range.
400    pub fn range<R: RangeBounds<K>>(&self, range: R) -> Iter<K, V> {
401        match (range.start_bound(), range.end_bound()) {
402            (Bound::Unbounded, Bound::Unbounded) => {
403                Iter::from_sled(self.inner.range::<&[u8], _>(..))
404            }
405            (Bound::Unbounded, Bound::Excluded(b)) => {
406                Iter::from_sled(self.inner.range(..serialize(b)))
407            }
408            (Bound::Unbounded, Bound::Included(b)) => {
409                Iter::from_sled(self.inner.range(..=serialize(b)))
410            }
411            // FIX: This is not excluding lower bound.
412            (Bound::Excluded(b), Bound::Unbounded) => {
413                Iter::from_sled(self.inner.range(serialize(b)..))
414            }
415            (Bound::Excluded(b), Bound::Excluded(bb)) => {
416                Iter::from_sled(self.inner.range(serialize(b)..serialize(bb)))
417            }
418            (Bound::Excluded(b), Bound::Included(bb)) => {
419                Iter::from_sled(self.inner.range(serialize(b)..=serialize(bb)))
420            }
421            (Bound::Included(b), Bound::Unbounded) => {
422                Iter::from_sled(self.inner.range(serialize(b)..))
423            }
424            (Bound::Included(b), Bound::Excluded(bb)) => {
425                Iter::from_sled(self.inner.range(serialize(b)..serialize(bb)))
426            }
427            (Bound::Included(b), Bound::Included(bb)) => {
428                Iter::from_sled(self.inner.range(serialize(b)..=serialize(bb)))
429            }
430        }
431    }
432
433    /// Create an iterator over tuples of keys and values,
434    /// where the all the keys starts with the given prefix.
435    pub fn scan_prefix(&self, prefix: &K) -> Iter<K, V> {
436        Iter::from_sled(self.inner.scan_prefix(serialize(prefix)))
437    }
438
439    /// Returns the first key and value in the `Tree`, or
440    /// `None` if the `Tree` is empty.
441    pub fn first(&self) -> Result<Option<(K, V)>> {
442        self.inner
443            .first()
444            .map(|res| res.map(|(k, v)| (deserialize(&k), deserialize(&v))))
445    }
446
447    /// Returns the last key and value in the `Tree`, or
448    /// `None` if the `Tree` is empty.
449    pub fn last(&self) -> Result<Option<(K, V)>> {
450        self.inner
451            .last()
452            .map(|res| res.map(|(k, v)| (deserialize(&k), deserialize(&v))))
453    }
454
455    /// Atomically removes the maximum item in the `Tree` instance.
456    pub fn pop_max(&self) -> Result<Option<(K, V)>> {
457        self.inner
458            .pop_max()
459            .map(|res| res.map(|(k, v)| (deserialize(&k), deserialize(&v))))
460    }
461
462    /// Atomically removes the minimum item in the `Tree` instance.
463    pub fn pop_min(&self) -> Result<Option<(K, V)>> {
464        self.inner
465            .pop_min()
466            .map(|res| res.map(|(k, v)| (deserialize(&k), deserialize(&v))))
467    }
468
469    /// Returns the number of elements in this tree.
470    pub fn len(&self) -> usize {
471        self.inner.len()
472    }
473
474    /// Returns `true` if the `Tree` contains no elements.
475    pub fn is_empty(&self) -> bool {
476        self.inner.is_empty()
477    }
478
479    /// Clears the `Tree`, removing all values.
480    ///
481    /// Note that this is not atomic.
482    pub fn clear(&self) -> Result<()> {
483        self.inner.clear()
484    }
485
486    /// Returns the name of the tree.
487    pub fn name(&self) -> IVec {
488        self.inner.name()
489    }
490
491    /// Returns the CRC32 of all keys and values
492    /// in this Tree.
493    ///
494    /// This is O(N) and locks the underlying tree
495    /// for the duration of the entire scan.
496    pub fn checksum(&self) -> Result<u32> {
497        self.inner.checksum()
498    }
499}
500
501/// # Examples
502///
503/// ```
504/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
505/// use sled::{Config, IVec};
506///
507/// fn concatenate_merge(
508///   _key: String,               // the key being merged
509///   old_value: Option<Vec<f32>>,  // the previous value, if one existed
510///   merged_bytes: Vec<f32>        // the new bytes being merged in
511/// ) -> Option<Vec<f32>> {       // set the new value, return None to delete
512///   let mut ret = old_value
513///     .map(|ov| ov.to_vec())
514///     .unwrap_or_else(|| vec![]);
515///
516///   ret.extend_from_slice(&merged_bytes);
517///
518///   Some(ret)
519/// }
520///
521/// let db = sled::Config::new()
522///   .temporary(true).open()?;
523///
524/// let tree = bincode_sled::Tree::<String, Vec<f32>>::open(&db, "unique_id");
525/// tree.set_merge_operator(concatenate_merge);
526///
527/// let k = String::from("some_key");
528///
529/// tree.insert(&k, &vec![0.0]);
530/// tree.merge(&k, &vec![1.0]);
531/// tree.merge(&k, &vec![2.0]);
532/// assert_eq!(tree.get(&k)?, Some(vec![0.0, 1.0, 2.0]));
533///
534/// // Replace previously merged data. The merge function will not be called.
535/// tree.insert(&k, &vec![3.0]);
536/// assert_eq!(tree.get(&k)?, Some(vec![3.0]));
537///
538/// // Merges on non-present values will cause the merge function to be called
539/// // with `old_value == None`. If the merge function returns something (which it
540/// // does, in this case) a new value will be inserted.
541/// tree.remove(&k);
542/// tree.merge(&k, &vec![4.0]);
543/// assert_eq!(tree.get(&k)?, Some(vec![4.0]));
544/// # Ok(()) }
545/// ```
546pub trait MergeOperator<K, V>: Fn(K, Option<V>, V) -> Option<V>
547where
548    K: Key,
549    V: Value,
550{
551}
552
553impl<K: Key, V: Value, F> MergeOperator<K, V> for F where F: Fn(K, Option<V>, V) -> Option<V> {}
554
555pub struct Iter<K, V> {
556    inner: sled::Iter,
557    _key: PhantomData<fn() -> K>,
558    _value: PhantomData<fn() -> V>,
559}
560
561impl<K: Key, V: Value> Iterator for Iter<K, V> {
562    type Item = Result<(K, V)>;
563
564    fn next(&mut self) -> Option<Self::Item> {
565        self.inner
566            .next()
567            .map(|res| res.map(|(k, v)| (deserialize(&k), deserialize(&v))))
568    }
569
570    fn last(mut self) -> Option<Self::Item> {
571        self.inner
572            .next_back()
573            .map(|res| res.map(|(k, v)| (deserialize(&k), deserialize(&v))))
574    }
575}
576
577impl<K: Key, V: Value> DoubleEndedIterator for Iter<K, V> {
578    fn next_back(&mut self) -> Option<Self::Item> {
579        self.inner
580            .next_back()
581            .map(|res| res.map(|(k, v)| (deserialize(&k), deserialize(&v))))
582    }
583}
584
585impl<K: Key, V: Value> Iter<K, V> {
586    pub fn from_sled(iter: sled::Iter) -> Self {
587        Iter {
588            inner: iter,
589            _key: PhantomData,
590            _value: PhantomData,
591        }
592    }
593
594    pub fn keys(self) -> impl DoubleEndedIterator<Item = Result<K>> + Send + Sync {
595        self.map(|r| r.map(|(k, _v)| k))
596    }
597
598    /// Iterate over the values of this Tree
599    pub fn values(self) -> impl DoubleEndedIterator<Item = Result<V>> + Send + Sync {
600        self.map(|r| r.map(|(_k, v)| v))
601    }
602}
603
604#[derive(Clone, Debug)]
605pub struct Batch<K, V>
606where
607    K: Key,
608    V: Value,
609{
610    inner: sled::Batch,
611    _key: PhantomData<fn() -> K>,
612    _value: PhantomData<fn() -> V>,
613}
614
615impl<K, V> Batch<K, V>
616where
617    K: Key,
618    V: Value,
619{
620    pub fn insert(&mut self, key: &K, value: &V) {
621        self.inner.insert(serialize(key), serialize(value));
622    }
623
624    pub fn remove(&mut self, key: &K) {
625        self.inner.remove(serialize(key))
626    }
627}
628
629// Implementing Default manually to not require K and V to implement Default.
630impl<K, V> Default for Batch<K, V>
631where
632    K: Key,
633    V: Value,
634{
635    fn default() -> Self {
636        Self {
637            inner: Default::default(),
638            _key: PhantomData,
639            _value: PhantomData,
640        }
641    }
642}
643
644use pin_project::pin_project;
645#[pin_project]
646pub struct Subscriber<K, V>
647where
648    K: Key,
649    V: Value,
650{
651    #[pin]
652    inner: sled::Subscriber,
653    _key: PhantomData<fn() -> K>,
654    _value: PhantomData<fn() -> V>,
655}
656
657impl<K, V> Subscriber<K, V>
658where
659    K: Key,
660    V: Value,
661{
662    pub fn next_timeout(
663        &mut self,
664        timeout: core::time::Duration,
665    ) -> core::result::Result<Event<K, V>, std::sync::mpsc::RecvTimeoutError> {
666        self.inner
667            .next_timeout(timeout)
668            .map(|e| Event::from_sled(&e))
669    }
670
671    pub fn from_sled(subscriber: sled::Subscriber) -> Self {
672        Self {
673            inner: subscriber,
674            _key: PhantomData,
675            _value: PhantomData,
676        }
677    }
678}
679
680use core::future::Future;
681use core::pin::Pin;
682use core::task::{Context, Poll};
683impl<K: Key + Unpin, V: Value + Unpin> Future for Subscriber<K, V> {
684    type Output = Option<Event<K, V>>;
685
686    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
687        self.project()
688            .inner
689            .poll(cx)
690            .map(|opt| opt.map(|e| Event::from_sled(&e)))
691    }
692}
693
694impl<K, V> Iterator for Subscriber<K, V>
695where
696    K: Key,
697    V: Value,
698{
699    type Item = Event<K, V>;
700
701    fn next(&mut self) -> Option<Event<K, V>> {
702        self.inner.next().map(|e| Event::from_sled(&e))
703    }
704}
705
706pub enum Event<K, V>
707where
708    K: Key,
709    V: Value,
710{
711    Insert { key: K, value: V },
712    Remove { key: K },
713}
714
715impl<K, V> Event<K, V>
716where
717    K: Key,
718    V: Value,
719{
720    pub fn key(&self) -> &K {
721        match self {
722            Self::Insert { key, .. } | Self::Remove { key } => key,
723        }
724    }
725
726    pub fn from_sled(event: &sled::Event) -> Self {
727        match event {
728            sled::Event::Insert { key, value } => Self::Insert {
729                key: deserialize(key),
730                value: deserialize(value),
731            },
732            sled::Event::Remove { key } => Self::Remove {
733                key: deserialize(key),
734            },
735        }
736    }
737}
738
739/// The function which is used to deserialize all keys and values.
740pub fn deserialize<T>(bytes: &[u8]) -> T
741where
742    T: Decode,
743{
744    bincode::decode_from_slice::<T, BinConfT>(bytes, DEFAULT_CONF)
745        .expect("Decode failed, did the type encoded change?")
746        .0
747}
748
749/// The function which is used to serialize all keys and values.
750pub fn serialize<T>(value: &T) -> Vec<u8>
751where
752    T: Encode,
753{
754    bincode::encode_to_vec(value, DEFAULT_CONF).expect("Encode failed.")
755}
756
757#[cfg(test)]
758mod tests {
759    use super::*;
760
761    #[test]
762    fn test_range() {
763        let config = sled::Config::new().temporary(true);
764        let db = config.open().unwrap();
765
766        let tree: Tree<u32, u32> = Tree::open(&db, "test_tree");
767
768        tree.insert(&1, &2).unwrap();
769        tree.insert(&3, &4).unwrap();
770        tree.insert(&6, &2).unwrap();
771        tree.insert(&10, &2).unwrap();
772        tree.insert(&15, &2).unwrap();
773        tree.flush().unwrap();
774
775        let expect_results = [(6, 2), (10, 2)];
776
777        for (i, result) in tree.range(6..11).enumerate() {
778            assert_eq!(result.unwrap(), expect_results[i]);
779        }
780    }
781
782    #[test]
783    fn test_cas() {
784        let config = sled::Config::new().temporary(true);
785        let db = config.open().unwrap();
786
787        let tree: Tree<u32, u32> = Tree::open(&db, "test_tree");
788
789        let current = 2;
790        tree.insert(&1, &current).unwrap();
791        let expected = 3;
792        let proposed = 4;
793        let res = tree
794            .compare_and_swap(&1, Some(&expected), Some(&proposed))
795            .expect("db failure");
796
797        assert_eq!(
798            res,
799            Err(CompareAndSwapError {
800                current: Some(current),
801                proposed: Some(proposed),
802            }),
803        );
804    }
805}