commonware_sync/databases/
current.rs1use crate::{Hasher, Key, Translator, Value};
18use commonware_codec::FixedSize;
19use commonware_cryptography::{sha256, Hasher as CryptoHasher};
20use commonware_parallel::Sequential;
21use commonware_runtime::{buffer, BufferPooler, Clock, Metrics, Storage};
22use commonware_storage::{
23 journal::contiguous::fixed::Config as FConfig,
24 mmr::{self, full::Config as MmrConfig, Location, Proof},
25 qmdb::{
26 self,
27 any::unordered::{fixed::Operation as FixedOperation, Update},
28 current::{self, FixedConfig as Config},
29 operation::Committable,
30 },
31};
32use commonware_utils::{NZUsize, NZU16, NZU64};
33use std::{future::Future, num::NonZeroU64};
34use tracing::error;
35
36const CHUNK_SIZE: usize = sha256::Digest::SIZE;
38
39pub type Database<E> = current::unordered::fixed::Db<
41 mmr::Family,
42 E,
43 Key,
44 Value,
45 Hasher,
46 Translator,
47 CHUNK_SIZE,
48 Sequential,
49>;
50
51pub type Operation = FixedOperation<mmr::Family, Key, Value>;
53
54pub fn create_config(context: &impl BufferPooler) -> Config<Translator, Sequential> {
56 let page_cache = buffer::paged::CacheRef::from_pooler(context, NZU16!(2048), NZUsize!(10));
57 Config {
58 merkle_config: MmrConfig {
59 journal_partition: "mmr-journal".into(),
60 metadata_partition: "mmr-metadata".into(),
61 items_per_blob: NZU64!(4096),
62 write_buffer: NZUsize!(4096),
63 strategy: Sequential,
64 page_cache: page_cache.clone(),
65 },
66 journal_config: FConfig {
67 partition: "log-journal".into(),
68 items_per_blob: NZU64!(4096),
69 write_buffer: NZUsize!(4096),
70 page_cache,
71 },
72 grafted_metadata_partition: "grafted-mmr-metadata".into(),
73 translator: Translator::default(),
74 }
75}
76
77impl<E> super::ExampleDatabase for Database<E>
78where
79 E: Storage + Clock + Metrics,
80{
81 type Family = mmr::Family;
82 type Operation = Operation;
83
84 fn create_test_operations(count: usize, seed: u64, _starting_loc: u64) -> Vec<Self::Operation> {
85 let mut hasher = <Hasher as CryptoHasher>::new();
86 let mut operations = Vec::new();
87 for i in 0..count {
88 let key = {
89 hasher.update(&i.to_be_bytes());
90 hasher.update(&seed.to_be_bytes());
91 hasher.finalize()
92 };
93
94 let value = {
95 hasher.update(&key);
96 hasher.update(b"value");
97 hasher.finalize()
98 };
99
100 operations.push(Operation::Update(Update(key, value)));
101
102 if (i + 1) % 10 == 0 {
103 operations.push(Operation::CommitFloor(None, Location::from(i + 1)));
104 }
105 }
106
107 operations.push(Operation::CommitFloor(None, Location::from(count)));
109 operations
110 }
111
112 async fn add_operations(
113 &mut self,
114 operations: Vec<Self::Operation>,
115 ) -> Result<(), qmdb::Error<mmr::Family>> {
116 if operations.last().is_none() || !operations.last().unwrap().is_commit() {
117 error!("operations must end with a commit");
118 return Ok(());
119 }
120
121 let mut batch = self.new_batch();
122 for operation in operations {
123 match operation {
124 Operation::Update(Update(key, value)) => {
125 batch = batch.write(key, Some(value));
126 }
127 Operation::Delete(key) => {
128 batch = batch.write(key, None);
129 }
130 Operation::CommitFloor(metadata, _) => {
131 let merkleized = batch.merkleize(self, metadata).await?;
132 self.apply_batch(merkleized).await?;
133 self.commit().await?;
134 batch = self.new_batch();
135 }
136 }
137 }
138 Ok(())
139 }
140
141 fn current_floor(&self) -> u64 {
142 0
145 }
146
147 fn root(&self) -> Key {
148 self.ops_root()
151 }
152
153 fn name() -> &'static str {
154 "current"
155 }
156}
157
158impl<E> super::Syncable for Database<E>
159where
160 E: Storage + Clock + Metrics,
161{
162 async fn size(&self) -> Location {
163 self.bounds().await.end
164 }
165
166 async fn sync_boundary(&self) -> Location {
167 self.sync_boundary()
168 }
169
170 fn historical_proof(
171 &self,
172 op_count: Location,
173 start_loc: Location,
174 max_ops: NonZeroU64,
175 ) -> impl Future<Output = Result<(Proof<Key>, Vec<Self::Operation>), qmdb::Error<mmr::Family>>> + Send
176 {
177 self.ops_historical_proof(op_count, start_loc, max_ops)
179 }
180
181 fn pinned_nodes_at(
182 &self,
183 loc: Location,
184 ) -> impl Future<Output = Result<Vec<Key>, qmdb::Error<mmr::Family>>> + Send {
185 self.pinned_nodes_at(loc)
186 }
187}
188
#[cfg(test)]
mod tests {
    use super::*;
    use crate::databases::ExampleDatabase;
    use commonware_runtime::deterministic;

    type CurrentDb = Database<deterministic::Context>;

    /// Five updates must be followed by exactly one trailing commit whose floor is `5`.
    #[test]
    fn test_create_test_operations() {
        let generated = <CurrentDb as ExampleDatabase>::create_test_operations(5, 12345, 0);
        assert_eq!(generated.len(), 6);

        let Operation::CommitFloor(_, floor) = &generated[5] else {
            panic!("last operation should be a commit");
        };
        assert_eq!(*floor, 5);
    }

    /// The same seed must reproduce the same operations; a different seed must not.
    #[test]
    fn test_deterministic_operations() {
        let first = <CurrentDb as ExampleDatabase>::create_test_operations(3, 12345, 0);
        let repeat = <CurrentDb as ExampleDatabase>::create_test_operations(3, 12345, 0);
        assert_eq!(first, repeat);

        let reseeded = <CurrentDb as ExampleDatabase>::create_test_operations(3, 54321, 0);
        assert_ne!(first, reseeded);
    }
}