// commonware_storage/qmdb/mod.rs

//! A collection of authenticated databases inspired by QMDB (Quick Merkle Database).
//!
//! # Terminology
//!
//! A database's state is derived from an append-only log of state-changing _operations_.
//!
//! In a _keyed_ database, a _key_ either has a _value_ or it doesn't, and different types of
//! operations modify the state of a specific key. A key that has a value can change to one without
//! a value through the _delete_ operation. The _update_ operation gives a key a specific value. We
//! sometimes call an update for a key that doesn't already have a value a _create_ operation, but
//! its representation in the log is the same.
//!
//! Keys with values are called _active_. An operation is called _active_ if (1) its key is active,
//! (2) it is an update operation, and (3) it is the most recent operation for that key.
//!
//! # Database Lifecycle
//!
//! All variants are modified through a batch API that follows a common pattern:
//! 1. Create a batch from the database.
//! 2. Stage mutations on the batch.
//! 3. Merkleize the batch -- this resolves mutations against the current state and computes
//!    the Merkle root that would result from applying them.
//! 4. Inspect the root or create child batches.
//! 5. Finalize the batch into a changeset.
//! 6. Apply the changeset to the database.
//!
//! A merkleized batch can spawn child batches, forming a tree of speculative states that
//! share a common ancestor. Only the finalized leaf needs to be applied.
//!
//! The specific mutation methods vary by variant.
//! See each variant's module documentation for the concrete API and usage examples.
//!
//! Persistence and cleanup are managed directly on the database: `sync()`, `prune()`,
//! and `destroy()`.
//!
//! # Traits
//!
//! All variants implement [store::LogStore] and [store::MerkleizedStore]. Keyed mutable
//! variants ([any] and [current]) also implement [store::PrunableStore] and
//! [crate::Persistable]. The [immutable] variant implements [crate::kv::Gettable].
//!
//! # Acknowledgments
//!
//! The following resources were used as references when implementing this crate:
//!
//! * [QMDB: Quick Merkle Database](https://arxiv.org/abs/2501.05262)
//! * [Merkle Mountain
//!   Ranges](https://github.com/opentimestamps/opentimestamps-server/blob/master/doc/merkle-mountain-range.md)

50use crate::{
51    index::{Cursor, Unordered as Index},
52    journal::contiguous::{Mutable, Reader},
53    mmr::Location,
54    qmdb::operation::Operation,
55};
56use commonware_utils::NZUsize;
57use core::num::NonZeroUsize;
58use futures::{pin_mut, StreamExt as _};
59use thiserror::Error;
60
pub mod any;
pub mod current;
pub mod immutable;
pub mod keyless;
pub mod operation;
pub mod store;
pub mod sync;
pub mod verify;
// Re-export the proof construction/verification helpers so callers don't have to reach into the
// `verify` submodule.
pub use verify::{
    create_multi_proof, create_proof_store, create_proof_store_from_digests, extract_pinned_nodes,
    verify_multi_proof, verify_proof, verify_proof_and_extract_digests,
};
73
/// Errors that can occur when interacting with an authenticated database.
#[derive(Error, Debug)]
pub enum Error {
    /// Stored data failed an integrity check; the message describes what was found corrupted.
    #[error("data corrupted: {0}")]
    DataCorrupted(&'static str),

    /// An error surfaced by the underlying MMR (Merkle Mountain Range).
    #[error("mmr error: {0}")]
    Mmr(#[from] crate::mmr::Error),

    /// An error surfaced by the metadata store.
    #[error("metadata error: {0}")]
    Metadata(#[from] crate::metadata::Error),

    /// An error surfaced by the operations journal.
    #[error("journal error: {0}")]
    Journal(#[from] crate::journal::Error),

    /// An error surfaced by the runtime.
    #[error("runtime error: {0}")]
    Runtime(#[from] commonware_runtime::Error),

    /// The operation at the given location has been pruned and is no longer available.
    #[error("operation pruned: {0}")]
    OperationPruned(Location),

    /// The requested key was not found in the snapshot.
    #[error("key not found")]
    KeyNotFound,

    /// The key exists in the db, so we cannot prove its exclusion.
    #[error("key exists")]
    KeyExists,

    /// Data was found at a location where none was expected.
    #[error("unexpected data at location: {0}")]
    UnexpectedData(Location),

    /// The requested location (first) is at or beyond the current bound (second).
    #[error("location out of bounds: {0} >= {1}")]
    LocationOutOfBounds(Location, Location),

    /// The requested prune location (first) exceeds the minimum location that must be
    /// retained (second).
    #[error("prune location {0} beyond minimum required location {1}")]
    PruneBeyondMinRequired(Location, Location),

    /// The changeset was created from a different database state than the current one.
    #[error("stale changeset: batch expected db size {expected}, but db has {actual}")]
    StaleChangeset { expected: u64, actual: u64 },
}
116
117impl From<crate::journal::authenticated::Error> for Error {
118    fn from(e: crate::journal::authenticated::Error) -> Self {
119        match e {
120            crate::journal::authenticated::Error::Journal(j) => Self::Journal(j),
121            crate::journal::authenticated::Error::Mmr(m) => Self::Mmr(m),
122        }
123    }
124}
125
/// The size of the read buffer to use for replaying the operations log when rebuilding the
/// snapshot. (2^16; presumably bytes — confirm against the `Reader::replay` contract.)
const SNAPSHOT_READ_BUFFER_SIZE: NonZeroUsize = NZUsize!(1 << 16);
129
130/// Builds the database's snapshot by replaying the log starting at the inactivity floor. Assumes
131/// the log is not pruned beyond the inactivity floor. The callback is invoked for each replayed
132/// operation, indicating activity status updates. The first argument of the callback is the
133/// activity status of the operation, and the second argument is the location of the operation it
134/// inactivates (if any). Returns the number of active keys in the db.
135pub(super) async fn build_snapshot_from_log<C, I, F>(
136    inactivity_floor_loc: Location,
137    reader: &C,
138    snapshot: &mut I,
139    mut callback: F,
140) -> Result<usize, Error>
141where
142    C: Reader<Item: Operation>,
143    I: Index<Value = Location>,
144    F: FnMut(bool, Option<Location>),
145{
146    let bounds = reader.bounds();
147    let stream = reader
148        .replay(SNAPSHOT_READ_BUFFER_SIZE, *inactivity_floor_loc)
149        .await?;
150    pin_mut!(stream);
151    let last_commit_loc = bounds.end.saturating_sub(1);
152    let mut active_keys: usize = 0;
153    while let Some(result) = stream.next().await {
154        let (loc, op) = result?;
155        if let Some(key) = op.key() {
156            if op.is_delete() {
157                let old_loc = delete_key(snapshot, reader, key).await?;
158                callback(false, old_loc);
159                if old_loc.is_some() {
160                    active_keys -= 1;
161                }
162            } else if op.is_update() {
163                let new_loc = Location::new(loc);
164                let old_loc = update_key(snapshot, reader, key, new_loc).await?;
165                callback(true, old_loc);
166                if old_loc.is_none() {
167                    active_keys += 1;
168                }
169            }
170        } else if op.has_floor().is_some() {
171            callback(loc == last_commit_loc, None);
172        }
173    }
174
175    Ok(active_keys)
176}
177
178/// Delete `key` from the snapshot if it exists, using a stable log reader, and return the
179/// previously associated location.
180async fn delete_key<I, R>(
181    snapshot: &mut I,
182    reader: &R,
183    key: &<R::Item as Operation>::Key,
184) -> Result<Option<Location>, Error>
185where
186    I: Index<Value = Location>,
187    R: Reader,
188    R::Item: Operation,
189{
190    // If the translated key is in the snapshot, get a cursor to look for the key.
191    let Some(mut cursor) = snapshot.get_mut(key) else {
192        return Ok(None);
193    };
194
195    // Find the matching key among all conflicts, then delete it.
196    let Some(loc) = find_update_op(reader, &mut cursor, key).await? else {
197        return Ok(None);
198    };
199    cursor.delete();
200
201    Ok(Some(loc))
202}
203
204/// Update `key` in the snapshot using a stable log reader, returning its old location if present.
205async fn update_key<I, R>(
206    snapshot: &mut I,
207    reader: &R,
208    key: &<R::Item as Operation>::Key,
209    new_loc: Location,
210) -> Result<Option<Location>, Error>
211where
212    I: Index<Value = Location>,
213    R: Reader,
214    R::Item: Operation,
215{
216    // If the translated key is not in the snapshot, insert the new location. Otherwise, get a
217    // cursor to look for the key.
218    let Some(mut cursor) = snapshot.get_mut_or_insert(key, new_loc) else {
219        return Ok(None);
220    };
221
222    // Find the matching key among all conflicts, then update its location.
223    if let Some(loc) = find_update_op(reader, &mut cursor, key).await? {
224        assert!(new_loc > loc);
225        cursor.update(new_loc);
226        return Ok(Some(loc));
227    }
228
229    // The key wasn't in the snapshot, so add it to the cursor.
230    cursor.insert(new_loc);
231
232    Ok(None)
233}
234
235/// Find and return the location of the update operation for `key`, if it exists. The cursor is
236/// positioned at the matching location, and can be used to update or delete the key.
237///
238/// # Panics
239///
240/// Panics if `key` is not found in the snapshot or if `old_loc` is not found in the cursor.
241async fn find_update_op<R>(
242    reader: &R,
243    cursor: &mut impl Cursor<Value = Location>,
244    key: &<R::Item as Operation>::Key,
245) -> Result<Option<Location>, Error>
246where
247    R: Reader,
248    R::Item: Operation,
249{
250    while let Some(&loc) = cursor.next() {
251        let op = reader.read(*loc).await?;
252        let k = op.key().expect("operation without key");
253        if *k == *key {
254            return Ok(Some(loc));
255        }
256    }
257
258    Ok(None)
259}
260
261/// For the given `key` which is known to exist in the snapshot with location `old_loc`, update
262/// its location to `new_loc`.
263///
264/// # Panics
265///
266/// Panics if `key` is not found in the snapshot or if `old_loc` is not found in the cursor.
267fn update_known_loc<I: Index<Value = Location>>(
268    snapshot: &mut I,
269    key: &[u8],
270    old_loc: Location,
271    new_loc: Location,
272) {
273    let mut cursor = snapshot.get_mut(key).expect("key should be known to exist");
274    assert!(
275        cursor.find(|&loc| *loc == old_loc),
276        "known key with given old_loc should have been found"
277    );
278    cursor.update(new_loc);
279}
280
281/// For the given `key` which is known to exist in the snapshot with location `old_loc`, delete
282/// it from the snapshot.
283///
284/// # Panics
285///
286/// Panics if `key` is not found in the snapshot or if `old_loc` is not found in the cursor.
287fn delete_known_loc<I: Index<Value = Location>>(snapshot: &mut I, key: &[u8], old_loc: Location) {
288    let mut cursor = snapshot.get_mut(key).expect("key should be known to exist");
289    assert!(
290        cursor.find(|&loc| *loc == old_loc),
291        "known key with given old_loc should have been found"
292    );
293    cursor.delete();
294}
295
/// A wrapper of DB state required for implementing inactivity floor management.
pub(crate) struct FloorHelper<'a, I: Index<Value = Location>, C: Mutable<Item: Operation>> {
    /// The snapshot index, mapping keys to the locations of their operations in the log.
    pub snapshot: &'a mut I,
    /// The mutable operations log.
    pub log: &'a mut C,
}
301
302impl<I, C> FloorHelper<'_, I, C>
303where
304    I: Index<Value = Location>,
305    C: Mutable<Item: Operation>,
306{
307    /// Moves the given operation to the tip of the log if it is active, rendering its old location
308    /// inactive. If the operation was not active, then this is a no-op. Returns whether the
309    /// operation was moved.
310    async fn move_op_if_active(&mut self, op: C::Item, old_loc: Location) -> Result<bool, Error> {
311        let Some(key) = op.key() else {
312            return Ok(false); // operations without keys cannot be active
313        };
314
315        // If we find a snapshot entry corresponding to the operation, we know it's active.
316        {
317            let Some(mut cursor) = self.snapshot.get_mut(key) else {
318                return Ok(false);
319            };
320            if !cursor.find(|&loc| loc == old_loc) {
321                return Ok(false);
322            }
323
324            // Update the operation's snapshot location to point to tip.
325            cursor.update(Location::new(self.log.size().await));
326        }
327
328        // Apply the operation at tip.
329        self.log.append(&op).await?;
330
331        Ok(true)
332    }
333
334    /// Raise the inactivity floor by taking one _step_, which involves searching for the first
335    /// active operation above the inactivity floor, moving it to tip, and then setting the
336    /// inactivity floor to the location following the moved operation. This method is therefore
337    /// guaranteed to raise the floor by at least one. Returns the new inactivity floor location.
338    ///
339    /// # Panics
340    ///
341    /// Expects there is at least one active operation above the inactivity floor, and panics
342    /// otherwise.
343    async fn raise_floor(&mut self, mut inactivity_floor_loc: Location) -> Result<Location, Error>
344    where
345        I: Index<Value = Location>,
346    {
347        let tip_loc = Location::new(self.log.size().await);
348        loop {
349            assert!(
350                *inactivity_floor_loc < tip_loc,
351                "no active operations above the inactivity floor"
352            );
353            let old_loc = inactivity_floor_loc;
354            inactivity_floor_loc += 1;
355            let op = {
356                let reader = self.log.reader().await;
357                reader.read(*old_loc).await?
358            };
359            if self.move_op_if_active(op, old_loc).await? {
360                return Ok(inactivity_floor_loc);
361            }
362        }
363    }
364}