commonware_storage/qmdb/immutable/
mod.rs

1//! An authenticated database that only supports adding new keyed values (no updates or
2//! deletions), where values can have varying sizes.
3
4use crate::{
5    index::{unordered::Index, Unordered as _},
6    journal::{
7        authenticated,
8        contiguous::variable::{self, Config as JournalConfig},
9    },
10    kv,
11    mmr::{
12        journaled::{Config as MmrConfig, Mmr},
13        Location, Position, Proof, StandardHasher as Standard,
14    },
15    qmdb::{
16        any::VariableValue, build_snapshot_from_log, DurabilityState, Durable, Error,
17        MerkleizationState, Merkleized, NonDurable, Unmerkleized,
18    },
19    translator::Translator,
20};
21use commonware_codec::Read;
22use commonware_cryptography::{DigestOf, Hasher as CHasher};
23use commonware_parallel::ThreadPool;
24use commonware_runtime::{buffer::PoolRef, Clock, Metrics, Storage as RStorage};
25use commonware_utils::Array;
26use std::{
27    num::{NonZeroU64, NonZeroUsize},
28    ops::Range,
29};
30use tracing::warn;
31
32mod operation;
33pub use operation::Operation;
34
/// Authenticated journal of [Operation]s, pairing a variable-length contiguous journal with an
/// MMR. `S` is the merkleization state of the journal (e.g. [Merkleized] or [Unmerkleized]).
type Journal<E, K, V, H, S> =
    authenticated::Journal<E, variable::Journal<E, Operation<K, V>>, H, S>;
37
38pub mod sync;
39
/// Configuration for an [Immutable] authenticated db.
///
/// `T` is the [Translator] used by the snapshot index, and `C` is the codec configuration used
/// to encode/decode items in the operation log.
#[derive(Clone)]
pub struct Config<T: Translator, C> {
    /// The name of the [RStorage] partition used for the MMR's backing journal.
    pub mmr_journal_partition: String,

    /// The items per blob configuration value used by the MMR journal.
    pub mmr_items_per_blob: NonZeroU64,

    /// The size of the write buffer to use for each blob in the MMR journal.
    pub mmr_write_buffer: NonZeroUsize,

    /// The name of the [RStorage] partition used for the MMR's metadata.
    pub mmr_metadata_partition: String,

    /// The name of the [RStorage] partition used to persist the log of operations.
    pub log_partition: String,

    /// The size of the write buffer to use for each blob in the log journal.
    pub log_write_buffer: NonZeroUsize,

    /// Optional compression level (using `zstd`) to apply to log data before storing.
    pub log_compression: Option<u8>,

    /// The codec configuration to use for encoding and decoding log items.
    pub log_codec_config: C,

    /// The number of items to put in each section of the journal.
    pub log_items_per_section: NonZeroU64,

    /// The translator used by the compressed index.
    pub translator: T,

    /// An optional thread pool to use for parallelizing batch operations.
    pub thread_pool: Option<ThreadPool>,

    /// The buffer pool to use for caching data.
    pub buffer_pool: PoolRef,
}
79
/// An authenticated database that only supports adding new keyed values (no updates or
/// deletions), where values can have varying sizes.
///
/// The type-state parameters `M` ([MerkleizationState]) and `D` ([DurabilityState]) track,
/// at compile time, whether the journal is currently merkleized (root/proofs available) and
/// whether all applied operations are durable (committed to disk).
pub struct Immutable<
    E: RStorage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    H: CHasher,
    T: Translator,
    M: MerkleizationState<DigestOf<H>> + Send + Sync = Merkleized<H>,
    D: DurabilityState = Durable,
> {
    /// Authenticated journal of operations.
    journal: Journal<E, K, V, H, M>,

    /// A map from each active key to the location of the operation that set its value.
    ///
    /// # Invariant
    ///
    /// Only references operations of type [Operation::Set].
    snapshot: Index<T, Location>,

    /// The location of the last commit operation.
    last_commit_loc: Location,

    /// Marker for the durability state.
    _durable: core::marker::PhantomData<D>,
}
107
108// Functionality shared across all DB states.
109impl<
110        E: RStorage + Clock + Metrics,
111        K: Array,
112        V: VariableValue,
113        H: CHasher,
114        T: Translator,
115        M: MerkleizationState<DigestOf<H>> + Send + Sync,
116        D: DurabilityState,
117    > Immutable<E, K, V, H, T, M, D>
118{
119    /// Return the oldest location that remains retrievable.
120    pub fn oldest_retained_loc(&self) -> Location {
121        self.journal
122            .oldest_retained_loc()
123            .expect("at least one operation should exist")
124    }
125
126    /// Get the value of `key` in the db, or None if it has no value or its corresponding operation
127    /// has been pruned.
128    pub async fn get(&self, key: &K) -> Result<Option<V>, Error> {
129        let oldest = self.oldest_retained_loc();
130        let iter = self.snapshot.get(key);
131        for &loc in iter {
132            if loc < oldest {
133                continue;
134            }
135            if let Some(v) = self.get_from_loc(key, loc).await? {
136                return Ok(Some(v));
137            }
138        }
139
140        Ok(None)
141    }
142
143    /// Get the value of the operation with location `loc` in the db if it matches `key`. Returns
144    /// [Error::OperationPruned] if loc precedes the oldest retained location. The location is
145    /// otherwise assumed valid.
146    async fn get_from_loc(&self, key: &K, loc: Location) -> Result<Option<V>, Error> {
147        if loc < self.oldest_retained_loc() {
148            return Err(Error::OperationPruned(loc));
149        }
150
151        let Operation::Set(k, v) = self.journal.read(loc).await? else {
152            return Err(Error::UnexpectedData(loc));
153        };
154
155        if k != *key {
156            Ok(None)
157        } else {
158            Ok(Some(v))
159        }
160    }
161
162    /// Get the number of operations that have been applied to this db, including those that are not
163    /// yet committed.
164    pub fn op_count(&self) -> Location {
165        self.journal.size()
166    }
167
168    /// Get the metadata associated with the last commit.
169    pub async fn get_metadata(&self) -> Result<Option<V>, Error> {
170        let last_commit_loc = self.last_commit_loc;
171        let Operation::Commit(metadata) = self.journal.read(last_commit_loc).await? else {
172            unreachable!("no commit operation at location of last commit {last_commit_loc}");
173        };
174
175        Ok(metadata)
176    }
177}
178
179// Functionality shared across Merkleized states.
180impl<
181        E: RStorage + Clock + Metrics,
182        K: Array,
183        V: VariableValue,
184        H: CHasher,
185        T: Translator,
186        D: DurabilityState,
187    > Immutable<E, K, V, H, T, Merkleized<H>, D>
188{
189    /// Return the root of the db.
190    pub const fn root(&self) -> H::Digest {
191        self.journal.root()
192    }
193
194    /// Generate and return:
195    ///  1. a proof of all operations applied to the db in the range starting at (and including)
196    ///     location `start_loc`, and ending at the first of either:
197    ///     - the last operation performed, or
198    ///     - the operation `max_ops` from the start.
199    ///  2. the operations corresponding to the leaves in this range.
200    pub async fn proof(
201        &self,
202        start_index: Location,
203        max_ops: NonZeroU64,
204    ) -> Result<(Proof<H::Digest>, Vec<Operation<K, V>>), Error> {
205        let op_count = self.op_count();
206        self.historical_proof(op_count, start_index, max_ops).await
207    }
208
209    /// Analogous to proof but with respect to the state of the database when it had `op_count`
210    /// operations.
211    ///
212    /// # Errors
213    ///
214    /// Returns [crate::mmr::Error::LocationOverflow] if `op_count` or `start_loc` >
215    /// [crate::mmr::MAX_LOCATION].
216    /// Returns [crate::mmr::Error::RangeOutOfBounds] if `op_count` > number of operations, or
217    /// if `start_loc` >= `op_count`.
218    /// Returns [`Error::OperationPruned`] if `start_loc` has been pruned.
219    pub async fn historical_proof(
220        &self,
221        op_count: Location,
222        start_loc: Location,
223        max_ops: NonZeroU64,
224    ) -> Result<(Proof<H::Digest>, Vec<Operation<K, V>>), Error> {
225        Ok(self
226            .journal
227            .historical_proof(op_count, start_loc, max_ops)
228            .await?)
229    }
230
231    /// Prune historical operations prior to `prune_loc`. This does not affect the db's root or
232    /// current snapshot.
233    ///
234    /// # Errors
235    ///
236    /// - Returns [Error::PruneBeyondMinRequired] if `prune_loc` > inactivity floor.
237    /// - Returns [crate::mmr::Error::LocationOverflow] if `prune_loc` > [crate::mmr::MAX_LOCATION].
238    pub async fn prune(&mut self, loc: Location) -> Result<(), Error> {
239        if loc > self.last_commit_loc {
240            return Err(Error::PruneBeyondMinRequired(loc, self.last_commit_loc));
241        }
242        self.journal.prune(loc).await?;
243
244        Ok(())
245    }
246}
247
// Functionality specific to (Merkleized, Durable) state.
impl<E: RStorage + Clock + Metrics, K: Array, V: VariableValue, H: CHasher, T: Translator>
    Immutable<E, K, V, H, T, Merkleized<H>, Durable>
{
    /// Returns an [Immutable] qmdb initialized from `cfg`. Any uncommitted log operations will be
    /// discarded and the state of the db will be as of the last committed operation.
    pub async fn init(
        context: E,
        cfg: Config<T, <Operation<K, V> as Read>::Cfg>,
    ) -> Result<Self, Error> {
        // Split the user-facing `Config` into the MMR and log journal component configs.
        let mmr_cfg = MmrConfig {
            journal_partition: cfg.mmr_journal_partition,
            metadata_partition: cfg.mmr_metadata_partition,
            items_per_blob: cfg.mmr_items_per_blob,
            write_buffer: cfg.mmr_write_buffer,
            thread_pool: cfg.thread_pool,
            buffer_pool: cfg.buffer_pool.clone(),
        };

        let journal_cfg = JournalConfig {
            partition: cfg.log_partition,
            items_per_section: cfg.log_items_per_section,
            compression: cfg.log_compression,
            codec_config: cfg.log_codec_config,
            buffer_pool: cfg.buffer_pool.clone(),
            write_buffer: cfg.log_write_buffer,
        };

        // `is_commit` identifies commit operations so recovery can discard any trailing
        // uncommitted suffix of the log.
        let mut journal = Journal::new(
            context.clone(),
            mmr_cfg,
            journal_cfg,
            Operation::<K, V>::is_commit,
        )
        .await?;

        // A brand-new db gets a genesis commit so a last commit always exists (see
        // `last_commit_loc` below).
        if journal.size() == 0 {
            warn!("Authenticated log is empty, initialized new db.");
            journal.append(Operation::Commit(None)).await?;
            journal.sync().await?;
        }

        let mut snapshot = Index::new(context.with_label("snapshot"), cfg.translator.clone());

        // Get the start of the log.
        let start_loc = journal.pruning_boundary();

        // Build snapshot from the log.
        build_snapshot_from_log(start_loc, &journal.journal, &mut snapshot, |_, _| {}).await?;

        // Assumes the recovered log always ends with a commit operation (uncommitted
        // operations were discarded above), so the last operation is the last commit.
        let last_commit_loc = journal.size().checked_sub(1).expect("commit should exist");

        Ok(Self {
            journal,
            snapshot,
            last_commit_loc,
            _durable: core::marker::PhantomData,
        })
    }

    /// The number of operations to apply to the MMR in a single batch.
    const APPLY_BATCH_SIZE: u64 = 1 << 16;

    /// Returns an [Immutable] built from the config and sync data in `cfg`.
    #[allow(clippy::type_complexity)]
    pub async fn init_synced(
        context: E,
        cfg: sync::Config<E, K, V, T, H::Digest, <Operation<K, V> as Read>::Cfg>,
    ) -> Result<Self, Error> {
        let mut hasher = Standard::new();

        // Initialize MMR for sync
        let mmr = Mmr::init_sync(
            context.with_label("mmr"),
            crate::mmr::journaled::SyncConfig {
                config: MmrConfig {
                    journal_partition: cfg.db_config.mmr_journal_partition,
                    metadata_partition: cfg.db_config.mmr_metadata_partition,
                    items_per_blob: cfg.db_config.mmr_items_per_blob,
                    write_buffer: cfg.db_config.mmr_write_buffer,
                    thread_pool: cfg.db_config.thread_pool.clone(),
                    buffer_pool: cfg.db_config.buffer_pool.clone(),
                },
                // NOTE(review): the +1 suggests `cfg.range` is inclusive of its end and is
                // converted here to an exclusive Position range — confirm against sync::Config.
                range: Position::try_from(cfg.range.start)?
                    ..Position::try_from(cfg.range.end.saturating_add(1))?,
                pinned_nodes: cfg.pinned_nodes,
            },
            &mut hasher,
        )
        .await?;

        // Combine the synced MMR with the provided log, replaying operations into the MMR
        // in batches of APPLY_BATCH_SIZE as needed.
        let journal = Journal::<_, _, _, _, Merkleized<H>>::from_components(
            mmr,
            cfg.log,
            hasher,
            Self::APPLY_BATCH_SIZE,
        )
        .await?;

        let mut snapshot: Index<T, Location> = Index::new(
            context.with_label("snapshot"),
            cfg.db_config.translator.clone(),
        );

        // Get the start of the log.
        let start_loc = journal.pruning_boundary();

        // Build snapshot from the log
        build_snapshot_from_log(start_loc, &journal.journal, &mut snapshot, |_, _| {}).await?;

        // Assumes the synced log ends with a commit operation (same invariant as `init`).
        let last_commit_loc = journal.size().checked_sub(1).expect("commit should exist");

        let mut db = Self {
            journal,
            snapshot,
            last_commit_loc,
            _durable: core::marker::PhantomData,
        };

        // Persist the freshly synced state before handing the db to the caller.
        db.sync().await?;
        Ok(db)
    }

    /// Sync all database state to disk. While this isn't necessary to ensure durability of
    /// committed operations, periodic invocation may reduce memory usage and the time required to
    /// recover the database on restart.
    pub async fn sync(&mut self) -> Result<(), Error> {
        Ok(self.journal.sync().await?)
    }

    /// Destroy the db, removing all data from disk.
    pub async fn destroy(self) -> Result<(), Error> {
        Ok(self.journal.destroy().await?)
    }

    /// Convert this database into a mutable state for batched updates.
    pub fn into_mutable(self) -> Immutable<E, K, V, H, T, Unmerkleized, NonDurable> {
        Immutable {
            // Transition the journal out of its merkleized state to accept new operations.
            journal: self.journal.into_dirty(),
            snapshot: self.snapshot,
            last_commit_loc: self.last_commit_loc,
            _durable: core::marker::PhantomData,
        }
    }
}
393
394// Functionality specific to (Unmerkleized, Durable) state.
395impl<E: RStorage + Clock + Metrics, K: Array, V: VariableValue, H: CHasher, T: Translator>
396    Immutable<E, K, V, H, T, Unmerkleized, Durable>
397{
398    /// Convert this database into a mutable state for batched updates.
399    pub fn into_mutable(self) -> Immutable<E, K, V, H, T, Unmerkleized, NonDurable> {
400        Immutable {
401            journal: self.journal,
402            snapshot: self.snapshot,
403            last_commit_loc: self.last_commit_loc,
404            _durable: core::marker::PhantomData,
405        }
406    }
407
408    /// Convert to merkleized state.
409    pub fn into_merkleized(self) -> Immutable<E, K, V, H, T, Merkleized<H>, Durable> {
410        Immutable {
411            journal: self.journal.merkleize(),
412            snapshot: self.snapshot,
413            last_commit_loc: self.last_commit_loc,
414            _durable: core::marker::PhantomData,
415        }
416    }
417}
418
419// Functionality specific to (Merkleized, NonDurable) state.
420impl<E: RStorage + Clock + Metrics, K: Array, V: VariableValue, H: CHasher, T: Translator>
421    Immutable<E, K, V, H, T, Merkleized<H>, NonDurable>
422{
423    /// Convert this database into a mutable state for batched updates.
424    pub fn into_mutable(self) -> Immutable<E, K, V, H, T, Unmerkleized, NonDurable> {
425        Immutable {
426            journal: self.journal.into_dirty(),
427            snapshot: self.snapshot,
428            last_commit_loc: self.last_commit_loc,
429            _durable: core::marker::PhantomData,
430        }
431    }
432}
433
434// Functionality specific to (Unmerkleized, NonDurable) state - the mutable state.
435impl<E: RStorage + Clock + Metrics, K: Array, V: VariableValue, H: CHasher, T: Translator>
436    Immutable<E, K, V, H, T, Unmerkleized, NonDurable>
437{
438    /// Update the operations MMR with the given operation, and append the operation to the log. The
439    /// `commit` method must be called to make any applied operation persistent & recoverable.
440    pub(super) async fn apply_op(&mut self, op: Operation<K, V>) -> Result<(), Error> {
441        self.journal.append(op).await?;
442
443        Ok(())
444    }
445
446    /// Sets `key` to have value `value`, assuming `key` hasn't already been assigned. The operation
447    /// is reflected in the snapshot, but will be subject to rollback until the next successful
448    /// `commit`. Attempting to set an already-set key results in undefined behavior.
449    ///
450    /// Any keys that have been pruned and map to the same translated key will be dropped
451    /// during this call.
452    pub async fn set(&mut self, key: K, value: V) -> Result<(), Error> {
453        let op_count = self.op_count();
454        let oldest = self.oldest_retained_loc();
455        self.snapshot
456            .insert_and_prune(&key, op_count, |v| *v < oldest);
457
458        let op = Operation::Set(key, value);
459        self.apply_op(op).await
460    }
461
462    /// Commit any pending operations to the database, ensuring their durability upon return from
463    /// this function. Caller can associate an arbitrary `metadata` value with the commit.
464    /// Returns the committed database and the range of committed locations. Note that even if no
465    /// operations were added since the last commit, this is a root-state changing operation.
466    pub async fn commit(
467        mut self,
468        metadata: Option<V>,
469    ) -> Result<
470        (
471            Immutable<E, K, V, H, T, Unmerkleized, Durable>,
472            Range<Location>,
473        ),
474        Error,
475    > {
476        let loc = self.journal.append(Operation::Commit(metadata)).await?;
477        self.journal.commit().await?;
478        self.last_commit_loc = loc;
479        let range = loc..self.op_count();
480
481        let db = Immutable {
482            journal: self.journal,
483            snapshot: self.snapshot,
484            last_commit_loc: self.last_commit_loc,
485            _durable: core::marker::PhantomData,
486        };
487
488        Ok((db, range))
489    }
490
491    /// Convert to merkleized state without committing (for read-only merkle operations).
492    pub fn into_merkleized(self) -> Immutable<E, K, V, H, T, Merkleized<H>, NonDurable> {
493        Immutable {
494            journal: self.journal.merkleize(),
495            snapshot: self.snapshot,
496            last_commit_loc: self.last_commit_loc,
497            _durable: core::marker::PhantomData,
498        }
499    }
500}
501
// Key/value read access ([kv::Gettable]) available in every db state; delegates to the
// inherent `get`.
impl<
        E: RStorage + Clock + Metrics,
        K: Array,
        V: VariableValue,
        H: CHasher,
        T: Translator,
        M: MerkleizationState<DigestOf<H>> + Send + Sync,
        D: DurabilityState,
    > kv::Gettable for Immutable<E, K, V, H, T, M, D>
{
    type Key = K;
    type Value = V;
    type Error = Error;

    async fn get(&self, key: &Self::Key) -> Result<Option<Self::Value>, Self::Error> {
        // Resolves to the inherent `get` (inherent methods take precedence over trait
        // methods in Rust method resolution), so this does not recurse.
        self.get(key).await
    }
}
520
// Log-level store access ([LogStore]) available in every db state; thin delegation to the
// inherent methods and the underlying journal.
impl<
        E: RStorage + Clock + Metrics,
        K: Array,
        V: VariableValue,
        H: CHasher,
        T: Translator,
        M: MerkleizationState<DigestOf<H>> + Send + Sync,
        D: DurabilityState,
    > crate::qmdb::store::LogStore for Immutable<E, K, V, H, T, M, D>
{
    type Value = V;

    fn op_count(&self) -> Location {
        self.op_count()
    }

    // All unpruned operations are active in an immutable store.
    fn inactivity_floor_loc(&self) -> Location {
        self.journal.pruning_boundary()
    }

    // NOTE(review): a db built via `init` always contains at least the genesis commit, so
    // this only returns true for a journal with no operations at all — confirm intended.
    fn is_empty(&self) -> bool {
        self.op_count() == 0
    }

    async fn get_metadata(&self) -> Result<Option<V>, Error> {
        self.get_metadata().await
    }
}
550
// Proof/root access ([MerkleizedStore]) — only available when the journal is merkleized;
// delegates to the inherent `root` and `historical_proof`.
impl<
        E: RStorage + Clock + Metrics,
        K: Array,
        V: VariableValue,
        H: CHasher,
        T: Translator,
        D: DurabilityState,
    > crate::qmdb::store::MerkleizedStore for Immutable<E, K, V, H, T, Merkleized<H>, D>
{
    type Digest = H::Digest;
    type Operation = Operation<K, V>;

    fn root(&self) -> Self::Digest {
        self.root()
    }

    async fn historical_proof(
        &self,
        historical_size: Location,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<Self::Digest>, Vec<Self::Operation>), Error> {
        self.historical_proof(historical_size, start_loc, max_ops)
            .await
    }
}
577
// Pruning access ([PrunableStore]) — only available when the journal is merkleized;
// delegates to the inherent `prune` (which caps pruning at the last commit).
impl<
        E: RStorage + Clock + Metrics,
        K: Array,
        V: VariableValue,
        H: CHasher,
        T: Translator,
        D: DurabilityState,
    > crate::qmdb::store::PrunableStore for Immutable<E, K, V, H, T, Merkleized<H>, D>
{
    async fn prune(&mut self, prune_loc: Location) -> Result<(), Error> {
        self.prune(prune_loc).await
    }
}
591
592#[cfg(test)]
593pub(super) mod test {
594    use super::*;
595    use crate::{qmdb::verify_proof, translator::TwoCap};
596    use commonware_cryptography::{sha256::Digest, Sha256};
597    use commonware_macros::test_traced;
598    use commonware_runtime::{
599        deterministic::{self},
600        Runner as _,
601    };
602    use commonware_utils::{NZUsize, NZU16, NZU64};
603    use std::num::NonZeroU16;
604
    // Buffer-pool page size used by tests (presumably a non-power-of-two on purpose, to
    // exercise unaligned page boundaries — confirm).
    const PAGE_SIZE: NonZeroU16 = NZU16!(77);
    // Number of pages cached by the test buffer pool.
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(9);
    // Operations per journal section; kept small so tests span multiple sections.
    const ITEMS_PER_SECTION: u64 = 5;
608
609    pub(crate) fn db_config(
610        suffix: &str,
611    ) -> Config<TwoCap, (commonware_codec::RangeCfg<usize>, ())> {
612        Config {
613            mmr_journal_partition: format!("journal_{suffix}"),
614            mmr_metadata_partition: format!("metadata_{suffix}"),
615            mmr_items_per_blob: NZU64!(11),
616            mmr_write_buffer: NZUsize!(1024),
617            log_partition: format!("log_{suffix}"),
618            log_items_per_section: NZU64!(ITEMS_PER_SECTION),
619            log_compression: None,
620            log_codec_config: ((0..=10000).into(), ()),
621            log_write_buffer: NZUsize!(1024),
622            translator: TwoCap,
623            thread_pool: None,
624            buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
625        }
626    }
627
628    /// Return an [Immutable] database initialized with a fixed config.
629    async fn open_db(
630        context: deterministic::Context,
631    ) -> Immutable<deterministic::Context, Digest, Vec<u8>, Sha256, TwoCap> {
632        Immutable::init(context, db_config("partition"))
633            .await
634            .unwrap()
635    }
636
    // A freshly initialized db contains exactly one operation (the genesis commit); verifies
    // reopen discards uncommitted operations and an empty commit changes durable state.
    #[test_traced("WARN")]
    pub fn test_immutable_db_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 1); // genesis commit only
            assert_eq!(db.oldest_retained_loc(), Location::new_unchecked(0));
            assert!(db.get_metadata().await.unwrap().is_none());

            // Make sure closing/reopening gets us back to the same state, even after adding an uncommitted op.
            let k1 = Sha256::fill(1u8);
            let v1 = vec![4, 5, 6, 7];
            let root = db.root();
            let mut db = db.into_mutable();
            db.set(k1, v1).await.unwrap();
            drop(db); // Simulate failed commit
            let db = open_db(context.clone()).await;
            assert_eq!(db.root(), root);
            assert_eq!(db.op_count(), 1);

            // Test calling commit on an empty db which should make it (durably) non-empty.
            let db = db.into_mutable();
            let (durable_db, _) = db.commit(None).await.unwrap();
            let db = durable_db.into_merkleized();
            assert_eq!(db.op_count(), 2); // commit op added
            let root = db.root();
            drop(db);

            // The empty commit must survive a reopen with an identical root.
            let db = open_db(context.clone()).await;
            assert_eq!(db.root(), root);

            db.destroy().await.unwrap();
        });
    }
671
    // Exercises set/commit/metadata flows across the type-state transitions, plus rollback of
    // an uncommitted operation on reopen. Op counts include the genesis commit at location 0.
    #[test_traced("DEBUG")]
    pub fn test_immutable_db_build_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Build a db with 2 keys.
            let db = open_db(context.clone()).await;

            let k1 = Sha256::fill(1u8);
            let k2 = Sha256::fill(2u8);
            let v1 = vec![1, 2, 3];
            let v2 = vec![4, 5, 6, 7, 8];

            assert!(db.get(&k1).await.unwrap().is_none());
            assert!(db.get(&k2).await.unwrap().is_none());

            // Set the first key.
            let mut db = db.into_mutable();
            db.set(k1, v1.clone()).await.unwrap();
            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
            assert!(db.get(&k2).await.unwrap().is_none());
            assert_eq!(db.op_count(), 2); // genesis commit + set(k1)
            // Commit the first key.
            let metadata = Some(vec![99, 100]);
            let (durable_db, _) = db.commit(metadata.clone()).await.unwrap();
            let db = durable_db.into_merkleized();
            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
            assert!(db.get(&k2).await.unwrap().is_none());
            assert_eq!(db.op_count(), 3); // + commit(metadata)
            assert_eq!(db.get_metadata().await.unwrap(), metadata.clone());
            // Set the second key.
            let mut db = db.into_mutable();
            db.set(k2, v2.clone()).await.unwrap();
            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);
            assert_eq!(db.op_count(), 4);

            // Make sure we can still get metadata.
            assert_eq!(db.get_metadata().await.unwrap(), metadata);

            // Commit the second key.
            let (durable_db, _) = db.commit(None).await.unwrap();
            let db = durable_db.into_merkleized();
            assert_eq!(db.op_count(), 5);
            // A commit with no metadata replaces the previous metadata with None.
            assert_eq!(db.get_metadata().await.unwrap(), None);

            // Capture state.
            let root = db.root();

            // Add an uncommitted op then simulate failure.
            let k3 = Sha256::fill(3u8);
            let v3 = vec![9, 10, 11];
            let mut db = db.into_mutable();
            db.set(k3, v3).await.unwrap();
            assert_eq!(db.op_count(), 6);

            // Reopen, make sure state is restored to last commit point.
            drop(db); // Simulate failed commit
            let db = open_db(context.clone()).await;
            assert!(db.get(&k3).await.unwrap().is_none());
            assert_eq!(db.op_count(), 5);
            assert_eq!(db.root(), root);
            assert_eq!(db.get_metadata().await.unwrap(), None);

            // Cleanup.
            db.destroy().await.unwrap();
        });
    }
739
    // Builds a large db, verifies persistence across reopen, and checks that every range of
    // operations (including truncated ranges at the end) is provable against the root.
    #[test_traced("WARN")]
    pub fn test_immutable_db_build_and_authenticate() {
        let executor = deterministic::Runner::default();
        // Build a db with `ELEMENTS` key/value pairs and prove ranges over them.
        const ELEMENTS: u64 = 2_000;
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();
            let db = open_db(context.clone()).await;
            let mut db = db.into_mutable();

            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                db.set(k, v).await.unwrap();
            }

            // ELEMENTS sets + genesis commit.
            assert_eq!(db.op_count(), ELEMENTS + 1);

            let (durable_db, _) = db.commit(None).await.unwrap();
            let db = durable_db.into_merkleized();
            assert_eq!(db.op_count(), ELEMENTS + 2); // + final commit

            // Drop & reopen the db, making sure it has exactly the same state.
            let root = db.root();
            drop(db);

            let db = open_db(context.clone()).await;
            assert_eq!(root, db.root());
            assert_eq!(db.op_count(), ELEMENTS + 2);
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                assert_eq!(db.get(&k).await.unwrap().unwrap(), v);
            }

            // Make sure all ranges of 5 operations are provable, including truncated ranges at the
            // end.
            let max_ops = NZU64!(5);
            for i in 0..*db.op_count() {
                let (proof, log) = db.proof(Location::new_unchecked(i), max_ops).await.unwrap();
                assert!(verify_proof(
                    &mut hasher,
                    &proof,
                    Location::new_unchecked(i),
                    &log,
                    &root
                ));
            }

            db.destroy().await.unwrap();
        });
    }
792
    // Verifies that if the process dies after the log is committed but before the MMR is
    // merkleized/synced, recovery regenerates the MMR by replaying the log.
    #[test_traced("WARN")]
    pub fn test_immutable_db_recovery_from_failed_mmr_sync() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Insert 1000 keys then sync.
            const ELEMENTS: u64 = 1000;
            let db = open_db(context.clone()).await;
            let mut db = db.into_mutable();

            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                db.set(k, v).await.unwrap();
            }

            assert_eq!(db.op_count(), ELEMENTS + 1); // sets + genesis commit
            let (durable_db, _) = db.commit(None).await.unwrap();
            let mut db = durable_db.into_merkleized();
            db.sync().await.unwrap();
            let halfway_root = db.root();

            // Insert another 1000 keys then simulate a failed close and test recovery.
            // Note: these reuse the same keys as the first batch (set-on-set is documented
            // as undefined behavior for values, but the op count/root math still holds).
            let mut db = db.into_mutable();
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                db.set(k, v).await.unwrap();
            }

            // Commit without merkleizing the MMR, then drop to simulate failure.
            // The commit persists the data to the journal, but the MMR is not synced.
            let (durable_db, _) = db.commit(None).await.unwrap();
            drop(durable_db); // Drop before merkleizing

            // Recovery should replay the log to regenerate the MMR.
            // op_count = 1002 (first batch + commit) + 1000 (second batch) + 1 (second commit) = 2003
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 2003);
            let root = db.root();
            assert_ne!(root, halfway_root);

            // Drop & reopen could preserve the final commit.
            drop(db);
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 2003);
            assert_eq!(db.root(), root);

            db.destroy().await.unwrap();
        });
    }
843
844    #[test_traced("WARN")]
845    pub fn test_immutable_db_recovery_from_failed_log_sync() {
846        let executor = deterministic::Runner::default();
847        executor.start(|context| async move {
848            let mut db = open_db(context.clone()).await.into_mutable();
849
850            // Insert a single key and then commit to create a first commit point.
851            let k1 = Sha256::fill(1u8);
852            let v1 = vec![1, 2, 3];
853            db.set(k1, v1).await.unwrap();
854            let (durable_db, _) = db.commit(None).await.unwrap();
855            let db = durable_db.into_merkleized();
856            let first_commit_root = db.root();
857
858            // Insert 1000 keys then sync.
859            const ELEMENTS: u64 = 1000;
860
861            let mut db = db.into_mutable();
862            for i in 0u64..ELEMENTS {
863                let k = Sha256::hash(&i.to_be_bytes());
864                let v = vec![i as u8; 100];
865                db.set(k, v).await.unwrap();
866            }
867
868            assert_eq!(db.op_count(), ELEMENTS + 3);
869
870            // Insert another 1000 keys then simulate a failed close and test recovery.
871            for i in 0u64..ELEMENTS {
872                let k = Sha256::hash(&i.to_be_bytes());
873                let v = vec![i as u8; 100];
874                db.set(k, v).await.unwrap();
875            }
876
877            // Simulate failure.
878            drop(db);
879
880            // Recovery should back up to previous commit point.
881            let db = open_db(context.clone()).await;
882            assert_eq!(db.op_count(), 3);
883            let root = db.root();
884            assert_eq!(root, first_commit_root);
885
886            db.destroy().await.unwrap();
887        });
888    }
889
890    #[test_traced("WARN")]
891    pub fn test_immutable_db_pruning() {
892        let executor = deterministic::Runner::default();
893        // Build a db with `ELEMENTS` key/value pairs then prune some of them.
894        const ELEMENTS: u64 = 2_000;
895        executor.start(|context| async move {
896            let db = open_db(context.clone()).await;
897            let mut db = db.into_mutable();
898
899            for i in 1u64..ELEMENTS+1 {
900                let k = Sha256::hash(&i.to_be_bytes());
901                let v = vec![i as u8; 100];
902                db.set(k, v).await.unwrap();
903            }
904
905            assert_eq!(db.op_count(), ELEMENTS + 1);
906
907            let (durable_db, _) = db.commit(None).await.unwrap();
908            let mut db = durable_db.into_merkleized();
909            assert_eq!(db.op_count(), ELEMENTS + 2);
910
911            // Prune the db to the first half of the operations.
912            db.prune(Location::new_unchecked((ELEMENTS+2) / 2))
913                .await
914                .unwrap();
915            assert_eq!(db.op_count(), ELEMENTS + 2);
916
917            // items_per_section is 5, so half should be exactly at a blob boundary, in which case
918            // the actual pruning location should match the requested.
919            let oldest_retained_loc = db.oldest_retained_loc();
920            assert_eq!(oldest_retained_loc, Location::new_unchecked(ELEMENTS / 2));
921
922            // Try to fetch a pruned key.
923            let pruned_loc = oldest_retained_loc - 1;
924            let pruned_key = Sha256::hash(&pruned_loc.to_be_bytes());
925            assert!(db.get(&pruned_key).await.unwrap().is_none());
926
927            // Try to fetch unpruned key.
928            let unpruned_key = Sha256::hash(&oldest_retained_loc.to_be_bytes());
929            assert!(db.get(&unpruned_key).await.unwrap().is_some());
930
931            // Drop & reopen the db, making sure it has exactly the same state.
932            let root = db.root();
933            db.sync().await.unwrap();
934            drop(db);
935
936            let mut db = open_db(context.clone()).await;
937            assert_eq!(root, db.root());
938            assert_eq!(db.op_count(), ELEMENTS + 2);
939            let oldest_retained_loc = db.oldest_retained_loc();
940            assert_eq!(oldest_retained_loc, Location::new_unchecked(ELEMENTS / 2));
941
942            // Prune to a non-blob boundary.
943            let loc = Location::new_unchecked(ELEMENTS / 2 + (ITEMS_PER_SECTION * 2 - 1));
944            db.prune(loc).await.unwrap();
945            // Actual boundary should be a multiple of 5.
946            let oldest_retained_loc = db.oldest_retained_loc();
947            assert_eq!(
948                oldest_retained_loc,
949                Location::new_unchecked(ELEMENTS / 2 + ITEMS_PER_SECTION)
950            );
951
952            // Confirm boundary persists across restart.
953            db.sync().await.unwrap();
954            drop(db);
955            let db = open_db(context.clone()).await;
956            let oldest_retained_loc = db.oldest_retained_loc();
957            assert_eq!(
958                oldest_retained_loc,
959                Location::new_unchecked(ELEMENTS / 2 + ITEMS_PER_SECTION)
960            );
961
962            // Try to fetch a pruned key.
963            let pruned_loc = oldest_retained_loc - 3;
964            let pruned_key = Sha256::hash(&pruned_loc.to_be_bytes());
965            assert!(db.get(&pruned_key).await.unwrap().is_none());
966
967            // Try to fetch unpruned key.
968            let unpruned_key = Sha256::hash(&oldest_retained_loc.to_be_bytes());
969            assert!(db.get(&unpruned_key).await.unwrap().is_some());
970
971            // Confirm behavior of trying to create a proof of pruned items is as expected.
972            let pruned_pos = ELEMENTS / 2;
973            let proof_result = db
974                .proof(
975                    Location::new_unchecked(pruned_pos),
976                    NZU64!(pruned_pos + 100),
977                )
978                .await;
979            assert!(matches!(proof_result, Err(Error::Journal(crate::journal::Error::ItemPruned(pos))) if pos == pruned_pos));
980
981            db.destroy().await.unwrap();
982        });
983    }
984
985    #[test_traced("INFO")]
986    pub fn test_immutable_db_prune_beyond_commit() {
987        let executor = deterministic::Runner::default();
988        executor.start(|context| async move {
989            let mut db = open_db(context.clone()).await;
990
991            // Test pruning empty database (no commits)
992            let result = db.prune(Location::new_unchecked(1)).await;
993            assert!(
994                matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, commit_loc))
995                    if prune_loc == Location::new_unchecked(1) && commit_loc == Location::new_unchecked(0))
996            );
997
998            // Add key-value pairs and commit
999            let k1 = Digest::from(*b"12345678901234567890123456789012");
1000            let k2 = Digest::from(*b"abcdefghijklmnopqrstuvwxyz123456");
1001            let k3 = Digest::from(*b"99999999999999999999999999999999");
1002            let v1 = vec![1u8; 16];
1003            let v2 = vec![2u8; 16];
1004            let v3 = vec![3u8; 16];
1005
1006            let mut db = db.into_mutable();
1007            db.set(k1, v1.clone()).await.unwrap();
1008            db.set(k2, v2.clone()).await.unwrap();
1009            let (durable_db, _) = db.commit(None).await.unwrap();
1010            let db = durable_db.into_merkleized();
1011            let mut db = db.into_mutable();
1012            db.set(k3, v3.clone()).await.unwrap();
1013
1014            // op_count is 5 (initial_commit, k1, k2, commit, k3), last_commit is at location 3
1015            assert_eq!(*db.last_commit_loc, 3);
1016
1017            // Test valid prune (at last commit) - need Merkleized state for prune
1018            let (durable_db, _) = db.commit(None).await.unwrap();
1019            let mut db = durable_db.into_merkleized();
1020            assert!(db.prune(Location::new_unchecked(3)).await.is_ok());
1021
1022            // Test pruning beyond last commit
1023            let new_last_commit = db.last_commit_loc;
1024            let beyond = new_last_commit + 1;
1025            let result = db.prune(beyond).await;
1026            assert!(
1027                matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, commit_loc))
1028                    if prune_loc == beyond && commit_loc == new_last_commit)
1029            );
1030
1031            db.destroy().await.unwrap();
1032        });
1033    }
1034
1035    use crate::{
1036        kv::tests::{assert_gettable, assert_send},
1037        qmdb::store::tests::{assert_log_store, assert_merkleized_store, assert_prunable_store},
1038    };
1039
    // Concrete [Immutable] instantiation in the merkleized (provable, durable) state,
    // used below to assert that its futures are `Send`.
    type MerkleizedDb =
        Immutable<deterministic::Context, Digest, Vec<u8>, Sha256, TwoCap, Merkleized<Sha256>>;
    // Concrete [Immutable] instantiation in the mutable (unmerkleized, non-durable)
    // state, used below to assert that its futures are `Send`.
    type MutableDb = Immutable<
        deterministic::Context,
        Digest,
        Vec<u8>,
        Sha256,
        TwoCap,
        Unmerkleized,
        NonDurable,
    >;
1051
1052    #[allow(dead_code)]
1053    fn assert_merkleized_db_futures_are_send(db: &mut MerkleizedDb, key: Digest, loc: Location) {
1054        assert_gettable(db, &key);
1055        assert_log_store(db);
1056        assert_prunable_store(db, loc);
1057        assert_merkleized_store(db, loc);
1058        assert_send(db.sync());
1059    }
1060
1061    #[allow(dead_code)]
1062    fn assert_mutable_db_futures_are_send(db: &mut MutableDb, key: Digest, value: Vec<u8>) {
1063        assert_gettable(db, &key);
1064        assert_log_store(db);
1065        assert_send(db.set(key, value));
1066    }
1067
1068    #[allow(dead_code)]
1069    fn assert_mutable_db_commit_is_send(db: MutableDb) {
1070        assert_send(db.commit(None));
1071    }
1072}