sled 0.18.1

a modern embedded database
Documentation
use std::{
    borrow::Cow,
    cmp::Ordering::{Greater, Less},
    fmt::{self, Debug},
    ops::{self, RangeBounds},
    sync::Arc,
};

use pagecache::PagePtr;

use super::*;

type Path<'g> = Vec<(&'g Frag, TreePtr<'g>)>;

impl<'a> IntoIterator for &'a Tree {
    type Item = Result<(Vec<u8>, PinnedValue), ()>;
    type IntoIter = Iter<'a>;

    fn into_iter(self) -> Iter<'a> {
        self.iter()
    }
}

/// A flash-sympathetic persistent lock-free B+ tree
///
/// # Examples
///
/// ```
/// let t = sled::Db::start_default("path_to_my_database").unwrap();
///
/// t.set(b"yo!", b"v1".to_vec());
/// assert!(t.get(b"yo!").unwrap().unwrap() == &*b"v1".to_vec());
///
/// t.cas(
///     b"yo!",                // key
///     Some(b"v1"),           // old value, None for not present
///     Some(b"v2".to_vec()),  // new value, None for delete
/// ).unwrap();
///
/// let mut iter = t.scan(b"a non-present key before yo!");
/// // assert_eq!(iter.next(), Some(Ok((b"yo!".to_vec(), b"v2".to_vec()))));
/// // assert_eq!(iter.next(), None);
///
/// t.del(b"yo!");
/// assert_eq!(t.get(b"yo!"), Ok(None));
/// ```
#[derive(Clone)]
pub struct Tree {
    pub(crate) tree_id: Vec<u8>,
    pub(crate) config: Config,
    pub(crate) subscriptions: Arc<Subscriptions>,
    pub(crate) pages:
        Arc<PageCache<BLinkMaterializer, Frag, Recovery>>,
}

unsafe impl Send for Tree {}

unsafe impl Sync for Tree {}

impl Tree {
    /// Subscribe to `Event`s that happen to keys that have
    /// the specified prefix. Events for particular keys are
    /// guaranteed to be witnessed in the same order by all
    /// threads, but threads may witness different interleavings
    /// of `Event`s across different keys. If subscribers don't
    /// keep up with new writes, they will cause new writes
    /// to block. There is a buffer of 1024 items per
    /// `Subscriber`. This can be used to build reactive
    /// and replicated systems.
    ///
    /// # Examples
    /// ```
    /// use sled::{Event, ConfigBuilder};
    /// let config = ConfigBuilder::new().temporary(true).build();
    ///
    /// let tree = sled::Db::start(config).unwrap();
    ///
    /// // watch all events by subscribing to the empty prefix
    /// let mut events = tree.watch_prefix(vec![]);
    ///
    /// let tree_2 = tree.clone();
    /// let thread = std::thread::spawn(move || {
    ///     tree.set(vec![0], vec![1]).unwrap();
    /// });
    ///
    /// // events is a blocking `Iterator` over `Event`s
    /// for event in events.take(1) {
    ///     match event {
    ///         Event::Set(key, value) => assert_eq!(key, vec![0]),
    ///         Event::Merge(key, partial_value) => {}
    ///         Event::Del(key) => {}
    ///     }
    /// }
    ///
    /// thread.join().unwrap();
    /// ```
    pub fn watch_prefix(&self, prefix: Vec<u8>) -> Subscriber {
        self.subscriptions.register(prefix)
    }

    /// Clears the `Tree`, removing all values.
    ///
    /// Note that this is not atomic.
    pub fn clear(&self) -> Result<(), ()> {
        for k in self.keys(b"") {
            let key = k?;
            self.del(key)?;
        }
        Ok(())
    }

    /// Flushes all dirty IO buffers and calls fsync.
    /// If this succeeds, it is guaranteed that
    /// all previous writes will be recovered if
    /// the system crashes.
    pub fn flush(&self) -> Result<(), ()> {
        self.pages.flush()
    }

    /// Returns `true` if the `Tree` contains a value for
    /// the specified key.
    pub fn contains_key<K: AsRef<[u8]>>(
        &self,
        key: K,
    ) -> Result<bool, ()> {
        self.get(key).map(|r| r.is_some())
    }

    /// Retrieve a value from the `Tree` if it exists.
    pub fn get<K: AsRef<[u8]>>(
        &self,
        key: K,
    ) -> Result<Option<PinnedValue>, ()> {
        let _measure = Measure::new(&M.tree_get);

        let guard = pin();
        let pin_guard = pin();

        let (_, ret) = self.get_internal(key.as_ref(), &guard)?;

        guard.flush();

        Ok(ret.map(|r| PinnedValue::new(r, pin_guard)))
    }

    /// Retrieve the key and value before the provided key,
    /// if one exists.
    ///
    /// # Examples
    ///
    /// ```
    /// use sled::{ConfigBuilder, Error};
    /// let config = ConfigBuilder::new().temporary(true).build();
    ///
    /// let tree = sled::Db::start(config).unwrap();
    ///
    /// for i in 0..10 {
    ///     tree.set(vec![i], vec![i]).expect("should write successfully");
    /// }
    ///
    /// assert!(tree.get_lt(vec![]).unwrap().is_none());
    /// assert!(tree.get_lt(vec![0]).unwrap().is_none());
    /// assert!(tree.get_lt(vec![1]).unwrap().unwrap().1 == vec![0]);
    /// assert!(tree.get_lt(vec![9]).unwrap().unwrap().1 == vec![8]);
    /// assert!(tree.get_lt(vec![10]).unwrap().unwrap().1 == vec![9]);
    /// assert!(tree.get_lt(vec![255]).unwrap().unwrap().1 == vec![9]);
    /// ```
    pub fn get_lt<K: AsRef<[u8]>>(
        &self,
        key: K,
    ) -> Result<Option<(Key, PinnedValue)>, ()> {
        let _measure = Measure::new(&M.tree_get);

        // the double guard is a hack that maintains
        // correctness of the ret value
        let guard = pin();
        let pin_guard = pin();

        let path = self.path_for_key(key.as_ref(), &guard)?;
        let (last_frag, _tree_ptr) = path
            .last()
            .expect("path should always contain a last element");

        let last_node = last_frag.unwrap_base();
        let data = &last_node.data;
        let items =
            data.leaf_ref().expect("last_node should be a leaf");
        let search = leaf_search(Less, items, |&(ref k, ref _v)| {
            prefix_cmp_encoded(k, key.as_ref(), &last_node.lo)
        });

        let ret = if search.is_none() {
            let mut iter = self.range((
                std::ops::Bound::Unbounded,
                std::ops::Bound::Excluded(key),
            ));

            match iter.next_back() {
                Some(Err(e)) => return Err(e),
                Some(Ok(pair)) => Some(pair),
                None => None,
            }
        } else {
            let idx = search.unwrap();
            let (encoded_key, v) = &items[idx];
            Some((
                prefix_decode(&last_node.lo, &*encoded_key),
                PinnedValue::new(&*v, pin_guard),
            ))
        };

        guard.flush();

        Ok(ret)
    }

    /// Retrieve the next key and value from the `Tree` after the
    /// provided key.
    ///
    /// # Examples
    ///
    /// ```
    /// use sled::{ConfigBuilder, Error};
    /// let config = ConfigBuilder::new().temporary(true).build();
    ///
    /// let tree = sled::Db::start(config).unwrap();
    ///
    /// for i in 0..10 {
    ///     tree.set(vec![i], vec![i]).expect("should write successfully");
    /// }
    ///
    /// assert!(tree.get_gt(vec![]).unwrap().unwrap().1 == vec![0]);
    /// assert!(tree.get_gt(vec![0]).unwrap().unwrap().1 == vec![1]);
    /// assert!(tree.get_gt(vec![1]).unwrap().unwrap().1 == vec![2]);
    /// assert!(tree.get_gt(vec![8]).unwrap().unwrap().1 == vec![9]);
    /// assert!(tree.get_gt(vec![9]).unwrap().is_none());
    /// ```
    pub fn get_gt<K: AsRef<[u8]>>(
        &self,
        key: K,
    ) -> Result<Option<(Key, PinnedValue)>, ()> {
        let _measure = Measure::new(&M.tree_get);

        let guard = pin();
        let pin_guard = pin();

        let path = self.path_for_key(key.as_ref(), &guard)?;
        let (last_frag, _tree_ptr) = path
            .last()
            .expect("path should always contain a last element");

        let last_node = last_frag.unwrap_base();
        let data = &last_node.data;
        let items =
            data.leaf_ref().expect("last_node should be a leaf");
        let search =
            leaf_search(Greater, items, |&(ref k, ref _v)| {
                prefix_cmp_encoded(k, key.as_ref(), &last_node.lo)
            });

        let ret = if search.is_none() {
            let mut iter = self.range((
                std::ops::Bound::Excluded(key),
                std::ops::Bound::Unbounded,
            ));

            match iter.next() {
                Some(Err(e)) => return Err(e),
                Some(Ok(pair)) => Some(pair),
                None => None,
            }
        } else {
            let idx = search.unwrap();
            let (encoded_key, v) = &items[idx];
            Some((
                prefix_decode(&last_node.lo, &*encoded_key),
                PinnedValue::new(&*v, pin_guard),
            ))
        };

        guard.flush();

        Ok(ret)
    }

    /// Compare and swap. Capable of unique creation, conditional modification,
    /// or deletion. If old is None, this will only set the value if it doesn't
    /// exist yet. If new is None, will delete the value if old is correct.
    /// If both old and new are Some, will modify the value if old is correct.
    /// If Tree is read-only, will do nothing.
    ///
    /// # Examples
    ///
    /// ```
    /// use sled::{ConfigBuilder, Error};
    /// let config = ConfigBuilder::new().temporary(true).build();
    /// let t = sled::Db::start(config).unwrap();
    ///
    /// // unique creation
    /// assert_eq!(t.cas(&[1], None, Some(vec![1])), Ok(()));
    /// // assert_eq!(t.cas(&[1], None, Some(vec![1])), Err(Error::CasFailed(Some(vec![1]))));
    ///
    /// // conditional modification
    /// assert_eq!(t.cas(&[1], Some(&*vec![1]), Some(vec![2])), Ok(()));
    /// // assert_eq!(t.cas(&[1], Some(vec![1]), Some(vec![2])), Err(Error::CasFailed(Some(vec![2]))));
    ///
    /// // conditional deletion
    /// assert_eq!(t.cas(&[1], Some(&[2]), None), Ok(()));
    /// assert_eq!(t.get(&[1]), Ok(None));
    /// ```
    pub fn cas<K: AsRef<[u8]>>(
        &self,
        key: K,
        old: Option<&[u8]>,
        new: Option<Value>,
    ) -> Result<(), Option<PinnedValue>> {
        trace!("casing key {:?}", key.as_ref());
        let _measure = Measure::new(&M.tree_cas);

        if self.config.read_only {
            return Err(Error::CasFailed(None));
        }

        let guard = pin();

        let new_ivec = if let Some(ref n) = new {
            Some(IVec::from(&**n))
        } else {
            None
        };

        // we need to retry caps until old != cur, since just because
        // cap fails it doesn't mean our value was changed.
        loop {
            let pin_guard = pin();
            let (mut path, cur) = self
                .get_internal(key.as_ref(), &guard)
                .map_err(|e| e.danger_cast())?;

            if old != cur.map(|v| &*v) {
                return Err(Error::CasFailed(
                    cur.map(|c| PinnedValue::new(c, pin_guard)),
                ));
            }

            let mut subscriber_reservation =
                self.subscriptions.reserve(&key);

            let (leaf_frag, leaf_ptr) = path.pop().expect(
                "get_internal somehow returned a path of length zero",
            );

            let (node_id, encoded_key) = {
                let node: &Node = leaf_frag.unwrap_base();
                (node.id, prefix_encode(&node.lo, key.as_ref()))
            };
            let frag = if let Some(ref n) = new_ivec {
                Frag::Set(encoded_key, n.clone())
            } else {
                Frag::Del(encoded_key)
            };
            let link =
                self.pages.link(node_id, leaf_ptr, frag, &guard);
            match link {
                Ok(_) => {
                    if let Some(res) = subscriber_reservation.take() {
                        let event = if let Some(n) = new {
                            subscription::Event::Set(
                                key.as_ref().to_vec(),
                                n.clone(),
                            )
                        } else {
                            subscription::Event::Del(
                                key.as_ref().to_vec(),
                            )
                        };

                        res.complete(event);
                    }

                    guard.flush();
                    return Ok(());
                }
                Err(Error::CasFailed(_)) => {}
                Err(other) => {
                    guard.flush();
                    return Err(other.danger_cast());
                }
            }
            M.tree_looped();
        }
    }

    /// Set a key to a new value, returning the old value if it
    /// was set.
    pub fn set<K: AsRef<[u8]>>(
        &self,
        key: K,
        value: Value,
    ) -> Result<Option<PinnedValue>, ()> {
        trace!("setting key {:?}", key.as_ref());
        let _measure = Measure::new(&M.tree_set);

        if self.config.read_only {
            return Err(Error::Unsupported(
                "the database is in read-only mode".to_owned(),
            ));
        }

        let guard = pin();

        let val_ivec: IVec = IVec::from(&*value);

        loop {
            let pin_guard = pin();
            let (mut path, existing_key) =
                self.get_internal(key.as_ref(), &guard)?;
            let (leaf_frag, leaf_ptr) = path.pop().expect(
                "path_for_key should always return a path \
                 of length >= 2 (root + leaf)",
            );
            let node: &Node = leaf_frag.unwrap_base();
            let encoded_key = prefix_encode(&node.lo, key.as_ref());

            let mut subscriber_reservation =
                self.subscriptions.reserve(&key);

            let frag = Frag::Set(encoded_key, val_ivec.clone());
            let link = self.pages.link(
                node.id,
                leaf_ptr.clone(),
                frag.clone(),
                &guard,
            );
            match link {
                Ok(new_cas_key) => {
                    // success
                    if let Some(res) = subscriber_reservation.take() {
                        let event = subscription::Event::Set(
                            key.as_ref().to_vec(),
                            value.clone(),
                        );

                        res.complete(event);
                    }

                    if node.should_split(
                        self.config.blink_node_split_size as u64,
                    ) {
                        let mut path2 = path
                            .iter()
                            .map(|&(f, ref p)| {
                                (Cow::Borrowed(f), p.clone())
                            })
                            .collect::<Vec<(Cow<'_, Frag>, _)>>();
                        let mut node2 = node.clone();
                        node2
                            .apply(&frag, self.config.merge_operator);
                        let frag2 = Cow::Owned(Frag::Base(node2));
                        path2.push((frag2, new_cas_key));
                        self.recursive_split(path2, &guard)?;
                    }

                    guard.flush();

                    return Ok(existing_key.map(move |r| {
                        PinnedValue::new(r, pin_guard)
                    }));
                }
                Err(Error::CasFailed(_)) => {}
                Err(other) => {
                    guard.flush();

                    return Err(other.danger_cast());
                }
            }
            M.tree_looped();
        }
    }

    /// Delete a value, returning the last result if it existed.
    ///
    /// # Examples
    ///
    /// ```
    /// let config = sled::ConfigBuilder::new().temporary(true).build();
    /// let t = sled::Db::start(config).unwrap();
    /// t.set(&[1], vec![1]);
    /// assert_eq!(t.del(&*vec![1]).unwrap().unwrap(), vec![1]);
    /// assert_eq!(t.del(&*vec![1]), Ok(None));
    /// ```
    pub fn del<K: AsRef<[u8]>>(
        &self,
        key: K,
    ) -> Result<Option<PinnedValue>, ()> {
        let _measure = Measure::new(&M.tree_del);

        if self.config.read_only {
            return Ok(None);
        }

        let guard = pin();
        let pin_guard = pin();

        loop {
            let (mut path, existing_key) =
                self.get_internal(key.as_ref(), &guard)?;

            let mut subscriber_reservation =
                self.subscriptions.reserve(&key);

            let (leaf_frag, leaf_ptr) = path.pop().expect(
                "path_for_key should always return a path \
                 of length >= 2 (root + leaf)",
            );
            let node: &Node = leaf_frag.unwrap_base();
            let encoded_key = prefix_encode(&node.lo, key.as_ref());

            let frag = Frag::Del(encoded_key);
            let link = self.pages.link(
                node.id,
                leaf_ptr.clone(),
                frag,
                &guard,
            );

            match link {
                Ok(_) => {
                    // success
                    if let Some(res) = subscriber_reservation.take() {
                        let event = subscription::Event::Del(
                            key.as_ref().to_vec(),
                        );

                        res.complete(event);
                    }

                    guard.flush();
                    return Ok(existing_key.map(move |r| {
                        PinnedValue::new(r, pin_guard)
                    }));
                }
                Err(Error::CasFailed(_)) => {
                    M.tree_looped();
                    continue;
                }
                Err(other) => return Err(other.danger_cast()),
            }
        }
    }

    /// Merge state directly into a given key's value using the
    /// configured merge operator. This allows state to be written
    /// into a value directly, without any read-modify-write steps.
    /// Merge operators can be used to implement arbitrary data
    /// structures.
    ///
    /// # Panics
    ///
    /// Calling `merge` will panic if no merge operator has been
    /// configured.
    ///
    /// # Examples
    ///
    /// ```
    /// fn concatenate_merge(
    ///   _key: &[u8],               // the key being merged
    ///   old_value: Option<&[u8]>,  // the previous value, if one existed
    ///   merged_bytes: &[u8]        // the new bytes being merged in
    /// ) -> Option<Vec<u8>> {       // set the new value, return None to delete
    ///   let mut ret = old_value
    ///     .map(|ov| ov.to_vec())
    ///     .unwrap_or_else(|| vec![]);
    ///
    ///   ret.extend_from_slice(merged_bytes);
    ///
    ///   Some(ret)
    /// }
    ///
    /// let config = sled::ConfigBuilder::new()
    ///   .temporary(true)
    ///   .merge_operator(concatenate_merge)
    ///   .build();
    ///
    /// let tree = sled::Db::start(config).unwrap();
    ///
    /// let k = b"k1";
    ///
    /// tree.set(k, vec![0]);
    /// tree.merge(k, vec![1]);
    /// tree.merge(k, vec![2]);
    /// // assert_eq!(tree.get(k).unwrap().unwrap(), vec![0, 1, 2]);
    ///
    /// // sets replace previously merged data,
    /// // bypassing the merge function.
    /// tree.set(k, vec![3]);
    /// // assert_eq!(tree.get(k), Ok(Some(vec![3])));
    ///
    /// // merges on non-present values will add them
    /// tree.del(k);
    /// tree.merge(k, vec![4]);
    /// // assert_eq!(tree.get(k).unwrap().unwrap(), vec![4]);
    /// ```
    pub fn merge<K: AsRef<[u8]>>(
        &self,
        key: K,
        value: Value,
    ) -> Result<(), ()> {
        trace!("merging key {:?}", key.as_ref());
        let _measure = Measure::new(&M.tree_merge);

        if self.config.read_only {
            return Err(Error::Unsupported(
                "the database is in read-only mode".to_owned(),
            ));
        }

        let guard = pin();

        let val_ivec = IVec::from(&*value);

        loop {
            let mut path = self.path_for_key(key.as_ref(), &guard)?;
            let (leaf_frag, leaf_ptr) = path.pop().expect(
                "path_for_key should always return a path \
                 of length >= 2 (root + leaf)",
            );
            let node: &Node = leaf_frag.unwrap_base();

            let mut subscriber_reservation =
                self.subscriptions.reserve(&key);

            let encoded_key = prefix_encode(&node.lo, key.as_ref());
            let frag = Frag::Merge(encoded_key, val_ivec.clone());

            let link = self.pages.link(
                node.id,
                leaf_ptr.clone(),
                frag.clone(),
                &guard,
            );
            match link {
                Ok(new_cas_key) => {
                    // success
                    if let Some(res) = subscriber_reservation.take() {
                        let event = subscription::Event::Merge(
                            key.as_ref().to_vec(),
                            value.clone(),
                        );

                        res.complete(event);
                    }
                    if node.should_split(
                        self.config.blink_node_split_size as u64,
                    ) {
                        let mut path2 = path
                            .iter()
                            .map(|&(f, ref p)| {
                                (Cow::Borrowed(f), p.clone())
                            })
                            .collect::<Vec<(Cow<'_, Frag>, _)>>();
                        let mut node2 = node.clone();
                        node2
                            .apply(&frag, self.config.merge_operator);
                        let frag2 = Cow::Owned(Frag::Base(node2));
                        path2.push((frag2, new_cas_key));
                        self.recursive_split(path2, &guard)?;
                    }
                    guard.flush();
                    return Ok(());
                }
                Err(Error::CasFailed(_)) => {}
                Err(other) => {
                    guard.flush();
                    return Err(other.danger_cast());
                }
            }
            M.tree_looped();
        }
    }

    /// Create a double-ended iterator over the tuples of keys and
    /// values in this tree.
    ///
    /// # Examples
    ///
    /// ```
    /// let config = sled::ConfigBuilder::new().temporary(true).build();
    /// let t = sled::Db::start(config).unwrap();
    /// t.set(&[1], vec![10]);
    /// t.set(&[2], vec![20]);
    /// t.set(&[3], vec![30]);
    /// let mut iter = t.iter();
    /// // assert_eq!(iter.next(), Some(Ok((vec![1], vec![10]))));
    /// // assert_eq!(iter.next(), Some(Ok((vec![2], vec![20]))));
    /// // assert_eq!(iter.next(), Some(Ok((vec![3], vec![30]))));
    /// // assert_eq!(iter.next(), None);
    /// ```
    pub fn iter(&self) -> Iter<'_> {
        self.range::<Vec<u8>, _>(..)
    }

    /// Create a double-ended iterator over tuples of keys and values,
    /// starting at the provided key.
    ///
    /// # Examples
    ///
    /// ```
    /// let config = sled::ConfigBuilder::new()
    ///     .temporary(true)
    ///     .build();
    /// let t = sled::Db::start(config).unwrap();
    ///
    /// t.set(b"0", vec![0]).unwrap();
    /// t.set(b"1", vec![10]).unwrap();
    /// t.set(b"2", vec![20]).unwrap();
    /// t.set(b"3", vec![30]).unwrap();
    /// t.set(b"4", vec![40]).unwrap();
    /// t.set(b"5", vec![50]).unwrap();
    ///
    /// let mut r = t.scan(b"2");
    /// assert_eq!(r.next().unwrap().unwrap().0, b"2");
    /// assert_eq!(r.next().unwrap().unwrap().0, b"3");
    /// assert_eq!(r.next().unwrap().unwrap().0, b"4");
    /// assert_eq!(r.next().unwrap().unwrap().0, b"5");
    /// assert_eq!(r.next(), None);

    /// let mut r = t.scan(b"2").rev();
    /// assert_eq!(r.next().unwrap().unwrap().0, b"2");
    /// assert_eq!(r.next().unwrap().unwrap().0, b"1");
    /// assert_eq!(r.next().unwrap().unwrap().0, b"0");
    /// assert_eq!(r.next(), None);
    /// ```
    pub fn scan<K>(&self, key: K) -> Iter<'_>
    where
        K: AsRef<[u8]>,
    {
        let mut iter = self.range(key..);
        iter.is_scan = true;
        iter
    }

    /// Create a double-ended iterator over tuples of keys and values,
    /// where the keys fall within the specified range.
    ///
    /// # Examples
    ///
    /// ```
    /// let config = sled::ConfigBuilder::new()
    ///     .temporary(true)
    ///     .build();
    /// let t = sled::Db::start(config).unwrap();
    ///
    /// t.set(b"0", vec![0]).unwrap();
    /// t.set(b"1", vec![10]).unwrap();
    /// t.set(b"2", vec![20]).unwrap();
    /// t.set(b"3", vec![30]).unwrap();
    /// t.set(b"4", vec![40]).unwrap();
    /// t.set(b"5", vec![50]).unwrap();
    ///
    /// let start: &[u8] = b"2";
    /// let end: &[u8] = b"4";
    /// let mut r = t.range(start..end);
    /// assert_eq!(r.next().unwrap().unwrap().0, b"2");
    /// assert_eq!(r.next().unwrap().unwrap().0, b"3");
    /// assert_eq!(r.next(), None);
    ///
    /// let start = b"2".to_vec();
    /// let end = b"4".to_vec();
    /// let mut r = t.range(start..end).rev();
    /// assert_eq!(r.next().unwrap().unwrap().0, b"3");
    /// assert_eq!(r.next().unwrap().unwrap().0, b"2");
    /// assert_eq!(r.next(), None);
    /// ```
    pub fn range<K, R>(&self, range: R) -> Iter<'_>
    where
        K: AsRef<[u8]>,
        R: RangeBounds<K>,
    {
        let _measure = Measure::new(&M.tree_scan);

        let guard = pin();

        let lo = match range.start_bound() {
            ops::Bound::Included(ref end) => {
                ops::Bound::Included(end.as_ref().to_vec())
            }
            ops::Bound::Excluded(ref end) => {
                ops::Bound::Excluded(end.as_ref().to_vec())
            }
            ops::Bound::Unbounded => ops::Bound::Unbounded,
        };
        let hi = match range.end_bound() {
            ops::Bound::Included(ref end) => {
                ops::Bound::Included(end.as_ref().to_vec())
            }
            ops::Bound::Excluded(ref end) => {
                ops::Bound::Excluded(end.as_ref().to_vec())
            }
            ops::Bound::Unbounded => ops::Bound::Unbounded,
        };

        Iter {
            tree: &self,
            hi,
            lo,
            last_id: None,
            last_key: None,
            broken: None,
            done: false,
            is_scan: false,
            guard,
        }
    }

    /// Create a double-ended iterator over keys, starting at the provided key.
    ///
    /// # Examples
    ///
    /// ```
    /// let config = sled::ConfigBuilder::new().temporary(true).build();
    /// let t = sled::Db::start(config).unwrap();
    /// t.set(&[1], vec![10]);
    /// t.set(&[2], vec![20]);
    /// t.set(&[3], vec![30]);
    /// let mut iter = t.scan(&*vec![2]);
    /// // assert_eq!(iter.next(), Some(Ok(vec![2])));
    /// // assert_eq!(iter.next(), Some(Ok(vec![3])));
    /// // assert_eq!(iter.next(), None);
    /// ```
    pub fn keys<K>(&self, key: K) -> Keys<'_>
    where
        K: AsRef<[u8]>,
    {
        self.scan(key).keys()
    }

    /// Create a double-ended iterator over values, starting at the provided key.
    ///
    /// # Examples
    ///
    /// ```
    /// let config = sled::ConfigBuilder::new().temporary(true).build();
    /// let t = sled::Db::start(config).unwrap();
    /// t.set(&[1], vec![10]);
    /// t.set(&[2], vec![20]);
    /// t.set(&[3], vec![30]);
    /// let mut iter = t.scan(&*vec![2]);
    /// // assert_eq!(iter.next(), Some(Ok(vec![20])));
    /// // assert_eq!(iter.next(), Some(Ok(vec![30])));
    /// // assert_eq!(iter.next(), None);
    /// ```
    pub fn values<K: AsRef<[u8]>>(&self, key: K) -> Values<'_> {
        self.scan(key).values()
    }

    /// Returns the number of elements in this tree.
    ///
    /// Beware: performs a full O(n) scan under the hood.
    pub fn len(&self) -> usize {
        self.iter().count()
    }

    /// Returns `true` if the `Tree` contains no elements.
    pub fn is_empty(&self) -> bool {
        self.iter().next().is_none()
    }

    fn recursive_split<'g>(
        &self,
        path: Vec<(Cow<'g, Frag>, TreePtr<'g>)>,
        guard: &'g Guard,
    ) -> Result<(), ()> {
        // to split, we pop the path, see if it's in need of split, recurse up
        // two-phase: (in prep for lock-free, not necessary for single threaded)
        //  1. half-split: install split on child, P
        //      a. allocate new right sibling page, Q
        //      b. locate split point
        //      c. create new consolidated pages for both sides
        //      d. add new node to pagetable
        //      e. merge split delta to original page P with physical pointer to Q
        //      f. if failed, free the new page
        //  2. parent update: install new index term on parent
        //      a. merge "index term delta record" to parent, containing:
        //          i. new bounds for P & Q
        //          ii. logical pointer to Q
        //
        //      (it's possible parent was merged in the mean-time, so if that's the
        //      case, we need to go up the path to the grandparent then down again
        //      or higher until it works)
        //  3. any traversing nodes that witness #1 but not #2 try to complete it
        //
        //  root is special case, where we need to hoist a new root

        let adjusted_max = |height| {
            // nodes toward the root are larger
            let threshold = std::cmp::min(height, 8) as u32;
            let multiplier = 2_u64.pow(threshold);
            self.config.blink_node_split_size as u64 * multiplier
        };

        for (height, window) in path.windows(2).rev().enumerate() {
            let (parent_frag, parent_ptr) = &window[0];
            let (node_frag, node_ptr) = &window[1];
            let node: &Node = node_frag.unwrap_base();
            if node.should_split(
                adjusted_max(height)
            ) {
                // try to child split
                if let Ok(parent_split) =
                    self.child_split(node, node_ptr.clone(), guard)
                {
                    // now try to parent split
                    let parent_node = parent_frag.unwrap_base();

                    let res = self.parent_split(
                        parent_node.id,
                        parent_ptr.clone(),
                        parent_split.clone(),
                        guard,
                    );

                    match res {
                        Ok(_res) => {}
                        Err(Error::CasFailed(_)) => continue,
                        other => {
                            return other
                                .map(|_| ())
                                .map_err(|e| e.danger_cast());
                        }
                    }
                }
            }
        }

        let (ref root_frag, ref root_ptr) = path[0];
        let root_node: &Node = root_frag.unwrap_base();

        if root_node
            .should_split(adjusted_max(path.len()))
        {
            if let Ok(parent_split) =
                self.child_split(&root_node, root_ptr.clone(), guard)
            {
                return self
                    .root_hoist(
                        root_node.id,
                        parent_split.to,
                        parent_split.at.clone(),
                        guard,
                    )
                    .map(|_| ())
                    .map_err(|e| e.danger_cast());
            }
        }

        Ok(())
    }

    fn child_split<'g>(
        &self,
        node: &Node,
        node_cas_key: TreePtr<'g>,
        guard: &'g Guard,
    ) -> Result<ParentSplit, ()> {
        let new_pid = self.pages.allocate(guard)?;
        trace!("allocated pid {} in child_split", new_pid);

        // split the node in half
        let rhs = node.split(new_pid);

        let child_split = Frag::ChildSplit(ChildSplit {
            at: rhs.lo.clone(),
            to: new_pid,
        });

        let parent_split = ParentSplit {
            at: rhs.lo.clone(),
            to: new_pid,
        };

        // install the new right side
        let new_ptr = loop {
            debug_delay();
            let res = self.pages.replace(
                new_pid,
                PagePtr::allocated(),
                Frag::Base(rhs.clone()),
                guard,
            );

            // This may fail if the pagecache has relocated
            // the page since we allocated it.
            match res {
                Ok(r) => break r,
                Err(Error::CasFailed(_)) => continue,
                Err(other) => return Err(other.danger_cast()),
            }
        };

        // try to install a child split on the left side
        let link = self.pages.link(
            node.id,
            node_cas_key,
            child_split,
            guard,
        );

        match link {
            Ok(_) => {}
            Err(Error::CasFailed(_)) => {
                // if we failed, don't follow through with the parent split
                let mut ptr = new_ptr.clone();
                loop {
                    match self.pages.free(new_pid, ptr, guard) {
                        Err(Error::CasFailed(Some(actual_ptr))) => {
                            ptr = actual_ptr.clone()
                        }
                        Err(Error::CasFailed(None)) => panic!("somehow allocated child was already freed"),
                        Err(other) => return Err(other.danger_cast()),
                        Ok(_) => break,
                    }
                }
                return Err(Error::CasFailed(()));
            }
            Err(other) => return Err(other.danger_cast()),
        }

        Ok(parent_split)
    }

    fn parent_split<'g>(
        &self,
        parent_node_id: usize,
        parent_cas_key: TreePtr<'g>,
        parent_split: ParentSplit,
        guard: &'g Guard,
    ) -> Result<TreePtr<'g>, Option<TreePtr<'g>>> {
        // install parent split
        self.pages.link(
            parent_node_id,
            parent_cas_key,
            Frag::ParentSplit(parent_split.clone()),
            guard,
        )
    }

    fn root_hoist<'g>(
        &self,
        from: PageId,
        to: PageId,
        at: IVec,
        guard: &'g Guard,
    ) -> Result<(), ()> {
        // hoist new root, pointing to lhs & rhs
        let new_root_pid = self.pages.allocate(guard)?;
        debug!("allocated pid {} in root_hoist", new_root_pid);

        let root_lo = b"";
        let mut new_root_vec = vec![];
        new_root_vec.push((vec![0].into(), from));

        let encoded_at = prefix_encode(root_lo, &*at);
        new_root_vec.push((encoded_at, to));
        let new_root = Frag::Base(Node {
            id: new_root_pid,
            data: Data::Index(new_root_vec),
            next: None,
            lo: vec![].into(),
            hi: vec![].into(),
        });
        debug_delay();

        let new_root_ptr = loop {
            debug_delay();
            let res = self.pages.replace(
                new_root_pid,
                PagePtr::allocated(),
                new_root.clone(),
                guard,
            );

            // This may fail if the pagecache has relocated
            // the page since we allocated it.
            match res {
                Ok(r) => break r,
                Err(Error::CasFailed(_)) => continue,
                Err(other) => return Err(other.danger_cast()),
            }
        };

        debug_delay();
        let cas = meta::cas_root(
            &*self.pages,
            self.tree_id.clone(),
            Some(from),
            Some(new_root_pid),
            guard,
        );
        if cas.is_ok() {
            debug!(
                "root hoist from {} to {} successful",
                from, new_root_pid
            );
            Ok(())
        } else {
            debug!(
                "root hoist from {} to {} failed: {:?}",
                from, new_root_pid, cas
            );
            self.pages
                .free(new_root_pid, new_root_ptr, guard)
                .map_err(|e| e.danger_cast())
        }
    }

    fn get_internal<'g, K: AsRef<[u8]>>(
        &self,
        key: K,
        guard: &'g Guard,
    ) -> Result<(Path<'g>, Option<&'g [u8]>), ()> {
        let path = self.path_for_key(key.as_ref(), guard)?;

        let ret = path.last().and_then(|(last_frag, _tree_ptr)| {
            let last_node = last_frag.unwrap_base();
            let data = &last_node.data;
            let items =
                data.leaf_ref().expect("last_node should be a leaf");
            let search = items
                .binary_search_by(|&(ref k, ref _v)| {
                    prefix_cmp_encoded(k, key.as_ref(), &last_node.lo)
                })
                .ok();

            search.map(|idx| &*items[idx].1)
        });

        Ok((path, ret))
    }

    #[doc(hidden)]
    pub fn key_debug_str<K: AsRef<[u8]>>(&self, key: K) -> String {
        let guard = pin();

        let path = self.path_for_key(key.as_ref(), &guard).expect(
            "path_for_key should always return at least 2 nodes, \
             even if the key being searched for is not present",
        );
        let mut ret = String::new();
        for (node, _ptr) in &path {
            ret.push_str(&*format!("\n{:?}", node));
        }

        guard.flush();

        ret
    }

    /// returns the traversal path, completing any observed
    /// partially complete splits or merges along the way.
    pub(crate) fn path_for_key<'g, K: AsRef<[u8]>>(
        &self,
        key: K,
        guard: &'g Guard,
    ) -> Result<Vec<(&'g Frag, TreePtr<'g>)>, ()> {
        let _measure = Measure::new(&M.tree_traverse);

        let mut cursor =
            meta::pid_for_name(&*self.pages, &self.tree_id, guard)?;
        let mut path: Vec<(&'g Frag, TreePtr<'g>)> = vec![];

        // unsplit_parent is used for tracking need
        // to complete partial splits.
        let mut unsplit_parent: Option<usize> = None;

        let mut not_found_loops = 0;
        loop {
            let get_cursor = self
                .pages
                .get(cursor, guard)
                .map_err(|e| e.danger_cast())?;

            if get_cursor.is_free() || get_cursor.is_allocated() {
                // restart search from the tree's root
                not_found_loops += 1;
                debug_assert_ne!(
                    not_found_loops, 10_000,
                    "cannot find pid {} in path_for_key",
                    cursor
                );
                cursor = meta::pid_for_name(
                    &*self.pages,
                    &self.tree_id,
                    guard,
                )?;
                continue;
            }

            let (frag, cas_key) = match get_cursor {
                PageGet::Materialized(node, cas_key) => {
                    (node, cas_key)
                }
                broken => {
                    return Err(Error::ReportableBug(format!(
                        "got non-base node while traversing tree: {:?}",
                        broken
                    )));
                }
            };

            let node = frag.unwrap_base();

            // TODO this may need to change when handling (half) merges
            assert!(
                node.lo.as_ref() <= key.as_ref(),
                "overshot key somehow"
            );

            // half-complete split detect & completion
            // (when hi is empty, it means it's unbounded)
            if !node.hi.is_empty() && node.hi.as_ref() <= key.as_ref()
            {
                // we have encountered a child split, without
                // having hit the parent split above.
                cursor = node.next.expect(
                    "if our hi bound is not Inf (inity), \
                     we should have a right sibling",
                );
                if unsplit_parent.is_none() && !path.is_empty() {
                    unsplit_parent = Some(path.len() - 1);
                }
                continue;
            } else if let Some(idx) = unsplit_parent.take() {
                // we have found the proper page for
                // our split.
                let (parent_frag, parent_ptr) = &path[idx];
                let parent_node = parent_frag.unwrap_base();

                let ps = Frag::ParentSplit(ParentSplit {
                    at: node.lo.clone(),
                    to: node.id,
                });

                let link = self.pages.link(
                    parent_node.id,
                    parent_ptr.clone(),
                    ps,
                    guard,
                );
                match link {
                    Ok(_new_key) => {
                        // TODO set parent's cas_key (not this cas_key) to
                        // new_key in the path, along with updating the
                        // parent's node in the path vec. if we don't do
                        // both, we lose the newly appended parent split.
                    }
                    Err(Error::CasFailed(_)) => {}
                    Err(other) => return Err(other.danger_cast()),
                }
            }

            path.push((frag, cas_key.clone()));

            match path
                .last()
                .expect("we just pushed to path, so it's not empty")
                .0
                .unwrap_base()
                .data
            {
                Data::Index(ref ptrs) => {
                    let old_cursor = cursor;

                    let search = binary_search_lub(
                        ptrs,
                        |&(ref k, ref _v)| {
                            prefix_cmp_encoded(
                                k,
                                key.as_ref(),
                                &node.lo,
                            )
                        },
                    );

                    // This might be none if ord is Less and we're
                    // searching for the empty key
                    let index =
                        search.expect("failed to traverse index");

                    cursor = ptrs[index].1;

                    if cursor == old_cursor {
                        panic!("stuck in page traversal loop");
                    }
                }
                Data::Leaf(_) => {
                    break;
                }
            }
        }

        Ok(path)
    }

    // Remove all pages for this tree from the underlying
    // PageCache. This will leave orphans behind if
    // the tree crashes during gc.
    pub(crate) fn gc_pages(
        &self,
        mut leftmost_chain: Vec<PageId>,
    ) -> Result<(), ()> {
        let guard = pin();

        while let Some(mut pid) = leftmost_chain.pop() {
            loop {
                let get_cursor = self
                    .pages
                    .get(pid, &guard)
                    .map_err(|e| e.danger_cast())?;

                let (node, key) = match get_cursor {
                    PageGet::Materialized(node, key) => {
                        (node, key)
                    }
                    broken => {
                        return Err(Error::ReportableBug(format!(
                            "got non-base node while GC'ing tree: {:?}",
                            broken
                        )))
                    }
                };

                let ret = self.pages.free(pid, key.clone(), &guard);

                match ret {
                    Ok(_) => {
                        let next_pid = node.unwrap_base().id;
                        if next_pid == 0 {
                            break;
                        }
                        pid = next_pid;
                    }
                    Err(Error::CasFailed(_k)) => continue,
                    Err(other) => return Err(other.danger_cast()),
                }
            }
        }

        Ok(())
    }
}

impl Debug for Tree {
    fn fmt(
        &self,
        f: &mut fmt::Formatter<'_>,
    ) -> std::result::Result<(), fmt::Error> {
        let guard = pin();

        let mut pid =
            meta::pid_for_name(&*self.pages, &self.tree_id, &guard)
                .expect("tree does not exist");
        let mut left_most = pid;
        let mut level = 0;

        f.write_str("Tree: \n\t")?;
        self.pages.fmt(f)?;
        f.write_str("\tlevel 0:\n")?;

        loop {
            let get_res = self.pages.get(pid, &guard);
            let node = match get_res {
                Ok(PageGet::Materialized(ref frag, ref _ptr)) => {
                    frag.unwrap_base()
                }
                broken => panic!(
                    "pagecache returned non-base node: {:?}",
                    broken
                ),
            };

            f.write_str("\t\t")?;
            node.fmt(f)?;
            f.write_str("\n")?;

            if let Some(next_pid) = node.next {
                pid = next_pid;
            } else {
                // we've traversed our level, time to bump down
                let left_get_res = self.pages.get(left_most, &guard);
                let left_node = match left_get_res {
                    Ok(PageGet::Materialized(mf, ..)) => {
                        mf.unwrap_base()
                    }
                    broken => panic!(
                        "pagecache returned non-base node: {:?}",
                        broken
                    ),
                };

                match &left_node.data {
                    Data::Index(ptrs) => {
                        if let Some(&(ref _sep, ref next_pid)) =
                            ptrs.first()
                        {
                            pid = *next_pid;
                            left_most = *next_pid;
                            level += 1;
                            f.write_str(&*format!(
                                "\n\tlevel {}:\n",
                                level
                            ))?;
                        } else {
                            panic!("trying to debug print empty index node");
                        }
                    }
                    Data::Leaf(_items) => {
                        // we've reached the end of our tree, all leafs are on
                        // the lowest level.
                        break;
                    }
                }
            }
        }

        guard.flush();

        Ok(())
    }
}