commonware_storage/adb/immutable/
mod.rs

//! An authenticated database (ADB) that only supports adding new keyed values (no updates or
//! deletions), where values can have varying sizes.
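//!
//! # Example
//!
//! A minimal usage sketch, not compiled here; it assumes an async runtime context `context`
//! implementing the required traits, a populated `cfg` (see [Config]), and key/value/hasher
//! variables of appropriate types:
//!
//! ```ignore
//! let mut db = Immutable::init(context, cfg).await?;
//! db.set(key, value).await?;
//! db.commit(None).await?;
//! let root = db.root(&mut hasher);
//! db.close().await?;
//! ```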

use crate::{
    adb::{operation::variable::Operation, Error},
    index::{Index as _, Unordered as Index},
    journal::contiguous::variable,
    mmr::{
        journaled::{Config as MmrConfig, Mmr},
        Location, Position, Proof, StandardHasher as Standard,
    },
    translator::Translator,
};
use commonware_codec::{Codec, Encode as _, Read};
use commonware_cryptography::Hasher as CHasher;
use commonware_runtime::{buffer::PoolRef, Clock, Metrics, Storage as RStorage, ThreadPool};
use commonware_utils::{Array, NZUsize};
use futures::{future::TryFutureExt, pin_mut, try_join, StreamExt};
use std::num::{NonZeroU64, NonZeroUsize};
use tracing::{debug, warn};

pub mod sync;

/// The size of the read buffer to use for replaying the operations log when rebuilding the
/// snapshot. The exact value does not impact performance significantly as long as it is large
/// enough, so we don't make it configurable.
const SNAPSHOT_READ_BUFFER_SIZE: usize = 1 << 16;

/// Configuration for an [Immutable] authenticated db.
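///
/// A construction sketch, not compiled here; it mirrors the `db_config` helper in the tests at
/// the bottom of this file, and the partition names and sizing values are illustrative
/// placeholders:
///
/// ```ignore
/// let cfg = Config {
///     mmr_journal_partition: "mmr_journal".into(),
///     mmr_metadata_partition: "mmr_metadata".into(),
///     mmr_items_per_blob: NZU64!(4096),
///     mmr_write_buffer: NZUsize!(1024),
///     log_partition: "log".into(),
///     log_items_per_section: NZU64!(4096),
///     log_compression: None,
///     log_codec_config: ((0..=10000).into(), ()),
///     log_write_buffer: NZUsize!(1024),
///     translator: TwoCap,
///     thread_pool: None,
///     buffer_pool: PoolRef::new(NZUsize!(4096), NZUsize!(100)),
/// };
/// ```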
#[derive(Clone)]
pub struct Config<T: Translator, C> {
    /// The name of the [RStorage] partition used for the MMR's backing journal.
    pub mmr_journal_partition: String,

    /// The items per blob configuration value used by the MMR journal.
    pub mmr_items_per_blob: NonZeroU64,

    /// The size of the write buffer to use for each blob in the MMR journal.
    pub mmr_write_buffer: NonZeroUsize,

    /// The name of the [RStorage] partition used for the MMR's metadata.
    pub mmr_metadata_partition: String,

    /// The name of the [RStorage] partition used to persist the log of operations.
    pub log_partition: String,

    /// The size of the write buffer to use for each blob in the log journal.
    pub log_write_buffer: NonZeroUsize,

    /// Optional compression level (using `zstd`) to apply to log data before storing.
    pub log_compression: Option<u8>,

    /// The codec configuration to use for encoding and decoding log items.
    pub log_codec_config: C,

    /// The number of items to put in each section of the journal.
    pub log_items_per_section: NonZeroU64,

    /// The translator used by the compressed index.
    pub translator: T,

    /// An optional thread pool to use for parallelizing batch operations.
    pub thread_pool: Option<ThreadPool>,

    /// The buffer pool to use for caching data.
    pub buffer_pool: PoolRef,
}

/// An authenticatable key-value database based on an MMR that does not allow updates or deletions
/// of previously set keys.
pub struct Immutable<
    E: RStorage + Clock + Metrics,
    K: Array,
    V: Codec + Send,
    H: CHasher,
    T: Translator,
> {
    /// An MMR over digests of the operations applied to the db.
    ///
    /// # Invariant
    ///
    /// The number of leaves in this MMR always equals the number of operations in the unpruned
    /// `log`.
    mmr: Mmr<E, H>,

    /// A log of all operations applied to the db in order of occurrence. The _location_ of an
    /// operation is its position in this log, and corresponds to its leaf number in the MMR.
    log: variable::Journal<E, Operation<K, V>>,

    /// A map from each active key to the location of the operation that set its value.
    ///
    /// # Invariant
    ///
    /// Only references operations of type [Operation::Set].
    snapshot: Index<T, Location>,

    /// Cryptographic hasher to re-use within mutable operations requiring digest computation.
    hasher: Standard<H>,

    /// The location of the last commit operation, or None if no commit has been made.
    last_commit: Option<Location>,
}

impl<E: RStorage + Clock + Metrics, K: Array, V: Codec + Send, H: CHasher, T: Translator>
    Immutable<E, K, V, H, T>
{
    /// Returns an [Immutable] adb initialized from `cfg`. Any uncommitted log operations will be
    /// discarded and the state of the db will be as of the last committed operation.
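    ///
    /// A minimal sketch, not compiled here; it uses the `ImmutableTest` alias and `db_config`
    /// helper defined in the tests at the bottom of this file:
    ///
    /// ```ignore
    /// let db = ImmutableTest::init(context, db_config("partition")).await?;
    /// // Any operations after the last commit were discarded during init.
    /// ```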
    pub async fn init(
        context: E,
        cfg: Config<T, <Operation<K, V> as Read>::Cfg>,
    ) -> Result<Self, Error> {
        let mut hasher = Standard::<H>::new();

        let mut mmr = Mmr::init(
            context.with_label("mmr"),
            &mut hasher,
            MmrConfig {
                journal_partition: cfg.mmr_journal_partition,
                metadata_partition: cfg.mmr_metadata_partition,
                items_per_blob: cfg.mmr_items_per_blob,
                write_buffer: cfg.mmr_write_buffer,
                thread_pool: cfg.thread_pool,
                buffer_pool: cfg.buffer_pool.clone(),
            },
        )
        .await?;

        let mut log = variable::Journal::init(
            context.with_label("log"),
            variable::Config {
                partition: cfg.log_partition.clone(),
                items_per_section: cfg.log_items_per_section,
                compression: cfg.log_compression,
                codec_config: cfg.log_codec_config.clone(),
                buffer_pool: cfg.buffer_pool.clone(),
                write_buffer: cfg.log_write_buffer,
            },
        )
        .await?;

        // Build snapshot from the log
        let mut snapshot = Index::init(context.with_label("snapshot"), cfg.translator.clone());
        let log_size =
            Self::build_snapshot_from_log(&mut hasher, &mut mmr, &mut log, &mut snapshot).await?;

        let last_commit = log_size.checked_sub(1);

        Ok(Immutable {
            mmr,
            log,
            snapshot,
            hasher,
            last_commit,
        })
    }

    /// Returns an [Immutable] built from the config and sync data in `cfg`.
    #[allow(clippy::type_complexity)]
    pub async fn init_synced(
        context: E,
        mut cfg: sync::Config<E, K, V, T, H::Digest, <Operation<K, V> as Read>::Cfg>,
    ) -> Result<Self, Error> {
        // Initialize MMR for sync
        let mut mmr = Mmr::init_sync(
            context.with_label("mmr"),
            crate::mmr::journaled::SyncConfig {
                config: MmrConfig {
                    journal_partition: cfg.db_config.mmr_journal_partition,
                    metadata_partition: cfg.db_config.mmr_metadata_partition,
                    items_per_blob: cfg.db_config.mmr_items_per_blob,
                    write_buffer: cfg.db_config.mmr_write_buffer,
                    thread_pool: cfg.db_config.thread_pool.clone(),
                    buffer_pool: cfg.db_config.buffer_pool.clone(),
                },
                range: Position::try_from(cfg.range.start)?
                    ..Position::try_from(cfg.range.end.saturating_add(1))?,
                pinned_nodes: cfg.pinned_nodes,
            },
        )
        .await?;

        // Build snapshot from the log
        let mut snapshot = Index::init(
            context.with_label("snapshot"),
            cfg.db_config.translator.clone(),
        );
        let log_size = Self::build_snapshot_from_log(
            &mut Standard::<H>::new(),
            &mut mmr,
            &mut cfg.log,
            &mut snapshot,
        )
        .await?;

        let last_commit = log_size.checked_sub(1);

        let mut db = Immutable {
            mmr,
            log: cfg.log,
            snapshot,
            hasher: Standard::<H>::new(),
            last_commit,
        };

        db.sync().await?;
        Ok(db)
    }

    /// Builds the database's snapshot by replaying the log from inception, while also:
    ///   - trimming any uncommitted operations from the log,
    ///   - adding log operations to the MMR if they are missing,
    ///   - removing any elements from the MMR that don't remain in the log after trimming.
    ///
    /// Returns the number of operations in the log.
    ///
    /// # Post-condition
    ///
    /// The number of operations in the log and the number of leaves in the MMR are equal.
    pub(super) async fn build_snapshot_from_log(
        hasher: &mut Standard<H>,
        mmr: &mut Mmr<E, H>,
        log: &mut variable::Journal<E, Operation<K, V>>,
        snapshot: &mut Index<T, Location>,
    ) -> Result<Location, Error> {
        // Get the current number of MMR leaves.
        let mut mmr_leaves = mmr.leaves();

        // Get the start location from the log.
        let start_loc = match log.oldest_retained_pos() {
            Some(loc) => loc,
            None => log.size(),
        };

        // The number of operations in the log.
        let mut log_size = Location::new_unchecked(start_loc);
        // The location of the first operation to follow the last known commit point.
        let mut after_last_commit = None;
        // A list of uncommitted operations that must be rolled back, in order of their locations.
        let mut uncommitted_ops = Vec::new();

        // Replay the log from the start to build the snapshot, keeping track of any uncommitted
        // operations that must be rolled back, and any log operations that need to be re-added to the MMR.
        {
            let stream = log
                .replay(start_loc, NZUsize!(SNAPSHOT_READ_BUFFER_SIZE))
                .await?;
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                let (loc, op) = result?;

                let loc = Location::new_unchecked(loc); // location of the current operation.
                if after_last_commit.is_none() {
                    after_last_commit = Some(loc);
                }

                log_size = loc + 1;

                if log_size > mmr_leaves {
                    debug!(?loc, "operation was missing from MMR");
                    mmr.add(hasher, &op.encode()).await?;
                    mmr_leaves += 1;
                }
                match op {
                    Operation::Set(key, _) => {
                        uncommitted_ops.push((key, loc));
                    }
                    Operation::Commit(_) => {
                        for (key, loc) in uncommitted_ops.iter() {
                            snapshot.insert(key, *loc);
                        }
                        uncommitted_ops.clear();
                        after_last_commit = None;
                    }
                    _ => {
                        unreachable!("unsupported operation at location {loc}");
                    }
                }
            }
        }

        // Rewind the operations log if necessary.
        if let Some(end_loc) = after_last_commit {
            assert!(!uncommitted_ops.is_empty());
            warn!(
                op_count = uncommitted_ops.len(),
                log_size = *end_loc,
                "rewinding over uncommitted operations at end of log"
            );
            log.rewind(*end_loc).await.map_err(Error::Journal)?;
            log.sync().await.map_err(Error::Journal)?;
            log_size = end_loc;
        }

        // Pop any MMR elements that are ahead of the last log commit point.
        if mmr_leaves > log_size {
            let op_count = (*mmr_leaves - *log_size) as usize;
            warn!(op_count, "popping uncommitted MMR operations");
            mmr.pop(op_count).await?;
        }

        // Confirm post-conditions hold.
        assert_eq!(log_size, Location::try_from(mmr.size()).unwrap());
        assert_eq!(log_size, log.size());

        Ok(log_size)
    }

    /// Return the oldest location that remains retrievable.
    pub fn oldest_retained_loc(&self) -> Option<Location> {
        self.log.oldest_retained_pos().map(Location::new_unchecked)
    }
    /// Prunes up to all operations with locations less than `loc`. The actual number pruned may
    /// be fewer than requested because the log prunes only at section boundaries.
    ///
    /// # Errors
    ///
    /// Returns [Error::PruneBeyondCommit] if `loc` is beyond the last commit point.
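    ///
    /// A usage sketch, not compiled here; `db` is an open [Immutable] and `loc` does not exceed
    /// the last commit point:
    ///
    /// ```ignore
    /// db.prune(loc).await?;
    /// // The log prunes at section boundaries, so the oldest retained location may be
    /// // less than the requested `loc`.
    /// let oldest = db.oldest_retained_loc();
    /// ```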
    pub async fn prune(&mut self, loc: Location) -> Result<(), Error> {
        let last_commit = self.last_commit.unwrap_or(Location::new_unchecked(0));
        if loc > last_commit {
            return Err(Error::PruneBeyondCommit(loc, last_commit));
        }

        // Prune the log up to the requested location. The log will prune at section boundaries,
        // so the actual oldest retained location may be less than requested. We always prune the
        // log first, and then prune the MMR based on the log's actual pruning boundary. This
        // procedure ensures all log operations always have corresponding MMR entries, even in the
        // event of failures, with no need for special recovery.
        self.log.prune(*loc).await?;

        // Get the oldest retained location based on what the log actually pruned.
        let pruning_boundary = match self.oldest_retained_loc() {
            Some(loc) => loc,
            None => self.op_count(),
        };

        // Prune the MMR up to the oldest retained item in the log after pruning.
        self.mmr
            .prune_to_pos(&mut self.hasher, Position::try_from(pruning_boundary)?)
            .await?;
        Ok(())
    }

    /// Get the value of `key` in the db, or None if it has no value or its corresponding operation
    /// has been pruned.
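    ///
    /// A minimal sketch, not compiled here; `db` is an open [Immutable]:
    ///
    /// ```ignore
    /// match db.get(&key).await? {
    ///     Some(value) => { /* `key` was set by a retained operation */ }
    ///     None => { /* never set, or its operation was pruned */ }
    /// }
    /// ```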
    pub async fn get(&self, key: &K) -> Result<Option<V>, Error> {
        let oldest = self
            .oldest_retained_loc()
            .unwrap_or(Location::new_unchecked(0));
        let iter = self.snapshot.get(key);
        for &loc in iter {
            if loc < oldest {
                continue;
            }
            if let Some(v) = self.get_from_loc(key, loc).await? {
                return Ok(Some(v));
            }
        }

        Ok(None)
    }

    /// Get the value of the operation with location `loc` in the db.
    ///
    /// # Errors
    ///
    /// Returns [Error::LocationOutOfBounds] if `loc >= op_count()`.
    /// Returns [Error::OperationPruned] if `loc` precedes the oldest retained location.
    pub async fn get_loc(&self, loc: Location) -> Result<Option<V>, Error> {
        let op_count = self.op_count();
        if loc >= op_count {
            return Err(Error::LocationOutOfBounds(loc, op_count));
        }
        let pruning_boundary = match self.oldest_retained_loc() {
            Some(oldest) => oldest,
            None => self.op_count(),
        };
        if loc < pruning_boundary {
            return Err(Error::OperationPruned(loc));
        }

        let op = self.log.read(*loc).await?;
        Ok(op.into_value())
    }
    /// Get the value of the operation with location `loc` in the db if it matches `key`. Returns
    /// [Error::OperationPruned] if `loc` precedes the oldest retained location. The location is
    /// otherwise assumed valid.
    pub async fn get_from_loc(&self, key: &K, loc: Location) -> Result<Option<V>, Error> {
        let pruning_boundary = match self.oldest_retained_loc() {
            Some(oldest) => oldest,
            None => self.op_count(),
        };
        if loc < pruning_boundary {
            return Err(Error::OperationPruned(loc));
        }

        let Operation::Set(k, v) = self.log.read(*loc).await? else {
            return Err(Error::UnexpectedData(loc));
        };

        if k != *key {
            Ok(None)
        } else {
            Ok(Some(v))
        }
    }

    /// Get the number of operations that have been applied to this db, including those that are not
    /// yet committed.
    pub fn op_count(&self) -> Location {
        Location::new_unchecked(self.log.size())
    }

    /// Sets `key` to have value `value`, assuming `key` hasn't already been assigned. The operation
    /// is reflected in the snapshot, but will be subject to rollback until the next successful
    /// `commit`. Attempting to set an already-set key results in undefined behavior.
    ///
    /// Any keys that have been pruned and map to the same translated key will be dropped
    /// during this call.
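    ///
    /// A minimal sketch, not compiled here; `db` is an open [Immutable] and `key` has never been
    /// set:
    ///
    /// ```ignore
    /// db.set(key.clone(), value).await?;
    /// // The new value is immediately visible...
    /// assert!(db.get(&key).await?.is_some());
    /// // ...but durable only after the next commit.
    /// db.commit(None).await?;
    /// ```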
    pub async fn set(&mut self, key: K, value: V) -> Result<(), Error> {
        let op_count = self.op_count();
        let oldest = self
            .oldest_retained_loc()
            .unwrap_or(Location::new_unchecked(0));
        self.snapshot
            .insert_and_prune(&key, op_count, |v| *v < oldest);

        let op = Operation::Set(key, value);
        self.apply_op(op).await
    }

    /// Return the root of the db.
    ///
    /// # Warning
    ///
    /// Panics if there are uncommitted operations.
    pub fn root(&self, hasher: &mut Standard<H>) -> H::Digest {
        self.mmr.root(hasher)
    }

    /// Update the operations MMR with the given operation, and append the operation to the log. The
    /// `commit` method must be called to make any applied operation persistent & recoverable.
    pub(super) async fn apply_op(&mut self, op: Operation<K, V>) -> Result<(), Error> {
        let encoded_op = op.encode();

        // Create a future that updates the MMR.
        let mmr_fut = async {
            self.mmr.add_batched(&mut self.hasher, &encoded_op).await?;
            Ok::<(), Error>(())
        };

        // Create a future that appends the operation to the log.
        let log_fut = async {
            self.log.append(op).await?;
            Ok::<(), Error>(())
        };

        // Run the 2 futures in parallel.
        try_join!(mmr_fut, log_fut)?;

        Ok(())
    }

    /// Generate and return:
    ///  1. a proof of all operations applied to the db in the range starting at (and including)
    ///     location `start_loc`, and ending at the first of either:
    ///     - the last operation performed, or
    ///     - the operation `max_ops` from the start.
    ///  2. the operations corresponding to the leaves in this range.
    ///
    /// # Warning
    ///
    /// Panics if there are uncommitted operations.
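    ///
    /// A verification sketch, not compiled here; `db` has no uncommitted operations and
    /// `verify_proof` is the helper from [crate::adb] used by the tests below:
    ///
    /// ```ignore
    /// let root = db.root(&mut hasher);
    /// let (proof, ops) = db.proof(start_loc, max_ops).await?;
    /// assert!(verify_proof(&mut hasher, &proof, start_loc, &ops, &root));
    /// ```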
    pub async fn proof(
        &self,
        start_index: Location,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<H::Digest>, Vec<Operation<K, V>>), Error> {
        let op_count = self.op_count();
        self.historical_proof(op_count, start_index, max_ops).await
    }

    /// Analogous to proof but with respect to the state of the database when it had `op_count`
    /// operations.
    ///
    /// # Errors
    ///
    /// Returns [crate::mmr::Error::LocationOverflow] if `op_count` or `start_loc` >
    /// [crate::mmr::MAX_LOCATION].
    /// Returns [crate::mmr::Error::RangeOutOfBounds] if `op_count` > number of operations, or
    /// if `start_loc` >= `op_count`.
    /// Returns [Error::OperationPruned] if `start_loc` has been pruned.
    pub async fn historical_proof(
        &self,
        op_count: Location,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<H::Digest>, Vec<Operation<K, V>>), Error> {
        let current_op_count = self.op_count();
        if op_count > current_op_count {
            return Err(crate::mmr::Error::RangeOutOfBounds(op_count).into());
        }
        if start_loc >= op_count {
            return Err(crate::mmr::Error::RangeOutOfBounds(start_loc).into());
        }
        let pruning_boundary = match self.oldest_retained_loc() {
            Some(oldest) => oldest,
            None => self.op_count(),
        };
        if start_loc < pruning_boundary {
            return Err(Error::OperationPruned(start_loc));
        }

        let mmr_size = Position::try_from(op_count)?;
        let end_loc = std::cmp::min(op_count, start_loc.saturating_add(max_ops.get()));
        let proof = self
            .mmr
            .historical_range_proof(mmr_size, start_loc..end_loc)
            .await?;
        let mut ops = Vec::with_capacity((*end_loc - *start_loc) as usize);
        for loc in *start_loc..*end_loc {
            let op = self.log.read(loc).await?;
            ops.push(op);
        }

        Ok((proof, ops))
    }
    /// Commit any pending operations to the database, ensuring their durability upon return from
    /// this function. The caller can associate an arbitrary `metadata` value with the commit.
    ///
    /// Failures after commit (but before `sync` or `close`) may still require reprocessing to
    /// recover the database on restart.
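    ///
    /// A minimal sketch, not compiled here; `db` is an open [Immutable]:
    ///
    /// ```ignore
    /// db.commit(Some(metadata)).await?;
    /// // The commit's location and metadata are now retrievable.
    /// let (commit_loc, stored_metadata) = db.get_metadata().await?.unwrap();
    /// ```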
    pub async fn commit(&mut self, metadata: Option<V>) -> Result<(), Error> {
        self.last_commit = Some(self.op_count());
        let op = Operation::<K, V>::Commit(metadata);
        let encoded_op = op.encode();

        // Create a future that updates the MMR.
        let mmr_fut = async {
            self.mmr.add_batched(&mut self.hasher, &encoded_op).await?;
            self.mmr.merkleize(&mut self.hasher);
            Ok::<(), Error>(())
        };

        // Create a future that appends the operation to the log and syncs it.
        let log_fut = async {
            self.log.append(op).await?;
            self.log.sync_data().await?;
            Ok::<(), Error>(())
        };

        // Run the 2 futures in parallel.
        try_join!(mmr_fut, log_fut)?;

        Ok(())
    }

    /// Get the location and metadata associated with the last commit, or None if no commit has been
    /// made.
    pub async fn get_metadata(&self) -> Result<Option<(Location, Option<V>)>, Error> {
        let Some(last_commit) = self.last_commit else {
            return Ok(None);
        };
        let Operation::Commit(metadata) = self.log.read(*last_commit).await? else {
            unreachable!("no commit operation at location of last commit {last_commit}");
        };

        Ok(Some((last_commit, metadata)))
    }

    /// Sync all database state to disk. While this isn't necessary to ensure durability of
    /// committed operations, periodic invocation may reduce memory usage and the time required to
    /// recover the database on restart.
    pub(super) async fn sync(&mut self) -> Result<(), Error> {
        try_join!(
            self.mmr.sync(&mut self.hasher).map_err(Error::Mmr),
            self.log.sync().map_err(Error::Journal),
        )?;

        Ok(())
    }

    /// Close the db. Operations that have not been committed will be lost.
    pub async fn close(mut self) -> Result<(), Error> {
        try_join!(
            self.log.close().map_err(Error::Journal),
            self.mmr.close(&mut self.hasher).map_err(Error::Mmr),
        )?;

        Ok(())
    }

    /// Destroy the db, removing all data from disk.
    pub async fn destroy(self) -> Result<(), Error> {
        try_join!(
            self.log.destroy().map_err(Error::Journal),
            self.mmr.destroy().map_err(Error::Mmr),
        )?;

        Ok(())
    }
    /// Simulate a failed commit that successfully writes the log to the commit point, but fails
    /// to fully commit the MMR's cached elements, triggering MMR node recovery on reopening.
    #[cfg(test)]
    pub async fn simulate_failed_commit_mmr(mut self, write_limit: usize) -> Result<(), Error>
    where
        V: Default,
    {
        self.apply_op(Operation::Commit(None)).await?;
        self.log.close().await?;
        self.mmr
            .simulate_partial_sync(&mut self.hasher, write_limit)
            .await?;

        Ok(())
    }

    /// Simulate a failed commit that successfully writes the MMR to the commit point, but without
    /// fully committing the log, requiring rollback of the MMR and log upon reopening.
    #[cfg(test)]
    pub async fn simulate_failed_commit_log(mut self) -> Result<(), Error>
    where
        V: Default,
    {
        self.apply_op(Operation::Commit(None)).await?;
        let log_size = self.log.size();

        self.mmr.close(&mut self.hasher).await?;
        // Rewind the operation log over the commit op to force rollback to the previous commit.
        if log_size > 0 {
            self.log.rewind(log_size - 1).await?;
        }
        self.log.close().await?;

        Ok(())
    }
}

#[cfg(test)]
pub(super) mod test {
    use super::*;
    use crate::{adb::verify_proof, mmr::mem::Mmr as MemMmr, translator::TwoCap};
    use commonware_cryptography::{sha256::Digest, Sha256};
    use commonware_macros::test_traced;
    use commonware_runtime::{
        deterministic::{self},
        Runner as _,
    };
    use commonware_utils::{NZUsize, NZU64};

    const PAGE_SIZE: usize = 77;
    const PAGE_CACHE_SIZE: usize = 9;
    const ITEMS_PER_SECTION: u64 = 5;

    pub(crate) fn db_config(
        suffix: &str,
    ) -> Config<TwoCap, (commonware_codec::RangeCfg<usize>, ())> {
        Config {
            mmr_journal_partition: format!("journal_{suffix}"),
            mmr_metadata_partition: format!("metadata_{suffix}"),
            mmr_items_per_blob: NZU64!(11),
            mmr_write_buffer: NZUsize!(1024),
            log_partition: format!("log_{suffix}"),
            log_items_per_section: NZU64!(ITEMS_PER_SECTION),
            log_compression: None,
            log_codec_config: ((0..=10000).into(), ()),
            log_write_buffer: NZUsize!(1024),
            translator: TwoCap,
            thread_pool: None,
            buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
        }
    }

    /// A type alias for the concrete [Immutable] type used in these unit tests.
    type ImmutableTest = Immutable<deterministic::Context, Digest, Vec<u8>, Sha256, TwoCap>;

    /// Return an [Immutable] database initialized with a fixed config.
    async fn open_db(context: deterministic::Context) -> ImmutableTest {
        ImmutableTest::init(context, db_config("partition"))
            .await
            .unwrap()
    }

    #[test_traced("WARN")]
    pub fn test_immutable_db_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.clone()).await;
            let mut hasher = Standard::<Sha256>::new();
            assert_eq!(db.op_count(), 0);
            assert_eq!(db.oldest_retained_loc(), None);
            assert_eq!(db.root(&mut hasher), MemMmr::default().root(&mut hasher));
            assert!(db.get_metadata().await.unwrap().is_none());

            // Make sure closing/reopening gets us back to the same state, even after adding an
            // uncommitted op.
            let k1 = Sha256::fill(1u8);
            let v1 = vec![4, 5, 6, 7];
            let root = db.root(&mut hasher);
            db.set(k1, v1).await.unwrap();
            db.close().await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.root(&mut hasher), root);
            assert_eq!(db.op_count(), 0);

            // Test calling commit on an empty db which should make it (durably) non-empty.
            db.commit(None).await.unwrap();
            assert_eq!(db.op_count(), 1); // commit op added
            let root = db.root(&mut hasher);
            db.close().await.unwrap();

            let db = open_db(context.clone()).await;
            assert_eq!(db.root(&mut hasher), root);

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    pub fn test_immutable_db_build_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Build a db with 2 keys.
            let mut hasher = Standard::<Sha256>::new();
            let mut db = open_db(context.clone()).await;

            let k1 = Sha256::fill(1u8);
            let k2 = Sha256::fill(2u8);
            let v1 = vec![1, 2, 3];
            let v2 = vec![4, 5, 6, 7, 8];

            assert!(db.get(&k1).await.unwrap().is_none());
            assert!(db.get(&k2).await.unwrap().is_none());

            // Set the first key.
            db.set(k1, v1.clone()).await.unwrap();
            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
            assert!(db.get(&k2).await.unwrap().is_none());
            assert_eq!(db.op_count(), 1);
            // Commit the first key.
            let metadata = Some(vec![99, 100]);
            db.commit(metadata.clone()).await.unwrap();
            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
            assert!(db.get(&k2).await.unwrap().is_none());
            assert_eq!(db.op_count(), 2);
            assert_eq!(
                db.get_metadata().await.unwrap(),
                Some((Location::new_unchecked(1), metadata.clone()))
            );
            // Set the second key.
            db.set(k2, v2.clone()).await.unwrap();
            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);
            assert_eq!(db.op_count(), 3);

            // Make sure we can still get metadata.
            assert_eq!(
                db.get_metadata().await.unwrap(),
                Some((Location::new_unchecked(1), metadata))
            );

            // Commit the second key.
            db.commit(None).await.unwrap();
            assert_eq!(db.op_count(), 4);
            assert_eq!(
                db.get_metadata().await.unwrap(),
                Some((Location::new_unchecked(3), None))
            );

            // Capture state.
            let root = db.root(&mut hasher);

            // Add an uncommitted op then close the db.
            let k3 = Sha256::fill(3u8);
            let v3 = vec![9, 10, 11];
            db.set(k3, v3).await.unwrap();
            assert_eq!(db.op_count(), 5);
            assert_ne!(db.root(&mut hasher), root);

            // Close & reopen, make sure state is restored to last commit point.
            db.close().await.unwrap();
            let db = open_db(context.clone()).await;
            assert!(db.get(&k3).await.unwrap().is_none());
            assert_eq!(db.op_count(), 4);
            assert_eq!(db.root(&mut hasher), root);
            assert_eq!(
                db.get_metadata().await.unwrap(),
                Some((Location::new_unchecked(3), None))
            );

            // Cleanup.
            db.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    pub fn test_immutable_db_build_and_authenticate() {
        let executor = deterministic::Runner::default();
        // Build a db with `ELEMENTS` key/value pairs and prove ranges over them.
        const ELEMENTS: u64 = 2_000;
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();
            let mut db = open_db(context.clone()).await;

            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                db.set(k, v).await.unwrap();
            }

            assert_eq!(db.op_count(), ELEMENTS);

            db.commit(None).await.unwrap();
            assert_eq!(db.op_count(), ELEMENTS + 1);

            // Close & reopen the db, making sure the re-opened db has exactly the same state.
            let root = db.root(&mut hasher);
            db.close().await.unwrap();
            let db = open_db(context.clone()).await;
            assert_eq!(root, db.root(&mut hasher));
            assert_eq!(db.op_count(), ELEMENTS + 1);
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                assert_eq!(db.get(&k).await.unwrap().unwrap(), v);
            }

            // Make sure all ranges of 5 operations are provable, including truncated ranges at the
            // end.
            let max_ops = NZU64!(5);
            for i in 0..*db.op_count() {
                let (proof, log) = db.proof(Location::new_unchecked(i), max_ops).await.unwrap();
                assert!(verify_proof(
                    &mut hasher,
                    &proof,
                    Location::new_unchecked(i),
                    &log,
                    &root
                ));
            }

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    pub fn test_immutable_db_recovery_from_failed_mmr_sync() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Insert 1000 keys then sync.
            const ELEMENTS: u64 = 1000;
            let mut hasher = Standard::<Sha256>::new();
            let mut db = open_db(context.clone()).await;

            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                db.set(k, v).await.unwrap();
            }

            assert_eq!(db.op_count(), ELEMENTS);
            db.sync().await.unwrap();
            let halfway_root = db.root(&mut hasher);

            // Insert another 1000 keys then simulate a failed close and test recovery.
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                db.set(k, v).await.unwrap();
            }

            // We partially write only 101 of the cached MMR nodes to simulate a failure.
            db.simulate_failed_commit_mmr(101).await.unwrap();

            // Recovery should replay the log to regenerate the MMR.
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 2001);
            let root = db.root(&mut hasher);
            assert_ne!(root, halfway_root);

            // Close & reopen should preserve the final commit.
            db.close().await.unwrap();
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 2001);
            assert_eq!(db.root(&mut hasher), root);

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    pub fn test_immutable_db_recovery_from_failed_log_sync() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();
            let mut db = open_db(context.clone()).await;

            // Insert a single key and then commit to create a first commit point.
            let k1 = Sha256::fill(1u8);
            let v1 = vec![1, 2, 3];
            db.set(k1, v1).await.unwrap();
            db.commit(None).await.unwrap();
            let first_commit_root = db.root(&mut hasher);

            // Insert 1000 keys then sync.
            const ELEMENTS: u64 = 1000;

            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                db.set(k, v).await.unwrap();
            }

            assert_eq!(db.op_count(), ELEMENTS + 2);
            db.sync().await.unwrap();

            // Insert another 1000 keys then simulate a failed close and test recovery.
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                db.set(k, v).await.unwrap();
            }

            // Simulate a failed commit where the MMR is written but the log misses the commit op.
            db.simulate_failed_commit_log().await.unwrap();

            // Recovery should back up to previous commit point.
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 2);
            let root = db.root(&mut hasher);
            assert_eq!(root, first_commit_root);

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    pub fn test_immutable_db_pruning() {
        let executor = deterministic::Runner::default();
        // Build a db with `ELEMENTS` key/value pairs and prove ranges over them.
        const ELEMENTS: u64 = 2_000;
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();
            let mut db = open_db(context.clone()).await;

            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                db.set(k, v).await.unwrap();
            }

            assert_eq!(db.op_count(), ELEMENTS);

            db.commit(None).await.unwrap();
            assert_eq!(db.op_count(), ELEMENTS + 1);

            // Prune the db to the first half of the operations.
            db.prune(Location::new_unchecked(ELEMENTS / 2))
                .await
                .unwrap();
            assert_eq!(db.op_count(), ELEMENTS + 1);

            // items_per_section is 5, so half should fall exactly on a section boundary, in which
            // case the actual pruning location should match the requested one.
            let oldest_retained_loc = db.oldest_retained_loc().unwrap();
            assert_eq!(oldest_retained_loc, Location::new_unchecked(ELEMENTS / 2));

            // Try to fetch a pruned key.
            let pruned_loc = oldest_retained_loc - 1;
            let pruned_key = Sha256::hash(&pruned_loc.to_be_bytes());
            assert!(db.get(&pruned_key).await.unwrap().is_none());

            // Try to fetch unpruned key.
            let unpruned_key = Sha256::hash(&oldest_retained_loc.to_be_bytes());
            assert!(db.get(&unpruned_key).await.unwrap().is_some());

            // Close & reopen the db, making sure the re-opened db has exactly the same state.
            let root = db.root(&mut hasher);
            db.close().await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(root, db.root(&mut hasher));
            assert_eq!(db.op_count(), ELEMENTS + 1);
            let oldest_retained_loc = db.oldest_retained_loc().unwrap();
            assert_eq!(oldest_retained_loc, Location::new_unchecked(ELEMENTS / 2));

            // Prune to a non-section boundary.
            let loc = Location::new_unchecked(ELEMENTS / 2 + (ITEMS_PER_SECTION * 2 - 1));
            db.prune(loc).await.unwrap();
            // Actual boundary should be a multiple of ITEMS_PER_SECTION (5).
            let oldest_retained_loc = db.oldest_retained_loc().unwrap();
            assert_eq!(
                oldest_retained_loc,
                Location::new_unchecked(ELEMENTS / 2 + ITEMS_PER_SECTION)
            );

            // Confirm boundary persists across restart.
            db.close().await.unwrap();
            let db = open_db(context.clone()).await;
            let oldest_retained_loc = db.oldest_retained_loc().unwrap();
            assert_eq!(
                oldest_retained_loc,
                Location::new_unchecked(ELEMENTS / 2 + ITEMS_PER_SECTION)
            );

            // Try to fetch a pruned key.
            let pruned_loc = oldest_retained_loc - 3;
            let pruned_key = Sha256::hash(&pruned_loc.to_be_bytes());
            assert!(db.get(&pruned_key).await.unwrap().is_none());

            // Try to fetch unpruned key.
            let unpruned_key = Sha256::hash(&oldest_retained_loc.to_be_bytes());
            assert!(db.get(&unpruned_key).await.unwrap().is_some());

            // Confirm behavior of trying to create a proof of pruned items is as expected.
            let pruned_pos = ELEMENTS / 2;
            let proof_result = db
                .proof(
                    Location::new_unchecked(pruned_pos),
                    NZU64!(pruned_pos + 100),
                )
                .await;
            assert!(matches!(proof_result, Err(Error::OperationPruned(pos)) if pos == pruned_pos));

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("INFO")]
    pub fn test_immutable_db_get_loc_out_of_bounds() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.clone()).await;

            // Test getting from empty database
            let result = db.get_loc(Location::new_unchecked(0)).await;
            assert!(matches!(result, Err(Error::LocationOutOfBounds(loc, size))
                    if loc == Location::new_unchecked(0) && size == Location::new_unchecked(0)));

            // Add some key-value pairs
            let k1 = Digest::from(*b"12345678901234567890123456789012");
            let k2 = Digest::from(*b"abcdefghijklmnopqrstuvwxyz123456");
            let v1 = vec![1u8; 16];
            let v2 = vec![2u8; 16];

            db.set(k1, v1.clone()).await.unwrap();
            db.set(k2, v2.clone()).await.unwrap();
            db.commit(None).await.unwrap();

            // Test getting valid locations - should succeed
            assert_eq!(
                db.get_loc(Location::new_unchecked(0))
                    .await
                    .unwrap()
                    .unwrap(),
                v1
            );
            assert_eq!(
                db.get_loc(Location::new_unchecked(1))
                    .await
                    .unwrap()
                    .unwrap(),
                v2
            );

            // Test getting out of bounds location (op_count is 3: k1, k2, commit)
            let result = db.get_loc(Location::new_unchecked(3)).await;
            assert!(matches!(result, Err(Error::LocationOutOfBounds(loc, size))
                    if loc == Location::new_unchecked(3) && size == Location::new_unchecked(3)));

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("INFO")]
    pub fn test_immutable_db_prune_beyond_commit() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.clone()).await;

            // Test pruning empty database (no commits)
            let result = db.prune(Location::new_unchecked(1)).await;
            assert!(
                matches!(result, Err(Error::PruneBeyondCommit(prune_loc, commit_loc))
                    if prune_loc == Location::new_unchecked(1) && commit_loc == Location::new_unchecked(0))
            );

            // Add key-value pairs and commit
            let k1 = Digest::from(*b"12345678901234567890123456789012");
            let k2 = Digest::from(*b"abcdefghijklmnopqrstuvwxyz123456");
            let k3 = Digest::from(*b"99999999999999999999999999999999");
            let v1 = vec![1u8; 16];
            let v2 = vec![2u8; 16];
            let v3 = vec![3u8; 16];

            db.set(k1, v1.clone()).await.unwrap();
            db.set(k2, v2.clone()).await.unwrap();
            db.commit(None).await.unwrap();
            db.set(k3, v3.clone()).await.unwrap();

            // op_count is 4 (k1, k2, commit, k3), last_commit is at location 2
            let last_commit = db.last_commit.unwrap();
            assert_eq!(last_commit, Location::new_unchecked(2));

            // Test valid prune (at last commit)
            assert!(db.prune(last_commit).await.is_ok());

            // Add more and commit again
            db.commit(None).await.unwrap();
            let new_last_commit = db.last_commit.unwrap();

            // Test pruning beyond last commit
            let beyond = Location::new_unchecked(*new_last_commit + 1);
            let result = db.prune(beyond).await;
            assert!(
                matches!(result, Err(Error::PruneBeyondCommit(prune_loc, commit_loc))
                    if prune_loc == beyond && commit_loc == new_last_commit)
            );

            db.destroy().await.unwrap();
        });
    }
}