sled/
tree.rs

1use std::{
2    num::NonZeroU64,
3    borrow::Cow,
4    fmt::{self, Debug},
5    ops::{self, Deref, RangeBounds},
6    sync::atomic::Ordering::SeqCst,
7};
8
9use parking_lot::RwLock;
10
11use crate::{atomic_shim::AtomicU64, pagecache::NodeView, *};
12
/// A node view bundled with the page id it belongs to.
///
/// Produced by `view_for_key` and destructured by the read and write
/// paths in this file.
#[derive(Debug, Clone)]
pub(crate) struct View<'g> {
    /// Borrowed view of the node, valid for the `'g` lifetime.
    pub node_view: NodeView<'g>,
    /// Page id paired with `node_view`; the write paths hand both to
    /// `pagecache.link` when linking an update.
    pub pid: PageId,
    // NOTE(review): presumably the size of the page in bytes; no use of
    // this field is visible in this part of the file, so confirm.
    pub size: u64,
}
19
20impl<'g> Deref for View<'g> {
21    type Target = Node;
22
23    fn deref(&self) -> &Node {
24        &*self.node_view
25    }
26}
27
28impl IntoIterator for &'_ Tree {
29    type Item = Result<(IVec, IVec)>;
30    type IntoIter = Iter;
31
32    fn into_iter(self) -> Iter {
33        self.iter()
34    }
35}
36
37/// A flash-sympathetic persistent lock-free B+ tree.
38///
39/// A `Tree` represents a single logical keyspace / namespace / bucket.
40///
41/// Separate `Trees` may be opened to separate concerns using
42/// `Db::open_tree`.
43///
44/// `Db` implements `Deref<Target = Tree>` such that a `Db` acts
45/// like the "default" `Tree`. This is the only `Tree` that cannot
46/// be deleted via `Db::drop_tree`.
47///
48/// # Examples
49///
50/// ```
51/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
52/// use sled::IVec;
53///
54/// # let _ = std::fs::remove_dir_all("db");
55/// let db: sled::Db = sled::open("db")?;
56/// db.insert(b"yo!", b"v1".to_vec());
57/// assert_eq!(db.get(b"yo!"), Ok(Some(IVec::from(b"v1"))));
58///
59/// // Atomic compare-and-swap.
60/// db.compare_and_swap(
61///     b"yo!",      // key
62///     Some(b"v1"), // old value, None for not present
63///     Some(b"v2"), // new value, None for delete
64/// )?;
65///
66/// // Iterates over key-value pairs, starting at the given key.
67/// let scan_key: &[u8] = b"a non-present key before yo!";
68/// let mut iter = db.range(scan_key..);
69/// assert_eq!(
70///     iter.next().unwrap(),
71///     Ok((IVec::from(b"yo!"), IVec::from(b"v2")))
72/// );
73/// assert_eq!(iter.next(), None);
74///
75/// db.remove(b"yo!");
76/// assert_eq!(db.get(b"yo!"), Ok(None));
77///
78/// let other_tree: sled::Tree = db.open_tree(b"cool db facts")?;
79/// other_tree.insert(
80///     b"k1",
81///     &b"a Db acts like a Tree due to implementing Deref<Target = Tree>"[..]
82/// )?;
83/// # let _ = std::fs::remove_dir_all("db");
84/// # Ok(()) }
85/// ```
// Cloning a `Tree` clones the inner `Arc`, so every clone refers to the
// same `TreeInner`.
#[derive(Clone)]
pub struct Tree(pub(crate) Arc<TreeInner>);
88
/// The shared state behind a `Tree` handle.
///
/// `Tree` dereferences to this type, so these fields are what the `Tree`
/// methods operate on.
#[allow(clippy::module_name_repetitions)]
pub struct TreeInner {
    // NOTE(review): presumably the name this tree was opened under
    // (see `Db::open_tree`) — confirm.
    pub(crate) tree_id: IVec,
    /// Gives access to the `pagecache` used for linking updates and
    /// for flushing.
    pub(crate) context: Context,
    /// Registry used by `watch_prefix` and notified by the write paths.
    pub(crate) subscribers: Subscribers,
    // NOTE(review): presumably the page id of the current root node — confirm.
    pub(crate) root: AtomicU64,
    /// Optional merge operator for this tree, behind a read-write lock.
    pub(crate) merge_operator: RwLock<Option<Box<dyn MergeOperator>>>,
}
97
98impl Drop for TreeInner {
99    fn drop(&mut self) {
100        // Flush the underlying system in a loop until we
101        // have flushed all dirty data.
102        loop {
103            match self.context.pagecache.flush() {
104                Ok(0) => return,
105                Ok(_) => continue,
106                Err(e) => {
107                    error!("failed to flush data to disk: {:?}", e);
108                    return
109                },
110            }
111        }
112    }
113}
114
115impl Deref for Tree {
116    type Target = TreeInner;
117
118    fn deref(&self) -> &TreeInner {
119        &self.0
120    }
121}
122
// SAFETY: NOTE(review): no justification is recorded here. `Tree` only
// wraps an `Arc<TreeInner>`, so this asserts that `TreeInner` may be moved
// to another thread — confirm, in particular for the boxed
// `dyn MergeOperator` field.
#[allow(unsafe_code)]
unsafe impl Send for Tree {}

// SAFETY: NOTE(review): likewise unverified from this file; asserts that
// concurrent shared access to `TreeInner` is sound — confirm.
#[allow(unsafe_code)]
unsafe impl Sync for Tree {}
128
129impl Tree {
130    #[doc(hidden)]
131    #[deprecated(since = "0.24.2", note = "replaced by `Tree::insert`")]
132    pub fn set<K, V>(&self, key: K, value: V) -> Result<Option<IVec>>
133    where
134        K: AsRef<[u8]>,
135        V: Into<IVec>,
136    {
137        self.insert(key, value)
138    }
139
140    /// Insert a key to a new value, returning the last value if it
141    /// was set.
142    ///
143    /// # Examples
144    ///
145    /// ```
146    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
147    /// # let config = sled::Config::new().temporary(true);
148    /// # let db = config.open()?;
149    /// assert_eq!(db.insert(&[1, 2, 3], vec![0]), Ok(None));
150    /// assert_eq!(db.insert(&[1, 2, 3], vec![1]), Ok(Some(sled::IVec::from(&[0]))));
151    /// # Ok(()) }
152    /// ```
153    pub fn insert<K, V>(&self, key: K, value: V) -> Result<Option<IVec>>
154    where
155        K: AsRef<[u8]>,
156        V: Into<IVec>,
157    {
158        let value = value.into();
159        let mut guard = pin();
160        let _cc = concurrency_control::read();
161        loop {
162            trace!("setting key {:?}", key.as_ref());
163            if let Ok(res) =
164                self.insert_inner(key.as_ref(), Some(value.clone()), &mut guard)?
165            {
166                return Ok(res);
167            }
168        }
169    }
170
171    pub(crate) fn insert_inner(
172        &self,
173        key: &[u8],
174        mut value: Option<IVec>,
175        guard: &mut Guard,
176    ) -> Result<Conflictable<Option<IVec>>> {
177        let _measure = if value.is_some() {
178            Measure::new(&M.tree_set)
179        } else {
180            Measure::new(&M.tree_del)
181        };
182
183        let View { node_view, pid, .. } =
184            self.view_for_key(key.as_ref(), guard)?;
185
186        let mut subscriber_reservation = self.subscribers.reserve(&key);
187
188        let (encoded_key, last_value) = node_view.node_kv_pair(key.as_ref());
189
190        if value == last_value {
191            // short-circuit a no-op set or delete
192            return Ok(Ok(value))
193        }
194
195        let frag = if let Some(value) = value.clone() {
196            Link::Set(encoded_key, value)
197        } else {
198            Link::Del(encoded_key)
199        };
200
201        let link = self.context.pagecache.link(
202            pid,
203            node_view.0,
204            frag,
205            guard,
206        )?;
207
208        if link.is_ok() {
209            // success
210            if let Some(res) = subscriber_reservation.take() {
211                let event = if let Some(value) = value.take() {
212                    subscriber::Event::Insert {
213                        key: key.as_ref().into(),
214                        value,
215                    }
216                } else {
217                    subscriber::Event::Remove { key: key.as_ref().into() }
218                };
219
220                res.complete(&event);
221            }
222
223            guard.writeset.push(pid);
224
225            Ok(Ok(last_value))
226        } else {
227            M.tree_looped();
228            Ok(Err(Conflict))
229        }
230    }
231
232    /// Perform a multi-key serializable transaction.
233    ///
234    /// # Examples
235    ///
236    /// ```
237    /// # use sled::{transaction::TransactionResult, Config};
238    /// # fn main() -> TransactionResult<()> {
239    /// # let config = sled::Config::new().temporary(true);
240    /// # let db = config.open()?;
241    /// // Use write-only transactions as a writebatch:
242    /// db.transaction(|tx_db| {
243    ///     tx_db.insert(b"k1", b"cats")?;
244    ///     tx_db.insert(b"k2", b"dogs")?;
245    ///     Ok(())
246    /// })?;
247    ///
248    /// // Atomically swap two items:
249    /// db.transaction(|tx_db| {
250    ///     let v1_option = tx_db.remove(b"k1")?;
251    ///     let v1 = v1_option.unwrap();
252    ///     let v2_option = tx_db.remove(b"k2")?;
253    ///     let v2 = v2_option.unwrap();
254    ///
255    ///     tx_db.insert(b"k1", v2)?;
256    ///     tx_db.insert(b"k2", v1)?;
257    ///
258    ///     Ok(())
259    /// })?;
260    ///
261    /// assert_eq!(&db.get(b"k1")?.unwrap(), b"dogs");
262    /// assert_eq!(&db.get(b"k2")?.unwrap(), b"cats");
263    /// # Ok(())
264    /// # }
265    /// ```
266    ///
267    /// A transaction may return information from
268    /// an intentionally-cancelled transaction by using
269    /// the abort function inside the closure in
270    /// combination with the try operator.
271    ///
272    /// ```
273    /// use sled::{transaction::{abort, TransactionError, TransactionResult}, Config};
274    ///
275    /// #[derive(Debug, PartialEq)]
276    /// struct MyBullshitError;
277    ///
278    /// fn main() -> TransactionResult<(), MyBullshitError> {
279    ///     let config = Config::new().temporary(true);
280    ///     let db = config.open()?;
281    ///
282    ///     // Use write-only transactions as a writebatch:
283    ///     let res = db.transaction(|tx_db| {
284    ///         tx_db.insert(b"k1", b"cats")?;
285    ///         tx_db.insert(b"k2", b"dogs")?;
286    ///         // aborting will cause all writes to roll-back.
287    ///         if true {
288    ///             abort(MyBullshitError)?;
289    ///         }
290    ///         Ok(42)
291    ///     }).unwrap_err();
292    ///
293    ///     assert_eq!(res, TransactionError::Abort(MyBullshitError));
294    ///     assert_eq!(db.get(b"k1")?, None);
295    ///     assert_eq!(db.get(b"k2")?, None);
296    ///
297    ///     Ok(())
298    /// }
299    /// ```
300    ///
301    ///
302    /// Transactions also work on tuples of `Tree`s,
303    /// preserving serializable ACID semantics!
304    /// In this example, we treat two trees like a
305    /// work queue, atomically apply updates to
306    /// data and move them from the unprocessed `Tree`
307    /// to the processed `Tree`.
308    ///
309    /// ```
310    /// # use sled::transaction::TransactionResult;
311    /// # fn main() -> TransactionResult<()> {
312    /// # let config = sled::Config::new().temporary(true);
313    /// # let db = config.open()?;
314    /// use sled::Transactional;
315    ///
316    /// let unprocessed = db.open_tree(b"unprocessed items")?;
317    /// let processed = db.open_tree(b"processed items")?;
318    ///
319    /// // An update somehow gets into the tree, which we
320    /// // later trigger the atomic processing of.
321    /// unprocessed.insert(b"k3", b"ligers")?;
322    ///
323    /// // Atomically process the new item and move it
324    /// // between `Tree`s.
325    /// (&unprocessed, &processed)
326    ///     .transaction(|(tx_unprocessed, tx_processed)| {
327    ///         let unprocessed_item = tx_unprocessed.remove(b"k3")?.unwrap();
328    ///         let mut processed_item = b"yappin' ".to_vec();
329    ///         processed_item.extend_from_slice(&unprocessed_item);
330    ///         tx_processed.insert(b"k3", processed_item)?;
331    ///         Ok(())
332    ///     })?;
333    ///
334    /// assert_eq!(unprocessed.get(b"k3").unwrap(), None);
335    /// assert_eq!(&processed.get(b"k3").unwrap().unwrap(), b"yappin' ligers");
336    /// # Ok(()) }
337    /// ```
338    pub fn transaction<F, A, E>(
339        &self,
340        f: F,
341    ) -> transaction::TransactionResult<A, E>
342    where
343        F: Fn(
344            &transaction::TransactionalTree,
345        ) -> transaction::ConflictableTransactionResult<A, E>,
346    {
347        Transactional::transaction(&self, f)
348    }
349
350    /// Create a new batched update that can be
351    /// atomically applied.
352    ///
353    /// It is possible to apply a `Batch` in a transaction
354    /// as well, which is the way you can apply a `Batch`
355    /// to multiple `Tree`s atomically.
356    ///
357    /// # Examples
358    ///
359    /// ```
360    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
361    /// # let config = sled::Config::new().temporary(true);
362    /// # let db = config.open()?;
363    /// db.insert("key_0", "val_0")?;
364    ///
365    /// let mut batch = sled::Batch::default();
366    /// batch.insert("key_a", "val_a");
367    /// batch.insert("key_b", "val_b");
368    /// batch.insert("key_c", "val_c");
369    /// batch.remove("key_0");
370    ///
371    /// db.apply_batch(batch)?;
372    /// // key_0 no longer exists, and key_a, key_b, and key_c
373    /// // now do exist.
374    /// # Ok(()) }
375    /// ```
376    pub fn apply_batch(&self, batch: Batch) -> Result<()> {
377        let _cc = concurrency_control::write();
378        let mut guard = pin();
379        self.apply_batch_inner(batch, &mut guard)
380    }
381
382    pub(crate) fn apply_batch_inner(
383        &self,
384        batch: Batch,
385        guard: &mut Guard,
386    ) -> Result<()> {
387        let peg = self.context.pin_log(guard)?;
388        trace!("applying batch {:?}", batch);
389        for (k, v_opt) in batch.writes {
390            loop {
391                if self.insert_inner(&k, v_opt.clone(), guard)?.is_ok() {
392                    break;
393                }
394            }
395        }
396
397        // when the peg drops, it ensures all updates
398        // written to the log since its creation are
399        // recovered atomically
400        peg.seal_batch()
401    }
402
403    /// Retrieve a value from the `Tree` if it exists.
404    ///
405    /// # Examples
406    ///
407    /// ```
408    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
409    /// # let config = sled::Config::new().temporary(true);
410    /// # let db = config.open()?;
411    /// db.insert(&[0], vec![0])?;
412    /// assert_eq!(db.get(&[0]), Ok(Some(sled::IVec::from(vec![0]))));
413    /// assert_eq!(db.get(&[1]), Ok(None));
414    /// # Ok(()) }
415    /// ```
416    pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<IVec>> {
417        let mut guard = pin();
418        let _cc = concurrency_control::read();
419        loop {
420            if let Ok(get) = self.get_inner(key.as_ref(), &mut guard)? {
421                return Ok(get);
422            }
423        }
424    }
425
426    pub(crate) fn get_inner(
427        &self,
428        key: &[u8],
429        guard: &mut Guard,
430    ) -> Result<Conflictable<Option<IVec>>> {
431        let _measure = Measure::new(&M.tree_get);
432
433        trace!("getting key {:?}", key);
434
435        let View { node_view, pid, .. } = self.view_for_key(key.as_ref(), guard)?;
436
437        let pair = node_view.leaf_pair_for_key(key.as_ref());
438        let val = pair.map(|kv| kv.1.clone());
439
440        guard.readset.push(pid);
441
442        Ok(Ok(val))
443    }
444
445    #[doc(hidden)]
446    #[deprecated(since = "0.24.2", note = "replaced by `Tree::remove`")]
447    pub fn del<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<IVec>> {
448        self.remove(key)
449    }
450
451    /// Delete a value, returning the old value if it existed.
452    ///
453    /// # Examples
454    ///
455    /// ```
456    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
457    /// # let config = sled::Config::new().temporary(true);
458    /// # let db = config.open()?;
459    /// db.insert(&[1], vec![1]);
460    /// assert_eq!(db.remove(&[1]), Ok(Some(sled::IVec::from(vec![1]))));
461    /// assert_eq!(db.remove(&[1]), Ok(None));
462    /// # Ok(()) }
463    /// ```
464    pub fn remove<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<IVec>> {
465        let mut guard = pin();
466        let _cc = concurrency_control::read();
467        loop {
468            trace!("removing key {:?}", key.as_ref());
469
470            if let Ok(res) = self.insert_inner(key.as_ref(), None, &mut guard)? {
471                return Ok(res);
472            }
473        }
474    }
475
476    /// Compare and swap. Capable of unique creation, conditional modification,
477    /// or deletion. If old is `None`, this will only set the value if it
478    /// doesn't exist yet. If new is `None`, will delete the value if old is
479    /// correct. If both old and new are `Some`, will modify the value if
480    /// old is correct.
481    ///
482    /// It returns `Ok(Ok(()))` if operation finishes successfully.
483    ///
484    /// If it fails it returns:
485    ///     - `Ok(Err(CompareAndSwapError(current, proposed)))` if operation
486    ///       failed to setup a new value. `CompareAndSwapError` contains
487    ///       current and proposed values.
488    ///     - `Err(Error::Unsupported)` if the database is opened in read-only
489    ///       mode.
490    ///
491    /// # Examples
492    ///
493    /// ```
494    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
495    /// # let config = sled::Config::new().temporary(true);
496    /// # let db = config.open()?;
497    /// // unique creation
498    /// assert_eq!(
499    ///     db.compare_and_swap(&[1], None as Option<&[u8]>, Some(&[10])),
500    ///     Ok(Ok(()))
501    /// );
502    ///
503    /// // conditional modification
504    /// assert_eq!(
505    ///     db.compare_and_swap(&[1], Some(&[10]), Some(&[20])),
506    ///     Ok(Ok(()))
507    /// );
508    ///
509    /// // failed conditional modification -- the current value is returned in
510    /// // the error variant
511    /// let operation = db.compare_and_swap(&[1], Some(&[30]), Some(&[40]));
512    /// assert!(operation.is_ok()); // the operation succeeded
513    /// let modification = operation.unwrap();
514    /// assert!(modification.is_err());
515    /// let actual_value = modification.unwrap_err();
516    /// assert_eq!(actual_value.current.map(|ivec| ivec.to_vec()), Some(vec![20]));
517    ///
518    /// // conditional deletion
519    /// assert_eq!(
520    ///     db.compare_and_swap(&[1], Some(&[20]), None as Option<&[u8]>),
521    ///     Ok(Ok(()))
522    /// );
523    /// assert_eq!(db.get(&[1]), Ok(None));
524    /// # Ok(()) }
525    /// ```
526    #[allow(clippy::needless_pass_by_value)]
527    pub fn compare_and_swap<K, OV, NV>(
528        &self,
529        key: K,
530        old: Option<OV>,
531        new: Option<NV>,
532    ) -> CompareAndSwapResult
533    where
534        K: AsRef<[u8]>,
535        OV: AsRef<[u8]>,
536        NV: Into<IVec>,
537    {
538        trace!("cas'ing key {:?}", key.as_ref());
539        let _measure = Measure::new(&M.tree_cas);
540
541        let guard = pin();
542        let _cc = concurrency_control::read();
543
544        let new = new.map(Into::into);
545
546        // we need to retry caps until old != cur, since just because
547        // cap fails it doesn't mean our value was changed.
548        loop {
549            let View { pid, node_view, .. } =
550                self.view_for_key(key.as_ref(), &guard)?;
551
552            let (encoded_key, current_value) =
553                node_view.node_kv_pair(key.as_ref());
554            let matches = match (old.as_ref(), &current_value) {
555                (None, None) => true,
556                (Some(o), Some(ref c)) => o.as_ref() == &**c,
557                _ => false,
558            };
559
560            if !matches {
561                return Ok(Err(CompareAndSwapError {
562                    current: current_value,
563                    proposed: new,
564                }));
565            }
566
567            let mut subscriber_reservation = self.subscribers.reserve(&key);
568
569            let frag = if let Some(ref new) = new {
570                Link::Set(encoded_key, new.clone())
571            } else {
572                Link::Del(encoded_key)
573            };
574            let link =
575                self.context.pagecache.link(pid, node_view.0, frag, &guard)?;
576
577            if link.is_ok() {
578                if let Some(res) = subscriber_reservation.take() {
579                    let event = if let Some(new) = new {
580                        subscriber::Event::Insert {
581                            key: key.as_ref().into(),
582                            value: new,
583                        }
584                    } else {
585                        subscriber::Event::Remove { key: key.as_ref().into() }
586                    };
587
588                    res.complete(&event);
589                }
590
591                return Ok(Ok(()));
592            }
593            M.tree_looped();
594        }
595    }
596
597    /// Fetch the value, apply a function to it and return the result.
598    ///
599    /// # Note
600    ///
601    /// This may call the function multiple times if the value has been
602    /// changed from other threads in the meantime.
603    ///
604    /// # Examples
605    ///
606    /// ```
607    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
608    /// use sled::{Config, Error, IVec};
609    /// use std::convert::TryInto;
610    ///
611    /// let config = Config::new().temporary(true);
612    /// let db = config.open()?;
613    ///
614    /// fn u64_to_ivec(number: u64) -> IVec {
615    ///     IVec::from(number.to_be_bytes().to_vec())
616    /// }
617    ///
618    /// let zero = u64_to_ivec(0);
619    /// let one = u64_to_ivec(1);
620    /// let two = u64_to_ivec(2);
621    /// let three = u64_to_ivec(3);
622    ///
623    /// fn increment(old: Option<&[u8]>) -> Option<Vec<u8>> {
624    ///     let number = match old {
625    ///         Some(bytes) => {
626    ///             let array: [u8; 8] = bytes.try_into().unwrap();
627    ///             let number = u64::from_be_bytes(array);
628    ///             number + 1
629    ///         }
630    ///         None => 0,
631    ///     };
632    ///
633    ///     Some(number.to_be_bytes().to_vec())
634    /// }
635    ///
636    /// assert_eq!(db.update_and_fetch("counter", increment), Ok(Some(zero)));
637    /// assert_eq!(db.update_and_fetch("counter", increment), Ok(Some(one)));
638    /// assert_eq!(db.update_and_fetch("counter", increment), Ok(Some(two)));
639    /// assert_eq!(db.update_and_fetch("counter", increment), Ok(Some(three)));
640    /// # Ok(()) }
641    /// ```
642    pub fn update_and_fetch<K, V, F>(
643        &self,
644        key: K,
645        mut f: F,
646    ) -> Result<Option<IVec>>
647    where
648        K: AsRef<[u8]>,
649        F: FnMut(Option<&[u8]>) -> Option<V>,
650        V: Into<IVec>,
651    {
652        let key_ref = key.as_ref();
653        let mut current = self.get(key_ref)?;
654
655        loop {
656            let tmp = current.as_ref().map(AsRef::as_ref);
657            let next = f(tmp).map(Into::into);
658            match self.compare_and_swap::<_, _, IVec>(
659                key_ref,
660                tmp,
661                next.clone(),
662            )? {
663                Ok(()) => return Ok(next),
664                Err(CompareAndSwapError { current: cur, .. }) => {
665                    current = cur;
666                }
667            }
668        }
669    }
670
671    /// Fetch the value, apply a function to it and return the previous value.
672    ///
673    /// # Note
674    ///
675    /// This may call the function multiple times if the value has been
676    /// changed from other threads in the meantime.
677    ///
678    /// # Examples
679    ///
680    /// ```
681    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
682    /// use sled::{Config, Error, IVec};
683    /// use std::convert::TryInto;
684    ///
685    /// let config = Config::new().temporary(true);
686    /// let db = config.open()?;
687    ///
688    /// fn u64_to_ivec(number: u64) -> IVec {
689    ///     IVec::from(number.to_be_bytes().to_vec())
690    /// }
691    ///
692    /// let zero = u64_to_ivec(0);
693    /// let one = u64_to_ivec(1);
694    /// let two = u64_to_ivec(2);
695    ///
696    /// fn increment(old: Option<&[u8]>) -> Option<Vec<u8>> {
697    ///     let number = match old {
698    ///         Some(bytes) => {
699    ///             let array: [u8; 8] = bytes.try_into().unwrap();
700    ///             let number = u64::from_be_bytes(array);
701    ///             number + 1
702    ///         }
703    ///         None => 0,
704    ///     };
705    ///
706    ///     Some(number.to_be_bytes().to_vec())
707    /// }
708    ///
709    /// assert_eq!(db.fetch_and_update("counter", increment), Ok(None));
710    /// assert_eq!(db.fetch_and_update("counter", increment), Ok(Some(zero)));
711    /// assert_eq!(db.fetch_and_update("counter", increment), Ok(Some(one)));
712    /// assert_eq!(db.fetch_and_update("counter", increment), Ok(Some(two)));
713    /// # Ok(()) }
714    /// ```
715    pub fn fetch_and_update<K, V, F>(
716        &self,
717        key: K,
718        mut f: F,
719    ) -> Result<Option<IVec>>
720    where
721        K: AsRef<[u8]>,
722        F: FnMut(Option<&[u8]>) -> Option<V>,
723        V: Into<IVec>,
724    {
725        let key_ref = key.as_ref();
726        let mut current = self.get(key_ref)?;
727
728        loop {
729            let tmp = current.as_ref().map(AsRef::as_ref);
730            let next = f(tmp);
731            match self.compare_and_swap(key_ref, tmp, next)? {
732                Ok(()) => return Ok(current),
733                Err(CompareAndSwapError { current: cur, .. }) => {
734                    current = cur;
735                }
736            }
737        }
738    }
739
740    /// Subscribe to `Event`s that happen to keys that have
741    /// the specified prefix. Events for particular keys are
742    /// guaranteed to be witnessed in the same order by all
743    /// threads, but threads may witness different interleavings
744    /// of `Event`s across different keys. If subscribers don't
745    /// keep up with new writes, they will cause new writes
746    /// to block. There is a buffer of 1024 items per
747    /// `Subscriber`. This can be used to build reactive
748    /// and replicated systems.
749    ///
750    /// `Subscriber` implements both `Iterator<Item = Event>`
751    /// and `Future<Output=Option<Event>>`
752    ///
753    /// # Examples
754    ///
755    /// Synchronous, blocking subscriber:
756    /// ```
757    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
758    /// # let config = sled::Config::new().temporary(true);
759    /// # let db = config.open()?;
760    /// // watch all events by subscribing to the empty prefix
761    /// let mut subscriber = db.watch_prefix(vec![]);
762    ///
763    /// let tree_2 = db.clone();
764    /// let thread = std::thread::spawn(move || {
765    ///     db.insert(vec![0], vec![1])
766    /// });
767    ///
768    /// // `Subscription` implements `Iterator<Item=Event>`
769    /// for event in subscriber.take(1) {
770    ///     match event {
771    ///         sled::Event::Insert{ key, value } => assert_eq!(key.as_ref(), &[0]),
772    ///         sled::Event::Remove {key } => {}
773    ///     }
774    /// }
775    ///
776    /// # thread.join().unwrap();
777    /// # Ok(()) }
778    /// ```
    /// Asynchronous, non-blocking subscriber:
780    ///
    /// `Subscriber` implements `Future<Output=Option<Event>>`.
782    ///
783    /// `while let Some(event) = (&mut subscriber).await { /* use it */ }`
784    pub fn watch_prefix<P: AsRef<[u8]>>(&self, prefix: P) -> Subscriber {
785        self.subscribers.register(prefix.as_ref())
786    }
787
788    /// Synchronously flushes all dirty IO buffers and calls
789    /// fsync. If this succeeds, it is guaranteed that all
790    /// previous writes will be recovered if the system
791    /// crashes. Returns the number of bytes flushed during
792    /// this call.
793    ///
794    /// Flushing can take quite a lot of time, and you should
795    /// measure the performance impact of using it on
796    /// realistic sustained workloads running on realistic
797    /// hardware.
798    pub fn flush(&self) -> Result<usize> {
799        self.context.pagecache.flush()
800    }
801
802    /// Asynchronously flushes all dirty IO buffers
803    /// and calls fsync. If this succeeds, it is
804    /// guaranteed that all previous writes will
805    /// be recovered if the system crashes. Returns
806    /// the number of bytes flushed during this call.
807    ///
808    /// Flushing can take quite a lot of time, and you
809    /// should measure the performance impact of
810    /// using it on realistic sustained workloads
811    /// running on realistic hardware.
812    // this clippy check is mis-firing on async code.
813    #[allow(clippy::used_underscore_binding)]
814    pub async fn flush_async(&self) -> Result<usize> {
815        let pagecache = self.context.pagecache.clone();
816        if let Some(result) = threadpool::spawn(move || pagecache.flush())?.await
817        {
818            result
819        } else {
820            Err(Error::ReportableBug(
821                "threadpool failed to complete \
822                action before shutdown"
823                    .to_string(),
824            ))
825        }
826    }
827
828    /// Returns `true` if the `Tree` contains a value for
829    /// the specified key.
830    ///
831    /// # Examples
832    ///
833    /// ```
834    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
835    /// # let config = sled::Config::new().temporary(true);
836    /// # let db = config.open()?;
837    /// db.insert(&[0], vec![0])?;
838    /// assert!(db.contains_key(&[0])?);
839    /// assert!(!db.contains_key(&[1])?);
840    /// # Ok(()) }
841    /// ```
842    pub fn contains_key<K: AsRef<[u8]>>(&self, key: K) -> Result<bool> {
843        self.get(key).map(|v| v.is_some())
844    }
845
846    /// Retrieve the key and value before the provided key,
847    /// if one exists.
848    ///
849    /// # Examples
850    ///
851    /// ```
852    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
853    /// use sled::IVec;
854    /// # let config = sled::Config::new().temporary(true);
855    /// # let db = config.open()?;
856    /// for i in 0..10 {
857    ///     db.insert(&[i], vec![i])
858    ///         .expect("should write successfully");
859    /// }
860    ///
861    /// assert_eq!(db.get_lt(&[]), Ok(None));
862    /// assert_eq!(db.get_lt(&[0]), Ok(None));
863    /// assert_eq!(
864    ///     db.get_lt(&[1]),
865    ///     Ok(Some((IVec::from(&[0]), IVec::from(&[0]))))
866    /// );
867    /// assert_eq!(
868    ///     db.get_lt(&[9]),
869    ///     Ok(Some((IVec::from(&[8]), IVec::from(&[8]))))
870    /// );
871    /// assert_eq!(
872    ///     db.get_lt(&[10]),
873    ///     Ok(Some((IVec::from(&[9]), IVec::from(&[9]))))
874    /// );
875    /// assert_eq!(
876    ///     db.get_lt(&[255]),
877    ///     Ok(Some((IVec::from(&[9]), IVec::from(&[9]))))
878    /// );
879    /// # Ok(()) }
880    /// ```
881    pub fn get_lt<K>(&self, key: K) -> Result<Option<(IVec, IVec)>>
882    where
883        K: AsRef<[u8]>,
884    {
885        let _measure = Measure::new(&M.tree_get);
886        self.range(..key).next_back().transpose()
887    }
888
889    /// Retrieve the next key and value from the `Tree` after the
890    /// provided key.
891    ///
892    /// # Note
893    /// The order follows the Ord implementation for `Vec<u8>`:
894    ///
895    /// `[] < [0] < [255] < [255, 0] < [255, 255] ...`
896    ///
    /// To retain the ordering of numerical types use big endian representation
898    ///
899    /// # Examples
900    ///
901    /// ```
902    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
903    /// use sled::IVec;
904    /// # let config = sled::Config::new().temporary(true);
905    /// # let db = config.open()?;
906    /// for i in 0..10 {
907    ///     db.insert(&[i], vec![i])?;
908    /// }
909    ///
910    /// assert_eq!(
911    ///     db.get_gt(&[]),
912    ///     Ok(Some((IVec::from(&[0]), IVec::from(&[0]))))
913    /// );
914    /// assert_eq!(
915    ///     db.get_gt(&[0]),
916    ///     Ok(Some((IVec::from(&[1]), IVec::from(&[1]))))
917    /// );
918    /// assert_eq!(
919    ///     db.get_gt(&[1]),
920    ///     Ok(Some((IVec::from(&[2]), IVec::from(&[2]))))
921    /// );
922    /// assert_eq!(
923    ///     db.get_gt(&[8]),
924    ///     Ok(Some((IVec::from(&[9]), IVec::from(&[9]))))
925    /// );
926    /// assert_eq!(db.get_gt(&[9]), Ok(None));
927    ///
928    /// db.insert(500u16.to_be_bytes(), vec![10]);
929    /// assert_eq!(
930    ///     db.get_gt(&499u16.to_be_bytes()),
931    ///     Ok(Some((IVec::from(&500u16.to_be_bytes()), IVec::from(&[10]))))
932    /// );
933    /// # Ok(()) }
934    /// ```
935    pub fn get_gt<K>(&self, key: K) -> Result<Option<(IVec, IVec)>>
936    where
937        K: AsRef<[u8]>,
938    {
939        let _measure = Measure::new(&M.tree_get);
940        self.range((ops::Bound::Excluded(key), ops::Bound::Unbounded))
941            .next()
942            .transpose()
943    }
944
945    /// Merge state directly into a given key's value using the
946    /// configured merge operator. This allows state to be written
947    /// into a value directly, without any read-modify-write steps.
948    /// Merge operators can be used to implement arbitrary data
949    /// structures.
950    ///
951    /// Calling `merge` will return an `Unsupported` error if it
952    /// is called without first setting a merge operator function.
953    ///
954    /// Merge operators are shared by all instances of a particular
955    /// `Tree`. Different merge operators may be set on different
956    /// `Tree`s.
957    ///
958    /// # Examples
959    ///
960    /// ```
961    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
962    /// # let config = sled::Config::new().temporary(true);
963    /// # let db = config.open()?;
964    /// use sled::IVec;
965    ///
966    /// fn concatenate_merge(
967    ///   _key: &[u8],               // the key being merged
968    ///   old_value: Option<&[u8]>,  // the previous value, if one existed
969    ///   merged_bytes: &[u8]        // the new bytes being merged in
970    /// ) -> Option<Vec<u8>> {       // set the new value, return None to delete
971    ///   let mut ret = old_value
972    ///     .map(|ov| ov.to_vec())
973    ///     .unwrap_or_else(|| vec![]);
974    ///
975    ///   ret.extend_from_slice(merged_bytes);
976    ///
977    ///   Some(ret)
978    /// }
979    ///
980    /// db.set_merge_operator(concatenate_merge);
981    ///
982    /// let k = b"k1";
983    ///
984    /// db.insert(k, vec![0]);
985    /// db.merge(k, vec![1]);
986    /// db.merge(k, vec![2]);
987    /// assert_eq!(db.get(k), Ok(Some(IVec::from(vec![0, 1, 2]))));
988    ///
989    /// // Replace previously merged data. The merge function will not be called.
990    /// db.insert(k, vec![3]);
991    /// assert_eq!(db.get(k), Ok(Some(IVec::from(vec![3]))));
992    ///
993    /// // Merges on non-present values will cause the merge function to be called
994    /// // with `old_value == None`. If the merge function returns something (which it
995    /// // does, in this case) a new value will be inserted.
996    /// db.remove(k);
997    /// db.merge(k, vec![4]);
998    /// assert_eq!(db.get(k), Ok(Some(IVec::from(vec![4]))));
999    /// # Ok(()) }
1000    /// ```
1001    pub fn merge<K, V>(&self, key: K, value: V) -> Result<Option<IVec>>
1002    where
1003        K: AsRef<[u8]>,
1004        V: AsRef<[u8]>,
1005    {
1006        let _cc = concurrency_control::read();
1007        loop {
1008            if let Ok(merge) = self.merge_inner(key.as_ref(), value.as_ref())? {
1009                return Ok(merge);
1010            }
1011        }
1012    }
1013
1014    pub(crate) fn merge_inner(
1015        &self,
1016        key: &[u8],
1017        value: &[u8],
1018    ) -> Result<Conflictable<Option<IVec>>> {
1019        trace!("merging key {:?}", key);
1020        let _measure = Measure::new(&M.tree_merge);
1021
1022        let merge_operator_opt = self.merge_operator.read();
1023
1024        if merge_operator_opt.is_none() {
1025            return Err(Error::Unsupported(
1026                "must set a merge operator on this Tree \
1027                 before calling merge by calling \
1028                 Tree::set_merge_operator"
1029                    .to_owned(),
1030            ));
1031        }
1032
1033        let merge_operator = merge_operator_opt.as_ref().unwrap();
1034
1035        loop {
1036            let guard = pin();
1037            let View { pid, node_view, .. } =
1038                self.view_for_key(key.as_ref(), &guard)?;
1039
1040            let (encoded_key, current_value) =
1041                node_view.node_kv_pair(key.as_ref());
1042            let tmp = current_value.as_ref().map(AsRef::as_ref);
1043            let new = merge_operator(key, tmp, value).map(IVec::from);
1044
1045            let mut subscriber_reservation = self.subscribers.reserve(&key);
1046
1047            let frag = if let Some(ref new) = new {
1048                Link::Set(encoded_key, new.clone())
1049            } else {
1050                Link::Del(encoded_key)
1051            };
1052            let link =
1053                self.context.pagecache.link(pid, node_view.0, frag, &guard)?;
1054
1055            if link.is_ok() {
1056                if let Some(res) = subscriber_reservation.take() {
1057                    let event = if let Some(new) = &new {
1058                        subscriber::Event::Insert {
1059                            key: key.as_ref().into(),
1060                            value: new.clone(),
1061                        }
1062                    } else {
1063                        subscriber::Event::Remove { key: key.as_ref().into() }
1064                    };
1065
1066                    res.complete(&event);
1067                }
1068
1069                return Ok(Ok(new));
1070            }
1071            M.tree_looped();
1072        }
1073    }
1074
1075    /// Sets a merge operator for use with the `merge` function.
1076    ///
1077    /// Merge state directly into a given key's value using the
1078    /// configured merge operator. This allows state to be written
1079    /// into a value directly, without any read-modify-write steps.
1080    /// Merge operators can be used to implement arbitrary data
1081    /// structures.
1082    ///
1083    /// # Panics
1084    ///
1085    /// Calling `merge` will panic if no merge operator has been
1086    /// configured.
1087    ///
1088    /// # Examples
1089    ///
1090    /// ```
1091    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1092    /// # let config = sled::Config::new().temporary(true);
1093    /// # let db = config.open()?;
1094    /// use sled::IVec;
1095    ///
1096    /// fn concatenate_merge(
1097    ///   _key: &[u8],               // the key being merged
1098    ///   old_value: Option<&[u8]>,  // the previous value, if one existed
1099    ///   merged_bytes: &[u8]        // the new bytes being merged in
1100    /// ) -> Option<Vec<u8>> {       // set the new value, return None to delete
1101    ///   let mut ret = old_value
1102    ///     .map(|ov| ov.to_vec())
1103    ///     .unwrap_or_else(|| vec![]);
1104    ///
1105    ///   ret.extend_from_slice(merged_bytes);
1106    ///
1107    ///   Some(ret)
1108    /// }
1109    ///
1110    /// db.set_merge_operator(concatenate_merge);
1111    ///
1112    /// let k = b"k1";
1113    ///
1114    /// db.insert(k, vec![0]);
1115    /// db.merge(k, vec![1]);
1116    /// db.merge(k, vec![2]);
1117    /// assert_eq!(db.get(k), Ok(Some(IVec::from(vec![0, 1, 2]))));
1118    ///
1119    /// // Replace previously merged data. The merge function will not be called.
1120    /// db.insert(k, vec![3]);
1121    /// assert_eq!(db.get(k), Ok(Some(IVec::from(vec![3]))));
1122    ///
1123    /// // Merges on non-present values will cause the merge function to be called
1124    /// // with `old_value == None`. If the merge function returns something (which it
1125    /// // does, in this case) a new value will be inserted.
1126    /// db.remove(k);
1127    /// db.merge(k, vec![4]);
1128    /// assert_eq!(db.get(k), Ok(Some(IVec::from(vec![4]))));
1129    /// # Ok(()) }
1130    /// ```
1131    pub fn set_merge_operator(
1132        &self,
1133        merge_operator: impl MergeOperator + 'static,
1134    ) {
1135        let mut mo_write = self.merge_operator.write();
1136        *mo_write = Some(Box::new(merge_operator));
1137    }
1138
1139    /// Create a double-ended iterator over the tuples of keys and
1140    /// values in this tree.
1141    ///
1142    /// # Examples
1143    ///
1144    /// ```
1145    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1146    /// # let config = sled::Config::new().temporary(true);
1147    /// # let db = config.open()?;
1148    /// use sled::IVec;
1149    /// db.insert(&[1], vec![10]);
1150    /// db.insert(&[2], vec![20]);
1151    /// db.insert(&[3], vec![30]);
1152    /// let mut iter = db.iter();
1153    /// assert_eq!(
1154    ///     iter.next().unwrap(),
1155    ///     Ok((IVec::from(&[1]), IVec::from(&[10])))
1156    /// );
1157    /// assert_eq!(
1158    ///     iter.next().unwrap(),
1159    ///     Ok((IVec::from(&[2]), IVec::from(&[20])))
1160    /// );
1161    /// assert_eq!(
1162    ///     iter.next().unwrap(),
1163    ///     Ok((IVec::from(&[3]), IVec::from(&[30])))
1164    /// );
1165    /// assert_eq!(iter.next(), None);
1166    /// # Ok(()) }
1167    /// ```
1168    pub fn iter(&self) -> Iter {
1169        self.range::<Vec<u8>, _>(..)
1170    }
1171
1172    /// Create a double-ended iterator over tuples of keys and values,
1173    /// where the keys fall within the specified range.
1174    ///
1175    /// # Examples
1176    ///
1177    /// ```
1178    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1179    /// # let config = sled::Config::new().temporary(true);
1180    /// # let db = config.open()?;
1181    /// use sled::IVec;
1182    /// db.insert(&[0], vec![0])?;
1183    /// db.insert(&[1], vec![10])?;
1184    /// db.insert(&[2], vec![20])?;
1185    /// db.insert(&[3], vec![30])?;
1186    /// db.insert(&[4], vec![40])?;
1187    /// db.insert(&[5], vec![50])?;
1188    ///
1189    /// let start: &[u8] = &[2];
1190    /// let end: &[u8] = &[4];
1191    /// let mut r = db.range(start..end);
1192    /// assert_eq!(r.next().unwrap(), Ok((IVec::from(&[2]), IVec::from(&[20]))));
1193    /// assert_eq!(r.next().unwrap(), Ok((IVec::from(&[3]), IVec::from(&[30]))));
1194    /// assert_eq!(r.next(), None);
1195    ///
1196    /// let mut r = db.range(start..end).rev();
1197    /// assert_eq!(r.next().unwrap(), Ok((IVec::from(&[3]), IVec::from(&[30]))));
1198    /// assert_eq!(r.next().unwrap(), Ok((IVec::from(&[2]), IVec::from(&[20]))));
1199    /// assert_eq!(r.next(), None);
1200    /// # Ok(()) }
1201    /// ```
1202    pub fn range<K, R>(&self, range: R) -> Iter
1203    where
1204        K: AsRef<[u8]>,
1205        R: RangeBounds<K>,
1206    {
1207        let lo = match range.start_bound() {
1208            ops::Bound::Included(start) => {
1209                ops::Bound::Included(IVec::from(start.as_ref()))
1210            }
1211            ops::Bound::Excluded(start) => {
1212                ops::Bound::Excluded(IVec::from(start.as_ref()))
1213            }
1214            ops::Bound::Unbounded => ops::Bound::Included(IVec::from(&[])),
1215        };
1216
1217        let hi = match range.end_bound() {
1218            ops::Bound::Included(end) => {
1219                ops::Bound::Included(IVec::from(end.as_ref()))
1220            }
1221            ops::Bound::Excluded(end) => {
1222                ops::Bound::Excluded(IVec::from(end.as_ref()))
1223            }
1224            ops::Bound::Unbounded => ops::Bound::Unbounded,
1225        };
1226
1227        Iter {
1228            tree: self.clone(),
1229            hi,
1230            lo,
1231            cached_node: None,
1232            going_forward: true,
1233        }
1234    }
1235
1236    /// Create an iterator over tuples of keys and values,
1237    /// where the all the keys starts with the given prefix.
1238    ///
1239    /// # Examples
1240    ///
1241    /// ```
1242    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1243    /// # let config = sled::Config::new().temporary(true);
1244    /// # let db = config.open()?;
1245    /// use sled::IVec;
1246    /// db.insert(&[0, 0, 0], vec![0, 0, 0])?;
1247    /// db.insert(&[0, 0, 1], vec![0, 0, 1])?;
1248    /// db.insert(&[0, 0, 2], vec![0, 0, 2])?;
1249    /// db.insert(&[0, 0, 3], vec![0, 0, 3])?;
1250    /// db.insert(&[0, 1, 0], vec![0, 1, 0])?;
1251    /// db.insert(&[0, 1, 1], vec![0, 1, 1])?;
1252    ///
1253    /// let prefix: &[u8] = &[0, 0];
1254    /// let mut r = db.scan_prefix(prefix);
1255    /// assert_eq!(
1256    ///     r.next(),
1257    ///     Some(Ok((IVec::from(&[0, 0, 0]), IVec::from(&[0, 0, 0]))))
1258    /// );
1259    /// assert_eq!(
1260    ///     r.next(),
1261    ///     Some(Ok((IVec::from(&[0, 0, 1]), IVec::from(&[0, 0, 1]))))
1262    /// );
1263    /// assert_eq!(
1264    ///     r.next(),
1265    ///     Some(Ok((IVec::from(&[0, 0, 2]), IVec::from(&[0, 0, 2]))))
1266    /// );
1267    /// assert_eq!(
1268    ///     r.next(),
1269    ///     Some(Ok((IVec::from(&[0, 0, 3]), IVec::from(&[0, 0, 3]))))
1270    /// );
1271    /// assert_eq!(r.next(), None);
1272    /// # Ok(()) }
1273    /// ```
1274    pub fn scan_prefix<P>(&self, prefix: P) -> Iter
1275    where
1276        P: AsRef<[u8]>,
1277    {
1278        let prefix_ref = prefix.as_ref();
1279        let mut upper = prefix_ref.to_vec();
1280
1281        while let Some(last) = upper.pop() {
1282            if last < u8::max_value() {
1283                upper.push(last + 1);
1284                return self.range(prefix_ref..&upper);
1285            }
1286        }
1287
1288        self.range(prefix..)
1289    }
1290
1291    /// Returns the first key and value in the `Tree`, or
1292    /// `None` if the `Tree` is empty.
1293    pub fn first(&self) -> Result<Option<(IVec, IVec)>> {
1294        self.iter().next().transpose()
1295    }
1296
1297    /// Returns the last key and value in the `Tree`, or
1298    /// `None` if the `Tree` is empty.
1299    pub fn last(&self) -> Result<Option<(IVec, IVec)>> {
1300        self.iter().next_back().transpose()
1301    }
1302
1303    /// Atomically removes the maximum item in the `Tree` instance.
1304    ///
1305    /// # Examples
1306    ///
1307    /// ```
1308    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1309    /// # let config = sled::Config::new().temporary(true);
1310    /// # let db = config.open()?;
1311    /// db.insert(&[0], vec![0])?;
1312    /// db.insert(&[1], vec![10])?;
1313    /// db.insert(&[2], vec![20])?;
1314    /// db.insert(&[3], vec![30])?;
1315    /// db.insert(&[4], vec![40])?;
1316    /// db.insert(&[5], vec![50])?;
1317    ///
1318    /// assert_eq!(&db.pop_max()?.unwrap().0, &[5]);
1319    /// assert_eq!(&db.pop_max()?.unwrap().0, &[4]);
1320    /// assert_eq!(&db.pop_max()?.unwrap().0, &[3]);
1321    /// assert_eq!(&db.pop_max()?.unwrap().0, &[2]);
1322    /// assert_eq!(&db.pop_max()?.unwrap().0, &[1]);
1323    /// assert_eq!(&db.pop_max()?.unwrap().0, &[0]);
1324    /// assert_eq!(db.pop_max()?, None);
1325    /// # Ok(()) }
1326    /// ```
1327    pub fn pop_max(&self) -> Result<Option<(IVec, IVec)>> {
1328        loop {
1329            if let Some(first_res) = self.iter().next_back() {
1330                let first = first_res?;
1331                if self
1332                    .compare_and_swap::<_, _, &[u8]>(
1333                        &first.0,
1334                        Some(&first.1),
1335                        None,
1336                    )?
1337                    .is_ok()
1338                {
1339                    return Ok(Some(first));
1340                }
1341            // try again
1342            } else {
1343                return Ok(None);
1344            }
1345        }
1346    }
1347
1348    /// Atomically removes the minimum item in the `Tree` instance.
1349    ///
1350    /// # Examples
1351    ///
1352    /// ```
1353    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1354    /// # let config = sled::Config::new().temporary(true);
1355    /// # let db = config.open()?;
1356    /// db.insert(&[0], vec![0])?;
1357    /// db.insert(&[1], vec![10])?;
1358    /// db.insert(&[2], vec![20])?;
1359    /// db.insert(&[3], vec![30])?;
1360    /// db.insert(&[4], vec![40])?;
1361    /// db.insert(&[5], vec![50])?;
1362    ///
1363    /// assert_eq!(&db.pop_min()?.unwrap().0, &[0]);
1364    /// assert_eq!(&db.pop_min()?.unwrap().0, &[1]);
1365    /// assert_eq!(&db.pop_min()?.unwrap().0, &[2]);
1366    /// assert_eq!(&db.pop_min()?.unwrap().0, &[3]);
1367    /// assert_eq!(&db.pop_min()?.unwrap().0, &[4]);
1368    /// assert_eq!(&db.pop_min()?.unwrap().0, &[5]);
1369    /// assert_eq!(db.pop_min()?, None);
1370    /// # Ok(()) }
1371    /// ```
1372    pub fn pop_min(&self) -> Result<Option<(IVec, IVec)>> {
1373        loop {
1374            if let Some(first_res) = self.iter().next() {
1375                let first = first_res?;
1376                if self
1377                    .compare_and_swap::<_, _, &[u8]>(
1378                        &first.0,
1379                        Some(&first.1),
1380                        None,
1381                    )?
1382                    .is_ok()
1383                {
1384                    return Ok(Some(first));
1385                }
1386            // try again
1387            } else {
1388                return Ok(None);
1389            }
1390        }
1391    }
1392
1393    /// Returns the number of elements in this tree.
1394    ///
1395    /// Beware: performs a full O(n) scan under the hood.
1396    ///
1397    /// # Examples
1398    ///
1399    /// ```
1400    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
1401    /// # let config = sled::Config::new().temporary(true);
1402    /// # let db = config.open()?;
1403    /// db.insert(b"a", vec![0]);
1404    /// db.insert(b"b", vec![1]);
1405    /// assert_eq!(db.len(), 2);
1406    /// # Ok(()) }
1407    /// ```
1408    pub fn len(&self) -> usize {
1409        self.iter().count()
1410    }
1411
1412    /// Returns `true` if the `Tree` contains no elements.
1413    pub fn is_empty(&self) -> bool {
1414        self.iter().next().is_none()
1415    }
1416
1417    /// Clears the `Tree`, removing all values.
1418    ///
1419    /// Note that this is not atomic.
1420    pub fn clear(&self) -> Result<()> {
1421        for k in self.iter().keys() {
1422            let key = k?;
1423            let _old = self.remove(key)?;
1424        }
1425        Ok(())
1426    }
1427
1428    /// Returns the name of the tree.
1429    pub fn name(&self) -> IVec {
1430        self.tree_id.clone()
1431    }
1432
1433    /// Returns the CRC32 of all keys and values
1434    /// in this Tree.
1435    ///
1436    /// This is O(N) and locks the underlying tree
1437    /// for the duration of the entire scan.
1438    pub fn checksum(&self) -> Result<u32> {
1439        let mut hasher = crc32fast::Hasher::new();
1440        let mut iter = self.iter();
1441        let _cc = concurrency_control::write();
1442        while let Some(kv_res) = iter.next_inner() {
1443            let (k, v) = kv_res?;
1444            hasher.update(&k);
1445            hasher.update(&v);
1446        }
1447        Ok(hasher.finalize())
1448    }
1449
1450    fn split_node<'g>(
1451        &self,
1452        view: &View<'g>,
1453        parent_view: &Option<View<'g>>,
1454        root_pid: PageId,
1455        guard: &'g Guard,
1456    ) -> Result<()> {
1457        trace!("splitting node {}", view.pid);
1458        // split node
1459        let (mut lhs, rhs) = view.deref().clone().split();
1460        let rhs_lo = rhs.lo.clone();
1461
1462        // install right side
1463        let (rhs_pid, rhs_ptr) = self.context.pagecache.allocate(rhs, guard)?;
1464
1465        // replace node, pointing next to installed right
1466        lhs.next = Some(NonZeroU64::new(rhs_pid).unwrap());
1467        let replace = self.context.pagecache.replace(
1468            view.pid,
1469            view.node_view.0,
1470            lhs,
1471            guard,
1472        )?;
1473        M.tree_child_split_attempt();
1474        if replace.is_err() {
1475            // if we failed, don't follow through with the
1476            // parent split or root hoist.
1477            let _new_stack = self
1478                .context
1479                .pagecache
1480                .free(rhs_pid, rhs_ptr, guard)?
1481                .expect("could not free allocated page");
1482            return Ok(());
1483        }
1484        M.tree_child_split_success();
1485
1486        // either install parent split or hoist root
1487        if let Some(parent_view) = parent_view {
1488            M.tree_parent_split_attempt();
1489            let mut parent: Node = parent_view.deref().clone();
1490            let split_applied = parent.parent_split(&rhs_lo, rhs_pid);
1491
1492            if !split_applied {
1493                // due to deep races, it's possible for the
1494                // parent to already have a node for this lo key.
1495                // if this is the case, we can skip the parent split
1496                // because it's probably going to fail anyway.
1497                return Ok(());
1498            }
1499
1500            let replace = self.context.pagecache.replace(
1501                parent_view.pid,
1502                parent_view.node_view.0,
1503                parent,
1504                guard,
1505            )?;
1506            if replace.is_ok() {
1507                M.tree_parent_split_success();
1508            } else {
1509                // Parent splits are an optimization
1510                // so we don't need to care if we
1511                // failed.
1512            }
1513        } else {
1514            let _ = self.root_hoist(root_pid, rhs_pid, rhs_lo, guard)?;
1515        }
1516
1517        Ok(())
1518    }
1519
1520    fn root_hoist(
1521        &self,
1522        from: PageId,
1523        to: PageId,
1524        at: IVec,
1525        guard: &Guard,
1526    ) -> Result<bool> {
1527        M.tree_root_split_attempt();
1528        // hoist new root, pointing to lhs & rhs
1529
1530        let new_root = Node::new_hoisted_root(from, at, to);
1531
1532        let (new_root_pid, new_root_ptr) =
1533            self.context.pagecache.allocate(new_root, guard)?;
1534        debug!("allocated pid {} in root_hoist", new_root_pid);
1535
1536        debug_delay();
1537
1538        let cas = self.context.pagecache.cas_root_in_meta(
1539            &self.tree_id,
1540            Some(from),
1541            Some(new_root_pid),
1542            guard,
1543        )?;
1544        if cas.is_ok() {
1545            debug!("root hoist from {} to {} successful", from, new_root_pid);
1546            M.tree_root_split_success();
1547
1548            // we spin in a cas loop because it's possible
1549            // 2 threads are at this point, and we don't want
1550            // to cause roots to diverge between meta and
1551            // our version.
1552            while self.root.compare_and_swap(from, new_root_pid, SeqCst) != from
1553            {
1554            }
1555
1556            Ok(true)
1557        } else {
1558            debug!(
1559                "root hoist from {} to {} failed: {:?}",
1560                from, new_root_pid, cas
1561            );
1562            let _new_stack = self
1563                .context
1564                .pagecache
1565                .free(new_root_pid, new_root_ptr, guard)?
1566                .expect("could not free allocated page");
1567
1568            Ok(false)
1569        }
1570    }
1571
1572    pub(crate) fn view_for_pid<'g>(
1573        &self,
1574        pid: PageId,
1575        guard: &'g Guard,
1576    ) -> Result<Option<View<'g>>> {
1577        loop {
1578            let node_view_opt = self.context.pagecache.get(pid, guard)?;
1579            // if let Some((tree_ptr, ref leaf, size)) = &frag_opt {
1580            if let Some(node_view) = &node_view_opt {
1581                let size = node_view.0.log_size();
1582                let view = View { node_view: *node_view, pid, size };
1583                if view.merging_child.is_some() {
1584                    self.merge_node(&view, view.merging_child.unwrap().get(), guard)?;
1585                } else {
1586                    return Ok(Some(view));
1587                }
1588            } else {
1589                return Ok(None);
1590            }
1591        }
1592    }
1593
1594    // Returns the traversal path, completing any observed
1595    // partially complete splits or merges along the way.
1596    //
1597    // We intentionally leave the cyclometric complexity
1598    // high because attempts to split it up have made
1599    // the inherent complexity of the operation more
1600    // challenging to understand.
1601    #[allow(clippy::cognitive_complexity)]
1602    pub(crate) fn view_for_key<'g, K>(
1603        &self,
1604        key: K,
1605        guard: &'g Guard,
1606    ) -> Result<View<'g>>
1607    where
1608        K: AsRef<[u8]>,
1609    {
1610        #[cfg(any(test, feature = "lock_free_delays"))]
1611        const MAX_LOOPS: usize = usize::max_value();
1612
1613        #[cfg(not(any(test, feature = "lock_free_delays")))]
1614        const MAX_LOOPS: usize = 1_000_000;
1615
1616        let _measure = Measure::new(&M.tree_traverse);
1617
1618        let mut cursor = self.root.load(Acquire);
1619        let mut root_pid = cursor;
1620        let mut parent_view = None;
1621        let mut unsplit_parent = None;
1622        let mut took_leftmost_branch = false;
1623
1624        macro_rules! retry {
1625            () => {
1626                trace!(
1627                    "retrying at line {} when cursor was {}",
1628                    line!(),
1629                    cursor
1630                );
1631                cursor = self.root.load(Acquire);
1632                root_pid = cursor;
1633                parent_view = None;
1634                unsplit_parent = None;
1635                took_leftmost_branch = false;
1636                continue;
1637            };
1638        }
1639
1640        for _ in 0..MAX_LOOPS {
1641            if cursor == u64::max_value() {
1642                // this collection has been explicitly removed
1643                return Err(Error::CollectionNotFound(self.tree_id.clone()));
1644            }
1645
1646            let node_opt = self.view_for_pid(cursor, guard)?;
1647
1648            let view = if let Some(view) = node_opt {
1649                view
1650            } else {
1651                retry!();
1652            };
1653
1654            // When we encounter a merge intention, we collaboratively help out
1655            if view.merging_child.is_some() {
1656                self.merge_node(&view, view.merging_child.unwrap().get(), guard)?;
1657                retry!();
1658            } else if view.merging {
1659                // we missed the parent merge intention due to a benign race,
1660                // so go around again and try to help out if necessary
1661                retry!();
1662            }
1663
1664            let overshot = key.as_ref() < view.lo.as_ref();
1665            let undershot =
1666                key.as_ref() >= view.hi.as_ref() && !view.hi.is_empty();
1667
1668            if overshot {
1669                // merge interfered, reload root and retry
1670                retry!();
1671            }
1672
1673            if view.should_split() {
1674                self.split_node(&view, &parent_view, root_pid, guard)?;
1675                retry!();
1676            }
1677
1678            if undershot {
1679                // half-complete split detect & completion
1680                cursor = view.next.expect(
1681                    "if our hi bound is not Inf (inity), \
1682                     we should have a right sibling",
1683                ).get();
1684                if unsplit_parent.is_none() && parent_view.is_some() {
1685                    unsplit_parent = parent_view.clone();
1686                } else if parent_view.is_none() && view.lo.is_empty() {
1687                    assert!(unsplit_parent.is_none());
1688                    assert_eq!(view.pid, root_pid);
1689                    // we have found a partially-split root
1690                    if self.root_hoist(
1691                        root_pid,
1692                        view.next.unwrap().get(),
1693                        view.hi.clone(),
1694                        guard,
1695                    )? {
1696                        M.tree_root_split_success();
1697                        retry!();
1698                    }
1699                }
1700                continue;
1701            } else if let Some(unsplit_parent) = unsplit_parent.take() {
1702                // we have found the proper page for
1703                // our cooperative parent split
1704                let mut parent: Node = unsplit_parent.deref().clone();
1705                let split_applied =
1706                    parent.parent_split(view.lo.as_ref(), cursor);
1707
1708                if !split_applied {
1709                    // due to deep races, it's possible for the
1710                    // parent to already have a node for this lo key.
1711                    // if this is the case, we can skip the parent split
1712                    // because it's probably going to fail anyway.
1713                    retry!();
1714                }
1715
1716                M.tree_parent_split_attempt();
1717                let replace = self.context.pagecache.replace(
1718                    unsplit_parent.pid,
1719                    unsplit_parent.node_view.0,
1720                    parent,
1721                    guard,
1722                )?;
1723                if replace.is_ok() {
1724                    M.tree_parent_split_success();
1725                }
1726            }
1727
1728            // detect whether a node is mergeable, and begin
1729            // the merge process.
1730            // NB we can never begin merging a node that is
1731            // the leftmost child of an index, because it
1732            // would be merged into a different index, which
1733            // would add considerable complexity to this already
1734            // fairly complex implementation.
1735            if view.should_merge() && !took_leftmost_branch {
1736                if let Some(ref mut parent) = parent_view {
1737                    assert!(parent.merging_child.is_none());
1738                    if parent.can_merge_child() {
1739                        let frag = Link::ParentMergeIntention(cursor);
1740
1741                        let link = self.context.pagecache.link(
1742                            parent.pid,
1743                            parent.node_view.0,
1744                            frag,
1745                            guard,
1746                        )?;
1747
1748                        if let Ok(new_parent_ptr) = link {
1749                            parent.node_view = NodeView(new_parent_ptr);
1750                            self.merge_node(parent, cursor, guard)?;
1751                            retry!();
1752                        }
1753                    }
1754                }
1755            }
1756
1757            if view.data.is_index() {
1758                let next = view.index_next_node(key.as_ref());
1759                took_leftmost_branch = next.0 == 0;
1760                parent_view = Some(view);
1761                cursor = next.1;
1762            } else {
1763                assert!(!overshot && !undershot);
1764                return Ok(view);
1765            }
1766        }
1767        panic!(
1768            "cannot find pid {} in view_for_key, looking for key {:?} in tree",
1769            cursor,
1770            key.as_ref(),
1771        );
1772    }
1773
1774    fn cap_merging_child<'g>(
1775        &'g self,
1776        child_pid: PageId,
1777        guard: &'g Guard,
1778    ) -> Result<Option<View<'g>>> {
1779        // Get the child node and try to install a `MergeCap` frag.
1780        // In case we succeed, we break, otherwise we try from the start.
1781        loop {
1782            let mut child_view = if let Some(child_view) =
1783                self.view_for_pid(child_pid, guard)?
1784            {
1785                child_view
1786            } else {
1787                // the child was already freed, meaning
1788                // somebody completed this whole loop already
1789                return Ok(None);
1790            };
1791
1792            if child_view.merging {
1793                trace!("child pid {} already merging", child_pid);
1794                return Ok(Some(child_view));
1795            }
1796
1797            let install_frag = self.context.pagecache.link(
1798                child_pid,
1799                child_view.node_view.0,
1800                Link::ChildMergeCap,
1801                guard,
1802            )?;
1803            match install_frag {
1804                Ok(new_ptr) => {
1805                    trace!("child pid {} merge capped", child_pid);
1806                    child_view.node_view = NodeView(new_ptr);
1807                    return Ok(Some(child_view));
1808                }
1809                Err(Some((_, _))) => {
1810                    trace!(
1811                        "child pid {} merge cap failed, retrying",
1812                        child_pid
1813                    );
1814                    continue;
1815                }
1816                Err(None) => {
1817                    trace!("child pid {} already freed", child_pid);
1818                    return Ok(None);
1819                }
1820            }
1821        }
1822    }
1823
1824    fn install_parent_merge<'g>(
1825        &self,
1826        parent_view: &View<'g>,
1827        child_pid: PageId,
1828        guard: &'g Guard,
1829    ) -> Result<bool> {
1830        let mut parent_view = Cow::Borrowed(parent_view);
1831        loop {
1832            let linked = self.context.pagecache.link(
1833                parent_view.pid,
1834                parent_view.node_view.0,
1835                Link::ParentMergeConfirm,
1836                guard,
1837            )?;
1838            match linked {
1839                Ok(_) => {
1840                    trace!(
1841                        "ParentMergeConfirm succeeded on parent pid {}, \
1842                         now freeing child pid {}",
1843                        parent_view.pid,
1844                        child_pid
1845                    );
1846                    return Ok(true);
1847                }
1848                Err(None) => {
1849                    trace!(
1850                        "ParentMergeConfirm \
1851                         failed on (now freed) parent pid {}",
1852                        parent_view.pid
1853                    );
1854                    return Ok(false);
1855                }
1856                Err(_) => {
1857                    let new_parent_view = if let Some(new_parent_view) =
1858                        self.view_for_pid(parent_view.pid, guard)?
1859                    {
1860                        trace!(
1861                            "failed to confirm merge \
1862                             on parent pid {}, trying again",
1863                            parent_view.pid
1864                        );
1865                        new_parent_view
1866                    } else {
1867                        trace!(
1868                            "failed to confirm merge \
1869                             on parent pid {}, which was freed",
1870                            parent_view.pid
1871                        );
1872                        return Ok(false);
1873                    };
1874
1875                    if new_parent_view.merging_child.map(NonZeroU64::get) != Some(child_pid) {
1876                        trace!(
1877                            "someone else must have already \
1878                             completed the merge, and now the \
1879                             merging child for parent pid {} is {:?}",
1880                            new_parent_view.pid,
1881                            new_parent_view.merging_child
1882                        );
1883                        return Ok(false);
1884                    }
1885
1886                    parent_view = Cow::Owned(new_parent_view);
1887                }
1888            }
1889        }
1890    }
1891
1892    pub(crate) fn merge_node<'g>(
1893        &self,
1894        parent_view: &View<'g>,
1895        child_pid: PageId,
1896        guard: &'g Guard,
1897    ) -> Result<()> {
1898        trace!(
1899            "merging child pid {} of parent pid {}",
1900            child_pid,
1901            parent_view.pid
1902        );
1903
1904        let child_view = if let Some(merging_child) =
1905            self.cap_merging_child(child_pid, guard)?
1906        {
1907            merging_child
1908        } else {
1909            return Ok(());
1910        };
1911
1912        let index = parent_view.data.index_ref().unwrap();
1913        let child_index =
1914            index.pointers.iter().position(|pid| pid == &child_pid).unwrap();
1915
1916        assert_ne!(
1917            child_index, 0,
1918            "merging child must not be the \
1919             leftmost child of its parent"
1920        );
1921
1922        let mut merge_index = child_index - 1;
1923
1924        // we assume caller only merges when
1925        // the node to be merged is not the
1926        // leftmost child.
1927        let mut cursor_pid = index.pointers[merge_index];
1928
1929        // searching for the left sibling to merge the target page into
1930        loop {
1931            // The only way this child could have been freed is if the original
1932            // merge has already been handled. Only in that case can this child
1933            // have been freed
1934            trace!(
1935                "cursor_pid is {} while looking for left sibling",
1936                cursor_pid
1937            );
1938            let cursor_view = if let Some(cursor_view) =
1939                self.view_for_pid(cursor_pid, guard)?
1940            {
1941                cursor_view
1942            } else {
1943                trace!(
1944                    "couldn't retrieve frags for freed \
1945                     (possibly outdated) prospective left \
1946                     sibling with pid {}",
1947                    cursor_pid
1948                );
1949
1950                if merge_index == 0 {
1951                    trace!(
1952                        "failed to find any left sibling for \
1953                         merging pid {}, which means this merge \
1954                         must have already completed.",
1955                        child_pid
1956                    );
1957                    return Ok(());
1958                }
1959
1960                merge_index -= 1;
1961                cursor_pid = index.pointers[merge_index];
1962
1963                continue;
1964            };
1965
1966            // This means that `cursor_node` is the node we want to replace
1967            if cursor_view.next.map(NonZeroU64::get) == Some(child_pid) {
1968                trace!(
1969                    "found left sibling pid {} points to merging node pid {}",
1970                    cursor_view.pid,
1971                    child_pid
1972                );
1973                let cursor_node = cursor_view.node_view;
1974
1975                let replacement = cursor_node.receive_merge(&child_view);
1976                let replace = self.context.pagecache.replace(
1977                    cursor_pid,
1978                    cursor_node.0,
1979                    replacement,
1980                    guard,
1981                )?;
1982                match replace {
1983                    Ok(_) => {
1984                        trace!(
1985                            "merged node pid {} into left sibling pid {}",
1986                            child_pid,
1987                            cursor_pid
1988                        );
1989                        break;
1990                    }
1991                    Err(None) => {
1992                        trace!(
1993                            "failed to merge pid {} into \
1994                             pid {} since pid {} doesn't exist anymore",
1995                            child_pid,
1996                            cursor_pid,
1997                            cursor_pid
1998                        );
1999                        return Ok(());
2000                    }
2001                    Err(_) => {
2002                        trace!(
2003                            "failed to merge pid {} into \
2004                             pid {} due to cas failure",
2005                            child_pid,
2006                            cursor_pid
2007                        );
2008                        continue;
2009                    }
2010                }
2011            } else if cursor_view.hi >= child_view.lo {
2012                // we overshot the node being merged,
2013                trace!(
2014                    "cursor pid {} has hi key {:?}, which is \
2015                     >= merging child pid {}'s lo key of {:?}, breaking",
2016                    cursor_pid,
2017                    cursor_view.hi,
2018                    child_pid,
2019                    child_view.lo
2020                );
2021                break;
2022            } else {
2023                // In case we didn't find the child, we get the next cursor node
2024                if let Some(next) = cursor_view.next {
2025                    trace!(
2026                        "traversing from cursor pid {} to right sibling pid {}",
2027                        cursor_pid,
2028                        next
2029                    );
2030                    cursor_pid = next.get();
2031                } else {
2032                    trace!(
2033                        "hit the right side of the tree without finding \
2034                         a left sibling for merging child pid {}",
2035                        child_pid
2036                    );
2037                    break;
2038                }
2039            }
2040        }
2041
2042        trace!(
2043            "trying to install parent merge \
2044             confirmation of merged child pid {} for parent pid {}",
2045            child_pid,
2046            parent_view.pid
2047        );
2048
2049        let should_continue =
2050            self.install_parent_merge(parent_view, child_pid, guard)?;
2051
2052        if !should_continue {
2053            return Ok(());
2054        }
2055
2056        match self.context.pagecache.free(
2057            child_pid,
2058            child_view.node_view.0,
2059            guard,
2060        )? {
2061            Ok(_) => {
2062                // we freed it
2063                trace!("freed merged pid {}", child_pid);
2064            }
2065            Err(None) => {
2066                // someone else freed it
2067                trace!("someone else freed merged pid {}", child_pid);
2068            }
2069            Err(Some(_)) => {
2070                trace!(
2071                    "someone was able to reuse freed merged pid {}",
2072                    child_pid
2073                );
2074                // it was reused somehow after we
2075                // observed it as in the merging state
2076                panic!(
2077                    "somehow the merging child was reused \
2078                     before all threads that witnessed its previous \
2079                     merge have left their epoch"
2080                )
2081            }
2082        }
2083
2084        trace!("finished with merge of pid {}", child_pid);
2085        Ok(())
2086    }
2087
2088    // Remove all pages for this tree from the underlying
2089    // PageCache. This will leave orphans behind if
2090    // the tree crashes during gc.
2091    pub(crate) fn gc_pages(
2092        &self,
2093        mut leftmost_chain: Vec<PageId>,
2094    ) -> Result<()> {
2095        let guard = pin();
2096
2097        while let Some(mut pid) = leftmost_chain.pop() {
2098            loop {
2099                let cursor_view =
2100                    if let Some(view) = self.view_for_pid(pid, &guard)? {
2101                        view
2102                    } else {
2103                        trace!("encountered Free node while GC'ing tree");
2104                        break;
2105                    };
2106
2107                let ret = self.context.pagecache.free(
2108                    pid,
2109                    cursor_view.node_view.0,
2110                    &guard,
2111                )?;
2112
2113                if ret.is_ok() {
2114                    let next_pid = if let Some(next_pid) = cursor_view.next {
2115                        next_pid
2116                    } else {
2117                        break;
2118                    };
2119                    assert_ne!(pid, next_pid.get());
2120                    pid = next_pid.get();
2121                }
2122            }
2123        }
2124
2125        Ok(())
2126    }
2127
2128    #[doc(hidden)]
2129    pub fn verify_integrity(&self) -> Result<()> {
2130        // verification happens in Debug impl
2131        let _out = format!("{:?}", self);
2132        Ok(())
2133    }
2134}
2135
2136impl Debug for Tree {
2137    fn fmt(
2138        &self,
2139        f: &mut fmt::Formatter<'_>,
2140    ) -> std::result::Result<(), fmt::Error> {
2141        let guard = pin();
2142
2143        let mut pid = self.root.load(Acquire);
2144        let mut left_most = pid;
2145        let mut level = 0;
2146        let mut expected_pids = FastSet8::default();
2147        let mut referenced_pids = FastSet8::default();
2148        let mut loop_detector = FastSet8::default();
2149
2150        expected_pids.insert(pid);
2151
2152        f.write_str("Tree: \n\t")?;
2153        self.context.pagecache.fmt(f)?;
2154        f.write_str("\tlevel 0:\n")?;
2155
2156        loop {
2157            let get_res = self.view_for_pid(pid, &guard);
2158            let node = if let Ok(Some(ref view)) = get_res {
2159                expected_pids.remove(&pid);
2160                if loop_detector.contains(&pid) {
2161                    if cfg!(feature = "testing") {
2162                        panic!(
2163                            "detected a loop while iterating over the Tree. \
2164                            pid {} was encountered multiple times",
2165                            pid
2166                        );
2167                    } else {
2168                        error!(
2169                            "detected a loop while iterating over the Tree. \
2170                            pid {} was encountered multiple times",
2171                            pid
2172                        );
2173                    }
2174                } else {
2175                    loop_detector.insert(pid);
2176                }
2177
2178                view.deref()
2179            } else {
2180                if cfg!(feature = "testing") {
2181                    panic!(
2182                        "Tree::fmt failed to read node {} \
2183                         that has been freed",
2184                        pid,
2185                    );
2186                } else {
2187                    error!(
2188                        "Tree::fmt failed to read node {} \
2189                         that has been freed",
2190                        pid,
2191                    );
2192                }
2193                break;
2194            };
2195
2196            write!(f, "\t\t{}: ", pid)?;
2197            node.fmt(f)?;
2198            f.write_str("\n")?;
2199
2200            if let Data::Index(ref index) = &node.data {
2201                for pid in &index.pointers {
2202                    referenced_pids.insert(*pid);
2203                }
2204            }
2205
2206            if let Some(next_pid) = node.next {
2207                pid = next_pid.get();
2208            } else {
2209                // we've traversed our level, time to bump down
2210                let left_get_res = self.view_for_pid(left_most, &guard);
2211                let left_node = if let Ok(Some(ref view)) = left_get_res {
2212                    view
2213                } else {
2214                    panic!(
2215                        "pagecache returned non-base node: {:?}",
2216                        left_get_res
2217                    )
2218                };
2219
2220                match &left_node.data {
2221                    Data::Index(index) => {
2222                        if let Some(next_pid) = index.pointers.first() {
2223                            pid = *next_pid;
2224                            left_most = *next_pid;
2225                            level += 1;
2226                            f.write_str(&*format!("\n\tlevel {}:\n", level))?;
2227                            assert!(
2228                                expected_pids.is_empty(),
2229                                "expected pids {:?} but never \
2230                                saw them on this level",
2231                                expected_pids
2232                            );
2233                            std::mem::swap(&mut expected_pids, &mut referenced_pids);
2234                        } else {
2235                            panic!("trying to debug print empty index node");
2236                        }
2237                    }
2238                    Data::Leaf(_items) => {
2239                        // we've reached the end of our tree, all leafs are on
2240                        // the lowest level.
2241                        break;
2242                    }
2243                }
2244            }
2245        }
2246
2247        Ok(())
2248    }
2249}
2250
2251/// Compare and swap result.
2252///
2253/// It returns `Ok(Ok(()))` if operation finishes successfully and
2254///     - `Ok(Err(CompareAndSwapError(current, proposed)))` if operation failed
2255///       to setup a new value. `CompareAndSwapError` contains current and
2256///       proposed values.
2257///     - `Err(Error::Unsupported)` if the database is opened in read-only mode.
2258///       otherwise.
2259pub type CompareAndSwapResult =
2260    Result<std::result::Result<(), CompareAndSwapError>>;
2261
2262impl From<Error> for CompareAndSwapResult {
2263    fn from(error: Error) -> Self {
2264        Err(error)
2265    }
2266}
2267
2268/// Compare and swap error.
2269#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
2270pub struct CompareAndSwapError {
2271    /// The current value which caused your CAS to fail.
2272    pub current: Option<IVec>,
2273    /// Returned value that was proposed unsuccessfully.
2274    pub proposed: Option<IVec>,
2275}
2276
2277impl fmt::Display for CompareAndSwapError {
2278    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2279        write!(f, "Compare and swap conflict")
2280    }
2281}
2282
2283impl std::error::Error for CompareAndSwapError {}