//! A collection of authenticated databases inspired by QMDB (Quick Merkle Database).
//!
//! # Terminology
//!
//! A database's state is derived from an append-only log of state-changing _operations_.
//!
//! In a _keyed_ database, a _key_ either has a _value_ or it doesn't, and different types of
//! operations modify the state of a specific key. A key that has a value can change to one without
//! a value through the _delete_ operation. The _update_ operation gives a key a specific value. We
//! sometimes call an update for a key that doesn't already have a value a _create_ operation, but
//! its representation in the log is the same.
//!
//! Keys with values are called _active_. An operation is called _active_ if (1) its key is active,
//! (2) it is an update operation, and (3) it is the most recent operation for that key.
//!
//! # Database Lifecycle
//!
//! All variants are modified through a batch API that follows a common pattern:
//! 1. Create a batch from the database.
//! 2. Stage mutations on the batch.
//! 3. Merkleize the batch -- this resolves mutations against the current state and computes
//!    the Merkle root that would result from applying them.
//! 4. Inspect the root or create child batches.
//! 5. Finalize the batch into a changeset.
//! 6. Apply the changeset to the database.
//!
//! A merkleized batch can spawn child batches, forming a tree of speculative states that
//! share a common ancestor. Only the finalized leaf needs to be applied.
//!
//! The specific mutation methods vary by variant; see each variant's module documentation for
//! the concrete API and usage examples.
//!
//! Persistence and cleanup are managed directly on the database: `sync()`, `prune()`,
//! and `destroy()`.
//!
//! # Traits
//!
//! All variants implement [store::LogStore] and [store::MerkleizedStore]. Keyed mutable
//! variants ([any] and [current]) also implement [store::PrunableStore] and
//! [crate::Persistable]. The [immutable] variant implements [crate::kv::Gettable].
//!
//! # Acknowledgments
//!
//! The following resources were used as references when implementing this crate:
//!
//! * [QMDB: Quick Merkle Database](https://arxiv.org/abs/2501.05262)
//! * [Merkle Mountain Ranges](https://github.com/opentimestamps/opentimestamps-server/blob/master/doc/merkle-mountain-range.md)
49
50use crate::{
51 index::{Cursor, Unordered as Index},
52 journal::contiguous::{Mutable, Reader},
53 mmr::Location,
54 qmdb::operation::Operation,
55};
56use commonware_utils::NZUsize;
57use core::num::NonZeroUsize;
58use futures::{pin_mut, StreamExt as _};
59use thiserror::Error;
60
61pub mod any;
62pub mod current;
63pub mod immutable;
64pub mod keyless;
65pub mod operation;
66pub mod store;
67pub mod sync;
68pub mod verify;
69pub use verify::{
70 create_multi_proof, create_proof_store, create_proof_store_from_digests, extract_pinned_nodes,
71 verify_multi_proof, verify_proof, verify_proof_and_extract_digests,
72};
73
/// Errors that can occur when interacting with an authenticated database.
#[derive(Error, Debug)]
pub enum Error {
    /// Persisted state failed an integrity check; the message names what was corrupted.
    #[error("data corrupted: {0}")]
    DataCorrupted(&'static str),

    /// Propagated from the underlying MMR (Merkle Mountain Range).
    #[error("mmr error: {0}")]
    Mmr(#[from] crate::mmr::Error),

    /// Propagated from the metadata store.
    #[error("metadata error: {0}")]
    Metadata(#[from] crate::metadata::Error),

    /// Propagated from the operations journal.
    #[error("journal error: {0}")]
    Journal(#[from] crate::journal::Error),

    /// Propagated from the runtime.
    #[error("runtime error: {0}")]
    Runtime(#[from] commonware_runtime::Error),

    /// The operation at the given location has been pruned and can no longer be read.
    #[error("operation pruned: {0}")]
    OperationPruned(Location),

    /// The requested key was not found in the snapshot.
    #[error("key not found")]
    KeyNotFound,

    /// The key exists in the db, so we cannot prove its exclusion.
    #[error("key exists")]
    KeyExists,

    /// Data was found at a location where none was expected.
    #[error("unexpected data at location: {0}")]
    UnexpectedData(Location),

    /// A location argument was at or beyond the database's bounds; the first value is the
    /// offending location and the second is the exclusive upper bound.
    #[error("location out of bounds: {0} >= {1}")]
    LocationOutOfBounds(Location, Location),

    /// A prune was requested past the minimum location that must be retained.
    #[error("prune location {0} beyond minimum required location {1}")]
    PruneBeyondMinRequired(Location, Location),

    /// The changeset was created from a different database state than the current one.
    #[error("stale changeset: batch expected db size {expected}, but db has {actual}")]
    StaleChangeset { expected: u64, actual: u64 },
}
116
117impl From<crate::journal::authenticated::Error> for Error {
118 fn from(e: crate::journal::authenticated::Error) -> Self {
119 match e {
120 crate::journal::authenticated::Error::Journal(j) => Self::Journal(j),
121 crate::journal::authenticated::Error::Mmr(m) => Self::Mmr(m),
122 }
123 }
124}
125
/// The size of the read buffer to use for replaying the operations log when rebuilding the
/// snapshot (1 << 16 = 65536). Passed to [Reader::replay] in [build_snapshot_from_log].
const SNAPSHOT_READ_BUFFER_SIZE: NonZeroUsize = NZUsize!(1 << 16);
129
130/// Builds the database's snapshot by replaying the log starting at the inactivity floor. Assumes
131/// the log is not pruned beyond the inactivity floor. The callback is invoked for each replayed
132/// operation, indicating activity status updates. The first argument of the callback is the
133/// activity status of the operation, and the second argument is the location of the operation it
134/// inactivates (if any). Returns the number of active keys in the db.
135pub(super) async fn build_snapshot_from_log<C, I, F>(
136 inactivity_floor_loc: Location,
137 reader: &C,
138 snapshot: &mut I,
139 mut callback: F,
140) -> Result<usize, Error>
141where
142 C: Reader<Item: Operation>,
143 I: Index<Value = Location>,
144 F: FnMut(bool, Option<Location>),
145{
146 let bounds = reader.bounds();
147 let stream = reader
148 .replay(SNAPSHOT_READ_BUFFER_SIZE, *inactivity_floor_loc)
149 .await?;
150 pin_mut!(stream);
151 let last_commit_loc = bounds.end.saturating_sub(1);
152 let mut active_keys: usize = 0;
153 while let Some(result) = stream.next().await {
154 let (loc, op) = result?;
155 if let Some(key) = op.key() {
156 if op.is_delete() {
157 let old_loc = delete_key(snapshot, reader, key).await?;
158 callback(false, old_loc);
159 if old_loc.is_some() {
160 active_keys -= 1;
161 }
162 } else if op.is_update() {
163 let new_loc = Location::new(loc);
164 let old_loc = update_key(snapshot, reader, key, new_loc).await?;
165 callback(true, old_loc);
166 if old_loc.is_none() {
167 active_keys += 1;
168 }
169 }
170 } else if op.has_floor().is_some() {
171 callback(loc == last_commit_loc, None);
172 }
173 }
174
175 Ok(active_keys)
176}
177
178/// Delete `key` from the snapshot if it exists, using a stable log reader, and return the
179/// previously associated location.
180async fn delete_key<I, R>(
181 snapshot: &mut I,
182 reader: &R,
183 key: &<R::Item as Operation>::Key,
184) -> Result<Option<Location>, Error>
185where
186 I: Index<Value = Location>,
187 R: Reader,
188 R::Item: Operation,
189{
190 // If the translated key is in the snapshot, get a cursor to look for the key.
191 let Some(mut cursor) = snapshot.get_mut(key) else {
192 return Ok(None);
193 };
194
195 // Find the matching key among all conflicts, then delete it.
196 let Some(loc) = find_update_op(reader, &mut cursor, key).await? else {
197 return Ok(None);
198 };
199 cursor.delete();
200
201 Ok(Some(loc))
202}
203
204/// Update `key` in the snapshot using a stable log reader, returning its old location if present.
205async fn update_key<I, R>(
206 snapshot: &mut I,
207 reader: &R,
208 key: &<R::Item as Operation>::Key,
209 new_loc: Location,
210) -> Result<Option<Location>, Error>
211where
212 I: Index<Value = Location>,
213 R: Reader,
214 R::Item: Operation,
215{
216 // If the translated key is not in the snapshot, insert the new location. Otherwise, get a
217 // cursor to look for the key.
218 let Some(mut cursor) = snapshot.get_mut_or_insert(key, new_loc) else {
219 return Ok(None);
220 };
221
222 // Find the matching key among all conflicts, then update its location.
223 if let Some(loc) = find_update_op(reader, &mut cursor, key).await? {
224 assert!(new_loc > loc);
225 cursor.update(new_loc);
226 return Ok(Some(loc));
227 }
228
229 // The key wasn't in the snapshot, so add it to the cursor.
230 cursor.insert(new_loc);
231
232 Ok(None)
233}
234
/// Find and return the location of the update operation for `key`, if it exists. On a match, the
/// cursor is left positioned at the matching location, and can be used to update or delete the
/// key. Returns `Ok(None)` if no entry for `key` is found among the cursor's entries.
///
/// # Panics
///
/// Panics if a location yielded by the cursor refers to an operation without a key; snapshot
/// entries are expected to only point at keyed (update) operations.
async fn find_update_op<R>(
    reader: &R,
    cursor: &mut impl Cursor<Value = Location>,
    key: &<R::Item as Operation>::Key,
) -> Result<Option<Location>, Error>
where
    R: Reader,
    R::Item: Operation,
{
    // Walk every location sharing this translated key, reading each operation from the log
    // to compare the full (untranslated) key.
    while let Some(&loc) = cursor.next() {
        let op = reader.read(*loc).await?;
        let k = op.key().expect("operation without key");
        if *k == *key {
            return Ok(Some(loc));
        }
    }

    Ok(None)
}
260
261/// For the given `key` which is known to exist in the snapshot with location `old_loc`, update
262/// its location to `new_loc`.
263///
264/// # Panics
265///
266/// Panics if `key` is not found in the snapshot or if `old_loc` is not found in the cursor.
267fn update_known_loc<I: Index<Value = Location>>(
268 snapshot: &mut I,
269 key: &[u8],
270 old_loc: Location,
271 new_loc: Location,
272) {
273 let mut cursor = snapshot.get_mut(key).expect("key should be known to exist");
274 assert!(
275 cursor.find(|&loc| *loc == old_loc),
276 "known key with given old_loc should have been found"
277 );
278 cursor.update(new_loc);
279}
280
281/// For the given `key` which is known to exist in the snapshot with location `old_loc`, delete
282/// it from the snapshot.
283///
284/// # Panics
285///
286/// Panics if `key` is not found in the snapshot or if `old_loc` is not found in the cursor.
287fn delete_known_loc<I: Index<Value = Location>>(snapshot: &mut I, key: &[u8], old_loc: Location) {
288 let mut cursor = snapshot.get_mut(key).expect("key should be known to exist");
289 assert!(
290 cursor.find(|&loc| *loc == old_loc),
291 "known key with given old_loc should have been found"
292 );
293 cursor.delete();
294}
295
/// A wrapper of DB state required for implementing inactivity floor management. Borrows both
/// the snapshot index and the mutable operations log so active operations can be moved to tip
/// while keeping the snapshot consistent.
pub(crate) struct FloorHelper<'a, I: Index<Value = Location>, C: Mutable<Item: Operation>> {
    /// Index mapping each active key to the location of its most recent update operation.
    pub snapshot: &'a mut I,
    /// The append-only log of operations.
    pub log: &'a mut C,
}
301
302impl<I, C> FloorHelper<'_, I, C>
303where
304 I: Index<Value = Location>,
305 C: Mutable<Item: Operation>,
306{
307 /// Moves the given operation to the tip of the log if it is active, rendering its old location
308 /// inactive. If the operation was not active, then this is a no-op. Returns whether the
309 /// operation was moved.
310 async fn move_op_if_active(&mut self, op: C::Item, old_loc: Location) -> Result<bool, Error> {
311 let Some(key) = op.key() else {
312 return Ok(false); // operations without keys cannot be active
313 };
314
315 // If we find a snapshot entry corresponding to the operation, we know it's active.
316 {
317 let Some(mut cursor) = self.snapshot.get_mut(key) else {
318 return Ok(false);
319 };
320 if !cursor.find(|&loc| loc == old_loc) {
321 return Ok(false);
322 }
323
324 // Update the operation's snapshot location to point to tip.
325 cursor.update(Location::new(self.log.size().await));
326 }
327
328 // Apply the operation at tip.
329 self.log.append(&op).await?;
330
331 Ok(true)
332 }
333
334 /// Raise the inactivity floor by taking one _step_, which involves searching for the first
335 /// active operation above the inactivity floor, moving it to tip, and then setting the
336 /// inactivity floor to the location following the moved operation. This method is therefore
337 /// guaranteed to raise the floor by at least one. Returns the new inactivity floor location.
338 ///
339 /// # Panics
340 ///
341 /// Expects there is at least one active operation above the inactivity floor, and panics
342 /// otherwise.
343 async fn raise_floor(&mut self, mut inactivity_floor_loc: Location) -> Result<Location, Error>
344 where
345 I: Index<Value = Location>,
346 {
347 let tip_loc = Location::new(self.log.size().await);
348 loop {
349 assert!(
350 *inactivity_floor_loc < tip_loc,
351 "no active operations above the inactivity floor"
352 );
353 let old_loc = inactivity_floor_loc;
354 inactivity_floor_loc += 1;
355 let op = {
356 let reader = self.log.reader().await;
357 reader.read(*old_loc).await?
358 };
359 if self.move_op_if_active(op, old_loc).await? {
360 return Ok(inactivity_floor_loc);
361 }
362 }
363 }
364}