1use crate::{
28 index::Factory as IndexFactory,
29 journal::{
30 authenticated,
31 contiguous::{fixed, variable, Mutable},
32 },
33 merkle::{
34 mmr::{self, Family, Location, StandardHasher},
35 Family as _,
36 },
37 qmdb::{
38 self,
39 any::{
40 db::Db as AnyDb,
41 operation::{update::Update, Operation},
42 ordered::{
43 fixed::{Operation as OrderedFixedOp, Update as OrderedFixedUpdate},
44 variable::{Operation as OrderedVariableOp, Update as OrderedVariableUpdate},
45 },
46 unordered::{
47 fixed::{Operation as UnorderedFixedOp, Update as UnorderedFixedUpdate},
48 variable::{Operation as UnorderedVariableOp, Update as UnorderedVariableUpdate},
49 },
50 FixedValue, VariableValue,
51 },
52 current::{
53 db, grafting,
54 ordered::{
55 fixed::Db as CurrentOrderedFixedDb, variable::Db as CurrentOrderedVariableDb,
56 },
57 unordered::{
58 fixed::Db as CurrentUnorderedFixedDb, variable::Db as CurrentUnorderedVariableDb,
59 },
60 FixedConfig, VariableConfig,
61 },
62 operation::{Committable, Key},
63 sync::{Database, DatabaseConfig as Config},
64 },
65 translator::Translator,
66 Context, Persistable,
67};
68use commonware_codec::{Codec, CodecShared, Read as CodecRead};
69use commonware_cryptography::{DigestOf, Hasher};
70use commonware_utils::{bitmap::Prunable as BitMap, channel::oneshot, sync::AsyncMutex, Array};
71use std::{ops::Range, sync::Arc};
72
73#[cfg(test)]
74pub(crate) mod tests;
75
76impl<T: Translator, J: Clone> Config for super::Config<T, J> {
77 type JournalConfig = J;
78
79 fn journal_config(&self) -> Self::JournalConfig {
80 self.journal_config.clone()
81 }
82}
83
84#[allow(clippy::too_many_arguments)]
92async fn build_db<E, U, I, H, J, T, const N: usize>(
93 context: E,
94 mmr_config: mmr::journaled::Config,
95 log: J,
96 translator: T,
97 pinned_nodes: Option<Vec<H::Digest>>,
98 range: Range<Location>,
99 apply_batch_size: usize,
100 metadata_partition: String,
101 thread_pool: Option<commonware_parallel::ThreadPool>,
102) -> Result<db::Db<Family, E, J, I, H, U, N>, qmdb::Error<Family>>
103where
104 E: Context,
105 U: Update + Send + Sync + 'static,
106 I: IndexFactory<T, Value = Location>,
107 H: Hasher,
108 T: Translator,
109 J: Mutable<Item = Operation<Family, U>> + Persistable<Error = crate::journal::Error>,
110 Operation<Family, U>: Codec + Committable + CodecShared,
111{
112 let hasher = StandardHasher::<H>::new();
114 let mmr = mmr::journaled::Mmr::init_sync(
115 context.with_label("mmr"),
116 mmr::journaled::SyncConfig {
117 config: mmr_config,
118 range: range.clone(),
119 pinned_nodes,
120 },
121 &hasher,
122 )
123 .await?;
124 let index = I::new(context.with_label("index"), translator);
125 let log = authenticated::Journal::<Family, _, _, _>::from_components(
126 mmr,
127 log,
128 hasher,
129 apply_batch_size as u64,
130 )
131 .await?;
132
133 let pruned_chunks = (*range.start / BitMap::<N>::CHUNK_SIZE_BITS) as usize;
140 let mut status = BitMap::<N>::new_with_pruned_chunks(pruned_chunks)
141 .map_err(|_| qmdb::Error::<Family>::DataCorrupted("pruned chunks overflow"))?;
142
143 let known_inactivity_floor = Location::new(status.len());
148 let any: AnyDb<Family, E, J, I, H, U> = AnyDb::init_from_log(
149 index,
150 log,
151 Some(known_inactivity_floor),
152 |is_active: bool, old_loc: Option<Location>| {
153 status.push(is_active);
154 if let Some(loc) = old_loc {
155 status.set_bit(*loc, false);
156 }
157 },
158 )
159 .await?;
160
161 let grafted_pinned_nodes = {
172 let ops_pin_positions = mmr::Family::nodes_to_pin(range.start);
173 let num_grafted_pins = (pruned_chunks as u64).count_ones() as usize;
174 let mut pins = Vec::with_capacity(num_grafted_pins);
175 for pos in ops_pin_positions.take(num_grafted_pins) {
176 let digest = any.log.merkle.get_node(pos).await?.ok_or(
177 qmdb::Error::<mmr::Family>::DataCorrupted("missing ops pinned node"),
178 )?;
179 pins.push(digest);
180 }
181 pins
182 };
183
184 let hasher = StandardHasher::<H>::new();
186 let grafted_tree = db::build_grafted_tree::<Family, H, N>(
187 &hasher,
188 &status,
189 &grafted_pinned_nodes,
190 &any.log.merkle,
191 thread_pool.as_ref(),
192 )
193 .await?;
194
195 let storage = grafting::Storage::new(&grafted_tree, grafting::height::<N>(), &any.log.merkle);
199 let partial = db::partial_chunk(&status);
200 let grafted_root = db::compute_grafted_root(&hasher, &status, &storage).await?;
201 let ops_root = any.log.root();
202 let partial_digest = partial.map(|(chunk, next_bit)| {
203 let digest = hasher.digest(&chunk);
204 (next_bit, digest)
205 });
206 let root = db::combine_roots(
207 &hasher,
208 &ops_root,
209 &grafted_root,
210 partial_digest.as_ref().map(|(nb, d)| (*nb, d)),
211 );
212
213 let (metadata, _, _) = db::init_metadata::<Family, E, DigestOf<H>>(
215 context.with_label("metadata"),
216 &metadata_partition,
217 )
218 .await?;
219
220 let current_db = db::Db {
221 any,
222 status: crate::qmdb::current::batch::BitmapBatch::Base(Arc::new(status)),
223 grafted_tree,
224 metadata: AsyncMutex::new(metadata),
225 thread_pool,
226 root,
227 };
228
229 current_db.sync_metadata().await?;
231
232 Ok(current_db)
233}
234
235macro_rules! impl_current_sync_database {
238 ($db:ident, $op:ident, $update:ident,
239 $journal:ty, $config:ty,
240 $key_bound:path, $value_bound:ident
241 $(; $($where_extra:tt)+)?) => {
242 impl<E, K, V, H, T, const N: usize> Database for $db<Family, E, K, V, H, T, N>
243 where
244 E: Context,
245 K: $key_bound,
246 V: $value_bound + 'static,
247 H: Hasher,
248 T: Translator,
249 $($($where_extra)+)?
250 {
251 type Context = E;
252 type Op = $op<Family, K, V>;
253 type Journal = $journal;
254 type Hasher = H;
255 type Config = $config;
256 type Digest = H::Digest;
257
258 async fn from_sync_result(
259 context: Self::Context,
260 config: Self::Config,
261 log: Self::Journal,
262 pinned_nodes: Option<Vec<Self::Digest>>,
263 range: Range<Location>,
264 apply_batch_size: usize,
265 ) -> Result<Self, qmdb::Error<Family>> {
266 let mmr_config = config.merkle_config.clone();
267 let metadata_partition = config.grafted_metadata_partition.clone();
268 let thread_pool = config.merkle_config.thread_pool.clone();
269 let translator = config.translator.clone();
270 build_db::<_, $update<K, V>, _, H, _, T, N>(
271 context,
272 mmr_config,
273 log,
274 translator,
275 pinned_nodes,
276 range,
277 apply_batch_size,
278 metadata_partition,
279 thread_pool,
280 )
281 .await
282 }
283
284 fn root(&self) -> Self::Digest {
287 self.any.log.root()
288 }
289 }
290 };
291}
292
293impl_current_sync_database!(
294 CurrentUnorderedFixedDb, UnorderedFixedOp, UnorderedFixedUpdate,
295 fixed::Journal<E, Self::Op>, FixedConfig<T>,
296 Array, FixedValue
297);
298
299impl_current_sync_database!(
300 CurrentUnorderedVariableDb, UnorderedVariableOp, UnorderedVariableUpdate,
301 variable::Journal<E, Self::Op>,
302 VariableConfig<T, <UnorderedVariableOp<Family, K, V> as CodecRead>::Cfg>,
303 Key, VariableValue;
304 UnorderedVariableOp<Family, K, V>: CodecShared
305);
306
307impl_current_sync_database!(
308 CurrentOrderedFixedDb, OrderedFixedOp, OrderedFixedUpdate,
309 fixed::Journal<E, Self::Op>, FixedConfig<T>,
310 Array, FixedValue
311);
312
313impl_current_sync_database!(
314 CurrentOrderedVariableDb, OrderedVariableOp, OrderedVariableUpdate,
315 variable::Journal<E, Self::Op>,
316 VariableConfig<T, <OrderedVariableOp<Family, K, V> as CodecRead>::Cfg>,
317 Key, VariableValue;
318 OrderedVariableOp<Family, K, V>: CodecShared
319);
320
321macro_rules! impl_current_resolver {
327 ($db:ident, $op:ident, $val_bound:ident, $key_bound:path $(; $($where_extra:tt)+)?) => {
328 impl<E, K, V, H, T, const N: usize> crate::qmdb::sync::Resolver
329 for std::sync::Arc<$db<Family, E, K, V, H, T, N>>
330 where
331 E: Context,
332 K: $key_bound,
333 V: $val_bound + Send + Sync + 'static,
334 H: Hasher,
335 T: Translator + Send + Sync + 'static,
336 T::Key: Send + Sync,
337 $($($where_extra)+)?
338 {
339 type Digest = H::Digest;
340 type Op = $op<Family, K, V>;
341 type Error = qmdb::Error<Family>;
342
343 async fn get_operations(
344 &self,
345 op_count: Location,
346 start_loc: Location,
347 max_ops: std::num::NonZeroU64,
348 include_pinned_nodes: bool,
349 _cancel_rx: oneshot::Receiver<()>,
350 ) -> Result<crate::qmdb::sync::FetchResult<Self::Op, Self::Digest>, Self::Error> {
351 let (proof, operations) = self.any
352 .historical_proof(op_count, start_loc, max_ops)
353 .await?;
354 let pinned_nodes = if include_pinned_nodes {
355 Some(self.any.pinned_nodes_at(start_loc).await?)
356 } else {
357 None
358 };
359 Ok(crate::qmdb::sync::FetchResult {
360 proof,
361 operations,
362 success_tx: oneshot::channel().0,
363 pinned_nodes,
364 })
365 }
366 }
367
368 impl<E, K, V, H, T, const N: usize> crate::qmdb::sync::Resolver
369 for std::sync::Arc<
370 commonware_utils::sync::AsyncRwLock<
371 $db<Family, E, K, V, H, T, N>,
372 >,
373 >
374 where
375 E: Context,
376 K: $key_bound,
377 V: $val_bound + Send + Sync + 'static,
378 H: Hasher,
379 T: Translator + Send + Sync + 'static,
380 T::Key: Send + Sync,
381 $($($where_extra)+)?
382 {
383 type Digest = H::Digest;
384 type Op = $op<Family, K, V>;
385 type Error = qmdb::Error<Family>;
386
387 async fn get_operations(
388 &self,
389 op_count: Location,
390 start_loc: Location,
391 max_ops: std::num::NonZeroU64,
392 include_pinned_nodes: bool,
393 _cancel_rx: oneshot::Receiver<()>,
394 ) -> Result<crate::qmdb::sync::FetchResult<Self::Op, Self::Digest>, qmdb::Error<Family>> {
395 let db = self.read().await;
396 let (proof, operations) = db.any
397 .historical_proof(op_count, start_loc, max_ops)
398 .await?;
399 let pinned_nodes = if include_pinned_nodes {
400 Some(db.any.pinned_nodes_at(start_loc).await?)
401 } else {
402 None
403 };
404 Ok(crate::qmdb::sync::FetchResult {
405 proof,
406 operations,
407 success_tx: oneshot::channel().0,
408 pinned_nodes,
409 })
410 }
411 }
412
413 impl<E, K, V, H, T, const N: usize> crate::qmdb::sync::Resolver
414 for std::sync::Arc<
415 commonware_utils::sync::AsyncRwLock<
416 Option<$db<Family, E, K, V, H, T, N>>,
417 >,
418 >
419 where
420 E: Context,
421 K: $key_bound,
422 V: $val_bound + Send + Sync + 'static,
423 H: Hasher,
424 T: Translator + Send + Sync + 'static,
425 T::Key: Send + Sync,
426 $($($where_extra)+)?
427 {
428 type Digest = H::Digest;
429 type Op = $op<Family, K, V>;
430 type Error = qmdb::Error<Family>;
431
432 async fn get_operations(
433 &self,
434 op_count: Location,
435 start_loc: Location,
436 max_ops: std::num::NonZeroU64,
437 include_pinned_nodes: bool,
438 _cancel_rx: oneshot::Receiver<()>,
439 ) -> Result<crate::qmdb::sync::FetchResult<Self::Op, Self::Digest>, qmdb::Error<Family>> {
440 let guard = self.read().await;
441 let db = guard.as_ref().ok_or(qmdb::Error::<Family>::KeyNotFound)?;
442 let (proof, operations) = db.any
443 .historical_proof(op_count, start_loc, max_ops)
444 .await?;
445 let pinned_nodes = if include_pinned_nodes {
446 Some(db.any.pinned_nodes_at(start_loc).await?)
447 } else {
448 None
449 };
450 Ok(crate::qmdb::sync::FetchResult {
451 proof,
452 operations,
453 success_tx: oneshot::channel().0,
454 pinned_nodes,
455 })
456 }
457 }
458 };
459}
460
461impl_current_resolver!(CurrentUnorderedFixedDb, UnorderedFixedOp, FixedValue, Array);
463
464impl_current_resolver!(
466 CurrentUnorderedVariableDb, UnorderedVariableOp, VariableValue, Key;
467 UnorderedVariableOp<Family, K, V>: CodecShared,
468);
469
470impl_current_resolver!(CurrentOrderedFixedDb, OrderedFixedOp, FixedValue, Array);
472
473impl_current_resolver!(
475 CurrentOrderedVariableDb, OrderedVariableOp, VariableValue, Key;
476 OrderedVariableOp<Family, K, V>: CodecShared,
477);