bplus_store 0.4.0

Copy-on-write B+ tree with page-aligned storage, split/merge, and crash-safety primitives.
Documentation
//! Embedded database façade.
//!
//! `Db::open` initialises a `Database` and exposes `create_tree` / `open_tree`
//! to obtain typed [`Tree`] handles. All storage operations are delegated to
//! the `Database` layer — this module only names the concrete storage types
//! as type parameters and never calls into them directly.

use std::marker::PhantomData;
use std::path::Path;
use std::sync::Arc;

use crate::api::ApiError;
use crate::bplustree::iterator::BPlusTreeIter;
use crate::bplustree::transaction::{TxnStatus, WriteTransaction};
use crate::bplustree::tree::SharedBPlusTree;
use crate::codec::kv::{KeyCodec, ValueCodec};
use crate::database::{self, Database};
use crate::keyfmt::KeyFormat;
use crate::keyfmt::raw::RawFormat;
use crate::storage::file_page_storage::FilePageStorage;
use crate::storage::paged_node_storage::PagedNodeStorage;

/// Concrete shared B+ tree type held by every [`Tree`] handle: node storage is
/// `PagedNodeStorage` layered over `FilePageStorage`.
type InnerTree = SharedBPlusTree<PagedNodeStorage<FilePageStorage>, FilePageStorage>;

/// Embedded database handle.
///
/// Storage is shared via `Arc`, so [`Tree`] handles derived from this `Db` are
/// independently owned and can be freely sent across threads.
pub struct Db {
    database: Arc<Database<FilePageStorage>>,
}

impl Db {
    /// Opens (or creates) the database rooted at `dir`.
    ///
    /// The directory must already exist. On first open the data file and
    /// manifest are created automatically.
    pub fn open<P: AsRef<Path>>(dir: P) -> Result<Self, ApiError> {
        let db = database::open::<FilePageStorage, _>(dir)
            .map_err(|e| ApiError::Internal(e.to_string()))?;
        Ok(Self {
            database: Arc::new(db),
        })
    }

    /// Creates a new named tree and returns a typed handle.
    pub fn create_tree<K, V>(&self, name: &str, order: u64) -> Result<Tree<K, V>, ApiError>
    where
        K: KeyCodec,
        V: ValueCodec,
    {
        let key_format = KeyFormat::Raw(RawFormat);
        let tree_meta = self
            .database
            .create_tree(name, K::ENCODING, key_format, order, None)
            .map_err(|e| ApiError::Internal(e.to_string()))?;
        let inner = self
            .database
            .bind_tree(&tree_meta)
            .map_err(|e| ApiError::Internal(e.to_string()))?;
        Ok(Tree {
            inner,
            _k: PhantomData,
            _v: PhantomData,
        })
    }

    /// Opens an existing named tree from the catalog.
    pub fn open_tree<K, V>(&self, name: &str) -> Result<Tree<K, V>, ApiError>
    where
        K: KeyCodec,
        V: ValueCodec,
    {
        let tree_meta = self
            .database
            .describe_tree(name)
            .map_err(|e| ApiError::Internal(e.to_string()))?;
        let inner = self
            .database
            .bind_tree(&tree_meta)
            .map_err(|e| ApiError::Internal(e.to_string()))?;
        Ok(Tree {
            inner,
            _k: PhantomData,
            _v: PhantomData,
        })
    }

    /// Opens an existing tree, or creates one with the given `order` if it
    /// does not exist yet.
    pub fn tree<K, V>(&self, name: &str, order: u64) -> Result<Tree<K, V>, ApiError>
    where
        K: KeyCodec,
        V: ValueCodec,
    {
        match self.open_tree(name) {
            Ok(t) => Ok(t),
            Err(_) => self.create_tree(name, order),
        }
    }

    /// Renames an existing tree from `old_name` to `new_name`.
    ///
    /// The change is recorded durably in the manifest log. After this call,
    /// [`open_tree`](Self::open_tree) with `old_name` will fail and `new_name`
    /// must be used instead.
    pub fn rename_tree(&self, old_name: &str, new_name: &str) -> Result<(), ApiError> {
        let meta = self
            .database
            .describe_tree(old_name)
            .map_err(|e| ApiError::Internal(e.to_string()))?;
        self.database
            .rename_tree(&meta.id, new_name)
            .map_err(|e| ApiError::Internal(e.to_string()))
    }

    /// Removes a tree from the catalog.
    ///
    /// The deletion is recorded durably in the manifest log. After this call,
    /// [`open_tree`](Self::open_tree) with `name` will fail.
    pub fn drop_tree(&self, name: &str) -> Result<(), ApiError> {
        let meta = self
            .database
            .describe_tree(name)
            .map_err(|e| ApiError::Internal(e.to_string()))?;
        self.database
            .drop_tree(&meta.id)
            .map_err(|e| ApiError::Internal(e.to_string()))
    }

    /// Returns the names of all trees currently in the catalog.
    pub fn list_trees(&self) -> Vec<String> {
        self.database.list_trees()
    }

    /// Returns the on-disk format version read from the superblock.
    pub fn format_version(&self) -> u32 {
        self.database.format_version()
    }

    /// Persists the freelist snapshot, returning any I/O error from the
    /// checkpoint step.
    ///
    /// After this call the `Db` is consumed. The underlying `Database` is
    /// dropped when the last `Arc` reference (including any live [`Tree`]
    /// handles) goes away.
    pub fn close(self) -> Result<(), ApiError> {
        self.database
            .checkpoint_freelist()
            .map_err(|e| ApiError::Internal(e.to_string()))
    }
}

impl Drop for Db {
    fn drop(&mut self) {
        // Best-effort freelist checkpoint; nothing useful to do with the error
        // since Drop can't propagate it. Use `Db::close()` for error handling.
        if let Err(e) = self.database.checkpoint_freelist() {
            eprintln!("warning: failed to checkpoint freelist on drop: {e}");
        }
    }
}

// ---------------------------------------------------------------------------
// Tree<K, V>
// ---------------------------------------------------------------------------

/// Typed handle to a single B+ tree inside a [`Db`].
///
/// `K` and `V` only select the key and value codecs; no value of either type
/// is stored in the handle, which is why both appear solely as
/// `PhantomData<fn() -> _>` markers next to the untyped inner tree.
pub struct Tree<K: KeyCodec, V: ValueCodec> {
    inner: InnerTree,
    _k: PhantomData<fn() -> K>,
    _v: PhantomData<fn() -> V>,
}

impl<K: KeyCodec, V: ValueCodec> Tree<K, V> {
    /// Inserts or replaces the value for `key`.
    ///
    /// Runs as a one-operation write transaction.
    ///
    /// # Errors
    ///
    /// Returns [`ApiError::TxnAborted`] if the commit reports an abort, or
    /// the converted commit error otherwise.
    pub fn put(&self, key: &K, value: &V) -> Result<(), ApiError> {
        let mut single = self.txn();
        single.insert(key, value);
        single.commit()
    }

    /// Returns the value for `key`, or `None` if the key is absent.
    ///
    /// # Errors
    ///
    /// Fails if the lookup fails or the stored bytes cannot be decoded as `V`.
    pub fn get(&self, key: &K) -> Result<Option<V>, ApiError> {
        let stored = self.inner.search(key.encode())?;
        // Decode only when something was found; a decode failure becomes the
        // overall error.
        stored.map(|bytes| V::decode(&bytes)).transpose()
    }

    /// Returns `true` if `key` exists in the tree, without decoding the value.
    pub fn contains_key(&self, key: &K) -> Result<bool, ApiError> {
        let present = self.inner.contains_key(key.encode())?;
        Ok(present)
    }

    /// Deletes the value for `key`. Returns an error if the key is not found.
    ///
    /// Runs as a one-operation write transaction.
    // NOTE(review): the "not found" error is produced (if at all) inside
    // `WriteTransaction::commit`; confirm against that implementation.
    pub fn delete(&self, key: &K) -> Result<(), ApiError> {
        let mut single = self.txn();
        single.delete(key);
        single.commit()
    }

    /// Starts a batched write transaction.
    ///
    /// The returned [`WriteTxn`] borrows this handle and commits against it.
    pub fn txn(&self) -> WriteTxn<'_, K, V> {
        let inner = WriteTransaction::new(self.inner.clone());
        WriteTxn { inner, tree: self }
    }

    /// Returns a forward range iterator from `start` (inclusive) to `end`
    /// (exclusive).
    ///
    /// Both bounds are encoded with `K`'s codec before the scan is started.
    pub fn range(&self, start: &K, end: &K) -> Result<RangeIter<'_, K, V>, ApiError> {
        let lower = start.encode();
        let upper = end.encode();
        let cursor = self.inner.search_range(&lower, Some(&upper))?;
        Ok(RangeIter {
            inner: cursor,
            _k: PhantomData,
            _v: PhantomData,
        })
    }

    /// Returns a forward range iterator from `start` (inclusive) to the end
    /// of the tree.
    pub fn range_from(&self, start: &K) -> Result<RangeIter<'_, K, V>, ApiError> {
        let lower = start.encode();
        let cursor = self.inner.search_range(&lower, None)?;
        Ok(RangeIter {
            inner: cursor,
            _k: PhantomData,
            _v: PhantomData,
        })
    }

    /// Returns the number of entries currently in the tree.
    pub fn len(&self) -> u64 {
        self.inner.get_size()
    }

    /// Returns true if the tree is empty.
    pub fn is_empty(&self) -> bool {
        self.inner.get_size() == 0
    }

    /// Returns the current height of the tree (1 = single leaf).
    pub fn height(&self) -> u64 {
        self.inner.get_height()
    }
}

// ---------------------------------------------------------------------------
// WriteTxn
// ---------------------------------------------------------------------------

/// Batched write transaction with optimistic CAS commit.
///
/// Operations are staged on the wrapped `WriteTransaction` and applied to the
/// borrowed [`Tree`] when [`WriteTxn::commit`] is called.
pub struct WriteTxn<'t, K: KeyCodec, V: ValueCodec> {
    inner: WriteTransaction,
    tree: &'t Tree<K, V>,
}

impl<'t, K: KeyCodec, V: ValueCodec> WriteTxn<'t, K, V> {
    /// Stages an insert of `key` → `value`.
    ///
    /// Both are encoded immediately; nothing reaches the tree until
    /// [`commit`](Self::commit).
    pub fn insert(&mut self, key: &K, value: &V) {
        let (key_bytes, value_bytes) = (key.encode(), value.encode());
        self.inner.insert(key_bytes, value_bytes);
    }

    /// Stages a delete of `key`.
    pub fn delete(&mut self, key: &K) {
        let key_bytes = key.encode();
        self.inner.delete(key_bytes);
    }

    /// Commits all staged operations. Returns `Err(ApiError::TxnAborted)` if
    /// the retry budget is exhausted.
    ///
    /// Any error raised by the underlying commit is converted into
    /// [`ApiError`] and returned.
    pub fn commit(mut self) -> Result<(), ApiError> {
        let target = self.tree;
        let outcome = self.inner.commit(&target.inner)?;
        match outcome {
            TxnStatus::Aborted => Err(ApiError::TxnAborted),
            TxnStatus::Committed => Ok(()),
        }
    }
}

// ---------------------------------------------------------------------------
// RangeIter
// ---------------------------------------------------------------------------

/// Node storage type parameter of the bytes-level iterator wrapped by
/// [`RangeIter`].
type InnerNodeStorage = PagedNodeStorage<FilePageStorage>;

/// Typed forward-range iterator over `(K, V)` pairs.
///
/// Holds a bytes-level `BPlusTreeIter` and turns each raw entry into a typed
/// pair through [`KeyCodec`] / [`ValueCodec`]. `K` and `V` are marker-only;
/// the iterator stores no decoded values.
pub struct RangeIter<'t, K: KeyCodec, V: ValueCodec> {
    inner: BPlusTreeIter<'t, InnerNodeStorage>,
    _k: PhantomData<fn() -> K>,
    _v: PhantomData<fn() -> V>,
}

impl<'t, K: KeyCodec, V: ValueCodec> Iterator for RangeIter<'t, K, V> {
    type Item = Result<(K, V), ApiError>;

    /// Yields the next decoded entry.
    ///
    /// Returns `None` once the underlying iterator is exhausted. An error
    /// from the underlying iterator, or a key/value that fails to decode, is
    /// yielded as `Some(Err(_))`; the key is decoded before the value.
    fn next(&mut self) -> Option<Self::Item> {
        let decoded: Result<(K, V), ApiError> = match self.inner.next()? {
            Err(scan_err) => Err(scan_err.into()),
            Ok((raw_key, raw_value)) => K::decode(&raw_key)
                .and_then(|key| V::decode(&raw_value).map(|value| (key, value))),
        };
        Some(decoded)
    }
}