commonware_storage/adb/
current.rs

1//! An authenticated database (ADB) that provides succinct proofs of _any_ value ever associated
2//! with a key, and also whether that value is the _current_ value associated with it. Its
3//! implementation is based on an [Any] authenticated database combined with an authenticated
4//! [Bitmap] over the activity status of each operation. The two structures are "grafted" together
5//! to minimize proof sizes.
6
7use crate::{
8    adb::{
9        any::fixed::{Any, Config as AConfig},
10        Error,
11    },
12    index::Index,
13    mmr::{
14        bitmap::Bitmap,
15        hasher::{Grafting, GraftingVerifier, Hasher, Standard},
16        iterator::{leaf_num_to_pos, leaf_pos_to_num},
17        storage::Grafting as GStorage,
18        verification::Proof,
19    },
20    store::operation::Fixed,
21    translator::Translator,
22};
23use commonware_codec::{CodecFixed, Encode as _, FixedSize};
24use commonware_cryptography::Hasher as CHasher;
25use commonware_runtime::{buffer::PoolRef, Clock, Metrics, Storage as RStorage, ThreadPool};
26use commonware_utils::Array;
27use futures::{future::try_join_all, try_join};
28use std::num::{NonZeroU64, NonZeroUsize};
29use tracing::debug;
30
/// Configuration for a [Current] authenticated db.
#[derive(Clone)]
pub struct Config<T: Translator> {
    /// The name of the [RStorage] partition used for the MMR's backing journal.
    pub mmr_journal_partition: String,

    /// The items per blob configuration value used by the MMR journal.
    pub mmr_items_per_blob: NonZeroU64,

    /// The size of the write buffer to use for each blob in the MMR journal.
    pub mmr_write_buffer: NonZeroUsize,

    /// The name of the [RStorage] partition used for the MMR's metadata.
    pub mmr_metadata_partition: String,

    /// The name of the [RStorage] partition used to persist the (pruned) log of operations.
    pub log_journal_partition: String,

    /// The items per blob configuration value used by the log journal.
    pub log_items_per_blob: NonZeroU64,

    /// The size of the write buffer to use for each blob in the log journal.
    pub log_write_buffer: NonZeroUsize,

    /// The name of the [RStorage] partition used for the bitmap metadata.
    pub bitmap_metadata_partition: String,

    /// The translator used by the compressed index.
    pub translator: T,

    /// An optional thread pool to use for parallelizing batch operations
    /// (e.g. Merkleization during init and commit).
    pub thread_pool: Option<ThreadPool>,

    /// The buffer pool to use for caching data.
    pub buffer_pool: PoolRef,
}
67
/// A key-value ADB based on an MMR over its log of operations, supporting authentication of whether
/// a key ever had a specific value, and whether the key currently has that value.
///
/// Note: The generic parameter N (the bitmap chunk size in bytes) is not really generic, and must
/// be set to a power-of-two multiple of the size of the digest produced by the hasher.
/// Compile-time assertions are used to prevent any other setting.
pub struct Current<
    E: RStorage + Clock + Metrics,
    K: Array,
    V: CodecFixed<Cfg = ()> + Clone,
    H: CHasher,
    T: Translator,
    const N: usize,
> {
    /// An [Any] authenticated database that provides the ability to prove whether a key ever had a
    /// specific value.
    pub any: Any<E, K, V, H, T>,

    /// The bitmap over the activity status of each operation. Supports augmenting [Any] proofs in
    /// order to further prove whether a key _currently_ has a specific value.
    pub status: Bitmap<H, N>,

    /// The runtime context, used for persisting and destroying the bitmap state.
    context: E,

    /// The [RStorage] partition into which the (pruned) bitmap state is written on commit.
    bitmap_metadata_partition: String,
}
94
/// The information required to verify a key value proof.
#[derive(Clone)]
pub struct KeyValueProofInfo<K, V, const N: usize> {
    /// The key whose value is being proven.
    pub key: K,

    /// The value of the key.
    pub value: V,

    /// The location of the operation that assigned this value to the key.
    pub loc: u64,

    /// The status bitmap chunk that contains the bit corresponding to the operation's location.
    pub chunk: [u8; N],
}
110
111impl<
112        E: RStorage + Clock + Metrics,
113        K: Array,
114        V: CodecFixed<Cfg = ()> + Clone,
115        H: CHasher,
116        T: Translator,
117        const N: usize,
118    > Current<E, K, V, H, T, N>
119{
    // A compile-time assertion that the chunk size is some multiple of digest size. A multiple of 1 is optimal with
    // respect to proof size, but a higher multiple allows for a smaller (RAM resident) merkle tree over the structure.
    const _CHUNK_SIZE_ASSERT: () = assert!(
        N.is_multiple_of(H::Digest::SIZE),
        "chunk size must be some multiple of the digest size",
    );

    // A compile-time assertion that chunk size is a power of 2, which is necessary to allow the status bitmap tree to
    // be aligned with the underlying operations MMR (see `grafting_height`).
    const _CHUNK_SIZE_IS_POW_OF_2_ASSERT: () =
        assert!(N.is_power_of_two(), "chunk size must be a power of 2");
131
132    /// Initializes a [Current] authenticated database from the given `config`. Leverages parallel
133    /// Merkleization to initialize the bitmap MMR if a thread pool is provided.
134    pub async fn init(context: E, config: Config<T>) -> Result<Self, Error> {
135        // Initialize the MMR journal and metadata.
136        let cfg = AConfig {
137            mmr_journal_partition: config.mmr_journal_partition,
138            mmr_metadata_partition: config.mmr_metadata_partition,
139            mmr_items_per_blob: config.mmr_items_per_blob,
140            mmr_write_buffer: config.mmr_write_buffer,
141            log_journal_partition: config.log_journal_partition,
142            log_items_per_blob: config.log_items_per_blob,
143            log_write_buffer: config.log_write_buffer,
144            translator: config.translator.clone(),
145            thread_pool: config.thread_pool,
146            buffer_pool: config.buffer_pool,
147        };
148
149        let context = context.with_label("adb::current");
150        let cloned_pool = cfg.thread_pool.clone();
151        let mut status = Bitmap::restore_pruned(
152            context.with_label("bitmap"),
153            &config.bitmap_metadata_partition,
154            cloned_pool,
155        )
156        .await?;
157
158        // Initialize the db's mmr/log.
159        let mut hasher = Standard::<H>::new();
160        let (inactivity_floor_loc, mmr, log) =
161            Any::<_, _, _, _, T>::init_mmr_and_log(context.clone(), cfg, &mut hasher).await?;
162
163        // Ensure consistency between the bitmap and the db.
164        let mut grafter = Grafting::new(&mut hasher, Self::grafting_height());
165        if status.bit_count() < inactivity_floor_loc {
166            // Prepend the missing (inactive) bits needed to align the bitmap, which can only be
167            // pruned to a chunk boundary.
168            while status.bit_count() < inactivity_floor_loc {
169                status.append(false);
170            }
171
172            // Load the digests of the grafting destination nodes from `mmr` into the grafting
173            // hasher so the new leaf digests can be computed during sync.
174            grafter
175                .load_grafted_digests(&status.dirty_chunks(), &mmr)
176                .await?;
177            status.sync(&mut grafter).await?;
178        }
179
180        // Replay the log to generate the snapshot & populate the retained portion of the bitmap.
181        let mut snapshot = Index::init(context.with_label("snapshot"), config.translator);
182        Any::build_snapshot_from_log(inactivity_floor_loc, &log, &mut snapshot, Some(&mut status))
183            .await
184            .unwrap();
185        grafter
186            .load_grafted_digests(&status.dirty_chunks(), &mmr)
187            .await?;
188        status.sync(&mut grafter).await?;
189
190        let any = Any {
191            mmr,
192            log,
193            snapshot,
194            inactivity_floor_loc,
195            uncommitted_ops: 0,
196            hasher: Standard::<H>::new(),
197        };
198
199        Ok(Self {
200            any,
201            status,
202            context,
203            bitmap_metadata_partition: config.bitmap_metadata_partition,
204        })
205    }
206
    /// Get the number of operations that have been applied to this db, including those that are not
    /// yet committed. Delegates to the underlying [Any] db, whose operation log is the source of
    /// truth for this count.
    pub fn op_count(&self) -> u64 {
        self.any.op_count()
    }
212
    /// Return the inactivity floor location. Locations prior to this point can be safely pruned.
    /// Delegates to the underlying [Any] db.
    pub fn inactivity_floor_loc(&self) -> u64 {
        self.any.inactivity_floor_loc()
    }
217
    /// Get the value of `key` in the db, or None if it has no value. Delegates to the underlying
    /// [Any] db's snapshot.
    pub async fn get(&self, key: &K) -> Result<Option<V>, Error> {
        self.any.get(key).await
    }
222
    /// Get the value of the operation with location `loc` in the db. Returns
    /// [Error::OperationPruned] if loc precedes the oldest retained location. The location is
    /// otherwise assumed valid. Delegates to the underlying [Any] db.
    pub async fn get_loc(&self, loc: u64) -> Result<Option<V>, Error> {
        self.any.get_loc(loc).await
    }
229
    /// Get the level of the base MMR into which we are grafting.
    ///
    /// This value is log2 of the chunk size in bits. Since we assume the chunk size is a power of
    /// 2 (enforced by a compile-time assertion), we compute this from trailing_zeros.
    const fn grafting_height() -> u32 {
        Bitmap::<H, N>::CHUNK_SIZE_BITS.trailing_zeros()
    }
237
238    /// Updates `key` to have value `value`. If the key already has this same value, then this is a
239    /// no-op. The operation is reflected in the snapshot, but will be subject to rollback until the
240    /// next successful `commit`.
241    pub async fn update(&mut self, key: K, value: V) -> Result<(), Error> {
242        let update_result = self.any.update_return_loc(key, value).await?;
243        if let Some(old_loc) = update_result {
244            self.status.set_bit(old_loc, false);
245        }
246        self.status.append(true);
247
248        Ok(())
249    }
250
251    /// Delete `key` and its value from the db. Deleting a key that already has no value is a no-op.
252    /// The operation is reflected in the snapshot, but will be subject to rollback until the next
253    /// successful `commit`.
254    pub async fn delete(&mut self, key: K) -> Result<(), Error> {
255        let Some(old_loc) = self.any.delete(key).await? else {
256            return Ok(());
257        };
258
259        self.status.append(false);
260        self.status.set_bit(old_loc, false);
261
262        Ok(())
263    }
264
    /// Commit pending operations to the adb::any and sync it to disk. Leverages parallel
    /// Merkleization of the any-db if a thread pool is provided.
    async fn commit_ops(&mut self) -> Result<(), Error> {
        // Raise the inactivity floor by the # of uncommitted operations, plus 1 to account for the
        // commit op that will be appended.
        self.raise_inactivity_floor(self.any.uncommitted_ops + 1)
            .await?;

        // Sync the log and process the updates to the MMR in parallel: the log sync is I/O-bound
        // while MMR update processing is hashing work, so the two can overlap.
        let log_fut = async { self.any.log.sync().await.map_err(Error::Journal) };
        let mmr_fut = async {
            self.any.mmr.process_updates(&mut self.any.hasher);
            Ok::<(), Error>(())
        };
        try_join!(log_fut, mmr_fut)?;
        // The log is durable as of the sync above, so nothing remains uncommitted.
        self.any.uncommitted_ops = 0;

        self.any.sync().await
    }
284
    /// Raise the inactivity floor by up to `max_steps` steps (stopping early if the floor reaches
    /// the tip), followed by applying a commit operation. Each step either advances over an
    /// inactive operation, or re-applies an active operation to the tip and then advances over it.
    /// An active bit will be added to the status bitmap for any moved operation, with its old
    /// location in the bitmap flipped to false.
    ///
    /// This method does not change the state of the db's snapshot, but it always changes the root
    /// since it applies at least one operation.
    async fn raise_inactivity_floor(&mut self, max_steps: u64) -> Result<(), Error> {
        for _ in 0..max_steps {
            // Stop early once the floor reaches the tip of the log.
            if self.any.inactivity_floor_loc == self.op_count() {
                break;
            }
            let op = self.any.log.read(self.any.inactivity_floor_loc).await?;
            let old_loc = self
                .any
                .move_op_if_active(op, self.any.inactivity_floor_loc)
                .await?;
            // If the operation was active it was re-applied at the tip: deactivate its old bit
            // and append an active bit for its new location.
            if let Some(old_loc) = old_loc {
                self.status.set_bit(old_loc, false);
                self.status.append(true);
            }
            self.any.inactivity_floor_loc += 1;
        }

        // Record the new floor with a commit operation, which is itself inactive (false bit).
        self.any
            .apply_op(Fixed::CommitFloor(self.any.inactivity_floor_loc))
            .await?;
        self.status.append(false);

        Ok(())
    }
316
    /// Commit any pending operations to the db, ensuring they are persisted to disk & recoverable
    /// upon return from this function. Also raises the inactivity floor according to the schedule.
    /// Leverages parallel Merkleization of the MMR structures if a thread pool is provided.
    pub async fn commit(&mut self) -> Result<(), Error> {
        // Failure recovery relies on this specific order of these two disk-based operations:
        //  (1) commit the any db to disk (which raises the inactivity floor).
        //  (2) prune the bitmap to the updated inactivity floor and write its state to disk.
        self.commit_ops().await?; // (1)

        // Merkleize the bitmap: load the grafting destination digests this sync will need from
        // the operations MMR, then process the bitmap's pending updates.
        let mut grafter = Grafting::new(&mut self.any.hasher, Self::grafting_height());
        grafter
            .load_grafted_digests(&self.status.dirty_chunks(), &self.any.mmr)
            .await?;
        self.status.sync(&mut grafter).await?;

        // Bits below the inactivity floor are no longer needed; prune and persist the remainder.
        self.status.prune_to_bit(self.any.inactivity_floor_loc);
        self.status
            .write_pruned(
                self.context.with_label("bitmap"),
                &self.bitmap_metadata_partition,
            )
            .await?; // (2)

        Ok(())
    }
342
    /// Sync data to disk, ensuring clean recovery. Note that this syncs only the underlying [Any]
    /// db; bitmap state is persisted during `commit`.
    pub async fn sync(&mut self) -> Result<(), Error> {
        self.any.sync().await
    }
347
    /// Prune all operations prior to `target_prune_loc` from the db. Delegates to the underlying
    /// [Any] db; the bitmap is pruned separately during `commit`.
    ///
    /// # Panic
    ///
    /// Panics if `target_prune_loc` is greater than the inactivity floor.
    pub async fn prune(&mut self, target_prune_loc: u64) -> Result<(), Error> {
        self.any.prune(target_prune_loc).await
    }
356
    /// Return the root of the db.
    ///
    /// # Warning
    ///
    /// Panics if there are uncommitted operations (i.e. the status bitmap has unprocessed
    /// updates).
    pub async fn root(&self, hasher: &mut Standard<H>) -> Result<H::Digest, Error> {
        assert!(
            !self.status.is_dirty(),
            "must process updates before computing root"
        );
        // Compute the root of the grafted structure: the bitmap's tree grafted onto the
        // operations MMR at the grafting height.
        let ops = &self.any.mmr;
        let height = Self::grafting_height();
        let grafted_mmr = GStorage::<'_, H, _, _>::new(&self.status, ops, height);
        let mmr_root = grafted_mmr.root(hasher).await?;

        // The digest contains all information from the base mmr, and all information from the peak
        // tree except for the partial chunk, if any.  If we are at a chunk boundary, then this is
        // all the information we need.
        let last_chunk = self.status.last_chunk();
        if last_chunk.1 == 0 {
            return Ok(mmr_root);
        }

        // There are bits in an uncommitted (partial) chunk, so we need to incorporate that
        // information into the root digest. We do so by computing a root in the same format as an
        // unaligned [Bitmap] root, which involves additionally hashing in the number of bits within
        // the last chunk and the digest of the last chunk.
        hasher.inner().update(last_chunk.0);
        let last_chunk_digest = hasher.inner().finalize();

        Ok(Bitmap::<H, N>::partial_chunk_root(
            hasher.inner(),
            &mmr_root,
            last_chunk.1,
            &last_chunk_digest,
        ))
    }
394
395    /// Returns a proof that the specified range of operations are part of the database, along with
396    /// the operations from the range. A truncated range (from hitting the max) can be detected by
397    /// looking at the length of the returned operations vector. Also returns the bitmap chunks
398    /// required to verify the proof.
399    ///
400    /// # Panics
401    ///
402    /// Panics if there are uncommitted operations or if `start_loc` is invalid.
403    pub async fn range_proof(
404        &self,
405        hasher: &mut H,
406        start_loc: u64,
407        max_ops: NonZeroU64,
408    ) -> Result<(Proof<H::Digest>, Vec<Fixed<K, V>>, Vec<[u8; N]>), Error> {
409        assert!(
410            !self.status.is_dirty(),
411            "must process updates before computing proofs"
412        );
413
414        // Compute the start and end locations & positions of the range.
415        let mmr = &self.any.mmr;
416        let start_pos = leaf_num_to_pos(start_loc);
417        let leaves = mmr.leaves();
418        assert!(start_loc < leaves, "start_loc is invalid");
419        let max_loc = start_loc + max_ops.get();
420        let end_loc = if max_loc > leaves {
421            leaves - 1
422        } else {
423            max_loc - 1
424        };
425        let end_pos = leaf_num_to_pos(end_loc);
426
427        // Generate the proof from the grafted MMR.
428        let height = Self::grafting_height();
429        let grafted_mmr = GStorage::<'_, H, _, _>::new(&self.status, mmr, height);
430        let mut proof = Proof::<H::Digest>::range_proof(&grafted_mmr, start_pos, end_pos).await?;
431
432        // Collect the operations necessary to verify the proof.
433        let mut ops = Vec::with_capacity((end_loc - start_loc + 1) as usize);
434        let futures = (start_loc..=end_loc)
435            .map(|i| self.any.log.read(i))
436            .collect::<Vec<_>>();
437        try_join_all(futures)
438            .await?
439            .into_iter()
440            .for_each(|op| ops.push(op));
441
442        // Gather the chunks necessary to verify the proof.
443        let chunk_bits = Bitmap::<H, N>::CHUNK_SIZE_BITS;
444        let start = start_loc / chunk_bits;
445        let end = end_loc / chunk_bits;
446        let mut chunks = Vec::with_capacity((end - start + 1) as usize);
447        for i in start..=end {
448            let bit_offset = i * chunk_bits;
449            let chunk = *self.status.get_chunk(bit_offset);
450            chunks.push(chunk);
451        }
452
453        let last_chunk = self.status.last_chunk();
454        if last_chunk.1 == 0 {
455            return Ok((proof, ops, chunks));
456        }
457
458        hasher.update(last_chunk.0);
459        proof.digests.push(hasher.finalize());
460
461        Ok((proof, ops, chunks))
462    }
463
464    /// Return true if the given sequence of `ops` were applied starting at location `start_loc` in
465    /// the log with the provided root.
466    pub fn verify_range_proof(
467        hasher: &mut Standard<H>,
468        proof: &Proof<H::Digest>,
469        start_loc: u64,
470        ops: &[Fixed<K, V>],
471        chunks: &[[u8; N]],
472        root: &H::Digest,
473    ) -> bool {
474        let op_count = leaf_pos_to_num(proof.size);
475        let Some(op_count) = op_count else {
476            debug!("verification failed, invalid proof size");
477            return false;
478        };
479        let end_loc = start_loc + ops.len() as u64 - 1;
480        if end_loc >= op_count {
481            debug!(
482                loc = end_loc,
483                op_count, "proof verification failed, invalid range"
484            );
485            return false;
486        }
487
488        let start_pos = leaf_num_to_pos(start_loc);
489
490        let elements = ops.iter().map(|op| op.encode()).collect::<Vec<_>>();
491
492        let chunk_vec = chunks.iter().map(|c| c.as_ref()).collect::<Vec<_>>();
493        let mut verifier = GraftingVerifier::<H>::new(
494            Self::grafting_height(),
495            start_loc / Bitmap::<H, N>::CHUNK_SIZE_BITS,
496            chunk_vec,
497        );
498
499        if op_count % Bitmap::<H, N>::CHUNK_SIZE_BITS == 0 {
500            return proof.verify_range_inclusion(&mut verifier, &elements, start_pos, root);
501        }
502
503        // The proof must contain the partial chunk digest as its last hash.
504        if proof.digests.is_empty() {
505            debug!("proof has no digests");
506            return false;
507        }
508        let mut proof = proof.clone();
509        let last_chunk_digest = proof.digests.pop().unwrap();
510
511        // Reconstruct the MMR root.
512        let mmr_root = match proof.reconstruct_root(&mut verifier, &elements, start_pos) {
513            Ok(root) => root,
514            Err(error) => {
515                debug!(error = ?error, "invalid proof input");
516                return false;
517            }
518        };
519
520        let next_bit = op_count % Bitmap::<H, N>::CHUNK_SIZE_BITS;
521        let reconstructed_root = Bitmap::<H, N>::partial_chunk_root(
522            hasher.inner(),
523            &mmr_root,
524            next_bit,
525            &last_chunk_digest,
526        );
527
528        reconstructed_root == *root
529    }
530
    /// Generate and return a proof of the current value of `key`, along with the other
    /// [KeyValueProofInfo] required to verify the proof. Returns KeyNotFound error if the key is
    /// not currently assigned any value.
    ///
    /// # Warning
    ///
    /// Panics if there are uncommitted operations (i.e. the status bitmap has unprocessed
    /// updates).
    pub async fn key_value_proof(
        &self,
        hasher: &mut H,
        key: K,
    ) -> Result<(Proof<H::Digest>, KeyValueProofInfo<K, V, N>), Error> {
        assert!(
            !self.status.is_dirty(),
            "must process updates before computing proofs"
        );
        // Find the location of the operation that last set the key's value.
        let op = self.any.get_key_loc(&key).await?;
        let Some((value, loc)) = op else {
            return Err(Error::KeyNotFound);
        };
        // A key-value proof is a single-element range proof over the grafted MMR.
        let pos = leaf_num_to_pos(loc);
        let height = Self::grafting_height();
        let grafted_mmr = GStorage::<'_, H, _, _>::new(&self.status, &self.any.mmr, height);

        let mut proof = Proof::<H::Digest>::range_proof(&grafted_mmr, pos, pos).await?;
        let chunk = *self.status.get_chunk(loc);

        // If the bitmap ends in a partial chunk, append its digest so verifiers can reconstruct
        // the partial-chunk root.
        let last_chunk = self.status.last_chunk();
        if last_chunk.1 != 0 {
            hasher.update(last_chunk.0);
            proof.digests.push(hasher.finalize());
        }

        Ok((
            proof,
            KeyValueProofInfo {
                key,
                value,
                loc,
                chunk,
            },
        ))
    }
574
    /// Return true if the proof authenticates that `key` currently has value `value` in the db with
    /// the given root. Returns false (with a debug log) on any malformed or non-verifying input.
    pub fn verify_key_value_proof(
        hasher: &mut H,
        proof: &Proof<H::Digest>,
        info: &KeyValueProofInfo<K, V, N>,
        root: &H::Digest,
    ) -> bool {
        let Some(op_count) = leaf_pos_to_num(proof.size) else {
            debug!("verification failed, invalid proof size");
            return false;
        };

        // Make sure that the bit for the operation in the bitmap chunk is actually a 1 (indicating
        // the operation is indeed active).
        if !Bitmap::<H, N>::get_bit_from_chunk(&info.chunk, info.loc) {
            debug!(
                loc = info.loc,
                "proof verification failed, operation is inactive"
            );
            return false;
        }

        // Verify inclusion of the claimed update operation at the claimed location.
        let pos = leaf_num_to_pos(info.loc);
        let num = info.loc / Bitmap::<H, N>::CHUNK_SIZE_BITS;
        let mut verifier =
            GraftingVerifier::<H>::new(Self::grafting_height(), num, vec![&info.chunk]);
        let element = Fixed::Update(info.key.clone(), info.value.clone()).encode();

        // If the bit count is chunk-aligned there is no partial chunk, and the proof verifies
        // directly against the grafted root.
        if op_count % Bitmap::<H, N>::CHUNK_SIZE_BITS == 0 {
            return proof.verify_element_inclusion(&mut verifier, &element, pos, root);
        }

        // The proof must contain the partial chunk digest as its last hash.
        if proof.digests.is_empty() {
            debug!("proof has no digests");
            return false;
        }

        let mut proof = proof.clone();
        let last_chunk_digest = proof.digests.pop().unwrap();

        // If the proof is over an operation in the partial chunk, we need to verify the last chunk
        // digest from the proof matches the digest of info.chunk, since these bits are not part of
        // the mmr.
        if info.loc / Bitmap::<H, N>::CHUNK_SIZE_BITS == op_count / Bitmap::<H, N>::CHUNK_SIZE_BITS
        {
            let expected_last_chunk_digest = verifier.digest(&info.chunk);
            if last_chunk_digest != expected_last_chunk_digest {
                debug!("last chunk digest does not match expected value");
                return false;
            }
        }

        // Reconstruct the MMR root.
        let mmr_root = match proof.reconstruct_root(&mut verifier, &[element], pos) {
            Ok(root) => root,
            Err(error) => {
                debug!(error = ?error, "invalid proof input");
                return false;
            }
        };

        // Fold the partial chunk information into the root the same way `root` does.
        let next_bit = op_count % Bitmap::<H, N>::CHUNK_SIZE_BITS;
        let reconstructed_root =
            Bitmap::<H, N>::partial_chunk_root(hasher, &mmr_root, next_bit, &last_chunk_digest);

        reconstructed_root == *root
    }
644
    /// Close the db. Operations that have not been committed will be lost. The bitmap state is not
    /// written here; it is only persisted by `commit`.
    pub async fn close(self) -> Result<(), Error> {
        self.any.close().await
    }
649
    /// Destroy the db, removing all data from disk.
    pub async fn destroy(self) -> Result<(), Error> {
        // Clean up bitmap metadata partition.
        Bitmap::<H, N>::destroy(self.context, &self.bitmap_metadata_partition).await?;

        // Clean up Any components (MMR and log).
        self.any.destroy().await
    }
658
    #[cfg(test)]
    /// Generate an inclusion proof for any operation regardless of its activity state. Used by
    /// tests to construct proofs that `verify_key_value_proof` should reject (e.g. over inactive
    /// operations). Returns the proof, the operation, its location, and its bitmap chunk.
    async fn operation_inclusion_proof(
        &self,
        hasher: &mut H,
        loc: u64,
    ) -> Result<(Proof<H::Digest>, Fixed<K, V>, u64, [u8; N]), Error> {
        let op = self.any.log.read(loc).await?;

        let pos = leaf_num_to_pos(loc);
        let height = Self::grafting_height();
        let grafted_mmr = GStorage::<'_, H, _, _>::new(&self.status, &self.any.mmr, height);

        let mut proof = Proof::<H::Digest>::range_proof(&grafted_mmr, pos, pos).await?;
        let chunk = *self.status.get_chunk(loc);

        // If the bitmap ends in a partial chunk, append its digest as in `key_value_proof`.
        let last_chunk = self.status.last_chunk();
        if last_chunk.1 != 0 {
            hasher.update(last_chunk.0);
            proof.digests.push(hasher.finalize());
        }

        Ok((proof, op, loc, chunk))
    }
683
    #[cfg(test)]
    /// Simulate a crash that prevents any data from being written to disk, which involves simply
    /// consuming the db before it can be cleanly closed.
    fn simulate_commit_failure_before_any_writes(self) {
        // Don't successfully complete any of the commit operations; dropping `self` discards all
        // in-memory state without syncing.
    }
690
    #[cfg(test)]
    /// Simulate a crash that happens during commit and prevents the any db from being pruned of
    /// inactive operations, and bitmap state from being written/pruned.
    async fn simulate_commit_failure_after_any_db_commit(mut self) -> Result<(), Error> {
        // Only successfully complete operation (1) of the commit process (see `commit`).
        self.commit_ops().await
    }
698
    #[cfg(test)]
    /// Simulate a crash that happens during commit after the bitmap has been pruned & written, but
    /// before the any db is pruned of inactive elements.
    async fn simulate_commit_failure_after_bitmap_written(mut self) -> Result<(), Error> {
        // Only successfully complete operations (1) and (2) of the commit process (see `commit`).
        self.commit_ops().await?; // (1)

        // Merkleize, prune, and persist the bitmap exactly as `commit` does.
        let mut grafter = Grafting::new(&mut self.any.hasher, Self::grafting_height());
        grafter
            .load_grafted_digests(&self.status.dirty_chunks(), &self.any.mmr)
            .await?;
        self.status.sync(&mut grafter).await?;
        let target_prune_loc = self.any.inactivity_floor_loc;
        self.status.prune_to_bit(target_prune_loc);
        self.status
            .write_pruned(
                self.context.with_label("bitmap"),
                &self.bitmap_metadata_partition,
            )
            .await?; // (2)

        Ok(())
    }
722}
723
724#[cfg(test)]
725pub mod test {
726    use super::*;
727    use crate::translator::TwoCap;
728    use commonware_cryptography::{sha256::Digest, Sha256};
729    use commonware_macros::test_traced;
730    use commonware_runtime::{deterministic, Runner as _};
731    use commonware_utils::{NZUsize, NZU64};
732    use rand::{rngs::StdRng, RngCore, SeedableRng};
733    use tracing::warn;
734
735    const PAGE_SIZE: usize = 88;
736    const PAGE_CACHE_SIZE: usize = 8;
737
    /// Build a test [Config] whose partition names are all derived from `partition_prefix`, so
    /// concurrent tests don't collide on storage partitions.
    fn current_db_config(partition_prefix: &str) -> Config<TwoCap> {
        Config {
            mmr_journal_partition: format!("{partition_prefix}_journal_partition"),
            mmr_metadata_partition: format!("{partition_prefix}_metadata_partition"),
            mmr_items_per_blob: NZU64!(11),
            mmr_write_buffer: NZUsize!(1024),
            // NOTE(review): the "_partition_prefix" suffix looks like a copy/paste slip (the
            // other fields use descriptive suffixes); harmless for tests since the name is still
            // unique per prefix -- confirm intent before renaming.
            log_journal_partition: format!("{partition_prefix}_partition_prefix"),
            log_items_per_blob: NZU64!(7),
            log_write_buffer: NZUsize!(1024),
            bitmap_metadata_partition: format!("{partition_prefix}_bitmap_metadata_partition"),
            translator: TwoCap,
            thread_pool: None,
            buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
        }
    }
753
754    /// A type alias for the concrete [Current] type used in these unit tests.
755    type CurrentTest = Current<deterministic::Context, Digest, Digest, Sha256, TwoCap, 32>;
756
    /// Return a [Current] database initialized with a fixed config derived from
    /// `partition_prefix`. Panics on initialization failure (acceptable in tests).
    async fn open_db(context: deterministic::Context, partition_prefix: &str) -> CurrentTest {
        CurrentTest::init(context, current_db_config(partition_prefix))
            .await
            .unwrap()
    }
763
    /// Build a small database, then close and reopen it and ensure state is preserved (op counts
    /// and roots match across close/reopen after each commit).
    #[test_traced("DEBUG")]
    pub fn test_current_db_build_small_close_reopen() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();
            let partition = "build_small";
            let mut db = open_db(context.clone(), partition).await;
            assert_eq!(db.op_count(), 0);
            assert_eq!(db.inactivity_floor_loc(), 0);
            let root0 = db.root(&mut hasher).await.unwrap();
            db.close().await.unwrap();
            // Reopening an empty db must preserve the (empty) root.
            db = open_db(context.clone(), partition).await;
            assert_eq!(db.op_count(), 0);
            assert_eq!(db.root(&mut hasher).await.unwrap(), root0);

            // Add one key.
            let k1 = Sha256::hash(&0u64.to_be_bytes());
            let v1 = Sha256::hash(&10u64.to_be_bytes());
            db.update(k1, v1).await.unwrap();
            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
            db.commit().await.unwrap();
            assert_eq!(db.op_count(), 4); // 1 update, 1 commit, 2 moves.
            let root1 = db.root(&mut hasher).await.unwrap();
            assert!(root1 != root0);
            db.close().await.unwrap();
            db = open_db(context.clone(), partition).await;
            assert_eq!(db.op_count(), 4); // 1 update, 1 commit, 2 moves.
            assert_eq!(db.root(&mut hasher).await.unwrap(), root1);

            // Delete that one key.
            db.delete(k1).await.unwrap();
            db.commit().await.unwrap();
            assert_eq!(db.op_count(), 6); // 1 update, 2 commits, 2 moves, 1 delete.
            let root2 = db.root(&mut hasher).await.unwrap();
            db.close().await.unwrap();
            db = open_db(context.clone(), partition).await;
            assert_eq!(db.op_count(), 6); // 1 update, 2 commits, 2 moves, 1 delete.
            assert_eq!(db.root(&mut hasher).await.unwrap(), root2);

            // Confirm all activity bits are false: the only key was deleted, so no operation
            // should remain active.
            for i in 0..db.op_count() {
                assert!(!db.status.get_bit(i));
            }

            db.destroy().await.unwrap();
        });
    }
812
813    /// Build a tiny database and make sure we can't convince the verifier that some old value of a
814    /// key is active. We specifically test over the partial chunk case, since these bits are yet to
815    /// be committed to the underlying MMR.
816    #[test_traced("DEBUG")]
817    pub fn test_current_db_verify_proof_over_bits_in_uncommitted_chunk() {
818        let executor = deterministic::Runner::default();
819        executor.start(|context| async move {
820            let mut hasher = Standard::<Sha256>::new();
821            let partition = "build_small";
822            let mut db = open_db(context.clone(), partition).await;
823
824            // Add one key.
825            let k = Sha256::fill(0x01);
826            let v1 = Sha256::fill(0xA1);
827            db.update(k, v1).await.unwrap();
828            db.commit().await.unwrap();
829
830            let op = db.any.get_key_loc(&k).await.unwrap().unwrap();
831            let proof = db
832                .operation_inclusion_proof(hasher.inner(), op.1)
833                .await
834                .unwrap();
835            let info = KeyValueProofInfo {
836                key: k,
837                value: v1,
838                loc: op.1,
839                chunk: proof.3,
840            };
841            let root = db.root(&mut hasher).await.unwrap();
842            // Proof should be verifiable against current root.
843            assert!(CurrentTest::verify_key_value_proof(
844                hasher.inner(),
845                &proof.0,
846                &info,
847                &root,
848            ),);
849
850            let v2 = Sha256::fill(0xA2);
851            // Proof should not verify against a different value.
852            let mut bad_info = info.clone();
853            bad_info.value = v2;
854            assert!(!CurrentTest::verify_key_value_proof(
855                hasher.inner(),
856                &proof.0,
857                &bad_info,
858                &root,
859            ),);
860
861            // update the key to invalidate its previous update
862            db.update(k, v2).await.unwrap();
863            db.commit().await.unwrap();
864
865            // Proof should not be verifiable against the new root.
866            let root = db.root(&mut hasher).await.unwrap();
867            assert!(!CurrentTest::verify_key_value_proof(
868                hasher.inner(),
869                &proof.0,
870                &info,
871                &root,
872            ),);
873
874            // Create a proof of the now-inactive operation.
875            let proof_inactive = db
876                .operation_inclusion_proof(hasher.inner(), op.1)
877                .await
878                .unwrap();
879            // This proof should not verify, but only because verification will see that the
880            // corresponding bit in the chunk is false.
881            let proof_inactive_info = KeyValueProofInfo {
882                key: k,
883                value: v1,
884                loc: proof_inactive.2,
885                chunk: proof_inactive.3,
886            };
887            assert!(!CurrentTest::verify_key_value_proof(
888                hasher.inner(),
889                &proof_inactive.0,
890                &proof_inactive_info,
891                &root,
892            ),);
893
894            // Attempt #1 to "fool" the verifier:  change the location to that of an active
895            // operation. This should not fool the verifier if we're properly validating the
896            // inclusion of the operation itself, and not just the chunk.
897            let (_, active_loc) = db.any.get_key_loc(&info.key).await.unwrap().unwrap();
898            // The new location should differ but still be in the same chunk.
899            assert_ne!(active_loc, info.loc);
900            assert_eq!(
901                Bitmap::<Sha256, 32>::leaf_pos(active_loc),
902                Bitmap::<Sha256, 32>::leaf_pos(info.loc)
903            );
904            let mut info_with_modified_loc = info.clone();
905            info_with_modified_loc.loc = active_loc;
906            assert!(!CurrentTest::verify_key_value_proof(
907                hasher.inner(),
908                &proof_inactive.0,
909                &proof_inactive_info,
910                &root,
911            ),);
912
913            // Attempt #2 to "fool" the verifier: Modify the chunk in the proof info to make it look
914            // like the operation is active by flipping its corresponding bit to 1. This should not
915            // fool the verifier if we are correctly incorporating the partial chunk information
916            // into the root computation.
917            let mut modified_chunk = proof_inactive.3;
918            let bit_pos = proof_inactive.2;
919            let byte_idx = bit_pos / 8;
920            let bit_idx = bit_pos % 8;
921            modified_chunk[byte_idx as usize] |= 1 << bit_idx;
922
923            let mut info_with_modified_chunk = info.clone();
924            info_with_modified_chunk.chunk = modified_chunk;
925            assert!(!CurrentTest::verify_key_value_proof(
926                hasher.inner(),
927                &proof_inactive.0,
928                &info_with_modified_chunk,
929                &root,
930            ),);
931
932            db.destroy().await.unwrap();
933        });
934    }
935
936    /// Apply random operations to the given db, committing them (randomly & at the end) only if
937    /// `commit_changes` is true.
938    async fn apply_random_ops(
939        num_elements: u64,
940        commit_changes: bool,
941        rng_seed: u64,
942        db: &mut CurrentTest,
943    ) -> Result<(), Error> {
944        // Log the seed with high visibility to make failures reproducible.
945        warn!("rng_seed={}", rng_seed);
946        let mut rng = StdRng::seed_from_u64(rng_seed);
947
948        for i in 0u64..num_elements {
949            let k = Sha256::hash(&i.to_be_bytes());
950            let v = Sha256::hash(&rng.next_u32().to_be_bytes());
951            db.update(k, v).await.unwrap();
952        }
953
954        // Randomly update / delete them. We use a delete frequency that is 1/7th of the update
955        // frequency.
956        for _ in 0u64..num_elements * 10 {
957            let rand_key = Sha256::hash(&(rng.next_u64() % num_elements).to_be_bytes());
958            if rng.next_u32() % 7 == 0 {
959                db.delete(rand_key).await.unwrap();
960                continue;
961            }
962            let v = Sha256::hash(&rng.next_u32().to_be_bytes());
963            db.update(rand_key, v).await.unwrap();
964            if commit_changes && rng.next_u32() % 20 == 0 {
965                // Commit every ~20 updates.
966                db.commit().await.unwrap();
967            }
968        }
969        if commit_changes {
970            db.commit().await.unwrap();
971        }
972
973        Ok(())
974    }
975
976    #[test_traced("DEBUG")]
977    pub fn test_current_db_range_proofs() {
978        let executor = deterministic::Runner::default();
979        executor.start(|mut context| async move {
980            let partition = "range_proofs";
981            let mut hasher = Standard::<Sha256>::new();
982            let mut db = open_db(context.clone(), partition).await;
983            apply_random_ops(200, true, context.next_u64(), &mut db)
984                .await
985                .unwrap();
986            let root = db.root(&mut hasher).await.unwrap();
987
988            // Make sure size-constrained batches of operations are provable from the oldest
989            // retained op to tip.
990            let max_ops = 4;
991            let end_loc = db.op_count();
992            let start_loc = db.any.inactivity_floor_loc();
993
994            for i in start_loc..end_loc {
995                let (proof, ops, chunks) = db
996                    .range_proof(hasher.inner(), i, NZU64!(max_ops))
997                    .await
998                    .unwrap();
999                assert!(
1000                    CurrentTest::verify_range_proof(&mut hasher, &proof, i, &ops, &chunks, &root),
1001                    "failed to verify range at start_loc {start_loc}",
1002                );
1003            }
1004
1005            db.destroy().await.unwrap();
1006        });
1007    }
1008
    /// Exercise key/value proofs for every active operation of a randomly built database: a proof
    /// must verify against the exact (key, value, root) it was generated for, and fail if any one
    /// of those three is substituted.
    #[test_traced("DEBUG")]
    pub fn test_current_db_key_value_proof() {
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            // NOTE(review): partition name appears copy-pasted from test_current_db_range_proofs;
            // harmless since each test runs in its own deterministic context, but a distinct name
            // would be clearer.
            let partition = "range_proofs";
            let mut hasher = Standard::<Sha256>::new();
            let mut db = open_db(context.clone(), partition).await;
            apply_random_ops(500, true, context.next_u64(), &mut db)
                .await
                .unwrap();
            let root = db.root(&mut hasher).await.unwrap();

            // Confirm bad keys produce the expected error.
            let bad_key = Sha256::fill(0xAA);
            let res = db.key_value_proof(hasher.inner(), bad_key).await;
            assert!(matches!(res, Err(Error::KeyNotFound)));

            // Walk the activity bitmap from the inactivity floor to the tip, proving every
            // operation whose bit is set.
            let start = db.inactivity_floor_loc();
            for i in start..db.status.bit_count() {
                if !db.status.get_bit(i) {
                    continue;
                }
                // Found an active operation! Create a proof for its active current key/value.
                let op = db.any.log.read(i).await.unwrap();
                let key = op.key().unwrap();
                let (proof, info) = db.key_value_proof(hasher.inner(), *key).await.unwrap();
                assert_eq!(info.value, *op.value().unwrap());
                // Proof should validate against the current value and correct root.
                assert!(CurrentTest::verify_key_value_proof(
                    hasher.inner(),
                    &proof,
                    &info,
                    &root
                ));
                // Proof should fail against the wrong value.
                let wrong_val = Sha256::fill(0xFF);
                let mut bad_info = info.clone();
                bad_info.value = wrong_val;
                assert!(!CurrentTest::verify_key_value_proof(
                    hasher.inner(),
                    &proof,
                    &bad_info,
                    &root
                ));
                // Proof should fail against the wrong key.
                let wrong_key = Sha256::fill(0xEE);
                let mut bad_info = info.clone();
                bad_info.key = wrong_key;
                assert!(!CurrentTest::verify_key_value_proof(
                    hasher.inner(),
                    &proof,
                    &bad_info,
                    &root
                ));
                // Proof should fail against the wrong root.
                let wrong_root = Sha256::fill(0xDD);
                assert!(!CurrentTest::verify_key_value_proof(
                    hasher.inner(),
                    &proof,
                    &info,
                    &wrong_root,
                ),);
            }

            db.destroy().await.unwrap();
        });
    }
1076
1077    /// This test builds a random database, and makes sure that its state is correctly restored
1078    /// after closing and re-opening.
1079    #[test_traced("WARN")]
1080    pub fn test_current_db_build_random_close_reopen() {
1081        // Number of elements to initially insert into the db.
1082        const ELEMENTS: u64 = 1000;
1083
1084        let executor = deterministic::Runner::default();
1085        executor.start(|mut context| async move {
1086            let partition = "build_random";
1087            let rng_seed = context.next_u64();
1088            let mut hasher = Standard::<Sha256>::new();
1089            let mut db = open_db(context.clone(), partition).await;
1090            apply_random_ops(ELEMENTS, true, rng_seed, &mut db)
1091                .await
1092                .unwrap();
1093
1094            // Close the db, then replay its operations with a bitmap.
1095            let root = db.root(&mut hasher).await.unwrap();
1096            // Create a bitmap based on the current db's pruned/inactive state.
1097            db.close().await.unwrap();
1098
1099            let db = open_db(context, partition).await;
1100            assert_eq!(db.root(&mut hasher).await.unwrap(), root);
1101
1102            db.destroy().await.unwrap();
1103        });
1104    }
1105
1106    /// Repeatedly update the same key to a new value and ensure we can prove its current value
1107    /// after each update.
1108    #[test_traced("WARN")]
1109    pub fn test_current_db_proving_repeated_updates() {
1110        let executor = deterministic::Runner::default();
1111        executor.start(|context| async move {
1112            let mut hasher = Standard::<Sha256>::new();
1113            let partition = "build_small";
1114            let mut db = open_db(context.clone(), partition).await;
1115
1116            // Add one key.
1117            let k = Sha256::fill(0x00);
1118            let mut old_info = KeyValueProofInfo {
1119                key: k,
1120                value: Sha256::fill(0x00),
1121                loc: 0,
1122                chunk: [0; 32],
1123            };
1124            for i in 1u8..=255 {
1125                let v = Sha256::fill(i);
1126                db.update(k, v).await.unwrap();
1127                assert_eq!(db.get(&k).await.unwrap().unwrap(), v);
1128                db.commit().await.unwrap();
1129                let root = db.root(&mut hasher).await.unwrap();
1130
1131                // Create a proof for the current value of k.
1132                let (proof, info) = db.key_value_proof(hasher.inner(), k).await.unwrap();
1133                assert_eq!(info.value, v);
1134                assert!(
1135                    CurrentTest::verify_key_value_proof(hasher.inner(), &proof, &info, &root),
1136                    "proof of update {i} failed to verify"
1137                );
1138                // Ensure the proof does NOT verify if we use the previous value.
1139                assert!(
1140                    !CurrentTest::verify_key_value_proof(hasher.inner(), &proof, &old_info, &root),
1141                    "proof of update {i} failed to verify"
1142                );
1143                old_info = info.clone();
1144            }
1145
1146            db.destroy().await.unwrap();
1147        });
1148    }
1149
    /// This test builds a random database and simulates we can recover from 3 different types of
    /// failure scenarios.
    #[test_traced("WARN")]
    pub fn test_current_db_simulate_write_failures() {
        // Number of elements to initially insert into the db.
        const ELEMENTS: u64 = 1000;

        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            let partition = "build_random_fail_commit";
            let rng_seed = context.next_u64();
            let mut hasher = Standard::<Sha256>::new();
            let mut db = open_db(context.clone(), partition).await;
            apply_random_ops(ELEMENTS, true, rng_seed, &mut db)
                .await
                .unwrap();
            // Snapshot the committed state so each recovery scenario can be checked against it.
            let committed_root = db.root(&mut hasher).await.unwrap();
            let committed_op_count = db.op_count();
            let committed_inactivity_floor = db.any.inactivity_floor_loc;
            db.prune(committed_inactivity_floor).await.unwrap();

            // Perform more random operations without committing any of them.
            apply_random_ops(ELEMENTS, false, rng_seed + 1, &mut db)
                .await
                .unwrap();

            // SCENARIO #1: Simulate a crash that happens before any writes. Upon reopening, the
            // state of the DB should be as of the last commit.
            db.simulate_commit_failure_before_any_writes();
            let mut db = open_db(context.clone(), partition).await;
            assert_eq!(db.root(&mut hasher).await.unwrap(), committed_root);
            assert_eq!(db.op_count(), committed_op_count);

            // Re-apply the exact same uncommitted operations (same seed => same op sequence).
            apply_random_ops(ELEMENTS, false, rng_seed + 1, &mut db)
                .await
                .unwrap();

            // SCENARIO #2: Simulate a crash that happens after the any db has been committed, but
            // before the state of the pruned bitmap can be written to disk.
            db.simulate_commit_failure_after_any_db_commit()
                .await
                .unwrap();

            // We should be able to recover, so the root should differ from the previous commit, and
            // the op count should be greater than before. (NOTE(review): neither property is
            // asserted directly here; correctness is established by the fresh-rebuild comparison
            // below instead.)
            let db = open_db(context.clone(), partition).await;
            let scenario_2_root = db.root(&mut hasher).await.unwrap();

            // To confirm the second committed hash is correct we'll re-build the DB in a new
            // partition, but without any failures. They should have the exact same state.
            let fresh_partition = "build_random_fail_commit_fresh";
            let mut db = open_db(context.clone(), fresh_partition).await;
            apply_random_ops(ELEMENTS, true, rng_seed, &mut db)
                .await
                .unwrap();
            apply_random_ops(ELEMENTS, false, rng_seed + 1, &mut db)
                .await
                .unwrap();
            db.commit().await.unwrap();
            db.prune(db.any.inactivity_floor_loc()).await.unwrap();
            // State from scenario #2 should match that of a successful commit.
            assert_eq!(db.root(&mut hasher).await.unwrap(), scenario_2_root);
            db.close().await.unwrap();

            // SCENARIO #3: Simulate a crash that happens after the any db has been committed and
            // the bitmap is written. Full state restoration should remain possible.
            let fresh_partition = "build_random_fail_commit_fresh_2";
            let mut db = open_db(context.clone(), fresh_partition).await;
            apply_random_ops(ELEMENTS, true, rng_seed, &mut db)
                .await
                .unwrap();
            apply_random_ops(ELEMENTS, false, rng_seed + 1, &mut db)
                .await
                .unwrap();
            db.simulate_commit_failure_after_bitmap_written()
                .await
                .unwrap();
            let db = open_db(context.clone(), fresh_partition).await;
            // State should match that of the successful commit.
            assert_eq!(db.root(&mut hasher).await.unwrap(), scenario_2_root);

            db.destroy().await.unwrap();
        });
    }
1235
1236    #[test_traced("WARN")]
1237    pub fn test_current_db_different_pruning_delays_same_root() {
1238        let executor = deterministic::Runner::default();
1239        executor.start(|context| async move {
1240            let mut hasher = Standard::<Sha256>::new();
1241
1242            // Create two databases that are identical other than how they are pruned.
1243            let db_config_no_pruning = current_db_config("no_pruning_test");
1244
1245            let db_config_pruning = current_db_config("pruning_test");
1246
1247            let mut db_no_pruning =
1248                CurrentTest::init(context.clone(), db_config_no_pruning.clone())
1249                    .await
1250                    .unwrap();
1251            let mut db_pruning = CurrentTest::init(context.clone(), db_config_pruning.clone())
1252                .await
1253                .unwrap();
1254
1255            // Apply identical operations to both databases, but only prune one.
1256            const NUM_OPERATIONS: u64 = 1000;
1257            for i in 0..NUM_OPERATIONS {
1258                let key = Sha256::hash(&i.to_be_bytes());
1259                let value = Sha256::hash(&(i * 1000).to_be_bytes());
1260
1261                db_no_pruning.update(key, value).await.unwrap();
1262                db_pruning.update(key, value).await.unwrap();
1263
1264                // Commit periodically
1265                if i % 50 == 49 {
1266                    db_no_pruning.commit().await.unwrap();
1267                    db_pruning.commit().await.unwrap();
1268                    db_pruning
1269                        .prune(db_no_pruning.any.inactivity_floor_loc())
1270                        .await
1271                        .unwrap();
1272                }
1273            }
1274
1275            // Final commit
1276            db_no_pruning.commit().await.unwrap();
1277            db_pruning.commit().await.unwrap();
1278
1279            // Get roots from both databases
1280            let root_no_pruning = db_no_pruning.root(&mut hasher).await.unwrap();
1281            let root_pruning = db_pruning.root(&mut hasher).await.unwrap();
1282
1283            // Verify they generate the same roots
1284            assert_eq!(root_no_pruning, root_pruning);
1285
1286            // Close both databases
1287            db_no_pruning.close().await.unwrap();
1288            db_pruning.close().await.unwrap();
1289
1290            // Restart both databases
1291            let db_no_pruning = CurrentTest::init(context.clone(), db_config_no_pruning)
1292                .await
1293                .unwrap();
1294            let db_pruning = CurrentTest::init(context.clone(), db_config_pruning)
1295                .await
1296                .unwrap();
1297            assert_eq!(
1298                db_no_pruning.inactivity_floor_loc(),
1299                db_pruning.inactivity_floor_loc()
1300            );
1301
1302            // Get roots after restart
1303            let root_no_pruning_restart = db_no_pruning.root(&mut hasher).await.unwrap();
1304            let root_pruning_restart = db_pruning.root(&mut hasher).await.unwrap();
1305
1306            // Ensure roots still match after restart
1307            assert_eq!(root_no_pruning, root_no_pruning_restart);
1308            assert_eq!(root_pruning, root_pruning_restart);
1309
1310            db_no_pruning.destroy().await.unwrap();
1311            db_pruning.destroy().await.unwrap();
1312        });
1313    }
1314}