commonware_storage/qmdb/
mod.rs

1//! A collection of authenticated databases inspired by QMDB (Quick Merkle Database).
2//!
3//! # Terminology
4//!
5//! A _key_ in an authenticated database either has a _value_ or it doesn't. Two types of
6//! _operations_ can be applied to the db to modify the state of a specific key. A key that has a
7//! value can change to one without a value through the _delete_ operation. The _update_ operation
8//! gives a key a specific value whether it previously had no value or had a different value.
9//!
10//! Keys with values are called _active_. An operation is called _active_ if (1) its key is active,
11//! (2) it is an update operation, and (3) it is the most recent operation for that key.
12//!
13//! # Acknowledgments
14//!
15//! The following resources were used as references when implementing this crate:
16//!
17//! * [QMDB: Quick Merkle Database](https://arxiv.org/abs/2501.05262)
18//! * [Merkle Mountain Ranges](https://github.com/opentimestamps/opentimestamps-server/blob/master/doc/merkle-mountain-range.md)
19
20use crate::{
21    index::{Cursor, Unordered as Index},
22    journal::contiguous::{Contiguous, MutableContiguous},
23    mmr::Location,
24    qmdb::operation::Operation,
25    DirtyAuthenticatedBitMap,
26};
27use commonware_cryptography::Digest;
28use commonware_utils::NZUsize;
29use core::num::NonZeroUsize;
30use futures::{pin_mut, StreamExt as _};
31use thiserror::Error;
32
33pub mod any;
34pub mod current;
35pub mod immutable;
36pub mod keyless;
37pub mod operation;
38pub mod store;
39pub mod sync;
40pub mod verify;
41pub use verify::{
42    create_multi_proof, create_proof, create_proof_store, create_proof_store_from_digests,
43    digests_required_for_proof, extract_pinned_nodes, verify_multi_proof, verify_proof,
44    verify_proof_and_extract_digests,
45};
46
47/// Errors that can occur when interacting with an authenticated database.
48#[derive(Error, Debug)]
49pub enum Error {
50    #[error("mmr error: {0}")]
51    Mmr(#[from] crate::mmr::Error),
52
53    #[error("metadata error: {0}")]
54    Metadata(#[from] crate::metadata::Error),
55
56    #[error("journal error: {0}")]
57    Journal(#[from] crate::journal::Error),
58
59    #[error("runtime error: {0}")]
60    Runtime(#[from] commonware_runtime::Error),
61
62    #[error("operation pruned: {0}")]
63    OperationPruned(Location),
64
65    /// The requested key was not found in the snapshot.
66    #[error("key not found")]
67    KeyNotFound,
68
69    /// The key exists in the db, so we cannot prove its exclusion.
70    #[error("key exists")]
71    KeyExists,
72
73    #[error("unexpected data at location: {0}")]
74    UnexpectedData(Location),
75
76    #[error("location out of bounds: {0} >= {1}")]
77    LocationOutOfBounds(Location, Location),
78
79    #[error("prune location {0} beyond minimum required location {1}")]
80    PruneBeyondMinRequired(Location, Location),
81}
82
83impl From<crate::journal::authenticated::Error> for Error {
84    fn from(e: crate::journal::authenticated::Error) -> Self {
85        match e {
86            crate::journal::authenticated::Error::Journal(j) => Self::Journal(j),
87            crate::journal::authenticated::Error::Mmr(m) => Self::Mmr(m),
88        }
89    }
90}
91
92/// The size of the read buffer to use for replaying the operations log when rebuilding the
93/// snapshot.
94const SNAPSHOT_READ_BUFFER_SIZE: NonZeroUsize = NZUsize!(1 << 16);
95
96/// Builds the database's snapshot by replaying the log starting at the inactivity floor. Assumes
97/// the log is not pruned beyond the inactivity floor. The callback is invoked for each replayed
98/// operation, indicating activity status updates. The first argument of the callback is the
99/// activity status of the operation, and the second argument is the location of the operation it
100/// inactivates (if any). Returns the number of active keys in the db.
101pub(super) async fn build_snapshot_from_log<C, I, F>(
102    inactivity_floor_loc: Location,
103    log: &C,
104    snapshot: &mut I,
105    mut callback: F,
106) -> Result<usize, Error>
107where
108    C: Contiguous<Item: Operation>,
109    I: Index<Value = Location>,
110    F: FnMut(bool, Option<Location>),
111{
112    let stream = log
113        .replay(*inactivity_floor_loc, SNAPSHOT_READ_BUFFER_SIZE)
114        .await?;
115    pin_mut!(stream);
116    let last_commit_loc = log.size().saturating_sub(1);
117    let mut active_keys: usize = 0;
118    while let Some(result) = stream.next().await {
119        let (loc, op) = result?;
120        if let Some(key) = op.key() {
121            if op.is_delete() {
122                let old_loc = delete_key(snapshot, log, key).await?;
123                callback(false, old_loc);
124                if old_loc.is_some() {
125                    active_keys -= 1;
126                }
127            } else if op.is_update() {
128                let new_loc = Location::new_unchecked(loc);
129                let old_loc = update_key(snapshot, log, key, new_loc).await?;
130                callback(true, old_loc);
131                if old_loc.is_none() {
132                    active_keys += 1;
133                }
134            }
135        } else if op.has_floor().is_some() {
136            callback(loc == last_commit_loc, None);
137        }
138    }
139
140    Ok(active_keys)
141}
142
143/// Delete `key` from the snapshot if it exists, returning the location that was previously
144/// associated with it.
145async fn delete_key<I, C>(
146    snapshot: &mut I,
147    log: &C,
148    key: &<C::Item as Operation>::Key,
149) -> Result<Option<Location>, Error>
150where
151    I: Index<Value = Location>,
152    C: Contiguous<Item: Operation>,
153{
154    // If the translated key is in the snapshot, get a cursor to look for the key.
155    let Some(mut cursor) = snapshot.get_mut(key) else {
156        return Ok(None);
157    };
158
159    // Find the matching key among all conflicts, then delete it.
160    let Some(loc) = find_update_op(log, &mut cursor, key).await? else {
161        return Ok(None);
162    };
163    cursor.delete();
164
165    Ok(Some(loc))
166}
167
168/// Update the location of `key` to `new_loc` in the snapshot and return its old location, or insert
169/// it if the key isn't already present.
170async fn update_key<I, C>(
171    snapshot: &mut I,
172    log: &C,
173    key: &<C::Item as Operation>::Key,
174    new_loc: Location,
175) -> Result<Option<Location>, Error>
176where
177    I: Index<Value = Location>,
178    C: Contiguous<Item: Operation>,
179{
180    // If the translated key is not in the snapshot, insert the new location. Otherwise, get a
181    // cursor to look for the key.
182    let Some(mut cursor) = snapshot.get_mut_or_insert(key, new_loc) else {
183        return Ok(None);
184    };
185
186    // Find the matching key among all conflicts, then update its location.
187    if let Some(loc) = find_update_op(log, &mut cursor, key).await? {
188        assert!(new_loc > loc);
189        cursor.update(new_loc);
190        return Ok(Some(loc));
191    }
192
193    // The key wasn't in the snapshot, so add it to the cursor.
194    cursor.insert(new_loc);
195
196    Ok(None)
197}
198
199/// Create a `key` with location `new_loc` in the snapshot only if it doesn't already exist, and
200/// return false otherwise.
201async fn create_key<I, C>(
202    snapshot: &mut I,
203    log: &C,
204    key: &<C::Item as Operation>::Key,
205    new_loc: Location,
206) -> Result<bool, Error>
207where
208    I: Index<Value = Location>,
209    C: Contiguous<Item: Operation>,
210{
211    // If the translated key is not in the snapshot, insert the new location. Otherwise, get a
212    // cursor to look for the key.
213    let Some(mut cursor) = snapshot.get_mut_or_insert(key, new_loc) else {
214        return Ok(true);
215    };
216
217    // Confirm the key doesn't already exist.
218    if find_update_op(log, &mut cursor, key).await?.is_some() {
219        return Ok(false);
220    }
221
222    // The key doesn't exist, so add it to the cursor.
223    cursor.insert(new_loc);
224
225    Ok(true)
226}
227
228/// Find and return the location of the update operation for `key`, if it exists. The cursor is
229/// positioned at the matching location, and can be used to update or delete the key.
230///
231/// # Panics
232///
233/// Panics if `key` is not found in the snapshot or if `old_loc` is not found in the cursor.
234async fn find_update_op<C>(
235    log: &C,
236    cursor: &mut impl Cursor<Value = Location>,
237    key: &<C::Item as Operation>::Key,
238) -> Result<Option<Location>, Error>
239where
240    C: Contiguous<Item: Operation>,
241{
242    while let Some(&loc) = cursor.next() {
243        let op = log.read(*loc).await?;
244        let k = op.key().expect("operation without key");
245        if *k == *key {
246            return Ok(Some(loc));
247        }
248    }
249
250    Ok(None)
251}
252
253/// For the given `key` which is known to exist in the snapshot with location `old_loc`, update
254/// its location to `new_loc`.
255///
256/// # Panics
257///
258/// Panics if `key` is not found in the snapshot or if `old_loc` is not found in the cursor.
259fn update_known_loc<I: Index<Value = Location>>(
260    snapshot: &mut I,
261    key: &[u8],
262    old_loc: Location,
263    new_loc: Location,
264) {
265    let mut cursor = snapshot.get_mut(key).expect("key should be known to exist");
266    assert!(
267        cursor.find(|&loc| *loc == old_loc),
268        "known key with given old_loc should have been found"
269    );
270    cursor.update(new_loc);
271}
272
273/// For the given `key` which is known to exist in the snapshot with location `old_loc`, delete
274/// it from the snapshot.
275///
276/// # Panics
277///
278/// Panics if `key` is not found in the snapshot or if `old_loc` is not found in the cursor.
279fn delete_known_loc<I: Index<Value = Location>>(snapshot: &mut I, key: &[u8], old_loc: Location) {
280    let mut cursor = snapshot.get_mut(key).expect("key should be known to exist");
281    assert!(
282        cursor.find(|&loc| *loc == old_loc),
283        "known key with given old_loc should have been found"
284    );
285    cursor.delete();
286}
287
288/// A wrapper of DB state required for implementing inactivity floor management.
289pub(crate) struct FloorHelper<'a, I: Index<Value = Location>, C: MutableContiguous<Item: Operation>>
290{
291    pub snapshot: &'a mut I,
292    pub log: &'a mut C,
293}
294
295impl<I, C> FloorHelper<'_, I, C>
296where
297    I: Index<Value = Location>,
298    C: MutableContiguous<Item: Operation>,
299{
300    /// Moves the given operation to the tip of the log if it is active, rendering its old location
301    /// inactive. If the operation was not active, then this is a no-op. Returns whether the
302    /// operation was moved.
303    async fn move_op_if_active(&mut self, op: C::Item, old_loc: Location) -> Result<bool, Error> {
304        let Some(key) = op.key() else {
305            return Ok(false); // operations without keys cannot be active
306        };
307
308        // If we find a snapshot entry corresponding to the operation, we know it's active.
309        let Some(mut cursor) = self.snapshot.get_mut(key) else {
310            return Ok(false);
311        };
312        if !cursor.find(|&loc| loc == old_loc) {
313            return Ok(false);
314        }
315
316        // Update the operation's snapshot location to point to tip.
317        cursor.update(Location::new_unchecked(self.log.size()));
318        drop(cursor);
319
320        // Apply the operation at tip.
321        self.log.append(op).await?;
322
323        Ok(true)
324    }
325
326    /// Raise the inactivity floor by taking one _step_, which involves searching for the first
327    /// active operation above the inactivity floor, moving it to tip, and then setting the
328    /// inactivity floor to the location following the moved operation. This method is therefore
329    /// guaranteed to raise the floor by at least one. Returns the new inactivity floor location.
330    ///
331    /// # Panics
332    ///
333    /// Expects there is at least one active operation above the inactivity floor, and panics
334    /// otherwise.
335    // TODO(https://github.com/commonwarexyz/monorepo/issues/1829): callers of this method should
336    // migrate to using [Self::raise_floor_with_bitmap] instead.
337    async fn raise_floor(&mut self, mut inactivity_floor_loc: Location) -> Result<Location, Error>
338    where
339        I: Index<Value = Location>,
340    {
341        let tip_loc = Location::new_unchecked(self.log.size());
342        loop {
343            assert!(
344                *inactivity_floor_loc < tip_loc,
345                "no active operations above the inactivity floor"
346            );
347            let old_loc = inactivity_floor_loc;
348            inactivity_floor_loc += 1;
349            let op = self.log.read(*old_loc).await?;
350            if self.move_op_if_active(op, old_loc).await? {
351                return Ok(inactivity_floor_loc);
352            }
353        }
354    }
355
356    /// Same as `raise_floor` but uses the status bitmap to more efficiently find the first active
357    /// operation above the inactivity floor. The status bitmap is updated to reflect any moved
358    /// operations.
359    ///
360    /// # Panics
361    ///
362    /// Panics if there is not at least one active operation above the inactivity floor.
363    pub(crate) async fn raise_floor_with_bitmap<D: Digest, const N: usize>(
364        &mut self,
365        status: &mut DirtyAuthenticatedBitMap<D, N>,
366        mut inactivity_floor_loc: Location,
367    ) -> Result<Location, Error>
368    where
369        I: Index<Value = Location>,
370    {
371        // Use the status bitmap to find the first active operation above the inactivity floor.
372        while !status.get_bit(*inactivity_floor_loc) {
373            inactivity_floor_loc += 1;
374        }
375
376        // Move the active operation to tip.
377        let op = self.log.read(*inactivity_floor_loc).await?;
378        assert!(
379            self.move_op_if_active(op, inactivity_floor_loc).await?,
380            "op should be active based on status bitmap"
381        );
382        status.set_bit(*inactivity_floor_loc, false);
383        status.push(true);
384
385        Ok(inactivity_floor_loc + 1)
386    }
387}