commonware_storage/qmdb/current/unordered/
fixed.rs

//! An _unordered_ variant of a [crate::qmdb::current] authenticated database optimized for
//! fixed-size values.
//!
//! This variant does not maintain key ordering, so it cannot generate exclusion proofs. Use
//! [super::super::ordered::fixed] if exclusion proofs are required.
//!
//! See [Db] for the main database type.
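//!
//! # Example
//!
//! A minimal usage sketch (illustrative only, marked `ignore` so it is not compiled as a
//! doctest; `context`, `config`, `key`, and `value` are assumed to come from the caller, as in
//! this module's tests):
//!
//! ```rust,ignore
//! // Open (or recover) the database, mutate it in the Dirty state, then commit.
//! let db = Db::init(context, config).await?;
//! let mut db = db.into_dirty();
//! db.update(key, value).await?;
//! let mut db = db.merkleize().await?; // back to the Clean state
//! db.commit(None).await?;             // durable from this point
//! let root = db.root();
//! ```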

use crate::{
    bitmap::{CleanBitMap, DirtyBitMap},
    mmr::{
        mem::{Clean, Dirty, State},
        Location, Proof, StandardHasher,
    },
    qmdb::{
        any::{
            unordered::{
                fixed::{Db as AnyDb, Operation},
                Update,
            },
            CleanAny, DirtyAny, FixedValue,
        },
        current::{
            merkleize_grafted_bitmap,
            proof::{OperationProof, RangeProof},
            root, FixedConfig as Config,
        },
        store::{Batchable, CleanStore, DirtyStore, LogStore},
        Error,
    },
    translator::Translator,
    AuthenticatedBitMap as BitMap,
};
use commonware_codec::FixedSize;
use commonware_cryptography::{DigestOf, Hasher};
use commonware_runtime::{Clock, Metrics, Storage as RStorage};
use commonware_utils::Array;
use core::ops::Range;
use std::num::NonZeroU64;

/// Proof information for verifying that a key has a particular value in the database.
pub type KeyValueProof<D, const N: usize> = OperationProof<D, N>;

/// A key-value QMDB based on an MMR over its log of operations, supporting authentication of
/// whether a key ever had a specific value, and whether the key currently has that value.
///
/// Note: The generic parameter N is the bitmap chunk size in bytes. It is not freely generic: it
/// must be a power-of-two multiple of the size of the digest produced by the hasher. Compile-time
/// assertions in [Db::init] reject any other setting.
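///
/// For example, the unit tests in this module instantiate the type with a 32-byte SHA-256 digest
/// and a 32-byte chunk (`N = 32`):
///
/// ```rust,ignore
/// type CleanCurrentTest = Db<deterministic::Context, Digest, Digest, Sha256, TwoCap, 32>;
/// ```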
pub struct Db<
    E: RStorage + Clock + Metrics,
    K: Array,
    V: FixedValue,
    H: Hasher,
    T: Translator,
    const N: usize,
    S: State<DigestOf<H>> = Clean<DigestOf<H>>,
> {
    /// An authenticated database that provides the ability to prove whether a key ever had a
    /// specific value.
    any: AnyDb<E, K, V, H, T, S>,

    /// The bitmap over the activity status of each operation. Supports augmenting [Db] proofs in
    /// order to further prove whether a key _currently_ has a specific value.
    status: BitMap<H::Digest, N, S>,

    context: E,

    bitmap_metadata_partition: String,

    /// Cached root digest. Invariant: valid when in Clean state.
    cached_root: Option<H::Digest>,
}

impl<
        E: RStorage + Clock + Metrics,
        K: Array,
        V: FixedValue,
        H: Hasher,
        T: Translator,
        const N: usize,
        S: State<DigestOf<H>>,
    > Db<E, K, V, H, T, N, S>
{
    /// Return the number of operations that have been applied to this db, including those that
    /// have been pruned and those that are not yet committed.
    pub fn op_count(&self) -> Location {
        self.any.op_count()
    }

    /// Return the inactivity floor location. This is the location before which all operations are
    /// known to be inactive. Operations before this point can be safely pruned.
    pub const fn inactivity_floor_loc(&self) -> Location {
        self.any.inactivity_floor_loc()
    }

    /// Get the value of `key` in the db, or None if it has no value.
    pub async fn get(&self, key: &K) -> Result<Option<V>, Error> {
        self.any.get(key).await
    }

    /// Get the metadata associated with the last commit.
    pub async fn get_metadata(&self) -> Result<Option<V>, Error> {
        self.any.get_metadata().await
    }

    /// Get the level of the base MMR into which we are grafting.
    ///
    /// This value is log2 of the chunk size in bits. Since we assume the chunk size is a power of
    /// 2, we compute this from trailing_zeros.
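    ///
    /// For example, assuming `CHUNK_SIZE_BITS` is `N * 8`: with `N = 32` (a 32-byte chunk),
    /// `CHUNK_SIZE_BITS` is 256, and the grafting height is `256u32.trailing_zeros()` = 8.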
    const fn grafting_height() -> u32 {
        CleanBitMap::<H::Digest, N>::CHUNK_SIZE_BITS.trailing_zeros()
    }

    /// Return true if the proof authenticates that `key` currently has value `value` in the db
    /// with the provided `root`.
    pub fn verify_key_value_proof(
        hasher: &mut H,
        key: K,
        value: V,
        proof: &KeyValueProof<H::Digest, N>,
        root: &H::Digest,
    ) -> bool {
        let op = Operation::Update(Update(key, value));

        proof.verify(hasher, Self::grafting_height(), op, root)
    }

    /// Return true if the given sequence of `ops` was applied starting at location `start_loc` in
    /// the log with the provided `root`.
    pub fn verify_range_proof(
        hasher: &mut H,
        proof: &RangeProof<H::Digest>,
        start_loc: Location,
        ops: &[Operation<K, V>],
        chunks: &[[u8; N]],
        root: &H::Digest,
    ) -> bool {
        let height = Self::grafting_height();

        proof.verify(hasher, height, start_loc, ops, chunks, root)
    }
}

impl<
        E: RStorage + Clock + Metrics,
        K: Array,
        V: FixedValue,
        H: Hasher,
        T: Translator,
        const N: usize,
    > Db<E, K, V, H, T, N>
{
    /// Initializes a [Db] authenticated database from the given `config`. Leverages parallel
    /// Merkleization to initialize the bitmap MMR if a thread pool is provided.
    pub async fn init(context: E, config: Config<T>) -> Result<Self, Error> {
        // TODO: Re-evaluate assertion placement after `generic_const_exprs` is stable.
        const {
            // A compile-time assertion that the chunk size is some multiple of the digest size. A
            // multiple of 1 is optimal with respect to proof size, but a higher multiple allows for
            // a smaller (RAM-resident) merkle tree over the structure.
            assert!(
                N.is_multiple_of(H::Digest::SIZE),
                "chunk size must be some multiple of the digest size",
            );
            // A compile-time assertion that the chunk size is a power of 2, which is necessary to
            // allow the status bitmap tree to be aligned with the underlying operations MMR.
            assert!(N.is_power_of_two(), "chunk size must be a power of 2");
        }

        let thread_pool = config.thread_pool.clone();
        let bitmap_metadata_partition = config.bitmap_metadata_partition.clone();

        let mut hasher = StandardHasher::<H>::new();
        let mut status = CleanBitMap::restore_pruned(
            context.with_label("bitmap"),
            &bitmap_metadata_partition,
            thread_pool,
            &mut hasher,
        )
        .await?
        .into_dirty();

        // Initialize the any db with a callback that initializes the status bitmap.
        let last_known_inactivity_floor = Location::new_unchecked(status.len());
        let any = AnyDb::init_with_callback(
            context.with_label("any"),
            config.to_any_config(),
            Some(last_known_inactivity_floor),
            |append: bool, loc: Option<Location>| {
                status.push(append);
                if let Some(loc) = loc {
                    status.set_bit(*loc, false);
                }
            },
        )
        .await?;

        let height = Self::grafting_height();
        let status = merkleize_grafted_bitmap(&mut hasher, status, &any.log.mmr, height).await?;

        // Compute and cache the root.
        let cached_root = Some(root(&mut hasher, height, &status, &any.log.mmr).await?);

        Ok(Self {
            any,
            status,
            context,
            bitmap_metadata_partition,
            cached_root,
        })
    }

    /// Return the cached root of the db.
    pub const fn root(&self) -> H::Digest {
        self.cached_root.expect("Clean state must have cached root")
    }

    /// Returns a proof that the specified range of operations are part of the database, along with
    /// the operations from the range. A truncated range (from hitting the max) can be detected by
    /// looking at the length of the returned operations vector. Also returns the bitmap chunks
    /// required to verify the proof.
    ///
    /// # Errors
    ///
    /// Returns [crate::mmr::Error::LocationOverflow] if `start_loc` > [crate::mmr::MAX_LOCATION].
    /// Returns [crate::mmr::Error::RangeOutOfBounds] if `start_loc` >= number of leaves in the MMR.
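    ///
    /// Example sketch (illustrative, mirroring the unit tests below):
    ///
    /// ```rust,ignore
    /// let (proof, ops, chunks) = db.range_proof(&mut hasher, start_loc, max_ops).await?;
    /// assert!(Db::verify_range_proof(&mut hasher, &proof, start_loc, &ops, &chunks, &db.root()));
    /// ```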
    pub async fn range_proof(
        &self,
        hasher: &mut H,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> Result<(RangeProof<H::Digest>, Vec<Operation<K, V>>, Vec<[u8; N]>), Error> {
        RangeProof::<H::Digest>::new_with_ops(
            hasher,
            &self.status,
            Self::grafting_height(),
            &self.any.log.mmr,
            &self.any.log,
            start_loc,
            max_ops,
        )
        .await
    }

    /// Generate and return a [KeyValueProof] that authenticates the current value of `key`.
    ///
    /// # Errors
    ///
    /// Returns [Error::KeyNotFound] if the key is not currently assigned any value.
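    ///
    /// Example sketch (illustrative, mirroring the unit tests below):
    ///
    /// ```rust,ignore
    /// let proof = db.key_value_proof(&mut hasher, key).await?;
    /// assert!(Db::verify_key_value_proof(&mut hasher, key, value, &proof, &db.root()));
    /// ```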
    pub async fn key_value_proof(
        &self,
        hasher: &mut H,
        key: K,
    ) -> Result<KeyValueProof<H::Digest, N>, Error> {
        let op_loc = self.any.get_with_loc(&key).await?;
        let Some((_, loc)) = op_loc else {
            return Err(Error::KeyNotFound);
        };
        let height = Self::grafting_height();
        let mmr = &self.any.log.mmr;

        OperationProof::<H::Digest, N>::new(hasher, &self.status, height, mmr, loc).await
    }

    #[cfg(test)]
    /// Simulate a crash that prevents any data from being written to disk, which involves simply
    /// consuming the db before it can be cleanly closed.
    fn simulate_commit_failure_before_any_writes(self) {
        // Don't successfully complete any of the commit operations.
    }

    #[cfg(test)]
    /// Simulate a crash that happens during commit and prevents the any db from being pruned of
    /// inactive operations, and bitmap state from being written/pruned.
    async fn simulate_commit_failure_after_any_db_commit(mut self) -> Result<(), Error> {
        // Only successfully complete the log write part of the commit process.
        let _ = self.commit_to_log(None).await?;
        Ok(())
    }

    /// Helper that performs the commit operations up to and including writing to the log, but does
    /// not merkleize the bitmap or prune. Used for simulating partial commit failures in tests,
    /// and as the first phase of the full commit operation.
    ///
    /// Returns the dirty bitmap that needs to be merkleized and pruned.
    async fn commit_to_log(
        &mut self,
        metadata: Option<V>,
    ) -> Result<DirtyBitMap<H::Digest, N>, Error> {
        let empty_status = CleanBitMap::<H::Digest, N>::new(&mut self.any.log.hasher, None);
        let mut status = std::mem::replace(&mut self.status, empty_status).into_dirty();

        // Inactivate the previous commit operation.
        status.set_bit(*self.any.last_commit_loc, false);

        // Raise the inactivity floor by taking `self.steps` steps, plus 1 to account for the
        // previous commit becoming inactive.
        let inactivity_floor_loc = self.any.raise_floor_with_bitmap(&mut status).await?;

        // Append the commit operation with the new floor and tag it as active in the bitmap.
        status.push(true);
        let commit_op = Operation::CommitFloor(metadata, inactivity_floor_loc);

        self.any.apply_commit_op(commit_op).await?;

        Ok(status)
    }

    /// Commit any pending operations to the database, ensuring their durability upon return from
    /// this function. Also raises the inactivity floor according to the schedule. Returns the
    /// half-open `[start_loc, end_loc)` location range of committed operations.
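    ///
    /// For example (see the unit tests below), the first commit of a single created key on a
    /// fresh db returns `1..4`: the update, the floor-raising move, and the new commit op all
    /// follow the initial commit at location 0.
    ///
    /// ```rust,ignore
    /// let range = db.commit(None).await?;
    /// assert_eq!(range.end, db.op_count()); // `end` is exclusive and equals the new op count
    /// ```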
    pub async fn commit(&mut self, metadata: Option<V>) -> Result<Range<Location>, Error> {
        let start_loc = self.any.last_commit_loc + 1;

        // Phase 1: Commit to log (recovery is ensured after this returns).
        let status = self.commit_to_log(metadata).await?;

        // Phase 2: Merkleize the new bitmap entries.
        let mmr = &self.any.log.mmr;
        let height = Self::grafting_height();
        self.status =
            merkleize_grafted_bitmap(&mut self.any.log.hasher, status, mmr, height).await?;

        // Phase 3: Prune bits that are no longer needed because they precede the inactivity floor.
        self.status.prune_to_bit(*self.any.inactivity_floor_loc())?;

        // Phase 4: Refresh the cached root after commit.
        self.cached_root = Some(root(&mut self.any.log.hasher, height, &self.status, mmr).await?);

        Ok(start_loc..self.op_count())
    }

    /// Sync all database state to disk.
    pub async fn sync(&mut self) -> Result<(), Error> {
        self.any.sync().await?;

        // Write the bitmap pruning boundary to disk so that the next startup doesn't have to
        // re-Merkleize the inactive portion up to the inactivity floor.
        self.status
            .write_pruned(
                self.context.with_label("bitmap"),
                &self.bitmap_metadata_partition,
            )
            .await
            .map_err(Into::into)
    }

    /// Prune historical operations prior to `prune_loc`. This does not affect the db's root
    /// or current snapshot.
    pub async fn prune(&mut self, prune_loc: Location) -> Result<(), Error> {
        // Write the pruned portion of the bitmap to disk *first* to ensure recovery in case of
        // failure during pruning. If we don't do this, we may not be able to recover the bitmap
        // because doing so may require replaying operations that have already been pruned.
        self.status
            .write_pruned(
                self.context.with_label("bitmap"),
                &self.bitmap_metadata_partition,
            )
            .await?;

        self.any.prune(prune_loc).await
    }

    /// Close the db. Operations that have not been committed will be lost or rolled back on
    /// restart.
    pub async fn close(self) -> Result<(), Error> {
        self.any.close().await
    }

    /// Destroy the db, removing all data from disk.
    pub async fn destroy(self) -> Result<(), Error> {
        // Clean up the bitmap metadata partition.
        CleanBitMap::<H::Digest, N>::destroy(self.context, &self.bitmap_metadata_partition).await?;

        // Clean up Any components (MMR and log).
        self.any.destroy().await
    }

    /// Convert this clean database into its dirty counterpart for performing mutations.
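    ///
    /// A typical mutate-and-commit cycle (sketch):
    ///
    /// ```rust,ignore
    /// let mut dirty = db.into_dirty();
    /// dirty.update(key, value).await?;
    /// let mut db = dirty.merkleize().await?; // Dirty -> Clean (in-memory only)
    /// db.commit(None).await?;                // durable
    /// ```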
    pub fn into_dirty(self) -> Db<E, K, V, H, T, N, Dirty> {
        Db {
            any: self.any.into_dirty(),
            status: self.status.into_dirty(),
            context: self.context,
            bitmap_metadata_partition: self.bitmap_metadata_partition,
            cached_root: None,
        }
    }
}

impl<
        E: RStorage + Clock + Metrics,
        K: Array,
        V: FixedValue,
        H: Hasher,
        T: Translator,
        const N: usize,
    > Db<E, K, V, H, T, N, Dirty>
{
    /// Updates `key` to have value `value`. The operation is reflected in the snapshot, but will be
    /// subject to rollback until the next successful `commit`.
    pub async fn update(&mut self, key: K, value: V) -> Result<(), Error> {
        if let Some(old_loc) = self.any.update_key(key, value).await? {
            self.status.set_bit(*old_loc, false);
        }
        self.status.push(true);

        Ok(())
    }

    /// Creates a new key-value pair in the db. The operation is reflected in the snapshot, but will
    /// be subject to rollback until the next successful `commit`. Returns true if the key was
    /// created, false if it already existed.
    pub async fn create(&mut self, key: K, value: V) -> Result<bool, Error> {
        if !self.any.create(key, value).await? {
            return Ok(false);
        }
        self.status.push(true);

        Ok(true)
    }

    /// Delete `key` and its value from the db. Deleting a key that already has no value is a no-op.
    /// The operation is reflected in the snapshot, but will be subject to rollback until the next
    /// successful `commit`. Returns true if the key was deleted, false if it was already inactive.
    pub async fn delete(&mut self, key: K) -> Result<bool, Error> {
        let Some(loc) = self.any.delete_key(key).await? else {
            return Ok(false);
        };

        self.status.push(false);
        self.status.set_bit(*loc, false);

        Ok(true)
    }

    /// Merkleize the bitmap and convert this dirty database into its clean counterpart.
    /// This computes the Merkle tree over any new bitmap entries but does NOT persist
    /// changes to storage. Use `commit()` for durable state transitions.
    pub async fn merkleize(self) -> Result<Db<E, K, V, H, T, N, Clean<DigestOf<H>>>, Error> {
        // First merkleize the any db to get a Clean MMR.
        let clean_any = self.any.merkleize();

        // Now use the clean MMR for bitmap merkleization.
        let mut hasher = StandardHasher::<H>::new();
        let height = Self::grafting_height();
        let status =
            merkleize_grafted_bitmap(&mut hasher, self.status, &clean_any.log.mmr, height).await?;

        // Compute and cache the root.
        let cached_root = Some(root(&mut hasher, height, &status, &clean_any.log.mmr).await?);

        Ok(Db {
            any: clean_any,
            status,
            context: self.context,
            bitmap_metadata_partition: self.bitmap_metadata_partition,
            cached_root,
        })
    }
}

impl<
        E: RStorage + Clock + Metrics,
        K: Array,
        V: FixedValue,
        H: Hasher,
        T: Translator,
        const N: usize,
    > crate::qmdb::store::LogStorePrunable for Db<E, K, V, H, T, N>
{
    async fn prune(&mut self, prune_loc: Location) -> Result<(), Error> {
        self.prune(prune_loc).await
    }
}

impl<
        E: RStorage + Clock + Metrics,
        K: Array,
        V: FixedValue,
        H: Hasher,
        T: Translator,
        const N: usize,
        S: State<DigestOf<H>>,
    > LogStore for Db<E, K, V, H, T, N, S>
{
    type Value = V;

    fn op_count(&self) -> Location {
        self.op_count()
    }

    fn inactivity_floor_loc(&self) -> Location {
        self.inactivity_floor_loc()
    }

    async fn get_metadata(&self) -> Result<Option<V>, Error> {
        self.get_metadata().await
    }

    fn is_empty(&self) -> bool {
        self.any.is_empty()
    }
}

impl<
        E: RStorage + Clock + Metrics,
        K: Array,
        V: FixedValue,
        H: Hasher,
        T: Translator,
        const N: usize,
        S: State<DigestOf<H>>,
    > crate::store::Store for Db<E, K, V, H, T, N, S>
{
    type Key = K;
    type Value = V;
    type Error = Error;

    async fn get(&self, key: &Self::Key) -> Result<Option<Self::Value>, Self::Error> {
        self.get(key).await
    }
}

impl<
        E: RStorage + Clock + Metrics,
        K: Array,
        V: FixedValue,
        H: Hasher,
        T: Translator,
        const N: usize,
    > crate::store::StoreMut for Db<E, K, V, H, T, N, Dirty>
{
    async fn update(&mut self, key: Self::Key, value: Self::Value) -> Result<(), Self::Error> {
        self.update(key, value).await
    }
}

impl<
        E: RStorage + Clock + Metrics,
        K: Array,
        V: FixedValue,
        H: Hasher,
        T: Translator,
        const N: usize,
    > crate::store::StoreDeletable for Db<E, K, V, H, T, N, Dirty>
{
    async fn delete(&mut self, key: Self::Key) -> Result<bool, Self::Error> {
        self.delete(key).await
    }
}

impl<
        E: RStorage + Clock + Metrics,
        K: Array,
        V: FixedValue,
        H: Hasher,
        T: Translator,
        const N: usize,
    > CleanStore for Db<E, K, V, H, T, N, Clean<DigestOf<H>>>
{
    type Digest = H::Digest;
    type Operation = Operation<K, V>;
    type Dirty = Db<E, K, V, H, T, N, Dirty>;

    fn root(&self) -> Self::Digest {
        self.root()
    }

    async fn proof(
        &self,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<Self::Digest>, Vec<Self::Operation>), Error> {
        self.any.proof(start_loc, max_ops).await
    }

    async fn historical_proof(
        &self,
        historical_size: Location,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<Self::Digest>, Vec<Self::Operation>), Error> {
        self.any
            .historical_proof(historical_size, start_loc, max_ops)
            .await
    }

    fn into_dirty(self) -> Self::Dirty {
        self.into_dirty()
    }
}

impl<E, K, V, T, H, const N: usize> Batchable for Db<E, K, V, H, T, N, Dirty>
where
    E: RStorage + Clock + Metrics,
    K: Array,
    V: FixedValue,
    T: Translator,
    H: Hasher,
{
    async fn write_batch(
        &mut self,
        iter: impl Iterator<Item = (K, Option<V>)>,
    ) -> Result<(), Error> {
        let status = &mut self.status;
        self.any
            .write_batch_with_callback(iter, move |append: bool, loc: Option<Location>| {
                status.push(append);
                if let Some(loc) = loc {
                    status.set_bit(*loc, false);
                }
            })
            .await
    }
}

impl<
        E: RStorage + Clock + Metrics,
        K: Array,
        V: FixedValue,
        H: Hasher,
        T: Translator,
        const N: usize,
    > DirtyStore for Db<E, K, V, H, T, N, Dirty>
{
    type Digest = H::Digest;
    type Operation = Operation<K, V>;
    type Clean = Db<E, K, V, H, T, N, Clean<DigestOf<H>>>;

    async fn merkleize(self) -> Result<Self::Clean, Error> {
        self.merkleize().await
    }
}

impl<
        E: RStorage + Clock + Metrics,
        K: Array,
        V: FixedValue,
        H: Hasher,
        T: Translator,
        const N: usize,
    > CleanAny for Db<E, K, V, H, T, N, Clean<DigestOf<H>>>
{
    type Key = K;

    async fn get(&self, key: &Self::Key) -> Result<Option<Self::Value>, Error> {
        self.get(key).await
    }

    async fn commit(&mut self, metadata: Option<Self::Value>) -> Result<Range<Location>, Error> {
        self.commit(metadata).await
    }

    async fn sync(&mut self) -> Result<(), Error> {
        self.sync().await
    }

    async fn prune(&mut self, prune_loc: Location) -> Result<(), Error> {
        self.prune(prune_loc).await
    }

    async fn close(self) -> Result<(), Error> {
        self.close().await
    }

    async fn destroy(self) -> Result<(), Error> {
        self.destroy().await
    }
}

impl<
        E: RStorage + Clock + Metrics,
        K: Array,
        V: FixedValue,
        H: Hasher,
        T: Translator,
        const N: usize,
    > DirtyAny for Db<E, K, V, H, T, N, Dirty>
{
    type Key = K;

    async fn get(&self, key: &Self::Key) -> Result<Option<Self::Value>, Error> {
        self.get(key).await
    }

    async fn update(&mut self, key: Self::Key, value: Self::Value) -> Result<(), Error> {
        self.update(key, value).await
    }

    async fn create(&mut self, key: Self::Key, value: Self::Value) -> Result<bool, Error> {
        self.create(key, value).await
    }

    async fn delete(&mut self, key: Self::Key) -> Result<bool, Error> {
        self.delete(key).await
    }
}

#[cfg(test)]
pub mod test {
    use super::*;
    use crate::{
        index::Unordered as _,
        mmr::hasher::Hasher as _,
        qmdb::{any::AnyExt, store::batch_tests},
        translator::TwoCap,
    };
    use commonware_cryptography::{sha256::Digest, Sha256};
    use commonware_macros::test_traced;
    use commonware_runtime::{buffer::PoolRef, deterministic, Runner as _};
    use commonware_utils::{NZUsize, NZU64};
    use rand::{rngs::StdRng, RngCore, SeedableRng};
    use std::collections::HashMap;
    use tracing::warn;

    const PAGE_SIZE: usize = 88;
    const PAGE_CACHE_SIZE: usize = 8;

    fn current_db_config(partition_prefix: &str) -> Config<TwoCap> {
        Config {
            mmr_journal_partition: format!("{partition_prefix}_journal_partition"),
            mmr_metadata_partition: format!("{partition_prefix}_metadata_partition"),
            mmr_items_per_blob: NZU64!(11),
            mmr_write_buffer: NZUsize!(1024),
            log_journal_partition: format!("{partition_prefix}_log_journal_partition"),
            log_items_per_blob: NZU64!(7),
            log_write_buffer: NZUsize!(1024),
            bitmap_metadata_partition: format!("{partition_prefix}_bitmap_metadata_partition"),
            translator: TwoCap,
            thread_pool: None,
            buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
        }
    }

    /// A type alias for the concrete [Db] type used in these unit tests.
    type CleanCurrentTest = Db<deterministic::Context, Digest, Digest, Sha256, TwoCap, 32>;

    /// A type alias for the Dirty variant of [CleanCurrentTest].
    type DirtyCurrentTest = Db<deterministic::Context, Digest, Digest, Sha256, TwoCap, 32, Dirty>;

    /// Return a [Db] database initialized with a fixed config.
    async fn open_db(context: deterministic::Context, partition_prefix: &str) -> CleanCurrentTest {
        CleanCurrentTest::init(context, current_db_config(partition_prefix))
            .await
            .unwrap()
    }

    /// Build a small database, then close and reopen it and ensure state is preserved.
    #[test_traced("DEBUG")]
    pub fn test_current_db_build_small_close_reopen() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let partition = "build_small";
            let db = open_db(context.clone(), partition).await;
            assert_eq!(db.op_count(), 1);
            assert_eq!(db.inactivity_floor_loc(), Location::new_unchecked(0));
            let root0 = db.root();
            db.close().await.unwrap();
            let db = open_db(context.clone(), partition).await;
            assert_eq!(db.op_count(), 1);
            assert!(db.get_metadata().await.unwrap().is_none());
            assert_eq!(db.root(), root0);

            // Add one key.
            let k1 = Sha256::hash(&0u64.to_be_bytes());
            let v1 = Sha256::hash(&10u64.to_be_bytes());
            let mut db = db.into_dirty();
            assert!(db.create(k1, v1).await.unwrap());
            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
            let mut db = db.merkleize().await.unwrap();
            let range = db.commit(None).await.unwrap();
            assert_eq!(range.start, 1);
            assert_eq!(range.end, 4);
            assert!(db.get_metadata().await.unwrap().is_none());
            assert_eq!(db.op_count(), 4); // 1 update, 1 commit, 1 move + 1 initial commit.
            let root1 = db.root();
            assert!(root1 != root0);
            db.close().await.unwrap();
            let db = open_db(context.clone(), partition).await;
            assert_eq!(db.op_count(), 4); // 1 update, 1 commit, 1 move + 1 initial commit.
            assert!(db.get_metadata().await.unwrap().is_none());
            assert_eq!(db.root(), root1);

            // Create of the same key should fail.
            let mut db = db.into_dirty();
            assert!(!db.create(k1, v1).await.unwrap());

            // Delete that one key.
            assert!(db.delete(k1).await.unwrap());
            let metadata = Sha256::hash(&1u64.to_be_bytes());
            let mut db = db.merkleize().await.unwrap();
            let range = db.commit(Some(metadata)).await.unwrap();
            assert_eq!(range.start, 4);
            assert_eq!(range.end, 6);

            assert_eq!(db.op_count(), 6); // 1 update, 2 commits, 1 move, 1 delete + 1 initial commit.
            assert_eq!(db.get_metadata().await.unwrap().unwrap(), metadata);
            let root2 = db.root();

            // Repeated delete of the same key should fail.
            let mut db = db.into_dirty();
            assert!(!db.delete(k1).await.unwrap());
            let db = db.merkleize().await.unwrap();

            // Confirm close/re-open preserves state.
            db.close().await.unwrap();
            let db = open_db(context.clone(), partition).await;
            assert_eq!(db.op_count(), 6); // 1 update, 2 commits, 1 move, 1 delete + 1 initial commit.
            assert_eq!(db.get_metadata().await.unwrap().unwrap(), metadata);
            assert_eq!(db.root(), root2);

            // Confirm all activity bits are false except for the last commit.
            for i in 0..*db.op_count() - 1 {
                assert!(!db.status.get_bit(i));
            }
            assert!(db.status.get_bit(*db.op_count() - 1));

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    fn test_current_db_build_big() {
        let executor = deterministic::Runner::default();
        // Build a db with 1000 keys, some of which we update and some of which we delete, and
        // confirm that the end state of the db matches that of an identically updated hashmap.
        const ELEMENTS: u64 = 1000;
        executor.start(|context| async move {
            let mut db = open_db(context.clone(), "build_big").await.into_dirty();

            let mut map = HashMap::<Digest, Digest>::default();
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = Sha256::hash(&(i * 1000).to_be_bytes());
                db.update(k, v).await.unwrap();
                map.insert(k, v);
            }

            // Update every 3rd key.
            for i in 0u64..ELEMENTS {
                if i % 3 != 0 {
                    continue;
                }
                let k = Sha256::hash(&i.to_be_bytes());
                let v = Sha256::hash(&((i + 1) * 10000).to_be_bytes());
                db.update(k, v).await.unwrap();
                map.insert(k, v);
            }

            // Delete every 7th key.
            for i in 0u64..ELEMENTS {
                if i % 7 != 1 {
                    continue;
                }
                let k = Sha256::hash(&i.to_be_bytes());
                db.delete(k).await.unwrap();
                map.remove(&k);
            }

            assert_eq!(db.op_count(), 1478);
            assert_eq!(db.inactivity_floor_loc(), Location::new_unchecked(0));
            assert_eq!(db.any.snapshot.items(), 857);

            // Test that commit + sync w/ pruning will raise the inactivity floor.
            let mut db = db.merkleize().await.unwrap();
            db.commit(None).await.unwrap();
            db.sync().await.unwrap();
            db.prune(db.inactivity_floor_loc()).await.unwrap();
            assert_eq!(db.op_count(), 1957);
            assert_eq!(db.inactivity_floor_loc(), Location::new_unchecked(838));
            assert_eq!(db.any.snapshot.items(), 857);

            // Close & reopen the db, making sure the re-opened db has exactly the same state.
            let root = db.root();
            db.close().await.unwrap();
            let db = open_db(context.clone(), "build_big").await;
            assert_eq!(root, db.root());
            assert_eq!(db.op_count(), 1957);
            assert_eq!(db.inactivity_floor_loc(), Location::new_unchecked(838));
            assert_eq!(db.any.snapshot.items(), 857);

            // Confirm the db's state matches that of the separate map we computed independently.
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                if let Some(map_value) = map.get(&k) {
                    let Some(db_value) = db.get(&k).await.unwrap() else {
                        panic!("key not found in db: {k}");
                    };
                    assert_eq!(*map_value, db_value);
                } else {
                    assert!(db.get(&k).await.unwrap().is_none());
                }
            }
        });
    }

    /// Build a tiny database and make sure we can't convince the verifier that some old value of a
    /// key is active. We specifically test over the partial chunk case, since these bits are yet to
    /// be committed to the underlying MMR.
    #[test_traced("DEBUG")]
    pub fn test_current_db_verify_proof_over_bits_in_uncommitted_chunk() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = StandardHasher::<Sha256>::new();
            let partition = "build_small";
            let mut db = open_db(context.clone(), partition).await.into_dirty();

            // Add one key.
            let k = Sha256::fill(0x01);
            let v1 = Sha256::fill(0xA1);
            db.update(k, v1).await.unwrap();
            let mut db = db.merkleize().await.unwrap();
            db.commit(None).await.unwrap();

            let (_, op_loc) = db.any.get_with_loc(&k).await.unwrap().unwrap();
            let proof = db.key_value_proof(hasher.inner(), k).await.unwrap();

            // Proof should be verifiable against the current root.
            let root = db.root();
            assert!(CleanCurrentTest::verify_key_value_proof(
                hasher.inner(),
                k,
                v1,
                &proof,
                &root
            ));

            let v2 = Sha256::fill(0xA2);
            // Proof should not verify against a different value.
            assert!(!CleanCurrentTest::verify_key_value_proof(
                hasher.inner(),
                k,
                v2,
                &proof,
                &root,
            ));

            // Update the key to a new value (v2), which inactivates the previous operation.
            let mut db = db.into_dirty();
            db.update(k, v2).await.unwrap();
            let mut db = db.merkleize().await.unwrap();
            db.commit(None).await.unwrap();
            let root = db.root();

            // The new value should not be verifiable against the old proof.
            assert!(!CleanCurrentTest::verify_key_value_proof(
                hasher.inner(),
                k,
                v2,
                &proof,
                &root,
            ));

            // But the new value should verify against a new proof.
            let proof = db.key_value_proof(hasher.inner(), k).await.unwrap();
            assert!(CleanCurrentTest::verify_key_value_proof(
                hasher.inner(),
                k,
                v2,
                &proof,
                &root,
            ));
            // The old value will not verify against the new proof.
            assert!(!CleanCurrentTest::verify_key_value_proof(
                hasher.inner(),
                k,
                v1,
                &proof,
                &root,
            ));

            // Create a proof of the now-inactive update operation assigning v1 to k against the
            // current root.
            let (range_proof, _, chunks) = db
                .range_proof(hasher.inner(), op_loc, NZU64!(1))
                .await
                .unwrap();
            let proof_inactive = KeyValueProof {
                loc: op_loc,
                chunk: chunks[0],
                range_proof,
            };
            // This proof should verify using verify_range_proof, which does not check activity
            // status.
            let op = Operation::Update(Update(k, v1));
            assert!(CleanCurrentTest::verify_range_proof(
                hasher.inner(),
                &proof_inactive.range_proof,
                proof_inactive.loc,
                &[op],
                &[proof_inactive.chunk],
                &root,
            ));
            // But this proof should *not* verify as a key value proof, since verification will see
            // that the operation is inactive.
            assert!(!CleanCurrentTest::verify_key_value_proof(
                hasher.inner(),
                k,
                v1,
                &proof_inactive,
                &root,
            ));

            // Attempt #1 to "fool" the verifier: change the location to that of an active
            // operation. This should not fool the verifier if we're properly validating the
            // inclusion of the operation itself, and not just the chunk.
            let (_, active_loc) = db.any.get_with_loc(&k).await.unwrap().unwrap();
            // The new location should differ but still be in the same chunk.
            assert_ne!(active_loc, proof_inactive.loc);
            assert_eq!(
                CleanBitMap::<Digest, 32>::leaf_pos(*active_loc),
                CleanBitMap::<Digest, 32>::leaf_pos(*proof_inactive.loc)
            );
            let mut fake_proof = proof_inactive.clone();
            fake_proof.loc = active_loc;
            assert!(!CleanCurrentTest::verify_key_value_proof(
                hasher.inner(),
                k,
                v1,
                &fake_proof,
                &root,
            ));

            // Attempt #2 to "fool" the verifier: modify the chunk in the proof info to make it look
            // like the operation is active by flipping its corresponding bit to 1. This should not
            // fool the verifier if we are correctly incorporating the partial chunk information
            // into the root computation.
            let mut modified_chunk = proof_inactive.chunk;
            let bit_pos = *proof_inactive.loc;
            let byte_idx = bit_pos / 8;
            let bit_idx = bit_pos % 8;
            modified_chunk[byte_idx as usize] |= 1 << bit_idx;

            let mut fake_proof = proof_inactive.clone();
            fake_proof.chunk = modified_chunk;
            assert!(!CleanCurrentTest::verify_key_value_proof(
                hasher.inner(),
                k,
                v1,
                &fake_proof,
                &root,
            ));

            db.destroy().await.unwrap();
        });
    }

    /// Apply random operations to the given db, committing them (randomly & at the end) only if
    /// `commit_changes` is true.
    async fn apply_random_ops(
        num_elements: u64,
        commit_changes: bool,
        rng_seed: u64,
        mut db: DirtyCurrentTest,
    ) -> Result<CleanCurrentTest, Error> {
        // Log the seed with high visibility to make failures reproducible.
        warn!("rng_seed={}", rng_seed);
        let mut rng = StdRng::seed_from_u64(rng_seed);

        for i in 0u64..num_elements {
            let k = Sha256::hash(&i.to_be_bytes());
            let v = Sha256::hash(&rng.next_u32().to_be_bytes());
            db.update(k, v).await.unwrap();
        }

        // Randomly update / delete them. We use a delete frequency that is 1/7th of the update
        // frequency.
        for _ in 0u64..num_elements * 10 {
            let rand_key = Sha256::hash(&(rng.next_u64() % num_elements).to_be_bytes());
            if rng.next_u32() % 7 == 0 {
                db.delete(rand_key).await.unwrap();
                continue;
            }
            let v = Sha256::hash(&rng.next_u32().to_be_bytes());
            db.update(rand_key, v).await.unwrap();
            if commit_changes && rng.next_u32() % 20 == 0 {
                // Commit every ~20 updates.
                let mut clean_db = db.merkleize().await?;
                clean_db.commit(None).await?;
                db = clean_db.into_dirty();
            }
        }
        if commit_changes {
            let mut clean_db = db.merkleize().await?;
            clean_db.commit(None).await?;
            Ok(clean_db)
        } else {
            db.merkleize().await
        }
    }

    #[test_traced("DEBUG")]
    pub fn test_current_db_range_proofs() {
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            let partition = "range_proofs";
            let mut hasher = StandardHasher::<Sha256>::new();
            let db = open_db(context.clone(), partition).await.into_dirty();
            let db = apply_random_ops(200, true, context.next_u64(), db)
                .await
                .unwrap();
            let root = db.root();

            // Make sure size-constrained batches of operations are provable from the oldest
            // retained op to tip.
            let max_ops = 4;
            let end_loc = db.op_count();
            let start_loc = db.any.inactivity_floor_loc();

            for loc in *start_loc..*end_loc {
                let loc = Location::new_unchecked(loc);
                let (proof, ops, chunks) = db
                    .range_proof(hasher.inner(), loc, NZU64!(max_ops))
                    .await
                    .unwrap();
                assert!(
                    CleanCurrentTest::verify_range_proof(
                        hasher.inner(),
                        &proof,
                        loc,
                        &ops,
                        &chunks,
                        &root
                    ),
                    "failed to verify range starting at {loc}",
                );
            }

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    pub fn test_current_db_key_value_proof() {
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            let partition = "key_value_proofs";
            let mut hasher = StandardHasher::<Sha256>::new();
            let db = open_db(context.clone(), partition).await.into_dirty();
            let db = apply_random_ops(500, true, context.next_u64(), db)
                .await
                .unwrap();
            let root = db.root();

            // Confirm bad keys produce the expected error.
            let bad_key = Sha256::fill(0xAA);
            let res = db.key_value_proof(hasher.inner(), bad_key).await;
            assert!(matches!(res, Err(Error::KeyNotFound)));

            let start = *db.inactivity_floor_loc();
            for i in start..db.status.len() {
                if !db.status.get_bit(i) {
                    continue;
                }
                // Found an active operation! Create a proof for its current key/value if it's a
                // key-updating operation.
                let (key, value) = match db.any.log.read(Location::new_unchecked(i)).await.unwrap()
                {
                    Operation::Update(Update(key, value)) => (key, value),
                    Operation::CommitFloor(_, _) => continue,
                    Operation::Delete(_) => {
                        unreachable!("location does not reference an update/commit operation")
                    }
                };

                let proof = db.key_value_proof(hasher.inner(), key).await.unwrap();
                // Proof should validate against the current value and correct root.
                assert!(CleanCurrentTest::verify_key_value_proof(
                    hasher.inner(),
                    key,
                    value,
                    &proof,
                    &root
                ));
                // Proof should fail against the wrong value.
                let wrong_val = Sha256::fill(0xFF);
                assert!(!CleanCurrentTest::verify_key_value_proof(
                    hasher.inner(),
                    key,
                    wrong_val,
                    &proof,
                    &root
                ));
                // Proof should fail against the wrong key.
                let wrong_key = Sha256::fill(0xEE);
                assert!(!CleanCurrentTest::verify_key_value_proof(
                    hasher.inner(),
                    wrong_key,
                    value,
                    &proof,
                    &root
                ));
                // Proof should fail against the wrong root.
                let wrong_root = Sha256::fill(0xDD);
                assert!(!CleanCurrentTest::verify_key_value_proof(
                    hasher.inner(),
                    key,
                    value,
                    &proof,
                    &wrong_root,
                ));
            }

            db.destroy().await.unwrap();
        });
    }

    /// This test builds a random database, and makes sure that its state is correctly restored
    /// after closing and re-opening.
    #[test_traced("WARN")]
    pub fn test_current_db_build_random_close_reopen() {
        // Number of elements to initially insert into the db.
        const ELEMENTS: u64 = 1000;

        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            let partition = "build_random";
            let rng_seed = context.next_u64();
            let db = open_db(context.clone(), partition).await.into_dirty();
            let db = apply_random_ops(ELEMENTS, true, rng_seed, db)
                .await
                .unwrap();

            // Capture the root, close the db, then re-open it and make sure the root is
            // unchanged.
            let root = db.root();
            db.close().await.unwrap();

            let db = open_db(context, partition).await;
            assert_eq!(db.root(), root);

            db.destroy().await.unwrap();
        });
    }

    /// Repeatedly update the same key to a new value and ensure we can prove its current value
    /// after each update.
    #[test_traced("WARN")]
    pub fn test_current_db_proving_repeated_updates() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = StandardHasher::<Sha256>::new();
            let partition = "repeated_updates";
            let mut db = open_db(context.clone(), partition).await;

            // Repeatedly update one key, proving the current value after each update.
            let k = Sha256::fill(0x00);
            let mut old_val = Sha256::fill(0x00);
            for i in 1u8..=255 {
                let v = Sha256::fill(i);
                let mut dirty_db = db.into_dirty();
                dirty_db.update(k, v).await.unwrap();
                assert_eq!(dirty_db.get(&k).await.unwrap().unwrap(), v);
                db = dirty_db.merkleize().await.unwrap();
                db.commit(None).await.unwrap();
                let root = db.root();

                // Create a proof for the current value of k.
                let proof = db.key_value_proof(hasher.inner(), k).await.unwrap();
                assert!(
                    CleanCurrentTest::verify_key_value_proof(hasher.inner(), k, v, &proof, &root),
                    "proof of update {i} failed to verify"
                );
                // Ensure the proof does NOT verify if we use the previous value.
                assert!(
                    !CleanCurrentTest::verify_key_value_proof(
                        hasher.inner(),
                        k,
                        old_val,
                        &proof,
                        &root
                    ),
                    "proof of update {i} verified when it should not have"
                );
                old_val = v;
            }

            db.destroy().await.unwrap();
        });
    }

    /// This test builds a random database and simulates various failure scenarios, verifying that
    /// we can recover from each of them.
    #[test_traced("WARN")]
    pub fn test_current_db_simulate_write_failures() {
        // Number of elements to initially insert into the db.
        const ELEMENTS: u64 = 1000;

        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            let partition = "build_random_fail_commit";
            let rng_seed = context.next_u64();
            let db = open_db(context.clone(), partition).await.into_dirty();
            let mut db = apply_random_ops(ELEMENTS, true, rng_seed, db)
                .await
                .unwrap();
            let committed_root = db.root();
            let committed_op_count = db.op_count();
            let committed_inactivity_floor = db.any.inactivity_floor_loc();
            db.prune(committed_inactivity_floor).await.unwrap();

            // Perform more random operations without committing any of them.
            let db = apply_random_ops(ELEMENTS, false, rng_seed + 1, db.into_dirty())
                .await
                .unwrap();

            // SCENARIO #1: Simulate a crash that happens before any writes. Upon reopening, the
            // state of the DB should be as of the last commit.
            db.simulate_commit_failure_before_any_writes();
            let db = open_db(context.clone(), partition).await;
            assert_eq!(db.root(), committed_root);
            assert_eq!(db.op_count(), committed_op_count);

            // Re-apply the exact same uncommitted operations.
            let db = apply_random_ops(ELEMENTS, false, rng_seed + 1, db.into_dirty())
                .await
                .unwrap();

            // SCENARIO #2: Simulate a crash that happens after the any db has been committed, but
            // before the state of the pruned bitmap can be written to disk.
            db.simulate_commit_failure_after_any_db_commit()
                .await
                .unwrap();

            // We should be able to recover. Capture the post-recovery root so we can check it
            // against a failure-free rebuild below.
            let db = open_db(context.clone(), partition).await;
            let scenario_2_root = db.root();

            // To confirm the second committed root is correct, we'll re-build the DB in a new
            // partition, but without any failures. They should have the exact same state.
            let fresh_partition = "build_random_fail_commit_fresh";
            let db = open_db(context.clone(), fresh_partition).await.into_dirty();
            let db = apply_random_ops(ELEMENTS, true, rng_seed, db)
                .await
                .unwrap();
            let db = apply_random_ops(ELEMENTS, false, rng_seed + 1, db.into_dirty())
                .await
                .unwrap();
            let mut db = db.into_dirty().merkleize().await.unwrap();
            db.commit(None).await.unwrap();
            db.prune(db.any.inactivity_floor_loc()).await.unwrap();
            // State from scenario #2 should match that of a successful commit.
            assert_eq!(db.root(), scenario_2_root);

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    pub fn test_current_db_different_pruning_delays_same_root() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Create two databases that are identical other than how they are pruned.
            let db_config_no_pruning = current_db_config("no_pruning_test");

            let db_config_pruning = current_db_config("pruning_test");

            let mut db_no_pruning =
                CleanCurrentTest::init(context.clone(), db_config_no_pruning.clone())
                    .await
                    .unwrap()
                    .into_dirty();
            let mut db_pruning = CleanCurrentTest::init(context.clone(), db_config_pruning.clone())
                .await
                .unwrap()
                .into_dirty();

            // Apply identical operations to both databases, but only prune one.
            const NUM_OPERATIONS: u64 = 1000;
            for i in 0..NUM_OPERATIONS {
                let key = Sha256::hash(&i.to_be_bytes());
                let value = Sha256::hash(&(i * 1000).to_be_bytes());

                db_no_pruning.update(key, value).await.unwrap();
                db_pruning.update(key, value).await.unwrap();

                // Commit periodically.
                if i % 50 == 49 {
                    let mut clean_no_pruning = db_no_pruning.merkleize().await.unwrap();
                    clean_no_pruning.commit(None).await.unwrap();
                    let mut clean_pruning = db_pruning.merkleize().await.unwrap();
                    clean_pruning.commit(None).await.unwrap();
                    clean_pruning
                        .prune(clean_no_pruning.any.inactivity_floor_loc())
                        .await
                        .unwrap();
                    db_no_pruning = clean_no_pruning.into_dirty();
                    db_pruning = clean_pruning.into_dirty();
                }
            }

            // Final commit.
            let mut db_no_pruning = db_no_pruning.merkleize().await.unwrap();
            db_no_pruning.commit(None).await.unwrap();
            let mut db_pruning = db_pruning.merkleize().await.unwrap();
            db_pruning.commit(None).await.unwrap();

            // Get the roots from both databases.
            let root_no_pruning = db_no_pruning.root();
            let root_pruning = db_pruning.root();

            // Verify they generate the same roots.
            assert_eq!(root_no_pruning, root_pruning);

            // Close both databases.
            db_no_pruning.close().await.unwrap();
            db_pruning.close().await.unwrap();

            // Restart both databases.
            let db_no_pruning = CleanCurrentTest::init(context.clone(), db_config_no_pruning)
                .await
                .unwrap();
            let db_pruning = CleanCurrentTest::init(context.clone(), db_config_pruning)
                .await
                .unwrap();
            assert_eq!(
                db_no_pruning.inactivity_floor_loc(),
                db_pruning.inactivity_floor_loc()
            );

            // Get the roots after restart.
            let root_no_pruning_restart = db_no_pruning.root();
            let root_pruning_restart = db_pruning.root();

            // Ensure the roots still match after restart.
            assert_eq!(root_no_pruning, root_no_pruning_restart);
            assert_eq!(root_pruning, root_pruning_restart);

            db_no_pruning.destroy().await.unwrap();
            db_pruning.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    fn test_batch() {
        batch_tests::test_batch(|mut ctx| async move {
            let seed = ctx.next_u64();
            let prefix = format!("current_unordered_batch_{seed}");
            AnyExt::new(open_db(ctx, &prefix).await)
        });
    }
}