commonware_sync/databases/
any.rs1use crate::{Hasher, Key, Translator, Value};
4use commonware_cryptography::Hasher as CryptoHasher;
5use commonware_parallel::Sequential;
6use commonware_runtime::{buffer, BufferPooler, Clock, Metrics, Storage};
7use commonware_storage::{
8 journal::contiguous::fixed::Config as FConfig,
9 mmr::{self, full::Config as MmrConfig, Location, Proof},
10 qmdb::{
11 self,
12 any::{
13 unordered::{
14 fixed::{Db, Operation as FixedOperation},
15 Update,
16 },
17 FixedConfig as Config,
18 },
19 operation::Committable,
20 },
21};
22use commonware_utils::{NZUsize, NZU16, NZU64};
23use std::{future::Future, num::NonZeroU64};
24use tracing::error;
25
/// The `qmdb::any::unordered::fixed` database ([`Db`]) over the MMR family, generic over the
/// runtime context `E` and instantiated with this crate's `Key`, `Value`, `Hasher` and
/// `Translator` types and the `Sequential` strategy.
pub type Database<E> = Db<mmr::Family, E, Key, Value, Hasher, Translator, Sequential>;
28
/// Operation type used with [`Database`]: the `qmdb::any::unordered::fixed` operation over the
/// MMR family with this crate's `Key` and `Value` types.
pub type Operation = FixedOperation<mmr::Family, Key, Value>;
31
32pub fn create_config(context: &impl BufferPooler) -> Config<Translator, Sequential> {
34 let page_cache = buffer::paged::CacheRef::from_pooler(context, NZU16!(2048), NZUsize!(10));
35 Config {
36 merkle_config: MmrConfig {
37 journal_partition: "mmr-journal".into(),
38 metadata_partition: "mmr-metadata".into(),
39 items_per_blob: NZU64!(4096),
40 write_buffer: NZUsize!(4096),
41 strategy: Sequential,
42 page_cache: page_cache.clone(),
43 },
44 journal_config: FConfig {
45 partition: "log-journal".into(),
46 items_per_blob: NZU64!(4096),
47 write_buffer: NZUsize!(4096),
48 page_cache,
49 },
50 translator: Translator::default(),
51 }
52}
53
54impl<E> crate::databases::ExampleDatabase for Database<E>
55where
56 E: Storage + Clock + Metrics,
57{
58 type Family = mmr::Family;
59 type Operation = Operation;
60
61 fn create_test_operations(count: usize, seed: u64, _starting_loc: u64) -> Vec<Self::Operation> {
62 let mut hasher = <Hasher as CryptoHasher>::new();
63 let mut operations = Vec::new();
64 for i in 0..count {
65 let key = {
66 hasher.update(&i.to_be_bytes());
67 hasher.update(&seed.to_be_bytes());
68 hasher.finalize()
69 };
70
71 let value = {
72 hasher.update(&key);
73 hasher.update(b"value");
74 hasher.finalize()
75 };
76
77 operations.push(Operation::Update(Update(key, value)));
78
79 if (i + 1) % 10 == 0 {
80 operations.push(Operation::CommitFloor(None, Location::from(i + 1)));
81 }
82 }
83
84 operations.push(Operation::CommitFloor(None, Location::from(count)));
86 operations
87 }
88
89 async fn add_operations(
90 &mut self,
91 operations: Vec<Self::Operation>,
92 ) -> Result<(), qmdb::Error<mmr::Family>> {
93 if operations.last().is_none() || !operations.last().unwrap().is_commit() {
94 error!("operations must end with a commit");
96 return Ok(());
97 }
98
99 let mut batch = self.new_batch();
100 for operation in operations {
101 match operation {
102 Operation::Update(Update(key, value)) => {
103 batch = batch.write(key, Some(value));
104 }
105 Operation::Delete(key) => {
106 batch = batch.write(key, None);
107 }
108 Operation::CommitFloor(metadata, _) => {
109 let merkleized = batch.merkleize(self, metadata).await?;
110 self.apply_batch(merkleized).await?;
111 self.commit().await?;
112 batch = self.new_batch();
113 }
114 }
115 }
116 Ok(())
117 }
118
119 fn current_floor(&self) -> u64 {
120 0
123 }
124
125 fn root(&self) -> Key {
126 self.root()
127 }
128
129 fn name() -> &'static str {
130 "any"
131 }
132}
133
/// Implements the crate's `Syncable` trait for [`Database`] by forwarding each call to the
/// same-named method on the underlying `Db` (or, for `size`, to `bounds`).
// NOTE(review): the same-named calls below assume `Db` defines inherent methods with these
// names, which method resolution prefers over the trait methods being implemented — confirm.
impl<E> crate::databases::Syncable for Database<E>
where
    E: Storage + Clock + Metrics,
{
    /// Returns the `end` of the range reported by `self.bounds()`.
    async fn size(&self) -> Location {
        self.bounds().await.end
    }

    /// Returns the value of the database's own `sync_boundary` method.
    async fn sync_boundary(&self) -> Location {
        self.sync_boundary()
    }

    /// Forwards `op_count`, `start_loc` and `max_ops` unchanged to the database's
    /// `historical_proof` method and returns its future, which resolves to a proof together
    /// with a list of operations, or a `qmdb::Error`.
    fn historical_proof(
        &self,
        op_count: Location,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> impl Future<Output = Result<(Proof<Key>, Vec<Self::Operation>), qmdb::Error<mmr::Family>>> + Send
    {
        self.historical_proof(op_count, start_loc, max_ops)
    }

    /// Forwards `loc` to the database's `pinned_nodes_at` method and returns its future, which
    /// resolves to a list of `Key` values or a `qmdb::Error`.
    fn pinned_nodes_at(
        &self,
        loc: Location,
    ) -> impl Future<Output = Result<Vec<Key>, qmdb::Error<mmr::Family>>> + Send {
        self.pinned_nodes_at(loc)
    }
}
163
#[cfg(test)]
mod tests {
    use super::*;
    use crate::databases::ExampleDatabase;
    use commonware_runtime::deterministic;

    type AnyDb = Database<deterministic::Context>;

    /// Requesting five operations produces six entries, the last being a commit at location 5.
    #[test]
    fn test_create_test_operations() {
        let generated = <AnyDb as ExampleDatabase>::create_test_operations(5, 12345, 0);
        assert_eq!(generated.len(), 6);

        let Operation::CommitFloor(_, floor) = &generated[5] else {
            panic!("Last operation should be a commit");
        };
        assert_eq!(*floor, 5);
    }

    /// The same seed reproduces the same operations; a different seed produces different ones.
    #[test]
    fn test_deterministic_operations() {
        let generate = |seed| <AnyDb as ExampleDatabase>::create_test_operations(3, seed, 0);

        let baseline = generate(12345);
        assert_eq!(baseline, generate(12345));
        assert_ne!(baseline, generate(54321));
    }
}