sled_extensions/
structured_tree.rs

1use sled::IVec;
2use std::{marker::PhantomData, ops::RangeBounds};
3
4use crate::{
5    encoding::Encoding,
6    error::{coerce, Result},
7};
8
/// Error describing a compare-and-swap that did not take effect.
///
/// `Debug` is derived so that a `Result<(), CompareAndSwapError<V>>` can be used with
/// `unwrap`/`expect` and printed in diagnostics.
#[derive(Debug)]
pub struct CompareAndSwapError<V> {
    /// The value found under the key when the swap was attempted, if any.
    pub current: Option<V>,
    /// The value that was proposed as the replacement, if any.
    pub proposed: Option<V>,
}
16
17#[derive(Clone)]
18/// A flash-sympathetic persistent lock-free B+ tree
19pub struct StructuredTree<V, E>(sled::Tree, String, PhantomData<V>, PhantomData<E>);
20
21/// An iterator over keys and values in a `Tree`.
22pub struct StructuredIter<V, E>(sled::Iter, PhantomData<V>, PhantomData<E>);
23
24#[derive(Clone, Debug, Default)]
25/// A batch of updates that will be applied atomically to the Tree.
26pub struct StructuredBatch<V, E>(sled::Batch, PhantomData<V>, PhantomData<E>);
27
28#[derive(Clone)]
29/// A transaction that will be applied atomically to the Tree.
30pub struct StructuredTransactionalTree<'a, V, E>(
31    &'a sled::TransactionalTree,
32    PhantomData<V>,
33    PhantomData<E>,
34);
35
36impl<V, E> StructuredTree<V, E>
37where
38    E: Encoding<V> + 'static,
39{
40    pub(crate) fn new(db: &sled::Db, name: &str) -> Result<Self> {
41        Ok(StructuredTree(
42            db.open_tree(name)?,
43            name.to_owned(),
44            PhantomData,
45            PhantomData,
46        ))
47    }
48
49    /// Clone for structures where V and E aren't Clone
50    pub fn cloned(&self) -> Self {
51        StructuredTree(self.0.clone(), self.1.clone(), PhantomData, PhantomData)
52    }
53
54    /// Perform a multi-key serializable transaction.
55    ///
56    /// Transactions also work on tuples of Trees, preserving serializable ACID semantics! In this
57    /// example, we treat two trees like a work queue, atomically apply updates to data and move
58    /// them from the unprocessed Tree to the processed Tree.
59    pub fn transaction<F, R>(&self, f: F) -> sled::TransactionResult<Result<R>>
60    where
61        F: Fn(StructuredTransactionalTree<V, E>) -> sled::ConflictableTransactionResult<Result<R>>,
62    {
63        self.0.transaction(move |trans_tree| {
64            (f)(StructuredTransactionalTree(
65                trans_tree,
66                PhantomData,
67                PhantomData,
68            ))
69        })
70    }
71
72    /// Create a new batched update that can be atomically applied.
73    ///
74    /// It is possible to apply a Batch in a transaction as well, which is the way you can apply a Batch to multiple Trees atomically.
75    pub fn apply_batch(&self, batch: StructuredBatch<V, E>) -> Result<()> {
76        Ok(self.0.apply_batch(batch.0)?)
77    }
78
79    /// Compare and swap. Capable of unique creation, conditional modification, or deletion. If
80    /// old is None, this will only set the value if it doesn't exist yet. If new is None, will
81    /// delete the value if old is correct. If both old and new are Some, will modify the value
82    /// if old is correct.
83    ///
84    /// It returns Ok(Ok(())) if operation finishes successfully.
85    ///
86    /// If it fails it returns: - Ok(Err(CompareAndSwapError(current, proposed))) if operation
87    /// failed to setup a new value. CompareAndSwapError contains current and proposed values.
88    /// - Err(Error::Unsupported) if the database is opened in read-only mode.
89    pub fn compare_and_swap<K>(
90        &self,
91        key: K,
92        old: Option<V>,
93        new: Option<V>,
94    ) -> Result<std::result::Result<(), CompareAndSwapError<V>>>
95    where
96        K: AsRef<[u8]>,
97    {
98        let ov = coerce(old.map(|value| E::encode(&value)))?;
99        let nv = coerce(new.map(|value| E::encode(&value)))?;
100
101        match self.0.compare_and_swap(key, ov, nv)? {
102            Ok(()) => Ok(Ok(())),
103            Err(sled::CompareAndSwapError { current, proposed }) => {
104                let current = if let Some(current) = current {
105                    Some(E::decode(&current)?)
106                } else {
107                    None
108                };
109                let proposed = if let Some(proposed) = proposed {
110                    Some(E::decode(&proposed)?)
111                } else {
112                    None
113                };
114
115                Ok(Err(CompareAndSwapError { current, proposed }))
116            }
117        }
118    }
119
120    /// Retrieve a value from the Tree if it exists.
121    pub fn get<K>(&self, key: K) -> Result<Option<V>>
122    where
123        K: AsRef<[u8]>,
124    {
125        let opt = self.0.get(key)?;
126
127        if let Some(v) = opt {
128            Ok(Some(E::decode(&v)?))
129        } else {
130            Ok(None)
131        }
132    }
133
134    /// Insert a key to a new value, returning the last value if it was set.
135    pub fn insert<K>(&self, key: K, value: V) -> Result<Option<V>>
136    where
137        IVec: From<K>,
138        K: AsRef<[u8]>,
139    {
140        let v = E::encode(&value)?;
141
142        let opt = self.0.insert::<K, Vec<u8>>(key, v)?;
143
144        if let Some(v) = opt {
145            Ok(Some(E::decode(&v)?))
146        } else {
147            Ok(None)
148        }
149    }
150
151    /// Delete a value, returning the old value if it existed.
152    pub fn remove<K>(&self, key: K) -> Result<Option<V>>
153    where
154        K: AsRef<[u8]>,
155    {
156        let opt = self.0.remove(key)?;
157
158        if let Some(v) = opt {
159            Ok(Some(E::decode(&v)?))
160        } else {
161            Ok(None)
162        }
163    }
164
165    /// Fetch the value, apply a function to it and return the result.
166    ///
167    /// ### Note
168    /// This may call the function multiple times if the value has been changed from other threads
169    /// in the meantime.
170    pub fn update_and_fetch<K>(
171        &self,
172        key: K,
173        f: impl Fn(Option<V>) -> Option<V>,
174    ) -> Result<Option<V>>
175    where
176        K: AsRef<[u8]>,
177    {
178        let opt = self.0.update_and_fetch(key, |opt| {
179            let o = opt.and_then(|v| E::decode(&v).ok());
180
181            (f)(o).and_then(|value| E::encode(&value).ok())
182        })?;
183
184        if let Some(v) = opt {
185            Ok(Some(E::decode(&v)?))
186        } else {
187            Ok(None)
188        }
189    }
190
191    /// Fetch the value, apply a function to it and return the previous value.
192    ///
193    /// ### Note
194    /// This may call the function multiple times if the value has been changed from other threads in the meantime.
195    pub fn fetch_and_update<K>(
196        &self,
197        key: K,
198        f: impl Fn(Option<V>) -> Option<V>,
199    ) -> Result<Option<V>>
200    where
201        K: AsRef<[u8]>,
202    {
203        let opt = self.0.fetch_and_update(key, |opt| {
204            let o = opt.and_then(|v| E::decode(&v).ok());
205
206            (f)(o).and_then(|value| E::encode(&value).ok())
207        })?;
208
209        if let Some(v) = opt {
210            Ok(Some(E::decode(&v)?))
211        } else {
212            Ok(None)
213        }
214    }
215
216    /// Subscribe to `Event`s that happen to keys that have the specified prefix. Events for
217    /// particular keys are guaranteed to be witnessed in the same order by all threads, but
218    /// threads may witness different interleavings of `Event`s across different keys. If
219    /// subscribers don't keep up with new writes, they will cause new writes to block. There is a
220    /// buffer of 1024 items per `Subscriber`. This can be used to build reactive and replicated
221    /// systems.
222    pub fn watch_prefix(&self, prefix: Vec<u8>) -> sled::Subscriber {
223        self.0.watch_prefix(prefix)
224    }
225
226    /// Synchronously flushes all dirty IO buffers and calls fsync. If this succeeds, it is guaranteed that all previous writes will be recovered if the system crashes. Returns the number of bytes flushed during this call.
227    ///
228    /// Flushing can take quite a lot of time, and you should measure the performance impact of using it on realistic sustained workloads running on realistic hardware.
229    pub fn flush(&self) -> Result<()> {
230        self.0.flush()?;
231        Ok(())
232    }
233
234    /// Returns `true` if the `Tree` contains a value for the specified key.
235    pub fn contains_key<K>(&self, key: K) -> Result<bool>
236    where
237        K: AsRef<[u8]>,
238    {
239        Ok(self.0.contains_key(key)?)
240    }
241
242    /// Create a double-ended iterator over the tuples of keys and values in this tree.
243    pub fn iter(&self) -> StructuredIter<V, E> {
244        StructuredIter::new(self.0.iter())
245    }
246
247    /// Create a double-ended iterator over tuples of keys and values, where the keys fall
248    /// within the specified range.
249    pub fn range<K, R>(&self, range: R) -> StructuredIter<V, E>
250    where
251        K: AsRef<[u8]>,
252        R: RangeBounds<K>,
253    {
254        StructuredIter::new(self.0.range(range))
255    }
256
257    /// Retrieve the key and value before the provided key, if one exists.
258    pub fn get_lt<K>(&self, key: K) -> Result<Option<(IVec, V)>>
259    where
260        K: AsRef<[u8]>,
261    {
262        match self.0.get_lt(key)? {
263            Some((k, v)) => {
264                let value = E::decode(&v)?;
265                Ok(Some((k, value)))
266            }
267            None => Ok(None),
268        }
269    }
270
271    /// Retrieve the next key and value from the Tree after the provided key.
272    ///
273    /// ### Note
274    /// The order follows the Ord implementation for Vec<u8>:
275    ///
276    /// `[] < [0] < [255] < [255, 0] < [255, 255] ...`
277    ///
278    /// To retain the ordering of numerical types use big endian reprensentation
279    pub fn get_gt<K>(&self, key: K) -> Result<Option<(IVec, V)>>
280    where
281        K: AsRef<[u8]>,
282    {
283        match self.0.get_gt(key)? {
284            Some((k, v)) => {
285                let value = E::decode(&v)?;
286                Ok(Some((k, value)))
287            }
288            None => Ok(None),
289        }
290    }
291
292    /// Create an iterator over tuples of keys and values, where the all the keys starts with the
293    /// given prefix.
294    pub fn scan_prefix<P>(&self, prefix: P) -> StructuredIter<V, E>
295    where
296        P: AsRef<[u8]>,
297    {
298        StructuredIter::new(self.0.scan_prefix(prefix))
299    }
300
301    /// Atomically removes the maximum item in the `Tree` instance.
302    pub fn pop_max(&self) -> Result<Option<(IVec, V)>> {
303        match self.0.pop_max()? {
304            Some((k, v)) => {
305                let value = E::decode(&v)?;
306                Ok(Some((k, value)))
307            }
308            None => Ok(None),
309        }
310    }
311
312    /// Atomically removes the minimum item in the `Tree` instance.
313    pub fn pop_min(&self) -> Result<Option<(IVec, V)>> {
314        match self.0.pop_min()? {
315            Some((k, v)) => {
316                let value = E::decode(&v)?;
317                Ok(Some((k, value)))
318            }
319            None => Ok(None),
320        }
321    }
322
323    /// Returns the number of elements in this tree.
324    ///
325    /// Beware: performs a full O(n) scan under the hood.
326    pub fn len(&self) -> usize {
327        self.0.len()
328    }
329
330    /// Returns `true` if the `Tree` contains no elements.
331    pub fn is_empty(&self) -> bool {
332        self.0.is_empty()
333    }
334
335    /// Clears the `Tree`, removing all values.
336    ///
337    /// Note that this is not atomic.
338    pub fn clear(&self) -> Result<()> {
339        Ok(self.0.clear()?)
340    }
341
342    /// Returns the name of the tree.
343    pub fn name(&self) -> String {
344        self.1.clone()
345    }
346}
347
348impl<V, E> StructuredIter<V, E>
349where
350    E: Encoding<V> + 'static,
351{
352    fn new(iter: sled::Iter) -> Self {
353        StructuredIter(iter, PhantomData, PhantomData)
354    }
355
356    /// Iterate over the keys of this Tree
357    pub fn keys(self) -> impl DoubleEndedIterator<Item = Result<IVec>> {
358        self.map(|res| res.map(|(key, _)| key))
359    }
360
361    /// Iterate over the values of this Tree
362    pub fn values(self) -> impl DoubleEndedIterator<Item = Result<V>> {
363        self.map(|res| res.map(|(_, v)| v))
364    }
365}
366
367impl<V, E> StructuredBatch<V, E>
368where
369    E: Encoding<V>,
370{
371    /// Set a key to a new value
372    pub fn insert<K>(&mut self, key: K, value: V) -> Result<()>
373    where
374        IVec: From<K>,
375    {
376        let v = E::encode(&value)?;
377        self.0.insert::<_, Vec<u8>>(key, v);
378        Ok(())
379    }
380
381    /// Remove a key
382    pub fn remove<K>(&mut self, key: K)
383    where
384        IVec: From<K>,
385    {
386        self.0.remove(key)
387    }
388}
389
390impl<'a, V, E> StructuredTransactionalTree<'a, V, E>
391where
392    E: Encoding<V>,
393{
394    /// Set a key to a new value
395    pub fn insert<K>(
396        &self,
397        key: K,
398        value: V,
399    ) -> sled::ConflictableTransactionResult<Result<Option<V>>>
400    where
401        IVec: From<K>,
402        K: AsRef<[u8]>,
403    {
404        let v = match E::encode(&value) {
405            Ok(v) => v,
406            Err(e) => return Ok(Err(e)),
407        };
408
409        let opt = self.0.insert::<_, Vec<_>>(key, v)?;
410
411        if let Some(v) = opt {
412            match E::decode(&v) {
413                Ok(i) => return Ok(Ok(Some(i))),
414                Err(e) => return Ok(Err(e)),
415            }
416        } else {
417            Ok(Ok(None))
418        }
419    }
420
421    /// Remove a key
422    pub fn remove<K>(&self, key: K) -> sled::ConflictableTransactionResult<Result<Option<V>>>
423    where
424        IVec: From<K>,
425        K: AsRef<[u8]>,
426    {
427        let opt = self.0.remove(key)?;
428
429        if let Some(v) = opt {
430            match E::decode(&v) {
431                Ok(i) => return Ok(Ok(Some(i))),
432                Err(e) => return Ok(Err(e)),
433            }
434        } else {
435            Ok(Ok(None))
436        }
437    }
438
439    /// Get the value associated with a key
440    pub fn get<K>(&self, key: K) -> sled::ConflictableTransactionResult<Result<Option<V>>>
441    where
442        K: AsRef<[u8]>,
443    {
444        let opt = self.0.get(key)?;
445
446        if let Some(v) = opt {
447            match E::decode(&v) {
448                Ok(i) => return Ok(Ok(Some(i))),
449                Err(e) => return Ok(Err(e)),
450            }
451        } else {
452            Ok(Ok(None))
453        }
454    }
455
456    /// Atomically apply multiple inserts and removals.
457    pub fn apply_batch(
458        &self,
459        batch: StructuredBatch<V, E>,
460    ) -> sled::ConflictableTransactionResult<()> {
461        Ok(self.0.apply_batch(batch.0)?)
462    }
463}
464
465impl<V, E> Iterator for StructuredIter<V, E>
466where
467    E: Encoding<V>,
468{
469    type Item = Result<(IVec, V)>;
470
471    fn next(&mut self) -> Option<Self::Item> {
472        match self.0.next()? {
473            Ok((key, v)) => Some(E::decode(&v).map(move |value| (key, value))),
474            Err(e) => Some(Err(e.into())),
475        }
476    }
477}
478
479impl<V, E> DoubleEndedIterator for StructuredIter<V, E>
480where
481    E: Encoding<V>,
482{
483    fn next_back(&mut self) -> Option<<Self as Iterator>::Item> {
484        match self.0.next_back()? {
485            Ok((key, v)) => Some(E::decode(&v).map(move |value| (key, value))),
486            Err(e) => Some(Err(e.into())),
487        }
488    }
489}