commonware_storage/qmdb/immutable/mod.rs

//! An authenticated database that only supports adding new keyed values (no updates or
//! deletions), where values can have varying sizes.
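//!
//! # Example
//!
//! A minimal usage sketch (not compiled; assumes a runtime `context` and a populated [Config]
//! like the one built by `db_config` in this module's tests):
//!
//! ```ignore
//! // Open (or create) the database; uncommitted operations from a prior run are discarded.
//! let mut db = Immutable::<_, Digest, Vec<u8>, Sha256, TwoCap>::init(context, cfg).await?;
//!
//! // Append a new keyed value, then make it durable.
//! db.set(key, value.clone()).await?;
//! db.commit(None).await?;
//!
//! // Read the value back and fetch the authenticated root.
//! assert_eq!(db.get(&key).await?, Some(value));
//! let root = db.root();
//! ```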

use crate::{
    index::{unordered::Index, Unordered as _},
    journal::{
        authenticated,
        contiguous::variable::{self, Config as JournalConfig},
    },
    mmr::{
        journaled::{Config as MmrConfig, Mmr},
        mem::{Clean, Dirty, State},
        Location, Position, Proof, StandardHasher as Standard,
    },
    qmdb::{any::VariableValue, build_snapshot_from_log, Error},
    translator::Translator,
};
use commonware_codec::Read;
use commonware_cryptography::{DigestOf, Hasher as CHasher};
use commonware_runtime::{buffer::PoolRef, Clock, Metrics, Storage as RStorage, ThreadPool};
use commonware_utils::Array;
use std::num::{NonZeroU64, NonZeroUsize};
use tracing::warn;

mod operation;
pub use operation::Operation;

type Journal<E, K, V, H, S> =
    authenticated::Journal<E, variable::Journal<E, Operation<K, V>>, H, S>;

pub mod sync;

/// Configuration for an [Immutable] authenticated db.
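///
/// A hypothetical construction, mirroring the `db_config` helper in this module's tests (all
/// field values below are illustrative, not recommendations):
///
/// ```ignore
/// let cfg = Config {
///     mmr_journal_partition: "mmr_journal".into(),
///     mmr_items_per_blob: NZU64!(4096),
///     mmr_write_buffer: NZUsize!(1024),
///     mmr_metadata_partition: "mmr_metadata".into(),
///     log_partition: "log".into(),
///     log_write_buffer: NZUsize!(1024),
///     log_compression: None,
///     log_codec_config: ((0..=10_000).into(), ()),
///     log_items_per_section: NZU64!(1024),
///     translator: TwoCap,
///     thread_pool: None,
///     buffer_pool: PoolRef::new(NZUsize!(4096), NZUsize!(128)),
/// };
/// ```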
#[derive(Clone)]
pub struct Config<T: Translator, C> {
    /// The name of the [RStorage] partition used for the MMR's backing journal.
    pub mmr_journal_partition: String,

    /// The items per blob configuration value used by the MMR journal.
    pub mmr_items_per_blob: NonZeroU64,

    /// The size of the write buffer to use for each blob in the MMR journal.
    pub mmr_write_buffer: NonZeroUsize,

    /// The name of the [RStorage] partition used for the MMR's metadata.
    pub mmr_metadata_partition: String,

    /// The name of the [RStorage] partition used to persist the log of operations.
    pub log_partition: String,

    /// The size of the write buffer to use for each blob in the log journal.
    pub log_write_buffer: NonZeroUsize,

    /// Optional compression level (using `zstd`) to apply to log data before storing.
    pub log_compression: Option<u8>,

    /// The codec configuration to use for encoding and decoding log items.
    pub log_codec_config: C,

    /// The number of items to put in each section of the journal.
    pub log_items_per_section: NonZeroU64,

    /// The translator used by the compressed index.
    pub translator: T,

    /// An optional thread pool to use for parallelizing batch operations.
    pub thread_pool: Option<ThreadPool>,

    /// The buffer pool to use for caching data.
    pub buffer_pool: PoolRef,
}

/// An authenticated database that only supports adding new keyed values (no updates or
/// deletions), where values can have varying sizes.
pub struct Immutable<
    E: RStorage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    H: CHasher,
    T: Translator,
    S: State<DigestOf<H>> = Clean<DigestOf<H>>,
> {
    /// Authenticated journal of operations.
    journal: Journal<E, K, V, H, S>,

    /// A map from each active key to the location of the operation that set its value.
    ///
    /// # Invariant
    ///
    /// Only references operations of type [Operation::Set].
    snapshot: Index<T, Location>,

    /// The location of the last commit operation.
    last_commit_loc: Location,
}

impl<
        E: RStorage + Clock + Metrics,
        K: Array,
        V: VariableValue,
        H: CHasher,
        T: Translator,
        S: State<DigestOf<H>>,
    > Immutable<E, K, V, H, T, S>
{
    /// Return the oldest location that remains retrievable.
    pub fn oldest_retained_loc(&self) -> Location {
        self.journal
            .oldest_retained_loc()
            .expect("at least one operation should exist")
    }

    /// Get the value of `key` in the db, or None if it has no value or its corresponding operation
    /// has been pruned.
    pub async fn get(&self, key: &K) -> Result<Option<V>, Error> {
        let oldest = self.oldest_retained_loc();
        let iter = self.snapshot.get(key);
        for &loc in iter {
            if loc < oldest {
                continue;
            }
            if let Some(v) = self.get_from_loc(key, loc).await? {
                return Ok(Some(v));
            }
        }

        Ok(None)
    }

    /// Get the value of the operation with location `loc` in the db if it matches `key`. Returns
    /// [Error::OperationPruned] if `loc` precedes the oldest retained location. The location is
    /// otherwise assumed valid.
    async fn get_from_loc(&self, key: &K, loc: Location) -> Result<Option<V>, Error> {
        if loc < self.oldest_retained_loc() {
            return Err(Error::OperationPruned(loc));
        }

        let Operation::Set(k, v) = self.journal.read(loc).await? else {
            return Err(Error::UnexpectedData(loc));
        };

        if k != *key {
            Ok(None)
        } else {
            Ok(Some(v))
        }
    }

    /// Get the number of operations that have been applied to this db, including those that are not
    /// yet committed.
    pub fn op_count(&self) -> Location {
        self.journal.size()
    }

    /// Get the metadata associated with the last commit.
    pub async fn get_metadata(&self) -> Result<Option<V>, Error> {
        let last_commit_loc = self.last_commit_loc;
        let Operation::Commit(metadata) = self.journal.read(last_commit_loc).await? else {
            unreachable!("no commit operation at location of last commit {last_commit_loc}");
        };

        Ok(metadata)
    }

    /// Update the operations MMR with the given operation, and append the operation to the log. The
    /// `commit` method must be called to make any applied operation persistent & recoverable.
    pub(super) async fn apply_op(&mut self, op: Operation<K, V>) -> Result<(), Error> {
        self.journal.append(op).await?;

        Ok(())
    }
}

impl<E: RStorage + Clock + Metrics, K: Array, V: VariableValue, H: CHasher, T: Translator>
    Immutable<E, K, V, H, T, Clean<H::Digest>>
{
    /// Returns an [Immutable] qmdb initialized from `cfg`. Any uncommitted log operations will be
    /// discarded and the state of the db will be as of the last committed operation.
    pub async fn init(
        context: E,
        cfg: Config<T, <Operation<K, V> as Read>::Cfg>,
    ) -> Result<Self, Error> {
        let mmr_cfg = MmrConfig {
            journal_partition: cfg.mmr_journal_partition,
            metadata_partition: cfg.mmr_metadata_partition,
            items_per_blob: cfg.mmr_items_per_blob,
            write_buffer: cfg.mmr_write_buffer,
            thread_pool: cfg.thread_pool,
            buffer_pool: cfg.buffer_pool.clone(),
        };

        let journal_cfg = JournalConfig {
            partition: cfg.log_partition,
            items_per_section: cfg.log_items_per_section,
            compression: cfg.log_compression,
            codec_config: cfg.log_codec_config,
            buffer_pool: cfg.buffer_pool.clone(),
            write_buffer: cfg.log_write_buffer,
        };

        let mut journal = Journal::new(
            context.clone(),
            mmr_cfg,
            journal_cfg,
            Operation::<K, V>::is_commit,
        )
        .await?;

        if journal.size() == 0 {
            warn!("Authenticated log is empty, initialized new db.");
            journal.append(Operation::Commit(None)).await?;
            journal.sync().await?;
        }

        let mut snapshot = Index::new(context.with_label("snapshot"), cfg.translator.clone());

        // Get the start of the log.
        let start_loc = journal.pruning_boundary();

        // Build snapshot from the log.
        build_snapshot_from_log(start_loc, &journal.journal, &mut snapshot, |_, _| {}).await?;

        let last_commit_loc = journal.size().checked_sub(1).expect("commit should exist");

        Ok(Self {
            journal,
            snapshot,
            last_commit_loc,
        })
    }

    /// The number of operations to apply to the MMR in a single batch.
    const APPLY_BATCH_SIZE: u64 = 1 << 16;

    /// Returns an [Immutable] built from the config and sync data in `cfg`.
    #[allow(clippy::type_complexity)]
    pub async fn init_synced(
        context: E,
        cfg: sync::Config<E, K, V, T, H::Digest, <Operation<K, V> as Read>::Cfg>,
    ) -> Result<Self, Error> {
        let mut hasher = Standard::new();

        // Initialize MMR for sync.
        let mmr = Mmr::init_sync(
            context.with_label("mmr"),
            crate::mmr::journaled::SyncConfig {
                config: MmrConfig {
                    journal_partition: cfg.db_config.mmr_journal_partition,
                    metadata_partition: cfg.db_config.mmr_metadata_partition,
                    items_per_blob: cfg.db_config.mmr_items_per_blob,
                    write_buffer: cfg.db_config.mmr_write_buffer,
                    thread_pool: cfg.db_config.thread_pool.clone(),
                    buffer_pool: cfg.db_config.buffer_pool.clone(),
                },
                range: Position::try_from(cfg.range.start)?
                    ..Position::try_from(cfg.range.end.saturating_add(1))?,
                pinned_nodes: cfg.pinned_nodes,
            },
            &mut hasher,
        )
        .await?;

        let journal = Journal::<_, _, _, _, Clean<DigestOf<H>>>::from_components(
            mmr,
            cfg.log,
            hasher,
            Self::APPLY_BATCH_SIZE,
        )
        .await?;

        let mut snapshot: Index<T, Location> = Index::new(
            context.with_label("snapshot"),
            cfg.db_config.translator.clone(),
        );

        // Get the start of the log.
        let start_loc = journal.pruning_boundary();

        // Build snapshot from the log.
        build_snapshot_from_log(start_loc, &journal.journal, &mut snapshot, |_, _| {}).await?;

        let last_commit_loc = journal.size().checked_sub(1).expect("commit should exist");

        let mut db = Self {
            journal,
            snapshot,
            last_commit_loc,
        };

        db.sync().await?;
        Ok(db)
    }

    /// Prune historical operations prior to `loc`. This does not affect the db's root or
    /// current snapshot.
    ///
    /// # Errors
    ///
    /// - Returns [Error::PruneBeyondMinRequired] if `loc` > the location of the last commit.
    /// - Returns [crate::mmr::Error::LocationOverflow] if `loc` > [crate::mmr::MAX_LOCATION].
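    ///
    /// A usage sketch (not compiled): immediately after a successful `commit`, everything before
    /// the commit operation can be pruned without changing the root.
    ///
    /// ```ignore
    /// db.commit(None).await?;
    /// let root = db.root();
    /// db.prune(db.op_count() - 1).await?; // the last commit is the final operation
    /// assert_eq!(db.root(), root);
    /// ```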
    pub async fn prune(&mut self, loc: Location) -> Result<(), Error> {
        if loc > self.last_commit_loc {
            return Err(Error::PruneBeyondMinRequired(loc, self.last_commit_loc));
        }
        self.journal.prune(loc).await?;

        Ok(())
    }

    /// Sets `key` to have value `value`, assuming `key` hasn't already been assigned. The operation
    /// is reflected in the snapshot, but will be subject to rollback until the next successful
    /// `commit`. Attempting to set an already-set key results in undefined behavior.
    ///
    /// Any keys that have been pruned and map to the same translated key will be dropped
    /// during this call.
    pub async fn set(&mut self, key: K, value: V) -> Result<(), Error> {
        let op_count = self.op_count();
        let oldest = self.oldest_retained_loc();
        self.snapshot
            .insert_and_prune(&key, op_count, |v| *v < oldest);

        let op = Operation::Set(key, value);
        self.apply_op(op).await
    }

    /// Return the root of the db.
    pub const fn root(&self) -> H::Digest {
        self.journal.root()
    }

    /// Generate and return:
    ///  1. a proof of all operations applied to the db in the range starting at (and including)
    ///     location `start_loc`, and ending at the first of either:
    ///     - the last operation performed, or
    ///     - the operation `max_ops` from the start.
    ///  2. the operations corresponding to the leaves in this range.
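    ///
    /// A sketch of proving and then verifying a range (not compiled; `verify_proof` is the
    /// crate-level helper exercised by this module's tests):
    ///
    /// ```ignore
    /// let root = db.root();
    /// let start = Location::new_unchecked(0);
    /// let (proof, ops) = db.proof(start, NZU64!(5)).await?;
    /// let mut hasher = Standard::<Sha256>::new();
    /// assert!(verify_proof(&mut hasher, &proof, start, &ops, &root));
    /// ```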
    pub async fn proof(
        &self,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<H::Digest>, Vec<Operation<K, V>>), Error> {
        let op_count = self.op_count();
        self.historical_proof(op_count, start_loc, max_ops).await
    }

    /// Analogous to `proof` but with respect to the state of the database when it had `op_count`
    /// operations.
    ///
    /// # Errors
    ///
    /// Returns [crate::mmr::Error::LocationOverflow] if `op_count` or `start_loc` >
    /// [crate::mmr::MAX_LOCATION].
    /// Returns [crate::mmr::Error::RangeOutOfBounds] if `op_count` > number of operations, or
    /// if `start_loc` >= `op_count`.
    /// Returns [`Error::OperationPruned`] if `start_loc` has been pruned.
    pub async fn historical_proof(
        &self,
        op_count: Location,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<H::Digest>, Vec<Operation<K, V>>), Error> {
        Ok(self
            .journal
            .historical_proof(op_count, start_loc, max_ops)
            .await?)
    }

    /// Commit any pending operations to the database, ensuring their durability upon return from
    /// this function. Caller can associate an arbitrary `metadata` value with the commit.
    ///
    /// Failures after commit (but before `sync` or `close`) may still require reprocessing to
    /// recover the database on restart.
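    ///
    /// A sketch (not compiled): committing with metadata makes the preceding `set`s durable and
    /// the metadata retrievable via `get_metadata`.
    ///
    /// ```ignore
    /// db.set(key, value).await?;
    /// db.commit(Some(metadata.clone())).await?;
    /// assert_eq!(db.get_metadata().await?, Some(metadata));
    /// ```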
    pub async fn commit(&mut self, metadata: Option<V>) -> Result<(), Error> {
        let loc = self.journal.append(Operation::Commit(metadata)).await?;
        self.journal.commit().await?;
        self.last_commit_loc = loc;

        Ok(())
    }

    /// Sync all database state to disk. While this isn't necessary to ensure durability of
    /// committed operations, periodic invocation may reduce memory usage and the time required to
    /// recover the database on restart.
    pub(super) async fn sync(&mut self) -> Result<(), Error> {
        Ok(self.journal.sync().await?)
    }

    /// Close the db. Operations that have not been committed will be lost.
    pub async fn close(self) -> Result<(), Error> {
        Ok(self.journal.close().await?)
    }

    /// Destroy the db, removing all data from disk.
    pub async fn destroy(self) -> Result<(), Error> {
        Ok(self.journal.destroy().await?)
    }

    /// Convert this database into its dirty counterpart for batched updates.
    pub fn into_dirty(self) -> Immutable<E, K, V, H, T, Dirty> {
        Immutable {
            journal: self.journal.into_dirty(),
            snapshot: self.snapshot,
            last_commit_loc: self.last_commit_loc,
        }
    }

    /// Simulate a failed commit that successfully writes the log to the commit point, but without
    /// fully committing the MMR's cached elements to trigger MMR node recovery on reopening.
    #[cfg(test)]
    pub async fn simulate_failed_commit_mmr(mut self, write_limit: usize) -> Result<(), Error>
    where
        V: Default,
    {
        self.apply_op(Operation::Commit(None)).await?;
        self.journal.journal.close().await?;
        self.journal.mmr.simulate_partial_sync(write_limit).await?;

        Ok(())
    }

    /// Simulate a failed commit that successfully writes the MMR to the commit point, but without
    /// fully committing the log, requiring rollback of the MMR and log upon reopening.
    #[cfg(test)]
    pub async fn simulate_failed_commit_log(mut self) -> Result<(), Error>
    where
        V: Default,
    {
        self.apply_op(Operation::Commit(None)).await?;
        let log_size = self.journal.journal.size();

        self.journal.mmr.close().await?;
        // Rewind the operation log over the commit op to force rollback to the previous commit.
        if log_size > 0 {
            self.journal.journal.rewind(log_size - 1).await?;
        }
        self.journal.journal.close().await?;

        Ok(())
    }
}

impl<E: RStorage + Clock + Metrics, K: Array, V: VariableValue, H: CHasher, T: Translator>
    Immutable<E, K, V, H, T, Dirty>
{
    /// Merkleize the database and compute the root digest.
    pub fn merkleize(self) -> Immutable<E, K, V, H, T, Clean<H::Digest>> {
        Immutable {
            journal: self.journal.merkleize(),
            snapshot: self.snapshot,
            last_commit_loc: self.last_commit_loc,
        }
    }
}

impl<E: RStorage + Clock + Metrics, K: Array, V: VariableValue, H: CHasher, T: Translator>
    crate::store::Store for Immutable<E, K, V, H, T, Clean<H::Digest>>
{
    type Key = K;
    type Value = V;
    type Error = Error;

    async fn get(&self, key: &Self::Key) -> Result<Option<Self::Value>, Self::Error> {
        self.get(key).await
    }
}

impl<
        E: RStorage + Clock + Metrics,
        K: Array,
        V: VariableValue,
        H: CHasher,
        T: Translator,
        S: State<DigestOf<H>>,
    > crate::qmdb::store::LogStore for Immutable<E, K, V, H, T, S>
{
    type Value = V;

    fn op_count(&self) -> Location {
        self.op_count()
    }

    // All unpruned operations are active in an immutable store.
    fn inactivity_floor_loc(&self) -> Location {
        self.journal.pruning_boundary()
    }

    fn is_empty(&self) -> bool {
        self.op_count() == 0
    }

    async fn get_metadata(&self) -> Result<Option<V>, Error> {
        self.get_metadata().await
    }
}

impl<E: RStorage + Clock + Metrics, K: Array, V: VariableValue, H: CHasher, T: Translator>
    crate::qmdb::store::CleanStore for Immutable<E, K, V, H, T, Clean<H::Digest>>
{
    type Digest = H::Digest;
    type Operation = Operation<K, V>;
    type Dirty = Immutable<E, K, V, H, T, Dirty>;

    fn root(&self) -> Self::Digest {
        self.root()
    }

    async fn proof(
        &self,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<Self::Digest>, Vec<Self::Operation>), Error> {
        self.proof(start_loc, max_ops).await
    }

    async fn historical_proof(
        &self,
        historical_size: Location,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<Self::Digest>, Vec<Self::Operation>), Error> {
        self.historical_proof(historical_size, start_loc, max_ops)
            .await
    }

    fn into_dirty(self) -> Self::Dirty {
        self.into_dirty()
    }
}

impl<E: RStorage + Clock + Metrics, K: Array, V: VariableValue, H: CHasher, T: Translator>
    crate::qmdb::store::DirtyStore for Immutable<E, K, V, H, T, Dirty>
{
    type Digest = H::Digest;
    type Operation = Operation<K, V>;
    type Clean = Immutable<E, K, V, H, T, Clean<H::Digest>>;

    async fn merkleize(self) -> Result<Self::Clean, Error> {
        Ok(self.merkleize())
    }
}

#[cfg(test)]
pub(super) mod test {
    use super::*;
    use crate::{qmdb::verify_proof, translator::TwoCap};
    use commonware_cryptography::{sha256::Digest, Sha256};
    use commonware_macros::test_traced;
    use commonware_runtime::{
        deterministic::{self},
        Runner as _,
    };
    use commonware_utils::{NZUsize, NZU64};

    const PAGE_SIZE: usize = 77;
    const PAGE_CACHE_SIZE: usize = 9;
    const ITEMS_PER_SECTION: u64 = 5;

    pub(crate) fn db_config(
        suffix: &str,
    ) -> Config<TwoCap, (commonware_codec::RangeCfg<usize>, ())> {
        Config {
            mmr_journal_partition: format!("journal_{suffix}"),
            mmr_metadata_partition: format!("metadata_{suffix}"),
            mmr_items_per_blob: NZU64!(11),
            mmr_write_buffer: NZUsize!(1024),
            log_partition: format!("log_{suffix}"),
            log_items_per_section: NZU64!(ITEMS_PER_SECTION),
            log_compression: None,
            log_codec_config: ((0..=10000).into(), ()),
            log_write_buffer: NZUsize!(1024),
            translator: TwoCap,
            thread_pool: None,
            buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
        }
    }

    /// A type alias for the concrete [Immutable] type used in these unit tests.
    type ImmutableTest = Immutable<deterministic::Context, Digest, Vec<u8>, Sha256, TwoCap>;

    /// Return an [Immutable] database initialized with a fixed config.
    async fn open_db(context: deterministic::Context) -> ImmutableTest {
        ImmutableTest::init(context, db_config("partition"))
            .await
            .unwrap()
    }

    #[test_traced("WARN")]
    pub fn test_immutable_db_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 1);
            assert_eq!(db.oldest_retained_loc(), Location::new_unchecked(0));
            assert!(db.get_metadata().await.unwrap().is_none());

            // Make sure closing/reopening gets us back to the same state, even after adding an uncommitted op.
            let k1 = Sha256::fill(1u8);
            let v1 = vec![4, 5, 6, 7];
            let root = db.root();
            db.set(k1, v1).await.unwrap();
            db.close().await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.root(), root);
            assert_eq!(db.op_count(), 1);

            // Test calling commit on an empty db which should make it (durably) non-empty.
            db.commit(None).await.unwrap();
            assert_eq!(db.op_count(), 2); // commit op added
            let root = db.root();
            db.close().await.unwrap();

            let db = open_db(context.clone()).await;
            assert_eq!(db.root(), root);

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    pub fn test_immutable_db_build_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Build a db with 2 keys.
            let mut db = open_db(context.clone()).await;

            let k1 = Sha256::fill(1u8);
            let k2 = Sha256::fill(2u8);
            let v1 = vec![1, 2, 3];
            let v2 = vec![4, 5, 6, 7, 8];

            assert!(db.get(&k1).await.unwrap().is_none());
            assert!(db.get(&k2).await.unwrap().is_none());

            // Set the first key.
            db.set(k1, v1.clone()).await.unwrap();
            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
            assert!(db.get(&k2).await.unwrap().is_none());
            assert_eq!(db.op_count(), 2);
            // Commit the first key.
            let metadata = Some(vec![99, 100]);
            db.commit(metadata.clone()).await.unwrap();
            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
            assert!(db.get(&k2).await.unwrap().is_none());
            assert_eq!(db.op_count(), 3);
            assert_eq!(db.get_metadata().await.unwrap(), metadata.clone());
            // Set the second key.
            db.set(k2, v2.clone()).await.unwrap();
            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);
            assert_eq!(db.op_count(), 4);

            // Make sure we can still get metadata.
            assert_eq!(db.get_metadata().await.unwrap(), metadata);

            // Commit the second key.
            db.commit(None).await.unwrap();
            assert_eq!(db.op_count(), 5);
            assert_eq!(db.get_metadata().await.unwrap(), None);

            // Capture state.
            let root = db.root();

            // Add an uncommitted op then close the db.
            let k3 = Sha256::fill(3u8);
            let v3 = vec![9, 10, 11];
            db.set(k3, v3).await.unwrap();
            assert_eq!(db.op_count(), 6);
            assert_ne!(db.root(), root);

            // Close & reopen, make sure state is restored to last commit point.
            db.close().await.unwrap();
            let db = open_db(context.clone()).await;
            assert!(db.get(&k3).await.unwrap().is_none());
            assert_eq!(db.op_count(), 5);
            assert_eq!(db.root(), root);
            assert_eq!(db.get_metadata().await.unwrap(), None);

            // Cleanup.
            db.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    pub fn test_immutable_db_build_and_authenticate() {
        let executor = deterministic::Runner::default();
        // Build a db with `ELEMENTS` key/value pairs and prove ranges over them.
        const ELEMENTS: u64 = 2_000;
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();
            let mut db = open_db(context.clone()).await;

            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                db.set(k, v).await.unwrap();
            }

            assert_eq!(db.op_count(), ELEMENTS + 1);

            db.commit(None).await.unwrap();
            assert_eq!(db.op_count(), ELEMENTS + 2);

            // Close & reopen the db, making sure the re-opened db has exactly the same state.
            let root = db.root();
            db.close().await.unwrap();
            let db = open_db(context.clone()).await;
            assert_eq!(root, db.root());
            assert_eq!(db.op_count(), ELEMENTS + 2);
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                assert_eq!(db.get(&k).await.unwrap().unwrap(), v);
            }

            // Make sure all ranges of 5 operations are provable, including truncated ranges at the
            // end.
            let max_ops = NZU64!(5);
            for i in 0..*db.op_count() {
                let (proof, log) = db.proof(Location::new_unchecked(i), max_ops).await.unwrap();
                assert!(verify_proof(
                    &mut hasher,
                    &proof,
                    Location::new_unchecked(i),
                    &log,
                    &root
                ));
            }

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    pub fn test_immutable_db_recovery_from_failed_mmr_sync() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Insert 1000 keys then sync.
            const ELEMENTS: u64 = 1000;
            let mut db = open_db(context.clone()).await;

            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                db.set(k, v).await.unwrap();
            }

            assert_eq!(db.op_count(), ELEMENTS + 1);
            db.sync().await.unwrap();
            let halfway_root = db.root();

            // Insert another 1000 keys then simulate a failed close and test recovery.
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                db.set(k, v).await.unwrap();
            }

            // We partially write only 101 of the cached MMR nodes to simulate a failure.
            db.simulate_failed_commit_mmr(101).await.unwrap();

            // Recovery should replay the log to regenerate the mmr.
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 2002);
            let root = db.root();
            assert_ne!(root, halfway_root);

            // Close & reopen should preserve the final commit.
            db.close().await.unwrap();
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 2002);
            assert_eq!(db.root(), root);

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    pub fn test_immutable_db_recovery_from_failed_log_sync() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.clone()).await;

            // Insert a single key and then commit to create a first commit point.
            let k1 = Sha256::fill(1u8);
            let v1 = vec![1, 2, 3];
            db.set(k1, v1).await.unwrap();
            db.commit(None).await.unwrap();
            let first_commit_root = db.root();

            // Insert 1000 keys then sync.
            const ELEMENTS: u64 = 1000;

            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                db.set(k, v).await.unwrap();
            }

            assert_eq!(db.op_count(), ELEMENTS + 3);
            db.sync().await.unwrap();

            // Insert another 1000 keys then simulate a failed close and test recovery.
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                db.set(k, v).await.unwrap();
            }

            // Simulate a failure where the MMR is fully written but the log's commit op is not.
            db.simulate_failed_commit_log().await.unwrap();

            // Recovery should back up to previous commit point.
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 3);
            let root = db.root();
            assert_eq!(root, first_commit_root);

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    pub fn test_immutable_db_pruning() {
        let executor = deterministic::Runner::default();
        // Build a db with `ELEMENTS` key/value pairs then prune some of them.
        const ELEMENTS: u64 = 2_000;
        executor.start(|context| async move {
            let mut db = open_db(context.clone()).await;

            for i in 1u64..ELEMENTS + 1 {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                db.set(k, v).await.unwrap();
            }

            assert_eq!(db.op_count(), ELEMENTS + 1);

            db.commit(None).await.unwrap();
            assert_eq!(db.op_count(), ELEMENTS + 2);

            // Prune the db to the first half of the operations.
            db.prune(Location::new_unchecked((ELEMENTS + 2) / 2))
                .await
                .unwrap();
            assert_eq!(db.op_count(), ELEMENTS + 2);

            // items_per_section is 5, so the requested location ((ELEMENTS + 2) / 2 = 1001) is
            // rounded down to the start of its section (ELEMENTS / 2 = 1000).
            let oldest_retained_loc = db.oldest_retained_loc();
            assert_eq!(oldest_retained_loc, Location::new_unchecked(ELEMENTS / 2));

            // Try to fetch a pruned key.
            let pruned_loc = oldest_retained_loc - 1;
            let pruned_key = Sha256::hash(&pruned_loc.to_be_bytes());
            assert!(db.get(&pruned_key).await.unwrap().is_none());

            // Try to fetch unpruned key.
            let unpruned_key = Sha256::hash(&oldest_retained_loc.to_be_bytes());
            assert!(db.get(&unpruned_key).await.unwrap().is_some());

            // Close & reopen the db, making sure the re-opened db has exactly the same state.
            let root = db.root();
            db.close().await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(root, db.root());
            assert_eq!(db.op_count(), ELEMENTS + 2);
            let oldest_retained_loc = db.oldest_retained_loc();
            assert_eq!(oldest_retained_loc, Location::new_unchecked(ELEMENTS / 2));

            // Prune to a location that does not fall on a section boundary.
            let loc = Location::new_unchecked(ELEMENTS / 2 + (ITEMS_PER_SECTION * 2 - 1));
            db.prune(loc).await.unwrap();
            // Actual boundary should be a multiple of 5.
            let oldest_retained_loc = db.oldest_retained_loc();
            assert_eq!(
                oldest_retained_loc,
                Location::new_unchecked(ELEMENTS / 2 + ITEMS_PER_SECTION)
            );

            // Confirm boundary persists across restart.
            db.close().await.unwrap();
            let db = open_db(context.clone()).await;
            let oldest_retained_loc = db.oldest_retained_loc();
            assert_eq!(
                oldest_retained_loc,
                Location::new_unchecked(ELEMENTS / 2 + ITEMS_PER_SECTION)
            );

            // Try to fetch a pruned key.
            let pruned_loc = oldest_retained_loc - 3;
            let pruned_key = Sha256::hash(&pruned_loc.to_be_bytes());
            assert!(db.get(&pruned_key).await.unwrap().is_none());

            // Try to fetch unpruned key.
            let unpruned_key = Sha256::hash(&oldest_retained_loc.to_be_bytes());
            assert!(db.get(&unpruned_key).await.unwrap().is_some());

            // Confirm that attempting to prove pruned items fails as expected.
            let pruned_pos = ELEMENTS / 2;
            let proof_result = db
                .proof(
                    Location::new_unchecked(pruned_pos),
                    NZU64!(pruned_pos + 100),
                )
                .await;
            assert!(matches!(proof_result, Err(Error::Journal(crate::journal::Error::ItemPruned(pos))) if pos == pruned_pos));

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("INFO")]
    pub fn test_immutable_db_prune_beyond_commit() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.clone()).await;

            // Test pruning a fresh db: only the initial commit (location 0) exists, so pruning to 1 must fail.
            let result = db.prune(Location::new_unchecked(1)).await;
            assert!(
                matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, commit_loc))
                    if prune_loc == Location::new_unchecked(1) && commit_loc == Location::new_unchecked(0))
            );

            // Add key-value pairs and commit
            let k1 = Digest::from(*b"12345678901234567890123456789012");
            let k2 = Digest::from(*b"abcdefghijklmnopqrstuvwxyz123456");
            let k3 = Digest::from(*b"99999999999999999999999999999999");
            let v1 = vec![1u8; 16];
            let v2 = vec![2u8; 16];
            let v3 = vec![3u8; 16];

            db.set(k1, v1.clone()).await.unwrap();
            db.set(k2, v2.clone()).await.unwrap();
            db.commit(None).await.unwrap();
            db.set(k3, v3.clone()).await.unwrap();

            // op_count is 5 (initial commit, k1, k2, commit, k3); the last commit is at location 3.
            assert_eq!(*db.last_commit_loc, 3);

            // Test valid prune (at last commit)
            assert!(db.prune(db.last_commit_loc).await.is_ok());

            // Commit again (making the uncommitted k3 durable).
            db.commit(None).await.unwrap();
            let new_last_commit = db.last_commit_loc;

            // Test pruning beyond last commit
            let beyond = new_last_commit + 1;
            let result = db.prune(beyond).await;
            assert!(
                matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, commit_loc))
                    if prune_loc == beyond && commit_loc == new_last_commit)
            );

            db.destroy().await.unwrap();
        });
    }
}