commonware_storage/adb/any/fixed/
mod.rs

//! An _Any_ authenticated database (ADB) provides succinct proofs of _any_ value ever associated
//! with a key. Its implementation is based on an [Mmr] over a log of state-change operations backed
//! by a [Journal].
//!
//! In an Any db, it is not possible to prove whether the value of a key is the currently active
//! one, only that it was associated with the key at some point in the past. This type of
//! authenticated database is most useful for applications whose keys are assigned a value once
//! and never updated afterward.
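//!
//! Inactive operations (those superseded by a later update or delete of the same key) accumulate
//! below an _inactivity floor_: a location in the log beneath which every operation is guaranteed
//! to be inactive, allowing that prefix of the log to be pruned without losing any active state.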

use crate::{
    adb::{operation::fixed::FixedOperation, Error},
    index::{Cursor, Index},
    journal::contiguous::fixed::{Config as JConfig, Journal},
    mmr::{
        bitmap::BitMap,
        journaled::{Config as MmrConfig, Mmr},
        Location, Position, Proof, StandardHasher as Standard,
    },
    translator::Translator,
};
use commonware_codec::Encode as _;
use commonware_cryptography::Hasher as CHasher;
use commonware_runtime::{buffer::PoolRef, Clock, Metrics, Storage, ThreadPool};
use futures::{
    future::{try_join_all, TryFutureExt as _},
    try_join,
};
use std::num::{NonZeroU64, NonZeroUsize};
use tracing::{debug, warn};

pub mod ordered;
pub mod sync;
pub mod unordered;

/// The size of the read buffer to use for replaying the operations log when rebuilding the
/// snapshot.
const SNAPSHOT_READ_BUFFER_SIZE: usize = 1 << 16;

/// Configuration for an `Any` authenticated db.
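///
/// # Example
///
/// A minimal construction sketch; the partition names and sizing values here are illustrative,
/// and `my_translator` / `my_buffer_pool` are assumed to be built elsewhere:
///
/// ```ignore
/// use std::num::{NonZeroU64, NonZeroUsize};
///
/// let cfg = Config {
///     mmr_journal_partition: "mmr_journal".into(),
///     mmr_items_per_blob: NonZeroU64::new(4096).unwrap(),
///     mmr_write_buffer: NonZeroUsize::new(1024).unwrap(),
///     mmr_metadata_partition: "mmr_metadata".into(),
///     log_journal_partition: "log_journal".into(),
///     log_items_per_blob: NonZeroU64::new(4096).unwrap(),
///     log_write_buffer: NonZeroUsize::new(1024).unwrap(),
///     translator: my_translator,
///     thread_pool: None,
///     buffer_pool: my_buffer_pool,
/// };
/// ```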
#[derive(Clone)]
pub struct Config<T: Translator> {
    /// The name of the [Storage] partition used for the MMR's backing journal.
    pub mmr_journal_partition: String,

    /// The items per blob configuration value used by the MMR journal.
    pub mmr_items_per_blob: NonZeroU64,

    /// The size of the write buffer to use for each blob in the MMR journal.
    pub mmr_write_buffer: NonZeroUsize,

    /// The name of the [Storage] partition used for the MMR's metadata.
    pub mmr_metadata_partition: String,

    /// The name of the [Storage] partition used to persist the (pruned) log of operations.
    pub log_journal_partition: String,

    /// The items per blob configuration value used by the log journal.
    pub log_items_per_blob: NonZeroU64,

    /// The size of the write buffer to use for each blob in the log journal.
    pub log_write_buffer: NonZeroUsize,

    /// The translator used by the compressed index.
    pub translator: T,

    /// An optional thread pool to use for parallelizing batch operations.
    pub thread_pool: Option<ThreadPool>,

    /// The buffer pool to use for caching data.
    pub buffer_pool: PoolRef,
}

/// Initialize and return the MMR and log from the given config, correcting any inconsistencies
/// between them. Any uncommitted operations in the log will be rolled back, leaving the db in the
/// state it was in as of the last committed operation.
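///
/// Handles both failure modes of an unclean shutdown: an MMR that ran ahead of the last log
/// commit has its extra leaves popped, and an MMR that fell behind the log has the missing
/// operations replayed.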
pub(crate) async fn init_mmr_and_log<
    E: Storage + Clock + Metrics,
    O: FixedOperation,
    H: CHasher,
    T: Translator,
>(
    context: E,
    cfg: Config<T>,
    hasher: &mut Standard<H>,
) -> Result<(Location, Mmr<E, H>, Journal<E, O>), Error> {
    let mut mmr = Mmr::init(
        context.with_label("mmr"),
        hasher,
        MmrConfig {
            journal_partition: cfg.mmr_journal_partition,
            metadata_partition: cfg.mmr_metadata_partition,
            items_per_blob: cfg.mmr_items_per_blob,
            write_buffer: cfg.mmr_write_buffer,
            thread_pool: cfg.thread_pool,
            buffer_pool: cfg.buffer_pool.clone(),
        },
    )
    .await?;

    let mut log: Journal<E, O> = Journal::init(
        context.with_label("log"),
        JConfig {
            partition: cfg.log_journal_partition,
            items_per_blob: cfg.log_items_per_blob,
            write_buffer: cfg.log_write_buffer,
            buffer_pool: cfg.buffer_pool,
        },
    )
    .await?;

    // Rewind past (discard) any uncommitted operations in the log.
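    // A commit operation records the inactivity floor as of that commit, so scanning backwards
    // for the most recent commit yields both the rewind point and the floor to restore.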
    let mut log_size: Location = log.size().await.into();
    let mut rewind_leaf_num = log_size;
    let mut inactivity_floor_loc = Location::new_unchecked(0);
    while rewind_leaf_num > 0 {
        let op = log.read(rewind_leaf_num.as_u64() - 1).await?;
        if let Some(loc) = op.commit_floor() {
            inactivity_floor_loc = loc;
            break;
        }
        rewind_leaf_num -= 1;
    }
    if rewind_leaf_num != log_size {
        let op_count = log_size - rewind_leaf_num;
        warn!(
            ?log_size,
            ?op_count,
            "rewinding over uncommitted log operations"
        );
        log.rewind(rewind_leaf_num.as_u64()).await?;
        log.sync().await?;
        log_size = rewind_leaf_num;
    }

    // Pop any MMR elements that are ahead of the last log commit point.
    let mut next_mmr_leaf_num = mmr.leaves();
    if next_mmr_leaf_num > log_size {
        let op_count = next_mmr_leaf_num - log_size;
        warn!(?log_size, ?op_count, "popping uncommitted MMR operations");
        mmr.pop(op_count.as_u64() as usize).await?;
        next_mmr_leaf_num = log_size;
    }

    // If the MMR is behind, replay log operations to catch up.
    if next_mmr_leaf_num < log_size {
        let op_count = log_size - next_mmr_leaf_num;
        warn!(
            ?log_size,
            ?op_count,
            "MMR lags behind log, replaying log to catch up"
        );
        while next_mmr_leaf_num < log_size {
            let op = log.read(next_mmr_leaf_num.as_u64()).await?;
            mmr.add_batched(hasher, &op.encode()).await?;
            next_mmr_leaf_num += 1;
        }
        mmr.sync(hasher).await.map_err(Error::Mmr)?;
    }

    // At this point the MMR and log should be consistent.
    assert_eq!(log.size().await, mmr.leaves());

    Ok((inactivity_floor_loc, mmr, log))
}

/// Common implementation for pruning an Any database.
///
/// # Errors
///
/// - Returns [crate::mmr::Error::LocationOverflow] if `target_prune_loc` >
///   [crate::mmr::MAX_LOCATION].
/// - Returns [crate::mmr::Error::RangeOutOfBounds] if `target_prune_loc` is greater than the
///   inactivity floor.
async fn prune_db<E, O, H>(
    mmr: &mut Mmr<E, H>,
    log: &mut Journal<E, O>,
    hasher: &mut Standard<H>,
    target_prune_loc: Location,
    inactivity_floor_loc: Location,
    op_count: Location,
) -> Result<(), Error>
where
    E: Storage + Clock + Metrics,
    O: FixedOperation,
    H: CHasher,
{
    if target_prune_loc > inactivity_floor_loc {
        return Err(crate::mmr::Error::RangeOutOfBounds(target_prune_loc).into());
    }
    let target_prune_pos = Position::try_from(target_prune_loc)?;

    if mmr.size() == 0 {
        // DB is empty, nothing to prune.
        return Ok(());
    }

    // Sync the MMR before pruning the log, otherwise the MMR tip could end up behind the log's
    // pruning boundary on restart from an unclean shutdown, and there would be no way to replay
    // the operations between the MMR tip and the log pruning boundary.
    mmr.sync(hasher).await?;

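    // If the log reports that nothing was pruned (e.g. the target lies at or below the current
    // pruning boundary), leave the MMR untouched as well.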
    if !log.prune(target_prune_loc.as_u64()).await? {
        return Ok(());
    }

    debug!(
        log_size = op_count.as_u64(),
        ?target_prune_loc,
        "pruned inactive ops"
    );

    mmr.prune_to_pos(hasher, target_prune_pos).await?;

    Ok(())
}

/// Common implementation for `historical_proof`.
///
/// Generates a proof with respect to the state of the MMR when it had `op_count` operations.
///
/// # Errors
///
/// - Returns [crate::mmr::Error::LocationOverflow] if `op_count` or `start_loc` >
///   [crate::mmr::MAX_LOCATION].
/// - Returns [crate::mmr::Error::RangeOutOfBounds] if `start_loc` >= `op_count` or `op_count` >
///   number of operations in the log.
/// - Returns [`Error::OperationPruned`] if `start_loc` has been pruned.
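///
/// The returned proof is generated against the root the MMR had when it contained exactly
/// `op_count` operations, so verification must use that historical root rather than the current
/// one.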
async fn historical_proof<E, O, H>(
    mmr: &Mmr<E, H>,
    log: &Journal<E, O>,
    op_count: Location,
    start_loc: Location,
    max_ops: NonZeroU64,
) -> Result<(Proof<H::Digest>, Vec<O>), Error>
where
    E: Storage + Clock + Metrics,
    O: FixedOperation,
    H: CHasher,
{
    let size = Location::new_unchecked(log.size().await);
    if op_count > size {
        return Err(crate::mmr::Error::RangeOutOfBounds(size).into());
    }
    if start_loc >= op_count {
        return Err(crate::mmr::Error::RangeOutOfBounds(start_loc).into());
    }
    let end_loc = std::cmp::min(op_count, start_loc.saturating_add(max_ops.get()));

    let mmr_size = Position::try_from(op_count)?;
    let proof = mmr
        .historical_range_proof(mmr_size, start_loc..end_loc)
        .await?;

    // Read all operations in the proven range concurrently.
    let futures = (start_loc.as_u64()..end_loc.as_u64())
        .map(|i| log.read(i))
        .collect::<Vec<_>>();
    let ops = try_join_all(futures).await?;

    Ok((proof, ops))
}

/// Update the location of `key` to `new_loc` in the snapshot and return its old location, or
/// insert `new_loc` if the key isn't already present.
async fn update_loc<E, I: Index<Value = Location>, O>(
    snapshot: &mut I,
    log: &Journal<E, O>,
    key: &<O as FixedOperation>::Key,
    new_loc: Location,
) -> Result<Option<Location>, Error>
where
    E: Storage + Clock + Metrics,
    O: FixedOperation,
{
    // If the translated key is not in the snapshot, insert the new location. Otherwise, get a
    // cursor to look for the key.
    let Some(mut cursor) = snapshot.get_mut_or_insert(key, new_loc) else {
        return Ok(None);
    };

    // Find the matching key among all conflicts, then update its location.
    if let Some(loc) = find_update_op(log, &mut cursor, key).await? {
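        // Operations are appended at tip, so the new location must strictly exceed the old one.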
        assert!(new_loc > loc);
        cursor.update(new_loc);
        return Ok(Some(loc));
    }

    // The key itself wasn't among the conflicting entries, so add its new location to the cursor.
    cursor.insert(new_loc);

    Ok(None)
}

/// Delete `key` from the snapshot if it exists, returning the location that was previously
/// associated with it.
async fn delete_key<E, I, O>(
    snapshot: &mut I,
    log: &Journal<E, O>,
    key: &O::Key,
) -> Result<Option<Location>, Error>
where
    E: Storage + Clock + Metrics,
    I: Index<Value = Location>,
    O: FixedOperation,
{
    // If the translated key is in the snapshot, get a cursor to look for the key.
    let Some(mut cursor) = snapshot.get_mut(key) else {
        return Ok(None);
    };

    // Find the matching key among all conflicts, then delete it.
    let Some(loc) = find_update_op(log, &mut cursor, key).await? else {
        return Ok(None);
    };
    cursor.delete();

    Ok(Some(loc))
}

/// Find and return the location of the update operation for `key`, if it exists. The cursor is
/// positioned at the matching location, and can be used to update or delete the key.
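///
/// Because the snapshot is keyed by translated (lossy) keys, multiple keys may share the same
/// slot; each candidate location is resolved by reading its operation from the log and comparing
/// the full keys.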
async fn find_update_op<E, C, O>(
    log: &Journal<E, O>,
    cursor: &mut C,
    key: &<O as FixedOperation>::Key,
) -> Result<Option<Location>, Error>
where
    E: Storage + Clock + Metrics,
    C: Cursor<Value = Location>,
    O: FixedOperation,
{
    while let Some(&loc) = cursor.next() {
        let op = log.read(*loc).await?;
        let k = op.key().expect("operation without key");
        if *k == *key {
            return Ok(Some(loc));
        }
    }

    Ok(None)
}

/// A wrapper of DB state required for invoking operations shared across variants.
pub(crate) struct Shared<
    'a,
    E: Storage + Clock + Metrics,
    I: Index<Value = Location>,
    O: FixedOperation,
    H: CHasher,
> {
    pub snapshot: &'a mut I,
    pub mmr: &'a mut Mmr<E, H>,
    pub log: &'a mut Journal<E, O>,
    pub hasher: &'a mut Standard<H>,
}

impl<E, I, O, H> Shared<'_, E, I, O, H>
where
    E: Storage + Clock + Metrics,
    I: Index<Value = Location>,
    O: FixedOperation,
    H: CHasher,
{
    /// Append `op` to the log and add it to the MMR. The operation will be subject to rollback
    /// until the next successful `commit`.
    pub(crate) async fn apply_op(&mut self, op: O) -> Result<(), Error> {
        let encoded_op = op.encode();

        // Append operation to the log and update the MMR in parallel.
        try_join!(
            self.mmr
                .add_batched(self.hasher, &encoded_op)
                .map_err(Error::Mmr),
            self.log.append(op).map_err(Error::Journal)
        )?;

        Ok(())
    }

    /// Moves the given operation to the tip of the log if it is active, rendering its old location
    /// inactive. If the operation was not active, then this is a no-op. Returns the old location of
    /// the operation if it was active.
    pub(crate) async fn move_op_if_active(
        &mut self,
        op: O,
        old_loc: Location,
    ) -> Result<Option<Location>, Error> {
        // Operations without keys (e.g. commits) cannot be active.
        let Some(key) = op.key() else {
            return Ok(None);
        };

        // If the translated key is not in the snapshot, the operation cannot be active.
        let Some(mut cursor) = self.snapshot.get_mut(key) else {
            return Ok(None);
        };

        // Find the snapshot entry corresponding to the operation.
        if cursor.find(|&loc| *loc == old_loc) {
            // Update the operation's snapshot location to point to tip.
            let tip_loc = Location::new_unchecked(self.log.size().await);
            cursor.update(tip_loc);
            drop(cursor);

            // Apply the operation at tip.
            self.apply_op(op).await?;
            return Ok(Some(old_loc));
        }

        // The operation is not active, so this is a no-op.
        Ok(None)
    }

    /// Raise the inactivity floor by taking one _step_: search for the first active operation
    /// above the inactivity floor, move it to tip, and set the inactivity floor to the location
    /// following the moved operation's old location. This method is therefore guaranteed to raise
    /// the floor by at least one. Returns the new inactivity floor location.
    ///
    /// # Panics
    ///
    /// Panics if there is no active operation above the inactivity floor.
    async fn raise_floor(&mut self, mut inactivity_floor_loc: Location) -> Result<Location, Error> {
        // Search for the first active operation above the inactivity floor and move it to tip.
        //
        // TODO(https://github.com/commonwarexyz/monorepo/issues/1829): optimize this w/ a bitmap.
        loop {
            let tip_loc = Location::new_unchecked(self.log.size().await);
            assert!(
                *inactivity_floor_loc < tip_loc,
                "no active operations above the inactivity floor"
            );
            let old_loc = inactivity_floor_loc;
            inactivity_floor_loc += 1;
            let op = self.log.read(*old_loc).await?;
            if self.move_op_if_active(op, old_loc).await?.is_some() {
                return Ok(inactivity_floor_loc);
            }
        }
    }

    /// Same as `raise_floor`, but uses the status bitmap to more efficiently find the first
    /// active operation above the inactivity floor.
    ///
    /// # Panics
    ///
    /// Panics if there is no active operation above the inactivity floor.
    pub(crate) async fn raise_floor_with_bitmap<const N: usize>(
        &mut self,
        status: &mut BitMap<H, N>,
        mut inactivity_floor_loc: Location,
    ) -> Result<Location, Error> {
        // Use the status bitmap to find the first active operation above the inactivity floor.
        while !status.get_bit(*inactivity_floor_loc) {
            inactivity_floor_loc += 1;
        }

        // Move the active operation to tip.
        let op = self.log.read(*inactivity_floor_loc).await?;
        let loc = self
            .move_op_if_active(op, inactivity_floor_loc)
            .await?
            .expect("op should be active based on status bitmap");
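        // The operation's old location is now inactive; the copy just appended at tip is the
        // active one.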
        status.set_bit(*loc, false);
        status.push(true);

        // Advance inactivity floor above the moved operation since we know it's inactive.
        inactivity_floor_loc += 1;

        Ok(inactivity_floor_loc)
    }

    /// Sync the log to disk while merkleizing any pending MMR updates (the MMR itself is not
    /// synced to disk).
    async fn sync_and_process_updates(&mut self) -> Result<(), Error> {
        let mmr_fut = async {
            self.mmr.merkleize(self.hasher);
            Ok::<(), Error>(())
        };
        try_join!(self.log.sync().map_err(Error::Journal), mmr_fut)?;

        Ok(())
    }

    /// Sync the log and the MMR to disk.
    async fn sync(&mut self) -> Result<(), Error> {
        try_join!(
            self.log.sync().map_err(Error::Journal),
            self.mmr.sync(self.hasher).map_err(Error::Mmr)
        )?;

        Ok(())
    }
}