Skip to main content

commonware_storage/qmdb/any/
mod.rs

1//! An _Any_ authenticated database provides succinct proofs of any value ever associated with a
2//! key.
3//!
4//! The specific variants provided within this module include:
5//! - Unordered: The database does not maintain or require any ordering over the key space.
6//!   - Fixed-size values
7//!   - Variable-size values
8//! - Ordered: The database maintains a total order over active keys.
9//!   - Fixed-size values
10//!   - Variable-size values
11//!
12//! # Examples
13//!
14//! ```ignore
15//! // 1. Create a batch and apply it.
16//! let batch = db.new_batch()
17//!     .write(key, Some(value))    // upsert
18//!     .write(other_key, None)     // delete
19//!     .merkleize(&db, None).await?;
20//! let root = batch.root();        // speculative root
21//! db.apply_batch(batch).await?;
22//! db.commit().await?;             // flush to disk
23//! ```
24//!
25//! ```ignore
26//! // 2. Fork two batches from the same parent. Apply one; the other is stale.
27//! let parent = db.new_batch().write(k1, Some(v1)).merkleize(&db, None).await?;
28//! let fork_a = parent.new_batch::<Sha256>().write(k2, Some(v2)).merkleize(&db, None).await?;
29//! let fork_b = parent.new_batch::<Sha256>().write(k3, Some(v3)).merkleize(&db, None).await?;
30//!
31//! db.apply_batch(fork_a).await?;           // OK -- includes parent
//! assert!(db.apply_batch(fork_b).await.is_err()) // StaleBatch
33//! ```
34//!
35//! ```ignore
36//! // 3. Chain two batches. Apply parent first, then child.
37//! let parent = db.new_batch().write(k1, Some(v1)).merkleize(&db, None).await?;
38//! let child = parent.new_batch::<Sha256>().write(k2, Some(v2)).merkleize(&db, None).await?;
39//!
40//! db.apply_batch(parent).await?;           // apply parent
41//! db.apply_batch(child).await?;            // ancestors skipped automatically
42//! db.commit().await?;
43//! ```
44//!
45//! ```ignore
46//! // 4. Chain two batches. Apply child directly (includes parent's changes).
47//! let parent = db.new_batch().write(k1, Some(v1)).merkleize(&db, None).await?;
48//! let child = parent.new_batch::<Sha256>().write(k2, Some(v2)).merkleize(&db, None).await?;
49//!
50//! db.apply_batch(child).await?;            // OK -- includes parent
//! assert!(db.apply_batch(parent).await.is_err()) // StaleBatch
52//! ```
53//!
54//! ```ignore
55//! // 5. Two independent chains. Commit the tail of one; the other chain is stale.
56//! let a1 = db.new_batch().write(k1, Some(v1)).merkleize(&db, None).await?;
57//! let a2 = a1.new_batch::<Sha256>().write(k2, Some(v2)).merkleize(&db, None).await?;
58//!
59//! let b1 = db.new_batch().write(k3, Some(v3)).merkleize(&db, None).await?;
60//! let b2 = b1.new_batch::<Sha256>().write(k4, Some(v4)).merkleize(&db, None).await?;
61//!
62//! db.apply_batch(a2).await?;               // OK -- includes a1
//! assert!(db.apply_batch(b2).await.is_err())     // StaleBatch
64//! ```
65
66use crate::{
67    index::Factory as IndexFactory,
68    journal::{
69        authenticated::Inner,
70        contiguous::{fixed::Config as FConfig, variable::Config as VConfig},
71    },
72    merkle::{journaled::Config as MerkleConfig, Family, Location},
73    qmdb::{
74        any::operation::{Operation, Update},
75        operation::Committable,
76    },
77    translator::Translator,
78    Context,
79};
80use commonware_codec::CodecShared;
81use commonware_cryptography::Hasher;
82use tracing::warn;
83
84pub mod batch;
85pub mod db;
86pub mod operation;
87#[cfg(any(test, feature = "test-traits"))]
88pub mod traits;
89pub mod value;
90pub use value::{FixedValue, ValueEncoding, VariableValue};
91pub mod ordered;
92pub(crate) mod sync;
93pub mod unordered;
94
/// Configuration for an `Any` authenticated db.
///
/// Generic over the index translator `T` and the operations-log journal
/// configuration `J`, so the same struct serves both the fixed-size and
/// variable-size value variants (see `FixedConfig` and `VariableConfig`).
#[derive(Clone)]
pub struct Config<T: Translator, J> {
    /// Configuration for the Merkle structure backing the authenticated journal.
    pub merkle_config: MerkleConfig,

    /// Configuration for the operations log journal.
    pub journal_config: J,

    /// The translator used by the compressed index.
    pub translator: T,
}
107
/// Configuration for an `Any` authenticated db with fixed-size values.
pub type FixedConfig<T> = Config<T, FConfig>;

/// Configuration for an `Any` authenticated db with variable-sized values.
/// `C` is the codec configuration used to decode the variable-size items.
pub type VariableConfig<T, C> = Config<T, VConfig<C>>;
113
114/// Initialize an `Any` authenticated db from the given config.
115pub async fn init<F, E, U, H, T, I, J, Cb>(
116    context: E,
117    cfg: Config<T, J::Config>,
118    known_inactivity_floor: Option<Location<F>>,
119    callback: Cb,
120) -> Result<db::Db<F, E, J, I, H, U>, crate::qmdb::Error<F>>
121where
122    F: Family,
123    E: Context,
124    U: Update + Send + Sync,
125    H: Hasher,
126    T: Translator,
127    I: IndexFactory<T, Value = Location<F>>,
128    J: Inner<E, Item = Operation<F, U>>,
129    Operation<F, U>: Committable + CodecShared,
130    Cb: FnMut(bool, Option<Location<F>>),
131{
132    let mut log = J::init::<F, H>(
133        context.with_label("log"),
134        cfg.merkle_config,
135        cfg.journal_config,
136        Operation::is_commit,
137    )
138    .await?;
139
140    if log.size().await == 0 {
141        warn!("Authenticated log is empty, initializing new db");
142        let commit_floor = Operation::CommitFloor(None, Location::new(0));
143        log.append(&commit_floor).await?;
144        log.sync().await?;
145    }
146
147    let index = I::new(context.with_label("index"), cfg.translator);
148    db::Db::init_from_log(index, log, known_inactivity_floor, callback).await
149}
150
151#[cfg(test)]
152// pub(crate) so qmdb/current can use the generic tests.
153pub(crate) mod test {
154    use super::*;
155    use crate::{
156        journal::contiguous::{fixed::Config as FConfig, variable::Config as VConfig},
157        qmdb::any::{FixedConfig, MerkleConfig, VariableConfig},
158        translator::OneCap,
159    };
160    use commonware_codec::{Codec, CodecShared};
161    use commonware_cryptography::{sha256::Digest, Hasher, Sha256};
162    use commonware_runtime::{
163        buffer::paged::CacheRef, deterministic::Context, BufferPooler, Metrics,
164    };
165    use commonware_utils::{NZUsize, NZU16, NZU64};
166    use core::{future::Future, pin::Pin};
167    use std::{
168        collections::HashMap,
169        num::{NonZeroU16, NonZeroUsize},
170    };
171
172    pub(crate) fn colliding_digest(prefix: u8, suffix: u64) -> Digest {
173        let mut bytes = [0u8; 32];
174        bytes[0] = prefix;
175        bytes[24..].copy_from_slice(&suffix.to_be_bytes());
176        Digest::from(bytes)
177    }
178
    // Janky page & cache sizes to exercise boundary conditions.
    // Deliberately odd values so item boundaries rarely align with page or
    // blob boundaries.
    const PAGE_SIZE: NonZeroU16 = NZU16!(101);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(11);
182
183    pub(crate) fn fixed_db_config<T: Translator + Default>(
184        suffix: &str,
185        pooler: &impl BufferPooler,
186    ) -> FixedConfig<T> {
187        let page_cache = CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE);
188        FixedConfig {
189            merkle_config: MerkleConfig {
190                journal_partition: format!("journal-{suffix}"),
191                metadata_partition: format!("metadata-{suffix}"),
192                items_per_blob: NZU64!(11),
193                write_buffer: NZUsize!(1024),
194                thread_pool: None,
195                page_cache: page_cache.clone(),
196            },
197            journal_config: FConfig {
198                partition: format!("log-journal-{suffix}"),
199                items_per_blob: NZU64!(7),
200                page_cache,
201                write_buffer: NZUsize!(1024),
202            },
203            translator: T::default(),
204        }
205    }
206
207    pub(crate) fn variable_db_config<T: Translator + Default>(
208        suffix: &str,
209        pooler: &impl BufferPooler,
210    ) -> VariableConfig<T, ((), ())> {
211        let page_cache = CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE);
212        VariableConfig {
213            merkle_config: MerkleConfig {
214                journal_partition: format!("journal-{suffix}"),
215                metadata_partition: format!("metadata-{suffix}"),
216                items_per_blob: NZU64!(11),
217                write_buffer: NZUsize!(1024),
218                thread_pool: None,
219                page_cache: page_cache.clone(),
220            },
221            journal_config: VConfig {
222                partition: format!("log-journal-{suffix}"),
223                items_per_section: NZU64!(7),
224                compression: None,
225                codec_config: ((), ()),
226                page_cache,
227                write_buffer: NZUsize!(1024),
228            },
229            translator: T::default(),
230        }
231    }
232
233    use crate::{
234        index::Unordered as UnorderedIndex,
235        journal::contiguous::Mutable,
236        merkle::mmr,
237        qmdb::any::{
238            db::Db as AnyDb,
239            operation::{update::Update as UpdateTrait, Operation as AnyOperation},
240            traits::{DbAny, Provable, UnmerkleizedBatch as _},
241        },
242    };
243
    // Shorthands pinning the generic tests below to the MMR-backed family.
    type Error = crate::qmdb::Error<mmr::Family>;
    type Location = mmr::Location;
246
    /// Test-only abstraction over databases that can rewind their operation
    /// log back to a previously committed size.
    pub(crate) trait RewindableDb {
        /// Rewind the database so its operation count equals `size`.
        fn rewind_to_size(
            &mut self,
            size: Location,
        ) -> impl Future<Output = Result<(), Error>> + Send;
    }
253
    // Blanket impl: any MMR-backed `AnyDb` with an unordered index supports
    // rewinding via its inherent `rewind` method.
    impl<E, U, C, I, H> RewindableDb for AnyDb<mmr::Family, E, C, I, H, U>
    where
        E: crate::Context,
        U: UpdateTrait,
        C: Mutable<Item = AnyOperation<mmr::Family, U>>,
        I: UnorderedIndex<Value = Location>,
        H: Hasher,
        AnyOperation<mmr::Family, U>: Codec,
    {
        async fn rewind_to_size(&mut self, size: Location) -> Result<(), Error> {
            // Delegate to the db's own rewind, discarding its success value.
            self.rewind(size).await?;
            Ok(())
        }
    }
268
    /// Test recovery on non-empty db.
    ///
    /// Scenario: commit ~1000 keys, then repeatedly build merkleized-but-
    /// unapplied batches and reopen — every reopen must discard the unapplied
    /// work and reproduce the last committed root/size/floor. Finally, an
    /// applied-and-committed batch must survive a reopen.
    pub(crate) async fn test_any_db_non_empty_recovery<F: Family, D, V: Clone + CodecShared>(
        context: Context,
        mut db: D,
        reopen_db: impl Fn(Context) -> Pin<Box<dyn Future<Output = D> + Send>>,
        make_value: impl Fn(u64) -> V,
    ) where
        D: DbAny<F, Key = Digest, Value = V, Digest = Digest>,
    {
        const ELEMENTS: u64 = 1000;

        // Commit initial batch.
        {
            let mut batch = db.new_batch();
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = make_value(i * 1000);
                batch = batch.write(k, Some(v));
            }
            let merkleized = batch.merkleize(&db, None).await.unwrap();
            db.apply_batch(merkleized).await.unwrap();
        }
        db.commit().await.unwrap();
        db.prune(db.inactivity_floor_loc().await).await.unwrap();
        // Snapshot the committed state; every reopen below must reproduce it.
        let root = db.root();
        let op_count = db.size().await;
        let inactivity_floor_loc = db.inactivity_floor_loc().await;

        let db = reopen_db(context.with_label("reopen1")).await;
        assert_eq!(db.size().await, op_count);
        assert_eq!(db.inactivity_floor_loc().await, inactivity_floor_loc);
        assert_eq!(db.root(), root);

        // Write without applying (unapplied batch should be lost on reopen).
        {
            let mut batch = db.new_batch();
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = make_value((i + 1) * 10000);
                batch = batch.write(k, Some(v));
            }
            // Merkleized but never applied — must not persist.
            let _merkleized = batch.merkleize(&db, None).await.unwrap();
        }
        let db = reopen_db(context.with_label("reopen2")).await;
        assert_eq!(db.size().await, op_count);
        assert_eq!(db.inactivity_floor_loc().await, inactivity_floor_loc);
        assert_eq!(db.root(), root);

        // Write without applying again.
        {
            let mut batch = db.new_batch();
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = make_value((i + 1) * 10000);
                batch = batch.write(k, Some(v));
            }
            let _merkleized = batch.merkleize(&db, None).await.unwrap();
        }
        let db = reopen_db(context.with_label("reopen3")).await;
        assert_eq!(db.size().await, op_count);
        assert_eq!(db.root(), root);

        // Three rounds of unapplied batches.
        for _ in 0..3 {
            let mut batch = db.new_batch();
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = make_value((i + 1) * 10000);
                batch = batch.write(k, Some(v));
            }
            let _merkleized = batch.merkleize(&db, None).await.unwrap();
        }
        let mut db = reopen_db(context.with_label("reopen4")).await;
        assert_eq!(db.size().await, op_count);
        assert_eq!(db.root(), root);

        // Now actually commit a batch.
        {
            let mut batch = db.new_batch();
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = make_value((i + 1) * 10000);
                batch = batch.write(k, Some(v));
            }
            let merkleized = batch.merkleize(&db, None).await.unwrap();
            db.apply_batch(merkleized).await.unwrap();
        }
        db.commit().await.unwrap();
        let db = reopen_db(context.with_label("reopen5")).await;
        // Committed work must now be reflected: more ops, new floor, new root.
        assert!(db.size().await > op_count);
        assert_ne!(db.inactivity_floor_loc().await, inactivity_floor_loc);
        assert_ne!(db.root(), root);

        db.destroy().await.unwrap();
    }
364
    /// Test recovery on empty db.
    ///
    /// A freshly initialized db holds exactly one operation (the genesis
    /// `CommitFloor` appended by `init`), so `size() == 1` is the expected
    /// empty state. Unapplied (merkleized-only) batches must be lost on every
    /// reopen; only an applied-and-committed batch changes the root.
    pub(crate) async fn test_any_db_empty_recovery<F: Family, D, V: Clone + CodecShared>(
        context: Context,
        db: D,
        reopen_db: impl Fn(Context) -> Pin<Box<dyn Future<Output = D> + Send>>,
        make_value: impl Fn(u64) -> V,
    ) where
        D: DbAny<F, Key = Digest, Value = V, Digest = Digest>,
    {
        let root = db.root();

        let db = reopen_db(context.with_label("reopen1")).await;
        assert_eq!(db.size().await, 1);
        assert_eq!(db.root(), root);

        // Write without applying (unapplied batch should be lost on reopen).
        {
            let mut batch = db.new_batch();
            for i in 0u64..1000 {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = make_value((i + 1) * 10000);
                batch = batch.write(k, Some(v));
            }
            // Merkleized but never applied — must not persist.
            let _merkleized = batch.merkleize(&db, None).await.unwrap();
        }
        let db = reopen_db(context.with_label("reopen2")).await;
        assert_eq!(db.size().await, 1);
        assert_eq!(db.root(), root);

        // Write without applying again.
        {
            let mut batch = db.new_batch();
            for i in 0u64..1000 {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = make_value((i + 1) * 10000);
                batch = batch.write(k, Some(v));
            }
            let _merkleized = batch.merkleize(&db, None).await.unwrap();
        }
        drop(db);
        let db = reopen_db(context.with_label("reopen3")).await;
        assert_eq!(db.size().await, 1);
        assert_eq!(db.root(), root);

        // Three rounds of unapplied batches.
        for _ in 0..3 {
            let mut batch = db.new_batch();
            for i in 0u64..1000 {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = make_value((i + 1) * 10000);
                batch = batch.write(k, Some(v));
            }
            let _merkleized = batch.merkleize(&db, None).await.unwrap();
        }
        drop(db);
        let mut db = reopen_db(context.with_label("reopen4")).await;
        assert_eq!(db.size().await, 1);
        assert_eq!(db.root(), root);

        // Now actually commit a batch.
        {
            let mut batch = db.new_batch();
            for i in 0u64..1000 {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = make_value((i + 1) * 10000);
                batch = batch.write(k, Some(v));
            }
            let merkleized = batch.merkleize(&db, None).await.unwrap();
            db.apply_batch(merkleized).await.unwrap();
        }
        db.commit().await.unwrap();
        drop(db);
        let db = reopen_db(context.with_label("reopen5")).await;
        // Committed work must now be reflected: more ops and a new root.
        assert!(db.size().await > 1);
        assert_ne!(db.root(), root);

        db.destroy().await.unwrap();
    }
443
    /// Test rewinding to a prior committed state and recovering that state after reopen.
    ///
    /// Commits three batches (A, B, C) that repeatedly update, delete, and
    /// recreate the same keys, then rewinds back to A's boundary and finally
    /// to the initial boundary, checking root/size/floor/metadata/values at
    /// each step — both in-memory and after a reopen.
    pub(crate) async fn test_any_db_rewind_recovery<D, V>(
        context: Context,
        mut db: D,
        reopen_db: impl Fn(Context) -> Pin<Box<dyn Future<Output = D> + Send>>,
        make_value: impl Fn(u64) -> V,
    ) where
        D: DbAny<mmr::Family, Key = Digest, Value = V, Digest = Digest> + RewindableDb,
        V: Clone + CodecShared + Eq + std::fmt::Debug,
    {
        let key0 = Sha256::hash(&0u64.to_be_bytes());
        let key1 = Sha256::hash(&1u64.to_be_bytes());
        let key2 = Sha256::hash(&2u64.to_be_bytes());
        // Baseline state to restore at the very end.
        let initial_root = db.root();
        let initial_size = db.size().await;
        let initial_floor = db.inactivity_floor_loc().await;

        // Empty-batch rewind on an otherwise empty DB should apply no snapshot undos.
        let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
        let empty_range = db.apply_batch(merkleized).await.unwrap();
        db.commit().await.unwrap();
        assert_eq!(empty_range.start, initial_size);
        assert_eq!(db.size().await, empty_range.end);
        db.rewind_to_size(initial_size).await.unwrap();
        assert_eq!(db.root(), initial_root);
        assert_eq!(db.size().await, initial_size);
        assert_eq!(db.inactivity_floor_loc().await, initial_floor);
        assert_eq!(db.get_metadata().await.unwrap(), None);

        // Batch A: create key0 and key1 with metadata.
        let value0_a = make_value(10);
        let value1_a = make_value(11);
        let metadata_a = make_value(12);

        let merkleized = db
            .new_batch()
            .write(key0, Some(value0_a.clone()))
            .write(key1, Some(value1_a.clone()))
            .merkleize(&db, Some(metadata_a.clone()))
            .await
            .unwrap();
        let range_a = db.apply_batch(merkleized).await.unwrap();
        db.commit().await.unwrap();

        // Snapshot A's committed state — the rewind target below.
        let root_a = db.root();
        let size_a = db.size().await;
        let floor_a = db.inactivity_floor_loc().await;
        assert_eq!(size_a, range_a.end);

        // Batch B: overwrite key0, delete key1, create key2.
        let value0_b = make_value(20);
        let value2_b = make_value(21);
        let metadata_b = make_value(22);

        let merkleized = db
            .new_batch()
            .write(key0, Some(value0_b))
            .write(key1, None)
            .write(key2, Some(value2_b))
            .merkleize(&db, Some(metadata_b))
            .await
            .unwrap();
        let range_b = db.apply_batch(merkleized).await.unwrap();
        db.commit().await.unwrap();
        assert_eq!(range_b.start, size_a);
        assert_ne!(db.root(), root_a);

        // Batch C: overwrite key0 again, recreate key1, delete key2.
        let value0_c = make_value(30);
        let value1_c = make_value(31);
        let metadata_c = make_value(32);
        let merkleized = db
            .new_batch()
            .write(key0, Some(value0_c))
            .write(key1, Some(value1_c))
            .write(key2, None)
            .merkleize(&db, Some(metadata_c))
            .await
            .unwrap();
        db.apply_batch(merkleized).await.unwrap();
        db.commit().await.unwrap();

        // Rewind across a tail where:
        // - the same key (`key0`) was updated multiple times
        // - `key1` was deleted then recreated (exercises net-zero active_keys_delta path)
        db.rewind_to_size(size_a).await.unwrap();
        assert_eq!(db.root(), root_a);
        assert_eq!(db.size().await, size_a);
        assert_eq!(db.inactivity_floor_loc().await, floor_a);
        assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_a.clone()));
        assert_eq!(db.get(&key0).await.unwrap(), Some(value0_a));
        assert_eq!(db.get(&key1).await.unwrap(), Some(value1_a));
        assert_eq!(db.get(&key2).await.unwrap(), None);

        // The rewound state must also survive a restart.
        db.commit().await.unwrap();
        drop(db);
        let mut db = reopen_db(context.with_label("reopen_after_rewind")).await;
        assert_eq!(db.root(), root_a);
        assert_eq!(db.size().await, size_a);
        assert_eq!(db.inactivity_floor_loc().await, floor_a);
        assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_a));
        assert_eq!(db.get(&key0).await.unwrap(), Some(make_value(10)));
        assert_eq!(db.get(&key1).await.unwrap(), Some(make_value(11)));
        assert_eq!(db.get(&key2).await.unwrap(), None);

        // Fresh writes from the rewound tip should produce a correct new chain and persist
        // across reopen.
        let value2_d = make_value(40);
        let metadata_d = make_value(41);
        let merkleized = db
            .new_batch()
            .write(key2, Some(value2_d.clone()))
            .merkleize(&db, Some(metadata_d.clone()))
            .await
            .unwrap();
        db.apply_batch(merkleized).await.unwrap();
        db.commit().await.unwrap();
        assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_d.clone()));
        assert_eq!(db.get(&key0).await.unwrap(), Some(make_value(10)));
        assert_eq!(db.get(&key1).await.unwrap(), Some(make_value(11)));
        assert_eq!(db.get(&key2).await.unwrap(), Some(value2_d.clone()));

        drop(db);
        let mut db = reopen_db(context.with_label("reopen_after_rewind_new_writes")).await;
        assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_d));
        assert_eq!(db.get(&key0).await.unwrap(), Some(make_value(10)));
        assert_eq!(db.get(&key1).await.unwrap(), Some(make_value(11)));
        assert_eq!(db.get(&key2).await.unwrap(), Some(value2_d));

        // Rewind all the way to the initial commit boundary (`first_commit_loc + 1`).
        db.rewind_to_size(initial_size).await.unwrap();
        assert_eq!(db.root(), initial_root);
        assert_eq!(db.size().await, initial_size);
        assert_eq!(db.inactivity_floor_loc().await, initial_floor);
        assert_eq!(db.get_metadata().await.unwrap(), None);
        assert_eq!(db.get(&key0).await.unwrap(), None);
        assert_eq!(db.get(&key1).await.unwrap(), None);
        assert_eq!(db.get(&key2).await.unwrap(), None);

        // And that fully-rewound state must also survive a restart.
        db.commit().await.unwrap();
        drop(db);
        let db = reopen_db(context.with_label("reopen_initial_boundary")).await;
        assert_eq!(db.root(), initial_root);
        assert_eq!(db.size().await, initial_size);
        assert_eq!(db.inactivity_floor_loc().await, initial_floor);
        assert_eq!(db.get_metadata().await.unwrap(), None);
        assert_eq!(db.get(&key0).await.unwrap(), None);
        assert_eq!(db.get(&key1).await.unwrap(), None);
        assert_eq!(db.get(&key2).await.unwrap(), None);

        db.destroy().await.unwrap();
    }
593
    /// Test that a large mixed workload can be authenticated and replayed correctly.
    ///
    /// Builds one batch containing creates, updates, and deletes; mirrors the
    /// expected end-state in a `HashMap`; then verifies after reopen that the
    /// db matches the map and that every unpruned operation can be proven
    /// against the root.
    pub(crate) async fn test_any_db_build_and_authenticate<D, V>(
        context: Context,
        mut db: D,
        reopen_db: impl Fn(Context) -> Pin<Box<dyn Future<Output = D> + Send>>,
        make_value: impl Fn(u64) -> V,
    ) where
        D: DbAny<mmr::Family, Key = Digest, Value = V, Digest = Digest> + Provable<mmr::Family>,
        V: CodecShared + Clone + Eq + std::hash::Hash + std::fmt::Debug,
        <D as Provable<mmr::Family>>::Operation: Codec,
    {
        use crate::{mmr::StandardHasher, qmdb::verify_proof};

        const ELEMENTS: u64 = 1000;

        // Reference model: the db must agree with this map at the end.
        let mut map = HashMap::<Digest, V>::default();
        {
            let mut batch = db.new_batch();
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = make_value(i * 1000);
                batch = batch.write(k, Some(v.clone()));
                map.insert(k, v);
            }

            // Update every 3rd key (i % 3 == 0).
            for i in 0u64..ELEMENTS {
                if i % 3 != 0 {
                    continue;
                }
                let k = Sha256::hash(&i.to_be_bytes());
                let v = make_value((i + 1) * 10000);
                batch = batch.write(k, Some(v.clone()));
                map.insert(k, v);
            }

            // Delete every 7th key (those with i % 7 == 1).
            for i in 0u64..ELEMENTS {
                if i % 7 != 1 {
                    continue;
                }
                let k = Sha256::hash(&i.to_be_bytes());
                batch = batch.write(k, None);
                map.remove(&k);
            }

            let merkleized = batch.merkleize(&db, None).await.unwrap();
            db.apply_batch(merkleized).await.unwrap();
        }
        // Commit + sync with pruning raises inactivity floor.
        db.sync().await.unwrap();
        db.prune(db.inactivity_floor_loc().await).await.unwrap();

        // Drop & reopen and ensure state matches.
        let root = db.root();
        db.sync().await.unwrap();
        drop(db);
        let db = reopen_db(context.with_label("reopened")).await;
        assert_eq!(root, db.root());

        // State matches reference map.
        for i in 0u64..ELEMENTS {
            let k = Sha256::hash(&i.to_be_bytes());
            if let Some(map_value) = map.get(&k) {
                let Some(db_value) = db.get(&k).await.unwrap() else {
                    panic!("key not found in db: {k}");
                };
                assert_eq!(*map_value, db_value);
            } else {
                assert!(db.get(&k).await.unwrap().is_none());
            }
        }

        // Every unpruned operation (from the floor to the end) must be
        // provable against the committed root.
        let hasher = StandardHasher::<Sha256>::new();
        let bounds = db.bounds().await;
        let inactivity_floor = db.inactivity_floor_loc().await;
        for loc in *inactivity_floor..*bounds.end {
            let loc = Location::new(loc);
            let (proof, ops) = db.proof(loc, NZU64!(10)).await.unwrap();
            assert!(verify_proof(&hasher, &proof, loc, &ops, &root));
        }

        db.destroy().await.unwrap();
    }
678
679    /// Test that replaying multiple updates of the same key on startup preserves correct state.
680    pub(crate) async fn test_any_db_log_replay<
681        F: Family,
682        D,
683        V: Clone + CodecShared + PartialEq + std::fmt::Debug,
684    >(
685        context: Context,
686        mut db: D,
687        reopen_db: impl Fn(Context) -> Pin<Box<dyn Future<Output = D> + Send>>,
688        make_value: impl Fn(u64) -> V,
689    ) where
690        D: DbAny<F, Key = Digest, Value = V, Digest = Digest>,
691    {
692        // Update the same key many times within a single batch.
693        const UPDATES: u64 = 100;
694        let k = Sha256::hash(&UPDATES.to_be_bytes());
695        let mut last_value = None;
696        {
697            let mut batch = db.new_batch();
698            for i in 0u64..UPDATES {
699                let v = make_value(i * 1000);
700                last_value = Some(v.clone());
701                batch = batch.write(k, Some(v));
702            }
703            let merkleized = batch.merkleize(&db, None).await.unwrap();
704            db.apply_batch(merkleized).await.unwrap();
705        }
706        db.commit().await.unwrap();
707        let root = db.root();
708
709        // Reopen and verify the state is preserved correctly.
710        drop(db);
711        let db = reopen_db(context.with_label("reopened")).await;
712        assert_eq!(db.root(), root);
713        assert_eq!(db.get(&k).await.unwrap(), last_value);
714
715        db.destroy().await.unwrap();
716    }
717
    /// Test that historical_proof returns correct proofs for past database states.
    ///
    /// Checks two properties: (1) a historical proof at the current size is
    /// identical to a regular proof, and (2) a historical proof at an earlier
    /// size is unchanged after the database grows, and still verifies against
    /// the root captured at that earlier size.
    pub(crate) async fn test_any_db_historical_proof_basic<D, V: Clone + CodecShared>(
        _context: Context,
        mut db: D,
        make_value: impl Fn(u64) -> V,
    ) where
        D: DbAny<mmr::Family, Key = Digest, Value = V, Digest = Digest> + Provable<mmr::Family>,
        <D as Provable<mmr::Family>>::Operation: Codec + PartialEq + std::fmt::Debug,
    {
        use crate::{mmr::StandardHasher, qmdb::verify_proof};
        use commonware_utils::NZU64;

        // Add some operations
        const OPS: u64 = 20;
        {
            let mut batch = db.new_batch();
            for i in 0u64..OPS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = make_value(i * 1000);
                batch = batch.write(k, Some(v));
            }
            let merkleized = batch.merkleize(&db, None).await.unwrap();
            db.apply_batch(merkleized).await.unwrap();
        }
        // Capture the state the historical proofs will be anchored to.
        let root_hash = db.root();
        let original_op_count = db.size().await;

        // Historical proof should match "regular" proof when historical size == current database size
        let max_ops = NZU64!(10);
        let start_loc = Location::new(5);
        let (historical_proof, historical_ops) = db
            .historical_proof(original_op_count, start_loc, max_ops)
            .await
            .unwrap();
        let (regular_proof, regular_ops) = db.proof(start_loc, max_ops).await.unwrap();

        assert_eq!(historical_proof.leaves, regular_proof.leaves);
        assert_eq!(historical_proof.digests, regular_proof.digests);
        assert_eq!(historical_ops, regular_ops);
        let hasher = StandardHasher::<Sha256>::new();
        assert!(verify_proof(
            &hasher,
            &historical_proof,
            start_loc,
            &historical_ops,
            &root_hash
        ));

        // Add more operations to the database
        {
            let mut batch = db.new_batch();
            for i in OPS..(OPS + 5) {
                let k = Sha256::hash(&(i + 1000).to_be_bytes()); // different keys
                let v = make_value(i * 1000);
                batch = batch.write(k, Some(v));
            }
            let merkleized = batch.merkleize(&db, None).await.unwrap();
            db.apply_batch(merkleized).await.unwrap();
        }

        // Historical proof should remain the same even though database has grown
        let (historical_proof2, historical_ops2) = db
            .historical_proof(original_op_count, start_loc, max_ops)
            .await
            .unwrap();
        assert_eq!(historical_proof2.leaves, original_op_count);
        assert_eq!(historical_proof2.digests, regular_proof.digests);
        assert_eq!(historical_ops2, regular_ops);
        // Still verifies against the OLD root, not the grown db's root.
        assert!(verify_proof(
            &hasher,
            &historical_proof2,
            start_loc,
            &historical_ops2,
            &root_hash
        ));

        db.destroy().await.unwrap();
    }
796
797    /// Test that tampering with historical proofs causes verification to fail.
798    pub(crate) async fn test_any_db_historical_proof_invalid<D, V: Clone + CodecShared>(
799        _context: Context,
800        mut db: D,
801        make_value: impl Fn(u64) -> V,
802    ) where
803        D: DbAny<mmr::Family, Key = Digest, Value = V, Digest = Digest> + Provable<mmr::Family>,
804        <D as Provable<mmr::Family>>::Operation: Codec + PartialEq + std::fmt::Debug + Clone,
805    {
806        use crate::{mmr::StandardHasher, qmdb::verify_proof};
807        use commonware_utils::NZU64;
808
809        // Add some operations
810        {
811            let mut batch = db.new_batch();
812            for i in 0u64..10 {
813                let k = Sha256::hash(&i.to_be_bytes());
814                let v = make_value(i * 1000);
815                batch = batch.write(k, Some(v));
816            }
817            let merkleized = batch.merkleize(&db, None).await.unwrap();
818            db.apply_batch(merkleized).await.unwrap();
819        }
820
821        let historical_op_count = Location::new(5);
822        let (proof, ops) = db
823            .historical_proof(historical_op_count, Location::new(1), NZU64!(10))
824            .await
825            .unwrap();
826        assert_eq!(proof.leaves, historical_op_count);
827        assert_eq!(ops.len(), 4);
828
829        let hasher = StandardHasher::<Sha256>::new();
830
831        // Changing the proof digests should cause verification to fail
832        {
833            let mut tampered_proof = proof.clone();
834            tampered_proof.digests[0] = Sha256::hash(b"invalid");
835            let root_hash = db.root();
836            assert!(!verify_proof(
837                &hasher,
838                &tampered_proof,
839                Location::new(1),
840                &ops,
841                &root_hash
842            ));
843        }
844
845        // Appending an extra digest should cause verification to fail
846        {
847            let mut tampered_proof = proof.clone();
848            tampered_proof.digests.push(Sha256::hash(b"invalid"));
849            let root_hash = db.root();
850            assert!(!verify_proof(
851                &hasher,
852                &tampered_proof,
853                Location::new(1),
854                &ops,
855                &root_hash
856            ));
857        }
858
859        // Changing the ops should cause verification to fail
860        {
861            let root_hash = db.root();
862            let mut tampered_ops = ops.clone();
863            // Swap first two ops if we have at least 2
864            if tampered_ops.len() >= 2 {
865                tampered_ops.swap(0, 1);
866                assert!(!verify_proof(
867                    &hasher,
868                    &proof,
869                    Location::new(1),
870                    &tampered_ops,
871                    &root_hash
872                ));
873            }
874        }
875
876        // Appending an extra (duplicate) op should cause verification to fail
877        {
878            let root_hash = db.root();
879            let mut tampered_ops = ops.clone();
880            tampered_ops.push(tampered_ops[0].clone());
881            assert!(!verify_proof(
882                &hasher,
883                &proof,
884                Location::new(1),
885                &tampered_ops,
886                &root_hash
887            ));
888        }
889
890        // Changing the start location should cause verification to fail
891        {
892            let root_hash = db.root();
893            assert!(!verify_proof(
894                &hasher,
895                &proof,
896                Location::new(2),
897                &ops,
898                &root_hash
899            ));
900        }
901
902        // Changing the root digest should cause verification to fail
903        {
904            let invalid_root = Sha256::hash(b"invalid");
905            assert!(!verify_proof(
906                &hasher,
907                &proof,
908                Location::new(1),
909                &ops,
910                &invalid_root
911            ));
912        }
913
914        // Changing the proof leaves count should cause verification to fail
915        {
916            let mut tampered_proof = proof.clone();
917            tampered_proof.leaves = Location::new(100);
918            let root_hash = db.root();
919            assert!(!verify_proof(
920                &hasher,
921                &tampered_proof,
922                Location::new(1),
923                &ops,
924                &root_hash
925            ));
926        }
927
928        db.destroy().await.unwrap();
929    }
930
931    /// Test historical_proof edge cases: singleton db, limited ops, min position.
932    pub(crate) async fn test_any_db_historical_proof_edge_cases<D, V: Clone + CodecShared>(
933        _context: Context,
934        mut db: D,
935        make_value: impl Fn(u64) -> V,
936    ) where
937        D: DbAny<mmr::Family, Key = Digest, Value = V, Digest = Digest> + Provable<mmr::Family>,
938        <D as Provable<mmr::Family>>::Operation: Codec + PartialEq + std::fmt::Debug,
939    {
940        use commonware_utils::NZU64;
941
942        // Add 50 operations
943        {
944            let mut batch = db.new_batch();
945            for i in 0u64..50 {
946                let k = Sha256::hash(&i.to_be_bytes());
947                let v = make_value(i * 1000);
948                batch = batch.write(k, Some(v));
949            }
950            let merkleized = batch.merkleize(&db, None).await.unwrap();
951            db.apply_batch(merkleized).await.unwrap();
952        }
953
954        // Test singleton database (historical size = 2 means 1 op after initial commit)
955        let (single_proof, single_ops) = db
956            .historical_proof(Location::new(2), Location::new(1), NZU64!(1))
957            .await
958            .unwrap();
959        assert_eq!(single_proof.leaves, Location::new(2));
960        assert_eq!(single_ops.len(), 1);
961
962        // Test requesting more operations than available in historical position
963        let (_limited_proof, limited_ops) = db
964            .historical_proof(Location::new(11), Location::new(6), NZU64!(20))
965            .await
966            .unwrap();
967        assert_eq!(limited_ops.len(), 5); // Should be limited by historical position
968
969        // Test proof at minimum historical position
970        let (min_proof, min_ops) = db
971            .historical_proof(Location::new(4), Location::new(1), NZU64!(3))
972            .await
973            .unwrap();
974        assert_eq!(min_proof.leaves, Location::new(4));
975        assert_eq!(min_ops.len(), 3);
976
977        db.destroy().await.unwrap();
978    }
979
980    /// Test making multiple commits, one of which deletes a key from a previous commit.
981    pub(crate) async fn test_any_db_multiple_commits_delete_replayed<F: Family, D, V>(
982        context: Context,
983        mut db: D,
984        reopen_db: impl Fn(Context) -> Pin<Box<dyn Future<Output = D> + Send>>,
985        make_value: impl Fn(u64) -> V,
986    ) where
987        D: DbAny<F, Key = Digest, Value = V, Digest = Digest>,
988        V: Clone + CodecShared + Eq + std::fmt::Debug,
989    {
990        let mut map = HashMap::<Digest, V>::default();
991        const ELEMENTS: u64 = 10;
992        let metadata_value = make_value(42);
993        let key_at = |j: u64, i: u64| Sha256::hash(&(j * 1000 + i).to_be_bytes());
994        for j in 0u64..ELEMENTS {
995            let mut batch = db.new_batch();
996            for i in 0u64..ELEMENTS {
997                let k = key_at(j, i);
998                let v = make_value(i * 1000);
999                batch = batch.write(k, Some(v.clone()));
1000                map.insert(k, v);
1001            }
1002            let merkleized = batch
1003                .merkleize(&db, Some(metadata_value.clone()))
1004                .await
1005                .unwrap();
1006            db.apply_batch(merkleized).await.unwrap();
1007            db.commit().await.unwrap();
1008        }
1009        assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_value));
1010        let k = key_at(ELEMENTS - 1, ELEMENTS - 1);
1011
1012        let merkleized = db
1013            .new_batch()
1014            .write(k, None)
1015            .merkleize(&db, None)
1016            .await
1017            .unwrap();
1018        db.apply_batch(merkleized).await.unwrap();
1019        db.commit().await.unwrap();
1020        assert_eq!(db.get_metadata().await.unwrap(), None);
1021        assert!(db.get(&k).await.unwrap().is_none());
1022
1023        let root = db.root();
1024        drop(db);
1025        let db = reopen_db(context.with_label("reopened")).await;
1026        assert_eq!(root, db.root());
1027        assert_eq!(db.get_metadata().await.unwrap(), None);
1028        assert!(db.get(&k).await.unwrap().is_none());
1029
1030        db.destroy().await.unwrap();
1031    }
1032
1033    use crate::qmdb::any::{
1034        ordered::{fixed::Db as OrderedFixedDb, variable::Db as OrderedVariableDb},
1035        unordered::{fixed::Db as UnorderedFixedDb, variable::Db as UnorderedVariableDb},
1036    };
1037    use commonware_macros::{test_group, test_traced};
1038    use commonware_runtime::{deterministic, Runner as _};
1039
    // Type aliases for all 12 MMR variants (all use OneCap for collision coverage).
    //
    // Naming: Unordered/Ordered = index ordering, Fixed/Variable = value encoding.
    // The `P1`/`P2` suffixes select the `partitioned` db types with const parameter
    // 1 or 2 (presumably the partition count -- confirm against the partitioned mod).
    type UnorderedFixed = UnorderedFixedDb<mmr::Family, Context, Digest, Digest, Sha256, OneCap>;
    type UnorderedVariable =
        UnorderedVariableDb<mmr::Family, Context, Digest, Digest, Sha256, OneCap>;
    type OrderedFixed = OrderedFixedDb<mmr::Family, Context, Digest, Digest, Sha256, OneCap>;
    type OrderedVariable = OrderedVariableDb<mmr::Family, Context, Digest, Digest, Sha256, OneCap>;
    type UnorderedFixedP1 =
        unordered::fixed::partitioned::Db<mmr::Family, Context, Digest, Digest, Sha256, OneCap, 1>;
    type UnorderedVariableP1 = unordered::variable::partitioned::Db<
        mmr::Family,
        Context,
        Digest,
        Digest,
        Sha256,
        OneCap,
        1,
    >;
    type OrderedFixedP1 =
        ordered::fixed::partitioned::Db<mmr::Family, Context, Digest, Digest, Sha256, OneCap, 1>;
    type OrderedVariableP1 =
        ordered::variable::partitioned::Db<mmr::Family, Context, Digest, Digest, Sha256, OneCap, 1>;
    type UnorderedFixedP2 =
        unordered::fixed::partitioned::Db<mmr::Family, Context, Digest, Digest, Sha256, OneCap, 2>;
    type UnorderedVariableP2 = unordered::variable::partitioned::Db<
        mmr::Family,
        Context,
        Digest,
        Digest,
        Sha256,
        OneCap,
        2,
    >;
    type OrderedFixedP2 =
        ordered::fixed::partitioned::Db<mmr::Family, Context, Digest, Digest, Sha256, OneCap, 2>;
    type OrderedVariableP2 =
        ordered::variable::partitioned::Db<mmr::Family, Context, Digest, Digest, Sha256, OneCap, 2>;
1076
    // MMB type aliases for with_all_variants.
    //
    // These mirror the MMR aliases above but use the `mmb` merkle family, wiring the
    // generic `db::Db` directly with a journal, index, and value-encoding type for
    // each of the four ordering/encoding combinations.
    mod mmb_types {
        use super::*;
        use crate::{
            index::{ordered::Index as OrderedIndex, unordered::Index as UnorderedIndex},
            journal::contiguous::{fixed::Journal as FJournal, variable::Journal as VJournal},
            merkle::{mmb, Location},
            qmdb::any::{
                operation::{update, Operation},
                value::{FixedEncoding, VariableEncoding},
            },
        };

        // Location type specialized to the MMB merkle family.
        type MmbLocation = Location<mmb::Family>;

        /// Unordered index, fixed-size values, MMB merkle family.
        pub type MmbUnorderedFixed = super::super::db::Db<
            mmb::Family,
            Context,
            FJournal<
                Context,
                Operation<mmb::Family, update::Unordered<Digest, FixedEncoding<Digest>>>,
            >,
            UnorderedIndex<OneCap, MmbLocation>,
            Sha256,
            update::Unordered<Digest, FixedEncoding<Digest>>,
        >;

        /// Unordered index, variable-size values, MMB merkle family.
        pub type MmbUnorderedVariable = super::super::db::Db<
            mmb::Family,
            Context,
            VJournal<
                Context,
                Operation<mmb::Family, update::Unordered<Digest, VariableEncoding<Digest>>>,
            >,
            UnorderedIndex<OneCap, MmbLocation>,
            Sha256,
            update::Unordered<Digest, VariableEncoding<Digest>>,
        >;

        /// Ordered index, fixed-size values, MMB merkle family.
        pub type MmbOrderedFixed = super::super::db::Db<
            mmb::Family,
            Context,
            FJournal<
                Context,
                Operation<mmb::Family, update::Ordered<Digest, FixedEncoding<Digest>>>,
            >,
            OrderedIndex<OneCap, MmbLocation>,
            Sha256,
            update::Ordered<Digest, FixedEncoding<Digest>>,
        >;

        /// Ordered index, variable-size values, MMB merkle family.
        pub type MmbOrderedVariable = super::super::db::Db<
            mmb::Family,
            Context,
            VJournal<
                Context,
                Operation<mmb::Family, update::Ordered<Digest, VariableEncoding<Digest>>>,
            >,
            OrderedIndex<OneCap, MmbLocation>,
            Sha256,
            update::Ordered<Digest, VariableEncoding<Digest>>,
        >;
    }
1140    use mmb_types::*;
1141
1142    #[inline]
1143    fn to_digest(i: u64) -> Digest {
1144        Sha256::hash(&i.to_be_bytes())
1145    }
1146
    // Defines MMR-only variants (for tests that require mmr::Family, e.g. proof verification).
    //
    // Invokes `$cb!($($args)*, label, Type, config_fn)` once per variant. Labels encode
    // the variant: `u`/`o` = unordered/ordered index, `f`/`v` = fixed/variable value
    // encoding, with an optional `p1`/`p2` suffix for the partitioned types.
    macro_rules! with_mmr_variants {
        ($cb:ident!($($args:tt)*)) => {
            $cb!($($args)*, "uf", UnorderedFixed, fixed_db_config);
            $cb!($($args)*, "uv", UnorderedVariable, variable_db_config);
            $cb!($($args)*, "of", OrderedFixed, fixed_db_config);
            $cb!($($args)*, "ov", OrderedVariable, variable_db_config);
            $cb!($($args)*, "ufp1", UnorderedFixedP1, fixed_db_config);
            $cb!($($args)*, "uvp1", UnorderedVariableP1, variable_db_config);
            $cb!($($args)*, "ofp1", OrderedFixedP1, fixed_db_config);
            $cb!($($args)*, "ovp1", OrderedVariableP1, variable_db_config);
            $cb!($($args)*, "ufp2", UnorderedFixedP2, fixed_db_config);
            $cb!($($args)*, "uvp2", UnorderedVariableP2, variable_db_config);
            $cb!($($args)*, "ofp2", OrderedFixedP2, fixed_db_config);
            $cb!($($args)*, "ovp2", OrderedVariableP2, variable_db_config);
        };
    }
1164
    // Defines all variants (MMR + MMB). Calls $cb!($($args)*, $label, $type, $config) for each.
    //
    // Expands to the twelve MMR variants from `with_mmr_variants!` plus the four
    // MMB-backed aliases (labels carry an `_mmb` suffix).
    macro_rules! with_all_variants {
        ($cb:ident!($($args:tt)*)) => {
            $cb!($($args)*, "uf", UnorderedFixed, fixed_db_config);
            $cb!($($args)*, "uv", UnorderedVariable, variable_db_config);
            $cb!($($args)*, "of", OrderedFixed, fixed_db_config);
            $cb!($($args)*, "ov", OrderedVariable, variable_db_config);
            $cb!($($args)*, "ufp1", UnorderedFixedP1, fixed_db_config);
            $cb!($($args)*, "uvp1", UnorderedVariableP1, variable_db_config);
            $cb!($($args)*, "ofp1", OrderedFixedP1, fixed_db_config);
            $cb!($($args)*, "ovp1", OrderedVariableP1, variable_db_config);
            $cb!($($args)*, "ufp2", UnorderedFixedP2, fixed_db_config);
            $cb!($($args)*, "uvp2", UnorderedVariableP2, variable_db_config);
            $cb!($($args)*, "ofp2", OrderedFixedP2, fixed_db_config);
            $cb!($($args)*, "ovp2", OrderedVariableP2, variable_db_config);
            $cb!($($args)*, "uf_mmb", MmbUnorderedFixed, fixed_db_config);
            $cb!($($args)*, "uv_mmb", MmbUnorderedVariable, variable_db_config);
            $cb!($($args)*, "of_mmb", MmbOrderedFixed, fixed_db_config);
            $cb!($($args)*, "ov_mmb", MmbOrderedVariable, variable_db_config);
        };
    }
1186
    // Runner macros - each receives common args followed by (label, type, config) from with_all_variants.
    // Uses Box::pin to move futures to the heap and avoid stack overflow.
    //
    // `p` combines the variant label with the per-test suffix -- presumably a unique
    // storage partition/prefix per (variant, test) pair; confirm against the config fns.
    macro_rules! test_with_reopen {
        ($ctx:expr, $sfx:expr, $f:expr, $l:literal, $db:ty, $cfg:ident) => {{
            let p = concat!($l, "_", $sfx);
            Box::pin(async {
                let ctx = $ctx.with_label($l);
                let db = <$db>::init(ctx.clone(), $cfg::<OneCap>(p, &ctx))
                    .await
                    .unwrap();
                $f(
                    ctx,
                    db,
                    // Reopen closure: re-initialize the same db type with the same config.
                    |ctx| {
                        Box::pin(async move {
                            <$db>::init(ctx.clone(), $cfg::<OneCap>(p, &ctx))
                                .await
                                .unwrap()
                        })
                    },
                    to_digest,
                )
                .await;
            })
            .await
        }};
    }
1214
    // Like `test_with_reopen!`, but for test bodies that only take (ctx, db, make_value)
    // and never reopen the database. Also Box::pins the future to keep it off the stack.
    macro_rules! test_with_make_value {
        ($ctx:expr, $sfx:expr, $f:expr, $l:literal, $db:ty, $cfg:ident) => {{
            let p = concat!($l, "_", $sfx);
            Box::pin(async {
                let ctx = $ctx.with_label($l);
                let db = <$db>::init(ctx.clone(), $cfg::<OneCap>(p, &ctx))
                    .await
                    .unwrap();
                $f(ctx, db, to_digest).await;
            })
            .await
        }};
    }
1228
    // Macro to run a test on all DB variants (MMR + MMB).
    //
    // `with_reopen:` routes to test bodies that need a reopen closure (restart/replay
    // coverage); `with_make_value:` routes to bodies that only need a value constructor.
    macro_rules! for_all_variants {
        ($ctx:expr, $sfx:expr, with_reopen: $f:expr) => {{
            with_all_variants!(test_with_reopen!($ctx, $sfx, $f));
        }};
        ($ctx:expr, $sfx:expr, with_make_value: $f:expr) => {{
            with_all_variants!(test_with_make_value!($ctx, $sfx, $f));
        }};
    }
1238
    // Macro to run a test on MMR-only DB variants (for tests that use mmr::Family-specific
    // features like Location::new or verify_proof).
    //
    // Mirrors `for_all_variants!` but expands via `with_mmr_variants!`, skipping the
    // four MMB-backed aliases.
    macro_rules! for_mmr_variants {
        ($ctx:expr, $sfx:expr, with_reopen: $f:expr) => {{
            with_mmr_variants!(test_with_reopen!($ctx, $sfx, $f));
        }};
        ($ctx:expr, $sfx:expr, with_make_value: $f:expr) => {{
            with_mmr_variants!(test_with_make_value!($ctx, $sfx, $f));
        }};
    }
1249
1250    #[test_group("slow")]
1251    #[test_traced("WARN")]
1252    fn test_all_variants_log_replay() {
1253        let executor = deterministic::Runner::default();
1254        executor.start(|context| async move {
1255            for_all_variants!(context, "lr", with_reopen: test_any_db_log_replay);
1256        });
1257    }
1258
1259    #[test_group("slow")]
1260    #[test_traced("WARN")]
1261    fn test_all_variants_build_and_authenticate() {
1262        let executor = deterministic::Runner::default();
1263        executor.start(|context| async move {
1264            for_mmr_variants!(context, "baa", with_reopen: test_any_db_build_and_authenticate);
1265        });
1266    }
1267
1268    #[test_group("slow")]
1269    #[test_traced("WARN")]
1270    fn test_all_variants_historical_proof_basic() {
1271        let executor = deterministic::Runner::default();
1272        executor.start(|context| async move {
1273            for_mmr_variants!(context, "hpb", with_make_value: test_any_db_historical_proof_basic);
1274        });
1275    }
1276
1277    #[test_group("slow")]
1278    #[test_traced("WARN")]
1279    fn test_all_variants_historical_proof_invalid() {
1280        let executor = deterministic::Runner::default();
1281        executor.start(|context| async move {
1282            for_mmr_variants!(context, "hpi", with_make_value: test_any_db_historical_proof_invalid);
1283        });
1284    }
1285
1286    #[test_group("slow")]
1287    #[test_traced("WARN")]
1288    fn test_all_variants_historical_proof_edge_cases() {
1289        let executor = deterministic::Runner::default();
1290        executor.start(|context| async move {
1291            for_mmr_variants!(context, "hpec", with_make_value: test_any_db_historical_proof_edge_cases);
1292        });
1293    }
1294
1295    #[test_group("slow")]
1296    #[test_traced("WARN")]
1297    fn test_all_variants_multiple_commits_delete_replayed() {
1298        let executor = deterministic::Runner::default();
1299        executor.start(|context| async move {
1300            for_all_variants!(context, "mcdr", with_reopen: test_any_db_multiple_commits_delete_replayed);
1301        });
1302    }
1303
1304    #[test_group("slow")]
1305    #[test_traced("WARN")]
1306    fn test_all_variants_non_empty_recovery() {
1307        let executor = deterministic::Runner::default();
1308        executor.start(|context| async move {
1309            for_all_variants!(context, "ner", with_reopen: test_any_db_non_empty_recovery);
1310        });
1311    }
1312
1313    #[test_group("slow")]
1314    #[test_traced("WARN")]
1315    fn test_all_variants_empty_recovery() {
1316        let executor = deterministic::Runner::default();
1317        executor.start(|context| async move {
1318            for_all_variants!(context, "er", with_reopen: test_any_db_empty_recovery);
1319        });
1320    }
1321
1322    #[test_group("slow")]
1323    #[test_traced("WARN")]
1324    fn test_all_variants_rewind_recovery() {
1325        let executor = deterministic::Runner::default();
1326        executor.start(|context| async move {
1327            for_mmr_variants!(context, "rr", with_reopen: test_any_db_rewind_recovery);
1328        });
1329    }
1330
1331    fn key(i: u64) -> Digest {
1332        Sha256::hash(&i.to_be_bytes())
1333    }
1334
1335    fn val(i: u64) -> Digest {
1336        Sha256::hash(&(i + 10000).to_be_bytes())
1337    }
1338
1339    /// Helper: commit a batch of key-value writes and return the applied range.
1340    async fn commit_writes(
1341        db: &mut UnorderedVariable,
1342        writes: impl IntoIterator<Item = (Digest, Option<Digest>)>,
1343        metadata: Option<Digest>,
1344    ) -> std::ops::Range<crate::mmr::Location> {
1345        let mut batch = db.new_batch();
1346        for (k, v) in writes {
1347            batch = batch.write(k, v);
1348        }
1349        let merkleized = batch.merkleize(&*db, metadata).await.unwrap();
1350        let range = db.apply_batch(merkleized).await.unwrap();
1351        db.commit().await.unwrap();
1352        range
1353    }
1354
1355    /// An empty batch (no mutations) still produces a valid commit.
1356    #[test_traced("INFO")]
1357    fn test_any_batch_empty() {
1358        let executor = deterministic::Runner::default();
1359        executor.start(|context| async move {
1360            let ctx = context.with_label("db");
1361            let mut db: UnorderedVariable =
1362                UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("e", &ctx))
1363                    .await
1364                    .unwrap();
1365
1366            let root_before = db.root();
1367            let batch = db.new_batch();
1368            let merkleized = batch.merkleize(&db, None).await.unwrap();
1369            db.apply_batch(merkleized).await.unwrap();
1370
1371            // A CommitFloor op was appended, so root must change.
1372            assert_ne!(db.root(), root_before);
1373
1374            // DB should still be functional.
1375            commit_writes(&mut db, [(key(0), Some(val(0)))], None).await;
1376            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
1377
1378            db.destroy().await.unwrap();
1379        });
1380    }
1381
1382    /// Metadata propagates through merkleize and clears with None.
1383    #[test_traced("INFO")]
1384    fn test_any_batch_metadata() {
1385        let executor = deterministic::Runner::default();
1386        executor.start(|context| async move {
1387            let ctx = context.with_label("db");
1388            let mut db: UnorderedVariable =
1389                UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("m", &ctx))
1390                    .await
1391                    .unwrap();
1392
1393            let metadata = val(42);
1394
1395            // Batch with metadata.
1396            commit_writes(&mut db, [(key(0), Some(val(0)))], Some(metadata)).await;
1397            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
1398
1399            // Batch without metadata clears it.
1400            let batch = db.new_batch();
1401            let merkleized = batch.merkleize(&db, None).await.unwrap();
1402            db.apply_batch(merkleized).await.unwrap();
1403            assert_eq!(db.get_metadata().await.unwrap(), None);
1404
1405            db.destroy().await.unwrap();
1406        });
1407    }
1408
1409    /// batch.get() reads through: pending mutations -> base DB.
1410    /// Updates shadow the base value; deletes hide the key.
1411    #[test_traced("INFO")]
1412    fn test_any_batch_get_read_through() {
1413        let executor = deterministic::Runner::default();
1414        executor.start(|context| async move {
1415            let ctx = context.with_label("db");
1416            let mut db: UnorderedVariable =
1417                UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("g", &ctx))
1418                    .await
1419                    .unwrap();
1420
1421            // Pre-populate with key A.
1422            let ka = key(0);
1423            let va = val(0);
1424            commit_writes(&mut db, [(ka, Some(va))], None).await;
1425
1426            let kb = key(1);
1427            let vb = val(1);
1428            let kc = key(2);
1429
1430            let mut batch = db.new_batch();
1431
1432            // Read-through to base DB.
1433            assert_eq!(batch.get(&ka, &db).await.unwrap(), Some(va));
1434
1435            // Pending mutation visible.
1436            batch = batch.write(kb, Some(vb));
1437            assert_eq!(batch.get(&kb, &db).await.unwrap(), Some(vb));
1438
1439            // Nonexistent key.
1440            assert_eq!(batch.get(&kc, &db).await.unwrap(), None);
1441
1442            // Update shadows base DB value.
1443            let va2 = val(100);
1444            batch = batch.write(ka, Some(va2));
1445            assert_eq!(batch.get(&ka, &db).await.unwrap(), Some(va2));
1446
1447            // Delete hides the key.
1448            batch = batch.write(ka, None);
1449            assert_eq!(batch.get(&ka, &db).await.unwrap(), None);
1450
1451            db.destroy().await.unwrap();
1452        });
1453    }
1454
1455    /// merkleized.get() reflects the resolved diff after merkleize.
1456    #[test_traced("INFO")]
1457    fn test_any_batch_get_on_merkleized() {
1458        let executor = deterministic::Runner::default();
1459        executor.start(|context| async move {
1460            let ctx = context.with_label("db");
1461            let mut db: UnorderedVariable =
1462                UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("mg", &ctx))
1463                    .await
1464                    .unwrap();
1465
1466            let ka = key(0);
1467            let kb = key(1);
1468            let kc = key(2);
1469            let kd = key(3);
1470
1471            // Pre-populate A and B.
1472            commit_writes(&mut db, [(ka, Some(val(0))), (kb, Some(val(1)))], None).await;
1473
1474            // Batch: update A, delete B, create C.
1475            let va2 = val(100);
1476            let vc = val(2);
1477            let mut batch = db.new_batch();
1478            batch = batch.write(ka, Some(va2));
1479            batch = batch.write(kb, None);
1480            batch = batch.write(kc, Some(vc));
1481            let merkleized = batch.merkleize(&db, None).await.unwrap();
1482
1483            assert_eq!(merkleized.get(&ka, &db).await.unwrap(), Some(va2));
1484            assert_eq!(merkleized.get(&kb, &db).await.unwrap(), None);
1485            assert_eq!(merkleized.get(&kc, &db).await.unwrap(), Some(vc));
1486            assert_eq!(merkleized.get(&kd, &db).await.unwrap(), None);
1487
1488            db.destroy().await.unwrap();
1489        });
1490    }
1491
1492    /// Child batch reads through: child mutations -> parent diff -> base DB.
1493    #[test_traced("INFO")]
1494    fn test_any_batch_stacked_get() {
1495        let executor = deterministic::Runner::default();
1496        executor.start(|context| async move {
1497            let ctx = context.with_label("db");
1498            let db: UnorderedVariable =
1499                UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("sg", &ctx))
1500                    .await
1501                    .unwrap();
1502
1503            let ka = key(0);
1504            let kb = key(1);
1505
1506            // Parent batch writes A.
1507            let mut batch = db.new_batch();
1508            batch = batch.write(ka, Some(val(0)));
1509            let merkleized = batch.merkleize(&db, None).await.unwrap();
1510
1511            // Child reads parent's A.
1512            let mut child = merkleized.new_batch::<Sha256>();
1513            assert_eq!(child.get(&ka, &db).await.unwrap(), Some(val(0)));
1514
1515            // Child overwrites A.
1516            child = child.write(ka, Some(val(100)));
1517            assert_eq!(child.get(&ka, &db).await.unwrap(), Some(val(100)));
1518
1519            // Child writes new key B.
1520            child = child.write(kb, Some(val(1)));
1521            assert_eq!(child.get(&kb, &db).await.unwrap(), Some(val(1)));
1522
1523            // Child deletes A.
1524            child = child.write(ka, None);
1525            assert_eq!(child.get(&ka, &db).await.unwrap(), None);
1526
1527            db.destroy().await.unwrap();
1528        });
1529    }
1530
1531    /// Parent deletes a base-DB key, child re-creates it.
1532    #[test_traced("INFO")]
1533    fn test_any_batch_stacked_delete_recreate() {
1534        let executor = deterministic::Runner::default();
1535        executor.start(|context| async move {
1536            let ctx = context.with_label("db");
1537            let mut db: UnorderedVariable =
1538                UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("dr", &ctx))
1539                    .await
1540                    .unwrap();
1541
1542            let ka = key(0);
1543
1544            // Pre-populate with key A.
1545            commit_writes(&mut db, [(ka, Some(val(0)))], None).await;
1546
1547            // Parent batch deletes A.
1548            let mut parent = db.new_batch();
1549            parent = parent.write(ka, None);
1550            let parent_m = parent.merkleize(&db, None).await.unwrap();
1551            assert_eq!(parent_m.get(&ka, &db).await.unwrap(), None);
1552
1553            // Child re-creates A with a new value.
1554            let mut child = parent_m.new_batch::<Sha256>();
1555            child = child.write(ka, Some(val(200)));
1556            let child_m = child.merkleize(&db, None).await.unwrap();
1557            assert_eq!(child_m.get(&ka, &db).await.unwrap(), Some(val(200)));
1558
1559            // Apply and verify DB state.
1560            db.apply_batch(child_m).await.unwrap();
1561            assert_eq!(db.get(&ka).await.unwrap(), Some(val(200)));
1562
1563            db.destroy().await.unwrap();
1564        });
1565    }
1566
1567    /// Floor raise during merkleize moves active operations to the tip.
1568    /// All keys remain accessible with correct values.
1569    #[test_traced("INFO")]
1570    fn test_any_batch_floor_raise() {
1571        let executor = deterministic::Runner::default();
1572        executor.start(|context| async move {
1573            let ctx = context.with_label("db");
1574            let mut db: UnorderedVariable =
1575                UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("fr", &ctx))
1576                    .await
1577                    .unwrap();
1578
1579            // Pre-populate with 100 keys.
1580            let init: Vec<_> = (0..100).map(|i| (key(i), Some(val(i)))).collect();
1581            commit_writes(&mut db, init, None).await;
1582
1583            let floor_before = db.inactivity_floor_loc();
1584
1585            // Update 30 keys.
1586            let updates: Vec<_> = (0..30).map(|i| (key(i), Some(val(i + 500)))).collect();
1587            commit_writes(&mut db, updates, None).await;
1588
1589            // Floor should have advanced.
1590            assert!(db.inactivity_floor_loc() > floor_before);
1591
1592            // All keys should still be accessible with correct values.
1593            for i in 0..30 {
1594                assert_eq!(
1595                    db.get(&key(i)).await.unwrap(),
1596                    Some(val(i + 500)),
1597                    "updated key {i} mismatch"
1598                );
1599            }
1600            for i in 30..100 {
1601                assert_eq!(
1602                    db.get(&key(i)).await.unwrap(),
1603                    Some(val(i)),
1604                    "untouched key {i} mismatch"
1605                );
1606            }
1607
1608            db.destroy().await.unwrap();
1609        });
1610    }
1611
1612    /// apply_batch() returns the correct range of committed locations.
1613    #[test_traced("INFO")]
1614    fn test_any_batch_apply_returns_range() {
1615        let executor = deterministic::Runner::default();
1616        executor.start(|context| async move {
1617            let ctx = context.with_label("db");
1618            let mut db: UnorderedVariable =
1619                UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("ar", &ctx))
1620                    .await
1621                    .unwrap();
1622
1623            // First batch: 5 keys.
1624            let writes: Vec<_> = (0..5).map(|i| (key(i), Some(val(i)))).collect();
1625            let range1 = commit_writes(&mut db, writes, None).await;
1626
1627            // Range should start after the initial CommitFloor (location 0).
1628            assert_eq!(range1.start, crate::mmr::Location::new(1));
1629            // Range length >= 6 (5 writes + 1 CommitFloor + possible floor raise ops).
1630            assert!(range1.end.saturating_sub(*range1.start) >= 6);
1631
1632            // Second batch: ranges must be contiguous.
1633            let writes: Vec<_> = (5..10).map(|i| (key(i), Some(val(i)))).collect();
1634            let range2 = commit_writes(&mut db, writes, None).await;
1635            assert_eq!(range2.start, range1.end);
1636
1637            db.destroy().await.unwrap();
1638        });
1639    }
1640
1641    /// 3-level chain: parent -> child -> grandchild, merkleize grandchild and apply.
1642    #[test_traced("INFO")]
1643    fn test_any_batch_deep_chain() {
1644        let executor = deterministic::Runner::default();
1645        executor.start(|context| async move {
1646            let ctx = context.with_label("db");
1647            let mut db: UnorderedVariable =
1648                UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("dc", &ctx))
1649                    .await
1650                    .unwrap();
1651
1652            // Pre-populate with keys 0..5.
1653            let init: Vec<_> = (0..5).map(|i| (key(i), Some(val(i)))).collect();
1654            commit_writes(&mut db, init, None).await;
1655
1656            // Parent: overwrite key 0, add key 5.
1657            let mut parent = db.new_batch();
1658            parent = parent.write(key(0), Some(val(100)));
1659            parent = parent.write(key(5), Some(val(5)));
1660            let parent_m = parent.merkleize(&db, None).await.unwrap();
1661
1662            // Child: overwrite key 1, add key 6.
1663            let mut child = parent_m.new_batch::<Sha256>();
1664            child = child.write(key(1), Some(val(101)));
1665            child = child.write(key(6), Some(val(6)));
1666            let child_m = child.merkleize(&db, None).await.unwrap();
1667
1668            // Grandchild: delete key 2, add key 7.
1669            let mut grandchild = child_m.new_batch::<Sha256>();
1670            grandchild = grandchild.write(key(2), None);
1671            grandchild = grandchild.write(key(7), Some(val(7)));
1672            let grandchild_m = grandchild.merkleize(&db, None).await.unwrap();
1673
1674            // Verify reads through the chain.
1675            assert_eq!(
1676                grandchild_m.get(&key(0), &db).await.unwrap(),
1677                Some(val(100))
1678            );
1679            assert_eq!(
1680                grandchild_m.get(&key(1), &db).await.unwrap(),
1681                Some(val(101))
1682            );
1683            assert_eq!(grandchild_m.get(&key(2), &db).await.unwrap(), None);
1684            assert_eq!(grandchild_m.get(&key(7), &db).await.unwrap(), Some(val(7)));
1685
1686            // Apply.
1687            db.apply_batch(grandchild_m).await.unwrap();
1688
1689            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(100)));
1690            assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(101)));
1691            assert_eq!(db.get(&key(2)).await.unwrap(), None);
1692            assert_eq!(db.get(&key(3)).await.unwrap(), Some(val(3)));
1693            assert_eq!(db.get(&key(4)).await.unwrap(), Some(val(4)));
1694            assert_eq!(db.get(&key(5)).await.unwrap(), Some(val(5)));
1695            assert_eq!(db.get(&key(6)).await.unwrap(), Some(val(6)));
1696            assert_eq!(db.get(&key(7)).await.unwrap(), Some(val(7)));
1697
1698            db.destroy().await.unwrap();
1699        });
1700    }
1701
1702    /// Chained batch produces the same DB state as sequential apply_batch calls.
1703    #[test_traced("INFO")]
1704    fn test_any_batch_chain_matches_sequential() {
1705        let executor = deterministic::Runner::default();
1706        executor.start(|context| async move {
1707            let ctx = context.with_label("db");
1708
1709            // DB A: sequential apply.
1710            let ctx_a = ctx.with_label("a");
1711            let mut db_a: UnorderedVariable = UnorderedVariableDb::init(
1712                ctx_a.clone(),
1713                variable_db_config::<OneCap>("cms-a", &ctx_a),
1714            )
1715            .await
1716            .unwrap();
1717
1718            // DB B: chained batch.
1719            let ctx_b = ctx.with_label("b");
1720            let mut db_b: UnorderedVariable = UnorderedVariableDb::init(
1721                ctx_b.clone(),
1722                variable_db_config::<OneCap>("cms-b", &ctx_b),
1723            )
1724            .await
1725            .unwrap();
1726
1727            // Batch 1 operations: create keys 0..5.
1728            let writes1: Vec<_> = (0..5).map(|i| (key(i), Some(val(i)))).collect();
1729
1730            // Batch 2 operations: update key 0, delete key 1, create key 5.
1731            let writes2 = vec![
1732                (key(0), Some(val(100))),
1733                (key(1), None),
1734                (key(5), Some(val(5))),
1735            ];
1736
1737            // DB A: apply sequentially.
1738            commit_writes(&mut db_a, writes1.clone(), None).await;
1739            commit_writes(&mut db_a, writes2.clone(), None).await;
1740
1741            // DB B: apply as chain.
1742            let mut parent = db_b.new_batch();
1743            for (k, v) in &writes1 {
1744                parent = parent.write(*k, *v);
1745            }
1746            let parent_m = parent.merkleize(&db_b, None).await.unwrap();
1747
1748            let mut child = parent_m.new_batch::<Sha256>();
1749            for (k, v) in &writes2 {
1750                child = child.write(*k, *v);
1751            }
1752            let child_m = child.merkleize(&db_b, None).await.unwrap();
1753            db_b.apply_batch(child_m).await.unwrap();
1754
1755            // Both DBs must have the same state.
1756            assert_eq!(db_a.root(), db_b.root());
1757            for i in 0..6 {
1758                assert_eq!(
1759                    db_a.get(&key(i)).await.unwrap(),
1760                    db_b.get(&key(i)).await.unwrap(),
1761                    "key {i} mismatch"
1762                );
1763            }
1764
1765            db_a.destroy().await.unwrap();
1766            db_b.destroy().await.unwrap();
1767        });
1768    }
1769
1770    /// Create and delete the same key in a single batch produces no net change for that key.
1771    #[test_traced("INFO")]
1772    fn test_any_batch_create_then_delete_same_batch() {
1773        let executor = deterministic::Runner::default();
1774        executor.start(|context| async move {
1775            let ctx = context.with_label("db");
1776            let mut db: UnorderedVariable =
1777                UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("cd", &ctx))
1778                    .await
1779                    .unwrap();
1780
1781            // Pre-populate key A.
1782            commit_writes(&mut db, [(key(0), Some(val(0)))], None).await;
1783
1784            // In one batch: create B then delete B, also create C and delete A.
1785            let mut batch = db.new_batch();
1786            batch = batch.write(key(1), Some(val(1))); // create B
1787            batch = batch.write(key(1), None); // delete B (net: no B)
1788            batch = batch.write(key(2), Some(val(2))); // create C
1789            batch = batch.write(key(0), None); // delete A
1790            let merkleized = batch.merkleize(&db, None).await.unwrap();
1791            db.apply_batch(merkleized).await.unwrap();
1792
1793            assert_eq!(db.get(&key(0)).await.unwrap(), None);
1794            assert_eq!(db.get(&key(1)).await.unwrap(), None);
1795            assert_eq!(db.get(&key(2)).await.unwrap(), Some(val(2)));
1796
1797            db.destroy().await.unwrap();
1798        });
1799    }
1800
1801    /// Deleting all keys exercises the total_active_keys == 0 floor-raise fast path.
1802    #[test_traced("INFO")]
1803    fn test_any_batch_delete_all_keys() {
1804        let executor = deterministic::Runner::default();
1805        executor.start(|context| async move {
1806            let ctx = context.with_label("db");
1807            let mut db: UnorderedVariable =
1808                UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("da", &ctx))
1809                    .await
1810                    .unwrap();
1811
1812            // Pre-populate 5 keys.
1813            let init: Vec<_> = (0..5).map(|i| (key(i), Some(val(i)))).collect();
1814            commit_writes(&mut db, init, None).await;
1815
1816            // Delete all 5.
1817            let deletes: Vec<_> = (0..5).map(|i| (key(i), None)).collect();
1818            commit_writes(&mut db, deletes, None).await;
1819
1820            for i in 0..5 {
1821                assert_eq!(db.get(&key(i)).await.unwrap(), None, "key {i} not deleted");
1822            }
1823
1824            // DB should still be functional after deleting everything.
1825            commit_writes(&mut db, [(key(10), Some(val(10)))], None).await;
1826            assert_eq!(db.get(&key(10)).await.unwrap(), Some(val(10)));
1827
1828            db.destroy().await.unwrap();
1829        });
1830    }
1831
1832    /// Two independent batches from the same DB do not interfere with each other.
1833    #[test_traced("INFO")]
1834    fn test_any_batch_parallel_forks() {
1835        let executor = deterministic::Runner::default();
1836        executor.start(|context| async move {
1837            let ctx = context.with_label("db");
1838            let mut db: UnorderedVariable =
1839                UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("pf", &ctx))
1840                    .await
1841                    .unwrap();
1842
1843            // Pre-populate.
1844            commit_writes(&mut db, [(key(0), Some(val(0)))], None).await;
1845            let root_before = db.root();
1846
1847            // Fork A: update key 0 and create key 1.
1848            let fork_a_m = db
1849                .new_batch()
1850                .write(key(0), Some(val(100)))
1851                .write(key(1), Some(val(1)))
1852                .merkleize(&db, None)
1853                .await
1854                .unwrap();
1855
1856            // Fork B: delete key 0 and create key 2.
1857            let fork_b_m = db
1858                .new_batch()
1859                .write(key(0), None)
1860                .write(key(2), Some(val(2)))
1861                .merkleize(&db, None)
1862                .await
1863                .unwrap();
1864
1865            // Different mutations must produce different roots.
1866            assert_ne!(fork_a_m.root(), fork_b_m.root());
1867
1868            // DB is unchanged (neither batch applied).
1869            assert_eq!(db.root(), root_before);
1870            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
1871            assert_eq!(db.get(&key(1)).await.unwrap(), None);
1872
1873            // Apply fork A only.
1874            db.apply_batch(fork_a_m).await.unwrap();
1875            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(100)));
1876            assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
1877            assert_eq!(db.get(&key(2)).await.unwrap(), None);
1878
1879            db.destroy().await.unwrap();
1880        });
1881    }
1882
1883    /// Floor raise advances correctly across a chained batch.
1884    #[test_traced("INFO")]
1885    fn test_any_batch_floor_raise_chained() {
1886        let executor = deterministic::Runner::default();
1887        executor.start(|context| async move {
1888            let ctx = context.with_label("db");
1889            let mut db: UnorderedVariable =
1890                UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("frc", &ctx))
1891                    .await
1892                    .unwrap();
1893
1894            // Pre-populate with 50 keys.
1895            let init: Vec<_> = (0..50).map(|i| (key(i), Some(val(i)))).collect();
1896            commit_writes(&mut db, init, None).await;
1897            let floor_before = db.inactivity_floor_loc();
1898
1899            // Parent: update keys 0..20.
1900            let mut parent = db.new_batch();
1901            for i in 0..20 {
1902                parent = parent.write(key(i), Some(val(i + 500)));
1903            }
1904            let parent_m = parent.merkleize(&db, None).await.unwrap();
1905
1906            // Child: update keys 20..30.
1907            let mut child = parent_m.new_batch::<Sha256>();
1908            for i in 20..30 {
1909                child = child.write(key(i), Some(val(i + 500)));
1910            }
1911            let child_m = child.merkleize(&db, None).await.unwrap();
1912            db.apply_batch(child_m).await.unwrap();
1913
1914            // Floor must have advanced.
1915            assert!(db.inactivity_floor_loc() > floor_before);
1916
1917            // All keys should be accessible.
1918            for i in 0..30 {
1919                assert_eq!(
1920                    db.get(&key(i)).await.unwrap(),
1921                    Some(val(i + 500)),
1922                    "updated key {i} mismatch"
1923                );
1924            }
1925            for i in 30..50 {
1926                assert_eq!(
1927                    db.get(&key(i)).await.unwrap(),
1928                    Some(val(i)),
1929                    "untouched key {i} mismatch"
1930                );
1931            }
1932
1933            db.destroy().await.unwrap();
1934        });
1935    }
1936
1937    /// Dropping a batch without applying it leaves the DB unchanged.
1938    #[test_traced("INFO")]
1939    fn test_any_batch_abandoned() {
1940        let executor = deterministic::Runner::default();
1941        executor.start(|context| async move {
1942            let ctx = context.with_label("db");
1943            let mut db: UnorderedVariable =
1944                UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("ab", &ctx))
1945                    .await
1946                    .unwrap();
1947
1948            commit_writes(&mut db, [(key(0), Some(val(0)))], None).await;
1949            let root_before = db.root();
1950
1951            // Create, populate, merkleize, then drop without apply.
1952            {
1953                let mut batch = db.new_batch();
1954                batch = batch.write(key(0), Some(val(999)));
1955                batch = batch.write(key(1), Some(val(1)));
1956                let _merkleized = batch.merkleize(&db, None).await.unwrap();
1957                // dropped here
1958            }
1959
1960            // DB state is identical.
1961            assert_eq!(db.root(), root_before);
1962            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
1963            assert_eq!(db.get(&key(1)).await.unwrap(), None);
1964
1965            db.destroy().await.unwrap();
1966        });
1967    }
1968
1969    /// Applying without `commit()` publishes in memory but is not recovered after reopen.
1970    #[test_traced("INFO")]
1971    fn test_any_batch_apply_requires_commit_for_recovery() {
1972        let executor = deterministic::Runner::default();
1973        executor.start(|context| async move {
1974            let partition = "apply_requires_commit";
1975            let ctx = context.with_label("db");
1976            let mut db: UnorderedVariable = UnorderedVariableDb::init(
1977                ctx.clone(),
1978                variable_db_config::<OneCap>(partition, &ctx),
1979            )
1980            .await
1981            .unwrap();
1982
1983            let committed_root = db.root();
1984
1985            let merkleized = db
1986                .new_batch()
1987                .write(key(0), Some(val(0)))
1988                .merkleize(&db, None)
1989                .await
1990                .unwrap();
1991            db.apply_batch(merkleized).await.unwrap();
1992
1993            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
1994
1995            drop(db);
1996
1997            let reopened: UnorderedVariable = UnorderedVariableDb::init(
1998                context.with_label("reopen"),
1999                variable_db_config::<OneCap>(partition, &context),
2000            )
2001            .await
2002            .unwrap();
2003            assert_eq!(reopened.root(), committed_root);
2004            assert_eq!(reopened.get(&key(0)).await.unwrap(), None);
2005
2006            reopened.destroy().await.unwrap();
2007        });
2008    }
2009
    /// Rewinding to a pruned target returns an error.
    #[test_traced("INFO")]
    fn test_any_rewind_pruned_target_errors() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            const KEYS: u64 = 64;

            let ctx = context.with_label("db");
            let mut db: UnorderedVariable =
                UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("rp", &ctx))
                    .await
                    .unwrap();

            // Seed KEYS keys; `first_range` records where these initial operations
            // landed so we can later attempt to rewind into pruned history.
            let initial: Vec<_> = (0..KEYS).map(|i| (key(i), Some(val(i)))).collect();
            let first_range = commit_writes(&mut db, initial, None).await;

            // Repeatedly overwrite every key and prune to the inactivity floor
            // until the retained range starts past the first commit's range.
            let mut round = 0u64;
            loop {
                round += 1;
                // Safety valve: setup must converge within a bounded number of
                // rounds or the test itself is broken.
                assert!(
                    round <= 64,
                    "failed to prune enough history for rewind test"
                );

                // Unique values per round so every key is rewritten near the tip.
                let updates: Vec<_> = (0..KEYS)
                    .map(|i| (key(i), Some(val(1000 + round * KEYS + i))))
                    .collect();
                commit_writes(&mut db, updates, None).await;

                db.prune(db.inactivity_floor_loc()).await.unwrap();
                let bounds = db.bounds().await;
                if bounds.start > first_range.start {
                    break;
                }
            }

            // Rewinding exactly to the oldest retained location must report
            // ItemPruned.
            let oldest_retained = db.bounds().await.start;
            let boundary_err = db.rewind(oldest_retained).await.unwrap_err();
            assert!(
                matches!(
                    boundary_err,
                    crate::qmdb::Error::Journal(crate::journal::Error::ItemPruned(_))
                ),
                "unexpected rewind error at retained boundary: {boundary_err:?}"
            );

            // Rewinding to a location known to have been pruned fails the same way.
            let err = db.rewind(first_range.start).await.unwrap_err();
            assert!(
                matches!(
                    err,
                    crate::qmdb::Error::Journal(crate::journal::Error::ItemPruned(_))
                ),
                "unexpected rewind error: {err:?}"
            );

            db.destroy().await.unwrap();
        });
    }
2068
2069    /// Rewinding rejects out-of-range targets and keeps state unchanged.
2070    #[test_traced("INFO")]
2071    fn test_any_rewind_invalid_target_errors() {
2072        let executor = deterministic::Runner::default();
2073        executor.start(|context| async move {
2074            let ctx = context.with_label("db");
2075            let mut db: UnorderedVariable =
2076                UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("ri", &ctx))
2077                    .await
2078                    .unwrap();
2079
2080            let root_before = db.root();
2081            let size_before = db.size().await;
2082            let no_op_locs = db.rewind(size_before).await.unwrap();
2083            assert!(
2084                no_op_locs.is_empty(),
2085                "expected no-op rewind to return no restored locations"
2086            );
2087            assert_eq!(db.root(), root_before);
2088            assert_eq!(db.size().await, size_before);
2089
2090            let zero_err = db.rewind(Location::new(0)).await.unwrap_err();
2091            assert!(
2092                matches!(
2093                    zero_err,
2094                    crate::qmdb::Error::Journal(crate::journal::Error::InvalidRewind(0))
2095                ),
2096                "unexpected rewind error: {zero_err:?}"
2097            );
2098            assert_eq!(db.root(), root_before);
2099            assert_eq!(db.size().await, size_before);
2100
2101            let too_large_target = Location::new(*size_before + 1);
2102            let too_large_err = db.rewind(too_large_target).await.unwrap_err();
2103            assert!(
2104                matches!(
2105                    too_large_err,
2106                    crate::qmdb::Error::Journal(crate::journal::Error::InvalidRewind(size))
2107                    if size == *too_large_target
2108                ),
2109                "unexpected rewind error: {too_large_err:?}"
2110            );
2111            assert_eq!(db.root(), root_before);
2112            assert_eq!(db.size().await, size_before);
2113
2114            db.destroy().await.unwrap();
2115        });
2116    }
2117
    /// Rewinding fails when the target commit's inactivity floor has been pruned, even if the
    /// target commit location is still retained.
    #[test_traced("INFO")]
    fn test_any_rewind_rejects_target_with_pruned_floor() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            const KEYS: u64 = 64;

            let ctx = context.with_label("db");
            let mut db: UnorderedVariable =
                UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("rf", &ctx))
                    .await
                    .unwrap();

            // Two rounds of writes over the same key set.
            commit_writes(&mut db, (0..KEYS).map(|i| (key(i), Some(val(i)))), None).await;
            commit_writes(
                &mut db,
                (0..KEYS).map(|i| (key(i), Some(val(1_000 + i)))),
                None,
            )
            .await;

            // `rewind_target` is the current size and `target_floor` the current
            // inactivity floor; `prune_loc` sits strictly between the floor and
            // the target so pruning there cuts the target's floor but not the
            // target itself (verified by the assertions below).
            let rewind_target = db.size().await;
            let target_floor = db.inactivity_floor_loc();
            let prune_loc = Location::new(*target_floor + (KEYS / 2));
            assert!(
                rewind_target > *prune_loc,
                "test setup expected target size > prune_loc; target={rewind_target:?}, floor={target_floor:?}"
            );

            // Keep committing fresh values until the live inactivity floor
            // reaches prune_loc, bounded by a safety valve.
            let mut round = 0u64;
            while db.inactivity_floor_loc() < prune_loc {
                round += 1;
                assert!(
                    round <= 8,
                    "failed to advance inactivity floor enough for floor-pruned rewind test"
                );
                commit_writes(
                    &mut db,
                    (0..KEYS).map(|i| (key(i), Some(val(10_000 + round * KEYS + i)))),
                    None,
                )
                .await;
            }

            // Prune, then confirm the setup invariants: the retained range now
            // starts past the target's floor, yet the target is still retained.
            db.prune(prune_loc).await.unwrap();
            let bounds = db.bounds().await;
            assert!(
                bounds.start > *target_floor,
                "test setup expected pruned start beyond target floor; bounds={bounds:?}, target_floor={target_floor:?}"
            );
            assert!(
                rewind_target > bounds.start,
                "test setup expected target commit retained; target={rewind_target:?}, bounds={bounds:?}"
            );

            // The rewind must fail with ItemPruned despite the target itself
            // being retained.
            let err = db.rewind(rewind_target).await.unwrap_err();
            assert!(
                matches!(
                    err,
                    crate::qmdb::Error::Journal(crate::journal::Error::ItemPruned(_))
                ),
                "unexpected rewind error: {err:?}"
            );

            db.destroy().await.unwrap();
        });
    }
2186
2187    // --- MMB family tests ---
2188    //
2189    // The tests above use MMR-backed databases (via the concrete Db type aliases). The tests
2190    // below verify the same core operations work with the MMB family, exercising the generic
2191    // `init_fixed`/`init_variable` path with `mmb::Family`.
2192
    /// Variable-value `Db` instantiated over the `mmb::Family` merkle backend
    /// (rather than the MMR backend used by `UnorderedVariable`): test `Context`,
    /// a contiguous variable-length journal of unordered-update operations, an
    /// unordered index keyed with `OneCap` mapping to MMB-family locations,
    /// `Sha256` hashing, and `Digest`-keyed variable-encoded `Digest` values.
    type MmbVariable = super::db::Db<
        crate::merkle::mmb::Family,
        Context,
        crate::journal::contiguous::variable::Journal<
            Context,
            super::operation::Operation<
                crate::merkle::mmb::Family,
                super::operation::update::Unordered<Digest, super::value::VariableEncoding<Digest>>,
            >,
        >,
        crate::index::unordered::Index<OneCap, crate::merkle::Location<crate::merkle::mmb::Family>>,
        Sha256,
        super::operation::update::Unordered<Digest, super::value::VariableEncoding<Digest>>,
    >;
2207
2208    async fn open_mmb_db(context: Context, suffix: &str) -> MmbVariable {
2209        let cfg = variable_db_config::<OneCap>(suffix, &context);
2210        super::init(context, cfg, None, |_, _| {}).await.unwrap()
2211    }
2212
2213    async fn commit_writes_mmb(
2214        db: &mut MmbVariable,
2215        writes: impl IntoIterator<Item = (Digest, Option<Digest>)>,
2216        metadata: Option<Digest>,
2217    ) {
2218        let mut batch = db.new_batch();
2219        for (k, v) in writes {
2220            batch = batch.write(k, v);
2221        }
2222        let merkleized = batch.merkleize(db, metadata).await.unwrap();
2223        db.apply_batch(merkleized).await.unwrap();
2224        db.commit().await.unwrap();
2225    }
2226
2227    #[test_traced("INFO")]
2228    fn test_mmb_batch_crud() {
2229        let executor = deterministic::Runner::default();
2230        executor.start(|context| async move {
2231            let mut db = open_mmb_db(context.with_label("db"), "crud").await;
2232
2233            // Insert and read back.
2234            commit_writes_mmb(&mut db, [(key(0), Some(val(0)))], None).await;
2235            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
2236
2237            // Update existing key.
2238            commit_writes_mmb(&mut db, [(key(0), Some(val(1)))], None).await;
2239            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(1)));
2240
2241            // Delete key.
2242            commit_writes_mmb(&mut db, [(key(0), None)], None).await;
2243            assert!(db.get(&key(0)).await.unwrap().is_none());
2244
2245            // Multiple keys.
2246            commit_writes_mmb(
2247                &mut db,
2248                [(key(1), Some(val(1))), (key(2), Some(val(2)))],
2249                None,
2250            )
2251            .await;
2252            assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
2253            assert_eq!(db.get(&key(2)).await.unwrap(), Some(val(2)));
2254
2255            db.destroy().await.unwrap();
2256        });
2257    }
2258
2259    #[test_traced("INFO")]
2260    fn test_mmb_batch_empty() {
2261        let executor = deterministic::Runner::default();
2262        executor.start(|context| async move {
2263            let mut db = open_mmb_db(context.with_label("db"), "empty").await;
2264            let root_before = db.root();
2265
2266            let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
2267            db.apply_batch(merkleized).await.unwrap();
2268            assert_ne!(db.root(), root_before);
2269
2270            commit_writes_mmb(&mut db, [(key(0), Some(val(0)))], None).await;
2271            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
2272
2273            db.destroy().await.unwrap();
2274        });
2275    }
2276
2277    #[test_traced("INFO")]
2278    fn test_mmb_batch_metadata() {
2279        let executor = deterministic::Runner::default();
2280        executor.start(|context| async move {
2281            let mut db = open_mmb_db(context.with_label("db"), "meta").await;
2282
2283            let metadata = val(42);
2284            commit_writes_mmb(&mut db, [(key(0), Some(val(0)))], Some(metadata)).await;
2285            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
2286
2287            let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
2288            db.apply_batch(merkleized).await.unwrap();
2289            assert_eq!(db.get_metadata().await.unwrap(), None);
2290
2291            db.destroy().await.unwrap();
2292        });
2293    }
2294
2295    #[test_traced("WARN")]
2296    fn test_mmb_recovery() {
2297        let executor = deterministic::Runner::default();
2298        executor.start(|context| async move {
2299            let mut db = open_mmb_db(context.with_label("db0"), "recovery").await;
2300
2301            commit_writes_mmb(&mut db, [(key(0), Some(val(0)))], Some(val(99))).await;
2302            commit_writes_mmb(&mut db, [(key(1), Some(val(1)))], None).await;
2303
2304            let root = db.root();
2305            let bounds = db.bounds().await;
2306            db.sync().await.unwrap();
2307            drop(db);
2308
2309            // Reopen and verify state.
2310            let db = open_mmb_db(context.with_label("db1"), "recovery").await;
2311            assert_eq!(db.root(), root);
2312            assert_eq!(db.bounds().await, bounds);
2313            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
2314            assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
2315            assert_eq!(db.get_metadata().await.unwrap(), None);
2316
2317            db.destroy().await.unwrap();
2318        });
2319    }
2320
2321    #[test_traced("INFO")]
2322    fn test_mmb_prune() {
2323        let executor = deterministic::Runner::default();
2324        executor.start(|context| async move {
2325            let mut db = open_mmb_db(context.with_label("db"), "prune").await;
2326
2327            for i in 0u64..20 {
2328                commit_writes_mmb(&mut db, [(key(i), Some(val(i)))], None).await;
2329            }
2330
2331            let floor = db.inactivity_floor_loc();
2332            db.prune(floor).await.unwrap();
2333
2334            // All keys still accessible.
2335            for i in 0u64..20 {
2336                assert_eq!(db.get(&key(i)).await.unwrap(), Some(val(i)));
2337            }
2338
2339            db.destroy().await.unwrap();
2340        });
2341    }
2342
    /// One-stage pipelining lets the next batch be built while the prior batch commits.
    #[test_traced("INFO")]
    fn test_any_batch_single_stage_pipeline() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let ctx = context.with_label("db");
            let mut db: UnorderedVariable =
                UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("pipe", &ctx))
                    .await
                    .unwrap();

            // Apply a first batch so there is prior state for commit() to flush.
            {
                let mut batch = db.new_batch();
                batch = batch.write(key(0), Some(val(0)));
                let merkleized = batch.merkleize(&db, None).await.unwrap();
                db.apply_batch(merkleized).await.unwrap();
            }

            // Pipeline: build and merkleize the next batch concurrently with
            // committing the prior one. The read of key(0) must still succeed
            // while the commit is in flight.
            let (child_merkleized, commit_result) = futures::join!(
                async {
                    assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
                    let mut child = db.new_batch();
                    child = child.write(key(1), Some(val(1)));
                    child.merkleize(&db, None).await.unwrap()
                },
                db.commit(),
            );
            commit_result.unwrap();

            // The batch built during the commit applies cleanly afterwards.
            db.apply_batch(child_merkleized).await.unwrap();
            db.commit().await.unwrap();

            // Both writes are visible once the second commit completes.
            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
            assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));

            db.destroy().await.unwrap();
        });
    }
2381}