commonware_sync/databases/
immutable.rs1use crate::{Hasher, Key, Translator, Value};
4use commonware_cryptography::{Hasher as CryptoHasher, Sha256};
5use commonware_parallel::Sequential;
6use commonware_runtime::{BufferPooler, Clock, Metrics, Storage};
7use commonware_storage::{
8 journal::contiguous::fixed::Config as FConfig,
9 merkle::{
10 full::Config as MmrConfig,
11 mmr::{self, Location, Proof},
12 },
13 qmdb::{
14 self,
15 immutable::{fixed, Config},
16 sync::compact,
17 },
18};
19use commonware_utils::{NZUsize, NZU16, NZU64};
20use std::{future::Future, num::NonZeroU64};
21use tracing::error;
22
/// Database type used by this module: the `qmdb::immutable::fixed` database over the MMR
/// family, parameterized by the runtime context `E` and this crate's `Key`, `Value`, `Hasher`
/// and `Translator` types, with the `Sequential` strategy.
pub type Database<E> = fixed::Db<mmr::Family, E, Key, Value, Hasher, Translator, Sequential>;
25
/// Operation type used by this module: the `qmdb::immutable::fixed` operation over the MMR
/// family with this crate's `Key` and `Value` types.
pub type Operation = fixed::Operation<mmr::Family, Key, Value>;
28
29pub fn create_config(context: &impl BufferPooler) -> Config<Translator, FConfig, Sequential> {
31 let page_cache = commonware_runtime::buffer::paged::CacheRef::from_pooler(
32 context,
33 NZU16!(2048),
34 NZUsize!(10),
35 );
36 Config {
37 merkle_config: MmrConfig {
38 journal_partition: "mmr-journal".into(),
39 metadata_partition: "mmr-metadata".into(),
40 items_per_blob: NZU64!(4096),
41 write_buffer: NZUsize!(4096),
42 strategy: Sequential,
43 page_cache: page_cache.clone(),
44 },
45 log: FConfig {
46 partition: "log".into(),
47 items_per_blob: NZU64!(4096),
48 write_buffer: NZUsize!(4096),
49 page_cache,
50 },
51 translator: commonware_storage::translator::EightCap,
52 }
53}
54
55pub fn create_test_operations(count: usize, seed: u64, starting_loc: u64) -> Vec<Operation> {
61 let mut operations = Vec::new();
62 let mut hasher = <Hasher as CryptoHasher>::new();
63 let floor = Location::new(starting_loc);
64
65 for i in 0..count {
66 let key = {
67 hasher.update(&i.to_be_bytes());
68 hasher.update(&seed.to_be_bytes());
69 hasher.finalize()
70 };
71
72 let value = {
73 hasher.update(&key);
74 hasher.update(b"value");
75 hasher.finalize()
76 };
77
78 operations.push(Operation::Set(key, value));
79
80 if (i + 1) % 10 == 0 {
81 operations.push(Operation::Commit(None, floor));
82 }
83 }
84
85 operations.push(Operation::Commit(Some(Sha256::fill(1)), floor));
87 operations
88}
89
90impl<E> super::ExampleDatabase for Database<E>
91where
92 E: Storage + Clock + Metrics,
93{
94 type Family = mmr::Family;
95 type Operation = Operation;
96
97 fn create_test_operations(count: usize, seed: u64, starting_loc: u64) -> Vec<Self::Operation> {
98 create_test_operations(count, seed, starting_loc)
99 }
100
101 async fn add_operations(
102 &mut self,
103 operations: Vec<Self::Operation>,
104 ) -> Result<(), commonware_storage::qmdb::Error<mmr::Family>> {
105 if operations.last().is_none() || !operations.last().unwrap().is_commit() {
106 error!("operations must end with a commit");
108 return Ok(());
109 }
110
111 let mut batch = self.new_batch();
112 for operation in operations {
113 match operation {
114 Operation::Set(key, value) => {
115 batch = batch.set(key, value);
116 }
117 Operation::Commit(metadata, floor) => {
118 let merkleized = batch.merkleize(self, metadata, floor);
119 self.apply_batch(merkleized).await?;
120 self.commit().await?;
121 batch = self.new_batch();
122 }
123 }
124 }
125 Ok(())
126 }
127
128 fn current_floor(&self) -> u64 {
129 *self.inactivity_floor_loc()
130 }
131
132 fn root(&self) -> Key {
133 self.root()
134 }
135
136 fn name() -> &'static str {
137 "immutable"
138 }
139}
140
141impl<E> super::Syncable for Database<E>
142where
143 E: Storage + Clock + Metrics,
144{
145 async fn size(&self) -> Location {
146 self.bounds().await.end
147 }
148
149 async fn sync_boundary(&self) -> Location {
150 self.sync_boundary()
151 }
152
153 fn historical_proof(
154 &self,
155 op_count: Location,
156 start_loc: Location,
157 max_ops: NonZeroU64,
158 ) -> impl Future<Output = Result<(Proof<Key>, Vec<Self::Operation>), qmdb::Error<mmr::Family>>> + Send
159 {
160 self.historical_proof(op_count, start_loc, max_ops)
161 }
162
163 fn pinned_nodes_at(
164 &self,
165 loc: Location,
166 ) -> impl Future<Output = Result<Vec<Key>, qmdb::Error<mmr::Family>>> + Send {
167 self.pinned_nodes_at(loc)
168 }
169}
170
171impl<E> super::CompactSyncable for Database<E>
172where
173 E: Storage + Clock + Metrics,
174{
175 async fn current_target(&self) -> compact::Target<Self::Family, Key> {
176 compact::Target::new(self.root(), self.bounds().await.end)
177 }
178}