//! Persistent storage implementation based on RocksDB.

use std::{
    cell::RefCell,
    collections::{BTreeMap, HashMap},
    fmt, iter,
    iter::{Iterator as StdIterator, Peekable},
    marker::PhantomData,
    mem,
    ops::{Bound, Deref, DerefMut},
    rc::Rc,
    result::Result as StdResult,
};

use crate::{
    validation::assert_valid_name_component,
    views::{AsReadonly, ChangesIter, IndexesPool, RawAccess, ResolvedAddress, View},
    Error, Result,
};

/// Changes related to a specific `View`.
#[derive(Debug, Default, Clone)]
pub struct ViewChanges {
    /// Changes within the view.
    pub(super) data: BTreeMap<Vec<u8>, Change>,
    /// Was the view cleared as part of the changes?
    is_cleared: bool,
}

impl ViewChanges {
    fn new() -> Self {
        Self::default()
    }

    pub fn is_cleared(&self) -> bool {
        self.is_cleared
    }

    pub fn clear(&mut self) {
        self.data.clear();
        self.is_cleared = true;
    }

    pub(crate) fn into_data(self) -> BTreeMap<Vec<u8>, Change> {
        self.data
    }

    /// Returns a value for the specified key, or an `Err(_)` if the value should be determined
    /// by the underlying snapshot.
    pub fn get(&self, key: &[u8]) -> StdResult<Option<Vec<u8>>, ()> {
        if let Some(change) = self.data.get(key) {
            return Ok(match *change {
                Change::Put(ref v) => Some(v.clone()),
                Change::Delete => None,
            });
        }
        if self.is_cleared() {
            return Ok(None);
        }
        Err(())
    }

    /// Returns whether the view contains the specified `key`. An `Err(_)` is returned if this
    /// should be determined by the underlying snapshot.
    pub fn contains(&self, key: &[u8]) -> StdResult<bool, ()> {
        if let Some(change) = self.data.get(key) {
            return Ok(match *change {
                Change::Put(..) => true,
                Change::Delete => false,
            });
        }

        if self.is_cleared() {
            return Ok(false);
        }
        Err(())
    }
}

/// Cell holding changes for a specific view. Mutable view borrows take the changes out of
/// the `Option` and unwrap the `Rc` into the inner data, while immutable borrows clone the inner `Rc`.
type ChangesCell = Option<Rc<ViewChanges>>;

/// Changes accumulated by a fork that have not yet been flushed into its `Patch`.
#[derive(Debug, Default)]
struct WorkingPatch {
    changes: RefCell<HashMap<ResolvedAddress, ChangesCell>>,
}

#[derive(Debug)]
enum WorkingPatchRef<'a> {
    Borrowed(&'a WorkingPatch),
    Owned(Rc<Fork>),
}

impl WorkingPatchRef<'_> {
    fn patch(&self) -> &WorkingPatch {
        match self {
            WorkingPatchRef::Borrowed(patch) => patch,
            WorkingPatchRef::Owned(ref fork) => &fork.working_patch,
        }
    }
}

/// Shared handle to the changes of a specific view, cloned from the working patch of a fork.
/// Used as the `Changes` type for readonly accesses to a fork.
#[derive(Debug)]
pub struct ChangesRef<'a> {
    inner: Rc<ViewChanges>,
    _lifetime: PhantomData<&'a ()>,
}

impl Drop for ChangesRef<'_> {
    fn drop(&mut self) {
        // Do nothing. The implementation is required to make `View`s based on `ChangesRef`
        // drop before a mutable operation is performed on a fork (e.g., it's converted
        // into a patch).
    }
}

impl Deref for ChangesRef<'_> {
    type Target = ViewChanges;

    fn deref(&self) -> &ViewChanges {
        &*self.inner
    }
}

/// `RefMut`, but dumber.
#[derive(Debug)]
pub struct ChangesMut<'a> {
    parent: WorkingPatchRef<'a>,
    key: ResolvedAddress,
    changes: Option<Rc<ViewChanges>>,
}

impl Deref for ChangesMut<'_> {
    type Target = ViewChanges;

    fn deref(&self) -> &ViewChanges {
        // `.unwrap()` is safe: `changes` can be equal to `None` only when
        // the instance is being dropped.
        self.changes.as_ref().unwrap()
    }
}

impl DerefMut for ChangesMut<'_> {
    fn deref_mut(&mut self) -> &mut ViewChanges {
        // `.unwrap()`s are safe:
        //
        // - `changes` can be equal to `None` only when the instance is being dropped.
        // - We know that `Rc` with the changes is unique.
        Rc::get_mut(self.changes.as_mut().unwrap()).unwrap()
    }
}

impl Drop for ChangesMut<'_> {
    fn drop(&mut self) {
        let mut change_map = self.parent.patch().changes.borrow_mut();
        let changes = change_map.get_mut(&self.key).unwrap_or_else(|| {
            panic!("insertion point for changes disappeared at {:?}", self.key);
        });

        debug_assert!(changes.is_none(), "edit conflict at {:?}", self.key);
        *changes = self.changes.take();
    }
}

impl WorkingPatch {
    /// Creates a new empty patch.
    fn new() -> Self {
        Self {
            changes: RefCell::new(HashMap::new()),
        }
    }

    /// Takes a cell with changes for a specific `View` out of the patch.
    /// The returned cell is guaranteed to contain an `Rc` with exclusive ownership.
    fn take_view_changes(&self, address: &ResolvedAddress) -> ChangesCell {
        let view_changes = {
            let mut changes = self.changes.borrow_mut();
            let view_changes = changes.get_mut(address).map(Option::take);
            view_changes.unwrap_or_else(|| {
                changes
                    .entry(address.clone())
                    .or_insert_with(|| Some(Rc::new(ViewChanges::new())))
                    .take()
            })
        };

        if let Some(ref view_changes) = view_changes {
            assert!(
                Rc::strong_count(view_changes) == 1,
                "Attempting to borrow {:?} mutably while it's borrowed immutably",
                address
            );
        } else {
            panic!("Multiple mutable borrows of an index at {:?}", address);
        }
        view_changes
    }

    /// Clones changes for a specific `View` from the patch. Panics if the changes
    /// are mutably borrowed.
    fn clone_view_changes(&self, address: &ResolvedAddress) -> Rc<ViewChanges> {
        let mut changes = self.changes.borrow_mut();
        // Get changes for the specified address.
        let changes: &ChangesCell = changes
            .entry(address.clone())
            .or_insert_with(|| Some(Rc::new(ViewChanges::new())));

        changes
            .as_ref()
            .unwrap_or_else(|| {
                // If the `changes` are `None`, this means they have been taken by a previous call
                // to `take_view_changes` and not yet returned.
                panic!(
                    "Attempting to borrow {:?} immutably while it's borrowed mutably",
                    address
                );
            })
            .clone()
    }

    // TODO: verify that this method updates `Change`s already in the `Patch` [ECR-2834]
    fn merge_into(self, patch: &mut Patch) {
        for (address, changes) in self.changes.into_inner() {
            // Check that changes are not borrowed mutably (in this case, the corresponding
            // `ChangesCell` is `None`).
            //
            // Both this and the following `panic`s cannot feasibly be triggered,
            // since the only place where this method is called (`Fork::flush()`) borrows
            // `Fork` mutably; this forces both mutable and immutable index borrows to be dropped,
            // since they borrow `Fork` immutably.
            let changes = changes.unwrap_or_else(|| {
                panic!(
                    "changes are still mutably borrowed at address {:?}",
                    address
                );
            });
            // Check that changes are not borrowed immutably (in this case, there is another
            // `Rc<_>` pointer to changes somewhere).
            let changes = Rc::try_unwrap(changes).unwrap_or_else(|_| {
                panic!(
                    "changes are still immutably borrowed at address {:?}",
                    address
                );
            });

            // The patch may already contain changes related to the `address`. If it does,
            // we extend these changes with the new changes (relying on the fact that
            // newer changes override older ones), unless the view was cleared (in which case,
            // the old changes do not matter and should be forgotten).
            let patch_changes = patch
                .changes
                .entry(address)
                .or_insert_with(ViewChanges::new);
            if changes.is_cleared() {
                *patch_changes = changes;
            } else {
                patch_changes.data.extend(changes.data);
            }
        }
    }
}

/// A generalized iterator over the storage views.
pub type Iter<'a> = Box<dyn Iterator + 'a>;

/// An enum that represents a type of change made to some key in the storage.
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(test, derive(Eq, Hash))] // needed for patch equality comparison
pub enum Change {
    /// Put the specified value into the storage for the corresponding key.
    Put(Vec<u8>),
    /// Delete a value from the storage for the corresponding key.
    Delete,
}

/// A combination of a database snapshot and changes on top of it.
///
/// A `Fork` provides both immutable and mutable operations over the database by implementing
/// the [`RawAccessMut`] trait. Like [`Snapshot`], `Fork` provides read isolation.
/// When mutable operations are applied to a fork, the subsequent reads act as if the changes
/// are applied to the database; in reality, these changes are accumulated in memory.
///
/// To apply the changes to the database, you need to convert a `Fork` into a [`Patch`] using
/// [`into_patch`] and then atomically [`merge`] it into the database. If two
/// conflicting forks are merged into a database, this can lead to an inconsistent state. If you
/// need to consistently apply several sets of changes to the same data, the next fork should be
/// created after the previous fork has been merged.
///
/// `Fork` also supports checkpoints ([`flush`] and [`rollback`] methods), which allow
/// rolling back the latest changes. A checkpoint is created automatically after calling
/// the `flush` method.
///
/// ```
/// # use metaldb::{access::CopyAccessExt, Database, TemporaryDB};
/// let db = TemporaryDB::new();
/// let mut fork = db.fork();
/// fork.get_list("list").extend(vec![1_u32, 2]);
/// fork.flush();
/// fork.get_list("list").push(3_u32);
/// fork.rollback();
/// // The changes after the latest `flush()` are now forgotten.
/// let list = fork.get_list::<_, u32>("list");
/// assert_eq!(list.len(), 2);
/// # assert_eq!(list.iter().collect::<Vec<_>>(), vec![1, 2]);
/// ```
///
/// In order to convert a fork into a `&dyn Snapshot` representation, convert it into a `Patch`
/// and use a reference to it (`Patch` implements `Snapshot`). Using `<Fork as RawAccess>::snapshot`
/// for this purpose is logically incorrect and may lead to hard-to-debug errors.
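///
/// For example, a minimal sketch of this conversion:
///
/// ```
/// # use metaldb::{access::CopyAccessExt, Database, Snapshot, TemporaryDB};
/// let db = TemporaryDB::new();
/// let fork = db.fork();
/// fork.get_list("list").push(1_u32);
/// let patch = fork.into_patch();
/// // `Patch` implements `Snapshot`, so a reference to it can be used
/// // wherever a `&dyn Snapshot` is expected.
/// let _as_snapshot: &dyn Snapshot = &patch;
/// // Data recorded in the fork is visible through the patch.
/// assert_eq!(patch.get_list::<_, u32>("list").len(), 1);
/// ```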
///
/// # Borrow checking
///
/// Only one instance of an index at a given `IndexAddress` can be created from a single fork.
/// If an additional instance is requested, the code will panic at runtime.
/// Hence, obtaining indexes from a `Fork` works similarly to [`RefCell::borrow_mut()`].
///
/// For example, the code below will panic at runtime.
///
/// ```rust,should_panic
/// # use metaldb::{access::CopyAccessExt, TemporaryDB, ListIndex, Database};
/// let db = TemporaryDB::new();
/// let fork = db.fork();
/// let index = fork.get_list::<_, u8>("index");
/// // This code will panic at runtime.
/// let index2 = fork.get_list::<_, u8>("index");
/// ```
///
/// To enable immutable / shared references to indexes, you may use the [`readonly`] method:
///
/// ```
/// # use metaldb::{access::CopyAccessExt, TemporaryDB, ListIndex, Database};
/// let db = TemporaryDB::new();
/// let fork = db.fork();
/// fork.get_list::<_, u8>("index").extend(vec![1, 2, 3]);
///
/// let readonly = fork.readonly();
/// let index = readonly.get_list::<_, u8>("index");
/// // Works fine.
/// let index2 = readonly.get_list::<_, u8>("index");
/// ```
///
/// It is impossible to mutate index contents via readonly access to the fork; this is
/// checked by the Rust type system.
///
/// Shared references work like `RefCell::borrow()`; it is a runtime error to try to obtain
/// a shared reference to an index if there is an exclusive reference to the same index,
/// and vice versa.
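///
/// For example (a minimal sketch), the code below panics at runtime because the index is
/// already borrowed exclusively through the fork:
///
/// ```rust,should_panic
/// # use metaldb::{access::CopyAccessExt, Database, TemporaryDB};
/// let db = TemporaryDB::new();
/// let fork = db.fork();
/// let index = fork.get_list::<_, u8>("index");
/// // Panics: the same index is requested via a shared (readonly) reference
/// // while the exclusive reference above is still alive.
/// let readonly_index = fork.readonly().get_list::<_, u8>("index");
/// ```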
///
/// [`RawAccessMut`]: access/trait.RawAccessMut.html
/// [`Snapshot`]: trait.Snapshot.html
/// [`Patch`]: struct.Patch.html
/// [`into_patch`]: #method.into_patch
/// [`merge`]: trait.Database.html#tymethod.merge
/// [`flush`]: #method.flush
/// [`rollback`]: #method.rollback
/// [`readonly`]: #method.readonly
/// [`RefCell::borrow_mut()`]: https://doc.rust-lang.org/std/cell/struct.RefCell.html#method.borrow_mut
#[derive(Debug)]
pub struct Fork {
    patch: Patch,
    working_patch: WorkingPatch,
}

/// A set of changes that can be atomically applied to a `Database`.
///
/// This set can contain changes from multiple indexes. Changes can be read from the `Patch`
/// using its `RawAccess` implementation.
///
/// # Examples
///
/// ```
/// # use metaldb::{
/// #     access::CopyAccessExt, Database, Patch, TemporaryDB,
/// # };
/// let db = TemporaryDB::new();
/// let fork = db.fork();
/// fork.get_list("list").extend(vec![1_i32, 2, 3]);
/// let patch: Patch = fork.into_patch();
/// // The patch contains changes recorded in the fork.
/// let list = patch.get_list::<_, i32>("list");
/// assert_eq!(list.len(), 3);
/// ```
#[derive(Debug)]
pub struct Patch {
    snapshot: Box<dyn Snapshot>,
    changes: HashMap<ResolvedAddress, ViewChanges>,
}

/// Iterator that merges entries from a snapshot with the changes recorded on top of it,
/// yielding the resulting view contents in ascending key order.
pub(super) struct ForkIter<'a, T: StdIterator> {
    snapshot: Iter<'a>,
    changes: Option<Peekable<T>>,
}

#[derive(Debug, PartialEq, Eq)]
enum NextIterValue {
    /// The next entry comes from the snapshot and is not overridden by the changes.
    Stored,
    /// The entry in the snapshot is overwritten by a `Change::Put` with the same key.
    Replaced,
    /// A `Change::Put` for a key not present in the snapshot.
    Inserted,
    /// The entry in the snapshot is removed by a `Change::Delete` with the same key.
    Deleted,
    /// A `Change::Delete` for a key not present in the snapshot.
    MissDeleted,
    /// Both the snapshot and the changes are exhausted.
    Finished,
}

/// Low-level storage backend implementing a collection of named key-value stores
/// (aka column families).
///
/// A `Database` instance is shared across different threads, so it must be `Sync` and `Send`.
///
/// There is no way to directly interact with data in the database; use [`snapshot`], [`fork`]
/// and [`merge`] methods for indirect interaction. See [the crate-level documentation](index.html)
/// for more details.
///
/// Note that `Database` effectively has [interior mutability][interior-mut];
/// `merge` and `merge_sync` methods take a shared reference to the database (`&self`)
/// rather than an exclusive one (`&mut self`). This means that the following code compiles:
///
/// ```
/// use metaldb::{access::CopyAccessExt, Database, TemporaryDB};
///
/// // not declared as `mut db`!
/// let db: Box<dyn Database> = Box::new(TemporaryDB::new());
/// let fork = db.fork();
/// {
///     let mut list = fork.get_list("list");
///     list.push(42_u64);
/// }
/// db.merge(fork.into_patch()).unwrap();
/// ```
///
/// # Merge Workflow
///
/// The user of a `Database` is responsible for ensuring that forks are either created and merged
/// sequentially or do not contain overlapping changes. By sequential creation we mean the following
/// workflow:
///
/// ```
/// # use metaldb::{Database, TemporaryDB};
/// let db = TemporaryDB::new();
/// let first_fork = db.fork();
/// // Perform some operations on `first_fork`...
/// db.merge(first_fork.into_patch()).unwrap();
/// let second_fork = db.fork();
/// // Perform some operations on `second_fork`...
/// db.merge(second_fork.into_patch()).unwrap();
/// ```
///
/// In contrast, this is a non-sequential workflow:
///
/// ```
/// # use metaldb::{Database, TemporaryDB};
/// let db = TemporaryDB::new();
/// let first_fork = db.fork();
/// // Perform some operations on `first_fork`...
/// let second_fork = db.fork();
/// // Perform some operations on `second_fork`...
/// db.merge(first_fork.into_patch()).unwrap();
/// db.merge(second_fork.into_patch()).unwrap();
/// ```
///
/// In a non-sequential workflow, `first_fork` and `second_fork` **must not** contain overlapping
/// changes (i.e., changes to the same index). If they do, the result of the merge may be
/// unpredictable to the programmer and may break database invariants, e.g., that the length
/// of an index is equal to the number of elements obtained by iterating over the index:
///
/// ```
/// // NEVER USE THIS PATTERN!
/// # use metaldb::{access::CopyAccessExt, Database, TemporaryDB};
/// let db = TemporaryDB::new();
/// let first_fork = db.fork();
/// first_fork.get_list("list").extend(vec![1, 2, 3]);
/// let second_fork = db.fork();
/// second_fork.get_list("list").push(4);
/// db.merge(first_fork.into_patch()).unwrap();
/// db.merge(second_fork.into_patch()).unwrap();
///
/// let snapshot = db.snapshot();
/// let list = snapshot.get_list::<_, i32>("list");
/// assert_eq!(list.len(), 1);
/// assert_eq!(list.iter().collect::<Vec<_>>(), vec![4, 2, 3]);
/// // ^-- Oops, we got two phantom elements!
/// ```
///
/// It is advised to create / merge patches sequentially whenever possible. The concurrent
/// workflow should only be used for minor changes, for which the proof that a patch does not overlap
/// with concurrent patches is tractable.
///
/// [`snapshot`]: #tymethod.snapshot
/// [`fork`]: #method.fork
/// [`merge`]: #tymethod.merge
/// [interior-mut]: https://doc.rust-lang.org/book/ch15-05-interior-mutability.html
pub trait Database: Send + Sync + 'static {
    /// Creates a new snapshot of the database from its current state.
    fn snapshot(&self) -> Box<dyn Snapshot>;

    /// Creates a new fork of the database from its current state.
    fn fork(&self) -> Fork {
        Fork {
            patch: Patch {
                snapshot: self.snapshot(),
                changes: HashMap::new(),
            },
            working_patch: WorkingPatch::new(),
        }
    }

    /// Atomically applies a sequence of patch changes to the database.
    ///
    /// Note that this method may be called concurrently from different threads; the
    /// onus of guaranteeing atomicity is on the implementor of the trait.
    ///
    /// # Logical Safety
    ///
    /// Merging several patches which are not created sequentially and contain
    /// overlapping changes may result in an unexpected storage state and lead to hard-to-debug
    /// errors, storage leaks, etc. See the [trait docs](#merge-workflow) for more details.
    ///
    /// # Errors
    ///
    /// If this method encounters any form of I/O or other error during merging, an error variant
    /// will be returned. In case of an error, the method guarantees no changes are applied to
    /// the database.
    fn merge(&self, patch: Patch) -> Result<()>;

    /// Atomically applies a sequence of patch changes to the database with fsync.
    ///
    /// Note that this method may be called concurrently from different threads; the
    /// onus of guaranteeing atomicity is on the implementor of the trait.
    ///
    /// # Logical Safety
    ///
    /// Merging several patches which are not created sequentially and contain
    /// overlapping changes may result in an unexpected storage state and lead to hard-to-debug
    /// errors, storage leaks, etc. See the [trait docs](#merge-workflow) for more details.
    ///
    /// # Errors
    ///
    /// If this method encounters any form of I/O or other error during merging, an error variant
    /// will be returned. In case of an error, the method guarantees no changes are applied to
    /// the database.
    fn merge_sync(&self, patch: Patch) -> Result<()>;
}

/// Extension trait for `Database`.
pub trait DatabaseExt: Database {
    /// Merges a patch into the database and creates a backup patch that reverses all the merged
    /// changes.
    ///
    /// # Safety
    ///
    /// It is logically unsound to merge other patches into the database between the
    /// `merge_with_backup` call and merging the backup patch. This may lead to merge artifacts
    /// and an inconsistent database state.
    ///
    /// An exception to this rule is creating backups for several merged patches
    /// and then applying backups in the reverse order:
    ///
    /// ```
    /// # use metaldb::{access::{Access, CopyAccessExt}, Database, DatabaseExt, TemporaryDB};
    /// let db = TemporaryDB::new();
    /// let fork = db.fork();
    /// fork.get_list("list").push(1_u32);
    /// let backup1 = db.merge_with_backup(fork.into_patch()).unwrap();
    /// let fork = db.fork();
    /// fork.get_list("list").push(2_u32);
    /// let backup2 = db.merge_with_backup(fork.into_patch()).unwrap();
    /// let fork = db.fork();
    /// fork.get_list("list").extend(vec![3_u32, 4]);
    /// let backup3 = db.merge_with_backup(fork.into_patch()).unwrap();
    ///
    /// fn enumerate_list<A: Access + Copy>(view: A) -> Vec<u32> {
    ///     view.get_list("list").iter().collect()
    /// }
    ///
    /// assert_eq!(enumerate_list(&db.snapshot()), vec![1, 2, 3, 4]);
    /// // Rollback the most recent merge.
    /// db.merge(backup3).unwrap();
    /// assert_eq!(enumerate_list(&db.snapshot()), vec![1, 2]);
    /// // ...Then the penultimate merge.
    /// db.merge(backup2).unwrap();
    /// assert_eq!(enumerate_list(&db.snapshot()), vec![1]);
    /// // ...Then the oldest one.
    /// db.merge(backup1).unwrap();
    /// assert!(enumerate_list(&db.snapshot()).is_empty());
    /// ```
    ///
    /// # Performance notes
    ///
    /// This method is linear w.r.t. patch size (i.e., the total number of changes in it) plus,
    /// for each clear operation, the corresponding index size before clearing. As such,
    /// the method may be inappropriate to use with large patches.
    ///
    /// # Errors
    ///
    /// Returns an error in the same situations as `Database::merge()`.
    fn merge_with_backup(&self, patch: Patch) -> Result<Patch> {
        let snapshot = self.snapshot();
        let mut rev_changes = HashMap::with_capacity(patch.changes.len());

        for (name, changes) in &patch.changes {
            let mut view_changes = changes.data.clone();
            for (key, change) in &mut view_changes {
                *change = snapshot.get(name, key).map_or(Change::Delete, Change::Put);
            }

            // Remember all elements that will be deleted.
            if changes.is_cleared() {
                let mut iter = snapshot.iter(name, &[]);
                while let Some((key, value)) = iter.next() {
                    view_changes.insert(key.to_vec(), Change::Put(value.to_vec()));
                }
            }

            rev_changes.insert(
                name.clone(),
                ViewChanges {
                    data: view_changes,
                    is_cleared: false,
                },
            );
        }

        self.merge(patch)?;
        Ok(Patch {
            snapshot: self.snapshot(),
            changes: rev_changes,
        })
    }
}

impl<T: Database> DatabaseExt for T {}

/// A read-only snapshot of a storage backend.
///
/// A `Snapshot` instance is an immutable representation of a certain storage state.
/// It provides read isolation, so consistency is guaranteed even if the data in
/// the database changes between reads.
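///
/// # Examples
///
/// A minimal sketch of obtaining and reading a snapshot:
///
/// ```
/// # use metaldb::{access::CopyAccessExt, Database, TemporaryDB};
/// let db = TemporaryDB::new();
/// let fork = db.fork();
/// fork.get_list("list").push(1_u32);
/// db.merge(fork.into_patch()).unwrap();
///
/// // The snapshot is isolated from any changes merged into the database later.
/// let snapshot = db.snapshot();
/// let list = snapshot.get_list::<_, u32>("list");
/// assert_eq!(list.len(), 1);
/// assert_eq!(list.get(0), Some(1));
/// ```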
pub trait Snapshot: Send + Sync + 'static {
    /// Returns a value corresponding to the specified address and key as a raw vector of bytes,
    /// or `None` if it does not exist.
    fn get(&self, name: &ResolvedAddress, key: &[u8]) -> Option<Vec<u8>>;

    /// Returns a value for each of the provided keys at the specified address as a raw vector
    /// of bytes, or `None` for keys that have no value.
    fn multi_get<'a>(
        &self,
        name: &ResolvedAddress,
        keys: &'a mut dyn iter::Iterator<Item = &'a [u8]>,
    ) -> Vec<Option<Vec<u8>>>;

    /// Returns `true` if the snapshot contains a value for the specified address and key.
    ///
    /// The default implementation checks existence of the value using [`get`](#tymethod.get).
    fn contains(&self, name: &ResolvedAddress, key: &[u8]) -> bool {
        self.get(name, key).is_some()
    }

    /// Returns an iterator over the entries of the snapshot in ascending order starting from
    /// the specified key. The iterator element type is `(&[u8], &[u8])`.
    fn iter(&self, name: &ResolvedAddress, from: &[u8]) -> Iter<'_>;
}

/// A trait that defines a streaming iterator over storage view entries. Unlike
/// the standard [`Iterator`](https://doc.rust-lang.org/std/iter/trait.Iterator.html)
/// trait, `Iterator` in `metaldb` is low-level and, therefore, operates on raw bytes.
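///
/// A minimal sketch of draining such an iterator into owned pairs (a hypothetical helper,
/// not part of the crate API):
///
/// ```ignore
/// fn collect_entries(iter: &mut dyn Iterator) -> Vec<(Vec<u8>, Vec<u8>)> {
///     let mut entries = Vec::new();
///     while let Some((key, value)) = iter.next() {
///         entries.push((key.to_vec(), value.to_vec()));
///     }
///     entries
/// }
/// ```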
pub trait Iterator {
    /// Advances the iterator and returns a reference to the next key and value.
    fn next(&mut self) -> Option<(&[u8], &[u8])>;

    /// Returns a reference to the current key and value without advancing the iterator.
    fn peek(&mut self) -> Option<(&[u8], &[u8])>;
}

impl Patch {
    /// Returns the changes contained in this patch, keyed by the resolved view address.
    pub(crate) fn into_changes(self) -> HashMap<ResolvedAddress, ViewChanges> {
        self.changes
    }
}

impl Snapshot for Patch {
    fn get(&self, name: &ResolvedAddress, key: &[u8]) -> Option<Vec<u8>> {
        self.changes
            .get(name)
            .map_or(Err(()), |changes| changes.get(key))
            // At this point, `Err(_)` signifies that we need to retrieve data from the snapshot.
            .unwrap_or_else(|()| self.snapshot.get(name, key))
    }

    fn multi_get<'a>(
        &self,
        name: &ResolvedAddress,
        keys: &'a mut dyn iter::Iterator<Item = &'a [u8]>,
    ) -> Vec<Option<Vec<u8>>> {
        let changes = self.changes.get(name);
        let size = {
            let (min, max) = keys.size_hint();

            max.unwrap_or(min)
        };

        let (mut res, db_keys) = keys.into_iter().enumerate().fold(
            (Vec::with_capacity(size), Vec::with_capacity(size)),
            |(mut res, mut db_keys), (idx, key)| {
                if let Some(Ok(item)) = changes.map(|changes| changes.get(key)) {
                    res.push(item);
                } else {
                    res.push(None);
                    db_keys.push((idx, key));
                }

                (res, db_keys)
            },
        );

        let db_res = self
            .snapshot
            .multi_get(name, &mut db_keys.iter().map(|(_, key)| *key));

        for ((idx, _), item) in db_keys.into_iter().zip(db_res) {
            res[idx] = item;
        }

        res
    }

    fn contains(&self, name: &ResolvedAddress, key: &[u8]) -> bool {
        self.changes
            .get(name)
            .map_or(Err(()), |changes| changes.contains(key))
            // At this point, `Err(_)` signifies that we need to retrieve data from the snapshot.
            .unwrap_or_else(|()| self.snapshot.contains(name, key))
    }

    fn iter(&self, name: &ResolvedAddress, from: &[u8]) -> Iter<'_> {
        let maybe_changes = self.changes.get(name);
        let changes_iter = maybe_changes.map(|changes| {
            changes
                .data
                .range::<[u8], _>((Bound::Included(from), Bound::Unbounded))
        });

        let is_cleared = maybe_changes.map_or(false, ViewChanges::is_cleared);
        if is_cleared {
            // Ignore all changes from the snapshot.
            Box::new(ChangesIter::new(changes_iter.unwrap()))
        } else {
            Box::new(ForkIter::new(self.snapshot.iter(name, from), changes_iter))
        }
    }
}

impl RawAccess for &'_ Patch {
    type Changes = ();

    fn snapshot(&self) -> &dyn Snapshot {
        *self as &dyn Snapshot
    }

    fn changes(&self, _address: &ResolvedAddress) -> Self::Changes {}
}

impl AsReadonly for &'_ Patch {
    type Readonly = Self;

    fn as_readonly(&self) -> Self::Readonly {
        self
    }
}

impl Fork {
    /// Finalizes all changes made after the previous execution of the `flush` method.
    /// If `flush` has not been called before, finalizes all changes made since
    /// the creation of the `Fork`.
    pub fn flush(&mut self) {
        let working_patch = mem::replace(&mut self.working_patch, WorkingPatch::new());
        working_patch.merge_into(&mut self.patch);
    }

    /// Finishes a migration of indexes with the specified prefix.
    pub(crate) fn flush_migration(&mut self, prefix: &str) {
        assert_valid_name_component(prefix);

        // Mutable `self` reference ensures that no indexes are instantiated in the client code.
        self.flush(); // Flushing is necessary to keep `self.patch` up to date.

        let removed_addrs = IndexesPool::new(&*self).flush_migration(prefix);
        for addr in removed_addrs {
            self.patch.changes.entry(addr).or_default().clear();
        }
    }

    /// Rolls back all changes that were made after the latest execution
    /// of the `flush` method.
    pub fn rollback(&mut self) {
        self.working_patch = WorkingPatch::new();
    }

    /// Rolls back the migration with the specified name. This will remove all indexes
    /// within the migration.
    pub(crate) fn rollback_migration(&mut self, prefix: &str) {
        assert_valid_name_component(prefix);
        self.flush();
        let removed_addrs = IndexesPool::new(&*self).rollback_migration(prefix);
        for addr in &removed_addrs {
            self.patch.changes.remove(addr);
        }
    }

    /// Converts the fork into a `Patch`, consuming the fork instance.
    pub fn into_patch(mut self) -> Patch {
        self.flush();
        self.patch
    }

    /// Returns a readonly wrapper around the fork. Indexes created from the readonly
    /// wrapper cannot be modified; on the other hand, it is possible to have multiple
    /// instances of the same index at the same time.
    pub fn readonly(&self) -> ReadonlyFork<'_> {
        ReadonlyFork(self)
    }
}

impl From<Patch> for Fork {
    /// Creates a fork based on the provided `patch`.
    ///
    /// Note: using the created fork to modify data already present in `patch` may lead
    /// to an inconsistent database state. Hence, this conversion is useful only if you
    /// are sure that the fork will interact with indexes other than those in `patch`.
    fn from(patch: Patch) -> Self {
        Self {
            patch,
            working_patch: WorkingPatch::new(),
        }
    }
}

impl<'a> RawAccess for &'a Fork {
    type Changes = ChangesMut<'a>;

    fn snapshot(&self) -> &dyn Snapshot {
        &self.patch
    }

    fn changes(&self, address: &ResolvedAddress) -> Self::Changes {
        let changes = self.working_patch.take_view_changes(address);
        ChangesMut {
            changes,
            key: address.clone(),
            parent: WorkingPatchRef::Borrowed(&self.working_patch),
        }
    }
}

impl RawAccess for Rc<Fork> {
    type Changes = ChangesMut<'static>;

    fn snapshot(&self) -> &dyn Snapshot {
        &self.patch
    }

    fn changes(&self, address: &ResolvedAddress) -> Self::Changes {
        let changes = self.working_patch.take_view_changes(address);
        ChangesMut {
            changes,
            key: address.clone(),
            parent: WorkingPatchRef::Owned(Self::clone(self)),
        }
    }
}

/// Readonly wrapper for a `Fork`.
///
/// This wrapper allows reading index state from the fork
/// in a type-safe manner (it is impossible to accidentally modify data in the index) and
/// without encountering runtime errors when obtaining the same index
/// more than once.
///
/// Since the wrapper borrows the `Fork` immutably, it is still possible to access indexes
/// in the fork directly. In this scenario, the caller should take care that `ReadonlyFork`
/// does not access the same indexes as the original `Fork`; doing so will result in a runtime
/// error (similar to attempting both an exclusive and a shared borrow from a `RefCell`
/// or `RwLock`).
///
/// # Examples
///
/// ```
/// # use metaldb::{access::CopyAccessExt, Database, ReadonlyFork, TemporaryDB};
/// let db = TemporaryDB::new();
/// let fork = db.fork();
/// fork.get_list("list").push(1_u32);
/// let readonly: ReadonlyFork<'_> = fork.readonly();
/// let list = readonly.get_list::<_, u32>("list");
/// assert_eq!(list.get(0), Some(1));
/// let same_list = readonly.get_list::<_, u32>("list");
/// // ^-- Does not result in an error!
///
/// // Original fork is still accessible.
/// let mut map = fork.get_map("map");
/// map.put(&1_u32, "foo".to_string());
/// ```
///
/// There are no write methods in indexes instantiated from `ReadonlyFork`:
///
/// ```compile_fail
/// # use metaldb::{access::CopyAccessExt, Database, ReadonlyFork, TemporaryDB};
/// let db = TemporaryDB::new();
/// let fork = db.fork();
/// let readonly: ReadonlyFork<'_> = fork.readonly();
/// let mut list = readonly.get_list("list");
/// list.push(1_u32); // Won't compile: no `push` method in `ListIndex<ReadonlyFork, u32>`!
/// ```
#[derive(Debug, Clone, Copy)]
pub struct ReadonlyFork<'a>(&'a Fork);

impl<'a> AsReadonly for ReadonlyFork<'a> {
    type Readonly = Self;

    fn as_readonly(&self) -> Self::Readonly {
        *self
    }
}

impl<'a> AsReadonly for &'a Fork {
    type Readonly = ReadonlyFork<'a>;

    fn as_readonly(&self) -> Self::Readonly {
        ReadonlyFork(*self)
    }
}

impl<'a> RawAccess for ReadonlyFork<'a> {
    type Changes = ChangesRef<'a>;

    fn snapshot(&self) -> &dyn Snapshot {
        &self.0.patch
    }

    fn changes(&self, address: &ResolvedAddress) -> Self::Changes {
        ChangesRef {
            inner: self.0.working_patch.clone_view_changes(address),
            _lifetime: PhantomData,
        }
    }
}

/// Version of `ReadonlyFork` with a static lifetime. Can be produced from an `Rc<Fork>` using
/// the `AsReadonly` trait.
///
/// Beware that producing an instance increases the reference counter of the underlying fork.
/// If you need to obtain `Fork` from `Rc<Fork>` via [`Rc::try_unwrap`], make sure that all
/// `OwnedReadonlyFork` instances are dropped by this time.
///
/// [`Rc::try_unwrap`]: https://doc.rust-lang.org/std/rc/struct.Rc.html#method.try_unwrap
///
/// # Examples
///
/// ```
/// # use metaldb::{access::AccessExt, AsReadonly, Database, OwnedReadonlyFork, TemporaryDB};
/// # use std::rc::Rc;
/// let db = TemporaryDB::new();
/// let fork = Rc::new(db.fork());
/// fork.get_list("list").extend(vec![1_u32, 2, 3]);
/// let ro_fork: OwnedReadonlyFork = fork.as_readonly();
/// let list = ro_fork.get_list::<_, u32>("list");
/// assert_eq!(list.len(), 3);
/// ```
#[derive(Debug, Clone)]
pub struct OwnedReadonlyFork(Rc<Fork>);

impl RawAccess for OwnedReadonlyFork {
    type Changes = ChangesRef<'static>;

    fn snapshot(&self) -> &dyn Snapshot {
        &self.0.patch
    }

    fn changes(&self, address: &ResolvedAddress) -> Self::Changes {
        ChangesRef {
            inner: self.0.working_patch.clone_view_changes(address),
            _lifetime: PhantomData,
        }
    }
}

impl AsReadonly for OwnedReadonlyFork {
    type Readonly = Self;

    fn as_readonly(&self) -> Self::Readonly {
        self.clone()
    }
}

impl AsReadonly for Rc<Fork> {
    type Readonly = OwnedReadonlyFork;

    fn as_readonly(&self) -> Self::Readonly {
        OwnedReadonlyFork(self.clone())
    }
}

impl AsRef<dyn Snapshot> for dyn Snapshot {
    fn as_ref(&self) -> &dyn Snapshot {
        self
    }
}

impl Snapshot for Box<dyn Snapshot> {
    fn get(&self, name: &ResolvedAddress, key: &[u8]) -> Option<Vec<u8>> {
        self.as_ref().get(name, key)
    }

    fn multi_get<'a>(
        &self,
        name: &ResolvedAddress,
        keys: &'a mut dyn iter::Iterator<Item = &'a [u8]>,
    ) -> Vec<Option<Vec<u8>>> {
        self.as_ref().multi_get(name, keys)
    }

    fn contains(&self, name: &ResolvedAddress, key: &[u8]) -> bool {
        self.as_ref().contains(name, key)
    }

    fn iter(&self, name: &ResolvedAddress, from: &[u8]) -> Iter<'_> {
        self.as_ref().iter(name, from)
    }
}

impl<'a, T> ForkIter<'a, T>
where
    T: StdIterator<Item = (&'a Vec<u8>, &'a Change)>,
{
    pub fn new(snapshot: Iter<'a>, changes: Option<T>) -> Self {
        ForkIter {
            snapshot,
            changes: changes.map(StdIterator::peekable),
        }
    }

    #[allow(clippy::option_if_let_else)]
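    // Decides where the next entry comes from by comparing the next key in the changes
    // with the next key in the snapshot; both iterators are sorted by key, so this is
    // an ordinary sorted-merge step.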
    fn step(&mut self) -> NextIterValue {
        use std::cmp::Ordering::{Equal, Greater, Less};

        if let Some(ref mut changes) = self.changes {
            match changes.peek() {
                Some(&(k, change)) => match self.snapshot.peek() {
                    Some((key, ..)) => match *change {
                        Change::Put(..) => match k[..].cmp(key) {
                            Equal => NextIterValue::Replaced,
                            Less => NextIterValue::Inserted,
                            Greater => NextIterValue::Stored,
                        },
                        Change::Delete => match k[..].cmp(key) {
                            Equal => NextIterValue::Deleted,
                            Less => NextIterValue::MissDeleted,
                            Greater => NextIterValue::Stored,
                        },
                    },
                    None => match *change {
                        Change::Put(..) => NextIterValue::Inserted,
                        Change::Delete => NextIterValue::MissDeleted,
                    },
                },
                None => match self.snapshot.peek() {
                    Some(..) => NextIterValue::Stored,
                    None => NextIterValue::Finished,
                },
            }
        } else {
            match self.snapshot.peek() {
                Some(..) => NextIterValue::Stored,
                None => NextIterValue::Finished,
            }
        }
    }
}

impl<'a, T> Iterator for ForkIter<'a, T>
where
    T: StdIterator<Item = (&'a Vec<u8>, &'a Change)>,
{
    fn next(&mut self) -> Option<(&[u8], &[u8])> {
        loop {
            match self.step() {
                NextIterValue::Stored => return self.snapshot.next(),
                NextIterValue::Replaced => {
                    self.snapshot.next();
                    return self.changes.as_mut().unwrap().next().map(|(key, change)| {
                        (
                            key.as_slice(),
                            match *change {
                                Change::Put(ref value) => value.as_slice(),
                                Change::Delete => unreachable!(),
                            },
                        )
                    });
                }
                NextIterValue::Inserted => {
                    return self.changes.as_mut().unwrap().next().map(|(key, change)| {
                        (
                            key.as_slice(),
                            match *change {
                                Change::Put(ref value) => value.as_slice(),
                                Change::Delete => unreachable!(),
                            },
                        )
                    });
                }
                NextIterValue::Deleted => {
                    self.changes.as_mut().unwrap().next();
                    self.snapshot.next();
                }
                NextIterValue::MissDeleted => {
                    self.changes.as_mut().unwrap().next();
                }
                NextIterValue::Finished => return None,
            }
        }
    }

    fn peek(&mut self) -> Option<(&[u8], &[u8])> {
        loop {
            match self.step() {
                NextIterValue::Stored => return self.snapshot.peek(),
                NextIterValue::Replaced | NextIterValue::Inserted => {
                    return self.changes.as_mut().unwrap().peek().map(|&(key, change)| {
                        (
                            key.as_slice(),
                            match *change {
                                Change::Put(ref value) => value.as_slice(),
                                Change::Delete => unreachable!(),
                            },
                        )
                    });
                }
                NextIterValue::Deleted => {
                    self.changes.as_mut().unwrap().next();
                    self.snapshot.next();
                }
                NextIterValue::MissDeleted => {
                    self.changes.as_mut().unwrap().next();
                }
                NextIterValue::Finished => return None,
            }
        }
    }
}

impl fmt::Debug for dyn Database {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Database").finish()
    }
}

impl fmt::Debug for dyn Snapshot {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Snapshot").finish()
    }
}

impl fmt::Debug for dyn Iterator {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Iterator").finish()
    }
}

/// The current `MerkleDB` data layout version.
pub const DB_VERSION: u8 = 0;
/// Database metadata address.
pub const DB_METADATA: &str = "__DB_METADATA__";
/// Version attribute name.
pub const VERSION_NAME: &str = "version";

/// This function checks that the given database is compatible with the current `MerkleDB` version.
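///
/// A minimal usage sketch (assuming the function is re-exported at the crate root):
///
/// ```ignore
/// use metaldb::{check_database, TemporaryDB};
///
/// let mut db = TemporaryDB::new();
/// // Writes the layout version on first use and verifies it on subsequent calls.
/// check_database(&mut db).unwrap();
/// ```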
pub fn check_database(db: &mut dyn Database) -> Result<()> {
    let fork = db.fork();
    {
        let addr = ResolvedAddress::system(DB_METADATA);
        let mut view = View::new(&fork, addr);
        if let Some(saved_version) = view.get::<_, u8>(VERSION_NAME) {
            if saved_version != DB_VERSION {
                return Err(Error::new(format!(
                    "Database version doesn't match: actual {}, expected {}",
                    saved_version, DB_VERSION
                )));
            }

            return Ok(());
        }
        view.put(VERSION_NAME, DB_VERSION);
    }
    db.merge(fork.into_patch())
}

#[cfg(test)]
mod tests {
    use super::{
        AsReadonly, Change, Database, DatabaseExt, Fork, OwnedReadonlyFork, Patch, Rc,
        ResolvedAddress, Snapshot, StdIterator, View,
    };
    use crate::{access::CopyAccessExt, TemporaryDB};

    use std::{collections::HashSet, iter};

    #[test]
    fn readonly_indexes_are_timely_dropped() {
        let db = TemporaryDB::new();
        let fork = db.fork();
        fork.get_list("list").push(1_u64);
        {
            // Without this additional scope, the code below would not compile: the readonly
            // index borrows `fork`, preventing it from being converted into a patch.
            let _list = fork.readonly().get_list::<_, u64>("list");
        }
        fork.into_patch();
    }

    /// Asserts that a patch contains only the specified changes.
    fn check_patch<'a, I>(patch: &Patch, changes: I)
    where
        I: IntoIterator<Item = (&'a str, &'a [u8], Change)>,
    {
        let mut patch_set: HashSet<_> = HashSet::new();
        for (name, changes) in &patch.changes {
            for (key, value) in &changes.data {
                patch_set.insert((name.clone(), key.as_slice(), value.clone()));
            }
        }
        let expected_set: HashSet<_> = changes
            .into_iter()
            .map(|(name, key, change)| (ResolvedAddress::system(name), key, change))
            .collect();
        assert_eq!(patch_set, expected_set);
    }

    #[test]
    fn backup_data_is_correct() {
        let db = TemporaryDB::new();
        let fork = db.fork();
        {
            let mut view = View::new(&fork, "foo");
            view.put(&vec![], vec![2]);
        }
        let backup = db.merge_with_backup(fork.into_patch()).unwrap();
        check_patch(&backup, vec![("foo", &[] as &[u8], Change::Delete)]);
        let snapshot = db.snapshot();
        assert_eq!(snapshot.get(&"foo".into(), &[]), Some(vec![2]));

        let fork = db.fork();
        {
            let mut view = View::new(&fork, "foo");
            view.put(&vec![], vec![3]);
            let mut view = View::new(&fork, "bar");
            view.put(&vec![1], vec![4]);
            let mut view = View::new(&fork, "bar2");
            view.put(&vec![5], vec![6]);
        }
        let backup = db.merge_with_backup(fork.into_patch()).unwrap();
        check_patch(
            &backup,
            vec![
                ("bar2", &[5_u8] as &[u8], Change::Delete),
                ("bar", &[1], Change::Delete),
                ("foo", &[], Change::Put(vec![2])),
            ],
        );

        // Check that the old snapshot still corresponds to the same DB state.
        assert_eq!(snapshot.get(&"foo".into(), &[]), Some(vec![2]));
        let snapshot = db.snapshot();
        assert_eq!(snapshot.get(&"foo".into(), &[]), Some(vec![3]));
    }

    #[test]
    fn rollback_via_backup_patches() {
        let db = TemporaryDB::new();
        let fork = db.fork();
        {
            let mut view = View::new(&fork, "foo");
            view.put(&vec![], vec![2]);
        }
        db.merge(fork.into_patch()).unwrap();

        let fork = db.fork();
        {
            let mut view = View::new(&fork, "foo");
            view.put(&vec![], vec![3]);
            let mut view = View::new(&fork, "bar");
            view.put(&vec![1], vec![4]);
        }
        let backup = db.merge_with_backup(fork.into_patch()).unwrap();

        let snapshot = db.snapshot();
        assert_eq!(snapshot.get(&"foo".into(), &[]), Some(vec![3]));
        assert_eq!(backup.get(&"foo".into(), &[]), Some(vec![2]));
        assert_eq!(backup.get(&"bar".into(), &[1]), None);
        assert_eq!(snapshot.get(&"bar".into(), &[1]), Some(vec![4]));

        assert_eq!(
            snapshot.multi_get(&"foo".into(), &mut iter::once(&[] as &[u8])),
            vec![Some(vec![3])]
        );
        assert_eq!(
            backup.multi_get(&"foo".into(), &mut iter::once(&[] as &[u8])),
            vec![Some(vec![2])]
        );
        assert_eq!(
            backup.multi_get(&"bar".into(), &mut iter::once(&[1u8] as &[u8])),
            vec![None]
        );

        assert_eq!(
            snapshot.multi_get(&"bar".into(), &mut iter::once(&[1u8] as &[u8])),
            vec![Some(vec![4])]
        );

        db.merge(backup).unwrap();
        let snapshot = db.snapshot();
        assert_eq!(snapshot.get(&"foo".into(), &[]), Some(vec![2]));
        assert_eq!(snapshot.get(&"bar".into(), &[1]), None);

        assert_eq!(
            snapshot.multi_get(&"foo".into(), &mut iter::once(&[] as &[u8])),
            vec![Some(vec![2])]
        );
        assert_eq!(
            snapshot.multi_get(&"bar".into(), &mut iter::once(&[1u8] as &[u8])),
            vec![None]
        );

        // Check that DB continues working as usual after a rollback.
        let fork = db.fork();
        {
            let mut view = View::new(&fork, "foo");
            view.put(&vec![], vec![4]);
            view.put(&vec![0, 0], vec![255]);
            let mut view = View::new(&fork, "bar");
            view.put(&vec![1], vec![253]);
        }
        let backup1 = db.merge_with_backup(fork.into_patch()).unwrap();
        let snapshot = db.snapshot();
        assert_eq!(
            snapshot.multi_get(
                &"foo".into(),
                &mut vec![&[] as &[u8], &[0u8, 0]].into_iter()
            ),
            vec![Some(vec![4]), Some(vec![255])]
        );

        let fork = db.fork();
        {
            let mut view = View::new(&fork, "bar");
            view.put(&vec![1], vec![254]);
        }
        let backup2 = db.merge_with_backup(fork.into_patch()).unwrap();
        let snapshot = db.snapshot();
        assert_eq!(
            snapshot.multi_get(
                &"foo".into(),
                &mut vec![&[] as &[u8], &[0u8, 0]].into_iter()
            ),
            vec![Some(vec![4]), Some(vec![255])]
        );
        assert_eq!(snapshot.get(&"bar".into(), &[1]), Some(vec![254]));

        // Check patches used as `Snapshot`s.
        assert_eq!(backup1.get(&"bar".into(), &[1]), None);
        assert_eq!(backup2.get(&"bar".into(), &[1]), Some(vec![253]));
        assert_eq!(backup1.get(&"foo".into(), &[]), Some(vec![2]));
        assert_eq!(backup2.get(&"foo".into(), &[]), Some(vec![4]));

        // Backups should be applied in the reverse order.
        db.merge(backup2).unwrap();
        db.merge(backup1).unwrap();
        let snapshot = db.snapshot();
        assert_eq!(
            snapshot.multi_get(&"foo".into(), &mut vec![&[] as &[u8], &[0, 0]].into_iter()),
            vec![Some(vec![2]), None]
        );
        assert_eq!(snapshot.get(&"bar".into(), &[1]), None);
    }

    #[test]
    fn backup_after_clearing_view() {
        let db = TemporaryDB::new();
        let fork = db.fork();
        {
            let mut view = View::new(&fork, "foo");
            view.put(&vec![], vec![1]);
            view.put(&vec![1], vec![2]);
        }
        db.merge(fork.into_patch()).unwrap();

        let fork = db.fork();
        {
            let mut view = View::new(&fork, "foo");
            view.clear();
            view.put(&vec![1], vec![3]);
            view.put(&vec![2], vec![4]);
        }
        let backup = db.merge_with_backup(fork.into_patch()).unwrap();
        assert_eq!(
            backup.multi_get(
                &"foo".into(),
                &mut vec![&[] as &[u8], &[1], &[2]].into_iter()
            ),
            vec![Some(vec![1]), Some(vec![2]), None]
        );
        db.merge(backup).unwrap();
        let snapshot = db.snapshot();
        assert_eq!(
            snapshot.multi_get(
                &"foo".into(),
                &mut vec![&[] as &[u8], &[1], &[2]].into_iter()
            ),
            vec![Some(vec![1]), Some(vec![2]), None]
        );
    }

    #[test]
    fn backup_reverting_index_creation() {
        let db = TemporaryDB::new();
        let fork = db.fork();
        fork.get_entry("foo").set(1_u32);
        db.merge(fork.into_patch()).unwrap();
        let fork = db.fork();
        fork.get_entry(("foo", &1_u8)).set(2_u32);
        let backup = db.merge_with_backup(fork.into_patch()).unwrap();
        assert!(backup.index_type(("foo", &1_u8)).is_none());
        assert!(backup.get_list::<_, u32>(("foo", &1_u8)).is_empty());
    }

    #[test]
    fn borrows_from_owned_forks() {
        use crate::{access::AccessExt, Entry};

        let db = TemporaryDB::new();
        let fork = Rc::new(db.fork());
        let readonly: OwnedReadonlyFork = fork.as_readonly();
        // Modify an index via `fork`.
        fork.get_list("list").extend(vec![1_i64, 2, 3]);
        // Check that if both `CopyAccessExt` and `AccessExt` traits are in scope, the correct one
        // is used for `Rc<Fork>`.
        let mut entry: Entry<Rc<Fork>, _> = fork.get_entry("entry");
        // Access the list via `readonly`.
        let list = readonly.get_list::<_, i64>("list");
        assert_eq!(list.len(), 3);
        assert_eq!(list.get(1), Some(2));
        assert_eq!(list.iter_from(1).collect::<Vec<_>>(), vec![2, 3]);

        entry.set("!".to_owned());
        drop(entry);
        let entry = readonly.get_entry::<_, String>("entry");
        // Clone `readonly` access and get another `entry` instance.
        let other_readonly = readonly;
        let other_entry = other_readonly.get_entry::<_, String>("entry");
        assert_eq!(entry.get().unwrap(), "!");
        assert_eq!(other_entry.get().unwrap(), "!");
    }

    #[test]
    fn concurrent_borrow_from_fork_and_readonly_fork() {
        let db = TemporaryDB::new();
        let fork = db.fork();
        // This entry is phantom.
        let _readonly_entry = fork.readonly().get_entry::<_, u32>(("entry", &1_u8));
        // This one is not phantom, but it has the same `ResolvedAddress` as the phantom entry.
        // Since phantom entries do not borrow changes from the `Fork`, this works fine.
        let _entry = fork.get_entry::<_, u32>("entry");
    }

    #[test]
    fn stale_read_from_phantom_index() {
        let db = TemporaryDB::new();
        let fork = db.fork();
        // Phantom entries are unusual in that they can lead to stale reads (sort of; we assume
        // that the database writer is smart enough to separate readonly and read-write parts
        // of the `Fork`, e.g., via `Prefixed` accesses).
        let phantom_entry = fork.readonly().get_entry::<_, u32>("entry");
        let mut entry = fork.get_entry::<_, u32>("entry");
        entry.set(1);
        assert_eq!(phantom_entry.get(), None);
    }

    #[test]
    #[should_panic(expected = "immutably while it's borrowed mutably")]
    fn borrow_from_readonly_fork_after_index_is_created() {
        let db = TemporaryDB::new();
        let fork = db.fork();
        let _entry = fork.get_entry::<_, u32>("entry");
        // Since the index is already created, this should lead to a panic.
        let _readonly_entry = fork.readonly().get_entry::<_, u32>("entry");
    }
}