Skip to main content

commonware_storage/qmdb/any/
mod.rs

1//! An _Any_ authenticated database provides succinct proofs of any value ever associated with a
2//! key.
3//!
4//! The specific variants provided within this module include:
5//! - Unordered: The database does not maintain or require any ordering over the key space.
6//!   - Fixed-size values
7//!   - Variable-size values
8//! - Ordered: The database maintains a total order over active keys.
9//!   - Fixed-size values
10//!   - Variable-size values
11//!
12//! # Examples
13//!
14//! ```ignore
15//! // 1. Create a batch and apply it.
16//! let batch = db.new_batch()
17//!     .write(key, Some(value))    // upsert
18//!     .write(other_key, None)     // delete
19//!     .merkleize(&db, None).await?;
20//! let root = batch.root();        // speculative root
21//! db.apply_batch(batch).await?;
22//! db.commit().await?;             // flush to disk
23//! ```
24//!
25//! ```ignore
26//! // 2. Fork two batches from the same parent. Apply one; the other is stale.
27//! let parent = db.new_batch().write(k1, Some(v1)).merkleize(&db, None).await?;
28//! let fork_a = parent.new_batch::<Sha256>().write(k2, Some(v2)).merkleize(&db, None).await?;
29//! let fork_b = parent.new_batch::<Sha256>().write(k3, Some(v3)).merkleize(&db, None).await?;
30//!
31//! db.apply_batch(fork_a).await?;                 // OK -- includes parent
32//! assert!(db.apply_batch(fork_b).await.is_err()); // StaleBatch
33//! ```
34//!
35//! ```ignore
36//! // 3. Chain two batches. Apply parent first, then child.
37//! let parent = db.new_batch().write(k1, Some(v1)).merkleize(&db, None).await?;
38//! let child = parent.new_batch::<Sha256>().write(k2, Some(v2)).merkleize(&db, None).await?;
39//!
40//! db.apply_batch(parent).await?;           // apply parent
41//! db.apply_batch(child).await?;            // ancestors skipped automatically
42//! db.commit().await?;
43//! ```
44//!
45//! ```ignore
46//! // 4. Chain two batches. Apply child directly (includes parent's changes).
47//! let parent = db.new_batch().write(k1, Some(v1)).merkleize(&db, None).await?;
48//! let child = parent.new_batch::<Sha256>().write(k2, Some(v2)).merkleize(&db, None).await?;
49//!
50//! db.apply_batch(child).await?;                  // OK -- includes parent
51//! assert!(db.apply_batch(parent).await.is_err()); // StaleBatch
52//! ```
53//!
54//! ```ignore
55//! // 5. Two independent chains. Commit the tail of one; the other chain is stale.
56//! let a1 = db.new_batch().write(k1, Some(v1)).merkleize(&db, None).await?;
57//! let a2 = a1.new_batch::<Sha256>().write(k2, Some(v2)).merkleize(&db, None).await?;
58//!
59//! let b1 = db.new_batch().write(k3, Some(v3)).merkleize(&db, None).await?;
60//! let b2 = b1.new_batch::<Sha256>().write(k4, Some(v4)).merkleize(&db, None).await?;
61//!
62//! db.apply_batch(a2).await?;                 // OK -- includes a1
63//! assert!(db.apply_batch(b2).await.is_err()); // StaleBatch
64//! ```
65
66use crate::{
67    index::Factory as IndexFactory,
68    journal::{
69        authenticated::Inner,
70        contiguous::{fixed::Config as FConfig, variable::Config as VConfig},
71    },
72    merkle::{full::Config as MerkleConfig, Family, Location},
73    qmdb::{
74        any::operation::{Operation, Update},
75        bitmap::Shared,
76        operation::Committable,
77        ROOT_BAGGING,
78    },
79    translator::Translator,
80    Context,
81};
82use commonware_codec::CodecShared;
83use commonware_cryptography::Hasher;
84use commonware_parallel::Strategy;
85use std::sync::Arc;
86use tracing::warn;
87
88pub mod batch;
89pub mod db;
90pub mod operation;
91#[cfg(any(test, feature = "test-traits"))]
92pub mod traits;
93pub mod value;
94pub use value::{FixedValue, ValueEncoding, VariableValue};
95pub mod ordered;
96pub(crate) mod sync;
97pub mod unordered;
98
99pub(crate) const BITMAP_CHUNK_BYTES: usize = 64;
100
/// Configuration for an `Any` authenticated db.
///
/// `J` is the configuration type of the operations log journal. The [`FixedConfig`] and
/// [`VariableConfig`] aliases in this module fix it to the fixed- and variable-size contiguous
/// journal configurations respectively.
#[derive(Clone)]
pub struct Config<T: Translator, J, S: Strategy> {
    /// Configuration for the Merkle structure backing the authenticated journal.
    pub merkle_config: MerkleConfig<S>,

    /// Configuration for the operations log journal.
    pub journal_config: J,

    /// The translator used by the compressed index.
    pub translator: T,
}
113
/// Configuration for an `Any` authenticated db with fixed-size values.
///
/// The operations log is configured with the fixed contiguous journal's config.
pub type FixedConfig<T, S> = Config<T, FConfig, S>;

/// Configuration for an `Any` authenticated db with variable-size values.
///
/// The operations log is configured with the variable contiguous journal's config, which is
/// parameterized by `C`.
pub type VariableConfig<T, C, S> = Config<T, VConfig<C>, S>;
119
120/// Initialize an `Any` authenticated db from the given config.
121pub async fn init<F, E, U, H, T, I, J, S>(
122    context: E,
123    cfg: Config<T, J::Config, S>,
124) -> Result<db::Db<F, E, J, I, H, U, BITMAP_CHUNK_BYTES, S>, crate::qmdb::Error<F>>
125where
126    F: Family,
127    E: Context,
128    U: Update + Send + Sync,
129    H: Hasher,
130    T: Translator,
131    I: IndexFactory<T, Value = Location<F>>,
132    J: Inner<E, Item = Operation<F, U>>,
133    S: Strategy,
134    Operation<F, U>: Committable + CodecShared,
135{
136    init_with_bitmap::<F, E, U, H, T, I, J, S, BITMAP_CHUNK_BYTES>(context, cfg, None).await
137}
138
139/// Like [`init`] but accepts a pre-allocated bitmap (used by `current::Db`, which sizes pruned
140/// chunks from grafted metadata). `bitmap = None` allocates internally.
141pub(crate) async fn init_with_bitmap<F, E, U, H, T, I, J, S, const N: usize>(
142    context: E,
143    cfg: Config<T, J::Config, S>,
144    bitmap: Option<Arc<Shared<N>>>,
145) -> Result<db::Db<F, E, J, I, H, U, N, S>, crate::qmdb::Error<F>>
146where
147    F: Family,
148    E: Context,
149    U: Update + Send + Sync,
150    H: Hasher,
151    T: Translator,
152    I: IndexFactory<T, Value = Location<F>>,
153    J: Inner<E, Item = Operation<F, U>>,
154    S: Strategy,
155    Operation<F, U>: Committable + CodecShared,
156{
157    let mut log = J::init::<F, H, S>(
158        context.child("log"),
159        cfg.merkle_config,
160        cfg.journal_config,
161        Operation::is_commit,
162        ROOT_BAGGING,
163    )
164    .await?;
165
166    if log.size().await == 0 {
167        warn!("Authenticated log is empty, initializing new db");
168        let commit_floor = Operation::CommitFloor(None, Location::new(0));
169        log.append(&commit_floor).await?;
170        log.sync().await?;
171    }
172
173    let index = I::new(context.child("index"), cfg.translator);
174    let metrics = db::Metrics::new(context);
175    db::Db::init_from_log(index, log, bitmap, metrics).await
176}
177
178#[cfg(test)]
179// pub(crate) so qmdb/current can use the generic tests.
180pub(crate) mod test {
181    use super::*;
182    use crate::{
183        journal::contiguous::{fixed::Config as FConfig, variable::Config as VConfig},
184        qmdb::{
185            self,
186            any::{FixedConfig, MerkleConfig, VariableConfig},
187        },
188        translator::OneCap,
189    };
190    use commonware_codec::{Codec, CodecShared};
191    use commonware_cryptography::{sha256::Digest, Hasher, Sha256};
192    use commonware_runtime::{
193        buffer::paged::CacheRef, deterministic::Context, BufferPooler, Supervisor as _,
194    };
195    use commonware_utils::{NZUsize, NZU16, NZU64};
196    use core::{future::Future, pin::Pin};
197    use std::{
198        collections::HashMap,
199        num::{NonZeroU16, NonZeroUsize},
200    };
201
202    pub(crate) fn colliding_digest(prefix: u8, suffix: u64) -> Digest {
203        let mut bytes = [0u8; 32];
204        bytes[0] = prefix;
205        bytes[24..].copy_from_slice(&suffix.to_be_bytes());
206        Digest::from(bytes)
207    }
208
    // Janky page & cache sizes to exercise boundary conditions. Both values are passed to
    // `CacheRef::from_pooler` by the test config builders in this module.
    const PAGE_SIZE: NonZeroU16 = NZU16!(101);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(11);
212
213    pub(crate) fn fixed_db_config<T: Translator + Default>(
214        suffix: &str,
215        pooler: &impl BufferPooler,
216    ) -> FixedConfig<T, Sequential> {
217        let page_cache = CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE);
218        FixedConfig {
219            merkle_config: MerkleConfig {
220                journal_partition: format!("journal-{suffix}"),
221                metadata_partition: format!("metadata-{suffix}"),
222                items_per_blob: NZU64!(11),
223                write_buffer: NZUsize!(1024),
224                strategy: Sequential,
225                page_cache: page_cache.clone(),
226            },
227            journal_config: FConfig {
228                partition: format!("log-journal-{suffix}"),
229                items_per_blob: NZU64!(7),
230                page_cache,
231                write_buffer: NZUsize!(1024),
232            },
233            translator: T::default(),
234        }
235    }
236
237    pub(crate) fn variable_db_config<T: Translator + Default>(
238        suffix: &str,
239        pooler: &impl BufferPooler,
240    ) -> VariableConfig<T, ((), ()), Sequential> {
241        let page_cache = CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE);
242        VariableConfig {
243            merkle_config: MerkleConfig {
244                journal_partition: format!("journal-{suffix}"),
245                metadata_partition: format!("metadata-{suffix}"),
246                items_per_blob: NZU64!(11),
247                write_buffer: NZUsize!(1024),
248                strategy: Sequential,
249                page_cache: page_cache.clone(),
250            },
251            journal_config: VConfig {
252                partition: format!("log-journal-{suffix}"),
253                items_per_section: NZU64!(7),
254                compression: None,
255                codec_config: ((), ()),
256                page_cache,
257                write_buffer: NZUsize!(1024),
258            },
259            translator: T::default(),
260        }
261    }
262
263    use crate::{
264        index::Unordered as UnorderedIndex,
265        journal::contiguous::Mutable,
266        merkle::mmr,
267        qmdb::any::{
268            db::Db as AnyDb,
269            operation::{update::Update as UpdateTrait, Operation as AnyOperation},
270            traits::{DbAny, Provable, UnmerkleizedBatch as _},
271        },
272    };
273
    // MMR-specialized shorthands used by the MMR-only test helpers in this module. Note that
    // `Location` here takes precedence over the generic `Location` glob-imported via
    // `use super::*`.
    type Error = crate::qmdb::Error<mmr::Family>;
    type Location = mmr::Location;
276
    /// Test-only abstraction over databases that can be rewound to an earlier size.
    ///
    /// Lets the generic rewind test be written against any db type that provides an
    /// implementation.
    pub(crate) trait RewindableDb {
        /// Rewind the db so that its size becomes `size`, returning an error if the rewind
        /// fails.
        fn rewind_to_size(
            &mut self,
            size: Location,
        ) -> impl Future<Output = Result<(), Error>> + Send;
    }
283
    /// [`RewindableDb`] for the MMR-backed `Any` db: forwards to the db's own `rewind`.
    impl<E, U, C, I, H, const N: usize, S> RewindableDb for AnyDb<mmr::Family, E, C, I, H, U, N, S>
    where
        E: crate::Context,
        U: UpdateTrait,
        C: Mutable<Item = AnyOperation<mmr::Family, U>>,
        I: UnorderedIndex<Value = Location>,
        H: Hasher,
        AnyOperation<mmr::Family, U>: Codec,
        S: Strategy,
    {
        async fn rewind_to_size(&mut self, size: Location) -> Result<(), Error> {
            // Any success value from `rewind` is discarded; only the error is propagated.
            self.rewind(size).await?;
            Ok(())
        }
    }
299
300    /// Test recovery on non-empty db.
301    pub(crate) async fn test_any_db_non_empty_recovery<F: Family, D, V: Clone + CodecShared>(
302        context: Context,
303        mut db: D,
304        reopen_db: impl Fn(Context) -> Pin<Box<dyn Future<Output = D> + Send>>,
305        make_value: impl Fn(u64) -> V,
306    ) where
307        D: DbAny<F, Key = Digest, Value = V, Digest = Digest>,
308    {
309        const ELEMENTS: u64 = 1000;
310
311        // Commit initial batch.
312        {
313            let mut batch = db.new_batch();
314            for i in 0u64..ELEMENTS {
315                let k = Sha256::hash(&i.to_be_bytes());
316                let v = make_value(i * 1000);
317                batch = batch.write(k, Some(v));
318            }
319            let merkleized = batch.merkleize(&db, None).await.unwrap();
320            db.apply_batch(merkleized).await.unwrap();
321        }
322        db.commit().await.unwrap();
323        db.prune(db.sync_boundary().await).await.unwrap();
324        let root = db.root();
325        let op_count = db.size().await;
326        let inactivity_floor_loc = db.inactivity_floor_loc().await;
327
328        let db = reopen_db(context.child("reopen").with_attribute("index", 1)).await;
329        assert_eq!(db.size().await, op_count);
330        assert_eq!(db.inactivity_floor_loc().await, inactivity_floor_loc);
331        assert_eq!(db.root(), root);
332
333        // Write without applying (unapplied batch should be lost on reopen).
334        {
335            let mut batch = db.new_batch();
336            for i in 0u64..ELEMENTS {
337                let k = Sha256::hash(&i.to_be_bytes());
338                let v = make_value((i + 1) * 10000);
339                batch = batch.write(k, Some(v));
340            }
341            let _merkleized = batch.merkleize(&db, None).await.unwrap();
342        }
343        let db = reopen_db(context.child("reopen").with_attribute("index", 2)).await;
344        assert_eq!(db.size().await, op_count);
345        assert_eq!(db.inactivity_floor_loc().await, inactivity_floor_loc);
346        assert_eq!(db.root(), root);
347
348        // Write without applying again.
349        {
350            let mut batch = db.new_batch();
351            for i in 0u64..ELEMENTS {
352                let k = Sha256::hash(&i.to_be_bytes());
353                let v = make_value((i + 1) * 10000);
354                batch = batch.write(k, Some(v));
355            }
356            let _merkleized = batch.merkleize(&db, None).await.unwrap();
357        }
358        let db = reopen_db(context.child("reopen").with_attribute("index", 3)).await;
359        assert_eq!(db.size().await, op_count);
360        assert_eq!(db.root(), root);
361
362        // Three rounds of unapplied batches.
363        for _ in 0..3 {
364            let mut batch = db.new_batch();
365            for i in 0u64..ELEMENTS {
366                let k = Sha256::hash(&i.to_be_bytes());
367                let v = make_value((i + 1) * 10000);
368                batch = batch.write(k, Some(v));
369            }
370            let _merkleized = batch.merkleize(&db, None).await.unwrap();
371        }
372        let mut db = reopen_db(context.child("reopen").with_attribute("index", 4)).await;
373        assert_eq!(db.size().await, op_count);
374        assert_eq!(db.root(), root);
375
376        // Now actually commit a batch.
377        {
378            let mut batch = db.new_batch();
379            for i in 0u64..ELEMENTS {
380                let k = Sha256::hash(&i.to_be_bytes());
381                let v = make_value((i + 1) * 10000);
382                batch = batch.write(k, Some(v));
383            }
384            let merkleized = batch.merkleize(&db, None).await.unwrap();
385            db.apply_batch(merkleized).await.unwrap();
386        }
387        db.commit().await.unwrap();
388        let db = reopen_db(context.child("reopen").with_attribute("index", 5)).await;
389        assert!(db.size().await > op_count);
390        assert_ne!(db.inactivity_floor_loc().await, inactivity_floor_loc);
391        assert_ne!(db.root(), root);
392
393        db.destroy().await.unwrap();
394    }
395
396    /// Test recovery on empty db.
397    pub(crate) async fn test_any_db_empty_recovery<F: Family, D, V: Clone + CodecShared>(
398        context: Context,
399        db: D,
400        reopen_db: impl Fn(Context) -> Pin<Box<dyn Future<Output = D> + Send>>,
401        make_value: impl Fn(u64) -> V,
402    ) where
403        D: DbAny<F, Key = Digest, Value = V, Digest = Digest>,
404    {
405        let root = db.root();
406
407        let db = reopen_db(context.child("reopen").with_attribute("index", 1)).await;
408        assert_eq!(db.size().await, 1);
409        assert_eq!(db.root(), root);
410
411        // Write without applying (unapplied batch should be lost on reopen).
412        {
413            let mut batch = db.new_batch();
414            for i in 0u64..1000 {
415                let k = Sha256::hash(&i.to_be_bytes());
416                let v = make_value((i + 1) * 10000);
417                batch = batch.write(k, Some(v));
418            }
419            let _merkleized = batch.merkleize(&db, None).await.unwrap();
420        }
421        let db = reopen_db(context.child("reopen").with_attribute("index", 2)).await;
422        assert_eq!(db.size().await, 1);
423        assert_eq!(db.root(), root);
424
425        // Write without applying again.
426        {
427            let mut batch = db.new_batch();
428            for i in 0u64..1000 {
429                let k = Sha256::hash(&i.to_be_bytes());
430                let v = make_value((i + 1) * 10000);
431                batch = batch.write(k, Some(v));
432            }
433            let _merkleized = batch.merkleize(&db, None).await.unwrap();
434        }
435        drop(db);
436        let db = reopen_db(context.child("reopen").with_attribute("index", 3)).await;
437        assert_eq!(db.size().await, 1);
438        assert_eq!(db.root(), root);
439
440        // Three rounds of unapplied batches.
441        for _ in 0..3 {
442            let mut batch = db.new_batch();
443            for i in 0u64..1000 {
444                let k = Sha256::hash(&i.to_be_bytes());
445                let v = make_value((i + 1) * 10000);
446                batch = batch.write(k, Some(v));
447            }
448            let _merkleized = batch.merkleize(&db, None).await.unwrap();
449        }
450        drop(db);
451        let mut db = reopen_db(context.child("reopen").with_attribute("index", 4)).await;
452        assert_eq!(db.size().await, 1);
453        assert_eq!(db.root(), root);
454
455        // Now actually commit a batch.
456        {
457            let mut batch = db.new_batch();
458            for i in 0u64..1000 {
459                let k = Sha256::hash(&i.to_be_bytes());
460                let v = make_value((i + 1) * 10000);
461                batch = batch.write(k, Some(v));
462            }
463            let merkleized = batch.merkleize(&db, None).await.unwrap();
464            db.apply_batch(merkleized).await.unwrap();
465        }
466        db.commit().await.unwrap();
467        drop(db);
468        let db = reopen_db(context.child("reopen").with_attribute("index", 5)).await;
469        assert!(db.size().await > 1);
470        assert_ne!(db.root(), root);
471
472        db.destroy().await.unwrap();
473    }
474
    /// Test rewinding to a prior committed state and recovering that state after reopen.
    ///
    /// Sequence exercised:
    /// 1. Apply + commit an empty batch, rewind to the initial size, and check the initial
    ///    root, size, floor, and (absent) metadata are restored.
    /// 2. Commit batch A (writes `key0`, `key1`), then B (updates `key0`, deletes `key1`, adds
    ///    `key2`), then C (updates `key0`, recreates `key1`, deletes `key2`).
    /// 3. Rewind to the size recorded after A and check A's state, both before and after a
    ///    reopen.
    /// 4. Commit a fresh batch D on top of the rewound tip and check it survives a reopen.
    /// 5. Rewind to the initial size and check the initial state, both before and after a
    ///    reopen.
    pub(crate) async fn test_any_db_rewind_recovery<D, V>(
        context: Context,
        mut db: D,
        reopen_db: impl Fn(Context) -> Pin<Box<dyn Future<Output = D> + Send>>,
        make_value: impl Fn(u64) -> V,
    ) where
        D: DbAny<mmr::Family, Key = Digest, Value = V, Digest = Digest> + RewindableDb,
        V: Clone + CodecShared + Eq + std::fmt::Debug,
    {
        let key0 = Sha256::hash(&0u64.to_be_bytes());
        let key1 = Sha256::hash(&1u64.to_be_bytes());
        let key2 = Sha256::hash(&2u64.to_be_bytes());
        // Snapshot of the db as handed to the test; the final rewind must restore it.
        let initial_root = db.root();
        let initial_size = db.size().await;
        let initial_floor = db.inactivity_floor_loc().await;

        // Empty-batch rewind on an otherwise empty DB should apply no snapshot undos.
        let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
        let empty_range = db.apply_batch(merkleized).await.unwrap();
        db.commit().await.unwrap();
        assert_eq!(empty_range.start, initial_size);
        assert_eq!(db.size().await, empty_range.end);
        db.rewind_to_size(initial_size).await.unwrap();
        assert_eq!(db.root(), initial_root);
        assert_eq!(db.size().await, initial_size);
        assert_eq!(db.inactivity_floor_loc().await, initial_floor);
        assert_eq!(db.get_metadata().await.unwrap(), None);

        // Batch A: the state the first real rewind targets.
        let value0_a = make_value(10);
        let value1_a = make_value(11);
        let metadata_a = make_value(12);

        let merkleized = db
            .new_batch()
            .write(key0, Some(value0_a.clone()))
            .write(key1, Some(value1_a.clone()))
            .merkleize(&db, Some(metadata_a.clone()))
            .await
            .unwrap();
        let range_a = db.apply_batch(merkleized).await.unwrap();
        db.commit().await.unwrap();

        let root_a = db.root();
        let size_a = db.size().await;
        let floor_a = db.inactivity_floor_loc().await;
        assert_eq!(size_a, range_a.end);

        // Batch B: update `key0`, delete `key1`, create `key2`.
        let value0_b = make_value(20);
        let value2_b = make_value(21);
        let metadata_b = make_value(22);

        let merkleized = db
            .new_batch()
            .write(key0, Some(value0_b))
            .write(key1, None)
            .write(key2, Some(value2_b))
            .merkleize(&db, Some(metadata_b))
            .await
            .unwrap();
        let range_b = db.apply_batch(merkleized).await.unwrap();
        db.commit().await.unwrap();
        assert_eq!(range_b.start, size_a);
        assert_ne!(db.root(), root_a);

        // Batch C: update `key0` again, recreate `key1`, delete `key2`.
        let value0_c = make_value(30);
        let value1_c = make_value(31);
        let metadata_c = make_value(32);
        let merkleized = db
            .new_batch()
            .write(key0, Some(value0_c))
            .write(key1, Some(value1_c))
            .write(key2, None)
            .merkleize(&db, Some(metadata_c))
            .await
            .unwrap();
        db.apply_batch(merkleized).await.unwrap();
        db.commit().await.unwrap();

        // Rewind across a tail where:
        // - the same key (`key0`) was updated multiple times
        // - `key1` was deleted then recreated (exercises net-zero active_keys_delta path)
        db.rewind_to_size(size_a).await.unwrap();
        assert_eq!(db.root(), root_a);
        assert_eq!(db.size().await, size_a);
        assert_eq!(db.inactivity_floor_loc().await, floor_a);
        assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_a.clone()));
        assert_eq!(db.get(&key0).await.unwrap(), Some(value0_a));
        assert_eq!(db.get(&key1).await.unwrap(), Some(value1_a));
        assert_eq!(db.get(&key2).await.unwrap(), None);

        // The rewound state must also be what a reopened db reports.
        db.commit().await.unwrap();
        drop(db);
        let mut db = reopen_db(context.child("reopen_after_rewind")).await;
        assert_eq!(db.root(), root_a);
        assert_eq!(db.size().await, size_a);
        assert_eq!(db.inactivity_floor_loc().await, floor_a);
        assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_a));
        // `value0_a` / `value1_a` were moved into the assertions above, so rebuild them.
        assert_eq!(db.get(&key0).await.unwrap(), Some(make_value(10)));
        assert_eq!(db.get(&key1).await.unwrap(), Some(make_value(11)));
        assert_eq!(db.get(&key2).await.unwrap(), None);

        // Fresh writes from the rewound tip should produce a correct new chain and persist
        // across reopen.
        let value2_d = make_value(40);
        let metadata_d = make_value(41);
        let merkleized = db
            .new_batch()
            .write(key2, Some(value2_d.clone()))
            .merkleize(&db, Some(metadata_d.clone()))
            .await
            .unwrap();
        db.apply_batch(merkleized).await.unwrap();
        db.commit().await.unwrap();
        assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_d.clone()));
        assert_eq!(db.get(&key0).await.unwrap(), Some(make_value(10)));
        assert_eq!(db.get(&key1).await.unwrap(), Some(make_value(11)));
        assert_eq!(db.get(&key2).await.unwrap(), Some(value2_d.clone()));

        drop(db);
        let mut db = reopen_db(context.child("reopen_after_rewind_new_writes")).await;
        assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_d));
        assert_eq!(db.get(&key0).await.unwrap(), Some(make_value(10)));
        assert_eq!(db.get(&key1).await.unwrap(), Some(make_value(11)));
        assert_eq!(db.get(&key2).await.unwrap(), Some(value2_d));

        // Rewind all the way to the initial commit boundary (`first_commit_loc + 1`).
        db.rewind_to_size(initial_size).await.unwrap();
        assert_eq!(db.root(), initial_root);
        assert_eq!(db.size().await, initial_size);
        assert_eq!(db.inactivity_floor_loc().await, initial_floor);
        assert_eq!(db.get_metadata().await.unwrap(), None);
        assert_eq!(db.get(&key0).await.unwrap(), None);
        assert_eq!(db.get(&key1).await.unwrap(), None);
        assert_eq!(db.get(&key2).await.unwrap(), None);

        // The fully rewound state must also be what a reopened db reports.
        db.commit().await.unwrap();
        drop(db);
        let db = reopen_db(context.child("reopen_initial_boundary")).await;
        assert_eq!(db.root(), initial_root);
        assert_eq!(db.size().await, initial_size);
        assert_eq!(db.inactivity_floor_loc().await, initial_floor);
        assert_eq!(db.get_metadata().await.unwrap(), None);
        assert_eq!(db.get(&key0).await.unwrap(), None);
        assert_eq!(db.get(&key1).await.unwrap(), None);
        assert_eq!(db.get(&key2).await.unwrap(), None);

        db.destroy().await.unwrap();
    }
624
625    /// Test that a large mixed workload can be authenticated and replayed correctly.
626    pub(crate) async fn test_any_db_build_and_authenticate<D, V>(
627        context: Context,
628        mut db: D,
629        reopen_db: impl Fn(Context) -> Pin<Box<dyn Future<Output = D> + Send>>,
630        make_value: impl Fn(u64) -> V,
631    ) where
632        D: DbAny<mmr::Family, Key = Digest, Value = V, Digest = Digest> + Provable<mmr::Family>,
633        V: CodecShared + Clone + Eq + std::hash::Hash + std::fmt::Debug,
634        <D as Provable<mmr::Family>>::Operation: Codec,
635    {
636        use crate::qmdb::verify_proof;
637
638        const ELEMENTS: u64 = 1000;
639
640        let mut map = HashMap::<Digest, V>::default();
641        {
642            let mut batch = db.new_batch();
643            for i in 0u64..ELEMENTS {
644                let k = Sha256::hash(&i.to_be_bytes());
645                let v = make_value(i * 1000);
646                batch = batch.write(k, Some(v.clone()));
647                map.insert(k, v);
648            }
649
650            // Update every 3rd key.
651            for i in 0u64..ELEMENTS {
652                if i % 3 != 0 {
653                    continue;
654                }
655                let k = Sha256::hash(&i.to_be_bytes());
656                let v = make_value((i + 1) * 10000);
657                batch = batch.write(k, Some(v.clone()));
658                map.insert(k, v);
659            }
660
661            // Delete every 7th key.
662            for i in 0u64..ELEMENTS {
663                if i % 7 != 1 {
664                    continue;
665                }
666                let k = Sha256::hash(&i.to_be_bytes());
667                batch = batch.write(k, None);
668                map.remove(&k);
669            }
670
671            let merkleized = batch.merkleize(&db, None).await.unwrap();
672            db.apply_batch(merkleized).await.unwrap();
673        }
674        // Commit + sync with pruning raises inactivity floor.
675        db.sync().await.unwrap();
676        db.prune(db.sync_boundary().await).await.unwrap();
677
678        // Drop & reopen and ensure state matches.
679        let root = db.root();
680        db.sync().await.unwrap();
681        drop(db);
682        let db = reopen_db(context.child("reopened")).await;
683        assert_eq!(root, db.root());
684
685        // State matches reference map.
686        for i in 0u64..ELEMENTS {
687            let k = Sha256::hash(&i.to_be_bytes());
688            if let Some(map_value) = map.get(&k) {
689                let Some(db_value) = db.get(&k).await.unwrap() else {
690                    panic!("key not found in db: {k}");
691                };
692                assert_eq!(*map_value, db_value);
693            } else {
694                assert!(db.get(&k).await.unwrap().is_none());
695            }
696        }
697
698        let hasher = qmdb::hasher::<Sha256>();
699        let bounds = db.bounds().await;
700        let inactivity_floor = db.inactivity_floor_loc().await;
701        for loc in *inactivity_floor..*bounds.end {
702            let loc = Location::new(loc);
703            let (proof, ops) = db.proof(loc, NZU64!(10)).await.unwrap();
704            assert!(verify_proof(&hasher, &proof, loc, &ops, &root));
705        }
706
707        db.destroy().await.unwrap();
708    }
709
710    /// Test that replaying multiple updates of the same key on startup preserves correct state.
711    pub(crate) async fn test_any_db_log_replay<
712        F: Family,
713        D,
714        V: Clone + CodecShared + PartialEq + std::fmt::Debug,
715    >(
716        context: Context,
717        mut db: D,
718        reopen_db: impl Fn(Context) -> Pin<Box<dyn Future<Output = D> + Send>>,
719        make_value: impl Fn(u64) -> V,
720    ) where
721        D: DbAny<F, Key = Digest, Value = V, Digest = Digest>,
722    {
723        // Update the same key many times within a single batch.
724        const UPDATES: u64 = 100;
725        let k = Sha256::hash(&UPDATES.to_be_bytes());
726        let mut last_value = None;
727        {
728            let mut batch = db.new_batch();
729            for i in 0u64..UPDATES {
730                let v = make_value(i * 1000);
731                last_value = Some(v.clone());
732                batch = batch.write(k, Some(v));
733            }
734            let merkleized = batch.merkleize(&db, None).await.unwrap();
735            db.apply_batch(merkleized).await.unwrap();
736        }
737        db.commit().await.unwrap();
738        let root = db.root();
739
740        // Reopen and verify the state is preserved correctly.
741        drop(db);
742        let db = reopen_db(context.child("reopened")).await;
743        assert_eq!(db.root(), root);
744        assert_eq!(db.get(&k).await.unwrap(), last_value);
745
746        db.destroy().await.unwrap();
747    }
748
749    /// Test that historical_proof returns correct proofs for past database states.
750    pub(crate) async fn test_any_db_historical_proof_basic<D, V: Clone + CodecShared>(
751        _context: Context,
752        mut db: D,
753        make_value: impl Fn(u64) -> V,
754    ) where
755        D: DbAny<mmr::Family, Key = Digest, Value = V, Digest = Digest> + Provable<mmr::Family>,
756        <D as Provable<mmr::Family>>::Operation: Codec + PartialEq + std::fmt::Debug,
757    {
758        use crate::qmdb::verify_proof;
759        use commonware_utils::NZU64;
760
761        // Add some operations
762        const OPS: u64 = 20;
763        {
764            let mut batch = db.new_batch();
765            for i in 0u64..OPS {
766                let k = Sha256::hash(&i.to_be_bytes());
767                let v = make_value(i * 1000);
768                batch = batch.write(k, Some(v));
769            }
770            let merkleized = batch.merkleize(&db, None).await.unwrap();
771            db.apply_batch(merkleized).await.unwrap();
772        }
773        let root_hash = db.root();
774        let original_op_count = db.size().await;
775
776        // Historical proof should match "regular" proof when historical size == current database size
777        let max_ops = NZU64!(10);
778        let start_loc = Location::new(5);
779        let (historical_proof, historical_ops) = db
780            .historical_proof(original_op_count, start_loc, max_ops)
781            .await
782            .unwrap();
783        let (regular_proof, regular_ops) = db.proof(start_loc, max_ops).await.unwrap();
784
785        assert_eq!(historical_proof.leaves, regular_proof.leaves);
786        assert_eq!(historical_proof.digests, regular_proof.digests);
787        assert_eq!(historical_ops, regular_ops);
788        let hasher = qmdb::hasher::<Sha256>();
789        assert!(verify_proof(
790            &hasher,
791            &historical_proof,
792            start_loc,
793            &historical_ops,
794            &root_hash,
795        ));
796
797        // Add more operations to the database
798        {
799            let mut batch = db.new_batch();
800            for i in OPS..(OPS + 5) {
801                let k = Sha256::hash(&(i + 1000).to_be_bytes()); // different keys
802                let v = make_value(i * 1000);
803                batch = batch.write(k, Some(v));
804            }
805            let merkleized = batch.merkleize(&db, None).await.unwrap();
806            db.apply_batch(merkleized).await.unwrap();
807        }
808
809        // Historical proof should remain the same even though database has grown
810        let (historical_proof2, historical_ops2) = db
811            .historical_proof(original_op_count, start_loc, max_ops)
812            .await
813            .unwrap();
814        assert_eq!(historical_proof2.leaves, original_op_count);
815        assert_eq!(historical_proof2.digests, regular_proof.digests);
816        assert_eq!(historical_ops2, regular_ops);
817        assert!(verify_proof(
818            &hasher,
819            &historical_proof2,
820            start_loc,
821            &historical_ops2,
822            &root_hash,
823        ));
824
825        db.destroy().await.unwrap();
826    }
827
828    /// Test that tampering with historical proofs causes verification to fail.
829    pub(crate) async fn test_any_db_historical_proof_invalid<D, V: Clone + CodecShared>(
830        _context: Context,
831        mut db: D,
832        make_value: impl Fn(u64) -> V,
833    ) where
834        D: DbAny<mmr::Family, Key = Digest, Value = V, Digest = Digest> + Provable<mmr::Family>,
835        <D as Provable<mmr::Family>>::Operation: Codec + PartialEq + std::fmt::Debug + Clone,
836    {
837        use crate::qmdb::verify_proof;
838        use commonware_utils::NZU64;
839
840        // Apply two single-write batches and capture the commit-boundary size after the
841        // first batch. `historical_proof` requires the historical size to land on a commit
842        // boundary when the db commits to an inactive peak boundary.
843        let mut historical_op_count = Location::new(0);
844        for i in 0u64..2 {
845            let k = Sha256::hash(&i.to_be_bytes());
846            let v = make_value(i * 1000);
847            let merkleized = db
848                .new_batch()
849                .write(k, Some(v))
850                .merkleize(&db, None)
851                .await
852                .unwrap();
853            db.apply_batch(merkleized).await.unwrap();
854            if i == 0 {
855                historical_op_count = db.bounds().await.end;
856            }
857        }
858
859        let expected_ops_len = (*historical_op_count - 1) as usize;
860        let (proof, ops) = db
861            .historical_proof(historical_op_count, Location::new(1), NZU64!(10))
862            .await
863            .unwrap();
864        assert_eq!(proof.leaves, historical_op_count);
865        assert_eq!(ops.len(), expected_ops_len);
866
867        let hasher = qmdb::hasher::<Sha256>();
868
869        // Changing the proof digests should cause verification to fail
870        {
871            let mut tampered_proof = proof.clone();
872            tampered_proof.digests[0] = Sha256::hash(b"invalid");
873            let root_hash = db.root();
874            assert!(!verify_proof(
875                &hasher,
876                &tampered_proof,
877                Location::new(1),
878                &ops,
879                &root_hash,
880            ));
881        }
882
883        // Appending an extra digest should cause verification to fail
884        {
885            let mut tampered_proof = proof.clone();
886            tampered_proof.digests.push(Sha256::hash(b"invalid"));
887            let root_hash = db.root();
888            assert!(!verify_proof(
889                &hasher,
890                &tampered_proof,
891                Location::new(1),
892                &ops,
893                &root_hash,
894            ));
895        }
896
897        // Changing the ops should cause verification to fail
898        {
899            let root_hash = db.root();
900            let mut tampered_ops = ops.clone();
901            // Swap first two ops if we have at least 2
902            if tampered_ops.len() >= 2 {
903                tampered_ops.swap(0, 1);
904                assert!(!verify_proof(
905                    &hasher,
906                    &proof,
907                    Location::new(1),
908                    &tampered_ops,
909                    &root_hash,
910                ));
911            }
912        }
913
914        // Appending an extra (duplicate) op should cause verification to fail
915        {
916            let root_hash = db.root();
917            let mut tampered_ops = ops.clone();
918            tampered_ops.push(tampered_ops[0].clone());
919            assert!(!verify_proof(
920                &hasher,
921                &proof,
922                Location::new(1),
923                &tampered_ops,
924                &root_hash,
925            ));
926        }
927
928        // Changing the start location should cause verification to fail
929        {
930            let root_hash = db.root();
931            assert!(!verify_proof(
932                &hasher,
933                &proof,
934                Location::new(2),
935                &ops,
936                &root_hash,
937            ));
938        }
939
940        // Changing the root digest should cause verification to fail
941        {
942            let invalid_root = Sha256::hash(b"invalid");
943            assert!(!verify_proof(
944                &hasher,
945                &proof,
946                Location::new(1),
947                &ops,
948                &invalid_root,
949            ));
950        }
951
952        // Changing the proof leaves count should cause verification to fail
953        {
954            let mut tampered_proof = proof.clone();
955            tampered_proof.leaves = Location::new(100);
956            let root_hash = db.root();
957            assert!(!verify_proof(
958                &hasher,
959                &tampered_proof,
960                Location::new(1),
961                &ops,
962                &root_hash,
963            ));
964        }
965
966        db.destroy().await.unwrap();
967    }
968
969    /// Test historical_proof edge cases: singleton db, limited ops, min position.
970    pub(crate) async fn test_any_db_historical_proof_edge_cases<D, V: Clone + CodecShared>(
971        _context: Context,
972        mut db: D,
973        make_value: impl Fn(u64) -> V,
974    ) where
975        D: DbAny<mmr::Family, Key = Digest, Value = V, Digest = Digest> + Provable<mmr::Family>,
976        <D as Provable<mmr::Family>>::Operation: Codec + PartialEq + std::fmt::Debug,
977    {
978        use commonware_utils::NZU64;
979
980        // Apply a sequence of single-write batches and record the commit-boundary size
981        // reached after each. `historical_proof` requires the historical size to be a
982        // commit boundary when the db commits to an inactive peak boundary, so we anchor each test on
983        // one of the boundaries we recorded here rather than hardcoding sizes that depend
984        // on internal floor-raising behavior.
985        let initial_size = db.bounds().await.end;
986        let mut boundaries = vec![initial_size];
987        for i in 0u64..5 {
988            let k = Sha256::hash(&i.to_be_bytes());
989            let v = make_value(i * 1000);
990            let merkleized = db
991                .new_batch()
992                .write(k, Some(v))
993                .merkleize(&db, None)
994                .await
995                .unwrap();
996            db.apply_batch(merkleized).await.unwrap();
997            boundaries.push(db.bounds().await.end);
998        }
999
1000        // Singleton historical state: only the initial CommitFloor is visible.
1001        let singleton_size = boundaries[0];
1002        let (single_proof, single_ops) = db
1003            .historical_proof(singleton_size, Location::new(0), NZU64!(1))
1004            .await
1005            .unwrap();
1006        assert_eq!(single_proof.leaves, singleton_size);
1007        assert_eq!(single_ops.len(), 1);
1008
1009        // max_ops exceeds the ops remaining at this historical size, so the returned count
1010        // is capped at `historical_size - start_loc`. Anchor at the earliest post-batch
1011        // boundary that has at least 3 ops past `boundaries[1]`.
1012        let limited_size = boundaries[2];
1013        let limited_start = boundaries[1];
1014        let expected_limited = (*limited_size - *limited_start) as usize;
1015        assert!(expected_limited > 0);
1016        let (_limited_proof, limited_ops) = db
1017            .historical_proof(limited_size, limited_start, NZU64!(20))
1018            .await
1019            .unwrap();
1020        assert_eq!(limited_ops.len(), expected_limited);
1021
1022        // Standard historical proof anchored at an early commit boundary, requesting a
1023        // bounded number of ops within the historical range.
1024        let min_size = boundaries[2];
1025        let max_ops = NZU64!(3);
1026        let expected_min = core::cmp::min(max_ops.get(), *min_size - 1) as usize;
1027        let (min_proof, min_ops) = db
1028            .historical_proof(min_size, Location::new(1), max_ops)
1029            .await
1030            .unwrap();
1031        assert_eq!(min_proof.leaves, min_size);
1032        assert_eq!(min_ops.len(), expected_min);
1033
1034        db.destroy().await.unwrap();
1035    }
1036
1037    /// Test making multiple commits, one of which deletes a key from a previous commit.
1038    pub(crate) async fn test_any_db_multiple_commits_delete_replayed<F: Family, D, V>(
1039        context: Context,
1040        mut db: D,
1041        reopen_db: impl Fn(Context) -> Pin<Box<dyn Future<Output = D> + Send>>,
1042        make_value: impl Fn(u64) -> V,
1043    ) where
1044        D: DbAny<F, Key = Digest, Value = V, Digest = Digest>,
1045        V: Clone + CodecShared + Eq + std::fmt::Debug,
1046    {
1047        let mut map = HashMap::<Digest, V>::default();
1048        const ELEMENTS: u64 = 10;
1049        let metadata_value = make_value(42);
1050        let key_at = |j: u64, i: u64| Sha256::hash(&(j * 1000 + i).to_be_bytes());
1051        for j in 0u64..ELEMENTS {
1052            let mut batch = db.new_batch();
1053            for i in 0u64..ELEMENTS {
1054                let k = key_at(j, i);
1055                let v = make_value(i * 1000);
1056                batch = batch.write(k, Some(v.clone()));
1057                map.insert(k, v);
1058            }
1059            let merkleized = batch
1060                .merkleize(&db, Some(metadata_value.clone()))
1061                .await
1062                .unwrap();
1063            db.apply_batch(merkleized).await.unwrap();
1064            db.commit().await.unwrap();
1065        }
1066        assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_value));
1067        let k = key_at(ELEMENTS - 1, ELEMENTS - 1);
1068
1069        let merkleized = db
1070            .new_batch()
1071            .write(k, None)
1072            .merkleize(&db, None)
1073            .await
1074            .unwrap();
1075        db.apply_batch(merkleized).await.unwrap();
1076        db.commit().await.unwrap();
1077        assert_eq!(db.get_metadata().await.unwrap(), None);
1078        assert!(db.get(&k).await.unwrap().is_none());
1079
1080        let root = db.root();
1081        drop(db);
1082        let db = reopen_db(context.child("reopened")).await;
1083        assert_eq!(root, db.root());
1084        assert_eq!(db.get_metadata().await.unwrap(), None);
1085        assert!(db.get(&k).await.unwrap().is_none());
1086
1087        db.destroy().await.unwrap();
1088    }
1089
1090    use crate::qmdb::any::{
1091        ordered::{fixed::Db as OrderedFixedDb, variable::Db as OrderedVariableDb},
1092        unordered::{fixed::Db as UnorderedFixedDb, variable::Db as UnorderedVariableDb},
1093    };
1094    use commonware_macros::{test_group, test_traced};
1095    use commonware_parallel::{Sequential, Strategy};
1096    use commonware_runtime::{deterministic, Runner as _};
1097
1098    // Type aliases for all 12 MMR variants (all use OneCap for collision coverage).
1099    type UnorderedFixed =
1100        UnorderedFixedDb<mmr::Family, Context, Digest, Digest, Sha256, OneCap, Sequential>;
1101    type UnorderedVariable =
1102        UnorderedVariableDb<mmr::Family, Context, Digest, Digest, Sha256, OneCap, Sequential>;
1103    type OrderedFixed =
1104        OrderedFixedDb<mmr::Family, Context, Digest, Digest, Sha256, OneCap, Sequential>;
1105    type OrderedVariable =
1106        OrderedVariableDb<mmr::Family, Context, Digest, Digest, Sha256, OneCap, Sequential>;
1107    type UnorderedFixedP1 = unordered::fixed::partitioned::Db<
1108        mmr::Family,
1109        Context,
1110        Digest,
1111        Digest,
1112        Sha256,
1113        OneCap,
1114        1,
1115        Sequential,
1116    >;
1117    type UnorderedVariableP1 = unordered::variable::partitioned::Db<
1118        mmr::Family,
1119        Context,
1120        Digest,
1121        Digest,
1122        Sha256,
1123        OneCap,
1124        1,
1125        Sequential,
1126    >;
1127    type OrderedFixedP1 = ordered::fixed::partitioned::Db<
1128        mmr::Family,
1129        Context,
1130        Digest,
1131        Digest,
1132        Sha256,
1133        OneCap,
1134        1,
1135        Sequential,
1136    >;
1137    type OrderedVariableP1 = ordered::variable::partitioned::Db<
1138        mmr::Family,
1139        Context,
1140        Digest,
1141        Digest,
1142        Sha256,
1143        OneCap,
1144        1,
1145        Sequential,
1146    >;
1147    type UnorderedFixedP2 = unordered::fixed::partitioned::Db<
1148        mmr::Family,
1149        Context,
1150        Digest,
1151        Digest,
1152        Sha256,
1153        OneCap,
1154        2,
1155        Sequential,
1156    >;
1157    type UnorderedVariableP2 = unordered::variable::partitioned::Db<
1158        mmr::Family,
1159        Context,
1160        Digest,
1161        Digest,
1162        Sha256,
1163        OneCap,
1164        2,
1165        Sequential,
1166    >;
1167    type OrderedFixedP2 = ordered::fixed::partitioned::Db<
1168        mmr::Family,
1169        Context,
1170        Digest,
1171        Digest,
1172        Sha256,
1173        OneCap,
1174        2,
1175        Sequential,
1176    >;
1177    type OrderedVariableP2 = ordered::variable::partitioned::Db<
1178        mmr::Family,
1179        Context,
1180        Digest,
1181        Digest,
1182        Sha256,
1183        OneCap,
1184        2,
1185        Sequential,
1186    >;
1187
1188    // MMB type aliases for with_all_variants.
1189    mod mmb_types {
1190        use super::*;
1191        use crate::{
1192            index::{ordered::Index as OrderedIndex, unordered::Index as UnorderedIndex},
1193            journal::contiguous::{fixed::Journal as FJournal, variable::Journal as VJournal},
1194            merkle::{mmb, Location},
1195            qmdb::any::{
1196                operation::{update, Operation},
1197                value::{FixedEncoding, VariableEncoding},
1198            },
1199        };
1200
1201        type MmbLocation = Location<mmb::Family>;
1202
1203        pub type MmbUnorderedFixed = super::super::db::Db<
1204            mmb::Family,
1205            Context,
1206            FJournal<
1207                Context,
1208                Operation<mmb::Family, update::Unordered<Digest, FixedEncoding<Digest>>>,
1209            >,
1210            UnorderedIndex<OneCap, MmbLocation>,
1211            Sha256,
1212            update::Unordered<Digest, FixedEncoding<Digest>>,
1213            { crate::qmdb::any::BITMAP_CHUNK_BYTES },
1214            Sequential,
1215        >;
1216
1217        pub type MmbUnorderedVariable = super::super::db::Db<
1218            mmb::Family,
1219            Context,
1220            VJournal<
1221                Context,
1222                Operation<mmb::Family, update::Unordered<Digest, VariableEncoding<Digest>>>,
1223            >,
1224            UnorderedIndex<OneCap, MmbLocation>,
1225            Sha256,
1226            update::Unordered<Digest, VariableEncoding<Digest>>,
1227            { crate::qmdb::any::BITMAP_CHUNK_BYTES },
1228            Sequential,
1229        >;
1230
1231        pub type MmbOrderedFixed = super::super::db::Db<
1232            mmb::Family,
1233            Context,
1234            FJournal<
1235                Context,
1236                Operation<mmb::Family, update::Ordered<Digest, FixedEncoding<Digest>>>,
1237            >,
1238            OrderedIndex<OneCap, MmbLocation>,
1239            Sha256,
1240            update::Ordered<Digest, FixedEncoding<Digest>>,
1241            { crate::qmdb::any::BITMAP_CHUNK_BYTES },
1242            Sequential,
1243        >;
1244
1245        pub type MmbOrderedVariable = super::super::db::Db<
1246            mmb::Family,
1247            Context,
1248            VJournal<
1249                Context,
1250                Operation<mmb::Family, update::Ordered<Digest, VariableEncoding<Digest>>>,
1251            >,
1252            OrderedIndex<OneCap, MmbLocation>,
1253            Sha256,
1254            update::Ordered<Digest, VariableEncoding<Digest>>,
1255            { crate::qmdb::any::BITMAP_CHUNK_BYTES },
1256            Sequential,
1257        >;
1258    }
1259    use mmb_types::*;
1260
1261    #[inline]
1262    fn to_digest(i: u64) -> Digest {
1263        Sha256::hash(&i.to_be_bytes())
1264    }
1265
// Expands `$callback!` once per MMR-backed variant (for tests that require mmr::Family,
// e.g. proof verification). Each expansion receives the caller's leading arguments followed
// by (label, db type, family, config fn).
macro_rules! with_mmr_variants {
    ($callback:ident!($($head:tt)*)) => {
        $callback!($($head)*, "uf", UnorderedFixed, mmr::Family, fixed_db_config);
        $callback!($($head)*, "uv", UnorderedVariable, mmr::Family, variable_db_config);
        $callback!($($head)*, "of", OrderedFixed, mmr::Family, fixed_db_config);
        $callback!($($head)*, "ov", OrderedVariable, mmr::Family, variable_db_config);
        $callback!($($head)*, "ufp1", UnorderedFixedP1, mmr::Family, fixed_db_config);
        $callback!($($head)*, "uvp1", UnorderedVariableP1, mmr::Family, variable_db_config);
        $callback!($($head)*, "ofp1", OrderedFixedP1, mmr::Family, fixed_db_config);
        $callback!($($head)*, "ovp1", OrderedVariableP1, mmr::Family, variable_db_config);
        $callback!($($head)*, "ufp2", UnorderedFixedP2, mmr::Family, fixed_db_config);
        $callback!($($head)*, "uvp2", UnorderedVariableP2, mmr::Family, variable_db_config);
        $callback!($($head)*, "ofp2", OrderedFixedP2, mmr::Family, fixed_db_config);
        $callback!($($head)*, "ovp2", OrderedVariableP2, mmr::Family, variable_db_config);
    };
}
1283
// Defines all variants (MMR + MMB). Calls $cb!($($args)*, $label, $type, $family, $config) for each.
//
// The MMR entries come from `with_mmr_variants!` so the two lists cannot drift apart; only
// the MMB entries are listed here.
macro_rules! with_all_variants {
    ($cb:ident!($($args:tt)*)) => {
        with_mmr_variants!($cb!($($args)*));
        $cb!($($args)*, "uf_mmb", MmbUnorderedFixed, mmb::Family, fixed_db_config);
        $cb!($($args)*, "uv_mmb", MmbUnorderedVariable, mmb::Family, variable_db_config);
        $cb!($($args)*, "of_mmb", MmbOrderedFixed, mmb::Family, fixed_db_config);
        $cb!($($args)*, "ov_mmb", MmbOrderedVariable, mmb::Family, variable_db_config);
    };
}
1305
// Runner macros - each receives the caller's leading args followed by the
// (label, type, family, config) entries appended by with_all_variants / with_mmr_variants.
// Every variant builds its own `deterministic::Runner` so the outer state machine of one
// variant never has to fit alongside the outer state machines of the others on the test
// thread stack.
//
// `test_with_reopen` calls `$test_fn(ctx, db, reopen, to_digest)`, where `reopen` re-runs
// `init` with the same config arguments on the context it is handed.
macro_rules! test_with_reopen {
    ($suffix:expr, $test_fn:expr, $label:literal, $db_ty:ty, $family:ty, $config_fn:ident) => {{
        let tag = concat!($label, "_", $suffix);
        deterministic::Runner::default().start(|context| async move {
            let variant_ctx = context.child($label);
            let db = <$db_ty>::init(
                variant_ctx.child("storage"),
                $config_fn::<OneCap>(tag, &variant_ctx),
            )
            .await
            .unwrap();
            $test_fn(
                variant_ctx,
                db,
                // Kept inline so the boxed future coerces to the `dyn Future` the test
                // function expects.
                |reopen_ctx| {
                    Box::pin(async move {
                        <$db_ty>::init(
                            reopen_ctx.child("storage"),
                            $config_fn::<OneCap>(tag, &reopen_ctx),
                        )
                        .await
                        .unwrap()
                    })
                },
                to_digest,
            )
            .await;
        });
    }};
}
1335
// Runs `$test_fn(ctx, db, to_digest)` for one variant on its own `deterministic::Runner`.
// Same argument layout as `test_with_reopen`, but the test function gets no reopen closure.
macro_rules! test_with_make_value {
    ($suffix:expr, $test_fn:expr, $label:literal, $db_ty:ty, $family:ty, $config_fn:ident) => {{
        let tag = concat!($label, "_", $suffix);
        deterministic::Runner::default().start(|context| async move {
            let variant_ctx = context.child($label);
            let db = <$db_ty>::init(
                variant_ctx.child("storage"),
                $config_fn::<OneCap>(tag, &variant_ctx),
            )
            .await
            .unwrap();
            $test_fn(variant_ctx, db, to_digest).await;
        });
    }};
}
1349
// Runs a test function on every DB variant (MMR + MMB). The keyword before the colon picks
// the runner: `with_reopen` -> test_with_reopen!, `with_make_value` -> test_with_make_value!.
macro_rules! for_all_variants {
    ($suffix:expr, with_reopen: $test_fn:expr) => {{
        with_all_variants!(test_with_reopen!($suffix, $test_fn));
    }};
    ($suffix:expr, with_make_value: $test_fn:expr) => {{
        with_all_variants!(test_with_make_value!($suffix, $test_fn));
    }};
}
1359
// Runs a test function on the MMR-only DB variants (for tests that use mmr::Family-specific
// features like Location::new or verify_proof). The keyword before the colon picks the
// runner: `with_reopen` -> test_with_reopen!, `with_make_value` -> test_with_make_value!.
macro_rules! for_mmr_variants {
    ($suffix:expr, with_reopen: $test_fn:expr) => {{
        with_mmr_variants!(test_with_reopen!($suffix, $test_fn));
    }};
    ($suffix:expr, with_make_value: $test_fn:expr) => {{
        with_mmr_variants!(test_with_make_value!($suffix, $test_fn));
    }};
}
1370
/// Runs `test_any_db_log_replay` against every DB variant (MMR and MMB), passing each run a
/// reopen closure.
1371    #[test_group("slow")]
1372    #[test_traced("WARN")]
1373    fn test_all_variants_log_replay() {
1374        for_all_variants!("lr", with_reopen: test_any_db_log_replay);
1375    }
1376
/// Runs `test_any_db_build_and_authenticate` with a reopen closure. Despite the
/// `all_variants` name, only the MMR variants are covered (`for_mmr_variants!`).
1377    #[test_group("slow")]
1378    #[test_traced("WARN")]
1379    fn test_all_variants_build_and_authenticate() {
1380        for_mmr_variants!("baa", with_reopen: test_any_db_build_and_authenticate);
1381    }
1382
/// Runs `test_any_db_historical_proof_basic`. Despite the `all_variants` name, only the MMR
/// variants are covered, because that test is bound to `mmr::Family`.
1383    #[test_group("slow")]
1384    #[test_traced("WARN")]
1385    fn test_all_variants_historical_proof_basic() {
1386        for_mmr_variants!("hpb", with_make_value: test_any_db_historical_proof_basic);
1387    }
1388
/// Runs `test_any_db_historical_proof_invalid`. Despite the `all_variants` name, only the
/// MMR variants are covered, because that test is bound to `mmr::Family`.
1389    #[test_group("slow")]
1390    #[test_traced("WARN")]
1391    fn test_all_variants_historical_proof_invalid() {
1392        for_mmr_variants!("hpi", with_make_value: test_any_db_historical_proof_invalid);
1393    }
1394
/// Runs `test_any_db_historical_proof_edge_cases`. Despite the `all_variants` name, only the
/// MMR variants are covered, because that test is bound to `mmr::Family`.
1395    #[test_group("slow")]
1396    #[test_traced("WARN")]
1397    fn test_all_variants_historical_proof_edge_cases() {
1398        for_mmr_variants!("hpec", with_make_value: test_any_db_historical_proof_edge_cases);
1399    }
1400
/// Runs `test_any_db_multiple_commits_delete_replayed` against every DB variant (MMR and
/// MMB), passing each run a reopen closure.
1401    #[test_group("slow")]
1402    #[test_traced("WARN")]
1403    fn test_all_variants_multiple_commits_delete_replayed() {
1404        for_all_variants!("mcdr", with_reopen: test_any_db_multiple_commits_delete_replayed);
1405    }
1406
/// Runs `test_any_db_non_empty_recovery` against every DB variant (MMR and MMB), passing
/// each run a reopen closure.
1407    #[test_group("slow")]
1408    #[test_traced("WARN")]
1409    fn test_all_variants_non_empty_recovery() {
1410        for_all_variants!("ner", with_reopen: test_any_db_non_empty_recovery);
1411    }
1412
/// Runs `test_any_db_empty_recovery` against every DB variant (MMR and MMB), passing each
/// run a reopen closure.
1413    #[test_group("slow")]
1414    #[test_traced("WARN")]
1415    fn test_all_variants_empty_recovery() {
1416        for_all_variants!("er", with_reopen: test_any_db_empty_recovery);
1417    }
1418
/// Runs `test_any_db_rewind_recovery` with a reopen closure. Despite the `all_variants`
/// name, only the MMR variants are covered (`for_mmr_variants!`).
1419    #[test_group("slow")]
1420    #[test_traced("WARN")]
1421    fn test_all_variants_rewind_recovery() {
1422        for_mmr_variants!("rr", with_reopen: test_any_db_rewind_recovery);
1423    }
1424
1425    fn key(i: u64) -> Digest {
1426        Sha256::hash(&i.to_be_bytes())
1427    }
1428
1429    fn val(i: u64) -> Digest {
1430        Sha256::hash(&(i + 10000).to_be_bytes())
1431    }
1432
1433    /// Helper: commit a batch of key-value writes and return the applied range.
1434    async fn commit_writes(
1435        db: &mut UnorderedVariable,
1436        writes: impl IntoIterator<Item = (Digest, Option<Digest>)>,
1437        metadata: Option<Digest>,
1438    ) -> std::ops::Range<crate::mmr::Location> {
1439        let mut batch = db.new_batch();
1440        for (k, v) in writes {
1441            batch = batch.write(k, v);
1442        }
1443        let merkleized = batch.merkleize(&*db, metadata).await.unwrap();
1444        let range = db.apply_batch(merkleized).await.unwrap();
1445        db.commit().await.unwrap();
1446        range
1447    }
1448
1449    /// An empty batch (no mutations) still produces a valid commit.
1450    #[test_traced("INFO")]
1451    fn test_any_batch_empty() {
1452        let executor = deterministic::Runner::default();
1453        executor.start(|context| async move {
1454            let ctx = context.child("db");
1455            let mut db: UnorderedVariable = UnorderedVariableDb::init(
1456                ctx.child("storage"),
1457                variable_db_config::<OneCap>("e", &ctx),
1458            )
1459            .await
1460            .unwrap();
1461
1462            let root_before = db.root();
1463            let batch = db.new_batch();
1464            let merkleized = batch.merkleize(&db, None).await.unwrap();
1465            db.apply_batch(merkleized).await.unwrap();
1466
1467            // A CommitFloor op was appended, so root must change.
1468            assert_ne!(db.root(), root_before);
1469
1470            // DB should still be functional.
1471            commit_writes(&mut db, [(key(0), Some(val(0)))], None).await;
1472            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
1473
1474            db.destroy().await.unwrap();
1475        });
1476    }
1477
1478    /// Metadata propagates through merkleize and clears with None.
1479    #[test_traced("INFO")]
1480    fn test_any_batch_metadata() {
1481        let executor = deterministic::Runner::default();
1482        executor.start(|context| async move {
1483            let ctx = context.child("db");
1484            let mut db: UnorderedVariable = UnorderedVariableDb::init(
1485                ctx.child("storage"),
1486                variable_db_config::<OneCap>("m", &ctx),
1487            )
1488            .await
1489            .unwrap();
1490
1491            let metadata = val(42);
1492
1493            // Batch with metadata.
1494            commit_writes(&mut db, [(key(0), Some(val(0)))], Some(metadata)).await;
1495            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
1496
1497            // Batch without metadata clears it.
1498            let batch = db.new_batch();
1499            let merkleized = batch.merkleize(&db, None).await.unwrap();
1500            db.apply_batch(merkleized).await.unwrap();
1501            assert_eq!(db.get_metadata().await.unwrap(), None);
1502
1503            db.destroy().await.unwrap();
1504        });
1505    }
1506
1507    /// batch.get() reads through: pending mutations -> base DB.
1508    /// Updates shadow the base value; deletes hide the key.
1509    #[test_traced("INFO")]
1510    fn test_any_batch_get_read_through() {
1511        let executor = deterministic::Runner::default();
1512        executor.start(|context| async move {
1513            let ctx = context.child("db");
1514            let mut db: UnorderedVariable = UnorderedVariableDb::init(
1515                ctx.child("storage"),
1516                variable_db_config::<OneCap>("g", &ctx),
1517            )
1518            .await
1519            .unwrap();
1520
1521            // Pre-populate with key A.
1522            let ka = key(0);
1523            let va = val(0);
1524            commit_writes(&mut db, [(ka, Some(va))], None).await;
1525
1526            let kb = key(1);
1527            let vb = val(1);
1528            let kc = key(2);
1529
1530            let mut batch = db.new_batch();
1531
1532            // Read-through to base DB.
1533            assert_eq!(batch.get(&ka, &db).await.unwrap(), Some(va));
1534
1535            // Pending mutation visible.
1536            batch = batch.write(kb, Some(vb));
1537            assert_eq!(batch.get(&kb, &db).await.unwrap(), Some(vb));
1538
1539            // Nonexistent key.
1540            assert_eq!(batch.get(&kc, &db).await.unwrap(), None);
1541
1542            // Update shadows base DB value.
1543            let va2 = val(100);
1544            batch = batch.write(ka, Some(va2));
1545            assert_eq!(batch.get(&ka, &db).await.unwrap(), Some(va2));
1546
1547            // Delete hides the key.
1548            batch = batch.write(ka, None);
1549            assert_eq!(batch.get(&ka, &db).await.unwrap(), None);
1550
1551            db.destroy().await.unwrap();
1552        });
1553    }
1554
1555    /// merkleized.get() reflects the resolved diff after merkleize.
1556    #[test_traced("INFO")]
1557    fn test_any_batch_get_on_merkleized() {
1558        let executor = deterministic::Runner::default();
1559        executor.start(|context| async move {
1560            let ctx = context.child("db");
1561            let mut db: UnorderedVariable = UnorderedVariableDb::init(
1562                ctx.child("storage"),
1563                variable_db_config::<OneCap>("mg", &ctx),
1564            )
1565            .await
1566            .unwrap();
1567
1568            let ka = key(0);
1569            let kb = key(1);
1570            let kc = key(2);
1571            let kd = key(3);
1572
1573            // Pre-populate A and B.
1574            commit_writes(&mut db, [(ka, Some(val(0))), (kb, Some(val(1)))], None).await;
1575
1576            // Batch: update A, delete B, create C.
1577            let va2 = val(100);
1578            let vc = val(2);
1579            let mut batch = db.new_batch();
1580            batch = batch.write(ka, Some(va2));
1581            batch = batch.write(kb, None);
1582            batch = batch.write(kc, Some(vc));
1583            let merkleized = batch.merkleize(&db, None).await.unwrap();
1584
1585            assert_eq!(merkleized.get(&ka, &db).await.unwrap(), Some(va2));
1586            assert_eq!(merkleized.get(&kb, &db).await.unwrap(), None);
1587            assert_eq!(merkleized.get(&kc, &db).await.unwrap(), Some(vc));
1588            assert_eq!(merkleized.get(&kd, &db).await.unwrap(), None);
1589
1590            db.destroy().await.unwrap();
1591        });
1592    }
1593
1594    /// Child batch reads through: child mutations -> parent diff -> base DB.
1595    #[test_traced("INFO")]
1596    fn test_any_batch_stacked_get() {
1597        let executor = deterministic::Runner::default();
1598        executor.start(|context| async move {
1599            let ctx = context.child("db");
1600            let db: UnorderedVariable = UnorderedVariableDb::init(
1601                ctx.child("storage"),
1602                variable_db_config::<OneCap>("sg", &ctx),
1603            )
1604            .await
1605            .unwrap();
1606
1607            let ka = key(0);
1608            let kb = key(1);
1609
1610            // Parent batch writes A.
1611            let mut batch = db.new_batch();
1612            batch = batch.write(ka, Some(val(0)));
1613            let merkleized = batch.merkleize(&db, None).await.unwrap();
1614
1615            // Child reads parent's A.
1616            let mut child = merkleized.new_batch::<Sha256>();
1617            assert_eq!(child.get(&ka, &db).await.unwrap(), Some(val(0)));
1618
1619            // Child overwrites A.
1620            child = child.write(ka, Some(val(100)));
1621            assert_eq!(child.get(&ka, &db).await.unwrap(), Some(val(100)));
1622
1623            // Child writes new key B.
1624            child = child.write(kb, Some(val(1)));
1625            assert_eq!(child.get(&kb, &db).await.unwrap(), Some(val(1)));
1626
1627            // Child deletes A.
1628            child = child.write(ka, None);
1629            assert_eq!(child.get(&ka, &db).await.unwrap(), None);
1630
1631            db.destroy().await.unwrap();
1632        });
1633    }
1634
1635    /// Parent deletes a base-DB key, child re-creates it.
1636    #[test_traced("INFO")]
1637    fn test_any_batch_stacked_delete_recreate() {
1638        let executor = deterministic::Runner::default();
1639        executor.start(|context| async move {
1640            let ctx = context.child("db");
1641            let mut db: UnorderedVariable = UnorderedVariableDb::init(
1642                ctx.child("storage"),
1643                variable_db_config::<OneCap>("dr", &ctx),
1644            )
1645            .await
1646            .unwrap();
1647
1648            let ka = key(0);
1649
1650            // Pre-populate with key A.
1651            commit_writes(&mut db, [(ka, Some(val(0)))], None).await;
1652
1653            // Parent batch deletes A.
1654            let mut parent = db.new_batch();
1655            parent = parent.write(ka, None);
1656            let parent_m = parent.merkleize(&db, None).await.unwrap();
1657            assert_eq!(parent_m.get(&ka, &db).await.unwrap(), None);
1658
1659            // Child re-creates A with a new value.
1660            let mut child = parent_m.new_batch::<Sha256>();
1661            child = child.write(ka, Some(val(200)));
1662            let child_m = child.merkleize(&db, None).await.unwrap();
1663            assert_eq!(child_m.get(&ka, &db).await.unwrap(), Some(val(200)));
1664
1665            // Apply and verify DB state.
1666            db.apply_batch(child_m).await.unwrap();
1667            assert_eq!(db.get(&ka).await.unwrap(), Some(val(200)));
1668
1669            db.destroy().await.unwrap();
1670        });
1671    }
1672
1673    /// Floor raise during merkleize moves active operations to the tip.
1674    /// All keys remain accessible with correct values.
1675    #[test_traced("INFO")]
1676    fn test_any_batch_floor_raise() {
1677        let executor = deterministic::Runner::default();
1678        executor.start(|context| async move {
1679            let ctx = context.child("db");
1680            let mut db: UnorderedVariable = UnorderedVariableDb::init(
1681                ctx.child("storage"),
1682                variable_db_config::<OneCap>("fr", &ctx),
1683            )
1684            .await
1685            .unwrap();
1686
1687            // Pre-populate with 100 keys.
1688            let init: Vec<_> = (0..100).map(|i| (key(i), Some(val(i)))).collect();
1689            commit_writes(&mut db, init, None).await;
1690
1691            let floor_before = db.inactivity_floor_loc();
1692
1693            // Update 30 keys.
1694            let updates: Vec<_> = (0..30).map(|i| (key(i), Some(val(i + 500)))).collect();
1695            commit_writes(&mut db, updates, None).await;
1696
1697            // Floor should have advanced.
1698            assert!(db.inactivity_floor_loc() > floor_before);
1699
1700            // All keys should still be accessible with correct values.
1701            for i in 0..30 {
1702                assert_eq!(
1703                    db.get(&key(i)).await.unwrap(),
1704                    Some(val(i + 500)),
1705                    "updated key {i} mismatch"
1706                );
1707            }
1708            for i in 30..100 {
1709                assert_eq!(
1710                    db.get(&key(i)).await.unwrap(),
1711                    Some(val(i)),
1712                    "untouched key {i} mismatch"
1713                );
1714            }
1715
1716            db.destroy().await.unwrap();
1717        });
1718    }
1719
1720    /// apply_batch() returns the correct range of committed locations.
1721    #[test_traced("INFO")]
1722    fn test_any_batch_apply_returns_range() {
1723        let executor = deterministic::Runner::default();
1724        executor.start(|context| async move {
1725            let ctx = context.child("db");
1726            let mut db: UnorderedVariable = UnorderedVariableDb::init(
1727                ctx.child("storage"),
1728                variable_db_config::<OneCap>("ar", &ctx),
1729            )
1730            .await
1731            .unwrap();
1732
1733            // First batch: 5 keys.
1734            let writes: Vec<_> = (0..5).map(|i| (key(i), Some(val(i)))).collect();
1735            let range1 = commit_writes(&mut db, writes, None).await;
1736
1737            // Range should start after the initial CommitFloor (location 0).
1738            assert_eq!(range1.start, crate::mmr::Location::new(1));
1739            // Range length >= 6 (5 writes + 1 CommitFloor + possible floor raise ops).
1740            assert!(range1.end.saturating_sub(*range1.start) >= 6);
1741
1742            // Second batch: ranges must be contiguous.
1743            let writes: Vec<_> = (5..10).map(|i| (key(i), Some(val(i)))).collect();
1744            let range2 = commit_writes(&mut db, writes, None).await;
1745            assert_eq!(range2.start, range1.end);
1746
1747            db.destroy().await.unwrap();
1748        });
1749    }
1750
1751    /// 3-level chain: parent -> child -> grandchild, merkleize grandchild and apply.
1752    #[test_traced("INFO")]
1753    fn test_any_batch_deep_chain() {
1754        let executor = deterministic::Runner::default();
1755        executor.start(|context| async move {
1756            let ctx = context.child("db");
1757            let mut db: UnorderedVariable = UnorderedVariableDb::init(
1758                ctx.child("storage"),
1759                variable_db_config::<OneCap>("dc", &ctx),
1760            )
1761            .await
1762            .unwrap();
1763
1764            // Pre-populate with keys 0..5.
1765            let init: Vec<_> = (0..5).map(|i| (key(i), Some(val(i)))).collect();
1766            commit_writes(&mut db, init, None).await;
1767
1768            // Parent: overwrite key 0, add key 5.
1769            let mut parent = db.new_batch();
1770            parent = parent.write(key(0), Some(val(100)));
1771            parent = parent.write(key(5), Some(val(5)));
1772            let parent_m = parent.merkleize(&db, None).await.unwrap();
1773
1774            // Child: overwrite key 1, add key 6.
1775            let mut child = parent_m.new_batch::<Sha256>();
1776            child = child.write(key(1), Some(val(101)));
1777            child = child.write(key(6), Some(val(6)));
1778            let child_m = child.merkleize(&db, None).await.unwrap();
1779
1780            // Grandchild: delete key 2, add key 7.
1781            let mut grandchild = child_m.new_batch::<Sha256>();
1782            grandchild = grandchild.write(key(2), None);
1783            grandchild = grandchild.write(key(7), Some(val(7)));
1784            let grandchild_m = grandchild.merkleize(&db, None).await.unwrap();
1785
1786            // Verify reads through the chain.
1787            assert_eq!(
1788                grandchild_m.get(&key(0), &db).await.unwrap(),
1789                Some(val(100))
1790            );
1791            assert_eq!(
1792                grandchild_m.get(&key(1), &db).await.unwrap(),
1793                Some(val(101))
1794            );
1795            assert_eq!(grandchild_m.get(&key(2), &db).await.unwrap(), None);
1796            assert_eq!(grandchild_m.get(&key(7), &db).await.unwrap(), Some(val(7)));
1797
1798            // Apply.
1799            db.apply_batch(grandchild_m).await.unwrap();
1800
1801            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(100)));
1802            assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(101)));
1803            assert_eq!(db.get(&key(2)).await.unwrap(), None);
1804            assert_eq!(db.get(&key(3)).await.unwrap(), Some(val(3)));
1805            assert_eq!(db.get(&key(4)).await.unwrap(), Some(val(4)));
1806            assert_eq!(db.get(&key(5)).await.unwrap(), Some(val(5)));
1807            assert_eq!(db.get(&key(6)).await.unwrap(), Some(val(6)));
1808            assert_eq!(db.get(&key(7)).await.unwrap(), Some(val(7)));
1809
1810            db.destroy().await.unwrap();
1811        });
1812    }
1813
1814    /// Chained batch produces the same DB state as sequential apply_batch calls.
1815    #[test_traced("INFO")]
1816    fn test_any_batch_chain_matches_sequential() {
1817        let executor = deterministic::Runner::default();
1818        executor.start(|context| async move {
1819            let ctx = context.child("db");
1820
1821            // DB A: sequential apply.
1822            let ctx_a = ctx.child("a");
1823            let mut db_a: UnorderedVariable = UnorderedVariableDb::init(
1824                ctx_a.child("db"),
1825                variable_db_config::<OneCap>("cms-a", &ctx_a),
1826            )
1827            .await
1828            .unwrap();
1829
1830            // DB B: chained batch.
1831            let ctx_b = ctx.child("b");
1832            let mut db_b: UnorderedVariable = UnorderedVariableDb::init(
1833                ctx_b.child("db"),
1834                variable_db_config::<OneCap>("cms-b", &ctx_b),
1835            )
1836            .await
1837            .unwrap();
1838
1839            // Batch 1 operations: create keys 0..5.
1840            let writes1: Vec<_> = (0..5).map(|i| (key(i), Some(val(i)))).collect();
1841
1842            // Batch 2 operations: update key 0, delete key 1, create key 5.
1843            let writes2 = vec![
1844                (key(0), Some(val(100))),
1845                (key(1), None),
1846                (key(5), Some(val(5))),
1847            ];
1848
1849            // DB A: apply sequentially.
1850            commit_writes(&mut db_a, writes1.clone(), None).await;
1851            commit_writes(&mut db_a, writes2.clone(), None).await;
1852
1853            // DB B: apply as chain.
1854            let mut parent = db_b.new_batch();
1855            for (k, v) in &writes1 {
1856                parent = parent.write(*k, *v);
1857            }
1858            let parent_m = parent.merkleize(&db_b, None).await.unwrap();
1859
1860            let mut child = parent_m.new_batch::<Sha256>();
1861            for (k, v) in &writes2 {
1862                child = child.write(*k, *v);
1863            }
1864            let child_m = child.merkleize(&db_b, None).await.unwrap();
1865            db_b.apply_batch(child_m).await.unwrap();
1866
1867            // Both DBs must have the same state.
1868            assert_eq!(db_a.root(), db_b.root());
1869            for i in 0..6 {
1870                assert_eq!(
1871                    db_a.get(&key(i)).await.unwrap(),
1872                    db_b.get(&key(i)).await.unwrap(),
1873                    "key {i} mismatch"
1874                );
1875            }
1876
1877            db_a.destroy().await.unwrap();
1878            db_b.destroy().await.unwrap();
1879        });
1880    }
1881
1882    /// Create and delete the same key in a single batch produces no net change for that key.
1883    #[test_traced("INFO")]
1884    fn test_any_batch_create_then_delete_same_batch() {
1885        let executor = deterministic::Runner::default();
1886        executor.start(|context| async move {
1887            let ctx = context.child("db");
1888            let mut db: UnorderedVariable = UnorderedVariableDb::init(
1889                ctx.child("storage"),
1890                variable_db_config::<OneCap>("cd", &ctx),
1891            )
1892            .await
1893            .unwrap();
1894
1895            // Pre-populate key A.
1896            commit_writes(&mut db, [(key(0), Some(val(0)))], None).await;
1897
1898            // In one batch: create B then delete B, also create C and delete A.
1899            let mut batch = db.new_batch();
1900            batch = batch.write(key(1), Some(val(1))); // create B
1901            batch = batch.write(key(1), None); // delete B (net: no B)
1902            batch = batch.write(key(2), Some(val(2))); // create C
1903            batch = batch.write(key(0), None); // delete A
1904            let merkleized = batch.merkleize(&db, None).await.unwrap();
1905            db.apply_batch(merkleized).await.unwrap();
1906
1907            assert_eq!(db.get(&key(0)).await.unwrap(), None);
1908            assert_eq!(db.get(&key(1)).await.unwrap(), None);
1909            assert_eq!(db.get(&key(2)).await.unwrap(), Some(val(2)));
1910
1911            db.destroy().await.unwrap();
1912        });
1913    }
1914
1915    /// Deleting all keys exercises the total_active_keys == 0 floor-raise fast path.
1916    #[test_traced("INFO")]
1917    fn test_any_batch_delete_all_keys() {
1918        let executor = deterministic::Runner::default();
1919        executor.start(|context| async move {
1920            let ctx = context.child("db");
1921            let mut db: UnorderedVariable = UnorderedVariableDb::init(
1922                ctx.child("storage"),
1923                variable_db_config::<OneCap>("da", &ctx),
1924            )
1925            .await
1926            .unwrap();
1927
1928            // Pre-populate 5 keys.
1929            let init: Vec<_> = (0..5).map(|i| (key(i), Some(val(i)))).collect();
1930            commit_writes(&mut db, init, None).await;
1931
1932            // Delete all 5.
1933            let deletes: Vec<_> = (0..5).map(|i| (key(i), None)).collect();
1934            commit_writes(&mut db, deletes, None).await;
1935
1936            for i in 0..5 {
1937                assert_eq!(db.get(&key(i)).await.unwrap(), None, "key {i} not deleted");
1938            }
1939
1940            // DB should still be functional after deleting everything.
1941            commit_writes(&mut db, [(key(10), Some(val(10)))], None).await;
1942            assert_eq!(db.get(&key(10)).await.unwrap(), Some(val(10)));
1943
1944            db.destroy().await.unwrap();
1945        });
1946    }
1947
1948    /// Two independent batches from the same DB do not interfere with each other.
1949    #[test_traced("INFO")]
1950    fn test_any_batch_parallel_forks() {
1951        let executor = deterministic::Runner::default();
1952        executor.start(|context| async move {
1953            let ctx = context.child("db");
1954            let mut db: UnorderedVariable = UnorderedVariableDb::init(
1955                ctx.child("storage"),
1956                variable_db_config::<OneCap>("pf", &ctx),
1957            )
1958            .await
1959            .unwrap();
1960
1961            // Pre-populate.
1962            commit_writes(&mut db, [(key(0), Some(val(0)))], None).await;
1963            let root_before = db.root();
1964
1965            // Fork A: update key 0 and create key 1.
1966            let fork_a_m = db
1967                .new_batch()
1968                .write(key(0), Some(val(100)))
1969                .write(key(1), Some(val(1)))
1970                .merkleize(&db, None)
1971                .await
1972                .unwrap();
1973
1974            // Fork B: delete key 0 and create key 2.
1975            let fork_b_m = db
1976                .new_batch()
1977                .write(key(0), None)
1978                .write(key(2), Some(val(2)))
1979                .merkleize(&db, None)
1980                .await
1981                .unwrap();
1982
1983            // Different mutations must produce different roots.
1984            assert_ne!(fork_a_m.root(), fork_b_m.root());
1985
1986            // DB is unchanged (neither batch applied).
1987            assert_eq!(db.root(), root_before);
1988            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
1989            assert_eq!(db.get(&key(1)).await.unwrap(), None);
1990
1991            // Apply fork A only.
1992            db.apply_batch(fork_a_m).await.unwrap();
1993            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(100)));
1994            assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
1995            assert_eq!(db.get(&key(2)).await.unwrap(), None);
1996
1997            db.destroy().await.unwrap();
1998        });
1999    }
2000
2001    /// Floor raise advances correctly across a chained batch.
2002    #[test_traced("INFO")]
2003    fn test_any_batch_floor_raise_chained() {
2004        let executor = deterministic::Runner::default();
2005        executor.start(|context| async move {
2006            let ctx = context.child("db");
2007            let mut db: UnorderedVariable = UnorderedVariableDb::init(
2008                ctx.child("storage"),
2009                variable_db_config::<OneCap>("frc", &ctx),
2010            )
2011            .await
2012            .unwrap();
2013
2014            // Pre-populate with 50 keys.
2015            let init: Vec<_> = (0..50).map(|i| (key(i), Some(val(i)))).collect();
2016            commit_writes(&mut db, init, None).await;
2017            let floor_before = db.inactivity_floor_loc();
2018
2019            // Parent: update keys 0..20.
2020            let mut parent = db.new_batch();
2021            for i in 0..20 {
2022                parent = parent.write(key(i), Some(val(i + 500)));
2023            }
2024            let parent_m = parent.merkleize(&db, None).await.unwrap();
2025
2026            // Child: update keys 20..30.
2027            let mut child = parent_m.new_batch::<Sha256>();
2028            for i in 20..30 {
2029                child = child.write(key(i), Some(val(i + 500)));
2030            }
2031            let child_m = child.merkleize(&db, None).await.unwrap();
2032            db.apply_batch(child_m).await.unwrap();
2033
2034            // Floor must have advanced.
2035            assert!(db.inactivity_floor_loc() > floor_before);
2036
2037            // All keys should be accessible.
2038            for i in 0..30 {
2039                assert_eq!(
2040                    db.get(&key(i)).await.unwrap(),
2041                    Some(val(i + 500)),
2042                    "updated key {i} mismatch"
2043                );
2044            }
2045            for i in 30..50 {
2046                assert_eq!(
2047                    db.get(&key(i)).await.unwrap(),
2048                    Some(val(i)),
2049                    "untouched key {i} mismatch"
2050                );
2051            }
2052
2053            db.destroy().await.unwrap();
2054        });
2055    }
2056
2057    /// Dropping a batch without applying it leaves the DB unchanged.
2058    #[test_traced("INFO")]
2059    fn test_any_batch_abandoned() {
2060        let executor = deterministic::Runner::default();
2061        executor.start(|context| async move {
2062            let ctx = context.child("db");
2063            let mut db: UnorderedVariable = UnorderedVariableDb::init(
2064                ctx.child("storage"),
2065                variable_db_config::<OneCap>("ab", &ctx),
2066            )
2067            .await
2068            .unwrap();
2069
2070            commit_writes(&mut db, [(key(0), Some(val(0)))], None).await;
2071            let root_before = db.root();
2072
2073            // Create, populate, merkleize, then drop without apply.
2074            {
2075                let mut batch = db.new_batch();
2076                batch = batch.write(key(0), Some(val(999)));
2077                batch = batch.write(key(1), Some(val(1)));
2078                let _merkleized = batch.merkleize(&db, None).await.unwrap();
2079                // dropped here
2080            }
2081
2082            // DB state is identical.
2083            assert_eq!(db.root(), root_before);
2084            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
2085            assert_eq!(db.get(&key(1)).await.unwrap(), None);
2086
2087            db.destroy().await.unwrap();
2088        });
2089    }
2090
2091    /// Applying without `commit()` publishes in memory but is not recovered after reopen.
2092    #[test_traced("INFO")]
2093    fn test_any_batch_apply_requires_commit_for_recovery() {
2094        let executor = deterministic::Runner::default();
2095        executor.start(|context| async move {
2096            let partition = "apply_requires_commit";
2097            let ctx = context.child("db");
2098            let mut db: UnorderedVariable = UnorderedVariableDb::init(
2099                ctx.child("storage"),
2100                variable_db_config::<OneCap>(partition, &ctx),
2101            )
2102            .await
2103            .unwrap();
2104
2105            let committed_root = db.root();
2106
2107            let merkleized = db
2108                .new_batch()
2109                .write(key(0), Some(val(0)))
2110                .merkleize(&db, None)
2111                .await
2112                .unwrap();
2113            db.apply_batch(merkleized).await.unwrap();
2114
2115            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
2116
2117            drop(db);
2118
2119            let reopened: UnorderedVariable = UnorderedVariableDb::init(
2120                context.child("reopen"),
2121                variable_db_config::<OneCap>(partition, &context),
2122            )
2123            .await
2124            .unwrap();
2125            assert_eq!(reopened.root(), committed_root);
2126            assert_eq!(reopened.get(&key(0)).await.unwrap(), None);
2127
2128            reopened.destroy().await.unwrap();
2129        });
2130    }
2131
2132    /// Rewinding to a pruned target returns an error.
2133    #[test_traced("INFO")]
2134    fn test_any_rewind_pruned_target_errors() {
2135        let executor = deterministic::Runner::default();
2136        executor.start(|context| async move {
2137            const KEYS: u64 = 64;
2138
2139            let ctx = context.child("db");
2140            let mut db: UnorderedVariable = UnorderedVariableDb::init(
2141                ctx.child("storage"),
2142                variable_db_config::<OneCap>("rp", &ctx),
2143            )
2144            .await
2145            .unwrap();
2146
2147            let initial: Vec<_> = (0..KEYS).map(|i| (key(i), Some(val(i)))).collect();
2148            let first_range = commit_writes(&mut db, initial, None).await;
2149
2150            let mut round = 0u64;
2151            loop {
2152                round += 1;
2153                assert!(
2154                    round <= 64,
2155                    "failed to prune enough history for rewind test"
2156                );
2157
2158                let updates: Vec<_> = (0..KEYS)
2159                    .map(|i| (key(i), Some(val(1000 + round * KEYS + i))))
2160                    .collect();
2161                commit_writes(&mut db, updates, None).await;
2162
2163                db.prune(db.sync_boundary()).await.unwrap();
2164                let bounds = db.bounds().await;
2165                if bounds.start > first_range.start {
2166                    break;
2167                }
2168            }
2169
2170            let oldest_retained = db.bounds().await.start;
2171            let boundary_err = db.rewind(oldest_retained).await.unwrap_err();
2172            assert!(
2173                matches!(
2174                    boundary_err,
2175                    crate::qmdb::Error::Journal(crate::journal::Error::ItemPruned(_))
2176                ),
2177                "unexpected rewind error at retained boundary: {boundary_err:?}"
2178            );
2179
2180            let err = db.rewind(first_range.start).await.unwrap_err();
2181            assert!(
2182                matches!(
2183                    err,
2184                    crate::qmdb::Error::Journal(crate::journal::Error::ItemPruned(_))
2185                ),
2186                "unexpected rewind error: {err:?}"
2187            );
2188
2189            db.destroy().await.unwrap();
2190        });
2191    }
2192
2193    /// Rewinding rejects out-of-range targets and keeps state unchanged.
2194    #[test_traced("INFO")]
2195    fn test_any_rewind_invalid_target_errors() {
2196        let executor = deterministic::Runner::default();
2197        executor.start(|context| async move {
2198            let ctx = context.child("db");
2199            let mut db: UnorderedVariable = UnorderedVariableDb::init(
2200                ctx.child("storage"),
2201                variable_db_config::<OneCap>("ri", &ctx),
2202            )
2203            .await
2204            .unwrap();
2205
2206            let root_before = db.root();
2207            let size_before = db.size().await;
2208            db.rewind(size_before).await.unwrap();
2209            assert_eq!(db.root(), root_before);
2210            assert_eq!(db.size().await, size_before);
2211
2212            let zero_err = db.rewind(Location::new(0)).await.unwrap_err();
2213            assert!(
2214                matches!(
2215                    zero_err,
2216                    crate::qmdb::Error::Journal(crate::journal::Error::InvalidRewind(0))
2217                ),
2218                "unexpected rewind error: {zero_err:?}"
2219            );
2220            assert_eq!(db.root(), root_before);
2221            assert_eq!(db.size().await, size_before);
2222
2223            let too_large_target = Location::new(*size_before + 1);
2224            let too_large_err = db.rewind(too_large_target).await.unwrap_err();
2225            assert!(
2226                matches!(
2227                    too_large_err,
2228                    crate::qmdb::Error::Journal(crate::journal::Error::InvalidRewind(size))
2229                    if size == *too_large_target
2230                ),
2231                "unexpected rewind error: {too_large_err:?}"
2232            );
2233            assert_eq!(db.root(), root_before);
2234            assert_eq!(db.size().await, size_before);
2235
2236            db.destroy().await.unwrap();
2237        });
2238    }
2239
2240    /// Rewinding fails when the target commit's inactivity floor has been pruned, even if the
2241    /// target commit location is still retained.
2242    #[test_traced("INFO")]
2243    fn test_any_rewind_rejects_target_with_pruned_floor() {
2244        let executor = deterministic::Runner::default();
2245        executor.start(|context| async move {
2246            const KEYS: u64 = 64;
2247
2248            let ctx = context.child("db");
2249            let mut db: UnorderedVariable =
2250                UnorderedVariableDb::init(ctx.child("storage"), variable_db_config::<OneCap>("rf", &ctx))
2251                    .await
2252                    .unwrap();
2253
2254            commit_writes(&mut db, (0..KEYS).map(|i| (key(i), Some(val(i)))), None).await;
2255            commit_writes(
2256                &mut db,
2257                (0..KEYS).map(|i| (key(i), Some(val(1_000 + i)))),
2258                None,
2259            )
2260            .await;
2261
2262            let rewind_target = db.size().await;
2263            let target_floor = db.inactivity_floor_loc();
2264            let prune_loc = Location::new(*target_floor + (KEYS / 2));
2265            assert!(
2266                rewind_target > *prune_loc,
2267                "test setup expected target size > prune_loc; target={rewind_target:?}, floor={target_floor:?}"
2268            );
2269
2270            let mut round = 0u64;
2271            while db.inactivity_floor_loc() < prune_loc {
2272                round += 1;
2273                assert!(
2274                    round <= 8,
2275                    "failed to advance inactivity floor enough for floor-pruned rewind test"
2276                );
2277                commit_writes(
2278                    &mut db,
2279                    (0..KEYS).map(|i| (key(i), Some(val(10_000 + round * KEYS + i)))),
2280                    None,
2281                )
2282                .await;
2283            }
2284
2285            db.prune(prune_loc).await.unwrap();
2286            let bounds = db.bounds().await;
2287            assert!(
2288                bounds.start > *target_floor,
2289                "test setup expected pruned start beyond target floor; bounds={bounds:?}, target_floor={target_floor:?}"
2290            );
2291            assert!(
2292                rewind_target > bounds.start,
2293                "test setup expected target commit retained; target={rewind_target:?}, bounds={bounds:?}"
2294            );
2295
2296            let err = db.rewind(rewind_target).await.unwrap_err();
2297            assert!(
2298                matches!(
2299                    err,
2300                    crate::qmdb::Error::Journal(crate::journal::Error::ItemPruned(_))
2301                ),
2302                "unexpected rewind error: {err:?}"
2303            );
2304
2305            db.destroy().await.unwrap();
2306        });
2307    }
2308
2309    /// `prune()` must advance the bitmap only as far as the authenticated journal actually
2310    /// pruned. Journal pruning is section-granular while bitmap pruning rounds to chunk
2311    /// boundaries, so a coarse `items_per_section` can leave the journal retaining from the
2312    /// start while the bitmap has already crossed the next chunk boundary. A subsequent
2313    /// `rewind()` to a still-retained early commit must still succeed.
2314    #[test_traced("INFO")]
2315    fn test_any_prune_keeps_bitmap_aligned_with_journal() {
2316        let executor = deterministic::Runner::default();
2317        executor.start(|context| async move {
2318            // Bitmap chunk size in bits. The bug requires the bitmap to round across at least
2319            // one chunk boundary while the journal cannot prune any section.
2320            const BITMAP_CHUNK_BITS: u64 =
2321                commonware_utils::bitmap::Prunable::<BITMAP_CHUNK_BYTES>::CHUNK_SIZE_BITS;
2322            // Items-per-section is chosen so that no full section fits in the test's op count,
2323            // forcing the journal to retain from 0 even when prune is requested past the first
2324            // bitmap chunk boundary.
2325            const ITEMS_PER_SECTION: u64 = 2048;
2326            const { assert!(ITEMS_PER_SECTION > BITMAP_CHUNK_BITS) };
2327
2328            let ctx = context.child("db");
2329            let mut cfg = variable_db_config::<OneCap>("rg", &ctx);
2330            cfg.journal_config.items_per_section = NZU64!(ITEMS_PER_SECTION);
2331
2332            let mut db: UnorderedVariable =
2333                UnorderedVariableDb::init(ctx.child("storage"), cfg).await.unwrap();
2334
2335            commit_writes(&mut db, (0..100).map(|i| (key(i), Some(val(i)))), None).await;
2336            let rewind_target = db.size().await;
2337            // Rewind target must lie below the chunk boundary the buggy prune would advance
2338            // to; otherwise the unfixed code would not panic on truncate.
2339            assert!(
2340                *rewind_target < BITMAP_CHUNK_BITS,
2341                "rewind_target {rewind_target:?} must be < {BITMAP_CHUNK_BITS} for the bug to manifest"
2342            );
2343            let root_at_target = db.root();
2344
2345            commit_writes(
2346                &mut db,
2347                (0..700).map(|i| (key(i), Some(val(1_000 + i)))),
2348                None,
2349            )
2350            .await;
2351            commit_writes(
2352                &mut db,
2353                (0..700).map(|i| (key(i), Some(val(10_000 + i)))),
2354                None,
2355            )
2356            .await;
2357
2358            // Pre-rewind size must actually exceed the rewind target so the rewind is not a
2359            // no-op.
2360            let pre_prune_size = db.size().await;
2361            assert!(pre_prune_size > rewind_target);
2362
2363            let prune_loc = Location::new(600);
2364            // prune_loc must cross at least one bitmap chunk boundary; otherwise the buggy
2365            // bitmap prune would correctly stay at 0 and the test would pass even unfixed.
2366            assert!(
2367                *prune_loc > BITMAP_CHUNK_BITS,
2368                "prune_loc {prune_loc:?} must exceed one bitmap chunk ({BITMAP_CHUNK_BITS} bits)"
2369            );
2370            // prune_loc must lie within the first journal section so the journal cannot
2371            // prune any section, leaving bounds.start at 0 to expose the bitmap drift.
2372            assert!(
2373                *prune_loc < ITEMS_PER_SECTION,
2374                "prune_loc {prune_loc:?} must be < {ITEMS_PER_SECTION} so the journal retains section 0"
2375            );
2376            assert!(db.inactivity_floor_loc() >= prune_loc);
2377
2378            db.prune(prune_loc).await.unwrap();
2379
2380            // Journal could not prune any section, so it still retains from 0. The bitmap
2381            // must therefore also remain at 0.
2382            let bounds = db.bounds().await;
2383            assert_eq!(bounds.start, Location::new(0));
2384            assert_eq!(
2385                db.bitmap.pruned_bits(),
2386                0,
2387                "bitmap pruned past journal retained start"
2388            );
2389
2390            // Rewind to the still-retained early commit must succeed and restore visible
2391            // state (root match implies the snapshot was rebuilt correctly).
2392            db.rewind(rewind_target).await.unwrap();
2393            assert_eq!(db.size().await, rewind_target);
2394            assert_eq!(db.root(), root_at_target);
2395
2396            db.destroy().await.unwrap();
2397        });
2398    }
2399
2400    // --- MMB family tests ---
2401    //
2402    // The tests above use MMR-backed databases (via the concrete Db type aliases). The tests
2403    // below verify the same core operations work with the MMB family, exercising the generic
2404    // `init_fixed`/`init_variable` path with `mmb::Family`.
2405
2406    type MmbVariable = super::db::Db<
2407        crate::merkle::mmb::Family,
2408        Context,
2409        crate::journal::contiguous::variable::Journal<
2410            Context,
2411            super::operation::Operation<
2412                crate::merkle::mmb::Family,
2413                super::operation::update::Unordered<Digest, super::value::VariableEncoding<Digest>>,
2414            >,
2415        >,
2416        crate::index::unordered::Index<OneCap, crate::merkle::Location<crate::merkle::mmb::Family>>,
2417        Sha256,
2418        super::operation::update::Unordered<Digest, super::value::VariableEncoding<Digest>>,
2419        { crate::qmdb::any::BITMAP_CHUNK_BYTES },
2420        Sequential,
2421    >;
2422
2423    async fn open_mmb_db(context: Context, suffix: &str) -> MmbVariable {
2424        let cfg = variable_db_config::<OneCap>(suffix, &context);
2425        super::init(context, cfg).await.unwrap()
2426    }
2427
2428    async fn commit_writes_mmb(
2429        db: &mut MmbVariable,
2430        writes: impl IntoIterator<Item = (Digest, Option<Digest>)>,
2431        metadata: Option<Digest>,
2432    ) {
2433        let mut batch = db.new_batch();
2434        for (k, v) in writes {
2435            batch = batch.write(k, v);
2436        }
2437        let merkleized = batch.merkleize(db, metadata).await.unwrap();
2438        db.apply_batch(merkleized).await.unwrap();
2439        db.commit().await.unwrap();
2440    }
2441
2442    #[test_traced("INFO")]
2443    fn test_mmb_batch_crud() {
2444        let executor = deterministic::Runner::default();
2445        executor.start(|context| async move {
2446            let mut db = open_mmb_db(context.child("db"), "crud").await;
2447
2448            // Insert and read back.
2449            commit_writes_mmb(&mut db, [(key(0), Some(val(0)))], None).await;
2450            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
2451
2452            // Update existing key.
2453            commit_writes_mmb(&mut db, [(key(0), Some(val(1)))], None).await;
2454            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(1)));
2455
2456            // Delete key.
2457            commit_writes_mmb(&mut db, [(key(0), None)], None).await;
2458            assert!(db.get(&key(0)).await.unwrap().is_none());
2459
2460            // Multiple keys.
2461            commit_writes_mmb(
2462                &mut db,
2463                [(key(1), Some(val(1))), (key(2), Some(val(2)))],
2464                None,
2465            )
2466            .await;
2467            assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
2468            assert_eq!(db.get(&key(2)).await.unwrap(), Some(val(2)));
2469
2470            db.destroy().await.unwrap();
2471        });
2472    }
2473
2474    #[test_traced("INFO")]
2475    fn test_mmb_batch_empty() {
2476        let executor = deterministic::Runner::default();
2477        executor.start(|context| async move {
2478            let mut db = open_mmb_db(context.child("db"), "empty").await;
2479            let root_before = db.root();
2480
2481            let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
2482            db.apply_batch(merkleized).await.unwrap();
2483            assert_ne!(db.root(), root_before);
2484
2485            commit_writes_mmb(&mut db, [(key(0), Some(val(0)))], None).await;
2486            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
2487
2488            db.destroy().await.unwrap();
2489        });
2490    }
2491
2492    #[test_traced("INFO")]
2493    fn test_mmb_batch_metadata() {
2494        let executor = deterministic::Runner::default();
2495        executor.start(|context| async move {
2496            let mut db = open_mmb_db(context.child("db"), "meta").await;
2497
2498            let metadata = val(42);
2499            commit_writes_mmb(&mut db, [(key(0), Some(val(0)))], Some(metadata)).await;
2500            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
2501
2502            let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
2503            db.apply_batch(merkleized).await.unwrap();
2504            assert_eq!(db.get_metadata().await.unwrap(), None);
2505
2506            db.destroy().await.unwrap();
2507        });
2508    }
2509
2510    #[test_traced("WARN")]
2511    fn test_mmb_recovery() {
2512        let executor = deterministic::Runner::default();
2513        executor.start(|context| async move {
2514            let mut db =
2515                open_mmb_db(context.child("db").with_attribute("index", 0), "recovery").await;
2516
2517            commit_writes_mmb(&mut db, [(key(0), Some(val(0)))], Some(val(99))).await;
2518            commit_writes_mmb(&mut db, [(key(1), Some(val(1)))], None).await;
2519
2520            let root = db.root();
2521            let bounds = db.bounds().await;
2522            db.sync().await.unwrap();
2523            drop(db);
2524
2525            // Reopen and verify state.
2526            let db = open_mmb_db(context.child("db").with_attribute("index", 1), "recovery").await;
2527            assert_eq!(db.root(), root);
2528            assert_eq!(db.bounds().await, bounds);
2529            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
2530            assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
2531            assert_eq!(db.get_metadata().await.unwrap(), None);
2532
2533            db.destroy().await.unwrap();
2534        });
2535    }
2536
2537    #[test_traced("INFO")]
2538    fn test_mmb_prune() {
2539        let executor = deterministic::Runner::default();
2540        executor.start(|context| async move {
2541            let mut db = open_mmb_db(context.child("db"), "prune").await;
2542
2543            for i in 0u64..20 {
2544                commit_writes_mmb(&mut db, [(key(i), Some(val(i)))], None).await;
2545            }
2546
2547            let floor = db.inactivity_floor_loc();
2548            db.prune(floor).await.unwrap();
2549
2550            // All keys still accessible.
2551            for i in 0u64..20 {
2552                assert_eq!(db.get(&key(i)).await.unwrap(), Some(val(i)));
2553            }
2554
2555            db.destroy().await.unwrap();
2556        });
2557    }
2558
2559    /// One-stage pipelining lets the next batch be built while the prior batch commits.
2560    #[test_traced("INFO")]
2561    fn test_any_batch_single_stage_pipeline() {
2562        let executor = deterministic::Runner::default();
2563        executor.start(|context| async move {
2564            let ctx = context.child("db");
2565            let mut db: UnorderedVariable = UnorderedVariableDb::init(
2566                ctx.child("storage"),
2567                variable_db_config::<OneCap>("pipe", &ctx),
2568            )
2569            .await
2570            .unwrap();
2571
2572            {
2573                let mut batch = db.new_batch();
2574                batch = batch.write(key(0), Some(val(0)));
2575                let merkleized = batch.merkleize(&db, None).await.unwrap();
2576                db.apply_batch(merkleized).await.unwrap();
2577            }
2578
2579            let (child_merkleized, commit_result) = futures::join!(
2580                async {
2581                    assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
2582                    let mut child = db.new_batch();
2583                    child = child.write(key(1), Some(val(1)));
2584                    child.merkleize(&db, None).await.unwrap()
2585                },
2586                db.commit(),
2587            );
2588            commit_result.unwrap();
2589
2590            db.apply_batch(child_merkleized).await.unwrap();
2591            db.commit().await.unwrap();
2592
2593            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
2594            assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
2595
2596            db.destroy().await.unwrap();
2597        });
2598    }
2599}
2600
2601#[cfg(test)]
2602mod bitmap_tests {
2603    //! Regression tests for activity-bitmap maintenance in `any::Db`. The mutation code in
2604    //! `apply_batch`, `prune_bitmap`, and `rewind` is independent of the snapshot index variant,
2605    //! so one variant (`unordered::variable`) suffices as the test bed.
2606    use crate::{
2607        merkle::Location,
2608        qmdb::any::unordered::variable::test::{create_test_config, AnyTest},
2609    };
2610    use commonware_cryptography::{Hasher, Sha256};
2611    use commonware_macros::test_traced;
2612    use commonware_runtime::{
2613        deterministic::{self, Context},
2614        Runner as _, Supervisor as _,
2615    };
2616    use commonware_utils::bitmap::Readable as _;
2617
2618    /// Open a fresh test DB.
2619    async fn open_db(context: Context) -> AnyTest {
2620        let cfg = create_test_config(0, &context);
2621        AnyTest::init(context, cfg).await.unwrap()
2622    }
2623
2624    /// Active locations (bit=1) in `[pruned_bits, len)` of `db.bitmap`.
2625    fn bitmap_active_locs(db: &AnyTest) -> Vec<u64> {
2626        let b = &db.bitmap;
2627        (b.pruned_bits()..b.len())
2628            .filter(|loc| b.get_bit(*loc))
2629            .collect()
2630    }
2631
2632    /// Commit, drop, reopen, and assert the rebuilt bitmap matches the in-memory bitmap.
2633    async fn assert_oracle_round_trip(db: AnyTest, context: Context, label: &str) -> AnyTest {
2634        let pre_active = bitmap_active_locs(&db);
2635        let pre_len = db.bitmap.len();
2636        let pre_pruned = db.bitmap.pruned_bits();
2637
2638        db.commit().await.unwrap();
2639        drop(db);
2640
2641        let db = open_db(context.child("reopen").with_attribute("case", label)).await;
2642
2643        assert_eq!(
2644            db.bitmap.pruned_bits(),
2645            pre_pruned,
2646            "pruned_bits diverged on reopen",
2647        );
2648        assert_eq!(db.bitmap.len(), pre_len, "bitmap len diverged on reopen");
2649        assert_eq!(
2650            bitmap_active_locs(&db),
2651            pre_active,
2652            "active locations diverged on reopen",
2653        );
2654        db
2655    }
2656
2657    /// CommitFloor convention: only the *current* `last_commit_loc` carries bit=1; every earlier
2658    /// (now intermediate) commit boundary carries bit=0.
2659    ///
2660    /// Maintained by `apply_batch`'s explicit demote-then-promote pair on CommitFloor bits. If
2661    /// the demote step were missed, intermediate commits would persist at bit=1.
2662    #[test_traced]
2663    fn current_commit_floor_bit_is_one_others_zero() {
2664        deterministic::Runner::default().start(|context| async move {
2665            let mut db = open_db(context.child("db")).await;
2666
2667            // Apply three single-write batches; each produces one CommitFloor op.
2668            let mut commit_locs = Vec::new();
2669            for i in 0..3u64 {
2670                let key = Sha256::hash(&i.to_be_bytes());
2671                let batch = db
2672                    .new_batch()
2673                    .write(key, Some(vec![i as u8]))
2674                    .merkleize(&db, None)
2675                    .await
2676                    .unwrap();
2677                commit_locs.push(Location::<crate::merkle::mmr::Family>::new(
2678                    batch.bounds.total_size - 1,
2679                ));
2680                db.apply_batch(batch).await.unwrap();
2681            }
2682            db.commit().await.unwrap();
2683
2684            // Setup sanity: three strictly-increasing commit locations, all within the bitmap.
2685            assert_eq!(commit_locs.len(), 3);
2686            assert!(*commit_locs[0] < *commit_locs[1]);
2687            assert!(*commit_locs[1] < *commit_locs[2]);
2688            assert!(*commit_locs[2] < db.bitmap.len());
2689
2690            // Earlier two commits are intermediate -> bit=0.
2691            assert!(!db.bitmap.get_bit(*commit_locs[0]));
2692            assert!(!db.bitmap.get_bit(*commit_locs[1]));
2693            // Most recent commit is current -> bit=1.
2694            assert!(db.bitmap.get_bit(*commit_locs[2]));
2695
2696            let db = assert_oracle_round_trip(db, context, "commit_floor").await;
2697            db.destroy().await.unwrap();
2698        });
2699    }
2700
2701    /// `any::Db::rewind` restores bitmap state correctly.
2702    ///
2703    /// `any::rewind` is the sole writer of the bitmap during rewind; it must:
2704    ///   1. truncate the bitmap to the rewind size,
2705    ///   2. flip restored locs (committed snapshot entries the rewound tail had superseded) back
2706    ///      to active,
2707    ///   3. set the rewound tail's CommitFloor bit to 1 (the new current commit).
2708    ///
2709    /// The oracle round-trip catches all three: any divergence from `init_from_log`'s rebuild
2710    /// fails the comparison.
2711    #[test_traced]
2712    fn rewind_restores_bitmap_to_target_commit() {
2713        deterministic::Runner::default().start(|context| async move {
2714            let mut db = open_db(context.child("db")).await;
2715            let k1 = Sha256::hash(&[1]);
2716            let k2 = Sha256::hash(&[2]);
2717
2718            // Two committed batches; remember the size after the first.
2719            let b1 = db
2720                .new_batch()
2721                .write(k1, Some(vec![10]))
2722                .merkleize(&db, None)
2723                .await
2724                .unwrap();
2725            db.apply_batch(b1).await.unwrap();
2726            db.commit().await.unwrap();
2727            let size_after_first = Location::new(*db.last_commit_loc + 1);
2728
2729            let b2 = db
2730                .new_batch()
2731                .write(k2, Some(vec![20]))
2732                .merkleize(&db, None)
2733                .await
2734                .unwrap();
2735            db.apply_batch(b2).await.unwrap();
2736
2737            // Setup sanity: both keys present, db has advanced past size_after_first.
2738            assert_eq!(db.get(&k1).await.unwrap(), Some(vec![10]));
2739            assert_eq!(db.get(&k2).await.unwrap(), Some(vec![20]));
2740            assert!(*db.last_commit_loc + 1 > *size_after_first);
2741
2742            // Rewind to the state after the first commit.
2743            db.rewind(size_after_first).await.unwrap();
2744
2745            // Post-rewind: k2 gone, k1 remains.
2746            assert_eq!(db.get(&k1).await.unwrap(), Some(vec![10]));
2747            assert!(db.get(&k2).await.unwrap().is_none());
2748
2749            let db = assert_oracle_round_trip(db, context, "rewind").await;
2750            db.destroy().await.unwrap();
2751        });
2752    }
2753
2754    /// Floor-scan falls through to the uncommitted tail when the committed bitmap region runs
2755    /// out of active bits.
2756    ///
2757    /// `next_candidate` returns set-bit locations within `[floor, bitmap.len)` (skipping inactive
2758    /// ones), then sequential candidates beyond `bitmap.len` (uncommitted ancestor ops not
2759    /// tracked in the bitmap). `is_active_at` revalidation in the floor-raise loop is the only
2760    /// thing that prevents stale ancestor locations from being moved when a child batch
2761    /// supersedes the same key.
2762    ///
2763    /// Setup: 1 committed key + uncommitted parent re-touching that key + uncommitted child that
2764    /// supersedes the key AND writes many other keys. The added user mutations push
2765    /// `total_steps` past the active bits available in the committed region, forcing the scan
2766    /// to walk into the tail.
2767    ///
2768    /// Failure modes caught:
2769    /// - tail-fallthrough boundary off-by-one → wrong root,
2770    /// - missing `is_active_at` revalidation → parent's superseded loc gets moved → divergent
2771    ///   root,
2772    /// - bitmap state inconsistent with `init_from_log` → oracle reopen mismatch.
2773    #[test_traced]
2774    fn floor_scan_falls_through_to_uncommitted_tail() {
2775        deterministic::Runner::default().start(|context| async move {
2776            let mut db = open_db(context.child("db")).await;
2777            let anchor = Sha256::hash(&[0xAA]);
2778
2779            // Commit one key.
2780            let b = db
2781                .new_batch()
2782                .write(anchor, Some(vec![1]))
2783                .merkleize(&db, None)
2784                .await
2785                .unwrap();
2786            db.apply_batch(b).await.unwrap();
2787
2788            // Setup sanity: anchor in committed snapshot.
2789            assert_eq!(db.get(&anchor).await.unwrap(), Some(vec![1]));
2790            let committed_bitmap_len = db.bitmap.len();
2791
2792            // Uncommitted parent: re-touch anchor at a location above the committed bitmap.
2793            let parent = db
2794                .new_batch()
2795                .write(anchor, Some(vec![2]))
2796                .merkleize(&db, None)
2797                .await
2798                .unwrap();
2799            assert!(
2800                parent.bounds.total_size > committed_bitmap_len,
2801                "parent must extend past committed bitmap to exercise the tail path",
2802            );
2803
2804            // Uncommitted child: supersede anchor + add 16 more writes. The extra user_steps
2805            // ensure `total_steps` exceeds active bits in the committed region, forcing the
2806            // floor-raise scan into the uncommitted tail.
2807            let mut child_batch = parent.new_batch::<Sha256>();
2808            child_batch = child_batch.write(anchor, Some(vec![3]));
2809            for i in 0..16u64 {
2810                let k = Sha256::hash(&(1000 + i).to_be_bytes());
2811                child_batch = child_batch.write(k, Some(vec![i as u8]));
2812            }
2813            let child = child_batch.merkleize(&db, None).await.unwrap();
2814            assert!(
2815                child.bounds.total_size > committed_bitmap_len,
2816                "child must include an uncommitted tail beyond committed bitmap",
2817            );
2818            let expected_root = child.root();
2819
2820            // Apply. If tail-fallthrough or revalidation were wrong, the produced root would
2821            // diverge from the merkleize-time root.
2822            db.apply_batch(child).await.unwrap();
2823            assert_eq!(db.root(), expected_root);
2824            assert_eq!(db.get(&anchor).await.unwrap(), Some(vec![3]));
2825
2826            let db = assert_oracle_round_trip(db, context, "tail").await;
2827            db.destroy().await.unwrap();
2828        });
2829    }
2830}