Skip to main content

commonware_sync/databases/
current.rs

//! Current database types and helpers for the sync example.
//!
//! A `current` database extends an `any` database with an activity bitmap that tracks which
//! operations are active (i.e. represent the current state of their key) vs inactive (superseded or
//! deleted). Its canonical root folds the ops root, a grafted merkle root (combining bitmap chunks
//! with ops subtree roots), and an optional partial-chunk digest. See the [`current`] module
//! documentation for more details.
//!
//! For sync, the engine targets the **ops root** (not the canonical root). The operations and proof
//! format are identical to `any` -- the bitmap is reconstructed deterministically from the
//! operations after sync completes. See the [Root structure](commonware_storage::qmdb::current)
//! section of the module documentation for details.
//!
//! This module re-uses the same [`Operation`] type as [`super::any`] since the underlying
//! operations log is the same.
16
17use crate::{Hasher, Key, Translator, Value};
18use commonware_codec::FixedSize;
19use commonware_cryptography::{sha256, Hasher as CryptoHasher};
20use commonware_runtime::{buffer, BufferPooler, Clock, Metrics, Storage};
21use commonware_storage::{
22    journal::contiguous::fixed::Config as FConfig,
23    mmr::{self, journaled::Config as MmrConfig, Location, Proof},
24    qmdb::{
25        self,
26        any::unordered::{fixed::Operation as FixedOperation, Update},
27        current::{self, FixedConfig as Config},
28        operation::Committable,
29    },
30};
31use commonware_utils::{NZUsize, NZU16, NZU64};
32use std::{future::Future, num::NonZeroU64};
33use tracing::error;
34
35/// Bitmap chunk size in bytes. Each chunk covers `N * 8` operations' activity bits.
36const CHUNK_SIZE: usize = sha256::Digest::SIZE;
37
38/// Database type alias.
39pub type Database<E> =
40    current::unordered::fixed::Db<mmr::Family, E, Key, Value, Hasher, Translator, CHUNK_SIZE>;
41
42/// Operation type alias. Same as the `any` operation type.
43pub type Operation = FixedOperation<mmr::Family, Key, Value>;
44
45/// Create a database configuration.
46pub fn create_config(context: &impl BufferPooler) -> Config<Translator> {
47    let page_cache = buffer::paged::CacheRef::from_pooler(context, NZU16!(2048), NZUsize!(10));
48    Config {
49        merkle_config: MmrConfig {
50            journal_partition: "mmr-journal".into(),
51            metadata_partition: "mmr-metadata".into(),
52            items_per_blob: NZU64!(4096),
53            write_buffer: NZUsize!(4096),
54            thread_pool: None,
55            page_cache: page_cache.clone(),
56        },
57        journal_config: FConfig {
58            partition: "log-journal".into(),
59            items_per_blob: NZU64!(4096),
60            write_buffer: NZUsize!(4096),
61            page_cache,
62        },
63        grafted_metadata_partition: "grafted-mmr-metadata".into(),
64        translator: Translator::default(),
65    }
66}
67
68impl<E> super::Syncable for Database<E>
69where
70    E: Storage + Clock + Metrics,
71{
72    type Family = mmr::Family;
73    type Operation = Operation;
74
75    fn create_test_operations(count: usize, seed: u64) -> Vec<Self::Operation> {
76        let mut hasher = <Hasher as CryptoHasher>::new();
77        let mut operations = Vec::new();
78        for i in 0..count {
79            let key = {
80                hasher.update(&i.to_be_bytes());
81                hasher.update(&seed.to_be_bytes());
82                hasher.finalize()
83            };
84
85            let value = {
86                hasher.update(&key);
87                hasher.update(b"value");
88                hasher.finalize()
89            };
90
91            operations.push(Operation::Update(Update(key, value)));
92
93            if (i + 1) % 10 == 0 {
94                operations.push(Operation::CommitFloor(None, Location::from(i + 1)));
95            }
96        }
97
98        // Always end with a commit.
99        operations.push(Operation::CommitFloor(None, Location::from(count)));
100        operations
101    }
102
103    async fn add_operations(
104        &mut self,
105        operations: Vec<Self::Operation>,
106    ) -> Result<(), qmdb::Error<mmr::Family>> {
107        if operations.last().is_none() || !operations.last().unwrap().is_commit() {
108            error!("operations must end with a commit");
109            return Ok(());
110        }
111
112        let mut batch = self.new_batch();
113        for operation in operations {
114            match operation {
115                Operation::Update(Update(key, value)) => {
116                    batch = batch.write(key, Some(value));
117                }
118                Operation::Delete(key) => {
119                    batch = batch.write(key, None);
120                }
121                Operation::CommitFloor(metadata, _) => {
122                    let merkleized = batch.merkleize(self, metadata).await?;
123                    self.apply_batch(merkleized).await?;
124                    self.commit().await?;
125                    batch = self.new_batch();
126                }
127            }
128        }
129        Ok(())
130    }
131
132    fn root(&self) -> Key {
133        // Return the ops root (not the canonical root) because this is what the
134        // sync engine verifies against.
135        self.ops_root()
136    }
137
138    async fn size(&self) -> Location {
139        self.bounds().await.end
140    }
141
142    async fn inactivity_floor(&self) -> Location {
143        self.inactivity_floor_loc()
144    }
145
146    fn historical_proof(
147        &self,
148        op_count: Location,
149        start_loc: Location,
150        max_ops: NonZeroU64,
151    ) -> impl Future<Output = Result<(Proof<Key>, Vec<Self::Operation>), qmdb::Error<mmr::Family>>> + Send
152    {
153        // Return ops-level proofs (not grafted proofs) for the sync engine.
154        self.ops_historical_proof(op_count, start_loc, max_ops)
155    }
156
157    fn pinned_nodes_at(
158        &self,
159        loc: Location,
160    ) -> impl Future<Output = Result<Vec<Key>, qmdb::Error<mmr::Family>>> + Send {
161        self.pinned_nodes_at(loc)
162    }
163
164    fn name() -> &'static str {
165        "current"
166    }
167}
168
#[cfg(test)]
mod tests {
    use super::*;
    use crate::databases::Syncable;
    use commonware_runtime::deterministic;

    type CurrentDb = Database<deterministic::Context>;

    /// Shorthand for invoking the generator under test.
    fn generate(count: usize, seed: u64) -> Vec<Operation> {
        <CurrentDb as Syncable>::create_test_operations(count, seed)
    }

    /// Five updates yield six operations, the last being a commit at location 5.
    #[test]
    fn test_create_test_operations() {
        let ops = generate(5, 12345);
        assert_eq!(ops.len(), 6); // 5 operations + 1 commit

        match ops.last() {
            Some(Operation::CommitFloor(_, loc)) => assert_eq!(*loc, 5),
            _ => panic!("last operation should be a commit"),
        }
    }

    /// The same seed reproduces the same operations; a different seed does not.
    #[test]
    fn test_deterministic_operations() {
        let baseline = generate(3, 12345);

        let same_seed = generate(3, 12345);
        assert_eq!(baseline, same_seed);

        let other_seed = generate(3, 54321);
        assert_ne!(baseline, other_seed);
    }
}