Skip to main content

commonware_sync/databases/
current.rs

1//! Current database types and helpers for the sync example.
2//!
3//! A `current` database extends an `any` database with an activity bitmap that tracks which
4//! operations are active (i.e. represent the current state of their key) vs inactive (superseded or
5//! deleted). Its canonical root folds the ops root, a grafted merkle root (combining bitmap chunks
6//! with ops subtree roots), and an optional partial-chunk digest. See [current] module
7//! documentation for more details.
8//!
9//! For sync, the engine targets the **ops root** (not the canonical root). The operations and proof
10//! format are identical to `any`; direct proof verifiers should use `qmdb::hasher`. The bitmap is
11//! reconstructed deterministically from the operations after sync completes. See the
12//! [Root structure](commonware_storage::qmdb::current) module documentation for details.
13//!
14//! This module re-uses the same [`Operation`] type as [`super::any`] since the underlying
15//! operations log is the same.
16
17use crate::{Hasher, Key, Translator, Value};
18use commonware_codec::FixedSize;
19use commonware_cryptography::{sha256, Hasher as CryptoHasher};
20use commonware_parallel::Sequential;
21use commonware_runtime::{buffer, BufferPooler, Clock, Metrics, Storage};
22use commonware_storage::{
23    journal::contiguous::fixed::Config as FConfig,
24    mmr::{self, full::Config as MmrConfig, Location, Proof},
25    qmdb::{
26        self,
27        any::unordered::{fixed::Operation as FixedOperation, Update},
28        current::{self, FixedConfig as Config},
29        operation::Committable,
30    },
31};
32use commonware_utils::{NZUsize, NZU16, NZU64};
33use std::{future::Future, num::NonZeroU64};
34use tracing::error;
35
/// Bitmap chunk size in bytes, taken from `sha256::Digest::SIZE`. Each chunk covers
/// `CHUNK_SIZE * 8` operations' activity bits (one bit per operation).
const CHUNK_SIZE: usize = sha256::Digest::SIZE;
38
/// Database type alias.
///
/// Pins the generic `current::unordered::fixed::Db` to the types used by the sync example:
/// the crate-level [`Key`], [`Value`], [`Hasher`] and [`Translator`], bitmap chunks of
/// [`CHUNK_SIZE`] bytes, and the [`Sequential`] strategy. Only `E` is left open for the caller
/// to choose (the tests in this module use `deterministic::Context`).
pub type Database<E> = current::unordered::fixed::Db<
    mmr::Family,
    E,
    Key,
    Value,
    Hasher,
    Translator,
    CHUNK_SIZE,
    Sequential,
>;
50
/// Operation type alias. Same as the `any` operation type: the fixed-size unordered operation
/// from `qmdb::any`, instantiated with `mmr::Family` and the crate-level [`Key`] and [`Value`].
pub type Operation = FixedOperation<mmr::Family, Key, Value>;
53
54/// Create a database configuration.
55pub fn create_config(context: &impl BufferPooler) -> Config<Translator, Sequential> {
56    let page_cache = buffer::paged::CacheRef::from_pooler(context, NZU16!(2048), NZUsize!(10));
57    Config {
58        merkle_config: MmrConfig {
59            journal_partition: "mmr-journal".into(),
60            metadata_partition: "mmr-metadata".into(),
61            items_per_blob: NZU64!(4096),
62            write_buffer: NZUsize!(4096),
63            strategy: Sequential,
64            page_cache: page_cache.clone(),
65        },
66        journal_config: FConfig {
67            partition: "log-journal".into(),
68            items_per_blob: NZU64!(4096),
69            write_buffer: NZUsize!(4096),
70            page_cache,
71        },
72        grafted_metadata_partition: "grafted-mmr-metadata".into(),
73        translator: Translator::default(),
74    }
75}
76
77impl<E> super::ExampleDatabase for Database<E>
78where
79    E: Storage + Clock + Metrics,
80{
81    type Family = mmr::Family;
82    type Operation = Operation;
83
84    fn create_test_operations(count: usize, seed: u64, _starting_loc: u64) -> Vec<Self::Operation> {
85        let mut hasher = <Hasher as CryptoHasher>::new();
86        let mut operations = Vec::new();
87        for i in 0..count {
88            let key = {
89                hasher.update(&i.to_be_bytes());
90                hasher.update(&seed.to_be_bytes());
91                hasher.finalize()
92            };
93
94            let value = {
95                hasher.update(&key);
96                hasher.update(b"value");
97                hasher.finalize()
98            };
99
100            operations.push(Operation::Update(Update(key, value)));
101
102            if (i + 1) % 10 == 0 {
103                operations.push(Operation::CommitFloor(None, Location::from(i + 1)));
104            }
105        }
106
107        // Always end with a commit.
108        operations.push(Operation::CommitFloor(None, Location::from(count)));
109        operations
110    }
111
112    async fn add_operations(
113        &mut self,
114        operations: Vec<Self::Operation>,
115    ) -> Result<(), qmdb::Error<mmr::Family>> {
116        if operations.last().is_none() || !operations.last().unwrap().is_commit() {
117            error!("operations must end with a commit");
118            return Ok(());
119        }
120
121        let mut batch = self.new_batch();
122        for operation in operations {
123            match operation {
124                Operation::Update(Update(key, value)) => {
125                    batch = batch.write(key, Some(value));
126                }
127                Operation::Delete(key) => {
128                    batch = batch.write(key, None);
129                }
130                Operation::CommitFloor(metadata, _) => {
131                    let merkleized = batch.merkleize(self, metadata).await?;
132                    self.apply_batch(merkleized).await?;
133                    self.commit().await?;
134                    batch = self.new_batch();
135                }
136            }
137        }
138        Ok(())
139    }
140
141    fn current_floor(&self) -> u64 {
142        // `current`'s `merkleize` derives the floor internally; the `starting_loc` passed to
143        // `create_test_operations` is unused, so any value is safe.
144        0
145    }
146
147    fn root(&self) -> Key {
148        // Return the ops root (not the canonical root) because this is what the
149        // sync engine verifies against.
150        self.ops_root()
151    }
152
153    fn name() -> &'static str {
154        "current"
155    }
156}
157
/// [`super::Syncable`] implementation for the `current` database.
///
/// Every method is a thin delegation to the underlying database.
// NOTE(review): `sync_boundary` and `pinned_nodes_at` call a method of the same name on `self`;
// this presumably resolves to the inherent method on the database type rather than recursing
// into this trait impl — confirm against `current::unordered::fixed::Db`.
impl<E> super::Syncable for Database<E>
where
    E: Storage + Clock + Metrics,
{
    /// Return the end of the database's current bounds.
    async fn size(&self) -> Location {
        self.bounds().await.end
    }

    /// Return the database's sync boundary.
    async fn sync_boundary(&self) -> Location {
        self.sync_boundary()
    }

    /// Return a proof and the corresponding operations, as produced by
    /// `ops_historical_proof(op_count, start_loc, max_ops)`.
    fn historical_proof(
        &self,
        op_count: Location,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> impl Future<Output = Result<(Proof<Key>, Vec<Self::Operation>), qmdb::Error<mmr::Family>>> + Send
    {
        // Return ops-level proofs (not grafted proofs) for the sync engine.
        self.ops_historical_proof(op_count, start_loc, max_ops)
    }

    /// Return the pinned nodes at `loc`, as reported by the underlying database.
    fn pinned_nodes_at(
        &self,
        loc: Location,
    ) -> impl Future<Output = Result<Vec<Key>, qmdb::Error<mmr::Family>>> + Send {
        self.pinned_nodes_at(loc)
    }
}
188
#[cfg(test)]
mod tests {
    use super::*;
    use crate::databases::ExampleDatabase;
    use commonware_runtime::deterministic;

    /// Concrete database type used to reach the trait's associated functions.
    type CurrentDb = Database<deterministic::Context>;

    /// Generate test operations through the `ExampleDatabase` impl with a starting location
    /// of zero.
    fn generate(count: usize, seed: u64) -> Vec<Operation> {
        <CurrentDb as ExampleDatabase>::create_test_operations(count, seed, 0)
    }

    /// Five updates must be followed by exactly one trailing commit at location 5.
    #[test]
    fn test_create_test_operations() {
        let ops = generate(5, 12345);
        assert_eq!(ops.len(), 6); // 5 operations + 1 commit

        let Operation::CommitFloor(_, loc) = &ops[5] else {
            panic!("last operation should be a commit");
        };
        assert_eq!(*loc, 5);
    }

    /// The same seed reproduces the same operations; a different seed does not.
    #[test]
    fn test_deterministic_operations() {
        let first = generate(3, 12345);
        let repeat = generate(3, 12345);
        assert_eq!(first, repeat);

        let other_seed = generate(3, 54321);
        assert_ne!(first, other_seed);
    }
}
218}