commonware_storage/qmdb/current/
mod.rs

//! A _Current_ authenticated database provides succinct proofs of _any_ value ever associated with
//! a key, and also whether that value is the _current_ value associated with it.
//!
//! # Examples
//!
//! The batch API mirrors [crate::qmdb::any]:
//!
//! ```ignore
//! let mut batch = db.new_batch();
//! batch.write(key, Some(value));
//! let merkleized = batch.merkleize(None).await?;
//! let finalized = merkleized.finalize();
//! db.apply_batch(finalized).await?;
//! ```
//!
//! # Motivation
//!
//! An [crate::qmdb::any] ("Any") database can prove that a key had a particular value at some
//! point, but it cannot prove that the value is still current -- some later operation may have
//! updated or deleted it. A Current database adds exactly this capability by maintaining a bitmap
//! that tracks which operations are _active_ (i.e. represent the current state of their key).
//!
//! To make this useful, a verifier needs both the operation and its activity status authenticated
//! under a single root. We achieve this by _grafting_ bitmap chunks onto the operations MMR.
//!
//! # Data structures
//!
//! A Current database ([db::Db]) wraps an Any database and adds:
//!
//! - **Status bitmap** ([BitMap]): One bit per operation in the log. Bit _i_ is 1 if
//!   operation _i_ is active, 0 otherwise. The bitmap is divided into fixed-size chunks of `N`
//!   bytes (i.e. `N * 8` bits each). `N` must be a power of two.
//!
//! - **Grafted MMR** (`Mmr<Digest>`): An in-memory MMR of digests at and above the
//!   _grafting height_ in the ops MMR. This is the core of how the bitmap and ops MMR are
//!   combined into a single authenticated structure (see below).
//!
//! - **Bitmap metadata** (`Metadata`): Persists the pruning boundary and "pinned" digests needed
//!   to restore the grafted MMR after pruning old bitmap chunks.
//!
//! # Grafting: combining the activity status bitmap and the ops MMR
//!
//! ## The problem
//!
//! Naively authenticating the bitmap and ops MMR as two independent Merkle structures would
//! require two separate proofs per operation -- one for the operation's value, one for its
//! activity status. This doubles proof sizes.
//!
//! ## The solution
//!
//! We combine ("graft") the two structures at a specific height in the ops MMR called the
//! _grafting height_. The grafting height `h = log2(N * 8)` is chosen so that each subtree of
//! height `h` in the ops MMR covers exactly one bitmap chunk's worth of operations.
//!
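//! For example, with `N = 32` the chunks are 256 bits and `h = 8`; operation `i` then lives in
//! chunk `i / 256` at bit offset `i % 256`. A quick sketch of this arithmetic (plain integer
//! math, not a crate API):
//!
//! ```ignore
//! const N: u64 = 32;              // chunk size in bytes (must be a power of two)
//! let bits_per_chunk = N * 8;     // 256 bits per chunk
//! let h = bits_per_chunk.ilog2(); // grafting height: 8
//! let i: u64 = 1234;              // an operation's index in the log
//! let chunk = i / bits_per_chunk; // chunk 4 holds this operation's status bit
//! let bit = i % bits_per_chunk;   // at offset 210 within the chunk
//! ```
//!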
//! At the grafting height, instead of using the ops MMR's own subtree root, we replace it with a
//! _grafted leaf_ digest that incorporates both the bitmap chunk and the ops subtree root:
//!
//! ```text
//! grafted_leaf = hash(bitmap_chunk || ops_subtree_root)   // non-zero chunk
//! grafted_leaf = ops_subtree_root                         // all-zero chunk (identity)
//! ```
//!
//! The all-zero identity means that for pruned regions (where every operation is inactive), the
//! grafted MMR is structurally identical to the ops MMR at and above the grafting height.
//!
//! Above the grafting height, internal nodes use standard MMR hashing over the grafted leaves.
//! Below the grafting height, the ops MMR is unchanged.
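//!
//! The grafted-leaf rule above can be sketched as follows (hypothetical `Digest` type and
//! `hash` helper, not the crate's actual API):
//!
//! ```ignore
//! fn grafted_leaf(chunk: &[u8], ops_subtree_root: Digest) -> Digest {
//!     if chunk.iter().all(|b| *b == 0) {
//!         // Identity: pruned (all-inactive) regions collapse to the plain ops MMR.
//!         ops_subtree_root
//!     } else {
//!         hash(&[chunk, ops_subtree_root.as_ref()].concat())
//!     }
//! }
//! ```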
//!
//! ## Example
//!
//! With `N = 1` (8-bit chunks), the grafting height would be `h = log2(8) = 3`. To keep the
//! diagram small, this example instead uses 8 operations with 4-bit chunks (`h = 2`), yielding
//! 2 complete bitmap chunks:
//!
//! ```text
//! Ops MMR positions (8 leaves):
//!
//!   Height
//!     3              14                    <-- peak: digest commits to ops MMR and bitmap chunks
//!                  /    \
//!                 /      \
//!                /        \
//!     2  [G]    6          13    [G]       <-- grafting height: grafted leaves
//!             /   \      /    \
//!     1      2     5    9     12           <-- below grafting height: pure ops MMR nodes
//!           / \   / \  / \   /  \
//!     0    0   1 3   4 7  8 10  11
//!          ^           ^
//!          |           |
//!      ops 0-3     ops 4-7
//!      chunk 0     chunk 1
//! ```
//!
//! Positions 6 and 13 are at the grafting height. Their digests are:
//! - `pos 6: hash(chunk_0 || ops_subtree_root(pos 6))`
//! - `pos 13: hash(chunk_1 || ops_subtree_root(pos 13))`
//!
//! Position 14 (above grafting height) is a standard MMR internal node:
//! - `pos 14: hash(14 || digest(pos 6) || digest(pos 13))`
//!
//! The grafted MMR stores positions 6, 13, and 14. The ops MMR stores everything below
//! (positions 0-5 and 7-12). Together they form a single virtual MMR whose root authenticates
//! both the operations and their activity status.
//!
//! ## Proof generation and verification
//!
//! To prove that operation _i_ is active, we provide:
//! 1. An MMR inclusion proof for the operation's leaf, using the virtual (grafted) storage.
//! 2. The bitmap chunk containing bit _i_.
//!
//! The verifier (see `grafting::Verifier`) walks the proof from leaf to root. Below the grafting
//! height, it uses standard MMR hashing. At the grafting height, it detects the boundary and
//! reconstructs the grafted leaf from the chunk and the ops subtree root. For non-zero chunks
//! the grafted leaf is `hash(chunk || ops_subtree_root)`; for all-zero chunks the grafted leaf
//! is the ops subtree root itself (identity optimization -- see `grafting::Verifier::node`).
//! Above the grafting height, it resumes standard MMR hashing. If the reconstructed root
//! matches the expected root and bit _i_ is set in the chunk, the operation is proven active.
//!
//! This is a single proof path, not two independent ones -- the bitmap chunk is embedded in the
//! proof verification at the grafting boundary.
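//!
//! The boundary handling can be sketched as pseudocode (hypothetical helper names; the real
//! logic lives in `grafting::Verifier::node`):
//!
//! ```ignore
//! let digest = if height != grafting_height {
//!     standard_mmr_hash(pos, left, right)                 // below/above the boundary
//! } else if chunk.iter().all(|b| *b == 0) {
//!     ops_subtree_root                                    // all-zero chunk: identity
//! } else {
//!     hash(&[chunk, ops_subtree_root.as_ref()].concat())  // reconstruct the grafted leaf
//! };
//! ```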
//!
//! ## Partial chunks
//!
//! Operations arrive continuously, so the last bitmap chunk is usually incomplete (fewer than
//! `N * 8` bits). An incomplete chunk has no grafted leaf in the cache because there is no
//! corresponding complete subtree in the ops MMR. To still authenticate these bits, the partial
//! chunk's digest and bit count are folded into the canonical root hash:
//!
//! ```text
//! root = hash(ops_root || grafted_mmr_root || next_bit || hash(partial_chunk))
//! ```
//!
//! where `next_bit` is the index of the next unset position in the partial chunk and
//! `grafted_mmr_root` is the root of the grafted MMR (which covers only complete chunks).
//! When all chunks are complete, the partial chunk components are omitted.
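//!
//! As a sketch, the preimage is assembled in the order given above (hypothetical digest
//! helpers; the exact encoding of `next_bit` shown here is illustrative):
//!
//! ```ignore
//! let mut preimage = Vec::new();
//! preimage.extend_from_slice(ops_root.as_ref());
//! preimage.extend_from_slice(grafted_mmr_root.as_ref());
//! if let Some((next_bit, partial_chunk)) = partial {
//!     preimage.extend_from_slice(&next_bit.to_be_bytes()); // encoding illustrative
//!     preimage.extend_from_slice(hash(partial_chunk).as_ref());
//! }
//! let root = hash(&preimage);
//! ```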
//!
//! ## Incremental updates
//!
//! When operations are added or bits change (e.g. an operation becomes inactive during floor
//! raising), only the affected chunks are marked "dirty". During `merkleize`, only dirty grafted
//! leaves are recomputed and their ancestors are propagated upward through the cache. This avoids
//! recomputing the entire grafted tree.
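//!
//! A sketch of the dirty-chunk bookkeeping (hypothetical field names, not the crate's actual
//! API):
//!
//! ```ignore
//! // Flipping a bit marks its chunk dirty; `merkleize` later recomputes only the grafted
//! // leaves of dirty chunks and propagates their ancestors upward.
//! fn set_bit(&mut self, i: u64, value: bool) {
//!     self.status.set_bit(i, value);
//!     self.dirty_chunks.insert(i / (N as u64 * 8));
//! }
//! ```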
//!
//! ## Pruning
//!
//! Old bitmap chunks (below the inactivity floor) can be pruned. Before pruning, the grafted
//! digest peaks covering the pruned region are persisted to metadata as "pinned nodes". On
//! recovery, these pinned nodes are loaded and serve as opaque siblings during upward propagation,
//! allowing the grafted tree to be rebuilt without the pruned chunks.
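//!
//! A sketch of the prune-then-pin flow (hypothetical helper names, not the crate's actual
//! API):
//!
//! ```ignore
//! // Persist the grafted peaks covering the pruned prefix before dropping the chunks,
//! // so the grafted tree can be rebuilt on restart without them.
//! let pinned = grafted_mmr.nodes_to_pin(prune_to_chunk);
//! metadata.put(PINNED_NODES_KEY, pinned);
//! metadata.put(PRUNED_CHUNKS_KEY, prune_to_chunk);
//! metadata.sync().await?;
//! status.prune_to_chunk(prune_to_chunk);
//! ```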
//!
//! # Root structure
//!
//! The canonical root of a `current` database is:
//!
//! ```text
//! root = hash(ops_root || grafted_mmr_root [|| next_bit || hash(partial_chunk)])
//! ```
//!
//! where `grafted_mmr_root` is the root of the grafted MMR (covering only complete
//! bitmap chunks), `next_bit` is the index of the next unset position in the partial chunk, and
//! `hash(partial_chunk)` is the digest of the incomplete trailing chunk. The partial chunk
//! components are only present when the last bitmap chunk is incomplete.
//!
//! This combines two (or three) components into a single hash:
//!
//! - **Ops root**: The root of the raw operations MMR (the inner [crate::qmdb::any] database's
//!   root). Used for state sync, where a client downloads operations and verifies each batch
//!   against this root using standard MMR range proofs.
//!
//! - **Grafted MMR root**: The root of the grafted MMR (overlaying bitmap chunks
//!   with ops subtree roots). Used for proofs about operation values and their activity status.
//!   See [RangeProof](proof::RangeProof) and [OperationProof](proof::OperationProof).
//!
//! - **Partial chunk** (optional): When operations arrive continuously, the last bitmap chunk is
//!   usually incomplete. Its digest and bit count are folded into the canonical root hash.
//!
//! The canonical root is returned by [Db](db::Db)`::`[root()](db::Db::root) and
//! [MerkleizedStore](crate::qmdb::store::MerkleizedStore)`::`[root()](crate::qmdb::store::MerkleizedStore::root).
//! The ops root is returned by the `sync::Database` trait's `root()` method, since the sync engine
//! verifies batches against the ops root, not the canonical root.
//!
//! For state sync, the sync engine targets the ops root and verifies each batch against it.
//! After sync, the bitmap and grafted MMR are reconstructed deterministically from the
//! operations, and the canonical root is computed. Validating that the ops root is part of the
//! canonical root is the caller's responsibility; the sync engine does not perform this check.
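//!
//! That caller-side check might look like the following sketch (reusing this module's
//! internal helpers for illustration; the exact signatures are assumptions):
//!
//! ```ignore
//! // After sync, recompute the canonical root from the synced ops root and the
//! // reconstructed bitmap, then compare against the trusted canonical root.
//! let root = db::compute_db_root(&mut hasher, &storage, partial_chunk, &ops_root).await?;
//! assert_eq!(root, expected_canonical_root);
//! ```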

use crate::{
    index::Unordered as UnorderedIndex,
    journal::contiguous::{fixed::Journal as FJournal, variable::Journal as VJournal},
    mmr::{Location, StandardHasher},
    qmdb::{
        any::{
            self,
            operation::{Operation, Update},
            FixedConfig as AnyFixedConfig, ValueEncoding, VariableConfig as AnyVariableConfig,
        },
        operation::{Committable, Key},
        Error,
    },
    translator::Translator,
};
use commonware_codec::{Codec, CodecFixedShared, FixedSize, Read};
use commonware_cryptography::Hasher;
use commonware_parallel::ThreadPool;
use commonware_runtime::{buffer::paged::CacheRef, Clock, Metrics, Storage};
use commonware_utils::{bitmap::Prunable as BitMap, sync::AsyncMutex, Array};
use std::num::{NonZeroU64, NonZeroUsize};

pub mod batch;
pub mod db;
mod grafting;
pub mod ordered;
pub mod proof;
pub(crate) mod sync;
pub mod unordered;

/// Configuration for a `Current` authenticated db with fixed-size values.
#[derive(Clone)]
pub struct FixedConfig<T: Translator> {
    /// The name of the storage partition used for the MMR's backing journal.
    pub mmr_journal_partition: String,

    /// The items per blob configuration value used by the MMR journal.
    pub mmr_items_per_blob: NonZeroU64,

    /// The size of the write buffer to use for each blob in the MMR journal.
    pub mmr_write_buffer: NonZeroUsize,

    /// The name of the storage partition used for the MMR's metadata.
    pub mmr_metadata_partition: String,

    /// The name of the storage partition used to persist the (pruned) log of operations.
    pub log_journal_partition: String,

    /// The items per blob configuration value used by the log journal.
    pub log_items_per_blob: NonZeroU64,

    /// The size of the write buffer to use for each blob in the log journal.
    pub log_write_buffer: NonZeroUsize,

    /// The name of the storage partition used for the grafted MMR metadata.
    pub grafted_mmr_metadata_partition: String,

    /// The translator used by the compressed index.
    pub translator: T,

    /// An optional thread pool to use for parallelizing batch operations.
    pub thread_pool: Option<ThreadPool>,

    /// The page cache to use for caching data.
    pub page_cache: CacheRef,
}

impl<T: Translator> From<FixedConfig<T>> for AnyFixedConfig<T> {
    fn from(cfg: FixedConfig<T>) -> Self {
        Self {
            mmr_journal_partition: cfg.mmr_journal_partition,
            mmr_metadata_partition: cfg.mmr_metadata_partition,
            mmr_items_per_blob: cfg.mmr_items_per_blob,
            mmr_write_buffer: cfg.mmr_write_buffer,
            log_journal_partition: cfg.log_journal_partition,
            log_items_per_blob: cfg.log_items_per_blob,
            log_write_buffer: cfg.log_write_buffer,
            translator: cfg.translator,
            thread_pool: cfg.thread_pool,
            page_cache: cfg.page_cache,
        }
    }
}

/// Configuration for a `Current` authenticated db with variable-size values.
#[derive(Clone)]
pub struct VariableConfig<T: Translator, C> {
    /// The name of the storage partition used for the MMR's backing journal.
    pub mmr_journal_partition: String,

    /// The items per blob configuration value used by the MMR journal.
    pub mmr_items_per_blob: NonZeroU64,

    /// The size of the write buffer to use for each blob in the MMR journal.
    pub mmr_write_buffer: NonZeroUsize,

    /// The name of the storage partition used for the MMR's metadata.
    pub mmr_metadata_partition: String,

    /// The name of the storage partition used to persist the log of operations.
    pub log_partition: String,

    /// The size of the write buffer to use for each blob in the log journal.
    pub log_write_buffer: NonZeroUsize,

    /// Optional compression level (using `zstd`) to apply to log data before storing.
    pub log_compression: Option<u8>,

    /// The codec configuration to use for the log.
    pub log_codec_config: C,

    /// The items per blob configuration value used by the log journal.
    pub log_items_per_blob: NonZeroU64,

    /// The name of the storage partition used for the grafted MMR metadata.
    pub grafted_mmr_metadata_partition: String,

    /// The translator used by the compressed index.
    pub translator: T,

    /// An optional thread pool to use for parallelizing batch operations.
    pub thread_pool: Option<ThreadPool>,

    /// The page cache to use for caching data.
    pub page_cache: CacheRef,
}

impl<T: Translator, C> From<VariableConfig<T, C>> for AnyVariableConfig<T, C> {
    fn from(cfg: VariableConfig<T, C>) -> Self {
        Self {
            mmr_journal_partition: cfg.mmr_journal_partition,
            mmr_metadata_partition: cfg.mmr_metadata_partition,
            mmr_items_per_blob: cfg.mmr_items_per_blob,
            mmr_write_buffer: cfg.mmr_write_buffer,
            log_items_per_blob: cfg.log_items_per_blob,
            log_partition: cfg.log_partition,
            log_write_buffer: cfg.log_write_buffer,
            log_compression: cfg.log_compression,
            log_codec_config: cfg.log_codec_config,
            translator: cfg.translator,
            thread_pool: cfg.thread_pool,
            page_cache: cfg.page_cache,
        }
    }
}

/// Shared initialization logic for fixed-size value Current [db::Db].
pub(super) async fn init_fixed<E, K, V, U, H, T, I, const N: usize, NewIndex>(
    context: E,
    config: FixedConfig<T>,
    new_index: NewIndex,
) -> Result<db::Db<E, FJournal<E, Operation<K, V, U>>, I, H, U, N>, Error>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: ValueEncoding,
    U: Update<K, V> + Send + Sync,
    H: Hasher,
    T: Translator,
    I: UnorderedIndex<Value = Location>,
    NewIndex: FnOnce(E, T) -> I,
    Operation<K, V, U>: CodecFixedShared + Committable,
{
    // TODO: Re-evaluate assertion placement after `generic_const_exprs` is stable.
    const {
        // A compile-time assertion that the chunk size is some multiple of the digest size. A
        // multiple of 1 is optimal with respect to proof size, but a higher multiple allows for
        // a smaller (RAM resident) merkle tree over the structure.
        assert!(
            N.is_multiple_of(H::Digest::SIZE),
            "chunk size must be some multiple of the digest size",
        );
        // A compile-time assertion that the chunk size is a power of 2, which is necessary to
        // align the status bitmap tree with the underlying operations MMR.
        assert!(N.is_power_of_two(), "chunk size must be a power of 2");
    }

    let thread_pool = config.thread_pool.clone();
    let metadata_partition = config.grafted_mmr_metadata_partition.clone();

    // Load bitmap metadata (pruned_chunks + pinned nodes for grafted MMR).
    let (metadata, pruned_chunks, pinned_nodes) =
        db::init_metadata(context.with_label("metadata"), &metadata_partition).await?;

    // Initialize the activity status bitmap.
    let mut status = BitMap::<N>::new_with_pruned_chunks(pruned_chunks)
        .map_err(|_| Error::DataCorrupted("pruned chunks overflow"))?;

    // Initialize the anydb with a callback that populates the activity status bitmap.
    let last_known_inactivity_floor = Location::new(status.len());
    let any = any::init_fixed(
        context.with_label("any"),
        config.into(),
        Some(last_known_inactivity_floor),
        |append: bool, loc: Option<Location>| {
            status.push(append);
            if let Some(loc) = loc {
                status.set_bit(*loc, false);
            }
        },
        new_index,
    )
    .await?;

    // Build the grafted MMR from the bitmap and ops MMR.
    let mut hasher = StandardHasher::<H>::new();
    let grafted_mmr = db::build_grafted_mmr::<H, N>(
        &mut hasher,
        &status,
        &pinned_nodes,
        &any.log.mmr,
        thread_pool.as_ref(),
    )
    .await?;

    // Compute and cache the root.
    let storage = grafting::Storage::new(&grafted_mmr, grafting::height::<N>(), &any.log.mmr);
    let partial_chunk = db::partial_chunk(&status);
    let ops_root = any.log.root();
    let root = db::compute_db_root(&mut hasher, &storage, partial_chunk, &ops_root).await?;

    Ok(db::Db {
        any,
        status,
        grafted_mmr,
        metadata: AsyncMutex::new(metadata),
        thread_pool,
        root,
    })
}

/// Shared initialization logic for variable-size value Current [db::Db].
pub(super) async fn init_variable<E, K, V, U, H, T, I, const N: usize, NewIndex>(
    context: E,
    config: VariableConfig<T, <Operation<K, V, U> as Read>::Cfg>,
    new_index: NewIndex,
) -> Result<db::Db<E, VJournal<E, Operation<K, V, U>>, I, H, U, N>, Error>
where
    E: Storage + Clock + Metrics,
    K: Key,
    V: ValueEncoding,
    U: Update<K, V> + Send + Sync,
    H: Hasher,
    T: Translator,
    I: UnorderedIndex<Value = Location>,
    NewIndex: FnOnce(E, T) -> I,
    Operation<K, V, U>: Codec + Committable,
{
    // TODO: Re-evaluate assertion placement after `generic_const_exprs` is stable.
    const {
        // A compile-time assertion that the chunk size is some multiple of the digest size. A
        // multiple of 1 is optimal with respect to proof size, but a higher multiple allows for
        // a smaller (RAM resident) merkle tree over the structure.
        assert!(
            N.is_multiple_of(H::Digest::SIZE),
            "chunk size must be some multiple of the digest size",
        );
        // A compile-time assertion that the chunk size is a power of 2, which is necessary to
        // align the status bitmap tree with the underlying operations MMR.
        assert!(N.is_power_of_two(), "chunk size must be a power of 2");
    }

    let metadata_partition = config.grafted_mmr_metadata_partition.clone();
    let pool = config.thread_pool.clone();

    // Load bitmap metadata (pruned_chunks + pinned nodes for grafted MMR).
    let (metadata, pruned_chunks, pinned_nodes) =
        db::init_metadata(context.with_label("metadata"), &metadata_partition).await?;

    // Initialize the activity status bitmap.
    let mut status = BitMap::<N>::new_with_pruned_chunks(pruned_chunks)
        .map_err(|_| Error::DataCorrupted("pruned chunks overflow"))?;

    // Initialize the anydb with a callback that populates the activity status bitmap.
    let last_known_inactivity_floor = Location::new(status.len());
    let any = any::init_variable(
        context.with_label("any"),
        config.into(),
        Some(last_known_inactivity_floor),
        |append: bool, loc: Option<Location>| {
            status.push(append);
            if let Some(loc) = loc {
                status.set_bit(*loc, false);
            }
        },
        new_index,
    )
    .await?;

    // Build the grafted MMR from the bitmap and ops MMR.
    let mut hasher = StandardHasher::<H>::new();
    let grafted_mmr = db::build_grafted_mmr::<H, N>(
        &mut hasher,
        &status,
        &pinned_nodes,
        &any.log.mmr,
        pool.as_ref(),
    )
    .await?;

    // Compute and cache the root.
    let storage = grafting::Storage::new(&grafted_mmr, grafting::height::<N>(), &any.log.mmr);
    let partial_chunk = db::partial_chunk(&status);
    let ops_root = any.log.root();
    let root = db::compute_db_root(&mut hasher, &storage, partial_chunk, &ops_root).await?;

    Ok(db::Db {
        any,
        status,
        grafted_mmr,
        metadata: AsyncMutex::new(metadata),
        thread_pool: pool,
        root,
    })
}

/// Extension trait for Current QMDB types that exposes bitmap information for testing.
#[cfg(any(test, feature = "test-traits"))]
pub trait BitmapPrunedBits {
    /// Returns the number of bits that have been pruned from the bitmap.
    fn pruned_bits(&self) -> u64;

    /// Returns the value of the bit at the given index.
    fn get_bit(&self, index: u64) -> bool;

    /// Returns the position of the oldest retained bit.
    fn oldest_retained(&self) -> impl core::future::Future<Output = u64> + Send;
}

#[cfg(test)]
pub mod tests {
    //! Shared test utilities for Current QMDB variants.

    pub use super::BitmapPrunedBits;
    use super::{ordered, unordered, FixedConfig, VariableConfig};
    use crate::{
        qmdb::{
            any::traits::{DbAny, MerkleizedBatch as _, UnmerkleizedBatch as _},
            store::{
                tests::{TestKey, TestValue},
                LogStore,
            },
            Error,
        },
        translator::Translator,
    };
    use commonware_runtime::{
        buffer::paged::CacheRef,
        deterministic::{self, Context},
        BufferPooler, Metrics as _, Runner as _,
    };
    use commonware_utils::{NZUsize, NZU16, NZU64};
    use core::future::Future;
    use rand::{rngs::StdRng, RngCore, SeedableRng};
    use std::num::{NonZeroU16, NonZeroUsize};
    use tracing::warn;

    // Janky page & cache sizes to exercise boundary conditions.
    const PAGE_SIZE: NonZeroU16 = NZU16!(88);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(8);

    /// Shared config factory for fixed-value Current QMDB tests.
    pub(crate) fn fixed_config<T: Translator + Default>(
        partition_prefix: &str,
        pooler: &impl BufferPooler,
    ) -> FixedConfig<T> {
        FixedConfig {
            mmr_journal_partition: format!("{partition_prefix}-journal-partition"),
            mmr_metadata_partition: format!("{partition_prefix}-metadata-partition"),
            mmr_items_per_blob: NZU64!(11),
            mmr_write_buffer: NZUsize!(1024),
            log_journal_partition: format!("{partition_prefix}-partition-prefix"),
            log_items_per_blob: NZU64!(7),
            log_write_buffer: NZUsize!(1024),
            grafted_mmr_metadata_partition: format!(
                "{partition_prefix}-grafted-mmr-metadata-partition"
            ),
            translator: T::default(),
            thread_pool: None,
            page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
        }
    }

    /// Shared config factory for variable-value Current QMDB tests with unit codec config.
    pub(crate) fn variable_config<T: Translator + Default>(
        partition_prefix: &str,
        pooler: &impl BufferPooler,
    ) -> VariableConfig<T, ((), ())> {
        VariableConfig {
            mmr_journal_partition: format!("{partition_prefix}-journal-partition"),
            mmr_metadata_partition: format!("{partition_prefix}-metadata-partition"),
            mmr_items_per_blob: NZU64!(11),
            mmr_write_buffer: NZUsize!(1024),
            log_partition: format!("{partition_prefix}-partition-prefix"),
            log_items_per_blob: NZU64!(7),
            log_write_buffer: NZUsize!(1024),
            log_compression: None,
            log_codec_config: ((), ()),
            grafted_mmr_metadata_partition: format!(
                "{partition_prefix}-grafted-mmr-metadata-partition"
            ),
            translator: T::default(),
            thread_pool: None,
            page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
        }
    }

    /// Commit a set of writes as a single batch.
    async fn commit_writes<C: DbAny>(
        db: &mut C,
        writes: impl IntoIterator<Item = (C::Key, Option<<C as LogStore>::Value>)>,
    ) -> Result<(), Error> {
        let mut batch = db.new_batch();
        for (k, v) in writes {
            batch.write(k, v);
        }
        let finalized = batch.merkleize(None).await?.finalize();
        db.apply_batch(finalized).await?;
        Ok(())
    }

    /// Apply random operations to the given db, committing them (periodically and once more at
    /// the end) only if `commit_changes` is true. Returns the db; callers should commit if
    /// needed.
    async fn apply_random_ops_inner<C>(
        num_elements: u64,
        commit_changes: bool,
        rng_seed: u64,
        mut db: C,
    ) -> Result<C, Error>
    where
        C: DbAny,
        C::Key: TestKey,
        <C as LogStore>::Value: TestValue,
    {
        // Log the seed with high visibility to make failures reproducible.
        warn!("rng_seed={}", rng_seed);
        let mut rng = StdRng::seed_from_u64(rng_seed);

        // First loop: all initial writes in one batch.
        let writes: Vec<_> = (0u64..num_elements)
            .map(|i| {
                let k = TestKey::from_seed(i);
                let v = TestValue::from_seed(rng.next_u64());
                (k, Some(v))
            })
            .collect();
        if commit_changes {
            commit_writes(&mut db, writes).await?;
        }

        // Randomly update / delete them. We use a delete frequency that is 1/7th of the update
        // frequency. Accumulate writes and commit periodically.
        let mut pending: Vec<(C::Key, Option<<C as LogStore>::Value>)> = Vec::new();
        for _ in 0u64..num_elements * 10 {
            let rand_key = TestKey::from_seed(rng.next_u64() % num_elements);
            if rng.next_u32() % 7 == 0 {
                pending.push((rand_key, None));
                continue;
            }
            let v = TestValue::from_seed(rng.next_u64());
            pending.push((rand_key, Some(v)));
            if commit_changes && rng.next_u32() % 20 == 0 {
                commit_writes(&mut db, pending.drain(..)).await?;
            }
        }
        if commit_changes {
            commit_writes(&mut db, pending).await?;
        }
        Ok(db)
    }

    /// Boxed wrapper around `apply_random_ops_inner`; boxing the future prevents stack
    /// overflow when this helper is monomorphized across many DB variants.
    pub fn apply_random_ops<C>(
        num_elements: u64,
        commit_changes: bool,
        rng_seed: u64,
        db: C,
    ) -> std::pin::Pin<Box<dyn Future<Output = Result<C, Error>>>>
    where
        C: DbAny + 'static,
        C::Key: TestKey,
        <C as LogStore>::Value: TestValue,
    {
        Box::pin(apply_random_ops_inner::<C>(
            num_elements,
            commit_changes,
            rng_seed,
            db,
        ))
    }

    /// Run `test_build_random_close_reopen` against a database factory.
    ///
    /// The factory should return a database when given a context and partition name.
    /// The factory will be called multiple times to test reopening.
    pub fn test_build_random_close_reopen<C, F, Fut>(mut open_db: F)
    where
        C: DbAny + 'static,
        C::Key: TestKey,
        <C as LogStore>::Value: TestValue,
        F: FnMut(Context, String) -> Fut + Clone,
        Fut: Future<Output = C>,
    {
        const ELEMENTS: u64 = 1000;

        let executor = deterministic::Runner::default();
        let mut open_db_clone = open_db.clone();
        let state1 = executor.start(|mut context| async move {
            let partition = "build-random".to_string();
            let rng_seed = context.next_u64();
            let mut db: C = open_db_clone(context.with_label("first"), partition.clone()).await;
            db = apply_random_ops::<C>(ELEMENTS, true, rng_seed, db)
                .await
                .unwrap();
            let finalized = db.new_batch().merkleize(None).await.unwrap().finalize();
            db.apply_batch(finalized).await.unwrap();
            db.sync().await.unwrap();

            // Drop and reopen the db
            let root = db.root();
            drop(db);
            let db: C = open_db_clone(context.with_label("second"), partition).await;

            // Ensure the root matches
            assert_eq!(db.root(), root);

            db.destroy().await.unwrap();
            context.auditor().state()
        });

        // Run again to verify determinism
        let executor = deterministic::Runner::default();
        let state2 = executor.start(|mut context| async move {
            let partition = "build-random".to_string();
            let rng_seed = context.next_u64();
            let mut db: C = open_db(context.with_label("first"), partition.clone()).await;
            db = apply_random_ops::<C>(ELEMENTS, true, rng_seed, db)
                .await
                .unwrap();
            let finalized = db.new_batch().merkleize(None).await.unwrap().finalize();
            db.apply_batch(finalized).await.unwrap();
            db.sync().await.unwrap();

            let root = db.root();
            drop(db);
            let db: C = open_db(context.with_label("second"), partition).await;
            assert_eq!(db.root(), root);

            db.destroy().await.unwrap();
            context.auditor().state()
        });

        assert_eq!(state1, state2);
    }

742    /// Run `test_simulate_write_failures` against a database factory.
743    ///
744    /// This test builds a random database and verifies recovery from several simulated
745    /// write-failure scenarios.
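    ///
    /// A sketch of the crash/recovery pattern exercised below, using the factory API and
    /// helpers of the surrounding tests (`open_db`, `apply_random_ops`, `commit_writes`):
    ///
    /// ```ignore
    /// // Commit some operations, then apply more without committing them.
    /// commit_writes(&mut db, []).await?;
    /// let committed_root = db.root();
    /// let db = apply_random_ops(ELEMENTS, false, seed, db).await?;
    ///
    /// // Simulate a crash by dropping without sync; reopening rolls back to the last commit.
    /// drop(db);
    /// let db = open_db(context, partition).await;
    /// assert_eq!(db.root(), committed_root);
    /// ```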
746    pub fn test_simulate_write_failures<C, F, Fut>(mut open_db: F)
747    where
748        C: DbAny + 'static,
749        C::Key: TestKey,
750        <C as LogStore>::Value: TestValue,
751        F: FnMut(Context, String) -> Fut + Clone,
752        Fut: Future<Output = C>,
753    {
754        const ELEMENTS: u64 = 1000;
755
756        let executor = deterministic::Runner::default();
757        // Box the future to prevent stack overflow when monomorphized across DB variants.
758        executor.start(|mut context| {
759            Box::pin(async move {
760                let partition = "build-random-fail-commit".to_string();
761                let rng_seed = context.next_u64();
762                let mut db: C = open_db(context.with_label("first"), partition.clone()).await;
763                db = apply_random_ops::<C>(ELEMENTS, true, rng_seed, db)
764                    .await
765                    .unwrap();
766                commit_writes(&mut db, []).await.unwrap();
767                let committed_root = db.root();
768                let committed_op_count = db.bounds().await.end;
769                let committed_inactivity_floor = db.inactivity_floor_loc().await;
770                db.prune(committed_inactivity_floor).await.unwrap();
771
772                // Perform more random operations without committing any of them.
773                let db = apply_random_ops::<C>(ELEMENTS, false, rng_seed + 1, db)
774                    .await
775                    .unwrap();
776
777                // SCENARIO #1: Simulate a crash before any of the uncommitted operations are
778                // persisted. Upon reopening, the state of the DB should be as of the last commit.
779                drop(db);
780                let db: C = open_db(context.with_label("scenario1"), partition.clone()).await;
781                assert_eq!(db.root(), committed_root);
782                assert_eq!(db.bounds().await.end, committed_op_count);
783
784                // Re-apply the exact same operations, this time committed.
785                let db = apply_random_ops::<C>(ELEMENTS, true, rng_seed + 1, db)
786                    .await
787                    .unwrap();
788
789                // SCENARIO #2: Simulate a crash that happens after the underlying any db has
790                // been committed, but before sync/prune is called. We do this by dropping the
791                // db without calling sync or prune.
792                let committed_op_count = db.bounds().await.end;
793                drop(db);
794
795                // Recovery should preserve the second commit: the root should differ from the
796                // first committed root, and the op count should be greater than before.
797                let db: C = open_db(context.with_label("scenario2"), partition.clone()).await;
798                let scenario_2_root = db.root();
799
800                // To confirm the second committed root is correct, we'll re-build the DB in a
801                // new partition without any failures. The two should have the exact same state.
802                let fresh_partition = "build-random-fail-commit-fresh".to_string();
803                let mut db: C = open_db(context.with_label("fresh"), fresh_partition.clone()).await;
804                db = apply_random_ops::<C>(ELEMENTS, true, rng_seed, db)
805                    .await
806                    .unwrap();
807                commit_writes(&mut db, []).await.unwrap();
808                db = apply_random_ops::<C>(ELEMENTS, true, rng_seed + 1, db)
809                    .await
810                    .unwrap();
811                db.prune(db.inactivity_floor_loc().await).await.unwrap();
812                // State from scenario #2 should match that of a successful commit.
813                assert_eq!(db.bounds().await.end, committed_op_count);
814                assert_eq!(db.root(), scenario_2_root);
815
816                db.destroy().await.unwrap();
817            })
818        });
819    }
820
821    /// Run `test_different_pruning_delays_same_root` against a database factory.
822    ///
823    /// This test verifies that pruning operations do not affect the root hash: two databases
824    /// with identical operations but different pruning schedules should have the same root.
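    ///
    /// The invariant under test, sketched with the helpers of the surrounding tests
    /// (`commit_writes` and two databases fed identical writes):
    ///
    /// ```ignore
    /// commit_writes(&mut db_no_pruning, writes.clone()).await?;
    /// commit_writes(&mut db_pruning, writes).await?;
    /// // Prune only one of the two databases...
    /// db_pruning.prune(db_pruning.inactivity_floor_loc().await).await?;
    /// // ...the authenticated root is unaffected by pruning.
    /// assert_eq!(db_no_pruning.root(), db_pruning.root());
    /// ```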
825    pub fn test_different_pruning_delays_same_root<C, F, Fut>(mut open_db: F)
826    where
827        C: DbAny,
828        C::Key: TestKey,
829        <C as LogStore>::Value: TestValue,
830        F: FnMut(Context, String) -> Fut + Clone,
831        Fut: Future<Output = C>,
832    {
833        const NUM_OPERATIONS: u64 = 1000;
834
835        let executor = deterministic::Runner::default();
836        let mut open_db_clone = open_db.clone();
837        executor.start(|context| async move {
838            // Create two databases that are identical other than how they are pruned.
839            let mut db_no_pruning: C =
840                open_db_clone(context.with_label("no_pruning"), "no-pruning-test".into()).await;
841            let mut db_pruning: C =
842                open_db(context.with_label("pruning"), "pruning-test".into()).await;
843
844            // Apply identical operations to both databases, but only prune one.
845            // Accumulate writes between commits.
846            let mut pending_no_pruning: Vec<(C::Key, Option<<C as LogStore>::Value>)> = Vec::new();
847            let mut pending_pruning: Vec<(C::Key, Option<<C as LogStore>::Value>)> = Vec::new();
848            for i in 0..NUM_OPERATIONS {
849                let key: C::Key = TestKey::from_seed(i);
850                let value: <C as LogStore>::Value = TestValue::from_seed(i * 1000);
851
852                pending_no_pruning.push((key, Some(value.clone())));
853                pending_pruning.push((key, Some(value)));
854
855                // Commit periodically
856                if i % 50 == 49 {
857                    commit_writes(&mut db_no_pruning, pending_no_pruning.drain(..))
858                        .await
859                        .unwrap();
860                    commit_writes(&mut db_pruning, pending_pruning.drain(..))
861                        .await
862                        .unwrap();
863                    db_pruning
864                        .prune(db_no_pruning.inactivity_floor_loc().await)
865                        .await
866                        .unwrap();
867                }
868            }
869
870            // Final commit for remaining writes.
871            commit_writes(&mut db_no_pruning, pending_no_pruning)
872                .await
873                .unwrap();
874            commit_writes(&mut db_pruning, pending_pruning)
875                .await
876                .unwrap();
877
878            // Get roots from both databases - they should match
879            let root_no_pruning = db_no_pruning.root();
880            let root_pruning = db_pruning.root();
881            assert_eq!(root_no_pruning, root_pruning);
882
883            // Also verify inactivity floors match
884            assert_eq!(
885                db_no_pruning.inactivity_floor_loc().await,
886                db_pruning.inactivity_floor_loc().await
887            );
888
889            db_no_pruning.destroy().await.unwrap();
890            db_pruning.destroy().await.unwrap();
891        });
892    }
893
894    /// Run `test_sync_persists_bitmap_pruning_boundary` against a database factory.
895    ///
896    /// This test verifies that calling `sync()` persists the bitmap pruning boundary that was
897    /// set during `commit()`. If `sync()` didn't call `write_pruned`, the
898    /// `pruned_bits()` count would be 0 after reopen instead of the expected value.
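    ///
    /// The property under test, sketched against the API used by the surrounding tests:
    ///
    /// ```ignore
    /// db.apply_batch(finalized).await?;     // sets the bitmap pruning boundary
    /// let pruned = db.pruned_bits();
    /// db.sync().await?;                     // must persist that boundary
    /// drop(db);
    /// let db = open_db(context, partition).await;
    /// // Would be 0 if sync() skipped write_pruned().
    /// assert_eq!(db.pruned_bits(), pruned);
    /// ```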
899    pub fn test_sync_persists_bitmap_pruning_boundary<C, F, Fut>(mut open_db: F)
900    where
901        C: DbAny + BitmapPrunedBits + 'static,
902        C::Key: TestKey,
903        <C as LogStore>::Value: TestValue,
904        F: FnMut(Context, String) -> Fut + Clone,
905        Fut: Future<Output = C>,
906    {
907        const ELEMENTS: u64 = 500;
908
909        let executor = deterministic::Runner::default();
910        let mut open_db_clone = open_db.clone();
911        executor.start(|mut context| async move {
912            let partition = "sync-bitmap-pruning".to_string();
913            let rng_seed = context.next_u64();
914            let mut db: C = open_db_clone(context.with_label("first"), partition.clone()).await;
915
916            // Apply random operations with commits to advance the inactivity floor.
917            db = apply_random_ops::<C>(ELEMENTS, true, rng_seed, db).await.unwrap();
918            let finalized = db.new_batch().merkleize(None).await.unwrap().finalize();
919            db.apply_batch(finalized).await.unwrap();
920
921            // The bitmap should have been pruned during commit().
922            let pruned_bits_before = db.pruned_bits();
923            warn!(
924                "pruned_bits_before={}, inactivity_floor={}, op_count={}",
925                pruned_bits_before,
926                *db.inactivity_floor_loc().await,
927                *db.bounds().await.end
928            );
929
930            // Verify we actually have some pruning (otherwise the test is meaningless).
931            assert!(
932                pruned_bits_before > 0,
933                "Expected bitmap to have pruned bits after merkleization"
934            );
935
936            // Call sync() WITHOUT calling prune(). The bitmap pruning boundary was set
937            // during commit(), and sync() should persist it.
938            db.sync().await.unwrap();
939
940            // Record the root before dropping.
941            let root_before = db.root();
942            drop(db);
943
944            // Reopen the database.
945            let db: C = open_db(context.with_label("second"), partition).await;
946
947            // The pruned bits count should match. If sync() didn't persist the bitmap pruned
948            // state, this would be 0.
949            let pruned_bits_after = db.pruned_bits();
950            warn!("pruned_bits_after={}", pruned_bits_after);
951
952            assert_eq!(
953                pruned_bits_after, pruned_bits_before,
954                "Bitmap pruned bits mismatch after reopen - sync() may not have called write_pruned()"
955            );
956
957            // Also verify the root matches.
958            assert_eq!(db.root(), root_before);
959
960            db.destroy().await.unwrap();
961        });
962    }
963
964    /// Run `test_current_db_build_big` against a database factory.
965    ///
966    /// This test builds a database with 1000 keys, updates some, deletes some, and verifies that
967    /// the final state matches an independently computed HashMap. It also verifies that the state
968    /// persists correctly after close and reopen.
969    pub fn test_current_db_build_big<C, F, Fut>(mut open_db: F)
970    where
971        C: DbAny,
972        C::Key: TestKey,
973        <C as LogStore>::Value: TestValue,
974        F: FnMut(Context, String) -> Fut + Clone,
975        Fut: Future<Output = C>,
976    {
977        const ELEMENTS: u64 = 1000;
978
979        let executor = deterministic::Runner::default();
980        let mut open_db_clone = open_db.clone();
981        executor.start(|context| async move {
982            let mut db: C = open_db_clone(context.with_label("first"), "build-big".into()).await;
983
984            let mut map = std::collections::HashMap::<C::Key, <C as LogStore>::Value>::default();
985
986            // All creates, updates, and deletes in one batch.
987            let finalized = {
988                let mut batch = db.new_batch();
989
990                // Initial creates
991                for i in 0u64..ELEMENTS {
992                    let k: C::Key = TestKey::from_seed(i);
993                    let v: <C as LogStore>::Value = TestValue::from_seed(i * 1000);
994                    batch.write(k, Some(v.clone()));
995                    map.insert(k, v);
996                }
997
998                // Update every 3rd key
999                for i in 0u64..ELEMENTS {
1000                    if i % 3 != 0 {
1001                        continue;
1002                    }
1003                    let k: C::Key = TestKey::from_seed(i);
1004                    let v: <C as LogStore>::Value = TestValue::from_seed((i + 1) * 10000);
1005                    batch.write(k, Some(v.clone()));
1006                    map.insert(k, v);
1007                }
1008
1009                // Delete every 7th key (those where i % 7 == 1)
1010                for i in 0u64..ELEMENTS {
1011                    if i % 7 != 1 {
1012                        continue;
1013                    }
1014                    let k: C::Key = TestKey::from_seed(i);
1015                    batch.write(k, None);
1016                    map.remove(&k);
1017                }
1018
1019                batch.merkleize(None).await.unwrap().finalize()
1020            };
1021            db.apply_batch(finalized).await.unwrap();
1022
1023            // Sync and prune.
1024            db.sync().await.unwrap();
1025            db.prune(db.inactivity_floor_loc().await).await.unwrap();
1026
1027            // Record root before dropping.
1028            let root = db.root();
1029            db.sync().await.unwrap();
1030            drop(db);
1031
1032            // Reopen the db and verify it has exactly the same state.
1033            let db: C = open_db(context.with_label("second"), "build-big".into()).await;
1034            assert_eq!(root, db.root());
1035
1036            // Confirm the db's state matches that of the separate map we computed independently.
1037            for i in 0u64..ELEMENTS {
1038                let k: C::Key = TestKey::from_seed(i);
1039                if let Some(map_value) = map.get(&k) {
1040                    let Some(db_value) = db.get(&k).await.unwrap() else {
1041                        panic!("key not found in db: {k}");
1042                    };
1043                    assert_eq!(*map_value, db_value);
1044                } else {
1045                    assert!(db.get(&k).await.unwrap().is_none());
1046                }
1047            }
1048        });
1049    }
1050
1051    /// Run `test_stale_changeset_side_effect_free` against a database factory.
1052    ///
1053    /// The stale batch must be rejected without mutating the committed state.
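    ///
    /// The rejection pattern exercised below, sketched with the batch API used throughout
    /// these tests (two changesets built against the same pre-state):
    ///
    /// ```ignore
    /// let cs_a = { let mut b = db.new_batch(); b.write(k1, Some(v1)); b.merkleize(None).await?.finalize() };
    /// let cs_b = { let mut b = db.new_batch(); b.write(k2, Some(v2)); b.merkleize(None).await?.finalize() };
    /// db.apply_batch(cs_a).await?;          // advances the db past cs_b's base
    /// let root = db.root();
    /// assert!(matches!(db.apply_batch(cs_b).await, Err(Error::StaleChangeset { .. })));
    /// assert_eq!(db.root(), root);          // committed state is untouched
    /// ```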
1054    pub fn test_stale_changeset_side_effect_free<C, F, Fut>(mut open_db: F)
1055    where
1056        C: DbAny,
1057        C::Key: TestKey,
1058        <C as LogStore>::Value: TestValue,
1059        F: FnMut(Context, String) -> Fut,
1060        Fut: Future<Output = C>,
1061    {
1062        let executor = deterministic::Runner::default();
1063        executor.start(|context| async move {
1064            let mut db: C =
1065                open_db(context.with_label("db"), "stale-side-effect-free".into()).await;
1066
1067            let key1 = <C::Key as TestKey>::from_seed(1);
1068            let key2 = <C::Key as TestKey>::from_seed(2);
1069            let value1 = <<C as LogStore>::Value as TestValue>::from_seed(10);
1070            let value2 = <<C as LogStore>::Value as TestValue>::from_seed(20);
1071
1072            let changeset_a = {
1073                let mut batch = db.new_batch();
1074                batch.write(key1, Some(value1.clone()));
1075                batch.merkleize(None).await.unwrap().finalize()
1076            };
1077            let changeset_b = {
1078                let mut batch = db.new_batch();
1079                batch.write(key2, Some(value2));
1080                batch.merkleize(None).await.unwrap().finalize()
1081            };
1082
1083            db.apply_batch(changeset_a).await.unwrap();
1084            let expected_root = db.root();
1085            let expected_bounds = db.bounds().await;
1086            let expected_metadata = db.get_metadata().await.unwrap();
1087            assert_eq!(db.get(&key1).await.unwrap(), Some(value1.clone()));
1088            assert_eq!(db.get(&key2).await.unwrap(), None);
1089
1090            let result = db.apply_batch(changeset_b).await;
1091            assert!(
1092                matches!(result, Err(Error::StaleChangeset { .. })),
1093                "expected StaleChangeset error, got {result:?}"
1094            );
1095            assert_eq!(db.root(), expected_root);
1096            assert_eq!(db.bounds().await, expected_bounds);
1097            assert_eq!(db.get_metadata().await.unwrap(), expected_metadata);
1098            assert_eq!(db.get(&key1).await.unwrap(), Some(value1));
1099            assert_eq!(db.get(&key2).await.unwrap(), None);
1100
1101            db.destroy().await.unwrap();
1102        });
1103    }
1104
1105    use crate::translator::OneCap;
1106    use commonware_cryptography::{sha256::Digest, Hasher as _, Sha256};
1107    use commonware_macros::{test_group, test_traced};
1108
1109    // Type aliases for all 12 variants (all use OneCap for collision coverage).
1110    type OrderedFixedDb = ordered::fixed::Db<Context, Digest, Digest, Sha256, OneCap, 32>;
1111    type OrderedVariableDb = ordered::variable::Db<Context, Digest, Digest, Sha256, OneCap, 32>;
1112    type UnorderedFixedDb = unordered::fixed::Db<Context, Digest, Digest, Sha256, OneCap, 32>;
1113    type UnorderedVariableDb = unordered::variable::Db<Context, Digest, Digest, Sha256, OneCap, 32>;
1114    type OrderedFixedP1Db =
1115        ordered::fixed::partitioned::Db<Context, Digest, Digest, Sha256, OneCap, 1, 32>;
1116    type OrderedVariableP1Db =
1117        ordered::variable::partitioned::Db<Context, Digest, Digest, Sha256, OneCap, 1, 32>;
1118    type UnorderedFixedP1Db =
1119        unordered::fixed::partitioned::Db<Context, Digest, Digest, Sha256, OneCap, 1, 32>;
1120    type UnorderedVariableP1Db =
1121        unordered::variable::partitioned::Db<Context, Digest, Digest, Sha256, OneCap, 1, 32>;
1122    type OrderedFixedP2Db =
1123        ordered::fixed::partitioned::Db<Context, Digest, Digest, Sha256, OneCap, 2, 32>;
1124    type OrderedVariableP2Db =
1125        ordered::variable::partitioned::Db<Context, Digest, Digest, Sha256, OneCap, 2, 32>;
1126    type UnorderedFixedP2Db =
1127        unordered::fixed::partitioned::Db<Context, Digest, Digest, Sha256, OneCap, 2, 32>;
1128    type UnorderedVariableP2Db =
1129        unordered::variable::partitioned::Db<Context, Digest, Digest, Sha256, OneCap, 2, 32>;
1130
1131    // Helper macro to create an open_db closure for a specific variant.
1132    macro_rules! open_db_fn {
1133        ($db:ty, $cfg:ident) => {
1134            |ctx: Context, partition: String| async move {
1135                <$db>::init(ctx.clone(), $cfg::<OneCap>(&partition, &ctx))
1136                    .await
1137                    .unwrap()
1138            }
1139        };
1140    }
1141
1142    // Defines all 12 variants. Calls $cb!($($args)*, $label, $type, $config) for each.
1143    macro_rules! with_all_variants {
1144        ($cb:ident!($($args:tt)*)) => {
1145            $cb!($($args)*, "of", OrderedFixedDb, fixed_config);
1146            $cb!($($args)*, "ov", OrderedVariableDb, variable_config);
1147            $cb!($($args)*, "uf", UnorderedFixedDb, fixed_config);
1148            $cb!($($args)*, "uv", UnorderedVariableDb, variable_config);
1149            $cb!($($args)*, "ofp1", OrderedFixedP1Db, fixed_config);
1150            $cb!($($args)*, "ovp1", OrderedVariableP1Db, variable_config);
1151            $cb!($($args)*, "ufp1", UnorderedFixedP1Db, fixed_config);
1152            $cb!($($args)*, "uvp1", UnorderedVariableP1Db, variable_config);
1153            $cb!($($args)*, "ofp2", OrderedFixedP2Db, fixed_config);
1154            $cb!($($args)*, "ovp2", OrderedVariableP2Db, variable_config);
1155            $cb!($($args)*, "ufp2", UnorderedFixedP2Db, fixed_config);
1156            $cb!($($args)*, "uvp2", UnorderedVariableP2Db, variable_config);
1157        };
1158    }
1159
1160    // Defines 6 ordered variants.
1161    macro_rules! with_ordered_variants {
1162        ($cb:ident!($($args:tt)*)) => {
1163            $cb!($($args)*, "of", OrderedFixedDb, fixed_config);
1164            $cb!($($args)*, "ov", OrderedVariableDb, variable_config);
1165            $cb!($($args)*, "ofp1", OrderedFixedP1Db, fixed_config);
1166            $cb!($($args)*, "ovp1", OrderedVariableP1Db, variable_config);
1167            $cb!($($args)*, "ofp2", OrderedFixedP2Db, fixed_config);
1168            $cb!($($args)*, "ovp2", OrderedVariableP2Db, variable_config);
1169        };
1170    }
1171
1172    // Defines 6 unordered variants.
1173    macro_rules! with_unordered_variants {
1174        ($cb:ident!($($args:tt)*)) => {
1175            $cb!($($args)*, "uf", UnorderedFixedDb, fixed_config);
1176            $cb!($($args)*, "uv", UnorderedVariableDb, variable_config);
1177            $cb!($($args)*, "ufp1", UnorderedFixedP1Db, fixed_config);
1178            $cb!($($args)*, "uvp1", UnorderedVariableP1Db, variable_config);
1179            $cb!($($args)*, "ufp2", UnorderedFixedP2Db, fixed_config);
1180            $cb!($($args)*, "uvp2", UnorderedVariableP2Db, variable_config);
1181        };
1182    }
1183
1184    // Runner macros - receive common args followed by (label, type, config).
1185    macro_rules! test_simple {
1186        ($f:expr, $l:literal, $db:ty, $cfg:ident) => {
1187            Box::pin(async {
1188                $f(open_db_fn!($db, $cfg));
1189            })
1190            .await
1191        };
1192    }
1193
1194    // Macro to run a test on DB variants.
1195    macro_rules! for_all_variants {
1196        (simple: $f:expr) => {{
1197            with_all_variants!(test_simple!($f));
1198        }};
1199        (ordered: $f:expr) => {{
1200            with_ordered_variants!(test_simple!($f));
1201        }};
1202        (unordered: $f:expr) => {{
1203            with_unordered_variants!(test_simple!($f));
1204        }};
1205    }
1206
1207    // Wrapper functions for running build_big on ordered/unordered variants; both currently
    // delegate to the same generic test.
1208    fn test_ordered_build_big<C, F, Fut>(open_db: F)
1209    where
1210        C: DbAny,
1211        C::Key: TestKey,
1212        <C as LogStore>::Value: TestValue,
1213        F: FnMut(Context, String) -> Fut + Clone,
1214        Fut: Future<Output = C>,
1215    {
1216        test_current_db_build_big::<C, F, Fut>(open_db);
1217    }
1218
1219    fn test_unordered_build_big<C, F, Fut>(open_db: F)
1220    where
1221        C: DbAny,
1222        C::Key: TestKey,
1223        <C as LogStore>::Value: TestValue,
1224        F: FnMut(Context, String) -> Fut + Clone,
1225        Fut: Future<Output = C>,
1226    {
1227        test_current_db_build_big::<C, F, Fut>(open_db);
1228    }
1229
1230    #[test_group("slow")]
1231    #[test_traced("WARN")]
1232    fn test_all_variants_build_random_close_reopen() {
1233        let executor = deterministic::Runner::default();
1234        executor.start(|_context| async move {
1235            for_all_variants!(simple: test_build_random_close_reopen);
1236        });
1237    }
1238
1239    #[test_group("slow")]
1240    #[test_traced("WARN")]
1241    fn test_all_variants_simulate_write_failures() {
1242        let executor = deterministic::Runner::default();
1243        executor.start(|_context| async move {
1244            for_all_variants!(simple: test_simulate_write_failures);
1245        });
1246    }
1247
1248    #[test_group("slow")]
1249    #[test_traced("WARN")]
1250    fn test_all_variants_different_pruning_delays_same_root() {
1251        let executor = deterministic::Runner::default();
1252        executor.start(|_context| async move {
1253            for_all_variants!(simple: test_different_pruning_delays_same_root);
1254        });
1255    }
1256
1257    #[test_group("slow")]
1258    #[test_traced("WARN")]
1259    fn test_all_variants_sync_persists_bitmap_pruning_boundary() {
1260        let executor = deterministic::Runner::default();
1261        executor.start(|_context| async move {
1262            for_all_variants!(simple: test_sync_persists_bitmap_pruning_boundary);
1263        });
1264    }
1265
1266    #[test_traced("WARN")]
1267    fn test_all_variants_stale_changeset_side_effect_free() {
1268        let executor = deterministic::Runner::default();
1269        executor.start(|_context| async move {
1270            for_all_variants!(simple: test_stale_changeset_side_effect_free);
1271        });
1272    }
1273
1274    #[test_group("slow")]
1275    #[test_traced("WARN")]
1276    fn test_ordered_variants_build_big() {
1277        let executor = deterministic::Runner::default();
1278        executor.start(|_context| async move {
1279            for_all_variants!(ordered: test_ordered_build_big);
1280        });
1281    }
1282
1283    #[test_group("slow")]
1284    #[test_traced("WARN")]
1285    fn test_unordered_variants_build_big() {
1286        let executor = deterministic::Runner::default();
1287        executor.start(|_context| async move {
1288            for_all_variants!(unordered: test_unordered_build_big);
1289        });
1290    }
1291
1292    #[test_group("slow")]
1293    #[test_traced("DEBUG")]
1294    fn test_ordered_variants_build_small_close_reopen() {
1295        let executor = deterministic::Runner::default();
1296        executor.start(|_context| async move {
1297            for_all_variants!(ordered: ordered::tests::test_build_small_close_reopen);
1298        });
1299    }
1300
1301    #[test_group("slow")]
1302    #[test_traced("DEBUG")]
1303    fn test_unordered_variants_build_small_close_reopen() {
1304        let executor = deterministic::Runner::default();
1305        executor.start(|_context| async move {
1306            for_all_variants!(unordered: unordered::tests::test_build_small_close_reopen);
1307        });
1308    }
1309
1310    // ---- Current-level batch API tests ----
1311    //
1312    // These exercise the current wrapper's batch methods (root, ops_root,
1313    // MerkleizedBatch::get, batch chaining) which layer bitmap and grafted MMR
1314    // computation on top of the `any` batch.
1315
1316    fn key(i: u64) -> Digest {
1317        Sha256::hash(&i.to_be_bytes())
1318    }
1319
1320    fn val(i: u64) -> Digest {
1321        Sha256::hash(&(i + 10000).to_be_bytes())
1322    }
1323
1324    /// MerkleizedBatch::root() returns the canonical root that matches db.root()
1325    /// after apply. ops_root() differs from root() because the canonical root
1326    /// includes the bitmap/grafted MMR layers.
1327    #[test_traced("INFO")]
1328    fn test_current_batch_speculative_root() {
1329        let executor = deterministic::Runner::default();
1330        executor.start(|context| async move {
1331            let ctx = context.with_label("db");
1332            let mut db: UnorderedVariableDb =
1333                UnorderedVariableDb::init(ctx.clone(), variable_config::<OneCap>("sr", &ctx))
1334                    .await
1335                    .unwrap();
1336
1337            let mut batch = db.new_batch();
1338            for i in 0..10 {
1339                batch.write(key(i), Some(val(i)));
1340            }
1341            let merkleized = batch.merkleize(None).await.unwrap();
1342            let speculative_root = merkleized.root();
1343            let ops_root = merkleized.ops_root();
1344
1345            // Canonical root includes bitmap/grafted layers, so it differs from ops root.
1346            assert_ne!(speculative_root, ops_root);
1347
1348            let finalized = merkleized.finalize();
1349            db.apply_batch(finalized).await.unwrap();
1350
1351            // Speculative canonical root matches the committed canonical root.
1352            assert_eq!(db.root(), speculative_root);
1353
1354            db.destroy().await.unwrap();
1355        });
1356    }
1357
1358    /// MerkleizedBatch::get() at the current level reads overlay then base DB.
1359    #[test_traced("INFO")]
1360    fn test_current_batch_merkleized_get() {
1361        let executor = deterministic::Runner::default();
1362        executor.start(|context| async move {
1363            let ctx = context.with_label("db");
1364            let mut db: UnorderedVariableDb =
1365                UnorderedVariableDb::init(ctx.clone(), variable_config::<OneCap>("mg", &ctx))
1366                    .await
1367                    .unwrap();
1368
1369            let ka = key(0);
1370            let kb = key(1);
1371            let kc = key(2);
1372
1373            // Pre-populate A.
1374            {
1375                let mut batch = db.new_batch();
1376                batch.write(ka, Some(val(0)));
1377                let finalized = batch.merkleize(None).await.unwrap().finalize();
1378                db.apply_batch(finalized).await.unwrap();
1379            }
1380
1381            // Batch: update A, delete nothing, create B.
1382            let va2 = val(100);
1383            let vb = val(1);
1384            let mut batch = db.new_batch();
1385            batch.write(ka, Some(va2));
1386            batch.write(kb, Some(vb));
1387            let merkleized = batch.merkleize(None).await.unwrap();
1388
1389            assert_eq!(merkleized.get(&ka).await.unwrap(), Some(va2));
1390            assert_eq!(merkleized.get(&kb).await.unwrap(), Some(vb));
1391            assert_eq!(merkleized.get(&kc).await.unwrap(), None);
1392
1393            db.destroy().await.unwrap();
1394        });
1395    }
1396
1397    /// Batch chaining at the current level: parent -> merkleize -> child -> merkleize.
1398    /// Child's canonical root matches db.root() after apply.
1399    #[test_traced("INFO")]
1400    fn test_current_batch_chaining() {
1401        let executor = deterministic::Runner::default();
1402        executor.start(|context| async move {
1403            let ctx = context.with_label("db");
1404            let mut db: UnorderedVariableDb =
1405                UnorderedVariableDb::init(ctx.clone(), variable_config::<OneCap>("ch", &ctx))
1406                    .await
1407                    .unwrap();
1408
1409            // Parent batch writes keys 0..5.
1410            let mut parent = db.new_batch();
1411            for i in 0..5 {
1412                parent.write(key(i), Some(val(i)));
1413            }
1414            let parent_m = parent.merkleize(None).await.unwrap();
1415
1416            // Child batch writes keys 5..10 and overrides key 0.
1417            let mut child = parent_m.new_batch();
1418            for i in 5..10 {
1419                child.write(key(i), Some(val(i)));
1420            }
1421            child.write(key(0), Some(val(999)));
1422            let child_m = child.merkleize(None).await.unwrap();
1423
1424            let child_root = child_m.root();
1425
1426            // Child get reads through all layers.
1427            assert_eq!(child_m.get(&key(0)).await.unwrap(), Some(val(999)));
1428            assert_eq!(child_m.get(&key(3)).await.unwrap(), Some(val(3)));
1429            assert_eq!(child_m.get(&key(7)).await.unwrap(), Some(val(7)));
1430
1431            let finalized = child_m.finalize();
1432            db.apply_batch(finalized).await.unwrap();
1433            assert_eq!(db.root(), child_root);
1434
1435            // Verify all keys are correct.
1436            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(999)));
1437            for i in 1..10 {
1438                assert_eq!(db.get(&key(i)).await.unwrap(), Some(val(i)));
1439            }
1440
1441            db.destroy().await.unwrap();
1442        });
1443    }
1444}