commonware_storage/qmdb/mod.rs
1//! A collection of authenticated databases inspired by QMDB (Quick Merkle Database).
2//!
3//! # Terminology
4//!
5//! A database's state is derived from an append-only log of state-changing _operations_.
6//!
7//! In a _keyed_ database, a _key_ either has a _value_ or it doesn't, and different types of
8//! operations modify the state of a specific key. A key that has a value can change to one without
9//! a value through the _delete_ operation. The _update_ operation gives a key a specific value. We
10//! sometimes call an update for a key that doesn't already have a value a _create_ operation, but
11//! its representation in the log is the same.
12//!
13//! Keys with values are called _active_. An operation is called _active_ if (1) its key is active,
14//! (2) it is an update operation, and (3) it is the most recent operation for that key.
15//!
16//! # Database States
17//!
18//! An _authenticated_ database can be in one of four states based on two orthogonal dimensions:
19//! - Merkleization: [Merkleized] (has computed root) or [Unmerkleized] (root not yet computed)
//! - Durability: [Durable] (committed to disk) or [NonDurable] (uncommitted changes)
21//!
22//! We call the combined (Merkleized,Durable) state the _Clean_ state.
23//!
24//! We call the combined (Unmerkleized,NonDurable) state the _Mutable_ state since it's the only
25//! state in which the database state (as reflected by its `root`) can be changed.
26//!
27//! State transitions result from `into_mutable()`, `into_merkleized()`, and `commit()`:
28//! - `init()` → `Clean`
29//! - `Clean.into_mutable()` → `Mutable`
30//! - `(Unmerkleized,Durable).into_mutable()` → `Mutable`
31//! - `(Merkleized,NonDurable).into_mutable()` → `Mutable`
32//! - `(Unmerkleized,Durable).into_merkleized()` → `Clean`
33//! - `Mutable.into_merkleized()` → `(Merkleized,NonDurable)`
34//! - `Mutable.commit()` → `(Unmerkleized,Durable)`
35//!
36//! An authenticated database implements [store::LogStore] in every state, and keyed databases
37//! additionally implement [crate::kv::Gettable]. Additional functionality in other states includes:
38//!
39//! - Clean: [store::MerkleizedStore], [store::PrunableStore], [super::Persistable]
40//! - (Merkleized,NonDurable): [store::MerkleizedStore], [store::PrunableStore]
41//!
42//! Keyed databases additionally implement:
43//! - Mutable: [crate::kv::Deletable], [crate::kv::Batchable]
44//!
45//! # Acknowledgments
46//!
47//! The following resources were used as references when implementing this crate:
48//!
49//! * [QMDB: Quick Merkle Database](https://arxiv.org/abs/2501.05262)
50//! * [Merkle Mountain
51//! Ranges](https://github.com/opentimestamps/opentimestamps-server/blob/master/doc/merkle-mountain-range.md)
52
53use crate::{
54 index::{Cursor, Unordered as Index},
55 journal::contiguous::{Contiguous, MutableContiguous},
56 mmr::{mem::State as MerkleizationState, Location},
57 qmdb::{operation::Operation, store::State as DurabilityState},
58 DirtyAuthenticatedBitMap,
59};
60use commonware_cryptography::{Digest, DigestOf};
61use commonware_utils::NZUsize;
62use core::num::NonZeroUsize;
63use futures::{pin_mut, StreamExt as _};
64use thiserror::Error;
65
66pub mod any;
67pub mod current;
68pub mod immutable;
69pub mod keyless;
70pub mod operation;
71pub mod store;
72pub mod sync;
73pub mod verify;
74pub use verify::{
75 create_multi_proof, create_proof, create_proof_store, create_proof_store_from_digests,
76 digests_required_for_proof, extract_pinned_nodes, verify_multi_proof, verify_proof,
77 verify_proof_and_extract_digests,
78};
79
80/// Errors that can occur when interacting with an authenticated database.
81#[derive(Error, Debug)]
82pub enum Error {
83 #[error("mmr error: {0}")]
84 Mmr(#[from] crate::mmr::Error),
85
86 #[error("metadata error: {0}")]
87 Metadata(#[from] crate::metadata::Error),
88
89 #[error("journal error: {0}")]
90 Journal(#[from] crate::journal::Error),
91
92 #[error("runtime error: {0}")]
93 Runtime(#[from] commonware_runtime::Error),
94
95 #[error("operation pruned: {0}")]
96 OperationPruned(Location),
97
98 /// The requested key was not found in the snapshot.
99 #[error("key not found")]
100 KeyNotFound,
101
102 /// The key exists in the db, so we cannot prove its exclusion.
103 #[error("key exists")]
104 KeyExists,
105
106 #[error("unexpected data at location: {0}")]
107 UnexpectedData(Location),
108
109 #[error("location out of bounds: {0} >= {1}")]
110 LocationOutOfBounds(Location, Location),
111
112 #[error("prune location {0} beyond minimum required location {1}")]
113 PruneBeyondMinRequired(Location, Location),
114}
115
116impl From<crate::journal::authenticated::Error> for Error {
117 fn from(e: crate::journal::authenticated::Error) -> Self {
118 match e {
119 crate::journal::authenticated::Error::Journal(j) => Self::Journal(j),
120 crate::journal::authenticated::Error::Mmr(m) => Self::Mmr(m),
121 }
122 }
123}
124
125/// Type alias for merkleized state of a QMDB.
126pub type Merkleized<H> = crate::mmr::mem::Clean<DigestOf<H>>;
127/// Type alias for unmerkleized state of a QMDB.
128pub type Unmerkleized = crate::mmr::mem::Dirty;
129/// Type alias for durable state of a QMDB.
130pub type Durable = store::Durable;
131/// Type alias for non-durable state of a QMDB.
132pub type NonDurable = store::NonDurable;
133
134/// The size of the read buffer to use for replaying the operations log when rebuilding the
135/// snapshot.
136const SNAPSHOT_READ_BUFFER_SIZE: NonZeroUsize = NZUsize!(1 << 16);
137
138/// Builds the database's snapshot by replaying the log starting at the inactivity floor. Assumes
139/// the log is not pruned beyond the inactivity floor. The callback is invoked for each replayed
140/// operation, indicating activity status updates. The first argument of the callback is the
141/// activity status of the operation, and the second argument is the location of the operation it
142/// inactivates (if any). Returns the number of active keys in the db.
143pub(super) async fn build_snapshot_from_log<C, I, F>(
144 inactivity_floor_loc: Location,
145 log: &C,
146 snapshot: &mut I,
147 mut callback: F,
148) -> Result<usize, Error>
149where
150 C: Contiguous<Item: Operation>,
151 I: Index<Value = Location>,
152 F: FnMut(bool, Option<Location>),
153{
154 let stream = log
155 .replay(*inactivity_floor_loc, SNAPSHOT_READ_BUFFER_SIZE)
156 .await?;
157 pin_mut!(stream);
158 let last_commit_loc = log.size().saturating_sub(1);
159 let mut active_keys: usize = 0;
160 while let Some(result) = stream.next().await {
161 let (loc, op) = result?;
162 if let Some(key) = op.key() {
163 if op.is_delete() {
164 let old_loc = delete_key(snapshot, log, key).await?;
165 callback(false, old_loc);
166 if old_loc.is_some() {
167 active_keys -= 1;
168 }
169 } else if op.is_update() {
170 let new_loc = Location::new_unchecked(loc);
171 let old_loc = update_key(snapshot, log, key, new_loc).await?;
172 callback(true, old_loc);
173 if old_loc.is_none() {
174 active_keys += 1;
175 }
176 }
177 } else if op.has_floor().is_some() {
178 callback(loc == last_commit_loc, None);
179 }
180 }
181
182 Ok(active_keys)
183}
184
185/// Delete `key` from the snapshot if it exists, returning the location that was previously
186/// associated with it.
187async fn delete_key<I, C>(
188 snapshot: &mut I,
189 log: &C,
190 key: &<C::Item as Operation>::Key,
191) -> Result<Option<Location>, Error>
192where
193 I: Index<Value = Location>,
194 C: Contiguous<Item: Operation>,
195{
196 // If the translated key is in the snapshot, get a cursor to look for the key.
197 let Some(mut cursor) = snapshot.get_mut(key) else {
198 return Ok(None);
199 };
200
201 // Find the matching key among all conflicts, then delete it.
202 let Some(loc) = find_update_op(log, &mut cursor, key).await? else {
203 return Ok(None);
204 };
205 cursor.delete();
206
207 Ok(Some(loc))
208}
209
210/// Update the location of `key` to `new_loc` in the snapshot and return its old location, or insert
211/// it if the key isn't already present.
212async fn update_key<I, C>(
213 snapshot: &mut I,
214 log: &C,
215 key: &<C::Item as Operation>::Key,
216 new_loc: Location,
217) -> Result<Option<Location>, Error>
218where
219 I: Index<Value = Location>,
220 C: Contiguous<Item: Operation>,
221{
222 // If the translated key is not in the snapshot, insert the new location. Otherwise, get a
223 // cursor to look for the key.
224 let Some(mut cursor) = snapshot.get_mut_or_insert(key, new_loc) else {
225 return Ok(None);
226 };
227
228 // Find the matching key among all conflicts, then update its location.
229 if let Some(loc) = find_update_op(log, &mut cursor, key).await? {
230 assert!(new_loc > loc);
231 cursor.update(new_loc);
232 return Ok(Some(loc));
233 }
234
235 // The key wasn't in the snapshot, so add it to the cursor.
236 cursor.insert(new_loc);
237
238 Ok(None)
239}
240
241/// Create a `key` with location `new_loc` in the snapshot only if it doesn't already exist, and
242/// return false otherwise.
243async fn create_key<I, C>(
244 snapshot: &mut I,
245 log: &C,
246 key: &<C::Item as Operation>::Key,
247 new_loc: Location,
248) -> Result<bool, Error>
249where
250 I: Index<Value = Location>,
251 C: Contiguous<Item: Operation>,
252{
253 // If the translated key is not in the snapshot, insert the new location. Otherwise, get a
254 // cursor to look for the key.
255 let Some(mut cursor) = snapshot.get_mut_or_insert(key, new_loc) else {
256 return Ok(true);
257 };
258
259 // Confirm the key doesn't already exist.
260 if find_update_op(log, &mut cursor, key).await?.is_some() {
261 return Ok(false);
262 }
263
264 // The key doesn't exist, so add it to the cursor.
265 cursor.insert(new_loc);
266
267 Ok(true)
268}
269
270/// Find and return the location of the update operation for `key`, if it exists. The cursor is
271/// positioned at the matching location, and can be used to update or delete the key.
272///
273/// # Panics
274///
275/// Panics if `key` is not found in the snapshot or if `old_loc` is not found in the cursor.
276async fn find_update_op<C>(
277 log: &C,
278 cursor: &mut impl Cursor<Value = Location>,
279 key: &<C::Item as Operation>::Key,
280) -> Result<Option<Location>, Error>
281where
282 C: Contiguous<Item: Operation>,
283{
284 while let Some(&loc) = cursor.next() {
285 let op = log.read(*loc).await?;
286 let k = op.key().expect("operation without key");
287 if *k == *key {
288 return Ok(Some(loc));
289 }
290 }
291
292 Ok(None)
293}
294
295/// For the given `key` which is known to exist in the snapshot with location `old_loc`, update
296/// its location to `new_loc`.
297///
298/// # Panics
299///
300/// Panics if `key` is not found in the snapshot or if `old_loc` is not found in the cursor.
301fn update_known_loc<I: Index<Value = Location>>(
302 snapshot: &mut I,
303 key: &[u8],
304 old_loc: Location,
305 new_loc: Location,
306) {
307 let mut cursor = snapshot.get_mut(key).expect("key should be known to exist");
308 assert!(
309 cursor.find(|&loc| *loc == old_loc),
310 "known key with given old_loc should have been found"
311 );
312 cursor.update(new_loc);
313}
314
315/// For the given `key` which is known to exist in the snapshot with location `old_loc`, delete
316/// it from the snapshot.
317///
318/// # Panics
319///
320/// Panics if `key` is not found in the snapshot or if `old_loc` is not found in the cursor.
321fn delete_known_loc<I: Index<Value = Location>>(snapshot: &mut I, key: &[u8], old_loc: Location) {
322 let mut cursor = snapshot.get_mut(key).expect("key should be known to exist");
323 assert!(
324 cursor.find(|&loc| *loc == old_loc),
325 "known key with given old_loc should have been found"
326 );
327 cursor.delete();
328}
329
330/// A wrapper of DB state required for implementing inactivity floor management.
331pub(crate) struct FloorHelper<'a, I: Index<Value = Location>, C: MutableContiguous<Item: Operation>>
332{
333 pub snapshot: &'a mut I,
334 pub log: &'a mut C,
335}
336
337impl<I, C> FloorHelper<'_, I, C>
338where
339 I: Index<Value = Location>,
340 C: MutableContiguous<Item: Operation>,
341{
342 /// Moves the given operation to the tip of the log if it is active, rendering its old location
343 /// inactive. If the operation was not active, then this is a no-op. Returns whether the
344 /// operation was moved.
345 async fn move_op_if_active(&mut self, op: C::Item, old_loc: Location) -> Result<bool, Error> {
346 let Some(key) = op.key() else {
347 return Ok(false); // operations without keys cannot be active
348 };
349
350 // If we find a snapshot entry corresponding to the operation, we know it's active.
351 let Some(mut cursor) = self.snapshot.get_mut(key) else {
352 return Ok(false);
353 };
354 if !cursor.find(|&loc| loc == old_loc) {
355 return Ok(false);
356 }
357
358 // Update the operation's snapshot location to point to tip.
359 cursor.update(Location::new_unchecked(self.log.size()));
360 drop(cursor);
361
362 // Apply the operation at tip.
363 self.log.append(op).await?;
364
365 Ok(true)
366 }
367
368 /// Raise the inactivity floor by taking one _step_, which involves searching for the first
369 /// active operation above the inactivity floor, moving it to tip, and then setting the
370 /// inactivity floor to the location following the moved operation. This method is therefore
371 /// guaranteed to raise the floor by at least one. Returns the new inactivity floor location.
372 ///
373 /// # Panics
374 ///
375 /// Expects there is at least one active operation above the inactivity floor, and panics
376 /// otherwise.
377 // TODO(https://github.com/commonwarexyz/monorepo/issues/1829): callers of this method should
378 // migrate to using [Self::raise_floor_with_bitmap] instead.
379 async fn raise_floor(&mut self, mut inactivity_floor_loc: Location) -> Result<Location, Error>
380 where
381 I: Index<Value = Location>,
382 {
383 let tip_loc = Location::new_unchecked(self.log.size());
384 loop {
385 assert!(
386 *inactivity_floor_loc < tip_loc,
387 "no active operations above the inactivity floor"
388 );
389 let old_loc = inactivity_floor_loc;
390 inactivity_floor_loc += 1;
391 let op = self.log.read(*old_loc).await?;
392 if self.move_op_if_active(op, old_loc).await? {
393 return Ok(inactivity_floor_loc);
394 }
395 }
396 }
397
398 /// Same as `raise_floor` but uses the status bitmap to more efficiently find the first active
399 /// operation above the inactivity floor. The status bitmap is updated to reflect any moved
400 /// operations.
401 ///
402 /// # Panics
403 ///
404 /// Panics if there is not at least one active operation above the inactivity floor.
405 pub(crate) async fn raise_floor_with_bitmap<D: Digest, const N: usize>(
406 &mut self,
407 status: &mut DirtyAuthenticatedBitMap<D, N>,
408 mut inactivity_floor_loc: Location,
409 ) -> Result<Location, Error>
410 where
411 I: Index<Value = Location>,
412 {
413 // Use the status bitmap to find the first active operation above the inactivity floor.
414 while !status.get_bit(*inactivity_floor_loc) {
415 inactivity_floor_loc += 1;
416 }
417
418 // Move the active operation to tip.
419 let op = self.log.read(*inactivity_floor_loc).await?;
420 assert!(
421 self.move_op_if_active(op, inactivity_floor_loc).await?,
422 "op should be active based on status bitmap"
423 );
424 status.set_bit(*inactivity_floor_loc, false);
425 status.push(true);
426
427 Ok(inactivity_floor_loc + 1)
428 }
429}