Skip to main content

commonware_storage/qmdb/any/unordered/
variable.rs

1//! An authenticated database that provides succinct proofs of _any_ value ever associated
2//! with a key, where values can have varying sizes.
3//!
4//! _If the values you wish to store all have the same size, use [crate::qmdb::any::unordered::fixed]
5//! instead for better performance._
6
7use crate::{
8    index::unordered::Index,
9    journal::contiguous::variable::Journal,
10    merkle::{self, Location},
11    qmdb::{
12        any::{unordered, value::VariableEncoding, VariableConfig, VariableValue},
13        operation::Key,
14        Error,
15    },
16    translator::Translator,
17    Context,
18};
19use commonware_codec::{Codec, Read};
20use commonware_cryptography::Hasher;
21
/// An update operation whose value is wrapped in [VariableEncoding] to support
/// varying value sizes.
pub type Update<K, V> = unordered::Update<K, VariableEncoding<V>>;
/// A log operation (update/delete/commit) over keys `K` and variable-size values `V`.
pub type Operation<F, K, V> = unordered::Operation<F, K, VariableEncoding<V>>;

/// A key-value QMDB based on an authenticated log of operations, supporting authentication of any
/// value ever associated with a key.
pub type Db<F, E, K, V, H, T> =
    super::Db<F, E, Journal<E, Operation<F, K, V>>, Index<T, Location<F>>, H, Update<K, V>>;
29
30impl<F: merkle::Family, E: Context, K: Key, V: VariableValue, H: Hasher, T: Translator>
31    Db<F, E, K, V, H, T>
32where
33    Operation<F, K, V>: Codec,
34{
35    /// Returns a [Db] QMDB initialized from `cfg`. Uncommitted log operations will be
36    /// discarded and the state of the db will be as of the last committed operation.
37    pub async fn init(
38        context: E,
39        cfg: VariableConfig<T, <Operation<F, K, V> as Read>::Cfg>,
40    ) -> Result<Self, Error<F>> {
41        Self::init_with_callback(context, cfg, None, |_, _| {}).await
42    }
43
44    /// Initialize the DB, invoking `callback` for each operation processed during recovery.
45    ///
46    /// If `known_inactivity_floor` is provided and is less than the log's actual inactivity floor,
47    /// `callback` is invoked with `(false, None)` for each location in the gap. Then, as the
48    /// snapshot is built from the log, `callback` is invoked for each operation with its activity
49    /// status and previous location (if any).
50    pub(crate) async fn init_with_callback(
51        context: E,
52        cfg: VariableConfig<T, <Operation<F, K, V> as Read>::Cfg>,
53        known_inactivity_floor: Option<Location<F>>,
54        callback: impl FnMut(bool, Option<Location<F>>),
55    ) -> Result<Self, Error<F>> {
56        crate::qmdb::any::init(context, cfg, known_inactivity_floor, callback).await
57    }
58}
59
60/// Partitioned index variants that divide the key space into `2^(P*8)` partitions.
61///
62/// See [partitioned::Db] for the generic type, or use the convenience aliases:
63/// - [partitioned::p256::Db] for 256 partitions (P=1)
64/// - [partitioned::p64k::Db] for 65,536 partitions (P=2)
65pub mod partitioned {
66    pub use super::{Operation, Update};
67    use crate::{
68        index::partitioned::unordered::Index,
69        journal::contiguous::variable::Journal,
70        merkle::{self, Location},
71        qmdb::{
72            any::{VariableConfig, VariableValue},
73            operation::Key,
74            Error,
75        },
76        translator::Translator,
77        Context,
78    };
79    use commonware_codec::{Codec, Read};
80    use commonware_cryptography::Hasher;
81
82    /// A key-value QMDB with a partitioned snapshot index and variable-size values.
83    ///
84    /// This is the partitioned variant of [super::Db]. The const generic `P` specifies
85    /// the number of prefix bytes used for partitioning:
86    /// - `P = 1`: 256 partitions
87    /// - `P = 2`: 65,536 partitions
88    ///
89    /// Use partitioned indices when you have a large number of keys (>> 2^(P*8)) and memory
90    /// efficiency is important. Keys should be uniformly distributed across the prefix space.
91    pub type Db<F, E, K, V, H, T, const P: usize> = crate::qmdb::any::unordered::Db<
92        F,
93        E,
94        Journal<E, Operation<F, K, V>>,
95        Index<T, Location<F>, P>,
96        H,
97        Update<K, V>,
98    >;
99
100    impl<
101            F: merkle::Family,
102            E: Context,
103            K: Key,
104            V: VariableValue,
105            H: Hasher,
106            T: Translator,
107            const P: usize,
108        > Db<F, E, K, V, H, T, P>
109    where
110        Operation<F, K, V>: Codec,
111    {
112        /// Returns a [Db] QMDB initialized from `cfg`. Uncommitted log operations will be
113        /// discarded and the state of the db will be as of the last committed operation.
114        pub async fn init(
115            context: E,
116            cfg: VariableConfig<T, <Operation<F, K, V> as Read>::Cfg>,
117        ) -> Result<Self, Error<F>> {
118            Self::init_with_callback(context, cfg, None, |_, _| {}).await
119        }
120
121        /// Initialize the DB, invoking `callback` for each operation processed during recovery.
122        ///
123        /// If `known_inactivity_floor` is provided and is less than the log's actual inactivity floor,
124        /// `callback` is invoked with `(false, None)` for each location in the gap. Then, as the
125        /// snapshot is built from the log, `callback` is invoked for each operation with its activity
126        /// status and previous location (if any).
127        pub(crate) async fn init_with_callback(
128            context: E,
129            cfg: VariableConfig<T, <Operation<F, K, V> as Read>::Cfg>,
130            known_inactivity_floor: Option<Location<F>>,
131            callback: impl FnMut(bool, Option<Location<F>>),
132        ) -> Result<Self, Error<F>> {
133            crate::qmdb::any::init(context, cfg, known_inactivity_floor, callback).await
134        }
135    }
136
137    /// Convenience type aliases for 256 partitions (P=1).
138    pub mod p256 {
139        /// Variable-value DB with 256 partitions.
140        pub type Db<F, E, K, V, H, T> = super::Db<F, E, K, V, H, T, 1>;
141    }
142
143    /// Convenience type aliases for 65,536 partitions (P=2).
144    pub mod p64k {
145        /// Variable-value DB with 65,536 partitions.
146        pub type Db<F, E, K, V, H, T> = super::Db<F, E, K, V, H, T, 2>;
147    }
148}
149
150#[cfg(test)]
151pub(crate) mod test {
152    use super::*;
153    use crate::{index::Unordered as _, mmr, translator::TwoCap};
154    use commonware_cryptography::{sha256::Digest, Hasher, Sha256};
155    use commonware_macros::test_traced;
156    use commonware_math::algebra::Random;
157    use commonware_runtime::{
158        buffer::paged::CacheRef,
159        deterministic::{self, Context},
160        BufferPooler, Metrics, Runner as _,
161    };
162    use commonware_utils::{test_rng_seeded, NZUsize, NZU16, NZU64};
163    use rand::RngCore;
164    use std::{
165        num::{NonZeroU16, NonZeroUsize},
166        sync::Arc,
167    };
168
    // Small, odd page size — presumably chosen so test data rarely aligns with
    // page boundaries; TODO confirm intent.
    const PAGE_SIZE: NonZeroU16 = NZU16!(77);
    // Tiny cache capacity so tests exercise cache eviction paths.
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(9);
171
172    pub(crate) fn create_test_config(seed: u64, pooler: &impl BufferPooler) -> VarConfig {
173        let page_cache = CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE);
174        VariableConfig {
175            merkle_config: crate::mmr::journaled::Config {
176                journal_partition: format!("journal-{seed}"),
177                metadata_partition: format!("metadata-{seed}"),
178                items_per_blob: NZU64!(13),
179                write_buffer: NZUsize!(1024),
180                thread_pool: None,
181                page_cache: page_cache.clone(),
182            },
183            journal_config: crate::journal::contiguous::variable::Config {
184                partition: format!("log-journal-{seed}"),
185                items_per_section: NZU64!(7),
186                write_buffer: NZUsize!(1024),
187                compression: None,
188                codec_config: ((), ((0..=10000).into(), ())),
189                page_cache,
190            },
191            translator: TwoCap,
192        }
193    }
194
    /// The concrete [VariableConfig] shape used by these tests: a [TwoCap]
    /// translator and the codec config tuple for variable-length `Vec<u8>` values.
    pub(crate) type VarConfig =
        VariableConfig<TwoCap, ((), (commonware_codec::RangeCfg<usize>, ()))>;

    /// A type alias for the concrete [Db] type used in these unit tests.
    pub(crate) type AnyTest =
        Db<mmr::Family, deterministic::Context, Digest, Vec<u8>, Sha256, TwoCap>;
201
202    /// Create a test database with unique partition names
203    pub(crate) async fn create_test_db(mut context: Context) -> AnyTest {
204        let seed = context.next_u64();
205        let config = create_test_config(seed, &context);
206        AnyTest::init(context, config).await.unwrap()
207    }
208
209    /// Deterministic byte vector generator for variable-value tests.
210    fn to_bytes(i: u64) -> Vec<u8> {
211        let len = ((i % 13) + 7) as usize;
212        vec![(i % 255) as u8; len]
213    }
214
215    /// Create n random operations using the default seed (0). Some portion of
216    /// the updates are deletes. create_test_ops(n) is a prefix of
217    /// create_test_ops(n') for n < n'.
218    pub(crate) fn create_test_ops(
219        n: usize,
220    ) -> Vec<unordered::Operation<mmr::Family, Digest, VariableEncoding<Vec<u8>>>> {
221        create_test_ops_seeded(n, 0)
222    }
223
224    /// Create n random operations using a specific seed. Use different seeds
225    /// when you need non-overlapping keys in the same test.
226    pub(crate) fn create_test_ops_seeded(
227        n: usize,
228        seed: u64,
229    ) -> Vec<unordered::Operation<mmr::Family, Digest, VariableEncoding<Vec<u8>>>> {
230        let mut rng = test_rng_seeded(seed);
231        let mut prev_key = Digest::random(&mut rng);
232        let mut ops = Vec::new();
233        for i in 0..n {
234            let key = Digest::random(&mut rng);
235            if i % 10 == 0 && i > 0 {
236                ops.push(unordered::Operation::Delete(prev_key));
237            } else {
238                let value = to_bytes(rng.next_u64());
239                ops.push(unordered::Operation::Update(unordered::Update(key, value)));
240                prev_key = key;
241            }
242        }
243        ops
244    }
245
246    /// Applies the given operations to the database.
247    pub(crate) async fn apply_ops(
248        db: &mut AnyTest,
249        ops: Vec<unordered::Operation<mmr::Family, Digest, VariableEncoding<Vec<u8>>>>,
250    ) {
251        let mut batch = db.new_batch();
252        for op in ops {
253            match op {
254                unordered::Operation::Update(unordered::Update(key, value)) => {
255                    batch = batch.write(key, Some(value));
256                }
257                unordered::Operation::Delete(key) => {
258                    batch = batch.write(key, None);
259                }
260                unordered::Operation::CommitFloor(_, _) => {
261                    panic!("CommitFloor not supported in apply_ops");
262                }
263            }
264        }
265        let merkleized = batch.merkleize(db, None).await.unwrap();
266        db.apply_batch(merkleized).await.unwrap();
267    }
268
269    /// Return an `Any` database initialized with a fixed config.
270    async fn open_db(context: deterministic::Context) -> AnyTest {
271        let cfg = create_test_config(0, &context);
272        AnyTest::init(context, cfg).await.unwrap()
273    }
274
275    #[test_traced("WARN")]
276    pub fn test_any_variable_db_build_and_authenticate() {
277        let executor = deterministic::Runner::default();
278        executor.start(|context| async move {
279            let db = open_db(context.clone()).await;
280            crate::qmdb::any::test::test_any_db_build_and_authenticate(
281                context,
282                db,
283                |ctx| Box::pin(open_db(ctx)),
284                to_bytes,
285            )
286            .await;
287        });
288    }
289
    #[test_traced("WARN")]
    pub fn test_any_variable_db_recovery() {
        let executor = deterministic::Runner::default();
        // Build a db with 1000 keys, some of which we update and some of which we delete.
        const ELEMENTS: u64 = 1000;
        executor.start(|context| async move {
            // open_db uses the fixed seed 0, so every reopen below sees the
            // same storage partitions (with distinct labels for tracing).
            let db = open_db(context.with_label("open1")).await;
            let root = db.root();

            // Build a batch but don't apply it (simulate failure before commit).
            {
                let mut batch = db.new_batch();
                for i in 0..ELEMENTS {
                    batch = batch.write(
                        Sha256::hash(&i.to_be_bytes()),
                        Some(vec![(i % 255) as u8; ((i % 13) + 7) as usize]),
                    );
                }
                // Merkleize but intentionally discard; nothing is applied.
                let _ = batch.merkleize(&db, None).await.unwrap();
            }

            // Simulate a failure and test that we rollback to the previous root.
            drop(db);
            let mut db = open_db(context.with_label("open2")).await;
            assert_eq!(root, db.root());

            // Re-apply the updates and commit them this time.
            let mut batch = db.new_batch();
            for i in 0u64..ELEMENTS {
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
                batch = batch.write(k, Some(v));
            }
            let merkleized = batch.merkleize(&db, None).await.unwrap();
            db.apply_batch(merkleized).await.unwrap();
            db.commit().await.unwrap();
            let root = db.root();

            // Update every 3rd key but don't apply (simulate failure).
            {
                let mut batch = db.new_batch();
                for i in 0u64..ELEMENTS {
                    if i % 3 != 0 {
                        continue;
                    }
                    let k = Sha256::hash(&i.to_be_bytes());
                    // New values differ from the originals in both fill byte and length.
                    let v = vec![((i + 1) % 255) as u8; ((i % 13) + 8) as usize];
                    batch = batch.write(k, Some(v));
                }
                let _ = batch.merkleize(&db, None).await.unwrap();
            }

            // Simulate a failure and test that we rollback to the previous root.
            drop(db);
            let mut db = open_db(context.with_label("open3")).await;
            assert_eq!(root, db.root());

            // Re-apply updates for every 3rd key and commit them this time.
            let mut batch = db.new_batch();
            for i in 0u64..ELEMENTS {
                if i % 3 != 0 {
                    continue;
                }
                let k = Sha256::hash(&i.to_be_bytes());
                let v = vec![((i + 1) % 255) as u8; ((i % 13) + 8) as usize];
                batch = batch.write(k, Some(v));
            }
            let merkleized = batch.merkleize(&db, None).await.unwrap();
            db.apply_batch(merkleized).await.unwrap();
            db.commit().await.unwrap();
            let root = db.root();

            // Delete every 7th key but don't apply (simulate failure).
            {
                let mut batch = db.new_batch();
                for i in 0u64..ELEMENTS {
                    if i % 7 != 1 {
                        continue;
                    }
                    let k = Sha256::hash(&i.to_be_bytes());
                    // Writing None is a delete.
                    batch = batch.write(k, None);
                }
                let _ = batch.merkleize(&db, None).await.unwrap();
            }

            // Simulate a failure and test that we rollback to the previous root.
            drop(db);
            let mut db = open_db(context.with_label("open4")).await;
            assert_eq!(root, db.root());

            // Re-delete every 7th key and commit this time.
            let mut batch = db.new_batch();
            for i in 0u64..ELEMENTS {
                if i % 7 != 1 {
                    continue;
                }
                let k = Sha256::hash(&i.to_be_bytes());
                batch = batch.write(k, None);
            }
            let merkleized = batch.merkleize(&db, None).await.unwrap();
            db.apply_batch(merkleized).await.unwrap();
            db.commit().await.unwrap();

            // Capture the fully-committed state, then prune to the inactivity
            // floor so the reopen below exercises the pruning boundary too.
            let root = db.root();
            let inactivity_floor = db.inactivity_floor_loc();
            db.sync().await.unwrap(); // test pruning boundary after sync w/ prune
            db.prune(inactivity_floor).await.unwrap();
            let bounds = db.bounds().await;
            let snapshot_items = db.snapshot.items();

            db.sync().await.unwrap();
            drop(db);

            // Confirm state is preserved after reopen.
            let db = open_db(context.with_label("open5")).await;
            assert_eq!(root, db.root());
            assert_eq!(db.bounds().await, bounds);
            assert_eq!(db.inactivity_floor_loc(), inactivity_floor);
            assert_eq!(db.snapshot.items(), snapshot_items);

            db.destroy().await.unwrap();
        });
    }
413
414    #[test_traced]
415    fn test_any_variable_db_prune_beyond_inactivity_floor() {
416        let executor = deterministic::Runner::default();
417        executor.start(|mut context| async move {
418            let mut db = open_db(context.clone()).await;
419
420            // Add some operations
421            let key1 = Digest::random(&mut context);
422            let key2 = Digest::random(&mut context);
423            let key3 = Digest::random(&mut context);
424
425            let merkleized = db
426                .new_batch()
427                .write(key1, Some(vec![10]))
428                .write(key2, Some(vec![20]))
429                .write(key3, Some(vec![30]))
430                .merkleize(&db, None)
431                .await
432                .unwrap();
433            db.apply_batch(merkleized).await.unwrap();
434
435            // inactivity_floor should be at some location < op_count
436            let inactivity_floor = db.inactivity_floor_loc();
437            let beyond_floor = Location::new(*inactivity_floor + 1);
438
439            // Try to prune beyond the inactivity floor
440            let result = db.prune(beyond_floor).await;
441            assert!(
442                matches!(result, Err(Error::PruneBeyondMinRequired(loc, floor))
443                    if loc == beyond_floor && floor == inactivity_floor)
444            );
445
446            db.destroy().await.unwrap();
447        });
448    }
449
450    #[test_traced]
451    fn test_stale_batch_rejected() {
452        let executor = deterministic::Runner::default();
453        executor.start(|context| async move {
454            let mut db = open_db(context.clone()).await;
455
456            let key1 = Sha256::hash(&[1]);
457            let key2 = Sha256::hash(&[2]);
458
459            // Create two batches from the same DB state.
460            let batch_a = db
461                .new_batch()
462                .write(key1, Some(vec![10]))
463                .merkleize(&db, None)
464                .await
465                .unwrap();
466            let batch_b = db
467                .new_batch()
468                .write(key2, Some(vec![20]))
469                .merkleize(&db, None)
470                .await
471                .unwrap();
472
473            // Apply the first -- should succeed.
474            db.apply_batch(batch_a).await.unwrap();
475            let expected_root = db.root();
476            let expected_bounds = db.bounds().await;
477            assert_eq!(db.get(&key1).await.unwrap(), Some(vec![10]));
478            assert_eq!(db.get(&key2).await.unwrap(), None);
479
480            // Apply the second -- should fail because the DB was modified.
481            let result = db.apply_batch(batch_b).await;
482            assert!(
483                matches!(result, Err(Error::StaleBatch { .. })),
484                "expected StaleBatch error, got {result:?}"
485            );
486            assert_eq!(db.root(), expected_root);
487            assert_eq!(db.bounds().await, expected_bounds);
488            assert_eq!(db.get(&key1).await.unwrap(), Some(vec![10]));
489            assert_eq!(db.get(&key2).await.unwrap(), None);
490
491            db.destroy().await.unwrap();
492        });
493    }
494
495    /// Sibling batches with different operation counts are still detected
496    /// as stale.
497    #[test_traced]
498    fn test_stale_batch_rejected_different_sizes() {
499        let executor = deterministic::Runner::default();
500        executor.start(|context| async move {
501            let mut db = open_db(context.clone()).await;
502
503            // A writes 1 key, B writes 5 keys -- different total_size.
504            let batch_a = db
505                .new_batch()
506                .write(Sha256::hash(&[1]), Some(vec![10]))
507                .merkleize(&db, None)
508                .await
509                .unwrap();
510            let batch_b = db
511                .new_batch()
512                .write(Sha256::hash(&[2]), Some(vec![20]))
513                .write(Sha256::hash(&[3]), Some(vec![30]))
514                .write(Sha256::hash(&[4]), Some(vec![40]))
515                .write(Sha256::hash(&[5]), Some(vec![50]))
516                .write(Sha256::hash(&[6]), Some(vec![60]))
517                .merkleize(&db, None)
518                .await
519                .unwrap();
520
521            // B has more ops than A.
522            assert!(batch_b.total_size > batch_a.total_size);
523
524            // Apply A, then B must be stale.
525            db.apply_batch(batch_a).await.unwrap();
526            let result = db.apply_batch(batch_b).await;
527            assert!(
528                matches!(result, Err(Error::StaleBatch { .. })),
529                "expected StaleBatch for asymmetric sibling, got {result:?}"
530            );
531
532            db.destroy().await.unwrap();
533        });
534    }
535
536    /// Applying C (grandchild of A) after only A is committed must
537    /// apply B's data + C's data. Uncommitted ancestor B's snapshot
538    /// entries are applied via ancestor_diffs with committed_locs override.
539    #[test_traced]
540    fn test_partial_ancestor_commit() {
541        let executor = deterministic::Runner::default();
542        executor.start(|context| async move {
543            let mut db = open_db(context.clone()).await;
544
545            let key1 = Sha256::hash(&[1]);
546            let key2 = Sha256::hash(&[2]);
547            let key3 = Sha256::hash(&[3]);
548
549            // Chain: DB <- A <- B <- C
550            let a = db
551                .new_batch()
552                .write(key1, Some(vec![10]))
553                .merkleize(&db, None)
554                .await
555                .unwrap();
556            let b = a
557                .new_batch::<Sha256>()
558                .write(key2, Some(vec![20]))
559                .merkleize(&db, None)
560                .await
561                .unwrap();
562            let c = b
563                .new_batch::<Sha256>()
564                .write(key3, Some(vec![30]))
565                .merkleize(&db, None)
566                .await
567                .unwrap();
568
569            let expected_root = c.root();
570
571            // Apply only A, then apply C directly (B uncommitted).
572            db.apply_batch(a).await.unwrap();
573            db.apply_batch(c).await.unwrap();
574
575            assert_eq!(db.root(), expected_root);
576            assert_eq!(db.get(&key1).await.unwrap(), Some(vec![10]));
577            assert_eq!(db.get(&key2).await.unwrap(), Some(vec![20]));
578            assert_eq!(db.get(&key3).await.unwrap(), Some(vec![30]));
579
580            db.destroy().await.unwrap();
581        });
582    }
583
584    #[test_traced]
585    fn test_stale_batch_chained() {
586        let executor = deterministic::Runner::default();
587        executor.start(|context| async move {
588            let mut db = open_db(context.clone()).await;
589
590            let key1 = Sha256::hash(&[1]);
591            let key2 = Sha256::hash(&[2]);
592            let key3 = Sha256::hash(&[3]);
593
594            // Commit initial state.
595            let merkleized = db
596                .new_batch()
597                .write(key1, Some(vec![10]))
598                .merkleize(&db, None)
599                .await
600                .unwrap();
601            db.apply_batch(merkleized).await.unwrap();
602
603            // Create a parent batch, then fork two children.
604            let parent = db
605                .new_batch()
606                .write(key2, Some(vec![20]))
607                .merkleize(&db, None)
608                .await
609                .unwrap();
610
611            let child_a = parent
612                .new_batch::<Sha256>()
613                .write(key3, Some(vec![30]))
614                .merkleize(&db, None)
615                .await
616                .unwrap();
617            let child_b = parent
618                .new_batch::<Sha256>()
619                .write(key3, Some(vec![40]))
620                .merkleize(&db, None)
621                .await
622                .unwrap();
623
624            // Apply child_a, then child_b should be stale.
625            db.apply_batch(child_a).await.unwrap();
626            let result = db.apply_batch(child_b).await;
627            assert!(
628                matches!(result, Err(Error::StaleBatch { .. })),
629                "expected StaleBatch error for sibling, got {result:?}"
630            );
631
632            db.destroy().await.unwrap();
633        });
634    }
635
636    /// Apply parent then child -- this is the sequential commit pattern
637    /// and must succeed. `apply_batch` detects that the child's ancestors
638    /// were committed and applies only the child's own operations.
639    #[test_traced]
640    fn test_sequential_commit_parent_then_child() {
641        let executor = deterministic::Runner::default();
642        executor.start(|context| async move {
643            let mut db = open_db(context.clone()).await;
644
645            let key1 = Sha256::hash(&[1]);
646            let key2 = Sha256::hash(&[2]);
647
648            // Create parent, then child.
649            let parent = db
650                .new_batch()
651                .write(key1, Some(vec![10]))
652                .merkleize(&db, None)
653                .await
654                .unwrap();
655            let child = parent
656                .new_batch::<Sha256>()
657                .write(key2, Some(vec![20]))
658                .merkleize(&db, None)
659                .await
660                .unwrap();
661
662            // Apply parent first, then child -- sequential commit.
663            db.apply_batch(parent).await.unwrap();
664            db.apply_batch(child).await.unwrap();
665
666            assert_eq!(db.get(&key1).await.unwrap(), Some(vec![10]));
667            assert_eq!(db.get(&key2).await.unwrap(), Some(vec![20]));
668
669            db.destroy().await.unwrap();
670        });
671    }
672
673    #[test_traced]
674    fn test_stale_batch_child_applied_before_parent() {
675        let executor = deterministic::Runner::default();
676        executor.start(|context| async move {
677            let mut db = open_db(context.clone()).await;
678
679            let key1 = Sha256::hash(&[1]);
680            let key2 = Sha256::hash(&[2]);
681
682            // Create parent, then child.
683            let parent = db
684                .new_batch()
685                .write(key1, Some(vec![10]))
686                .merkleize(&db, None)
687                .await
688                .unwrap();
689            let child = parent
690                .new_batch::<Sha256>()
691                .write(key2, Some(vec![20]))
692                .merkleize(&db, None)
693                .await
694                .unwrap();
695
696            // Apply child first -- parent should now be stale.
697            db.apply_batch(child).await.unwrap();
698            let result = db.apply_batch(parent).await;
699            assert!(
700                matches!(result, Err(Error::StaleBatch { .. })),
701                "expected StaleBatch for parent after child applied, got {result:?}"
702            );
703
704            db.destroy().await.unwrap();
705        });
706    }
707
    // FromSyncTestable implementation for from_sync_result tests
    mod from_sync_testable {
        use super::*;
        use crate::{
            merkle::{
                mmr::{self, journaled::Mmr},
                Family as _,
            },
            qmdb::any::sync::tests::FromSyncTestable,
        };
        use futures::future::join_all;

        // Journaled MMR specialized to the deterministic runtime and SHA-256 digests.
        type TestMmr = Mmr<deterministic::Context, Digest>;

        impl FromSyncTestable for AnyTest {
            type Mmr = TestMmr;

            // Decompose the database into its MMR and journal halves so sync
            // tests can rebuild a database from raw log components.
            fn into_log_components(self) -> (Self::Mmr, Self::Journal) {
                (self.log.merkle, self.log.journal)
            }

            // Collect the digests of the nodes to pin at `loc`; all node fetches
            // run concurrently via join_all. Panics if any node is unavailable.
            async fn pinned_nodes_at(&self, loc: mmr::Location) -> Vec<Digest> {
                join_all(mmr::Family::nodes_to_pin(loc).map(|p| self.log.merkle.get_node(p)))
                    .await
                    .into_iter()
                    .map(|n| n.unwrap().unwrap())
                    .collect()
            }
        }
    }
738
    /// Regression test for https://github.com/commonwarexyz/monorepo/issues/2787
    ///
    /// Compile-time check: the returned future must be `Send` while borrowing
    /// `db`. NOTE(review): the `use<'_>` precise-capture bound restricts which
    /// lifetimes/generics the opaque type captures — presumably the fix for the
    /// issue above; confirm against the issue discussion.
    #[allow(dead_code, clippy::manual_async_fn)]
    fn issue_2787_regression(
        db: &crate::qmdb::immutable::variable::Db<
            mmr::Family,
            deterministic::Context,
            Digest,
            Vec<u8>,
            Sha256,
            TwoCap,
        >,
        key: Digest,
    ) -> impl std::future::Future<Output = ()> + Send + use<'_> {
        async move {
            let _ = db.get(&key).await;
        }
    }
756
    /// Compile-time helper: accepting any value here proves at the call site
    /// that its type implements `Send`.
    fn is_send<T: Send>(_: T) {}

    /// Compile-time check (never called) that the futures returned by batch
    /// merkleization and `get_with_loc` are `Send`.
    #[allow(dead_code)]
    fn assert_non_trait_futures_are_send(db: &AnyTest, key: Digest, value: Vec<u8>) {
        let batch = db.new_batch().write(key, Some(value));
        is_send(batch.merkleize(db, None));
        is_send(db.get_with_loc(&key));
    }
765
766    /// Owned batch root matches the borrow-based batch root.
767    #[test_traced("WARN")]
768    fn test_owned_batch_root_matches() {
769        let executor = deterministic::Runner::default();
770        executor.start(|context| async move {
771            let mut db = create_test_db(context).await;
772
773            // Apply some initial data.
774            apply_ops(&mut db, create_test_ops(20)).await;
775            db.commit().await.unwrap();
776
777            // Build an owned batch from committed state.
778            let base = db.to_batch();
779
780            // Create a child batch via owned and via borrow-based API. Same ops.
781            let ops = create_test_ops_seeded(10, 99);
782
783            // Borrow-based path.
784            let mut batch = db.new_batch();
785            for op in &ops {
786                match op {
787                    unordered::Operation::Update(unordered::Update(k, v)) => {
788                        batch = batch.write(*k, Some(v.clone()));
789                    }
790                    unordered::Operation::Delete(k) => {
791                        batch = batch.write(*k, None);
792                    }
793                    _ => unreachable!(),
794                }
795            }
796            let borrow_root = batch.merkleize(&db, None).await.unwrap().root();
797
798            // Owned batch path.
799            let mut batch = base.new_batch::<Sha256>();
800            for op in &ops {
801                match op {
802                    unordered::Operation::Update(unordered::Update(k, v)) => {
803                        batch = batch.write(*k, Some(v.clone()));
804                    }
805                    unordered::Operation::Delete(k) => {
806                        batch = batch.write(*k, None);
807                    }
808                    _ => unreachable!(),
809                }
810            }
811            let batch_root = batch.merkleize(&db, None).await.unwrap().root();
812
813            assert_eq!(borrow_root, batch_root);
814
815            db.destroy().await.unwrap();
816        });
817    }
818
819    /// Owned batch can be merkleized and applied to the database.
820    #[test_traced("WARN")]
821    fn test_owned_batch_apply() {
822        let executor = deterministic::Runner::default();
823        executor.start(|context| async move {
824            let mut db = create_test_db(context).await;
825
826            // Apply initial data.
827            apply_ops(&mut db, create_test_ops(20)).await;
828            db.commit().await.unwrap();
829
830            let base = db.to_batch();
831
832            // Build a child batch via owned API, merkleize, and apply.
833            let key = Digest::random(&mut commonware_utils::test_rng_seeded(200));
834            let value = vec![42u8; 16];
835            let child_batch = base
836                .new_batch::<Sha256>()
837                .write(key, Some(value.clone()))
838                .merkleize(&db, None)
839                .await
840                .unwrap();
841
842            // Apply the batch.
843            db.apply_batch(child_batch).await.unwrap();
844            db.commit().await.unwrap();
845
846            // Verify the key was written.
847            let fetched = db.get(&key).await.unwrap();
848            assert_eq!(fetched.unwrap(), value);
849
850            db.destroy().await.unwrap();
851        });
852    }
853
854    /// Batch chains: parent batch committed, child applied sequentially.
855    #[test_traced("WARN")]
856    fn test_owned_batch_chain_commit_parent_first() {
857        let executor = deterministic::Runner::default();
858        executor.start(|context| async move {
859            let mut db = create_test_db(context).await;
860
861            // Build initial data.
862            apply_ops(&mut db, create_test_ops(10)).await;
863            db.commit().await.unwrap();
864
865            let base = db.to_batch();
866
867            // Parent batch (via owned API).
868            let key_a = Digest::random(&mut commonware_utils::test_rng_seeded(300));
869            let val_a = vec![1u8; 10];
870            let parent_batch = base
871                .new_batch::<Sha256>()
872                .write(key_a, Some(val_a.clone()))
873                .merkleize(&db, None)
874                .await
875                .unwrap();
876
877            // Child batch (built on parent batch).
878            let key_b = Digest::random(&mut commonware_utils::test_rng_seeded(301));
879            let val_b = vec![2u8; 10];
880            let child_batch = parent_batch
881                .new_batch::<Sha256>()
882                .write(key_b, Some(val_b.clone()))
883                .merkleize(&db, None)
884                .await
885                .unwrap();
886
887            db.apply_batch(parent_batch).await.unwrap();
888            db.commit().await.unwrap();
889
890            // Commit child.
891            db.apply_batch(child_batch).await.unwrap();
892            db.commit().await.unwrap();
893
894            // Both keys should be readable.
895            assert_eq!(db.get(&key_a).await.unwrap().unwrap(), val_a);
896            assert_eq!(db.get(&key_b).await.unwrap().unwrap(), val_b);
897
898            db.destroy().await.unwrap();
899        });
900    }
901
902    /// Multiple forks from the same batch.
903    #[test_traced("WARN")]
904    fn test_owned_batch_multiple_forks() {
905        let executor = deterministic::Runner::default();
906        executor.start(|context| async move {
907            let mut db = create_test_db(context).await;
908
909            apply_ops(&mut db, create_test_ops(10)).await;
910            db.commit().await.unwrap();
911
912            let base = db.to_batch();
913
914            // Fork A.
915            let key_a = Digest::random(&mut commonware_utils::test_rng_seeded(400));
916            let fork_a = base
917                .new_batch::<Sha256>()
918                .write(key_a, Some(vec![10u8; 8]))
919                .merkleize(&db, None)
920                .await
921                .unwrap();
922
923            // Fork B (different key, same parent).
924            let key_b = Digest::random(&mut commonware_utils::test_rng_seeded(401));
925            let fork_b = base
926                .new_batch::<Sha256>()
927                .write(key_b, Some(vec![20u8; 8]))
928                .merkleize(&db, None)
929                .await
930                .unwrap();
931
932            // Roots differ.
933            assert_ne!(fork_a.root(), fork_b.root());
934
935            // Apply fork A.
936            db.apply_batch(fork_a).await.unwrap();
937            db.commit().await.unwrap();
938
939            assert_eq!(db.get(&key_a).await.unwrap().unwrap(), vec![10u8; 8]);
940            assert!(db.get(&key_b).await.unwrap().is_none());
941
942            db.destroy().await.unwrap();
943        });
944    }
945
946    /// Batches can be stored in a homogeneous collection.
947    #[test_traced("WARN")]
948    fn test_owned_batch_homogeneous_collection() {
949        use crate::qmdb::any::batch::MerkleizedBatch;
950        use commonware_cryptography::sha256;
951        use std::collections::HashMap;
952
953        type Snap = MerkleizedBatch<mmr::Family, sha256::Digest, super::Update<Digest, Vec<u8>>>;
954
955        let executor = deterministic::Runner::default();
956        executor.start(|context| async move {
957            let mut db = create_test_db(context).await;
958
959            apply_ops(&mut db, create_test_ops(10)).await;
960            db.commit().await.unwrap();
961
962            let base = db.to_batch();
963
964            // Build several batches at different depths and store them by root.
965            let mut collection: HashMap<sha256::Digest, Arc<Snap>> = HashMap::new();
966
967            // Depth 1.
968            let key = Digest::random(&mut commonware_utils::test_rng_seeded(500));
969            let batch1 = base
970                .new_batch::<Sha256>()
971                .write(key, Some(vec![1u8; 8]))
972                .merkleize(&db, None)
973                .await
974                .unwrap();
975            collection.insert(batch1.root(), batch1);
976
977            // Depth 2 (retrieve batch1 from collection, build child).
978            let batch1_root = *collection.keys().next().unwrap();
979            let batch1_ref = collection.get(&batch1_root).unwrap();
980            let key = Digest::random(&mut commonware_utils::test_rng_seeded(501));
981            let batch2 = batch1_ref
982                .new_batch::<Sha256>()
983                .write(key, Some(vec![2u8; 8]))
984                .merkleize(&db, None)
985                .await
986                .unwrap();
987            collection.insert(batch2.root(), batch2);
988
989            // All batches in the same HashMap -- type erasure works.
990            assert_eq!(collection.len(), 2);
991
992            db.destroy().await.unwrap();
993        });
994    }
995
996    /// Batch chains: parent inserts key, child deletes it.
997    #[test_traced("WARN")]
998    fn test_owned_batch_chain_delete_after_ancestor_insert() {
999        let executor = deterministic::Runner::default();
1000        executor.start(|context| async move {
1001            let mut db = create_test_db(context).await;
1002
1003            apply_ops(&mut db, create_test_ops(5)).await;
1004            db.commit().await.unwrap();
1005
1006            let base = db.to_batch();
1007
1008            // Parent batch: insert key_x.
1009            let key_x = Digest::random(&mut commonware_utils::test_rng_seeded(700));
1010            let val_a = vec![10u8; 8];
1011            let parent_batch = base
1012                .new_batch::<Sha256>()
1013                .write(key_x, Some(val_a.clone()))
1014                .merkleize(&db, None)
1015                .await
1016                .unwrap();
1017
1018            // Child batch: delete key_x.
1019            let child_batch = parent_batch
1020                .new_batch::<Sha256>()
1021                .write(key_x, None)
1022                .merkleize(&db, None)
1023                .await
1024                .unwrap();
1025
1026            db.apply_batch(parent_batch).await.unwrap();
1027            db.commit().await.unwrap();
1028            assert_eq!(db.get(&key_x).await.unwrap().unwrap(), val_a);
1029
1030            // Commit child.
1031            db.apply_batch(child_batch).await.unwrap();
1032            db.commit().await.unwrap();
1033
1034            // key_x should be deleted.
1035            assert!(db.get(&key_x).await.unwrap().is_none());
1036
1037            db.destroy().await.unwrap();
1038        });
1039    }
1040
1041    /// Batch chains: parent and child both modify the same key.
1042    #[test_traced("WARN")]
1043    fn test_owned_batch_chain_overlapping_keys() {
1044        let executor = deterministic::Runner::default();
1045        executor.start(|context| async move {
1046            let mut db = create_test_db(context).await;
1047
1048            // Build initial data.
1049            apply_ops(&mut db, create_test_ops(5)).await;
1050            db.commit().await.unwrap();
1051
1052            let base = db.to_batch();
1053
1054            // Parent batch: insert key_x with value_a.
1055            let key_x = Digest::random(&mut commonware_utils::test_rng_seeded(600));
1056            let val_a = vec![10u8; 8];
1057            let parent_batch = base
1058                .new_batch::<Sha256>()
1059                .write(key_x, Some(val_a.clone()))
1060                .merkleize(&db, None)
1061                .await
1062                .unwrap();
1063
1064            // Child batch: update key_x to value_b (overlapping key).
1065            let val_b = vec![20u8; 8];
1066            let child_batch = parent_batch
1067                .new_batch::<Sha256>()
1068                .write(key_x, Some(val_b.clone()))
1069                .merkleize(&db, None)
1070                .await
1071                .unwrap();
1072
1073            db.apply_batch(parent_batch).await.unwrap();
1074            db.commit().await.unwrap();
1075
1076            // key_x should have parent's value.
1077            assert_eq!(db.get(&key_x).await.unwrap().unwrap(), val_a);
1078
1079            // Commit child.
1080            db.apply_batch(child_batch).await.unwrap();
1081            db.commit().await.unwrap();
1082
1083            // key_x should now have child's value.
1084            assert_eq!(db.get(&key_x).await.unwrap().unwrap(), val_b);
1085
1086            db.destroy().await.unwrap();
1087        });
1088    }
1089
1090    /// Three-deep batch chain: grandparent -> parent -> child.
1091    /// Commit each layer sequentially.
1092    #[test_traced("WARN")]
1093    fn test_owned_batch_chain_three_deep() {
1094        let executor = deterministic::Runner::default();
1095        executor.start(|context| async move {
1096            let mut db = create_test_db(context).await;
1097
1098            apply_ops(&mut db, create_test_ops(10)).await;
1099            db.commit().await.unwrap();
1100
1101            let base = db.to_batch();
1102
1103            // Grandparent: insert key_a.
1104            let key_a = Digest::random(&mut commonware_utils::test_rng_seeded(900));
1105            let val_a = vec![1u8; 10];
1106            let grandparent_batch = base
1107                .new_batch::<Sha256>()
1108                .write(key_a, Some(val_a.clone()))
1109                .merkleize(&db, None)
1110                .await
1111                .unwrap();
1112
1113            // Parent: insert key_b.
1114            let key_b = Digest::random(&mut commonware_utils::test_rng_seeded(901));
1115            let val_b = vec![2u8; 10];
1116            let parent_batch = grandparent_batch
1117                .new_batch::<Sha256>()
1118                .write(key_b, Some(val_b.clone()))
1119                .merkleize(&db, None)
1120                .await
1121                .unwrap();
1122
1123            // Child: insert key_c.
1124            let key_c = Digest::random(&mut commonware_utils::test_rng_seeded(902));
1125            let val_c = vec![3u8; 10];
1126            let child_batch = parent_batch
1127                .new_batch::<Sha256>()
1128                .write(key_c, Some(val_c.clone()))
1129                .merkleize(&db, None)
1130                .await
1131                .unwrap();
1132
1133            db.apply_batch(grandparent_batch).await.unwrap();
1134            db.commit().await.unwrap();
1135            assert_eq!(db.get(&key_a).await.unwrap().unwrap(), val_a);
1136
1137            // Commit parent.
1138            db.apply_batch(parent_batch).await.unwrap();
1139            db.commit().await.unwrap();
1140            assert_eq!(db.get(&key_b).await.unwrap().unwrap(), val_b);
1141
1142            // Commit child.
1143            db.apply_batch(child_batch).await.unwrap();
1144            db.commit().await.unwrap();
1145            assert_eq!(db.get(&key_c).await.unwrap().unwrap(), val_c);
1146
1147            // All three keys readable.
1148            assert_eq!(db.get(&key_a).await.unwrap().unwrap(), val_a);
1149            assert_eq!(db.get(&key_b).await.unwrap().unwrap(), val_b);
1150            assert_eq!(db.get(&key_c).await.unwrap().unwrap(), val_c);
1151
1152            db.destroy().await.unwrap();
1153        });
1154    }
1155
1156    /// Three-deep chain where each layer touches the same key.
1157    #[test_traced("WARN")]
1158    fn test_owned_batch_chain_three_deep_overlapping_key() {
1159        let executor = deterministic::Runner::default();
1160        executor.start(|context| async move {
1161            let mut db = create_test_db(context).await;
1162
1163            apply_ops(&mut db, create_test_ops(5)).await;
1164            db.commit().await.unwrap();
1165
1166            let base = db.to_batch();
1167            let key_x = Digest::random(&mut commonware_utils::test_rng_seeded(910));
1168
1169            // Grandparent: insert key_x = val_a.
1170            let val_a = vec![10u8; 8];
1171            let grandparent_batch = base
1172                .new_batch::<Sha256>()
1173                .write(key_x, Some(val_a.clone()))
1174                .merkleize(&db, None)
1175                .await
1176                .unwrap();
1177
1178            // Parent: update key_x = val_b.
1179            let val_b = vec![20u8; 8];
1180            let parent_batch = grandparent_batch
1181                .new_batch::<Sha256>()
1182                .write(key_x, Some(val_b.clone()))
1183                .merkleize(&db, None)
1184                .await
1185                .unwrap();
1186
1187            // Child: delete key_x.
1188            let child_batch = parent_batch
1189                .new_batch::<Sha256>()
1190                .write(key_x, None)
1191                .merkleize(&db, None)
1192                .await
1193                .unwrap();
1194
1195            db.apply_batch(grandparent_batch).await.unwrap();
1196            db.commit().await.unwrap();
1197            assert_eq!(db.get(&key_x).await.unwrap().unwrap(), val_a);
1198
1199            // Commit parent.
1200            db.apply_batch(parent_batch).await.unwrap();
1201            db.commit().await.unwrap();
1202            assert_eq!(db.get(&key_x).await.unwrap().unwrap(), val_b);
1203
1204            // Commit child.
1205            db.apply_batch(child_batch).await.unwrap();
1206            db.commit().await.unwrap();
1207            assert!(db.get(&key_x).await.unwrap().is_none());
1208
1209            db.destroy().await.unwrap();
1210        });
1211    }
1212
1213    /// After committing and dropping an ancestor, building a new child
1214    /// from a surviving descendant must not panic or return wrong data.
1215    /// Regression test: the Merkleizer's `read_op` fell into the
1216    /// "ancestor chain" region for operations that belonged to the freed
1217    /// ancestor, causing wrong indexing.
1218    #[test_traced("WARN")]
1219    fn test_new_child_after_ancestor_committed_and_dropped() {
1220        let executor = deterministic::Runner::default();
1221        executor.start(|context| async move {
1222            let mut db = create_test_db(context).await;
1223
1224            apply_ops(&mut db, create_test_ops(5)).await;
1225            db.commit().await.unwrap();
1226
1227            // Chain: DB <-- a <-- b
1228            let key_a = Digest::random(&mut commonware_utils::test_rng_seeded(800));
1229            let val_a = vec![10u8; 8];
1230            let a = db
1231                .new_batch()
1232                .write(key_a, Some(val_a.clone()))
1233                .merkleize(&db, None)
1234                .await
1235                .unwrap();
1236
1237            let key_b = Digest::random(&mut commonware_utils::test_rng_seeded(801));
1238            let val_b = vec![20u8; 8];
1239            let b = a
1240                .new_batch::<Sha256>()
1241                .write(key_b, Some(val_b.clone()))
1242                .merkleize(&db, None)
1243                .await
1244                .unwrap();
1245
1246            // Commit a and drop it. b's Weak<a> becomes invalid.
1247            db.apply_batch(a).await.unwrap();
1248            db.commit().await.unwrap();
1249
1250            // Build c from b. This must not panic despite a being freed.
1251            let key_c = Digest::random(&mut commonware_utils::test_rng_seeded(802));
1252            let val_c = vec![30u8; 8];
1253            let c = b
1254                .new_batch::<Sha256>()
1255                .write(key_c, Some(val_c.clone()))
1256                .merkleize(&db, None)
1257                .await
1258                .unwrap();
1259
1260            // Commit b (skip_ancestors path since a is committed).
1261            db.apply_batch(b).await.unwrap();
1262            db.commit().await.unwrap();
1263
1264            // Commit c.
1265            db.apply_batch(c).await.unwrap();
1266            db.commit().await.unwrap();
1267
1268            // All three keys present with correct values.
1269            assert_eq!(db.get(&key_a).await.unwrap().unwrap(), val_a);
1270            assert_eq!(db.get(&key_b).await.unwrap().unwrap(), val_b);
1271            assert_eq!(db.get(&key_c).await.unwrap().unwrap(), val_c);
1272
1273            db.destroy().await.unwrap();
1274        });
1275    }
1276
1277    /// Regression: applying a batch after its ancestor Arc is dropped (without
1278    /// committing) must still apply the ancestor's snapshot diffs. Before the
1279    /// fix, the Weak parent chain was dead and ancestor diffs were silently
1280    /// lost, causing the journal and snapshot to diverge.
1281    #[test_traced("WARN")]
1282    fn test_apply_batch_after_ancestor_dropped_without_commit() {
1283        let executor = deterministic::Runner::default();
1284        executor.start(|context| async move {
1285            let mut db = create_test_db(context).await;
1286
1287            apply_ops(&mut db, create_test_ops(5)).await;
1288            db.commit().await.unwrap();
1289
1290            let base = db.to_batch();
1291
1292            // Chain: base <-- a <-- b <-- c
1293            let key_a = Digest::random(&mut commonware_utils::test_rng_seeded(700));
1294            let val_a = vec![1u8; 10];
1295            let a = base
1296                .new_batch::<Sha256>()
1297                .write(key_a, Some(val_a.clone()))
1298                .merkleize(&db, None)
1299                .await
1300                .unwrap();
1301
1302            let key_b = Digest::random(&mut commonware_utils::test_rng_seeded(701));
1303            let val_b = vec![2u8; 10];
1304            let b = a
1305                .new_batch::<Sha256>()
1306                .write(key_b, Some(val_b.clone()))
1307                .merkleize(&db, None)
1308                .await
1309                .unwrap();
1310
1311            let key_c = Digest::random(&mut commonware_utils::test_rng_seeded(702));
1312            let val_c = vec![3u8; 10];
1313            let c = b
1314                .new_batch::<Sha256>()
1315                .write(key_c, Some(val_c.clone()))
1316                .merkleize(&db, None)
1317                .await
1318                .unwrap();
1319
1320            // Drop a and b without committing. Their Weak refs in c are now dead.
1321            drop(a);
1322            drop(b);
1323
1324            // Apply only the tip. This is !skip_ancestors (db hasn't changed).
1325            // Before the fix, a's and b's snapshot diffs would be silently lost.
1326            db.apply_batch(c).await.unwrap();
1327            db.commit().await.unwrap();
1328
1329            // All three keys must be in the snapshot.
1330            assert_eq!(db.get(&key_a).await.unwrap().unwrap(), val_a);
1331            assert_eq!(db.get(&key_b).await.unwrap().unwrap(), val_b);
1332            assert_eq!(db.get(&key_c).await.unwrap().unwrap(), val_c);
1333
1334            db.destroy().await.unwrap();
1335        });
1336    }
1337}