commonware_sync/databases/
current.rs1use crate::{Hasher, Key, Translator, Value};
18use commonware_codec::FixedSize;
19use commonware_cryptography::{sha256, Hasher as CryptoHasher};
20use commonware_runtime::{buffer, BufferPooler, Clock, Metrics, Storage};
21use commonware_storage::{
22 journal::contiguous::fixed::Config as FConfig,
23 mmr::{self, journaled::Config as MmrConfig, Location, Proof},
24 qmdb::{
25 self,
26 any::unordered::{fixed::Operation as FixedOperation, Update},
27 current::{self, FixedConfig as Config},
28 operation::Committable,
29 },
30};
31use commonware_utils::{NZUsize, NZU16, NZU64};
32use std::{future::Future, num::NonZeroU64};
33use tracing::error;
34
35const CHUNK_SIZE: usize = sha256::Digest::SIZE;
37
38pub type Database<E> =
40 current::unordered::fixed::Db<mmr::Family, E, Key, Value, Hasher, Translator, CHUNK_SIZE>;
41
42pub type Operation = FixedOperation<mmr::Family, Key, Value>;
44
45pub fn create_config(context: &impl BufferPooler) -> Config<Translator> {
47 let page_cache = buffer::paged::CacheRef::from_pooler(context, NZU16!(2048), NZUsize!(10));
48 Config {
49 merkle_config: MmrConfig {
50 journal_partition: "mmr-journal".into(),
51 metadata_partition: "mmr-metadata".into(),
52 items_per_blob: NZU64!(4096),
53 write_buffer: NZUsize!(4096),
54 thread_pool: None,
55 page_cache: page_cache.clone(),
56 },
57 journal_config: FConfig {
58 partition: "log-journal".into(),
59 items_per_blob: NZU64!(4096),
60 write_buffer: NZUsize!(4096),
61 page_cache,
62 },
63 grafted_metadata_partition: "grafted-mmr-metadata".into(),
64 translator: Translator::default(),
65 }
66}
67
68impl<E> super::Syncable for Database<E>
69where
70 E: Storage + Clock + Metrics,
71{
72 type Family = mmr::Family;
73 type Operation = Operation;
74
75 fn create_test_operations(count: usize, seed: u64) -> Vec<Self::Operation> {
76 let mut hasher = <Hasher as CryptoHasher>::new();
77 let mut operations = Vec::new();
78 for i in 0..count {
79 let key = {
80 hasher.update(&i.to_be_bytes());
81 hasher.update(&seed.to_be_bytes());
82 hasher.finalize()
83 };
84
85 let value = {
86 hasher.update(&key);
87 hasher.update(b"value");
88 hasher.finalize()
89 };
90
91 operations.push(Operation::Update(Update(key, value)));
92
93 if (i + 1) % 10 == 0 {
94 operations.push(Operation::CommitFloor(None, Location::from(i + 1)));
95 }
96 }
97
98 operations.push(Operation::CommitFloor(None, Location::from(count)));
100 operations
101 }
102
103 async fn add_operations(
104 &mut self,
105 operations: Vec<Self::Operation>,
106 ) -> Result<(), qmdb::Error<mmr::Family>> {
107 if operations.last().is_none() || !operations.last().unwrap().is_commit() {
108 error!("operations must end with a commit");
109 return Ok(());
110 }
111
112 let mut batch = self.new_batch();
113 for operation in operations {
114 match operation {
115 Operation::Update(Update(key, value)) => {
116 batch = batch.write(key, Some(value));
117 }
118 Operation::Delete(key) => {
119 batch = batch.write(key, None);
120 }
121 Operation::CommitFloor(metadata, _) => {
122 let merkleized = batch.merkleize(self, metadata).await?;
123 self.apply_batch(merkleized).await?;
124 self.commit().await?;
125 batch = self.new_batch();
126 }
127 }
128 }
129 Ok(())
130 }
131
132 fn root(&self) -> Key {
133 self.ops_root()
136 }
137
138 async fn size(&self) -> Location {
139 self.bounds().await.end
140 }
141
142 async fn inactivity_floor(&self) -> Location {
143 self.inactivity_floor_loc()
144 }
145
146 fn historical_proof(
147 &self,
148 op_count: Location,
149 start_loc: Location,
150 max_ops: NonZeroU64,
151 ) -> impl Future<Output = Result<(Proof<Key>, Vec<Self::Operation>), qmdb::Error<mmr::Family>>> + Send
152 {
153 self.ops_historical_proof(op_count, start_loc, max_ops)
155 }
156
157 fn pinned_nodes_at(
158 &self,
159 loc: Location,
160 ) -> impl Future<Output = Result<Vec<Key>, qmdb::Error<mmr::Family>>> + Send {
161 self.pinned_nodes_at(loc)
162 }
163
164 fn name() -> &'static str {
165 "current"
166 }
167}
168
#[cfg(test)]
mod tests {
    use super::*;
    use crate::databases::Syncable;
    use commonware_runtime::deterministic;

    type CurrentDb = Database<deterministic::Context>;

    /// Shorthand for generating test operations through the `Syncable` impl.
    fn generate(count: usize, seed: u64) -> Vec<Operation> {
        <CurrentDb as Syncable>::create_test_operations(count, seed)
    }

    /// Five updates are followed by exactly one trailing commit at location 5.
    #[test]
    fn test_create_test_operations() {
        let ops = generate(5, 12345);
        assert_eq!(ops.len(), 6);

        let Some(Operation::CommitFloor(_, loc)) = ops.last() else {
            panic!("last operation should be a commit");
        };
        assert_eq!(*loc, 5);
    }

    /// The same seed reproduces the same operations; a different seed does not.
    #[test]
    fn test_deterministic_operations() {
        let first = generate(3, 12345);
        let repeat = generate(3, 12345);
        assert_eq!(first, repeat);

        let other_seed = generate(3, 54321);
        assert_ne!(first, other_seed);
    }
}