typed_sled/
lib.rs

1//! typed-sled - a database build on top of sled.
2//!
3//! sled is a high-performance embedded database with an API that is similar to a `BTreeMap<[u8], [u8]>`.  
4//! typed-sled builds on top of sled and offers an API that is similar to a `BTreeMap<K, V>`, where
5//! K and V are user defined types which implement [Deserialize][serde::Deserialize] and [Serialize][serde::Serialize].
6//!
7//! # features
8//! Multiple features for common use cases are also available:
9//! * [search]: `SearchEngine` on top of a `Tree`.
10//! * [key_generating]: Create `Tree`s with automatically generated keys.
11//! * [convert]: Convert any `Tree` into another `Tree` with different key and value types.
12//! * [custom_serde]: Create `Tree`s with custom (de)serialization. This for example makes
13//!                   lazy or zero-copy (de)serialization possible.
14//!
15//! # Example
16//! ```
17//! use serde::{Deserialize, Serialize};
18//!
19//! #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
20//! struct SomeValue(u32);
21//!
22//! fn main() -> Result<(), Box<dyn std::error::Error>> {
23//!     // Creating a temporary sled database.
24//!     // If you want to persist the data use sled::open instead.
25//!     let db = sled::Config::new().temporary(true).open().unwrap();
26//!
27//!     // The id is used by sled to identify which Tree in the database (db) to open
28//!     let tree = typed_sled::Tree::<String, SomeValue>::open(&db, "unique_id");
29//!
30//!     tree.insert(&"some_key".to_owned(), &SomeValue(10))?;
31//!
32//!     assert_eq!(tree.get(&"some_key".to_owned())?, Some(SomeValue(10)));
33//!     Ok(())
34//! }
35//! ```
36//! [sled]: https://docs.rs/sled/latest/sled/
37
38pub use sled::{open, Config};
39use transaction::TransactionalTree;
40
41#[cfg(feature = "convert")]
42pub mod convert;
43#[cfg(feature = "key-generating")]
44pub mod key_generating;
45#[cfg(feature = "search")]
46pub mod search;
47pub mod transaction;
48
49pub mod custom_serde;
50
51use core::fmt;
52use core::iter::{DoubleEndedIterator, Iterator};
53use core::ops::{Bound, RangeBounds};
54use serde::Serialize;
55use sled::{
56    transaction::{ConflictableTransactionResult, TransactionResult},
57    IVec, Result,
58};
59use std::marker::PhantomData;
60
61// pub trait Bin = DeserializeOwned + Serialize + Clone + Send + Sync;
62
63/// A flash-sympathetic persistent lock-free B+ tree.
64///
65/// A `Tree` represents a single logical keyspace / namespace / bucket.
66///
67/// # Example
68/// ```
69/// use serde::{Deserialize, Serialize};
70///
71/// #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
72/// struct SomeValue(u32);
73///
74/// fn main() -> Result<(), Box<dyn std::error::Error>> {
75///     // Creating a temporary sled database.
76///     // If you want to persist the data use sled::open instead.
77///     let db = sled::Config::new().temporary(true).open().unwrap();
78///
79///     // The id is used by sled to identify which Tree in the database (db) to open.
80///     let tree = typed_sled::Tree::<String, SomeValue>::open(&db, "unique_id");
81///
82///     tree.insert(&"some_key".to_owned(), &SomeValue(10))?;
83///
84///     assert_eq!(tree.get(&"some_key".to_owned())?, Some(SomeValue(10)));
85///     Ok(())
86/// }
87/// ```
88#[derive(Debug)]
89pub struct Tree<K, V> {
90    inner: sled::Tree,
91    _key: PhantomData<fn() -> K>,
92    _value: PhantomData<fn() -> V>,
93}
94
95// Manual implementation to make ToOwned behave better.
96// With derive(Clone) to_owned() on a reference returns a reference.
97impl<K, V> Clone for Tree<K, V> {
98    fn clone(&self) -> Self {
99        Self {
100            inner: self.inner.clone(),
101            _key: PhantomData,
102            _value: PhantomData,
103        }
104    }
105}
106
107/// Trait alias for bounds required on keys and values.
108/// For now only types that implement DeserializeOwned
109/// are supported.
110// [specilization] might make
111// supporting any type that implements Deserialize<'a>
112// possible without much overhead. Otherwise the branch
113// custom_de_serialization introduces custom (de)serialization
114// for each `Tree` which might also make it possible.
115//
116// [specialization]: https://github.com/rust-lang/rust/issues/31844
117pub trait KV: serde::de::DeserializeOwned + Serialize {}
118
119impl<T: serde::de::DeserializeOwned + Serialize> KV for T {}
120
121/// Compare and swap error.
122#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
123pub struct CompareAndSwapError<V> {
124    /// The current value which caused your CAS to fail.
125    pub current: Option<V>,
126    /// Returned value that was proposed unsuccessfully.
127    pub proposed: Option<V>,
128}
129
130impl<V> fmt::Display for CompareAndSwapError<V> {
131    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
132        write!(f, "Compare and swap conflict")
133    }
134}
135
136// implemented like this in the sled source
137impl<V: std::fmt::Debug> std::error::Error for CompareAndSwapError<V> {}
138
139// These Trait bounds should probably be specified on the functions themselves, but too lazy.
140impl<K, V> Tree<K, V> {
141    /// Initialize a typed tree. The id identifies the tree to be opened from the db.
142    /// # Example
143    ///
144    /// ```
145    /// use serde::{Deserialize, Serialize};
146    ///
147    /// #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
148    /// struct SomeValue(u32);
149    ///
150    /// fn main() -> Result<(), Box<dyn std::error::Error>> {
151    ///     // Creating a temporary sled database.
152    ///     // If you want to persist the data use sled::open instead.
153    ///     let db = sled::Config::new().temporary(true).open().unwrap();
154    ///
155    ///     // The id is used by sled to identify which Tree in the database (db) to open.
156    ///     let tree = typed_sled::Tree::<String, SomeValue>::open(&db, "unique_id");
157    ///
158    ///     tree.insert(&"some_key".to_owned(), &SomeValue(10))?;
159    ///
160    ///     assert_eq!(tree.get(&"some_key".to_owned())?, Some(SomeValue(10)));
161    ///     Ok(())
162    /// }
163    /// ```
164    pub fn open<T: AsRef<str>>(db: &sled::Db, id: T) -> Self {
165        Self {
166            inner: db.open_tree(id.as_ref()).unwrap(),
167            _key: PhantomData,
168            _value: PhantomData,
169        }
170    }
171
172    /// Insert a key to a new value, returning the last value if it was set.
173    pub fn insert(&self, key: &K, value: &V) -> Result<Option<V>>
174    where
175        K: KV,
176        V: KV,
177    {
178        self.inner
179            .insert(serialize(key), serialize(value))
180            .map(|opt| opt.map(|old_value| deserialize(&old_value)))
181    }
182
183    /// Perform a multi-key serializable transaction.
184    pub fn transaction<F, A, E>(&self, f: F) -> TransactionResult<A, E>
185    where
186        F: Fn(&TransactionalTree<K, V>) -> ConflictableTransactionResult<A, E>,
187    {
188        self.inner.transaction(|sled_transactional_tree| {
189            f(&TransactionalTree::new(sled_transactional_tree))
190        })
191    }
192
193    /// Create a new batched update that can be atomically applied.
194    ///
195    /// It is possible to apply a Batch in a transaction as well, which is the way you can apply a Batch to multiple Trees atomically.
196    pub fn apply_batch(&self, batch: Batch<K, V>) -> Result<()> {
197        self.inner.apply_batch(batch.inner)
198    }
199
200    /// Retrieve a value from the Tree if it exists.
201    pub fn get(&self, key: &K) -> Result<Option<V>>
202    where
203        K: KV,
204        V: KV,
205    {
206        self.inner
207            .get(serialize(key))
208            .map(|opt| opt.map(|v| deserialize(&v)))
209    }
210
211    /// Retrieve a value from the Tree if it exists. The key must be in serialized form.
212    pub fn get_from_raw<B: AsRef<[u8]>>(&self, key_bytes: B) -> Result<Option<V>>
213    where
214        K: KV,
215        V: KV,
216    {
217        self.inner
218            .get(key_bytes.as_ref())
219            .map(|opt| opt.map(|v| deserialize(&v)))
220    }
221
222    /// Deserialize a key and retrieve it's value from the Tree if it exists.
223    /// The deserialization is only done if a value was retrieved successfully.
224    pub fn get_kv_from_raw<B: AsRef<[u8]>>(&self, key_bytes: B) -> Result<Option<(K, V)>>
225    where
226        K: KV,
227        V: KV,
228    {
229        self.inner
230            .get(key_bytes.as_ref())
231            .map(|opt| opt.map(|v| (deserialize(key_bytes.as_ref()), deserialize(&v))))
232    }
233
234    /// Delete a value, returning the old value if it existed.
235    pub fn remove(&self, key: &K) -> Result<Option<V>>
236    where
237        K: KV,
238        V: KV,
239    {
240        self.inner
241            .remove(serialize(key))
242            .map(|opt| opt.map(|v| deserialize(&v)))
243    }
244
245    /// Compare and swap. Capable of unique creation, conditional modification, or deletion. If old is None, this will only set the value if it doesn't exist yet. If new is None, will delete the value if old is correct. If both old and new are Some, will modify the value if old is correct.
246    ///
247    /// It returns Ok(Ok(())) if operation finishes successfully.
248    ///
249    /// If it fails it returns: - Ok(Err(CompareAndSwapError(current, proposed))) if operation failed to setup a new value. CompareAndSwapError contains current and proposed values. - Err(Error::Unsupported) if the database is opened in read-only mode.
250    pub fn compare_and_swap(
251        &self,
252        key: &K,
253        old: Option<&V>,
254        new: Option<&V>,
255    ) -> Result<core::result::Result<(), CompareAndSwapError<V>>>
256    where
257        K: KV,
258        V: KV,
259    {
260        self.inner
261            .compare_and_swap(
262                serialize(key),
263                old.map(|old| serialize(old)),
264                new.map(|new| serialize(new)),
265            )
266            .map(|cas_res| {
267                cas_res.map_err(|cas_err| CompareAndSwapError {
268                    current: cas_err.current.as_ref().map(|b| deserialize(b)),
269                    proposed: cas_err.proposed.as_ref().map(|b| deserialize(b)),
270                })
271            })
272    }
273
274    /// Fetch the value, apply a function to it and return the result.
275    // not sure if implemented correctly (different trait bound for F)
276    pub fn update_and_fetch<F>(&self, key: &K, mut f: F) -> Result<Option<V>>
277    where
278        K: KV,
279        V: KV,
280        F: FnMut(Option<V>) -> Option<V>,
281    {
282        self.inner
283            .update_and_fetch(serialize(&key), |opt_value| {
284                f(opt_value.map(|v| deserialize(v))).map(|v| serialize(&v))
285            })
286            .map(|res| res.map(|v| deserialize(&v)))
287    }
288
289    /// Fetch the value, apply a function to it and return the previous value.
290    // not sure if implemented correctly (different trait bound for F)
291    pub fn fetch_and_update<F>(&self, key: &K, mut f: F) -> Result<Option<V>>
292    where
293        K: KV,
294        V: KV,
295        F: FnMut(Option<V>) -> Option<V>,
296    {
297        self.inner
298            .fetch_and_update(serialize(key), |opt_value| {
299                f(opt_value.map(|v| deserialize(v))).map(|v| serialize(&v))
300            })
301            .map(|res| res.map(|v| deserialize(&v)))
302    }
303
304    /// Subscribe to `Event`s that happen to keys that have
305    /// the specified prefix. Events for particular keys are
306    /// guaranteed to be witnessed in the same order by all
307    /// threads, but threads may witness different interleavings
308    /// of `Event`s across different keys. If subscribers don't
309    /// keep up with new writes, they will cause new writes
310    /// to block. There is a buffer of 1024 items per
311    /// `Subscriber`. This can be used to build reactive
312    /// and replicated systems.
313    pub fn watch_prefix(&self, prefix: &K) -> Subscriber<K, V>
314    where
315        K: KV,
316    {
317        Subscriber::from_sled(self.inner.watch_prefix(serialize(prefix)))
318    }
319
320    /// Subscribe to  all`Event`s. Events for particular keys are
321    /// guaranteed to be witnessed in the same order by all
322    /// threads, but threads may witness different interleavings
323    /// of `Event`s across different keys. If subscribers don't
324    /// keep up with new writes, they will cause new writes
325    /// to block. There is a buffer of 1024 items per
326    /// `Subscriber`. This can be used to build reactive
327    /// and replicated systems.
328    pub fn watch_all(&self) -> Subscriber<K, V>
329    where
330        K: KV,
331    {
332        Subscriber::from_sled(self.inner.watch_prefix(vec![]))
333    }
334
335    /// Synchronously flushes all dirty IO buffers and calls
336    /// fsync. If this succeeds, it is guaranteed that all
337    /// previous writes will be recovered if the system
338    /// crashes. Returns the number of bytes flushed during
339    /// this call.
340    ///
341    /// Flushing can take quite a lot of time, and you should
342    /// measure the performance impact of using it on
343    /// realistic sustained workloads running on realistic
344    /// hardware.
345    pub fn flush(&self) -> Result<usize> {
346        self.inner.flush()
347    }
348
349    /// Asynchronously flushes all dirty IO buffers
350    /// and calls fsync. If this succeeds, it is
351    /// guaranteed that all previous writes will
352    /// be recovered if the system crashes. Returns
353    /// the number of bytes flushed during this call.
354    ///
355    /// Flushing can take quite a lot of time, and you
356    /// should measure the performance impact of
357    /// using it on realistic sustained workloads
358    /// running on realistic hardware.
359    pub async fn flush_async(&self) -> Result<usize> {
360        self.inner.flush_async().await
361    }
362
363    /// Returns `true` if the `Tree` contains a value for
364    /// the specified key.
365    pub fn contains_key(&self, key: &K) -> Result<bool>
366    where
367        K: KV,
368    {
369        self.inner.contains_key(serialize(key))
370    }
371
372    /// Retrieve the key and value before the provided key,
373    /// if one exists.
374    pub fn get_lt(&self, key: &K) -> Result<Option<(K, V)>>
375    where
376        K: KV,
377        V: KV,
378    {
379        self.inner
380            .get_lt(serialize(key))
381            .map(|res| res.map(|(k, v)| (deserialize(&k), deserialize(&v))))
382    }
383
384    /// Retrieve the next key and value from the `Tree` after the
385    /// provided key.
386    pub fn get_gt(&self, key: &K) -> Result<Option<(K, V)>>
387    where
388        K: KV,
389        V: KV,
390    {
391        self.inner
392            .get_gt(serialize(key))
393            .map(|res| res.map(|(k, v)| (deserialize(&k), deserialize(&v))))
394    }
395
396    /// Merge state directly into a given key's value using the
397    /// configured merge operator. This allows state to be written
398    /// into a value directly, without any read-modify-write steps.
399    /// Merge operators can be used to implement arbitrary data
400    /// structures.
401    ///
402    /// Calling `merge` will return an `Unsupported` error if it
403    /// is called without first setting a merge operator function.
404    ///
405    /// Merge operators are shared by all instances of a particular
406    /// `Tree`. Different merge operators may be set on different
407    /// `Tree`s.
408    pub fn merge(&self, key: &K, value: &V) -> Result<Option<V>>
409    where
410        K: KV,
411        V: KV,
412    {
413        self.inner
414            .merge(serialize(key), serialize(value))
415            .map(|res| res.map(|old_v| deserialize(&old_v)))
416    }
417
418    // TODO: implement using own MergeOperator trait
419    /// Sets a merge operator for use with the `merge` function.
420    ///
421    /// Merge state directly into a given key's value using the
422    /// configured merge operator. This allows state to be written
423    /// into a value directly, without any read-modify-write steps.
424    /// Merge operators can be used to implement arbitrary data
425    /// structures.
426    ///
427    /// # Panics
428    ///
429    /// Calling `merge` will panic if no merge operator has been
430    /// configured.
431    pub fn set_merge_operator(&self, merge_operator: impl MergeOperator<K, V> + 'static)
432    where
433        K: KV,
434        V: KV,
435    {
436        self.inner
437            .set_merge_operator(move |key: &[u8], old_v: Option<&[u8]>, value: &[u8]| {
438                let opt_v = merge_operator(
439                    deserialize(key),
440                    old_v.map(|v| deserialize(v)),
441                    deserialize(value),
442                );
443                opt_v.map(|v| serialize(&v))
444            });
445    }
446
447    /// Create a double-ended iterator over the tuples of keys and
448    /// values in this tree.
449    pub fn iter(&self) -> Iter<K, V> {
450        Iter::from_sled(self.inner.iter())
451    }
452
453    /// Create a double-ended iterator over tuples of keys and values,
454    /// where the keys fall within the specified range.
455    pub fn range<R: RangeBounds<K>>(&self, range: R) -> Iter<K, V>
456    where
457        K: KV + std::fmt::Debug,
458    {
459        match (range.start_bound(), range.end_bound()) {
460            (Bound::Unbounded, Bound::Unbounded) => {
461                Iter::from_sled(self.inner.range::<&[u8], _>(..))
462            }
463            (Bound::Unbounded, Bound::Excluded(b)) => {
464                Iter::from_sled(self.inner.range(..serialize(b)))
465            }
466            (Bound::Unbounded, Bound::Included(b)) => {
467                Iter::from_sled(self.inner.range(..=serialize(b)))
468            }
469            // FIX: This is not excluding lower bound.
470            (Bound::Excluded(b), Bound::Unbounded) => {
471                Iter::from_sled(self.inner.range(serialize(b)..))
472            }
473            (Bound::Excluded(b), Bound::Excluded(bb)) => {
474                Iter::from_sled(self.inner.range(serialize(b)..serialize(bb)))
475            }
476            (Bound::Excluded(b), Bound::Included(bb)) => {
477                Iter::from_sled(self.inner.range(serialize(b)..=serialize(bb)))
478            }
479            (Bound::Included(b), Bound::Unbounded) => {
480                Iter::from_sled(self.inner.range(serialize(b)..))
481            }
482            (Bound::Included(b), Bound::Excluded(bb)) => {
483                Iter::from_sled(self.inner.range(serialize(b)..serialize(bb)))
484            }
485            (Bound::Included(b), Bound::Included(bb)) => {
486                Iter::from_sled(self.inner.range(serialize(b)..=serialize(bb)))
487            }
488        }
489    }
490
491    /// Create an iterator over tuples of keys and values,
492    /// where the all the keys starts with the given prefix.
493    pub fn scan_prefix(&self, prefix: &K) -> Iter<K, V>
494    where
495        K: KV,
496    {
497        Iter::from_sled(self.inner.scan_prefix(serialize(prefix)))
498    }
499
500    /// Returns the first key and value in the `Tree`, or
501    /// `None` if the `Tree` is empty.
502    pub fn first(&self) -> Result<Option<(K, V)>>
503    where
504        K: KV,
505        V: KV,
506    {
507        self.inner
508            .first()
509            .map(|res| res.map(|(k, v)| (deserialize(&k), deserialize(&v))))
510    }
511
512    /// Returns the last key and value in the `Tree`, or
513    /// `None` if the `Tree` is empty.
514    pub fn last(&self) -> Result<Option<(K, V)>>
515    where
516        K: KV,
517        V: KV,
518    {
519        self.inner
520            .last()
521            .map(|res| res.map(|(k, v)| (deserialize(&k), deserialize(&v))))
522    }
523
524    /// Atomically removes the maximum item in the `Tree` instance.
525    pub fn pop_max(&self) -> Result<Option<(K, V)>>
526    where
527        K: KV,
528        V: KV,
529    {
530        self.inner
531            .pop_max()
532            .map(|res| res.map(|(k, v)| (deserialize(&k), deserialize(&v))))
533    }
534
535    /// Atomically removes the minimum item in the `Tree` instance.
536    pub fn pop_min(&self) -> Result<Option<(K, V)>>
537    where
538        K: KV,
539        V: KV,
540    {
541        self.inner
542            .pop_min()
543            .map(|res| res.map(|(k, v)| (deserialize(&k), deserialize(&v))))
544    }
545
546    /// Returns the number of elements in this tree.
547    pub fn len(&self) -> usize {
548        self.inner.len()
549    }
550
551    /// Returns `true` if the `Tree` contains no elements.
552    pub fn is_empty(&self) -> bool {
553        self.inner.is_empty()
554    }
555
556    /// Clears the `Tree`, removing all values.
557    ///
558    /// Note that this is not atomic.
559    pub fn clear(&self) -> Result<()> {
560        self.inner.clear()
561    }
562
563    /// Returns the name of the tree.
564    pub fn name(&self) -> IVec {
565        self.inner.name()
566    }
567
568    /// Returns the CRC32 of all keys and values
569    /// in this Tree.
570    ///
571    /// This is O(N) and locks the underlying tree
572    /// for the duration of the entire scan.
573    pub fn checksum(&self) -> Result<u32> {
574        self.inner.checksum()
575    }
576}
577
578/// # Examples
579///
580/// ```
581/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
582/// use sled::{Config, IVec};
583///
584/// fn concatenate_merge(
585///   _key: String,               // the key being merged
586///   old_value: Option<Vec<f32>>,  // the previous value, if one existed
587///   merged_bytes: Vec<f32>        // the new bytes being merged in
588/// ) -> Option<Vec<f32>> {       // set the new value, return None to delete
589///   let mut ret = old_value
590///     .map(|ov| ov.to_vec())
591///     .unwrap_or_else(|| vec![]);
592///
593///   ret.extend_from_slice(&merged_bytes);
594///
595///   Some(ret)
596/// }
597///
598/// let db = sled::Config::new()
599///   .temporary(true).open()?;
600///
601/// let tree = typed_sled::Tree::<String, Vec<f32>>::open(&db, "unique_id");
602/// tree.set_merge_operator(concatenate_merge);
603///
604/// let k = String::from("some_key");
605///
606/// tree.insert(&k, &vec![0.0]);
607/// tree.merge(&k, &vec![1.0]);
608/// tree.merge(&k, &vec![2.0]);
609/// assert_eq!(tree.get(&k)?, Some(vec![0.0, 1.0, 2.0]));
610///
611/// // Replace previously merged data. The merge function will not be called.
612/// tree.insert(&k, &vec![3.0]);
613/// assert_eq!(tree.get(&k)?, Some(vec![3.0]));
614///
615/// // Merges on non-present values will cause the merge function to be called
616/// // with `old_value == None`. If the merge function returns something (which it
617/// // does, in this case) a new value will be inserted.
618/// tree.remove(&k);
619/// tree.merge(&k, &vec![4.0]);
620/// assert_eq!(tree.get(&k)?, Some(vec![4.0]));
621/// # Ok(()) }
622/// ```
623pub trait MergeOperator<K, V>: Fn(K, Option<V>, V) -> Option<V> {}
624
625impl<K, V, F> MergeOperator<K, V> for F where F: Fn(K, Option<V>, V) -> Option<V> {}
626
627pub struct Iter<K, V> {
628    inner: sled::Iter,
629    _key: PhantomData<fn() -> K>,
630    _value: PhantomData<fn() -> V>,
631}
632
633impl<K: KV, V: KV> Iterator for Iter<K, V> {
634    type Item = Result<(K, V)>;
635
636    fn next(&mut self) -> Option<Self::Item> {
637        self.inner
638            .next()
639            .map(|res| res.map(|(k, v)| (deserialize(&k), deserialize(&v))))
640    }
641
642    fn last(mut self) -> Option<Self::Item> {
643        self.inner
644            .next_back()
645            .map(|res| res.map(|(k, v)| (deserialize(&k), deserialize(&v))))
646    }
647}
648
649impl<K: KV, V: KV> DoubleEndedIterator for Iter<K, V> {
650    fn next_back(&mut self) -> Option<Self::Item> {
651        self.inner
652            .next_back()
653            .map(|res| res.map(|(k, v)| (deserialize(&k), deserialize(&v))))
654    }
655}
656
657impl<K, V> Iter<K, V> {
658    pub fn from_sled(iter: sled::Iter) -> Self {
659        Iter {
660            inner: iter,
661            _key: PhantomData,
662            _value: PhantomData,
663        }
664    }
665
666    pub fn keys(self) -> impl DoubleEndedIterator<Item = Result<K>> + Send + Sync
667    where
668        K: KV + Send + Sync,
669        V: KV + Send + Sync,
670    {
671        self.map(|r| r.map(|(k, _v)| k))
672    }
673
674    /// Iterate over the values of this Tree
675    pub fn values(self) -> impl DoubleEndedIterator<Item = Result<V>> + Send + Sync
676    where
677        K: KV + Send + Sync,
678        V: KV + Send + Sync,
679    {
680        self.map(|r| r.map(|(_k, v)| v))
681    }
682}
683
684#[derive(Clone, Debug)]
685pub struct Batch<K, V> {
686    inner: sled::Batch,
687    _key: PhantomData<fn() -> K>,
688    _value: PhantomData<fn() -> V>,
689}
690
691impl<K, V> Batch<K, V> {
692    pub fn insert(&mut self, key: &K, value: &V)
693    where
694        K: KV,
695        V: KV,
696    {
697        self.inner.insert(serialize(key), serialize(value));
698    }
699
700    pub fn remove(&mut self, key: &K)
701    where
702        K: KV,
703    {
704        self.inner.remove(serialize(key))
705    }
706}
707
708// Implementing Default manually to not require K and V to implement Default.
709impl<K, V> Default for Batch<K, V> {
710    fn default() -> Self {
711        Self {
712            inner: Default::default(),
713            _key: PhantomData,
714            _value: PhantomData,
715        }
716    }
717}
718
719use pin_project::pin_project;
720#[pin_project]
721pub struct Subscriber<K, V> {
722    #[pin]
723    inner: sled::Subscriber,
724    _key: PhantomData<fn() -> K>,
725    _value: PhantomData<fn() -> V>,
726}
727
728impl<K, V> Subscriber<K, V> {
729    pub fn next_timeout(
730        &mut self,
731        timeout: core::time::Duration,
732    ) -> core::result::Result<Event<K, V>, std::sync::mpsc::RecvTimeoutError>
733    where
734        K: KV,
735        V: KV,
736    {
737        self.inner
738            .next_timeout(timeout)
739            .map(|e| Event::from_sled(&e))
740    }
741
742    pub fn from_sled(subscriber: sled::Subscriber) -> Self {
743        Self {
744            inner: subscriber,
745            _key: PhantomData,
746            _value: PhantomData,
747        }
748    }
749}
750
751use core::future::Future;
752use core::pin::Pin;
753use core::task::{Context, Poll};
754impl<K: KV + Unpin, V: KV + Unpin> Future for Subscriber<K, V> {
755    type Output = Option<Event<K, V>>;
756
757    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
758        self.project()
759            .inner
760            .poll(cx)
761            .map(|opt| opt.map(|e| Event::from_sled(&e)))
762    }
763}
764
765impl<K: KV, V: KV> Iterator for Subscriber<K, V> {
766    type Item = Event<K, V>;
767
768    fn next(&mut self) -> Option<Event<K, V>> {
769        self.inner.next().map(|e| Event::from_sled(&e))
770    }
771}
772
773pub enum Event<K, V> {
774    Insert { key: K, value: V },
775    Remove { key: K },
776}
777
778impl<K, V> Event<K, V> {
779    pub fn key(&self) -> &K
780    where
781        K: KV,
782    {
783        match self {
784            Self::Insert { key, .. } | Self::Remove { key } => key,
785        }
786    }
787
788    pub fn from_sled(event: &sled::Event) -> Self
789    where
790        K: KV,
791        V: KV,
792    {
793        match event {
794            sled::Event::Insert { key, value } => Self::Insert {
795                key: deserialize(key),
796                value: deserialize(value),
797            },
798            sled::Event::Remove { key } => Self::Remove {
799                key: deserialize(key),
800            },
801        }
802    }
803}
804
805/// The function which is used to deserialize all keys and values.
806pub fn deserialize<'a, T>(bytes: &'a [u8]) -> T
807where
808    T: serde::de::Deserialize<'a>,
809{
810    bincode::deserialize(bytes).expect("deserialization failed, did the type serialized change?")
811}
812
813/// The function which is used to serialize all keys and values.
814pub fn serialize<T>(value: &T) -> Vec<u8>
815where
816    T: serde::Serialize,
817{
818    bincode::serialize(value).expect("serialization failed, did the type serialized change?")
819}
820
821#[cfg(test)]
822mod tests {
823    use super::*;
824
825    #[test]
826    fn test_range() {
827        let config = sled::Config::new().temporary(true);
828        let db = config.open().unwrap();
829
830        let tree: Tree<u32, u32> = Tree::open(&db, "test_tree");
831
832        tree.insert(&1, &2).unwrap();
833        tree.insert(&3, &4).unwrap();
834        tree.insert(&6, &2).unwrap();
835        tree.insert(&10, &2).unwrap();
836        tree.insert(&15, &2).unwrap();
837        tree.flush().unwrap();
838
839        let expect_results = [(6, 2), (10, 2)];
840
841        for (i, result) in tree.range(6..11).enumerate() {
842            assert_eq!(result.unwrap(), expect_results[i]);
843        }
844    }
845
846    #[test]
847    fn test_cas() {
848        let config = sled::Config::new().temporary(true);
849        let db = config.open().unwrap();
850
851        let tree: Tree<u32, u32> = Tree::open(&db, "test_tree");
852
853        let current = 2;
854        tree.insert(&1, &current).unwrap();
855        let expected = 3;
856        let proposed = 4;
857        let res = tree
858            .compare_and_swap(&1, Some(&expected), Some(&proposed))
859            .expect("db failure");
860
861        assert_eq!(
862            res,
863            Err(CompareAndSwapError {
864                current: Some(current),
865                proposed: Some(proposed),
866            }),
867        );
868    }
869}