
commonware_storage/qmdb/current/
mod.rs

//! A _Current_ authenticated database provides succinct proofs of _any_ value ever associated with
//! a key, and also whether that value is the _current_ value associated with it.
//!
//! # Examples
//!
//! See [`crate::qmdb::any`] for batch API examples (forking, sequential
//! commit, staleness). The Current layer uses the same batch API.
//!
//! # Motivation
//!
//! An [crate::qmdb::any] ("Any") database can prove that a key had a particular value at some
//! point, but it cannot prove that the value is still current -- some later operation may have
//! updated or deleted it. A Current database adds exactly this capability by maintaining a bitmap
//! that tracks which operations are _active_ (i.e. represent the current state of their key).
//!
//! To make this useful, a verifier needs both the operation and its activity status authenticated
//! under a single root. We achieve this by _grafting_ bitmap chunks onto the operations tree.
//!
//! # Data structures
//!
//! A Current database ([db::Db]) wraps an Any database and adds:
//!
//! - **Status bitmap** ([BitMap]): One bit per operation in the log. Bit _i_ is 1 if
//!   operation _i_ is active, 0 otherwise. The bitmap is divided into fixed-size chunks of `N`
//!   bytes (i.e. `N * 8` bits each). `N` must be a power of two.
//!
//! - **Grafted tree**: An in-memory Merkle structure of digests at and above the
//!   _grafting height_ in the ops tree. This is the core of how bitmap and ops state are combined
//!   into a single authenticated structure (see below).
//!
//! - **Bitmap metadata** (`Metadata`): Persists the pruning boundary and "pinned" digests needed
//!   to restore the grafted tree after pruning old bitmap chunks.
//!
//! # Grafting: combining the activity status bitmap and the ops tree
//!
//! ## The problem
//!
//! Naively authenticating the bitmap and ops tree as two independent Merkle structures would
//! require two separate proofs per operation -- one for the operation's value, one for its
//! activity status. This doubles proof sizes.
//!
//! ## The solution
//!
//! We combine ("graft") the two structures at a specific height in the ops tree called the
//! _grafting height_. The grafting height `h = log2(N * 8)` is chosen so that each subtree of
//! height `h` in the ops tree covers exactly one bitmap chunk's worth of operations.
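//!
//! As a concrete check of the formula (a worked example assuming a 32-byte digest, which would
//! make `N = 32` the smallest permitted chunk size):
//!
//! ```text
//! N = 32 bytes       =>  32 * 8 = 256 bits per chunk
//! grafting height h  =   log2(256) = 8
//! ```
//!
//! i.e. every complete ops subtree of 2^8 = 256 leaves lines up with exactly one bitmap chunk.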
//!
//! At the grafting height, instead of using the ops tree's own subtree root, we replace it with a
//! _grafted leaf_ digest that incorporates both the bitmap chunk and the ops subtree root:
//!
//! ```text
//! grafted_leaf = hash(bitmap_chunk || ops_subtree_root)   // non-zero chunk
//! grafted_leaf = ops_subtree_root                         // all-zero chunk (identity)
//! ```
//!
//! The all-zero identity means that for pruned regions (where every operation is inactive), the
//! grafted tree is structurally identical to the ops tree at and above the grafting height.
//!
//! Above the grafting height, internal nodes use standard hashing over the grafted leaves.
//! Below the grafting height, the ops tree is unchanged.
//!
//! ## Example
//!
//! With `N = 1` (8-bit chunks), the grafting height would be `h = log2(8) = 3`. To keep the
//! diagram small, the example below instead uses a hypothetical chunk size of 4 bits (`h = 2`)
//! with 8 operations, yielding 2 complete bitmap chunks:
//!
//! ```text
//! Ops tree positions (8 leaves):
//!
//!   Height
//!     3              14                    <-- peak: digest commits to ops tree and bitmap chunks
//!                  /    \
//!                 /      \
//!                /        \
//!     2  [G]    6          13    [G]       <-- grafting height: grafted leaves
//!             /   \      /    \
//!     1      2     5    9     12           <-- below grafting height: pure ops tree nodes
//!           / \   / \  / \   /  \
//!     0    0   1 3   4 7  8 10  11
//!          ^           ^
//!          |           |
//!      ops 0-3     ops 4-7
//!      chunk 0     chunk 1
//! ```
//!
//! Positions 6 and 13 are at the grafting height. Their digests are:
//! - `pos 6: hash(chunk_0 || ops_subtree_root(pos 6))`
//! - `pos 13: hash(chunk_1 || ops_subtree_root(pos 13))`
//!
//! Position 14 (above grafting height) is a standard internal node:
//! - `pos 14: hash(14 || digest(pos 6) || digest(pos 13))`
//!
//! The grafted tree stores positions 6, 13, and 14. The ops tree stores everything below
//! (positions 0-5 and 7-12). Together they form a single virtual Merkle structure whose root
//! authenticates both the operations and their activity status.
//!
//! ## Proof generation and verification
//!
//! To prove that operation _i_ is active, we provide:
//! 1. An inclusion proof for the operation's leaf, using the virtual (grafted) storage.
//! 2. The bitmap chunk containing bit _i_.
//!
//! The verifier (see `grafting::Verifier`) walks the proof from leaf to root. Below the grafting
//! height, it uses standard hashing. At the grafting height, it detects the boundary and
//! reconstructs the grafted leaf from the chunk and the ops subtree root. For non-zero chunks
//! the grafted leaf is `hash(chunk || ops_subtree_root)`; for all-zero chunks the grafted leaf
//! is the ops subtree root itself (identity optimization -- see `grafting::Verifier::node`).
//! Above the grafting height, it resumes standard hashing. If the reconstructed root
//! matches the expected root and bit _i_ is set in the chunk, the operation is proven active.
//!
//! This is a single proof path, not two independent ones -- the bitmap chunk is embedded in the
//! proof verification at the grafting boundary.
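//!
//! In pseudocode, the verifier's walk looks roughly like this (an illustrative sketch, not the
//! exact logic of `grafting::Verifier`):
//!
//! ```text
//! digest = hash(op_leaf)
//! for (height, sibling) in proof_path:        // leaf to root
//!     if height == h:                         // grafting boundary
//!         // `digest` is now the ops subtree root for this chunk
//!         digest = chunk.all_zero() ? digest : hash(chunk || digest)
//!     digest = combine(digest, sibling)       // standard hashing step
//! accept iff digest == expected_root and chunk.bit(i) == 1
//! ```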
//!
//! ## Partial chunks
//!
//! Operations arrive continuously, so the last bitmap chunk is usually incomplete (fewer than
//! `N * 8` bits). An incomplete chunk has no grafted leaf in the cache because there is no
//! corresponding complete subtree in the ops tree. To still authenticate these bits, the partial
//! chunk's digest and bit count are folded into the canonical root hash:
//!
//! ```text
//! root = hash(ops_root || grafted_root || next_bit || hash(partial_chunk))
//! ```
//!
//! where `next_bit` is the index of the next unset position in the partial chunk and
//! `grafted_root` is the root of the grafted tree (which covers only complete chunks).
//! When all chunks are complete, the partial chunk components are omitted.
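//!
//! For example, reusing the hypothetical 4-bit chunks from the grafting example, 10 operations
//! produce:
//!
//! ```text
//! bits 0-3  -> chunk 0 (complete)  \
//! bits 4-7  -> chunk 1 (complete)  /  covered by grafted_root
//! bits 8-9  -> partial chunk          folded in as next_bit || hash(partial_chunk)
//! ```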
//!
//! ## Incremental updates
//!
//! When operations are added or bits change (e.g. an operation becomes inactive during floor
//! raising), only the affected chunks are marked "dirty". During `merkleize`, only dirty grafted
//! leaves are recomputed and their ancestors are propagated upward through the cache. This avoids
//! recomputing the entire grafted tree.
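//!
//! A sketch of the dirty-tracking flow (names here are illustrative, not the actual API):
//!
//! ```text
//! set_bit(i, value):
//!     dirty.insert(i / (N * 8))        // mark the chunk containing bit i
//!
//! merkleize():
//!     for chunk in dirty:
//!         recompute the grafted leaf for `chunk`
//!         propagate the new digest up to the grafted root
//!     dirty.clear()
//! ```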
//!
//! ## Pruning
//!
//! Old bitmap chunks (below the inactivity floor) can be pruned. Before pruning, the grafted
//! digest peaks covering the pruned region are persisted to metadata as "pinned nodes". On
//! recovery, these pinned nodes are loaded and serve as opaque siblings during upward propagation,
//! allowing the grafted tree to be rebuilt without the pruned chunks.
//!
//! # Root structure
//!
//! The canonical root of a `current` database is:
//!
//! ```text
//! root = hash(ops_root || grafted_root [|| next_bit || hash(partial_chunk)])
//! ```
//!
//! where `grafted_root` is the root of the grafted tree (covering only complete
//! bitmap chunks), `next_bit` is the index of the next unset position in the partial chunk, and
//! `hash(partial_chunk)` is the digest of the incomplete trailing chunk. The partial chunk
//! components are only present when the last bitmap chunk is incomplete.
//!
//! This combines two (or three) components into a single hash:
//!
//! - **Ops root**: The root of the raw operations tree (the inner [crate::qmdb::any] database's
//!   root). Used for state sync, where a client downloads operations and verifies each batch
//!   against this root using standard Merkle range proofs.
//!
//! - **Grafted root**: The root of the grafted tree (overlaying bitmap chunks
//!   with ops subtree roots). Used for proofs about operation values and their activity status.
//!   See [RangeProof](proof::RangeProof) and [OperationProof](proof::OperationProof).
//!
//! - **Partial chunk** (optional): When operations arrive continuously, the last bitmap chunk is
//!   usually incomplete. Its digest and bit count are folded into the canonical root hash.
//!
//! The canonical root is returned by [Db](db::Db)`::`[root()](db::Db::root).
//! The ops root is returned by the `sync::Database` trait's `root()` method, since the sync engine
//! verifies batches against the ops root, not the canonical root.
//!
//! For state sync, the sync engine targets the ops root and verifies each batch against it.
//! After sync, the bitmap and grafted tree are reconstructed deterministically from the
//! operations, and the canonical root is computed. Validating that the ops root is part of the
//! canonical root is the caller's responsibility; the sync engine does not perform this check.
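//!
//! A caller that wants this guarantee can recompute the binding itself, roughly (a sketch; the
//! bitmap-derived components come from the state reconstructed after sync):
//!
//! ```text
//! expected = hash(synced_ops_root || grafted_root [|| next_bit || hash(partial_chunk)])
//! assert(expected == canonical_root)
//! ```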

use crate::{
    index::Factory as IndexFactory,
    journal::{
        authenticated::Inner,
        contiguous::{fixed::Config as FConfig, variable::Config as VConfig},
    },
    merkle::{self, Location},
    mmr::{journaled::Config as MmrConfig, StandardHasher},
    qmdb::{
        any::{
            self,
            operation::{Operation, Update},
            Config as AnyConfig,
        },
        operation::Committable,
    },
    translator::Translator,
    Context,
};
use commonware_codec::{CodecShared, FixedSize};
use commonware_cryptography::Hasher;
use commonware_utils::{bitmap::Prunable as BitMap, sync::AsyncMutex};
use std::sync::Arc;

pub mod batch;
pub mod db;
mod grafting;

pub mod ordered;
pub mod proof;
pub(crate) mod sync;
pub mod unordered;

/// Configuration for a `Current` authenticated db.
#[derive(Clone)]
pub struct Config<T: Translator, J> {
    /// Configuration for the Merkle structure backing the authenticated journal.
    pub merkle_config: MmrConfig,

    /// Configuration for the operations log journal.
    pub journal_config: J,

    /// The name of the storage partition used for grafted tree metadata.
    pub grafted_metadata_partition: String,

    /// The translator used by the compressed index.
    pub translator: T,
}

impl<T: Translator, J> From<Config<T, J>> for AnyConfig<T, J> {
    fn from(cfg: Config<T, J>) -> Self {
        Self {
            merkle_config: cfg.merkle_config,
            journal_config: cfg.journal_config,
            translator: cfg.translator,
        }
    }
}

/// Configuration for a `Current` authenticated db with fixed-size values.
pub type FixedConfig<T> = Config<T, FConfig>;

/// Configuration for a `Current` authenticated db with variable-sized values.
pub type VariableConfig<T, C> = Config<T, VConfig<C>>;
/// Initialize a `Current` authenticated db from the given config.
pub(super) async fn init<F, E, U, H, T, I, J, const N: usize>(
    context: E,
    config: Config<T, J::Config>,
) -> Result<db::Db<F, E, J, I, H, U, N>, crate::qmdb::Error<F>>
where
    F: merkle::Graftable,
    E: Context,
    U: Update + Send + Sync,
    H: Hasher,
    T: Translator,
    I: IndexFactory<T, Value = Location<F>>,
    J: Inner<E, Item = Operation<F, U>>,
    Operation<F, U>: Committable + CodecShared,
{
    // TODO: Re-evaluate assertion placement after `generic_const_exprs` is stable.
    const {
        // A compile-time assertion that the chunk size is some multiple of digest size. A multiple
        // of 1 is optimal with respect to proof size, but a higher multiple allows for a smaller
        // (RAM resident) merkle tree over the structure.
        assert!(
            N.is_multiple_of(H::Digest::SIZE),
            "chunk size must be some multiple of the digest size",
        );
        // A compile-time assertion that chunk size is a power of 2, which is necessary to allow
        // the status bitmap tree to be aligned with the underlying operations MMR.
        assert!(N.is_power_of_two(), "chunk size must be a power of 2");
    }

    let thread_pool = config.merkle_config.thread_pool.clone();
    let metadata_partition = config.grafted_metadata_partition.clone();

    // Load bitmap metadata (pruned_chunks + pinned nodes for the grafted tree).
    let (metadata, pruned_chunks, pinned_nodes) =
        db::init_metadata(context.with_label("metadata"), &metadata_partition).await?;

    // Initialize the activity status bitmap.
    let mut status = BitMap::<N>::new_with_pruned_chunks(pruned_chunks)
        .map_err(|_| crate::qmdb::Error::<F>::DataCorrupted("pruned chunks overflow"))?;

    // Initialize the anydb with a callback that populates the status bitmap.
    let last_known_inactivity_floor = Location::new(status.len());
    let any = any::init(
        context.with_label("any"),
        config.into(),
        Some(last_known_inactivity_floor),
        |append: bool, loc: Option<Location<F>>| {
            status.push(append);
            if let Some(loc) = loc {
                status.set_bit(*loc, false);
            }
        },
    )
    .await?;

    // Build the grafted tree from the bitmap and ops tree.
    let hasher = StandardHasher::<H>::new();
    let grafted_tree = db::build_grafted_tree::<F, H, N>(
        &hasher,
        &status,
        &pinned_nodes,
        &any.log.merkle,
        thread_pool.as_ref(),
    )
    .await?;

    // Compute and cache the root.
    let storage = grafting::Storage::new(&grafted_tree, grafting::height::<N>(), &any.log.merkle);
    let partial_chunk = db::partial_chunk(&status);
    let ops_root = any.log.root();
    let root = db::compute_db_root(&hasher, &status, &storage, partial_chunk, &ops_root).await?;

    Ok(db::Db {
        any,
        status: batch::BitmapBatch::Base(Arc::new(status)),
        grafted_tree,
        metadata: AsyncMutex::new(metadata),
        thread_pool,
        root,
    })
}

/// Extension trait for Current QMDB types that exposes bitmap information for testing.
#[cfg(any(test, feature = "test-traits"))]
pub trait BitmapPrunedBits {
    /// Returns the number of bits that have been pruned from the bitmap.
    fn pruned_bits(&self) -> u64;

    /// Returns the value of the bit at the given index.
    fn get_bit(&self, index: u64) -> bool;

    /// Returns the position of the oldest retained bit.
    fn oldest_retained(&self) -> impl core::future::Future<Output = u64> + Send;
}

#[cfg(test)]
pub mod tests {
    //! Shared test utilities for Current QMDB variants.

    pub use super::BitmapPrunedBits;
    use super::{ordered, unordered, FConfig, FixedConfig, MmrConfig, VConfig, VariableConfig};
    use crate::{
        merkle::{self, mmb, mmr},
        qmdb::{
            any::{
                test::colliding_digest,
                traits::{DbAny, MerkleizedBatch as _, UnmerkleizedBatch as _},
            },
            store::tests::{TestKey, TestValue},
        },
        translator::Translator,
    };
    use commonware_runtime::{
        buffer::paged::CacheRef,
        deterministic::{self, Context},
        BufferPooler, Metrics as _, Runner as _,
    };
    use commonware_utils::{NZUsize, NZU16, NZU64};
    use core::future::Future;
    use rand::{rngs::StdRng, RngCore, SeedableRng};
    use std::num::{NonZeroU16, NonZeroUsize};
    use tracing::warn;

    type Error<F> = crate::qmdb::Error<F>;
    type Location<F> = merkle::Location<F>;
    type WriteVec<F, C> = Vec<(<C as DbAny<F>>::Key, Option<<C as DbAny<F>>::Value>)>;

    // Janky page & cache sizes to exercise boundary conditions.
    const PAGE_SIZE: NonZeroU16 = NZU16!(88);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(8);

    /// Shared config factory for fixed-value Current QMDB tests.
    pub(crate) fn fixed_config<T: Translator + Default>(
        partition_prefix: &str,
        pooler: &impl BufferPooler,
    ) -> FixedConfig<T> {
        let page_cache = CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE);
        FixedConfig {
            merkle_config: MmrConfig {
                journal_partition: format!("{partition_prefix}-journal-partition"),
                metadata_partition: format!("{partition_prefix}-metadata-partition"),
                items_per_blob: NZU64!(11),
                write_buffer: NZUsize!(1024),
                thread_pool: None,
                page_cache: page_cache.clone(),
            },
            journal_config: FConfig {
                partition: format!("{partition_prefix}-partition-prefix"),
                items_per_blob: NZU64!(7),
                page_cache,
                write_buffer: NZUsize!(1024),
            },
            grafted_metadata_partition: format!("{partition_prefix}-grafted-metadata-partition"),
            translator: T::default(),
        }
    }

    /// Shared config factory for variable-value Current QMDB tests with unit codec config.
    pub(crate) fn variable_config<T: Translator + Default>(
        partition_prefix: &str,
        pooler: &impl BufferPooler,
    ) -> VariableConfig<T, ((), ())> {
        let page_cache = CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE);
        VariableConfig {
            merkle_config: MmrConfig {
                journal_partition: format!("{partition_prefix}-journal-partition"),
                metadata_partition: format!("{partition_prefix}-metadata-partition"),
                items_per_blob: NZU64!(11),
                write_buffer: NZUsize!(1024),
                thread_pool: None,
                page_cache: page_cache.clone(),
            },
            journal_config: VConfig {
                partition: format!("{partition_prefix}-partition-prefix"),
                items_per_section: NZU64!(7),
                compression: None,
                codec_config: ((), ()),
                page_cache,
                write_buffer: NZUsize!(1024),
            },
            grafted_metadata_partition: format!("{partition_prefix}-grafted-metadata-partition"),
            translator: T::default(),
        }
    }

    /// Commit a set of writes as a single batch.
    async fn commit_writes<F: merkle::Graftable, C: DbAny<F>>(
        db: &mut C,
        writes: impl IntoIterator<Item = (C::Key, Option<<C as DbAny<F>>::Value>)>,
    ) -> Result<(), Error<F>> {
        let mut batch = db.new_batch();
        for (k, v) in writes {
            batch = batch.write(k, v);
        }
        let merkleized = batch.merkleize(db, None).await?;
        db.apply_batch(merkleized).await?;
        db.commit().await?;
        Ok(())
    }

    /// Apply random operations to the given db, committing them (randomly and at the end) only if
    /// `commit_changes` is true. Returns the db; callers should commit if needed.
    async fn apply_random_ops_inner<F, C>(
        num_elements: u64,
        commit_changes: bool,
        rng_seed: u64,
        mut db: C,
    ) -> Result<C, Error<F>>
    where
        F: merkle::Graftable,
        C: DbAny<F>,
        C::Key: TestKey,
        <C as DbAny<F>>::Value: TestValue,
    {
        // Log the seed with high visibility to make failures reproducible.
        warn!("rng_seed={}", rng_seed);
        let mut rng = StdRng::seed_from_u64(rng_seed);

        // First loop: all initial writes in one batch.
        let writes: Vec<_> = (0u64..num_elements)
            .map(|i| {
                let k = TestKey::from_seed(i);
                let v = TestValue::from_seed(rng.next_u64());
                (k, Some(v))
            })
            .collect();
        if commit_changes {
            commit_writes(&mut db, writes).await?;
        }

        // Randomly update / delete them, with roughly 1 in 7 operations being a delete.
        // Accumulate writes and commit periodically.
        let mut pending: WriteVec<F, C> = Vec::new();
        for _ in 0u64..num_elements * 10 {
            let rand_key = TestKey::from_seed(rng.next_u64() % num_elements);
            if rng.next_u32() % 7 == 0 {
                pending.push((rand_key, None));
                continue;
            }
            let v = TestValue::from_seed(rng.next_u64());
            pending.push((rand_key, Some(v)));
            if commit_changes && rng.next_u32() % 20 == 0 {
                commit_writes(&mut db, pending.drain(..)).await?;
            }
        }
        if commit_changes {
            commit_writes(&mut db, pending).await?;
        }
        Ok(db)
    }

    /// Boxed wrapper around [apply_random_ops_inner]. Boxing the future prevents stack overflow
    /// when this helper is monomorphized across many DB variants.
    pub fn apply_random_ops<F, C>(
        num_elements: u64,
        commit_changes: bool,
        rng_seed: u64,
        db: C,
    ) -> std::pin::Pin<Box<dyn Future<Output = Result<C, Error<F>>>>>
    where
        F: merkle::Graftable + 'static,
        C: DbAny<F> + 'static,
        C::Key: TestKey,
        <C as DbAny<F>>::Value: TestValue,
    {
        Box::pin(apply_random_ops_inner::<F, C>(
            num_elements,
            commit_changes,
            rng_seed,
            db,
        ))
    }

    /// Run `test_build_random_close_reopen` against a database factory.
    ///
    /// The factory should return a database when given a context and partition name.
    /// The factory will be called multiple times to test reopening.
    pub fn test_build_random_close_reopen<M, C, F, Fut>(mut open_db: F)
    where
        M: merkle::Graftable + 'static,
        C: DbAny<M> + 'static,
        C::Key: TestKey,
        <C as DbAny<M>>::Value: TestValue,
        F: FnMut(Context, String) -> Fut + Clone,
        Fut: Future<Output = C>,
    {
        const ELEMENTS: u64 = 1000;

        let executor = deterministic::Runner::default();
        let mut open_db_clone = open_db.clone();
        let state1 = executor.start(|mut context| async move {
            let partition = "build-random".to_string();
            let rng_seed = context.next_u64();
            let mut db: C = open_db_clone(context.with_label("first"), partition.clone()).await;
            db = apply_random_ops::<M, C>(ELEMENTS, true, rng_seed, db)
                .await
                .unwrap();
            let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
            db.apply_batch(merkleized).await.unwrap();
            db.sync().await.unwrap();

            // Drop and reopen the db.
            let root = db.root();
            drop(db);
            let db: C = open_db_clone(context.with_label("second"), partition).await;

            // Ensure the root matches.
            assert_eq!(db.root(), root);

            db.destroy().await.unwrap();
            context.auditor().state()
        });

        // Run again to verify determinism.
        let executor = deterministic::Runner::default();
        let state2 = executor.start(|mut context| async move {
            let partition = "build-random".to_string();
            let rng_seed = context.next_u64();
            let mut db: C = open_db(context.with_label("first"), partition.clone()).await;
            db = apply_random_ops::<M, C>(ELEMENTS, true, rng_seed, db)
                .await
                .unwrap();
            let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
            db.apply_batch(merkleized).await.unwrap();
            db.sync().await.unwrap();

            let root = db.root();
            drop(db);
            let db: C = open_db(context.with_label("second"), partition).await;
            assert_eq!(db.root(), root);

            db.destroy().await.unwrap();
            context.auditor().state()
        });

        assert_eq!(state1, state2);
    }

    /// Run `test_simulate_write_failures` against a database factory.
    ///
    /// This test builds a random database and simulates recovery from different types of
    /// failure scenarios.
    pub fn test_simulate_write_failures<M, C, F, Fut>(mut open_db: F)
    where
        M: merkle::Graftable + 'static,
        C: DbAny<M> + 'static,
        C::Key: TestKey,
        <C as DbAny<M>>::Value: TestValue,
        F: FnMut(Context, String) -> Fut + Clone,
        Fut: Future<Output = C>,
    {
        const ELEMENTS: u64 = 1000;

        let executor = deterministic::Runner::default();
        // Box the future to prevent stack overflow when monomorphized across DB variants.
        executor.start(|mut context| {
            Box::pin(async move {
                let partition = "build-random-fail-commit".to_string();
                let rng_seed = context.next_u64();
                let mut db: C = open_db(context.with_label("first"), partition.clone()).await;
                db = apply_random_ops::<M, C>(ELEMENTS, true, rng_seed, db)
                    .await
                    .unwrap();
                commit_writes(&mut db, []).await.unwrap();
                let committed_root = db.root();
                let committed_op_count = db.bounds().await.end;
                let committed_inactivity_floor = db.inactivity_floor_loc().await;
                db.prune(committed_inactivity_floor).await.unwrap();

                // Perform more random operations without committing any of them.
                let db = apply_random_ops::<M, C>(ELEMENTS, false, rng_seed + 1, db)
                    .await
                    .unwrap();

                // SCENARIO #1: Simulate a crash that happens before any writes. Upon reopening, the
                // state of the DB should be as of the last commit.
                drop(db);
                let db: C = open_db(context.with_label("scenario1"), partition.clone()).await;
                assert_eq!(db.root(), committed_root);
                assert_eq!(db.bounds().await.end, committed_op_count);

                // Re-apply the exact same operations, this time committed.
                let db = apply_random_ops::<M, C>(ELEMENTS, true, rng_seed + 1, db)
                    .await
                    .unwrap();

                // SCENARIO #2: Simulate a crash that happens after the any db has been committed, but
                // before sync/prune is called. We do this by dropping the db without calling
                // sync or prune.
                let committed_op_count = db.bounds().await.end;
                drop(db);

                // We should be able to recover, so the root should differ from the previous commit, and
                // the op count should be greater than before.
                let db: C = open_db(context.with_label("scenario2"), partition.clone()).await;
                let scenario_2_root = db.root();

                // To confirm the second committed hash is correct we'll re-build the DB in a new
                // partition, but without any failures. They should have the exact same state.
                let fresh_partition = "build-random-fail-commit-fresh".to_string();
                let mut db: C = open_db(context.with_label("fresh"), fresh_partition.clone()).await;
                db = apply_random_ops::<M, C>(ELEMENTS, true, rng_seed, db)
                    .await
                    .unwrap();
                commit_writes(&mut db, []).await.unwrap();
                db = apply_random_ops::<M, C>(ELEMENTS, true, rng_seed + 1, db)
                    .await
                    .unwrap();
                db.prune(db.inactivity_floor_loc().await).await.unwrap();
                // State from scenario #2 should match that of a successful commit.
                assert_eq!(db.bounds().await.end, committed_op_count);
                assert_eq!(db.root(), scenario_2_root);

                db.destroy().await.unwrap();
            })
        });
    }

    /// Run `test_different_pruning_delays_same_root` against a database factory.
    ///
    /// This test verifies that pruning operations do not affect the root hash -- two databases
    /// with identical operations but different pruning schedules should have the same root.
    pub fn test_different_pruning_delays_same_root<M, C, F, Fut>(mut open_db: F)
    where
        M: merkle::Graftable,
        C: DbAny<M>,
        C::Key: TestKey,
        <C as DbAny<M>>::Value: TestValue,
        F: FnMut(Context, String) -> Fut + Clone,
        Fut: Future<Output = C>,
    {
        const NUM_OPERATIONS: u64 = 1000;

        let executor = deterministic::Runner::default();
        let mut open_db_clone = open_db.clone();
        executor.start(|context| async move {
            // Create two databases that are identical other than how they are pruned.
            let mut db_no_pruning: C =
                open_db_clone(context.with_label("no_pruning"), "no-pruning-test".into()).await;
            let mut db_pruning: C =
                open_db(context.with_label("pruning"), "pruning-test".into()).await;

            // Apply identical operations to both databases, but only prune one.
            // Accumulate writes between commits.
            let mut pending_no_pruning: WriteVec<M, C> = Vec::new();
            let mut pending_pruning: WriteVec<M, C> = Vec::new();
            for i in 0..NUM_OPERATIONS {
                let key: C::Key = TestKey::from_seed(i);
                let value: <C as DbAny<M>>::Value = TestValue::from_seed(i * 1000);

                pending_no_pruning.push((key, Some(value.clone())));
                pending_pruning.push((key, Some(value)));

                // Commit periodically.
                if i % 50 == 49 {
                    commit_writes(&mut db_no_pruning, pending_no_pruning.drain(..))
                        .await
                        .unwrap();
                    commit_writes(&mut db_pruning, pending_pruning.drain(..))
                        .await
                        .unwrap();
                    db_pruning
                        .prune(db_no_pruning.inactivity_floor_loc().await)
                        .await
                        .unwrap();
                }
            }

            // Final commit for remaining writes.
            commit_writes(&mut db_no_pruning, pending_no_pruning)
                .await
                .unwrap();
            commit_writes(&mut db_pruning, pending_pruning)
                .await
                .unwrap();

            // Get roots from both databases -- they should match.
            let root_no_pruning = db_no_pruning.root();
            let root_pruning = db_pruning.root();
            assert_eq!(root_no_pruning, root_pruning);

            // Also verify the inactivity floors match.
            assert_eq!(
                db_no_pruning.inactivity_floor_loc().await,
                db_pruning.inactivity_floor_loc().await
            );

            db_no_pruning.destroy().await.unwrap();
            db_pruning.destroy().await.unwrap();
        });
    }

    /// Run `test_sync_persists_bitmap_pruning_boundary` against a database factory.
    ///
    /// This test verifies that calling `sync()` persists the bitmap pruning boundary set during
    /// `commit()`. If `sync()` didn't call `write_pruned`, the `pruned_bits()` count would be 0
    /// after reopen instead of the expected value.
    pub fn test_sync_persists_bitmap_pruning_boundary<M, C, F, Fut>(mut open_db: F)
    where
        M: merkle::Graftable + 'static,
        C: DbAny<M> + BitmapPrunedBits + 'static,
        C::Key: TestKey,
        <C as DbAny<M>>::Value: TestValue,
        F: FnMut(Context, String) -> Fut + Clone,
        Fut: Future<Output = C>,
    {
        const ELEMENTS: u64 = 500;

        let executor = deterministic::Runner::default();
        let mut open_db_clone = open_db.clone();
        executor.start(|mut context| async move {
            let partition = "sync-bitmap-pruning".to_string();
            let rng_seed = context.next_u64();
            let mut db: C = open_db_clone(context.with_label("first"), partition.clone()).await;

            // Apply random operations with commits to advance the inactivity floor.
            db = apply_random_ops::<M, C>(ELEMENTS, true, rng_seed, db).await.unwrap();
            let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
            db.apply_batch(merkleized).await.unwrap();

            // Prune to flatten bitmap layers and advance pruned_chunks.
            let floor = db.inactivity_floor_loc().await;
            db.prune(floor).await.unwrap();

            let pruned_bits_before = db.pruned_bits();
            warn!(
                "pruned_bits_before={}, inactivity_floor={}, op_count={}",
                pruned_bits_before,
                *db.inactivity_floor_loc().await,
                *db.bounds().await.end
            );

            // Verify we actually have some pruning (otherwise the test is meaningless).
            assert!(
                pruned_bits_before > 0,
                "Expected bitmap to have pruned bits after prune()"
            );

            // Call sync() to persist the bitmap pruning boundary.
            db.sync().await.unwrap();

            // Record the root before dropping.
            let root_before = db.root();
            drop(db);

            // Reopen the database.
            let db: C = open_db(context.with_label("second"), partition).await;

            // The pruned bits count should match. If sync() didn't persist the bitmap pruned
            // state, this would be 0.
            let pruned_bits_after = db.pruned_bits();
            warn!("pruned_bits_after={}", pruned_bits_after);

            assert_eq!(
                pruned_bits_after, pruned_bits_before,
                "Bitmap pruned bits mismatch after reopen - sync() may not have called write_pruned()"
            );

            // Also verify the root matches.
            assert_eq!(db.root(), root_before);

            db.destroy().await.unwrap();
        });
    }

    /// Run `test_current_db_build_big` against a database factory.
    ///
    /// This test builds a database with 1000 keys, updates some, deletes some, and verifies that
    /// the final state matches an independently computed HashMap. It also verifies that the state
    /// persists correctly after close and reopen.
    pub fn test_current_db_build_big<M, C, F, Fut>(mut open_db: F)
    where
        M: merkle::Graftable,
        C: DbAny<M>,
        C::Key: TestKey,
        <C as DbAny<M>>::Value: TestValue,
        F: FnMut(Context, String) -> Fut + Clone,
        Fut: Future<Output = C>,
    {
        const ELEMENTS: u64 = 1000;

        let executor = deterministic::Runner::default();
        let mut open_db_clone = open_db.clone();
        executor.start(|context| async move {
            let mut db: C = open_db_clone(context.with_label("first"), "build-big".into()).await;

            let mut map = std::collections::HashMap::<C::Key, <C as DbAny<M>>::Value>::default();

            // All creates, updates, and deletes in one batch.
            let mut batch = db.new_batch();

            // Initial creates
            for i in 0u64..ELEMENTS {
                let k: C::Key = TestKey::from_seed(i);
                let v: <C as DbAny<M>>::Value = TestValue::from_seed(i * 1000);
                batch = batch.write(k, Some(v.clone()));
                map.insert(k, v);
            }

            // Update every 3rd key
            for i in 0u64..ELEMENTS {
                if i % 3 != 0 {
                    continue;
                }
                let k: C::Key = TestKey::from_seed(i);
                let v: <C as DbAny<M>>::Value = TestValue::from_seed((i + 1) * 10000);
                batch = batch.write(k, Some(v.clone()));
                map.insert(k, v);
            }

            // Delete every 7th key
            for i in 0u64..ELEMENTS {
                if i % 7 != 1 {
                    continue;
                }
                let k: C::Key = TestKey::from_seed(i);
                batch = batch.write(k, None);
                map.remove(&k);
            }

            let merkleized = batch.merkleize(&db, None).await.unwrap();
            db.apply_batch(merkleized).await.unwrap();

            // Sync and prune.
            db.sync().await.unwrap();
            db.prune(db.inactivity_floor_loc().await).await.unwrap();

            // Record root before dropping.
            let root = db.root();
            db.sync().await.unwrap();
            drop(db);

            // Reopen the db and verify it has exactly the same state.
            let db: C = open_db(context.with_label("second"), "build-big".into()).await;
            assert_eq!(root, db.root());

            // Confirm the db's state matches that of the separate map we computed independently.
            for i in 0u64..ELEMENTS {
                let k: C::Key = TestKey::from_seed(i);
                if let Some(map_value) = map.get(&k) {
                    let Some(db_value) = db.get(&k).await.unwrap() else {
                        panic!("key not found in db: {k}");
                    };
                    assert_eq!(*map_value, db_value);
                } else {
                    assert!(db.get(&k).await.unwrap().is_none());
                }
            }
        });
    }

    /// Run `test_stale_batch_side_effect_free` against a database factory.
    ///
    /// The stale batch must be rejected without mutating the committed state.
    pub fn test_stale_batch_side_effect_free<M, C, F, Fut>(mut open_db: F)
    where
        M: merkle::Graftable,
        C: DbAny<M>,
        C::Key: TestKey,
        <C as DbAny<M>>::Value: TestValue,
        F: FnMut(Context, String) -> Fut,
        Fut: Future<Output = C>,
    {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db: C =
                open_db(context.with_label("db"), "stale-side-effect-free".into()).await;

            let key1 = <C::Key as TestKey>::from_seed(1);
            let key2 = <C::Key as TestKey>::from_seed(2);
            let value1 = <<C as DbAny<M>>::Value as TestValue>::from_seed(10);
            let value2 = <<C as DbAny<M>>::Value as TestValue>::from_seed(20);

            let mut batch = db.new_batch();
            batch = batch.write(key1, Some(value1.clone()));
            let batch_a = batch.merkleize(&db, None).await.unwrap();
            let mut batch = db.new_batch();
            batch = batch.write(key2, Some(value2));
            let batch_b = batch.merkleize(&db, None).await.unwrap();

            db.apply_batch(batch_a).await.unwrap();
            let expected_root = db.root();
            let expected_bounds = db.bounds().await;
            let expected_metadata = db.get_metadata().await.unwrap();
            assert_eq!(db.get(&key1).await.unwrap(), Some(value1.clone()));
            assert_eq!(db.get(&key2).await.unwrap(), None);

            let result = db.apply_batch(batch_b).await;
            assert!(
                matches!(result, Err(Error::StaleBatch { .. })),
                "expected StaleBatch error, got {result:?}"
            );
            assert_eq!(db.root(), expected_root);
            assert_eq!(db.bounds().await, expected_bounds);
            assert_eq!(db.get_metadata().await.unwrap(), expected_metadata);
            assert_eq!(db.get(&key1).await.unwrap(), Some(value1));
            assert_eq!(db.get(&key2).await.unwrap(), None);

            db.destroy().await.unwrap();
        });
    }

    use crate::translator::OneCap;
    use commonware_cryptography::{sha256::Digest, Hasher as _, Sha256};
    use commonware_macros::{test_group, test_traced};

    type OrderedFixedDb =
        ordered::fixed::Db<mmr::Family, Context, Digest, Digest, Sha256, OneCap, 32>;
    type OrderedVariableDb =
        ordered::variable::Db<mmr::Family, Context, Digest, Digest, Sha256, OneCap, 32>;
    type UnorderedFixedDb =
        unordered::fixed::Db<mmr::Family, Context, Digest, Digest, Sha256, OneCap, 32>;
    type UnorderedVariableDb =
        unordered::variable::Db<mmr::Family, Context, Digest, Digest, Sha256, OneCap, 32>;
    type OrderedFixedP1Db = ordered::fixed::partitioned::Db<
        mmr::Family,
        Context,
        Digest,
        Digest,
        Sha256,
        OneCap,
        1,
        32,
    >;
    type OrderedVariableP1Db = ordered::variable::partitioned::Db<
        mmr::Family,
        Context,
        Digest,
        Digest,
        Sha256,
        OneCap,
        1,
        32,
    >;
    type UnorderedFixedP1Db = unordered::fixed::partitioned::Db<
        mmr::Family,
        Context,
        Digest,
        Digest,
        Sha256,
        OneCap,
        1,
        32,
    >;
    type UnorderedVariableP1Db = unordered::variable::partitioned::Db<
        mmr::Family,
        Context,
        Digest,
        Digest,
        Sha256,
        OneCap,
        1,
        32,
    >;
    type OrderedFixedP2Db = ordered::fixed::partitioned::Db<
        mmr::Family,
        Context,
        Digest,
        Digest,
        Sha256,
        OneCap,
        2,
        32,
    >;
    type OrderedVariableP2Db = ordered::variable::partitioned::Db<
        mmr::Family,
        Context,
        Digest,
        Digest,
        Sha256,
        OneCap,
        2,
        32,
    >;
    type UnorderedFixedP2Db = unordered::fixed::partitioned::Db<
        mmr::Family,
        Context,
        Digest,
        Digest,
        Sha256,
        OneCap,
        2,
        32,
    >;
    type UnorderedVariableP2Db = unordered::variable::partitioned::Db<
        mmr::Family,
        Context,
        Digest,
        Digest,
        Sha256,
        OneCap,
        2,
        32,
    >;

    type OrderedFixedMmbDb =
        ordered::fixed::Db<mmb::Family, Context, Digest, Digest, Sha256, OneCap, 32>;
    type OrderedVariableMmbDb =
        ordered::variable::Db<mmb::Family, Context, Digest, Digest, Sha256, OneCap, 32>;
    type UnorderedFixedMmbDb =
        unordered::fixed::Db<mmb::Family, Context, Digest, Digest, Sha256, OneCap, 32>;
    type UnorderedVariableMmbDb =
        unordered::variable::Db<mmb::Family, Context, Digest, Digest, Sha256, OneCap, 32>;
    type OrderedFixedMmbP1Db = ordered::fixed::partitioned::Db<
        mmb::Family,
        Context,
        Digest,
        Digest,
        Sha256,
        OneCap,
        1,
        32,
    >;
    type OrderedVariableMmbP1Db = ordered::variable::partitioned::Db<
        mmb::Family,
        Context,
        Digest,
        Digest,
        Sha256,
        OneCap,
        1,
        32,
    >;
    type UnorderedFixedMmbP1Db = unordered::fixed::partitioned::Db<
        mmb::Family,
        Context,
        Digest,
        Digest,
        Sha256,
        OneCap,
        1,
        32,
    >;
    type UnorderedVariableMmbP1Db = unordered::variable::partitioned::Db<
        mmb::Family,
        Context,
        Digest,
        Digest,
        Sha256,
        OneCap,
        1,
        32,
    >;
    type OrderedFixedMmbP2Db = ordered::fixed::partitioned::Db<
        mmb::Family,
        Context,
        Digest,
        Digest,
        Sha256,
        OneCap,
        2,
        32,
    >;
    type OrderedVariableMmbP2Db = ordered::variable::partitioned::Db<
        mmb::Family,
        Context,
        Digest,
        Digest,
        Sha256,
        OneCap,
        2,
        32,
    >;
    type UnorderedFixedMmbP2Db = unordered::fixed::partitioned::Db<
        mmb::Family,
        Context,
        Digest,
        Digest,
        Sha256,
        OneCap,
        2,
        32,
    >;
    type UnorderedVariableMmbP2Db = unordered::variable::partitioned::Db<
        mmb::Family,
        Context,
        Digest,
        Digest,
        Sha256,
        OneCap,
        2,
        32,
    >;

    // Helper macro to create an open_db closure for a specific variant.
    macro_rules! open_db_fn {
        ($db:ty, $cfg:ident) => {
            |ctx: Context, partition: String| async move {
                <$db>::init(ctx.clone(), $cfg::<OneCap>(&partition, &ctx))
                    .await
                    .unwrap()
            }
        };
    }

    // Defines all variants across both supported Merkle families.
    macro_rules! with_all_variants {
        ($cb:ident!($($args:tt)*)) => {
            $cb!($($args)*, "of", OrderedFixedDb, fixed_config);
            $cb!($($args)*, "ov", OrderedVariableDb, variable_config);
            $cb!($($args)*, "uf", UnorderedFixedDb, fixed_config);
            $cb!($($args)*, "uv", UnorderedVariableDb, variable_config);
            $cb!($($args)*, "ofp1", OrderedFixedP1Db, fixed_config);
            $cb!($($args)*, "ovp1", OrderedVariableP1Db, variable_config);
            $cb!($($args)*, "ufp1", UnorderedFixedP1Db, fixed_config);
            $cb!($($args)*, "uvp1", UnorderedVariableP1Db, variable_config);
            $cb!($($args)*, "ofp2", OrderedFixedP2Db, fixed_config);
            $cb!($($args)*, "ovp2", OrderedVariableP2Db, variable_config);
            $cb!($($args)*, "ufp2", UnorderedFixedP2Db, fixed_config);
            $cb!($($args)*, "uvp2", UnorderedVariableP2Db, variable_config);
            $cb!($($args)*, "of-mmb", OrderedFixedMmbDb, fixed_config);
            $cb!($($args)*, "ov-mmb", OrderedVariableMmbDb, variable_config);
            $cb!($($args)*, "uf-mmb", UnorderedFixedMmbDb, fixed_config);
            $cb!($($args)*, "uv-mmb", UnorderedVariableMmbDb, variable_config);
            $cb!($($args)*, "ofp1-mmb", OrderedFixedMmbP1Db, fixed_config);
            $cb!($($args)*, "ovp1-mmb", OrderedVariableMmbP1Db, variable_config);
            $cb!($($args)*, "ufp1-mmb", UnorderedFixedMmbP1Db, fixed_config);
            $cb!($($args)*, "uvp1-mmb", UnorderedVariableMmbP1Db, variable_config);
            $cb!($($args)*, "ofp2-mmb", OrderedFixedMmbP2Db, fixed_config);
            $cb!($($args)*, "ovp2-mmb", OrderedVariableMmbP2Db, variable_config);
            $cb!($($args)*, "ufp2-mmb", UnorderedFixedMmbP2Db, fixed_config);
            $cb!($($args)*, "uvp2-mmb", UnorderedVariableMmbP2Db, variable_config);
        };
    }

    // Defines the 12 ordered variants (both Merkle families).
    macro_rules! with_ordered_variants {
        ($cb:ident!($($args:tt)*)) => {
            $cb!($($args)*, "of", OrderedFixedDb, fixed_config);
            $cb!($($args)*, "ov", OrderedVariableDb, variable_config);
            $cb!($($args)*, "ofp1", OrderedFixedP1Db, fixed_config);
            $cb!($($args)*, "ovp1", OrderedVariableP1Db, variable_config);
            $cb!($($args)*, "ofp2", OrderedFixedP2Db, fixed_config);
            $cb!($($args)*, "ovp2", OrderedVariableP2Db, variable_config);
            $cb!($($args)*, "of-mmb", OrderedFixedMmbDb, fixed_config);
            $cb!($($args)*, "ov-mmb", OrderedVariableMmbDb, variable_config);
            $cb!($($args)*, "ofp1-mmb", OrderedFixedMmbP1Db, fixed_config);
            $cb!($($args)*, "ovp1-mmb", OrderedVariableMmbP1Db, variable_config);
            $cb!($($args)*, "ofp2-mmb", OrderedFixedMmbP2Db, fixed_config);
            $cb!($($args)*, "ovp2-mmb", OrderedVariableMmbP2Db, variable_config);
        };
    }

    // Defines the 12 unordered variants (both Merkle families).
    macro_rules! with_unordered_variants {
        ($cb:ident!($($args:tt)*)) => {
            $cb!($($args)*, "uf", UnorderedFixedDb, fixed_config);
            $cb!($($args)*, "uv", UnorderedVariableDb, variable_config);
            $cb!($($args)*, "ufp1", UnorderedFixedP1Db, fixed_config);
            $cb!($($args)*, "uvp1", UnorderedVariableP1Db, variable_config);
            $cb!($($args)*, "ufp2", UnorderedFixedP2Db, fixed_config);
            $cb!($($args)*, "uvp2", UnorderedVariableP2Db, variable_config);
            $cb!($($args)*, "uf-mmb", UnorderedFixedMmbDb, fixed_config);
            $cb!($($args)*, "uv-mmb", UnorderedVariableMmbDb, variable_config);
            $cb!($($args)*, "ufp1-mmb", UnorderedFixedMmbP1Db, fixed_config);
            $cb!($($args)*, "uvp1-mmb", UnorderedVariableMmbP1Db, variable_config);
            $cb!($($args)*, "ufp2-mmb", UnorderedFixedMmbP2Db, fixed_config);
            $cb!($($args)*, "uvp2-mmb", UnorderedVariableMmbP2Db, variable_config);
        };
    }

    // Runner macros - receive common args followed by (label, type, config).
    macro_rules! test_simple {
        ($f:expr, $l:literal, $db:ty, $cfg:ident) => {
            Box::pin(async {
                $f(open_db_fn!($db, $cfg));
            })
            .await
        };
    }

    // Macro to run a test on DB variants.
    macro_rules! for_all_variants {
        (simple: $f:expr) => {{
            with_all_variants!(test_simple!($f));
        }};
        (ordered: $f:expr) => {{
            with_ordered_variants!(test_simple!($f));
        }};
        (unordered: $f:expr) => {{
            with_unordered_variants!(test_simple!($f));
        }};
    }

    // Wrapper functions for running build_big against the ordered/unordered variants.
    fn test_ordered_build_big<M, C, F, Fut>(open_db: F)
    where
        M: merkle::Graftable,
        C: DbAny<M>,
        C::Key: TestKey,
        <C as DbAny<M>>::Value: TestValue,
        F: FnMut(Context, String) -> Fut + Clone,
        Fut: Future<Output = C>,
    {
        test_current_db_build_big::<M, C, F, Fut>(open_db);
    }

    fn test_unordered_build_big<M, C, F, Fut>(open_db: F)
    where
        M: merkle::Graftable,
        C: DbAny<M>,
        C::Key: TestKey,
        <C as DbAny<M>>::Value: TestValue,
        F: FnMut(Context, String) -> Fut + Clone,
        Fut: Future<Output = C>,
    {
        test_current_db_build_big::<M, C, F, Fut>(open_db);
    }

    #[test_group("slow")]
    #[test_traced("WARN")]
    fn test_all_variants_build_random_close_reopen() {
        let executor = deterministic::Runner::default();
        executor.start(|_context| async move {
            for_all_variants!(simple: test_build_random_close_reopen);
        });
    }

    #[test_group("slow")]
    #[test_traced("WARN")]
    fn test_all_variants_simulate_write_failures() {
        let executor = deterministic::Runner::default();
        executor.start(|_context| async move {
            for_all_variants!(simple: test_simulate_write_failures);
        });
    }

    #[test_group("slow")]
    #[test_traced("WARN")]
    fn test_all_variants_different_pruning_delays_same_root() {
        let executor = deterministic::Runner::default();
        executor.start(|_context| async move {
            for_all_variants!(simple: test_different_pruning_delays_same_root);
        });
    }

    #[test_group("slow")]
    #[test_traced("WARN")]
    fn test_all_variants_sync_persists_bitmap_pruning_boundary() {
        let executor = deterministic::Runner::default();
        executor.start(|_context| async move {
            for_all_variants!(simple: test_sync_persists_bitmap_pruning_boundary);
        });
    }

    #[test_traced("WARN")]
    fn test_all_variants_stale_batch_side_effect_free() {
        let executor = deterministic::Runner::default();
        executor.start(|_context| async move {
            for_all_variants!(simple: test_stale_batch_side_effect_free);
        });
    }

    #[test_group("slow")]
    #[test_traced("WARN")]
    fn test_ordered_variants_build_big() {
        let executor = deterministic::Runner::default();
        executor.start(|_context| async move {
            for_all_variants!(ordered: test_ordered_build_big);
        });
    }

    #[test_group("slow")]
    #[test_traced("WARN")]
    fn test_unordered_variants_build_big() {
        let executor = deterministic::Runner::default();
        executor.start(|_context| async move {
            for_all_variants!(unordered: test_unordered_build_big);
        });
    }

    #[test_group("slow")]
    #[test_traced("DEBUG")]
    fn test_ordered_variants_build_small_close_reopen() {
        let executor = deterministic::Runner::default();
        executor.start(|_context| async move {
            for_all_variants!(ordered: ordered::tests::test_build_small_close_reopen);
        });
    }

    #[test_group("slow")]
    #[test_traced("DEBUG")]
    fn test_unordered_variants_build_small_close_reopen() {
        let executor = deterministic::Runner::default();
        executor.start(|_context| async move {
            for_all_variants!(unordered: unordered::tests::test_build_small_close_reopen);
        });
    }

    // ---- Current-level batch API tests ----
    //
    // These exercise the current wrapper's batch methods (root, ops_root,
    // MerkleizedBatch::get, batch chaining) which layer bitmap and grafted tree
    // computation on top of the `any` batch.

    fn key(i: u64) -> Digest {
        Sha256::hash(&i.to_be_bytes())
    }

    fn val(i: u64) -> Digest {
        Sha256::hash(&(i + 10000).to_be_bytes())
    }

    async fn commit_writes_with_metadata(
        db: &mut UnorderedVariableDb,
        writes: impl IntoIterator<Item = (Digest, Option<Digest>)>,
        metadata: Option<Digest>,
    ) -> std::ops::Range<Location<mmr::Family>> {
        let mut batch = db.new_batch();
        for (k, v) in writes {
            batch = batch.write(k, v);
        }
        let merkleized = batch.merkleize(db, metadata).await.unwrap();
        let range = db.apply_batch(merkleized).await.unwrap();
        db.commit().await.unwrap();
        range
    }

    #[test_traced("INFO")]
    fn test_current_rewind_recovery() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let partition = "current-rewind-recovery";
            let ctx = context.with_label("db");
            let mut db: UnorderedVariableDb =
                UnorderedVariableDb::init(ctx.clone(), variable_config::<OneCap>(partition, &ctx))
                    .await
                    .unwrap();
            let initial_size = db.bounds().await.end;
            let initial_root = db.root();
            let initial_ops_root = db.ops_root();
            let initial_floor = db.inactivity_floor_loc();

            let metadata_a = val(900);
            let first_range = commit_writes_with_metadata(
                &mut db,
                [(key(0), Some(val(0))), (key(1), Some(val(1)))],
                Some(metadata_a),
            )
            .await;
            assert_eq!(first_range.start, initial_size);
            let size_before = db.bounds().await.end;
            let root_before = db.root();
            let ops_root_before = db.ops_root();
            let floor_before = db.inactivity_floor_loc();
            assert_eq!(size_before, first_range.end);

            let metadata_b = val(901);
            let second_range = commit_writes_with_metadata(
                &mut db,
                [
                    (key(0), Some(val(100))),
                    (key(1), None),
                    (key(2), Some(val(2))),
                ],
                Some(metadata_b),
            )
            .await;
            assert_eq!(second_range.start, size_before);
            assert_ne!(db.root(), root_before);
            assert_eq!(db.get_metadata().await.unwrap(), Some(val(901)));
            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(100)));
            assert_eq!(db.get(&key(1)).await.unwrap(), None);
            assert_eq!(db.get(&key(2)).await.unwrap(), Some(val(2)));

            db.rewind(size_before).await.unwrap();
            assert_eq!(db.bounds().await.end, size_before);
            assert_eq!(db.root(), root_before);
            assert_eq!(db.ops_root(), ops_root_before);
            assert_eq!(db.inactivity_floor_loc(), floor_before);
            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_a));
            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
            assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
            assert_eq!(db.get(&key(2)).await.unwrap(), None);

            db.commit().await.unwrap();
            drop(db);

            let reopened: UnorderedVariableDb = UnorderedVariableDb::init(
                context.with_label("reopen"),
                variable_config::<OneCap>(partition, &context),
            )
            .await
            .unwrap();
            assert_eq!(reopened.bounds().await.end, size_before);
            assert_eq!(reopened.root(), root_before);
            assert_eq!(reopened.ops_root(), ops_root_before);
            assert_eq!(reopened.inactivity_floor_loc(), floor_before);
            assert_eq!(reopened.get_metadata().await.unwrap(), Some(val(900)));
            assert_eq!(reopened.get(&key(0)).await.unwrap(), Some(val(0)));
            assert_eq!(reopened.get(&key(1)).await.unwrap(), Some(val(1)));
            assert_eq!(reopened.get(&key(2)).await.unwrap(), None);

            let mut reopened = reopened;
            reopened.rewind(initial_size).await.unwrap();
            assert_eq!(reopened.bounds().await.end, initial_size);
            assert_eq!(reopened.root(), initial_root);
            assert_eq!(reopened.ops_root(), initial_ops_root);
            assert_eq!(reopened.inactivity_floor_loc(), initial_floor);
            assert_eq!(reopened.get_metadata().await.unwrap(), None);
            assert_eq!(reopened.get(&key(0)).await.unwrap(), None);
            assert_eq!(reopened.get(&key(1)).await.unwrap(), None);
            assert_eq!(reopened.get(&key(2)).await.unwrap(), None);

            reopened.commit().await.unwrap();
            drop(reopened);

            let reopened_initial: UnorderedVariableDb = UnorderedVariableDb::init(
                context.with_label("reopen_initial"),
                variable_config::<OneCap>(partition, &context),
            )
            .await
            .unwrap();
            assert_eq!(reopened_initial.bounds().await.end, initial_size);
            assert_eq!(reopened_initial.root(), initial_root);
            assert_eq!(reopened_initial.ops_root(), initial_ops_root);
            assert_eq!(reopened_initial.inactivity_floor_loc(), initial_floor);
            assert_eq!(reopened_initial.get_metadata().await.unwrap(), None);
            assert_eq!(reopened_initial.get(&key(0)).await.unwrap(), None);
            assert_eq!(reopened_initial.get(&key(1)).await.unwrap(), None);
            assert_eq!(reopened_initial.get(&key(2)).await.unwrap(), None);

            reopened_initial.destroy().await.unwrap();
        });
    }

    #[test_traced("INFO")]
    fn test_current_rewind_recovery_pruned_repeated_updates() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            const COMMITS: u64 = 96;

            let partition = "current-rewind-pruned-recovery";
            let ctx = context.with_label("db");
            let mut db: UnorderedVariableDb =
                UnorderedVariableDb::init(ctx.clone(), variable_config::<OneCap>(partition, &ctx))
                    .await
                    .unwrap();

            let key0 = key(0);
            let mut history = Vec::new();
            for round in 0..COMMITS {
                commit_writes_with_metadata(
                    &mut db,
                    [(key0, Some(val(20_000 + round)))],
                    None,
                )
                .await;
                history.push((
                    db.bounds().await.end,
                    db.inactivity_floor_loc(),
                    db.root(),
                    db.ops_root(),
                    val(20_000 + round),
                ));
            }

            // Keep most ops-log history, but force bitmap pruning so rewind uses pinned-node
            // reconstruction (`pruned_chunks > 0` path).
            db.prune(Location::new(1)).await.unwrap();
            let pruned_bits = db.pruned_bits();
            assert!(pruned_bits > 0, "expected bitmap pruning for rewind test");
            let bounds = db.bounds().await;

            let (target_size, target_root, target_ops_root, target_value) = history
                .iter()
                .enumerate()
                .find_map(|(idx, (size, floor, root, ops_root, value))| {
                    let removed_commits = history.len() - idx - 1;
                    if removed_commits >= 3 && *size > bounds.start && *floor >= pruned_bits {
                        Some((*size, *root, *ops_root, *value))
                    } else {
                        None
                    }
                })
                .unwrap_or_else(|| {
                    panic!(
                        "expected legal pruned rewind target with repeated updates; bounds={bounds:?}, pruned_bits={pruned_bits}, latest_floor={:?}, history={history:?}",
                        db.inactivity_floor_loc()
                    )
                });

            db.rewind(target_size).await.unwrap();
            assert_eq!(db.root(), target_root);
            assert_eq!(db.ops_root(), target_ops_root);
            assert_eq!(db.bounds().await.end, target_size);
            assert_eq!(db.get(&key0).await.unwrap(), Some(target_value));
1532
1533            db.commit().await.unwrap();
1534            drop(db);
1535
1536            let mut reopened: UnorderedVariableDb = UnorderedVariableDb::init(
1537                context.with_label("reopen_pruned_recovery"),
1538                variable_config::<OneCap>(partition, &context),
1539            )
1540            .await
1541            .unwrap();
1542            assert_eq!(reopened.root(), target_root);
1543            assert_eq!(reopened.ops_root(), target_ops_root);
1544            assert_eq!(reopened.bounds().await.end, target_size);
1545            assert_eq!(reopened.get(&key0).await.unwrap(), Some(target_value));
1546
1547            let metadata_after_rewind = val(30_000);
1548            let new_key = key(1);
1549            let new_value = val(30_001);
1550            let expected_end = commit_writes_with_metadata(
1551                &mut reopened,
1552                [(new_key, Some(new_value))],
1553                Some(metadata_after_rewind),
1554            )
1555            .await
1556            .end;
1557            let root_after_new_write = reopened.root();
1558            let ops_root_after_new_write = reopened.ops_root();
1559            assert_eq!(reopened.bounds().await.end, expected_end);
1560            assert_eq!(reopened.get_metadata().await.unwrap(), Some(metadata_after_rewind));
1561            assert_eq!(reopened.get(&key0).await.unwrap(), Some(target_value));
1562            assert_eq!(reopened.get(&new_key).await.unwrap(), Some(new_value));
1563
1564            drop(reopened);
1565            let reopened_after_new_write: UnorderedVariableDb = UnorderedVariableDb::init(
1566                context.with_label("reopen_pruned_after_new_write"),
1567                variable_config::<OneCap>(partition, &context),
1568            )
1569            .await
1570            .unwrap();
1571            assert_eq!(reopened_after_new_write.root(), root_after_new_write);
1572            assert_eq!(reopened_after_new_write.ops_root(), ops_root_after_new_write);
1573            assert_eq!(reopened_after_new_write.bounds().await.end, expected_end);
1574            assert_eq!(
1575                reopened_after_new_write.get_metadata().await.unwrap(),
1576                Some(metadata_after_rewind)
1577            );
1578            assert_eq!(reopened_after_new_write.get(&key0).await.unwrap(), Some(target_value));
1579            assert_eq!(
1580                reopened_after_new_write.get(&new_key).await.unwrap(),
1581                Some(new_value)
1582            );
1583
1584            reopened_after_new_write.destroy().await.unwrap();
1585        });
1586    }
1587
1588    /// Verify that reopening and proving a pruned MMB database does not panic when the pruned
1589    /// prefix contains sub-grafting-height peaks that require chunk regrouping.
1590    ///
1591    /// With 100 single-key commits the MMB has 301 leaves (100 writes + 100 commit floors +
1592    /// 101 internal nodes). The first 256 leaves span three sub-grafting-height ops peaks
1593    /// (128 + 64 + 64), so grafted root recomposition must regroup them as chunk 0. After
1594    /// pruning, chunk 0 is gone and get_chunk(0) would panic without the pruned-chunk guard.
1595    #[test_traced("INFO")]
1596    fn test_current_mmb_reopen_and_prove_after_prune_multi_peak_chunk() {
1597        let executor = deterministic::Runner::default();
1598        executor.start(|context| async move {
1599            const COMMITS: u64 = 100;
1600
1601            let partition = "current-mmb-reopen-prove-after-prune";
1602            let ctx = context.with_label("db");
1603            let mut db: UnorderedVariableMmbDb = UnorderedVariableMmbDb::init(
1604                ctx.clone(),
1605                variable_config::<OneCap>(partition, &ctx),
1606            )
1607            .await
1608            .unwrap();
1609
1610            let k = key(0);
1611            let mut expected = None;
1612            for round in 0..COMMITS {
1613                expected = Some(val(50_000 + round));
1614                let mut batch = db.new_batch();
1615                batch = batch.write(k, expected);
1616                let merkleized = batch.merkleize(&db, None).await.unwrap();
1617                db.apply_batch(merkleized).await.unwrap();
1618                db.commit().await.unwrap();
1619            }
1620
1621            let root_before = db.root();
1622            assert!(
1623                *db.inactivity_floor_loc() >= 256,
1624                "expected inactivity floor past chunk 0"
1625            );
1626
1627            db.prune(Location::<mmb::Family>::new(1)).await.unwrap();
1628            assert_eq!(db.pruned_bits(), 256);
1629            db.sync().await.unwrap();
1630            drop(db);
1631
1632            // Reopen: compute_grafted_root must handle pruned chunk 0.
1633            let reopened: UnorderedVariableMmbDb = UnorderedVariableMmbDb::init(
1634                context.with_label("reopen"),
1635                variable_config::<OneCap>(partition, &context),
1636            )
1637            .await
1638            .unwrap();
1639
1640            assert_eq!(reopened.root(), root_before);
1641            assert_eq!(reopened.get(&k).await.unwrap(), expected);
1642
1643            // key_value_proof: RangeProof::new must also handle pruned chunk 0.
1644            let mut hasher = commonware_cryptography::Sha256::new();
1645            let _proof = reopened.key_value_proof(&mut hasher, k).await.unwrap();
1646
1647            reopened.destroy().await.unwrap();
1648        });
1649    }
1650
1651    #[test_traced("INFO")]
1652    fn test_current_rewind_small_delta_large_history() {
1653        let executor = deterministic::Runner::default();
1654        executor.start(|context| async move {
1655            const COMMITS: u64 = 200;
1656
1657            let partition = "current-rewind-small-delta";
1658            let ctx = context.with_label("db");
1659            let mut db: UnorderedVariableDb =
1660                UnorderedVariableDb::init(ctx.clone(), variable_config::<OneCap>(partition, &ctx))
1661                    .await
1662                    .unwrap();
1663
1664            let key0 = key(0);
1665            let key1 = key(1);
1666            let mut history = Vec::new();
1667
1668            for round in 0..COMMITS {
1669                let key0_value = val(40_000 + round);
1670                let key1_value = if round % 3 == 1 {
1671                    None
1672                } else {
1673                    Some(val(50_000 + round))
1674                };
1675
1676                commit_writes_with_metadata(
1677                    &mut db,
1678                    [(key0, Some(key0_value)), (key1, key1_value)],
1679                    None,
1680                )
1681                .await;
1682
1683                history.push((
1684                    db.bounds().await.end,
1685                    db.root(),
1686                    db.ops_root(),
1687                    key0_value,
1688                    key1_value,
1689                ));
1690            }
1691
1692            let target = *history
1693                .get(history.len() - 3)
1694                .expect("history should contain at least three commits");
1695            let (target_size, target_root, target_ops_root, target_key0, target_key1) = target;
1696
1697            db.rewind(target_size).await.unwrap();
1698            assert_eq!(db.bounds().await.end, target_size);
1699            assert_eq!(db.root(), target_root);
1700            assert_eq!(db.ops_root(), target_ops_root);
1701            assert_eq!(db.get(&key0).await.unwrap(), Some(target_key0));
1702            assert_eq!(db.get(&key1).await.unwrap(), target_key1);
1703
1704            db.commit().await.unwrap();
1705            drop(db);
1706
1707            let reopened: UnorderedVariableDb = UnorderedVariableDb::init(
1708                context.with_label("reopen_small_delta"),
1709                variable_config::<OneCap>(partition, &context),
1710            )
1711            .await
1712            .unwrap();
1713            assert_eq!(reopened.bounds().await.end, target_size);
1714            assert_eq!(reopened.root(), target_root);
1715            assert_eq!(reopened.ops_root(), target_ops_root);
1716            assert_eq!(reopened.get(&key0).await.unwrap(), Some(target_key0));
1717            assert_eq!(reopened.get(&key1).await.unwrap(), target_key1);
1718
1719            reopened.destroy().await.unwrap();
1720        });
1721    }
1722
1723    #[test_traced("INFO")]
1724    fn test_current_rewind_pruned_target_errors() {
1725        let executor = deterministic::Runner::default();
1726        executor.start(|context| async move {
1727            const KEYS: u64 = 384;
1728
1729            let partition = "current-rewind-pruned";
1730            let ctx = context.with_label("db");
1731            let mut db: UnorderedVariableDb =
1732                UnorderedVariableDb::init(ctx.clone(), variable_config::<OneCap>(partition, &ctx))
1733                    .await
1734                    .unwrap();
1735
1736            let first_range = commit_writes_with_metadata(
1737                &mut db,
1738                (0..KEYS).map(|i| (key(i), Some(val(i)))),
1739                None,
1740            )
1741            .await;
1742            commit_writes_with_metadata(
1743                &mut db,
1744                (0..KEYS).map(|i| (key(i), Some(val(1000 + i)))),
1745                None,
1746            )
1747            .await;
1748
1749            db.prune(db.inactivity_floor_loc()).await.unwrap();
1750            let pruned_bits = db.pruned_bits();
1751            assert!(
1752                pruned_bits > *first_range.start,
1753                "expected bitmap pruning boundary above rewind target: pruned_bits={pruned_bits}, target={:?}",
1754                first_range.start
1755            );
1756
1757            let oldest_retained = db.bounds().await.start;
1758            let boundary_err = db.rewind(oldest_retained).await.unwrap_err();
1759            assert!(
1760                matches!(
1761                    boundary_err,
1762                    Error::Journal(crate::journal::Error::ItemPruned(_))
1763                ),
1764                "unexpected rewind error at retained boundary: {boundary_err:?}"
1765            );
1766
1767            let expected_pruned_loc = *first_range.start - 1;
1768            let err = db.rewind(first_range.start).await.unwrap_err();
1769            assert!(
1770                matches!(
1771                    err,
1772                    Error::Journal(crate::journal::Error::ItemPruned(loc))
1773                    if loc == expected_pruned_loc
1774                ),
1775                "unexpected rewind error: {err:?}"
1776            );
1777
1778            db.destroy().await.unwrap();
1779        });
1780    }
1781
1782    #[test_traced("INFO")]
1783    fn test_current_rewind_rejects_target_below_bitmap_floor() {
1784        let executor = deterministic::Runner::default();
1785        executor.start(|context| async move {
1786            const COMMITS: u64 = 96;
1787
1788            let partition = "current-rewind-bitmap-floor";
1789            let ctx = context.with_label("db");
1790            let mut db: UnorderedVariableDb =
1791                UnorderedVariableDb::init(ctx.clone(), variable_config::<OneCap>(partition, &ctx))
1792                    .await
1793                    .unwrap();
1794
1795            let mut history = Vec::new();
1796            for round in 0..COMMITS {
1797                commit_writes_with_metadata(
1798                    &mut db,
1799                    [(key(0), Some(val(10_000 + round)))],
1800                    None,
1801                )
1802                .await;
1803                history.push((db.bounds().await.end, db.inactivity_floor_loc()));
1804            }
1805            assert!(db.inactivity_floor_loc() > Location::new(64));
1806
            // Intentionally prune to a location below the inactivity floor: the log retains older
            // ops, but the bitmap still prunes up to the inactivity floor.
1809            let prune_loc = Location::new(1);
1810            db.prune(prune_loc).await.unwrap();
1811            let pruned_bits = db.pruned_bits();
1812            assert!(pruned_bits > 0);
1813            let retained_start = db.bounds().await.start;
1814
1815            // Pick a historical commit that is still within retained log bounds but whose floor is
1816            // below the bitmap pruning boundary.
1817            let rewind_target = history
1818                .iter()
1819                .find_map(|(size, floor)| {
1820                    if *size > *retained_start
1821                        && *size >= pruned_bits
1822                        && *floor >= *retained_start
1823                        && *floor < pruned_bits
1824                    {
1825                        Some(*size)
1826                    } else {
1827                        None
1828                    }
1829                })
1830                .unwrap_or_else(|| {
1831                    panic!(
1832                        "expected rewind target below bitmap boundary. retained_start={retained_start:?}, pruned_bits={pruned_bits}, latest_floor={:?}, history={history:?}",
1833                        db.inactivity_floor_loc()
1834                    )
1835                });
1836
1837            let err = db.rewind(rewind_target).await.unwrap_err();
1838            assert!(
1839                matches!(err, Error::Journal(crate::journal::Error::ItemPruned(_))),
1840                "unexpected rewind error: {err:?}"
1841            );
1842
1843            db.destroy().await.unwrap();
1844        });
1845    }
1846
1847    /// Verify that the speculative canonical root from a merkleized batch matches the root
1848    /// recomputed from committed state after sync + reopen.
1849    ///
1850    /// Uses enough operations to cross a chunk boundary (CHUNK_SIZE_BITS = N*8), which exercises
1851    /// the grafted root computation for newly completed chunks.
1852    pub fn test_speculative_root_matches_committed<M, C, F, Fut>(mut open_db: F)
1853    where
1854        M: merkle::Graftable + 'static,
1855        C: DbAny<M> + 'static,
1856        C::Key: TestKey,
1857        <C as DbAny<M>>::Value: TestValue,
1858        F: FnMut(Context, String) -> Fut + Clone,
1859        Fut: Future<Output = C>,
1860    {
1861        let executor = deterministic::Runner::default();
1862        let mut open_db_clone = open_db.clone();
1863        executor.start(|context| async move {
1864            let partition = "speculative-root".to_string();
1865
1866            // Write enough operations to cross a chunk boundary. With N=32 (CHUNK_SIZE_BITS=256),
1867            // 260 writes + 1 CommitFloor = 261 operations, completing one chunk with 5 ops in the
1868            // next partial chunk. This ensures the grafted root computation must handle the
1869            // newly completed chunk.
1870            let mut db: C = open_db_clone(context.with_label("init"), partition.clone()).await;
1871            let mut batch = db.new_batch();
1872            for i in 0..260 {
1873                batch = batch.write(TestKey::from_seed(i), Some(TestValue::from_seed(i + 1000)));
1874            }
1875            let merkleized = batch.merkleize(&db, None).await.unwrap();
1876            db.apply_batch(merkleized).await.unwrap();
1877            let speculative_root = db.root();
1878
1879            // Sync, close, and reopen to get the root recomputed from committed state.
1880            db.sync().await.unwrap();
1881            drop(db);
1882
1883            let db: C = open_db(context.with_label("reopen"), partition).await;
1884            assert_eq!(db.root(), speculative_root);
1885
1886            db.destroy().await.unwrap();
1887        });
1888    }
1889
1890    #[test_traced("INFO")]
1891    fn test_all_variants_speculative_root_matches_committed() {
1892        let executor = deterministic::Runner::default();
1893        executor.start(|_context| async move {
1894            for_all_variants!(simple: test_speculative_root_matches_committed);
1895        });
1896    }
1897
    /// MerkleizedBatch::get() at the current level reads from the batch overlay first,
    /// then falls back to the base DB.
1899    #[test_traced("INFO")]
1900    fn test_current_batch_merkleized_get() {
1901        let executor = deterministic::Runner::default();
1902        executor.start(|context| async move {
1903            let ctx = context.with_label("db");
1904            let mut db: UnorderedVariableDb =
1905                UnorderedVariableDb::init(ctx.clone(), variable_config::<OneCap>("mg", &ctx))
1906                    .await
1907                    .unwrap();
1908
1909            let ka = key(0);
1910            let kb = key(1);
1911            let kc = key(2);
1912
1913            // Pre-populate A.
1914            {
1915                let mut batch = db.new_batch();
1916                batch = batch.write(ka, Some(val(0)));
1917                let merkleized = batch.merkleize(&db, None).await.unwrap();
1918                db.apply_batch(merkleized).await.unwrap();
1919            }
1920
1921            // Batch: update A, delete nothing, create B.
1922            let va2 = val(100);
1923            let vb = val(1);
1924            let mut batch = db.new_batch();
1925            batch = batch.write(ka, Some(va2));
1926            batch = batch.write(kb, Some(vb));
1927            let merkleized = batch.merkleize(&db, None).await.unwrap();
1928
1929            assert_eq!(merkleized.get(&ka, &db).await.unwrap(), Some(va2));
1930            assert_eq!(merkleized.get(&kb, &db).await.unwrap(), Some(vb));
1931            assert_eq!(merkleized.get(&kc, &db).await.unwrap(), None);
1932
1933            db.destroy().await.unwrap();
1934        });
1935    }
1936
1937    /// Batch chaining at the current level: parent -> merkleize -> child -> merkleize.
1938    /// Child's canonical root matches db.root() after apply.
1939    #[test_traced("INFO")]
1940    fn test_current_batch_chaining() {
1941        let executor = deterministic::Runner::default();
1942        executor.start(|context| async move {
1943            let ctx = context.with_label("db");
1944            let mut db: UnorderedVariableDb =
1945                UnorderedVariableDb::init(ctx.clone(), variable_config::<OneCap>("ch", &ctx))
1946                    .await
1947                    .unwrap();
1948
1949            // Parent batch writes keys 0..5.
1950            let mut parent = db.new_batch();
1951            for i in 0..5 {
1952                parent = parent.write(key(i), Some(val(i)));
1953            }
1954            let parent_m = parent.merkleize(&db, None).await.unwrap();
1955
1956            // Child batch writes keys 5..10 and overrides key 0.
1957            let mut child = parent_m.new_batch::<Sha256>();
1958            for i in 5..10 {
1959                child = child.write(key(i), Some(val(i)));
1960            }
1961            child = child.write(key(0), Some(val(999)));
1962            let child_m = child.merkleize(&db, None).await.unwrap();
1963
1964            let child_root = child_m.root();
1965
1966            // Child get reads through all layers.
1967            assert_eq!(child_m.get(&key(0), &db).await.unwrap(), Some(val(999)));
1968            assert_eq!(child_m.get(&key(3), &db).await.unwrap(), Some(val(3)));
1969            assert_eq!(child_m.get(&key(7), &db).await.unwrap(), Some(val(7)));
1970
1971            db.apply_batch(child_m).await.unwrap();
1972            assert_eq!(db.root(), child_root);
1973
1974            // Verify all keys are correct.
1975            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(999)));
1976            for i in 1..10 {
1977                assert_eq!(db.get(&key(i)).await.unwrap(), Some(val(i)));
1978            }
1979
1980            db.destroy().await.unwrap();
1981        });
1982    }
1983
1984    #[test_traced("INFO")]
1985    fn test_current_unordered_root_matches_between_pending_and_committed_paths() {
1986        let executor = deterministic::Runner::default();
1987        executor.start(|context| async move {
1988            let ctx = context.with_label("db");
1989            let mut db: UnorderedFixedDb =
1990                UnorderedFixedDb::init(ctx.clone(), fixed_config::<OneCap>("ucr", &ctx))
1991                    .await
1992                    .unwrap();
1993            let key_a = colliding_digest(0xAA, 1);
1994            let key_b = colliding_digest(0xAA, 0);
1995
            // Seed four colliding committed keys. The specific 4 / 1 / 0 shape is a
            // concrete counterexample exercised by the parent and child batches below.
2000            let mut initial = db.new_batch();
2001            for i in 0..4 {
2002                initial = initial.write(colliding_digest(0xAA, i), Some(colliding_digest(0xBB, i)));
2003            }
2004            let merkleized = initial.merkleize(&db, None).await.unwrap();
2005            db.apply_batch(merkleized).await.unwrap();
2006            db.commit().await.unwrap();
2007
2008            // Update only key_a so the colliding sibling key_b remains outside
2009            // the parent diff and must still be resolved through the committed
2010            // snapshot in the child.
2011            let parent = db
2012                .new_batch()
2013                .write(key_a, Some(colliding_digest(0xCC, 1)))
2014                .merkleize(&db, None)
2015                .await
2016                .unwrap();
2017
2018            // Build the child while the parent is still pending, then rebuild
2019            // the same logical child after committing the parent and compare
2020            // both canonical and ops roots.
2021            let pending_child = parent
2022                .new_batch::<Sha256>()
2023                .write(key_a, Some(colliding_digest(0xDD, 1)))
2024                .write(key_b, Some(colliding_digest(0xDD, 0)))
2025                .merkleize(&db, None)
2026                .await
2027                .unwrap();
2028
2029            let pending_root = pending_child.root();
2030            let pending_ops_root = pending_child.ops_root();
2031
2032            db.apply_batch(parent).await.unwrap();
2033            db.commit().await.unwrap();
2034
2035            let committed_child = db
2036                .new_batch()
2037                .write(key_a, Some(colliding_digest(0xDD, 1)))
2038                .write(key_b, Some(colliding_digest(0xDD, 0)))
2039                .merkleize(&db, None)
2040                .await
2041                .unwrap();
2042
2043            assert_eq!(pending_root, committed_child.root());
2044            assert_eq!(pending_ops_root, committed_child.ops_root());
2045
2046            // Apply pending child onto the committed parent
2047            // and ensure the applied wrapper roots still match.
2048            db.apply_batch(pending_child).await.unwrap();
2049            assert_eq!(db.root(), committed_child.root());
2050            assert_eq!(db.ops_root(), committed_child.ops_root());
2051
2052            db.destroy().await.unwrap();
2053        });
2054    }
2055
2056    #[test_traced("INFO")]
2057    fn test_current_ordered_root_matches_between_pending_and_committed_paths() {
2058        let executor = deterministic::Runner::default();
2059        executor.start(|context| async move {
2060            let ctx = context.with_label("db");
2061            let mut db: OrderedFixedDb =
2062                OrderedFixedDb::init(ctx.clone(), fixed_config::<OneCap>("ocr", &ctx))
2063                    .await
2064                    .unwrap();
2065            let key_a = colliding_digest(0xAA, 1);
2066            let key_b = colliding_digest(0xAA, 0);
2067
2068            // Match the unordered counterexample shape on the ordered path so
2069            // both wrappers exercise the same collision pattern.
2070            let mut initial = db.new_batch();
2071            for i in 0..4 {
2072                initial = initial.write(colliding_digest(0xAA, i), Some(colliding_digest(0xBB, i)));
2073            }
2074            let merkleized = initial.merkleize(&db, None).await.unwrap();
2075            db.apply_batch(merkleized).await.unwrap();
2076            db.commit().await.unwrap();
2077
2078            // Update only key_a so the colliding sibling key_b remains outside
2079            // the parent diff and must still be resolved through the committed
2080            // snapshot in the child.
2081            let parent = db
2082                .new_batch()
2083                .write(key_a, Some(colliding_digest(0xCC, 1)))
2084                .merkleize(&db, None)
2085                .await
2086                .unwrap();
2087
2088            // Build the child while the parent is still pending, then rebuild
2089            // the same logical child after committing the parent.
2090            let pending_child = parent
2091                .new_batch::<Sha256>()
2092                .write(key_a, Some(colliding_digest(0xDD, 1)))
2093                .write(key_b, Some(colliding_digest(0xDD, 0)))
2094                .merkleize(&db, None)
2095                .await
2096                .unwrap();
2097
2098            let pending_root = pending_child.root();
2099            let pending_ops_root = pending_child.ops_root();
2100
2101            db.apply_batch(parent).await.unwrap();
2102            db.commit().await.unwrap();
2103
2104            let committed_child = db
2105                .new_batch()
2106                .write(key_a, Some(colliding_digest(0xDD, 1)))
2107                .write(key_b, Some(colliding_digest(0xDD, 0)))
2108                .merkleize(&db, None)
2109                .await
2110                .unwrap();
2111
2112            assert_eq!(pending_root, committed_child.root());
2113            assert_eq!(pending_ops_root, committed_child.ops_root());
2114
2115            // Apply pending child onto the committed parent
2116            // and compare the applied wrapper roots with the committed-path child roots.
2117            db.apply_batch(pending_child).await.unwrap();
2118            assert_eq!(db.root(), committed_child.root());
2119            assert_eq!(db.ops_root(), committed_child.ops_root());
2120
2121            db.destroy().await.unwrap();
2122        });
2123    }
2124
    /// Applying a batch without `commit()` makes it visible in memory, but the
    /// uncommitted state is not recovered after reopen.
2126    #[test_traced("INFO")]
2127    fn test_current_batch_apply_requires_commit_for_recovery() {
2128        let executor = deterministic::Runner::default();
2129        executor.start(|context| async move {
2130            let partition = "apply_requires_commit";
2131            let ctx = context.with_label("db");
2132            let mut db: UnorderedVariableDb =
2133                UnorderedVariableDb::init(ctx.clone(), variable_config::<OneCap>(partition, &ctx))
2134                    .await
2135                    .unwrap();
2136
2137            let committed_root = db.root();
2138
2139            let merkleized = db
2140                .new_batch()
2141                .write(key(0), Some(val(0)))
2142                .merkleize(&db, None)
2143                .await
2144                .unwrap();
2145            db.apply_batch(merkleized).await.unwrap();
2146
2147            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
2148
2149            drop(db);
2150
2151            let reopened: UnorderedVariableDb = UnorderedVariableDb::init(
2152                context.with_label("reopen"),
2153                variable_config::<OneCap>(partition, &context),
2154            )
2155            .await
2156            .unwrap();
2157            assert_eq!(reopened.root(), committed_root);
2158            assert_eq!(reopened.get(&key(0)).await.unwrap(), None);
2159
2160            reopened.destroy().await.unwrap();
2161        });
2162    }
2163
2164    /// One-stage pipelining lets the next batch be built while the prior batch commits.
2165    #[test_traced("INFO")]
2166    fn test_current_batch_single_stage_pipeline() {
2167        let executor = deterministic::Runner::default();
2168        executor.start(|context| async move {
2169            let ctx = context.with_label("db");
2170            let mut db: UnorderedVariableDb =
2171                UnorderedVariableDb::init(ctx.clone(), variable_config::<OneCap>("pipe", &ctx))
2172                    .await
2173                    .unwrap();
2174
2175            let mut batch = db.new_batch();
2176            batch = batch.write(key(0), Some(val(0)));
2177            let parent_merkleized = batch.merkleize(&db, None).await.unwrap();
2178            db.apply_batch(parent_merkleized).await.unwrap();
2179
2180            let (child_merkleized, commit_result) = futures::join!(
2181                async {
2182                    assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
2183                    let mut child = db.new_batch();
2184                    child = child.write(key(1), Some(val(1)));
2185                    child.merkleize(&db, None).await.unwrap()
2186                },
2187                db.commit(),
2188            );
2189            commit_result.unwrap();
2190
2191            db.apply_batch(child_merkleized).await.unwrap();
2192            db.commit().await.unwrap();
2193
2194            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
2195            assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
2196
2197            db.destroy().await.unwrap();
2198        });
2199    }
2200
    /// Apply parent then child sequentially. Both keys are present, and the
    /// canonical root matches the same state built via two sequential plain
    /// batches in a fresh database.
2203    #[test_traced("INFO")]
2204    fn test_current_sequential_commit() {
2205        let executor = deterministic::Runner::default();
2206        executor.start(|context| async move {
2207            let ctx = context.with_label("db");
2208            let mut db: UnorderedVariableDb =
2209                UnorderedVariableDb::init(ctx.clone(), variable_config::<OneCap>("ff", &ctx))
2210                    .await
2211                    .unwrap();
2212
2213            // Parent batch: insert key(0).
2214            let parent_m = db
2215                .new_batch()
2216                .write(key(0), Some(val(0)))
2217                .merkleize(&db, None)
2218                .await
2219                .unwrap();
2220
2221            // Child batch on parent: insert key(1).
2222            let child_m = parent_m
2223                .new_batch::<Sha256>()
2224                .write(key(1), Some(val(1)))
2225                .merkleize(&db, None)
2226                .await
2227                .unwrap();
2228
2229            db.apply_batch(parent_m).await.unwrap();
2230            db.apply_batch(child_m).await.unwrap();
2231
2232            // Both keys present.
2233            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
2234            assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
2235
2236            // Build the same result via two sequential plain batches in a fresh DB
2237            // and verify the roots match.
2238            let ctx2 = context.with_label("db2");
2239            let mut db2: UnorderedVariableDb =
2240                UnorderedVariableDb::init(ctx2.clone(), variable_config::<OneCap>("ff2", &ctx2))
2241                    .await
2242                    .unwrap();
2243            let m1 = db2
2244                .new_batch()
2245                .write(key(0), Some(val(0)))
2246                .merkleize(&db2, None)
2247                .await
2248                .unwrap();
2249            db2.apply_batch(m1).await.unwrap();
2250            let m2 = db2
2251                .new_batch()
2252                .write(key(1), Some(val(1)))
2253                .merkleize(&db2, None)
2254                .await
2255                .unwrap();
2256            db2.apply_batch(m2).await.unwrap();
2257
2258            assert_eq!(db.root(), db2.root());
2259
2260            db.destroy().await.unwrap();
2261            db2.destroy().await.unwrap();
2262        });
2263    }
2264
2265    /// to_batch() produces a MerkleizedBatch that can be used to chain further
2266    /// batches via new_batch().
2267    #[test_traced("INFO")]
2268    fn test_current_to_batch_then_chain() {
2269        let executor = deterministic::Runner::default();
2270        executor.start(|context| async move {
2271            let ctx = context.with_label("db");
2272            let mut db: UnorderedVariableDb =
2273                UnorderedVariableDb::init(ctx.clone(), variable_config::<OneCap>("tb", &ctx))
2274                    .await
2275                    .unwrap();
2276
2277            // Apply an initial batch.
2278            let m = db
2279                .new_batch()
2280                .write(key(0), Some(val(0)))
2281                .merkleize(&db, None)
2282                .await
2283                .unwrap();
2284            db.apply_batch(m).await.unwrap();
2285
2286            // Get an owned batch from the committed state.
2287            let snapshot = db.to_batch();
2288            assert_eq!(snapshot.root(), db.root());
2289
2290            // Chain a child batch from the snapshot.
2291            let child = snapshot
2292                .new_batch::<Sha256>()
2293                .write(key(1), Some(val(1)))
2294                .merkleize(&db, None)
2295                .await
2296                .unwrap();
2297
            // The child's root should differ from the snapshot's root.
2299            assert_ne!(child.root(), snapshot.root());
2300
2301            // Apply child.
2302            db.apply_batch(child).await.unwrap();
2303            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
2304            assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
2305
2306            db.destroy().await.unwrap();
2307        });
2308    }
2309
2310    /// flatten() is a no-op on a freshly initialized DB (no layers to collapse).
2311    #[test_traced("INFO")]
2312    fn test_flatten_noop_on_fresh_db() {
2313        let executor = deterministic::Runner::default();
2314        executor.start(|context| async move {
2315            let ctx = context.with_label("db");
2316            let mut db: UnorderedVariableDb =
2317                UnorderedVariableDb::init(ctx.clone(), variable_config::<OneCap>("fl-noop", &ctx))
2318                    .await
2319                    .unwrap();
2320
2321            let root_before = db.root();
2322            db.flatten();
2323            assert_eq!(db.root(), root_before);
2324
2325            db.destroy().await.unwrap();
2326        });
2327    }
2328
2329    /// flatten() preserves the root and data after multiple apply_batch calls.
2330    #[test_traced("INFO")]
2331    fn test_flatten_preserves_root_after_batches() {
2332        let executor = deterministic::Runner::default();
2333        executor.start(|context| async move {
2334            let ctx = context.with_label("db");
2335            let mut db: UnorderedVariableDb =
2336                UnorderedVariableDb::init(ctx.clone(), variable_config::<OneCap>("fl-root", &ctx))
2337                    .await
2338                    .unwrap();
2339
2340            // Apply several batches to accumulate layers.
2341            for i in 0u64..5 {
2342                let m = db
2343                    .new_batch()
2344                    .write(key(i), Some(val(i)))
2345                    .merkleize(&db, None)
2346                    .await
2347                    .unwrap();
2348                db.apply_batch(m).await.unwrap();
2349            }
2350
2351            let root_before = db.root();
2352            db.flatten();
2353            assert_eq!(db.root(), root_before);
2354
2355            // Data is still readable.
2356            for i in 0u64..5 {
2357                assert_eq!(db.get(&key(i)).await.unwrap(), Some(val(i)));
2358            }
2359
2360            db.destroy().await.unwrap();
2361        });
2362    }
2363
2364    /// flatten() is idempotent: a second call is a no-op.
2365    #[test_traced("INFO")]
2366    fn test_flatten_idempotent() {
2367        let executor = deterministic::Runner::default();
2368        executor.start(|context| async move {
2369            let ctx = context.with_label("db");
2370            let mut db: UnorderedVariableDb =
2371                UnorderedVariableDb::init(ctx.clone(), variable_config::<OneCap>("fl-idem", &ctx))
2372                    .await
2373                    .unwrap();
2374
2375            let m = db
2376                .new_batch()
2377                .write(key(0), Some(val(0)))
2378                .merkleize(&db, None)
2379                .await
2380                .unwrap();
2381            db.apply_batch(m).await.unwrap();
2382
2383            db.flatten();
2384            let root_after_first = db.root();
2385
2386            db.flatten();
2387            assert_eq!(db.root(), root_after_first);
2388
2389            db.destroy().await.unwrap();
2390        });
2391    }
2392
2393    /// New batches built after flatten() produce correct roots and can be applied.
2394    #[test_traced("INFO")]
2395    fn test_flatten_then_new_batch() {
2396        let executor = deterministic::Runner::default();
2397        executor.start(|context| async move {
2398            let ctx = context.with_label("db");
2399            let mut db: UnorderedVariableDb =
2400                UnorderedVariableDb::init(ctx.clone(), variable_config::<OneCap>("fl-then", &ctx))
2401                    .await
2402                    .unwrap();
2403
2404            // Apply a batch, flatten, then apply another.
2405            let m = db
2406                .new_batch()
2407                .write(key(0), Some(val(0)))
2408                .merkleize(&db, None)
2409                .await
2410                .unwrap();
2411            db.apply_batch(m).await.unwrap();
2412            db.flatten();
2413
2414            let m = db
2415                .new_batch()
2416                .write(key(1), Some(val(1)))
2417                .merkleize(&db, None)
2418                .await
2419                .unwrap();
2420            db.apply_batch(m).await.unwrap();
2421
2422            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
2423            assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
2424
2425            db.destroy().await.unwrap();
2426        });
2427    }
2428
2429    /// Regression: applying a batch after its ancestor Arc is dropped (without
2430    /// committing) must still apply the ancestor's bitmap pushes/clears and
2431    /// snapshot diffs.
2432    #[test_traced("WARN")]
2433    fn test_current_apply_after_ancestor_dropped() {
2434        let executor = deterministic::Runner::default();
2435        executor.start(|context| async move {
2436            let ctx = context.with_label("db");
2437            let mut db: UnorderedVariableDb =
2438                UnorderedVariableDb::init(ctx.clone(), variable_config::<OneCap>("adrop", &ctx))
2439                    .await
2440                    .unwrap();
2441
2442            // Chain: DB <- A <- B <- C
2443            let mut a = db.new_batch();
2444            for i in 0..3 {
2445                a = a.write(key(i), Some(val(i)));
2446            }
2447            let a_m = a.merkleize(&db, None).await.unwrap();
2448
2449            let mut b = a_m.new_batch::<Sha256>();
2450            for i in 3..6 {
2451                b = b.write(key(i), Some(val(i)));
2452            }
2453            let b_m = b.merkleize(&db, None).await.unwrap();
2454
2455            let mut c = b_m.new_batch::<Sha256>();
2456            for i in 6..9 {
2457                c = c.write(key(i), Some(val(i)));
2458            }
2459            let c_m = c.merkleize(&db, None).await.unwrap();
2460
2461            // Drop A and B without committing. Their Weak refs in C are now dead.
2462            drop(a_m);
2463            drop(b_m);
2464
2465            // Apply only the tip. This is !skip_ancestors (DB hasn't changed).
2466            db.apply_batch(c_m).await.unwrap();
2467            db.commit().await.unwrap();
2468
2469            // All nine keys must be accessible.
2470            for i in 0..9 {
2471                assert_eq!(
2472                    db.get(&key(i)).await.unwrap(),
2473                    Some(val(i)),
2474                    "key({i}) missing after apply_batch with dropped ancestors"
2475                );
2476            }
2477
2478            db.destroy().await.unwrap();
2479        });
2480    }
2481
2482    /// Regression: applying a 3-deep chain as a single batch must leave the
2483    /// bitmap in the same state as applying the same operations sequentially.
2484    /// This fails if ancestor bitmap pushes are concatenated in the wrong order
2485    /// (tip-to-root instead of root-to-tip), because Delete operations produce
    /// false bitmap bits, and wrong ordering puts the false bit at the wrong
    /// position. We detect this by building a NEW batch on top of the
2488    /// (possibly corrupted) bitmap and comparing its root against the
2489    /// sequential path.
2490    #[test_traced("WARN")]
2491    fn test_current_chain_bitmap_order_matches_sequential() {
2492        let executor = deterministic::Runner::default();
2493        executor.start(|context| async move {
2494            // -- Path 1: build a 3-deep chain and apply the tip directly. --
2495            let ctx1 = context.with_label("db1");
2496            let mut db1: UnorderedVariableDb =
2497                UnorderedVariableDb::init(ctx1.clone(), variable_config::<OneCap>("ord1", &ctx1))
2498                    .await
2499                    .unwrap();
2500
2501            // Seed some committed data so there's a base bitmap to clear.
2502            commit_writes_with_metadata(
2503                &mut db1,
2504                [(key(10), Some(val(10))), (key(11), Some(val(11)))],
2505                None,
2506            )
2507            .await;
2508
2509            // Chain: DB <- A <- B <- C
2510            // A: updates key(10) and DELETES key(11). The delete produces a
2511            //    false bitmap bit. If A's bits end up at B's positions (wrong
2512            //    order), the false bit lands at the wrong journal location.
2513            // B: updates key(12) and key(13). All true bits.
2514            // C: updates key(14). All true bits.
2515            let a = db1
2516                .new_batch()
2517                .write(key(10), Some(val(100)))
2518                .write(key(11), None) // DELETE
2519                .merkleize(&db1, None)
2520                .await
2521                .unwrap();
2522
2523            let b = a
2524                .new_batch::<Sha256>()
2525                .write(key(12), Some(val(120)))
2526                .write(key(13), Some(val(130)))
2527                .merkleize(&db1, None)
2528                .await
2529                .unwrap();
2530
2531            let c = b
2532                .new_batch::<Sha256>()
2533                .write(key(14), Some(val(140)))
2534                .merkleize(&db1, None)
2535                .await
2536                .unwrap();
2537
2538            db1.apply_batch(c).await.unwrap();
2539            db1.commit().await.unwrap();
2540
2541            // Build one more batch on top to exercise the bitmap state.
2542            let d1 = db1
2543                .new_batch()
2544                .write(key(20), Some(val(200)))
2545                .merkleize(&db1, None)
2546                .await
2547                .unwrap();
2548            let chain_then_d_root = d1.root();
2549
2550            // -- Path 2: apply the same operations sequentially. --
2551            let ctx2 = context.with_label("db2");
2552            let mut db2: UnorderedVariableDb =
2553                UnorderedVariableDb::init(ctx2.clone(), variable_config::<OneCap>("ord2", &ctx2))
2554                    .await
2555                    .unwrap();
2556
2557            commit_writes_with_metadata(
2558                &mut db2,
2559                [(key(10), Some(val(10))), (key(11), Some(val(11)))],
2560                None,
2561            )
2562            .await;
2563
2564            let a2 = db2
2565                .new_batch()
2566                .write(key(10), Some(val(100)))
2567                .write(key(11), None)
2568                .merkleize(&db2, None)
2569                .await
2570                .unwrap();
2571            db2.apply_batch(a2).await.unwrap();
2572            db2.commit().await.unwrap();
2573
2574            let b2 = db2
2575                .new_batch()
2576                .write(key(12), Some(val(120)))
2577                .write(key(13), Some(val(130)))
2578                .merkleize(&db2, None)
2579                .await
2580                .unwrap();
2581            db2.apply_batch(b2).await.unwrap();
2582            db2.commit().await.unwrap();
2583
2584            let c2 = db2
2585                .new_batch()
2586                .write(key(14), Some(val(140)))
2587                .merkleize(&db2, None)
2588                .await
2589                .unwrap();
2590            db2.apply_batch(c2).await.unwrap();
2591            db2.commit().await.unwrap();
2592
2593            let d2 = db2
2594                .new_batch()
2595                .write(key(20), Some(val(200)))
2596                .merkleize(&db2, None)
2597                .await
2598                .unwrap();
2599            let sequential_then_d_root = d2.root();
2600
2601            assert_eq!(
2602                chain_then_d_root, sequential_then_d_root,
2603                "batch D's root on top of chain-applied state must match sequential state"
2604            );
2605
2606            db1.destroy().await.unwrap();
2607            db2.destroy().await.unwrap();
2608        });
2609    }
2610
2611    /// Regression: C's precomputed bitmap clears can target a chunk that
2612    /// was pruned after parent P was committed.
2613    ///
2614    /// With N=32, CHUNK_SIZE_BITS=256. Seed places key(0) at loc 255
    /// (end of chunk 0). P overwrites keys 1..=254, whose floor raise
    /// moves key(0) from 255 to the tip, pushing the floor past chunk 0.
2617    /// C is built from P and writes key(0); its base_old_loc is 255.
2618    /// After committing P and pruning chunk 0, C's clear at 255 targets
2619    /// the pruned chunk.
2620    #[test_traced("WARN")]
2621    fn test_current_stale_bitmap_clears_after_prune() {
2622        let executor = deterministic::Runner::default();
2623        executor.start(|context| async move {
2624            let ctx = context.with_label("db");
2625            let mut db: UnorderedVariableDb = UnorderedVariableDb::init(
2626                ctx.clone(),
2627                variable_config::<OneCap>("stale-clears", &ctx),
2628            )
2629            .await
2630            .unwrap();
2631
2632            // Seed: 255 keys in one batch. key(0) lands at loc 255 (chunk 0).
2633            let mut seed = db.new_batch();
2634            for i in 0u64..255 {
2635                seed = seed.write(key(i), Some(val(i)));
2636            }
2637            let seed_m = seed.merkleize(&db, None).await.unwrap();
2638            db.apply_batch(seed_m).await.unwrap();
2639            db.commit().await.unwrap();
2640
            // P: overwrite keys 1..=254. Does NOT touch key(0), but P's floor
2642            // raise moves key(0) from 255, advancing the floor past chunk 0.
2643            let mut p = db.new_batch();
2644            for i in 1u64..255 {
2645                p = p.write(key(i), Some(val(i + 10000)));
2646            }
2647            let p_m = p.merkleize(&db, None).await.unwrap();
2648
2649            // C: built from P. Writes key(0). base_old_loc = 255 (chunk 0).
2650            let c_m = p_m
2651                .new_batch::<Sha256>()
2652                .write(key(0), Some(val(9999)))
2653                .merkleize(&db, None)
2654                .await
2655                .unwrap();
2656
2657            // Commit P, prune chunk 0, then apply C.
2658            db.apply_batch(p_m).await.unwrap();
2659            db.commit().await.unwrap();
2660
2661            let floor = *db.inactivity_floor_loc();
            assert!(floor >= 256, "floor must be past chunk 0: floor={floor}");
2663
2664            db.prune(db.inactivity_floor_loc()).await.unwrap();
2665            db.apply_batch(c_m).await.unwrap();
2666            db.flatten();
2667
2668            db.destroy().await.unwrap();
2669        });
2670    }
2671
    /// Apply C (grandchild of A) after only A has been applied. B's data (any-layer
2673    /// snapshot diff + current-layer bitmap) must still be applied.
2674    #[test_traced("INFO")]
2675    fn test_current_partial_ancestor_commit() {
2676        let executor = deterministic::Runner::default();
2677        executor.start(|context| async move {
2678            let ctx = context.with_label("db");
2679            let mut db: UnorderedVariableDb =
2680                UnorderedVariableDb::init(ctx.clone(), variable_config::<OneCap>("pac", &ctx))
2681                    .await
2682                    .unwrap();
2683
2684            let a = db
2685                .new_batch()
2686                .write(key(0), Some(val(0)))
2687                .merkleize(&db, None)
2688                .await
2689                .unwrap();
2690            let b = a
2691                .new_batch::<Sha256>()
2692                .write(key(1), Some(val(1)))
2693                .merkleize(&db, None)
2694                .await
2695                .unwrap();
2696            let c = b
2697                .new_batch::<Sha256>()
2698                .write(key(2), Some(val(2)))
2699                .merkleize(&db, None)
2700                .await
2701                .unwrap();
2702
2703            let expected_root = c.root();
2704
2705            db.apply_batch(a).await.unwrap();
2706            db.apply_batch(c).await.unwrap();
2707
2708            assert_eq!(db.root(), expected_root);
2709            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
2710            assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
2711            assert_eq!(db.get(&key(2)).await.unwrap(), Some(val(2)));
2712
2713            db.destroy().await.unwrap();
2714        });
2715    }
2716
2717    /// Regression: bitmap ancestor skip logic must correctly pair each ancestor's
2718    /// bitmap data with its batch_end. Requires a 3-ancestor chain (A->B->C->D)
2719    /// to expose ordering bugs.
2720    #[test_traced("INFO")]
2721    fn test_current_partial_ancestor_bitmap_ordering() {
2722        let executor = deterministic::Runner::default();
2723        executor.start(|context| async move {
2724            let ctx = context.with_label("db");
2725            let mut db: UnorderedVariableDb =
2726                UnorderedVariableDb::init(ctx.clone(), variable_config::<OneCap>("bmo", &ctx))
2727                    .await
2728                    .unwrap();
2729
2730            // Build A -> B -> C -> D. Each writes a distinct key.
2731            let a = db
2732                .new_batch()
2733                .write(key(0), Some(val(0)))
2734                .merkleize(&db, None)
2735                .await
2736                .unwrap();
2737            let b = a
2738                .new_batch::<Sha256>()
2739                .write(key(1), Some(val(1)))
2740                .merkleize(&db, None)
2741                .await
2742                .unwrap();
2743            let c = b
2744                .new_batch::<Sha256>()
2745                .write(key(2), Some(val(2)))
2746                .merkleize(&db, None)
2747                .await
2748                .unwrap();
2749            let d = c
2750                .new_batch::<Sha256>()
2751                .write(key(3), Some(val(3)))
2752                .merkleize(&db, None)
2753                .await
2754                .unwrap();
2755
            // Apply A only, then apply D (B and C are never applied).
2757            // D has 3 ancestors: [C, B, A] (parent-first) with batch_ends [C.total, B.total, A.total].
2758            // Bitmap ancestors are also parent-first: [C, B, A].
2759            db.apply_batch(a).await.unwrap();
2760            db.apply_batch(d.clone()).await.unwrap();
2761
2762            // Build a new batch E on top of the current state. If the bitmap was
2763            // corrupted by the ordering bug (A's pushes duplicated or B/C's pushes
2764            // missing), merkleize will compute a different root than a reference
2765            // that applied all ancestors sequentially.
2766            let e = db
2767                .new_batch()
2768                .write(key(4), Some(val(4)))
2769                .merkleize(&db, None)
2770                .await
2771                .unwrap();
2772            db.apply_batch(e).await.unwrap();
2773
2774            // Reference: apply all five sequentially.
2775            let ref_ctx = context.with_label("ref");
2776            let mut ref_db: UnorderedVariableDb = UnorderedVariableDb::init(
2777                ref_ctx.clone(),
2778                variable_config::<OneCap>("bmo_ref", &ref_ctx),
2779            )
2780            .await
2781            .unwrap();
2782            for i in 0..5 {
2783                let batch = ref_db
2784                    .new_batch()
2785                    .write(key(i), Some(val(i)))
2786                    .merkleize(&ref_db, None)
2787                    .await
2788                    .unwrap();
2789                ref_db.apply_batch(batch).await.unwrap();
2790            }
2791
2792            assert_eq!(
2793                db.root(),
2794                ref_db.root(),
2795                "root mismatch: bitmap ordering bug"
2796            );
2797
2798            db.destroy().await.unwrap();
2799            ref_db.destroy().await.unwrap();
2800        });
2801    }
2802}