sled 0.22.0

a modern embedded database
Documentation
use std::{
    borrow::Cow,
    cmp::Ordering::{Greater, Less},
    fmt::{self, Debug},
    ops::{self, RangeBounds},
    sync::{
        atomic::{AtomicUsize, Ordering::SeqCst},
        Arc,
    },
};

use super::*;

type Path<'g> = Vec<(PageId, &'g Frag, TreePtr<'g>)>;

impl<'a> IntoIterator for &'a Tree {
    type Item = Result<(Vec<u8>, IVec)>;
    type IntoIter = Iter<'a>;

    fn into_iter(self) -> Iter<'a> {
        self.iter()
    }
}

/// A flash-sympathetic persistent lock-free B+ tree
///
/// # Examples
///
/// ```
/// let t = sled::Db::start_default("path_to_my_database").unwrap();
///
/// t.set(b"yo!", b"v1".to_vec());
/// assert!(t.get(b"yo!").unwrap().unwrap() == &*b"v1".to_vec());
///
/// t.cas(
///     b"yo!",                // key
///     Some(b"v1"),           // old value, None for not present
///     Some(b"v2".to_vec()),  // new value, None for delete
/// ).unwrap();
///
/// let mut iter = t.scan(b"a non-present key before yo!");
/// // assert_eq!(iter.next(), Some(Ok((b"yo!".to_vec(), b"v2".to_vec()))));
/// // assert_eq!(iter.next(), None);
///
/// t.del(b"yo!");
/// assert_eq!(t.get(b"yo!"), Ok(None));
/// ```
#[derive(Clone)]
pub struct Tree {
    pub(crate) tree_id: Vec<u8>,
    pub(crate) context: Context,
    pub(crate) subscriptions: Arc<Subscriptions>,
    pub(crate) root: Arc<AtomicUsize>,
}

unsafe impl Send for Tree {}

unsafe impl Sync for Tree {}

impl Tree {
    /// Set a key to a new value, returning the old value if it
    /// was set.
    pub fn set<K, V>(&self, key: K, value: V) -> Result<Option<IVec>>
    where
        K: AsRef<[u8]>,
        IVec: From<V>,
    {
        trace!("setting key {:?}", key.as_ref());
        let _measure = Measure::new(&M.tree_set);

        if self.context.read_only {
            return Err(Error::Unsupported(
                "the database is in read-only mode".to_owned(),
            ));
        }

        let value = IVec::from(value);

        loop {
            let tx = self.context.pagecache.begin()?;
            let (mut path, existing_val) =
                self.get_internal(key.as_ref(), &tx)?;
            let (leaf_id, leaf_frag, leaf_ptr) = path.pop().expect(
                "path_for_key should always return a path \
                 of length >= 2 (root + leaf)",
            );
            let node: &Node = leaf_frag.unwrap_base();
            let encoded_key = prefix_encode(&node.lo, key.as_ref());

            let mut subscriber_reservation = self.subscriptions.reserve(&key);

            let frag = Frag::Set(encoded_key, value.clone());
            let link = self.context.pagecache.link(
                leaf_id,
                leaf_ptr.clone(),
                frag.clone(),
                &tx,
            )?;
            if let Ok(new_cas_key) = link {
                // success
                if let Some(res) = subscriber_reservation.take() {
                    let event =
                        subscription::Event::Set(key.as_ref().to_vec(), value);

                    res.complete(event);
                }

                if node.should_split(self.context.blink_node_split_size as u64)
                {
                    let mut path2 = path
                        .iter()
                        .map(|&(id, f, ref p)| {
                            (id, Cow::Borrowed(f), p.clone())
                        })
                        .collect::<Vec<(PageId, Cow<'_, Frag>, _)>>();
                    let mut node2 = node.clone();
                    node2.apply(&frag, self.context.merge_operator);
                    let frag2 = Cow::Owned(Frag::Base(node2));
                    path2.push((leaf_id, frag2, new_cas_key));
                    self.recursive_split(path2, &tx)?;
                }

                tx.flush();

                return Ok(existing_val.cloned());
            }
            M.tree_looped();
        }
    }

    /// Retrieve a value from the `Tree` if it exists.
    pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<IVec>> {
        let _measure = Measure::new(&M.tree_get);

        let tx = self.context.pagecache.begin()?;

        let (_, ret) = self.get_internal(key.as_ref(), &tx)?;

        tx.flush();

        Ok(ret.cloned())
    }

    /// Delete a value, returning the last result if it existed.
    ///
    /// # Examples
    ///
    /// ```
    /// let config = sled::ConfigBuilder::new().temporary(true).build();
    /// let t = sled::Db::start(config).unwrap();
    /// t.set(&[1], vec![1]);
    /// assert_eq!(t.del(&*vec![1]).unwrap().unwrap(), vec![1]);
    /// assert_eq!(t.del(&*vec![1]), Ok(None));
    /// ```
    pub fn del<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<IVec>> {
        let _measure = Measure::new(&M.tree_del);

        if self.context.read_only {
            return Ok(None);
        }

        loop {
            let tx = self.context.pagecache.begin()?;

            let (mut path, existing_val) =
                self.get_internal(key.as_ref(), &tx)?;

            let mut subscriber_reservation = self.subscriptions.reserve(&key);

            let (leaf_id, leaf_frag, leaf_ptr) = path.pop().expect(
                "path_for_key should always return a path \
                 of length >= 2 (root + leaf)",
            );
            let node: &Node = leaf_frag.unwrap_base();
            let encoded_key = prefix_encode(&node.lo, key.as_ref());

            let frag = Frag::Del(encoded_key);
            let link = self.context.pagecache.link(
                leaf_id,
                leaf_ptr.clone(),
                frag,
                &tx,
            )?;

            if link.is_ok() {
                // success
                if let Some(res) = subscriber_reservation.take() {
                    let event = subscription::Event::Del(key.as_ref().to_vec());

                    res.complete(event);
                }

                tx.flush();
                return Ok(existing_val.cloned());
            }
        }
    }

    /// Compare and swap. Capable of unique creation, conditional modification,
    /// or deletion. If old is None, this will only set the value if it doesn't
    /// exist yet. If new is None, will delete the value if old is correct.
    /// If both old and new are Some, will modify the value if old is correct.
    /// If Tree is read-only, will do nothing.
    ///
    /// # Examples
    ///
    /// ```
    /// use sled::{ConfigBuilder, Error};
    /// let config = ConfigBuilder::new().temporary(true).build();
    /// let t = sled::Db::start(config).unwrap();
    ///
    /// // unique creation
    /// assert_eq!(t.cas(&[1], None as Option<&[u8]>, Some("1")), Ok(Ok(())));
    ///
    /// // conditional modification
    /// assert_eq!(t.cas(&[1], Some("1"), Some("2")), Ok(Ok(())));
    ///
    /// // conditional deletion
    /// assert_eq!(t.cas(&[1], Some("2"), None as Option<&[u8]>), Ok(Ok(())));
    /// assert_eq!(t.get(&[1]), Ok(None));
    /// ```
    pub fn cas<K, OV, NV>(
        &self,
        key: K,
        old: Option<OV>,
        new: Option<NV>,
    ) -> Result<std::result::Result<(), Option<IVec>>>
    where
        K: AsRef<[u8]>,
        OV: AsRef<[u8]>,
        IVec: From<NV>,
    {
        trace!("casing key {:?}", key.as_ref());
        let _measure = Measure::new(&M.tree_cas);

        if self.context.read_only {
            return Err(Error::Unsupported(
                "can not perform a cas on a read-only Tree".into(),
            ));
        }

        let new = new.map(IVec::from);

        // we need to retry caps until old != cur, since just because
        // cap fails it doesn't mean our value was changed.
        loop {
            let tx = self.context.pagecache.begin()?;
            let (mut path, cur) = self.get_internal(key.as_ref(), &tx)?;

            let matches = match (&old, &cur) {
                (None, None) => true,
                (Some(ref o), Some(ref c)) => o.as_ref() == &***c,
                _ => false,
            };

            if !matches {
                return Ok(Err(cur.cloned()));
            }

            let mut subscriber_reservation = self.subscriptions.reserve(&key);

            let (leaf_id, leaf_frag, leaf_ptr) = path
                .pop()
                .expect("get_internal somehow returned a path of length zero");

            let (node_id, encoded_key) = {
                let node: &Node = leaf_frag.unwrap_base();
                (leaf_id, prefix_encode(&node.lo, key.as_ref()))
            };
            let frag = if let Some(ref new) = new {
                Frag::Set(encoded_key, new.clone())
            } else {
                Frag::Del(encoded_key)
            };
            let link =
                self.context.pagecache.link(node_id, leaf_ptr, frag, &tx)?;

            if link.is_ok() {
                if let Some(res) = subscriber_reservation.take() {
                    let event = if let Some(new) = new {
                        subscription::Event::Set(key.as_ref().to_vec(), new)
                    } else {
                        subscription::Event::Del(key.as_ref().to_vec())
                    };

                    res.complete(event);
                }

                tx.flush();
                return Ok(Ok(()));
            }
            M.tree_looped();
        }
    }

    /// Subscribe to `Event`s that happen to keys that have
    /// the specified prefix. Events for particular keys are
    /// guaranteed to be witnessed in the same order by all
    /// threads, but threads may witness different interleavings
    /// of `Event`s across different keys. If subscribers don't
    /// keep up with new writes, they will cause new writes
    /// to block. There is a buffer of 1024 items per
    /// `Subscriber`. This can be used to build reactive
    /// and replicated systems.
    ///
    /// # Examples
    /// ```
    /// use sled::{Event, ConfigBuilder};
    /// let config = ConfigBuilder::new().temporary(true).build();
    ///
    /// let tree = sled::Db::start(config).unwrap();
    ///
    /// // watch all events by subscribing to the empty prefix
    /// let mut events = tree.watch_prefix(vec![]);
    ///
    /// let tree_2 = tree.clone();
    /// let thread = std::thread::spawn(move || {
    ///     tree.set(vec![0], vec![1]).unwrap();
    /// });
    ///
    /// // events is a blocking `Iterator` over `Event`s
    /// for event in events.take(1) {
    ///     match event {
    ///         Event::Set(key, value) => assert_eq!(key, vec![0]),
    ///         Event::Merge(key, partial_value) => {}
    ///         Event::Del(key) => {}
    ///     }
    /// }
    ///
    /// thread.join().unwrap();
    /// ```
    pub fn watch_prefix(&self, prefix: Vec<u8>) -> Subscriber {
        self.subscriptions.register(prefix)
    }

    /// Flushes all dirty IO buffers and calls fsync.
    /// If this succeeds, it is guaranteed that
    /// all previous writes will be recovered if
    /// the system crashes. Returns the number
    /// of bytes flushed during this call.
    pub fn flush(&self) -> Result<usize> {
        self.context.pagecache.flush()
    }

    /// Returns `true` if the `Tree` contains a value for
    /// the specified key.
    pub fn contains_key<K: AsRef<[u8]>>(&self, key: K) -> Result<bool> {
        self.get(key).map(|v| v.is_some())
    }

    /// Retrieve the key and value before the provided key,
    /// if one exists.
    ///
    /// # Examples
    ///
    /// ```
    /// use sled::{ConfigBuilder, Error};
    /// let config = ConfigBuilder::new().temporary(true).build();
    ///
    /// let tree = sled::Db::start(config).unwrap();
    ///
    /// for i in 0..10 {
    ///     tree.set(vec![i], vec![i]).expect("should write successfully");
    /// }
    ///
    /// assert!(tree.get_lt(vec![]).unwrap().is_none());
    /// assert!(tree.get_lt(vec![0]).unwrap().is_none());
    /// assert!(tree.get_lt(vec![1]).unwrap().unwrap().1 == vec![0]);
    /// assert!(tree.get_lt(vec![9]).unwrap().unwrap().1 == vec![8]);
    /// assert!(tree.get_lt(vec![10]).unwrap().unwrap().1 == vec![9]);
    /// assert!(tree.get_lt(vec![255]).unwrap().unwrap().1 == vec![9]);
    /// ```
    pub fn get_lt<K: AsRef<[u8]>>(
        &self,
        key: K,
    ) -> Result<Option<(Key, IVec)>> {
        let _measure = Measure::new(&M.tree_get);

        // the double tx is a hack that maintains
        // correctness of the ret value
        let tx = self.context.pagecache.begin()?;

        let path = self.path_for_key(key.as_ref(), &tx)?;
        let (_last_id, last_frag, _tree_ptr) = path
            .last()
            .expect("path should always contain a last element");

        let last_node = last_frag.unwrap_base();
        let data = &last_node.data;
        let items = data.leaf_ref().expect("last_node should be a leaf");
        let search = leaf_search(Less, items, |&(ref k, ref _v)| {
            prefix_cmp_encoded(k, key.as_ref(), &last_node.lo)
        });

        let ret = if search.is_none() {
            let mut iter = self.range((
                std::ops::Bound::Unbounded,
                std::ops::Bound::Excluded(key),
            ));

            match iter.next_back() {
                Some(Err(e)) => return Err(e),
                Some(Ok(pair)) => Some(pair),
                None => None,
            }
        } else {
            let idx = search.unwrap();
            let (encoded_key, v) = &items[idx];
            Some((prefix_decode(&last_node.lo, &*encoded_key), v.clone()))
        };

        tx.flush();

        Ok(ret)
    }

    /// Retrieve the next key and value from the `Tree` after the
    /// provided key.
    ///
    /// # Examples
    ///
    /// ```
    /// use sled::{ConfigBuilder, Error};
    /// let config = ConfigBuilder::new().temporary(true).build();
    ///
    /// let tree = sled::Db::start(config).unwrap();
    ///
    /// for i in 0..10 {
    ///     tree.set(vec![i], vec![i]).expect("should write successfully");
    /// }
    ///
    /// assert!(tree.get_gt(vec![]).unwrap().unwrap().1 == vec![0]);
    /// assert!(tree.get_gt(vec![0]).unwrap().unwrap().1 == vec![1]);
    /// assert!(tree.get_gt(vec![1]).unwrap().unwrap().1 == vec![2]);
    /// assert!(tree.get_gt(vec![8]).unwrap().unwrap().1 == vec![9]);
    /// assert!(tree.get_gt(vec![9]).unwrap().is_none());
    /// ```
    pub fn get_gt<K: AsRef<[u8]>>(
        &self,
        key: K,
    ) -> Result<Option<(Key, IVec)>> {
        let _measure = Measure::new(&M.tree_get);

        let tx = self.context.pagecache.begin()?;

        let path = self.path_for_key(key.as_ref(), &tx)?;
        let (_last_id, last_frag, _tree_ptr) = path
            .last()
            .expect("path should always contain a last element");

        let last_node = last_frag.unwrap_base();
        let data = &last_node.data;
        let items = data.leaf_ref().expect("last_node should be a leaf");
        let search = leaf_search(Greater, items, |&(ref k, ref _v)| {
            prefix_cmp_encoded(k, key.as_ref(), &last_node.lo)
        });

        let ret = if search.is_none() {
            let mut iter = self.range((
                std::ops::Bound::Excluded(key),
                std::ops::Bound::Unbounded,
            ));

            match iter.next() {
                Some(Err(e)) => return Err(e),
                Some(Ok(pair)) => Some(pair),
                None => None,
            }
        } else {
            let idx = search.unwrap();
            let (encoded_key, v) = &items[idx];
            Some((prefix_decode(&last_node.lo, &*encoded_key), v.clone()))
        };

        tx.flush();

        Ok(ret)
    }

    /// Merge state directly into a given key's value using the
    /// configured merge operator. This allows state to be written
    /// into a value directly, without any read-modify-write steps.
    /// Merge operators can be used to implement arbitrary data
    /// structures.
    ///
    /// # Panics
    ///
    /// Calling `merge` will panic if no merge operator has been
    /// configured.
    ///
    /// # Examples
    ///
    /// ```
    /// fn concatenate_merge(
    ///   _key: &[u8],               // the key being merged
    ///   old_value: Option<&[u8]>,  // the previous value, if one existed
    ///   merged_bytes: &[u8]        // the new bytes being merged in
    /// ) -> Option<Vec<u8>> {       // set the new value, return None to delete
    ///   let mut ret = old_value
    ///     .map(|ov| ov.to_vec())
    ///     .unwrap_or_else(|| vec![]);
    ///
    ///   ret.extend_from_slice(merged_bytes);
    ///
    ///   Some(ret)
    /// }
    ///
    /// let config = sled::ConfigBuilder::new()
    ///   .temporary(true)
    ///   .merge_operator(concatenate_merge)
    ///   .build();
    ///
    /// let tree = sled::Db::start(config).unwrap();
    ///
    /// let k = b"k1";
    ///
    /// tree.set(k, vec![0]);
    /// tree.merge(k, vec![1]);
    /// tree.merge(k, vec![2]);
    /// // assert_eq!(tree.get(k).unwrap().unwrap(), vec![0, 1, 2]);
    ///
    /// // sets replace previously merged data,
    /// // bypassing the merge function.
    /// tree.set(k, vec![3]);
    /// // assert_eq!(tree.get(k), Ok(Some(vec![3])));
    ///
    /// // merges on non-present values will add them
    /// tree.del(k);
    /// tree.merge(k, vec![4]);
    /// // assert_eq!(tree.get(k).unwrap().unwrap(), vec![4]);
    /// ```
    pub fn merge<K, V>(&self, key: K, value: V) -> Result<()>
    where
        K: AsRef<[u8]>,
        IVec: From<V>,
    {
        trace!("merging key {:?}", key.as_ref());
        let _measure = Measure::new(&M.tree_merge);

        if self.context.read_only {
            return Err(Error::Unsupported(
                "the database is in read-only mode".to_owned(),
            ));
        }

        let value = IVec::from(value);

        loop {
            let tx = self.context.pagecache.begin()?;

            let mut path = self.path_for_key(key.as_ref(), &tx)?;
            let (leaf_id, leaf_frag, leaf_ptr) = path.pop().expect(
                "path_for_key should always return a path \
                 of length >= 2 (root + leaf)",
            );
            let node: &Node = leaf_frag.unwrap_base();

            let mut subscriber_reservation = self.subscriptions.reserve(&key);

            let encoded_key = prefix_encode(&node.lo, key.as_ref());
            let frag = Frag::Merge(encoded_key, value.clone());

            let link = self.context.pagecache.link(
                leaf_id,
                leaf_ptr.clone(),
                frag.clone(),
                &tx,
            )?;
            if let Ok(new_cas_key) = link {
                // success
                if let Some(res) = subscriber_reservation.take() {
                    let event = subscription::Event::Merge(
                        key.as_ref().to_vec(),
                        value,
                    );

                    res.complete(event);
                }
                if node.should_split(self.context.blink_node_split_size as u64)
                {
                    let mut path2 = path
                        .iter()
                        .map(|&(id, f, ref p)| {
                            (id, Cow::Borrowed(f), p.clone())
                        })
                        .collect::<Vec<(PageId, Cow<'_, Frag>, _)>>();
                    let mut node2 = node.clone();
                    node2.apply(&frag, self.context.merge_operator);
                    let frag2 = Cow::Owned(Frag::Base(node2));
                    path2.push((leaf_id, frag2, new_cas_key));
                    self.recursive_split(path2, &tx)?;
                }
                tx.flush();
                return Ok(());
            }
            M.tree_looped();
        }
    }

    /// Create a double-ended iterator over the tuples of keys and
    /// values in this tree.
    ///
    /// # Examples
    ///
    /// ```
    /// let config = sled::ConfigBuilder::new().temporary(true).build();
    /// let t = sled::Db::start(config).unwrap();
    /// t.set(&[1], vec![10]);
    /// t.set(&[2], vec![20]);
    /// t.set(&[3], vec![30]);
    /// let mut iter = t.iter();
    /// // assert_eq!(iter.next(), Some(Ok((vec![1], vec![10]))));
    /// // assert_eq!(iter.next(), Some(Ok((vec![2], vec![20]))));
    /// // assert_eq!(iter.next(), Some(Ok((vec![3], vec![30]))));
    /// // assert_eq!(iter.next(), None);
    /// ```
    pub fn iter(&self) -> Iter<'_> {
        self.range::<Vec<u8>, _>(..)
    }

    /// Create a double-ended iterator over tuples of keys and values,
    /// starting at the provided key.
    ///
    /// # Examples
    ///
    /// ```
    /// let config = sled::ConfigBuilder::new()
    ///     .temporary(true)
    ///     .build();
    /// let t = sled::Db::start(config).unwrap();
    ///
    /// t.set(b"0", vec![0]).unwrap();
    /// t.set(b"1", vec![10]).unwrap();
    /// t.set(b"2", vec![20]).unwrap();
    /// t.set(b"3", vec![30]).unwrap();
    /// t.set(b"4", vec![40]).unwrap();
    /// t.set(b"5", vec![50]).unwrap();
    ///
    /// let mut r = t.scan(b"2");
    /// assert_eq!(r.next().unwrap().unwrap().0, b"2");
    /// assert_eq!(r.next().unwrap().unwrap().0, b"3");
    /// assert_eq!(r.next().unwrap().unwrap().0, b"4");
    /// assert_eq!(r.next().unwrap().unwrap().0, b"5");
    /// assert_eq!(r.next(), None);

    /// let mut r = t.scan(b"2").rev();
    /// assert_eq!(r.next().unwrap().unwrap().0, b"2");
    /// assert_eq!(r.next().unwrap().unwrap().0, b"1");
    /// assert_eq!(r.next().unwrap().unwrap().0, b"0");
    /// assert_eq!(r.next(), None);
    /// ```
    pub fn scan<K>(&self, key: K) -> Iter<'_>
    where
        K: AsRef<[u8]>,
    {
        let mut iter = self.range(key..);
        iter.is_scan = true;
        iter
    }

    /// Create a double-ended iterator over tuples of keys and values,
    /// where the keys fall within the specified range.
    ///
    /// # Examples
    ///
    /// ```
    /// let config = sled::ConfigBuilder::new()
    ///     .temporary(true)
    ///     .build();
    /// let t = sled::Db::start(config).unwrap();
    ///
    /// t.set(b"0", vec![0]).unwrap();
    /// t.set(b"1", vec![10]).unwrap();
    /// t.set(b"2", vec![20]).unwrap();
    /// t.set(b"3", vec![30]).unwrap();
    /// t.set(b"4", vec![40]).unwrap();
    /// t.set(b"5", vec![50]).unwrap();
    ///
    /// let start: &[u8] = b"2";
    /// let end: &[u8] = b"4";
    /// let mut r = t.range(start..end);
    /// assert_eq!(r.next().unwrap().unwrap().0, b"2");
    /// assert_eq!(r.next().unwrap().unwrap().0, b"3");
    /// assert_eq!(r.next(), None);
    ///
    /// let start = b"2".to_vec();
    /// let end = b"4".to_vec();
    /// let mut r = t.range(start..end).rev();
    /// assert_eq!(r.next().unwrap().unwrap().0, b"3");
    /// assert_eq!(r.next().unwrap().unwrap().0, b"2");
    /// assert_eq!(r.next(), None);
    /// ```
    pub fn range<K, R>(&self, range: R) -> Iter<'_>
    where
        K: AsRef<[u8]>,
        R: RangeBounds<K>,
    {
        let _measure = Measure::new(&M.tree_scan);

        let tx = match self.context.pagecache.begin() {
            Ok(tx) => tx,
            Err(e) => {
                return Iter {
                    tree: &self,
                    tx: Tx::new(0),
                    broken: Some(e),
                    done: false,
                    hi: ops::Bound::Unbounded,
                    lo: ops::Bound::Unbounded,
                    is_scan: false,
                    last_key: None,
                    last_id: None,
                };
            }
        };

        let lo = match range.start_bound() {
            ops::Bound::Included(ref end) => {
                ops::Bound::Included(end.as_ref().to_vec())
            }
            ops::Bound::Excluded(ref end) => {
                ops::Bound::Excluded(end.as_ref().to_vec())
            }
            ops::Bound::Unbounded => ops::Bound::Unbounded,
        };
        let hi = match range.end_bound() {
            ops::Bound::Included(ref end) => {
                ops::Bound::Included(end.as_ref().to_vec())
            }
            ops::Bound::Excluded(ref end) => {
                ops::Bound::Excluded(end.as_ref().to_vec())
            }
            ops::Bound::Unbounded => ops::Bound::Unbounded,
        };

        Iter {
            tree: &self,
            hi,
            lo,
            last_id: None,
            last_key: None,
            broken: None,
            done: false,
            is_scan: false,
            tx,
        }
    }

    /// Create a double-ended iterator over keys, starting at the provided key.
    ///
    /// # Examples
    ///
    /// ```
    /// let config = sled::ConfigBuilder::new().temporary(true).build();
    /// let t = sled::Db::start(config).unwrap();
    /// t.set(&[1], vec![10]);
    /// t.set(&[2], vec![20]);
    /// t.set(&[3], vec![30]);
    /// let mut iter = t.scan(&*vec![2]);
    /// // assert_eq!(iter.next(), Some(Ok(vec![2])));
    /// // assert_eq!(iter.next(), Some(Ok(vec![3])));
    /// // assert_eq!(iter.next(), None);
    /// ```
    pub fn keys<'a, K>(
        &'a self,
        key: K,
    ) -> impl 'a + DoubleEndedIterator<Item = Result<Vec<u8>>>
    where
        K: AsRef<[u8]>,
    {
        self.scan(key).keys()
    }

    /// Create a double-ended iterator over values, starting at the provided key.
    ///
    /// # Examples
    ///
    /// ```
    /// let config = sled::ConfigBuilder::new().temporary(true).build();
    /// let t = sled::Db::start(config).unwrap();
    /// t.set(&[1], vec![10]);
    /// t.set(&[2], vec![20]);
    /// t.set(&[3], vec![30]);
    /// let mut iter = t.scan(&*vec![2]);
    /// // assert_eq!(iter.next(), Some(Ok(vec![20])));
    /// // assert_eq!(iter.next(), Some(Ok(vec![30])));
    /// // assert_eq!(iter.next(), None);
    /// ```
    pub fn values<'a, K>(
        &'a self,
        key: K,
    ) -> impl 'a + DoubleEndedIterator<Item = Result<IVec>>
    where
        K: AsRef<[u8]>,
    {
        self.scan(key).values()
    }

    /// Returns the number of elements in this tree.
    ///
    /// Beware: performs a full O(n) scan under the hood.
    pub fn len(&self) -> usize {
        self.iter().count()
    }

    /// Returns `true` if the `Tree` contains no elements.
    pub fn is_empty(&self) -> bool {
        self.iter().next().is_none()
    }

    /// Clears the `Tree`, removing all values.
    ///
    /// Note that this is not atomic.
    pub fn clear(&self) -> Result<()> {
        for k in self.keys(b"") {
            let key = k?;
            self.del(key)?;
        }
        Ok(())
    }

    /// Returns the name of the tree.
    pub fn name(&self) -> Vec<u8> {
        self.tree_id.clone()
    }

    fn recursive_split<'g>(
        &self,
        path: Vec<(PageId, Cow<'g, Frag>, TreePtr<'g>)>,
        tx: &'g Tx,
    ) -> Result<()> {
        // to split, we pop the path, see if it's in need of split, recurse up
        // two-phase: (in prep for lock-free, not necessary for single threaded)
        //  1. half-split: install split on child, P
        //      a. allocate new right sibling page, Q
        //      b. locate split point
        //      c. create new consolidated pages for both sides
        //      d. add new node to pagetable
        //      e. merge split delta to original page P with physical pointer to Q
        //      f. if failed, free the new page
        //  2. parent update: install new index term on parent
        //      a. merge "index term delta record" to parent, containing:
        //          i. new bounds for P & Q
        //          ii. logical pointer to Q
        //
        //      (it's possible parent was merged in the mean-time, so if that's the
        //      case, we need to go up the path to the grandparent then down again
        //      or higher until it works)
        //  3. any traversing nodes that witness #1 but not #2 try to complete it
        //
        //  root is special case, where we need to hoist a new root

        let adjusted_max = |height| {
            // nodes toward the root are larger
            let threshold = std::cmp::min(height, 8) as u32;
            let multiplier = 2_u64.pow(threshold);
            self.context.blink_node_split_size as u64 * multiplier
        };

        for (height, node_parts) in path.iter().skip(1).rev().enumerate() {
            let (node_id, node_frag, node_ptr) = &node_parts;
            let node: &Node = node_frag.unwrap_base();
            if node.should_split(adjusted_max(height)) {
                // try to child split
                M.tree_child_split_attempt();

                if self
                    .child_split(*node_id, node, node_ptr.clone(), tx)?
                    .is_some()
                {
                    M.tree_child_split_success();
                } else {
                    return Ok(());
                }
            }
        }

        let (ref root_id, ref root_frag, ref root_ptr) = path[0];
        let root_node: &Node = root_frag.unwrap_base();

        if root_node.should_split(adjusted_max(path.len())) {
            M.tree_root_split_attempt();
            if let Some(parent_split) =
                self.child_split(*root_id, &root_node, root_ptr.clone(), tx)?
            {
                if self
                    .root_hoist(
                        *root_id,
                        parent_split.to,
                        parent_split.at.clone(),
                        tx,
                    )
                    .is_ok()
                {
                    M.tree_root_split_success();
                }
            }
        }

        Ok(())
    }

    fn child_split<'g>(
        &self,
        node_id: PageId,
        node: &Node,
        node_cas_key: TreePtr<'g>,
        tx: &'g Tx,
    ) -> Result<Option<ParentSplit>> {
        // split the node in half
        let rhs = node.split();

        let rhs_lo = rhs.lo.clone();

        let mut child_split = ChildSplit {
            at: rhs_lo.clone(),
            to: 0,
        };

        // install the new right side
        let (new_pid, new_ptr) =
            self.context.pagecache.allocate(Frag::Base(rhs), tx)?;

        trace!("allocated pid {} in child_split", new_pid);

        child_split.to = new_pid;

        let parent_split = ParentSplit {
            at: rhs_lo,
            to: new_pid,
        };

        // try to install a child split on the left side
        let link = self.context.pagecache.link(
            node_id,
            node_cas_key,
            Frag::ChildSplit(child_split),
            tx,
        )?;

        if link.is_err() {
            // if we failed, don't follow through with the parent split
            self.context
                .pagecache
                .free(new_pid, new_ptr, tx)?
                .expect("could not free allocated page");
            return Ok(None);
        }

        Ok(Some(parent_split))
    }

    fn root_hoist<'g>(
        &self,
        from: PageId,
        to: PageId,
        at: IVec,
        tx: &'g Tx,
    ) -> Result<()> {
        // hoist new root, pointing to lhs & rhs
        let root_lo = b"";
        let mut new_root_vec = vec![];
        new_root_vec.push((vec![0].into(), from));

        let encoded_at = prefix_encode(root_lo, &*at);
        new_root_vec.push((encoded_at, to));

        let new_root = Frag::Base(Node {
            data: Data::Index(new_root_vec),
            next: None,
            lo: vec![].into(),
            hi: vec![].into(),
        });

        let (new_root_pid, new_root_ptr) =
            self.context.pagecache.allocate(new_root, tx)?;
        debug!("allocated pid {} in root_hoist", new_root_pid);

        debug_delay();

        let cas = self.context.pagecache.cas_root_in_meta(
            self.tree_id.clone(),
            Some(from),
            Some(new_root_pid),
            tx,
        )?;
        if cas.is_ok() {
            debug!("root hoist from {} to {} successful", from, new_root_pid);

            // we spin in a cas loop because it's possible
            // 2 threads are at this point, and we don't want
            // to cause roots to diverge between meta and
            // our version.
            while self.root.compare_and_swap(from, new_root_pid, SeqCst) != from
            {
            }

            Ok(())
        } else {
            debug!(
                "root hoist from {} to {} failed: {:?}",
                from, new_root_pid, cas
            );
            self.context
                .pagecache
                .free(new_root_pid, new_root_ptr, tx)?
                .expect("could not free allocated page");

            Ok(())
        }
    }

    fn get_internal<'g, K: AsRef<[u8]>>(
        &self,
        key: K,
        tx: &'g Tx,
    ) -> Result<(Path<'g>, Option<&'g IVec>)> {
        let path = self.path_for_key(key.as_ref(), tx)?;

        let ret = path.last().and_then(|(_last_id, last_frag, _tree_ptr)| {
            let last_node = last_frag.unwrap_base();
            let data = &last_node.data;
            let items = data.leaf_ref().expect("last_node should be a leaf");
            let search = items
                .binary_search_by(|&(ref k, ref _v)| {
                    prefix_cmp_encoded(k, key.as_ref(), &last_node.lo)
                })
                .ok();

            search.map(|idx| &items[idx].1)
        });

        Ok((path, ret))
    }

    #[doc(hidden)]
    pub fn key_debug_str<K: AsRef<[u8]>>(&self, key: K) -> String {
        let tx = self.context.pagecache.begin().unwrap();

        let path = self.path_for_key(key.as_ref(), &tx).expect(
            "path_for_key should always return at least 2 nodes, \
             even if the key being searched for is not present",
        );
        let mut ret = String::new();
        for (id, node, _ptr) in &path {
            ret.push_str(&*format!("\n{}: {:?}", id, node));
        }

        tx.flush();

        ret
    }

    /// returns the traversal path, completing any observed
    /// partially complete splits or merges along the way.
    pub(crate) fn path_for_key<'g, K: AsRef<[u8]>>(
        &self,
        key: K,
        tx: &'g Tx,
    ) -> Result<Path<'g>> {
        let _measure = Measure::new(&M.tree_traverse);

        let mut cursor = self.root.load(SeqCst);
        let mut path: Vec<(PageId, &'g Frag, TreePtr<'g>)> = vec![];

        // unsplit_parent is used for tracking need
        // to complete partial splits.
        let mut unsplit_parent: Option<usize> = None;

        let mut not_found_loops = 0;
        loop {
            if cursor == usize::max_value() {
                // this collection has been explicitly removed
                return Err(Error::CollectionNotFound(self.tree_id.clone()));
            }
            let get_cursor = self.context.pagecache.get(cursor, tx)?;

            if get_cursor.is_free() {
                // restart search from the tree's root
                not_found_loops += 1;
                debug_assert_ne!(
                    not_found_loops, 10_000,
                    "cannot find pid {} in path_for_key",
                    cursor
                );
                cursor = self.root.load(SeqCst);
                continue;
            }

            let (frag, cas_key) = match get_cursor {
                PageGet::Materialized(node, cas_key) => (node, cas_key),
                broken => {
                    return Err(Error::ReportableBug(format!(
                        "got non-base node while traversing tree: {:?}",
                        broken
                    )));
                }
            };

            let node = frag.unwrap_base();

            // TODO this may need to change when handling (half) merges
            assert!(node.lo.as_ref() <= key.as_ref(), "overshot key somehow");

            // half-complete split detect & completion
            // (when hi is empty, it means it's unbounded)
            if !node.hi.is_empty() && node.hi.as_ref() <= key.as_ref() {
                // we have encountered a child split, without
                // having hit the parent split above.
                cursor = node.next.expect(
                    "if our hi bound is not Inf (inity), \
                     we should have a right sibling",
                );
                if unsplit_parent.is_none() && !path.is_empty() {
                    unsplit_parent = Some(path.len() - 1);
                }
                continue;
            } else if let Some(idx) = unsplit_parent.take() {
                // we have found the proper page for
                // our split.
                let (parent_id, _parent_frag, parent_ptr) = &path[idx];

                let ps = Frag::ParentSplit(ParentSplit {
                    at: node.lo.clone(),
                    to: cursor,
                });

                M.tree_parent_split_attempt();
                let link = self.context.pagecache.link(
                    *parent_id,
                    parent_ptr.clone(),
                    ps,
                    tx,
                )?;
                if let Ok(_new_key) = link {
                    // TODO set parent's cas_key (not this cas_key) to
                    // new_key in the path, along with updating the
                    // parent's node in the path vec. if we don't do
                    // both, we lose the newly appended parent split.
                    M.tree_parent_split_success();
                }
            }

            path.push((cursor, frag, cas_key.clone()));

            match path
                .last()
                .expect("we just pushed to path, so it's not empty")
                .1
                .unwrap_base()
                .data
            {
                Data::Index(ref ptrs) => {
                    let old_cursor = cursor;

                    let search = binary_search_lub(ptrs, |&(ref k, ref _v)| {
                        prefix_cmp_encoded(k, key.as_ref(), &node.lo)
                    });

                    // This might be none if ord is Less and we're
                    // searching for the empty key
                    let index = search.expect("failed to traverse index");

                    cursor = ptrs[index].1;

                    if cursor == old_cursor {
                        panic!("stuck in page traversal loop");
                    }
                }
                Data::Leaf(_) => {
                    break;
                }
            }
        }

        Ok(path)
    }

    // Remove all pages for this tree from the underlying
    // PageCache. This will leave orphans behind if
    // the tree crashes during gc.
    pub(crate) fn gc_pages(
        &self,
        mut leftmost_chain: Vec<PageId>,
    ) -> Result<()> {
        let tx = self.context.pagecache.begin()?;

        while let Some(mut pid) = leftmost_chain.pop() {
            loop {
                let get_cursor = self.context.pagecache.get(pid, &tx)?;

                let (node, key) = match get_cursor {
                    PageGet::Materialized(node, key) => (node, key),
                    PageGet::Free(_) => {
                        error!("encountered Free node while GC'ing tree");
                        break;
                    }
                    broken => {
                        return Err(Error::ReportableBug(format!(
                            "got non-base node while GC'ing tree: {:?}",
                            broken
                        )));
                    }
                };

                let ret = self.context.pagecache.free(pid, key.clone(), &tx)?;

                if ret.is_ok() {
                    let next_pid = node.unwrap_base().next.unwrap_or(0);
                    if next_pid == 0 {
                        break;
                    }
                    assert_ne!(pid, next_pid);
                    pid = next_pid;
                }
            }
        }

        Ok(())
    }
}

impl Debug for Tree {
    fn fmt(
        &self,
        f: &mut fmt::Formatter<'_>,
    ) -> std::result::Result<(), fmt::Error> {
        let tx = self.context.pagecache.begin().unwrap();

        let mut pid = self.root.load(SeqCst);
        let mut left_most = pid;
        let mut level = 0;

        f.write_str("Tree: \n\t")?;
        self.context.pagecache.fmt(f)?;
        f.write_str("\tlevel 0:\n")?;

        loop {
            let get_res = self.context.pagecache.get(pid, &tx);
            let node = match get_res {
                Ok(PageGet::Materialized(ref frag, ref _ptr)) => {
                    frag.unwrap_base()
                }
                broken => {
                    panic!("pagecache returned non-base node: {:?}", broken)
                }
            };

            f.write_str("\t\t")?;
            node.fmt(f)?;
            f.write_str("\n")?;

            if let Some(next_pid) = node.next {
                pid = next_pid;
            } else {
                // we've traversed our level, time to bump down
                let left_get_res = self.context.pagecache.get(left_most, &tx);
                let left_node = match left_get_res {
                    Ok(PageGet::Materialized(mf, ..)) => mf.unwrap_base(),
                    broken => {
                        panic!("pagecache returned non-base node: {:?}", broken)
                    }
                };

                match &left_node.data {
                    Data::Index(ptrs) => {
                        if let Some(&(ref _sep, ref next_pid)) = ptrs.first() {
                            pid = *next_pid;
                            left_most = *next_pid;
                            level += 1;
                            f.write_str(&*format!("\n\tlevel {}:\n", level))?;
                        } else {
                            panic!("trying to debug print empty index node");
                        }
                    }
                    Data::Leaf(_items) => {
                        // we've reached the end of our tree, all leafs are on
                        // the lowest level.
                        break;
                    }
                }
            }
        }

        tx.flush();

        Ok(())
    }
}