Skip to main content

commonware_storage/qmdb/
mod.rs

1//! A collection of authenticated databases inspired by QMDB (Quick Merkle Database).
2//!
3//! # Terminology
4//!
5//! A database's state is derived from an append-only log of state-changing _operations_.
6//!
7//! In a _keyed_ database, a _key_ either has a _value_ or it doesn't, and different types of
8//! operations modify the state of a specific key. A key that has a value can change to one without
9//! a value through the _delete_ operation. The _update_ operation gives a key a specific value. We
10//! sometimes call an update for a key that doesn't already have a value a _create_ operation, but
11//! its representation in the log is the same.
12//!
13//! Keys with values are called _active_. An operation is called _active_ if (1) its key is active,
14//! (2) it is an update operation, and (3) it is the most recent operation for that key.
15//!
16//! # Database Lifecycle
17//!
18//! All variants are modified through a batch API that follows a common pattern:
19//! 1. Create a batch from the database.
20//! 2. Stage mutations on the batch.
21//! 3. Merkleize the batch -- this resolves mutations against the current state and computes
22//!    the Merkle root that would result from applying them.
23//! 4. Inspect the root or create child batches.
24//! 5. Apply the batch to the database (uncommitted ancestors are applied automatically).
25//!
26//! The specific mutation methods vary by variant.
27//! See each variant's module documentation for the concrete API and usage examples.
28//!
29//! Persistence and cleanup are managed directly on the database: `sync()`, `prune()`,
30//! and `destroy()`.
31//!
32//! # Traits
33//!
34//! Keyed mutable variants ([any] and [current]) implement `any::traits::DbAny` and
35//! [crate::Persistable].
36//!
37//! # Acknowledgments
38//!
39//! The following resources were used as references when implementing this crate:
40//!
41//! * [QMDB: Quick Merkle Database](https://arxiv.org/abs/2501.05262)
42//! * [Merkle Mountain
43//!   Ranges](https://github.com/opentimestamps/opentimestamps-server/blob/master/doc/merkle-mountain-range.md)
44
45use crate::{
46    index::{Cursor, Unordered as Index},
47    journal::{
48        contiguous::{Mutable, Reader},
49        Error as JournalError,
50    },
51    merkle::{hasher::Standard as StandardHasher, Bagging, Family, Location},
52    qmdb::operation::Operation,
53};
54use commonware_cryptography::Hasher as CryptoHasher;
55use commonware_utils::NZUsize;
56use core::num::NonZeroUsize;
57use futures::{pin_mut, StreamExt as _};
58use thiserror::Error;
59
60pub mod any;
61pub mod batch_chain;
62pub(crate) mod bitmap;
63pub(crate) mod compact;
64#[cfg(test)]
65mod conformance;
66pub mod current;
67pub mod immutable;
68pub mod keyless;
69mod metrics;
70pub mod operation;
71pub mod store;
72pub mod sync;
73pub mod verify;
74
75pub use verify::{
76    create_multi_proof, create_proof_store, verify_multi_proof, verify_proof,
77    verify_proof_and_extract_digests, verify_proof_and_pinned_nodes,
78};
79
/// Merkle peak bagging policy used by QMDB operation roots.
///
/// [`hasher`] hands this policy to the Merkle hasher it constructs, so every root and proof
/// produced through that helper shares the same bagging.
pub(crate) const ROOT_BAGGING: Bagging = Bagging::BackwardFold;
82
83/// Return the Merkle hasher configuration used by QMDB operation roots and proofs.
84pub const fn hasher<H: CryptoHasher>() -> StandardHasher<H> {
85    StandardHasher::new(ROOT_BAGGING)
86}
87
88/// Look up the inactivity floor declared at the commit immediately preceding `op_count`.
89///
90/// `op_count` must be a non-zero commit-boundary historical size: the operation at `op_count - 1`
91/// must itself be a commit op (one for which `floor_of` returns `Some`).
92///
93/// # Errors
94///
95/// - [`Error::HistoricalFloorPruned`] if `op_count` is zero (no preceding commit exists), or if
96///   `op_count - 1` is retained but is not a commit op (either because the caller passed a
97///   non-commit-boundary size, or because pruning removed the commit that would have governed this
98///   size).
99/// - [`JournalError::ItemPruned`] if `op_count - 1` precedes the oldest retained location.
100pub(crate) async fn find_inactivity_floor_at<F, R>(
101    reader: &R,
102    op_count: Location<F>,
103    floor_of: impl Fn(&R::Item) -> Option<Location<F>>,
104) -> Result<Location<F>, Error<F>>
105where
106    F: Family,
107    R: Reader,
108{
109    let Some(last_op) = op_count.checked_sub(1) else {
110        return Err(Error::HistoricalFloorPruned(op_count));
111    };
112    let last_op = *last_op;
113    let bounds = reader.bounds();
114    if last_op < bounds.start {
115        return Err(JournalError::ItemPruned(last_op).into());
116    }
117
118    let op = reader.read(last_op).await?;
119    let floor = floor_of(&op).ok_or(Error::HistoricalFloorPruned(op_count))?;
120    if floor > Location::new(last_op) {
121        return Err(Error::DataCorrupted(
122            "inactivity floor exceeds commit location",
123        ));
124    }
125    Ok(floor)
126}
127
128/// Compute the inactive peak count for a historical operation count.
129pub(crate) async fn inactive_peaks_at<F, R>(
130    reader: &R,
131    op_count: Location<F>,
132    floor_of: impl Fn(&R::Item) -> Option<Location<F>>,
133) -> Result<usize, Error<F>>
134where
135    F: Family,
136    R: Reader,
137{
138    if op_count == Location::new(0) {
139        return Ok(0);
140    }
141
142    let floor = find_inactivity_floor_at::<F, _>(reader, op_count, floor_of).await?;
143    Ok(F::inactive_peaks(F::location_to_position(op_count), floor))
144}
145
/// Errors that can occur when interacting with an authenticated database.
#[derive(Error, Debug)]
pub enum Error<F: Family> {
    /// Stored data violated an expected invariant; the payload is a static description of the
    /// inconsistency that was detected.
    #[error("data corrupted: {0}")]
    DataCorrupted(&'static str),

    /// An error from the Merkle layer, converted automatically via `From`.
    #[error("merkle error: {0}")]
    Merkle(#[from] crate::merkle::Error<F>),

    /// An error from the metadata store, converted automatically via `From`.
    #[error("metadata error: {0}")]
    Metadata(#[from] crate::metadata::Error),

    /// An error from the journal, converted automatically via `From`.
    #[error("journal error: {0}")]
    Journal(#[from] crate::journal::Error),

    /// An error from the runtime, converted automatically via `From`.
    #[error("runtime error: {0}")]
    Runtime(#[from] commonware_runtime::Error),

    /// The operation at the given location has been pruned.
    #[error("operation pruned: {0}")]
    OperationPruned(Location<F>),

    /// The requested key was not found in the snapshot.
    #[error("key not found")]
    KeyNotFound,

    /// The key exists in the db, so we cannot prove its exclusion.
    #[error("key exists")]
    KeyExists,

    /// Unexpected data was encountered at the given location.
    #[error("unexpected data at location: {0}")]
    UnexpectedData(Location<F>),

    /// A location was out of bounds: the first field is the offending location and the second is
    /// the bound it was required to be strictly below.
    #[error("location out of bounds: {0} >= {1}")]
    LocationOutOfBounds(Location<F>, Location<F>),

    /// A prune was requested up to the first location, which lies beyond the minimum required
    /// location given in the second field.
    #[error("prune location {0} beyond minimum required location {1}")]
    PruneBeyondMinRequired(Location<F>, Location<F>),

    /// The batch was created from a different database state than the current one.
    #[error(
        "stale batch: db has {db_size} ops, batch requires {batch_db_size}, {batch_base_size}, or an ancestor boundary"
    )]
    StaleBatch {
        /// Number of operations currently in the database.
        db_size: u64,
        /// One database size the batch would accept (reported first in the message).
        batch_db_size: u64,
        /// Another database size the batch would accept (reported second in the message).
        batch_base_size: u64,
    },

    /// The batch's inactivity floor is lower than the database's current floor.
    #[error("floor regressed: batch floor {0} < current floor {1}")]
    FloorRegressed(Location<F>, Location<F>),

    /// The batch's inactivity floor exceeds its own commit operation's location. The floor
    /// must not sit past the commit, since a subsequent `prune(floor)` would then remove the
    /// last readable commit from the journal.
    #[error("floor beyond commit location: floor {0} > commit loc {1}")]
    FloorBeyondSize(Location<F>, Location<F>),

    /// The inactivity floor that governed the requested `historical_size` is not retrievable from
    /// the journal, so the wrapper cannot derive the `inactive_peaks` count needed to construct a
    /// proof matching the historical root.
    ///
    /// Historical proofs require `historical_size` to be a commit-boundary: the operation at
    /// `historical_size - 1` must itself be a commit op declaring the governing floor. This error
    /// fires when the caller passes a non-commit-boundary size, or when pruning has removed the
    /// commit that would have governed the size.
    #[error("historical floor pruned for size: {0}")]
    HistoricalFloorPruned(Location<F>),
}
215
216impl<F: Family> From<crate::journal::authenticated::Error<F>> for Error<F> {
217    fn from(e: crate::journal::authenticated::Error<F>) -> Self {
218        match e {
219            crate::journal::authenticated::Error::Journal(j) => Self::Journal(j),
220            crate::journal::authenticated::Error::Merkle(m) => Self::Merkle(m),
221        }
222    }
223}
224
/// The size of the read buffer to use for replaying the operations log when rebuilding the
/// snapshot.
///
/// Evaluates to `1 << 16` (65,536) and is passed as the buffer argument to the reader's `replay`
/// call in [`build_snapshot_from_log`].
const SNAPSHOT_READ_BUFFER_SIZE: NonZeroUsize = NZUsize!(1 << 16);
228
229/// Builds the database's snapshot by replaying the log starting at the inactivity floor. Assumes
230/// the log is not pruned beyond the inactivity floor. The callback is invoked for each replayed
231/// operation, indicating activity status updates. The first argument of the callback is the
232/// activity status of the operation, and the second argument is the location of the operation it
233/// inactivates (if any). Returns the number of active keys in the db.
234pub(super) async fn build_snapshot_from_log<F, C, I, Fn>(
235    inactivity_floor_loc: crate::merkle::Location<F>,
236    reader: &C,
237    snapshot: &mut I,
238    mut callback: Fn,
239) -> Result<usize, Error<F>>
240where
241    F: crate::merkle::Family,
242    C: Reader<Item: Operation<F>>,
243    I: Index<Value = crate::merkle::Location<F>>,
244    Fn: FnMut(bool, Option<crate::merkle::Location<F>>),
245{
246    let bounds = reader.bounds();
247    let stream = reader
248        .replay(SNAPSHOT_READ_BUFFER_SIZE, *inactivity_floor_loc)
249        .await?;
250    pin_mut!(stream);
251    let last_commit_loc = bounds.end.saturating_sub(1);
252    let mut active_keys: usize = 0;
253    while let Some(result) = stream.next().await {
254        let (loc, op) = result?;
255        if let Some(key) = op.key() {
256            if op.is_delete() {
257                let old_loc = delete_key(snapshot, reader, key).await?;
258                callback(false, old_loc);
259                if old_loc.is_some() {
260                    active_keys -= 1;
261                }
262            } else if op.is_update() {
263                let new_loc = crate::merkle::Location::new(loc);
264                let old_loc = update_key(snapshot, reader, key, new_loc).await?;
265                callback(true, old_loc);
266                if old_loc.is_none() {
267                    active_keys += 1;
268                }
269            }
270        } else if op.has_floor().is_some() {
271            callback(loc == last_commit_loc, None);
272        }
273    }
274
275    Ok(active_keys)
276}
277
278/// Delete `key` from the snapshot if it exists, using a stable log reader, and return the
279/// previously associated location.
280async fn delete_key<F, I, R>(
281    snapshot: &mut I,
282    reader: &R,
283    key: &<R::Item as Operation<F>>::Key,
284) -> Result<Option<Location<F>>, Error<F>>
285where
286    F: Family,
287    I: Index<Value = Location<F>>,
288    R: Reader,
289    R::Item: Operation<F>,
290{
291    // If the translated key is in the snapshot, get a cursor to look for the key.
292    let Some(mut cursor) = snapshot.get_mut(key) else {
293        return Ok(None);
294    };
295
296    // Find the matching key among all conflicts, then delete it.
297    let Some(loc) = find_update_op::<F, _>(reader, &mut cursor, key).await? else {
298        return Ok(None);
299    };
300    cursor.delete();
301
302    Ok(Some(loc))
303}
304
305/// Update `key` in the snapshot using a stable log reader, returning its old location if present.
306async fn update_key<F, I, R>(
307    snapshot: &mut I,
308    reader: &R,
309    key: &<R::Item as Operation<F>>::Key,
310    new_loc: Location<F>,
311) -> Result<Option<Location<F>>, Error<F>>
312where
313    F: Family,
314    I: Index<Value = Location<F>>,
315    R: Reader,
316    R::Item: Operation<F>,
317{
318    // If the translated key is not in the snapshot, insert the new location. Otherwise, get a
319    // cursor to look for the key.
320    let Some(mut cursor) = snapshot.get_mut_or_insert(key, new_loc) else {
321        return Ok(None);
322    };
323
324    // Find the matching key among all conflicts, then update its location.
325    if let Some(loc) = find_update_op::<F, _>(reader, &mut cursor, key).await? {
326        assert!(new_loc > loc);
327        cursor.update(new_loc);
328        return Ok(Some(loc));
329    }
330
331    // The key wasn't in the snapshot, so add it to the cursor.
332    cursor.insert(new_loc);
333
334    Ok(None)
335}
336
337/// Find and return the location of the update operation for `key`, if it exists. The cursor is
338/// positioned at the matching location, and can be used to update or delete the key.
339///
340/// # Panics
341///
342/// Panics if `key` is not found in the snapshot or if `old_loc` is not found in the cursor.
343async fn find_update_op<F, R>(
344    reader: &R,
345    cursor: &mut impl Cursor<Value = Location<F>>,
346    key: &<R::Item as Operation<F>>::Key,
347) -> Result<Option<Location<F>>, Error<F>>
348where
349    F: Family,
350    R: Reader,
351    R::Item: Operation<F>,
352{
353    while let Some(&loc) = cursor.next() {
354        let op = reader.read(*loc).await?;
355        let k = op.key().expect("operation without key");
356        if *k == *key {
357            return Ok(Some(loc));
358        }
359    }
360
361    Ok(None)
362}
363
364/// For the given `key` which is known to exist in the snapshot with location `old_loc`, update
365/// its location to `new_loc`.
366///
367/// # Panics
368///
369/// Panics if `key` is not found in the snapshot or if `old_loc` is not found in the cursor.
370fn update_known_loc<F: Family, I: Index<Value = Location<F>>>(
371    snapshot: &mut I,
372    key: &[u8],
373    old_loc: Location<F>,
374    new_loc: Location<F>,
375) {
376    let mut cursor = snapshot.get_mut(key).expect("key should be known to exist");
377    assert!(
378        cursor.find(|&loc| *loc == old_loc),
379        "known key with given old_loc should have been found"
380    );
381    cursor.update(new_loc);
382}
383
384/// For the given `key` which is known to exist in the snapshot with location `old_loc`, delete
385/// it from the snapshot.
386///
387/// # Panics
388///
389/// Panics if `key` is not found in the snapshot or if `old_loc` is not found in the cursor.
390fn delete_known_loc<F: Family, I: Index<Value = Location<F>>>(
391    snapshot: &mut I,
392    key: &[u8],
393    old_loc: Location<F>,
394) {
395    let mut cursor = snapshot.get_mut(key).expect("key should be known to exist");
396    assert!(
397        cursor.find(|&loc| *loc == old_loc),
398        "known key with given old_loc should have been found"
399    );
400    cursor.delete();
401}
402
403/// A wrapper of DB state required for implementing inactivity floor management.
404pub(crate) struct FloorHelper<
405    'a,
406    F: Family,
407    I: Index<Value = Location<F>>,
408    C: Mutable<Item: Operation<F>>,
409> {
410    pub snapshot: &'a mut I,
411    pub log: &'a mut C,
412}
413
414impl<F, I, C> FloorHelper<'_, F, I, C>
415where
416    F: Family,
417    I: Index<Value = Location<F>>,
418    C: Mutable<Item: Operation<F>>,
419{
420    /// Moves the given operation to the tip of the log if it is active, rendering its old location
421    /// inactive. If the operation was not active, then this is a no-op. Returns whether the
422    /// operation was moved.
423    async fn move_op_if_active(
424        &mut self,
425        op: C::Item,
426        old_loc: Location<F>,
427    ) -> Result<bool, Error<F>> {
428        let Some(key) = op.key() else {
429            return Ok(false); // operations without keys cannot be active
430        };
431
432        // If we find a snapshot entry corresponding to the operation, we know it's active.
433        {
434            let Some(mut cursor) = self.snapshot.get_mut(key) else {
435                return Ok(false);
436            };
437            if !cursor.find(|&loc| loc == old_loc) {
438                return Ok(false);
439            }
440
441            // Update the operation's snapshot location to point to tip.
442            cursor.update(Location::<F>::new(self.log.size().await));
443        }
444
445        // Apply the operation at tip.
446        self.log.append(&op).await?;
447
448        Ok(true)
449    }
450
451    /// Raise the inactivity floor by taking one _step_, which involves searching for the first
452    /// active operation above the inactivity floor, moving it to tip, and then setting the
453    /// inactivity floor to the location following the moved operation. This method is therefore
454    /// guaranteed to raise the floor by at least one. Returns the new inactivity floor location.
455    ///
456    /// # Panics
457    ///
458    /// Expects there is at least one active operation above the inactivity floor, and panics
459    /// otherwise.
460    async fn raise_floor(
461        &mut self,
462        mut inactivity_floor_loc: Location<F>,
463    ) -> Result<Location<F>, Error<F>> {
464        let tip_loc: Location<F> = Location::new(self.log.size().await);
465        loop {
466            assert!(
467                *inactivity_floor_loc < tip_loc,
468                "no active operations above the inactivity floor"
469            );
470            let old_loc = inactivity_floor_loc;
471            inactivity_floor_loc += 1;
472            let op = {
473                let reader = self.log.reader().await;
474                reader.read(*old_loc).await?
475            };
476            if self.move_op_if_active(op, old_loc).await? {
477                return Ok(inactivity_floor_loc);
478            }
479        }
480    }
481}