commonware_sync/databases/
current.rs1use crate::{Hasher, Key, Translator, Value};
18use commonware_codec::FixedSize;
19use commonware_cryptography::{sha256, Hasher as CryptoHasher};
20use commonware_runtime::{buffer, BufferPooler, Clock, Metrics, Storage};
21use commonware_storage::{
22 mmr::{Location, Proof},
23 qmdb::{
24 self,
25 any::unordered::{fixed::Operation as FixedOperation, Update},
26 current::{self, FixedConfig as Config},
27 operation::Committable,
28 store::LogStore,
29 },
30};
31use commonware_utils::{NZUsize, NZU16, NZU64};
32use std::{future::Future, num::NonZeroU64};
33use tracing::error;
34
35const CHUNK_SIZE: usize = sha256::Digest::SIZE;
37
38pub type Database<E> = current::unordered::fixed::Db<E, Key, Value, Hasher, Translator, CHUNK_SIZE>;
40
41pub type Operation = FixedOperation<Key, Value>;
43
44pub fn create_config(context: &impl BufferPooler) -> Config<Translator> {
46 Config {
47 mmr_journal_partition: "mmr-journal".into(),
48 mmr_metadata_partition: "mmr-metadata".into(),
49 mmr_items_per_blob: NZU64!(4096),
50 mmr_write_buffer: NZUsize!(4096),
51 log_journal_partition: "log-journal".into(),
52 log_items_per_blob: NZU64!(4096),
53 log_write_buffer: NZUsize!(4096),
54 grafted_mmr_metadata_partition: "grafted-mmr-metadata".into(),
55 translator: Translator::default(),
56 thread_pool: None,
57 page_cache: buffer::paged::CacheRef::from_pooler(context, NZU16!(2048), NZUsize!(10)),
58 }
59}
60
61impl<E> super::Syncable for Database<E>
62where
63 E: Storage + Clock + Metrics,
64{
65 type Operation = Operation;
66
67 fn create_test_operations(count: usize, seed: u64) -> Vec<Self::Operation> {
68 let mut hasher = <Hasher as CryptoHasher>::new();
69 let mut operations = Vec::new();
70 for i in 0..count {
71 let key = {
72 hasher.update(&i.to_be_bytes());
73 hasher.update(&seed.to_be_bytes());
74 hasher.finalize()
75 };
76
77 let value = {
78 hasher.update(&key);
79 hasher.update(b"value");
80 hasher.finalize()
81 };
82
83 operations.push(Operation::Update(Update(key, value)));
84
85 if (i + 1) % 10 == 0 {
86 operations.push(Operation::CommitFloor(None, Location::from(i + 1)));
87 }
88 }
89
90 operations.push(Operation::CommitFloor(None, Location::from(count)));
92 operations
93 }
94
95 async fn add_operations(
96 &mut self,
97 operations: Vec<Self::Operation>,
98 ) -> Result<(), qmdb::Error> {
99 if operations.last().is_none() || !operations.last().unwrap().is_commit() {
100 error!("operations must end with a commit");
101 return Ok(());
102 }
103
104 let mut batch = self.new_batch();
105 for operation in operations {
106 match operation {
107 Operation::Update(Update(key, value)) => {
108 batch.write(key, Some(value));
109 }
110 Operation::Delete(key) => {
111 batch.write(key, None);
112 }
113 Operation::CommitFloor(metadata, _) => {
114 let finalized = batch.merkleize(metadata).await?.finalize();
115 self.apply_batch(finalized).await?;
116 batch = self.new_batch();
117 }
118 }
119 }
120 Ok(())
121 }
122
123 fn root(&self) -> Key {
124 self.ops_root()
127 }
128
129 async fn size(&self) -> Location {
130 LogStore::bounds(self).await.end
131 }
132
133 async fn inactivity_floor(&self) -> Location {
134 self.inactivity_floor_loc()
135 }
136
137 fn historical_proof(
138 &self,
139 op_count: Location,
140 start_loc: Location,
141 max_ops: NonZeroU64,
142 ) -> impl Future<Output = Result<(Proof<Key>, Vec<Self::Operation>), qmdb::Error>> + Send {
143 self.ops_historical_proof(op_count, start_loc, max_ops)
145 }
146
147 fn name() -> &'static str {
148 "current"
149 }
150}
151
152#[cfg(test)]
153mod tests {
154 use super::*;
155 use crate::databases::Syncable;
156 use commonware_runtime::deterministic;
157
158 type CurrentDb = Database<deterministic::Context>;
159
160 #[test]
161 fn test_create_test_operations() {
162 let ops = <CurrentDb as Syncable>::create_test_operations(5, 12345);
163 assert_eq!(ops.len(), 6); if let Operation::CommitFloor(_, loc) = &ops[5] {
166 assert_eq!(*loc, 5);
167 } else {
168 panic!("last operation should be a commit");
169 }
170 }
171
172 #[test]
173 fn test_deterministic_operations() {
174 let ops1 = <CurrentDb as Syncable>::create_test_operations(3, 12345);
175 let ops2 = <CurrentDb as Syncable>::create_test_operations(3, 12345);
176 assert_eq!(ops1, ops2);
177
178 let ops3 = <CurrentDb as Syncable>::create_test_operations(3, 54321);
179 assert_ne!(ops1, ops3);
180 }
181}