Skip to main content

commonware_sync/databases/
current.rs

1//! Current database types and helpers for the sync example.
2//!
3//! A `current` database extends an `any` database with an activity bitmap that tracks which
4//! operations are active (i.e. represent the current state of their key) vs inactive
5//! (superseded or deleted). Its canonical root folds the ops root, a grafted MMR root
6//! (combining bitmap chunks with ops subtree roots), and an optional partial-chunk digest.
7//! See [current] module documentation for more details.
8//!
9//! For sync, the engine targets the **ops root** (not the canonical root). The operations and
10//! proof format are identical to `any` -- the bitmap is reconstructed deterministically from
11//! the operations after sync completes. See the
12//! [Root structure](commonware_storage::qmdb::current) module documentation for details.
13//!
14//! This module re-uses the same [`Operation`] type as [`super::any`] since the underlying
15//! operations log is the same.
16
17use crate::{Hasher, Key, Translator, Value};
18use commonware_codec::FixedSize;
19use commonware_cryptography::{sha256, Hasher as CryptoHasher};
20use commonware_runtime::{buffer, BufferPooler, Clock, Metrics, Storage};
21use commonware_storage::{
22    mmr::{Location, Proof},
23    qmdb::{
24        self,
25        any::unordered::{fixed::Operation as FixedOperation, Update},
26        current::{self, FixedConfig as Config},
27        operation::Committable,
28        store::LogStore,
29    },
30};
31use commonware_utils::{NZUsize, NZU16, NZU64};
32use std::{future::Future, num::NonZeroU64};
33use tracing::error;
34
/// Bitmap chunk size in bytes, set to the byte length of a SHA-256 digest
/// (`sha256::Digest::SIZE`).
///
/// Each chunk covers `CHUNK_SIZE * 8` operations' activity bits.
const CHUNK_SIZE: usize = sha256::Digest::SIZE;
37
/// Database type alias.
///
/// Resolves to `current::unordered::fixed::Db` parameterized with this crate's [`Key`],
/// [`Value`], [`Hasher`], and [`Translator`] types and `CHUNK_SIZE`-byte bitmap chunks. `E` is
/// the only parameter left open to the caller.
pub type Database<E> = current::unordered::fixed::Db<E, Key, Value, Hasher, Translator, CHUNK_SIZE>;
40
/// Operation type alias. Same as the `any` operation type.
///
/// Resolves to the `any::unordered::fixed` operation over this crate's [`Key`] and [`Value`].
pub type Operation = FixedOperation<Key, Value>;
43
44/// Create a database configuration.
45pub fn create_config(context: &impl BufferPooler) -> Config<Translator> {
46    Config {
47        mmr_journal_partition: "mmr-journal".into(),
48        mmr_metadata_partition: "mmr-metadata".into(),
49        mmr_items_per_blob: NZU64!(4096),
50        mmr_write_buffer: NZUsize!(4096),
51        log_journal_partition: "log-journal".into(),
52        log_items_per_blob: NZU64!(4096),
53        log_write_buffer: NZUsize!(4096),
54        grafted_mmr_metadata_partition: "grafted-mmr-metadata".into(),
55        translator: Translator::default(),
56        thread_pool: None,
57        page_cache: buffer::paged::CacheRef::from_pooler(context, NZU16!(2048), NZUsize!(10)),
58    }
59}
60
61impl<E> super::Syncable for Database<E>
62where
63    E: Storage + Clock + Metrics,
64{
65    type Operation = Operation;
66
67    fn create_test_operations(count: usize, seed: u64) -> Vec<Self::Operation> {
68        let mut hasher = <Hasher as CryptoHasher>::new();
69        let mut operations = Vec::new();
70        for i in 0..count {
71            let key = {
72                hasher.update(&i.to_be_bytes());
73                hasher.update(&seed.to_be_bytes());
74                hasher.finalize()
75            };
76
77            let value = {
78                hasher.update(&key);
79                hasher.update(b"value");
80                hasher.finalize()
81            };
82
83            operations.push(Operation::Update(Update(key, value)));
84
85            if (i + 1) % 10 == 0 {
86                operations.push(Operation::CommitFloor(None, Location::from(i + 1)));
87            }
88        }
89
90        // Always end with a commit.
91        operations.push(Operation::CommitFloor(None, Location::from(count)));
92        operations
93    }
94
95    async fn add_operations(
96        &mut self,
97        operations: Vec<Self::Operation>,
98    ) -> Result<(), qmdb::Error> {
99        if operations.last().is_none() || !operations.last().unwrap().is_commit() {
100            error!("operations must end with a commit");
101            return Ok(());
102        }
103
104        let mut batch = self.new_batch();
105        for operation in operations {
106            match operation {
107                Operation::Update(Update(key, value)) => {
108                    batch.write(key, Some(value));
109                }
110                Operation::Delete(key) => {
111                    batch.write(key, None);
112                }
113                Operation::CommitFloor(metadata, _) => {
114                    let finalized = batch.merkleize(metadata).await?.finalize();
115                    self.apply_batch(finalized).await?;
116                    batch = self.new_batch();
117                }
118            }
119        }
120        Ok(())
121    }
122
123    fn root(&self) -> Key {
124        // Return the ops root (not the canonical root) because this is what the
125        // sync engine verifies against.
126        self.ops_root()
127    }
128
129    async fn size(&self) -> Location {
130        LogStore::bounds(self).await.end
131    }
132
133    async fn inactivity_floor(&self) -> Location {
134        self.inactivity_floor_loc()
135    }
136
137    fn historical_proof(
138        &self,
139        op_count: Location,
140        start_loc: Location,
141        max_ops: NonZeroU64,
142    ) -> impl Future<Output = Result<(Proof<Key>, Vec<Self::Operation>), qmdb::Error>> + Send {
143        // Return ops-level proofs (not grafted proofs) for the sync engine.
144        self.ops_historical_proof(op_count, start_loc, max_ops)
145    }
146
147    fn name() -> &'static str {
148        "current"
149    }
150}
151
#[cfg(test)]
mod tests {
    use super::*;
    use crate::databases::Syncable;
    use commonware_runtime::deterministic;

    type CurrentDb = Database<deterministic::Context>;

    /// Generate test operations through the `Syncable` implementation under test.
    fn generate(count: usize, seed: u64) -> Vec<Operation> {
        <CurrentDb as Syncable>::create_test_operations(count, seed)
    }

    /// Five updates yield six operations, the last being a commit at location 5.
    #[test]
    fn test_create_test_operations() {
        let ops = generate(5, 12345);
        assert_eq!(ops.len(), 6); // 5 operations + 1 commit

        match ops.last() {
            Some(Operation::CommitFloor(_, loc)) => assert_eq!(*loc, 5),
            _ => panic!("last operation should be a commit"),
        }
    }

    /// The same seed reproduces the same operations; a different seed does not.
    #[test]
    fn test_deterministic_operations() {
        let first = generate(3, 12345);
        let repeat = generate(3, 12345);
        assert_eq!(first, repeat);

        let other_seed = generate(3, 54321);
        assert_ne!(first, other_seed);
    }
}
181}