commonware-storage 2026.4.0

Persist and retrieve data from an abstract store.
Documentation
//! A collection of authenticated databases inspired by QMDB (Quick Merkle Database).
//!
//! # Terminology
//!
//! A database's state is derived from an append-only log of state-changing _operations_.
//!
//! In a _keyed_ database, a _key_ either has a _value_ or it doesn't, and different types of
//! operations modify the state of a specific key. A key that has a value can change to one without
//! a value through the _delete_ operation. The _update_ operation gives a key a specific value. We
//! sometimes call an update for a key that doesn't already have a value a _create_ operation, but
//! its representation in the log is the same.
//!
//! Keys with values are called _active_. An operation is called _active_ if (1) its key is active,
//! (2) it is an update operation, and (3) it is the most recent operation for that key.
//!
//! # Database Lifecycle
//!
//! All variants are modified through a batch API that follows a common pattern:
//! 1. Create a batch from the database.
//! 2. Stage mutations on the batch.
//! 3. Merkleize the batch -- this resolves mutations against the current state and computes
//!    the Merkle root that would result from applying them.
//! 4. Inspect the root or create child batches.
//! 5. Apply the batch to the database (uncommitted ancestors are applied automatically).
//!
//! The specific mutation methods vary by variant.
//! See each variant's module documentation for the concrete API and usage examples.
//!
//! Persistence and cleanup are managed directly on the database: `sync()`, `prune()`,
//! and `destroy()`.
//!
//! # Traits
//!
//! Keyed mutable variants ([any] and [current]) implement `any::traits::DbAny` and
//! [crate::Persistable].
//!
//! # Acknowledgments
//!
//! The following resources were used as references when implementing this crate:
//!
//! * [QMDB: Quick Merkle Database](https://arxiv.org/abs/2501.05262)
//! * [Merkle Mountain
//!   Ranges](https://github.com/opentimestamps/opentimestamps-server/blob/master/doc/merkle-mountain-range.md)

use crate::{
    index::{Cursor, Unordered as Index},
    journal::contiguous::{Mutable, Reader},
    merkle::{Family, Location},
    qmdb::operation::Operation,
};
use commonware_utils::NZUsize;
use core::num::NonZeroUsize;
use futures::{pin_mut, StreamExt as _};
use thiserror::Error;

pub mod any;
pub mod current;
pub mod immutable;
pub mod keyless;
pub mod operation;
pub mod store;
pub mod sync;
pub mod verify;
pub use verify::{
    create_multi_proof, create_proof_store, verify_multi_proof, verify_proof,
    verify_proof_and_extract_digests, verify_proof_and_pinned_nodes,
};

/// Errors that can occur when interacting with an authenticated database.
#[derive(Error, Debug)]
pub enum Error<F: Family> {
    #[error("data corrupted: {0}")]
    DataCorrupted(&'static str),

    #[error("merkle error: {0}")]
    Merkle(#[from] crate::merkle::Error<F>),

    #[error("metadata error: {0}")]
    Metadata(#[from] crate::metadata::Error),

    #[error("journal error: {0}")]
    Journal(#[from] crate::journal::Error),

    #[error("runtime error: {0}")]
    Runtime(#[from] commonware_runtime::Error),

    #[error("operation pruned: {0}")]
    OperationPruned(Location<F>),

    /// The requested key was not found in the snapshot.
    #[error("key not found")]
    KeyNotFound,

    /// The key exists in the db, so we cannot prove its exclusion.
    #[error("key exists")]
    KeyExists,

    #[error("unexpected data at location: {0}")]
    UnexpectedData(Location<F>),

    #[error("location out of bounds: {0} >= {1}")]
    LocationOutOfBounds(Location<F>, Location<F>),

    #[error("prune location {0} beyond minimum required location {1}")]
    PruneBeyondMinRequired(Location<F>, Location<F>),

    /// The batch was created from a different database state than the current one.
    #[error(
        "stale batch: db has {db_size} ops, batch requires {batch_db_size} or {batch_base_size}"
    )]
    StaleBatch {
        db_size: u64,
        batch_db_size: u64,
        batch_base_size: u64,
    },
}

impl<F: Family> From<crate::journal::authenticated::Error<F>> for Error<F> {
    fn from(e: crate::journal::authenticated::Error<F>) -> Self {
        match e {
            crate::journal::authenticated::Error::Journal(j) => Self::Journal(j),
            crate::journal::authenticated::Error::Merkle(m) => Self::Merkle(m),
        }
    }
}

/// The size of the read buffer to use for replaying the operations log when rebuilding the
/// snapshot.
const SNAPSHOT_READ_BUFFER_SIZE: NonZeroUsize = NZUsize!(1 << 16);

/// Builds the database's snapshot by replaying the log starting at the inactivity floor. Assumes
/// the log is not pruned beyond the inactivity floor. The callback is invoked for each replayed
/// operation, indicating activity status updates. The first argument of the callback is the
/// activity status of the operation, and the second argument is the location of the operation it
/// inactivates (if any). Returns the number of active keys in the db.
pub(super) async fn build_snapshot_from_log<F, C, I, Fn>(
    inactivity_floor_loc: crate::merkle::Location<F>,
    reader: &C,
    snapshot: &mut I,
    mut callback: Fn,
) -> Result<usize, Error<F>>
where
    F: crate::merkle::Family,
    C: Reader<Item: Operation<F>>,
    I: Index<Value = crate::merkle::Location<F>>,
    Fn: FnMut(bool, Option<crate::merkle::Location<F>>),
{
    let bounds = reader.bounds();
    let stream = reader
        .replay(SNAPSHOT_READ_BUFFER_SIZE, *inactivity_floor_loc)
        .await?;
    pin_mut!(stream);
    let last_commit_loc = bounds.end.saturating_sub(1);
    let mut active_keys: usize = 0;
    while let Some(result) = stream.next().await {
        let (loc, op) = result?;
        if let Some(key) = op.key() {
            if op.is_delete() {
                let old_loc = delete_key(snapshot, reader, key).await?;
                callback(false, old_loc);
                if old_loc.is_some() {
                    active_keys -= 1;
                }
            } else if op.is_update() {
                let new_loc = crate::merkle::Location::new(loc);
                let old_loc = update_key(snapshot, reader, key, new_loc).await?;
                callback(true, old_loc);
                if old_loc.is_none() {
                    active_keys += 1;
                }
            }
        } else if op.has_floor().is_some() {
            callback(loc == last_commit_loc, None);
        }
    }

    Ok(active_keys)
}

/// Delete `key` from the snapshot if it exists, using a stable log reader, and return the
/// previously associated location.
async fn delete_key<F, I, R>(
    snapshot: &mut I,
    reader: &R,
    key: &<R::Item as Operation<F>>::Key,
) -> Result<Option<Location<F>>, Error<F>>
where
    F: Family,
    I: Index<Value = Location<F>>,
    R: Reader,
    R::Item: Operation<F>,
{
    // If the translated key is in the snapshot, get a cursor to look for the key.
    let Some(mut cursor) = snapshot.get_mut(key) else {
        return Ok(None);
    };

    // Find the matching key among all conflicts, then delete it.
    let Some(loc) = find_update_op::<F, _>(reader, &mut cursor, key).await? else {
        return Ok(None);
    };
    cursor.delete();

    Ok(Some(loc))
}

/// Update `key` in the snapshot using a stable log reader, returning its old location if present.
async fn update_key<F, I, R>(
    snapshot: &mut I,
    reader: &R,
    key: &<R::Item as Operation<F>>::Key,
    new_loc: Location<F>,
) -> Result<Option<Location<F>>, Error<F>>
where
    F: Family,
    I: Index<Value = Location<F>>,
    R: Reader,
    R::Item: Operation<F>,
{
    // If the translated key is not in the snapshot, insert the new location. Otherwise, get a
    // cursor to look for the key.
    let Some(mut cursor) = snapshot.get_mut_or_insert(key, new_loc) else {
        return Ok(None);
    };

    // Find the matching key among all conflicts, then update its location.
    if let Some(loc) = find_update_op::<F, _>(reader, &mut cursor, key).await? {
        assert!(new_loc > loc);
        cursor.update(new_loc);
        return Ok(Some(loc));
    }

    // The key wasn't in the snapshot, so add it to the cursor.
    cursor.insert(new_loc);

    Ok(None)
}

/// Find and return the location of the update operation for `key`, if it exists. The cursor is
/// positioned at the matching location, and can be used to update or delete the key.
///
/// # Panics
///
/// Panics if `key` is not found in the snapshot or if `old_loc` is not found in the cursor.
async fn find_update_op<F, R>(
    reader: &R,
    cursor: &mut impl Cursor<Value = Location<F>>,
    key: &<R::Item as Operation<F>>::Key,
) -> Result<Option<Location<F>>, Error<F>>
where
    F: Family,
    R: Reader,
    R::Item: Operation<F>,
{
    while let Some(&loc) = cursor.next() {
        let op = reader.read(*loc).await?;
        let k = op.key().expect("operation without key");
        if *k == *key {
            return Ok(Some(loc));
        }
    }

    Ok(None)
}

/// For the given `key` which is known to exist in the snapshot with location `old_loc`, update
/// its location to `new_loc`.
///
/// # Panics
///
/// Panics if `key` is not found in the snapshot or if `old_loc` is not found in the cursor.
fn update_known_loc<F: Family, I: Index<Value = Location<F>>>(
    snapshot: &mut I,
    key: &[u8],
    old_loc: Location<F>,
    new_loc: Location<F>,
) {
    let mut cursor = snapshot.get_mut(key).expect("key should be known to exist");
    assert!(
        cursor.find(|&loc| *loc == old_loc),
        "known key with given old_loc should have been found"
    );
    cursor.update(new_loc);
}

/// For the given `key` which is known to exist in the snapshot with location `old_loc`, delete
/// it from the snapshot.
///
/// # Panics
///
/// Panics if `key` is not found in the snapshot or if `old_loc` is not found in the cursor.
fn delete_known_loc<F: Family, I: Index<Value = Location<F>>>(
    snapshot: &mut I,
    key: &[u8],
    old_loc: Location<F>,
) {
    let mut cursor = snapshot.get_mut(key).expect("key should be known to exist");
    assert!(
        cursor.find(|&loc| *loc == old_loc),
        "known key with given old_loc should have been found"
    );
    cursor.delete();
}

/// A wrapper of DB state required for implementing inactivity floor management.
pub(crate) struct FloorHelper<
    'a,
    F: Family,
    I: Index<Value = Location<F>>,
    C: Mutable<Item: Operation<F>>,
> {
    pub snapshot: &'a mut I,
    pub log: &'a mut C,
}

impl<F, I, C> FloorHelper<'_, F, I, C>
where
    F: Family,
    I: Index<Value = Location<F>>,
    C: Mutable<Item: Operation<F>>,
{
    /// Moves the given operation to the tip of the log if it is active, rendering its old location
    /// inactive. If the operation was not active, then this is a no-op. Returns whether the
    /// operation was moved.
    async fn move_op_if_active(
        &mut self,
        op: C::Item,
        old_loc: Location<F>,
    ) -> Result<bool, Error<F>> {
        let Some(key) = op.key() else {
            return Ok(false); // operations without keys cannot be active
        };

        // If we find a snapshot entry corresponding to the operation, we know it's active.
        {
            let Some(mut cursor) = self.snapshot.get_mut(key) else {
                return Ok(false);
            };
            if !cursor.find(|&loc| loc == old_loc) {
                return Ok(false);
            }

            // Update the operation's snapshot location to point to tip.
            cursor.update(Location::<F>::new(self.log.size().await));
        }

        // Apply the operation at tip.
        self.log.append(&op).await?;

        Ok(true)
    }

    /// Raise the inactivity floor by taking one _step_, which involves searching for the first
    /// active operation above the inactivity floor, moving it to tip, and then setting the
    /// inactivity floor to the location following the moved operation. This method is therefore
    /// guaranteed to raise the floor by at least one. Returns the new inactivity floor location.
    ///
    /// # Panics
    ///
    /// Expects there is at least one active operation above the inactivity floor, and panics
    /// otherwise.
    async fn raise_floor(
        &mut self,
        mut inactivity_floor_loc: Location<F>,
    ) -> Result<Location<F>, Error<F>> {
        let tip_loc: Location<F> = Location::new(self.log.size().await);
        loop {
            assert!(
                *inactivity_floor_loc < tip_loc,
                "no active operations above the inactivity floor"
            );
            let old_loc = inactivity_floor_loc;
            inactivity_floor_loc += 1;
            let op = {
                let reader = self.log.reader().await;
                reader.read(*old_loc).await?
            };
            if self.move_op_if_active(op, old_loc).await? {
                return Ok(inactivity_floor_loc);
            }
        }
    }
}