commonware_storage/adb/immutable/
mod.rs

//! An authenticated database (ADB) that only supports adding new keyed values (no updates or
//! deletions), where values can have varying sizes.
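//!
//! # Example
//!
//! A minimal usage sketch (not compiled here; it assumes a runtime `context`, a populated
//! [Config] `cfg`, a `Digest` key `key`, and a `Standard<Sha256>` hasher `hasher`, mirroring the
//! test module at the bottom of this file):
//!
//! ```ignore
//! // Open (or recover) the database from persistent storage.
//! let mut db =
//!     Immutable::<_, Digest, Vec<u8>, Sha256, TwoCap>::init(context, cfg).await?;
//!
//! // Append a new keyed value. Keys may only be set once.
//! db.set(key, b"hello".to_vec()).await?;
//!
//! // Make all pending operations durable (optionally attaching commit metadata).
//! db.commit(None).await?;
//!
//! // Read the value back and compute the authenticated root.
//! assert_eq!(db.get(&key).await?, Some(b"hello".to_vec()));
//! let root = db.root(&mut hasher);
//! ```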

use crate::{
    adb::{any::fixed::sync::init_journal, Error},
    index::Index,
    journal::{fixed, variable},
    mmr::{
        hasher::Standard,
        iterator::{leaf_num_to_pos, leaf_pos_to_num},
        journaled::{Config as MmrConfig, Mmr},
        verification::Proof,
    },
    store::operation::Variable,
    translator::Translator,
};
use commonware_codec::{Codec, Encode as _, Read};
use commonware_cryptography::Hasher as CHasher;
use commonware_runtime::{buffer::PoolRef, Clock, Metrics, Storage as RStorage, ThreadPool};
use commonware_utils::{Array, NZUsize};
use futures::{future::TryFutureExt, pin_mut, try_join, StreamExt};
use std::num::{NonZeroU64, NonZeroUsize};
use tracing::{debug, warn};

pub mod sync;

/// The size of the read buffer to use for replaying the operations log when rebuilding the
/// snapshot. The exact value does not impact performance significantly as long as it is large
/// enough, so we don't make it configurable.
const SNAPSHOT_READ_BUFFER_SIZE: usize = 1 << 16;

/// Configuration for an [Immutable] authenticated db.
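///
/// # Example
///
/// A sketch of a typical configuration. Partition names and sizing values are illustrative only,
/// and the helper macros mirror those used by the test module's `db_config` below:
///
/// ```ignore
/// let cfg = Config {
///     mmr_journal_partition: "mmr_journal".into(),
///     mmr_metadata_partition: "mmr_metadata".into(),
///     mmr_items_per_blob: NZU64!(4096),
///     mmr_write_buffer: NZUsize!(1024),
///     log_journal_partition: "log_journal".into(),
///     log_write_buffer: NZUsize!(1024),
///     log_compression: None,
///     log_codec_config: ((0..=10_000).into(), ()),
///     log_items_per_section: NZU64!(1024),
///     locations_journal_partition: "locations_journal".into(),
///     locations_items_per_blob: NZU64!(4096),
///     translator: TwoCap,
///     thread_pool: None,
///     buffer_pool: PoolRef::new(NZUsize!(4096), NZUsize!(128)),
/// };
/// ```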
#[derive(Clone)]
pub struct Config<T: Translator, C> {
    /// The name of the [RStorage] partition used for the MMR's backing journal.
    pub mmr_journal_partition: String,

    /// The items per blob configuration value used by the MMR journal.
    pub mmr_items_per_blob: NonZeroU64,

    /// The size of the write buffer to use for each blob in the MMR journal.
    pub mmr_write_buffer: NonZeroUsize,

    /// The name of the [RStorage] partition used for the MMR's metadata.
    pub mmr_metadata_partition: String,

    /// The name of the [RStorage] partition used to persist the (pruned) log of operations.
    pub log_journal_partition: String,

    /// The size of the write buffer to use for each blob in the log journal.
    pub log_write_buffer: NonZeroUsize,

    /// Optional compression level (using `zstd`) to apply to log data before storing.
    pub log_compression: Option<u8>,

    /// The codec configuration to use for encoding and decoding log items.
    pub log_codec_config: C,

    /// The number of items to put in each section of the journal.
    pub log_items_per_section: NonZeroU64,

    /// The name of the [RStorage] partition used for the location map.
    pub locations_journal_partition: String,

    /// The number of items to put in each blob in the location map.
    pub locations_items_per_blob: NonZeroU64,

    /// The translator used by the compressed index.
    pub translator: T,

    /// An optional thread pool to use for parallelizing batch operations.
    pub thread_pool: Option<ThreadPool>,

    /// The buffer pool to use for caching data.
    pub buffer_pool: PoolRef,
}

/// An authenticatable key-value database based on an MMR that does not allow updates or deletions
/// of previously set keys.
pub struct Immutable<E: RStorage + Clock + Metrics, K: Array, V: Codec, H: CHasher, T: Translator> {
    /// An MMR over digests of the operations applied to the db.
    ///
    /// # Invariant
    ///
    /// The number of leaves in this MMR always equals the number of operations in the unpruned
    /// `log` and `locations`.
    mmr: Mmr<E, H>,

    /// A log of all operations applied to the db in order of occurrence. The _location_ of an
    /// operation is its order of occurrence with respect to this log, and corresponds to its leaf
    /// number in the MMR.
    log: variable::Journal<E, Variable<K, V>>,

    /// The number of operations that have been appended to the log (which must equal the number of
    /// leaves in the MMR).
    log_size: u64,

    /// The number of items to put in each section of the journal.
    log_items_per_section: u64,

    /// A fixed-length journal that maps an operation's location to its offset within its respective
    /// section of the log. (The section number is derived from location.)
    locations: fixed::Journal<E, u32>,

    /// The location of the oldest retained operation, or 0 if no operations have been added.
    oldest_retained_loc: u64,

    /// A map from each active key to the location of the operation that set its value.
    ///
    /// # Invariant
    ///
    /// Only references operations of type [Variable::Set].
    snapshot: Index<T, u64>,

    /// Cryptographic hasher to re-use within mutable operations requiring digest computation.
    hasher: Standard<H>,

    /// The location of the last commit operation, or None if no commit has been made.
    last_commit: Option<u64>,
}

impl<E: RStorage + Clock + Metrics, K: Array, V: Codec, H: CHasher, T: Translator>
    Immutable<E, K, V, H, T>
{
    /// Returns an [Immutable] adb initialized from `cfg`. Any uncommitted log operations will be
    /// discarded and the state of the db will be as of the last committed operation.
    pub async fn init(
        context: E,
        cfg: Config<T, <Variable<K, V> as Read>::Cfg>,
    ) -> Result<Self, Error> {
        let mut hasher = Standard::<H>::new();

        let mut mmr = Mmr::init(
            context.with_label("mmr"),
            &mut hasher,
            MmrConfig {
                journal_partition: cfg.mmr_journal_partition,
                metadata_partition: cfg.mmr_metadata_partition,
                items_per_blob: cfg.mmr_items_per_blob,
                write_buffer: cfg.mmr_write_buffer,
                thread_pool: cfg.thread_pool,
                buffer_pool: cfg.buffer_pool.clone(),
            },
        )
        .await?;

        let mut log = variable::Journal::init(
            context.with_label("log"),
            variable::Config {
                partition: cfg.log_journal_partition,
                compression: cfg.log_compression,
                codec_config: cfg.log_codec_config,
                buffer_pool: cfg.buffer_pool.clone(),
                write_buffer: cfg.log_write_buffer,
            },
        )
        .await?;

        let mut locations = fixed::Journal::init(
            context.with_label("locations"),
            fixed::Config {
                partition: cfg.locations_journal_partition,
                items_per_blob: cfg.locations_items_per_blob,
                write_buffer: cfg.log_write_buffer,
                buffer_pool: cfg.buffer_pool,
            },
        )
        .await?;

        let mut snapshot: Index<T, u64> =
            Index::init(context.with_label("snapshot"), cfg.translator.clone());
        let (log_size, oldest_retained_loc) = Self::build_snapshot_from_log(
            &mut hasher,
            cfg.log_items_per_section,
            &mut mmr,
            &mut log,
            &mut locations,
            &mut snapshot,
        )
        .await?;

        let last_commit = log_size.checked_sub(1);

        Ok(Immutable {
            mmr,
            log,
            log_size,
            oldest_retained_loc,
            locations,
            log_items_per_section: cfg.log_items_per_section.get(),
            snapshot,
            hasher,
            last_commit,
        })
    }

    /// Returns an [Immutable] built from the config and sync data in `cfg`.
    #[allow(clippy::type_complexity)]
    pub async fn init_synced(
        context: E,
        mut cfg: sync::Config<E, K, V, T, H::Digest, <Variable<K, V> as Read>::Cfg>,
    ) -> Result<Self, Error> {
        // Initialize MMR for sync.
        let mut mmr = Mmr::init_sync(
            context.with_label("mmr"),
            crate::mmr::journaled::SyncConfig {
                config: MmrConfig {
                    journal_partition: cfg.db_config.mmr_journal_partition,
                    metadata_partition: cfg.db_config.mmr_metadata_partition,
                    items_per_blob: cfg.db_config.mmr_items_per_blob,
                    write_buffer: cfg.db_config.mmr_write_buffer,
                    thread_pool: cfg.db_config.thread_pool.clone(),
                    buffer_pool: cfg.db_config.buffer_pool.clone(),
                },
                lower_bound: leaf_num_to_pos(cfg.lower_bound),
                upper_bound: leaf_num_to_pos(cfg.upper_bound + 1) - 1,
                pinned_nodes: cfg.pinned_nodes,
            },
        )
        .await
        .map_err(Error::Mmr)?;

        // Initialize locations journal for sync.
        let mut locations = init_journal(
            context.with_label("locations"),
            fixed::Config {
                partition: cfg.db_config.locations_journal_partition,
                items_per_blob: cfg.db_config.locations_items_per_blob,
                write_buffer: cfg.db_config.log_write_buffer,
                buffer_pool: cfg.db_config.buffer_pool.clone(),
            },
            cfg.lower_bound,
            cfg.upper_bound,
        )
        .await?;

        // Build snapshot from the log.
        let mut snapshot = Index::init(
            context.with_label("snapshot"),
            cfg.db_config.translator.clone(),
        );
        let (log_size, oldest_retained_loc) = Self::build_snapshot_from_log(
            &mut Standard::<H>::new(),
            cfg.db_config.log_items_per_section,
            &mut mmr,
            &mut cfg.log,
            &mut locations,
            &mut snapshot,
        )
        .await?;

        let last_commit = log_size.checked_sub(1);

        let mut db = Immutable {
            mmr,
            log: cfg.log,
            log_size,
            oldest_retained_loc,
            locations,
            log_items_per_section: cfg.db_config.log_items_per_section.get(),
            snapshot,
            hasher: Standard::<H>::new(),
            last_commit,
        };

        db.sync().await?;
        Ok(db)
    }

    /// Builds the database's snapshot by replaying the log from inception, while also:
    ///   - trimming any uncommitted operations from the log,
    ///   - adding log operations to the MMR & location map if they are missing,
    ///   - removing any elements from the MMR & location map that don't remain in the log after
    ///     trimming.
    ///
    /// Returns the number of operations in the log and the oldest retained location.
    ///
    /// # Post-condition
    ///
    /// The number of operations in the log, the number of entries in `locations`, and the number
    /// of leaves in the MMR are all equal.
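    ///
    /// For example, if the replayed log contains 7 operations but the last commit is the operation
    /// at location 4, the two trailing uncommitted operations are rewound from the log, and any
    /// MMR leaves and location entries beyond location 4 are popped.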
    pub(super) async fn build_snapshot_from_log(
        hasher: &mut Standard<H>,
        log_items_per_section: NonZeroU64,
        mmr: &mut Mmr<E, H>,
        log: &mut variable::Journal<E, Variable<K, V>>,
        locations: &mut fixed::Journal<E, u32>,
        snapshot: &mut Index<T, u64>,
    ) -> Result<(u64, u64), Error> {
        // Align the mmr with the location map.
        let mut mmr_leaves = super::align_mmr_and_locations(mmr, locations).await?;

        // The number of operations in the log.
        let mut log_size = 0;
        // The location and blob-offset of the first operation to follow the last known commit point.
        let mut after_last_commit = None;
        // A list of uncommitted operations that must be rolled back, in order of their locations.
        let mut uncommitted_ops = Vec::new();
        let mut oldest_retained_loc = None;

        // Replay the log from inception to build the snapshot, keeping track of any uncommitted
        // operations that must be rolled back, and any log operations that need to be re-added to
        // the MMR & locations.
        {
            let stream = log
                .replay(0, 0, NZUsize!(SNAPSHOT_READ_BUFFER_SIZE))
                .await?;
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                match result {
                    Err(e) => {
                        return Err(Error::Journal(e));
                    }
                    Ok((section, offset, _, op)) => {
                        if oldest_retained_loc.is_none() {
                            log_size = section * log_items_per_section.get();
                            oldest_retained_loc = Some(log_size);
                        }

                        let loc = log_size; // location of the current operation.
                        if after_last_commit.is_none() {
                            after_last_commit = Some((loc, offset));
                        }

                        log_size += 1;

                        // Consistency check: confirm the provided section matches what we expect from this operation's
                        // location.
                        let expected = loc / log_items_per_section.get();
                        assert_eq!(
                            section, expected,
                            "section {section} did not match expected section {expected} from location {loc}"
                        );

                        if log_size > mmr_leaves {
                            debug!(
                                section,
                                offset, "operation was missing from MMR/location map"
                            );
                            mmr.add(hasher, &op.encode()).await?;
                            locations.append(offset).await?;
                            mmr_leaves += 1;
                        }
                        match op {
                            Variable::Set(key, _) => {
                                uncommitted_ops.push((key, loc));
                            }
                            Variable::Commit(_) => {
                                for (key, loc) in uncommitted_ops.iter() {
                                    snapshot.insert(key, *loc);
                                }
                                uncommitted_ops.clear();
                                after_last_commit = None;
                            }
                            _ => {
                                unreachable!(
                                    "unsupported operation at offset {offset} in section {section}"
                                );
                            }
                        }
                    }
                }
            }
        }

        // Rewind the operations log if necessary.
        if let Some((end_loc, end_offset)) = after_last_commit {
            assert!(!uncommitted_ops.is_empty());
            warn!(
                op_count = uncommitted_ops.len(),
                log_size = end_loc,
                end_offset,
                "rewinding over uncommitted operations at end of log"
            );
            let prune_to_section = end_loc / log_items_per_section.get();
            log.rewind_to_offset(prune_to_section, end_offset).await?;
            log.sync(prune_to_section).await?;
            log_size = end_loc;
        }

        // Pop any MMR elements that are ahead of the last log commit point.
        if mmr_leaves > log_size {
            locations.rewind(log_size).await?;
            locations.sync().await?;

            let op_count = mmr_leaves - log_size;
            warn!(op_count, "popping uncommitted MMR operations");
            mmr.pop(op_count as usize).await?;
        }

        // Confirm post-conditions hold.
        assert_eq!(log_size, leaf_pos_to_num(mmr.size()).unwrap());
        assert_eq!(log_size, locations.size().await?);

        Ok((log_size, oldest_retained_loc.unwrap_or(0)))
    }

    /// Returns the section of the log where we are currently writing new items.
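    ///
    /// For example, with `log_items_per_section = 5`, operations 0..=4 are written to section 0,
    /// operations 5..=9 to section 1, and so on.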
    fn current_section(&self) -> u64 {
        self.log_size / self.log_items_per_section
    }

    /// Return the oldest retained location, or None if no operations have been added.
    pub fn oldest_retained_loc(&self) -> Option<u64> {
        if self.log_size == 0 {
            None
        } else {
            Some(self.oldest_retained_loc)
        }
    }

    /// Prunes the db of operations with locations less than `loc`. The actual number pruned may be
    /// fewer than requested because pruning stops at the nearest preceding section (blob) boundary.
    ///
    /// # Panics
    ///
    /// Panics if `loc` is beyond the last commit point.
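    ///
    /// # Example
    ///
    /// A sketch (assuming `db` was opened with `log_items_per_section = 5` and its last commit is
    /// at location 12 or later):
    ///
    /// ```ignore
    /// db.prune(12).await?;
    /// // Pruning stops at the preceding section boundary, so operations 0..=9 are removed and
    /// // location 10 becomes the oldest retained operation.
    /// assert_eq!(db.oldest_retained_loc(), Some(10));
    /// ```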
    pub async fn prune(&mut self, loc: u64) -> Result<(), Error> {
        assert!(loc <= self.last_commit.unwrap_or(0));

        // Prune the log up to the section containing the requested pruning location. We always
        // prune the log first, and then prune the MMR+locations structures based on the log's
        // actual pruning boundary. This procedure ensures all log operations always have
        // corresponding MMR & location entries, even in the event of failures, with no need for
        // special recovery.
        let section = loc / self.log_items_per_section;
        self.log.prune(section).await?;
        self.oldest_retained_loc = section * self.log_items_per_section;

        // Prune the MMR & locations map up to the oldest retained item in the log after pruning.
        self.locations.prune(self.oldest_retained_loc).await?;
        self.mmr
            .prune_to_pos(&mut self.hasher, leaf_num_to_pos(self.oldest_retained_loc))
            .await
            .map_err(Error::Mmr)
    }

    /// Get the value of `key` in the db, or None if it has no value or its corresponding operation
    /// has been pruned.
    pub async fn get(&self, key: &K) -> Result<Option<V>, Error> {
        let iter = self.snapshot.get(key);
        for &loc in iter {
            if loc < self.oldest_retained_loc {
                continue;
            }
            if let Some(v) = self.get_from_loc(key, loc).await? {
                return Ok(Some(v));
            }
        }

        Ok(None)
    }

    /// Get the value of the operation with location `loc` in the db. Returns [Error::OperationPruned]
    /// if loc precedes the oldest retained location. The location is otherwise assumed valid.
    pub async fn get_loc(&self, loc: u64) -> Result<Option<V>, Error> {
        assert!(loc < self.op_count());
        if loc < self.oldest_retained_loc {
            return Err(Error::OperationPruned(loc));
        }

        let offset = self.locations.read(loc).await?;
        let section = loc / self.log_items_per_section;
        let op = self.log.get(section, offset).await?;

        Ok(op.into_value())
    }

    /// Get the value of the operation with location `loc` in the db if it matches `key`. Returns
    /// [Error::OperationPruned] if loc precedes the oldest retained location. The location is
    /// otherwise assumed valid.
    pub async fn get_from_loc(&self, key: &K, loc: u64) -> Result<Option<V>, Error> {
        if loc < self.oldest_retained_loc {
            return Err(Error::OperationPruned(loc));
        }

        match self.locations.read(loc).await {
            Ok(offset) => self.get_from_offset(key, loc, offset).await,
            Err(e) => Err(Error::Journal(e)),
        }
    }

    /// Get the value of the operation with location `loc` and offset `offset` in the log if it
    /// matches `key`, or return [Error::OperationPruned] if the location precedes the oldest
    /// retained location.
    async fn get_from_offset(&self, key: &K, loc: u64, offset: u32) -> Result<Option<V>, Error> {
        if loc < self.oldest_retained_loc {
            return Err(Error::OperationPruned(loc));
        }

        let section = loc / self.log_items_per_section;
        let Variable::Set(k, v) = self.log.get(section, offset).await? else {
            panic!("didn't find Set operation at location {loc} and offset {offset}");
        };

        if k != *key {
            Ok(None)
        } else {
            Ok(Some(v))
        }
    }

    /// Get the number of operations that have been applied to this db, including those that are not
    /// yet committed.
    pub fn op_count(&self) -> u64 {
        self.log_size
    }

    /// Sets `key` to have value `value`, assuming `key` hasn't already been assigned. The operation
    /// is reflected in the snapshot, but will be subject to rollback until the next successful
    /// `commit`. Attempting to set an already-set key results in undefined behavior.
    ///
    /// Any keys that have been pruned and map to the same translated key will be dropped
    /// during this call.
    pub async fn set(&mut self, key: K, value: V) -> Result<(), Error> {
        let loc = self.log_size;
        self.snapshot
            .insert_and_prune(&key, loc, |v| *v < self.oldest_retained_loc);

        let op = Variable::Set(key, value);
        self.apply_op(op).await
    }

    /// Return the root of the db.
    ///
    /// # Warning
    ///
    /// Panics if there are uncommitted operations.
    pub fn root(&self, hasher: &mut Standard<H>) -> H::Digest {
        self.mmr.root(hasher)
    }

    /// Update the operations MMR with the given operation, and append the operation to the log. The
    /// `commit` method must be called to make any applied operation persistent & recoverable.
    pub(super) async fn apply_op(&mut self, op: Variable<K, V>) -> Result<(), Error> {
        let section = self.current_section();
        let encoded_op = op.encode();

        // Create a future that updates the MMR.
        let mmr_fut = async {
            self.mmr.add_batched(&mut self.hasher, &encoded_op).await?;
            Ok::<(), Error>(())
        };

        // Create a future that appends the operation to the log and writes the resulting offset
        // to the locations journal.
        let log_fut = async {
            let (offset, _) = self.log.append(section, op).await?;
            self.locations.append(offset).await?;
            Ok::<(), Error>(())
        };

        // Run the 2 futures in parallel.
        try_join!(mmr_fut, log_fut)?;
        self.log_size += 1;

        // Maintain invariant that all filled sections are synced and immutable.
        if section != self.current_section() {
            self.log.sync(section).await?;
        }

        Ok(())
    }

    /// Generate and return:
    ///  1. a proof of all operations applied to the db in the range starting at (and including)
    ///     location `start_loc`, and ending at the earlier of:
    ///     - the last operation performed, or
    ///     - the operation `max_ops - 1` positions after `start_loc` (so the range covers at most
    ///       `max_ops` operations).
    ///  2. the operations corresponding to the leaves in this range.
    ///
    /// # Warning
    ///
    /// Panics if there are uncommitted operations.
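    ///
    /// # Example
    ///
    /// A sketch of proving and verifying a range of operations (assumes a committed `db`, its
    /// current `root`, a `Standard` hasher `hasher`, and `verify_proof` from the parent `adb`
    /// module, as exercised in the tests below):
    ///
    /// ```ignore
    /// let (proof, ops) = db.proof(start_loc, NZU64!(100)).await?;
    /// assert!(verify_proof(&mut hasher, &proof, start_loc, &ops, &root));
    /// ```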
    pub async fn proof(
        &self,
        start_loc: u64,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<H::Digest>, Vec<Variable<K, V>>), Error> {
        self.historical_proof(self.op_count(), start_loc, max_ops)
            .await
    }

    /// Analogous to [Self::proof], but with respect to the state of the db when it contained
    /// `size` operations.
    pub async fn historical_proof(
        &self,
        size: u64,
        start_loc: u64,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<H::Digest>, Vec<Variable<K, V>>), Error> {
        if start_loc < self.oldest_retained_loc {
            return Err(Error::OperationPruned(start_loc));
        }

        let start_pos = leaf_num_to_pos(start_loc);
        let end_loc = std::cmp::min(size - 1, start_loc + max_ops.get() - 1);
        let end_pos = leaf_num_to_pos(end_loc);
        let mmr_size = leaf_num_to_pos(size);

        let proof = self
            .mmr
            .historical_range_proof(mmr_size, start_pos, end_pos)
            .await?;
        let mut ops = Vec::with_capacity((end_loc - start_loc + 1) as usize);
        for loc in start_loc..=end_loc {
            let section = loc / self.log_items_per_section;
            let offset = self.locations.read(loc).await?;
            let op = self.log.get(section, offset).await?;
            ops.push(op);
        }

        Ok((proof, ops))
    }

    /// Commit any pending operations to the database, ensuring their durability upon return from
    /// this function. The caller can associate an arbitrary `metadata` value with the commit.
    ///
    /// Failures after commit (but before `sync` or `close`) may still require reprocessing to
    /// recover the database on restart.
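    ///
    /// # Example
    ///
    /// A sketch of committing with metadata and reading it back (assumes `key`, `value`, and
    /// `metadata` of the appropriate types):
    ///
    /// ```ignore
    /// db.set(key, value).await?;
    /// db.commit(Some(metadata.clone())).await?;
    ///
    /// // The location and metadata of the last commit are now retrievable.
    /// let (_loc, m) = db.get_metadata().await?.unwrap();
    /// assert_eq!(m, Some(metadata));
    /// ```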
    pub async fn commit(&mut self, metadata: Option<V>) -> Result<(), Error> {
        self.last_commit = Some(self.log_size);
        let op = Variable::<K, V>::Commit(metadata);
        let encoded_op = op.encode();
        let section = self.current_section();

        // Create a future that updates the MMR.
        let mmr_fut = async {
            self.mmr.add_batched(&mut self.hasher, &encoded_op).await?;
            self.mmr.process_updates(&mut self.hasher);
            Ok::<(), Error>(())
        };

        // Create a future that appends the operation to the log, syncs it, and writes the resulting
        // offset to the locations journal.
        let log_fut = async {
            let (offset, _) = self.log.append(section, op).await?;
            // Sync the log and update locations in parallel.
            try_join!(
                self.log.sync(section).map_err(Error::Journal),
                self.locations.append(offset).map_err(Error::Journal),
            )?;
            Ok::<(), Error>(())
        };

        // Run the 2 futures in parallel.
        try_join!(mmr_fut, log_fut)?;
        self.log_size += 1;

        Ok(())
    }

    /// Get the location and metadata associated with the last commit, or None if no commit has been
    /// made.
    pub async fn get_metadata(&self) -> Result<Option<(u64, Option<V>)>, Error> {
        let Some(last_commit) = self.last_commit else {
            return Ok(None);
        };
        let section = last_commit / self.log_items_per_section;
        let offset = self.locations.read(last_commit).await?;
        let Variable::Commit(metadata) = self.log.get(section, offset).await? else {
            unreachable!("no commit operation at location of last commit {last_commit}");
        };

        Ok(Some((last_commit, metadata)))
    }

    /// Sync all database state to disk. While this isn't necessary to ensure durability of
    /// committed operations, periodic invocation may reduce memory usage and the time required to
    /// recover the database on restart.
    pub(super) async fn sync(&mut self) -> Result<(), Error> {
        let section = self.current_section();
        try_join!(
            self.mmr.sync(&mut self.hasher).map_err(Error::Mmr),
            self.log.sync(section).map_err(Error::Journal),
            self.locations.sync().map_err(Error::Journal),
        )?;

        Ok(())
    }

    /// Close the db. Operations that have not been committed will be lost.
    pub async fn close(mut self) -> Result<(), Error> {
        try_join!(
            self.log.close().map_err(Error::Journal),
            self.mmr.close(&mut self.hasher).map_err(Error::Mmr),
            self.locations.close().map_err(Error::Journal),
        )?;

        Ok(())
    }

    /// Destroy the db, removing all data from disk.
    pub async fn destroy(self) -> Result<(), Error> {
        try_join!(
            self.log.destroy().map_err(Error::Journal),
            self.mmr.destroy().map_err(Error::Mmr),
            self.locations.destroy().map_err(Error::Journal),
        )?;

        Ok(())
    }

    /// Simulate a failed commit that successfully writes the log to the commit point, but without
    /// fully committing the MMR's cached elements, triggering MMR node recovery on reopening.
    #[cfg(test)]
    pub async fn simulate_failed_commit_mmr(mut self, write_limit: usize) -> Result<(), Error>
    where
        V: Default,
    {
        self.apply_op(Variable::Commit(None)).await?;
        self.log.close().await?;
        self.locations.close().await?;
        self.mmr
            .simulate_partial_sync(&mut self.hasher, write_limit)
            .await?;

        Ok(())
    }

    /// Simulate a failed commit that successfully writes the MMR to the commit point, but without
    /// fully committing the log, requiring rollback of the MMR and log upon reopening.
    #[cfg(test)]
    pub async fn simulate_failed_commit_log(mut self) -> Result<(), Error>
    where
        V: Default,
    {
        self.apply_op(Variable::Commit(None)).await?;
        let mut section = self.current_section();

        self.mmr.close(&mut self.hasher).await?;
        // Rewind the operation log over the commit op to force rollback to the previous commit.
        let mut size = self.log.size(section).await?;
        if size == 0 {
            section -= 1;
            size = self.log.size(section).await?;
        }
        self.log.rewind(section, size - 1).await?;
        self.log.close().await?;

        Ok(())
    }

    /// Simulate a failed commit that successfully writes the log to the commit point, but without
    /// fully committing the locations.
    #[cfg(test)]
    pub async fn simulate_failed_commit_locations(
        mut self,
        operations_to_trim: u64,
    ) -> Result<(), Error>
    where
        V: Default,
    {
        self.apply_op(Variable::Commit(None)).await?;
        let op_count = self.op_count();
        assert!(op_count >= operations_to_trim);

        self.log.close().await?;
        self.mmr.close(&mut self.hasher).await?;
        self.locations.rewind(op_count - operations_to_trim).await?;
        self.locations.close().await?;

        Ok(())
    }
}

#[cfg(test)]
pub(super) mod test {
    use super::*;
    use crate::{adb::verify_proof, mmr::mem::Mmr as MemMmr, translator::TwoCap};
    use commonware_cryptography::{sha256::Digest, Sha256};
    use commonware_macros::test_traced;
    use commonware_runtime::{
        deterministic::{self},
        Runner as _,
    };
    use commonware_utils::{NZUsize, NZU64};

    const PAGE_SIZE: usize = 77;
    const PAGE_CACHE_SIZE: usize = 9;
    const ITEMS_PER_SECTION: u64 = 5;

    pub(crate) fn db_config(suffix: &str) -> Config<TwoCap, (commonware_codec::RangeCfg, ())> {
        Config {
            mmr_journal_partition: format!("journal_{suffix}"),
            mmr_metadata_partition: format!("metadata_{suffix}"),
            mmr_items_per_blob: NZU64!(11),
            mmr_write_buffer: NZUsize!(1024),
            log_journal_partition: format!("log_journal_{suffix}"),
            log_items_per_section: NZU64!(ITEMS_PER_SECTION),
            log_compression: None,
            log_codec_config: ((0..=10000).into(), ()),
            log_write_buffer: NZUsize!(1024),
            locations_journal_partition: format!("locations_journal_{suffix}"),
            locations_items_per_blob: NZU64!(7),
            translator: TwoCap,
            thread_pool: None,
            buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
        }
    }

    /// A type alias for the concrete [Immutable] type used in these unit tests.
    type ImmutableTest = Immutable<deterministic::Context, Digest, Vec<u8>, Sha256, TwoCap>;

    /// Return an [Immutable] database initialized with a fixed config.
    async fn open_db(context: deterministic::Context) -> ImmutableTest {
        ImmutableTest::init(context, db_config("partition"))
            .await
            .unwrap()
    }

    #[test_traced("WARN")]
    pub fn test_immutable_db_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.clone()).await;
            let mut hasher = Standard::<Sha256>::new();
            assert_eq!(db.op_count(), 0);
            assert_eq!(db.oldest_retained_loc(), None);
            assert_eq!(db.root(&mut hasher), MemMmr::default().root(&mut hasher));
            assert!(db.get_metadata().await.unwrap().is_none());

            // Make sure closing/reopening gets us back to the same state, even after adding an uncommitted op.
            let k1 = Sha256::fill(1u8);
            let v1 = vec![4, 5, 6, 7];
            let root = db.root(&mut hasher);
            db.set(k1, v1).await.unwrap();
            db.close().await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.root(&mut hasher), root);
            assert_eq!(db.op_count(), 0);

            // Test calling commit on an empty db, which should make it (durably) non-empty.
            db.commit(None).await.unwrap();
            assert_eq!(db.op_count(), 1); // commit op added
            let root = db.root(&mut hasher);
            db.close().await.unwrap();

            let db = open_db(context.clone()).await;
            assert_eq!(db.root(&mut hasher), root);

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    pub fn test_immutable_db_build_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Build a db with 2 keys.
            let mut hasher = Standard::<Sha256>::new();
            let mut db = open_db(context.clone()).await;

            let k1 = Sha256::fill(1u8);
            let k2 = Sha256::fill(2u8);
            let v1 = vec![1, 2, 3];
            let v2 = vec![4, 5, 6, 7, 8];

            assert!(db.get(&k1).await.unwrap().is_none());
            assert!(db.get(&k2).await.unwrap().is_none());

            // Set the first key.
            db.set(k1, v1.clone()).await.unwrap();
            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
            assert!(db.get(&k2).await.unwrap().is_none());
            assert_eq!(db.op_count(), 1);
            // Commit the first key.
            let metadata = Some(vec![99, 100]);
            db.commit(metadata.clone()).await.unwrap();
            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
            assert!(db.get(&k2).await.unwrap().is_none());
            assert_eq!(db.op_count(), 2);
            assert_eq!(
                db.get_metadata().await.unwrap(),
                Some((1, metadata.clone()))
            );
            // Set the second key.
            db.set(k2, v2.clone()).await.unwrap();
            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);
            assert_eq!(db.op_count(), 3);

            // Make sure we can still get metadata.
            assert_eq!(db.get_metadata().await.unwrap(), Some((1, metadata)));

            // Commit the second key.
            db.commit(None).await.unwrap();
            assert_eq!(db.op_count(), 4);
            assert_eq!(db.get_metadata().await.unwrap(), Some((3, None)));

            // Capture state.
            let root = db.root(&mut hasher);

            // Add an uncommitted op then close the db.
            let k3 = Sha256::fill(3u8);
            let v3 = vec![9, 10, 11];
            db.set(k3, v3).await.unwrap();
            assert_eq!(db.op_count(), 5);
            assert_ne!(db.root(&mut hasher), root);

            // Close & reopen, make sure state is restored to last commit point.
            db.close().await.unwrap();
            let db = open_db(context.clone()).await;
            assert!(db.get(&k3).await.unwrap().is_none());
            assert_eq!(db.op_count(), 4);
            assert_eq!(db.root(&mut hasher), root);
            assert_eq!(db.get_metadata().await.unwrap(), Some((3, None)));

            // Cleanup.
            db.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    pub fn test_immutable_db_build_and_authenticate() {
        let executor = deterministic::Runner::default();
        // Build a db with `ELEMENTS` key/value pairs and prove ranges over them.
        const ELEMENTS: u64 = 2_000;
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();
            let mut db = open_db(context.clone()).await;

            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                db.set(k, v).await.unwrap();
            }

            assert_eq!(db.op_count(), ELEMENTS);

            db.commit(None).await.unwrap();
            assert_eq!(db.op_count(), ELEMENTS + 1);

            // Close & reopen the db, making sure the re-opened db has exactly the same state.
            let root = db.root(&mut hasher);
            db.close().await.unwrap();
            let db = open_db(context.clone()).await;
            assert_eq!(root, db.root(&mut hasher));
            assert_eq!(db.op_count(), ELEMENTS + 1);
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                assert_eq!(db.get(&k).await.unwrap().unwrap(), v);
            }

            // Make sure all ranges of 5 operations are provable, including truncated ranges at the
            // end.
            let max_ops = NZU64!(5);
            for i in 0..db.op_count() {
                let (proof, log) = db.proof(i, max_ops).await.unwrap();
                assert!(verify_proof(&mut hasher, &proof, i, &log, &root));
            }

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    pub fn test_immutable_db_recovery_from_failed_mmr_sync() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Insert 1000 keys then sync.
            const ELEMENTS: u64 = 1000;
            let mut hasher = Standard::<Sha256>::new();
            let mut db = open_db(context.clone()).await;

            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                db.set(k, v).await.unwrap();
            }

            assert_eq!(db.op_count(), ELEMENTS);
            db.sync().await.unwrap();
            let halfway_root = db.root(&mut hasher);

            // Insert another 1000 keys then simulate a failed close and test recovery.
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                db.set(k, v).await.unwrap();
            }

            // We partially write only 101 of the cached MMR nodes to simulate a failure.
            db.simulate_failed_commit_mmr(101).await.unwrap();

            // Recovery should replay the log to regenerate the MMR.
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 2001);
            let root = db.root(&mut hasher);
            assert_ne!(root, halfway_root);

            // Close & reopen should preserve the final commit.
            db.close().await.unwrap();
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 2001);
            assert_eq!(db.root(&mut hasher), root);

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    pub fn test_immutable_db_recovery_from_failed_locations_sync() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Insert 1000 keys then sync.
            const ELEMENTS: u64 = 1000;
            let mut hasher = Standard::<Sha256>::new();
            let mut db = open_db(context.clone()).await;

            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                db.set(k, v).await.unwrap();
            }

            assert_eq!(db.op_count(), ELEMENTS);
            db.sync().await.unwrap();
            let halfway_root = db.root(&mut hasher);

            // Insert another 1000 keys then simulate a failed close and test recovery.
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                db.set(k, v).await.unwrap();
            }

            // Simulate failure to write the full locations map.
            db.simulate_failed_commit_locations(101).await.unwrap();

            // Recovery should replay the log to regenerate the locations map.
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 2001);
            let root = db.root(&mut hasher);
            assert_ne!(root, halfway_root);

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    pub fn test_immutable_db_recovery_from_failed_log_sync() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();
            let mut db = open_db(context.clone()).await;

            // Insert a single key and then commit to create a first commit point.
            let k1 = Sha256::fill(1u8);
            let v1 = vec![1, 2, 3];
            db.set(k1, v1).await.unwrap();
            db.commit(None).await.unwrap();
            let first_commit_root = db.root(&mut hasher);

            // Insert 1000 keys then sync.
            const ELEMENTS: u64 = 1000;

            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                db.set(k, v).await.unwrap();
            }

            assert_eq!(db.op_count(), ELEMENTS + 2);
            db.sync().await.unwrap();

            // Insert another 1000 keys then simulate a failed close and test recovery.
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                db.set(k, v).await.unwrap();
            }

            // Simulate a failure to fully write the log past the commit point.
            db.simulate_failed_commit_log().await.unwrap();

            // Recovery should back up to the previous commit point.
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 2);
            let root = db.root(&mut hasher);
            assert_eq!(root, first_commit_root);

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    pub fn test_immutable_db_pruning() {
        let executor = deterministic::Runner::default();
        // Build a db with `ELEMENTS` key/value pairs and exercise pruning over them.
        const ELEMENTS: u64 = 2_000;
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();
            let mut db = open_db(context.clone()).await;

            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                db.set(k, v).await.unwrap();
            }

            assert_eq!(db.op_count(), ELEMENTS);

            db.commit(None).await.unwrap();
            assert_eq!(db.op_count(), ELEMENTS + 1);

            // Prune the db to the first half of the operations.
            db.prune(ELEMENTS / 2).await.unwrap();
            assert_eq!(db.op_count(), ELEMENTS + 1);

            // items_per_section is 5, so half should be exactly at a blob boundary, in which case
            // the actual pruning location should match the requested.
            let oldest_retained_loc = db.oldest_retained_loc().unwrap();
            assert_eq!(oldest_retained_loc, ELEMENTS / 2);

            // Try to fetch a pruned key.
            let pruned_loc = oldest_retained_loc - 1;
            let pruned_key = Sha256::hash(&pruned_loc.to_be_bytes());
            assert!(db.get(&pruned_key).await.unwrap().is_none());

            // Try to fetch an unpruned key.
            let unpruned_key = Sha256::hash(&oldest_retained_loc.to_be_bytes());
            assert!(db.get(&unpruned_key).await.unwrap().is_some());

            // Close & reopen the db, making sure the re-opened db has exactly the same state.
            let root = db.root(&mut hasher);
            db.close().await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(root, db.root(&mut hasher));
            assert_eq!(db.op_count(), ELEMENTS + 1);
            let oldest_retained_loc = db.oldest_retained_loc().unwrap();
            assert_eq!(oldest_retained_loc, ELEMENTS / 2);

            // Prune to a non-blob boundary.
            db.prune(ELEMENTS / 2 + (ITEMS_PER_SECTION * 2 - 1))
                .await
                .unwrap();
            // Actual boundary should be a multiple of 5.
            let oldest_retained_loc = db.oldest_retained_loc().unwrap();
            assert_eq!(oldest_retained_loc, ELEMENTS / 2 + ITEMS_PER_SECTION);

            // Confirm boundary persists across restart.
            db.close().await.unwrap();
            let db = open_db(context.clone()).await;
            let oldest_retained_loc = db.oldest_retained_loc().unwrap();
            assert_eq!(oldest_retained_loc, ELEMENTS / 2 + ITEMS_PER_SECTION);

            // Try to fetch a pruned key.
            let pruned_loc = oldest_retained_loc - 3;
            let pruned_key = Sha256::hash(&pruned_loc.to_be_bytes());
            assert!(db.get(&pruned_key).await.unwrap().is_none());

            // Try to fetch an unpruned key.
            let unpruned_key = Sha256::hash(&oldest_retained_loc.to_be_bytes());
            assert!(db.get(&unpruned_key).await.unwrap().is_some());

            // Confirm that trying to create a proof over pruned items fails as expected.
            let pruned_pos = ELEMENTS / 2;
            let proof_result = db.proof(pruned_pos, NZU64!(pruned_pos + 100)).await;
            assert!(matches!(proof_result, Err(Error::OperationPruned(pos)) if pos == pruned_pos));

            db.destroy().await.unwrap();
        });
    }
}
1165}