typed_sled/lib.rs
1//! typed-sled - a database build on top of sled.
2//!
3//! sled is a high-performance embedded database with an API that is similar to a `BTreeMap<[u8], [u8]>`.
4//! typed-sled builds on top of sled and offers an API that is similar to a `BTreeMap<K, V>`, where
5//! K and V are user defined types which implement [Deserialize][serde::Deserialize] and [Serialize][serde::Serialize].
6//!
7//! # features
8//! Multiple features for common use cases are also available:
9//! * [search]: `SearchEngine` on top of a `Tree`.
10//! * [key_generating]: Create `Tree`s with automatically generated keys.
11//! * [convert]: Convert any `Tree` into another `Tree` with different key and value types.
12//! * [custom_serde]: Create `Tree`s with custom (de)serialization. This for example makes
13//! lazy or zero-copy (de)serialization possible.
14//!
15//! # Example
16//! ```
17//! use serde::{Deserialize, Serialize};
18//!
19//! #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
20//! struct SomeValue(u32);
21//!
22//! fn main() -> Result<(), Box<dyn std::error::Error>> {
23//! // Creating a temporary sled database.
24//! // If you want to persist the data use sled::open instead.
25//! let db = sled::Config::new().temporary(true).open().unwrap();
26//!
27//! // The id is used by sled to identify which Tree in the database (db) to open
28//! let tree = typed_sled::Tree::<String, SomeValue>::open(&db, "unique_id");
29//!
30//! tree.insert(&"some_key".to_owned(), &SomeValue(10))?;
31//!
32//! assert_eq!(tree.get(&"some_key".to_owned())?, Some(SomeValue(10)));
33//! Ok(())
34//! }
35//! ```
36//! [sled]: https://docs.rs/sled/latest/sled/
37
38pub use sled::{open, Config};
39use transaction::TransactionalTree;
40
41#[cfg(feature = "convert")]
42pub mod convert;
43#[cfg(feature = "key-generating")]
44pub mod key_generating;
45#[cfg(feature = "search")]
46pub mod search;
47pub mod transaction;
48
49pub mod custom_serde;
50
51use core::fmt;
52use core::iter::{DoubleEndedIterator, Iterator};
53use core::ops::{Bound, RangeBounds};
54use serde::Serialize;
55use sled::{
56 transaction::{ConflictableTransactionResult, TransactionResult},
57 IVec, Result,
58};
59use std::marker::PhantomData;
60
61// pub trait Bin = DeserializeOwned + Serialize + Clone + Send + Sync;
62
63/// A flash-sympathetic persistent lock-free B+ tree.
64///
65/// A `Tree` represents a single logical keyspace / namespace / bucket.
66///
67/// # Example
68/// ```
69/// use serde::{Deserialize, Serialize};
70///
71/// #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
72/// struct SomeValue(u32);
73///
74/// fn main() -> Result<(), Box<dyn std::error::Error>> {
75/// // Creating a temporary sled database.
76/// // If you want to persist the data use sled::open instead.
77/// let db = sled::Config::new().temporary(true).open().unwrap();
78///
79/// // The id is used by sled to identify which Tree in the database (db) to open.
80/// let tree = typed_sled::Tree::<String, SomeValue>::open(&db, "unique_id");
81///
82/// tree.insert(&"some_key".to_owned(), &SomeValue(10))?;
83///
84/// assert_eq!(tree.get(&"some_key".to_owned())?, Some(SomeValue(10)));
85/// Ok(())
86/// }
87/// ```
88#[derive(Debug)]
89pub struct Tree<K, V> {
90 inner: sled::Tree,
91 _key: PhantomData<fn() -> K>,
92 _value: PhantomData<fn() -> V>,
93}
94
95// Manual implementation to make ToOwned behave better.
96// With derive(Clone) to_owned() on a reference returns a reference.
97impl<K, V> Clone for Tree<K, V> {
98 fn clone(&self) -> Self {
99 Self {
100 inner: self.inner.clone(),
101 _key: PhantomData,
102 _value: PhantomData,
103 }
104 }
105}
106
107/// Trait alias for bounds required on keys and values.
108/// For now only types that implement DeserializeOwned
109/// are supported.
110// [specilization] might make
111// supporting any type that implements Deserialize<'a>
112// possible without much overhead. Otherwise the branch
113// custom_de_serialization introduces custom (de)serialization
114// for each `Tree` which might also make it possible.
115//
116// [specialization]: https://github.com/rust-lang/rust/issues/31844
117pub trait KV: serde::de::DeserializeOwned + Serialize {}
118
119impl<T: serde::de::DeserializeOwned + Serialize> KV for T {}
120
121/// Compare and swap error.
122#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
123pub struct CompareAndSwapError<V> {
124 /// The current value which caused your CAS to fail.
125 pub current: Option<V>,
126 /// Returned value that was proposed unsuccessfully.
127 pub proposed: Option<V>,
128}
129
130impl<V> fmt::Display for CompareAndSwapError<V> {
131 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
132 write!(f, "Compare and swap conflict")
133 }
134}
135
136// implemented like this in the sled source
137impl<V: std::fmt::Debug> std::error::Error for CompareAndSwapError<V> {}
138
139// These Trait bounds should probably be specified on the functions themselves, but too lazy.
140impl<K, V> Tree<K, V> {
141 /// Initialize a typed tree. The id identifies the tree to be opened from the db.
142 /// # Example
143 ///
144 /// ```
145 /// use serde::{Deserialize, Serialize};
146 ///
147 /// #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
148 /// struct SomeValue(u32);
149 ///
150 /// fn main() -> Result<(), Box<dyn std::error::Error>> {
151 /// // Creating a temporary sled database.
152 /// // If you want to persist the data use sled::open instead.
153 /// let db = sled::Config::new().temporary(true).open().unwrap();
154 ///
155 /// // The id is used by sled to identify which Tree in the database (db) to open.
156 /// let tree = typed_sled::Tree::<String, SomeValue>::open(&db, "unique_id");
157 ///
158 /// tree.insert(&"some_key".to_owned(), &SomeValue(10))?;
159 ///
160 /// assert_eq!(tree.get(&"some_key".to_owned())?, Some(SomeValue(10)));
161 /// Ok(())
162 /// }
163 /// ```
164 pub fn open<T: AsRef<str>>(db: &sled::Db, id: T) -> Self {
165 Self {
166 inner: db.open_tree(id.as_ref()).unwrap(),
167 _key: PhantomData,
168 _value: PhantomData,
169 }
170 }
171
172 /// Insert a key to a new value, returning the last value if it was set.
173 pub fn insert(&self, key: &K, value: &V) -> Result<Option<V>>
174 where
175 K: KV,
176 V: KV,
177 {
178 self.inner
179 .insert(serialize(key), serialize(value))
180 .map(|opt| opt.map(|old_value| deserialize(&old_value)))
181 }
182
183 /// Perform a multi-key serializable transaction.
184 pub fn transaction<F, A, E>(&self, f: F) -> TransactionResult<A, E>
185 where
186 F: Fn(&TransactionalTree<K, V>) -> ConflictableTransactionResult<A, E>,
187 {
188 self.inner.transaction(|sled_transactional_tree| {
189 f(&TransactionalTree::new(sled_transactional_tree))
190 })
191 }
192
193 /// Create a new batched update that can be atomically applied.
194 ///
195 /// It is possible to apply a Batch in a transaction as well, which is the way you can apply a Batch to multiple Trees atomically.
196 pub fn apply_batch(&self, batch: Batch<K, V>) -> Result<()> {
197 self.inner.apply_batch(batch.inner)
198 }
199
200 /// Retrieve a value from the Tree if it exists.
201 pub fn get(&self, key: &K) -> Result<Option<V>>
202 where
203 K: KV,
204 V: KV,
205 {
206 self.inner
207 .get(serialize(key))
208 .map(|opt| opt.map(|v| deserialize(&v)))
209 }
210
211 /// Retrieve a value from the Tree if it exists. The key must be in serialized form.
212 pub fn get_from_raw<B: AsRef<[u8]>>(&self, key_bytes: B) -> Result<Option<V>>
213 where
214 K: KV,
215 V: KV,
216 {
217 self.inner
218 .get(key_bytes.as_ref())
219 .map(|opt| opt.map(|v| deserialize(&v)))
220 }
221
222 /// Deserialize a key and retrieve it's value from the Tree if it exists.
223 /// The deserialization is only done if a value was retrieved successfully.
224 pub fn get_kv_from_raw<B: AsRef<[u8]>>(&self, key_bytes: B) -> Result<Option<(K, V)>>
225 where
226 K: KV,
227 V: KV,
228 {
229 self.inner
230 .get(key_bytes.as_ref())
231 .map(|opt| opt.map(|v| (deserialize(key_bytes.as_ref()), deserialize(&v))))
232 }
233
234 /// Delete a value, returning the old value if it existed.
235 pub fn remove(&self, key: &K) -> Result<Option<V>>
236 where
237 K: KV,
238 V: KV,
239 {
240 self.inner
241 .remove(serialize(key))
242 .map(|opt| opt.map(|v| deserialize(&v)))
243 }
244
245 /// Compare and swap. Capable of unique creation, conditional modification, or deletion. If old is None, this will only set the value if it doesn't exist yet. If new is None, will delete the value if old is correct. If both old and new are Some, will modify the value if old is correct.
246 ///
247 /// It returns Ok(Ok(())) if operation finishes successfully.
248 ///
249 /// If it fails it returns: - Ok(Err(CompareAndSwapError(current, proposed))) if operation failed to setup a new value. CompareAndSwapError contains current and proposed values. - Err(Error::Unsupported) if the database is opened in read-only mode.
250 pub fn compare_and_swap(
251 &self,
252 key: &K,
253 old: Option<&V>,
254 new: Option<&V>,
255 ) -> Result<core::result::Result<(), CompareAndSwapError<V>>>
256 where
257 K: KV,
258 V: KV,
259 {
260 self.inner
261 .compare_and_swap(
262 serialize(key),
263 old.map(|old| serialize(old)),
264 new.map(|new| serialize(new)),
265 )
266 .map(|cas_res| {
267 cas_res.map_err(|cas_err| CompareAndSwapError {
268 current: cas_err.current.as_ref().map(|b| deserialize(b)),
269 proposed: cas_err.proposed.as_ref().map(|b| deserialize(b)),
270 })
271 })
272 }
273
274 /// Fetch the value, apply a function to it and return the result.
275 // not sure if implemented correctly (different trait bound for F)
276 pub fn update_and_fetch<F>(&self, key: &K, mut f: F) -> Result<Option<V>>
277 where
278 K: KV,
279 V: KV,
280 F: FnMut(Option<V>) -> Option<V>,
281 {
282 self.inner
283 .update_and_fetch(serialize(&key), |opt_value| {
284 f(opt_value.map(|v| deserialize(v))).map(|v| serialize(&v))
285 })
286 .map(|res| res.map(|v| deserialize(&v)))
287 }
288
289 /// Fetch the value, apply a function to it and return the previous value.
290 // not sure if implemented correctly (different trait bound for F)
291 pub fn fetch_and_update<F>(&self, key: &K, mut f: F) -> Result<Option<V>>
292 where
293 K: KV,
294 V: KV,
295 F: FnMut(Option<V>) -> Option<V>,
296 {
297 self.inner
298 .fetch_and_update(serialize(key), |opt_value| {
299 f(opt_value.map(|v| deserialize(v))).map(|v| serialize(&v))
300 })
301 .map(|res| res.map(|v| deserialize(&v)))
302 }
303
304 /// Subscribe to `Event`s that happen to keys that have
305 /// the specified prefix. Events for particular keys are
306 /// guaranteed to be witnessed in the same order by all
307 /// threads, but threads may witness different interleavings
308 /// of `Event`s across different keys. If subscribers don't
309 /// keep up with new writes, they will cause new writes
310 /// to block. There is a buffer of 1024 items per
311 /// `Subscriber`. This can be used to build reactive
312 /// and replicated systems.
313 pub fn watch_prefix(&self, prefix: &K) -> Subscriber<K, V>
314 where
315 K: KV,
316 {
317 Subscriber::from_sled(self.inner.watch_prefix(serialize(prefix)))
318 }
319
320 /// Subscribe to all`Event`s. Events for particular keys are
321 /// guaranteed to be witnessed in the same order by all
322 /// threads, but threads may witness different interleavings
323 /// of `Event`s across different keys. If subscribers don't
324 /// keep up with new writes, they will cause new writes
325 /// to block. There is a buffer of 1024 items per
326 /// `Subscriber`. This can be used to build reactive
327 /// and replicated systems.
328 pub fn watch_all(&self) -> Subscriber<K, V>
329 where
330 K: KV,
331 {
332 Subscriber::from_sled(self.inner.watch_prefix(vec![]))
333 }
334
335 /// Synchronously flushes all dirty IO buffers and calls
336 /// fsync. If this succeeds, it is guaranteed that all
337 /// previous writes will be recovered if the system
338 /// crashes. Returns the number of bytes flushed during
339 /// this call.
340 ///
341 /// Flushing can take quite a lot of time, and you should
342 /// measure the performance impact of using it on
343 /// realistic sustained workloads running on realistic
344 /// hardware.
345 pub fn flush(&self) -> Result<usize> {
346 self.inner.flush()
347 }
348
349 /// Asynchronously flushes all dirty IO buffers
350 /// and calls fsync. If this succeeds, it is
351 /// guaranteed that all previous writes will
352 /// be recovered if the system crashes. Returns
353 /// the number of bytes flushed during this call.
354 ///
355 /// Flushing can take quite a lot of time, and you
356 /// should measure the performance impact of
357 /// using it on realistic sustained workloads
358 /// running on realistic hardware.
359 pub async fn flush_async(&self) -> Result<usize> {
360 self.inner.flush_async().await
361 }
362
363 /// Returns `true` if the `Tree` contains a value for
364 /// the specified key.
365 pub fn contains_key(&self, key: &K) -> Result<bool>
366 where
367 K: KV,
368 {
369 self.inner.contains_key(serialize(key))
370 }
371
372 /// Retrieve the key and value before the provided key,
373 /// if one exists.
374 pub fn get_lt(&self, key: &K) -> Result<Option<(K, V)>>
375 where
376 K: KV,
377 V: KV,
378 {
379 self.inner
380 .get_lt(serialize(key))
381 .map(|res| res.map(|(k, v)| (deserialize(&k), deserialize(&v))))
382 }
383
384 /// Retrieve the next key and value from the `Tree` after the
385 /// provided key.
386 pub fn get_gt(&self, key: &K) -> Result<Option<(K, V)>>
387 where
388 K: KV,
389 V: KV,
390 {
391 self.inner
392 .get_gt(serialize(key))
393 .map(|res| res.map(|(k, v)| (deserialize(&k), deserialize(&v))))
394 }
395
396 /// Merge state directly into a given key's value using the
397 /// configured merge operator. This allows state to be written
398 /// into a value directly, without any read-modify-write steps.
399 /// Merge operators can be used to implement arbitrary data
400 /// structures.
401 ///
402 /// Calling `merge` will return an `Unsupported` error if it
403 /// is called without first setting a merge operator function.
404 ///
405 /// Merge operators are shared by all instances of a particular
406 /// `Tree`. Different merge operators may be set on different
407 /// `Tree`s.
408 pub fn merge(&self, key: &K, value: &V) -> Result<Option<V>>
409 where
410 K: KV,
411 V: KV,
412 {
413 self.inner
414 .merge(serialize(key), serialize(value))
415 .map(|res| res.map(|old_v| deserialize(&old_v)))
416 }
417
418 // TODO: implement using own MergeOperator trait
419 /// Sets a merge operator for use with the `merge` function.
420 ///
421 /// Merge state directly into a given key's value using the
422 /// configured merge operator. This allows state to be written
423 /// into a value directly, without any read-modify-write steps.
424 /// Merge operators can be used to implement arbitrary data
425 /// structures.
426 ///
427 /// # Panics
428 ///
429 /// Calling `merge` will panic if no merge operator has been
430 /// configured.
431 pub fn set_merge_operator(&self, merge_operator: impl MergeOperator<K, V> + 'static)
432 where
433 K: KV,
434 V: KV,
435 {
436 self.inner
437 .set_merge_operator(move |key: &[u8], old_v: Option<&[u8]>, value: &[u8]| {
438 let opt_v = merge_operator(
439 deserialize(key),
440 old_v.map(|v| deserialize(v)),
441 deserialize(value),
442 );
443 opt_v.map(|v| serialize(&v))
444 });
445 }
446
447 /// Create a double-ended iterator over the tuples of keys and
448 /// values in this tree.
449 pub fn iter(&self) -> Iter<K, V> {
450 Iter::from_sled(self.inner.iter())
451 }
452
453 /// Create a double-ended iterator over tuples of keys and values,
454 /// where the keys fall within the specified range.
455 pub fn range<R: RangeBounds<K>>(&self, range: R) -> Iter<K, V>
456 where
457 K: KV + std::fmt::Debug,
458 {
459 match (range.start_bound(), range.end_bound()) {
460 (Bound::Unbounded, Bound::Unbounded) => {
461 Iter::from_sled(self.inner.range::<&[u8], _>(..))
462 }
463 (Bound::Unbounded, Bound::Excluded(b)) => {
464 Iter::from_sled(self.inner.range(..serialize(b)))
465 }
466 (Bound::Unbounded, Bound::Included(b)) => {
467 Iter::from_sled(self.inner.range(..=serialize(b)))
468 }
469 // FIX: This is not excluding lower bound.
470 (Bound::Excluded(b), Bound::Unbounded) => {
471 Iter::from_sled(self.inner.range(serialize(b)..))
472 }
473 (Bound::Excluded(b), Bound::Excluded(bb)) => {
474 Iter::from_sled(self.inner.range(serialize(b)..serialize(bb)))
475 }
476 (Bound::Excluded(b), Bound::Included(bb)) => {
477 Iter::from_sled(self.inner.range(serialize(b)..=serialize(bb)))
478 }
479 (Bound::Included(b), Bound::Unbounded) => {
480 Iter::from_sled(self.inner.range(serialize(b)..))
481 }
482 (Bound::Included(b), Bound::Excluded(bb)) => {
483 Iter::from_sled(self.inner.range(serialize(b)..serialize(bb)))
484 }
485 (Bound::Included(b), Bound::Included(bb)) => {
486 Iter::from_sled(self.inner.range(serialize(b)..=serialize(bb)))
487 }
488 }
489 }
490
491 /// Create an iterator over tuples of keys and values,
492 /// where the all the keys starts with the given prefix.
493 pub fn scan_prefix(&self, prefix: &K) -> Iter<K, V>
494 where
495 K: KV,
496 {
497 Iter::from_sled(self.inner.scan_prefix(serialize(prefix)))
498 }
499
500 /// Returns the first key and value in the `Tree`, or
501 /// `None` if the `Tree` is empty.
502 pub fn first(&self) -> Result<Option<(K, V)>>
503 where
504 K: KV,
505 V: KV,
506 {
507 self.inner
508 .first()
509 .map(|res| res.map(|(k, v)| (deserialize(&k), deserialize(&v))))
510 }
511
512 /// Returns the last key and value in the `Tree`, or
513 /// `None` if the `Tree` is empty.
514 pub fn last(&self) -> Result<Option<(K, V)>>
515 where
516 K: KV,
517 V: KV,
518 {
519 self.inner
520 .last()
521 .map(|res| res.map(|(k, v)| (deserialize(&k), deserialize(&v))))
522 }
523
524 /// Atomically removes the maximum item in the `Tree` instance.
525 pub fn pop_max(&self) -> Result<Option<(K, V)>>
526 where
527 K: KV,
528 V: KV,
529 {
530 self.inner
531 .pop_max()
532 .map(|res| res.map(|(k, v)| (deserialize(&k), deserialize(&v))))
533 }
534
535 /// Atomically removes the minimum item in the `Tree` instance.
536 pub fn pop_min(&self) -> Result<Option<(K, V)>>
537 where
538 K: KV,
539 V: KV,
540 {
541 self.inner
542 .pop_min()
543 .map(|res| res.map(|(k, v)| (deserialize(&k), deserialize(&v))))
544 }
545
546 /// Returns the number of elements in this tree.
547 pub fn len(&self) -> usize {
548 self.inner.len()
549 }
550
551 /// Returns `true` if the `Tree` contains no elements.
552 pub fn is_empty(&self) -> bool {
553 self.inner.is_empty()
554 }
555
556 /// Clears the `Tree`, removing all values.
557 ///
558 /// Note that this is not atomic.
559 pub fn clear(&self) -> Result<()> {
560 self.inner.clear()
561 }
562
563 /// Returns the name of the tree.
564 pub fn name(&self) -> IVec {
565 self.inner.name()
566 }
567
568 /// Returns the CRC32 of all keys and values
569 /// in this Tree.
570 ///
571 /// This is O(N) and locks the underlying tree
572 /// for the duration of the entire scan.
573 pub fn checksum(&self) -> Result<u32> {
574 self.inner.checksum()
575 }
576}
577
578/// # Examples
579///
580/// ```
581/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
582/// use sled::{Config, IVec};
583///
584/// fn concatenate_merge(
585/// _key: String, // the key being merged
586/// old_value: Option<Vec<f32>>, // the previous value, if one existed
587/// merged_bytes: Vec<f32> // the new bytes being merged in
588/// ) -> Option<Vec<f32>> { // set the new value, return None to delete
589/// let mut ret = old_value
590/// .map(|ov| ov.to_vec())
591/// .unwrap_or_else(|| vec![]);
592///
593/// ret.extend_from_slice(&merged_bytes);
594///
595/// Some(ret)
596/// }
597///
598/// let db = sled::Config::new()
599/// .temporary(true).open()?;
600///
601/// let tree = typed_sled::Tree::<String, Vec<f32>>::open(&db, "unique_id");
602/// tree.set_merge_operator(concatenate_merge);
603///
604/// let k = String::from("some_key");
605///
606/// tree.insert(&k, &vec![0.0]);
607/// tree.merge(&k, &vec![1.0]);
608/// tree.merge(&k, &vec![2.0]);
609/// assert_eq!(tree.get(&k)?, Some(vec![0.0, 1.0, 2.0]));
610///
611/// // Replace previously merged data. The merge function will not be called.
612/// tree.insert(&k, &vec![3.0]);
613/// assert_eq!(tree.get(&k)?, Some(vec![3.0]));
614///
615/// // Merges on non-present values will cause the merge function to be called
616/// // with `old_value == None`. If the merge function returns something (which it
617/// // does, in this case) a new value will be inserted.
618/// tree.remove(&k);
619/// tree.merge(&k, &vec![4.0]);
620/// assert_eq!(tree.get(&k)?, Some(vec![4.0]));
621/// # Ok(()) }
622/// ```
623pub trait MergeOperator<K, V>: Fn(K, Option<V>, V) -> Option<V> {}
624
625impl<K, V, F> MergeOperator<K, V> for F where F: Fn(K, Option<V>, V) -> Option<V> {}
626
627pub struct Iter<K, V> {
628 inner: sled::Iter,
629 _key: PhantomData<fn() -> K>,
630 _value: PhantomData<fn() -> V>,
631}
632
633impl<K: KV, V: KV> Iterator for Iter<K, V> {
634 type Item = Result<(K, V)>;
635
636 fn next(&mut self) -> Option<Self::Item> {
637 self.inner
638 .next()
639 .map(|res| res.map(|(k, v)| (deserialize(&k), deserialize(&v))))
640 }
641
642 fn last(mut self) -> Option<Self::Item> {
643 self.inner
644 .next_back()
645 .map(|res| res.map(|(k, v)| (deserialize(&k), deserialize(&v))))
646 }
647}
648
649impl<K: KV, V: KV> DoubleEndedIterator for Iter<K, V> {
650 fn next_back(&mut self) -> Option<Self::Item> {
651 self.inner
652 .next_back()
653 .map(|res| res.map(|(k, v)| (deserialize(&k), deserialize(&v))))
654 }
655}
656
657impl<K, V> Iter<K, V> {
658 pub fn from_sled(iter: sled::Iter) -> Self {
659 Iter {
660 inner: iter,
661 _key: PhantomData,
662 _value: PhantomData,
663 }
664 }
665
666 pub fn keys(self) -> impl DoubleEndedIterator<Item = Result<K>> + Send + Sync
667 where
668 K: KV + Send + Sync,
669 V: KV + Send + Sync,
670 {
671 self.map(|r| r.map(|(k, _v)| k))
672 }
673
674 /// Iterate over the values of this Tree
675 pub fn values(self) -> impl DoubleEndedIterator<Item = Result<V>> + Send + Sync
676 where
677 K: KV + Send + Sync,
678 V: KV + Send + Sync,
679 {
680 self.map(|r| r.map(|(_k, v)| v))
681 }
682}
683
684#[derive(Clone, Debug)]
685pub struct Batch<K, V> {
686 inner: sled::Batch,
687 _key: PhantomData<fn() -> K>,
688 _value: PhantomData<fn() -> V>,
689}
690
691impl<K, V> Batch<K, V> {
692 pub fn insert(&mut self, key: &K, value: &V)
693 where
694 K: KV,
695 V: KV,
696 {
697 self.inner.insert(serialize(key), serialize(value));
698 }
699
700 pub fn remove(&mut self, key: &K)
701 where
702 K: KV,
703 {
704 self.inner.remove(serialize(key))
705 }
706}
707
708// Implementing Default manually to not require K and V to implement Default.
709impl<K, V> Default for Batch<K, V> {
710 fn default() -> Self {
711 Self {
712 inner: Default::default(),
713 _key: PhantomData,
714 _value: PhantomData,
715 }
716 }
717}
718
719use pin_project::pin_project;
720#[pin_project]
721pub struct Subscriber<K, V> {
722 #[pin]
723 inner: sled::Subscriber,
724 _key: PhantomData<fn() -> K>,
725 _value: PhantomData<fn() -> V>,
726}
727
728impl<K, V> Subscriber<K, V> {
729 pub fn next_timeout(
730 &mut self,
731 timeout: core::time::Duration,
732 ) -> core::result::Result<Event<K, V>, std::sync::mpsc::RecvTimeoutError>
733 where
734 K: KV,
735 V: KV,
736 {
737 self.inner
738 .next_timeout(timeout)
739 .map(|e| Event::from_sled(&e))
740 }
741
742 pub fn from_sled(subscriber: sled::Subscriber) -> Self {
743 Self {
744 inner: subscriber,
745 _key: PhantomData,
746 _value: PhantomData,
747 }
748 }
749}
750
751use core::future::Future;
752use core::pin::Pin;
753use core::task::{Context, Poll};
754impl<K: KV + Unpin, V: KV + Unpin> Future for Subscriber<K, V> {
755 type Output = Option<Event<K, V>>;
756
757 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
758 self.project()
759 .inner
760 .poll(cx)
761 .map(|opt| opt.map(|e| Event::from_sled(&e)))
762 }
763}
764
765impl<K: KV, V: KV> Iterator for Subscriber<K, V> {
766 type Item = Event<K, V>;
767
768 fn next(&mut self) -> Option<Event<K, V>> {
769 self.inner.next().map(|e| Event::from_sled(&e))
770 }
771}
772
773pub enum Event<K, V> {
774 Insert { key: K, value: V },
775 Remove { key: K },
776}
777
778impl<K, V> Event<K, V> {
779 pub fn key(&self) -> &K
780 where
781 K: KV,
782 {
783 match self {
784 Self::Insert { key, .. } | Self::Remove { key } => key,
785 }
786 }
787
788 pub fn from_sled(event: &sled::Event) -> Self
789 where
790 K: KV,
791 V: KV,
792 {
793 match event {
794 sled::Event::Insert { key, value } => Self::Insert {
795 key: deserialize(key),
796 value: deserialize(value),
797 },
798 sled::Event::Remove { key } => Self::Remove {
799 key: deserialize(key),
800 },
801 }
802 }
803}
804
805/// The function which is used to deserialize all keys and values.
806pub fn deserialize<'a, T>(bytes: &'a [u8]) -> T
807where
808 T: serde::de::Deserialize<'a>,
809{
810 bincode::deserialize(bytes).expect("deserialization failed, did the type serialized change?")
811}
812
813/// The function which is used to serialize all keys and values.
814pub fn serialize<T>(value: &T) -> Vec<u8>
815where
816 T: serde::Serialize,
817{
818 bincode::serialize(value).expect("serialization failed, did the type serialized change?")
819}
820
821#[cfg(test)]
822mod tests {
823 use super::*;
824
825 #[test]
826 fn test_range() {
827 let config = sled::Config::new().temporary(true);
828 let db = config.open().unwrap();
829
830 let tree: Tree<u32, u32> = Tree::open(&db, "test_tree");
831
832 tree.insert(&1, &2).unwrap();
833 tree.insert(&3, &4).unwrap();
834 tree.insert(&6, &2).unwrap();
835 tree.insert(&10, &2).unwrap();
836 tree.insert(&15, &2).unwrap();
837 tree.flush().unwrap();
838
839 let expect_results = [(6, 2), (10, 2)];
840
841 for (i, result) in tree.range(6..11).enumerate() {
842 assert_eq!(result.unwrap(), expect_results[i]);
843 }
844 }
845
846 #[test]
847 fn test_cas() {
848 let config = sled::Config::new().temporary(true);
849 let db = config.open().unwrap();
850
851 let tree: Tree<u32, u32> = Tree::open(&db, "test_tree");
852
853 let current = 2;
854 tree.insert(&1, ¤t).unwrap();
855 let expected = 3;
856 let proposed = 4;
857 let res = tree
858 .compare_and_swap(&1, Some(&expected), Some(&proposed))
859 .expect("db failure");
860
861 assert_eq!(
862 res,
863 Err(CompareAndSwapError {
864 current: Some(current),
865 proposed: Some(proposed),
866 }),
867 );
868 }
869}