// commonware_storage/qmdb/any/mod.rs
//! An _Any_ authenticated database provides succinct proofs of any value ever associated with a
//! key.
//!
//! The specific variants provided within this module include:
//! - Unordered: The database does not maintain or require any ordering over the key space.
//!   - Fixed-size values
//!   - Variable-size values
//! - Ordered: The database maintains a total order over active keys.
//!   - Fixed-size values
//!   - Variable-size values
//!
//! # Examples
//!
//! ```ignore
//! // Create a batch, stage mutations, merkleize, and apply.
//! let mut batch = db.new_batch();
//! batch.write(key, Some(value));   // upsert
//! batch.write(other_key, None);    // delete
//! let merkleized = batch.merkleize(None).await?;
//! let root = merkleized.root();
//! let finalized = merkleized.finalize();
//! db.apply_batch(finalized).await?;
//!
//! // Batches can fork: a merkleized batch can spawn child batches
//! // to explore speculative states without committing the parent.
//! let mut batch = db.new_batch();
//! batch.write(key_a, Some(val_a));
//! let parent = batch.merkleize(None).await?;
//!
//! let mut child = parent.new_batch();
//! child.write(key_b, Some(val_b));
//! let merkleized_child = child.merkleize(None).await?;
//! // Finalize the child (consumes the entire chain).
//! let finalized = merkleized_child.finalize();
//! db.apply_batch(finalized).await?;
//! ```

38use crate::{
39    index::Unordered as UnorderedIndex,
40    journal::{
41        authenticated,
42        contiguous::{
43            fixed::{Config as FConfig, Journal as FJournal},
44            variable::{Config as VConfig, Journal as VJournal},
45        },
46    },
47    mmr::{journaled::Config as MmrConfig, Location},
48    qmdb::{
49        any::operation::{Operation, Update},
50        operation::{Committable, Key},
51        Error,
52    },
53    translator::Translator,
54};
55use commonware_codec::{Codec, CodecFixedShared, Read};
56use commonware_cryptography::Hasher;
57use commonware_parallel::ThreadPool;
58use commonware_runtime::{buffer::paged::CacheRef, Clock, Metrics, Storage};
59use commonware_utils::Array;
60use std::num::{NonZeroU64, NonZeroUsize};
61use tracing::warn;
62
// Batch staging, merkleization, and finalization of mutations.
pub mod batch;
// Core database implementation shared by the variants below.
pub(crate) mod db;
// Log operation types (updates, commit markers, etc.).
pub(crate) mod operation;
// Test-facing trait abstractions over the db variants (also available via the
// `test-traits` feature for downstream test suites).
#[cfg(any(test, feature = "test-traits"))]
pub mod traits;
// Fixed- vs. variable-size value encoding strategies.
pub(crate) mod value;
pub(crate) use value::{FixedValue, ValueEncoding, VariableValue};
// Databases maintaining a total order over active keys.
pub mod ordered;
pub(crate) mod sync;
// Databases with no ordering requirement over the key space.
pub mod unordered;
73
/// Configuration for an `Any` authenticated db with fixed-size values.
#[derive(Clone)]
pub struct FixedConfig<T: Translator> {
    /// The name of the [Storage] partition used for the MMR's backing journal.
    pub mmr_journal_partition: String,

    /// The items per blob configuration value used by the MMR journal.
    pub mmr_items_per_blob: NonZeroU64,

    /// The size of the write buffer to use for each blob in the MMR journal.
    pub mmr_write_buffer: NonZeroUsize,

    /// The name of the [Storage] partition used for the MMR's metadata.
    pub mmr_metadata_partition: String,

    /// The name of the [Storage] partition used to persist the (pruned) log of operations.
    pub log_journal_partition: String,

    /// The items per blob configuration value used by the log journal.
    pub log_items_per_blob: NonZeroU64,

    /// The size of the write buffer to use for each blob in the log journal.
    pub log_write_buffer: NonZeroUsize,

    /// The translator used by the compressed index.
    pub translator: T,

    /// An optional thread pool to use for parallelizing batch operations.
    pub thread_pool: Option<ThreadPool>,

    /// The page cache to use for caching data (shared by the MMR journal and
    /// the log journal).
    pub page_cache: CacheRef,
}
107
/// Configuration for an `Any` authenticated db with variable-sized values.
///
/// `C` is the codec configuration type used to encode/decode log items
/// (see [Self::log_codec_config]).
#[derive(Clone)]
pub struct VariableConfig<T: Translator, C> {
    /// The name of the [Storage] partition used for the MMR's backing journal.
    pub mmr_journal_partition: String,

    /// The items per blob configuration value used by the MMR journal.
    pub mmr_items_per_blob: NonZeroU64,

    /// The size of the write buffer to use for each blob in the MMR journal.
    pub mmr_write_buffer: NonZeroUsize,

    /// The name of the [Storage] partition used for the MMR's metadata.
    pub mmr_metadata_partition: String,

    /// The name of the [Storage] partition used to persist the log of operations.
    pub log_partition: String,

    /// The size of the write buffer to use for each blob in the log journal.
    pub log_write_buffer: NonZeroUsize,

    /// Optional compression level (using `zstd`) to apply to log data before storing.
    pub log_compression: Option<u8>,

    /// The codec configuration to use for encoding and decoding log items.
    pub log_codec_config: C,

    /// The number of items to put in each blob of the journal.
    pub log_items_per_blob: NonZeroU64,

    /// The translator used by the compressed index.
    pub translator: T,

    /// An optional thread pool to use for parallelizing batch operations.
    pub thread_pool: Option<ThreadPool>,

    /// The page cache to use for caching data (shared by the MMR journal and
    /// the log journal).
    pub page_cache: CacheRef,
}
147
148/// Shared initialization logic for fixed-sized value [db::Db].
149pub(super) async fn init_fixed<E, K, V, U, H, T, I, F, NewIndex>(
150    context: E,
151    cfg: FixedConfig<T>,
152    known_inactivity_floor: Option<Location>,
153    callback: F,
154    new_index: NewIndex,
155) -> Result<db::Db<E, FJournal<E, Operation<K, V, U>>, I, H, U>, Error>
156where
157    E: Storage + Clock + Metrics,
158    K: Array,
159    V: ValueEncoding,
160    U: Update<K, V> + Send + Sync,
161    H: Hasher,
162    T: Translator,
163    I: UnorderedIndex<Value = Location>,
164    F: FnMut(bool, Option<Location>),
165    NewIndex: FnOnce(E, T) -> I,
166    Operation<K, V, U>: CodecFixedShared + Committable,
167{
168    let mmr_config = MmrConfig {
169        journal_partition: cfg.mmr_journal_partition,
170        metadata_partition: cfg.mmr_metadata_partition,
171        items_per_blob: cfg.mmr_items_per_blob,
172        write_buffer: cfg.mmr_write_buffer,
173        thread_pool: cfg.thread_pool,
174        page_cache: cfg.page_cache.clone(),
175    };
176
177    let journal_config = FConfig {
178        partition: cfg.log_journal_partition,
179        items_per_blob: cfg.log_items_per_blob,
180        write_buffer: cfg.log_write_buffer,
181        page_cache: cfg.page_cache,
182    };
183
184    let mut log = authenticated::Journal::<_, FJournal<_, _>, _>::new(
185        context.with_label("log"),
186        mmr_config,
187        journal_config,
188        Operation::is_commit,
189    )
190    .await?;
191
192    if log.size().await == 0 {
193        warn!("Authenticated log is empty, initializing new db");
194        let commit_floor = Operation::CommitFloor(None, Location::new(0));
195        log.append(&commit_floor).await?;
196        log.sync().await?;
197    }
198
199    let index = new_index(context.with_label("index"), cfg.translator);
200    db::Db::init_from_log(index, log, known_inactivity_floor, callback).await
201}
202
203/// Shared initialization logic for variable-sized value [db::Db].
204pub(super) async fn init_variable<E, K, V, U, H, T, I, F, NewIndex>(
205    context: E,
206    cfg: VariableConfig<T, <Operation<K, V, U> as Read>::Cfg>,
207    known_inactivity_floor: Option<Location>,
208    callback: F,
209    new_index: NewIndex,
210) -> Result<db::Db<E, VJournal<E, Operation<K, V, U>>, I, H, U>, Error>
211where
212    E: Storage + Clock + Metrics,
213    K: Key,
214    V: ValueEncoding,
215    U: Update<K, V> + Send + Sync,
216    H: Hasher,
217    T: Translator,
218    I: UnorderedIndex<Value = Location>,
219    F: FnMut(bool, Option<Location>),
220    NewIndex: FnOnce(E, T) -> I,
221    Operation<K, V, U>: Codec + Committable,
222{
223    let mmr_config = MmrConfig {
224        journal_partition: cfg.mmr_journal_partition,
225        metadata_partition: cfg.mmr_metadata_partition,
226        items_per_blob: cfg.mmr_items_per_blob,
227        write_buffer: cfg.mmr_write_buffer,
228        thread_pool: cfg.thread_pool,
229        page_cache: cfg.page_cache.clone(),
230    };
231
232    let journal_config = VConfig {
233        partition: cfg.log_partition,
234        items_per_section: cfg.log_items_per_blob,
235        compression: cfg.log_compression,
236        codec_config: cfg.log_codec_config,
237        page_cache: cfg.page_cache,
238        write_buffer: cfg.log_write_buffer,
239    };
240
241    let mut log = authenticated::Journal::<_, VJournal<_, _>, _>::new(
242        context.with_label("log"),
243        mmr_config,
244        journal_config,
245        Operation::is_commit,
246    )
247    .await?;
248
249    if log.size().await == 0 {
250        warn!("Authenticated log is empty, initializing new db");
251        let commit_floor = Operation::CommitFloor(None, Location::new(0));
252        log.append(&commit_floor).await?;
253        log.sync().await?;
254    }
255
256    let index = new_index(context.with_label("index"), cfg.translator);
257    db::Db::init_from_log(index, log, known_inactivity_floor, callback).await
258}
259
260#[cfg(test)]
261// pub(crate) so qmdb/current can use the generic tests.
262pub(crate) mod test {
263    use super::*;
264    use crate::{
265        qmdb::any::{FixedConfig, VariableConfig},
266        translator::OneCap,
267    };
268    use commonware_utils::{NZUsize, NZU16, NZU64};
269    use std::num::NonZeroU16;
270
    // Deliberately odd ("janky") page and cache sizes to exercise boundary
    // conditions: a non-power-of-two page size and a tiny cache.
    const PAGE_SIZE: NonZeroU16 = NZU16!(101);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(11);
274
275    pub(crate) fn fixed_db_config<T: Translator + Default>(
276        suffix: &str,
277        pooler: &impl BufferPooler,
278    ) -> FixedConfig<T> {
279        FixedConfig {
280            mmr_journal_partition: format!("journal-{suffix}"),
281            mmr_metadata_partition: format!("metadata-{suffix}"),
282            mmr_items_per_blob: NZU64!(11),
283            mmr_write_buffer: NZUsize!(1024),
284            log_journal_partition: format!("log-journal-{suffix}"),
285            log_items_per_blob: NZU64!(7),
286            log_write_buffer: NZUsize!(1024),
287            translator: T::default(),
288            thread_pool: None,
289            page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
290        }
291    }
292
293    pub(crate) fn variable_db_config<T: Translator + Default>(
294        suffix: &str,
295        pooler: &impl BufferPooler,
296    ) -> VariableConfig<T, ((), ())> {
297        VariableConfig {
298            mmr_journal_partition: format!("journal-{suffix}"),
299            mmr_metadata_partition: format!("metadata-{suffix}"),
300            mmr_items_per_blob: NZU64!(11),
301            mmr_write_buffer: NZUsize!(1024),
302            log_partition: format!("log-journal-{suffix}"),
303            log_items_per_blob: NZU64!(7),
304            log_write_buffer: NZUsize!(1024),
305            log_compression: None,
306            log_codec_config: ((), ()),
307            translator: T::default(),
308            thread_pool: None,
309            page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
310        }
311    }
312
313    use crate::{
314        kv::Gettable,
315        mmr::Location,
316        qmdb::{
317            any::traits::{DbAny, MerkleizedBatch as _, UnmerkleizedBatch as _},
318            store::MerkleizedStore,
319        },
320    };
321    use commonware_codec::{Codec, CodecShared};
322    use commonware_cryptography::{sha256::Digest, Sha256};
323    use commonware_runtime::{deterministic::Context, BufferPooler};
324    use core::{future::Future, pin::Pin};
325    use std::collections::HashMap;
326
    /// Test recovery on non-empty db.
    ///
    /// Verifies that (1) committed state survives a reopen, (2) batches that
    /// are merkleized and finalized but never applied leave no trace after a
    /// reopen, and (3) applying a batch actually advances the persisted state.
    pub(crate) async fn test_any_db_non_empty_recovery<D, V: Clone + CodecShared>(
        context: Context,
        mut db: D,
        reopen_db: impl Fn(Context) -> Pin<Box<dyn Future<Output = D> + Send>>,
        make_value: impl Fn(u64) -> V,
    ) where
        D: DbAny + Gettable<Key = Digest> + MerkleizedStore<Value = V, Digest = Digest>,
    {
        const ELEMENTS: u64 = 1000;

        // Commit initial batch.
        let finalized = {
            let mut batch = db.new_batch();
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = make_value(i * 1000);
                batch.write(k, Some(v));
            }
            batch.merkleize(None).await.unwrap().finalize()
        };
        db.apply_batch(finalized).await.unwrap();
        db.prune(db.inactivity_floor_loc().await).await.unwrap();
        // Snapshot the persisted state to compare against after each reopen.
        let root = db.root();
        let op_count = db.size().await;
        let inactivity_floor_loc = db.inactivity_floor_loc().await;

        let db = reopen_db(context.with_label("reopen1")).await;
        assert_eq!(db.size().await, op_count);
        assert_eq!(db.inactivity_floor_loc().await, inactivity_floor_loc);
        assert_eq!(db.root(), root);

        // Write without applying (unapplied batch should be lost on reopen).
        {
            let mut batch = db.new_batch();
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = make_value((i + 1) * 10000);
                batch.write(k, Some(v));
            }
            let _ = batch.merkleize(None).await.unwrap().finalize();
        }
        let db = reopen_db(context.with_label("reopen2")).await;
        assert_eq!(db.size().await, op_count);
        assert_eq!(db.inactivity_floor_loc().await, inactivity_floor_loc);
        assert_eq!(db.root(), root);

        // Write without applying again.
        {
            let mut batch = db.new_batch();
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = make_value((i + 1) * 10000);
                batch.write(k, Some(v));
            }
            let _ = batch.merkleize(None).await.unwrap().finalize();
        }
        let db = reopen_db(context.with_label("reopen3")).await;
        assert_eq!(db.size().await, op_count);
        assert_eq!(db.root(), root);

        // Three rounds of unapplied batches; none should persist anything.
        for _ in 0..3 {
            let mut batch = db.new_batch();
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = make_value((i + 1) * 10000);
                batch.write(k, Some(v));
            }
            let _ = batch.merkleize(None).await.unwrap().finalize();
        }
        let mut db = reopen_db(context.with_label("reopen4")).await;
        assert_eq!(db.size().await, op_count);
        assert_eq!(db.root(), root);

        // Now actually commit a batch; size, floor, and root must all change.
        let finalized = {
            let mut batch = db.new_batch();
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = make_value((i + 1) * 10000);
                batch.write(k, Some(v));
            }
            batch.merkleize(None).await.unwrap().finalize()
        };
        db.apply_batch(finalized).await.unwrap();
        let db = reopen_db(context.with_label("reopen5")).await;
        assert!(db.size().await > op_count);
        assert_ne!(db.inactivity_floor_loc().await, inactivity_floor_loc);
        assert_ne!(db.root(), root);

        db.destroy().await.unwrap();
    }
420
    /// Test recovery on empty db.
    ///
    /// An empty db has size 1 (the sentinel commit-floor operation appended at
    /// initialization); unapplied batches must never change that, while an
    /// applied batch must.
    pub(crate) async fn test_any_db_empty_recovery<D, V: Clone + CodecShared>(
        context: Context,
        db: D,
        reopen_db: impl Fn(Context) -> Pin<Box<dyn Future<Output = D> + Send>>,
        make_value: impl Fn(u64) -> V,
    ) where
        D: DbAny + Gettable<Key = Digest> + MerkleizedStore<Value = V, Digest = Digest>,
    {
        let root = db.root();

        let db = reopen_db(context.with_label("reopen1")).await;
        assert_eq!(db.size().await, 1);
        assert_eq!(db.root(), root);

        // Write without applying (unapplied batch should be lost on reopen).
        {
            let mut batch = db.new_batch();
            for i in 0u64..1000 {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = make_value((i + 1) * 10000);
                batch.write(k, Some(v));
            }
            let _ = batch.merkleize(None).await.unwrap().finalize();
        }
        let db = reopen_db(context.with_label("reopen2")).await;
        assert_eq!(db.size().await, 1);
        assert_eq!(db.root(), root);

        // Write without applying again.
        {
            let mut batch = db.new_batch();
            for i in 0u64..1000 {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = make_value((i + 1) * 10000);
                batch.write(k, Some(v));
            }
            let _ = batch.merkleize(None).await.unwrap().finalize();
        }
        drop(db);
        let db = reopen_db(context.with_label("reopen3")).await;
        assert_eq!(db.size().await, 1);
        assert_eq!(db.root(), root);

        // Three rounds of unapplied batches; none should persist anything.
        for _ in 0..3 {
            let mut batch = db.new_batch();
            for i in 0u64..1000 {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = make_value((i + 1) * 10000);
                batch.write(k, Some(v));
            }
            let _ = batch.merkleize(None).await.unwrap().finalize();
        }
        drop(db);
        let mut db = reopen_db(context.with_label("reopen4")).await;
        assert_eq!(db.size().await, 1);
        assert_eq!(db.root(), root);

        // Now actually commit a batch; size and root must change.
        let finalized = {
            let mut batch = db.new_batch();
            for i in 0u64..1000 {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = make_value((i + 1) * 10000);
                batch.write(k, Some(v));
            }
            batch.merkleize(None).await.unwrap().finalize()
        };
        db.apply_batch(finalized).await.unwrap();
        drop(db);
        let db = reopen_db(context.with_label("reopen5")).await;
        assert!(db.size().await > 1);
        assert_ne!(db.root(), root);

        db.destroy().await.unwrap();
    }
498
    /// Test that a large mixed workload can be authenticated and replayed correctly.
    ///
    /// Builds a batch of inserts, updates, and deletes while mirroring every
    /// mutation in a plain [HashMap] reference model, commits it, reopens the
    /// db, and then checks (1) the root survives replay, (2) every key matches
    /// the reference model, and (3) every retained operation is provable
    /// against the root.
    pub(crate) async fn test_any_db_build_and_authenticate<D, V>(
        context: Context,
        mut db: D,
        reopen_db: impl Fn(Context) -> Pin<Box<dyn Future<Output = D> + Send>>,
        make_value: impl Fn(u64) -> V,
    ) where
        D: DbAny + Gettable<Key = Digest> + MerkleizedStore<Value = V, Digest = Digest>,
        V: CodecShared + Clone + Eq + std::hash::Hash + std::fmt::Debug,
        <D as MerkleizedStore>::Operation: Codec,
    {
        use crate::{mmr::StandardHasher, qmdb::verify_proof};

        const ELEMENTS: u64 = 1000;

        let mut map = HashMap::<Digest, V>::default();
        let finalized = {
            let mut batch = db.new_batch();
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = make_value(i * 1000);
                batch.write(k, Some(v.clone()));
                map.insert(k, v);
            }

            // Update every 3rd key.
            for i in 0u64..ELEMENTS {
                if i % 3 != 0 {
                    continue;
                }
                let k = Sha256::hash(&i.to_be_bytes());
                let v = make_value((i + 1) * 10000);
                batch.write(k, Some(v.clone()));
                map.insert(k, v);
            }

            // Delete every 7th key (those with i % 7 == 1).
            for i in 0u64..ELEMENTS {
                if i % 7 != 1 {
                    continue;
                }
                let k = Sha256::hash(&i.to_be_bytes());
                batch.write(k, None);
                map.remove(&k);
            }

            batch.merkleize(None).await.unwrap().finalize()
        };
        // Commit + sync with pruning raises inactivity floor.
        db.apply_batch(finalized).await.unwrap();
        db.sync().await.unwrap();
        db.prune(db.inactivity_floor_loc().await).await.unwrap();

        // Drop & reopen and ensure state matches.
        let root = db.root();
        db.sync().await.unwrap();
        drop(db);
        let db = reopen_db(context.with_label("reopened")).await;
        assert_eq!(root, db.root());

        // State matches reference map.
        for i in 0u64..ELEMENTS {
            let k = Sha256::hash(&i.to_be_bytes());
            if let Some(map_value) = map.get(&k) {
                let Some(db_value) = db.get(&k).await.unwrap() else {
                    panic!("key not found in db: {k}");
                };
                assert_eq!(*map_value, db_value);
            } else {
                assert!(db.get(&k).await.unwrap().is_none());
            }
        }

        // Every operation from the inactivity floor to the end of the log
        // should be provable against the committed root.
        let mut hasher = StandardHasher::<Sha256>::new();
        let bounds = db.bounds().await;
        let inactivity_floor = db.inactivity_floor_loc().await;
        for loc in *inactivity_floor..*bounds.end {
            let loc = Location::new(loc);
            let (proof, ops) = db.proof(loc, NZU64!(10)).await.unwrap();
            assert!(verify_proof(&mut hasher, &proof, loc, &ops, &root));
        }

        db.destroy().await.unwrap();
    }
583
584    /// Test that replaying multiple updates of the same key on startup preserves correct state.
585    pub(crate) async fn test_any_db_log_replay<
586        D,
587        V: Clone + CodecShared + PartialEq + std::fmt::Debug,
588    >(
589        context: Context,
590        mut db: D,
591        reopen_db: impl Fn(Context) -> Pin<Box<dyn Future<Output = D> + Send>>,
592        make_value: impl Fn(u64) -> V,
593    ) where
594        D: DbAny + Gettable<Key = Digest> + MerkleizedStore<Value = V, Digest = Digest>,
595    {
596        // Update the same key many times within a single batch.
597        const UPDATES: u64 = 100;
598        let k = Sha256::hash(&UPDATES.to_be_bytes());
599        let mut last_value = None;
600        let finalized = {
601            let mut batch = db.new_batch();
602            for i in 0u64..UPDATES {
603                let v = make_value(i * 1000);
604                last_value = Some(v.clone());
605                batch.write(k, Some(v));
606            }
607            batch.merkleize(None).await.unwrap().finalize()
608        };
609        db.apply_batch(finalized).await.unwrap();
610        let root = db.root();
611
612        // Reopen and verify the state is preserved correctly.
613        drop(db);
614        let db = reopen_db(context.with_label("reopened")).await;
615        assert_eq!(db.root(), root);
616        assert_eq!(db.get(&k).await.unwrap(), last_value);
617
618        db.destroy().await.unwrap();
619    }
620
    /// Test that historical_proof returns correct proofs for past database states.
    ///
    /// A historical proof taken at the current size must match the regular
    /// proof exactly, and must remain stable (and verifiable against the old
    /// root) even after the database grows.
    pub(crate) async fn test_any_db_historical_proof_basic<D, V: Clone + CodecShared>(
        _context: Context,
        mut db: D,
        make_value: impl Fn(u64) -> V,
    ) where
        D: DbAny + Gettable<Key = Digest> + MerkleizedStore<Value = V, Digest = Digest>,
        <D as MerkleizedStore>::Operation: Codec + PartialEq + std::fmt::Debug,
    {
        use crate::{mmr::StandardHasher, qmdb::verify_proof};
        use commonware_utils::NZU64;

        // Add some operations
        const OPS: u64 = 20;
        let finalized = {
            let mut batch = db.new_batch();
            for i in 0u64..OPS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = make_value(i * 1000);
                batch.write(k, Some(v));
            }
            batch.merkleize(None).await.unwrap().finalize()
        };
        db.apply_batch(finalized).await.unwrap();
        let root_hash = db.root();
        let original_op_count = db.size().await;

        // Historical proof should match "regular" proof when historical size == current database size
        let max_ops = NZU64!(10);
        let start_loc = Location::new(5);
        let (historical_proof, historical_ops) = db
            .historical_proof(original_op_count, start_loc, max_ops)
            .await
            .unwrap();
        let (regular_proof, regular_ops) = db.proof(start_loc, max_ops).await.unwrap();

        assert_eq!(historical_proof.leaves, regular_proof.leaves);
        assert_eq!(historical_proof.digests, regular_proof.digests);
        assert_eq!(historical_ops, regular_ops);
        let mut hasher = StandardHasher::<Sha256>::new();
        assert!(verify_proof(
            &mut hasher,
            &historical_proof,
            start_loc,
            &historical_ops,
            &root_hash
        ));

        // Add more operations to the database
        let finalized = {
            let mut batch = db.new_batch();
            for i in OPS..(OPS + 5) {
                let k = Sha256::hash(&(i + 1000).to_be_bytes()); // different keys
                let v = make_value(i * 1000);
                batch.write(k, Some(v));
            }
            batch.merkleize(None).await.unwrap().finalize()
        };
        db.apply_batch(finalized).await.unwrap();

        // Historical proof should remain the same even though database has grown,
        // and should still verify against the pre-growth root.
        let (historical_proof2, historical_ops2) = db
            .historical_proof(original_op_count, start_loc, max_ops)
            .await
            .unwrap();
        assert_eq!(historical_proof2.leaves, original_op_count);
        assert_eq!(historical_proof2.digests, regular_proof.digests);
        assert_eq!(historical_ops2, regular_ops);
        assert!(verify_proof(
            &mut hasher,
            &historical_proof2,
            start_loc,
            &historical_ops2,
            &root_hash
        ));

        db.destroy().await.unwrap();
    }
699
    /// Test that tampering with historical proofs causes verification to fail.
    ///
    /// Each tampering case below mutates exactly one input to [verify_proof]
    /// (digests, ops, start location, root, or leaf count) and asserts that
    /// verification rejects it.
    pub(crate) async fn test_any_db_historical_proof_invalid<D, V: Clone + CodecShared>(
        _context: Context,
        mut db: D,
        make_value: impl Fn(u64) -> V,
    ) where
        D: DbAny + Gettable<Key = Digest> + MerkleizedStore<Value = V, Digest = Digest>,
        <D as MerkleizedStore>::Operation: Codec + PartialEq + std::fmt::Debug + Clone,
    {
        use crate::{mmr::StandardHasher, qmdb::verify_proof};
        use commonware_utils::NZU64;

        // Add some operations
        let finalized = {
            let mut batch = db.new_batch();
            for i in 0u64..10 {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = make_value(i * 1000);
                batch.write(k, Some(v));
            }
            batch.merkleize(None).await.unwrap().finalize()
        };
        db.apply_batch(finalized).await.unwrap();

        // Baseline: a valid proof over ops [1, 5) of the 5-leaf historical view.
        let historical_op_count = Location::new(5);
        let (proof, ops) = db
            .historical_proof(historical_op_count, Location::new(1), NZU64!(10))
            .await
            .unwrap();
        assert_eq!(proof.leaves, historical_op_count);
        assert_eq!(ops.len(), 4);

        let mut hasher = StandardHasher::<Sha256>::new();

        // Changing the proof digests should cause verification to fail
        {
            let mut tampered_proof = proof.clone();
            tampered_proof.digests[0] = Sha256::hash(b"invalid");
            let root_hash = db.root();
            assert!(!verify_proof(
                &mut hasher,
                &tampered_proof,
                Location::new(1),
                &ops,
                &root_hash
            ));
        }

        // Appending an extra digest should cause verification to fail
        {
            let mut tampered_proof = proof.clone();
            tampered_proof.digests.push(Sha256::hash(b"invalid"));
            let root_hash = db.root();
            assert!(!verify_proof(
                &mut hasher,
                &tampered_proof,
                Location::new(1),
                &ops,
                &root_hash
            ));
        }

        // Changing the ops should cause verification to fail
        {
            let root_hash = db.root();
            let mut tampered_ops = ops.clone();
            // Swap first two ops if we have at least 2
            if tampered_ops.len() >= 2 {
                tampered_ops.swap(0, 1);
                assert!(!verify_proof(
                    &mut hasher,
                    &proof,
                    Location::new(1),
                    &tampered_ops,
                    &root_hash
                ));
            }
        }

        // Appending an extra (duplicate) op should cause verification to fail
        {
            let root_hash = db.root();
            let mut tampered_ops = ops.clone();
            tampered_ops.push(tampered_ops[0].clone());
            assert!(!verify_proof(
                &mut hasher,
                &proof,
                Location::new(1),
                &tampered_ops,
                &root_hash
            ));
        }

        // Changing the start location should cause verification to fail
        {
            let root_hash = db.root();
            assert!(!verify_proof(
                &mut hasher,
                &proof,
                Location::new(2),
                &ops,
                &root_hash
            ));
        }

        // Changing the root digest should cause verification to fail
        {
            let invalid_root = Sha256::hash(b"invalid");
            assert!(!verify_proof(
                &mut hasher,
                &proof,
                Location::new(1),
                &ops,
                &invalid_root
            ));
        }

        // Changing the proof leaves count should cause verification to fail
        {
            let mut tampered_proof = proof.clone();
            tampered_proof.leaves = Location::new(100);
            let root_hash = db.root();
            assert!(!verify_proof(
                &mut hasher,
                &tampered_proof,
                Location::new(1),
                &ops,
                &root_hash
            ));
        }

        db.destroy().await.unwrap();
    }
833
834    /// Test historical_proof edge cases: singleton db, limited ops, min position.
835    pub(crate) async fn test_any_db_historical_proof_edge_cases<D, V: Clone + CodecShared>(
836        _context: Context,
837        mut db: D,
838        make_value: impl Fn(u64) -> V,
839    ) where
840        D: DbAny + Gettable<Key = Digest> + MerkleizedStore<Value = V, Digest = Digest>,
841        <D as MerkleizedStore>::Operation: Codec + PartialEq + std::fmt::Debug,
842    {
843        use commonware_utils::NZU64;
844
845        // Add 50 operations
846        let finalized = {
847            let mut batch = db.new_batch();
848            for i in 0u64..50 {
849                let k = Sha256::hash(&i.to_be_bytes());
850                let v = make_value(i * 1000);
851                batch.write(k, Some(v));
852            }
853            batch.merkleize(None).await.unwrap().finalize()
854        };
855        db.apply_batch(finalized).await.unwrap();
856
857        // Test singleton database (historical size = 2 means 1 op after initial commit)
858        let (single_proof, single_ops) = db
859            .historical_proof(Location::new(2), Location::new(1), NZU64!(1))
860            .await
861            .unwrap();
862        assert_eq!(single_proof.leaves, Location::new(2));
863        assert_eq!(single_ops.len(), 1);
864
865        // Test requesting more operations than available in historical position
866        let (_limited_proof, limited_ops) = db
867            .historical_proof(Location::new(11), Location::new(6), NZU64!(20))
868            .await
869            .unwrap();
870        assert_eq!(limited_ops.len(), 5); // Should be limited by historical position
871
872        // Test proof at minimum historical position
873        let (min_proof, min_ops) = db
874            .historical_proof(Location::new(4), Location::new(1), NZU64!(3))
875            .await
876            .unwrap();
877        assert_eq!(min_proof.leaves, Location::new(4));
878        assert_eq!(min_ops.len(), 3);
879
880        db.destroy().await.unwrap();
881    }
882
883    /// Test making multiple commits, one of which deletes a key from a previous commit.
884    pub(crate) async fn test_any_db_multiple_commits_delete_replayed<D, V>(
885        context: Context,
886        mut db: D,
887        reopen_db: impl Fn(Context) -> Pin<Box<dyn Future<Output = D> + Send>>,
888        make_value: impl Fn(u64) -> V,
889    ) where
890        D: DbAny + Gettable<Key = Digest> + MerkleizedStore<Value = V, Digest = Digest>,
891        V: Clone + CodecShared + Eq + std::fmt::Debug,
892    {
893        let mut map = HashMap::<Digest, V>::default();
894        const ELEMENTS: u64 = 10;
895        let metadata_value = make_value(42);
896        let key_at = |j: u64, i: u64| Sha256::hash(&(j * 1000 + i).to_be_bytes());
897        for j in 0u64..ELEMENTS {
898            let finalized = {
899                let mut batch = db.new_batch();
900                for i in 0u64..ELEMENTS {
901                    let k = key_at(j, i);
902                    let v = make_value(i * 1000);
903                    batch.write(k, Some(v.clone()));
904                    map.insert(k, v);
905                }
906                batch
907                    .merkleize(Some(metadata_value.clone()))
908                    .await
909                    .unwrap()
910                    .finalize()
911            };
912            db.apply_batch(finalized).await.unwrap();
913        }
914        assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_value));
915        let k = key_at(ELEMENTS - 1, ELEMENTS - 1);
916
917        let finalized = {
918            let mut batch = db.new_batch();
919            batch.write(k, None);
920            batch.merkleize(None).await.unwrap().finalize()
921        };
922        db.apply_batch(finalized).await.unwrap();
923        assert_eq!(db.get_metadata().await.unwrap(), None);
924        assert!(db.get(&k).await.unwrap().is_none());
925
926        let root = db.root();
927        drop(db);
928        let db = reopen_db(context.with_label("reopened")).await;
929        assert_eq!(root, db.root());
930        assert_eq!(db.get_metadata().await.unwrap(), None);
931        assert!(db.get(&k).await.unwrap().is_none());
932
933        db.destroy().await.unwrap();
934    }
935
936    use crate::qmdb::any::{
937        ordered::{fixed::Db as OrderedFixedDb, variable::Db as OrderedVariableDb},
938        unordered::{fixed::Db as UnorderedFixedDb, variable::Db as UnorderedVariableDb},
939    };
940    use commonware_macros::{test_group, test_traced};
941    use commonware_runtime::{deterministic, Runner as _};
942
    // Type aliases for all 12 variants (all use OneCap for collision coverage).
    // Base (non-partitioned) variants:
    type UnorderedFixed = UnorderedFixedDb<Context, Digest, Digest, Sha256, OneCap>;
    type UnorderedVariable = UnorderedVariableDb<Context, Digest, Digest, Sha256, OneCap>;
    type OrderedFixed = OrderedFixedDb<Context, Digest, Digest, Sha256, OneCap>;
    type OrderedVariable = OrderedVariableDb<Context, Digest, Digest, Sha256, OneCap>;
    // Partitioned variants with a single partition (const generic = 1):
    type UnorderedFixedP1 =
        unordered::fixed::partitioned::Db<Context, Digest, Digest, Sha256, OneCap, 1>;
    type UnorderedVariableP1 =
        unordered::variable::partitioned::Db<Context, Digest, Digest, Sha256, OneCap, 1>;
    type OrderedFixedP1 =
        ordered::fixed::partitioned::Db<Context, Digest, Digest, Sha256, OneCap, 1>;
    type OrderedVariableP1 =
        ordered::variable::partitioned::Db<Context, Digest, Digest, Sha256, OneCap, 1>;
    // Partitioned variants with two partitions (const generic = 2):
    type UnorderedFixedP2 =
        unordered::fixed::partitioned::Db<Context, Digest, Digest, Sha256, OneCap, 2>;
    type UnorderedVariableP2 =
        unordered::variable::partitioned::Db<Context, Digest, Digest, Sha256, OneCap, 2>;
    type OrderedFixedP2 =
        ordered::fixed::partitioned::Db<Context, Digest, Digest, Sha256, OneCap, 2>;
    type OrderedVariableP2 =
        ordered::variable::partitioned::Db<Context, Digest, Digest, Sha256, OneCap, 2>;
964
965    #[inline]
966    fn to_digest(i: u64) -> Digest {
967        Sha256::hash(&i.to_be_bytes())
968    }
969
    // Defines all 12 variants in one place. Calls $cb!($($args)*, $label, $type, $config) for each.
    //
    // Label scheme: u/o = unordered/ordered, f/v = fixed/variable values,
    // pN = partitioned with N partitions. Each expansion appends the variant's
    // short label, its concrete Db type alias, and the matching config
    // constructor identifier to the caller-supplied argument list.
    macro_rules! with_all_variants {
        ($cb:ident!($($args:tt)*)) => {
            $cb!($($args)*, "uf", UnorderedFixed, fixed_db_config);
            $cb!($($args)*, "uv", UnorderedVariable, variable_db_config);
            $cb!($($args)*, "of", OrderedFixed, fixed_db_config);
            $cb!($($args)*, "ov", OrderedVariable, variable_db_config);
            $cb!($($args)*, "ufp1", UnorderedFixedP1, fixed_db_config);
            $cb!($($args)*, "uvp1", UnorderedVariableP1, variable_db_config);
            $cb!($($args)*, "ofp1", OrderedFixedP1, fixed_db_config);
            $cb!($($args)*, "ovp1", OrderedVariableP1, variable_db_config);
            $cb!($($args)*, "ufp2", UnorderedFixedP2, fixed_db_config);
            $cb!($($args)*, "uvp2", UnorderedVariableP2, variable_db_config);
            $cb!($($args)*, "ofp2", OrderedFixedP2, fixed_db_config);
            $cb!($($args)*, "ovp2", OrderedVariableP2, variable_db_config);
        };
    }
987
    // Runner macros - each receives common args followed by (label, type, config) from with_all_variants.
    // Uses Box::pin to move futures to the heap and avoid stack overflow.
    //
    // `test_with_reopen` drives tests that close and reopen the db: the closure
    // passed as the test's third argument re-inits the same (partition, config)
    // pair so the test can verify state survives replay.
    macro_rules! test_with_reopen {
        ($ctx:expr, $sfx:expr, $f:expr, $l:literal, $db:ty, $cfg:ident) => {{
            // Partition name = variant label + per-test suffix (unique on disk).
            let p = concat!($l, "_", $sfx);
            Box::pin(async {
                let ctx = $ctx.with_label($l);
                let db = <$db>::init(ctx.clone(), $cfg::<OneCap>(p, &ctx))
                    .await
                    .unwrap();
                $f(
                    ctx,
                    db,
                    |ctx| {
                        // Reopen closure: re-init the same db type/partition.
                        Box::pin(async move {
                            <$db>::init(ctx.clone(), $cfg::<OneCap>(p, &ctx))
                                .await
                                .unwrap()
                        })
                    },
                    to_digest,
                )
                .await;
            })
            .await
        }};
    }
1015
    // Drives tests that only need a live db plus a value constructor — i.e. the
    // same shape as `test_with_reopen` minus the reopen closure.
    macro_rules! test_with_make_value {
        ($ctx:expr, $sfx:expr, $f:expr, $l:literal, $db:ty, $cfg:ident) => {{
            // Partition name = variant label + per-test suffix (unique on disk).
            let p = concat!($l, "_", $sfx);
            Box::pin(async {
                let ctx = $ctx.with_label($l);
                let db = <$db>::init(ctx.clone(), $cfg::<OneCap>(p, &ctx))
                    .await
                    .unwrap();
                $f(ctx, db, to_digest).await;
            })
            .await
        }};
    }
1029
    // Macro to run a test on all 12 DB variants.
    //
    // Dispatches on runner style: `with_reopen` forwards a re-init closure to
    // the test body; `with_make_value` forwards only the value constructor.
    macro_rules! for_all_variants {
        ($ctx:expr, $sfx:expr, with_reopen: $f:expr) => {{
            with_all_variants!(test_with_reopen!($ctx, $sfx, $f));
        }};
        ($ctx:expr, $sfx:expr, with_make_value: $f:expr) => {{
            with_all_variants!(test_with_make_value!($ctx, $sfx, $f));
        }};
    }
1039
1040    #[test_group("slow")]
1041    #[test_traced("WARN")]
1042    fn test_all_variants_log_replay() {
1043        let executor = deterministic::Runner::default();
1044        executor.start(|context| async move {
1045            for_all_variants!(context, "lr", with_reopen: test_any_db_log_replay);
1046        });
1047    }
1048
1049    #[test_group("slow")]
1050    #[test_traced("WARN")]
1051    fn test_all_variants_build_and_authenticate() {
1052        let executor = deterministic::Runner::default();
1053        executor.start(|context| async move {
1054            for_all_variants!(context, "baa", with_reopen: test_any_db_build_and_authenticate);
1055        });
1056    }
1057
1058    #[test_group("slow")]
1059    #[test_traced("WARN")]
1060    fn test_all_variants_historical_proof_basic() {
1061        let executor = deterministic::Runner::default();
1062        executor.start(|context| async move {
1063            for_all_variants!(context, "hpb", with_make_value: test_any_db_historical_proof_basic);
1064        });
1065    }
1066
1067    #[test_group("slow")]
1068    #[test_traced("WARN")]
1069    fn test_all_variants_historical_proof_invalid() {
1070        let executor = deterministic::Runner::default();
1071        executor.start(|context| async move {
1072            for_all_variants!(context, "hpi", with_make_value: test_any_db_historical_proof_invalid);
1073        });
1074    }
1075
1076    #[test_group("slow")]
1077    #[test_traced("WARN")]
1078    fn test_all_variants_historical_proof_edge_cases() {
1079        let executor = deterministic::Runner::default();
1080        executor.start(|context| async move {
1081            for_all_variants!(context, "hpec", with_make_value: test_any_db_historical_proof_edge_cases);
1082        });
1083    }
1084
1085    #[test_group("slow")]
1086    #[test_traced("WARN")]
1087    fn test_all_variants_multiple_commits_delete_replayed() {
1088        let executor = deterministic::Runner::default();
1089        executor.start(|context| async move {
1090            for_all_variants!(context, "mcdr", with_reopen: test_any_db_multiple_commits_delete_replayed);
1091        });
1092    }
1093
1094    #[test_group("slow")]
1095    #[test_traced("WARN")]
1096    fn test_all_variants_non_empty_recovery() {
1097        let executor = deterministic::Runner::default();
1098        executor.start(|context| async move {
1099            for_all_variants!(context, "ner", with_reopen: test_any_db_non_empty_recovery);
1100        });
1101    }
1102
1103    #[test_group("slow")]
1104    #[test_traced("WARN")]
1105    fn test_all_variants_empty_recovery() {
1106        let executor = deterministic::Runner::default();
1107        executor.start(|context| async move {
1108            for_all_variants!(context, "er", with_reopen: test_any_db_empty_recovery);
1109        });
1110    }
1111
1112    fn key(i: u64) -> Digest {
1113        Sha256::hash(&i.to_be_bytes())
1114    }
1115
1116    fn val(i: u64) -> Digest {
1117        Sha256::hash(&(i + 10000).to_be_bytes())
1118    }
1119
1120    /// Helper: commit a batch of key-value writes and return the applied range.
1121    async fn commit_writes(
1122        db: &mut UnorderedVariable,
1123        writes: impl IntoIterator<Item = (Digest, Option<Digest>)>,
1124        metadata: Option<Digest>,
1125    ) -> std::ops::Range<Location> {
1126        let mut batch = db.new_batch();
1127        for (k, v) in writes {
1128            batch.write(k, v);
1129        }
1130        let finalized = batch.merkleize(metadata).await.unwrap().finalize();
1131        db.apply_batch(finalized).await.unwrap()
1132    }
1133
1134    /// An empty batch (no mutations) still produces a valid commit.
1135    #[test_traced("INFO")]
1136    fn test_any_batch_empty() {
1137        let executor = deterministic::Runner::default();
1138        executor.start(|context| async move {
1139            let ctx = context.with_label("db");
1140            let mut db: UnorderedVariable =
1141                UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("e", &ctx))
1142                    .await
1143                    .unwrap();
1144
1145            let root_before = db.root();
1146            let batch = db.new_batch();
1147            let finalized = batch.merkleize(None).await.unwrap().finalize();
1148            db.apply_batch(finalized).await.unwrap();
1149
1150            // A CommitFloor op was appended, so root must change.
1151            assert_ne!(db.root(), root_before);
1152
1153            // DB should still be functional.
1154            commit_writes(&mut db, [(key(0), Some(val(0)))], None).await;
1155            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
1156
1157            db.destroy().await.unwrap();
1158        });
1159    }
1160
1161    /// Metadata propagates through merkleize and clears with None.
1162    #[test_traced("INFO")]
1163    fn test_any_batch_metadata() {
1164        let executor = deterministic::Runner::default();
1165        executor.start(|context| async move {
1166            let ctx = context.with_label("db");
1167            let mut db: UnorderedVariable =
1168                UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("m", &ctx))
1169                    .await
1170                    .unwrap();
1171
1172            let metadata = val(42);
1173
1174            // Batch with metadata.
1175            commit_writes(&mut db, [(key(0), Some(val(0)))], Some(metadata)).await;
1176            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
1177
1178            // Batch without metadata clears it.
1179            let batch = db.new_batch();
1180            let finalized = batch.merkleize(None).await.unwrap().finalize();
1181            db.apply_batch(finalized).await.unwrap();
1182            assert_eq!(db.get_metadata().await.unwrap(), None);
1183
1184            db.destroy().await.unwrap();
1185        });
1186    }
1187
1188    /// batch.get() reads through: pending mutations -> base DB.
1189    /// Updates shadow the base value; deletes hide the key.
1190    #[test_traced("INFO")]
1191    fn test_any_batch_get_read_through() {
1192        let executor = deterministic::Runner::default();
1193        executor.start(|context| async move {
1194            let ctx = context.with_label("db");
1195            let mut db: UnorderedVariable =
1196                UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("g", &ctx))
1197                    .await
1198                    .unwrap();
1199
1200            // Pre-populate with key A.
1201            let ka = key(0);
1202            let va = val(0);
1203            commit_writes(&mut db, [(ka, Some(va))], None).await;
1204
1205            let kb = key(1);
1206            let vb = val(1);
1207            let kc = key(2);
1208
1209            let mut batch = db.new_batch();
1210
1211            // Read-through to base DB.
1212            assert_eq!(batch.get(&ka).await.unwrap(), Some(va));
1213
1214            // Pending mutation visible.
1215            batch.write(kb, Some(vb));
1216            assert_eq!(batch.get(&kb).await.unwrap(), Some(vb));
1217
1218            // Nonexistent key.
1219            assert_eq!(batch.get(&kc).await.unwrap(), None);
1220
1221            // Update shadows base DB value.
1222            let va2 = val(100);
1223            batch.write(ka, Some(va2));
1224            assert_eq!(batch.get(&ka).await.unwrap(), Some(va2));
1225
1226            // Delete hides the key.
1227            batch.write(ka, None);
1228            assert_eq!(batch.get(&ka).await.unwrap(), None);
1229
1230            db.destroy().await.unwrap();
1231        });
1232    }
1233
1234    /// merkleized.get() reflects the resolved diff after merkleize.
1235    #[test_traced("INFO")]
1236    fn test_any_batch_get_on_merkleized() {
1237        let executor = deterministic::Runner::default();
1238        executor.start(|context| async move {
1239            let ctx = context.with_label("db");
1240            let mut db: UnorderedVariable =
1241                UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("mg", &ctx))
1242                    .await
1243                    .unwrap();
1244
1245            let ka = key(0);
1246            let kb = key(1);
1247            let kc = key(2);
1248            let kd = key(3);
1249
1250            // Pre-populate A and B.
1251            commit_writes(&mut db, [(ka, Some(val(0))), (kb, Some(val(1)))], None).await;
1252
1253            // Batch: update A, delete B, create C.
1254            let va2 = val(100);
1255            let vc = val(2);
1256            let mut batch = db.new_batch();
1257            batch.write(ka, Some(va2));
1258            batch.write(kb, None);
1259            batch.write(kc, Some(vc));
1260            let merkleized = batch.merkleize(None).await.unwrap();
1261
1262            assert_eq!(merkleized.get(&ka).await.unwrap(), Some(va2));
1263            assert_eq!(merkleized.get(&kb).await.unwrap(), None);
1264            assert_eq!(merkleized.get(&kc).await.unwrap(), Some(vc));
1265            assert_eq!(merkleized.get(&kd).await.unwrap(), None);
1266
1267            db.destroy().await.unwrap();
1268        });
1269    }
1270
1271    /// Child batch reads through: child mutations -> parent diff -> base DB.
1272    #[test_traced("INFO")]
1273    fn test_any_batch_stacked_get() {
1274        let executor = deterministic::Runner::default();
1275        executor.start(|context| async move {
1276            let ctx = context.with_label("db");
1277            let db: UnorderedVariable =
1278                UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("sg", &ctx))
1279                    .await
1280                    .unwrap();
1281
1282            let ka = key(0);
1283            let kb = key(1);
1284
1285            // Parent batch writes A.
1286            let mut batch = db.new_batch();
1287            batch.write(ka, Some(val(0)));
1288            let merkleized = batch.merkleize(None).await.unwrap();
1289
1290            // Child reads parent's A.
1291            let mut child = merkleized.new_batch();
1292            assert_eq!(child.get(&ka).await.unwrap(), Some(val(0)));
1293
1294            // Child overwrites A.
1295            child.write(ka, Some(val(100)));
1296            assert_eq!(child.get(&ka).await.unwrap(), Some(val(100)));
1297
1298            // Child writes new key B.
1299            child.write(kb, Some(val(1)));
1300            assert_eq!(child.get(&kb).await.unwrap(), Some(val(1)));
1301
1302            // Child deletes A.
1303            child.write(ka, None);
1304            assert_eq!(child.get(&ka).await.unwrap(), None);
1305
1306            db.destroy().await.unwrap();
1307        });
1308    }
1309
1310    /// Parent deletes a base-DB key, child re-creates it.
1311    #[test_traced("INFO")]
1312    fn test_any_batch_stacked_delete_recreate() {
1313        let executor = deterministic::Runner::default();
1314        executor.start(|context| async move {
1315            let ctx = context.with_label("db");
1316            let mut db: UnorderedVariable =
1317                UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("dr", &ctx))
1318                    .await
1319                    .unwrap();
1320
1321            let ka = key(0);
1322
1323            // Pre-populate with key A.
1324            commit_writes(&mut db, [(ka, Some(val(0)))], None).await;
1325
1326            // Parent batch deletes A.
1327            let mut parent = db.new_batch();
1328            parent.write(ka, None);
1329            let parent_m = parent.merkleize(None).await.unwrap();
1330            assert_eq!(parent_m.get(&ka).await.unwrap(), None);
1331
1332            // Child re-creates A with a new value.
1333            let mut child = parent_m.new_batch();
1334            child.write(ka, Some(val(200)));
1335            let child_m = child.merkleize(None).await.unwrap();
1336            assert_eq!(child_m.get(&ka).await.unwrap(), Some(val(200)));
1337
1338            // Apply and verify DB state.
1339            let finalized = child_m.finalize();
1340            db.apply_batch(finalized).await.unwrap();
1341            assert_eq!(db.get(&ka).await.unwrap(), Some(val(200)));
1342
1343            db.destroy().await.unwrap();
1344        });
1345    }
1346
1347    /// Floor raise during merkleize moves active operations to the tip.
1348    /// All keys remain accessible with correct values.
1349    #[test_traced("INFO")]
1350    fn test_any_batch_floor_raise() {
1351        let executor = deterministic::Runner::default();
1352        executor.start(|context| async move {
1353            let ctx = context.with_label("db");
1354            let mut db: UnorderedVariable =
1355                UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("fr", &ctx))
1356                    .await
1357                    .unwrap();
1358
1359            // Pre-populate with 100 keys.
1360            let init: Vec<_> = (0..100).map(|i| (key(i), Some(val(i)))).collect();
1361            commit_writes(&mut db, init, None).await;
1362
1363            let floor_before = db.inactivity_floor_loc();
1364
1365            // Update 30 keys.
1366            let updates: Vec<_> = (0..30).map(|i| (key(i), Some(val(i + 500)))).collect();
1367            commit_writes(&mut db, updates, None).await;
1368
1369            // Floor should have advanced.
1370            assert!(db.inactivity_floor_loc() > floor_before);
1371
1372            // All keys should still be accessible with correct values.
1373            for i in 0..30 {
1374                assert_eq!(
1375                    db.get(&key(i)).await.unwrap(),
1376                    Some(val(i + 500)),
1377                    "updated key {i} mismatch"
1378                );
1379            }
1380            for i in 30..100 {
1381                assert_eq!(
1382                    db.get(&key(i)).await.unwrap(),
1383                    Some(val(i)),
1384                    "untouched key {i} mismatch"
1385                );
1386            }
1387
1388            db.destroy().await.unwrap();
1389        });
1390    }
1391
1392    /// apply_batch() returns the correct range of committed locations.
1393    #[test_traced("INFO")]
1394    fn test_any_batch_apply_returns_range() {
1395        let executor = deterministic::Runner::default();
1396        executor.start(|context| async move {
1397            let ctx = context.with_label("db");
1398            let mut db: UnorderedVariable =
1399                UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("ar", &ctx))
1400                    .await
1401                    .unwrap();
1402
1403            // First batch: 5 keys.
1404            let writes: Vec<_> = (0..5).map(|i| (key(i), Some(val(i)))).collect();
1405            let range1 = commit_writes(&mut db, writes, None).await;
1406
1407            // Range should start after the initial CommitFloor (location 0).
1408            assert_eq!(range1.start, Location::new(1));
1409            // Range length >= 6 (5 writes + 1 CommitFloor + possible floor raise ops).
1410            assert!(range1.end.saturating_sub(*range1.start) >= 6);
1411
1412            // Second batch: ranges must be contiguous.
1413            let writes: Vec<_> = (5..10).map(|i| (key(i), Some(val(i)))).collect();
1414            let range2 = commit_writes(&mut db, writes, None).await;
1415            assert_eq!(range2.start, range1.end);
1416
1417            db.destroy().await.unwrap();
1418        });
1419    }
1420
1421    /// 3-level chain: parent -> child -> grandchild, finalize grandchild and apply.
1422    #[test_traced("INFO")]
1423    fn test_any_batch_deep_chain() {
1424        let executor = deterministic::Runner::default();
1425        executor.start(|context| async move {
1426            let ctx = context.with_label("db");
1427            let mut db: UnorderedVariable =
1428                UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("dc", &ctx))
1429                    .await
1430                    .unwrap();
1431
1432            // Pre-populate with keys 0..5.
1433            let init: Vec<_> = (0..5).map(|i| (key(i), Some(val(i)))).collect();
1434            commit_writes(&mut db, init, None).await;
1435
1436            // Parent: overwrite key 0, add key 5.
1437            let mut parent = db.new_batch();
1438            parent.write(key(0), Some(val(100)));
1439            parent.write(key(5), Some(val(5)));
1440            let parent_m = parent.merkleize(None).await.unwrap();
1441
1442            // Child: overwrite key 1, add key 6.
1443            let mut child = parent_m.new_batch();
1444            child.write(key(1), Some(val(101)));
1445            child.write(key(6), Some(val(6)));
1446            let child_m = child.merkleize(None).await.unwrap();
1447
1448            // Grandchild: delete key 2, add key 7.
1449            let mut grandchild = child_m.new_batch();
1450            grandchild.write(key(2), None);
1451            grandchild.write(key(7), Some(val(7)));
1452            let grandchild_m = grandchild.merkleize(None).await.unwrap();
1453
1454            // Verify reads through the chain.
1455            assert_eq!(grandchild_m.get(&key(0)).await.unwrap(), Some(val(100)));
1456            assert_eq!(grandchild_m.get(&key(1)).await.unwrap(), Some(val(101)));
1457            assert_eq!(grandchild_m.get(&key(2)).await.unwrap(), None);
1458            assert_eq!(grandchild_m.get(&key(7)).await.unwrap(), Some(val(7)));
1459
1460            // Finalize and apply.
1461            let finalized = grandchild_m.finalize();
1462            db.apply_batch(finalized).await.unwrap();
1463
1464            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(100)));
1465            assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(101)));
1466            assert_eq!(db.get(&key(2)).await.unwrap(), None);
1467            assert_eq!(db.get(&key(3)).await.unwrap(), Some(val(3)));
1468            assert_eq!(db.get(&key(4)).await.unwrap(), Some(val(4)));
1469            assert_eq!(db.get(&key(5)).await.unwrap(), Some(val(5)));
1470            assert_eq!(db.get(&key(6)).await.unwrap(), Some(val(6)));
1471            assert_eq!(db.get(&key(7)).await.unwrap(), Some(val(7)));
1472
1473            db.destroy().await.unwrap();
1474        });
1475    }
1476
1477    /// Chained batch produces the same DB state as sequential apply_batch calls.
1478    #[test_traced("INFO")]
1479    fn test_any_batch_chain_matches_sequential() {
1480        let executor = deterministic::Runner::default();
1481        executor.start(|context| async move {
1482            let ctx = context.with_label("db");
1483
1484            // DB A: sequential apply.
1485            let ctx_a = ctx.with_label("a");
1486            let mut db_a: UnorderedVariable = UnorderedVariableDb::init(
1487                ctx_a.clone(),
1488                variable_db_config::<OneCap>("cms-a", &ctx_a),
1489            )
1490            .await
1491            .unwrap();
1492
1493            // DB B: chained batch.
1494            let ctx_b = ctx.with_label("b");
1495            let mut db_b: UnorderedVariable = UnorderedVariableDb::init(
1496                ctx_b.clone(),
1497                variable_db_config::<OneCap>("cms-b", &ctx_b),
1498            )
1499            .await
1500            .unwrap();
1501
1502            // Batch 1 operations: create keys 0..5.
1503            let writes1: Vec<_> = (0..5).map(|i| (key(i), Some(val(i)))).collect();
1504
1505            // Batch 2 operations: update key 0, delete key 1, create key 5.
1506            let writes2 = vec![
1507                (key(0), Some(val(100))),
1508                (key(1), None),
1509                (key(5), Some(val(5))),
1510            ];
1511
1512            // DB A: apply sequentially.
1513            commit_writes(&mut db_a, writes1.clone(), None).await;
1514            commit_writes(&mut db_a, writes2.clone(), None).await;
1515
1516            // DB B: apply as chain.
1517            let mut parent = db_b.new_batch();
1518            for (k, v) in &writes1 {
1519                parent.write(*k, *v);
1520            }
1521            let parent_m = parent.merkleize(None).await.unwrap();
1522
1523            let mut child = parent_m.new_batch();
1524            for (k, v) in &writes2 {
1525                child.write(*k, *v);
1526            }
1527            let child_m = child.merkleize(None).await.unwrap();
1528            let finalized = child_m.finalize();
1529            db_b.apply_batch(finalized).await.unwrap();
1530
1531            // Both DBs must have the same state.
1532            assert_eq!(db_a.root(), db_b.root());
1533            for i in 0..6 {
1534                assert_eq!(
1535                    db_a.get(&key(i)).await.unwrap(),
1536                    db_b.get(&key(i)).await.unwrap(),
1537                    "key {i} mismatch"
1538                );
1539            }
1540
1541            db_a.destroy().await.unwrap();
1542            db_b.destroy().await.unwrap();
1543        });
1544    }
1545
1546    /// Create and delete the same key in a single batch produces no net change for that key.
1547    #[test_traced("INFO")]
1548    fn test_any_batch_create_then_delete_same_batch() {
1549        let executor = deterministic::Runner::default();
1550        executor.start(|context| async move {
1551            let ctx = context.with_label("db");
1552            let mut db: UnorderedVariable =
1553                UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("cd", &ctx))
1554                    .await
1555                    .unwrap();
1556
1557            // Pre-populate key A.
1558            commit_writes(&mut db, [(key(0), Some(val(0)))], None).await;
1559
1560            // In one batch: create B then delete B, also create C and delete A.
1561            let mut batch = db.new_batch();
1562            batch.write(key(1), Some(val(1))); // create B
1563            batch.write(key(1), None); // delete B (net: no B)
1564            batch.write(key(2), Some(val(2))); // create C
1565            batch.write(key(0), None); // delete A
1566            let finalized = batch.merkleize(None).await.unwrap().finalize();
1567            db.apply_batch(finalized).await.unwrap();
1568
1569            assert_eq!(db.get(&key(0)).await.unwrap(), None);
1570            assert_eq!(db.get(&key(1)).await.unwrap(), None);
1571            assert_eq!(db.get(&key(2)).await.unwrap(), Some(val(2)));
1572
1573            db.destroy().await.unwrap();
1574        });
1575    }
1576
1577    /// Deleting all keys exercises the total_active_keys == 0 floor-raise fast path.
1578    #[test_traced("INFO")]
1579    fn test_any_batch_delete_all_keys() {
1580        let executor = deterministic::Runner::default();
1581        executor.start(|context| async move {
1582            let ctx = context.with_label("db");
1583            let mut db: UnorderedVariable =
1584                UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("da", &ctx))
1585                    .await
1586                    .unwrap();
1587
1588            // Pre-populate 5 keys.
1589            let init: Vec<_> = (0..5).map(|i| (key(i), Some(val(i)))).collect();
1590            commit_writes(&mut db, init, None).await;
1591
1592            // Delete all 5.
1593            let deletes: Vec<_> = (0..5).map(|i| (key(i), None)).collect();
1594            commit_writes(&mut db, deletes, None).await;
1595
1596            for i in 0..5 {
1597                assert_eq!(db.get(&key(i)).await.unwrap(), None, "key {i} not deleted");
1598            }
1599
1600            // DB should still be functional after deleting everything.
1601            commit_writes(&mut db, [(key(10), Some(val(10)))], None).await;
1602            assert_eq!(db.get(&key(10)).await.unwrap(), Some(val(10)));
1603
1604            db.destroy().await.unwrap();
1605        });
1606    }
1607
1608    /// Two independent batches from the same DB do not interfere with each other.
1609    #[test_traced("INFO")]
1610    fn test_any_batch_parallel_forks() {
1611        let executor = deterministic::Runner::default();
1612        executor.start(|context| async move {
1613            let ctx = context.with_label("db");
1614            let mut db: UnorderedVariable =
1615                UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("pf", &ctx))
1616                    .await
1617                    .unwrap();
1618
1619            // Pre-populate.
1620            commit_writes(&mut db, [(key(0), Some(val(0)))], None).await;
1621            let root_before = db.root();
1622
1623            // Fork A: update key 0 and create key 1.
1624            let mut fork_a = db.new_batch();
1625            fork_a.write(key(0), Some(val(100)));
1626            fork_a.write(key(1), Some(val(1)));
1627            let fork_a_m = fork_a.merkleize(None).await.unwrap();
1628
1629            // Fork B: delete key 0 and create key 2.
1630            let mut fork_b = db.new_batch();
1631            fork_b.write(key(0), None);
1632            fork_b.write(key(2), Some(val(2)));
1633            let fork_b_m = fork_b.merkleize(None).await.unwrap();
1634
1635            // Different mutations must produce different roots.
1636            assert_ne!(fork_a_m.root(), fork_b_m.root());
1637
1638            // DB is unchanged (neither batch applied).
1639            assert_eq!(db.root(), root_before);
1640            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
1641            assert_eq!(db.get(&key(1)).await.unwrap(), None);
1642
1643            // Apply fork A only.
1644            let finalized = fork_a_m.finalize();
1645            db.apply_batch(finalized).await.unwrap();
1646            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(100)));
1647            assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
1648            assert_eq!(db.get(&key(2)).await.unwrap(), None);
1649
1650            db.destroy().await.unwrap();
1651        });
1652    }
1653
1654    /// Floor raise advances correctly across a chained batch.
1655    #[test_traced("INFO")]
1656    fn test_any_batch_floor_raise_chained() {
1657        let executor = deterministic::Runner::default();
1658        executor.start(|context| async move {
1659            let ctx = context.with_label("db");
1660            let mut db: UnorderedVariable =
1661                UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("frc", &ctx))
1662                    .await
1663                    .unwrap();
1664
1665            // Pre-populate with 50 keys.
1666            let init: Vec<_> = (0..50).map(|i| (key(i), Some(val(i)))).collect();
1667            commit_writes(&mut db, init, None).await;
1668            let floor_before = db.inactivity_floor_loc();
1669
1670            // Parent: update keys 0..20.
1671            let mut parent = db.new_batch();
1672            for i in 0..20 {
1673                parent.write(key(i), Some(val(i + 500)));
1674            }
1675            let parent_m = parent.merkleize(None).await.unwrap();
1676
1677            // Child: update keys 20..30.
1678            let mut child = parent_m.new_batch();
1679            for i in 20..30 {
1680                child.write(key(i), Some(val(i + 500)));
1681            }
1682            let child_m = child.merkleize(None).await.unwrap();
1683
1684            let finalized = child_m.finalize();
1685            db.apply_batch(finalized).await.unwrap();
1686
1687            // Floor must have advanced.
1688            assert!(db.inactivity_floor_loc() > floor_before);
1689
1690            // All keys should be accessible.
1691            for i in 0..30 {
1692                assert_eq!(
1693                    db.get(&key(i)).await.unwrap(),
1694                    Some(val(i + 500)),
1695                    "updated key {i} mismatch"
1696                );
1697            }
1698            for i in 30..50 {
1699                assert_eq!(
1700                    db.get(&key(i)).await.unwrap(),
1701                    Some(val(i)),
1702                    "untouched key {i} mismatch"
1703                );
1704            }
1705
1706            db.destroy().await.unwrap();
1707        });
1708    }
1709
1710    /// Dropping a batch without applying it leaves the DB unchanged.
1711    #[test_traced("INFO")]
1712    fn test_any_batch_abandoned() {
1713        let executor = deterministic::Runner::default();
1714        executor.start(|context| async move {
1715            let ctx = context.with_label("db");
1716            let mut db: UnorderedVariable =
1717                UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("ab", &ctx))
1718                    .await
1719                    .unwrap();
1720
1721            commit_writes(&mut db, [(key(0), Some(val(0)))], None).await;
1722            let root_before = db.root();
1723
1724            // Create, populate, merkleize, then drop without apply.
1725            {
1726                let mut batch = db.new_batch();
1727                batch.write(key(0), Some(val(999)));
1728                batch.write(key(1), Some(val(1)));
1729                let _merkleized = batch.merkleize(None).await.unwrap();
1730                // dropped here
1731            }
1732
1733            // DB state is identical.
1734            assert_eq!(db.root(), root_before);
1735            assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
1736            assert_eq!(db.get(&key(1)).await.unwrap(), None);
1737
1738            db.destroy().await.unwrap();
1739        });
1740    }
1741}