commonware_storage/qmdb/current/ordered/
fixed.rs

1//! An _ordered_ variant of a [crate::qmdb::current] authenticated database optimized for fixed-size
2//! values
3//!
4//! This variant maintains the lexicographic-next active key for each active key, enabling exclusion
5//! proofs (proving a key is currently inactive). Use [super::super::unordered::fixed] if exclusion
6//! proofs are not needed.
7//!
8//! See [Db] for the main database type and [ExclusionProof] for proving key inactivity.
9
10use crate::{
11    bitmap::{CleanBitMap, DirtyBitMap},
12    mmr::{
13        grafting::Storage as GraftingStorage,
14        mem::{Clean, Dirty, State},
15        Location, Proof, StandardHasher,
16    },
17    qmdb::{
18        any::{
19            ordered::fixed::{Db as AnyDb, Operation, Update},
20            CleanAny, DirtyAny, FixedValue,
21        },
22        current::{
23            merkleize_grafted_bitmap,
24            ordered::ExclusionProof,
25            proof::{OperationProof, RangeProof},
26            root, FixedConfig as Config,
27        },
28        store::{Batchable, CleanStore, DirtyStore, LogStore},
29        Error,
30    },
31    translator::Translator,
32    AuthenticatedBitMap as BitMap,
33};
34use commonware_codec::FixedSize;
35use commonware_cryptography::{Digest, DigestOf, Hasher};
36use commonware_runtime::{Clock, Metrics, Storage as RStorage};
37use commonware_utils::Array;
38use core::ops::Range;
39use futures::stream::Stream;
40use std::num::NonZeroU64;
41
/// A key-value QMDB based on an MMR over its log of operations, supporting key exclusion proofs and
/// authentication of whether a key currently has a specific value.
///
/// Note: The generic parameter N is not really generic, and must be manually set to double the size
/// of the hash digest being produced by the hasher. A compile-time assertion is used to prevent any
/// other setting.
pub struct Db<
    E: RStorage + Clock + Metrics,
    K: Array,
    V: FixedValue,
    H: Hasher,
    T: Translator,
    const N: usize,
    S: State<DigestOf<H>> = Clean<DigestOf<H>>,
> {
    /// An authenticated database that provides the ability to prove whether a key ever had a
    /// specific value.
    any: AnyDb<E, K, V, H, T, S>,

    /// The bitmap over the activity status of each operation. Supports augmenting [Db] proofs in
    /// order to further prove whether a key _currently_ has a specific value.
    status: BitMap<H::Digest, N, S>,

    /// Runtime context used for storage access and for labeling sub-component metrics.
    context: E,

    /// Name of the partition in which the pruned portion of `status` is persisted across restarts.
    bitmap_metadata_partition: String,

    /// Cached root digest. Invariant: valid when in Clean state.
    cached_root: Option<H::Digest>,
}
72
/// Proof information for verifying a key has a particular value in the database.
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct KeyValueProof<K: Array, D: Digest, const N: usize> {
    /// Proof over the operation (and its activity status) that established the key's current value.
    pub proof: OperationProof<D, N>,
    /// The next-key field of the proven update operation; needed by the verifier to reconstruct
    /// the full operation being authenticated.
    pub next_key: K,
}
79
impl<
        E: RStorage + Clock + Metrics,
        K: Array,
        V: FixedValue,
        H: Hasher,
        T: Translator,
        const N: usize,
        S: State<DigestOf<H>>,
    > Db<E, K, V, H, T, N, S>
{
    /// The number of operations that have been applied to this db, including those that have been
    /// pruned and those that are not yet committed.
    pub fn op_count(&self) -> Location {
        self.any.op_count()
    }

    /// Return the inactivity floor location. This is the location before which all operations are
    /// known to be inactive. Operations before this point can be safely pruned.
    pub const fn inactivity_floor_loc(&self) -> Location {
        self.any.inactivity_floor_loc()
    }

    /// Get the value of `key` in the db, or None if it has no value.
    pub async fn get(&self, key: &K) -> Result<Option<V>, Error> {
        self.any.get(key).await
    }

    /// Get the metadata associated with the last commit.
    pub async fn get_metadata(&self) -> Result<Option<V>, Error> {
        self.any.get_metadata().await
    }

    /// Whether the db currently has no active keys.
    pub const fn is_empty(&self) -> bool {
        self.any.is_empty()
    }

    /// Get the level of the base MMR into which we are grafting.
    ///
    /// This value is log2 of the chunk size in bits. Since we assume the chunk size is a power of
    /// 2, we compute this from trailing_zeros.
    const fn grafting_height() -> u32 {
        CleanBitMap::<H::Digest, N>::CHUNK_SIZE_BITS.trailing_zeros()
    }

    /// Return true if the proof authenticates that `key` currently has value `value` in the db with
    /// the provided `root`.
    pub fn verify_key_value_proof(
        hasher: &mut H,
        key: K,
        value: V,
        proof: &KeyValueProof<K, H::Digest, N>,
        root: &H::Digest,
    ) -> bool {
        // Reconstruct the exact update operation the proof commits to; verification fails if any
        // of key, value, or next_key differ from what was proven.
        let op = Operation::Update(Update {
            key,
            value,
            next_key: proof.next_key.clone(),
        });

        proof
            .proof
            .verify(hasher, Self::grafting_height(), op, root)
    }

    /// Get the operation that currently defines the span whose range contains `key`, or None if the
    /// DB is empty.
    pub async fn get_span(&self, key: &K) -> Result<Option<(Location, Update<K, V>)>, Error> {
        self.any.get_span(key).await
    }

    /// Streams all active (key, value) pairs in the database in key order, starting from the first
    /// active key greater than or equal to `start`.
    pub async fn stream_range<'a>(
        &'a self,
        start: K,
    ) -> Result<impl Stream<Item = Result<(K, V), Error>> + 'a, Error> {
        self.any.stream_range(start).await
    }

    /// Return true if the proof authenticates that `key` does _not_ exist in the db with the
    /// provided `root`.
    pub fn verify_exclusion_proof(
        hasher: &mut H,
        key: &K,
        proof: &ExclusionProof<K, V, H::Digest, N>,
        root: &H::Digest,
    ) -> bool {
        // Reconstruct the operation being authenticated from the proof variant, rejecting early
        // if the claimed operation cannot possibly exclude `key`.
        let (op_proof, op) = match proof {
            ExclusionProof::KeyValue(op_proof, data) => {
                if data.key == *key {
                    // The provided `key` is in the DB if it matches the start of the span.
                    return false;
                }
                if !AnyDb::<E, K, V, H, T>::span_contains(&data.key, &data.next_key, key) {
                    // If the key is not within the span, then this proof cannot prove its
                    // exclusion.
                    return false;
                }

                (op_proof, Operation::Update(data.clone()))
            }
            ExclusionProof::Commit(op_proof, metadata) => {
                // Handle the case where the proof shows the db is empty, hence any key is proven
                // excluded. For the db to be empty, the floor must equal the commit operation's
                // location.
                let floor_loc = op_proof.loc;
                (
                    op_proof,
                    Operation::CommitFloor(metadata.clone(), floor_loc),
                )
            }
        };

        op_proof.verify(hasher, Self::grafting_height(), op, root)
    }
}
197
impl<
        E: RStorage + Clock + Metrics,
        K: Array,
        V: FixedValue,
        H: Hasher,
        T: Translator,
        const N: usize,
    > Db<E, K, V, H, T, N>
{
    /// Initializes a [Db] from the given `config`. Leverages parallel Merkleization to initialize
    /// the bitmap MMR if a thread pool is provided.
    pub async fn init(context: E, config: Config<T>) -> Result<Self, Error> {
        // TODO: Re-evaluate assertion placement after `generic_const_exprs` is stable.
        const {
            // A compile-time assertion that the chunk size is some multiple of digest size. A multiple of 1 is optimal
            // with respect to proof size, but a higher multiple allows for a smaller (RAM resident) merkle tree over
            // the structure.
            assert!(
                N.is_multiple_of(H::Digest::SIZE),
                "chunk size must be some multiple of the digest size",
            );
            // A compile-time assertion that chunk size is a power of 2, which is necessary to allow the status bitmap
            // tree to be aligned with the underlying operations MMR.
            assert!(N.is_power_of_two(), "chunk size must be a power of 2");
        }

        let thread_pool = config.thread_pool.clone();
        let bitmap_metadata_partition = config.bitmap_metadata_partition.clone();

        // Restore the pruned prefix of the bitmap from its metadata partition, then make it
        // mutable so replay can append activity bits for the remaining operations.
        let mut hasher = StandardHasher::<H>::new();
        let mut status = CleanBitMap::restore_pruned(
            context.with_label("bitmap"),
            &bitmap_metadata_partition,
            thread_pool,
            &mut hasher,
        )
        .await?
        .into_dirty();

        // Initialize the anydb with a callback that initializes the status bitmap. The restored
        // bitmap's length marks how far activity state was previously persisted, so replay
        // resumes from there.
        let last_known_inactivity_floor = Location::new_unchecked(status.len());
        let any = AnyDb::init_with_callback(
            context.with_label("any"),
            config.to_any_config(),
            Some(last_known_inactivity_floor),
            |append: bool, loc: Option<Location>| {
                status.push(append);
                if let Some(loc) = loc {
                    status.set_bit(*loc, false);
                }
            },
        )
        .await?;

        let height = Self::grafting_height();
        let status = merkleize_grafted_bitmap(&mut hasher, status, &any.log.mmr, height).await?;

        // Compute and cache the root
        let cached_root = Some(root(&mut hasher, height, &status, &any.log.mmr).await?);

        Ok(Self {
            any,
            status,
            context,
            bitmap_metadata_partition,
            cached_root,
        })
    }

    /// Return the cached root of the db.
    ///
    /// # Panics
    ///
    /// Panics if the cached root is absent. The Clean-state invariant guarantees it is populated
    /// by `init`, `commit`, and `merkleize`, so this should never fire in practice.
    pub const fn root(&self) -> H::Digest {
        self.cached_root.expect("Clean state must have cached root")
    }

    /// Returns a proof that the specified range of operations are part of the database, along with
    /// the operations from the range. A truncated range (from hitting the max) can be detected by
    /// looking at the length of the returned operations vector. Also returns the bitmap chunks
    /// required to verify the proof.
    ///
    /// # Errors
    ///
    /// Returns [crate::mmr::Error::LocationOverflow] if `start_loc` > [crate::mmr::MAX_LOCATION].
    /// Returns [crate::mmr::Error::RangeOutOfBounds] if `start_loc` >= number of leaves in the MMR.
    pub async fn range_proof(
        &self,
        hasher: &mut H,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> Result<(RangeProof<H::Digest>, Vec<Operation<K, V>>, Vec<[u8; N]>), Error> {
        RangeProof::<H::Digest>::new_with_ops(
            hasher,
            &self.status,
            Self::grafting_height(),
            &self.any.log.mmr,
            &self.any.log,
            start_loc,
            max_ops,
        )
        .await
    }

    /// Return true if the given sequence of `ops` were applied starting at location `start_loc` in
    /// the log with the provided root.
    pub fn verify_range_proof(
        hasher: &mut H,
        proof: &RangeProof<H::Digest>,
        start_loc: Location,
        ops: &[Operation<K, V>],
        chunks: &[[u8; N]],
        root: &H::Digest,
    ) -> bool {
        let height = Self::grafting_height();

        proof.verify(hasher, height, start_loc, ops, chunks, root)
    }

    /// Generate and return a proof of the current value of `key`, along with the other
    /// [KeyValueProof] required to verify the proof. Returns KeyNotFound error if the key is not
    /// currently assigned any value.
    ///
    /// # Errors
    ///
    /// Returns [Error::KeyNotFound] if the key is not currently assigned any value.
    pub async fn key_value_proof(
        &self,
        hasher: &mut H,
        key: K,
    ) -> Result<KeyValueProof<K, H::Digest, N>, Error> {
        let op_loc = self.any.get_with_loc(&key).await?;
        let Some((data, loc)) = op_loc else {
            return Err(Error::KeyNotFound);
        };
        let height = Self::grafting_height();
        let mmr = &self.any.log.mmr;
        let proof =
            OperationProof::<H::Digest, N>::new(hasher, &self.status, height, mmr, loc).await?;

        Ok(KeyValueProof {
            proof,
            next_key: data.next_key,
        })
    }

    /// Generate and return a proof that the specified `key` does not exist in the db.
    ///
    /// # Errors
    ///
    /// Returns [Error::KeyExists] if the key exists in the db.
    pub async fn exclusion_proof(
        &self,
        hasher: &mut H,
        key: &K,
    ) -> Result<ExclusionProof<K, V, H::Digest, N>, Error> {
        let height = Self::grafting_height();
        let grafted_mmr =
            GraftingStorage::<'_, H, _, _>::new(&self.status, &self.any.log.mmr, height);

        let span = self.any.get_span(key).await?;
        let loc = match &span {
            Some((loc, key_data)) => {
                if key_data.key == *key {
                    // Cannot prove exclusion of a key that exists in the db.
                    return Err(Error::KeyExists);
                }
                *loc
            }
            // No span exists, meaning the db has no active keys; prove exclusion against the
            // last operation, which must be the most recent commit.
            None => self
                .op_count()
                .checked_sub(1)
                .expect("db shouldn't be empty"),
        };

        let op_proof =
            OperationProof::<H::Digest, N>::new(hasher, &self.status, height, &grafted_mmr, loc)
                .await?;

        Ok(match span {
            Some((_, key_data)) => ExclusionProof::KeyValue(op_proof, key_data),
            None => {
                let value = match self.any.log.read(loc).await? {
                    Operation::CommitFloor(value, _) => value,
                    _ => unreachable!("last commit is not a CommitFloor operation"),
                };
                ExclusionProof::Commit(op_proof, value)
            }
        })
    }

    /// Close the db. Operations that have not been committed will be lost.
    pub async fn close(self) -> Result<(), Error> {
        self.any.close().await
    }

    /// Destroy the db, removing all data from disk.
    pub async fn destroy(self) -> Result<(), Error> {
        // Clean up bitmap metadata partition.
        CleanBitMap::<H::Digest, N>::destroy(self.context, &self.bitmap_metadata_partition).await?;

        // Clean up Any components (MMR and log).
        self.any.destroy().await
    }

    #[cfg(test)]
    /// Simulate a crash that prevents any data from being written to disk, which involves simply
    /// consuming the db before it can be cleanly closed.
    fn simulate_commit_failure_before_any_writes(self) {
        // Don't successfully complete any of the commit operations.
    }

    #[cfg(test)]
    /// Simulate a crash that happens during commit and prevents the any db from being pruned of
    /// inactive operations, and bitmap state from being written/pruned.
    async fn simulate_commit_failure_after_any_db_commit(mut self) -> Result<(), Error> {
        // Only successfully complete the log write part of the commit process.
        let _ = self.commit_to_log(None).await?;
        Ok(())
    }

    /// Helper that performs the commit operations up to and including writing to the log,
    /// but does not merkleize the bitmap or prune. Used for simulating partial commit failures
    /// in tests, and as the first phase of the full commit operation.
    ///
    /// Returns the dirty bitmap that needs to be merkleized and pruned.
    async fn commit_to_log(
        &mut self,
        metadata: Option<V>,
    ) -> Result<DirtyBitMap<H::Digest, N>, Error> {
        // Swap the bitmap out of `self` so it can be mutated as Dirty; `self.status` holds an
        // empty placeholder until the caller (e.g. `commit`) re-installs the merkleized bitmap.
        let empty_status = CleanBitMap::<H::Digest, N>::new(&mut self.any.log.hasher, None);
        let mut status = std::mem::replace(&mut self.status, empty_status).into_dirty();

        // Inactivate the current commit operation.
        status.set_bit(*self.any.last_commit_loc, false);

        // Raise the inactivity floor by taking `self.steps` steps, plus 1 to account for the
        // previous commit becoming inactive.
        let inactivity_floor_loc = self.any.raise_floor_with_bitmap(&mut status).await?;

        // Append the commit operation with the new floor and tag it as active in the bitmap.
        status.push(true);
        let commit_op = Operation::CommitFloor(metadata, inactivity_floor_loc);

        self.any.apply_commit_op(commit_op).await?;

        Ok(status)
    }

    /// Commit any pending operations to the database, ensuring their durability upon return from
    /// this function. Also raises the inactivity floor according to the schedule. Returns the
    /// `(start_loc, end_loc]` location range of committed operations.
    pub async fn commit(&mut self, metadata: Option<V>) -> Result<Range<Location>, Error> {
        let start_loc = self.any.last_commit_loc + 1;

        // Commit to log (recovery is ensured after this returns)
        let status = self.commit_to_log(metadata).await?;

        // Merkleize the new bitmap entries.
        let height = Self::grafting_height();
        self.status =
            merkleize_grafted_bitmap(&mut self.any.log.hasher, status, &self.any.log.mmr, height)
                .await?;

        // Prune bits that are no longer needed because they precede the inactivity floor.
        self.status.prune_to_bit(*self.any.inactivity_floor_loc())?;

        // Refresh cached root after commit
        self.cached_root = Some(
            root(
                &mut self.any.log.hasher,
                height,
                &self.status,
                &self.any.log.mmr,
            )
            .await?,
        );

        Ok(start_loc..self.op_count())
    }

    /// Sync all database state to disk.
    pub async fn sync(&mut self) -> Result<(), Error> {
        self.any.sync().await?;

        // Write the bitmap pruning boundary to disk so that next startup doesn't have to
        // re-Merkleize the inactive portion up to the inactivity floor.
        self.status
            .write_pruned(
                self.context.with_label("bitmap"),
                &self.bitmap_metadata_partition,
            )
            .await
            .map_err(Into::into)
    }

    /// Prune historical operations prior to `prune_loc`. This does not affect the db's root
    /// or current snapshot.
    pub async fn prune(&mut self, prune_loc: Location) -> Result<(), Error> {
        // Write the pruned portion of the bitmap to disk *first* to ensure recovery in case of
        // failure during pruning. If we don't do this, we may not be able to recover the bitmap
        // because it may require replaying of pruned operations.
        self.status
            .write_pruned(
                self.context.with_label("bitmap"),
                &self.bitmap_metadata_partition,
            )
            .await?;

        self.any.prune(prune_loc).await
    }

    /// Convert this clean database into its dirty counterpart for performing mutations.
    pub fn into_dirty(self) -> Db<E, K, V, H, T, N, Dirty> {
        Db {
            any: self.any.into_dirty(),
            status: self.status.into_dirty(),
            context: self.context,
            bitmap_metadata_partition: self.bitmap_metadata_partition,
            // The root is only valid in the Clean state, so the dirty db carries no cache.
            cached_root: None,
        }
    }
}
518
impl<
        E: RStorage + Clock + Metrics,
        K: Array,
        V: FixedValue,
        H: Hasher,
        T: Translator,
        const N: usize,
    > Db<E, K, V, H, T, N, Dirty>
{
    /// Updates `key` to have value `value`. The operation is reflected in the snapshot, but will be
    /// subject to rollback until the next successful `commit`.
    pub async fn update(&mut self, key: K, value: V) -> Result<(), Error> {
        self.any
            .update_with_callback(key, value, |loc| {
                // The new operation is active; the location it supersedes (if any) becomes
                // inactive.
                self.status.push(true);
                if let Some(loc) = loc {
                    self.status.set_bit(*loc, false);
                }
            })
            .await
    }

    /// Creates a new key-value pair in the db. The operation is reflected in the snapshot, but will
    /// be subject to rollback until the next successful `commit`. Returns true if the key was
    /// created, false if it already existed.
    pub async fn create(&mut self, key: K, value: V) -> Result<bool, Error> {
        self.any
            .create_with_callback(key, value, |loc| {
                self.status.push(true);
                if let Some(loc) = loc {
                    self.status.set_bit(*loc, false);
                }
            })
            .await
    }

    /// Delete `key` and its value from the db. Deleting a key that already has no value is a no-op.
    /// The operation is reflected in the snapshot, but will be subject to rollback until the next
    /// successful `commit`. Returns true if the key was deleted, false if it was already inactive.
    pub async fn delete(&mut self, key: K) -> Result<bool, Error> {
        // The callback presumably only fires when a delete operation is actually applied, so `r`
        // remains false for an already-inactive key.
        let mut r = false;
        self.any
            .delete_with_callback(key, |append, loc| {
                // NOTE(review): unlike update/create, the bit-clear precedes the push here —
                // presumably equivalent because `loc` always references an earlier operation;
                // confirm against delete_with_callback's contract.
                if let Some(loc) = loc {
                    self.status.set_bit(*loc, false);
                }
                self.status.push(append);
                r = true;
            })
            .await?;

        Ok(r)
    }

    /// Merkleize the bitmap and convert this dirty database into its clean counterpart.
    /// This computes the Merkle tree over any new bitmap entries but does NOT persist
    /// changes to storage. Use `commit()` for durable state transitions.
    pub async fn merkleize(self) -> Result<Db<E, K, V, H, T, N, Clean<DigestOf<H>>>, Error> {
        // First merkleize the any to get a Clean MMR
        let clean_any = self.any.merkleize();

        // Now use the clean MMR for bitmap merkleization
        let mut hasher = StandardHasher::<H>::new();
        let height = Self::grafting_height();
        let status =
            merkleize_grafted_bitmap(&mut hasher, self.status, &clean_any.log.mmr, height).await?;

        // Compute and cache the root
        let cached_root = Some(root(&mut hasher, height, &status, &clean_any.log.mmr).await?);

        Ok(Db {
            any: clean_any,
            status,
            context: self.context,
            bitmap_metadata_partition: self.bitmap_metadata_partition,
            cached_root,
        })
    }
}
598
599impl<
600        E: RStorage + Clock + Metrics,
601        K: Array,
602        V: FixedValue,
603        H: Hasher,
604        T: Translator,
605        const N: usize,
606    > crate::qmdb::store::LogStorePrunable for Db<E, K, V, H, T, N>
607{
608    async fn prune(&mut self, prune_loc: Location) -> Result<(), Error> {
609        self.prune(prune_loc).await
610    }
611}
612
613impl<
614        E: RStorage + Clock + Metrics,
615        K: Array,
616        V: FixedValue,
617        H: Hasher,
618        T: Translator,
619        const N: usize,
620        S: State<DigestOf<H>>,
621    > LogStore for Db<E, K, V, H, T, N, S>
622{
623    type Value = V;
624
625    fn op_count(&self) -> Location {
626        self.op_count()
627    }
628
629    fn inactivity_floor_loc(&self) -> Location {
630        self.inactivity_floor_loc()
631    }
632
633    async fn get_metadata(&self) -> Result<Option<V>, Error> {
634        self.get_metadata().await
635    }
636
637    fn is_empty(&self) -> bool {
638        self.is_empty()
639    }
640}
641
642impl<
643        E: RStorage + Clock + Metrics,
644        K: Array,
645        V: FixedValue,
646        H: Hasher,
647        T: Translator,
648        const N: usize,
649        S: State<DigestOf<H>>,
650    > crate::store::Store for Db<E, K, V, H, T, N, S>
651{
652    type Key = K;
653    type Value = V;
654    type Error = Error;
655
656    async fn get(&self, key: &Self::Key) -> Result<Option<Self::Value>, Self::Error> {
657        self.get(key).await
658    }
659}
660
661impl<
662        E: RStorage + Clock + Metrics,
663        K: Array,
664        V: FixedValue,
665        H: Hasher,
666        T: Translator,
667        const N: usize,
668    > crate::store::StoreMut for Db<E, K, V, H, T, N, Dirty>
669{
670    async fn update(&mut self, key: Self::Key, value: Self::Value) -> Result<(), Self::Error> {
671        self.update(key, value).await
672    }
673}
674
675impl<
676        E: RStorage + Clock + Metrics,
677        K: Array,
678        V: FixedValue,
679        H: Hasher,
680        T: Translator,
681        const N: usize,
682    > crate::store::StoreDeletable for Db<E, K, V, H, T, N, Dirty>
683{
684    async fn delete(&mut self, key: Self::Key) -> Result<bool, Self::Error> {
685        self.delete(key).await
686    }
687}
688
689impl<
690        E: RStorage + Clock + Metrics,
691        K: Array,
692        V: FixedValue,
693        H: Hasher,
694        T: Translator,
695        const N: usize,
696    > CleanStore for Db<E, K, V, H, T, N, Clean<DigestOf<H>>>
697{
698    type Digest = H::Digest;
699    type Operation = Operation<K, V>;
700    type Dirty = Db<E, K, V, H, T, N, Dirty>;
701
702    fn root(&self) -> Self::Digest {
703        self.root()
704    }
705
706    async fn proof(
707        &self,
708        start_loc: Location,
709        max_ops: NonZeroU64,
710    ) -> Result<(Proof<Self::Digest>, Vec<Self::Operation>), Error> {
711        self.any.proof(start_loc, max_ops).await
712    }
713
714    async fn historical_proof(
715        &self,
716        historical_size: Location,
717        start_loc: Location,
718        max_ops: NonZeroU64,
719    ) -> Result<(Proof<Self::Digest>, Vec<Self::Operation>), Error> {
720        self.any
721            .historical_proof(historical_size, start_loc, max_ops)
722            .await
723    }
724
725    fn into_dirty(self) -> Self::Dirty {
726        self.into_dirty()
727    }
728}
729
730impl<E, K, V, T, H, const N: usize> Batchable for Db<E, K, V, H, T, N, Dirty>
731where
732    E: RStorage + Clock + Metrics,
733    K: Array,
734    V: FixedValue,
735    T: Translator,
736    H: Hasher,
737{
738    async fn write_batch(
739        &mut self,
740        iter: impl Iterator<Item = (K, Option<V>)>,
741    ) -> Result<(), Error> {
742        let status = &mut self.status;
743        self.any
744            .write_batch_with_callback(iter, move |append: bool, loc: Option<Location>| {
745                status.push(append);
746                if let Some(loc) = loc {
747                    status.set_bit(*loc, false);
748                }
749            })
750            .await
751    }
752}
753
754impl<
755        E: RStorage + Clock + Metrics,
756        K: Array,
757        V: FixedValue,
758        H: Hasher,
759        T: Translator,
760        const N: usize,
761    > DirtyStore for Db<E, K, V, H, T, N, Dirty>
762{
763    type Digest = H::Digest;
764    type Operation = Operation<K, V>;
765    type Clean = Db<E, K, V, H, T, N, Clean<DigestOf<H>>>;
766
767    async fn merkleize(self) -> Result<Self::Clean, Error> {
768        self.merkleize().await
769    }
770}
771
772impl<
773        E: RStorage + Clock + Metrics,
774        K: Array,
775        V: FixedValue,
776        H: Hasher,
777        T: Translator,
778        const N: usize,
779    > CleanAny for Db<E, K, V, H, T, N, Clean<DigestOf<H>>>
780{
781    type Key = K;
782
783    async fn get(&self, key: &Self::Key) -> Result<Option<Self::Value>, Error> {
784        self.get(key).await
785    }
786
787    async fn commit(&mut self, metadata: Option<Self::Value>) -> Result<Range<Location>, Error> {
788        self.commit(metadata).await
789    }
790
791    async fn sync(&mut self) -> Result<(), Error> {
792        self.sync().await
793    }
794
795    async fn prune(&mut self, prune_loc: Location) -> Result<(), Error> {
796        self.prune(prune_loc).await
797    }
798
799    async fn close(self) -> Result<(), Error> {
800        self.close().await
801    }
802
803    async fn destroy(self) -> Result<(), Error> {
804        self.destroy().await
805    }
806}
807
808impl<
809        E: RStorage + Clock + Metrics,
810        K: Array,
811        V: FixedValue,
812        H: Hasher,
813        T: Translator,
814        const N: usize,
815    > DirtyAny for Db<E, K, V, H, T, N, Dirty>
816{
817    type Key = K;
818
819    async fn get(&self, key: &Self::Key) -> Result<Option<Self::Value>, Error> {
820        self.get(key).await
821    }
822
823    async fn update(&mut self, key: Self::Key, value: Self::Value) -> Result<(), Error> {
824        self.update(key, value).await
825    }
826
827    async fn create(&mut self, key: Self::Key, value: Self::Value) -> Result<bool, Error> {
828        self.create(key, value).await
829    }
830
831    async fn delete(&mut self, key: Self::Key) -> Result<bool, Error> {
832        self.delete(key).await
833    }
834}
835
836#[cfg(test)]
837pub mod test {
838    use super::*;
839    use crate::{
840        index::Unordered as _,
841        mmr::hasher::Hasher as _,
842        qmdb::{any::AnyExt, store::batch_tests},
843        translator::OneCap,
844    };
845    use commonware_cryptography::{sha256::Digest, Sha256};
846    use commonware_macros::test_traced;
847    use commonware_runtime::{buffer::PoolRef, deterministic, Runner as _};
848    use commonware_utils::{NZUsize, NZU64};
849    use rand::{rngs::StdRng, RngCore, SeedableRng};
850    use std::collections::HashMap;
851    use tracing::warn;
852
853    const PAGE_SIZE: usize = 88;
854    const PAGE_CACHE_SIZE: usize = 8;
855
856    fn current_db_config(partition_prefix: &str) -> Config<OneCap> {
857        Config {
858            mmr_journal_partition: format!("{partition_prefix}_journal_partition"),
859            mmr_metadata_partition: format!("{partition_prefix}_metadata_partition"),
860            mmr_items_per_blob: NZU64!(11),
861            mmr_write_buffer: NZUsize!(1024),
862            log_journal_partition: format!("{partition_prefix}_partition_prefix"),
863            log_items_per_blob: NZU64!(7),
864            log_write_buffer: NZUsize!(1024),
865            bitmap_metadata_partition: format!("{partition_prefix}_bitmap_metadata_partition"),
866            translator: OneCap,
867            thread_pool: None,
868            buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
869        }
870    }
871
    /// A type alias for the concrete [Db] type used in these unit tests: Sha256 digests for both
    /// keys and values, with the const parameter N set to 32 (see the note on [Db] about N).
    type CleanCurrentTest = Db<deterministic::Context, Digest, Digest, Sha256, OneCap, 32>;

    /// A type alias for the Dirty (un-merkleized) variant of [CleanCurrentTest].
    type DirtyCurrentTest = Db<deterministic::Context, Digest, Digest, Sha256, OneCap, 32, Dirty>;
877
878    /// Return an [Db] database initialized with a fixed config.
879    async fn open_db(context: deterministic::Context, partition_prefix: &str) -> CleanCurrentTest {
880        CleanCurrentTest::init(context, current_db_config(partition_prefix))
881            .await
882            .unwrap()
883    }
884
    /// Build a small database, then close and reopen it and ensure state is preserved.
    #[test_traced("DEBUG")]
    pub fn test_current_db_build_small_close_reopen() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let partition = "build_small";
            let db = open_db(context.clone(), partition).await;
            // A fresh db starts with a single operation and an inactivity floor of 0.
            assert_eq!(db.op_count(), 1);
            assert_eq!(db.inactivity_floor_loc(), Location::new_unchecked(0));
            let root0 = db.root();
            db.close().await.unwrap();
            // The empty db's state (op count, metadata, root) must survive a close/reopen.
            let db = open_db(context.clone(), partition).await;
            assert_eq!(db.op_count(), 1);
            assert!(db.get_metadata().await.unwrap().is_none());
            assert_eq!(db.root(), root0);

            // Add one key.
            let k1 = Sha256::hash(&0u64.to_be_bytes());
            let v1 = Sha256::hash(&10u64.to_be_bytes());
            let mut db = db.into_dirty();
            assert!(db.create(k1, v1).await.unwrap());
            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
            let mut db = db.merkleize().await.unwrap();
            db.commit(None).await.unwrap();
            assert_eq!(db.op_count(), 4); // 1 update, 1 commit, 1 move + 1 initial commit.
            assert!(db.get_metadata().await.unwrap().is_none());
            let root1 = db.root();
            // Adding a key must change the root.
            assert!(root1 != root0);
            db.close().await.unwrap();
            // Reopen again: the one-key state must round-trip.
            let db = open_db(context.clone(), partition).await;
            assert_eq!(db.op_count(), 4);
            assert_eq!(db.root(), root1);

            // Create of same key should fail.
            let mut db = db.into_dirty();
            assert!(!db.create(k1, v1).await.unwrap());

            // Delete that one key.
            assert!(db.delete(k1).await.unwrap());

            // Commit with metadata this time, and confirm the metadata round-trips.
            let metadata = Sha256::hash(&1u64.to_be_bytes());
            let mut db = db.merkleize().await.unwrap();
            db.commit(Some(metadata)).await.unwrap();
            assert_eq!(db.op_count(), 6); // 1 update, 2 commits, 1 move, 1 delete.
            assert_eq!(db.get_metadata().await.unwrap().unwrap(), metadata);
            assert_eq!(db.inactivity_floor_loc(), Location::new_unchecked(5));
            let root2 = db.root();
            db.close().await.unwrap();
            let db = open_db(context.clone(), partition).await;
            assert_eq!(db.op_count(), 6);
            assert_eq!(db.get_metadata().await.unwrap().unwrap(), metadata);
            assert_eq!(db.inactivity_floor_loc(), Location::new_unchecked(5));
            assert_eq!(db.root(), root2);

            // Repeated delete of same key should fail.
            let mut db = db.into_dirty();
            assert!(!db.delete(k1).await.unwrap());
            let db = db.merkleize().await.unwrap();

            // Confirm all activity bits except the last are false.
            for i in 0..*db.op_count() - 1 {
                assert!(!db.status.get_bit(i));
            }
            // Only the final operation should still be marked active.
            assert!(db.status.get_bit(*db.op_count() - 1));

            db.destroy().await.unwrap();
        });
    }
953
954    #[test_traced("WARN")]
955    fn test_current_db_build_big() {
956        let executor = deterministic::Runner::default();
957        // Build a db with 1000 keys, some of which we update and some of which we delete, and
958        // confirm that the end state of the db matches that of an identically updated hashmap.
959        const ELEMENTS: u64 = 1000;
960        executor.start(|context| async move {
961            let mut db = open_db(context.clone(), "build_big").await.into_dirty();
962
963            let mut map = HashMap::<Digest, Digest>::default();
964            for i in 0u64..ELEMENTS {
965                let k = Sha256::hash(&i.to_be_bytes());
966                let v = Sha256::hash(&(i * 1000).to_be_bytes());
967                db.update(k, v).await.unwrap();
968                map.insert(k, v);
969            }
970
971            // Update every 3rd key
972            for i in 0u64..ELEMENTS {
973                if i % 3 != 0 {
974                    continue;
975                }
976                let k = Sha256::hash(&i.to_be_bytes());
977                let v = Sha256::hash(&((i + 1) * 10000).to_be_bytes());
978                db.update(k, v).await.unwrap();
979                map.insert(k, v);
980            }
981
982            // Delete every 7th key
983            for i in 0u64..ELEMENTS {
984                if i % 7 != 1 {
985                    continue;
986                }
987                let k = Sha256::hash(&i.to_be_bytes());
988                db.delete(k).await.unwrap();
989                map.remove(&k);
990            }
991
992            assert_eq!(db.op_count(), 2620);
993            assert_eq!(db.inactivity_floor_loc(), Location::new_unchecked(0));
994            assert_eq!(db.any.snapshot.items(), 857);
995
996            // Test that commit + sync w/ pruning will raise the activity floor.
997            let mut db = db.merkleize().await.unwrap();
998            db.commit(None).await.unwrap();
999            db.sync().await.unwrap();
1000            db.prune(db.inactivity_floor_loc()).await.unwrap();
1001            assert_eq!(db.op_count(), 4241);
1002            assert_eq!(db.inactivity_floor_loc(), Location::new_unchecked(3383));
1003            assert_eq!(db.any.snapshot.items(), 857);
1004
1005            // Close & reopen the db, making sure the re-opened db has exactly the same state.
1006            let root = db.root();
1007            db.close().await.unwrap();
1008            let db = open_db(context.clone(), "build_big").await;
1009            assert_eq!(root, db.root());
1010            assert_eq!(db.op_count(), 4241);
1011            assert_eq!(db.inactivity_floor_loc(), Location::new_unchecked(3383));
1012            assert_eq!(db.any.snapshot.items(), 857);
1013
1014            // Confirm the db's state matches that of the separate map we computed independently.
1015            for i in 0u64..1000 {
1016                let k = Sha256::hash(&i.to_be_bytes());
1017                if let Some(map_value) = map.get(&k) {
1018                    let Some(db_value) = db.get(&k).await.unwrap() else {
1019                        panic!("key not found in db: {k}");
1020                    };
1021                    assert_eq!(*map_value, db_value);
1022                } else {
1023                    assert!(db.get(&k).await.unwrap().is_none());
1024                }
1025            }
1026        });
1027    }
1028
    /// Build a tiny database and make sure we can't convince the verifier that some old value of a
    /// key is active. We specifically test over the partial chunk case, since these bits are yet to
    /// be committed to the underlying MMR.
    #[test_traced("DEBUG")]
    pub fn test_current_db_verify_proof_over_bits_in_uncommitted_chunk() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = StandardHasher::<Sha256>::new();
            // NOTE(review): partition name duplicates the close/reopen test's — harmless since
            // each test runs in its own deterministic context, but consider renaming.
            let partition = "build_small";
            let mut db = open_db(context.clone(), partition).await.into_dirty();

            // Add one key.
            let k = Sha256::fill(0x01);
            let v1 = Sha256::fill(0xA1);
            db.update(k, v1).await.unwrap();
            let mut db = db.merkleize().await.unwrap();
            db.commit(None).await.unwrap();

            // Remember the location of the (currently active) update op assigning v1 to k.
            let (_, op_loc) = db.any.get_with_loc(&k).await.unwrap().unwrap();
            let proof = db.key_value_proof(hasher.inner(), k).await.unwrap();

            // Proof should be verifiable against current root.
            let root = db.root();
            assert!(CleanCurrentTest::verify_key_value_proof(
                hasher.inner(),
                k,
                v1,
                &proof,
                &root,
            ));

            let v2 = Sha256::fill(0xA2);
            // Proof should not verify against a different value.
            assert!(!CleanCurrentTest::verify_key_value_proof(
                hasher.inner(),
                k,
                v2,
                &proof,
                &root,
            ));
            // Proof should not verify against a mangled next_key.
            let mut mangled_proof = proof.clone();
            mangled_proof.next_key = Sha256::fill(0xFF);
            assert!(!CleanCurrentTest::verify_key_value_proof(
                hasher.inner(),
                k,
                v1,
                &mangled_proof,
                &root,
            ));

            // Update the key to a new value (v2), which inactivates the previous operation.
            let mut db = db.into_dirty();
            db.update(k, v2).await.unwrap();
            let mut db = db.merkleize().await.unwrap();
            db.commit(None).await.unwrap();
            let root = db.root();

            // New value should not be verifiable against the old proof.
            assert!(!CleanCurrentTest::verify_key_value_proof(
                hasher.inner(),
                k,
                v2,
                &proof,
                &root,
            ));

            // But the new value should verify against a new proof.
            let proof = db.key_value_proof(hasher.inner(), k).await.unwrap();
            assert!(CleanCurrentTest::verify_key_value_proof(
                hasher.inner(),
                k,
                v2,
                &proof,
                &root,
            ));
            // Old value will not verify against new proof.
            assert!(!CleanCurrentTest::verify_key_value_proof(
                hasher.inner(),
                k,
                v1,
                &proof,
                &root,
            ));

            // Create a proof of the now-inactive update operation assigning v1 to k against the
            // current root.
            let (p, _, chunks) = db
                .range_proof(hasher.inner(), op_loc, NZU64!(1))
                .await
                .unwrap();
            let proof_inactive = KeyValueProof {
                proof: OperationProof {
                    loc: op_loc,
                    chunk: chunks[0],
                    range_proof: p,
                },
                next_key: k,
            };
            // This proof should verify using verify_range_proof which does not check activity
            // status.
            let op = Operation::Update(Update {
                key: k,
                value: v1,
                next_key: k,
            });
            assert!(CleanCurrentTest::verify_range_proof(
                hasher.inner(),
                &proof_inactive.proof.range_proof,
                proof_inactive.proof.loc,
                &[op],
                &[proof_inactive.proof.chunk],
                &root,
            ));
            // But this proof should *not* verify as a key value proof, since verification will see
            // that the operation is inactive.
            assert!(!CleanCurrentTest::verify_key_value_proof(
                hasher.inner(),
                k,
                v1,
                &proof_inactive,
                &root,
            ));

            // Attempt #1 to "fool" the verifier: change the location to that of an active
            // operation. This should not fool the verifier if we're properly validating the
            // inclusion of the operation itself, and not just the chunk.
            let (_, active_loc) = db.any.get_with_loc(&k).await.unwrap().unwrap();
            // The new location should differ but still be in the same chunk.
            assert_ne!(active_loc, proof_inactive.proof.loc);
            assert_eq!(
                CleanBitMap::<Digest, 32>::leaf_pos(*active_loc),
                CleanBitMap::<Digest, 32>::leaf_pos(*proof_inactive.proof.loc)
            );
            let mut fake_proof = proof_inactive.clone();
            fake_proof.proof.loc = active_loc;
            assert!(!CleanCurrentTest::verify_key_value_proof(
                hasher.inner(),
                k,
                v1,
                &fake_proof,
                &root,
            ));

            // Attempt #2 to "fool" the verifier: Modify the chunk in the proof info to make it look
            // like the operation is active by flipping its corresponding bit to 1. This should not
            // fool the verifier if we are correctly incorporating the partial chunk information
            // into the root computation.
            let mut modified_chunk = proof_inactive.proof.chunk;
            // Locate the activity bit for the proven op within its chunk (8 bits per byte).
            let bit_pos = *proof_inactive.proof.loc;
            let byte_idx = bit_pos / 8;
            let bit_idx = bit_pos % 8;
            modified_chunk[byte_idx as usize] |= 1 << bit_idx;

            let mut fake_proof = proof_inactive.clone();
            fake_proof.proof.chunk = modified_chunk;
            assert!(!CleanCurrentTest::verify_key_value_proof(
                hasher.inner(),
                k,
                v1,
                &fake_proof,
                &root,
            ));

            db.destroy().await.unwrap();
        });
    }
1196
1197    /// Apply random operations to the given db, committing them (randomly & at the end) only if
1198    /// `commit_changes` is true.
1199    async fn apply_random_ops(
1200        num_elements: u64,
1201        commit_changes: bool,
1202        rng_seed: u64,
1203        mut db: DirtyCurrentTest,
1204    ) -> Result<CleanCurrentTest, Error> {
1205        // Log the seed with high visibility to make failures reproducible.
1206        warn!("rng_seed={}", rng_seed);
1207        let mut rng = StdRng::seed_from_u64(rng_seed);
1208
1209        for i in 0u64..num_elements {
1210            let k = Sha256::hash(&i.to_be_bytes());
1211            let v = Sha256::hash(&rng.next_u32().to_be_bytes());
1212            db.update(k, v).await.unwrap();
1213        }
1214
1215        // Randomly update / delete them. We use a delete frequency that is 1/7th of the update
1216        // frequency.
1217        for _ in 0u64..num_elements * 10 {
1218            let rand_key = Sha256::hash(&(rng.next_u64() % num_elements).to_be_bytes());
1219            if rng.next_u32() % 7 == 0 {
1220                db.delete(rand_key).await.unwrap();
1221                continue;
1222            }
1223            let v = Sha256::hash(&rng.next_u32().to_be_bytes());
1224            db.update(rand_key, v).await.unwrap();
1225            if commit_changes && rng.next_u32() % 20 == 0 {
1226                // Commit every ~20 updates.
1227                let mut clean_db = db.merkleize().await?;
1228                clean_db.commit(None).await?;
1229                db = clean_db.into_dirty();
1230            }
1231        }
1232        if commit_changes {
1233            let mut clean_db = db.merkleize().await?;
1234            clean_db.commit(None).await?;
1235            Ok(clean_db)
1236        } else {
1237            db.merkleize().await
1238        }
1239    }
1240
1241    #[test_traced("DEBUG")]
1242    pub fn test_current_db_range_proofs() {
1243        let executor = deterministic::Runner::default();
1244        executor.start(|mut context| async move {
1245            let partition = "range_proofs";
1246            let mut hasher = StandardHasher::<Sha256>::new();
1247            let db = open_db(context.clone(), partition).await.into_dirty();
1248            let db = apply_random_ops(200, true, context.next_u64(), db)
1249                .await
1250                .unwrap();
1251            let root = db.root();
1252
1253            // Make sure size-constrained batches of operations are provable from the oldest
1254            // retained op to tip.
1255            let max_ops = 4;
1256            let end_loc = db.op_count();
1257            let start_loc = db.any.inactivity_floor_loc();
1258
1259            for loc in *start_loc..*end_loc {
1260                let loc = Location::new_unchecked(loc);
1261                let (proof, ops, chunks) = db
1262                    .range_proof(hasher.inner(), loc, NZU64!(max_ops))
1263                    .await
1264                    .unwrap();
1265                assert!(
1266                    CleanCurrentTest::verify_range_proof(
1267                        hasher.inner(),
1268                        &proof,
1269                        loc,
1270                        &ops,
1271                        &chunks,
1272                        &root
1273                    ),
1274                    "failed to verify range at start_loc {start_loc}",
1275                );
1276            }
1277
1278            db.destroy().await.unwrap();
1279        });
1280    }
1281
1282    #[test_traced("DEBUG")]
1283    pub fn test_current_db_key_value_proof() {
1284        let executor = deterministic::Runner::default();
1285        executor.start(|mut context| async move {
1286            let partition = "range_proofs";
1287            let mut hasher = StandardHasher::<Sha256>::new();
1288            let db = open_db(context.clone(), partition).await.into_dirty();
1289            let db = apply_random_ops(500, true, context.next_u64(), db)
1290                .await
1291                .unwrap();
1292            let root = db.root();
1293
1294            // Confirm bad keys produce the expected error.
1295            let bad_key = Sha256::fill(0xAA);
1296            let res = db.key_value_proof(hasher.inner(), bad_key).await;
1297            assert!(matches!(res, Err(Error::KeyNotFound)));
1298
1299            let start = *db.inactivity_floor_loc();
1300            for i in start..db.status.len() {
1301                if !db.status.get_bit(i) {
1302                    continue;
1303                }
1304                // Found an active operation! Create a proof for its active current key/value if
1305                // it's a key-updating operation.
1306                let op = db.any.log.read(Location::new_unchecked(i)).await.unwrap();
1307                let (key, value) = match op {
1308                    Operation::Update(key_data) => (key_data.key, key_data.value),
1309                    Operation::CommitFloor(_, _) => continue,
1310                    _ => unreachable!("expected update or commit floor operation"),
1311                };
1312                let proof = db.key_value_proof(hasher.inner(), key).await.unwrap();
1313
1314                // Proof should validate against the current value and correct root.
1315                assert!(CleanCurrentTest::verify_key_value_proof(
1316                    hasher.inner(),
1317                    key,
1318                    value,
1319                    &proof,
1320                    &root
1321                ));
1322                // Proof should fail against the wrong value.
1323                let wrong_val = Sha256::fill(0xFF);
1324                assert!(!CleanCurrentTest::verify_key_value_proof(
1325                    hasher.inner(),
1326                    key,
1327                    wrong_val,
1328                    &proof,
1329                    &root
1330                ));
1331                // Proof should fail against the wrong key.
1332                let wrong_key = Sha256::fill(0xEE);
1333                assert!(!CleanCurrentTest::verify_key_value_proof(
1334                    hasher.inner(),
1335                    wrong_key,
1336                    value,
1337                    &proof,
1338                    &root
1339                ));
1340                // Proof should fail against the wrong root.
1341                let wrong_root = Sha256::fill(0xDD);
1342                assert!(!CleanCurrentTest::verify_key_value_proof(
1343                    hasher.inner(),
1344                    key,
1345                    value,
1346                    &proof,
1347                    &wrong_root,
1348                ));
1349                // Proof should fail with the wrong next-key.
1350                let mut bad_proof = proof.clone();
1351                bad_proof.next_key = wrong_key;
1352                assert!(!CleanCurrentTest::verify_key_value_proof(
1353                    hasher.inner(),
1354                    key,
1355                    value,
1356                    &bad_proof,
1357                    &root,
1358                ));
1359            }
1360
1361            db.destroy().await.unwrap();
1362        });
1363    }
1364
1365    /// This test builds a random database, and makes sure that its state is correctly restored
1366    /// after closing and re-opening.
1367    #[test_traced("WARN")]
1368    pub fn test_current_db_build_random_close_reopen() {
1369        // Number of elements to initially insert into the db.
1370        const ELEMENTS: u64 = 1000;
1371
1372        let executor = deterministic::Runner::default();
1373        executor.start(|mut context| async move {
1374            let partition = "build_random";
1375            let rng_seed = context.next_u64();
1376            let db = open_db(context.clone(), partition).await.into_dirty();
1377            let db = apply_random_ops(ELEMENTS, true, rng_seed, db)
1378                .await
1379                .unwrap();
1380
1381            // Close the db, then replay its operations with a bitmap.
1382            let root = db.root();
1383            // Create a bitmap based on the current db's pruned/inactive state.
1384            db.close().await.unwrap();
1385
1386            let db = open_db(context, partition).await;
1387            assert_eq!(db.root(), root);
1388
1389            db.destroy().await.unwrap();
1390        });
1391    }
1392
1393    /// Repeatedly update the same key to a new value and ensure we can prove its current value
1394    /// after each update.
1395    #[test_traced("WARN")]
1396    pub fn test_current_db_proving_repeated_updates() {
1397        let executor = deterministic::Runner::default();
1398        executor.start(|context| async move {
1399            let mut hasher = StandardHasher::<Sha256>::new();
1400            let partition = "build_small";
1401            let mut db = open_db(context.clone(), partition).await;
1402
1403            // Add one key.
1404            let k = Sha256::fill(0x00);
1405            let mut old_val = Sha256::fill(0x00);
1406            for i in 1u8..=255 {
1407                let v = Sha256::fill(i);
1408                let mut dirty_db = db.into_dirty();
1409                dirty_db.update(k, v).await.unwrap();
1410                assert_eq!(dirty_db.get(&k).await.unwrap().unwrap(), v);
1411                db = dirty_db.merkleize().await.unwrap();
1412                db.commit(None).await.unwrap();
1413                let root = db.root();
1414
1415                // Create a proof for the current value of k.
1416                let proof = db.key_value_proof(hasher.inner(), k).await.unwrap();
1417                assert!(
1418                    CleanCurrentTest::verify_key_value_proof(hasher.inner(), k, v, &proof, &root),
1419                    "proof of update {i} failed to verify"
1420                );
1421                // Ensure the proof does NOT verify if we use the previous value.
1422                assert!(
1423                    !CleanCurrentTest::verify_key_value_proof(
1424                        hasher.inner(),
1425                        k,
1426                        old_val,
1427                        &proof,
1428                        &root
1429                    ),
1430                    "proof of update {i} verified when it should not have"
1431                );
1432                old_val = v;
1433            }
1434
1435            db.destroy().await.unwrap();
1436        });
1437    }
1438
    /// This test builds a random database and simulates we can recover from different types of
    /// failure scenarios.
    #[test_traced("WARN")]
    pub fn test_current_db_simulate_write_failures() {
        // Number of elements to initially insert into the db.
        const ELEMENTS: u64 = 1000;

        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            let partition = "build_random_fail_commit";
            let rng_seed = context.next_u64();
            let db = open_db(context.clone(), partition).await.into_dirty();
            let mut db = apply_random_ops(ELEMENTS, true, rng_seed, db)
                .await
                .unwrap();
            // Record the committed state so we can confirm recovery restores it exactly.
            let committed_root = db.root();
            let committed_op_count = db.op_count();
            let committed_inactivity_floor = db.any.inactivity_floor_loc();
            db.prune(committed_inactivity_floor).await.unwrap();

            // Perform more random operations without committing any of them.
            let db = apply_random_ops(ELEMENTS, false, rng_seed + 1, db.into_dirty())
                .await
                .unwrap();

            // SCENARIO #1: Simulate a crash that happens before any writes. Upon reopening, the
            // state of the DB should be as of the last commit.
            db.simulate_commit_failure_before_any_writes();
            let db = open_db(context.clone(), partition).await;
            assert_eq!(db.root(), committed_root);
            assert_eq!(db.op_count(), committed_op_count);

            // Re-apply the exact same uncommitted operations (same seed => same op sequence).
            let db = apply_random_ops(ELEMENTS, false, rng_seed + 1, db.into_dirty())
                .await
                .unwrap();

            // SCENARIO #2: Simulate a crash that happens after the any db has been committed, but
            // before the state of the pruned bitmap can be written to disk.
            db.simulate_commit_failure_after_any_db_commit()
                .await
                .unwrap();

            // We should be able to recover, so the root should differ from the previous commit, and
            // the op count should be greater than before.
            let db = open_db(context.clone(), partition).await;
            let scenario_2_root = db.root();

            // To confirm the second committed hash is correct we'll re-build the DB in a new
            // partition, but without any failures. They should have the exact same state.
            // NOTE(review): the recovered db above is dropped (not closed) when `db` is rebound
            // here — acceptable in the deterministic test runtime; confirm if that changes.
            let fresh_partition = "build_random_fail_commit_fresh";
            let db = open_db(context.clone(), fresh_partition).await.into_dirty();
            let db = apply_random_ops(ELEMENTS, true, rng_seed, db)
                .await
                .unwrap();
            let db = apply_random_ops(ELEMENTS, false, rng_seed + 1, db.into_dirty())
                .await
                .unwrap();
            let mut db = db.into_dirty().merkleize().await.unwrap();
            db.commit(None).await.unwrap();
            db.prune(db.any.inactivity_floor_loc()).await.unwrap();
            // State from scenario #2 should match that of a successful commit.
            assert_eq!(db.root(), scenario_2_root);

            db.destroy().await.unwrap();
        });
    }
1506
    /// Two databases receiving identical operations must produce identical roots regardless of
    /// when (or whether) each is pruned — both before and after a restart.
    #[test_traced("WARN")]
    pub fn test_current_db_different_pruning_delays_same_root() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Create two databases that are identical other than how they are pruned.
            let db_config_no_pruning = current_db_config("no_pruning_test");

            let db_config_pruning = current_db_config("pruning_test");

            let mut db_no_pruning =
                CleanCurrentTest::init(context.clone(), db_config_no_pruning.clone())
                    .await
                    .unwrap()
                    .into_dirty();
            let mut db_pruning = CleanCurrentTest::init(context.clone(), db_config_pruning.clone())
                .await
                .unwrap()
                .into_dirty();

            // Apply identical operations to both databases, but only prune one.
            const NUM_OPERATIONS: u64 = 1000;
            for i in 0..NUM_OPERATIONS {
                let key = Sha256::hash(&i.to_be_bytes());
                let value = Sha256::hash(&(i * 1000).to_be_bytes());

                db_no_pruning.update(key, value).await.unwrap();
                db_pruning.update(key, value).await.unwrap();

                // Commit periodically
                if i % 50 == 49 {
                    let mut clean_no_pruning = db_no_pruning.merkleize().await.unwrap();
                    clean_no_pruning.commit(None).await.unwrap();
                    let mut clean_pruning = db_pruning.merkleize().await.unwrap();
                    clean_pruning.commit(None).await.unwrap();
                    // Prune only one db, using the other's floor to keep the two in lockstep.
                    clean_pruning
                        .prune(clean_no_pruning.any.inactivity_floor_loc())
                        .await
                        .unwrap();
                    db_no_pruning = clean_no_pruning.into_dirty();
                    db_pruning = clean_pruning.into_dirty();
                }
            }

            // Final commit
            let mut db_no_pruning = db_no_pruning.merkleize().await.unwrap();
            db_no_pruning.commit(None).await.unwrap();
            let mut db_pruning = db_pruning.merkleize().await.unwrap();
            db_pruning.commit(None).await.unwrap();

            // Get roots from both databases
            let root_no_pruning = db_no_pruning.root();
            let root_pruning = db_pruning.root();

            // Verify they generate the same roots
            assert_eq!(root_no_pruning, root_pruning);

            // Close both databases
            db_no_pruning.close().await.unwrap();
            db_pruning.close().await.unwrap();

            // Restart both databases
            let db_no_pruning = CleanCurrentTest::init(context.clone(), db_config_no_pruning)
                .await
                .unwrap();
            let db_pruning = CleanCurrentTest::init(context.clone(), db_config_pruning)
                .await
                .unwrap();
            // The inactivity floor must also agree after restart.
            assert_eq!(
                db_no_pruning.inactivity_floor_loc(),
                db_pruning.inactivity_floor_loc()
            );

            // Get roots after restart
            let root_no_pruning_restart = db_no_pruning.root();
            let root_pruning_restart = db_pruning.root();

            // Ensure roots still match after restart
            assert_eq!(root_no_pruning, root_no_pruning_restart);
            assert_eq!(root_pruning, root_pruning_restart);

            db_no_pruning.destroy().await.unwrap();
            db_pruning.destroy().await.unwrap();
        });
    }
1591
    /// Build a tiny database and confirm exclusion proofs work as expected.
    ///
    /// Walks the database through three states — empty, one key, two keys — and checks
    /// that exclusion proofs verify exactly for keys that are currently absent,
    /// including the "cycle around" span from the greatest active key back to the
    /// least. Finally deletes all keys and re-checks the empty-key-set case.
    #[test_traced("DEBUG")]
    pub fn test_current_db_exclusion_proofs() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = StandardHasher::<Sha256>::new();
            let partition = "exclusion_proofs";
            let db = open_db(context.clone(), partition).await;

            // The first key we will insert (0x10 repeated in every byte).
            let key_exists_1 = Sha256::fill(0x10);

            // We should be able to prove exclusion for any key against an empty db.
            let empty_root = db.root();
            let empty_proof = db
                .exclusion_proof(hasher.inner(), &key_exists_1)
                .await
                .unwrap();
            assert!(CleanCurrentTest::verify_exclusion_proof(
                hasher.inner(),
                &key_exists_1,
                &empty_proof,
                &empty_root,
            ));

            // Add `key_exists_1` and test exclusion proving over the single-key database case.
            let v1 = Sha256::fill(0xA1);
            let mut db = db.into_dirty();
            db.update(key_exists_1, v1).await.unwrap();
            let mut db = db.merkleize().await.unwrap();
            db.commit(None).await.unwrap();
            let root = db.root();

            // We shouldn't be able to generate an exclusion proof for a key already in the db.
            let result = db.exclusion_proof(hasher.inner(), &key_exists_1).await;
            assert!(matches!(result, Err(Error::KeyExists)));

            // Generate some valid exclusion proofs for keys on either side.
            let greater_key = Sha256::fill(0xFF);
            let lesser_key = Sha256::fill(0x00);
            let proof = db
                .exclusion_proof(hasher.inner(), &greater_key)
                .await
                .unwrap();
            let proof2 = db
                .exclusion_proof(hasher.inner(), &lesser_key)
                .await
                .unwrap();

            // Since there's only one span in the DB, the two exclusion proofs should be identical,
            // and the proof should verify any key but the one that exists in the db.
            assert_eq!(proof, proof2);
            // Any key except the one that exists should verify against this proof.
            assert!(CleanCurrentTest::verify_exclusion_proof(
                hasher.inner(),
                &greater_key,
                &proof,
                &root,
            ));
            assert!(CleanCurrentTest::verify_exclusion_proof(
                hasher.inner(),
                &lesser_key,
                &proof,
                &root,
            ));
            // Exclusion should fail if we test it on a key that exists.
            assert!(!CleanCurrentTest::verify_exclusion_proof(
                hasher.inner(),
                &key_exists_1,
                &proof,
                &root,
            ));

            // Add a second key and test exclusion proving over the two-key database case.
            let key_exists_2 = Sha256::fill(0x30);
            let v2 = Sha256::fill(0xB2);

            let mut db = db.into_dirty();
            db.update(key_exists_2, v2).await.unwrap();
            let mut db = db.merkleize().await.unwrap();
            db.commit(None).await.unwrap();
            let root = db.root();

            // Use a lesser/greater key that has a translated-key conflict based
            // on our use of OneCap translator.
            let lesser_key = Sha256::fill(0x0F); // < k1=0x10
            let greater_key = Sha256::fill(0x31); // > k2=0x30
            let middle_key = Sha256::fill(0x20); // between k1=0x10 and k2=0x30
            let proof = db
                .exclusion_proof(hasher.inner(), &greater_key)
                .await
                .unwrap();
            // Test the "cycle around" span. This should prove exclusion of greater_key & lesser
            // key, but fail on middle_key.
            assert!(CleanCurrentTest::verify_exclusion_proof(
                hasher.inner(),
                &greater_key,
                &proof,
                &root,
            ));
            assert!(CleanCurrentTest::verify_exclusion_proof(
                hasher.inner(),
                &lesser_key,
                &proof,
                &root,
            ));
            // middle_key falls in the inner span, not the cycle-around span, so it must fail.
            assert!(!CleanCurrentTest::verify_exclusion_proof(
                hasher.inner(),
                &middle_key,
                &proof,
                &root,
            ));

            // Due to the cycle, lesser & greater keys should produce the same proof.
            let new_proof = db
                .exclusion_proof(hasher.inner(), &lesser_key)
                .await
                .unwrap();
            assert_eq!(proof, new_proof);

            // Test the inner span [key_exists_1, key_exists_2).
            let proof = db
                .exclusion_proof(hasher.inner(), &middle_key)
                .await
                .unwrap();
            // `key_exists_1` should fail since it's in the db.
            assert!(!CleanCurrentTest::verify_exclusion_proof(
                hasher.inner(),
                &key_exists_1,
                &proof,
                &root,
            ));
            // `middle_key` should succeed since it's in range.
            assert!(CleanCurrentTest::verify_exclusion_proof(
                hasher.inner(),
                &middle_key,
                &proof,
                &root,
            ));
            // `key_exists_2` is the (exclusive) upper end of the span, so it should fail.
            assert!(!CleanCurrentTest::verify_exclusion_proof(
                hasher.inner(),
                &key_exists_2,
                &proof,
                &root,
            ));

            // A key that shares a translated prefix with key_exists_1 (under OneCap) but is
            // still strictly inside the span should also verify.
            let conflicting_middle_key = Sha256::fill(0x11); // between k1=0x10 and k2=0x30
            assert!(CleanCurrentTest::verify_exclusion_proof(
                hasher.inner(),
                &conflicting_middle_key,
                &proof,
                &root,
            ));

            // Using lesser/greater keys for the middle-proof should fail.
            assert!(!CleanCurrentTest::verify_exclusion_proof(
                hasher.inner(),
                &greater_key,
                &proof,
                &root,
            ));
            assert!(!CleanCurrentTest::verify_exclusion_proof(
                hasher.inner(),
                &lesser_key,
                &proof,
                &root,
            ));

            // Make the DB empty again by deleting the keys and check the empty case
            // again.
            let mut db = db.into_dirty();
            db.delete(key_exists_1).await.unwrap();
            db.delete(key_exists_2).await.unwrap();
            let mut db = db.merkleize().await.unwrap();
            db.sync().await.unwrap();
            db.commit(None).await.unwrap();
            let root = db.root();
            // This root should be different than the empty root from earlier since the DB now has a
            // non-zero number of operations.
            assert!(db.is_empty());
            assert_ne!(db.op_count(), 0);
            assert_ne!(root, empty_root);

            // With no active keys, any key should be provably excluded again — even the
            // previously active ones.
            let proof = db
                .exclusion_proof(hasher.inner(), &key_exists_1)
                .await
                .unwrap();
            assert!(CleanCurrentTest::verify_exclusion_proof(
                hasher.inner(),
                &key_exists_1,
                &proof,
                &root,
            ));
            assert!(CleanCurrentTest::verify_exclusion_proof(
                hasher.inner(),
                &key_exists_2,
                &proof,
                &root,
            ));

            // Try fooling the verifier with improper values.
            assert!(!CleanCurrentTest::verify_exclusion_proof(
                hasher.inner(),
                &key_exists_1,
                &empty_proof, // wrong proof
                &root,
            ));
            assert!(!CleanCurrentTest::verify_exclusion_proof(
                hasher.inner(),
                &key_exists_1,
                &proof,
                &empty_root, // wrong root
            ));
        });
    }
1806
1807    #[test_traced("DEBUG")]
1808    fn test_batch() {
1809        batch_tests::test_batch(|mut ctx| async move {
1810            let seed = ctx.next_u64();
1811            let partition = format!("current_ordered_batch_{seed}");
1812            AnyExt::new(open_db(ctx, &partition).await)
1813        });
1814    }
1815}