1use crate::{
2 index::unordered::Index,
3 journal::{
4 authenticated,
5 contiguous::{Mutable, Reader as _},
6 Error as JournalError,
7 },
8 merkle::{
9 full::{self, Merkle},
10 Family, Location,
11 },
12 qmdb::{
13 self,
14 any::ValueEncoding,
15 build_snapshot_from_log,
16 immutable::{self, CompactDb, Metrics, Operation},
17 operation::Key,
18 sync::{self},
19 Error,
20 },
21 translator::Translator,
22 Context, Persistable,
23};
24use commonware_codec::{Encode, EncodeShared, Read};
25use commonware_cryptography::Hasher;
26use commonware_parallel::Strategy;
27use commonware_utils::range::NonEmptyRange;
28
29impl<F, E, K, V, C, H, T, S> sync::Database for immutable::Immutable<F, E, K, V, C, H, T, S>
30where
31 F: Family,
32 E: Context,
33 K: Key,
34 V: ValueEncoding,
35 C: Mutable<Item = Operation<F, K, V>>
36 + Persistable<Error = JournalError>
37 + sync::Journal<F, Context = E, Op = Operation<F, K, V>>,
38 C::Item: EncodeShared,
39 C::Config: Clone + Send,
40 H: Hasher,
41 T: Translator,
42 S: Strategy,
43{
44 type Family = F;
45 type Op = Operation<F, K, V>;
46 type Journal = C;
47 type Hasher = H;
48 type Config = immutable::Config<T, C::Config, S>;
49 type Digest = H::Digest;
50 type Context = E;
51
52 async fn from_sync_result(
69 context: Self::Context,
70 db_config: Self::Config,
71 log: Self::Journal,
72 pinned_nodes: Option<Vec<Self::Digest>>,
73 range: NonEmptyRange<Location<F>>,
74 apply_batch_size: usize,
75 ) -> Result<Self, Error<F>> {
76 let hasher = qmdb::hasher::<H>();
77
78 let merkle = Merkle::<F, _, _, S>::init_sync(
80 context.child("merkle"),
81 full::SyncConfig {
82 config: db_config.merkle_config.clone(),
83 range: range.clone(),
84 pinned_nodes,
85 },
86 )
87 .await?;
88
89 let journal = authenticated::Journal::<_, _, _, _, S>::from_components(
90 merkle,
91 log,
92 hasher,
93 apply_batch_size as u64,
94 )
95 .await?;
96
97 let mut snapshot: Index<T, Location<F>> =
98 Index::new(context.child("snapshot"), db_config.translator.clone());
99
100 let (last_commit_loc, inactivity_floor_loc) = {
101 let reader = journal.journal.reader().await;
102 let bounds = reader.bounds();
103 let last_commit_loc = Location::<F>::new(
104 bounds
105 .end
106 .checked_sub(1)
107 .ok_or(Error::HistoricalFloorPruned(Location::new(bounds.end)))?,
108 );
109 let inactivity_floor_loc = crate::qmdb::find_inactivity_floor_at::<F, _>(
110 &reader,
111 Location::new(bounds.end),
112 |op| op.has_floor(),
113 )
114 .await?;
115
116 build_snapshot_from_log::<F, _, _, _>(
118 inactivity_floor_loc,
119 &reader,
120 &mut snapshot,
121 |_, _| {},
122 )
123 .await?;
124
125 (last_commit_loc, inactivity_floor_loc)
126 };
127 let inactive_peaks = F::inactive_peaks(
128 F::location_to_position(Location::new(*last_commit_loc + 1)),
129 inactivity_floor_loc,
130 );
131 let root = journal.root(inactive_peaks)?;
132
133 let metrics = Metrics::new(context);
134 let db = Self {
135 journal,
136 root,
137 snapshot,
138 last_commit_loc,
139 inactivity_floor_loc,
140 metrics,
141 };
142 db.update_metrics().await;
143
144 db.sync().await?;
145 Ok(db)
146 }
147
148 fn root(&self) -> Self::Digest {
149 self.root()
150 }
151}
152
153impl<F, E, K, V, H, Cfg, S> sync::compact::Database for CompactDb<F, E, K, V, H, Cfg, S>
154where
155 F: Family,
156 E: Context,
157 K: Key,
158 V: ValueEncoding,
159 H: Hasher,
160 S: Strategy,
161 Operation<F, K, V>: EncodeShared,
162 Operation<F, K, V>: Read<Cfg = Cfg>,
163 Cfg: Clone + Send + Sync + 'static,
164{
165 type Family = F;
166 type Op = Operation<F, K, V>;
167 type Config = immutable::CompactConfig<Cfg, S>;
168 type Digest = H::Digest;
169 type Context = E;
170 type Hasher = H;
171
172 async fn from_validated_state(
173 context: Self::Context,
174 config: Self::Config,
175 state: sync::compact::ValidatedState<Self::Family, Self::Op, Self::Digest>,
176 ) -> Result<Self, Error<F>> {
177 let sync::compact::ValidatedState {
178 state,
179 root,
180 inactivity_floor: inactivity_floor_loc,
181 } = state;
182 let sync::compact::State {
183 leaf_count,
184 pinned_nodes,
185 last_commit_op,
186 last_commit_proof,
187 } = state;
188 let last_commit_loc = Location::new(*leaf_count - 1);
189 let Operation::Commit(last_commit_metadata, op_floor) = last_commit_op else {
190 return Err(Error::UnexpectedData(last_commit_loc));
191 };
192 assert_eq!(op_floor, inactivity_floor_loc, "inactivity floor mismatch");
193 let commit_codec_config = config.commit_codec_config.clone();
194 let last_commit_op_bytes =
195 Operation::<F, K, V>::Commit(last_commit_metadata.clone(), inactivity_floor_loc)
196 .encode()
197 .to_vec();
198 let merkle = crate::merkle::compact::Merkle::init_from_compact_state(
199 context.child("merkle"),
200 config.merkle,
201 leaf_count,
202 pinned_nodes.clone(),
203 )
204 .await?;
205 Self::init_from_verified_state(
206 merkle,
207 commit_codec_config,
208 last_commit_metadata,
209 inactivity_floor_loc,
210 root,
211 last_commit_op_bytes,
212 last_commit_proof,
213 pinned_nodes,
214 )
215 }
216
217 fn inactivity_floor(op: &Self::Op) -> Option<Location<Self::Family>> {
218 op.has_floor()
219 }
220
221 fn root(&self) -> Self::Digest {
222 self.root()
223 }
224
225 async fn persist_compact_state(&self) -> Result<(), Error<F>> {
226 self.persist_cached_witness().await
227 }
228}
229
230#[cfg(test)]
231mod tests;