Skip to main content

commonware_storage/qmdb/
mod.rs

1//! A collection of authenticated databases inspired by QMDB (Quick Merkle Database).
2//!
3//! # Terminology
4//!
5//! A database's state is derived from an append-only log of state-changing _operations_.
6//!
7//! In a _keyed_ database, a _key_ either has a _value_ or it doesn't, and different types of
8//! operations modify the state of a specific key. A key that has a value can change to one without
9//! a value through the _delete_ operation. The _update_ operation gives a key a specific value. We
10//! sometimes call an update for a key that doesn't already have a value a _create_ operation, but
11//! its representation in the log is the same.
12//!
13//! Keys with values are called _active_. An operation is called _active_ if (1) its key is active,
14//! (2) it is an update operation, and (3) it is the most recent operation for that key.
15//!
16//! # Database Lifecycle
17//!
18//! All variants are modified through a batch API that follows a common pattern:
19//! 1. Create a batch from the database.
20//! 2. Stage mutations on the batch.
21//! 3. Merkleize the batch -- this resolves mutations against the current state and computes
22//!    the Merkle root that would result from applying them.
23//! 4. Inspect the root or create child batches.
24//! 5. Apply the batch to the database (uncommitted ancestors are applied automatically).
25//!
26//! The specific mutation methods vary by variant.
27//! See each variant's module documentation for the concrete API and usage examples.
28//!
29//! Persistence and cleanup are managed directly on the database: `sync()`, `prune()`,
30//! and `destroy()`.
31//!
32//! # Traits
33//!
34//! Keyed mutable variants ([any] and [current]) implement `any::traits::DbAny` and
35//! [crate::Persistable].
36//!
37//! # Acknowledgments
38//!
39//! The following resources were used as references when implementing this crate:
40//!
41//! * [QMDB: Quick Merkle Database](https://arxiv.org/abs/2501.05262)
42//! * [Merkle Mountain
43//!   Ranges](https://github.com/opentimestamps/opentimestamps-server/blob/master/doc/merkle-mountain-range.md)
44
45use crate::{
46    index::{Cursor, Unordered as Index},
47    journal::contiguous::{Mutable, Reader},
48    merkle::{Family, Location},
49    qmdb::operation::Operation,
50};
51use commonware_utils::NZUsize;
52use core::num::NonZeroUsize;
53use futures::{pin_mut, StreamExt as _};
54use thiserror::Error;
55
56pub mod any;
57pub mod current;
58pub mod immutable;
59pub mod keyless;
60pub mod operation;
61pub mod store;
62pub mod sync;
63pub mod verify;
64pub use verify::{
65    create_multi_proof, create_proof_store, verify_multi_proof, verify_proof,
66    verify_proof_and_extract_digests, verify_proof_and_pinned_nodes,
67};
68
/// Errors that can occur when interacting with an authenticated database.
///
/// The enum is generic over the merkle [Family] because several variants carry a [Location] or
/// wrap the family-specific merkle error.
#[derive(Error, Debug)]
pub enum Error<F: Family> {
    /// Data was found to be corrupted; the message describes what was wrong.
    #[error("data corrupted: {0}")]
    DataCorrupted(&'static str),

    /// An error surfaced by the merkle layer.
    #[error("merkle error: {0}")]
    Merkle(#[from] crate::merkle::Error<F>),

    /// An error surfaced by the metadata store.
    #[error("metadata error: {0}")]
    Metadata(#[from] crate::metadata::Error),

    /// An error surfaced by the journal.
    #[error("journal error: {0}")]
    Journal(#[from] crate::journal::Error),

    /// An error surfaced by the runtime.
    #[error("runtime error: {0}")]
    Runtime(#[from] commonware_runtime::Error),

    /// The operation at the given location has been pruned.
    #[error("operation pruned: {0}")]
    OperationPruned(Location<F>),

    /// The requested key was not found in the snapshot.
    #[error("key not found")]
    KeyNotFound,

    /// The key exists in the db, so we cannot prove its exclusion.
    #[error("key exists")]
    KeyExists,

    /// The data found at the given location was not what was expected.
    #[error("unexpected data at location: {0}")]
    UnexpectedData(Location<F>),

    /// The first location is out of bounds: it is not below the second (the bound).
    #[error("location out of bounds: {0} >= {1}")]
    LocationOutOfBounds(Location<F>, Location<F>),

    /// The requested prune location (first) lies beyond the minimum required location (second).
    #[error("prune location {0} beyond minimum required location {1}")]
    PruneBeyondMinRequired(Location<F>, Location<F>),

    /// The batch was created from a different database state than the current one.
    #[error(
        "stale batch: db has {db_size} ops, batch requires {batch_db_size} or {batch_base_size}"
    )]
    StaleBatch {
        /// The number of operations currently in the database.
        db_size: u64,
        /// One of the two database sizes the batch can be applied to.
        // NOTE(review): how this differs from `batch_base_size` is not visible in this file.
        batch_db_size: u64,
        /// The other database size the batch can be applied to.
        batch_base_size: u64,
    },
}
117
118impl<F: Family> From<crate::journal::authenticated::Error<F>> for Error<F> {
119    fn from(e: crate::journal::authenticated::Error<F>) -> Self {
120        match e {
121            crate::journal::authenticated::Error::Journal(j) => Self::Journal(j),
122            crate::journal::authenticated::Error::Merkle(m) => Self::Merkle(m),
123        }
124    }
125}
126
/// The size of the read buffer to use for replaying the operations log when rebuilding the
/// snapshot.
///
/// The value is `1 << 16` (65,536); presumably this is a byte count -- confirm against the
/// journal's `replay` documentation.
const SNAPSHOT_READ_BUFFER_SIZE: NonZeroUsize = NZUsize!(1 << 16);
130
131/// Builds the database's snapshot by replaying the log starting at the inactivity floor. Assumes
132/// the log is not pruned beyond the inactivity floor. The callback is invoked for each replayed
133/// operation, indicating activity status updates. The first argument of the callback is the
134/// activity status of the operation, and the second argument is the location of the operation it
135/// inactivates (if any). Returns the number of active keys in the db.
136pub(super) async fn build_snapshot_from_log<F, C, I, Fn>(
137    inactivity_floor_loc: crate::merkle::Location<F>,
138    reader: &C,
139    snapshot: &mut I,
140    mut callback: Fn,
141) -> Result<usize, Error<F>>
142where
143    F: crate::merkle::Family,
144    C: Reader<Item: Operation<F>>,
145    I: Index<Value = crate::merkle::Location<F>>,
146    Fn: FnMut(bool, Option<crate::merkle::Location<F>>),
147{
148    let bounds = reader.bounds();
149    let stream = reader
150        .replay(SNAPSHOT_READ_BUFFER_SIZE, *inactivity_floor_loc)
151        .await?;
152    pin_mut!(stream);
153    let last_commit_loc = bounds.end.saturating_sub(1);
154    let mut active_keys: usize = 0;
155    while let Some(result) = stream.next().await {
156        let (loc, op) = result?;
157        if let Some(key) = op.key() {
158            if op.is_delete() {
159                let old_loc = delete_key(snapshot, reader, key).await?;
160                callback(false, old_loc);
161                if old_loc.is_some() {
162                    active_keys -= 1;
163                }
164            } else if op.is_update() {
165                let new_loc = crate::merkle::Location::new(loc);
166                let old_loc = update_key(snapshot, reader, key, new_loc).await?;
167                callback(true, old_loc);
168                if old_loc.is_none() {
169                    active_keys += 1;
170                }
171            }
172        } else if op.has_floor().is_some() {
173            callback(loc == last_commit_loc, None);
174        }
175    }
176
177    Ok(active_keys)
178}
179
180/// Delete `key` from the snapshot if it exists, using a stable log reader, and return the
181/// previously associated location.
182async fn delete_key<F, I, R>(
183    snapshot: &mut I,
184    reader: &R,
185    key: &<R::Item as Operation<F>>::Key,
186) -> Result<Option<Location<F>>, Error<F>>
187where
188    F: Family,
189    I: Index<Value = Location<F>>,
190    R: Reader,
191    R::Item: Operation<F>,
192{
193    // If the translated key is in the snapshot, get a cursor to look for the key.
194    let Some(mut cursor) = snapshot.get_mut(key) else {
195        return Ok(None);
196    };
197
198    // Find the matching key among all conflicts, then delete it.
199    let Some(loc) = find_update_op::<F, _>(reader, &mut cursor, key).await? else {
200        return Ok(None);
201    };
202    cursor.delete();
203
204    Ok(Some(loc))
205}
206
207/// Update `key` in the snapshot using a stable log reader, returning its old location if present.
208async fn update_key<F, I, R>(
209    snapshot: &mut I,
210    reader: &R,
211    key: &<R::Item as Operation<F>>::Key,
212    new_loc: Location<F>,
213) -> Result<Option<Location<F>>, Error<F>>
214where
215    F: Family,
216    I: Index<Value = Location<F>>,
217    R: Reader,
218    R::Item: Operation<F>,
219{
220    // If the translated key is not in the snapshot, insert the new location. Otherwise, get a
221    // cursor to look for the key.
222    let Some(mut cursor) = snapshot.get_mut_or_insert(key, new_loc) else {
223        return Ok(None);
224    };
225
226    // Find the matching key among all conflicts, then update its location.
227    if let Some(loc) = find_update_op::<F, _>(reader, &mut cursor, key).await? {
228        assert!(new_loc > loc);
229        cursor.update(new_loc);
230        return Ok(Some(loc));
231    }
232
233    // The key wasn't in the snapshot, so add it to the cursor.
234    cursor.insert(new_loc);
235
236    Ok(None)
237}
238
239/// Find and return the location of the update operation for `key`, if it exists. The cursor is
240/// positioned at the matching location, and can be used to update or delete the key.
241///
242/// # Panics
243///
244/// Panics if `key` is not found in the snapshot or if `old_loc` is not found in the cursor.
245async fn find_update_op<F, R>(
246    reader: &R,
247    cursor: &mut impl Cursor<Value = Location<F>>,
248    key: &<R::Item as Operation<F>>::Key,
249) -> Result<Option<Location<F>>, Error<F>>
250where
251    F: Family,
252    R: Reader,
253    R::Item: Operation<F>,
254{
255    while let Some(&loc) = cursor.next() {
256        let op = reader.read(*loc).await?;
257        let k = op.key().expect("operation without key");
258        if *k == *key {
259            return Ok(Some(loc));
260        }
261    }
262
263    Ok(None)
264}
265
266/// For the given `key` which is known to exist in the snapshot with location `old_loc`, update
267/// its location to `new_loc`.
268///
269/// # Panics
270///
271/// Panics if `key` is not found in the snapshot or if `old_loc` is not found in the cursor.
272fn update_known_loc<F: Family, I: Index<Value = Location<F>>>(
273    snapshot: &mut I,
274    key: &[u8],
275    old_loc: Location<F>,
276    new_loc: Location<F>,
277) {
278    let mut cursor = snapshot.get_mut(key).expect("key should be known to exist");
279    assert!(
280        cursor.find(|&loc| *loc == old_loc),
281        "known key with given old_loc should have been found"
282    );
283    cursor.update(new_loc);
284}
285
286/// For the given `key` which is known to exist in the snapshot with location `old_loc`, delete
287/// it from the snapshot.
288///
289/// # Panics
290///
291/// Panics if `key` is not found in the snapshot or if `old_loc` is not found in the cursor.
292fn delete_known_loc<F: Family, I: Index<Value = Location<F>>>(
293    snapshot: &mut I,
294    key: &[u8],
295    old_loc: Location<F>,
296) {
297    let mut cursor = snapshot.get_mut(key).expect("key should be known to exist");
298    assert!(
299        cursor.find(|&loc| *loc == old_loc),
300        "known key with given old_loc should have been found"
301    );
302    cursor.delete();
303}
304
/// A wrapper of DB state required for implementing inactivity floor management.
///
/// It mutably borrows both pieces of state for `'a`, so the snapshot and the log can be updated
/// together when an active operation is moved to the tip of the log.
pub(crate) struct FloorHelper<
    'a,
    F: Family,
    I: Index<Value = Location<F>>,
    C: Mutable<Item: Operation<F>>,
> {
    /// The snapshot index, whose values are locations of operations in the log.
    pub snapshot: &'a mut I,
    /// The operations log; moved operations are appended to it.
    pub log: &'a mut C,
}
315
316impl<F, I, C> FloorHelper<'_, F, I, C>
317where
318    F: Family,
319    I: Index<Value = Location<F>>,
320    C: Mutable<Item: Operation<F>>,
321{
322    /// Moves the given operation to the tip of the log if it is active, rendering its old location
323    /// inactive. If the operation was not active, then this is a no-op. Returns whether the
324    /// operation was moved.
325    async fn move_op_if_active(
326        &mut self,
327        op: C::Item,
328        old_loc: Location<F>,
329    ) -> Result<bool, Error<F>> {
330        let Some(key) = op.key() else {
331            return Ok(false); // operations without keys cannot be active
332        };
333
334        // If we find a snapshot entry corresponding to the operation, we know it's active.
335        {
336            let Some(mut cursor) = self.snapshot.get_mut(key) else {
337                return Ok(false);
338            };
339            if !cursor.find(|&loc| loc == old_loc) {
340                return Ok(false);
341            }
342
343            // Update the operation's snapshot location to point to tip.
344            cursor.update(Location::<F>::new(self.log.size().await));
345        }
346
347        // Apply the operation at tip.
348        self.log.append(&op).await?;
349
350        Ok(true)
351    }
352
353    /// Raise the inactivity floor by taking one _step_, which involves searching for the first
354    /// active operation above the inactivity floor, moving it to tip, and then setting the
355    /// inactivity floor to the location following the moved operation. This method is therefore
356    /// guaranteed to raise the floor by at least one. Returns the new inactivity floor location.
357    ///
358    /// # Panics
359    ///
360    /// Expects there is at least one active operation above the inactivity floor, and panics
361    /// otherwise.
362    async fn raise_floor(
363        &mut self,
364        mut inactivity_floor_loc: Location<F>,
365    ) -> Result<Location<F>, Error<F>> {
366        let tip_loc: Location<F> = Location::new(self.log.size().await);
367        loop {
368            assert!(
369                *inactivity_floor_loc < tip_loc,
370                "no active operations above the inactivity floor"
371            );
372            let old_loc = inactivity_floor_loc;
373            inactivity_floor_loc += 1;
374            let op = {
375                let reader = self.log.reader().await;
376                reader.read(*old_loc).await?
377            };
378            if self.move_op_if_active(op, old_loc).await? {
379                return Ok(inactivity_floor_loc);
380            }
381        }
382    }
383}