// commonware_storage/qmdb/current/mod.rs

//! A _Current_ authenticated database provides succinct proofs of _any_ value ever associated with
//! a key, and also whether that value is the _current_ value associated with it.
//!
//! # Examples
//!
//! See [`crate::qmdb::any`] for batch API examples (forking, sequential commit, staleness). The
//! Current layer uses the same batch API.
//!
//! # Batch validity
//!
//! Current batches are branch-scoped views, not immutable snapshots.
//!
//! A batch remains valid only while its ancestor chain is still the committed prefix of the DB.
//! Once a batch that is not one of its ancestors is applied, the batch and all of its descendants
//! become invalid objects: do not read through them, do not build children from them, and do not
//! attempt to apply them.
//!
//! A short rule of thumb:
//! - A batch is only usable while it stays on the winning branch.
//!
//! Valid:
//! - Build `A`, apply `A`, then build `B` from `A` and read or merkleize `B`.
//! - Call [`Db::to_batch`](db::Db::to_batch) and use the returned batch only while no divergent
//!   branch has been applied.
//!
//! Invalid:
//! - Build siblings `B1` and `B2`, apply `B1`, then call `B2.get()`, `B2.new_batch()`, or
//!   `apply_batch(B2)`.
//! - Hold `snapshot = db.to_batch()`, mutate the DB through another branch, then use `snapshot`
//!   again.
//!
//! # Motivation
//!
//! An [crate::qmdb::any] ("Any") database can prove that a key had a particular value at some
//! point, but it cannot prove that the value is still current -- some later operation may have
//! updated or deleted it. A Current database adds exactly this capability by maintaining a bitmap
//! that tracks which operations are _active_ (i.e. represent the current state of their key).
//!
//! To make this useful, a verifier needs both the operation and its activity status authenticated
//! under a single root. We achieve this by _grafting_ bitmap chunks onto the operations tree.
//!
//! # Data structures
//!
//! A Current database ([db::Db]) wraps an Any database and adds:
//!
//! - **Status bitmap** ([BitMap]): One bit per operation in the log. Bit _i_ is 1 if operation _i_
//!   is active, 0 otherwise. The bitmap is divided into fixed-size chunks of `N` bytes (i.e. `N *
//!   8` bits each). `N` must be a power of two.
//!
//!   One exception by convention: the operation at the *current* `last_commit_loc` carries bit = 1
//!   even though a CommitFloor is not an active update; earlier (intermediate) CommitFloors carry
//!   bit = 0. Maintaining this makes the chunk containing the latest commit deterministic across
//!   init and `apply_batch`.
//!
//!   The bitmap lives on the inner `any::Db.bitmap`; `current::Db` reads through it for
//!   grafted-tree leaves and proofs.
//!
//! - **Grafted tree**: An in-memory Merkle structure of digests at and above the _grafting height_
//!   in the ops tree. This is the core of how bitmap and ops state are combined into a single
//!   authenticated structure (see below).
//!
//! - **Bitmap metadata** (`Metadata`): Persists the pruning boundary and "pinned" digests needed to
//!   restore the grafted tree after pruning old bitmap chunks.
//!
//! # Grafting: combining the activity status bitmap and the ops tree
//!
//! ## The problem
//!
//! Naively authenticating the bitmap and ops tree as two independent Merkle structures would
//! require two separate proofs per operation -- one for the operation's value, one for its activity
//! status. This doubles proof sizes.
//!
//! ## The solution
//!
//! We combine ("graft") the two structures at a specific height in the ops tree called the
//! _grafting height_. The grafting height `h = log2(N * 8)` is chosen so that each subtree of
//! height `h` in the ops tree covers exactly one bitmap chunk's worth of operations.
//!
//! At the grafting height, instead of using the ops tree's own subtree root, we replace it with a
//! _grafted leaf_ digest that incorporates both the bitmap chunk and the ops subtree root:
//!
//! ```text
//! grafted_leaf = hash(bitmap_chunk || ops_subtree_root)   // non-zero chunk
//! grafted_leaf = ops_subtree_root                         // all-zero chunk (identity)
//! ```
//!
//! The all-zero identity means that for pruned regions (where every operation is inactive), the
//! grafted tree is structurally identical to the ops tree at and above the grafting height.
//!
//! Above the grafting height, internal nodes use standard hashing over the grafted leaves. Below
//! the grafting height, the ops tree is unchanged.
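/*!
The grafting rule above can be sketched in a few lines. This is only an illustration under stated
assumptions, not the crate's implementation: `grafting_height`, `toy_hash`, and `grafted_leaf` are
hypothetical names, digests are shortened to `u64`, and `std`'s `DefaultHasher` stands in for the
cryptographic hasher.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::Hasher;

/// Grafting height for a chunk of `n` bytes: `log2(n * 8)`.
/// Panics if `n` is not a power of two, mirroring the requirement on `N`.
fn grafting_height(n: usize) -> u32 {
    assert!(n.is_power_of_two(), "chunk size must be a power of 2");
    (n * 8).trailing_zeros()
}

/// Stand-in 64-bit hash over the concatenation of `parts` (NOT cryptographic).
fn toy_hash(parts: &[&[u8]]) -> u64 {
    let mut h = DefaultHasher::new();
    for p in parts {
        h.write(p);
    }
    h.finish()
}

/// Grafted leaf rule: an all-zero chunk passes the ops subtree root through
/// unchanged; any other chunk is hashed together with it.
fn grafted_leaf(chunk: &[u8], ops_subtree_root: u64) -> u64 {
    if chunk.iter().all(|&b| b == 0) {
        ops_subtree_root
    } else {
        let root_bytes = ops_subtree_root.to_be_bytes();
        toy_hash(&[chunk, &root_bytes[..]])
    }
}
```

With `n = 32` the sketch gives a grafting height of 8, so each grafted leaf would cover 256
operations.
*/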
//!
//! Not every complete chunk is graftable. A chunk is _graftable_ when its height-`h` ancestor
//! exists as a single node in the ops tree. In MMR every complete chunk is immediately graftable.
//! In MMB, delayed merges can leave a chunk bit-complete before its height-`h` ancestor is born;
//! that chunk is _pending_ and its digest is hashed directly into the canonical root until the
//! merge happens, at which point it migrates into the grafted tree. See [Pending and partial
//! chunks](#pending-and-partial-chunks).
//!
//! ## Example
//!
//! With `N = 1` (8-bit chunks), the grafting height would be `h = log2(8) = 3`. To keep the diagram
//! small, this example instead uses 8 operations with a chunk size of 4 bits (`h = 2`), yielding 2
//! complete bitmap chunks:
//!
//! ```text
//! Ops tree positions (8 leaves):
//!
//!   Height
//!     3              14                    <-- peak: digest commits to ops tree and bitmap chunks
//!                  /    \
//!                 /      \
//!                /        \
//!     2  [G]    6          13    [G]       <-- grafting height: grafted leaves
//!             /   \      /    \
//!     1      2     5    9     12           <-- below grafting height: pure ops tree nodes
//!           / \   / \  / \   /  \
//!     0    0   1 3   4 7  8 10  11
//!          ^           ^
//!          |           |
//!      ops 0-3     ops 4-7
//!      chunk 0     chunk 1
//! ```
//!
//! Positions 6 and 13 are at the grafting height. Their digests are:
//! - `pos 6: hash(chunk_0 || ops_subtree_root(pos 6))`
//! - `pos 13: hash(chunk_1 || ops_subtree_root(pos 13))`
//!
//! Position 14 (above grafting height) is a standard internal node:
//! - `pos 14: hash(14 || digest(pos 6) || digest(pos 13))`
//!
//! The grafted tree stores positions 6, 13, and 14. The ops tree stores everything below (positions
//! 0-5 and 7-12). Together they form a single virtual Merkle structure whose root authenticates
//! both the operations and their activity status.
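/*!
The node positions in the diagram follow the usual post-order MMR numbering, which can be
reproduced with a little arithmetic. The helper names below are hypothetical, and the sketch only
covers the MMR case, where every complete chunk has a height-`h` ancestor.

```rust
/// Post-order position of leaf number `loc` in an MMR: `2 * loc - popcount(loc)`.
fn leaf_position(loc: u64) -> u64 {
    2 * loc - u64::from(loc.count_ones())
}

/// Position of the height-`h` subtree root covering chunk `chunk`, i.e. leaves
/// `chunk * 2^h ..= (chunk + 1) * 2^h - 1`. A perfect subtree of height `h` has
/// `2^(h+1) - 1` nodes, and its root is the last of them in post-order.
fn grafted_leaf_position(chunk: u64, h: u32) -> u64 {
    let first_leaf = chunk << h;
    leaf_position(first_leaf) + (1u64 << (h + 1)) - 2
}
```

For the 8-leaf example with `h = 2`, chunks 0 and 1 map to positions 6 and 13, matching the nodes
marked `[G]`.
*/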
//!
//! ## Proof generation and verification
//!
//! To prove that operation _i_ is active, we provide:
//! 1. An inclusion proof for the operation's leaf, using the virtual (grafted) storage.
//! 2. The bitmap chunk containing bit _i_.
//!
//! The verifier (see `grafting::Verifier`) walks the proof from leaf to root. Below the grafting
//! height, it uses standard hashing. At the grafting height, it detects the boundary and
//! reconstructs the grafted leaf from the chunk and the ops subtree root. For non-zero chunks the
//! grafted leaf is `hash(chunk || ops_subtree_root)`; for all-zero chunks the grafted leaf is the
//! ops subtree root itself (identity optimization -- see `grafting::Verifier::node`). Above the
//! grafting height, it resumes standard hashing. If the reconstructed root matches the expected
//! root and bit _i_ is set in the chunk, the operation is proven active.
//!
//! This is a single proof path, not two independent ones -- the bitmap chunk is embedded in the
//! proof verification at the grafting boundary.
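/*!
The final step of verification, checking that bit _i_ is set in the supplied chunk, comes down to
index arithmetic. The sketch below uses hypothetical helper names and assumes
least-significant-bit-first ordering within each byte; the actual bit layout is defined by the
bitmap implementation and may differ.

```rust
/// Split an operation index into (chunk index, bit offset within that chunk)
/// for chunks of `n` bytes.
fn locate_bit(op: u64, n: usize) -> (u64, usize) {
    let bits_per_chunk = (n as u64) * 8;
    (op / bits_per_chunk, (op % bits_per_chunk) as usize)
}

/// Read bit `offset` from `chunk`, assuming LSB-first order within each byte
/// (an assumption of this sketch, not a statement about the real layout).
fn bit_is_set(chunk: &[u8], offset: usize) -> bool {
    (chunk[offset / 8] >> (offset % 8)) & 1 == 1
}
```

A verifier following this sketch would first use `locate_bit` to decide which chunk the prover
must supply, then call `bit_is_set` on that chunk once the reconstructed root has matched.
*/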
//!
//! Range proofs over a window that includes the trailing pending or partial chunk additionally
//! carry that chunk's digest in the proof and have the verifier re-derive it from the supplied
//! chunk bytes. Graftable-chunk reconstruction is always single-peak: the height-`gh` ancestor
//! (`gh` is the grafting height, written `h` above) exists in the ops tree and yields a single
//! grafted leaf to fold against the bitmap chunk.
//!
//! ## Pending and partial chunks
//!
//! Two kinds of bitmap chunk are not grafted into the overlay at height `gh`; instead, their
//! digests are hashed into the canonical root directly alongside `ops_root` and `grafted_root`.
//!
//! - **Partial chunk.** The trailing bitmap chunk is usually incomplete (fewer than `N * 8` bits).
//!   It has no grafted leaf because no corresponding subtree exists in the ops tree.
//!
//! - **Pending chunk.** A chunk whose bits are complete but whose height-`gh` ancestor has not yet
//!   been born in the ops tree. Its leaves are split across multiple sub-`gh` peaks, so there is no
//!   single ops node to graft onto. The chunk is _deferred_ until the merge happens, at which point
//!   it migrates into the grafted tree.
//!
//! Pending chunks only arise in families with delayed merges (MMB). Define `birth_chunk_0` as the
//! value of `ops_leaves` at which chunk 0's height-`gh` ancestor is first born in the ops tree.
//! Then chunk `i` is graftable when `ops_leaves >= i * 2^gh + birth_chunk_0`.
//!
//! - In MMR, `birth_chunk_0 = 2^gh`, so a chunk is graftable the moment it is bit-complete and
//!   pending chunks never exist.
//! - In MMB, `birth_chunk_0 = 3 * 2^(gh-1) - 1`, which exceeds the chunk size `2^gh` by `2^(gh-1) -
//!   1`. That gap is the **pending window** for chunk `i`: ops_leaves values for which chunk `i` is
//!   bit-complete but not yet graftable.
//!
//! The pending window is strictly narrower than one chunk stride, so **at most one chunk is pending
//! at any time**. The pending and partial chunks are independent: at `gh >= 3` both can be present;
//! at `gh == 1` the pending window is empty and only a partial chunk is ever present.
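/*!
The birth thresholds and the pending window can be checked numerically. The following is a small
sketch built only from the formulas above; the `Family` enum and function names are hypothetical
and not part of the crate, and `gh >= 1` is assumed.

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Family {
    Mmr,
    Mmb,
}

/// `ops_leaves` value at which chunk 0's height-`gh` ancestor is first born.
/// Assumes `gh >= 1`.
fn birth_chunk_0(family: Family, gh: u32) -> u64 {
    match family {
        Family::Mmr => 1u64 << gh,
        Family::Mmb => 3 * (1u64 << (gh - 1)) - 1,
    }
}

/// Number of graftable chunks: chunk `i` is graftable once
/// `ops_leaves >= i * 2^gh + birth_chunk_0`.
fn graftable_chunks(family: Family, gh: u32, ops_leaves: u64) -> u64 {
    let birth = birth_chunk_0(family, gh);
    if ops_leaves < birth {
        0
    } else {
        (ops_leaves - birth) / (1u64 << gh) + 1
    }
}

/// Index of the pending chunk, if any: bit-complete but not yet graftable.
fn pending_chunk(family: Family, gh: u32, ops_leaves: u64) -> Option<u64> {
    let complete = ops_leaves >> gh;
    let graftable = graftable_chunks(family, gh, ops_leaves);
    (complete > graftable).then_some(graftable)
}
```

For MMB with `gh = 3`, `pending_chunk` reports chunk 0 as pending for `ops_leaves` in `[8, 10]`
and nothing pending at 11; for MMR, or for MMB with `gh = 1`, it never reports a pending chunk.
*/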
//!
//! ### Chunk lifecycle (MMB)
//!
//! ```text
//!                       ops_leaves N grows --->
//!
//!   chunk i state:    incomplete    pending          graftable
//!                    (bits being   (bits set;       (h=gh ancestor born;
//!                     appended)    ancestor not      chunk grafted onto
//!                                  yet born)         that ops node)
//!
//!   N reaches:                    (i+1)*2^gh     i*2^gh + birth_chunk_0
//!                                 ^                  ^
//!                                 |                  |
//!                                 chunk completes    chunk becomes graftable
//!
//!   Where chunk i's bytes live:
//!     incomplete -> canonical root, as partial_chunk_digest
//!     pending    -> canonical root, as pending_chunk_digest
//!     graftable  -> grafted tree
//!
//!   At any moment, the canonical root may include zero, one, or both of {pending, partial};
//!   the grafted tree commits to every chunk in [pruned_chunks, graftable_chunks).
//! ```
//!
//! ### Worked example: pending + partial coexistence (MMB, `gh = 3`)
//!
//! In this example and in the lifecycle diagram above, `N` denotes `ops_leaves`, not the chunk
//! size in bytes. For `gh = 3` MMB, `birth_chunk_0 = 3 * 2^(gh-1) - 1 = 11`. Chunk 0 fills at
//! `N = 8` but does not become graftable until `N = 11`. The pending window is `N` in `[8, 10]`.
//!
//! ```text
//!   Snapshot at N = 10 (chunk 0 bit-complete; chunk 1 has 2 of 8 bits):
//!
//!     chunks (8 bits each)     chunk 0       chunk 1
//!     ops index range          0..7          8..15
//!     bits set                 11111111      11
//!     state                    pending       incomplete
//!
//!     graftable_chunks = 0      (chunk 0's h=3 ancestor is not yet born)
//!     grafted_root           no graftable substitutions; equals ops-tree bagged root
//!     pending_chunk_digest   = H(chunk_0_bytes)
//!     partial_chunk_digest   = (next_bit = 2, H(chunk_1_bytes))
//!     canonical_root         = hash(ops_root || grafted_root || pending
//!                                                            || next_bit || partial)
//!
//!   At N = 11, chunk 0's h=3 ancestor is born:
//!     graftable_chunks becomes 1, the pending slot is dropped, chunk 0 migrates
//!     into the grafted tree.
//! ```
//!
//! See [Root structure](#root-structure) below for the canonical root layout.
//!
//! ## Incremental updates
//!
//! When operations are added or bits change (e.g. an operation becomes inactive during floor
//! raising), only the affected chunks are marked "dirty". During `merkleize`, only dirty grafted
//! leaves are recomputed and their ancestors are propagated upward through the cache. This avoids
//! recomputing the entire grafted tree.
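/*!
One way to picture the dirty tracking is a set of chunk indices that is drained at merkleization
time. The `DirtyChunks` type below is a hypothetical sketch of that idea only; it does not reflect
how the crate actually stores or propagates dirty state.

```rust
use std::collections::BTreeSet;

/// Tracks which bitmap chunks changed since the last merkleization.
struct DirtyChunks {
    bits_per_chunk: u64,
    dirty: BTreeSet<u64>,
}

impl DirtyChunks {
    /// `chunk_bytes` plays the role of `N` (the chunk size in bytes).
    fn new(chunk_bytes: u64) -> Self {
        Self {
            bits_per_chunk: chunk_bytes * 8,
            dirty: BTreeSet::new(),
        }
    }

    /// Record that the status bit of operation `op` was appended or flipped.
    fn mark(&mut self, op: u64) {
        self.dirty.insert(op / self.bits_per_chunk);
    }

    /// Return the chunks whose grafted leaves need recomputing, in ascending
    /// order, and reset the set.
    fn take(&mut self) -> Vec<u64> {
        std::mem::take(&mut self.dirty).into_iter().collect()
    }
}
```

Because several bit changes within one chunk collapse to a single entry, the amount of rehashing
in this sketch scales with the number of touched chunks rather than the number of touched bits.
*/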
//!
//! ## Pruning
//!
//! Old bitmap chunks (below the inactivity floor) can be pruned. Before pruning, the grafted tree's
//! peak digests covering the pruned region are persisted to metadata as "pinned nodes". On
//! recovery, these pinned nodes are loaded and serve as opaque siblings during upward propagation,
//! allowing the grafted tree to be rebuilt without the pruned chunks.
//!
//! ### Delayed-merge settlement
//!
//! For families with delayed merges (e.g. MMB), pruning is slightly more conservative than the
//! inactivity floor alone would allow.
//!
//! The grafted root is computed by iterating the _ops tree's_ peaks and looking up the
//! corresponding nodes in the grafted tree. After pruning, only the grafted tree's pinned peaks are
//! available in the pruned region; interior nodes (including individual grafted leaves) are
//! discarded. If the ops tree has a peak that maps to a discarded grafted node, root computation
//! fails.
//!
//! In an MMR the ops tree's peaks within the pruned region always coincide with the grafted tree's
//! pinned peaks, so this is never a problem. In an MMB, delayed merges cause the ops tree's peak
//! structure to lag behind: a chunk pair's parent node at height `gh+1` is not created until some
//! number of leaves after the pair's last leaf. Until that merge happens, the ops tree still has
//! individual height-`gh` peaks for each chunk in the pair, and those map to grafted _leaves_
//! (height 0 in the grafted tree), which are not pinned peaks.
//!
//! To avoid this, [`Db::prune`](db::Db::prune) defers bitmap pruning for chunks whose chunk-pair
//! parent has not yet been born in the ops tree (see `Db::sync_boundary`). Once the parent is born,
//! every ops peak within the pruned region is at height `gh+1` or above, and maps to a pinned peak
//! or an ancestor of pinned peaks that can be reconstructed by hashing children (see
//! `grafting::Storage::reconstruct_grafted_node`).
//!
//! The same birth threshold also defines a _rewind floor_: rewinding the database to a size where
//! the chunk-pair parent has not been born would re-expose the individual ops peaks and break
//! reconstruction. [`Db::rewind`](db::Db::rewind) rejects targets below this floor. The floor is a
//! pure function of the pruned chunk count and the family geometry, so it does not need to be
//! persisted; it is recomputed on startup from the pruned chunk count stored in metadata.
//!
//! The pruning lag is small: at most `2^(gh+1) - 1` ops beyond the chunk boundary (just under 2
//! chunks for the default chunk size).
//!
//! # Root structure
//!
//! The canonical root of a `current` database is:
//!
//! ```text
//! root = hash(
//!     ops_root
//!     || grafted_root
//!     [|| pending_chunk_digest]
//!     [|| next_bit || partial_chunk_digest]
//! )
//! ```
//!
//! Components:
//!
//! - **Ops root**: The root of the raw operations tree (the inner [crate::qmdb::any] database's
//!   root). Used for state sync, where a client downloads operations and verifies each batch
//!   against this root using ops-tree range proofs.
//!
//! - **Grafted root**: The bagged root of the virtual overlay (see `grafting::Storage`) that shares
//!   the ops-tree topology but substitutes graftable chunks at height `gh`. When no chunks are
//!   graftable, `grafted_root` still reflects the ops-tree peak structure. Used for proofs about
//!   operation values and their activity status. See [RangeProof](proof::RangeProof) and
//!   [OperationProof](proof::OperationProof).
//!
//! - **Pending chunk digest** (optional): `H(pending_chunk_bytes)` when a chunk's bits are complete
//!   but its height-`gh` ancestor has not yet been born in the ops tree. Absent in MMR and in the
//!   steady state of MMB.
//!
//! - **Partial chunk** (optional): When the bitmap length is not chunk-aligned, the trailing
//!   incomplete chunk's digest and bit count are folded in.
//!
//! Pending and partial slots are independent. When both are present, pending hashes in before
//! partial.
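/*!
The ordering of the optional slots can be made concrete with a toy fold. This sketch is not the
crate's `compute_db_root`: `canonical_root` is a hypothetical function, `std`'s `DefaultHasher`
stands in for the cryptographic hasher, and the big-endian `u64` encoding of `next_bit` is an
assumption.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::Hasher;

/// Fold the root components in the documented order:
/// ops_root, grafted_root, then pending (if any), then next_bit and partial (if any).
fn canonical_root(
    ops_root: &[u8],
    grafted_root: &[u8],
    pending_chunk_digest: Option<&[u8]>,
    partial: Option<(u64, &[u8])>, // (next_bit, partial_chunk_digest)
) -> u64 {
    let mut h = DefaultHasher::new();
    h.write(ops_root);
    h.write(grafted_root);
    if let Some(pending) = pending_chunk_digest {
        h.write(pending);
    }
    if let Some((next_bit, digest)) = partial {
        // Encoding of `next_bit` is an assumption of this sketch.
        h.write(&next_bit.to_be_bytes());
        h.write(digest);
    }
    h.finish()
}
```

In the sketch, adding or removing either optional slot changes the resulting value, which mirrors
how the canonical root shifts as a chunk moves from partial to pending to grafted.
*/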
//!
//! The canonical root is returned by [Db](db::Db)`::`[root()](db::Db::root). The ops root is
//! returned by the `sync::Database` trait's `root()` method, since the sync engine verifies batches
//! against the ops root, not the canonical root.
//!
//! For state sync, the sync engine targets the ops root and verifies each batch against it. Callers
//! verifying ops proofs directly should use [`crate::qmdb::hasher`]. After sync, the bitmap and
//! grafted tree are reconstructed deterministically from the operations, and the canonical root is
//! computed. [proof::OpsRootWitness] can be used to validate that a particular ops root is
//! committed by a trusted canonical root; the sync engine does not perform this check itself.

use crate::{
    index::Factory as IndexFactory,
    journal::{
        authenticated::Inner,
        contiguous::{fixed::Config as FConfig, variable::Config as VConfig},
    },
    merkle::{self, full::Config as MerkleConfig, Location},
    qmdb::{
        self,
        any::{
            self,
            operation::{Operation, Update},
            Config as AnyConfig,
        },
        bitmap::Shared,
        operation::Committable,
    },
    translator::Translator,
    Context,
};
use commonware_codec::{CodecShared, FixedSize};
use commonware_cryptography::Hasher;
use commonware_parallel::Strategy;
use commonware_utils::{bitmap::Prunable as BitMap, sync::AsyncMutex};
use std::sync::Arc;

pub mod batch;
pub mod db;
pub mod grafting;

pub mod ordered;
pub mod proof;
pub(crate) mod sync;
pub mod unordered;

use self::db::Metrics;

/// Configuration for a `Current` authenticated db.
#[derive(Clone)]
pub struct Config<T: Translator, J, S: Strategy> {
    /// Configuration for the Merkle structure backing the authenticated journal.
    pub merkle_config: MerkleConfig<S>,

    /// Configuration for the operations log journal.
    pub journal_config: J,

    /// The name of the storage partition used for grafted tree metadata.
    pub grafted_metadata_partition: String,

    /// The translator used by the compressed index.
    pub translator: T,
}

impl<T: Translator, J, S: Strategy> From<Config<T, J, S>> for AnyConfig<T, J, S> {
    fn from(cfg: Config<T, J, S>) -> Self {
        Self {
            merkle_config: cfg.merkle_config,
            journal_config: cfg.journal_config,
            translator: cfg.translator,
        }
    }
}

/// Configuration for a `Current` authenticated db with fixed-size values.
pub type FixedConfig<T, S> = Config<T, FConfig, S>;

/// Configuration for a `Current` authenticated db with variable-sized values.
pub type VariableConfig<T, C, S> = Config<T, VConfig<C>, S>;

/// Initialize a `Current` authenticated db from the given config.
pub(super) async fn init<F, E, U, H, T, I, J, const N: usize, S>(
    context: E,
    config: Config<T, J::Config, S>,
) -> Result<db::Db<F, E, J, I, H, U, N, S>, crate::qmdb::Error<F>>
where
    F: merkle::Graftable,
    E: Context,
    U: Update + Send + Sync,
    H: Hasher,
    T: Translator,
    I: IndexFactory<T, Value = Location<F>>,
    J: Inner<E, Item = Operation<F, U>>,
    S: Strategy,
    Operation<F, U>: Committable + CodecShared,
{
    // TODO: Re-evaluate assertion placement after `generic_const_exprs` is stable.
    const {
        // A compile-time assertion that the chunk size is some multiple of digest size. A multiple
        // of 1 is optimal with respect to proof size, but a higher multiple allows for a smaller
        // (RAM resident) merkle tree over the structure.
        assert!(
            N.is_multiple_of(H::Digest::SIZE),
            "chunk size must be some multiple of the digest size",
        );
        // A compile-time assertion that chunk size is a power of 2, which is necessary to allow
        // the status bitmap tree to be aligned with the underlying operations MMR.
        assert!(N.is_power_of_two(), "chunk size must be a power of 2");
    }

    let strategy = config.merkle_config.strategy.clone();
    let metadata_partition = config.grafted_metadata_partition.clone();

    // Load bitmap metadata (pruned_chunks + pinned nodes for the grafted tree).
    let (metadata, pruned_chunks, pinned_nodes) =
        db::init_metadata(context.child("metadata"), &metadata_partition).await?;

    // Pre-build the activity-status bitmap with the known pruned-chunk count from grafted
    // metadata, then hand it to `any` which becomes the sole owner. `any::init_with_bitmap`
    // populates it during snapshot rebuild.
    let bitmap = BitMap::<N>::new_with_pruned_chunks(pruned_chunks)
        .map_err(|_| crate::qmdb::Error::<F>::DataCorrupted("pruned chunks overflow"))?;
    let bitmap = Arc::new(Shared::<N>::new(bitmap));

    let any = any::init_with_bitmap(context.child("any"), config.into(), Some(bitmap)).await?;

    // Build the grafted tree from the bitmap and ops tree.
    let hasher = qmdb::hasher::<H>();
    let ops_size = any.log.merkle.size();
    let ops_leaves = crate::merkle::Location::<F>::try_from(ops_size)?;
    let grafted_tree = db::build_grafted_tree::<F, H, S, N>(
        &hasher,
        any.bitmap.as_ref(),
        &pinned_nodes,
        &any.log.merkle,
        ops_leaves,
        &strategy,
    )
    .await?;

    // Compute and cache the root.
    let storage = grafting::Storage::new(
        &grafted_tree,
        grafting::height::<N>(),
        &any.log.merkle,
        hasher.clone(),
    );
    let partial_chunk = db::partial_chunk(any.bitmap.as_ref());
    let ops_root = any.root();
    let root = db::compute_db_root(
        &hasher,
        any.bitmap.as_ref(),
        &storage,
        ops_leaves,
        partial_chunk,
        any.inactivity_floor_loc,
        &ops_root,
    )
    .await?;

    let metrics = Metrics::new(context);
    let db = db::Db {
        any,
        grafted_tree,
        metadata: AsyncMutex::new(metadata),
        strategy,
        root,
        metrics,
    };
    db.update_metrics();
    Ok(db)
}

/// Extension trait for Current QMDB types that exposes bitmap information for testing.
#[cfg(any(test, feature = "test-traits"))]
pub trait BitmapPrunedBits {
    /// Returns the number of bits that have been pruned from the bitmap.
    fn pruned_bits(&self) -> u64;

    /// Returns the value of the bit at the given index.
    fn get_bit(&self, index: u64) -> bool;

    /// Returns the position of the oldest retained bit.
    fn oldest_retained(&self) -> impl core::future::Future<Output = u64> + Send;
}

#[cfg(test)]
pub mod tests {
    //! Shared test utilities for Current QMDB variants.

    pub use super::BitmapPrunedBits;
    use super::{ordered, unordered, FConfig, FixedConfig, MerkleConfig, VConfig, VariableConfig};
    use crate::{
        merkle::{self, mmb, mmr, Bagging::ForwardFold},
        qmdb::{
            self,
            any::{
                test::colliding_digest,
                traits::{DbAny, MerkleizedBatch as _, UnmerkleizedBatch as _},
            },
            store::tests::{TestKey, TestValue},
        },
        translator::Translator,
    };
    use commonware_parallel::Sequential;
    use commonware_runtime::{
        buffer::paged::CacheRef,
        deterministic::{self, Context},
        BufferPooler, Runner as _, Supervisor as _,
    };
    use commonware_utils::{bitmap::Readable, NZUsize, NZU16, NZU64};
    use core::future::Future;
    use rand::{rngs::StdRng, RngCore, SeedableRng};
    use std::{
        num::{NonZeroU16, NonZeroUsize},
        sync::Arc,
    };
    use tracing::warn;

    type Error<F> = crate::qmdb::Error<F>;
    type Location<F> = merkle::Location<F>;
    type WriteVec<F, C> = Vec<(<C as DbAny<F>>::Key, Option<<C as DbAny<F>>::Value>)>;

    // Janky page & cache sizes to exercise boundary conditions.
    const PAGE_SIZE: NonZeroU16 = NZU16!(88);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(8);

    /// Shared config factory for fixed-value Current QMDB tests.
    pub(crate) fn fixed_config<T: Translator + Default>(
        partition_prefix: &str,
        pooler: &impl BufferPooler,
    ) -> FixedConfig<T, Sequential> {
        let page_cache = CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE);
        FixedConfig {
            merkle_config: MerkleConfig {
                journal_partition: format!("{partition_prefix}-journal-partition"),
                metadata_partition: format!("{partition_prefix}-metadata-partition"),
                items_per_blob: NZU64!(11),
                write_buffer: NZUsize!(1024),
                strategy: Sequential,
                page_cache: page_cache.clone(),
            },
            journal_config: FConfig {
                partition: format!("{partition_prefix}-partition-prefix"),
                items_per_blob: NZU64!(7),
                page_cache,
                write_buffer: NZUsize!(1024),
            },
            grafted_metadata_partition: format!("{partition_prefix}-grafted-metadata-partition"),
            translator: T::default(),
        }
    }

    /// Shared config factory for variable-value Current QMDB tests with unit codec config.
    pub(crate) fn variable_config<T: Translator + Default>(
        partition_prefix: &str,
        pooler: &impl BufferPooler,
    ) -> VariableConfig<T, ((), ()), Sequential> {
        let page_cache = CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE);
        VariableConfig {
            merkle_config: MerkleConfig {
                journal_partition: format!("{partition_prefix}-journal-partition"),
                metadata_partition: format!("{partition_prefix}-metadata-partition"),
                items_per_blob: NZU64!(11),
                write_buffer: NZUsize!(1024),
                strategy: Sequential,
                page_cache: page_cache.clone(),
            },
            journal_config: VConfig {
                partition: format!("{partition_prefix}-partition-prefix"),
                items_per_section: NZU64!(7),
                compression: None,
                codec_config: ((), ()),
                page_cache,
                write_buffer: NZUsize!(1024),
            },
            grafted_metadata_partition: format!("{partition_prefix}-grafted-metadata-partition"),
            translator: T::default(),
        }
    }

    /// Commit a set of writes as a single batch.
    async fn commit_writes<F: merkle::Graftable, C: DbAny<F>>(
        db: &mut C,
        writes: impl IntoIterator<Item = (C::Key, Option<<C as DbAny<F>>::Value>)>,
    ) -> Result<(), Error<F>> {
        let mut batch = db.new_batch();
        for (k, v) in writes {
            batch = batch.write(k, v);
        }
        let merkleized = batch.merkleize(db, None).await?;
        db.apply_batch(merkleized).await?;
        db.commit().await?;
        Ok(())
    }

    /// Apply random operations to the given db, committing them (randomly and at the end) only if
    /// `commit_changes` is true. Returns the db; callers should commit if needed.
    async fn apply_random_ops_inner<F, C>(
        num_elements: u64,
        commit_changes: bool,
        rng_seed: u64,
        mut db: C,
    ) -> Result<C, Error<F>>
    where
        F: merkle::Graftable,
        C: DbAny<F>,
        C::Key: TestKey,
        <C as DbAny<F>>::Value: TestValue,
    {
        // Log the seed with high visibility to make failures reproducible.
        warn!("rng_seed={}", rng_seed);
        let mut rng = StdRng::seed_from_u64(rng_seed);

        // First loop: all initial writes in one batch.
        let writes: Vec<_> = (0u64..num_elements)
            .map(|i| {
                let k = TestKey::from_seed(i);
                let v = TestValue::from_seed(rng.next_u64());
                (k, Some(v))
            })
            .collect();
        if commit_changes {
            commit_writes(&mut db, writes).await?;
        }

        // Randomly update / delete them. Roughly 1 in 7 of these operations is a delete; the rest
        // are updates. Accumulate writes and commit periodically.
        let mut pending: WriteVec<F, C> = Vec::new();
        for _ in 0u64..num_elements * 10 {
            let rand_key = TestKey::from_seed(rng.next_u64() % num_elements);
            if rng.next_u32() % 7 == 0 {
                pending.push((rand_key, None));
                continue;
            }
            let v = TestValue::from_seed(rng.next_u64());
            pending.push((rand_key, Some(v)));
            if commit_changes && rng.next_u32() % 20 == 0 {
                commit_writes(&mut db, pending.drain(..)).await?;
            }
        }
        if commit_changes {
            commit_writes(&mut db, pending).await?;
        }
        Ok(db)
    }

    /// Boxed wrapper around `apply_random_ops_inner`.
    ///
    /// Returns a boxed future to prevent stack overflow when monomorphized across many DB variants.
    pub fn apply_random_ops<F, C>(
        num_elements: u64,
        commit_changes: bool,
        rng_seed: u64,
        db: C,
    ) -> std::pin::Pin<Box<dyn Future<Output = Result<C, Error<F>>>>>
    where
        F: merkle::Graftable + 'static,
        C: DbAny<F> + 'static,
        C::Key: TestKey,
        <C as DbAny<F>>::Value: TestValue,
    {
        Box::pin(apply_random_ops_inner::<F, C>(
            num_elements,
            commit_changes,
            rng_seed,
            db,
        ))
    }

    /// Run `test_build_random_close_reopen` against a database factory.
    ///
    /// The factory should return a database when given a context and partition name.
    /// The factory will be called multiple times to test reopening.
    pub fn test_build_random_close_reopen<M, C, F, Fut>(mut open_db: F)
    where
        M: merkle::Graftable + 'static,
        C: DbAny<M> + 'static,
        C::Key: TestKey,
        <C as DbAny<M>>::Value: TestValue,
        F: FnMut(Context, String) -> Fut + Clone,
        Fut: Future<Output = C>,
    {
        const ELEMENTS: u64 = 1000;

        let executor = deterministic::Runner::default();
        let mut open_db_clone = open_db.clone();
        let state1 = executor.start(|mut context| async move {
            let partition = "build-random".to_string();
            let rng_seed = context.next_u64();
            let mut db: C = open_db_clone(context.child("first"), partition.clone()).await;
            db = apply_random_ops::<M, C>(ELEMENTS, true, rng_seed, db)
                .await
                .unwrap();
            let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
            db.apply_batch(merkleized).await.unwrap();
            db.sync().await.unwrap();

            // Drop and reopen the db
            let root = db.root();
            drop(db);
            let db: C = open_db_clone(context.child("second"), partition).await;

            // Ensure the root matches
            assert_eq!(db.root(), root);

            db.destroy().await.unwrap();
            context.auditor().state()
        });

        // Run again to verify determinism
        let executor = deterministic::Runner::default();
        let state2 = executor.start(|mut context| async move {
            let partition = "build-random".to_string();
            let rng_seed = context.next_u64();
            let mut db: C = open_db(context.child("first"), partition.clone()).await;
            db = apply_random_ops::<M, C>(ELEMENTS, true, rng_seed, db)
                .await
                .unwrap();
            let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
            db.apply_batch(merkleized).await.unwrap();
731            db = apply_random_ops::<M, C>(ELEMENTS, true, rng_seed, db)
732                .await
733                .unwrap();
734            let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
735            db.apply_batch(merkleized).await.unwrap();
736            db.sync().await.unwrap();
737
738            let root = db.root();
739            drop(db);
740            let db: C = open_db(context.child("second"), partition).await;
741            assert_eq!(db.root(), root);
742
743            db.destroy().await.unwrap();
744            context.auditor().state()
745        });
746
747        assert_eq!(state1, state2);
748    }
749
750    /// Run `test_simulate_write_failures` against a database factory.
751    ///
752    /// This test builds a random database and simulates recovery from different types of
753    /// failure scenarios.
754    pub fn test_simulate_write_failures<M, C, F, Fut>(mut open_db: F)
755    where
756        M: merkle::Graftable + 'static,
757        C: DbAny<M> + 'static,
758        C::Key: TestKey,
759        <C as DbAny<M>>::Value: TestValue,
760        F: FnMut(Context, String) -> Fut + Clone,
761        Fut: Future<Output = C>,
762    {
763        const ELEMENTS: u64 = 1000;
764
765        let executor = deterministic::Runner::default();
766        // Box the future to prevent stack overflow when monomorphized across DB variants.
767        executor.start(|mut context| {
768            Box::pin(async move {
769                let partition = "build-random-fail-commit".to_string();
770                let rng_seed = context.next_u64();
771                let mut db: C = open_db(context.child("first"), partition.clone()).await;
772                db = apply_random_ops::<M, C>(ELEMENTS, true, rng_seed, db)
773                    .await
774                    .unwrap();
775                commit_writes(&mut db, []).await.unwrap();
776                let committed_root = db.root();
777                let committed_op_count = db.bounds().await.end;
778                db.prune(db.sync_boundary().await).await.unwrap();
779
780                // Perform more random operations without committing any of them.
781                let db = apply_random_ops::<M, C>(ELEMENTS, false, rng_seed + 1, db)
782                    .await
783                    .unwrap();
784
785                // SCENARIO #1: Simulate a crash that happens before any writes. Upon reopening, the
786                // state of the DB should be as of the last commit.
787                drop(db);
788                let db: C = open_db(
789                    context.child("scenario").with_attribute("index", 1),
790                    partition.clone(),
791                )
792                .await;
793                assert_eq!(db.root(), committed_root);
794                assert_eq!(db.bounds().await.end, committed_op_count);
795
796                // Re-apply the exact same operations, this time committed.
797                let db = apply_random_ops::<M, C>(ELEMENTS, true, rng_seed + 1, db)
798                    .await
799                    .unwrap();
800
801                // SCENARIO #2: Simulate a crash that happens after the Any db has been committed, but
802                // before sync/prune is called. We do this by dropping the db without calling
803                // sync or prune.
804                let committed_op_count = db.bounds().await.end;
805                drop(db);
806
807                // We should be able to recover, so the root should differ from the previous commit, and
808                // the op count should be greater than before.
809                let db: C = open_db(
810                    context.child("scenario").with_attribute("index", 2),
811                    partition.clone(),
812                )
813                .await;
814                let scenario_2_root = db.root();
815
816                // To confirm the second committed hash is correct we'll re-build the DB in a new
817                // partition, but without any failures. They should have the exact same state.
818                let fresh_partition = "build-random-fail-commit-fresh".to_string();
819                let mut db: C = open_db(context.child("fresh"), fresh_partition.clone()).await;
820                db = apply_random_ops::<M, C>(ELEMENTS, true, rng_seed, db)
821                    .await
822                    .unwrap();
823                commit_writes(&mut db, []).await.unwrap();
824                db = apply_random_ops::<M, C>(ELEMENTS, true, rng_seed + 1, db)
825                    .await
826                    .unwrap();
827                db.prune(db.sync_boundary().await).await.unwrap();
828                // State from scenario #2 should match that of a successful commit.
829                assert_eq!(db.bounds().await.end, committed_op_count);
830                assert_eq!(db.root(), scenario_2_root);
831
832                db.destroy().await.unwrap();
833            })
834        });
835    }
836
837    /// Run `test_different_pruning_delays_same_root` against a database factory.
838    ///
839    /// This test verifies that pruning operations do not affect the root hash - two databases
840    /// with identical operations but different pruning schedules should have the same root.
841    pub fn test_different_pruning_delays_same_root<M, C, F, Fut>(mut open_db: F)
842    where
843        M: merkle::Graftable,
844        C: DbAny<M>,
845        C::Key: TestKey,
846        <C as DbAny<M>>::Value: TestValue,
847        F: FnMut(Context, String) -> Fut + Clone,
848        Fut: Future<Output = C>,
849    {
850        const NUM_OPERATIONS: u64 = 1000;
851
852        let executor = deterministic::Runner::default();
853        let mut open_db_clone = open_db.clone();
854        executor.start(|context| async move {
855            // Create two databases that are identical other than how they are pruned.
856            let mut db_no_pruning: C =
857                open_db_clone(context.child("no_pruning"), "no-pruning-test".into()).await;
858            let mut db_pruning: C = open_db(context.child("pruning"), "pruning-test".into()).await;
859
860            // Apply identical operations to both databases, but only prune one.
861            // Accumulate writes between commits.
862            let mut pending_no_pruning: WriteVec<M, C> = Vec::new();
863            let mut pending_pruning: WriteVec<M, C> = Vec::new();
864            for i in 0..NUM_OPERATIONS {
865                let key: C::Key = TestKey::from_seed(i);
866                let value: <C as DbAny<M>>::Value = TestValue::from_seed(i * 1000);
867
868                pending_no_pruning.push((key, Some(value.clone())));
869                pending_pruning.push((key, Some(value)));
870
871                // Commit periodically
872                if i % 50 == 49 {
873                    commit_writes(&mut db_no_pruning, pending_no_pruning.drain(..))
874                        .await
875                        .unwrap();
876                    commit_writes(&mut db_pruning, pending_pruning.drain(..))
877                        .await
878                        .unwrap();
879                    db_pruning
880                        .prune(db_no_pruning.sync_boundary().await)
881                        .await
882                        .unwrap();
883                }
884            }
885
886            // Final commit for remaining writes.
887            commit_writes(&mut db_no_pruning, pending_no_pruning)
888                .await
889                .unwrap();
890            commit_writes(&mut db_pruning, pending_pruning)
891                .await
892                .unwrap();
893
894            // Get roots from both databases - they should match
895            let root_no_pruning = db_no_pruning.root();
896            let root_pruning = db_pruning.root();
897            assert_eq!(root_no_pruning, root_pruning);
898
899            // Also verify inactivity floors match
900            assert_eq!(
901                db_no_pruning.inactivity_floor_loc().await,
902                db_pruning.inactivity_floor_loc().await
903            );
904
905            db_no_pruning.destroy().await.unwrap();
906            db_pruning.destroy().await.unwrap();
907        });
908    }
909
910    /// Run `test_sync_persists_bitmap_pruning_boundary` against a database factory.
911    ///
912    /// This test verifies that calling `sync()` persists the bitmap pruning boundary that was
913    /// set during `commit()`. If `sync()` didn't call `write_pruned`, the
914    /// `pruned_bits()` count would be 0 after reopen instead of the expected value.
915    pub fn test_sync_persists_bitmap_pruning_boundary<M, C, F, Fut>(mut open_db: F)
916    where
917        M: merkle::Graftable + 'static,
918        C: DbAny<M> + BitmapPrunedBits + 'static,
919        C::Key: TestKey,
920        <C as DbAny<M>>::Value: TestValue,
921        F: FnMut(Context, String) -> Fut + Clone,
922        Fut: Future<Output = C>,
923    {
924        const ELEMENTS: u64 = 500;
925
926        let executor = deterministic::Runner::default();
927        let mut open_db_clone = open_db.clone();
928        executor.start(|mut context| async move {
929            let partition = "sync-bitmap-pruning".to_string();
930            let rng_seed = context.next_u64();
931            let mut db: C = open_db_clone(context.child("first"), partition.clone()).await;
932
933            // Apply random operations with commits to advance the inactivity floor.
934            db = apply_random_ops::<M, C>(ELEMENTS, true, rng_seed, db).await.unwrap();
935            let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
936            db.apply_batch(merkleized).await.unwrap();
937
938            // Prune to flatten bitmap layers and advance pruned_chunks.
939            db.prune(db.sync_boundary().await).await.unwrap();
940
941            let pruned_bits_before = db.pruned_bits();
942            warn!(
943                "pruned_bits_before={}, inactivity_floor={}, op_count={}",
944                pruned_bits_before,
945                *db.inactivity_floor_loc().await,
946                *db.bounds().await.end
947            );
948
949            // Verify we actually have some pruning (otherwise the test is meaningless).
950            assert!(
951                pruned_bits_before > 0,
952                "Expected bitmap to have pruned bits after prune()"
953            );
954
955            // Call sync() to persist the bitmap pruning boundary.
956            db.sync().await.unwrap();
957
958            // Record the root before dropping.
959            let root_before = db.root();
960            drop(db);
961
962            // Reopen the database.
963            let db: C = open_db(context.child("second"), partition).await;
964
965            // The pruned bits count should match. If sync() didn't persist the bitmap pruned
966            // state, this would be 0.
967            let pruned_bits_after = db.pruned_bits();
968            warn!("pruned_bits_after={}", pruned_bits_after);
969
970            assert_eq!(
971                pruned_bits_after, pruned_bits_before,
972                "Bitmap pruned bits mismatch after reopen - sync() may not have called write_pruned()"
973            );
974
975            // Also verify the root matches.
976            assert_eq!(db.root(), root_before);
977
978            db.destroy().await.unwrap();
979        });
980    }
981
982    /// Run `test_current_db_build_big` against a database factory.
983    ///
984    /// This test builds a database with 1000 keys, updates some, deletes some, and verifies that
985    /// the final state matches an independently computed HashMap. It also verifies that the state
986    /// persists correctly after close and reopen.
987    pub fn test_current_db_build_big<M, C, F, Fut>(mut open_db: F)
988    where
989        M: merkle::Graftable,
990        C: DbAny<M>,
991        C::Key: TestKey,
992        <C as DbAny<M>>::Value: TestValue,
993        F: FnMut(Context, String) -> Fut + Clone,
994        Fut: Future<Output = C>,
995    {
996        const ELEMENTS: u64 = 1000;
997
998        let executor = deterministic::Runner::default();
999        let mut open_db_clone = open_db.clone();
1000        executor.start(|context| async move {
1001            let mut db: C = open_db_clone(context.child("first"), "build-big".into()).await;
1002
1003            let mut map = std::collections::HashMap::<C::Key, <C as DbAny<M>>::Value>::default();
1004
1005            // All creates, updates, and deletes in one batch.
1006            let mut batch = db.new_batch();
1007
1008            // Initial creates
1009            for i in 0u64..ELEMENTS {
1010                let k: C::Key = TestKey::from_seed(i);
1011                let v: <C as DbAny<M>>::Value = TestValue::from_seed(i * 1000);
1012                batch = batch.write(k, Some(v.clone()));
1013                map.insert(k, v);
1014            }
1015
1016            // Update every 3rd key
1017            for i in 0u64..ELEMENTS {
1018                if i % 3 != 0 {
1019                    continue;
1020                }
1021                let k: C::Key = TestKey::from_seed(i);
1022                let v: <C as DbAny<M>>::Value = TestValue::from_seed((i + 1) * 10000);
1023                batch = batch.write(k, Some(v.clone()));
1024                map.insert(k, v);
1025            }
1026
1027            // Delete every 7th key
1028            for i in 0u64..ELEMENTS {
1029                if i % 7 != 1 {
1030                    continue;
1031                }
1032                let k: C::Key = TestKey::from_seed(i);
1033                batch = batch.write(k, None);
1034                map.remove(&k);
1035            }
1036
1037            let merkleized = batch.merkleize(&db, None).await.unwrap();
1038            db.apply_batch(merkleized).await.unwrap();
1039
1040            // Sync and prune.
1041            db.sync().await.unwrap();
1042            db.prune(db.sync_boundary().await).await.unwrap();
1043
1044            // Record root before dropping.
1045            let root = db.root();
1046            db.sync().await.unwrap();
1047            drop(db);
1048
1049            // Reopen the db and verify it has exactly the same state.
1050            let db: C = open_db(context.child("second"), "build-big".into()).await;
1051            assert_eq!(root, db.root());
1052
1053            // Confirm the db's state matches that of the separate map we computed independently.
1054            for i in 0u64..ELEMENTS {
1055                let k: C::Key = TestKey::from_seed(i);
1056                if let Some(map_value) = map.get(&k) {
1057                    let Some(db_value) = db.get(&k).await.unwrap() else {
1058                        panic!("key not found in db: {k}");
1059                    };
1060                    assert_eq!(*map_value, db_value);
1061                } else {
1062                    assert!(db.get(&k).await.unwrap().is_none());
1063                }
1064            }
1065        });
1066    }
1067
1068    /// Run `test_stale_batch_side_effect_free` against a database factory.
1069    ///
1070    /// The stale batch must be rejected without mutating the committed state.
1071    pub fn test_stale_batch_side_effect_free<M, C, F, Fut>(mut open_db: F)
1072    where
1073        M: merkle::Graftable,
1074        C: DbAny<M>,
1075        C::Key: TestKey,
1076        <C as DbAny<M>>::Value: TestValue,
1077        F: FnMut(Context, String) -> Fut,
1078        Fut: Future<Output = C>,
1079    {
1080        let executor = deterministic::Runner::default();
1081        executor.start(|context| async move {
1082            let mut db: C = open_db(context.child("db"), "stale-side-effect-free".into()).await;
1083
1084            let key1 = <C::Key as TestKey>::from_seed(1);
1085            let key2 = <C::Key as TestKey>::from_seed(2);
1086            let value1 = <<C as DbAny<M>>::Value as TestValue>::from_seed(10);
1087            let value2 = <<C as DbAny<M>>::Value as TestValue>::from_seed(20);
1088
1089            let mut batch = db.new_batch();
1090            batch = batch.write(key1, Some(value1.clone()));
1091            let batch_a = batch.merkleize(&db, None).await.unwrap();
1092            let mut batch = db.new_batch();
1093            batch = batch.write(key2, Some(value2));
1094            let batch_b = batch.merkleize(&db, None).await.unwrap();
1095
1096            db.apply_batch(batch_a).await.unwrap();
1097            let expected_root = db.root();
1098            let expected_bounds = db.bounds().await;
1099            let expected_metadata = db.get_metadata().await.unwrap();
1100            assert_eq!(db.get(&key1).await.unwrap(), Some(value1.clone()));
1101            assert_eq!(db.get(&key2).await.unwrap(), None);
1102
1103            let result = db.apply_batch(batch_b).await;
1104            assert!(
1105                matches!(result, Err(Error::StaleBatch { .. })),
1106                "expected StaleBatch error, got {result:?}"
1107            );
1108            assert_eq!(db.root(), expected_root);
1109            assert_eq!(db.bounds().await, expected_bounds);
1110            assert_eq!(db.get_metadata().await.unwrap(), expected_metadata);
1111            assert_eq!(db.get(&key1).await.unwrap(), Some(value1));
1112            assert_eq!(db.get(&key2).await.unwrap(), None);
1113
1114            db.destroy().await.unwrap();
1115        });
1116    }
1117
1118    use crate::translator::OneCap;
1119    use commonware_cryptography::{sha256::Digest, Hasher as _, Sha256};
1120    use commonware_macros::{test_group, test_traced};
1121
1122    type OrderedFixedDb =
1123        ordered::fixed::Db<mmr::Family, Context, Digest, Digest, Sha256, OneCap, 32, Sequential>;
1124    type OrderedVariableDb =
1125        ordered::variable::Db<mmr::Family, Context, Digest, Digest, Sha256, OneCap, 32, Sequential>;
1126    type UnorderedFixedDb =
1127        unordered::fixed::Db<mmr::Family, Context, Digest, Digest, Sha256, OneCap, 32, Sequential>;
1128    type UnorderedVariableDb = unordered::variable::Db<
1129        mmr::Family,
1130        Context,
1131        Digest,
1132        Digest,
1133        Sha256,
1134        OneCap,
1135        32,
1136        Sequential,
1137    >;
1138    type OrderedFixedP1Db = ordered::fixed::partitioned::Db<
1139        mmr::Family,
1140        Context,
1141        Digest,
1142        Digest,
1143        Sha256,
1144        OneCap,
1145        1,
1146        32,
1147        Sequential,
1148    >;
1149    type OrderedVariableP1Db = ordered::variable::partitioned::Db<
1150        mmr::Family,
1151        Context,
1152        Digest,
1153        Digest,
1154        Sha256,
1155        OneCap,
1156        1,
1157        32,
1158        Sequential,
1159    >;
1160    type UnorderedFixedP1Db = unordered::fixed::partitioned::Db<
1161        mmr::Family,
1162        Context,
1163        Digest,
1164        Digest,
1165        Sha256,
1166        OneCap,
1167        1,
1168        32,
1169        Sequential,
1170    >;
1171    type UnorderedVariableP1Db = unordered::variable::partitioned::Db<
1172        mmr::Family,
1173        Context,
1174        Digest,
1175        Digest,
1176        Sha256,
1177        OneCap,
1178        1,
1179        32,
1180        Sequential,
1181    >;
1182    type OrderedFixedP2Db = ordered::fixed::partitioned::Db<
1183        mmr::Family,
1184        Context,
1185        Digest,
1186        Digest,
1187        Sha256,
1188        OneCap,
1189        2,
1190        32,
1191        Sequential,
1192    >;
1193    type OrderedVariableP2Db = ordered::variable::partitioned::Db<
1194        mmr::Family,
1195        Context,
1196        Digest,
1197        Digest,
1198        Sha256,
1199        OneCap,
1200        2,
1201        32,
1202        Sequential,
1203    >;
1204    type UnorderedFixedP2Db = unordered::fixed::partitioned::Db<
1205        mmr::Family,
1206        Context,
1207        Digest,
1208        Digest,
1209        Sha256,
1210        OneCap,
1211        2,
1212        32,
1213        Sequential,
1214    >;
1215    type UnorderedVariableP2Db = unordered::variable::partitioned::Db<
1216        mmr::Family,
1217        Context,
1218        Digest,
1219        Digest,
1220        Sha256,
1221        OneCap,
1222        2,
1223        32,
1224        Sequential,
1225    >;
1226
1227    type OrderedFixedMmbDb =
1228        ordered::fixed::Db<mmb::Family, Context, Digest, Digest, Sha256, OneCap, 32, Sequential>;
1229    type OrderedVariableMmbDb =
1230        ordered::variable::Db<mmb::Family, Context, Digest, Digest, Sha256, OneCap, 32, Sequential>;
1231    type UnorderedFixedMmbDb =
1232        unordered::fixed::Db<mmb::Family, Context, Digest, Digest, Sha256, OneCap, 32, Sequential>;
1233    type UnorderedVariableMmbDb = unordered::variable::Db<
1234        mmb::Family,
1235        Context,
1236        Digest,
1237        Digest,
1238        Sha256,
1239        OneCap,
1240        32,
1241        Sequential,
1242    >;
1243    type OrderedFixedMmbP1Db = ordered::fixed::partitioned::Db<
1244        mmb::Family,
1245        Context,
1246        Digest,
1247        Digest,
1248        Sha256,
1249        OneCap,
1250        1,
1251        32,
1252        Sequential,
1253    >;
1254    type OrderedVariableMmbP1Db = ordered::variable::partitioned::Db<
1255        mmb::Family,
1256        Context,
1257        Digest,
1258        Digest,
1259        Sha256,
1260        OneCap,
1261        1,
1262        32,
1263        Sequential,
1264    >;
1265    type UnorderedFixedMmbP1Db = unordered::fixed::partitioned::Db<
1266        mmb::Family,
1267        Context,
1268        Digest,
1269        Digest,
1270        Sha256,
1271        OneCap,
1272        1,
1273        32,
1274        Sequential,
1275    >;
1276    type UnorderedVariableMmbP1Db = unordered::variable::partitioned::Db<
1277        mmb::Family,
1278        Context,
1279        Digest,
1280        Digest,
1281        Sha256,
1282        OneCap,
1283        1,
1284        32,
1285        Sequential,
1286    >;
1287    type OrderedFixedMmbP2Db = ordered::fixed::partitioned::Db<
1288        mmb::Family,
1289        Context,
1290        Digest,
1291        Digest,
1292        Sha256,
1293        OneCap,
1294        2,
1295        32,
1296        Sequential,
1297    >;
1298    type OrderedVariableMmbP2Db = ordered::variable::partitioned::Db<
1299        mmb::Family,
1300        Context,
1301        Digest,
1302        Digest,
1303        Sha256,
1304        OneCap,
1305        2,
1306        32,
1307        Sequential,
1308    >;
1309    type UnorderedFixedMmbP2Db = unordered::fixed::partitioned::Db<
1310        mmb::Family,
1311        Context,
1312        Digest,
1313        Digest,
1314        Sha256,
1315        OneCap,
1316        2,
1317        32,
1318        Sequential,
1319    >;
1320    type UnorderedVariableMmbP2Db = unordered::variable::partitioned::Db<
1321        mmb::Family,
1322        Context,
1323        Digest,
1324        Digest,
1325        Sha256,
1326        OneCap,
1327        2,
1328        32,
1329        Sequential,
1330    >;
1331
1332    // Helper macro to create an open_db closure for a specific variant.
1333    macro_rules! open_db_fn {
1334        ($db:ty, $cfg:ident) => {
1335            |ctx: Context, partition: String| async move {
1336                <$db>::init(ctx.child("storage"), $cfg::<OneCap>(&partition, &ctx))
1337                    .await
1338                    .unwrap()
1339            }
1340        };
1341    }
1342
1343    // Defines all variants across both supported Merkle families.
1344    macro_rules! with_all_variants {
1345        ($cb:ident!($($args:tt)*)) => {
1346            $cb!($($args)*, "of", OrderedFixedDb, fixed_config);
1347            $cb!($($args)*, "ov", OrderedVariableDb, variable_config);
1348            $cb!($($args)*, "uf", UnorderedFixedDb, fixed_config);
1349            $cb!($($args)*, "uv", UnorderedVariableDb, variable_config);
1350            $cb!($($args)*, "ofp1", OrderedFixedP1Db, fixed_config);
1351            $cb!($($args)*, "ovp1", OrderedVariableP1Db, variable_config);
1352            $cb!($($args)*, "ufp1", UnorderedFixedP1Db, fixed_config);
1353            $cb!($($args)*, "uvp1", UnorderedVariableP1Db, variable_config);
1354            $cb!($($args)*, "ofp2", OrderedFixedP2Db, fixed_config);
1355            $cb!($($args)*, "ovp2", OrderedVariableP2Db, variable_config);
1356            $cb!($($args)*, "ufp2", UnorderedFixedP2Db, fixed_config);
1357            $cb!($($args)*, "uvp2", UnorderedVariableP2Db, variable_config);
1358            $cb!($($args)*, "of-mmb", OrderedFixedMmbDb, fixed_config);
1359            $cb!($($args)*, "ov-mmb", OrderedVariableMmbDb, variable_config);
1360            $cb!($($args)*, "uf-mmb", UnorderedFixedMmbDb, fixed_config);
1361            $cb!($($args)*, "uv-mmb", UnorderedVariableMmbDb, variable_config);
1362            $cb!($($args)*, "ofp1-mmb", OrderedFixedMmbP1Db, fixed_config);
1363            $cb!($($args)*, "ovp1-mmb", OrderedVariableMmbP1Db, variable_config);
1364            $cb!($($args)*, "ufp1-mmb", UnorderedFixedMmbP1Db, fixed_config);
1365            $cb!($($args)*, "uvp1-mmb", UnorderedVariableMmbP1Db, variable_config);
1366            $cb!($($args)*, "ofp2-mmb", OrderedFixedMmbP2Db, fixed_config);
1367            $cb!($($args)*, "ovp2-mmb", OrderedVariableMmbP2Db, variable_config);
1368            $cb!($($args)*, "ufp2-mmb", UnorderedFixedMmbP2Db, fixed_config);
1369            $cb!($($args)*, "uvp2-mmb", UnorderedVariableMmbP2Db, variable_config);
1370        };
1371    }
1372
1373    // Defines the 12 ordered variants (6 per Merkle family).
1374    macro_rules! with_ordered_variants {
1375        ($cb:ident!($($args:tt)*)) => {
1376            $cb!($($args)*, "of", OrderedFixedDb, fixed_config);
1377            $cb!($($args)*, "ov", OrderedVariableDb, variable_config);
1378            $cb!($($args)*, "ofp1", OrderedFixedP1Db, fixed_config);
1379            $cb!($($args)*, "ovp1", OrderedVariableP1Db, variable_config);
1380            $cb!($($args)*, "ofp2", OrderedFixedP2Db, fixed_config);
1381            $cb!($($args)*, "ovp2", OrderedVariableP2Db, variable_config);
1382            $cb!($($args)*, "of-mmb", OrderedFixedMmbDb, fixed_config);
1383            $cb!($($args)*, "ov-mmb", OrderedVariableMmbDb, variable_config);
1384            $cb!($($args)*, "ofp1-mmb", OrderedFixedMmbP1Db, fixed_config);
1385            $cb!($($args)*, "ovp1-mmb", OrderedVariableMmbP1Db, variable_config);
1386            $cb!($($args)*, "ofp2-mmb", OrderedFixedMmbP2Db, fixed_config);
1387            $cb!($($args)*, "ovp2-mmb", OrderedVariableMmbP2Db, variable_config);
1388        };
1389    }
1390
1391    // Defines the 12 unordered variants (6 per Merkle family).
1392    macro_rules! with_unordered_variants {
1393        ($cb:ident!($($args:tt)*)) => {
1394            $cb!($($args)*, "uf", UnorderedFixedDb, fixed_config);
1395            $cb!($($args)*, "uv", UnorderedVariableDb, variable_config);
1396            $cb!($($args)*, "ufp1", UnorderedFixedP1Db, fixed_config);
1397            $cb!($($args)*, "uvp1", UnorderedVariableP1Db, variable_config);
1398            $cb!($($args)*, "ufp2", UnorderedFixedP2Db, fixed_config);
1399            $cb!($($args)*, "uvp2", UnorderedVariableP2Db, variable_config);
1400            $cb!($($args)*, "uf-mmb", UnorderedFixedMmbDb, fixed_config);
1401            $cb!($($args)*, "uv-mmb", UnorderedVariableMmbDb, variable_config);
1402            $cb!($($args)*, "ufp1-mmb", UnorderedFixedMmbP1Db, fixed_config);
1403            $cb!($($args)*, "uvp1-mmb", UnorderedVariableMmbP1Db, variable_config);
1404            $cb!($($args)*, "ufp2-mmb", UnorderedFixedMmbP2Db, fixed_config);
1405            $cb!($($args)*, "uvp2-mmb", UnorderedVariableMmbP2Db, variable_config);
1406        };
1407    }
1408
1409    // Runner macros - receive common args followed by (label, type, config).
1410    macro_rules! test_simple {
1411        ($f:expr, $l:literal, $db:ty, $cfg:ident) => {
1412            Box::pin(async {
1413                $f(open_db_fn!($db, $cfg));
1414            })
1415            .await
1416        };
1417    }
1418
1419    // Macro to run a test on DB variants.
1420    macro_rules! for_all_variants {
1421        (simple: $f:expr) => {{
1422            with_all_variants!(test_simple!($f));
1423        }};
1424        (ordered: $f:expr) => {{
1425            with_ordered_variants!(test_simple!($f));
1426        }};
1427        (unordered: $f:expr) => {{
1428            with_unordered_variants!(test_simple!($f));
1429        }};
1430    }
1431
1432    // Wrappers for the ordered/unordered build_big tests; both delegate to the shared test.
1433    fn test_ordered_build_big<M, C, F, Fut>(open_db: F)
1434    where
1435        M: merkle::Graftable,
1436        C: DbAny<M>,
1437        C::Key: TestKey,
1438        <C as DbAny<M>>::Value: TestValue,
1439        F: FnMut(Context, String) -> Fut + Clone,
1440        Fut: Future<Output = C>,
1441    {
1442        test_current_db_build_big::<M, C, F, Fut>(open_db);
1443    }
1444
1445    fn test_unordered_build_big<M, C, F, Fut>(open_db: F)
1446    where
1447        M: merkle::Graftable,
1448        C: DbAny<M>,
1449        C::Key: TestKey,
1450        <C as DbAny<M>>::Value: TestValue,
1451        F: FnMut(Context, String) -> Fut + Clone,
1452        Fut: Future<Output = C>,
1453    {
1454        test_current_db_build_big::<M, C, F, Fut>(open_db);
1455    }
1456
    #[test_group("slow")]
    #[test_traced("WARN")]
    fn test_all_variants_build_random_close_reopen() {
        let executor = deterministic::Runner::default();
        executor.start(|_context| async move {
            for_all_variants!(simple: test_build_random_close_reopen);
        });
    }

    #[test_group("slow")]
    #[test_traced("WARN")]
    fn test_all_variants_simulate_write_failures() {
        let executor = deterministic::Runner::default();
        executor.start(|_context| async move {
            for_all_variants!(simple: test_simulate_write_failures);
        });
    }

    #[test_group("slow")]
    #[test_traced("WARN")]
    fn test_all_variants_different_pruning_delays_same_root() {
        let executor = deterministic::Runner::default();
        executor.start(|_context| async move {
            for_all_variants!(simple: test_different_pruning_delays_same_root);
        });
    }

    #[test_group("slow")]
    #[test_traced("WARN")]
    fn test_all_variants_sync_persists_bitmap_pruning_boundary() {
        let executor = deterministic::Runner::default();
        executor.start(|_context| async move {
            for_all_variants!(simple: test_sync_persists_bitmap_pruning_boundary);
        });
    }

    #[test_traced("WARN")]
    fn test_all_variants_stale_batch_side_effect_free() {
        let executor = deterministic::Runner::default();
        executor.start(|_context| async move {
            for_all_variants!(simple: test_stale_batch_side_effect_free);
        });
    }

    #[test_group("slow")]
    #[test_traced("WARN")]
    fn test_ordered_variants_build_big() {
        let executor = deterministic::Runner::default();
        executor.start(|_context| async move {
            for_all_variants!(ordered: test_ordered_build_big);
        });
    }

    #[test_group("slow")]
    #[test_traced("WARN")]
    fn test_unordered_variants_build_big() {
        let executor = deterministic::Runner::default();
        executor.start(|_context| async move {
            for_all_variants!(unordered: test_unordered_build_big);
        });
    }

    #[test_group("slow")]
    #[test_traced("DEBUG")]
    fn test_ordered_variants_build_small_close_reopen() {
        let executor = deterministic::Runner::default();
        executor.start(|_context| async move {
            for_all_variants!(ordered: ordered::tests::test_build_small_close_reopen);
        });
    }

    #[test_group("slow")]
    #[test_traced("DEBUG")]
    fn test_unordered_variants_build_small_close_reopen() {
        let executor = deterministic::Runner::default();
        executor.start(|_context| async move {
            for_all_variants!(unordered: unordered::tests::test_build_small_close_reopen);
        });
    }

    // ---- Current-level batch API tests ----
    //
    // These exercise the current wrapper's batch methods (root, ops_root,
    // MerkleizedBatch::get, batch chaining) which layer bitmap and grafted tree
    // computation on top of the `any` batch.

    /// Deterministic test key derived from `i`.
    fn key(i: u64) -> Digest {
        Sha256::hash(&i.to_be_bytes())
    }

    /// Deterministic test value derived from `i` (hashes `i + 10000`, so it differs from `key(i)`).
    fn val(i: u64) -> Digest {
        Sha256::hash(&(i + 10000).to_be_bytes())
    }

    /// Apply `writes` to an MMB-backed db as a single batch, then commit.
    async fn mmb_commit(
        db: &mut UnorderedVariableMmbDb,
        writes: impl IntoIterator<Item = (Digest, Option<Digest>)>,
    ) {
        let mut batch = db.new_batch();
        for (k, v) in writes {
            batch = batch.write(k, v);
        }
        let merkleized = batch.merkleize(db, None).await.unwrap();
        db.apply_batch(merkleized).await.unwrap();
        db.commit().await.unwrap();
    }

    /// Apply `writes` (with optional commit `metadata`) to an MMR-backed db as a single batch,
    /// commit, and return the location range reported by `apply_batch`.
    async fn commit_writes_with_metadata(
        db: &mut UnorderedVariableDb,
        writes: impl IntoIterator<Item = (Digest, Option<Digest>)>,
        metadata: Option<Digest>,
    ) -> std::ops::Range<Location<mmr::Family>> {
        let mut batch = db.new_batch();
        for (k, v) in writes {
            batch = batch.write(k, v);
        }
        let merkleized = batch.merkleize(db, metadata).await.unwrap();
        let range = db.apply_batch(merkleized).await.unwrap();
        db.commit().await.unwrap();
        range
    }

    /// Verify that `rewind` restores the size, roots, inactivity floor, metadata, and values of
    /// an earlier commit, and that the rewound state survives a commit and reopen, including a
    /// rewind all the way back to the initial state.
    #[test_traced("INFO")]
    fn test_current_rewind_recovery() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let partition = "current-rewind-recovery";
            let ctx = context.child("db");
            let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
                ctx.child("storage"),
                variable_config::<OneCap>(partition, &ctx),
            )
            .await
            .unwrap();
            let initial_size = db.bounds().await.end;
            let initial_root = db.root();
            let initial_ops_root = db.ops_root();
            let initial_floor = db.inactivity_floor_loc();

            let metadata_a = val(900);
            let first_range = commit_writes_with_metadata(
                &mut db,
                [(key(0), Some(val(0))), (key(1), Some(val(1)))],
                Some(metadata_a),
            )
            .await;
            assert_eq!(first_range.start, initial_size);
            let size_before = db.bounds().await.end;
            let root_before = db.root();
            let ops_root_before = db.ops_root();
            let floor_before = db.inactivity_floor_loc();
            assert_eq!(size_before, first_range.end);

            let metadata_b = val(901);
            let second_range = commit_writes_with_metadata(
                &mut db,
                [
                    (key(0), Some(val(100))),
                    (key(1), None),
                    (key(2), Some(val(2))),
                ],
                Some(metadata_b),
            )
            .await;
            assert_eq!(second_range.start, size_before);
            assert_ne!(db.root(), root_before);
            assert_eq!(db.get_metadata().await.unwrap(), Some(val(901)));
            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(100)));
            assert_eq!(db.get(&key(1)).await.unwrap(), None);
            assert_eq!(db.get(&key(2)).await.unwrap(), Some(val(2)));

            db.rewind(size_before).await.unwrap();
            assert_eq!(db.bounds().await.end, size_before);
            assert_eq!(db.root(), root_before);
            assert_eq!(db.ops_root(), ops_root_before);
            assert_eq!(db.inactivity_floor_loc(), floor_before);
            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_a));
            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
            assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
            assert_eq!(db.get(&key(2)).await.unwrap(), None);

            db.commit().await.unwrap();
            drop(db);

            let reopened: UnorderedVariableDb = UnorderedVariableDb::init(
                context.child("reopen"),
                variable_config::<OneCap>(partition, &context),
            )
            .await
            .unwrap();
            assert_eq!(reopened.bounds().await.end, size_before);
            assert_eq!(reopened.root(), root_before);
            assert_eq!(reopened.ops_root(), ops_root_before);
            assert_eq!(reopened.inactivity_floor_loc(), floor_before);
            assert_eq!(reopened.get_metadata().await.unwrap(), Some(val(900)));
            assert_eq!(reopened.get(&key(0)).await.unwrap(), Some(val(0)));
            assert_eq!(reopened.get(&key(1)).await.unwrap(), Some(val(1)));
            assert_eq!(reopened.get(&key(2)).await.unwrap(), None);

            let mut reopened = reopened;
            reopened.rewind(initial_size).await.unwrap();
            assert_eq!(reopened.bounds().await.end, initial_size);
            assert_eq!(reopened.root(), initial_root);
            assert_eq!(reopened.ops_root(), initial_ops_root);
            assert_eq!(reopened.inactivity_floor_loc(), initial_floor);
            assert_eq!(reopened.get_metadata().await.unwrap(), None);
            assert_eq!(reopened.get(&key(0)).await.unwrap(), None);
            assert_eq!(reopened.get(&key(1)).await.unwrap(), None);
            assert_eq!(reopened.get(&key(2)).await.unwrap(), None);

            reopened.commit().await.unwrap();
            drop(reopened);

            let reopened_initial: UnorderedVariableDb = UnorderedVariableDb::init(
                context.child("reopen_initial"),
                variable_config::<OneCap>(partition, &context),
            )
            .await
            .unwrap();
            assert_eq!(reopened_initial.bounds().await.end, initial_size);
            assert_eq!(reopened_initial.root(), initial_root);
            assert_eq!(reopened_initial.ops_root(), initial_ops_root);
            assert_eq!(reopened_initial.inactivity_floor_loc(), initial_floor);
            assert_eq!(reopened_initial.get_metadata().await.unwrap(), None);
            assert_eq!(reopened_initial.get(&key(0)).await.unwrap(), None);
            assert_eq!(reopened_initial.get(&key(1)).await.unwrap(), None);
            assert_eq!(reopened_initial.get(&key(2)).await.unwrap(), None);

            reopened_initial.destroy().await.unwrap();
        });
    }

    /// Verify that `rewind` works after bitmap pruning when a single key is updated repeatedly,
    /// and that the rewound database can be reopened and extended with new writes.
    #[test_traced("INFO")]
    fn test_current_rewind_recovery_pruned_repeated_updates() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            const COMMITS: u64 = 96;

            let partition = "current-rewind-pruned-recovery";
            let ctx = context.child("db");
            let mut db: UnorderedVariableDb =
                UnorderedVariableDb::init(ctx.child("storage"), variable_config::<OneCap>(partition, &ctx))
                    .await
                    .unwrap();

            let key0 = key(0);
            let mut history = Vec::new();
            for round in 0..COMMITS {
                commit_writes_with_metadata(
                    &mut db,
                    [(key0, Some(val(20_000 + round)))],
                    None,
                )
                .await;
                history.push((
                    db.bounds().await.end,
                    db.inactivity_floor_loc(),
                    db.root(),
                    db.ops_root(),
                    val(20_000 + round),
                ));
            }

            // Keep most ops-log history, but force bitmap pruning so rewind uses pinned-node
            // reconstruction (`pruned_chunks > 0` path).
            db.prune(Location::new(1)).await.unwrap();
            let pruned_bits = db.pruned_bits();
            assert!(pruned_bits > 0, "expected bitmap pruning for rewind test");
            let bounds = db.bounds().await;

            let (target_size, target_root, target_ops_root, target_value) = history
                .iter()
                .enumerate()
                .find_map(|(idx, (size, floor, root, ops_root, value))| {
                    let removed_commits = history.len() - idx - 1;
                    if removed_commits >= 3 && *size > bounds.start && *floor >= pruned_bits {
                        Some((*size, *root, *ops_root, *value))
                    } else {
                        None
                    }
                })
                .unwrap_or_else(|| {
                    panic!(
                        "expected legal pruned rewind target with repeated updates; bounds={bounds:?}, pruned_bits={pruned_bits}, latest_floor={:?}, history={history:?}",
                        db.inactivity_floor_loc()
                    )
                });

            db.rewind(target_size).await.unwrap();
            assert_eq!(db.root(), target_root);
            assert_eq!(db.ops_root(), target_ops_root);
            assert_eq!(db.bounds().await.end, target_size);
            assert_eq!(db.get(&key0).await.unwrap(), Some(target_value));

            db.commit().await.unwrap();
            drop(db);

            let mut reopened: UnorderedVariableDb = UnorderedVariableDb::init(
                context.child("reopen_pruned_recovery"),
                variable_config::<OneCap>(partition, &context),
            )
            .await
            .unwrap();
            assert_eq!(reopened.root(), target_root);
            assert_eq!(reopened.ops_root(), target_ops_root);
            assert_eq!(reopened.bounds().await.end, target_size);
            assert_eq!(reopened.get(&key0).await.unwrap(), Some(target_value));

            let metadata_after_rewind = val(30_000);
            let new_key = key(1);
            let new_value = val(30_001);
            let expected_end = commit_writes_with_metadata(
                &mut reopened,
                [(new_key, Some(new_value))],
                Some(metadata_after_rewind),
            )
            .await
            .end;
            let root_after_new_write = reopened.root();
            let ops_root_after_new_write = reopened.ops_root();
            assert_eq!(reopened.bounds().await.end, expected_end);
            assert_eq!(reopened.get_metadata().await.unwrap(), Some(metadata_after_rewind));
            assert_eq!(reopened.get(&key0).await.unwrap(), Some(target_value));
            assert_eq!(reopened.get(&new_key).await.unwrap(), Some(new_value));

            drop(reopened);
            let reopened_after_new_write: UnorderedVariableDb = UnorderedVariableDb::init(
                context.child("reopen_pruned_after_new_write"),
                variable_config::<OneCap>(partition, &context),
            )
            .await
            .unwrap();
            assert_eq!(reopened_after_new_write.root(), root_after_new_write);
            assert_eq!(reopened_after_new_write.ops_root(), ops_root_after_new_write);
            assert_eq!(reopened_after_new_write.bounds().await.end, expected_end);
            assert_eq!(
                reopened_after_new_write.get_metadata().await.unwrap(),
                Some(metadata_after_rewind)
            );
            assert_eq!(reopened_after_new_write.get(&key0).await.unwrap(), Some(target_value));
            assert_eq!(
                reopened_after_new_write.get(&new_key).await.unwrap(),
                Some(new_value)
            );

            reopened_after_new_write.destroy().await.unwrap();
        });
    }

    /// Verify that the delayed-merge settlement guard holds `sync_boundary` at 0 during the
    /// unsettled window, so `prune` rejects any non-zero `prune_loc`.
    #[test_traced("INFO")]
    fn test_current_mmb_settlement_guard_defers_pruning() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            const COMMITS: u64 = 100;

            let partition = "current-mmb-reopen-prove-after-prune";
            let ctx = context.child("db");
            let mut db: UnorderedVariableMmbDb = UnorderedVariableMmbDb::init(
                ctx.child("storage"),
                variable_config::<OneCap>(partition, &ctx),
            )
            .await
            .unwrap();

            let k = key(0);
            let mut expected = None;
            for round in 0..COMMITS {
                expected = Some(val(50_000 + round));
                let mut batch = db.new_batch();
                batch = batch.write(k, expected);
                let merkleized = batch.merkleize(&db, None).await.unwrap();
                db.apply_batch(merkleized).await.unwrap();
                db.commit().await.unwrap();
            }

            let root_before = db.root();
            assert!(
                *db.inactivity_floor_loc() >= 256,
                "expected inactivity floor past chunk 0"
            );
            assert_eq!(
                *db.sync_boundary(),
                0,
                "settlement guard should hold boundary at 0 during unsettled window"
            );

            // `prune` must reject any non-zero loc because sync_boundary is still 0.
            let result = db.prune(Location::<mmb::Family>::new(1)).await;
            assert!(
                matches!(result, Err(Error::PruneBeyondMinRequired(_, _))),
                "expected PruneBeyondMinRequired, got {result:?}"
            );
            assert_eq!(db.pruned_bits(), 0);
            db.sync().await.unwrap();
            drop(db);

            // Reopen: no pruning occurred, state is unchanged.
            let reopened: UnorderedVariableMmbDb = UnorderedVariableMmbDb::init(
                context.child("reopen"),
                variable_config::<OneCap>(partition, &context),
            )
            .await
            .unwrap();

            assert_eq!(reopened.root(), root_before);
            assert_eq!(reopened.get(&k).await.unwrap(), expected);

            // key_value_proof must still succeed after reopen: the inactivity floor is past
            // chunk 0, but the guard kept chunk 0 unpruned.
            let hasher = qmdb::hasher::<Sha256>();
            let _proof = reopened.key_value_proof(&hasher, k).await.unwrap();

            reopened.destroy().await.unwrap();
        });
    }

    /// Verify that rewinding a pruned MMB database to a size inside the unsettled delayed-merge
    /// window is rejected with `ItemPruned`.
    #[test_traced("INFO")]
    fn test_current_mmb_rewind_rejects_unsettled_pruned_window() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            const COMMITS: u64 = 320;
            const N: usize = 32;

            let partition = "current-mmb-rewind-unsettled-window";
            let ctx = context.child("db");
            let mut db: UnorderedVariableMmbDb = UnorderedVariableMmbDb::init(
                ctx.child("storage"),
                variable_config::<OneCap>(partition, &ctx),
            )
            .await
            .unwrap();

            let key0 = key(0);
            let mut history = Vec::new();
            for round in 0..COMMITS {
                let mut batch = db.new_batch();
                batch = batch.write(key0, Some(val(60_000 + round)));
                let merkleized = batch.merkleize(&db, None).await.unwrap();
                db.apply_batch(merkleized).await.unwrap();
                db.commit().await.unwrap();
                history.push((db.bounds().await.end, db.inactivity_floor_loc()));
            }

            db.prune(db.sync_boundary()).await.unwrap();
            let pruned_bits = db.pruned_bits();
            assert!(pruned_bits > 0, "expected MMB bitmap pruning to be active");
            db.sync().await.unwrap();

            let chunk_bits = commonware_utils::bitmap::BitMap::<N>::CHUNK_SIZE_BITS;
            let pruned_chunks = (pruned_bits / chunk_bits) as u64;
            let gh = super::grafting::height::<N>();
            let youngest = pruned_chunks - 1;
            let pair_chunk = youngest & !1;
            let pair_start = pair_chunk << gh;
            let pair_pos = <mmb::Family as merkle::Graftable>::subtree_root_position(
                merkle::Location::<mmb::Family>::new(pair_start),
                gh + 1,
            );
            let absorbed_after =
                <mmb::Family as merkle::Graftable>::peak_birth_size(pair_pos, gh + 1);

            let unsafe_target = history
                .iter()
                .filter_map(|(size, floor)| {
                    let s = **size;
                    if s >= pruned_bits && s < absorbed_after && **floor >= pruned_bits {
                        Some(s)
                    } else {
                        None
                    }
                })
                .max()
                .unwrap_or_else(|| {
                    panic!(
                        "expected rewind target in unsettled window: pruned_bits={pruned_bits}, absorbed_after={absorbed_after}, history={history:?}"
                    )
                });

            let err = db
                .rewind(merkle::Location::<mmb::Family>::new(unsafe_target))
                .await
                .unwrap_err();
            assert!(
                matches!(err, Error::Journal(crate::journal::Error::ItemPruned(_))),
                "unexpected rewind error for unsettled delayed-merge window: {err:?}"
            );
            drop(db);

            let reopened: UnorderedVariableMmbDb = UnorderedVariableMmbDb::init(
                context.child("reopen"),
                variable_config::<OneCap>(partition, &context),
            )
            .await
            .unwrap();
            reopened.destroy().await.unwrap();
        });
    }
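
    // Illustrative sketch, not part of the original suite: the chunk-pair arithmetic used by
    // `test_current_mmb_rewind_rejects_unsettled_pruned_window`, isolated on plain integers so
    // the bit manipulation can be checked by hand. Assumptions: `sibling_pair_start` is a
    // hypothetical helper, and the grafting height of 8 used in the test below corresponds to
    // the 256-bit chunks (N = 32) these tests rely on.
    fn sibling_pair_start(pruned_chunks: u64, grafting_height: u32) -> Option<u64> {
        // Index of the youngest pruned chunk; `None` when nothing has been pruned.
        let youngest = pruned_chunks.checked_sub(1)?;
        // Clearing the low bit selects the even (left) member of the youngest chunk's pair.
        let pair_chunk = youngest & !1;
        // Each chunk covers `2^grafting_height` leaves, so shifting yields the pair's first leaf.
        Some(pair_chunk << grafting_height)
    }

    #[test]
    fn test_sibling_pair_start_sketch() {
        assert_eq!(sibling_pair_start(0, 8), None);
        // Chunks 0 and 1 form the first pair, starting at leaf 0.
        assert_eq!(sibling_pair_start(1, 8), Some(0));
        assert_eq!(sibling_pair_start(2, 8), Some(0));
        // Chunks 2 and 3 form the second pair, starting at leaf 2 * 256.
        assert_eq!(sibling_pair_start(3, 8), Some(512));
        assert_eq!(sibling_pair_start(4, 8), Some(512));
    }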

    /// Verify that `Db::prune` never advances the ops journal past the settled bitmap
    /// pruning boundary on a delayed-merge (MMB) family. The journal's lower bound must be
    /// less than or equal to `sync_boundary()`, and the test setup must force the lag to
    /// be strictly active so the assertion is not vacuous.
    #[test_traced]
    fn test_current_mmb_prune_respects_sync_boundary() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            const COMMITS: u64 = 320;

            let ctx = context.child("db");
            let mut db: UnorderedVariableMmbDb = UnorderedVariableMmbDb::init(
                ctx.child("storage"),
                variable_config::<OneCap>("prune-clip-mmb", &ctx),
            )
            .await
            .unwrap();

            let k = key(0);
            for round in 0..COMMITS {
                mmb_commit(&mut db, [(k, Some(val(70_000 + round)))]).await;
            }

            db.prune(db.sync_boundary()).await.unwrap();

            let boundary = db.sync_boundary();
            let floor = db.inactivity_floor_loc();
            assert!(
                boundary < floor,
                "delayed-merge lag must be strictly active: boundary={boundary}, floor={floor}"
            );
            assert!(
                db.bounds().await.start <= boundary,
                "ops journal was pruned past the settled bitmap boundary: \
                 bounds.start={}, boundary={boundary}",
                db.bounds().await.start
            );

            db.destroy().await.unwrap();
        });
    }

    /// Verify that on a non-delayed-merge (MMR) family `sync_boundary()` lags the inactivity
    /// floor only by chunk alignment (less than one chunk), never by a delayed-merge absorption
    /// window. Guards against an accidental regression that would introduce a larger lag on
    /// families that don't need it.
    #[test_traced]
    fn test_current_mmr_prune_boundary_lag_is_only_chunk_alignment() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            const COMMITS: u64 = 320;
            const N: usize = 32;

            let ctx = context.child("db");
            let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
                ctx.child("storage"),
                variable_config::<OneCap>("prune-clip-mmr", &ctx),
            )
            .await
            .unwrap();

            for round in 0..COMMITS {
                commit_writes_with_metadata(
                    &mut db,
                    [(key(0), Some(val(80_000 + round)))],
                    None,
                )
                .await;
            }

            db.prune(db.sync_boundary()).await.unwrap();

            let boundary = db.sync_boundary();
            let floor = db.inactivity_floor_loc();
            let chunk_bits = commonware_utils::bitmap::BitMap::<N>::CHUNK_SIZE_BITS;
            assert!(
                boundary <= floor && *floor - *boundary < chunk_bits,
                "MMR lag should be only chunk alignment: boundary={boundary}, floor={floor}, chunk_bits={chunk_bits}"
            );
            assert!(
                db.bounds().await.start <= boundary,
                "ops journal bounds must be <= sync_boundary: bounds.start={}, boundary={boundary}",
                db.bounds().await.start
            );

            db.destroy().await.unwrap();
        });
    }
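
    // Illustrative sketch, not part of the original suite: why an alignment-only lag is always
    // smaller than one chunk. Assumption: the boundary is modeled here as the inactivity floor
    // rounded down to a multiple of the chunk size. `chunk_aligned_boundary` is a hypothetical
    // helper and is not claimed to be how `sync_boundary()` is actually computed.
    fn chunk_aligned_boundary(floor: u64, chunk_bits: u64) -> u64 {
        // Dropping the remainder lands on the first bit of the chunk that contains `floor`.
        floor - floor % chunk_bits
    }

    #[test]
    fn test_chunk_aligned_boundary_sketch() {
        // 256 bits per chunk corresponds to the 32-byte chunks (N = 32) used in these tests.
        const CHUNK_BITS: u64 = 256;
        for floor in [0u64, 1, 255, 256, 257, 700] {
            let boundary = chunk_aligned_boundary(floor, CHUNK_BITS);
            // Same shape as the assertion in the MMR lag test above.
            assert!(boundary <= floor && floor - boundary < CHUNK_BITS);
            assert_eq!(boundary % CHUNK_BITS, 0);
        }
        assert_eq!(chunk_aligned_boundary(700, CHUNK_BITS), 512);
    }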

    /// Verify that `prune(loc)` with `loc < sync_boundary()` prunes the ops journal only as far
    /// as the caller requested.
    #[test_traced]
    fn test_current_prune_below_settled_boundary_is_honored() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            const COMMITS: u64 = 100;

            let ctx = context.child("db");
            let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
                ctx.child("storage"),
                variable_config::<OneCap>("prune-below-boundary", &ctx),
            )
            .await
            .unwrap();

            for round in 0..COMMITS {
                commit_writes_with_metadata(&mut db, [(key(0), Some(val(90_000 + round)))], None)
                    .await;
            }

            assert!(*db.inactivity_floor_loc() > 1);
            let small = Location::new(1);
            db.prune(small).await.unwrap();

            assert!(
                db.bounds().await.start <= small,
                "journal pruning exceeded the caller-supplied target: bounds.start={}, requested={small}",
                db.bounds().await.start
            );

            db.destroy().await.unwrap();
        });
    }
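
    // Illustrative sketch, not part of the original suite: the prune contract the tests above
    // exercise, modeled on plain integers. A request beyond the settled boundary is rejected,
    // and a request at or below it is honored exactly as given. Assumptions: `PruneRejected`
    // and `check_prune_target` are hypothetical names; the real `Db::prune` reports
    // `Error::PruneBeyondMinRequired` and may perform additional checks.
    #[derive(Debug, PartialEq, Eq)]
    struct PruneRejected {
        requested: u64,
        boundary: u64,
    }

    fn check_prune_target(requested: u64, boundary: u64) -> Result<u64, PruneRejected> {
        if requested > boundary {
            // Pruning past the settled boundary is refused rather than silently clamped.
            return Err(PruneRejected {
                requested,
                boundary,
            });
        }
        // Otherwise prune only as far as the caller asked.
        Ok(requested)
    }

    #[test]
    fn test_check_prune_target_sketch() {
        // Boundary held at 0 by the settlement guard: any non-zero request is rejected.
        assert_eq!(
            check_prune_target(1, 0),
            Err(PruneRejected {
                requested: 1,
                boundary: 0
            })
        );
        // Requests at or below the boundary are honored as given.
        assert_eq!(check_prune_target(1, 300), Ok(1));
        assert_eq!(check_prune_target(300, 300), Ok(300));
    }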

    /// Prune, then grow without pruning again so delayed MMB merges occur inside the
    /// already-pruned region. Verify proof + reopen correctness.
    #[test_traced]
    fn test_current_mmb_reopen_and_prove_after_prune_delayed_merge() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let db_ctx = context.child("db_init");
            let mut db: UnorderedVariableMmbDb = UnorderedVariableMmbDb::init(
                db_ctx.child("db"),
                variable_config::<OneCap>("test_prune_delayed_merge", &db_ctx),
            )
            .await
            .unwrap();

            let k = key(0);

            for round in 0..200u64 {
                mmb_commit(&mut db, [(k, Some(val(60_000 + round)))]).await;
            }

            db.prune(db.sync_boundary()).await.unwrap();
            db.sync().await.unwrap();

            // Keep growing without pruning: delayed merges now occur in the pruned region.
            for round in 200..300u64 {
                mmb_commit(&mut db, [(key(1), Some(val(round)))]).await;
            }

            let hasher = qmdb::hasher::<Sha256>();
            let proof = db.key_value_proof(&hasher, k).await.unwrap();
            assert!(UnorderedVariableMmbDb::verify_key_value_proof(
                &hasher,
                k,
                val(60_000 + 199),
                &proof,
                &db.root()
            ));

            let target_root = db.root();
            drop(db);

            let reopen_ctx = context.child("db_reopen");
            let reopened: UnorderedVariableMmbDb = UnorderedVariableMmbDb::init(
                reopen_ctx.child("db"),
                variable_config::<OneCap>("test_prune_delayed_merge", &reopen_ctx),
            )
            .await
            .unwrap();

            assert_eq!(reopened.root(), target_root);

            let hasher = qmdb::hasher::<Sha256>();
            let proof = reopened.key_value_proof(&hasher, k).await.unwrap();
            assert!(UnorderedVariableMmbDb::verify_key_value_proof(
                &hasher,
                k,
                val(60_000 + 199),
                &proof,
                &reopened.root()
            ));

            reopened.destroy().await.unwrap();
        });
    }

    /// Grow past 2 full pruned chunks, prune, reopen, verify root + value.
    #[test_traced]
    fn test_current_mmb_reopen_after_prune_two_chunks() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let db_ctx = context.child("db");
            let mut db: UnorderedVariableMmbDb = UnorderedVariableMmbDb::init(
                db_ctx.child("db"),
                variable_config::<OneCap>("test_prune_two", &db_ctx),
            )
            .await
            .unwrap();

            let k = key(0);
            // Always assigned before the loop breaks.
            let mut expected;

            // Keep growing until the settle guard allows 2+ pruned chunks.
            // The absorber for chunk pair [0,1] at gh=8 needs ~766 ops leaves.
            let mut round = 0u64;
            loop {
                expected = Some(val(60_000 + round));
                mmb_commit(&mut db, [(k, expected)]).await;
                round += 1;
                db.prune(db.sync_boundary()).await.unwrap();
                if db.pruned_bits() >= 512 {
                    break;
                }
                assert!(
                    round < 500,
                    "failed to reach 2 pruned chunks after {round} commits"
                );
            }
            db.sync().await.unwrap();

            let target_root = db.root();
            drop(db);

            let reopen_ctx = context.child("db_reopen");
            let reopened: UnorderedVariableMmbDb = UnorderedVariableMmbDb::init(
                reopen_ctx.child("db"),
                variable_config::<OneCap>("test_prune_two", &reopen_ctx),
            )
            .await
            .unwrap();

            assert_eq!(reopened.root(), target_root);
            assert_eq!(reopened.get(&k).await.unwrap(), expected);
            reopened.destroy().await.unwrap();
        });
    }

    /// Three rounds of grow + prune + reopen. Verifies repeated prune cycles don't diverge.
    #[test_traced]
    fn test_current_mmb_repeated_prune() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db_ctx = context.child("db_init");
            let mut db: UnorderedVariableMmbDb = UnorderedVariableMmbDb::init(
                db_ctx.child("db"),
                variable_config::<OneCap>("test_repeated_prune", &db_ctx),
            )
            .await
            .unwrap();

            for round in 0..3u64 {
                let k = key(round * 1000);
                let mut expected = None;
                for i in 0..90 {
                    expected = Some(val(round * 1000 + i));
                    mmb_commit(&mut db, [(k, expected)]).await;
                }

                db.prune(db.sync_boundary()).await.unwrap();
                db.sync().await.unwrap();

                let root_before = db.root();
                db_ctx = context.child("db").with_attribute("round", round);

                let prev_db = db;
                db = UnorderedVariableMmbDb::init(
                    db_ctx.child("db"),
                    variable_config::<OneCap>("test_repeated_prune", &db_ctx),
                )
                .await
                .unwrap();

                assert_eq!(db.root(), root_before);
                assert_eq!(db.get(&k).await.unwrap(), expected);
                drop(prev_db);
            }

            db.destroy().await.unwrap();
        });
    }

    /// Step-by-step growth after prune, comparing roots against an unpruned reference.
    #[test_traced]
    fn test_current_mmb_stepwise_growth_matches_unpruned_reference() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let db_ctx = context.child("db_stepwise");
            let mut db: UnorderedVariableMmbDb = UnorderedVariableMmbDb::init(
                db_ctx.child("db"),
                variable_config::<OneCap>("test_stepwise", &db_ctx),
            )
            .await
            .unwrap();

            let ref_ctx = context.child("ref_stepwise");
            let mut ref_db: UnorderedVariableMmbDb = UnorderedVariableMmbDb::init(
                ref_ctx.child("db"),
                variable_config::<OneCap>("test_stepwise_ref", &ref_ctx),
            )
            .await
            .unwrap();

            let k = key(0);
            let mut commit_idx = 0u64;

            // Grow until the inactivity floor reaches 4 chunks.
            while *db.inactivity_floor_loc() < 1024 {
                let value = Some(val(80_000 + commit_idx));
                mmb_commit(&mut db, [(k, value)]).await;
                mmb_commit(&mut ref_db, [(k, value)]).await;
                commit_idx += 1;
            }

            db.prune(db.sync_boundary()).await.unwrap();
            db.sync().await.unwrap();
            assert_eq!(
                db.root(),
                ref_db.root(),
                "root mismatch immediately after prune"
            );

            // Step-by-step growth through the delayed-merge window.
            loop {
                let db_leaves =
                    *Location::<mmb::Family>::try_from(db.any.log.merkle.size()).unwrap();
                if db_leaves >= 1560 {
                    break;
                }

                let value = Some(val(80_000 + commit_idx));
                mmb_commit(&mut db, [(k, value)]).await;
                mmb_commit(&mut ref_db, [(k, value)]).await;
                commit_idx += 1;

                let db_leaves =
                    *Location::<mmb::Family>::try_from(db.any.log.merkle.size()).unwrap();
                assert_eq!(
                    db.root(),
                    ref_db.root(),
                    "stepwise root mismatch: leaves={db_leaves}, commit_idx={commit_idx}"
                );
            }

            db.destroy().await.unwrap();
            ref_db.destroy().await.unwrap();
        });
    }

    /// Multi-round prune + reopen + proof against an unpruned reference.
    #[test_traced]
    fn test_current_mmb_large_repeated_prune_matches_unpruned_reference() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            const ROUNDS: u64 = 8;
            const COMMITS_PER_ROUND: u64 = 120;

            let mut db_ctx = context.child("db_init");
            let mut db: UnorderedVariableMmbDb = UnorderedVariableMmbDb::init(
                db_ctx.child("db"),
                variable_config::<OneCap>("test_large_prune", &db_ctx),
            )
            .await
            .unwrap();

            let ref_ctx = context.child("ref");
            let mut ref_db: UnorderedVariableMmbDb = UnorderedVariableMmbDb::init(
                ref_ctx.child("db"),
                variable_config::<OneCap>("test_large_prune_ref", &ref_ctx),
            )
            .await
            .unwrap();

            let k = key(0);
            let mut expected = None;

            for round in 0..ROUNDS {
                for i in 0..COMMITS_PER_ROUND {
                    let value = Some(val(round * 10_000 + i));
                    expected = value;
                    mmb_commit(&mut db, [(k, value)]).await;
                    mmb_commit(&mut ref_db, [(k, value)]).await;
                }

                assert_eq!(
                    db.root(),
                    ref_db.root(),
                    "root mismatch before prune at round {round}"
                );

                db.prune(db.sync_boundary()).await.unwrap();
                db.sync().await.unwrap();

                assert_eq!(
                    db.root(),
                    ref_db.root(),
                    "root mismatch after prune at round {round}"
                );

                let hasher = qmdb::hasher::<Sha256>();
                let proof = db.key_value_proof(&hasher, k).await.unwrap();
                assert!(
                    UnorderedVariableMmbDb::verify_key_value_proof(
                        &hasher,
                        k,
                        expected.expect("value should exist"),
                        &proof,
                        &db.root()
                    ),
                    "proof verification failed at round {round}"
                );

                db_ctx = context.child("db_reopen").with_attribute("round", round);
                let prev_db = db;
                db = UnorderedVariableMmbDb::init(
                    db_ctx.child("db"),
                    variable_config::<OneCap>("test_large_prune", &db_ctx),
                )
                .await
                .unwrap();

                assert_eq!(
                    db.root(),
                    ref_db.root(),
                    "root mismatch after reopen at round {round}"
                );
                assert_eq!(
                    db.get(&k).await.unwrap(),
                    expected,
                    "value mismatch after reopen at round {round}"
                );

                let hasher = qmdb::hasher::<Sha256>();
                let proof = db.key_value_proof(&hasher, k).await.unwrap();
                assert!(
                    UnorderedVariableMmbDb::verify_key_value_proof(
                        &hasher,
                        k,
                        expected.expect("value should exist"),
                        &proof,
                        &db.root()
                    ),
                    "proof verification failed after reopen at round {round}"
                );

                drop(prev_db);
            }

            db.destroy().await.unwrap();
            ref_db.destroy().await.unwrap();
        });
    }

    /// Verify that prune beyond the sync boundary is rejected without mutating state.
    #[test_traced]
    fn test_current_prune_rejects_beyond_sync_boundary_without_mutation() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            const COMMITS: u64 = 160;

            let partition = "current-prune-beyond-boundary";
            let ctx = context.child("db");
            let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
                ctx.child("storage"),
                variable_config::<OneCap>(partition, &ctx),
            )
            .await
            .unwrap();

            let key0 = key(0);
            for round in 0..COMMITS {
                commit_writes_with_metadata(&mut db, [(key0, Some(val(40_000 + round)))], None)
                    .await;
            }

            let expected_root = db.root();
            let expected_ops_root = db.ops_root();
            let expected_boundary = db.sync_boundary();
            let expected_pruned_bits = db.pruned_bits();
            let expected_value = db.get(&key0).await.unwrap();

            // Overshoot the sync boundary by one full chunk: 32 * 8 = 256 bits per chunk for N=32.
            let invalid_prune_loc = Location::new(*expected_boundary + 256);
            let result = db.prune(invalid_prune_loc).await;
            assert!(
                matches!(result, Err(Error::PruneBeyondMinRequired(loc, boundary))
                    if loc == invalid_prune_loc && boundary == expected_boundary),
                "expected prune rejection above sync boundary, got {result:?}"
            );

            assert_eq!(db.root(), expected_root);
            assert_eq!(db.ops_root(), expected_ops_root);
            assert_eq!(db.pruned_bits(), expected_pruned_bits);
            assert_eq!(db.get(&key0).await.unwrap(), expected_value);

            drop(db);

            let reopened: UnorderedVariableDb = UnorderedVariableDb::init(
                context.child("reopen"),
                variable_config::<OneCap>(partition, &context),
            )
            .await
            .unwrap();
            assert_eq!(reopened.root(), expected_root);
            assert_eq!(reopened.ops_root(), expected_ops_root);
            assert_eq!(reopened.pruned_bits(), expected_pruned_bits);
            assert_eq!(reopened.get(&key0).await.unwrap(), expected_value);

            reopened.destroy().await.unwrap();
        });
    }

    /// Rewinding a couple of commits back after a long history restores the target size, roots,
    /// and values, and the rewound state survives commit + reopen.
    #[test_traced("INFO")]
    fn test_current_rewind_small_delta_large_history() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            const COMMITS: u64 = 200;

            let partition = "current-rewind-small-delta";
            let ctx = context.child("db");
            let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
                ctx.child("storage"),
                variable_config::<OneCap>(partition, &ctx),
            )
            .await
            .unwrap();

            let key0 = key(0);
            let key1 = key(1);
            let mut history = Vec::new();

            for round in 0..COMMITS {
                let key0_value = val(40_000 + round);
                let key1_value = if round % 3 == 1 {
                    None
                } else {
                    Some(val(50_000 + round))
                };

                commit_writes_with_metadata(
                    &mut db,
                    [(key0, Some(key0_value)), (key1, key1_value)],
                    None,
                )
                .await;

                history.push((
                    db.bounds().await.end,
                    db.root(),
                    db.ops_root(),
                    key0_value,
                    key1_value,
                ));
            }

            let target = *history
                .get(history.len() - 3)
                .expect("history should contain at least three commits");
            let (target_size, target_root, target_ops_root, target_key0, target_key1) = target;

            db.rewind(target_size).await.unwrap();
            assert_eq!(db.bounds().await.end, target_size);
            assert_eq!(db.root(), target_root);
            assert_eq!(db.ops_root(), target_ops_root);
            assert_eq!(db.get(&key0).await.unwrap(), Some(target_key0));
            assert_eq!(db.get(&key1).await.unwrap(), target_key1);

            db.commit().await.unwrap();
            drop(db);

            let reopened: UnorderedVariableDb = UnorderedVariableDb::init(
                context.child("reopen_small_delta"),
                variable_config::<OneCap>(partition, &context),
            )
            .await
            .unwrap();
            assert_eq!(reopened.bounds().await.end, target_size);
            assert_eq!(reopened.root(), target_root);
            assert_eq!(reopened.ops_root(), target_ops_root);
            assert_eq!(reopened.get(&key0).await.unwrap(), Some(target_key0));
            assert_eq!(reopened.get(&key1).await.unwrap(), target_key1);

            reopened.destroy().await.unwrap();
        });
    }

    /// Rewinding to the oldest retained location, or to a location that has already been pruned,
    /// fails with `ItemPruned`.
    #[test_traced("INFO")]
    fn test_current_rewind_pruned_target_errors() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            const KEYS: u64 = 384;

            let partition = "current-rewind-pruned";
            let ctx = context.child("db");
            let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
                ctx.child("storage"),
                variable_config::<OneCap>(partition, &ctx),
            )
            .await
            .unwrap();

            let first_range = commit_writes_with_metadata(
                &mut db,
                (0..KEYS).map(|i| (key(i), Some(val(i)))),
                None,
            )
            .await;
            commit_writes_with_metadata(
                &mut db,
                (0..KEYS).map(|i| (key(i), Some(val(1000 + i)))),
                None,
            )
            .await;

            db.prune(db.sync_boundary()).await.unwrap();
            let pruned_bits = db.pruned_bits();
            assert!(
                pruned_bits > *first_range.start,
                "expected bitmap pruning boundary above rewind target: pruned_bits={pruned_bits}, target={:?}",
                first_range.start
            );

            let oldest_retained = db.bounds().await.start;
            let boundary_err = db.rewind(oldest_retained).await.unwrap_err();
            assert!(
                matches!(
                    boundary_err,
                    Error::Journal(crate::journal::Error::ItemPruned(_))
                ),
                "unexpected rewind error at retained boundary: {boundary_err:?}"
            );

            let expected_pruned_loc = *first_range.start - 1;
            let err = db.rewind(first_range.start).await.unwrap_err();
            assert!(
                matches!(
                    err,
                    Error::Journal(crate::journal::Error::ItemPruned(loc))
                    if loc == expected_pruned_loc
                ),
                "unexpected rewind error: {err:?}"
            );

            db.destroy().await.unwrap();
        });
    }

    /// Rewinding to a commit that is still within the retained log, but whose inactivity floor
    /// lies below the bitmap pruning boundary, fails with `ItemPruned`.
    #[test_traced("INFO")]
    fn test_current_rewind_rejects_target_below_bitmap_floor() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            const COMMITS: u64 = 96;

            let partition = "current-rewind-bitmap-floor";
            let ctx = context.child("db");
            let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
                ctx.child("storage"),
                variable_config::<OneCap>(partition, &ctx),
            )
            .await
            .unwrap();

            let mut history = Vec::new();
            for round in 0..COMMITS {
                commit_writes_with_metadata(
                    &mut db,
                    [(key(0), Some(val(10_000 + round)))],
                    None,
                )
                .await;
                history.push((db.bounds().await.end, db.inactivity_floor_loc()));
            }
            assert!(db.inactivity_floor_loc() > Location::new(64));

            // Intentionally prune below the inactivity floor: the log retains older ops, but the
            // bitmap still prunes up to the inactivity floor.
            let prune_loc = Location::new(1);
            db.prune(prune_loc).await.unwrap();
            let pruned_bits = db.pruned_bits();
            assert!(pruned_bits > 0);
            let retained_start = db.bounds().await.start;

            // Pick a historical commit that is still within retained log bounds but whose floor is
            // below the bitmap pruning boundary.
            let rewind_target = history
                .iter()
                .find_map(|(size, floor)| {
                    if *size > *retained_start
                        && *size >= pruned_bits
                        && *floor >= *retained_start
                        && *floor < pruned_bits
                    {
                        Some(*size)
                    } else {
                        None
                    }
                })
                .unwrap_or_else(|| {
                    panic!(
                        "expected rewind target below bitmap boundary. retained_start={retained_start:?}, pruned_bits={pruned_bits}, latest_floor={:?}, history={history:?}",
                        db.inactivity_floor_loc()
                    )
                });

            let err = db.rewind(rewind_target).await.unwrap_err();
            assert!(
                matches!(err, Error::Journal(crate::journal::Error::ItemPruned(_))),
                "unexpected rewind error: {err:?}"
            );

            db.destroy().await.unwrap();
        });
    }

    /// Verify that the speculative canonical root from a merkleized batch matches the root
    /// recomputed from committed state after sync + reopen.
    ///
    /// Uses enough operations to cross a chunk boundary (CHUNK_SIZE_BITS = N*8), which exercises
    /// the grafted root computation for newly completed chunks.
    pub fn test_speculative_root_matches_committed<M, C, F, Fut>(mut open_db: F)
    where
        M: merkle::Graftable + 'static,
        C: DbAny<M> + 'static,
        C::Key: TestKey,
        <C as DbAny<M>>::Value: TestValue,
        F: FnMut(Context, String) -> Fut + Clone,
        Fut: Future<Output = C>,
    {
        let executor = deterministic::Runner::default();
        let mut open_db_clone = open_db.clone();
        executor.start(|context| async move {
            let partition = "speculative-root".to_string();

            // Write enough operations to cross a chunk boundary. With N=32 (CHUNK_SIZE_BITS=256),
            // 260 writes + 1 CommitFloor = 261 operations, completing one chunk with 5 ops in the
            // next partial chunk. This ensures the grafted root computation must handle the
            // newly completed chunk.
            let mut db: C = open_db_clone(context.child("init"), partition.clone()).await;
            let mut batch = db.new_batch();
            for i in 0..260 {
                batch = batch.write(TestKey::from_seed(i), Some(TestValue::from_seed(i + 1000)));
            }
            let merkleized = batch.merkleize(&db, None).await.unwrap();
            db.apply_batch(merkleized).await.unwrap();
            let speculative_root = db.root();

            // Sync, close, and reopen to get the root recomputed from committed state.
            db.sync().await.unwrap();
            drop(db);

            let db: C = open_db(context.child("reopen"), partition).await;
            assert_eq!(db.root(), speculative_root);

            db.destroy().await.unwrap();
        });
    }

    /// Runs [`test_speculative_root_matches_committed`] via `for_all_variants!`.
    #[test_group("slow")]
    #[test_traced("INFO")]
    fn test_all_variants_speculative_root_matches_committed() {
        let executor = deterministic::Runner::default();
        executor.start(|_context| async move {
            for_all_variants!(simple: test_speculative_root_matches_committed);
        });
    }

    /// MerkleizedBatch::get() at the current level reads overlay then base DB.
    #[test_traced("INFO")]
    fn test_current_batch_merkleized_get() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let ctx = context.child("db");
            let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
                ctx.child("storage"),
                variable_config::<OneCap>("mg", &ctx),
            )
            .await
            .unwrap();

            let ka = key(0);
            let kb = key(1);
            let kc = key(2);

            // Pre-populate A.
            {
                let mut batch = db.new_batch();
                batch = batch.write(ka, Some(val(0)));
                let merkleized = batch.merkleize(&db, None).await.unwrap();
                db.apply_batch(merkleized).await.unwrap();
            }

            // Batch: update A and create B. C is never written, so it must read as `None`.
            let va2 = val(100);
            let vb = val(1);
            let mut batch = db.new_batch();
            batch = batch.write(ka, Some(va2));
            batch = batch.write(kb, Some(vb));
            let merkleized = batch.merkleize(&db, None).await.unwrap();

            assert_eq!(merkleized.get(&ka, &db).await.unwrap(), Some(va2));
            assert_eq!(merkleized.get(&kb, &db).await.unwrap(), Some(vb));
            assert_eq!(merkleized.get(&kc, &db).await.unwrap(), None);

            db.destroy().await.unwrap();
        });
    }

    /// Batch chaining at the current level: parent -> merkleize -> child -> merkleize.
    /// Child's canonical root matches db.root() after apply.
    #[test_traced("INFO")]
    fn test_current_batch_chaining() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let ctx = context.child("db");
            let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
                ctx.child("storage"),
                variable_config::<OneCap>("ch", &ctx),
            )
            .await
            .unwrap();

            // Parent batch writes keys 0..5.
            let mut parent = db.new_batch();
            for i in 0..5 {
                parent = parent.write(key(i), Some(val(i)));
            }
            let parent_m = parent.merkleize(&db, None).await.unwrap();

            // Child batch writes keys 5..10 and overrides key 0.
            let mut child = parent_m.new_batch::<Sha256>();
            for i in 5..10 {
                child = child.write(key(i), Some(val(i)));
            }
            child = child.write(key(0), Some(val(999)));
            let child_m = child.merkleize(&db, None).await.unwrap();

            let child_root = child_m.root();

            // Child get reads through all layers.
            assert_eq!(child_m.get(&key(0), &db).await.unwrap(), Some(val(999)));
            assert_eq!(child_m.get(&key(3), &db).await.unwrap(), Some(val(3)));
            assert_eq!(child_m.get(&key(7), &db).await.unwrap(), Some(val(7)));

            db.apply_batch(child_m).await.unwrap();
            assert_eq!(db.root(), child_root);

            // Verify all keys are correct.
            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(999)));
            for i in 1..10 {
                assert_eq!(db.get(&key(i)).await.unwrap(), Some(val(i)));
            }

            db.destroy().await.unwrap();
        });
    }

    /// A child built on a still-pending parent must produce the same canonical and ops roots as
    /// the same logical child built after the parent is committed (unordered wrapper).
    #[test_traced("INFO")]
    fn test_current_unordered_root_matches_between_pending_and_committed_paths() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let ctx = context.child("db");
            let mut db: UnorderedFixedDb =
                UnorderedFixedDb::init(ctx.child("storage"), fixed_config::<OneCap>("ucr", &ctx))
                    .await
                    .unwrap();
            let key_a = colliding_digest(0xAA, 1);
            let key_b = colliding_digest(0xAA, 0);

            // Seed four colliding committed keys, then update only key_a.
            // The specific 4 / 1 / 0 shape is a concrete counterexample:
            // key_b remains outside the parent diff and is still resolved
            // through the committed snapshot in the child.
            let mut initial = db.new_batch();
            for i in 0..4 {
                initial = initial.write(colliding_digest(0xAA, i), Some(colliding_digest(0xBB, i)));
            }
            let merkleized = initial.merkleize(&db, None).await.unwrap();
            db.apply_batch(merkleized).await.unwrap();
            db.commit().await.unwrap();

            // Update only key_a so the colliding sibling key_b remains outside
            // the parent diff and must still be resolved through the committed
            // snapshot in the child.
            let parent = db
                .new_batch()
                .write(key_a, Some(colliding_digest(0xCC, 1)))
                .merkleize(&db, None)
                .await
                .unwrap();

            // Build the child while the parent is still pending, then rebuild
            // the same logical child after committing the parent and compare
            // both canonical and ops roots.
            let pending_child = parent
                .new_batch::<Sha256>()
                .write(key_a, Some(colliding_digest(0xDD, 1)))
                .write(key_b, Some(colliding_digest(0xDD, 0)))
                .merkleize(&db, None)
                .await
                .unwrap();

            let pending_root = pending_child.root();
            let pending_ops_root = pending_child.ops_root();

            db.apply_batch(parent).await.unwrap();
            db.commit().await.unwrap();

            let committed_child = db
                .new_batch()
                .write(key_a, Some(colliding_digest(0xDD, 1)))
                .write(key_b, Some(colliding_digest(0xDD, 0)))
                .merkleize(&db, None)
                .await
                .unwrap();

            assert_eq!(pending_root, committed_child.root());
            assert_eq!(pending_ops_root, committed_child.ops_root());

            // Apply the pending child onto the committed parent
            // and ensure the applied wrapper roots still match.
            db.apply_batch(pending_child).await.unwrap();
            assert_eq!(db.root(), committed_child.root());
            assert_eq!(db.ops_root(), committed_child.ops_root());

            db.destroy().await.unwrap();
        });
    }

    /// A child built on a still-pending parent must produce the same canonical and ops roots as
    /// the same logical child built after the parent is committed (ordered wrapper).
    #[test_traced("INFO")]
    fn test_current_ordered_root_matches_between_pending_and_committed_paths() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let ctx = context.child("db");
            let mut db: OrderedFixedDb =
                OrderedFixedDb::init(ctx.child("storage"), fixed_config::<OneCap>("ocr", &ctx))
                    .await
                    .unwrap();
            let key_a = colliding_digest(0xAA, 1);
            let key_b = colliding_digest(0xAA, 0);

            // Match the unordered counterexample shape on the ordered path so
            // both wrappers exercise the same collision pattern.
            let mut initial = db.new_batch();
            for i in 0..4 {
                initial = initial.write(colliding_digest(0xAA, i), Some(colliding_digest(0xBB, i)));
            }
            let merkleized = initial.merkleize(&db, None).await.unwrap();
            db.apply_batch(merkleized).await.unwrap();
            db.commit().await.unwrap();

            // Update only key_a so the colliding sibling key_b remains outside
            // the parent diff and must still be resolved through the committed
            // snapshot in the child.
            let parent = db
                .new_batch()
                .write(key_a, Some(colliding_digest(0xCC, 1)))
                .merkleize(&db, None)
                .await
                .unwrap();

            // Build the child while the parent is still pending, then rebuild
            // the same logical child after committing the parent.
            let pending_child = parent
                .new_batch::<Sha256>()
                .write(key_a, Some(colliding_digest(0xDD, 1)))
                .write(key_b, Some(colliding_digest(0xDD, 0)))
                .merkleize(&db, None)
                .await
                .unwrap();

            let pending_root = pending_child.root();
            let pending_ops_root = pending_child.ops_root();

            db.apply_batch(parent).await.unwrap();
            db.commit().await.unwrap();

            let committed_child = db
                .new_batch()
                .write(key_a, Some(colliding_digest(0xDD, 1)))
                .write(key_b, Some(colliding_digest(0xDD, 0)))
                .merkleize(&db, None)
                .await
                .unwrap();

            assert_eq!(pending_root, committed_child.root());
            assert_eq!(pending_ops_root, committed_child.ops_root());

            // Apply the pending child onto the committed parent
            // and compare the applied wrapper roots with the committed-path child roots.
            db.apply_batch(pending_child).await.unwrap();
            assert_eq!(db.root(), committed_child.root());
            assert_eq!(db.ops_root(), committed_child.ops_root());

            db.destroy().await.unwrap();
        });
    }

2952    /// Applying without `commit()` publishes in memory but is not recovered after reopen.
2953    #[test_traced("INFO")]
2954    fn test_current_batch_apply_requires_commit_for_recovery() {
2955        let executor = deterministic::Runner::default();
2956        executor.start(|context| async move {
2957            let partition = "apply_requires_commit";
2958            let ctx = context.child("db");
2959            let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
2960                ctx.child("storage"),
2961                variable_config::<OneCap>(partition, &ctx),
2962            )
2963            .await
2964            .unwrap();
2965
2966            let committed_root = db.root();
2967
2968            let merkleized = db
2969                .new_batch()
2970                .write(key(0), Some(val(0)))
2971                .merkleize(&db, None)
2972                .await
2973                .unwrap();
2974            db.apply_batch(merkleized).await.unwrap();
2975
2976            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
2977
2978            drop(db);
2979
2980            let reopened: UnorderedVariableDb = UnorderedVariableDb::init(
2981                context.child("reopen"),
2982                variable_config::<OneCap>(partition, &context),
2983            )
2984            .await
2985            .unwrap();
2986            assert_eq!(reopened.root(), committed_root);
2987            assert_eq!(reopened.get(&key(0)).await.unwrap(), None);
2988
2989            reopened.destroy().await.unwrap();
2990        });
2991    }
2992
2993    /// One-stage pipelining lets the next batch be built while the prior batch commits.
2994    #[test_traced("INFO")]
2995    fn test_current_batch_single_stage_pipeline() {
2996        let executor = deterministic::Runner::default();
2997        executor.start(|context| async move {
2998            let ctx = context.child("db");
2999            let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
3000                ctx.child("storage"),
3001                variable_config::<OneCap>("pipe", &ctx),
3002            )
3003            .await
3004            .unwrap();
3005
3006            let mut batch = db.new_batch();
3007            batch = batch.write(key(0), Some(val(0)));
3008            let parent_merkleized = batch.merkleize(&db, None).await.unwrap();
3009            db.apply_batch(parent_merkleized).await.unwrap();
3010
3011            let (child_merkleized, commit_result) = futures::join!(
3012                async {
3013                    assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
3014                    let mut child = db.new_batch();
3015                    child = child.write(key(1), Some(val(1)));
3016                    child.merkleize(&db, None).await.unwrap()
3017                },
3018                db.commit(),
3019            );
3020            commit_result.unwrap();
3021
3022            db.apply_batch(child_merkleized).await.unwrap();
3023            db.commit().await.unwrap();
3024
3025            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
3026            assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
3027
3028            db.destroy().await.unwrap();
3029        });
3030    }
3031
3032    /// Apply parent then child sequentially. Both keys are present and the
3033    /// root matches a fresh DB built from two sequential plain batches.
3034    #[test_traced("INFO")]
3035    fn test_current_sequential_commit() {
3036        let executor = deterministic::Runner::default();
3037        executor.start(|context| async move {
3038            let ctx = context.child("db");
3039            let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
3040                ctx.child("storage"),
3041                variable_config::<OneCap>("ff", &ctx),
3042            )
3043            .await
3044            .unwrap();
3045
3046            // Parent batch: insert key(0).
3047            let parent_m = db
3048                .new_batch()
3049                .write(key(0), Some(val(0)))
3050                .merkleize(&db, None)
3051                .await
3052                .unwrap();
3053
3054            // Child batch on parent: insert key(1).
3055            let child_m = parent_m
3056                .new_batch::<Sha256>()
3057                .write(key(1), Some(val(1)))
3058                .merkleize(&db, None)
3059                .await
3060                .unwrap();
3061
3062            db.apply_batch(parent_m).await.unwrap();
3063            db.apply_batch(child_m).await.unwrap();
3064
3065            // Both keys present.
3066            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
3067            assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
3068
3069            // Build the same result via two sequential plain batches in a fresh DB
3070            // and verify the roots match.
3071            let ctx2 = context.child("db").with_attribute("index", 2);
3072            let mut db2: UnorderedVariableDb = UnorderedVariableDb::init(
3073                ctx2.child("db"),
3074                variable_config::<OneCap>("ff2", &ctx2),
3075            )
3076            .await
3077            .unwrap();
3078            let m1 = db2
3079                .new_batch()
3080                .write(key(0), Some(val(0)))
3081                .merkleize(&db2, None)
3082                .await
3083                .unwrap();
3084            db2.apply_batch(m1).await.unwrap();
3085            let m2 = db2
3086                .new_batch()
3087                .write(key(1), Some(val(1)))
3088                .merkleize(&db2, None)
3089                .await
3090                .unwrap();
3091            db2.apply_batch(m2).await.unwrap();
3092
3093            assert_eq!(db.root(), db2.root());
3094
3095            db.destroy().await.unwrap();
3096            db2.destroy().await.unwrap();
3097        });
3098    }
3099
3100    /// `to_batch()` produces a `MerkleizedBatch` that can be used to chain further
3101    /// batches via `new_batch()`.
3102    #[test_traced("INFO")]
3103    fn test_current_to_batch_then_chain() {
3104        let executor = deterministic::Runner::default();
3105        executor.start(|context| async move {
3106            let ctx = context.child("db");
3107            let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
3108                ctx.child("storage"),
3109                variable_config::<OneCap>("tb", &ctx),
3110            )
3111            .await
3112            .unwrap();
3113
3114            // Apply an initial batch.
3115            let m = db
3116                .new_batch()
3117                .write(key(0), Some(val(0)))
3118                .merkleize(&db, None)
3119                .await
3120                .unwrap();
3121            db.apply_batch(m).await.unwrap();
3122
3123            // Get an owned batch from the committed state.
3124            let snapshot = db.to_batch();
3125            assert_eq!(snapshot.root(), db.root());
3126
3127            // Chain a child batch from the snapshot.
3128            let child = snapshot
3129                .new_batch::<Sha256>()
3130                .write(key(1), Some(val(1)))
3131                .merkleize(&db, None)
3132                .await
3133                .unwrap();
3134
3135            // The child's root should differ from the snapshot.
3136            assert_ne!(child.root(), snapshot.root());
3137
3138            // Apply child.
3139            db.apply_batch(child).await.unwrap();
3140            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
3141            assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
3142
3143            db.destroy().await.unwrap();
3144        });
3145    }
3146
3147    /// A live batch (built off the committed state) must remain readable and applicable after
3148    /// [`Db::prune`] advances the shared bitmap's pruning boundary. Pruning only discards
3149    /// chunks for inactive bits (below the inactivity floor); the batch's own chain and
3150    /// overlays operate at or above the floor, so no reads should land in the pruned region.
3151    #[test_traced("INFO")]
3152    fn test_current_live_batch_safe_across_prune() {
3153        let executor = deterministic::Runner::default();
3154        executor.start(|context| async move {
3155            let ctx = context.child("db");
3156            let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
3157                ctx.child("storage"),
3158                variable_config::<OneCap>("prune-live", &ctx),
3159            )
3160            .await
3161            .unwrap();
3162
3163            // Seed enough ops to span multiple bitmap chunks.
3164            let mut seed = db.new_batch();
3165            for i in 0u64..300 {
3166                seed = seed.write(key(i), Some(val(i)));
3167            }
3168            let seed_m = seed.merkleize(&db, None).await.unwrap();
3169            db.apply_batch(seed_m).await.unwrap();
3170            db.commit().await.unwrap();
3171
3172            // Overwrite keys 0..250 so the inactivity floor advances past chunk 0.
3173            let mut p = db.new_batch();
3174            for i in 0u64..250 {
3175                p = p.write(key(i), Some(val(i + 10_000)));
3176            }
3177            let p_m = p.merkleize(&db, None).await.unwrap();
3178            db.apply_batch(Arc::clone(&p_m)).await.unwrap();
3179            db.commit().await.unwrap();
3180
3181            // Build c off p_m; c is live and shares the committed bitmap via its chain.
3182            let c = p_m
3183                .new_batch::<Sha256>()
3184                .write(key(250), Some(val(99_999)))
3185                .merkleize(&db, None)
3186                .await
3187                .unwrap();
3188
3189            // Prune with c still alive. This advances pruned_chunks on the shared bitmap.
3190            db.prune(db.sync_boundary()).await.unwrap();
3191
3192            // Sanity: c's pending write is still readable via the any-layer diff chain.
3193            assert_eq!(c.get(&key(250), &db).await.unwrap(), Some(val(99_999)));
3194
3195            // The actual prune-interaction test: apply c after prune. apply_batch skips overlay
3196            // chunks below the current pruned boundary.
3197            db.apply_batch(c).await.unwrap();
3198            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(10_000)));
3199            assert_eq!(db.get(&key(250)).await.unwrap(), Some(val(99_999)));
3200
3201            db.destroy().await.unwrap();
3202        });
3203    }
3204
3205    /// Regression: extending a batch after it has been applied (building a child off the
3206    /// just-applied parent) must produce correct data.
3207    ///
3208    /// With the shared-bitmap `RwLock` design, applying `A` mutates the committed bitmap in
3209    /// place; reads through `A`'s chain after apply fall through to the committed bitmap (which
3210    /// now reflects `A`'s state), and `A`'s own overlays applied on top are consistent with
3211    /// committed. So `A.new_batch()` followed by merkleize + apply is the right-by-construction
3212    /// case, and this test locks it in.
3213    #[test_traced("INFO")]
3214    fn test_current_extend_applied_batch() {
3215        let executor = deterministic::Runner::default();
3216        executor.start(|context| async move {
3217            let ctx = context.child("db");
3218            let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
3219                ctx.child("storage"),
3220                variable_config::<OneCap>("xtend", &ctx),
3221            )
3222            .await
3223            .unwrap();
3224
3225            // Apply A, retaining our Arc so we can extend it post-apply.
3226            let a = db
3227                .new_batch()
3228                .write(key(0), Some(val(0)))
3229                .merkleize(&db, None)
3230                .await
3231                .unwrap();
3232            db.apply_batch(Arc::clone(&a)).await.unwrap();
3233
3234            // Build B off A after A was applied. B's chain walks through A's layer and falls
3235            // through to the committed bitmap (now post-A). B's merkleize must read consistent
3236            // state from both sources.
3237            let b = a
3238                .new_batch::<Sha256>()
3239                .write(key(1), Some(val(1)))
3240                .merkleize(&db, None)
3241                .await
3242                .unwrap();
3243            db.apply_batch(b).await.unwrap();
3244
3245            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
3246            assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
3247
3248            // Build one more batch off the DB (now post-B) to lock in multi-generation behavior.
3249            let c = db
3250                .new_batch()
3251                .write(key(2), Some(val(2)))
3252                .merkleize(&db, None)
3253                .await
3254                .unwrap();
3255            db.apply_batch(c).await.unwrap();
3256
3257            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
3258            assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
3259            assert_eq!(db.get(&key(2)).await.unwrap(), Some(val(2)));
3260
3261            db.destroy().await.unwrap();
3262        });
3263    }
3264
3265    /// Build a child batch from a still-live parent whose apply was followed by a prune, then
3266    /// merkleize and apply the child. The parent's `BitmapBatch` chain terminates in the shared
3267    /// committed bitmap, and `prune` mutates that bitmap's pruning boundary in place. When the
3268    /// child is constructed via `parent.new_batch()`, the internal `trim_committed` call must
3269    /// observe the advanced boundary and produce a correct child chain; merkleize and apply must
3270    /// then produce correct state for keys at and beyond the advanced floor.
3271    #[test_traced("INFO")]
3272    fn test_current_live_batch_child_after_prune() {
3273        let executor = deterministic::Runner::default();
3274        executor.start(|context| async move {
3275            let ctx = context.child("db");
3276            let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
3277                ctx.child("storage"),
3278                variable_config::<OneCap>("child-after-prune", &ctx),
3279            )
3280            .await
3281            .unwrap();
3282
3283            // Seed enough ops to span multiple bitmap chunks.
3284            let mut seed = db.new_batch();
3285            for i in 0u64..300 {
3286                seed = seed.write(key(i), Some(val(i)));
3287            }
3288            let seed_m = seed.merkleize(&db, None).await.unwrap();
3289            db.apply_batch(seed_m).await.unwrap();
3290            db.commit().await.unwrap();
3291
3292            // Overwrite keys 0..250 so the inactivity floor advances past chunk 0.
3293            let mut a_batch = db.new_batch();
3294            for i in 0u64..250 {
3295                a_batch = a_batch.write(key(i), Some(val(i + 10_000)));
3296            }
3297            let a = a_batch.merkleize(&db, None).await.unwrap();
3298            db.apply_batch(Arc::clone(&a)).await.unwrap();
3299            db.commit().await.unwrap();
3300
3301            // Prune while `a` is still live. Mutates the shared bitmap's pruning boundary in place.
3302            db.prune(db.sync_boundary()).await.unwrap();
3303
3304            // Extend `a` into `b` AFTER the prune. Building `b` off `a` triggers
3305            // `trim_committed` on `a`'s chain, which must correctly see the advanced pruning
3306            // boundary on the shared bitmap.
3307            let b = a
3308                .new_batch::<Sha256>()
3309                .write(key(300), Some(val(300)))
3310                .merkleize(&db, None)
3311                .await
3312                .unwrap();
3313
3314            db.apply_batch(b).await.unwrap();
3315            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(10_000)));
3316            assert_eq!(db.get(&key(249)).await.unwrap(), Some(val(10_249)));
3317            assert_eq!(db.get(&key(300)).await.unwrap(), Some(val(300)));
3318
3319            db.destroy().await.unwrap();
3320        });
3321    }
3322
3323    /// Regression: applying a batch after its ancestor Arc is dropped (without
3324    /// committing) must still apply the ancestor's bitmap pushes/clears and
3325    /// snapshot diffs.
3326    #[test_traced("WARN")]
3327    fn test_current_apply_after_ancestor_dropped() {
3328        let executor = deterministic::Runner::default();
3329        executor.start(|context| async move {
3330            let ctx = context.child("db");
3331            let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
3332                ctx.child("storage"),
3333                variable_config::<OneCap>("adrop", &ctx),
3334            )
3335            .await
3336            .unwrap();
3337
3338            // Chain: DB <- A <- B <- C
3339            let mut a = db.new_batch();
3340            for i in 0..3 {
3341                a = a.write(key(i), Some(val(i)));
3342            }
3343            let a_m = a.merkleize(&db, None).await.unwrap();
3344
3345            let mut b = a_m.new_batch::<Sha256>();
3346            for i in 3..6 {
3347                b = b.write(key(i), Some(val(i)));
3348            }
3349            let b_m = b.merkleize(&db, None).await.unwrap();
3350
3351            let mut c = b_m.new_batch::<Sha256>();
3352            for i in 6..9 {
3353                c = c.write(key(i), Some(val(i)));
3354            }
3355            let c_m = c.merkleize(&db, None).await.unwrap();
3356
3357            // Drop A and B without committing. Their Weak refs in C are now dead.
3358            drop(a_m);
3359            drop(b_m);
3360
3361            // Apply only the tip. This takes the `!skip_ancestors` path (the DB hasn't changed).
3362            db.apply_batch(c_m).await.unwrap();
3363            db.commit().await.unwrap();
3364
3365            // All nine keys must be accessible.
3366            for i in 0..9 {
3367                assert_eq!(
3368                    db.get(&key(i)).await.unwrap(),
3369                    Some(val(i)),
3370                    "key({i}) missing after apply_batch with dropped ancestors"
3371                );
3372            }
3373
3374            db.destroy().await.unwrap();
3375        });
3376    }
3377
3378    /// Regression: applying a 3-deep chain as a single batch must leave the
3379    /// bitmap in the same state as applying the same operations sequentially.
3380    /// This fails if ancestor bitmap pushes are concatenated in the wrong order
3381    /// (tip-to-root instead of root-to-tip), because Delete operations produce
3382    /// false bitmap bits, and wrong ordering puts the false at the wrong
3383    /// position. We detect this by building a NEW batch on top of the
3384    /// (possibly corrupted) bitmap and comparing its root against the
3385    /// sequential path.
3386    #[test_traced("WARN")]
3387    fn test_current_chain_bitmap_order_matches_sequential() {
3388        let executor = deterministic::Runner::default();
3389        executor.start(|context| async move {
3390            // -- Path 1: build a 3-deep chain and apply the tip directly. --
3391            let ctx1 = context.child("db").with_attribute("index", 1);
3392            let mut db1: UnorderedVariableDb = UnorderedVariableDb::init(
3393                ctx1.child("db"),
3394                variable_config::<OneCap>("ord1", &ctx1),
3395            )
3396            .await
3397            .unwrap();
3398
3399            // Seed some committed data so there's a base bitmap to clear.
3400            commit_writes_with_metadata(
3401                &mut db1,
3402                [(key(10), Some(val(10))), (key(11), Some(val(11)))],
3403                None,
3404            )
3405            .await;
3406
3407            // Chain: DB <- A <- B <- C
3408            // A: updates key(10) and DELETES key(11). The delete produces a
3409            //    false bitmap bit. If A's bits end up at B's positions (wrong
3410            //    order), the false bit lands at the wrong journal location.
3411            // B: inserts key(12) and key(13). All true bits.
3412            // C: inserts key(14). All true bits.
3413            let a = db1
3414                .new_batch()
3415                .write(key(10), Some(val(100)))
3416                .write(key(11), None) // DELETE
3417                .merkleize(&db1, None)
3418                .await
3419                .unwrap();
3420
3421            let b = a
3422                .new_batch::<Sha256>()
3423                .write(key(12), Some(val(120)))
3424                .write(key(13), Some(val(130)))
3425                .merkleize(&db1, None)
3426                .await
3427                .unwrap();
3428
3429            let c = b
3430                .new_batch::<Sha256>()
3431                .write(key(14), Some(val(140)))
3432                .merkleize(&db1, None)
3433                .await
3434                .unwrap();
3435
3436            db1.apply_batch(c).await.unwrap();
3437            db1.commit().await.unwrap();
3438
3439            // Build one more batch on top to exercise the bitmap state.
3440            let d1 = db1
3441                .new_batch()
3442                .write(key(20), Some(val(200)))
3443                .merkleize(&db1, None)
3444                .await
3445                .unwrap();
3446            let chain_then_d_root = d1.root();
3447
3448            // -- Path 2: apply the same operations sequentially. --
3449            let ctx2 = context.child("db").with_attribute("index", 2);
3450            let mut db2: UnorderedVariableDb = UnorderedVariableDb::init(
3451                ctx2.child("db"),
3452                variable_config::<OneCap>("ord2", &ctx2),
3453            )
3454            .await
3455            .unwrap();
3456
3457            commit_writes_with_metadata(
3458                &mut db2,
3459                [(key(10), Some(val(10))), (key(11), Some(val(11)))],
3460                None,
3461            )
3462            .await;
3463
3464            let a2 = db2
3465                .new_batch()
3466                .write(key(10), Some(val(100)))
3467                .write(key(11), None)
3468                .merkleize(&db2, None)
3469                .await
3470                .unwrap();
3471            db2.apply_batch(a2).await.unwrap();
3472            db2.commit().await.unwrap();
3473
3474            let b2 = db2
3475                .new_batch()
3476                .write(key(12), Some(val(120)))
3477                .write(key(13), Some(val(130)))
3478                .merkleize(&db2, None)
3479                .await
3480                .unwrap();
3481            db2.apply_batch(b2).await.unwrap();
3482            db2.commit().await.unwrap();
3483
3484            let c2 = db2
3485                .new_batch()
3486                .write(key(14), Some(val(140)))
3487                .merkleize(&db2, None)
3488                .await
3489                .unwrap();
3490            db2.apply_batch(c2).await.unwrap();
3491            db2.commit().await.unwrap();
3492
3493            let d2 = db2
3494                .new_batch()
3495                .write(key(20), Some(val(200)))
3496                .merkleize(&db2, None)
3497                .await
3498                .unwrap();
3499            let sequential_then_d_root = d2.root();
3500
3501            assert_eq!(
3502                chain_then_d_root, sequential_then_d_root,
3503                "batch D's root on top of chain-applied state must match sequential state"
3504            );
3505
3506            db1.destroy().await.unwrap();
3507            db2.destroy().await.unwrap();
3508        });
3509    }
3510
3511    /// Regression: C's diff entry has a stale `base_old_loc` (255) pointing into a chunk that
3512    /// was pruned after parent P was committed. `committed_locs` precedence in
3513    /// `any::Db::apply_batch` must override the stale value with P's rewrite location, so the
3514    /// `set_bit(false)` call targets P's (post-floor-raise) loc, not the pruned chunk.
3515    ///
3516    /// With N=32, CHUNK_SIZE_BITS=256. Seed places key(0) at loc 255 (end of chunk 0). P
3517    /// overwrites keys 1..=254; P's floor-raise moves key(0) from 255 to a fresh loc above 255.
3518    /// C is built from P and writes key(0) again. After committing P and pruning chunk 0, C's
3519    /// pre-merkleize `base_old_loc=255` is no longer the right clear target; `committed_locs`
3520    /// substitutes P's rewrite loc instead. If that precedence path broke, apply would panic
3521    /// (`set_bit` on a pruned bit).
3522    #[test_traced("WARN")]
3523    fn test_current_stale_bitmap_clears_after_prune() {
3524        let executor = deterministic::Runner::default();
3525        executor.start(|context| async move {
3526            let ctx = context.child("db");
3527            let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
3528                ctx.child("storage"),
3529                variable_config::<OneCap>("stale-clears", &ctx),
3530            )
3531            .await
3532            .unwrap();
3533
3534            // Seed: 255 keys in one batch. key(0) lands at loc 255 (chunk 0).
3535            let mut seed = db.new_batch();
3536            for i in 0u64..255 {
3537                seed = seed.write(key(i), Some(val(i)));
3538            }
3539            let seed_m = seed.merkleize(&db, None).await.unwrap();
3540            db.apply_batch(seed_m).await.unwrap();
3541            db.commit().await.unwrap();
3542
3543            // P: overwrite keys 1..=254. Does NOT touch key(0), but P's floor-raise
3544            // moves key(0) off loc 255, advancing the floor past chunk 0.
3545            let mut p = db.new_batch();
3546            for i in 1u64..255 {
3547                p = p.write(key(i), Some(val(i + 10000)));
3548            }
3549            let p_m = p.merkleize(&db, None).await.unwrap();
3550
3551            // C: built from P. Writes key(0). base_old_loc = 255 (chunk 0).
3552            let c_m = p_m
3553                .new_batch::<Sha256>()
3554                .write(key(0), Some(val(9999)))
3555                .merkleize(&db, None)
3556                .await
3557                .unwrap();
3558
3559            // Commit P, prune chunk 0, then apply C.
3560            db.apply_batch(p_m).await.unwrap();
3561            db.commit().await.unwrap();
3562
3563            let floor = *db.inactivity_floor_loc();
3564            assert!(floor >= 256, "floor must be past chunk 0: floor={floor}",);
3565
3566            db.prune(db.sync_boundary()).await.unwrap();
3567            db.apply_batch(c_m).await.unwrap();
3568
3569            db.destroy().await.unwrap();
3570        });
3571    }
3572
3573    /// Apply C (grandchild of A) after only A is committed. B's data (any-layer
3574    /// snapshot diff + current-layer bitmap) must still be applied.
3575    #[test_traced("INFO")]
3576    fn test_current_partial_ancestor_commit() {
3577        let executor = deterministic::Runner::default();
3578        executor.start(|context| async move {
3579            let ctx = context.child("db");
3580            let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
3581                ctx.child("storage"),
3582                variable_config::<OneCap>("pac", &ctx),
3583            )
3584            .await
3585            .unwrap();
3586
3587            let a = db
3588                .new_batch()
3589                .write(key(0), Some(val(0)))
3590                .merkleize(&db, None)
3591                .await
3592                .unwrap();
3593            let b = a
3594                .new_batch::<Sha256>()
3595                .write(key(1), Some(val(1)))
3596                .merkleize(&db, None)
3597                .await
3598                .unwrap();
3599            let c = b
3600                .new_batch::<Sha256>()
3601                .write(key(2), Some(val(2)))
3602                .merkleize(&db, None)
3603                .await
3604                .unwrap();
3605
3606            let expected_root = c.root();
3607
3608            db.apply_batch(a).await.unwrap();
3609            db.apply_batch(c).await.unwrap();
3610
3611            assert_eq!(db.root(), expected_root);
3612            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
3613            assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
3614            assert_eq!(db.get(&key(2)).await.unwrap(), Some(val(2)));
3615
3616            db.destroy().await.unwrap();
3617        });
3618    }
3619
3620    /// Regression: bitmap ancestor skip logic must correctly pair each ancestor's
3621    /// bitmap data with its batch_end. Requires a 3-ancestor chain (A->B->C->D)
3622    /// to expose ordering bugs.
3623    #[test_traced("INFO")]
3624    fn test_current_partial_ancestor_bitmap_ordering() {
3625        let executor = deterministic::Runner::default();
3626        executor.start(|context| async move {
3627            let ctx = context.child("db");
3628            let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
3629                ctx.child("storage"),
3630                variable_config::<OneCap>("bmo", &ctx),
3631            )
3632            .await
3633            .unwrap();
3634
3635            // Build A -> B -> C -> D. Each writes a distinct key.
3636            let a = db
3637                .new_batch()
3638                .write(key(0), Some(val(0)))
3639                .merkleize(&db, None)
3640                .await
3641                .unwrap();
3642            let b = a
3643                .new_batch::<Sha256>()
3644                .write(key(1), Some(val(1)))
3645                .merkleize(&db, None)
                .await
                .unwrap();
            let c = b
                .new_batch::<Sha256>()
                .write(key(2), Some(val(2)))
                .merkleize(&db, None)
                .await
                .unwrap();
            let d = c
                .new_batch::<Sha256>()
                .write(key(3), Some(val(3)))
                .merkleize(&db, None)
                .await
                .unwrap();

            // Apply A only, then apply D (B and C uncommitted).
            // D has 3 ancestors: [C, B, A] (parent-first) with batch_ends [C.total, B.total, A.total].
            // Bitmap ancestors are also parent-first: [C, B, A].
            db.apply_batch(a).await.unwrap();
            db.apply_batch(d.clone()).await.unwrap();

            // Build a new batch E on top of the current state. If the bitmap was
            // corrupted by the ordering bug (A's pushes duplicated or B/C's pushes
            // missing), merkleize will compute a different root than a reference
            // that applied all ancestors sequentially.
            let e = db
                .new_batch()
                .write(key(4), Some(val(4)))
                .merkleize(&db, None)
                .await
                .unwrap();
            db.apply_batch(e).await.unwrap();

            // Reference: apply all five sequentially.
            let ref_ctx = context.child("ref");
            let mut ref_db: UnorderedVariableDb = UnorderedVariableDb::init(
                ref_ctx.child("db"),
                variable_config::<OneCap>("bmo_ref", &ref_ctx),
            )
            .await
            .unwrap();
            for i in 0..5 {
                let batch = ref_db
                    .new_batch()
                    .write(key(i), Some(val(i)))
                    .merkleize(&ref_db, None)
                    .await
                    .unwrap();
                ref_db.apply_batch(batch).await.unwrap();
            }

            assert_eq!(
                db.root(),
                ref_db.root(),
                "root mismatch: bitmap ordering bug"
            );

            db.destroy().await.unwrap();
            ref_db.destroy().await.unwrap();
        });
    }

    /// Regression: the bitmap chunks produced by the speculative `BitmapBatch` chain during
    /// merkleize must equal the bytes that `any::Db::apply_batch` writes via diff-driven
    /// updates. `current::Db::apply_batch` relies on this equivalence to install the precomputed
    /// `batch.grafted` against the now-current bitmap.
    ///
    /// The workload spans multiple bitmap chunks and exercises:
    /// - parent/child same-key overwrite (`committed_locs` precedence path),
    /// - parent-create then child-delete (uncommitted-ancestor precedence),
    /// - mixed deletes and overwrites in different chunks (clear-bit + set-bit paths).
    #[test_traced("INFO")]
    fn test_current_apply_chunks_match_speculative_chunks() {
        const N: usize = 32;
        const CHUNK_SIZE_BITS: u64 = commonware_utils::bitmap::Prunable::<N>::CHUNK_SIZE_BITS;
        // Seed enough keys to cross at least one chunk boundary. Each batch also produces a
        // CommitFloor op, so the bitmap grows past the user-visible key count.
        const SEED_KEYS: u64 = CHUNK_SIZE_BITS + 50;

        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let ctx = context.child("db");
            let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
                ctx.child("storage"),
                variable_config::<OneCap>("spec_eq", &ctx),
            )
            .await
            .unwrap();

            // Seed all keys in one committed batch.
            let seed = (0..SEED_KEYS).fold(db.new_batch(), |b, i| b.write(key(i), Some(val(i))));
            let seed = seed.merkleize(&db, None).await.unwrap();
            db.apply_batch(seed).await.unwrap();
            db.commit().await.unwrap();

            // Setup sanity: the committed bitmap spans at least two chunks.
            assert!(
                Readable::<N>::len(db.any.bitmap.as_ref()) > CHUNK_SIZE_BITS,
                "setup must cross a chunk boundary",
            );

            // Parent (uncommitted): overwrites + delete + creates spread across the bitmap.
            let parent = db
                .new_batch()
                .write(key(10), Some(val(110))) // overwrite (low chunk)
                .write(key(50), None) // delete (low chunk)
                .write(key(CHUNK_SIZE_BITS + 5), Some(val(120))) // overwrite (high chunk)
                .write(key(SEED_KEYS), Some(val(130))) // create new key
                .write(key(SEED_KEYS + 1), Some(val(131))) // create new key
                .merkleize(&db, None)
                .await
                .unwrap();

            // Child (uncommitted, descendant of parent):
            //   - same-key overwrite of parent's key(10)        -> committed_locs precedence
            //   - delete of parent's just-created key(SEED_KEYS) -> uncommitted-create-child-delete
            //   - additional delete + overwrite in mixed chunks -> set-bit + clear-bit coverage
            let child = parent
                .new_batch::<Sha256>()
                .write(key(10), Some(val(210)))
                .write(key(SEED_KEYS), None)
                .write(key(75), None)
                .write(key(CHUNK_SIZE_BITS + 30), Some(val(220)))
                .merkleize(&db, None)
                .await
                .unwrap();

            // Snapshot every chunk in the speculative `BitmapBatch` chain (read through child).
            let speculative_chunks: Vec<[u8; N]> = {
                let len = Readable::<N>::len(&child.bitmap);
                let chunk_count = len.div_ceil(CHUNK_SIZE_BITS) as usize;
                (0..chunk_count)
                    .map(|idx| Readable::<N>::get_chunk(&child.bitmap, idx))
                    .collect()
            };
            // Setup sanity: speculative state spans at least two chunks.
            assert!(speculative_chunks.len() >= 2);

            // Apply child (commits parent + child) and re-read every chunk from the committed
            // bitmap. The two views must be byte-identical; otherwise the precomputed
            // `batch.canonical_root` is no longer valid against the post-apply state.
            db.apply_batch(child).await.unwrap();
            let committed_chunks: Vec<[u8; N]> = {
                let len = Readable::<N>::len(db.any.bitmap.as_ref());
                let chunk_count = len.div_ceil(CHUNK_SIZE_BITS) as usize;
                (0..chunk_count)
                    .map(|idx| Readable::<N>::get_chunk(db.any.bitmap.as_ref(), idx))
                    .collect()
            };

            assert_eq!(
                speculative_chunks, committed_chunks,
                "speculative chunks must equal post-apply committed chunks across all chunks",
            );

            db.destroy().await.unwrap();
        });
    }

    /// Regression: `ops_historical_proof` must verify with QMDB's ops-tree hasher configuration.
    #[test_traced("INFO")]
    fn test_current_mmb_ops_historical_proof_verifies_with_backward_bagging() {
        use crate::{merkle::hasher::Standard, qmdb::verify_proof};
        use commonware_utils::NZU64;

        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let ctx = context.child("db");
            let mut db: UnorderedFixedMmbDb = UnorderedFixedMmbDb::init(
                ctx.child("storage"),
                fixed_config::<OneCap>("mmb-ops-proof", &ctx),
            )
            .await
            .unwrap();

            // Apply a batch and commit so an ops historical proof exists.
            let writes: Vec<(Digest, Option<Digest>)> =
                (0u64..16).map(|i| (key(i), Some(val(i)))).collect();
            commit_writes(&mut db, writes).await.unwrap();

            let ops_root = db.ops_root();
            let historical_size = db.bounds().await.end;
            let (proof, ops) = db
                .ops_historical_proof(historical_size, Location::new(0), NZU64!(32))
                .await
                .unwrap();

            // Verifies under the QMDB ops-tree hasher configuration.
            let hasher = qmdb::hasher::<Sha256>();
            assert!(verify_proof(
                &hasher,
                &proof,
                Location::new(0),
                &ops,
                &ops_root
            ));

            // Sanity: a different Merkle hasher configuration must not accept this proof.
            let plain = Standard::<Sha256>::new(ForwardFold);
            assert!(!verify_proof(
                &plain,
                &proof,
                Location::new(0),
                &ops,
                &ops_root
            ));

            db.destroy().await.unwrap();
        });
    }
}