commonware_storage/qmdb/keyless/sync/
mod.rs1use crate::{
2 journal::{
3 authenticated,
4 contiguous::{Contiguous as _, Mutable, Reader as _},
5 Error as JournalError,
6 },
7 merkle::{
8 full::{self, Merkle},
9 Family, Location,
10 },
11 qmdb::{
12 self,
13 any::value::ValueEncoding,
14 keyless::{operation::Codec, CompactDb, Keyless, Metrics, Operation},
15 sync,
16 },
17 Context, Persistable,
18};
19use commonware_codec::{Encode, EncodeShared, Read};
20use commonware_cryptography::Hasher;
21use commonware_parallel::Strategy;
22use commonware_utils::range::NonEmptyRange;
23
24impl<F, E, V, C, H, S> sync::Database for Keyless<F, E, V, C, H, S>
25where
26 F: Family,
27 E: Context,
28 V: ValueEncoding + Codec,
29 C: Mutable<Item = Operation<F, V>>
30 + Persistable<Error = JournalError>
31 + sync::Journal<F, Context = E, Op = Operation<F, V>>,
32 C::Config: Clone + Send,
33 H: Hasher,
34 S: Strategy,
35 Operation<F, V>: EncodeShared,
36{
37 type Family = F;
38 type Op = Operation<F, V>;
39 type Journal = C;
40 type Hasher = H;
41 type Config = super::Config<C::Config, S>;
42 type Digest = H::Digest;
43 type Context = E;
44
45 async fn from_sync_result(
61 context: Self::Context,
62 config: Self::Config,
63 log: Self::Journal,
64 pinned_nodes: Option<Vec<Self::Digest>>,
65 range: NonEmptyRange<Location<F>>,
66 apply_batch_size: usize,
67 ) -> Result<Self, qmdb::Error<F>> {
68 let hasher = qmdb::hasher::<H>();
69
70 let merkle = Merkle::<F, _, _, S>::init_sync(
71 context.child("merkle"),
72 full::SyncConfig {
73 config: config.merkle.clone(),
74 range: range.clone(),
75 pinned_nodes,
76 },
77 )
78 .await?;
79
80 let journal = authenticated::Journal::<F, _, _, _, S>::from_components(
81 merkle,
82 log,
83 hasher,
84 apply_batch_size as u64,
85 )
86 .await?;
87
88 let (last_commit_loc, inactivity_floor_loc) = {
89 let reader = journal.reader().await;
90 let bounds = reader.bounds();
91 let loc = bounds
92 .end
93 .checked_sub(1)
94 .ok_or(qmdb::Error::HistoricalFloorPruned(Location::new(
95 bounds.end,
96 )))?;
97 let floor =
98 qmdb::find_inactivity_floor_at::<F, _>(&reader, Location::new(bounds.end), |op| {
99 op.has_floor()
100 })
101 .await?;
102 (Location::new(loc), floor)
103 };
104 let inactive_peaks = F::inactive_peaks(
105 F::location_to_position(Location::new(*last_commit_loc + 1)),
106 inactivity_floor_loc,
107 );
108 let root = journal.root(inactive_peaks)?;
109
110 let metrics = Metrics::new(context);
111 let db = Self {
112 journal,
113 root,
114 last_commit_loc,
115 inactivity_floor_loc,
116 metrics,
117 };
118 db.update_metrics().await;
119
120 db.sync().await?;
121 Ok(db)
122 }
123
124 fn root(&self) -> Self::Digest {
125 self.root()
126 }
127}
128
129impl<F, E, V, H, Cfg, S> sync::compact::Database for CompactDb<F, E, V, H, Cfg, S>
130where
131 F: Family,
132 E: Context,
133 V: ValueEncoding + Codec,
134 H: Hasher,
135 S: Strategy,
136 Operation<F, V>: EncodeShared,
137 Operation<F, V>: Read<Cfg = Cfg>,
138 Cfg: Clone + Send + Sync + 'static,
139{
140 type Family = F;
141 type Op = Operation<F, V>;
142 type Config = super::CompactConfig<Cfg, S>;
143 type Digest = H::Digest;
144 type Context = E;
145 type Hasher = H;
146
147 async fn from_validated_state(
148 context: Self::Context,
149 config: Self::Config,
150 state: sync::compact::ValidatedState<Self::Family, Self::Op, Self::Digest>,
151 ) -> Result<Self, qmdb::Error<F>> {
152 let sync::compact::ValidatedState {
153 state,
154 root,
155 inactivity_floor: inactivity_floor_loc,
156 } = state;
157 let sync::compact::State {
158 leaf_count,
159 pinned_nodes,
160 last_commit_op,
161 last_commit_proof,
162 } = state;
163 let last_commit_loc = Location::new(*leaf_count - 1);
164 let Operation::Commit(last_commit_metadata, op_floor) = last_commit_op else {
165 return Err(qmdb::Error::UnexpectedData(last_commit_loc));
166 };
167 assert_eq!(op_floor, inactivity_floor_loc, "inactivity floor mismatch");
168 let commit_codec_config = config.commit_codec_config.clone();
169 let last_commit_op_bytes =
170 Operation::<F, V>::Commit(last_commit_metadata.clone(), inactivity_floor_loc)
171 .encode()
172 .to_vec();
173 let merkle = crate::merkle::compact::Merkle::init_from_compact_state(
174 context.child("merkle"),
175 config.merkle,
176 leaf_count,
177 pinned_nodes.clone(),
178 )
179 .await?;
180 Self::init_from_verified_state(
181 merkle,
182 commit_codec_config,
183 last_commit_metadata,
184 inactivity_floor_loc,
185 root,
186 last_commit_op_bytes,
187 last_commit_proof,
188 pinned_nodes,
189 )
190 }
191
192 fn inactivity_floor(op: &Self::Op) -> Option<Location<Self::Family>> {
193 op.has_floor()
194 }
195
196 fn root(&self) -> Self::Digest {
197 self.root()
198 }
199
200 async fn persist_compact_state(&self) -> Result<(), qmdb::Error<F>> {
201 self.persist_cached_witness().await
202 }
203}
204
205#[cfg(test)]
206mod tests;