// commonware_storage/qmdb/any/ordered/variable.rs

//! An authenticated database that provides succinct proofs of _any_ value ever associated
//! with a key, maintains a next-key ordering for each active key, and allows values to have
//! variable sizes.
//!
//! _If the values you wish to store all have the same size, use [crate::qmdb::any::ordered::fixed]
//! instead for better performance._
7
8use crate::{
9    index::ordered::Index,
10    journal::contiguous::variable::Journal,
11    merkle::{self, Location},
12    qmdb::{
13        any::{ordered, value::VariableEncoding, VariableConfig, VariableValue},
14        operation::Key,
15        Error,
16    },
17    translator::Translator,
18    Context,
19};
20use commonware_codec::{Codec, Read};
21use commonware_cryptography::Hasher;
22
23pub type Update<K, V> = ordered::Update<K, VariableEncoding<V>>;
24pub type Operation<F, K, V> = ordered::Operation<F, K, VariableEncoding<V>>;
25
26/// A key-value QMDB based on an authenticated log of operations, supporting authentication of any
27/// value ever associated with a key.
28pub type Db<F, E, K, V, H, T> =
29    super::Db<F, E, Journal<E, Operation<F, K, V>>, Index<T, Location<F>>, H, Update<K, V>>;
30
31impl<F: merkle::Family, E: Context, K: Key, V: VariableValue, H: Hasher, T: Translator>
32    Db<F, E, K, V, H, T>
33where
34    Operation<F, K, V>: Codec,
35{
36    /// Returns a [Db] QMDB initialized from `cfg`. Any uncommitted log operations will be
37    /// discarded and the state of the db will be as of the last committed operation.
38    pub async fn init(
39        context: E,
40        cfg: VariableConfig<T, <Operation<F, K, V> as Read>::Cfg>,
41    ) -> Result<Self, Error<F>> {
42        Self::init_with_callback(context, cfg, None, |_, _| {}).await
43    }
44
45    /// Initialize the DB, invoking `callback` for each operation processed during recovery.
46    ///
47    /// If `known_inactivity_floor` is provided and is less than the log's actual inactivity floor,
48    /// `callback` is invoked with `(false, None)` for each location in the gap. Then, as the
49    /// snapshot is built from the log, `callback` is invoked for each operation with its activity
50    /// status and previous location (if any).
51    pub(crate) async fn init_with_callback(
52        context: E,
53        cfg: VariableConfig<T, <Operation<F, K, V> as Read>::Cfg>,
54        known_inactivity_floor: Option<Location<F>>,
55        callback: impl FnMut(bool, Option<Location<F>>),
56    ) -> Result<Self, Error<F>> {
57        crate::qmdb::any::init(context, cfg, known_inactivity_floor, callback).await
58    }
59}
60
61/// Partitioned index variants that divide the key space into `2^(P*8)` partitions.
62///
63/// See [partitioned::Db] for the generic type, or use the convenience aliases:
64/// - [partitioned::p256::Db] for 256 partitions (P=1)
65/// - [partitioned::p64k::Db] for 65,536 partitions (P=2)
66pub mod partitioned {
67    pub use super::{Operation, Update};
68    use crate::{
69        index::partitioned::ordered::Index,
70        journal::contiguous::variable::Journal,
71        merkle::{self, Location},
72        qmdb::{
73            any::{VariableConfig, VariableValue},
74            operation::Key,
75            Error,
76        },
77        translator::Translator,
78        Context,
79    };
80    use commonware_codec::{Codec, Read};
81    use commonware_cryptography::Hasher;
82
83    /// An ordered key-value QMDB with a partitioned snapshot index and variable-size values.
84    ///
85    /// This is the partitioned variant of [super::Db]. The const generic `P` specifies
86    /// the number of prefix bytes used for partitioning:
87    /// - `P = 1`: 256 partitions
88    /// - `P = 2`: 65,536 partitions
89    ///
90    /// Use partitioned indices when you have a large number of keys (>> 2^(P*8)) and memory
91    /// efficiency is important. Keys should be uniformly distributed across the prefix space.
92    pub type Db<F, E, K, V, H, T, const P: usize> = crate::qmdb::any::ordered::Db<
93        F,
94        E,
95        Journal<E, Operation<F, K, V>>,
96        Index<T, Location<F>, P>,
97        H,
98        Update<K, V>,
99    >;
100
101    impl<
102            F: merkle::Family,
103            E: Context,
104            K: Key,
105            V: VariableValue,
106            H: Hasher,
107            T: Translator,
108            const P: usize,
109        > Db<F, E, K, V, H, T, P>
110    where
111        Operation<F, K, V>: Codec,
112    {
113        /// Returns a [Db] QMDB initialized from `cfg`. Uncommitted log operations will be
114        /// discarded and the state of the db will be as of the last committed operation.
115        pub async fn init(
116            context: E,
117            cfg: VariableConfig<T, <Operation<F, K, V> as Read>::Cfg>,
118        ) -> Result<Self, Error<F>> {
119            Self::init_with_callback(context, cfg, None, |_, _| {}).await
120        }
121
122        /// Initialize the DB, invoking `callback` for each operation processed during recovery.
123        ///
124        /// If `known_inactivity_floor` is provided and is less than the log's actual inactivity floor,
125        /// `callback` is invoked with `(false, None)` for each location in the gap. Then, as the
126        /// snapshot is built from the log, `callback` is invoked for each operation with its activity
127        /// status and previous location (if any).
128        pub(crate) async fn init_with_callback(
129            context: E,
130            cfg: VariableConfig<T, <Operation<F, K, V> as Read>::Cfg>,
131            known_inactivity_floor: Option<Location<F>>,
132            callback: impl FnMut(bool, Option<Location<F>>),
133        ) -> Result<Self, Error<F>> {
134            crate::qmdb::any::init(context, cfg, known_inactivity_floor, callback).await
135        }
136    }
137
138    /// Convenience type aliases for 256 partitions (P=1).
139    pub mod p256 {
140        /// Variable-value DB with 256 partitions.
141        pub type Db<F, E, K, V, H, T> = super::Db<F, E, K, V, H, T, 1>;
142    }
143
144    /// Convenience type aliases for 65,536 partitions (P=2).
145    pub mod p64k {
146        /// Variable-value DB with 65,536 partitions.
147        pub type Db<F, E, K, V, H, T> = super::Db<F, E, K, V, H, T, 2>;
148    }
149}
150
#[cfg(test)]
pub(crate) mod test {
    use super::*;
    use crate::{
        mmr,
        qmdb::any::{
            ordered::test::{
                test_ordered_any_db_basic, test_ordered_any_db_empty,
                test_ordered_any_update_collision_edge_case,
            },
            test::variable_db_config,
        },
        translator::TwoCap,
    };
    use commonware_cryptography::{sha256::Digest, Sha256};
    use commonware_macros::test_traced;
    use commonware_math::algebra::Random;
    use commonware_runtime::{
        buffer::paged::CacheRef,
        deterministic::{self, Context},
        BufferPooler, Metrics, Runner as _,
    };
    use commonware_utils::{sequence::FixedBytes, test_rng_seeded, NZUsize, NZU16, NZU64};
    use rand::RngCore;

    // Janky page & cache sizes to exercise boundary conditions.
    const PAGE_SIZE: u16 = 103;
    const PAGE_CACHE_SIZE: usize = 13;

    /// Concrete [VariableConfig] used by these unit tests.
    pub(crate) type VarConfig =
        VariableConfig<TwoCap, ((), (commonware_codec::RangeCfg<usize>, ()))>;

    /// Type alias for the concrete [Db] type used in these unit tests.
    pub(crate) type AnyTest =
        Db<mmr::Family, deterministic::Context, Digest, Vec<u8>, Sha256, TwoCap>;

    /// Build a [VarConfig] with partition names unique to `seed` so concurrently
    /// running tests don't collide on storage partitions.
    pub(crate) fn create_test_config(seed: u64, pooler: &impl BufferPooler) -> VarConfig {
        let page_cache =
            CacheRef::from_pooler(pooler, NZU16!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE));
        VariableConfig {
            merkle_config: crate::mmr::journaled::Config {
                journal_partition: format!("mmr-journal-{seed}"),
                metadata_partition: format!("mmr-metadata-{seed}"),
                items_per_blob: NZU64!(12), // intentionally small and janky size
                write_buffer: NZUsize!(64),
                thread_pool: None,
                page_cache: page_cache.clone(),
            },
            journal_config: crate::journal::contiguous::variable::Config {
                partition: format!("log-journal-{seed}"),
                items_per_section: NZU64!(14), // intentionally small and janky size
                write_buffer: NZUsize!(64),
                compression: None,
                codec_config: ((), ((0..=10000).into(), ())),
                page_cache,
            },
            translator: TwoCap,
        }
    }

    /// Create a test database with unique partition names
    pub(crate) async fn create_test_db(mut context: Context) -> AnyTest {
        let seed = context.next_u64();
        let config = create_test_config(seed, &context);
        AnyTest::init(context, config).await.unwrap()
    }

    /// Deterministic byte vector generator for variable-value tests.
    /// Lengths range over 7..=19 and the fill byte over 0..=254.
    fn to_bytes(i: u64) -> Vec<u8> {
        let len = ((i % 13) + 7) as usize;
        vec![(i % 255) as u8; len]
    }

    /// Create n random operations using the default seed (0). Some portion of
    /// the updates are deletes. create_test_ops(n) is a prefix of
    /// create_test_ops(n') for n < n'.
    pub(crate) fn create_test_ops(n: usize) -> Vec<Operation<mmr::Family, Digest, Vec<u8>>> {
        create_test_ops_seeded(n, 0)
    }

    /// Create n random operations using a specific seed. Use different seeds
    /// when you need non-overlapping keys in the same test.
    pub(crate) fn create_test_ops_seeded(
        n: usize,
        seed: u64,
    ) -> Vec<Operation<mmr::Family, Digest, Vec<u8>>> {
        let mut rng = test_rng_seeded(seed);
        let mut prev_key = Digest::random(&mut rng);
        let mut ops = Vec::new();
        for i in 0..n {
            // Every tenth op (after the first) deletes the most recently updated key.
            if i % 10 == 0 && i > 0 {
                ops.push(Operation::Delete(prev_key));
            } else {
                let key = Digest::random(&mut rng);
                let next_key = Digest::random(&mut rng);
                let value = to_bytes(rng.next_u64());
                ops.push(Operation::Update(ordered::Update {
                    key,
                    value,
                    next_key,
                }));
                prev_key = key;
            }
        }
        ops
    }

    /// Applies the given operations to the database.
    pub(crate) async fn apply_ops(
        db: &mut AnyTest,
        ops: Vec<Operation<mmr::Family, Digest, Vec<u8>>>,
    ) {
        let mut batch = db.new_batch();
        for op in ops {
            match op {
                Operation::Update(data) => {
                    batch = batch.write(data.key, Some(data.value));
                }
                Operation::Delete(key) => {
                    batch = batch.write(key, None);
                }
                Operation::CommitFloor(_, _) => {
                    // CommitFloor consumes self - not supported in this helper.
                    // Test data from create_test_ops never includes CommitFloor.
                    panic!("CommitFloor not supported in apply_ops");
                }
            }
        }
        let merkleized = batch.merkleize(db, None).await.unwrap();
        db.apply_batch(merkleized).await.unwrap();
    }

    // Tests using FixedBytes<4> keys (for edge cases that require specific key patterns)

    /// Type alias for a variable db with FixedBytes<4> keys.
    type VariableDb = Db<mmr::Family, Context, FixedBytes<4>, Digest, Sha256, TwoCap>;

    /// Return a variable db with FixedBytes<4> keys.
    async fn open_variable_db(context: Context) -> VariableDb {
        let cfg = variable_db_config("fixed-bytes-var-partition", &context);
        VariableDb::init(context, cfg).await.unwrap()
    }

    #[test_traced("WARN")]
    fn test_ordered_any_variable_db_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let db = open_variable_db(context.with_label("initial")).await;
            test_ordered_any_db_empty(context, db, |ctx| Box::pin(open_variable_db(ctx))).await;
        });
    }

    #[test_traced("WARN")]
    fn test_ordered_any_variable_db_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let db = open_variable_db(context.with_label("initial")).await;
            test_ordered_any_db_basic(context, db, |ctx| Box::pin(open_variable_db(ctx))).await;
        });
    }

    #[test_traced("WARN")]
    fn test_ordered_any_update_collision_edge_case_variable() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let db = open_variable_db(context.clone()).await;
            test_ordered_any_update_collision_edge_case(db).await;
        });
    }

    /// Builds a db with two colliding keys, and creates a new one between them using a batch
    /// update.
    #[test_traced("WARN")]
    fn test_ordered_any_update_batch_create_between_collisions() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_variable_db(context.clone()).await;

            // This DB uses a TwoCap so we use equivalent two byte prefixes for each key to ensure
            // collisions.
            let key1 = FixedBytes::from([0xFFu8, 0xFFu8, 5u8, 5u8]);
            let key2 = FixedBytes::from([0xFFu8, 0xFFu8, 6u8, 6u8]);
            let key3 = FixedBytes::from([0xFFu8, 0xFFu8, 7u8, 0u8]);
            let val = Sha256::fill(1u8);

            let merkleized = db
                .new_batch()
                .write(key1.clone(), Some(val))
                .write(key3.clone(), Some(val))
                .merkleize(&db, None)
                .await
                .unwrap();
            db.apply_batch(merkleized).await.unwrap();

            assert_eq!(db.get(&key1).await.unwrap().unwrap(), val);
            assert!(db.get(&key2).await.unwrap().is_none());
            assert_eq!(db.get(&key3).await.unwrap().unwrap(), val);

            // Batch-insert the middle key.
            let merkleized = db
                .new_batch()
                .write(key2.clone(), Some(val))
                .merkleize(&db, None)
                .await
                .unwrap();
            db.apply_batch(merkleized).await.unwrap();

            assert_eq!(db.get(&key1).await.unwrap().unwrap(), val);
            assert_eq!(db.get(&key2).await.unwrap().unwrap(), val);
            assert_eq!(db.get(&key3).await.unwrap().unwrap(), val);

            // The next-key ring should now be key1 -> key2 -> key3 -> key1.
            let span1 = db.get_span(&key1).await.unwrap().unwrap();
            assert_eq!(span1.1.next_key, key2);
            let span2 = db.get_span(&key2).await.unwrap().unwrap();
            assert_eq!(span2.1.next_key, key3);
            let span3 = db.get_span(&key3).await.unwrap().unwrap();
            assert_eq!(span3.1.next_key, key1);

            db.destroy().await.unwrap();
        });
    }

    /// Batch create/delete cases where the deleted key is the previous key of a newly created key,
    /// and vice-versa.
    #[test_traced("WARN")]
    fn test_ordered_any_batch_create_delete_prev_links() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let key1 = FixedBytes::from([0x10u8, 0x00, 0x00, 0x00]);
            let key2 = FixedBytes::from([0x20u8, 0x00, 0x00, 0x00]);
            let key3 = FixedBytes::from([0x30u8, 0x00, 0x00, 0x00]);
            let val1 = Sha256::fill(1u8);
            let val2 = Sha256::fill(2u8);
            let val3 = Sha256::fill(3u8);

            // Delete the previous key of a newly created key.
            let mut db = open_variable_db(context.with_label("first")).await;
            let merkleized = db
                .new_batch()
                .write(key1.clone(), Some(val1))
                .write(key3.clone(), Some(val3))
                .merkleize(&db, None)
                .await
                .unwrap();
            db.apply_batch(merkleized).await.unwrap();

            let merkleized = db
                .new_batch()
                .write(key1.clone(), None)
                .write(key2.clone(), Some(val2))
                .merkleize(&db, None)
                .await
                .unwrap();
            db.apply_batch(merkleized).await.unwrap();

            assert!(db.get(&key1).await.unwrap().is_none());
            assert_eq!(db.get(&key2).await.unwrap(), Some(val2));
            assert_eq!(db.get(&key3).await.unwrap(), Some(val3));
            let span2 = db.get_span(&key2).await.unwrap().unwrap();
            assert_eq!(span2.1.next_key, key3);
            let span3 = db.get_span(&key3).await.unwrap().unwrap();
            assert_eq!(span3.1.next_key, key2);
            db.destroy().await.unwrap();

            // Create a key that becomes the previous key of a concurrently deleted key.
            let mut db = open_variable_db(context.with_label("second")).await;
            let merkleized = db
                .new_batch()
                .write(key1.clone(), Some(val1))
                .write(key3.clone(), Some(val3))
                .merkleize(&db, None)
                .await
                .unwrap();
            db.apply_batch(merkleized).await.unwrap();

            let merkleized = db
                .new_batch()
                .write(key2.clone(), Some(val2))
                .write(key3.clone(), None)
                .merkleize(&db, None)
                .await
                .unwrap();
            db.apply_batch(merkleized).await.unwrap();

            assert_eq!(db.get(&key1).await.unwrap(), Some(val1));
            assert_eq!(db.get(&key2).await.unwrap(), Some(val2));
            assert!(db.get(&key3).await.unwrap().is_none());
            let span1 = db.get_span(&key1).await.unwrap().unwrap();
            assert_eq!(span1.1.next_key, key2);
            let span2 = db.get_span(&key2).await.unwrap().unwrap();
            assert_eq!(span2.1.next_key, key1);
            db.destroy().await.unwrap();
        });
    }

    /// Compile-time helper: accepts only `Send` values.
    fn is_send<T: Send>(_: T) {}

    /// Compile-time check that the non-trait accessor futures are `Send`.
    #[allow(dead_code)]
    fn assert_non_trait_futures_are_send(db: &mut AnyTest, key: Digest) {
        is_send(db.get_all(&key));
        is_send(db.get_with_loc(&key));
        is_send(db.get_span(&key));
    }

    /// Parent inserts a key, child inserts another; commit parent then
    /// apply child sequentially. Verifies next-key pointers
    /// are correct after both commits.
    #[test_traced("WARN")]
    fn test_ordered_sequential_commit_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = create_test_db(context).await;

            // Seed with initial data so the ordered index is non-trivial.
            apply_ops(&mut db, create_test_ops(10)).await;
            db.commit().await.unwrap();

            let base = db.to_batch();

            // Parent batch: insert key_a.
            let key_a = Digest::random(&mut test_rng_seeded(800));
            let val_a = vec![1u8; 10];
            let parent_batch = base
                .new_batch::<Sha256>()
                .write(key_a, Some(val_a.clone()))
                .merkleize(&db, None)
                .await
                .unwrap();

            // Child batch: insert key_b.
            let key_b = Digest::random(&mut test_rng_seeded(801));
            let val_b = vec![2u8; 10];
            let child_batch = parent_batch
                .new_batch::<Sha256>()
                .write(key_b, Some(val_b.clone()))
                .merkleize(&db, None)
                .await
                .unwrap();

            db.apply_batch(parent_batch).await.unwrap();
            db.commit().await.unwrap();

            // Commit child.
            db.apply_batch(child_batch).await.unwrap();
            db.commit().await.unwrap();

            // Both keys should be readable.
            assert_eq!(db.get(&key_a).await.unwrap().unwrap(), val_a);
            assert_eq!(db.get(&key_b).await.unwrap().unwrap(), val_b);

            db.destroy().await.unwrap();
        });
    }

    /// Parent inserts key_x, child deletes key_x. After committing parent
    /// then child sequentially, key_x should be gone and the
    /// next-key ring should exclude it.
    #[test_traced("WARN")]
    fn test_ordered_sequential_commit_delete_after_insert() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = create_test_db(context).await;

            apply_ops(&mut db, create_test_ops(5)).await;
            db.commit().await.unwrap();

            let base = db.to_batch();

            let key_x = Digest::random(&mut test_rng_seeded(810));
            let val_x = vec![10u8; 8];
            let parent_batch = base
                .new_batch::<Sha256>()
                .write(key_x, Some(val_x.clone()))
                .merkleize(&db, None)
                .await
                .unwrap();

            let child_batch = parent_batch
                .new_batch::<Sha256>()
                .write(key_x, None)
                .merkleize(&db, None)
                .await
                .unwrap();

            db.apply_batch(parent_batch).await.unwrap();
            db.commit().await.unwrap();
            assert_eq!(db.get(&key_x).await.unwrap().unwrap(), val_x);

            // Commit child.
            db.apply_batch(child_batch).await.unwrap();
            db.commit().await.unwrap();

            // key_x should be deleted.
            assert!(db.get(&key_x).await.unwrap().is_none());

            db.destroy().await.unwrap();
        });
    }

    /// Parent and child both modify the same key. After committing parent
    /// then child sequentially, the child's value wins.
    #[test_traced("WARN")]
    fn test_ordered_sequential_commit_overlapping_keys() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = create_test_db(context).await;

            apply_ops(&mut db, create_test_ops(5)).await;
            db.commit().await.unwrap();

            let base = db.to_batch();

            let key_x = Digest::random(&mut test_rng_seeded(820));
            let val_a = vec![10u8; 8];
            let parent_batch = base
                .new_batch::<Sha256>()
                .write(key_x, Some(val_a.clone()))
                .merkleize(&db, None)
                .await
                .unwrap();

            let val_b = vec![20u8; 8];
            let child_batch = parent_batch
                .new_batch::<Sha256>()
                .write(key_x, Some(val_b.clone()))
                .merkleize(&db, None)
                .await
                .unwrap();

            db.apply_batch(parent_batch).await.unwrap();
            db.commit().await.unwrap();
            assert_eq!(db.get(&key_x).await.unwrap().unwrap(), val_a);

            // Commit child.
            db.apply_batch(child_batch).await.unwrap();
            db.commit().await.unwrap();

            assert_eq!(db.get(&key_x).await.unwrap().unwrap(), val_b);

            db.destroy().await.unwrap();
        });
    }

    // FromSyncTestable implementation for from_sync_result tests
    mod from_sync_testable {
        use super::*;
        use crate::{
            merkle::{
                mmr::{self, journaled::Mmr},
                Family as _,
            },
            qmdb::any::sync::tests::FromSyncTestable,
        };
        use futures::future::join_all;

        type TestMmr = Mmr<deterministic::Context, Digest>;

        impl FromSyncTestable for AnyTest {
            type Mmr = TestMmr;

            fn into_log_components(self) -> (Self::Mmr, Self::Journal) {
                (self.log.merkle, self.log.journal)
            }

            async fn pinned_nodes_at(&self, loc: mmr::Location) -> Vec<Digest> {
                join_all(mmr::Family::nodes_to_pin(loc).map(|p| self.log.merkle.get_node(p)))
                    .await
                    .into_iter()
                    .map(|n| n.unwrap().unwrap())
                    .collect()
            }
        }
    }
}
623}