// commonware_storage/qmdb/any/db.rs
1//! A shared, generic implementation of the _Any_ QMDB.
2//!
3//! The impl blocks in this file define shared functionality across all Any QMDB variants.
4
5use super::operation::{update::Update, Operation};
6use crate::{
7    index::Unordered as UnorderedIndex,
8    journal::{
9        authenticated,
10        contiguous::{Contiguous, Mutable, Reader},
11        Error as JournalError,
12    },
13    merkle::{Family, Location, Proof},
14    qmdb::{
15        build_snapshot_from_log, delete_known_loc, operation::Operation as OperationTrait,
16        update_known_loc, Error,
17    },
18    Context, Persistable,
19};
20use commonware_codec::{Codec, CodecShared};
21use commonware_cryptography::Hasher;
22use core::num::NonZeroU64;
23use std::collections::HashMap;
24
/// Type alias for the authenticated journal used by [Db]: an [authenticated::Journal]
/// parameterized over the merkle family `F`, context `E`, contiguous journal `C`, and hasher `H`.
pub(crate) type AuthenticatedLog<F, E, C, H> = authenticated::Journal<F, E, C, H>;
27
/// Snapshot mutation needed to undo one operation while rewinding.
enum SnapshotUndo<F: Family, K> {
    /// A removed update superseded an older update of `key`: point the snapshot
    /// back from `old_loc` (the removed update) to `new_loc` (the prior update).
    Replace {
        key: K,
        // Location of the removed (newer) update currently in the snapshot.
        old_loc: Location<F>,
        // Location of the prior update to restore.
        new_loc: Location<F>,
    },
    /// A removed update activated `key` with no prior active location:
    /// drop the key from the snapshot entirely.
    Remove {
        key: K,
        // Location of the removed update currently in the snapshot.
        old_loc: Location<F>,
    },
    /// A removed delete deactivated `key`: re-insert its prior active location.
    Insert {
        key: K,
        // Location of the prior update to restore.
        new_loc: Location<F>,
    },
}
44
/// An "Any" QMDB implementation generic over ordered/unordered keys and variable/fixed values.
/// Consider using one of the following specialized variants instead, which may be more ergonomic:
/// - [crate::qmdb::any::ordered::fixed::Db]
/// - [crate::qmdb::any::ordered::variable::Db]
/// - [crate::qmdb::any::unordered::fixed::Db]
/// - [crate::qmdb::any::unordered::variable::Db]
///
/// Type parameters:
/// - `F`: merkle [Family] defining location and proof types
/// - `E`: runtime [Context]
/// - `C`: [Contiguous] journal holding the operations
/// - `I`: [UnorderedIndex] mapping keys to log locations
/// - `H`: [Hasher] used by the authenticated log
/// - `U`: update payload type (see the impl blocks, which require `U: Update`)
pub struct Db<
    F: Family,
    E: Context,
    C: Contiguous<Item: CodecShared>,
    I: UnorderedIndex<Value = Location<F>>,
    H: Hasher,
    U: Send + Sync,
> {
    /// A (pruned) log of all operations in order of their application. The index of each
    /// operation in the log is called its _location_, which is a stable identifier.
    ///
    /// # Invariants
    ///
    /// - The log is never pruned beyond the inactivity floor.
    /// - There is always at least one commit operation in the log.
    pub(crate) log: AuthenticatedLog<F, E, C, H>,

    /// A location before which all operations are "inactive" (that is, operations before this point
    /// are over keys that have been updated by some operation at or after this point).
    pub(crate) inactivity_floor_loc: Location<F>,

    /// The location of the last commit operation.
    pub(crate) last_commit_loc: Location<F>,

    /// A snapshot of all currently active operations in the form of a map from each key to the
    /// location in the log containing its most recent update.
    ///
    /// # Invariant
    ///
    /// - Only references `Operation::Update`s.
    pub(crate) snapshot: I,

    /// The number of active keys in the snapshot.
    pub(crate) active_keys: usize,

    /// Marker for the update type parameter.
    pub(crate) _update: core::marker::PhantomData<U>,
}
89
90// Shared read-only functionality.
91impl<F, E, U, C, I, H> Db<F, E, C, I, H, U>
92where
93    F: Family,
94    E: Context,
95    U: Update,
96    C: Contiguous<Item = Operation<F, U>>,
97    I: UnorderedIndex<Value = Location<F>>,
98    H: Hasher,
99    Operation<F, U>: Codec,
100{
101    /// Return the inactivity floor location. This is the location before which all operations are
102    /// known to be inactive. Operations before this point can be safely pruned.
103    pub const fn inactivity_floor_loc(&self) -> Location<F> {
104        self.inactivity_floor_loc
105    }
106
107    /// Whether the snapshot currently has no active keys.
108    pub const fn is_empty(&self) -> bool {
109        self.active_keys == 0
110    }
111
112    /// Get the metadata associated with the last commit.
113    pub async fn get_metadata(&self) -> Result<Option<U::Value>, crate::qmdb::Error<F>> {
114        match self.log.reader().await.read(*self.last_commit_loc).await? {
115            Operation::CommitFloor(metadata, _) => Ok(metadata),
116            _ => unreachable!("last commit is not a CommitFloor operation"),
117        }
118    }
119
120    pub fn root(&self) -> H::Digest {
121        self.log.root()
122    }
123
124    /// Get the value of `key` in the db, or None if it has no value.
125    pub async fn get(&self, key: &U::Key) -> Result<Option<U::Value>, crate::qmdb::Error<F>> {
126        // Collect to avoid holding a borrow across await points (rust-lang/rust#100013).
127        let locs: Vec<Location<F>> = self.snapshot.get(key).copied().collect();
128        let reader = self.log.reader().await;
129        for loc in locs {
130            let op = reader.read(*loc).await?;
131            let Operation::Update(data) = op else {
132                panic!("location does not reference update operation. loc={loc}");
133            };
134            if data.key() == key {
135                return Ok(Some(data.value().clone()));
136            }
137        }
138        Ok(None)
139    }
140
141    /// Return [start, end) where `start` and `end - 1` are the Locations of the oldest and newest
142    /// retained operations respectively.
143    pub async fn bounds(&self) -> std::ops::Range<Location<F>> {
144        let bounds = self.log.reader().await.bounds();
145        Location::new(bounds.start)..Location::new(bounds.end)
146    }
147
148    /// Return the pinned Merkle nodes for a lower operation boundary of `loc`.
149    pub async fn pinned_nodes_at(
150        &self,
151        loc: Location<F>,
152    ) -> Result<Vec<H::Digest>, crate::qmdb::Error<F>> {
153        if !loc.is_valid() {
154            return Err(crate::merkle::Error::LocationOverflow(loc).into());
155        }
156        let futs: Vec<_> = F::nodes_to_pin(loc)
157            .map(|p| async move {
158                self.log
159                    .merkle
160                    .get_node(p)
161                    .await?
162                    .ok_or(crate::merkle::Error::ElementPruned(p).into())
163            })
164            .collect();
165        futures::future::try_join_all(futs).await
166    }
167}
168
// Functionality requiring Mutable journal.
impl<F, E, U, C, I, H> Db<F, E, C, I, H, U>
where
    F: Family,
    E: Context,
    U: Update,
    C: Mutable<Item = Operation<F, U>>,
    I: UnorderedIndex<Value = Location<F>>,
    H: Hasher,
    Operation<F, U>: Codec,
{
    /// Prunes historical operations prior to `prune_loc`. This does not affect the db's root or
    /// snapshot.
    ///
    /// # Errors
    ///
    /// - Returns [crate::qmdb::Error::PruneBeyondMinRequired] if `prune_loc` > inactivity floor.
    /// - Returns [`crate::merkle::Error::LocationOverflow`] if `prune_loc` > [`crate::merkle::Family::MAX_LEAVES`].
    pub async fn prune(&mut self, prune_loc: Location<F>) -> Result<(), crate::qmdb::Error<F>> {
        // Pruning beyond the inactivity floor would discard operations that may still be active.
        if prune_loc > self.inactivity_floor_loc {
            return Err(crate::qmdb::Error::PruneBeyondMinRequired(
                prune_loc,
                self.inactivity_floor_loc,
            ));
        }

        self.log.prune(prune_loc).await?;

        Ok(())
    }

    /// Generate a proof for the operations starting at `start_loc` (at most `max_ops` of them)
    /// against the log's root when it contained `historical_size` operations, returning the
    /// proof together with the proven operations. Delegates to the authenticated log.
    pub async fn historical_proof(
        &self,
        historical_size: Location<F>,
        start_loc: Location<F>,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<F, H::Digest>, Vec<Operation<F, U>>), crate::qmdb::Error<F>> {
        self.log
            .historical_proof(historical_size, start_loc, max_ops)
            .await
            .map_err(Into::into)
    }

    /// Generate a proof for the operations starting at `loc` (at most `max_ops` of them) against
    /// the current root of the log, returning the proof together with the proven operations.
    pub async fn proof(
        &self,
        loc: Location<F>,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<F, H::Digest>, Vec<Operation<F, U>>), crate::qmdb::Error<F>> {
        self.historical_proof(self.log.size().await, loc, max_ops)
            .await
    }

    /// Rewind the database to `size` operations, where `size` is the location of the next append.
    ///
    /// This rewinds both the authenticated log and the in-memory snapshot, then restores metadata
    /// (`last_commit_loc`, `inactivity_floor_loc`, `active_keys`) for the new tip commit.
    ///
    /// # Errors
    ///
    /// Returns an error when:
    /// - `size` is not a valid rewind target
    /// - the target's required logical range is not fully retained (for example, the target
    ///   inactivity floor is pruned)
    /// - `size - 1` is not a commit operation
    ///
    /// Any error from this method is fatal for this handle. Rewind may mutate journal state before
    /// all in-memory structures are rebuilt. Callers must drop this database handle after any `Err`
    /// from `rewind` and reopen from storage.
    ///
    /// Returns the list of locations restored to active state in the snapshot.
    ///
    /// A successful rewind is not restart-stable until a subsequent [`Db::commit`] or
    /// [`Db::sync`].
    pub async fn rewind(&mut self, size: Location<F>) -> Result<Vec<Location<F>>, Error<F>> {
        let rewind_size = *size;
        let current_size = *self.last_commit_loc + 1;

        // No-op when already at the requested size.
        if rewind_size == current_size {
            return Ok(Vec::new());
        }
        if rewind_size == 0 || rewind_size > current_size {
            return Err(Error::Journal(JournalError::InvalidRewind(rewind_size)));
        }

        // Read everything needed for rewind before mutating storage.
        let (rewind_floor, undos, active_keys_delta) = {
            let reader = self.log.reader().await;
            let bounds = reader.bounds();
            let rewind_last_loc = Location::new(rewind_size - 1);
            if rewind_size <= bounds.start {
                return Err(Error::<F>::Journal(JournalError::ItemPruned(
                    *rewind_last_loc,
                )));
            }
            // The new tip must be a commit operation carrying an inactivity floor.
            let rewind_last_op = reader.read(*rewind_last_loc).await?;
            let Some(rewind_floor) = rewind_last_op.has_floor() else {
                return Err(Error::UnexpectedData(rewind_last_loc));
            };
            // The target floor itself must still be retained to allow the replay below.
            if *rewind_floor < bounds.start {
                return Err(Error::<F>::Journal(JournalError::ItemPruned(*rewind_floor)));
            }

            let mut undos = Vec::with_capacity((current_size - rewind_size) as usize);
            let mut active_keys_delta = 0isize;
            // Latest observed state per key while scanning forward: Some(loc) after an update,
            // None after a delete. Keys never seen have no entry.
            let mut prior_state_by_key: HashMap<U::Key, Option<Location<F>>> = HashMap::new();

            // Reconstruct key state once in a single pass from the rewind floor. Only operations
            // at or beyond `rewind_size` (those being removed) generate undo records.
            for loc in *rewind_floor..current_size {
                let op = reader.read(loc).await?;
                let op_loc = Location::new(loc);
                match op {
                    Operation::CommitFloor(_, _) => {}
                    Operation::Update(update) => {
                        let key = update.key().clone();
                        let previous_loc = prior_state_by_key.get(&key).copied().flatten();

                        if loc >= rewind_size {
                            if let Some(previous_loc) = previous_loc {
                                // Removed update superseded an older one: restore the older loc.
                                undos.push(SnapshotUndo::Replace {
                                    key: key.clone(),
                                    old_loc: op_loc,
                                    new_loc: previous_loc,
                                });
                            } else {
                                // Removed update activated the key: it becomes inactive again.
                                active_keys_delta -= 1;
                                undos.push(SnapshotUndo::Remove {
                                    key: key.clone(),
                                    old_loc: op_loc,
                                });
                            }
                        }

                        prior_state_by_key.insert(key, Some(op_loc));
                    }
                    Operation::Delete(key) => {
                        let previous_loc = prior_state_by_key.get(&key).copied().flatten();

                        if loc >= rewind_size {
                            if let Some(previous_loc) = previous_loc {
                                // Removed delete deactivated the key: it becomes active again.
                                active_keys_delta += 1;
                                undos.push(SnapshotUndo::Insert {
                                    key: key.clone(),
                                    new_loc: previous_loc,
                                });
                            }
                        }

                        prior_state_by_key.insert(key, None);
                    }
                }
            }

            // Undo operations must run from newest to oldest removed operation.
            undos.reverse();

            (rewind_floor, undos, active_keys_delta)
        };

        // Journal rewind happens before in-memory undo application. If any later step fails, this
        // handle may be internally diverged and must be dropped by the caller. This step is not
        // restart-stable until a later commit/sync boundary.
        self.log.rewind(rewind_size).await?;

        let mut restored_locs = Vec::new();
        for undo in undos {
            match undo {
                SnapshotUndo::Replace {
                    key,
                    old_loc,
                    new_loc,
                } => {
                    // Only locations surviving the rewind count as "restored to active".
                    if new_loc < rewind_size {
                        restored_locs.push(new_loc);
                    }
                    update_known_loc(&mut self.snapshot, &key, old_loc, new_loc);
                }
                SnapshotUndo::Remove { key, old_loc } => {
                    delete_known_loc(&mut self.snapshot, &key, old_loc)
                }
                SnapshotUndo::Insert { key, new_loc } => {
                    if new_loc < rewind_size {
                        restored_locs.push(new_loc);
                    }
                    self.snapshot.insert(&key, new_loc);
                }
            }
        }

        // Restore metadata for the new tip commit.
        self.active_keys = self
            .active_keys
            .checked_add_signed(active_keys_delta)
            .ok_or(Error::DataCorrupted(
                "active_keys underflow while rewinding",
            ))?;
        self.last_commit_loc = Location::new(rewind_size - 1);
        self.inactivity_floor_loc = rewind_floor;

        Ok(restored_locs)
    }
}
369
370// Functionality requiring Mutable + Persistable journal.
371impl<F, E, U, C, I, H> Db<F, E, C, I, H, U>
372where
373    F: Family,
374    E: Context,
375    U: Update,
376    C: Mutable<Item = Operation<F, U>> + Persistable<Error = JournalError>,
377    I: UnorderedIndex<Value = Location<F>>,
378    H: Hasher,
379    Operation<F, U>: Codec,
380{
381    /// Returns a [Db] initialized from `log`, using `callback` to report snapshot
382    /// building events.
383    ///
384    /// # Panics
385    ///
386    /// Panics if the log is empty or the last operation is not a commit floor operation.
387    pub async fn init_from_log<Cb>(
388        mut index: I,
389        log: AuthenticatedLog<F, E, C, H>,
390        known_inactivity_floor: Option<Location<F>>,
391        mut callback: Cb,
392    ) -> Result<Self, crate::qmdb::Error<F>>
393    where
394        Cb: FnMut(bool, Option<Location<F>>),
395    {
396        // If the last-known inactivity floor is behind the current floor, then invoke the callback
397        // appropriately to report the inactive bits.
398        let (last_commit_loc, inactivity_floor_loc, active_keys) = {
399            let reader = log.reader().await;
400            let last_commit_loc = reader
401                .bounds()
402                .end
403                .checked_sub(1)
404                .expect("commit should exist");
405            let last_commit = reader.read(last_commit_loc).await?;
406            let inactivity_floor_loc = last_commit.has_floor().expect("should be a commit");
407            if let Some(known_inactivity_floor) = known_inactivity_floor {
408                (*known_inactivity_floor..*inactivity_floor_loc)
409                    .for_each(|_| callback(false, None));
410            }
411
412            let active_keys =
413                build_snapshot_from_log(inactivity_floor_loc, &reader, &mut index, callback)
414                    .await?;
415            (
416                Location::new(last_commit_loc),
417                inactivity_floor_loc,
418                active_keys,
419            )
420        };
421
422        Ok(Self {
423            log,
424            inactivity_floor_loc,
425            snapshot: index,
426            last_commit_loc,
427            active_keys,
428            _update: core::marker::PhantomData,
429        })
430    }
431
432    /// Sync all database state to disk.
433    pub async fn sync(&self) -> Result<(), crate::qmdb::Error<F>> {
434        self.log.sync().await.map_err(Into::into)
435    }
436
437    /// Durably commit the journal state published by prior [`Db::apply_batch`]
438    /// calls.
439    pub async fn commit(&self) -> Result<(), crate::qmdb::Error<F>> {
440        self.log.commit().await.map_err(Into::into)
441    }
442
443    /// Destroy the db, removing all data from disk.
444    pub async fn destroy(self) -> Result<(), crate::qmdb::Error<F>> {
445        self.log.destroy().await.map_err(Into::into)
446    }
447}
448
449impl<F, E, U, C, I, H> Persistable for Db<F, E, C, I, H, U>
450where
451    F: Family,
452    E: Context,
453    U: Update,
454    C: Mutable<Item = Operation<F, U>> + Persistable<Error = JournalError>,
455    I: UnorderedIndex<Value = Location<F>>,
456    H: Hasher,
457    Operation<F, U>: Codec,
458{
459    type Error = crate::qmdb::Error<F>;
460
461    async fn commit(&self) -> Result<(), crate::qmdb::Error<F>> {
462        Self::commit(self).await
463    }
464
465    async fn sync(&self) -> Result<(), crate::qmdb::Error<F>> {
466        Self::sync(self).await
467    }
468
469    async fn destroy(self) -> Result<(), crate::qmdb::Error<F>> {
470        self.destroy().await
471    }
472}