
commonware_storage/qmdb/immutable/mod.rs

//! An authenticated database that only supports adding new keyed values (no updates or
//! deletions), where values can have varying sizes.
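//!
//! # Example
//!
//! A condensed sketch of the typical lifecycle, mirroring the tests at the bottom of this
//! module. The runtime context, key/value types, and configuration are illustrative rather
//! than prescriptive:
//!
//! ```ignore
//! // Open (or recover) the db, then move it into its mutable state to stage writes.
//! let db = Immutable::<_, Digest, Vec<u8>, Sha256, TwoCap>::init(context, config).await?;
//! let mut db = db.into_mutable();
//!
//! // Stage a new keyed value; it is not durable until the next commit.
//! db.set(key, value).await?;
//!
//! // Commit staged operations (optionally attaching metadata), then merkleize to expose
//! // the authenticated root and proof generation.
//! let (db, _committed) = db.commit(None).await?;
//! let db = db.into_merkleized();
//! let root = db.root();
//! ```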

use crate::{
    index::{unordered::Index, Unordered as _},
    journal::{
        authenticated,
        contiguous::variable::{self, Config as JournalConfig},
    },
    kv,
    mmr::{journaled::Config as MmrConfig, Location, Proof},
    qmdb::{
        any::VariableValue, build_snapshot_from_log, DurabilityState, Durable, Error,
        MerkleizationState, Merkleized, NonDurable, Unmerkleized,
    },
    translator::Translator,
};
use commonware_codec::Read;
use commonware_cryptography::{DigestOf, Hasher as CHasher};
use commonware_parallel::ThreadPool;
use commonware_runtime::{buffer::paged::CacheRef, Clock, Metrics, Storage as RStorage};
use commonware_utils::Array;
use std::{
    num::{NonZeroU64, NonZeroUsize},
    ops::Range,
};
use tracing::warn;

mod operation;
pub use operation::Operation;

type Journal<E, K, V, H, S> =
    authenticated::Journal<E, variable::Journal<E, Operation<K, V>>, H, S>;

pub mod sync;

/// Configuration for an [Immutable] authenticated db.
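///
/// # Example
///
/// A minimal configuration sketch mirroring the values used by the tests in this module; the
/// partition names, sizes, and `TwoCap` translator are illustrative:
///
/// ```ignore
/// let config = Config {
///     mmr_journal_partition: "mmr_journal".into(),
///     mmr_metadata_partition: "mmr_metadata".into(),
///     mmr_items_per_blob: NZU64!(11),
///     mmr_write_buffer: NZUsize!(1024),
///     log_partition: "log".into(),
///     log_items_per_section: NZU64!(5),
///     log_compression: None,
///     log_codec_config: ((0..=10000).into(), ()),
///     log_write_buffer: NZUsize!(1024),
///     translator: TwoCap,
///     thread_pool: None,
///     page_cache: CacheRef::new(NZU16!(77), NZUsize!(9)),
/// };
/// ```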
#[derive(Clone)]
pub struct Config<T: Translator, C> {
    /// The name of the [RStorage] partition used for the MMR's backing journal.
    pub mmr_journal_partition: String,

    /// The items per blob configuration value used by the MMR journal.
    pub mmr_items_per_blob: NonZeroU64,

    /// The size of the write buffer to use for each blob in the MMR journal.
    pub mmr_write_buffer: NonZeroUsize,

    /// The name of the [RStorage] partition used for the MMR's metadata.
    pub mmr_metadata_partition: String,

    /// The name of the [RStorage] partition used to persist the log of operations.
    pub log_partition: String,

    /// The size of the write buffer to use for each blob in the log journal.
    pub log_write_buffer: NonZeroUsize,

    /// Optional compression level (using `zstd`) to apply to log data before storing.
    pub log_compression: Option<u8>,

    /// The codec configuration to use for encoding and decoding log items.
    pub log_codec_config: C,

    /// The number of items to put in each section of the journal.
    pub log_items_per_section: NonZeroU64,

    /// The translator used by the compressed index.
    pub translator: T,

    /// An optional thread pool to use for parallelizing batch operations.
    pub thread_pool: Option<ThreadPool>,

    /// The page cache to use for caching data.
    pub page_cache: CacheRef,
}

/// An authenticated database that only supports adding new keyed values (no updates or
/// deletions), where values can have varying sizes.
pub struct Immutable<
    E: RStorage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    H: CHasher,
    T: Translator,
    M: MerkleizationState<DigestOf<H>> + Send + Sync = Merkleized<H>,
    D: DurabilityState = Durable,
> {
    /// Authenticated journal of operations.
    journal: Journal<E, K, V, H, M>,

    /// A map from each active key to the location of the operation that set its value.
    ///
    /// # Invariant
    ///
    /// Only references operations of type [Operation::Set].
    snapshot: Index<T, Location>,

    /// The location of the last commit operation.
    last_commit_loc: Location,

    /// Marker for the durability state.
    _durable: core::marker::PhantomData<D>,
}

// Functionality shared across all DB states.
impl<
        E: RStorage + Clock + Metrics,
        K: Array,
        V: VariableValue,
        H: CHasher,
        T: Translator,
        M: MerkleizationState<DigestOf<H>> + Send + Sync,
        D: DurabilityState,
    > Immutable<E, K, V, H, T, M, D>
{
    /// Return the Location that will be assigned to the next operation appended to this db.
    pub fn size(&self) -> Location {
        self.bounds().end
    }

    /// Return [start, end) where `start` and `end - 1` are the Locations of the oldest and newest
    /// retained operations respectively.
    pub fn bounds(&self) -> std::ops::Range<Location> {
        self.journal.bounds()
    }

    /// Get the value of `key` in the db, or None if it has no value or its corresponding operation
    /// has been pruned.
    pub async fn get(&self, key: &K) -> Result<Option<V>, Error> {
        let oldest = self.journal.bounds().start;
        let iter = self.snapshot.get(key);
        for &loc in iter {
            if loc < oldest {
                continue;
            }
            if let Some(v) = self.get_from_loc(key, loc).await? {
                return Ok(Some(v));
            }
        }

        Ok(None)
    }

    /// Get the value of the operation with location `loc` in the db if it matches `key`. Returns
    /// [Error::OperationPruned] if loc precedes the oldest retained location. The location is
    /// otherwise assumed valid.
    async fn get_from_loc(&self, key: &K, loc: Location) -> Result<Option<V>, Error> {
        if loc < self.journal.bounds().start {
            return Err(Error::OperationPruned(loc));
        }

        let Operation::Set(k, v) = self.journal.read(loc).await? else {
            return Err(Error::UnexpectedData(loc));
        };

        if k != *key {
            Ok(None)
        } else {
            Ok(Some(v))
        }
    }

    /// Get the metadata associated with the last commit.
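    ///
    /// # Example
    ///
    /// A sketch of the metadata round-trip, assuming `db` starts in its mutable state (the
    /// metadata bytes are illustrative):
    ///
    /// ```ignore
    /// let (db, _) = db.commit(Some(vec![99, 100])).await?;
    /// assert_eq!(db.get_metadata().await?, Some(vec![99, 100]));
    /// ```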
    pub async fn get_metadata(&self) -> Result<Option<V>, Error> {
        let last_commit_loc = self.last_commit_loc;
        let Operation::Commit(metadata) = self.journal.read(last_commit_loc).await? else {
            unreachable!("no commit operation at location of last commit {last_commit_loc}");
        };

        Ok(metadata)
    }
}

// Functionality shared across Merkleized states.
impl<
        E: RStorage + Clock + Metrics,
        K: Array,
        V: VariableValue,
        H: CHasher,
        T: Translator,
        D: DurabilityState,
    > Immutable<E, K, V, H, T, Merkleized<H>, D>
{
    /// Return the root of the db.
    pub const fn root(&self) -> H::Digest {
        self.journal.root()
    }

    /// Generate and return:
    ///  1. a proof of all operations applied to the db in the range starting at (and including)
    ///     location `start_loc`, and ending at the first of either:
    ///     - the last operation performed, or
    ///     - the operation `max_ops` from the start.
    ///  2. the operations corresponding to the leaves in this range.
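    ///
    /// # Example
    ///
    /// A sketch of proving and verifying a range of operations, mirroring the authentication
    /// test below (the hasher type and starting location are illustrative):
    ///
    /// ```ignore
    /// let start = Location::new_unchecked(0);
    /// let (proof, ops) = db.proof(start, NZU64!(5)).await?;
    /// let mut hasher = StandardHasher::<Sha256>::new();
    /// assert!(verify_proof(&mut hasher, &proof, start, &ops, &db.root()));
    /// ```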
    pub async fn proof(
        &self,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<H::Digest>, Vec<Operation<K, V>>), Error> {
        let op_count = self.bounds().end;
        self.historical_proof(op_count, start_loc, max_ops).await
    }

    /// Analogous to proof but with respect to the state of the database when it had `op_count`
    /// operations.
    ///
    /// # Errors
    ///
    /// Returns [crate::mmr::Error::LocationOverflow] if `op_count` or `start_loc` >
    /// [crate::mmr::MAX_LOCATION].
    /// Returns [crate::mmr::Error::RangeOutOfBounds] if `op_count` > number of operations, or
    /// if `start_loc` >= `op_count`.
    /// Returns [`Error::OperationPruned`] if `start_loc` has been pruned.
    pub async fn historical_proof(
        &self,
        op_count: Location,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<H::Digest>, Vec<Operation<K, V>>), Error> {
        Ok(self
            .journal
            .historical_proof(op_count, start_loc, max_ops)
            .await?)
    }

    /// Prune historical operations prior to `loc`. This does not affect the db's root or
    /// current snapshot.
    ///
    /// # Errors
    ///
    /// - Returns [Error::PruneBeyondMinRequired] if `loc` > the location of the last commit.
    /// - Returns [crate::mmr::Error::LocationOverflow] if `loc` > [crate::mmr::MAX_LOCATION].
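    ///
    /// # Example
    ///
    /// A sketch, assuming `db` is merkleized and has committed well past location 1000 (the
    /// target is illustrative; the journal rounds it down to a section boundary):
    ///
    /// ```ignore
    /// db.prune(Location::new_unchecked(1000)).await?;
    /// assert!(db.bounds().start <= Location::new_unchecked(1000));
    /// ```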
    pub async fn prune(&mut self, loc: Location) -> Result<(), Error> {
        if loc > self.last_commit_loc {
            return Err(Error::PruneBeyondMinRequired(loc, self.last_commit_loc));
        }
        self.journal.prune(loc).await?;

        Ok(())
    }
}

// Functionality specific to (Merkleized, Durable) state.
impl<E: RStorage + Clock + Metrics, K: Array, V: VariableValue, H: CHasher, T: Translator>
    Immutable<E, K, V, H, T, Merkleized<H>, Durable>
{
    /// Returns an [Immutable] qmdb initialized from `cfg`. Any uncommitted log operations will be
    /// discarded and the state of the db will be as of the last committed operation.
    pub async fn init(
        context: E,
        cfg: Config<T, <Operation<K, V> as Read>::Cfg>,
    ) -> Result<Self, Error> {
        let mmr_cfg = MmrConfig {
            journal_partition: cfg.mmr_journal_partition,
            metadata_partition: cfg.mmr_metadata_partition,
            items_per_blob: cfg.mmr_items_per_blob,
            write_buffer: cfg.mmr_write_buffer,
            thread_pool: cfg.thread_pool,
            page_cache: cfg.page_cache.clone(),
        };

        let journal_cfg = JournalConfig {
            partition: cfg.log_partition,
            items_per_section: cfg.log_items_per_section,
            compression: cfg.log_compression,
            codec_config: cfg.log_codec_config,
            page_cache: cfg.page_cache.clone(),
            write_buffer: cfg.log_write_buffer,
        };

        let mut journal = Journal::new(
            context.clone(),
            mmr_cfg,
            journal_cfg,
            Operation::<K, V>::is_commit,
        )
        .await?;

        if journal.size() == 0 {
            warn!("Authenticated log is empty, initializing new db.");
            let mut dirty_journal = journal.into_dirty();
            dirty_journal.append(Operation::Commit(None)).await?;
            journal = dirty_journal.merkleize();
            journal.sync().await?;
        }

        let mut snapshot = Index::new(context.with_label("snapshot"), cfg.translator.clone());

        // Get the start of the log.
        let start_loc = journal.bounds().start;

        // Build snapshot from the log.
        build_snapshot_from_log(start_loc, &journal.journal, &mut snapshot, |_, _| {}).await?;

        let last_commit_loc = journal.size().checked_sub(1).expect("commit should exist");

        Ok(Self {
            journal,
            snapshot,
            last_commit_loc,
            _durable: core::marker::PhantomData,
        })
    }

    /// Sync all database state to disk. While this isn't necessary to ensure durability of
    /// committed operations, periodic invocation may reduce memory usage and the time required to
    /// recover the database on restart.
    pub async fn sync(&mut self) -> Result<(), Error> {
        Ok(self.journal.sync().await?)
    }

    /// Destroy the db, removing all data from disk.
    pub async fn destroy(self) -> Result<(), Error> {
        Ok(self.journal.destroy().await?)
    }

    /// Convert this database into a mutable state for batched updates.
    pub fn into_mutable(self) -> Immutable<E, K, V, H, T, Unmerkleized, NonDurable> {
        Immutable {
            journal: self.journal.into_dirty(),
            snapshot: self.snapshot,
            last_commit_loc: self.last_commit_loc,
            _durable: core::marker::PhantomData,
        }
    }
}

// Functionality specific to (Unmerkleized, Durable) state.
impl<E: RStorage + Clock + Metrics, K: Array, V: VariableValue, H: CHasher, T: Translator>
    Immutable<E, K, V, H, T, Unmerkleized, Durable>
{
    /// Convert this database into a mutable state for batched updates.
    pub fn into_mutable(self) -> Immutable<E, K, V, H, T, Unmerkleized, NonDurable> {
        Immutable {
            journal: self.journal,
            snapshot: self.snapshot,
            last_commit_loc: self.last_commit_loc,
            _durable: core::marker::PhantomData,
        }
    }

    /// Convert to merkleized state.
    pub fn into_merkleized(self) -> Immutable<E, K, V, H, T, Merkleized<H>, Durable> {
        Immutable {
            journal: self.journal.merkleize(),
            snapshot: self.snapshot,
            last_commit_loc: self.last_commit_loc,
            _durable: core::marker::PhantomData,
        }
    }
}

// Functionality specific to (Merkleized, NonDurable) state.
impl<E: RStorage + Clock + Metrics, K: Array, V: VariableValue, H: CHasher, T: Translator>
    Immutable<E, K, V, H, T, Merkleized<H>, NonDurable>
{
    /// Convert this database into a mutable state for batched updates.
    pub fn into_mutable(self) -> Immutable<E, K, V, H, T, Unmerkleized, NonDurable> {
        Immutable {
            journal: self.journal.into_dirty(),
            snapshot: self.snapshot,
            last_commit_loc: self.last_commit_loc,
            _durable: core::marker::PhantomData,
        }
    }
}

// Functionality specific to (Unmerkleized, NonDurable) state - the mutable state.
impl<E: RStorage + Clock + Metrics, K: Array, V: VariableValue, H: CHasher, T: Translator>
    Immutable<E, K, V, H, T, Unmerkleized, NonDurable>
{
    /// Update the operations MMR with the given operation, and append the operation to the log. The
    /// `commit` method must be called to make any applied operation persistent & recoverable.
    pub(super) async fn apply_op(&mut self, op: Operation<K, V>) -> Result<(), Error> {
        self.journal.append(op).await?;

        Ok(())
    }

    /// Sets `key` to have value `value`, assuming `key` hasn't already been assigned. The operation
    /// is reflected in the snapshot, but will be subject to rollback until the next successful
    /// `commit`. Attempting to set an already-set key results in undefined behavior.
    ///
    /// Any keys that have been pruned and map to the same translated key will be dropped
    /// during this call.
    pub async fn set(&mut self, key: K, value: V) -> Result<(), Error> {
        let bounds = self.bounds();
        self.snapshot
            .insert_and_prune(&key, bounds.end, |v| *v < bounds.start);

        let op = Operation::Set(key, value);
        self.apply_op(op).await
    }

    /// Commit any pending operations to the database, ensuring their durability upon return from
    /// this function. Caller can associate an arbitrary `metadata` value with the commit.
    /// Returns the committed database and the range of committed locations. Note that even if no
    /// operations were added since the last commit, this is a root-state changing operation.
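    ///
    /// # Example
    ///
    /// A sketch of the typestate flow around a commit, mirroring the tests below
    /// (`previous_root` is an illustrative binding, not part of this API):
    ///
    /// ```ignore
    /// // Even with no staged operations, committing appends a commit op and changes the root.
    /// let (durable, _committed) = db.commit(None).await?;
    /// let db = durable.into_merkleized();
    /// assert_ne!(db.root(), previous_root);
    /// ```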
    pub async fn commit(
        mut self,
        metadata: Option<V>,
    ) -> Result<
        (
            Immutable<E, K, V, H, T, Unmerkleized, Durable>,
            Range<Location>,
        ),
        Error,
    > {
        let loc = self.journal.append(Operation::Commit(metadata)).await?;
        self.journal.commit().await?;
        self.last_commit_loc = loc;
        let range = loc..self.bounds().end;

        let db = Immutable {
            journal: self.journal,
            snapshot: self.snapshot,
            last_commit_loc: self.last_commit_loc,
            _durable: core::marker::PhantomData,
        };

        Ok((db, range))
    }

    /// Convert to merkleized state without committing (for read-only merkle operations).
    pub fn into_merkleized(self) -> Immutable<E, K, V, H, T, Merkleized<H>, NonDurable> {
        Immutable {
            journal: self.journal.merkleize(),
            snapshot: self.snapshot,
            last_commit_loc: self.last_commit_loc,
            _durable: core::marker::PhantomData,
        }
    }
}

impl<
        E: RStorage + Clock + Metrics,
        K: Array,
        V: VariableValue,
        H: CHasher,
        T: Translator,
        M: MerkleizationState<DigestOf<H>> + Send + Sync,
        D: DurabilityState,
    > kv::Gettable for Immutable<E, K, V, H, T, M, D>
{
    type Key = K;
    type Value = V;
    type Error = Error;

    async fn get(&self, key: &Self::Key) -> Result<Option<Self::Value>, Self::Error> {
        self.get(key).await
    }
}

impl<
        E: RStorage + Clock + Metrics,
        K: Array,
        V: VariableValue,
        H: CHasher,
        T: Translator,
        M: MerkleizationState<DigestOf<H>> + Send + Sync,
        D: DurabilityState,
    > crate::qmdb::store::LogStore for Immutable<E, K, V, H, T, M, D>
{
    type Value = V;

    fn bounds(&self) -> std::ops::Range<Location> {
        self.journal.bounds()
    }

    // All unpruned operations are active in an immutable store.
    fn inactivity_floor_loc(&self) -> Location {
        self.bounds().start
    }

    fn is_empty(&self) -> bool {
        self.bounds().is_empty()
    }

    async fn get_metadata(&self) -> Result<Option<V>, Error> {
        self.get_metadata().await
    }
}

impl<
        E: RStorage + Clock + Metrics,
        K: Array,
        V: VariableValue,
        H: CHasher,
        T: Translator,
        D: DurabilityState,
    > crate::qmdb::store::MerkleizedStore for Immutable<E, K, V, H, T, Merkleized<H>, D>
{
    type Digest = H::Digest;
    type Operation = Operation<K, V>;

    fn root(&self) -> Self::Digest {
        self.root()
    }

    async fn historical_proof(
        &self,
        historical_size: Location,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<Self::Digest>, Vec<Self::Operation>), Error> {
        self.historical_proof(historical_size, start_loc, max_ops)
            .await
    }
}

impl<
        E: RStorage + Clock + Metrics,
        K: Array,
        V: VariableValue,
        H: CHasher,
        T: Translator,
        D: DurabilityState,
    > crate::qmdb::store::PrunableStore for Immutable<E, K, V, H, T, Merkleized<H>, D>
{
    async fn prune(&mut self, prune_loc: Location) -> Result<(), Error> {
        self.prune(prune_loc).await
    }
}

#[cfg(test)]
pub(super) mod test {
    use super::*;
    use crate::{mmr::StandardHasher, qmdb::verify_proof, translator::TwoCap};
    use commonware_cryptography::{sha256::Digest, Sha256};
    use commonware_macros::test_traced;
    use commonware_runtime::{
        deterministic::{self},
        Runner as _,
    };
    use commonware_utils::{NZUsize, NZU16, NZU64};
    use std::num::NonZeroU16;

    const PAGE_SIZE: NonZeroU16 = NZU16!(77);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(9);
    const ITEMS_PER_SECTION: u64 = 5;

    pub(crate) fn db_config(
        suffix: &str,
    ) -> Config<TwoCap, (commonware_codec::RangeCfg<usize>, ())> {
        Config {
            mmr_journal_partition: format!("journal_{suffix}"),
            mmr_metadata_partition: format!("metadata_{suffix}"),
            mmr_items_per_blob: NZU64!(11),
            mmr_write_buffer: NZUsize!(1024),
            log_partition: format!("log_{suffix}"),
            log_items_per_section: NZU64!(ITEMS_PER_SECTION),
            log_compression: None,
            log_codec_config: ((0..=10000).into(), ()),
            log_write_buffer: NZUsize!(1024),
            translator: TwoCap,
            thread_pool: None,
            page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
        }
    }

    /// Return an [Immutable] database initialized with a fixed config.
    async fn open_db(
        context: deterministic::Context,
    ) -> Immutable<deterministic::Context, Digest, Vec<u8>, Sha256, TwoCap> {
        Immutable::init(context, db_config("partition"))
            .await
            .unwrap()
    }

    #[test_traced("WARN")]
    pub fn test_immutable_db_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let db = open_db(context.with_label("first")).await;
            assert_eq!(db.bounds().end, 1);
            assert_eq!(db.journal.bounds().start, Location::new_unchecked(0));
            assert!(db.get_metadata().await.unwrap().is_none());

            // Make sure closing/reopening gets us back to the same state, even after adding an uncommitted op.
            let k1 = Sha256::fill(1u8);
            let v1 = vec![4, 5, 6, 7];
            let root = db.root();
            let mut db = db.into_mutable();
            db.set(k1, v1).await.unwrap();
            drop(db); // Simulate failed commit
            let db = open_db(context.with_label("second")).await;
            assert_eq!(db.root(), root);
            assert_eq!(db.bounds().end, 1);

            // Test calling commit on an empty db which should make it (durably) non-empty.
            let db = db.into_mutable();
            let (durable_db, _) = db.commit(None).await.unwrap();
            let db = durable_db.into_merkleized();
            assert_eq!(db.bounds().end, 2); // commit op added
            let root = db.root();
            drop(db);

            let db = open_db(context.with_label("third")).await;
            assert_eq!(db.root(), root);

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    pub fn test_immutable_db_build_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Build a db with 2 keys.
            let db = open_db(context.with_label("first")).await;

            let k1 = Sha256::fill(1u8);
            let k2 = Sha256::fill(2u8);
            let v1 = vec![1, 2, 3];
            let v2 = vec![4, 5, 6, 7, 8];

            assert!(db.get(&k1).await.unwrap().is_none());
            assert!(db.get(&k2).await.unwrap().is_none());

            // Set the first key.
            let mut db = db.into_mutable();
            db.set(k1, v1.clone()).await.unwrap();
            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
            assert!(db.get(&k2).await.unwrap().is_none());
            assert_eq!(db.bounds().end, 2);
            // Commit the first key.
            let metadata = Some(vec![99, 100]);
            let (durable_db, _) = db.commit(metadata.clone()).await.unwrap();
            let db = durable_db.into_merkleized();
            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
            assert!(db.get(&k2).await.unwrap().is_none());
            assert_eq!(db.bounds().end, 3);
            assert_eq!(db.get_metadata().await.unwrap(), metadata.clone());
            // Set the second key.
            let mut db = db.into_mutable();
            db.set(k2, v2.clone()).await.unwrap();
            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);
            assert_eq!(db.bounds().end, 4);

            // Make sure we can still get metadata.
            assert_eq!(db.get_metadata().await.unwrap(), metadata);

            // Commit the second key.
            let (durable_db, _) = db.commit(None).await.unwrap();
            let db = durable_db.into_merkleized();
            assert_eq!(db.bounds().end, 5);
            assert_eq!(db.get_metadata().await.unwrap(), None);

            // Capture state.
            let root = db.root();

            // Add an uncommitted op then simulate failure.
            let k3 = Sha256::fill(3u8);
            let v3 = vec![9, 10, 11];
            let mut db = db.into_mutable();
            db.set(k3, v3).await.unwrap();
            assert_eq!(db.bounds().end, 6);

            // Reopen, make sure state is restored to last commit point.
            drop(db); // Simulate failed commit
            let db = open_db(context.with_label("second")).await;
            assert!(db.get(&k3).await.unwrap().is_none());
            assert_eq!(db.bounds().end, 5);
            assert_eq!(db.root(), root);
            assert_eq!(db.get_metadata().await.unwrap(), None);

            // Cleanup.
            db.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    pub fn test_immutable_db_build_and_authenticate() {
        let executor = deterministic::Runner::default();
        // Build a db with `ELEMENTS` key/value pairs and prove ranges over them.
        const ELEMENTS: u64 = 2_000;
        executor.start(|context| async move {
            let mut hasher = StandardHasher::<Sha256>::new();
            let db = open_db(context.with_label("first")).await;
            let mut db = db.into_mutable();

            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                db.set(k, v).await.unwrap();
            }

            assert_eq!(db.bounds().end, ELEMENTS + 1);

            let (durable_db, _) = db.commit(None).await.unwrap();
            let db = durable_db.into_merkleized();
            assert_eq!(db.bounds().end, ELEMENTS + 2);

            // Drop & reopen the db, making sure it has exactly the same state.
            let root = db.root();
            drop(db);

            let db = open_db(context.with_label("second")).await;
            assert_eq!(root, db.root());
            assert_eq!(db.bounds().end, ELEMENTS + 2);
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                assert_eq!(db.get(&k).await.unwrap().unwrap(), v);
            }

            // Make sure all ranges of 5 operations are provable, including truncated ranges at the
            // end.
            let max_ops = NZU64!(5);
            for i in 0..*db.bounds().end {
                let (proof, log) = db.proof(Location::new_unchecked(i), max_ops).await.unwrap();
                assert!(verify_proof(
                    &mut hasher,
                    &proof,
                    Location::new_unchecked(i),
                    &log,
                    &root
                ));
            }

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    pub fn test_immutable_db_recovery_from_failed_mmr_sync() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Insert 1000 keys then sync.
            const ELEMENTS: u64 = 1000;
            let db = open_db(context.with_label("first")).await;
            let mut db = db.into_mutable();

            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                db.set(k, v).await.unwrap();
            }

            assert_eq!(db.bounds().end, ELEMENTS + 1);
            let (durable_db, _) = db.commit(None).await.unwrap();
            let mut db = durable_db.into_merkleized();
            db.sync().await.unwrap();
            let halfway_root = db.root();

            // Insert another 1000 keys then simulate a failed close and test recovery.
            let mut db = db.into_mutable();
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                db.set(k, v).await.unwrap();
            }

            // Commit without merkleizing the MMR, then drop to simulate failure.
            // The commit persists the data to the journal, but the MMR is not synced.
            let (durable_db, _) = db.commit(None).await.unwrap();
            drop(durable_db); // Drop before merkleizing

            // Recovery should replay the log to regenerate the MMR.
            // op_count = 1 (initial commit) + 1000 (first batch) + 1 (first commit)
            //          + 1000 (second batch) + 1 (second commit) = 2003
            let db = open_db(context.with_label("second")).await;
            assert_eq!(db.bounds().end, 2003);
            let root = db.root();
            assert_ne!(root, halfway_root);

            // Drop & reopen should preserve the final commit.
            drop(db);
            let db = open_db(context.with_label("third")).await;
            assert_eq!(db.bounds().end, 2003);
            assert_eq!(db.root(), root);

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    pub fn test_immutable_db_recovery_from_failed_log_sync() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.with_label("first")).await.into_mutable();

            // Insert a single key and then commit to create a first commit point.
            let k1 = Sha256::fill(1u8);
            let v1 = vec![1, 2, 3];
            db.set(k1, v1).await.unwrap();
            let (durable_db, _) = db.commit(None).await.unwrap();
            let db = durable_db.into_merkleized();
            let first_commit_root = db.root();

            // Insert 1000 keys without committing.
            const ELEMENTS: u64 = 1000;

            let mut db = db.into_mutable();
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                db.set(k, v).await.unwrap();
            }

            assert_eq!(db.bounds().end, ELEMENTS + 3);

            // Insert another 1000 keys then simulate a failed close and test recovery.
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                db.set(k, v).await.unwrap();
            }

            // Simulate failure.
            drop(db);

            // Recovery should back up to previous commit point.
            let db = open_db(context.with_label("second")).await;
            assert_eq!(db.bounds().end, 3);
            let root = db.root();
            assert_eq!(root, first_commit_root);

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    pub fn test_immutable_db_pruning() {
        let executor = deterministic::Runner::default();
        // Build a db with `ELEMENTS` key/value pairs then prune some of them.
        const ELEMENTS: u64 = 2_000;
        executor.start(|context| async move {
            let db = open_db(context.with_label("first")).await;
            let mut db = db.into_mutable();

            for i in 1u64..ELEMENTS + 1 {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![i as u8; 100];
                db.set(k, v).await.unwrap();
            }

            assert_eq!(db.bounds().end, ELEMENTS + 1);

            let (durable_db, _) = db.commit(None).await.unwrap();
            let mut db = durable_db.into_merkleized();
            assert_eq!(db.bounds().end, ELEMENTS + 2);

            // Prune the db to the first half of the operations.
            db.prune(Location::new_unchecked((ELEMENTS + 2) / 2))
                .await
                .unwrap();
            assert_eq!(db.bounds().end, ELEMENTS + 2);

            // items_per_section is 5, so the requested pruning location is rounded down to the
            // start of its section (a multiple of 5).
            let oldest_retained_loc = db.journal.bounds().start;
            assert_eq!(oldest_retained_loc, Location::new_unchecked(ELEMENTS / 2));

            // Try to fetch a pruned key.
            let pruned_loc = oldest_retained_loc - 1;
            let pruned_key = Sha256::hash(&pruned_loc.to_be_bytes());
            assert!(db.get(&pruned_key).await.unwrap().is_none());

            // Try to fetch unpruned key.
            let unpruned_key = Sha256::hash(&oldest_retained_loc.to_be_bytes());
            assert!(db.get(&unpruned_key).await.unwrap().is_some());

            // Drop & reopen the db, making sure it has exactly the same state.
            let root = db.root();
            db.sync().await.unwrap();
            drop(db);

            let mut db = open_db(context.with_label("second")).await;
            assert_eq!(root, db.root());
            assert_eq!(db.bounds().end, ELEMENTS + 2);
            let oldest_retained_loc = db.journal.bounds().start;
            assert_eq!(oldest_retained_loc, Location::new_unchecked(ELEMENTS / 2));

            // Prune to a non-blob boundary.
            let loc = Location::new_unchecked(ELEMENTS / 2 + (ITEMS_PER_SECTION * 2 - 1));
            db.prune(loc).await.unwrap();
            // Actual boundary should be a multiple of 5.
            let oldest_retained_loc = db.journal.bounds().start;
            assert_eq!(
                oldest_retained_loc,
                Location::new_unchecked(ELEMENTS / 2 + ITEMS_PER_SECTION)
            );

            // Confirm boundary persists across restart.
            db.sync().await.unwrap();
            drop(db);
            let db = open_db(context.with_label("third")).await;
            let oldest_retained_loc = db.journal.bounds().start;
            assert_eq!(
                oldest_retained_loc,
                Location::new_unchecked(ELEMENTS / 2 + ITEMS_PER_SECTION)
            );

            // Try to fetch a pruned key.
            let pruned_loc = oldest_retained_loc - 3;
            let pruned_key = Sha256::hash(&pruned_loc.to_be_bytes());
            assert!(db.get(&pruned_key).await.unwrap().is_none());

            // Try to fetch unpruned key.
            let unpruned_key = Sha256::hash(&oldest_retained_loc.to_be_bytes());
            assert!(db.get(&unpruned_key).await.unwrap().is_some());

            // Confirm that attempting to prove pruned items fails as expected.
            let pruned_pos = ELEMENTS / 2;
            let proof_result = db
                .proof(
                    Location::new_unchecked(pruned_pos),
                    NZU64!(pruned_pos + 100),
                )
                .await;
            assert!(matches!(proof_result, Err(Error::Journal(crate::journal::Error::ItemPruned(pos))) if pos == pruned_pos));

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("INFO")]
    pub fn test_immutable_db_prune_beyond_commit() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.with_label("test")).await;

            // Test pruning a freshly initialized db (only the initial commit at location 0)
            let result = db.prune(Location::new_unchecked(1)).await;
            assert!(
                matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, commit_loc))
                    if prune_loc == Location::new_unchecked(1) && commit_loc == Location::new_unchecked(0))
            );

            // Add key-value pairs and commit
            let k1 = Digest::from(*b"12345678901234567890123456789012");
            let k2 = Digest::from(*b"abcdefghijklmnopqrstuvwxyz123456");
            let k3 = Digest::from(*b"99999999999999999999999999999999");
            let v1 = vec![1u8; 16];
            let v2 = vec![2u8; 16];
            let v3 = vec![3u8; 16];

            let mut db = db.into_mutable();
            db.set(k1, v1.clone()).await.unwrap();
            db.set(k2, v2.clone()).await.unwrap();
            let (durable_db, _) = db.commit(None).await.unwrap();
            let db = durable_db.into_merkleized();
            let mut db = db.into_mutable();
            db.set(k3, v3.clone()).await.unwrap();

            // op_count is 5 (initial_commit, k1, k2, commit, k3), last_commit is at location 3
            assert_eq!(*db.last_commit_loc, 3);

            // Test valid prune (at last commit) - need Merkleized state for prune
            let (durable_db, _) = db.commit(None).await.unwrap();
            let mut db = durable_db.into_merkleized();
            assert!(db.prune(Location::new_unchecked(3)).await.is_ok());

            // Test pruning beyond last commit
            let new_last_commit = db.last_commit_loc;
            let beyond = new_last_commit + 1;
            let result = db.prune(beyond).await;
            assert!(
                matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, commit_loc))
                    if prune_loc == beyond && commit_loc == new_last_commit)
            );

            db.destroy().await.unwrap();
        });
    }

    use crate::{
        kv::tests::{assert_gettable, assert_send},
        qmdb::store::tests::{assert_log_store, assert_merkleized_store, assert_prunable_store},
    };

    type MerkleizedDb =
        Immutable<deterministic::Context, Digest, Vec<u8>, Sha256, TwoCap, Merkleized<Sha256>>;
    type MutableDb = Immutable<
        deterministic::Context,
        Digest,
        Vec<u8>,
        Sha256,
        TwoCap,
        Unmerkleized,
        NonDurable,
    >;

    #[allow(dead_code)]
    fn assert_merkleized_db_futures_are_send(db: &mut MerkleizedDb, key: Digest, loc: Location) {
        assert_gettable(db, &key);
        assert_log_store(db);
        assert_prunable_store(db, loc);
        assert_merkleized_store(db, loc);
        assert_send(db.sync());
    }

    #[allow(dead_code)]
    fn assert_mutable_db_futures_are_send(db: &mut MutableDb, key: Digest, value: Vec<u8>) {
        assert_gettable(db, &key);
        assert_log_store(db);
        assert_send(db.set(key, value));
    }

    #[allow(dead_code)]
    fn assert_mutable_db_commit_is_send(db: MutableDb) {
        assert_send(db.commit(None));
    }
1005}