commonware_sync/databases/
any.rs1use crate::{Hasher, Key, Translator, Value};
4use commonware_cryptography::Hasher as CryptoHasher;
5use commonware_runtime::{buffer, BufferPooler, Clock, Metrics, Storage};
6use commonware_storage::{
7 journal::contiguous::fixed::Config as FConfig,
8 mmr::{self, journaled::Config as MmrConfig, Location, Proof},
9 qmdb::{
10 self,
11 any::{
12 unordered::{
13 fixed::{Db, Operation as FixedOperation},
14 Update,
15 },
16 FixedConfig as Config,
17 },
18 operation::Committable,
19 },
20};
21use commonware_utils::{NZUsize, NZU16, NZU64};
22use std::{future::Future, num::NonZeroU64};
23use tracing::error;
24
25pub type Database<E> = Db<mmr::Family, E, Key, Value, Hasher, Translator>;
27
28pub type Operation = FixedOperation<mmr::Family, Key, Value>;
30
31pub fn create_config(context: &impl BufferPooler) -> Config<Translator> {
33 let page_cache = buffer::paged::CacheRef::from_pooler(context, NZU16!(2048), NZUsize!(10));
34 Config {
35 merkle_config: MmrConfig {
36 journal_partition: "mmr-journal".into(),
37 metadata_partition: "mmr-metadata".into(),
38 items_per_blob: NZU64!(4096),
39 write_buffer: NZUsize!(4096),
40 thread_pool: None,
41 page_cache: page_cache.clone(),
42 },
43 journal_config: FConfig {
44 partition: "log-journal".into(),
45 items_per_blob: NZU64!(4096),
46 write_buffer: NZUsize!(4096),
47 page_cache,
48 },
49 translator: Translator::default(),
50 }
51}
52
53impl<E> crate::databases::Syncable for Database<E>
54where
55 E: Storage + Clock + Metrics,
56{
57 type Family = mmr::Family;
58 type Operation = Operation;
59
60 fn create_test_operations(count: usize, seed: u64) -> Vec<Self::Operation> {
61 let mut hasher = <Hasher as CryptoHasher>::new();
62 let mut operations = Vec::new();
63 for i in 0..count {
64 let key = {
65 hasher.update(&i.to_be_bytes());
66 hasher.update(&seed.to_be_bytes());
67 hasher.finalize()
68 };
69
70 let value = {
71 hasher.update(&key);
72 hasher.update(b"value");
73 hasher.finalize()
74 };
75
76 operations.push(Operation::Update(Update(key, value)));
77
78 if (i + 1) % 10 == 0 {
79 operations.push(Operation::CommitFloor(None, Location::from(i + 1)));
80 }
81 }
82
83 operations.push(Operation::CommitFloor(None, Location::from(count)));
85 operations
86 }
87
88 async fn add_operations(
89 &mut self,
90 operations: Vec<Self::Operation>,
91 ) -> Result<(), qmdb::Error<mmr::Family>> {
92 if operations.last().is_none() || !operations.last().unwrap().is_commit() {
93 error!("operations must end with a commit");
95 return Ok(());
96 }
97
98 let mut batch = self.new_batch();
99 for operation in operations {
100 match operation {
101 Operation::Update(Update(key, value)) => {
102 batch = batch.write(key, Some(value));
103 }
104 Operation::Delete(key) => {
105 batch = batch.write(key, None);
106 }
107 Operation::CommitFloor(metadata, _) => {
108 let merkleized = batch.merkleize(self, metadata).await?;
109 self.apply_batch(merkleized).await?;
110 self.commit().await?;
111 batch = self.new_batch();
112 }
113 }
114 }
115 Ok(())
116 }
117
118 fn root(&self) -> Key {
119 self.root()
120 }
121
122 async fn size(&self) -> Location {
123 self.bounds().await.end
124 }
125
126 async fn inactivity_floor(&self) -> Location {
127 self.inactivity_floor_loc()
128 }
129
130 fn historical_proof(
131 &self,
132 op_count: Location,
133 start_loc: Location,
134 max_ops: NonZeroU64,
135 ) -> impl Future<Output = Result<(Proof<Key>, Vec<Self::Operation>), qmdb::Error<mmr::Family>>> + Send
136 {
137 self.historical_proof(op_count, start_loc, max_ops)
138 }
139
140 fn pinned_nodes_at(
141 &self,
142 loc: Location,
143 ) -> impl Future<Output = Result<Vec<Key>, qmdb::Error<mmr::Family>>> + Send {
144 self.pinned_nodes_at(loc)
145 }
146
147 fn name() -> &'static str {
148 "any"
149 }
150}
151
152#[cfg(test)]
153mod tests {
154 use super::*;
155 use crate::databases::Syncable;
156 use commonware_runtime::deterministic;
157
158 type AnyDb = Database<deterministic::Context>;
159
160 #[test]
161 fn test_create_test_operations() {
162 let ops = <AnyDb as Syncable>::create_test_operations(5, 12345);
163 assert_eq!(ops.len(), 6); if let Operation::CommitFloor(_, loc) = &ops[5] {
166 assert_eq!(*loc, 5);
167 } else {
168 panic!("Last operation should be a commit");
169 }
170 }
171
172 #[test]
173 fn test_deterministic_operations() {
174 let ops1 = <AnyDb as Syncable>::create_test_operations(3, 12345);
176 let ops2 = <AnyDb as Syncable>::create_test_operations(3, 12345);
177 assert_eq!(ops1, ops2);
178
179 let ops3 = <AnyDb as Syncable>::create_test_operations(3, 54321);
181 assert_ne!(ops1, ops3);
182 }
183}