commonware_storage/qmdb/current/mod.rs

//! A _Current_ authenticated database provides succinct proofs of _any_ value ever associated with
//! a key, and also whether that value is the _current_ value associated with it. The
//! implementations are based on a [crate::qmdb::any] authenticated database combined with an
//! authenticated [crate::bitmap::MerkleizedBitMap] over the activity status of each operation.
//! The two structures are "grafted" together to minimize proof sizes.
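//!
//! # Example
//!
//! A minimal lifecycle sketch (not a runnable doctest), assuming `db` is an already-opened,
//! clean (merkleized, durable) database and that `key`, `other_key`, and `value` satisfy its
//! key and value types; the calls mirror the shared test helpers at the bottom of this module.
//!
//! ```ignore
//! // Stage mutations against a mutable handle.
//! let mut mutable = db.into_mutable();
//! mutable.update(key, value).await?;
//! mutable.delete(other_key).await?;
//!
//! // Commit the batch, then re-merkleize to obtain a clean database again.
//! let (durable, _) = mutable.commit(None).await?;
//! let mut db = durable.into_merkleized().await?;
//!
//! // The root authenticates both the operation log and the activity bitmap.
//! let root = db.root();
//!
//! // Optionally prune operations below the inactivity floor and flush to disk.
//! db.prune(db.inactivity_floor_loc()).await?;
//! db.sync().await?;
//! ```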

use crate::{
    qmdb::any::{FixedConfig as AnyFixedConfig, VariableConfig as AnyVariableConfig},
    translator::Translator,
};
use commonware_parallel::ThreadPool;
use commonware_runtime::buffer::paged::CacheRef;
use std::num::{NonZeroU64, NonZeroUsize};

pub mod db;
pub mod ordered;
pub mod proof;
pub mod unordered;

/// Configuration for a `Current` authenticated db with fixed-size values.
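///
/// # Example
///
/// A construction sketch (not a runnable doctest): the partition names and sizing values below
/// are hypothetical, and `translator` and `page_cache` are assumed to already be in hand.
///
/// ```ignore
/// let cfg = FixedConfig {
///     mmr_journal_partition: "current_mmr_journal".into(),
///     mmr_items_per_blob: NonZeroU64::new(4096).unwrap(),
///     mmr_write_buffer: NonZeroUsize::new(1024).unwrap(),
///     mmr_metadata_partition: "current_mmr_metadata".into(),
///     log_journal_partition: "current_log_journal".into(),
///     log_items_per_blob: NonZeroU64::new(4096).unwrap(),
///     log_write_buffer: NonZeroUsize::new(1024).unwrap(),
///     bitmap_metadata_partition: "current_bitmap_metadata".into(),
///     translator,
///     thread_pool: None,
///     page_cache,
/// };
/// ```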
#[derive(Clone)]
pub struct FixedConfig<T: Translator> {
    /// The name of the storage partition used for the MMR's backing journal.
    pub mmr_journal_partition: String,

    /// The items per blob configuration value used by the MMR journal.
    pub mmr_items_per_blob: NonZeroU64,

    /// The size of the write buffer to use for each blob in the MMR journal.
    pub mmr_write_buffer: NonZeroUsize,

    /// The name of the storage partition used for the MMR's metadata.
    pub mmr_metadata_partition: String,

    /// The name of the storage partition used to persist the (pruned) log of operations.
    pub log_journal_partition: String,

    /// The items per blob configuration value used by the log journal.
    pub log_items_per_blob: NonZeroU64,

    /// The size of the write buffer to use for each blob in the log journal.
    pub log_write_buffer: NonZeroUsize,

    /// The name of the storage partition used for the bitmap metadata.
    pub bitmap_metadata_partition: String,

    /// The translator used by the compressed index.
    pub translator: T,

    /// An optional thread pool to use for parallelizing batch operations.
    pub thread_pool: Option<ThreadPool>,

    /// The page cache to use for caching data.
    pub page_cache: CacheRef,
}

impl<T: Translator> From<FixedConfig<T>> for AnyFixedConfig<T> {
    fn from(cfg: FixedConfig<T>) -> Self {
        Self {
            mmr_journal_partition: cfg.mmr_journal_partition,
            mmr_metadata_partition: cfg.mmr_metadata_partition,
            mmr_items_per_blob: cfg.mmr_items_per_blob,
            mmr_write_buffer: cfg.mmr_write_buffer,
            log_journal_partition: cfg.log_journal_partition,
            log_items_per_blob: cfg.log_items_per_blob,
            log_write_buffer: cfg.log_write_buffer,
            translator: cfg.translator,
            thread_pool: cfg.thread_pool,
            page_cache: cfg.page_cache,
        }
    }
}

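/// Configuration for a `Current` authenticated db with variable-size values.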
#[derive(Clone)]
pub struct VariableConfig<T: Translator, C> {
    /// The name of the storage partition used for the MMR's backing journal.
    pub mmr_journal_partition: String,

    /// The items per blob configuration value used by the MMR journal.
    pub mmr_items_per_blob: NonZeroU64,

    /// The size of the write buffer to use for each blob in the MMR journal.
    pub mmr_write_buffer: NonZeroUsize,

    /// The name of the storage partition used for the MMR's metadata.
    pub mmr_metadata_partition: String,

    /// The name of the storage partition used to persist the log of operations.
    pub log_partition: String,

    /// The size of the write buffer to use for each blob in the log journal.
    pub log_write_buffer: NonZeroUsize,

    /// Optional compression level (using `zstd`) to apply to log data before storing.
    pub log_compression: Option<u8>,

    /// The codec configuration to use for the log.
    pub log_codec_config: C,

    /// The items per blob configuration value used by the log journal.
    pub log_items_per_blob: NonZeroU64,

    /// The name of the storage partition used for the bitmap metadata.
    pub bitmap_metadata_partition: String,

    /// The translator used by the compressed index.
    pub translator: T,

    /// An optional thread pool to use for parallelizing batch operations.
    pub thread_pool: Option<ThreadPool>,

    /// The page cache to use for caching data.
    pub page_cache: CacheRef,
}

impl<T: Translator, C> From<VariableConfig<T, C>> for AnyVariableConfig<T, C> {
    fn from(cfg: VariableConfig<T, C>) -> Self {
        Self {
            mmr_journal_partition: cfg.mmr_journal_partition,
            mmr_metadata_partition: cfg.mmr_metadata_partition,
            mmr_items_per_blob: cfg.mmr_items_per_blob,
            mmr_write_buffer: cfg.mmr_write_buffer,
            log_items_per_blob: cfg.log_items_per_blob,
            log_partition: cfg.log_partition,
            log_write_buffer: cfg.log_write_buffer,
            log_compression: cfg.log_compression,
            log_codec_config: cfg.log_codec_config,
            translator: cfg.translator,
            thread_pool: cfg.thread_pool,
            page_cache: cfg.page_cache,
        }
    }
}

/// Extension trait for Current QMDB types that exposes bitmap information for testing.
#[cfg(any(test, feature = "test-traits"))]
pub trait BitmapPrunedBits {
    /// Returns the number of bits that have been pruned from the bitmap.
    fn pruned_bits(&self) -> u64;

    /// Returns the value of the bit at the given index.
    fn get_bit(&self, index: u64) -> bool;

    /// Returns the position of the oldest retained bit.
    fn oldest_retained(&self) -> u64;
}

#[cfg(test)]
pub mod tests {
    //! Shared test utilities for Current QMDB variants.

    pub use super::BitmapPrunedBits;
    use crate::{
        kv::{Deletable as _, Updatable as _},
        qmdb::{
            any::states::{CleanAny, MutableAny as _, UnmerkleizedDurableAny as _},
            store::{
                batch_tests::{TestKey, TestValue},
                LogStore,
            },
            Error,
        },
    };
    use commonware_runtime::{
        deterministic::{self, Context},
        Metrics as _, Runner as _,
    };
    use core::future::Future;
    use rand::{rngs::StdRng, RngCore, SeedableRng};
    use tracing::warn;

    /// Apply random operations to the given db, committing them (randomly and at the end) only if
    /// `commit_changes` is true. Returns a mutable db; callers should commit if needed.
    pub async fn apply_random_ops<C>(
        num_elements: u64,
        commit_changes: bool,
        rng_seed: u64,
        mut db: C::Mutable,
    ) -> Result<C::Mutable, Error>
    where
        C: CleanAny,
        C::Key: TestKey,
        <C as LogStore>::Value: TestValue,
    {
        // Log the seed with high visibility to make failures reproducible.
        warn!("rng_seed={}", rng_seed);
        let mut rng = StdRng::seed_from_u64(rng_seed);

        for i in 0u64..num_elements {
            let k = TestKey::from_seed(i);
            let v = TestValue::from_seed(rng.next_u64());
            db.update(k, v).await.unwrap();
        }

        // Randomly update / delete the keys, deleting on roughly 1 out of every 7 operations.
        for _ in 0u64..num_elements * 10 {
            let rand_key = TestKey::from_seed(rng.next_u64() % num_elements);
            if rng.next_u32() % 7 == 0 {
                db.delete(rand_key).await.unwrap();
                continue;
            }
            let v = TestValue::from_seed(rng.next_u64());
            db.update(rand_key, v).await.unwrap();
            if commit_changes && rng.next_u32() % 20 == 0 {
                // Commit every ~20 updates.
                let (durable_db, _) = db.commit(None).await?;
                let clean_db: C = durable_db.into_merkleized().await?;
                db = clean_db.into_mutable();
            }
        }
        if commit_changes {
            let (durable_db, _) = db.commit(None).await?;
            let clean_db: C = durable_db.into_merkleized().await?;
            db = clean_db.into_mutable();
        }
        Ok(db)
    }

    /// Run `test_build_random_close_reopen` against a database factory.
    ///
    /// The factory should return a clean (Merkleized, Durable) database when given a context and
    /// partition name. The factory will be called multiple times to test reopening.
    pub fn test_build_random_close_reopen<C, F, Fut>(mut open_db: F)
    where
        C: CleanAny,
        C::Key: TestKey,
        <C as LogStore>::Value: TestValue,
        F: FnMut(Context, String) -> Fut + Clone,
        Fut: Future<Output = C>,
    {
        const ELEMENTS: u64 = 1000;

        let executor = deterministic::Runner::default();
        let mut open_db_clone = open_db.clone();
        let state1 = executor.start(|mut context| async move {
            let partition = "build_random".to_string();
            let rng_seed = context.next_u64();
            let db: C = open_db_clone(context.with_label("first"), partition.clone()).await;
            let db = apply_random_ops::<C>(ELEMENTS, true, rng_seed, db.into_mutable())
                .await
                .unwrap();
            let (db, _) = db.commit(None).await.unwrap();
            let mut db: C = db.into_merkleized().await.unwrap();
            db.sync().await.unwrap();

            // Drop and reopen the db
            let root = db.root();
            drop(db);
            let db: C = open_db_clone(context.with_label("second"), partition).await;

            // Ensure the root matches
            assert_eq!(db.root(), root);

            db.destroy().await.unwrap();
            context.auditor().state()
        });

        // Run again to verify determinism
        let executor = deterministic::Runner::default();
        let state2 = executor.start(|mut context| async move {
            let partition = "build_random".to_string();
            let rng_seed = context.next_u64();
            let db: C = open_db(context.with_label("first"), partition.clone()).await;
            let db = apply_random_ops::<C>(ELEMENTS, true, rng_seed, db.into_mutable())
                .await
                .unwrap();
            let (db, _) = db.commit(None).await.unwrap();
            let mut db: C = db.into_merkleized().await.unwrap();
            db.sync().await.unwrap();

            let root = db.root();
            drop(db);
            let db: C = open_db(context.with_label("second"), partition).await;
            assert_eq!(db.root(), root);

            db.destroy().await.unwrap();
            context.auditor().state()
        });

        assert_eq!(state1, state2);
    }

    /// Run `test_simulate_write_failures` against a database factory.
    ///
    /// This test builds a random database and simulates recovery from different types of
    /// failure scenarios.
    pub fn test_simulate_write_failures<C, F, Fut>(mut open_db: F)
    where
        C: CleanAny,
        C::Key: TestKey,
        <C as LogStore>::Value: TestValue,
        F: FnMut(Context, String) -> Fut + Clone,
        Fut: Future<Output = C>,
    {
        const ELEMENTS: u64 = 1000;

        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            let partition = "build_random_fail_commit".to_string();
            let rng_seed = context.next_u64();
            let db: C = open_db(context.with_label("first"), partition.clone()).await;
            let db = apply_random_ops::<C>(ELEMENTS, true, rng_seed, db.into_mutable())
                .await
                .unwrap();
            let (db, _) = db.commit(None).await.unwrap();
            let mut db: C = db.into_merkleized().await.unwrap();
            let committed_root = db.root();
            let committed_op_count = db.bounds().end;
            let committed_inactivity_floor = db.inactivity_floor_loc();
            db.prune(committed_inactivity_floor).await.unwrap();

            // Perform more random operations without committing any of them.
            let db = apply_random_ops::<C>(ELEMENTS, false, rng_seed + 1, db.into_mutable())
                .await
                .unwrap();

            // SCENARIO #1: Simulate a crash that happens before any writes. Upon reopening, the
            // state of the DB should be as of the last commit.
            drop(db);
            let db: C = open_db(context.with_label("scenario1"), partition.clone()).await;
            assert_eq!(db.root(), committed_root);
            assert_eq!(db.bounds().end, committed_op_count);

            // Re-apply the exact same uncommitted operations.
            let db = apply_random_ops::<C>(ELEMENTS, false, rng_seed + 1, db.into_mutable())
                .await
                .unwrap();

            // SCENARIO #2: Simulate a crash that happens after the underlying any db has been
            // committed, but before the state of the pruned bitmap can be written to disk (i.e.,
            // before into_merkleized is called). We do this by committing and then dropping the
            // durable db without calling close or into_merkleized.
            let (durable_db, _) = db.commit(None).await.unwrap();
            let committed_op_count = durable_db.bounds().end;
            drop(durable_db);

            // Recovery should preserve that commit, so the root should differ from the previous
            // commit and the op count should be greater than before.
            let db: C = open_db(context.with_label("scenario2"), partition.clone()).await;
            let scenario_2_root = db.root();

            // To confirm the second committed hash is correct we'll re-build the DB in a new
            // partition, but without any failures. They should have the exact same state.
            let fresh_partition = "build_random_fail_commit_fresh".to_string();
            let db: C = open_db(context.with_label("fresh"), fresh_partition.clone()).await;
            let db = apply_random_ops::<C>(ELEMENTS, true, rng_seed, db.into_mutable())
                .await
                .unwrap();
            let (db, _) = db.commit(None).await.unwrap();
            let db = apply_random_ops::<C>(ELEMENTS, false, rng_seed + 1, db.into_mutable())
                .await
                .unwrap();
            let (db, _) = db.commit(None).await.unwrap();
            let mut db: C = db.into_merkleized().await.unwrap();
            db.prune(db.inactivity_floor_loc()).await.unwrap();
            // State from scenario #2 should match that of a successful commit.
            assert_eq!(db.bounds().end, committed_op_count);
            assert_eq!(db.root(), scenario_2_root);

            db.destroy().await.unwrap();
        });
    }

    /// Run `test_different_pruning_delays_same_root` against a database factory.
    ///
    /// This test verifies that pruning operations do not affect the root hash - two databases
    /// with identical operations but different pruning schedules should have the same root.
    pub fn test_different_pruning_delays_same_root<C, F, Fut>(mut open_db: F)
    where
        C: CleanAny,
        C::Key: TestKey,
        <C as LogStore>::Value: TestValue,
        F: FnMut(Context, String) -> Fut + Clone,
        Fut: Future<Output = C>,
    {
        const NUM_OPERATIONS: u64 = 1000;

        let executor = deterministic::Runner::default();
        let mut open_db_clone = open_db.clone();
        executor.start(|context| async move {
            // Create two databases that are identical other than how they are pruned.
            let mut db_no_pruning: C = open_db_clone(
                context.with_label("no_pruning"),
                "no_pruning_test".to_string(),
            )
            .await;
            let mut db_pruning: C =
                open_db(context.with_label("pruning"), "pruning_test".to_string()).await;

            let mut db_no_pruning_mut = db_no_pruning.into_mutable();
            let mut db_pruning_mut = db_pruning.into_mutable();

            // Apply identical operations to both databases, but only prune one.
            for i in 0..NUM_OPERATIONS {
                let key: C::Key = TestKey::from_seed(i);
                let value: <C as LogStore>::Value = TestValue::from_seed(i * 1000);

                db_no_pruning_mut.update(key, value.clone()).await.unwrap();
                db_pruning_mut.update(key, value).await.unwrap();

                // Commit periodically
                if i % 50 == 49 {
                    let (db_1, _) = db_no_pruning_mut.commit(None).await.unwrap();
                    let clean_no_pruning: C = db_1.into_merkleized().await.unwrap();
                    let (db_2, _) = db_pruning_mut.commit(None).await.unwrap();
                    let mut clean_pruning: C = db_2.into_merkleized().await.unwrap();
                    clean_pruning
                        .prune(clean_no_pruning.inactivity_floor_loc())
                        .await
                        .unwrap();
                    db_no_pruning_mut = clean_no_pruning.into_mutable();
                    db_pruning_mut = clean_pruning.into_mutable();
                }
            }

            // Final commit
            let (db_1, _) = db_no_pruning_mut.commit(None).await.unwrap();
            db_no_pruning = db_1.into_merkleized().await.unwrap();
            let (db_2, _) = db_pruning_mut.commit(None).await.unwrap();
            db_pruning = db_2.into_merkleized().await.unwrap();

            // Get roots from both databases - they should match
            let root_no_pruning = db_no_pruning.root();
            let root_pruning = db_pruning.root();
            assert_eq!(root_no_pruning, root_pruning);

            // Also verify inactivity floors match
            assert_eq!(
                db_no_pruning.inactivity_floor_loc(),
                db_pruning.inactivity_floor_loc()
            );

            db_no_pruning.destroy().await.unwrap();
            db_pruning.destroy().await.unwrap();
        });
    }

    /// Run `test_sync_persists_bitmap_pruning_boundary` against a database factory.
    ///
    /// This test verifies that calling `sync()` persists the bitmap pruning boundary that was
    /// set during `into_merkleized()`. If `sync()` didn't call `write_pruned`, the
    /// `pruned_bits()` count would be 0 after reopen instead of the expected value.
    pub fn test_sync_persists_bitmap_pruning_boundary<C, F, Fut>(mut open_db: F)
    where
        C: CleanAny + BitmapPrunedBits,
        C::Key: TestKey,
        <C as LogStore>::Value: TestValue,
        F: FnMut(Context, String) -> Fut + Clone,
        Fut: Future<Output = C>,
    {
        const ELEMENTS: u64 = 500;

        let executor = deterministic::Runner::default();
        let mut open_db_clone = open_db.clone();
        executor.start(|mut context| async move {
            let partition = "sync_bitmap_pruning".to_string();
            let rng_seed = context.next_u64();
            let db: C = open_db_clone(context.with_label("first"), partition.clone()).await;

            // Apply random operations with commits to advance the inactivity floor.
            let db = apply_random_ops::<C>(ELEMENTS, true, rng_seed, db.into_mutable())
                .await
                .unwrap();
            let (db, _) = db.commit(None).await.unwrap();
            let mut db: C = db.into_merkleized().await.unwrap();

            // The bitmap should have been pruned during into_merkleized().
            let pruned_bits_before = db.pruned_bits();
            warn!(
                "pruned_bits_before={}, inactivity_floor={}, op_count={}",
                pruned_bits_before,
                *db.inactivity_floor_loc(),
                *db.bounds().end
            );

            // Verify we actually have some pruning (otherwise the test is meaningless).
            assert!(
                pruned_bits_before > 0,
                "Expected bitmap to have pruned bits after merkleization"
            );

            // Call sync() WITHOUT calling prune(). The bitmap pruning boundary was set
            // during into_merkleized(), and sync() should persist it.
            db.sync().await.unwrap();

            // Record the root before dropping.
            let root_before = db.root();
            drop(db);

            // Reopen the database.
            let db: C = open_db(context.with_label("second"), partition).await;

            // The pruned bits count should match. If sync() didn't persist the bitmap pruned
            // state, this would be 0.
            let pruned_bits_after = db.pruned_bits();
            warn!("pruned_bits_after={}", pruned_bits_after);

            assert_eq!(
                pruned_bits_after, pruned_bits_before,
                "Bitmap pruned bits mismatch after reopen - sync() may not have called write_pruned()"
            );

            // Also verify the root matches.
            assert_eq!(db.root(), root_before);

            db.destroy().await.unwrap();
        });
    }

    /// Run `test_current_db_build_big` against a database factory.
    ///
    /// This test builds a database with 1000 keys, updates some, deletes some, and verifies that
    /// the final state matches an independently computed HashMap. It also verifies that the state
    /// persists correctly after close and reopen.
    ///
    /// The `expected_op_count` and `expected_inactivity_floor` parameters specify the expected
    /// values after commit + merkleize + prune. These differ between ordered and unordered variants.
    pub fn test_current_db_build_big<C, F, Fut>(
        mut open_db: F,
        expected_op_count: u64,
        expected_inactivity_floor: u64,
    ) where
        C: CleanAny,
        C::Key: TestKey,
        <C as LogStore>::Value: TestValue,
        F: FnMut(Context, String) -> Fut + Clone,
        Fut: Future<Output = C>,
    {
        use crate::mmr::Location;

        const ELEMENTS: u64 = 1000;

        let executor = deterministic::Runner::default();
        let mut open_db_clone = open_db.clone();
        executor.start(|context| async move {
            let mut db = open_db_clone(context.with_label("first"), "build_big".to_string())
                .await
                .into_mutable();

            let mut map = std::collections::HashMap::<C::Key, <C as LogStore>::Value>::default();
            for i in 0u64..ELEMENTS {
                let k: C::Key = TestKey::from_seed(i);
                let v: <C as LogStore>::Value = TestValue::from_seed(i * 1000);
                db.update(k, v.clone()).await.unwrap();
                map.insert(k, v);
            }

            // Update every 3rd key
            for i in 0u64..ELEMENTS {
                if i % 3 != 0 {
                    continue;
                }
                let k: C::Key = TestKey::from_seed(i);
                let v: <C as LogStore>::Value = TestValue::from_seed((i + 1) * 10000);
                db.update(k, v.clone()).await.unwrap();
                map.insert(k, v);
            }

            // Delete every 7th key
            for i in 0u64..ELEMENTS {
                if i % 7 != 1 {
                    continue;
                }
                let k: C::Key = TestKey::from_seed(i);
                db.delete(k).await.unwrap();
                map.remove(&k);
            }

            // Test that commit + sync w/ pruning will raise the inactivity floor.
            let (db, _) = db.commit(None).await.unwrap();
            let mut db: C = db.into_merkleized().await.unwrap();
            db.sync().await.unwrap();
            db.prune(db.inactivity_floor_loc()).await.unwrap();

            // Verify expected state after prune.
            assert_eq!(db.bounds().end, Location::new_unchecked(expected_op_count));
            assert_eq!(
                db.inactivity_floor_loc(),
                Location::new_unchecked(expected_inactivity_floor)
            );

            // Record root before dropping.
            let root = db.root();
            db.sync().await.unwrap();
            drop(db);

            // Reopen the db and verify it has exactly the same state.
            let db: C = open_db(context.with_label("second"), "build_big".to_string()).await;
            assert_eq!(root, db.root());
            assert_eq!(db.bounds().end, Location::new_unchecked(expected_op_count));
            assert_eq!(
                db.inactivity_floor_loc(),
                Location::new_unchecked(expected_inactivity_floor)
            );

            // Confirm the db's state matches that of the separate map we computed independently.
            for i in 0u64..ELEMENTS {
                let k: C::Key = TestKey::from_seed(i);
                if let Some(map_value) = map.get(&k) {
                    let Some(db_value) = db.get(&k).await.unwrap() else {
                        panic!("key not found in db: {k}");
                    };
                    assert_eq!(*map_value, db_value);
                } else {
                    assert!(db.get(&k).await.unwrap().is_none());
                }
            }
        });
    }
}