bincode_sled/lib.rs
//! typed-sled - a database built on top of sled.
//!
//! sled is a high-performance embedded database with an API that is similar to a `BTreeMap<[u8], [u8]>`.
//! typed-sled builds on top of sled and offers an API that is similar to a `BTreeMap<K, V>`, where
//! `K` and `V` are user-defined types which implement [`Decode`][bincode::Decode] and [`Encode`][bincode::Encode].
6//!
7//! # features
8//! Multiple features for common use cases are also available:
9//! * [search]: `SearchEngine` on top of a `Tree`.
10//! * [key_generating]: Create `Tree`s with automatically generated keys.
11//! * [convert]: Convert any `Tree` into another `Tree` with different key and value types.
12//!
13//! [sled]: https://docs.rs/sled/latest/sled/
14
15use bincode::config::{Fixint, LittleEndian, NoLimit};
16use bincode::{Decode, Encode};
17pub use sled::{open, Config};
18use transaction::TransactionalTree;
19
20#[cfg(feature = "convert")]
21pub mod convert;
22#[cfg(feature = "key-generating")]
23pub mod key_generating;
24#[cfg(feature = "search")]
25pub mod search;
26pub mod transaction;
27
28use core::fmt;
29use core::iter::{DoubleEndedIterator, Iterator};
30use core::ops::{Bound, RangeBounds};
31use sled::{
32 transaction::{ConflictableTransactionResult, TransactionResult},
33 IVec, Result,
34};
35use std::marker::PhantomData;
36
37pub const DEFAULT_CONF: bincode::config::Configuration<LittleEndian, Fixint, NoLimit> =
38 bincode::config::standard()
39 .with_little_endian()
40 .with_fixed_int_encoding()
41 .with_no_limit();
42type BinConfT = bincode::config::Configuration<LittleEndian, Fixint, NoLimit>;
43
44// pub trait Bin = DeserializeOwned + Serialize + Clone + Send + Sync;
45
46/// A flash-sympathetic persistent lock-free B+ tree.
47///
48/// A `Tree` represents a single logical keyspace / namespace / bucket.
49///
50/// # Example
51/// ```
52/// use bincode::{Encode, Decode};
53///
54/// #[derive(Debug, Clone, Encode, Decode, PartialEq)]
55/// struct SomeValue(u32);
56///
57/// fn main() -> Result<(), Box<dyn std::error::Error>> {
58/// // Creating a temporary sled database.
59/// // If you want to persist the data use sled::open instead.
60/// let db = sled::Config::new().temporary(true).open().unwrap();
61///
62/// // The id is used by sled to identify which Tree in the database (db) to open.
63/// let tree = bincode_sled::Tree::<String, SomeValue>::open(&db, "unique_id");
64///
65/// tree.insert(&"some_key".to_owned(), &SomeValue(10))?;
66///
67/// assert_eq!(tree.get(&"some_key".to_owned())?, Some(SomeValue(10)));
68/// Ok(())
69/// }
70/// ```
71#[derive(Debug)]
72pub struct Tree<K, V>
73where
74 K: Key,
75 V: Value,
76{
77 inner: sled::Tree,
78 _key: PhantomData<fn() -> K>,
79 _value: PhantomData<fn() -> V>,
80}
81
82// Manual implementation to make ToOwned behave better.
83// With derive(Clone) to_owned() on a reference returns a reference.
84impl<K, V> Clone for Tree<K, V>
85where
86 K: Key,
87 V: Value,
88{
89 fn clone(&self) -> Self {
90 Self {
91 inner: self.inner.clone(),
92 _key: PhantomData,
93 _value: PhantomData,
94 }
95 }
96}
97
98/// Trait alias for bounds required on keys and values.
99/// For now only types that implement DeserializeOwned
100/// are supported.
101// [specilization] might make
102// supporting any type that implements Deserialize<'a>
103// possible without much overhead. Otherwise the branch
104// custom_de_serialization introduces custom (de)serialization
105// for each `Tree` which might also make it possible.
106//
107// [specialization]: https://github.com/rust-lang/rust/issues/31844
108pub trait Key: Encode + Decode + Send {}
109
110impl<T: Encode + Decode + Send + Sync> Key for T {}
111
112pub trait Value: Encode + Decode + Send {}
113impl<T: Encode + Decode + Send + Sync> Value for T {}
114
/// Error returned when a compare-and-swap did not take effect.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CompareAndSwapError<V> {
    /// The value that was actually stored and caused the CAS to fail.
    pub current: Option<V>,
    /// The value that was proposed and could not be written.
    pub proposed: Option<V>,
}

impl<V> fmt::Display for CompareAndSwapError<V> {
    /// Writes a fixed message; the stored values are not part of it.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Compare and swap conflict")
    }
}

// Mirrors the sled source: an empty `Error` impl that only needs `V: Debug`.
impl<V> std::error::Error for CompareAndSwapError<V> where V: fmt::Debug {}
132
133// These Trait bounds should probably be specified on the functions themselves, but too lazy.
134impl<K, V> Tree<K, V>
135where
136 K: Key,
137 V: Value,
138{
139 /// Initialize a typed tree. The id identifies the tree to be opened from the db.
140 /// # Example
141 ///
142 /// ```
143 /// use bincode::{Encode, Decode};
144 ///
145 /// #[derive(Debug, Clone, Encode, Decode, PartialEq)]
146 /// struct SomeValue(u32);
147 ///
148 /// fn main() -> Result<(), Box<dyn std::error::Error>> {
149 /// // Creating a temporary sled database.
150 /// // If you want to persist the data use sled::open instead.
151 /// let db = sled::Config::new().temporary(true).open().unwrap();
152 ///
153 /// // The id is used by sled to identify which Tree in the database (db) to open.
154 /// let tree = bincode_sled::Tree::<String, SomeValue>::open(&db, "unique_id");
155 ///
156 /// tree.insert(&"some_key".to_owned(), &SomeValue(10))?;
157 ///
158 /// assert_eq!(tree.get(&"some_key".to_owned())?, Some(SomeValue(10)));
159 /// Ok(())
160 /// }
161 /// ```
162 pub fn open<T: AsRef<str>>(db: &sled::Db, id: T) -> Self {
163 Self {
164 inner: db.open_tree(id.as_ref()).unwrap(),
165 _key: PhantomData,
166 _value: PhantomData,
167 }
168 }
169
170 /// Insert a key to a new value, returning the last value if it was set.
171 pub fn insert(&self, key: &K, value: &V) -> Result<Option<V>> {
172 self.inner
173 .insert(serialize(key), serialize(value))
174 .map(|opt| opt.map(|old_value| deserialize(&old_value)))
175 }
176
177 /// Perform a multi-key serializable transaction.
178 pub fn transaction<F, A, E>(&self, f: F) -> TransactionResult<A, E>
179 where
180 F: Fn(&TransactionalTree<K, V>) -> ConflictableTransactionResult<A, E>,
181 {
182 self.inner.transaction(|sled_transactional_tree| {
183 f(&TransactionalTree::new(sled_transactional_tree))
184 })
185 }
186
187 /// Create a new batched update that can be atomically applied.
188 ///
189 /// It is possible to apply a Batch in a transaction as well, which is the way you can apply a Batch to multiple Trees atomically.
190 pub fn apply_batch(&self, batch: Batch<K, V>) -> Result<()> {
191 self.inner.apply_batch(batch.inner)
192 }
193
194 /// Retrieve a value from the Tree if it exists.
195 pub fn get(&self, key: &K) -> Result<Option<V>> {
196 self.inner
197 .get(serialize(key))
198 .map(|opt| opt.map(|v| deserialize(&v)))
199 }
200
201 /// Retrieve a value from the Tree if it exists. The key must be in serialized form.
202 pub fn get_from_raw<B: AsRef<[u8]>>(&self, key_bytes: B) -> Result<Option<V>> {
203 self.inner
204 .get(key_bytes.as_ref())
205 .map(|opt| opt.map(|v| deserialize(&v)))
206 }
207
208 /// Deserialize a key and retrieve it's value from the Tree if it exists.
209 /// The deserialization is only done if a value was retrieved successfully.
210 pub fn get_kv_from_raw<B: AsRef<[u8]>>(&self, key_bytes: B) -> Result<Option<(K, V)>> {
211 self.inner
212 .get(key_bytes.as_ref())
213 .map(|opt| opt.map(|v| (deserialize(key_bytes.as_ref()), deserialize(&v))))
214 }
215
216 /// Delete a value, returning the old value if it existed.
217 pub fn remove(&self, key: &K) -> Result<Option<V>> {
218 self.inner
219 .remove(serialize(key))
220 .map(|opt| opt.map(|v| deserialize(&v)))
221 }
222
223 /// Compare and swap. Capable of unique creation, conditional modification, or deletion. If old is None, this will only set the value if it doesn't exist yet. If new is None, will delete the value if old is correct. If both old and new are Some, will modify the value if old is correct.
224 ///
225 /// It returns Ok(Ok(())) if operation finishes successfully.
226 ///
227 /// If it fails it returns: - Ok(Err(CompareAndSwapError(current, proposed))) if operation failed to setup a new value. CompareAndSwapError contains current and proposed values. - Err(Error::Unsupported) if the database is opened in read-only mode.
228 pub fn compare_and_swap(
229 &self,
230 key: &K,
231 old: Option<&V>,
232 new: Option<&V>,
233 ) -> Result<core::result::Result<(), CompareAndSwapError<V>>> {
234 self.inner
235 .compare_and_swap(
236 serialize(key),
237 old.map(|old| serialize(old)),
238 new.map(|new| serialize(new)),
239 )
240 .map(|cas_res| {
241 cas_res.map_err(|cas_err| CompareAndSwapError {
242 current: cas_err.current.as_ref().map(|b| deserialize(b)),
243 proposed: cas_err.proposed.as_ref().map(|b| deserialize(b)),
244 })
245 })
246 }
247
248 /// Fetch the value, apply a function to it and return the result.
249 // not sure if implemented correctly (different trait bound for F)
250 pub fn update_and_fetch<F>(&self, key: &K, mut f: F) -> Result<Option<V>>
251 where
252 F: FnMut(Option<V>) -> Option<V>,
253 {
254 self.inner
255 .update_and_fetch(serialize(&key), |opt_value| {
256 f(opt_value.map(|v| deserialize(v))).map(|v| serialize(&v))
257 })
258 .map(|res| res.map(|v| deserialize(&v)))
259 }
260
261 /// Fetch the value, apply a function to it and return the previous value.
262 // not sure if implemented correctly (different trait bound for F)
263 pub fn fetch_and_update<F>(&self, key: &K, mut f: F) -> Result<Option<V>>
264 where
265 F: FnMut(Option<V>) -> Option<V>,
266 {
267 self.inner
268 .fetch_and_update(serialize(key), |opt_value| {
269 f(opt_value.map(|v| deserialize(v))).map(|v| serialize(&v))
270 })
271 .map(|res| res.map(|v| deserialize(&v)))
272 }
273
274 /// Subscribe to `Event`s that happen to keys that have
275 /// the specified prefix. Events for particular keys are
276 /// guaranteed to be witnessed in the same order by all
277 /// threads, but threads may witness different interleavings
278 /// of `Event`s across different keys. If subscribers don't
279 /// keep up with new writes, they will cause new writes
280 /// to block. There is a buffer of 1024 items per
281 /// `Subscriber`. This can be used to build reactive
282 /// and replicated systems.
283 pub fn watch_prefix(&self, prefix: &K) -> Subscriber<K, V> {
284 Subscriber::from_sled(self.inner.watch_prefix(serialize(prefix)))
285 }
286
287 /// Subscribe to all`Event`s. Events for particular keys are
288 /// guaranteed to be witnessed in the same order by all
289 /// threads, but threads may witness different interleavings
290 /// of `Event`s across different keys. If subscribers don't
291 /// keep up with new writes, they will cause new writes
292 /// to block. There is a buffer of 1024 items per
293 /// `Subscriber`. This can be used to build reactive
294 /// and replicated systems.
295 pub fn watch_all(&self) -> Subscriber<K, V> {
296 Subscriber::from_sled(self.inner.watch_prefix(vec![]))
297 }
298
299 /// Synchronously flushes all dirty IO buffers and calls
300 /// fsync. If this succeeds, it is guaranteed that all
301 /// previous writes will be recovered if the system
302 /// crashes. Returns the number of bytes flushed during
303 /// this call.
304 ///
305 /// Flushing can take quite a lot of time, and you should
306 /// measure the performance impact of using it on
307 /// realistic sustained workloads running on realistic
308 /// hardware.
309 pub fn flush(&self) -> Result<usize> {
310 self.inner.flush()
311 }
312
313 /// Asynchronously flushes all dirty IO buffers
314 /// and calls fsync. If this succeeds, it is
315 /// guaranteed that all previous writes will
316 /// be recovered if the system crashes. Returns
317 /// the number of bytes flushed during this call.
318 ///
319 /// Flushing can take quite a lot of time, and you
320 /// should measure the performance impact of
321 /// using it on realistic sustained workloads
322 /// running on realistic hardware.
323 pub async fn flush_async(&self) -> Result<usize> {
324 self.inner.flush_async().await
325 }
326
327 /// Returns `true` if the `Tree` contains a value for
328 /// the specified key.
329 pub fn contains_key(&self, key: &K) -> Result<bool> {
330 self.inner.contains_key(serialize(key))
331 }
332
333 /// Retrieve the key and value before the provided key,
334 /// if one exists.
335 pub fn get_lt(&self, key: &K) -> Result<Option<(K, V)>> {
336 self.inner
337 .get_lt(serialize(key))
338 .map(|res| res.map(|(k, v)| (deserialize(&k), deserialize(&v))))
339 }
340
341 /// Retrieve the next key and value from the `Tree` after the
342 /// provided key.
343 pub fn get_gt(&self, key: &K) -> Result<Option<(K, V)>> {
344 self.inner
345 .get_gt(serialize(key))
346 .map(|res| res.map(|(k, v)| (deserialize(&k), deserialize(&v))))
347 }
348
349 /// Merge state directly into a given key's value using the
350 /// configured merge operator. This allows state to be written
351 /// into a value directly, without any read-modify-write steps.
352 /// Merge operators can be used to implement arbitrary data
353 /// structures.
354 ///
355 /// Calling `merge` will return an `Unsupported` error if it
356 /// is called without first setting a merge operator function.
357 ///
358 /// Merge operators are shared by all instances of a particular
359 /// `Tree`. Different merge operators may be set on different
360 /// `Tree`s.
361 pub fn merge(&self, key: &K, value: &V) -> Result<Option<V>> {
362 self.inner
363 .merge(serialize(key), serialize(value))
364 .map(|res| res.map(|old_v| deserialize(&old_v)))
365 }
366
367 // TODO: implement using own MergeOperator trait
368 /// Sets a merge operator for use with the `merge` function.
369 ///
370 /// Merge state directly into a given key's value using the
371 /// configured merge operator. This allows state to be written
372 /// into a value directly, without any read-modify-write steps.
373 /// Merge operators can be used to implement arbitrary data
374 /// structures.
375 ///
376 /// # Panics
377 ///
378 /// Calling `merge` will panic if no merge operator has been
379 /// configured.
380 pub fn set_merge_operator(&self, merge_operator: impl MergeOperator<K, V> + 'static) {
381 self.inner
382 .set_merge_operator(move |key: &[u8], old_v: Option<&[u8]>, value: &[u8]| {
383 let opt_v = merge_operator(
384 deserialize(key),
385 old_v.map(|v| deserialize(v)),
386 deserialize(value),
387 );
388 opt_v.map(|v| serialize(&v))
389 });
390 }
391
392 /// Create a double-ended iterator over the tuples of keys and
393 /// values in this tree.
394 pub fn iter(&self) -> Iter<K, V> {
395 Iter::from_sled(self.inner.iter())
396 }
397
398 /// Create a double-ended iterator over tuples of keys and values,
399 /// where the keys fall within the specified range.
400 pub fn range<R: RangeBounds<K>>(&self, range: R) -> Iter<K, V> {
401 match (range.start_bound(), range.end_bound()) {
402 (Bound::Unbounded, Bound::Unbounded) => {
403 Iter::from_sled(self.inner.range::<&[u8], _>(..))
404 }
405 (Bound::Unbounded, Bound::Excluded(b)) => {
406 Iter::from_sled(self.inner.range(..serialize(b)))
407 }
408 (Bound::Unbounded, Bound::Included(b)) => {
409 Iter::from_sled(self.inner.range(..=serialize(b)))
410 }
411 // FIX: This is not excluding lower bound.
412 (Bound::Excluded(b), Bound::Unbounded) => {
413 Iter::from_sled(self.inner.range(serialize(b)..))
414 }
415 (Bound::Excluded(b), Bound::Excluded(bb)) => {
416 Iter::from_sled(self.inner.range(serialize(b)..serialize(bb)))
417 }
418 (Bound::Excluded(b), Bound::Included(bb)) => {
419 Iter::from_sled(self.inner.range(serialize(b)..=serialize(bb)))
420 }
421 (Bound::Included(b), Bound::Unbounded) => {
422 Iter::from_sled(self.inner.range(serialize(b)..))
423 }
424 (Bound::Included(b), Bound::Excluded(bb)) => {
425 Iter::from_sled(self.inner.range(serialize(b)..serialize(bb)))
426 }
427 (Bound::Included(b), Bound::Included(bb)) => {
428 Iter::from_sled(self.inner.range(serialize(b)..=serialize(bb)))
429 }
430 }
431 }
432
433 /// Create an iterator over tuples of keys and values,
434 /// where the all the keys starts with the given prefix.
435 pub fn scan_prefix(&self, prefix: &K) -> Iter<K, V> {
436 Iter::from_sled(self.inner.scan_prefix(serialize(prefix)))
437 }
438
439 /// Returns the first key and value in the `Tree`, or
440 /// `None` if the `Tree` is empty.
441 pub fn first(&self) -> Result<Option<(K, V)>> {
442 self.inner
443 .first()
444 .map(|res| res.map(|(k, v)| (deserialize(&k), deserialize(&v))))
445 }
446
447 /// Returns the last key and value in the `Tree`, or
448 /// `None` if the `Tree` is empty.
449 pub fn last(&self) -> Result<Option<(K, V)>> {
450 self.inner
451 .last()
452 .map(|res| res.map(|(k, v)| (deserialize(&k), deserialize(&v))))
453 }
454
455 /// Atomically removes the maximum item in the `Tree` instance.
456 pub fn pop_max(&self) -> Result<Option<(K, V)>> {
457 self.inner
458 .pop_max()
459 .map(|res| res.map(|(k, v)| (deserialize(&k), deserialize(&v))))
460 }
461
462 /// Atomically removes the minimum item in the `Tree` instance.
463 pub fn pop_min(&self) -> Result<Option<(K, V)>> {
464 self.inner
465 .pop_min()
466 .map(|res| res.map(|(k, v)| (deserialize(&k), deserialize(&v))))
467 }
468
469 /// Returns the number of elements in this tree.
470 pub fn len(&self) -> usize {
471 self.inner.len()
472 }
473
474 /// Returns `true` if the `Tree` contains no elements.
475 pub fn is_empty(&self) -> bool {
476 self.inner.is_empty()
477 }
478
479 /// Clears the `Tree`, removing all values.
480 ///
481 /// Note that this is not atomic.
482 pub fn clear(&self) -> Result<()> {
483 self.inner.clear()
484 }
485
486 /// Returns the name of the tree.
487 pub fn name(&self) -> IVec {
488 self.inner.name()
489 }
490
491 /// Returns the CRC32 of all keys and values
492 /// in this Tree.
493 ///
494 /// This is O(N) and locks the underlying tree
495 /// for the duration of the entire scan.
496 pub fn checksum(&self) -> Result<u32> {
497 self.inner.checksum()
498 }
499}
500
501/// # Examples
502///
503/// ```
504/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
505/// use sled::{Config, IVec};
506///
507/// fn concatenate_merge(
508/// _key: String, // the key being merged
509/// old_value: Option<Vec<f32>>, // the previous value, if one existed
510/// merged_bytes: Vec<f32> // the new bytes being merged in
511/// ) -> Option<Vec<f32>> { // set the new value, return None to delete
512/// let mut ret = old_value
513/// .map(|ov| ov.to_vec())
514/// .unwrap_or_else(|| vec![]);
515///
516/// ret.extend_from_slice(&merged_bytes);
517///
518/// Some(ret)
519/// }
520///
521/// let db = sled::Config::new()
522/// .temporary(true).open()?;
523///
524/// let tree = bincode_sled::Tree::<String, Vec<f32>>::open(&db, "unique_id");
525/// tree.set_merge_operator(concatenate_merge);
526///
527/// let k = String::from("some_key");
528///
529/// tree.insert(&k, &vec![0.0]);
530/// tree.merge(&k, &vec![1.0]);
531/// tree.merge(&k, &vec![2.0]);
532/// assert_eq!(tree.get(&k)?, Some(vec![0.0, 1.0, 2.0]));
533///
534/// // Replace previously merged data. The merge function will not be called.
535/// tree.insert(&k, &vec![3.0]);
536/// assert_eq!(tree.get(&k)?, Some(vec![3.0]));
537///
538/// // Merges on non-present values will cause the merge function to be called
539/// // with `old_value == None`. If the merge function returns something (which it
540/// // does, in this case) a new value will be inserted.
541/// tree.remove(&k);
542/// tree.merge(&k, &vec![4.0]);
543/// assert_eq!(tree.get(&k)?, Some(vec![4.0]));
544/// # Ok(()) }
545/// ```
546pub trait MergeOperator<K, V>: Fn(K, Option<V>, V) -> Option<V>
547where
548 K: Key,
549 V: Value,
550{
551}
552
553impl<K: Key, V: Value, F> MergeOperator<K, V> for F where F: Fn(K, Option<V>, V) -> Option<V> {}
554
555pub struct Iter<K, V> {
556 inner: sled::Iter,
557 _key: PhantomData<fn() -> K>,
558 _value: PhantomData<fn() -> V>,
559}
560
561impl<K: Key, V: Value> Iterator for Iter<K, V> {
562 type Item = Result<(K, V)>;
563
564 fn next(&mut self) -> Option<Self::Item> {
565 self.inner
566 .next()
567 .map(|res| res.map(|(k, v)| (deserialize(&k), deserialize(&v))))
568 }
569
570 fn last(mut self) -> Option<Self::Item> {
571 self.inner
572 .next_back()
573 .map(|res| res.map(|(k, v)| (deserialize(&k), deserialize(&v))))
574 }
575}
576
577impl<K: Key, V: Value> DoubleEndedIterator for Iter<K, V> {
578 fn next_back(&mut self) -> Option<Self::Item> {
579 self.inner
580 .next_back()
581 .map(|res| res.map(|(k, v)| (deserialize(&k), deserialize(&v))))
582 }
583}
584
585impl<K: Key, V: Value> Iter<K, V> {
586 pub fn from_sled(iter: sled::Iter) -> Self {
587 Iter {
588 inner: iter,
589 _key: PhantomData,
590 _value: PhantomData,
591 }
592 }
593
594 pub fn keys(self) -> impl DoubleEndedIterator<Item = Result<K>> + Send + Sync {
595 self.map(|r| r.map(|(k, _v)| k))
596 }
597
598 /// Iterate over the values of this Tree
599 pub fn values(self) -> impl DoubleEndedIterator<Item = Result<V>> + Send + Sync {
600 self.map(|r| r.map(|(_k, v)| v))
601 }
602}
603
604#[derive(Clone, Debug)]
605pub struct Batch<K, V>
606where
607 K: Key,
608 V: Value,
609{
610 inner: sled::Batch,
611 _key: PhantomData<fn() -> K>,
612 _value: PhantomData<fn() -> V>,
613}
614
615impl<K, V> Batch<K, V>
616where
617 K: Key,
618 V: Value,
619{
620 pub fn insert(&mut self, key: &K, value: &V) {
621 self.inner.insert(serialize(key), serialize(value));
622 }
623
624 pub fn remove(&mut self, key: &K) {
625 self.inner.remove(serialize(key))
626 }
627}
628
629// Implementing Default manually to not require K and V to implement Default.
630impl<K, V> Default for Batch<K, V>
631where
632 K: Key,
633 V: Value,
634{
635 fn default() -> Self {
636 Self {
637 inner: Default::default(),
638 _key: PhantomData,
639 _value: PhantomData,
640 }
641 }
642}
643
644use pin_project::pin_project;
645#[pin_project]
646pub struct Subscriber<K, V>
647where
648 K: Key,
649 V: Value,
650{
651 #[pin]
652 inner: sled::Subscriber,
653 _key: PhantomData<fn() -> K>,
654 _value: PhantomData<fn() -> V>,
655}
656
657impl<K, V> Subscriber<K, V>
658where
659 K: Key,
660 V: Value,
661{
662 pub fn next_timeout(
663 &mut self,
664 timeout: core::time::Duration,
665 ) -> core::result::Result<Event<K, V>, std::sync::mpsc::RecvTimeoutError> {
666 self.inner
667 .next_timeout(timeout)
668 .map(|e| Event::from_sled(&e))
669 }
670
671 pub fn from_sled(subscriber: sled::Subscriber) -> Self {
672 Self {
673 inner: subscriber,
674 _key: PhantomData,
675 _value: PhantomData,
676 }
677 }
678}
679
680use core::future::Future;
681use core::pin::Pin;
682use core::task::{Context, Poll};
683impl<K: Key + Unpin, V: Value + Unpin> Future for Subscriber<K, V> {
684 type Output = Option<Event<K, V>>;
685
686 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
687 self.project()
688 .inner
689 .poll(cx)
690 .map(|opt| opt.map(|e| Event::from_sled(&e)))
691 }
692}
693
694impl<K, V> Iterator for Subscriber<K, V>
695where
696 K: Key,
697 V: Value,
698{
699 type Item = Event<K, V>;
700
701 fn next(&mut self) -> Option<Event<K, V>> {
702 self.inner.next().map(|e| Event::from_sled(&e))
703 }
704}
705
706pub enum Event<K, V>
707where
708 K: Key,
709 V: Value,
710{
711 Insert { key: K, value: V },
712 Remove { key: K },
713}
714
715impl<K, V> Event<K, V>
716where
717 K: Key,
718 V: Value,
719{
720 pub fn key(&self) -> &K {
721 match self {
722 Self::Insert { key, .. } | Self::Remove { key } => key,
723 }
724 }
725
726 pub fn from_sled(event: &sled::Event) -> Self {
727 match event {
728 sled::Event::Insert { key, value } => Self::Insert {
729 key: deserialize(key),
730 value: deserialize(value),
731 },
732 sled::Event::Remove { key } => Self::Remove {
733 key: deserialize(key),
734 },
735 }
736 }
737}
738
739/// The function which is used to deserialize all keys and values.
740pub fn deserialize<T>(bytes: &[u8]) -> T
741where
742 T: Decode,
743{
744 bincode::decode_from_slice::<T, BinConfT>(bytes, DEFAULT_CONF)
745 .expect("Decode failed, did the type encoded change?")
746 .0
747}
748
749/// The function which is used to serialize all keys and values.
750pub fn serialize<T>(value: &T) -> Vec<u8>
751where
752 T: Encode,
753{
754 bincode::encode_to_vec(value, DEFAULT_CONF).expect("Encode failed.")
755}
756
#[cfg(test)]
mod tests {
    use super::*;

    /// A half-open range over `u32` keys returns exactly the entries inside it.
    #[test]
    fn test_range() {
        let config = sled::Config::new().temporary(true);
        let db = config.open().unwrap();

        let tree: Tree<u32, u32> = Tree::open(&db, "test_tree");

        tree.insert(&1, &2).unwrap();
        tree.insert(&3, &4).unwrap();
        tree.insert(&6, &2).unwrap();
        tree.insert(&10, &2).unwrap();
        tree.insert(&15, &2).unwrap();
        tree.flush().unwrap();

        // Collect first so that a missing or surplus entry fails the test,
        // instead of passing vacuously or indexing out of bounds.
        let results = tree
            .range(6..11)
            .collect::<sled::Result<Vec<(u32, u32)>>>()
            .unwrap();

        assert_eq!(results, vec![(6, 2), (10, 2)]);
    }

    /// A CAS with a wrong `old` value fails and reports the typed current
    /// and proposed values.
    #[test]
    fn test_cas() {
        let config = sled::Config::new().temporary(true);
        let db = config.open().unwrap();

        let tree: Tree<u32, u32> = Tree::open(&db, "test_tree");

        let current = 2;
        tree.insert(&1, &current).unwrap();
        let expected = 3;
        let proposed = 4;
        let res = tree
            .compare_and_swap(&1, Some(&expected), Some(&proposed))
            .expect("db failure");

        assert_eq!(
            res,
            Err(CompareAndSwapError {
                current: Some(current),
                proposed: Some(proposed),
            }),
        );
    }
}