// commonware_storage/qmdb/any/ordered/fixed.rs
//! An _ordered_ variant of an "Any" authenticated database with fixed-size values which additionally
//! maintains the lexicographic-next active key of each active key. For example, if the active key
//! set is `{bar, baz, foo}`, then the next-key value for `bar` is `baz`, the next-key value for
//! `baz` is `foo`, and because we define the next-key of the very last key as the first key, the
//! next-key value for `foo` is `bar`.

7use crate::{
8    index::ordered::Index,
9    journal::contiguous::fixed::Journal,
10    merkle::{self, Location},
11    qmdb::{
12        any::{ordered, value::FixedEncoding, FixedConfig as Config, FixedValue},
13        Error,
14    },
15    translator::Translator,
16    Context,
17};
18use commonware_cryptography::Hasher;
19use commonware_utils::Array;
20
/// Update type for the ordered any-db, with values wrapped in [FixedEncoding].
pub type Update<K, V> = ordered::Update<K, FixedEncoding<V>>;

/// Operation type for the ordered any-db, with values wrapped in [FixedEncoding].
pub type Operation<F, K, V> = ordered::Operation<F, K, FixedEncoding<V>>;
23
/// A key-value QMDB based on an authenticated log of operations, supporting authentication of any
/// value ever associated with a key.
///
/// Concrete instantiation of [super::Db] using a contiguous fixed-size [Journal] for the
/// operation log and an ordered [Index] (parameterized by translator `T`) mapping translated
/// keys to [Location]s.
pub type Db<F, E, K, V, H, T> =
    super::Db<F, E, Journal<E, Operation<F, K, V>>, Index<T, Location<F>>, H, Update<K, V>>;
28
29impl<F: merkle::Family, E: Context, K: Array, V: FixedValue, H: Hasher, T: Translator>
30    Db<F, E, K, V, H, T>
31{
32    /// Returns a [Db] qmdb initialized from `cfg`. Any uncommitted log operations will be
33    /// discarded and the state of the db will be as of the last committed operation.
34    pub async fn init(context: E, cfg: Config<T>) -> Result<Self, Error<F>> {
35        Self::init_with_callback(context, cfg, None, |_, _| {}).await
36    }
37
38    /// Initialize the DB, invoking `callback` for each operation processed during recovery.
39    ///
40    /// If `known_inactivity_floor` is provided and is less than the log's actual inactivity floor,
41    /// `callback` is invoked with `(false, None)` for each location in the gap. Then, as the
42    /// snapshot is built from the log, `callback` is invoked for each operation with its activity
43    /// status and previous location (if any).
44    pub(crate) async fn init_with_callback(
45        context: E,
46        cfg: Config<T>,
47        known_inactivity_floor: Option<Location<F>>,
48        callback: impl FnMut(bool, Option<Location<F>>),
49    ) -> Result<Self, Error<F>> {
50        crate::qmdb::any::init(context, cfg, known_inactivity_floor, callback).await
51    }
52}
53
54/// Partitioned index variants that divide the key space into `2^(P*8)` partitions.
55///
56/// See [partitioned::Db] for the generic type, or use the convenience aliases:
57/// - [partitioned::p256::Db] for 256 partitions (P=1)
58/// - [partitioned::p64k::Db] for 65,536 partitions (P=2)
59pub mod partitioned {
60    pub use super::{Operation, Update};
61    use crate::{
62        index::partitioned::ordered::Index,
63        journal::contiguous::fixed::Journal,
64        merkle::{self, Location},
65        qmdb::{
66            any::{FixedConfig as Config, FixedValue},
67            Error,
68        },
69        translator::Translator,
70        Context,
71    };
72    use commonware_cryptography::Hasher;
73    use commonware_utils::Array;
74
75    /// An ordered key-value QMDB with a partitioned snapshot index.
76    ///
77    /// This is the partitioned variant of [super::Db]. The const generic `P` specifies
78    /// the number of prefix bytes used for partitioning:
79    /// - `P = 1`: 256 partitions
80    /// - `P = 2`: 65,536 partitions
81    ///
82    /// Use partitioned indices when you have a large number of keys (>> 2^(P*8)) and memory
83    /// efficiency is important. Keys should be uniformly distributed across the prefix space.
84    pub type Db<F, E, K, V, H, T, const P: usize> = crate::qmdb::any::ordered::Db<
85        F,
86        E,
87        Journal<E, Operation<F, K, V>>,
88        Index<T, Location<F>, P>,
89        H,
90        Update<K, V>,
91    >;
92
93    impl<
94            F: merkle::Family,
95            E: Context,
96            K: Array,
97            V: FixedValue,
98            H: Hasher,
99            T: Translator,
100            const P: usize,
101        > Db<F, E, K, V, H, T, P>
102    {
103        /// Returns a [Db] QMDB initialized from `cfg`. Uncommitted log operations will be
104        /// discarded and the state of the db will be as of the last committed operation.
105        pub async fn init(context: E, cfg: Config<T>) -> Result<Self, Error<F>> {
106            Self::init_with_callback(context, cfg, None, |_, _| {}).await
107        }
108
109        /// Initialize the DB, invoking `callback` for each operation processed during recovery.
110        ///
111        /// If `known_inactivity_floor` is provided and is less than the log's actual inactivity floor,
112        /// `callback` is invoked with `(false, None)` for each location in the gap. Then, as the
113        /// snapshot is built from the log, `callback` is invoked for each operation with its activity
114        /// status and previous location (if any).
115        pub(crate) async fn init_with_callback(
116            context: E,
117            cfg: Config<T>,
118            known_inactivity_floor: Option<Location<F>>,
119            callback: impl FnMut(bool, Option<Location<F>>),
120        ) -> Result<Self, Error<F>> {
121            crate::qmdb::any::init(context, cfg, known_inactivity_floor, callback).await
122        }
123    }
124
125    /// Convenience type aliases for 256 partitions (P=1).
126    pub mod p256 {
127        /// Fixed-value DB with 256 partitions.
128        pub type Db<F, E, K, V, H, T> = super::Db<F, E, K, V, H, T, 1>;
129    }
130
131    /// Convenience type aliases for 65,536 partitions (P=2).
132    pub mod p64k {
133        /// Fixed-value DB with 65,536 partitions.
134        pub type Db<F, E, K, V, H, T> = super::Db<F, E, K, V, H, T, 2>;
135    }
136}
137
138#[cfg(test)]
139pub(crate) mod test {
140    use super::*;
141    use crate::{
142        index::Unordered as _,
143        merkle::{
144            mmr::{self, Location, StandardHasher as Standard},
145            Location as GenericLocation,
146        },
147        qmdb::{
148            any::{
149                ordered::{
150                    test::{
151                        test_ordered_any_db_basic, test_ordered_any_db_empty,
152                        test_ordered_any_update_collision_edge_case,
153                    },
154                    Update,
155                },
156                test::fixed_db_config,
157            },
158            verify_proof,
159        },
160        translator::{OneCap, TwoCap},
161    };
162    use commonware_cryptography::{sha256::Digest, Hasher, Sha256};
163    use commonware_macros::test_traced;
164    use commonware_math::algebra::Random;
165    use commonware_runtime::{
166        deterministic::{self, Context},
167        Metrics, Runner as _,
168    };
169    use commonware_utils::{sequence::FixedBytes, test_rng_seeded, NZU64};
170    use futures::StreamExt as _;
171    use rand::{rngs::StdRng, seq::IteratorRandom, RngCore, SeedableRng};
172    use std::collections::{BTreeMap, HashMap};
173
    /// A generic type alias for an Any database parameterized by merkle family.
    ///
    /// Mirrors [AnyTest] but leaves the merkle family `F` open so tests can exercise the db
    /// against different families. Keys and values are both SHA-256 [Digest]s.
    type AnyTestGeneric<F> = crate::qmdb::any::db::Db<
        F,
        deterministic::Context,
        Journal<
            deterministic::Context,
            crate::qmdb::any::operation::Ordered<F, Digest, FixedEncoding<Digest>>,
        >,
        Index<TwoCap, GenericLocation<F>>,
        Sha256,
        crate::qmdb::any::operation::update::Ordered<Digest, FixedEncoding<Digest>>,
    >;
186
    /// Type alias for the concrete [Db] type used in these unit tests.
    ///
    /// Uses the [mmr::Family] merkle family with SHA-256 [Digest] keys and values and the
    /// [TwoCap] translator.
    pub(crate) type AnyTest =
        Db<mmr::Family, deterministic::Context, Digest, Digest, Sha256, TwoCap>;
190
191    /// Return an `Any` database initialized with a fixed config, generic over merkle family.
192    async fn open_db_generic<F: crate::merkle::Family>(
193        context: deterministic::Context,
194    ) -> AnyTestGeneric<F> {
195        let cfg = fixed_db_config::<TwoCap>("partition", &context);
196        crate::qmdb::any::init(context, cfg, None, |_, _| {})
197            .await
198            .unwrap()
199    }
200
201    /// Return an `Any` database initialized with a fixed config.
202    async fn open_db(context: deterministic::Context) -> AnyTest {
203        let cfg = fixed_db_config("partition", &context);
204        AnyTest::init(context, cfg).await.unwrap()
205    }
206
207    /// Create a test database with unique partition names
208    pub(crate) async fn create_test_db(mut context: Context) -> AnyTest {
209        let seed = context.next_u64();
210        let cfg = fixed_db_config::<TwoCap>(&seed.to_string(), &context);
211        AnyTest::init(context, cfg).await.unwrap()
212    }
213
214    /// Create n random operations using the default seed (0). Some portion of
215    /// the updates are deletes. create_test_ops(n) is a prefix of
216    /// create_test_ops(n') for n < n'.
217    pub(crate) fn create_test_ops(n: usize) -> Vec<Operation<mmr::Family, Digest, Digest>> {
218        create_test_ops_seeded(n, 0)
219    }
220
221    /// Create n random operations using a specific seed. Use different seeds
222    /// when you need non-overlapping keys in the same test.
223    pub(crate) fn create_test_ops_seeded(
224        n: usize,
225        seed: u64,
226    ) -> Vec<Operation<mmr::Family, Digest, Digest>> {
227        let mut rng = test_rng_seeded(seed);
228        let mut prev_key = Digest::random(&mut rng);
229        let mut ops = Vec::new();
230        for i in 0..n {
231            if i % 10 == 0 && i > 0 {
232                ops.push(Operation::Delete(prev_key));
233            } else {
234                let key = Digest::random(&mut rng);
235                let next_key = Digest::random(&mut rng);
236                let value = Digest::random(&mut rng);
237                ops.push(Operation::Update(Update {
238                    key,
239                    value,
240                    next_key,
241                }));
242                prev_key = key;
243            }
244        }
245        ops
246    }
247
248    /// Applies the given operations to the database.
249    pub(crate) async fn apply_ops(
250        db: &mut AnyTest,
251        ops: Vec<Operation<mmr::Family, Digest, Digest>>,
252    ) {
253        let mut batch = db.new_batch();
254        for op in ops {
255            match op {
256                Operation::Update(data) => {
257                    batch = batch.write(data.key, Some(data.value));
258                }
259                Operation::Delete(key) => {
260                    batch = batch.write(key, None);
261                }
262                Operation::CommitFloor(_, _) => {
263                    // CommitFloor consumes self - not supported in this helper.
264                    // Test data from create_test_ops never includes CommitFloor.
265                    panic!("CommitFloor not supported in apply_ops");
266                }
267            }
268        }
269        let merkleized = batch.merkleize(db, None).await.unwrap();
270        db.apply_batch(merkleized).await.unwrap();
271    }
272
    #[test_traced("WARN")]
    // Test the edge case that arises where we're inserting the second key and it precedes the first
    // key, but shares the same translated key.
    fn test_ordered_any_fixed_db_translated_key_collision_edge_case() {
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            let seed = context.next_u64();
            let config = fixed_db_config::<OneCap>(&seed.to_string(), &context);
            // NOTE(review): OneCap appears to translate keys by a 1-byte prefix, so key1, key2,
            // and middle_key (all starting with 1u8) collide under translation — confirm against
            // translator::OneCap.
            let mut db = Db::<mmr::Family, Context, FixedBytes<2>, i32, Sha256, OneCap>::init(
                context, config,
            )
            .await
            .unwrap();
            let key1 = FixedBytes::<2>::new([1u8, 1u8]);
            let key2 = FixedBytes::<2>::new([1u8, 3u8]);
            // Create some keys that will not be added to the snapshot.
            let early_key = FixedBytes::<2>::new([0u8, 2u8]); // sorts before key1
            let late_key = FixedBytes::<2>::new([3u8, 0u8]); // sorts after key2
            let middle_key = FixedBytes::<2>::new([1u8, 2u8]); // sorts between key1 and key2

            // Insert both colliding keys in one batch. With exactly two active keys, each is the
            // other's next-key (the ordering wraps around).
            let merkleized = db
                .new_batch()
                .write(key1.clone(), Some(1))
                .write(key2.clone(), Some(2))
                .merkleize(&db, None)
                .await
                .unwrap();
            db.apply_batch(merkleized).await.unwrap();
            assert_eq!(db.get_all(&key1).await.unwrap().unwrap(), (1, key2.clone()));
            assert_eq!(db.get_all(&key2).await.unwrap().unwrap(), (2, key1.clone()));
            assert!(db.get_span(&key1).await.unwrap().unwrap().1.next_key == key2.clone());
            assert!(db.get_span(&key2).await.unwrap().unwrap().1.next_key == key1.clone());
            assert!(db.get_span(&early_key).await.unwrap().unwrap().1.next_key == key1.clone());
            assert!(db.get_span(&middle_key).await.unwrap().unwrap().1.next_key == key2.clone());
            assert!(db.get_span(&late_key).await.unwrap().unwrap().1.next_key == key1.clone());

            // Delete key1: key2 becomes the only active key, so every span points at key2.
            let merkleized = db
                .new_batch()
                .write(key1.clone(), None)
                .merkleize(&db, None)
                .await
                .unwrap();
            db.apply_batch(merkleized).await.unwrap();
            assert!(db.get_span(&key1).await.unwrap().unwrap().1.next_key == key2.clone());
            assert!(db.get_span(&key2).await.unwrap().unwrap().1.next_key == key2.clone());
            assert!(db.get_span(&early_key).await.unwrap().unwrap().1.next_key == key2.clone());
            assert!(db.get_span(&middle_key).await.unwrap().unwrap().1.next_key == key2.clone());
            assert!(db.get_span(&late_key).await.unwrap().unwrap().1.next_key == key2.clone());

            // Delete key2 as well: the db is empty and spans no longer resolve.
            let merkleized = db
                .new_batch()
                .write(key2.clone(), None)
                .merkleize(&db, None)
                .await
                .unwrap();
            db.apply_batch(merkleized).await.unwrap();
            assert!(db.get_span(&key1).await.unwrap().is_none());
            assert!(db.get_span(&key2).await.unwrap().is_none());

            assert!(db.is_empty());

            // Update the keys in opposite order from earlier.

            let merkleized = db
                .new_batch()
                .write(key2.clone(), Some(2))
                .write(key1.clone(), Some(1))
                .merkleize(&db, None)
                .await
                .unwrap();
            db.apply_batch(merkleized).await.unwrap();
            // Insertion order must not affect the resulting span structure.
            assert_eq!(db.get_all(&key1).await.unwrap().unwrap(), (1, key2.clone()));
            assert_eq!(db.get_all(&key2).await.unwrap().unwrap(), (2, key1.clone()));
            assert!(db.get_span(&key1).await.unwrap().unwrap().1.next_key == key2.clone());
            assert!(db.get_span(&key2).await.unwrap().unwrap().1.next_key == key1.clone());
            assert!(db.get_span(&early_key).await.unwrap().unwrap().1.next_key == key1.clone());
            assert!(db.get_span(&middle_key).await.unwrap().unwrap().1.next_key == key2.clone());
            assert!(db.get_span(&late_key).await.unwrap().unwrap().1.next_key == key1.clone());

            // Delete the keys in opposite order from earlier.

            let merkleized = db
                .new_batch()
                .write(key2.clone(), None)
                .merkleize(&db, None)
                .await
                .unwrap();
            db.apply_batch(merkleized).await.unwrap();
            // key1 is now the only active key, so every span points at key1.
            assert!(db.get_span(&key1).await.unwrap().unwrap().1.next_key == key1.clone());
            assert!(db.get_span(&key2).await.unwrap().unwrap().1.next_key == key1.clone());
            assert!(db.get_span(&early_key).await.unwrap().unwrap().1.next_key == key1.clone());
            assert!(db.get_span(&middle_key).await.unwrap().unwrap().1.next_key == key1.clone());
            assert!(db.get_span(&late_key).await.unwrap().unwrap().1.next_key == key1.clone());

            let merkleized = db
                .new_batch()
                .write(key1.clone(), None)
                .merkleize(&db, None)
                .await
                .unwrap();
            db.apply_batch(merkleized).await.unwrap();
            assert!(db.get_span(&key1).await.unwrap().is_none());
            assert!(db.get_span(&key2).await.unwrap().is_none());

            db.destroy().await.unwrap();
        });
    }
380
    #[test_traced("WARN")]
    fn test_ordered_any_fixed_db_build_and_authenticate() {
        let executor = deterministic::Runner::default();
        // Build a db with 1000 keys, some of which we update and some of which we delete, and
        // confirm that the end state of the db matches that of an identically updated hashmap.
        const ELEMENTS: u64 = 1000;
        executor.start(|context| async move {
            let hasher = Standard::<Sha256>::new();
            let mut db = open_db(context.with_label("first")).await;

            let mut map = HashMap::<Digest, Digest>::default();
            {
                let mut batch = db.new_batch();
                for i in 0u64..ELEMENTS {
                    let k = Sha256::hash(&i.to_be_bytes());
                    let v = Sha256::hash(&(i * 1000).to_be_bytes());
                    batch = batch.write(k, Some(v));
                    map.insert(k, v);
                }

                // Update every 3rd key
                for i in 0u64..ELEMENTS {
                    if i % 3 != 0 {
                        continue;
                    }
                    let k = Sha256::hash(&i.to_be_bytes());
                    let v = Sha256::hash(&((i + 1) * 10000).to_be_bytes());
                    batch = batch.write(k, Some(v));
                    map.insert(k, v);
                }

                // Delete every 7th key
                for i in 0u64..ELEMENTS {
                    if i % 7 != 1 {
                        continue;
                    }
                    let k = Sha256::hash(&i.to_be_bytes());
                    batch = batch.write(k, None);
                    map.remove(&k);
                }

                let merkleized = batch.merkleize(&db, None).await.unwrap();
                db.apply_batch(merkleized).await.unwrap();
            }

            // 143 keys in [0, 1000) satisfy i % 7 == 1, so 1000 - 143 = 857 remain active.
            assert_eq!(db.snapshot.items(), 857);

            // Test that apply_batch + sync w/ pruning will raise the activity floor.
            db.sync().await.unwrap();
            db.prune(db.inactivity_floor_loc()).await.unwrap();
            // Pruning removes inactive operations only; the active key count is unchanged.
            assert_eq!(db.snapshot.items(), 857);

            // Drop & reopen the db, making sure it has exactly the same state.
            let root = db.root();
            db.sync().await.unwrap();
            drop(db);
            let mut db = open_db(context.with_label("second")).await;
            assert_eq!(root, db.root());
            assert_eq!(db.snapshot.items(), 857);

            // Confirm the db's state matches that of the separate map we computed independently.
            for i in 0u64..1000 {
                let k = Sha256::hash(&i.to_be_bytes());
                if let Some(map_value) = map.get(&k) {
                    let Some(db_value) = db.get(&k).await.unwrap() else {
                        panic!("key not found in db: {k}");
                    };
                    assert_eq!(*map_value, db_value);
                } else {
                    assert!(db.get(&k).await.unwrap().is_none());
                }
            }

            // Make sure size-constrained batches of operations are provable from the oldest
            // retained op to tip.
            let max_ops = NZU64!(4);
            let end_loc = db.bounds().await.end;
            let start_loc = db.log.merkle.bounds().start;
            // Raise the inactivity floor via an empty batch and make sure historical inactive
            // operations are still provable.
            let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
            db.apply_batch(merkleized).await.unwrap();
            let root = db.root();
            assert!(start_loc < db.inactivity_floor_loc());

            // Every retained location (captured before the floor was raised) must still verify
            // against the current root.
            for i in start_loc.as_u64()..end_loc.as_u64() {
                let loc = Location::from(i);
                let (proof, log) = db.proof(loc, max_ops).await.unwrap();
                assert!(verify_proof(&hasher, &proof, loc, &log, &root));
            }

            db.destroy().await.unwrap();
        });
    }
475
    /// Test that various types of unclean shutdown while updating a non-empty DB recover to the
    /// last committed state on re-open.
    #[test_traced("WARN")]
    fn test_ordered_any_fixed_non_empty_db_recovery() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.with_label("first")).await;

            // Insert 1000 keys then commit.
            const ELEMENTS: u64 = 1000;
            {
                let mut batch = db.new_batch();
                for i in 0u64..ELEMENTS {
                    let k = Sha256::hash(&i.to_be_bytes());
                    let v = Sha256::hash(&(i * 1000).to_be_bytes());
                    batch = batch.write(k, Some(v));
                }
                let merkleized = batch.merkleize(&db, None).await.unwrap();
                db.apply_batch(merkleized).await.unwrap();
                db.commit().await.unwrap();
            }
            db.prune(db.inactivity_floor_loc()).await.unwrap();
            // Capture the committed state to compare against after each recovery.
            let root = db.root();
            let op_count = db.bounds().await.end;
            let inactivity_floor_loc = db.inactivity_floor_loc();

            // Reopen DB without clean shutdown and make sure the state is the same.
            let mut db = open_db(context.with_label("second")).await;
            assert_eq!(db.bounds().await.end, op_count);
            assert_eq!(db.inactivity_floor_loc(), inactivity_floor_loc);
            assert_eq!(db.root(), root);

            // Builds a batch of writes and drops it without merkleize/apply, simulating work
            // lost to an unclean shutdown.
            fn write_unapplied_batch(db: &mut AnyTest) {
                let mut batch = db.new_batch();
                for i in 0u64..ELEMENTS {
                    let k = Sha256::hash(&i.to_be_bytes());
                    let v = Sha256::hash(&((i + 1) * 10000).to_be_bytes());
                    batch = batch.write(k, Some(v));
                }
                // Don't merkleize/apply -- simulates uncommitted writes
            }

            // Insert operations without applying, then drop without cleanup.

            write_unapplied_batch(&mut db);
            drop(db);
            let mut db = open_db(context.with_label("third")).await;
            assert_eq!(db.bounds().await.end, op_count);
            assert_eq!(db.inactivity_floor_loc(), inactivity_floor_loc);
            assert_eq!(db.root(), root);

            // Repeat, drop without cleanup again.

            write_unapplied_batch(&mut db);
            drop(db);
            let mut db = open_db(context.with_label("fourth")).await;
            assert_eq!(db.bounds().await.end, op_count);
            assert_eq!(db.root(), root);

            // One last check that re-open without proper shutdown still recovers the correct state.
            // NOTE(review): the previous `db` handle is shadowed rather than dropped before this
            // re-open — presumably intentional, modeling a crash where the old instance never
            // shut down; confirm.

            write_unapplied_batch(&mut db);
            write_unapplied_batch(&mut db);
            write_unapplied_batch(&mut db);
            let mut db = open_db(context.with_label("fifth")).await;
            assert_eq!(db.bounds().await.end, op_count);
            assert_eq!(db.root(), root);

            // Apply the ops one last time but fully commit them this time, then clean up.

            {
                let mut batch = db.new_batch();
                for i in 0u64..ELEMENTS {
                    let k = Sha256::hash(&i.to_be_bytes());
                    let v = Sha256::hash(&((i + 1) * 10000).to_be_bytes());
                    batch = batch.write(k, Some(v));
                }
                let merkleized = batch.merkleize(&db, None).await.unwrap();
                db.apply_batch(merkleized).await.unwrap();
                db.commit().await.unwrap();
            }
            // After a real commit, the state must differ from the pre-commit snapshot.
            let db = open_db(context.with_label("sixth")).await;
            assert!(db.bounds().await.end > op_count);
            assert_ne!(db.inactivity_floor_loc(), inactivity_floor_loc);
            assert_ne!(db.root(), root);

            db.destroy().await.unwrap();
        });
    }
565
    /// Test that various types of unclean shutdown while updating an empty DB recover to the empty
    /// DB on re-open.
    #[test_traced("WARN")]
    fn test_ordered_any_fixed_empty_db_recovery() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Initialize an empty db.
            let db = open_db(context.with_label("first")).await;
            let root = db.root();

            // Reopen DB without clean shutdown and make sure the state is the same.
            // An "empty" db still holds one operation (its bounds end at 1).
            let mut db = open_db(context.with_label("second")).await;
            assert_eq!(db.bounds().await.end, 1);
            assert_eq!(db.root(), root);

            // Builds a batch of writes and drops it without merkleize/apply, simulating work
            // lost to an unclean shutdown.
            fn write_unapplied_batch(db: &mut AnyTest) {
                let mut batch = db.new_batch();
                for i in 0u64..1000 {
                    let k = Sha256::hash(&i.to_be_bytes());
                    let v = Sha256::hash(&((i + 1) * 10000).to_be_bytes());
                    batch = batch.write(k, Some(v));
                }
                // Don't merkleize/apply -- simulates uncommitted writes
            }

            // Insert operations without applying then drop without cleanup.

            write_unapplied_batch(&mut db);
            drop(db);
            let mut db = open_db(context.with_label("third")).await;
            assert_eq!(db.bounds().await.end, 1);
            assert_eq!(db.root(), root);

            // Repeat, drop without cleanup again.

            write_unapplied_batch(&mut db);
            drop(db);
            let mut db = open_db(context.with_label("fourth")).await;
            assert_eq!(db.bounds().await.end, 1);
            assert_eq!(db.root(), root);

            // One last check that re-open without proper shutdown still recovers the correct state.
            // NOTE(review): the previous `db` handle is shadowed rather than dropped here —
            // presumably intentional, modeling a crash where the old instance never shut down.

            write_unapplied_batch(&mut db);
            write_unapplied_batch(&mut db);
            write_unapplied_batch(&mut db);
            let mut db = open_db(context.with_label("fifth")).await;
            assert_eq!(db.bounds().await.end, 1);
            assert_eq!(db.root(), root);

            // Apply the ops one last time but fully commit them this time, then clean up.

            {
                let mut batch = db.new_batch();
                for i in 0u64..1000 {
                    let k = Sha256::hash(&i.to_be_bytes());
                    let v = Sha256::hash(&((i + 1) * 10000).to_be_bytes());
                    batch = batch.write(k, Some(v));
                }
                let merkleized = batch.merkleize(&db, None).await.unwrap();
                db.apply_batch(merkleized).await.unwrap();
                db.commit().await.unwrap();
            }
            // After a real commit, the re-opened db must no longer look empty.
            let db = open_db(context.with_label("sixth")).await;
            assert!(db.bounds().await.end > 1);
            assert_ne!(db.root(), root);

            db.destroy().await.unwrap();
        });
    }
636
    #[test_traced("WARN")]
    fn test_ordered_any_fixed_db_multiple_commits_delete_gets_replayed() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.with_label("first")).await;

            // NOTE(review): `map` is written below but never read in this test — candidate for
            // removal.
            let mut map = HashMap::<Digest, Digest>::default();
            const ELEMENTS: u64 = 10;
            // insert & apply multiple batches to ensure repeated inactivity floor raising.
            let metadata = Sha256::hash(&42u64.to_be_bytes());
            for j in 0u64..ELEMENTS {
                let mut batch = db.new_batch();
                for i in 0u64..ELEMENTS {
                    // Key space is disjoint across batches: batch j writes keys j*1000 + i.
                    let k = Sha256::hash(&(j * 1000 + i).to_be_bytes());
                    let v = Sha256::hash(&(i * 1000).to_be_bytes());
                    batch = batch.write(k, Some(v));
                    map.insert(k, v);
                }
                let merkleized = batch.merkleize(&db, Some(metadata)).await.unwrap();
                db.apply_batch(merkleized).await.unwrap();
                db.commit().await.unwrap();
            }
            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
            // The key written by the final update of the final batch.
            let k = Sha256::hash(&((ELEMENTS - 1) * 1000 + (ELEMENTS - 1)).to_be_bytes());

            // Do one last delete operation which will be above the inactivity
            // floor, to make sure it gets replayed on restart.
            let merkleized = db
                .new_batch()
                .write(k, None)
                .merkleize(&db, None)
                .await
                .unwrap();
            db.apply_batch(merkleized).await.unwrap();
            db.commit().await.unwrap();
            // This commit carried no metadata, so the previous metadata is cleared.
            assert_eq!(db.get_metadata().await.unwrap(), None);
            assert!(db.get(&k).await.unwrap().is_none());

            // Drop & reopen the db, making sure the re-opened db has exactly the same state.
            let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
            db.apply_batch(merkleized).await.unwrap();
            db.commit().await.unwrap();
            let root = db.root();
            drop(db);
            let db = open_db(context.with_label("second")).await;
            assert_eq!(root, db.root());
            assert_eq!(db.get_metadata().await.unwrap(), None);
            // The delete above the floor must have been replayed: the key stays absent.
            assert!(db.get(&k).await.unwrap().is_none());

            db.destroy().await.unwrap();
        });
    }
689
    #[test]
    fn test_ordered_any_fixed_db_historical_proof_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = create_test_db(context.clone()).await;
            let ops = create_test_ops(20);
            apply_ops(&mut db, ops.clone()).await;
            let hasher = Standard::<Sha256>::new();
            let root_hash = db.root();
            let original_op_count = db.bounds().await.end;

            // Historical proof should match "regular" proof when historical size == current database size
            let max_ops = NZU64!(10);
            let (historical_proof, historical_ops) = db
                .historical_proof(original_op_count, Location::new(5), max_ops)
                .await
                .unwrap();
            let (regular_proof, regular_ops) = db.proof(Location::new(5), max_ops).await.unwrap();

            assert_eq!(historical_proof.leaves, regular_proof.leaves);
            assert_eq!(historical_proof.digests, regular_proof.digests);
            assert_eq!(historical_ops, regular_ops);
            assert!(verify_proof(
                &hasher,
                &historical_proof,
                Location::new(5),
                &historical_ops,
                &root_hash
            ));

            // Add more operations to the database
            // (use different seed to avoid key collisions)
            let more_ops = create_test_ops_seeded(5, 1);

            apply_ops(&mut db, more_ops.clone()).await;

            // Historical proof should remain the same even though database has grown
            let (historical_proof, historical_ops) = db
                .historical_proof(original_op_count, Location::new(5), NZU64!(10))
                .await
                .unwrap();
            // Pinned to the pre-growth size: same leaf count, digests, and ops as before, and
            // it still verifies against the pre-growth root.
            assert_eq!(historical_proof.leaves, original_op_count);
            assert_eq!(historical_ops.len(), 10);
            assert_eq!(historical_proof.digests, regular_proof.digests);
            assert_eq!(historical_ops, regular_ops);
            assert!(verify_proof(
                &hasher,
                &historical_proof,
                Location::new(5),
                &historical_ops,
                &root_hash
            ));

            db.destroy().await.unwrap();
        });
    }
746
    /// Exercises `historical_proof` edge cases: a single-op proof matching the regular proof,
    /// truncation when more ops are requested than the historical position contains, and a proof
    /// generated at a small historical position.
    #[test]
    fn test_ordered_any_fixed_db_historical_proof_edge_cases() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let hasher = Standard::<Sha256>::new();
            let ops = create_test_ops(50);

            let mut db = create_test_db(context.with_label("first")).await;
            apply_ops(&mut db, ops.clone()).await;

            let root = db.root();
            let full_size = db.bounds().await.end;

            // Verify a single-op proof at the full commit size.
            let (proof, proof_ops) = db.proof(Location::new(1), NZU64!(1)).await.unwrap();
            assert_eq!(proof_ops.len(), 1);
            assert!(verify_proof(
                &hasher,
                &proof,
                Location::new(1),
                &proof_ops,
                &root
            ));

            // historical_proof at full size should match proof.
            let (hp, hp_ops) = db
                .historical_proof(full_size, Location::new(1), NZU64!(1))
                .await
                .unwrap();
            assert_eq!(hp.digests, proof.digests);
            assert_eq!(hp_ops, proof_ops);

            // Test requesting more operations than available in historical position.
            // Historical size 10, start at 5 -> only ops 5..10 can be returned.
            let (_proof, limited_ops) = db
                .historical_proof(Location::new(10), Location::new(5), NZU64!(20))
                .await
                .unwrap();
            assert_eq!(limited_ops.len(), 5); // limited by historical size

            // Test proof at minimum historical position.
            let (min_proof, min_ops) = db
                .historical_proof(Location::new(4), Location::new(1), NZU64!(3))
                .await
                .unwrap();
            assert_eq!(min_proof.leaves, Location::new(4));
            assert_eq!(min_ops.len(), 3);

            db.destroy().await.unwrap();
        });
    }
797
    /// Repeatedly grows the database and checks that `historical_proof` pinned at an earlier size
    /// keeps reproducing the original proof, verifiable against the original root.
    #[test]
    fn test_ordered_any_fixed_db_historical_proof_different_historical_sizes() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = create_test_db(context.clone()).await;
            let ops = create_test_ops(100);
            apply_ops(&mut db, ops.clone()).await;

            let hasher = Standard::<Sha256>::new();
            let root = db.root();

            // Reference proof taken before any further growth.
            let start_loc = Location::new(20);
            let max_ops = NZU64!(10);
            let (proof, ops) = db.proof(start_loc, max_ops).await.unwrap();

            // Now keep adding operations and make sure we can still generate a historical proof that matches the original.
            let historical_size = db.bounds().await.end;

            for i in 1..10 {
                // Use different seed per iteration to avoid key collisions
                let more_ops = create_test_ops_seeded(100, i);
                apply_ops(&mut db, more_ops).await;

                let (historical_proof, historical_ops) = db
                    .historical_proof(historical_size, start_loc, max_ops)
                    .await
                    .unwrap();
                assert_eq!(proof.leaves, historical_proof.leaves);
                assert_eq!(ops, historical_ops);
                assert_eq!(proof.digests, historical_proof.digests);

                // Verify proof against reference root
                assert!(verify_proof(
                    &hasher,
                    &historical_proof,
                    start_loc,
                    &historical_ops,
                    &root
                ));
            }

            db.destroy().await.unwrap();
        });
    }
842
    /// Stress-tests next-key span maintenance across random inserts and deletes, under both heavy
    /// translated-key collisions (OneCap) and few/no collisions (TwoCap), comparing the db's key
    /// ordering against a reference `BTreeMap` throughout.
    #[test]
    fn test_ordered_any_fixed_db_span_maintenance_under_collisions() {
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            /// Inserts 1000 random keys, deletes them in two random halves, and after each phase
            /// verifies values, circular next-key order, and span contents against a `BTreeMap`.
            /// Consumes and returns the db so the caller can destroy it.
            async fn insert_random<T: Translator>(
                mut db: Db<mmr::Family, Context, Digest, i32, Sha256, T>,
                rng: &mut StdRng,
            ) -> Db<mmr::Family, Context, Digest, i32, Sha256, T> {
                let mut keys = BTreeMap::new();

                // Insert 1000 random keys into both the db and an ordered map.
                {
                    let mut batch = db.new_batch();
                    for i in 0..1000 {
                        let key = Digest::random(&mut *rng);
                        keys.insert(key, i);
                        batch = batch.write(key, Some(i));
                    }
                    let merkleized = batch.merkleize(&db, None).await.unwrap();
                    db.apply_batch(merkleized).await.unwrap();
                }

                // Make sure the db and ordered map agree on contents & key order.
                // `BTreeMap` iterates in ascending key order, so chasing next_key from the
                // first key should visit keys in exactly the same order.
                let mut iter = keys.iter();
                let first_key = iter.next().unwrap().0;
                let mut next_key = db.get_all(first_key).await.unwrap().unwrap().1;
                for (key, value) in iter {
                    let (v, next) = db.get_all(key).await.unwrap().unwrap();
                    assert_eq!(*value, v);
                    assert_eq!(*key, next_key);
                    assert_eq!(db.get_span(key).await.unwrap().unwrap().1.next_key, next);
                    next_key = next;
                }

                // Delete some random keys and check order agreement again.
                {
                    let mut batch = db.new_batch();
                    for _ in 0..500 {
                        let key = keys.keys().choose(rng).cloned().unwrap();
                        keys.remove(&key);
                        batch = batch.write(key, None);
                    }
                    let merkleized = batch.merkleize(&db, None).await.unwrap();
                    db.apply_batch(merkleized).await.unwrap();
                }

                // Re-verify ordering over the surviving 500 keys.
                let mut iter = keys.iter();
                let first_key = iter.next().unwrap().0;
                let mut next_key = db.get_all(first_key).await.unwrap().unwrap().1;
                for (key, value) in iter {
                    let (v, next) = db.get_all(key).await.unwrap().unwrap();
                    assert_eq!(*value, v);
                    assert_eq!(*key, next_key);
                    assert_eq!(db.get_span(key).await.unwrap().unwrap().1.next_key, next);
                    next_key = next;
                }

                // Delete the rest of the keys and make sure we get back to empty.
                {
                    let mut batch = db.new_batch();
                    for _ in 0..500 {
                        let key = keys.keys().choose(rng).cloned().unwrap();
                        keys.remove(&key);
                        batch = batch.write(key, None);
                    }
                    let merkleized = batch.merkleize(&db, None).await.unwrap();
                    db.apply_batch(merkleized).await.unwrap();
                }
                assert_eq!(keys.len(), 0);
                assert!(db.is_empty());
                // No spans should exist for any key in an empty db.
                assert_eq!(db.get_span(&Digest::random(&mut *rng)).await.unwrap(), None);
                db
            }

            let mut rng = StdRng::seed_from_u64(context.next_u64());
            let seed = context.next_u64();

            // Use a OneCap to ensure many collisions.
            let config = fixed_db_config::<OneCap>(&seed.to_string(), &context);
            let db = Db::<mmr::Family, Context, Digest, i32, Sha256, OneCap>::init(
                context.with_label("first"),
                config,
            )
            .await
            .unwrap();
            let db = insert_random(db, &mut rng).await;
            db.destroy().await.unwrap();

            // Repeat test with TwoCap to test low/no collisions.
            let config = fixed_db_config::<TwoCap>(&seed.to_string(), &context);
            let db = Db::<mmr::Family, Context, Digest, i32, Sha256, TwoCap>::init(
                context.with_label("second"),
                config,
            )
            .await
            .unwrap();
            let db = insert_random(db, &mut rng).await;
            db.destroy().await.unwrap();
        });
    }
943
    // Tests using FixedBytes<4> keys (for edge cases that require specific key patterns)

    /// Type alias for a fixed db keyed by `FixedBytes<4>`, using SHA-256 hashing and the `TwoCap`
    /// translator (keys that share their first two bytes collide in the index).
    type FixedDb = Db<mmr::Family, Context, FixedBytes<4>, Digest, Sha256, TwoCap>;
948
949    /// Return a fixed db with FixedBytes<4> keys.
950    async fn open_fixed_db(context: Context) -> FixedDb {
951        let cfg = fixed_db_config("fixed-bytes-partition", &context);
952        FixedDb::init(context, cfg).await.unwrap()
953    }
954
955    #[test_traced("WARN")]
956    fn test_ordered_any_fixed_db_empty() {
957        let executor = deterministic::Runner::default();
958        executor.start(|context| async move {
959            let db = open_fixed_db(context.with_label("initial")).await;
960            test_ordered_any_db_empty(context, db, |ctx| Box::pin(open_fixed_db(ctx))).await;
961        });
962    }
963
964    #[test_traced("WARN")]
965    fn test_ordered_any_fixed_db_basic() {
966        let executor = deterministic::Runner::default();
967        executor.start(|context| async move {
968            let db = open_fixed_db(context.with_label("initial")).await;
969            test_ordered_any_db_basic(context, db, |ctx| Box::pin(open_fixed_db(ctx))).await;
970        });
971    }
972
973    #[test_traced("WARN")]
974    fn test_ordered_any_update_collision_edge_case_fixed() {
975        let executor = deterministic::Runner::default();
976        executor.start(|context| async move {
977            let db = open_fixed_db(context.clone()).await;
978            test_ordered_any_update_collision_edge_case(db).await;
979        });
980    }
981
    /// Builds a db with one key, and then creates another non-colliding key preceding it in a
    /// batch. The prev_key search will have to "cycle around" in order to find the correct next_key
    /// value.
    #[test_traced("WARN")]
    fn test_ordered_any_batch_create_with_cycling_next_key() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_fixed_db(context.clone()).await;

            // Seed the db with a single key.
            let mid_key = FixedBytes::from([0xAAu8; 4]);
            let val = Sha256::fill(1u8);
            let merkleized = db
                .new_batch()
                .write(mid_key.clone(), Some(val))
                .merkleize(&db, None)
                .await
                .unwrap();
            db.apply_batch(merkleized).await.unwrap();

            // Batch-insert a preceding key that does not collide under the translator.
            let preceeding_key = FixedBytes::from([0x55u8; 4]);

            let merkleized = db
                .new_batch()
                .write(preceeding_key.clone(), Some(val))
                .merkleize(&db, None)
                .await
                .unwrap();
            db.apply_batch(merkleized).await.unwrap();

            assert_eq!(db.get(&preceeding_key).await.unwrap().unwrap(), val);
            assert_eq!(db.get(&mid_key).await.unwrap().unwrap(), val);

            // With exactly two keys the next-key relation is circular: each points at the other.
            let span1 = db.get_span(&preceeding_key).await.unwrap().unwrap();
            assert_eq!(span1.1.next_key, mid_key);
            let span2 = db.get_span(&mid_key).await.unwrap().unwrap();
            assert_eq!(span2.1.next_key, preceeding_key);

            db.destroy().await.unwrap();
        });
    }
1023
    /// Builds a db with three keys A < B < C, then batch-deletes B. Verifies that A's next_key is
    /// correctly updated to C (skipping the deleted B).
    #[test_traced("WARN")]
    fn test_ordered_any_batch_delete_middle_key() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_fixed_db(context.clone()).await;

            let key_a = FixedBytes::from([0x11u8; 4]);
            let key_b = FixedBytes::from([0x22u8; 4]);
            let key_c = FixedBytes::from([0x33u8; 4]);
            let val = Sha256::fill(1u8);

            // Create three keys in order: A -> B -> C -> A (circular)
            let merkleized = db
                .new_batch()
                .write(key_a.clone(), Some(val))
                .write(key_b.clone(), Some(val))
                .write(key_c.clone(), Some(val))
                .merkleize(&db, None)
                .await
                .unwrap();
            db.apply_batch(merkleized).await.unwrap();

            // Verify initial spans (next_key of each key is its lexicographic successor,
            // wrapping from the last key back to the first).
            let span_a = db.get_span(&key_a).await.unwrap().unwrap();
            assert_eq!(span_a.1.next_key, key_b);
            let span_b = db.get_span(&key_b).await.unwrap().unwrap();
            assert_eq!(span_b.1.next_key, key_c);
            let span_c = db.get_span(&key_c).await.unwrap().unwrap();
            assert_eq!(span_c.1.next_key, key_a);

            // Batch-delete the middle key B
            let merkleized = db
                .new_batch()
                .write(key_b.clone(), None)
                .merkleize(&db, None)
                .await
                .unwrap();
            db.apply_batch(merkleized).await.unwrap();

            // Verify B is deleted
            assert!(db.get(&key_b).await.unwrap().is_none());

            // Verify A's next_key is now C (not B)
            let span_a = db.get_span(&key_a).await.unwrap().unwrap();
            assert_eq!(span_a.1.next_key, key_c);

            // Verify C's next_key is still A
            let span_c = db.get_span(&key_c).await.unwrap().unwrap();
            assert_eq!(span_c.1.next_key, key_a);

            db.destroy().await.unwrap();
        });
    }
1079
    /// Exercises `stream_range` over single- and multi-bucket key sets, covering every
    /// combination of the start key being present/absent and colliding/non-colliding (under the
    /// TwoCap translator) with keys before and after it.
    #[test_traced("WARN")]
    fn test_ordered_any_stream_range() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_fixed_db(context.clone()).await;

            // key1's translated bucket is its first two bytes: [0x10, 0x00].
            let key1 = FixedBytes::from([0x10u8, 0x00, 0x00, 0x05]);
            let val = Sha256::fill(1u8);

            // Test the single-bucket case.
            let merkleized = db
                .new_batch()
                .write(key1.clone(), Some(val))
                .merkleize(&db, None)
                .await
                .unwrap();
            db.apply_batch(merkleized).await.unwrap();

            // Start key is in the DB.
            {
                let mut stream = db.stream_range(key1.clone()).await.unwrap().boxed_local();
                assert_eq!(stream.next().await.unwrap().unwrap().0, key1);
                assert!(stream.next().await.is_none());
            }

            // Start key collides & precedes the only key in the db.
            {
                let start = FixedBytes::from([0x10u8, 0x00, 0x00, 0x01]);
                let mut stream = db.stream_range(start).await.unwrap().boxed_local();
                assert_eq!(stream.next().await.unwrap().unwrap().0, key1);
                assert!(stream.next().await.is_none());
            }

            // Start key collides & follows the only key in the db.
            {
                let start = FixedBytes::from([0x10u8, 0x00, 0x00, 0xFF]);
                let mut stream = db.stream_range(start).await.unwrap().boxed_local();
                assert!(stream.next().await.is_none());
            }

            // Start key precedes the key in the DB without colliding.
            {
                let start = FixedBytes::from([0x00u8, 0x00, 0x00, 0x01]);
                let mut stream = db.stream_range(start).await.unwrap().boxed_local();
                assert_eq!(stream.next().await.unwrap().unwrap().0, key1);
                assert!(stream.next().await.is_none());
            }

            // Start key follows the key in the DB without colliding.
            {
                let start = FixedBytes::from([0xFFu8, 0x00, 0x00, 0x11]);
                let mut stream = db.stream_range(start).await.unwrap().boxed_local();
                assert!(stream.next().await.is_none());
            }

            // Now test the multiple bucket cases. key2_1 and key2_2 share a bucket; key3 is
            // alone in a third bucket.
            let key2_1 = FixedBytes::from([0x20u8, 0x00, 0x00, 0x05]);
            let key2_2 = FixedBytes::from([0x20u8, 0x00, 0x00, 0x11]);
            let key3 = FixedBytes::from([0x30u8, 0x00, 0x00, 0x05]);

            let merkleized = db
                .new_batch()
                .write(key2_1.clone(), Some(val))
                .write(key2_2.clone(), Some(val))
                .write(key3.clone(), Some(val))
                .merkleize(&db, None)
                .await
                .unwrap();
            db.apply_batch(merkleized).await.unwrap();

            // Start key is in the DB.
            {
                let mut stream = db.stream_range(key1.clone()).await.unwrap().boxed_local();
                assert_eq!(stream.next().await.unwrap().unwrap().0, key1);
                assert_eq!(stream.next().await.unwrap().unwrap().0, key2_1);
                assert_eq!(stream.next().await.unwrap().unwrap().0, key2_2);
                assert_eq!(stream.next().await.unwrap().unwrap().0, key3);
                assert!(stream.next().await.is_none());
            }

            // Start key is not in DB but collides with an earlier key.
            {
                let start = FixedBytes::from([0x10u8, 0x00, 0x00, 0xFF]);
                let mut stream = db.stream_range(start).await.unwrap().boxed_local();
                assert_eq!(stream.next().await.unwrap().unwrap().0, key2_1);
                assert_eq!(stream.next().await.unwrap().unwrap().0, key2_2);
                assert_eq!(stream.next().await.unwrap().unwrap().0, key3);
                assert!(stream.next().await.is_none());
            }

            // Start key is not in the DB but collides with a later key.
            {
                let start = FixedBytes::from([0x10u8, 0x00, 0x00, 0x00]);
                let mut stream = db.stream_range(start).await.unwrap().boxed_local();
                assert_eq!(stream.next().await.unwrap().unwrap().0, key1);
                assert_eq!(stream.next().await.unwrap().unwrap().0, key2_1);
                assert_eq!(stream.next().await.unwrap().unwrap().0, key2_2);
                assert_eq!(stream.next().await.unwrap().unwrap().0, key3);
                assert!(stream.next().await.is_none());
            }

            // Start key is not in the DB but falls between two colliding keys.
            {
                let start = FixedBytes::from([0x20u8, 0x00, 0x00, 0x06]);
                let mut stream = db.stream_range(start).await.unwrap().boxed_local();
                assert_eq!(stream.next().await.unwrap().unwrap().0, key2_2);
                assert_eq!(stream.next().await.unwrap().unwrap().0, key3);
                assert!(stream.next().await.is_none());
            }

            // Start key is in the DB and collides with an earlier key.
            {
                let mut stream = db.stream_range(key2_2.clone()).await.unwrap().boxed_local();
                assert_eq!(stream.next().await.unwrap().unwrap().0, key2_2);
                assert_eq!(stream.next().await.unwrap().unwrap().0, key3);
                assert!(stream.next().await.is_none());
            }
            // Start key is > key3. Should yield nothing.
            {
                let start = FixedBytes::from([0x40u8, 0x00, 0x00, 0x00]);
                let mut stream = db.stream_range(start).await.unwrap().boxed_local();
                assert!(stream.next().await.is_none());
            }

            db.destroy().await.unwrap();
        });
    }
1207
1208    fn key(i: u64) -> Digest {
1209        Sha256::hash(&i.to_be_bytes())
1210    }
1211
1212    fn val(i: u64) -> Digest {
1213        Sha256::hash(&(i + 10000).to_be_bytes())
1214    }
1215
1216    /// Helper: commit a batch of key-value writes and return the applied range (generic).
1217    async fn commit_writes_generic<F: crate::merkle::Family>(
1218        db: &mut AnyTestGeneric<F>,
1219        writes: impl IntoIterator<Item = (Digest, Option<Digest>)>,
1220        metadata: Option<Digest>,
1221    ) -> std::ops::Range<GenericLocation<F>> {
1222        let mut batch = db.new_batch();
1223        for (k, v) in writes {
1224            batch = batch.write(k, v);
1225        }
1226        let merkleized = batch.merkleize(db, metadata).await.unwrap();
1227        let range = db.apply_batch(merkleized).await.unwrap();
1228        db.commit().await.unwrap();
1229        range
1230    }
1231
1232    // -- Generic inner functions for parameterized batch tests --
1233
1234    async fn batch_empty_inner<F: crate::merkle::Family>(context: deterministic::Context) {
1235        let mut db = open_db_generic::<F>(context.with_label("db")).await;
1236        let root_before = db.root();
1237
1238        let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
1239        db.apply_batch(merkleized).await.unwrap();
1240        assert_ne!(db.root(), root_before);
1241
1242        commit_writes_generic(&mut db, [(key(0), Some(val(0)))], None).await;
1243        assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
1244
1245        db.destroy().await.unwrap();
1246    }
1247
1248    async fn batch_metadata_inner<F: crate::merkle::Family>(context: deterministic::Context) {
1249        let mut db = open_db_generic::<F>(context.with_label("db")).await;
1250        let metadata = val(42);
1251
1252        commit_writes_generic(&mut db, [(key(0), Some(val(0)))], Some(metadata)).await;
1253        assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
1254
1255        let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
1256        db.apply_batch(merkleized).await.unwrap();
1257        assert_eq!(db.get_metadata().await.unwrap(), None);
1258
1259        db.destroy().await.unwrap();
1260    }
1261
    /// Verifies `Batch::get` read-through semantics: committed state is visible through an
    /// untouched key, batch-local writes shadow committed values, and a batch-local delete
    /// hides the committed value.
    async fn batch_get_read_through_inner<F: crate::merkle::Family>(
        context: deterministic::Context,
    ) {
        let mut db = open_db_generic::<F>(context.with_label("db")).await;

        // Commit ka -> va so the batch has committed state to read through to.
        let ka = key(0);
        let va = val(0);
        commit_writes_generic(&mut db, [(ka, Some(va))], None).await;

        let kb = key(1);
        let vb = val(1);
        let kc = key(2);

        let mut batch = db.new_batch();
        // Unwritten key: falls through to the committed value.
        assert_eq!(batch.get(&ka, &db).await.unwrap(), Some(va));

        // Batch-local write is visible; a never-written key is not.
        batch = batch.write(kb, Some(vb));
        assert_eq!(batch.get(&kb, &db).await.unwrap(), Some(vb));
        assert_eq!(batch.get(&kc, &db).await.unwrap(), None);

        // A batch-local overwrite shadows the committed value...
        let va2 = val(100);
        batch = batch.write(ka, Some(va2));
        assert_eq!(batch.get(&ka, &db).await.unwrap(), Some(va2));

        // ...and a batch-local delete hides it entirely.
        batch = batch.write(ka, None);
        assert_eq!(batch.get(&ka, &db).await.unwrap(), None);

        db.destroy().await.unwrap();
    }
1291
    /// Verifies `get` on a merkleized (but not yet applied) batch reflects the batch's
    /// overwrites, deletes, and creations, with read-through for keys the batch never touched.
    async fn batch_get_on_merkleized_inner<F: crate::merkle::Family>(
        context: deterministic::Context,
    ) {
        let mut db = open_db_generic::<F>(context.with_label("db")).await;

        let ka = key(0);
        let kb = key(1);
        let kc = key(2);
        let kd = key(3);

        commit_writes_generic(&mut db, [(ka, Some(val(0))), (kb, Some(val(1)))], None).await;

        // Overwrite ka, delete kb, create kc — then merkleize without applying.
        let va2 = val(100);
        let vc = val(2);
        let mut batch = db.new_batch();
        batch = batch.write(ka, Some(va2));
        batch = batch.write(kb, None);
        batch = batch.write(kc, Some(vc));
        let merkleized = batch.merkleize(&db, None).await.unwrap();

        assert_eq!(merkleized.get(&ka, &db).await.unwrap(), Some(va2));
        assert_eq!(merkleized.get(&kb, &db).await.unwrap(), None);
        assert_eq!(merkleized.get(&kc, &db).await.unwrap(), Some(vc));
        // kd was never written anywhere, so it reads as absent.
        assert_eq!(merkleized.get(&kd, &db).await.unwrap(), None);

        db.destroy().await.unwrap();
    }
1319
    /// Verifies `get` on a child batch stacked on a merkleized parent: the parent's (unapplied)
    /// writes are visible through the child, and child writes/deletes shadow them.
    async fn batch_stacked_get_inner<F: crate::merkle::Family>(context: deterministic::Context) {
        let db = open_db_generic::<F>(context.with_label("db")).await;

        let ka = key(0);
        let kb = key(1);

        let mut batch = db.new_batch();
        batch = batch.write(ka, Some(val(0)));
        let merkleized = batch.merkleize(&db, None).await.unwrap();

        // The child batch sees the parent's unapplied write.
        let mut child = merkleized.new_batch::<Sha256>();
        assert_eq!(child.get(&ka, &db).await.unwrap(), Some(val(0)));

        // A child overwrite shadows the parent's value.
        child = child.write(ka, Some(val(100)));
        assert_eq!(child.get(&ka, &db).await.unwrap(), Some(val(100)));

        // A key written only in the child is visible in the child.
        child = child.write(kb, Some(val(1)));
        assert_eq!(child.get(&kb, &db).await.unwrap(), Some(val(1)));

        // A child delete hides the key despite the parent write.
        child = child.write(ka, None);
        assert_eq!(child.get(&ka, &db).await.unwrap(), None);

        // Release the unapplied batches before destroying the db.
        drop(child);
        drop(merkleized);
        db.destroy().await.unwrap();
    }
1346
    /// Verifies a child batch can recreate a key that its merkleized parent deleted, and that
    /// applying the child lands the recreated value in the db.
    async fn batch_stacked_delete_recreate_inner<F: crate::merkle::Family>(
        context: deterministic::Context,
    ) {
        let mut db = open_db_generic::<F>(context.with_label("db")).await;
        let ka = key(0);

        commit_writes_generic(&mut db, [(ka, Some(val(0)))], None).await;

        // Parent batch deletes the committed key.
        let parent_m = db
            .new_batch()
            .write(ka, None)
            .merkleize(&db, None)
            .await
            .unwrap();
        assert_eq!(parent_m.get(&ka, &db).await.unwrap(), None);

        // Child batch recreates it with a new value.
        let child_m = parent_m
            .new_batch::<Sha256>()
            .write(ka, Some(val(200)))
            .merkleize(&db, None)
            .await
            .unwrap();
        assert_eq!(child_m.get(&ka, &db).await.unwrap(), Some(val(200)));

        // Applying the child makes the recreated value durable in the db.
        db.apply_batch(child_m).await.unwrap();
        assert_eq!(db.get(&ka).await.unwrap(), Some(val(200)));

        db.destroy().await.unwrap();
    }
1376
    /// Checks that applying a batch (via `commit_writes_generic`) reports the log range the batch
    /// occupied, and that consecutive applies produce contiguous ranges.
    async fn batch_apply_returns_range_inner<F: crate::merkle::Family>(
        context: deterministic::Context,
    ) {
        let mut db = open_db_generic::<F>(context.with_label("db")).await;

        let writes: Vec<_> = (0..5).map(|i| (key(i), Some(val(i)))).collect();
        let range1 = commit_writes_generic(&mut db, writes, None).await;

        // First applied batch starts at location 1 — presumably location 0 holds an initial
        // log operation; NOTE(review): confirm against the log's initialization.
        assert_eq!(range1.start, GenericLocation::<F>::new(1));
        // 5 writes plus at least one additional op (commit-related) — exact count not pinned.
        assert!(range1.end.saturating_sub(*range1.start) >= 6);

        let writes: Vec<_> = (5..10).map(|i| (key(i), Some(val(i)))).collect();
        let range2 = commit_writes_generic(&mut db, writes, None).await;
        // Ranges of consecutive applies must be contiguous.
        assert_eq!(range2.start, range1.end);

        db.destroy().await.unwrap();
    }
1394
1395    async fn batch_speculative_root_inner<F: crate::merkle::Family>(
1396        context: deterministic::Context,
1397    ) {
1398        let mut db = open_db_generic::<F>(context.with_label("db")).await;
1399
1400        let mut batch = db.new_batch();
1401        for i in 0..10 {
1402            batch = batch.write(key(i), Some(val(i)));
1403        }
1404        let merkleized = batch.merkleize(&db, None).await.unwrap();
1405        let speculative_root = merkleized.root();
1406
1407        db.apply_batch(merkleized).await.unwrap();
1408        assert_eq!(db.root(), speculative_root);
1409
1410        db.destroy().await.unwrap();
1411    }
1412
    /// Repeatedly overwrites a single key, then reopens the db and checks that replaying the log
    /// rebuilds the same state: one active snapshot entry for the key and the same root.
    async fn log_replay_inner<F: crate::merkle::Family>(context: deterministic::Context) {
        let db_context = context.with_label("db");
        let mut db = open_db_generic::<F>(db_context.clone()).await;

        // Apply 100 single-key updates, each in its own batch, then commit once.
        const UPDATES: u64 = 100;
        let k = Sha256::hash(&UPDATES.to_be_bytes());
        for i in 0u64..UPDATES {
            let v = Sha256::hash(&(i * 1000).to_be_bytes());
            let merkleized = db
                .new_batch()
                .write(k, Some(v))
                .merkleize(&db, None)
                .await
                .unwrap();
            db.apply_batch(merkleized).await.unwrap();
        }
        db.commit().await.unwrap();
        let root = db.root();

        // Reopen from storage, forcing the log to be replayed.
        drop(db);
        let db: AnyTestGeneric<F> = open_db_generic::<F>(db_context.with_label("reopened")).await;
        // Despite 100 updates, only the latest one for `k` should be active in the snapshot.
        let iter = db.snapshot.get(&k);
        assert_eq!(iter.cloned().collect::<Vec<_>>().len(), 1);
        assert_eq!(db.root(), root);

        db.destroy().await.unwrap();
    }
1440
1441    /// Regression test: child batch deleting a key whose predecessor shares the
1442    /// same translated-key bucket must update the predecessor's next_key.
1443    #[test_traced("INFO")]
1444    fn test_ordered_child_delete_colliding_key_corrupts_next_key() {
1445        let executor = deterministic::Runner::default();
1446        executor.start(|context| async move {
1447            let mut db = open_db(context.with_label("db")).await;
1448
1449            // B and K collide under TwoCap (same first 2 bytes 0xAABB).
1450            let key_b = Digest::from({
1451                let mut b = [0u8; 32];
1452                b[0] = 0xAA;
1453                b[1] = 0xBB;
1454                b[2] = 0x01;
1455                b
1456            });
1457            let key_k = Digest::from({
1458                let mut k = [0u8; 32];
1459                k[0] = 0xAA;
1460                k[1] = 0xBB;
1461                k[2] = 0x02;
1462                k
1463            });
1464            // A is in a different bucket.
1465            let key_a = Digest::from({
1466                let mut a = [0u8; 32];
1467                a[0] = 0x11;
1468                a[1] = 0x22;
1469                a
1470            });
1471
1472            // Commit A, B, K.
1473            commit_writes_generic(
1474                &mut db,
1475                [
1476                    (key_a, Some(val(1))),
1477                    (key_b, Some(val(2))),
1478                    (key_k, Some(val(3))),
1479                ],
1480                None,
1481            )
1482            .await;
1483
1484            // Add padding keys so the floor-raise in subsequent batches moves
1485            // padding ops (not B) and B stays at its original log position.
1486            let mut padding_keys = Vec::new();
1487            for i in 0..20u64 {
1488                let pk = Digest::from({
1489                    let mut p = [0u8; 32];
1490                    p[0] = 0xCC;
1491                    p[1] = i as u8;
1492                    p
1493                });
1494                padding_keys.push(pk);
1495                commit_writes_generic(&mut db, [(pk, Some(val(100 + i)))], None).await;
1496            }
1497
1498            // Sanity: B.next_key == K before the speculative batches.
1499            let (_, next_b) = db.get_all(&key_b).await.unwrap().unwrap();
1500            assert_eq!(next_b, key_k, "B.next_key should be K before delete");
1501
1502            // Parent batch: update K's value (K enters base_diff).
1503            let mut parent = db.new_batch();
1504            parent = parent.write(key_k, Some(val(4)));
1505            let parent_m = parent.merkleize(&db, None).await.unwrap();
1506
1507            // Child batch: delete K.
1508            let mut child = parent_m.new_batch::<Sha256>();
1509            child = child.write(key_k, None);
1510            let child_m = child.merkleize(&db, None).await.unwrap();
1511
1512            // Apply and commit.
1513            db.apply_batch(child_m).await.unwrap();
1514            db.commit().await.unwrap();
1515
1516            // K should be deleted.
1517            assert!(db.get(&key_k).await.unwrap().is_none());
1518
1519            // B.next_key must not point to deleted K.
1520            let (_, b_next) = db.get_all(&key_b).await.unwrap().unwrap();
1521            assert_ne!(
1522                b_next, key_k,
1523                "B.next_key still points to deleted K (corrupted next_key ring)"
1524            );
1525            assert_eq!(b_next, padding_keys[0]);
1526
1527            db.destroy().await.unwrap();
1528        });
1529    }
1530
1531    // -- MMR test wrappers --
1532
1533    #[test_traced("INFO")]
1534    fn test_ordered_fixed_batch_empty() {
1535        let executor = deterministic::Runner::default();
1536        executor.start(batch_empty_inner::<mmr::Family>);
1537    }
1538
1539    #[test_traced("INFO")]
1540    fn test_ordered_fixed_batch_metadata() {
1541        let executor = deterministic::Runner::default();
1542        executor.start(batch_metadata_inner::<mmr::Family>);
1543    }
1544
1545    #[test_traced("INFO")]
1546    fn test_ordered_fixed_batch_get_read_through() {
1547        let executor = deterministic::Runner::default();
1548        executor.start(batch_get_read_through_inner::<mmr::Family>);
1549    }
1550
1551    #[test_traced("INFO")]
1552    fn test_ordered_fixed_batch_get_on_merkleized() {
1553        let executor = deterministic::Runner::default();
1554        executor.start(batch_get_on_merkleized_inner::<mmr::Family>);
1555    }
1556
1557    #[test_traced("INFO")]
1558    fn test_ordered_fixed_batch_stacked_get() {
1559        let executor = deterministic::Runner::default();
1560        executor.start(batch_stacked_get_inner::<mmr::Family>);
1561    }
1562
1563    #[test_traced("INFO")]
1564    fn test_ordered_fixed_batch_stacked_delete_recreate() {
1565        let executor = deterministic::Runner::default();
1566        executor.start(batch_stacked_delete_recreate_inner::<mmr::Family>);
1567    }
1568
1569    #[test_traced("INFO")]
1570    fn test_ordered_fixed_batch_apply_returns_range() {
1571        let executor = deterministic::Runner::default();
1572        executor.start(batch_apply_returns_range_inner::<mmr::Family>);
1573    }
1574
1575    #[test_traced("INFO")]
1576    fn test_ordered_fixed_batch_speculative_root() {
1577        let executor = deterministic::Runner::default();
1578        executor.start(batch_speculative_root_inner::<mmr::Family>);
1579    }
1580
1581    #[test_traced("WARN")]
1582    fn test_ordered_any_fixed_db_log_replay() {
1583        let executor = deterministic::Runner::default();
1584        executor.start(log_replay_inner::<mmr::Family>);
1585    }
1586
1587    // -- MMB test wrappers --
1588
1589    #[test_traced("INFO")]
1590    fn test_ordered_fixed_batch_empty_mmb() {
1591        let executor = deterministic::Runner::default();
1592        executor.start(batch_empty_inner::<crate::merkle::mmb::Family>);
1593    }
1594
1595    #[test_traced("INFO")]
1596    fn test_ordered_fixed_batch_metadata_mmb() {
1597        let executor = deterministic::Runner::default();
1598        executor.start(batch_metadata_inner::<crate::merkle::mmb::Family>);
1599    }
1600
1601    #[test_traced("INFO")]
1602    fn test_ordered_fixed_batch_get_read_through_mmb() {
1603        let executor = deterministic::Runner::default();
1604        executor.start(batch_get_read_through_inner::<crate::merkle::mmb::Family>);
1605    }
1606
1607    #[test_traced("INFO")]
1608    fn test_ordered_fixed_batch_get_on_merkleized_mmb() {
1609        let executor = deterministic::Runner::default();
1610        executor.start(batch_get_on_merkleized_inner::<crate::merkle::mmb::Family>);
1611    }
1612
1613    #[test_traced("INFO")]
1614    fn test_ordered_fixed_batch_stacked_get_mmb() {
1615        let executor = deterministic::Runner::default();
1616        executor.start(batch_stacked_get_inner::<crate::merkle::mmb::Family>);
1617    }
1618
1619    #[test_traced("INFO")]
1620    fn test_ordered_fixed_batch_stacked_delete_recreate_mmb() {
1621        let executor = deterministic::Runner::default();
1622        executor.start(batch_stacked_delete_recreate_inner::<crate::merkle::mmb::Family>);
1623    }
1624
1625    #[test_traced("INFO")]
1626    fn test_ordered_fixed_batch_apply_returns_range_mmb() {
1627        let executor = deterministic::Runner::default();
1628        executor.start(batch_apply_returns_range_inner::<crate::merkle::mmb::Family>);
1629    }
1630
1631    #[test_traced("INFO")]
1632    fn test_ordered_fixed_batch_speculative_root_mmb() {
1633        let executor = deterministic::Runner::default();
1634        executor.start(batch_speculative_root_inner::<crate::merkle::mmb::Family>);
1635    }
1636
1637    #[test_traced("WARN")]
1638    fn test_ordered_any_fixed_db_log_replay_mmb() {
1639        let executor = deterministic::Runner::default();
1640        executor.start(log_replay_inner::<crate::merkle::mmb::Family>);
1641    }
1642
1643    // -- MMR-only tests (use verify_proof / Position which are MMR-specific) --
1644
1645    fn is_send<T: Send>(_: T) {}
1646
    // Compile-time assertion that the futures returned by these inherent
    // (non-trait) db methods are `Send`. The function is never called at
    // runtime (hence `dead_code`); it exists only so the compiler checks the
    // `Send` bound on each returned future.
    #[allow(dead_code)]
    fn assert_non_trait_futures_are_send(db: &mut AnyTest, key: Digest) {
        is_send(db.get_all(&key));
        is_send(db.get_with_loc(&key));
        is_send(db.get_span(&key));
    }
1653
1654    // FromSyncTestable implementation for from_sync_result tests
    mod from_sync_testable {
        use super::*;
        use crate::{
            merkle::{
                mmr::{self, journaled::Mmr},
                Family as _,
            },
            qmdb::any::sync::tests::FromSyncTestable,
        };
        use futures::future::join_all;

        // Journaled MMR over the deterministic runtime context with `Digest` nodes.
        type TestMmr = Mmr<deterministic::Context, Digest>;

        impl FromSyncTestable for AnyTest {
            type Mmr = TestMmr;

            // Decompose the db's log into its merkle (MMR) and journal halves,
            // consuming the db.
            fn into_log_components(self) -> (Self::Mmr, Self::Journal) {
                (self.log.merkle, self.log.journal)
            }

            // Concurrently fetch the digests of the nodes `nodes_to_pin`
            // reports for `loc`. Unwraps twice: the node fetch must succeed
            // and the node must exist — presumably a guaranteed invariant for
            // pinned positions in these tests (panics otherwise).
            async fn pinned_nodes_at(&self, loc: Location) -> Vec<Digest> {
                join_all(mmr::Family::nodes_to_pin(loc).map(|p| self.log.merkle.get_node(p)))
                    .await
                    .into_iter()
                    .map(|n| n.unwrap().unwrap())
                    .collect()
            }
        }
    }
1684}