1use crate::{
29 index::Factory as IndexFactory,
30 journal::{
31 authenticated,
32 contiguous::{fixed, variable, Mutable, Reader as _},
33 },
34 merkle::{
35 full::{self, Merkle},
36 Graftable, Location,
37 },
38 qmdb::{
39 self,
40 any::{
41 db::{Db as AnyDb, Metrics as AnyMetrics},
42 operation::{update::Update, Operation},
43 ordered::{
44 fixed::{Operation as OrderedFixedOp, Update as OrderedFixedUpdate},
45 variable::{Operation as OrderedVariableOp, Update as OrderedVariableUpdate},
46 },
47 unordered::{
48 fixed::{Operation as UnorderedFixedOp, Update as UnorderedFixedUpdate},
49 variable::{Operation as UnorderedVariableOp, Update as UnorderedVariableUpdate},
50 },
51 FixedValue, VariableValue,
52 },
53 bitmap::Shared,
54 current::{
55 db, grafting,
56 ordered::{
57 fixed::Db as CurrentOrderedFixedDb, variable::Db as CurrentOrderedVariableDb,
58 },
59 unordered::{
60 fixed::Db as CurrentUnorderedFixedDb, variable::Db as CurrentUnorderedVariableDb,
61 },
62 FixedConfig, VariableConfig,
63 },
64 operation::{Committable, Key, Operation as _},
65 sync::{resolver::fetch_operations, Database, DatabaseConfig as Config},
66 },
67 translator::Translator,
68 Context, Persistable,
69};
70use commonware_codec::{Codec, CodecShared, Read as CodecRead};
71use commonware_cryptography::{DigestOf, Hasher};
72use commonware_parallel::Strategy;
73use commonware_utils::{
74 bitmap::Prunable as BitMap, channel::oneshot, range::NonEmptyRange, sync::AsyncMutex, Array,
75};
76use std::sync::Arc;
77
78#[cfg(test)]
79pub(crate) mod tests;
80
81impl<T: Translator, J: Clone, S: Strategy> Config for super::Config<T, J, S> {
82 type JournalConfig = J;
83
84 fn journal_config(&self) -> Self::JournalConfig {
85 self.journal_config.clone()
86 }
87}
88
89#[allow(clippy::too_many_arguments)]
97async fn build_db<F, E, U, I, H, J, T, const N: usize, S>(
98 context: E,
99 merkle_config: full::Config<S>,
100 log: J,
101 translator: T,
102 pinned_nodes: Option<Vec<H::Digest>>,
103 range: NonEmptyRange<Location<F>>,
104 apply_batch_size: usize,
105 metadata_partition: String,
106 strategy: S,
107) -> Result<db::Db<F, E, J, I, H, U, N, S>, qmdb::Error<F>>
108where
109 F: Graftable,
110 E: Context,
111 U: Update + Send + Sync + 'static,
112 I: IndexFactory<T, Value = Location<F>>,
113 H: Hasher,
114 T: Translator,
115 J: Mutable<Item = Operation<F, U>> + Persistable<Error = crate::journal::Error>,
116 S: Strategy,
117 Operation<F, U>: Codec + Committable + CodecShared,
118{
119 let hasher = qmdb::hasher::<H>();
121 let merkle = Merkle::<F, _, _, S>::init_sync(
122 context.child("merkle"),
123 full::SyncConfig {
124 config: merkle_config,
125 range: range.clone(),
126 pinned_nodes,
127 },
128 )
129 .await?;
130 let index = I::new(context.child("index"), translator);
131 let log = authenticated::Journal::<F, _, _, _, S>::from_components(
132 merkle,
133 log,
134 hasher,
135 apply_batch_size as u64,
136 )
137 .await?;
138
139 let pruned_chunks = (*range.start() / BitMap::<N>::CHUNK_SIZE_BITS) as usize;
146 let bitmap = BitMap::<N>::new_with_pruned_chunks(pruned_chunks)
147 .map_err(|_| qmdb::Error::<F>::DataCorrupted("pruned chunks overflow"))?;
148 let bitmap = Arc::new(Shared::<N>::new(bitmap));
149
150 let any_metrics = AnyMetrics::new(context.child("any"));
153 let any: AnyDb<F, E, J, I, H, U, N, S> =
154 AnyDb::init_from_log(index, log, Some(bitmap), any_metrics).await?;
155
156 let grafted_pinned_nodes = {
164 let grafted_boundary = Location::<F>::new(pruned_chunks as u64);
165 let grafting_height = grafting::height::<N>();
166 let mut pins = Vec::new();
167 for grafted_pos in F::nodes_to_pin(grafted_boundary) {
168 let ops_pos = grafting::grafted_to_ops_pos::<F>(grafted_pos, grafting_height);
169 let digest = any
170 .log
171 .merkle
172 .get_node(ops_pos)
173 .await?
174 .ok_or(qmdb::Error::<F>::DataCorrupted("missing ops pinned node"))?;
175 pins.push(digest);
176 }
177 pins
178 };
179
180 let hasher = qmdb::hasher::<H>();
182 let ops_size = any.log.merkle.size();
183 let ops_leaves = Location::<F>::try_from(ops_size)?;
184 let grafted_tree = db::build_grafted_tree::<F, H, S, N>(
185 &hasher,
186 any.bitmap.as_ref(),
187 &grafted_pinned_nodes,
188 &any.log.merkle,
189 ops_leaves,
190 &strategy,
191 )
192 .await?;
193
194 let storage = grafting::Storage::new(
198 &grafted_tree,
199 grafting::height::<N>(),
200 &any.log.merkle,
201 hasher.clone(),
202 );
203 let partial = db::partial_chunk(any.bitmap.as_ref());
204 let grafted_root = db::compute_grafted_root(
205 &hasher,
206 any.bitmap.as_ref(),
207 &storage,
208 ops_leaves,
209 any.inactivity_floor_loc,
210 )
211 .await?;
212 let ops_root = any.root();
213 let partial_digest = partial.map(|(chunk, next_bit)| {
214 let digest = hasher.digest(&chunk);
215 (next_bit, digest)
216 });
217 let pending_digest =
218 db::pending_chunk::<F, _, N>(any.bitmap.as_ref(), ops_leaves, grafting::height::<N>())?
219 .map(|chunk| hasher.digest(&chunk));
220 let root = db::combine_roots(
221 &hasher,
222 &ops_root,
223 &grafted_root,
224 pending_digest.as_ref(),
225 partial_digest.as_ref().map(|(nb, d)| (*nb, d)),
226 );
227
228 let (metadata, _, _) =
230 db::init_metadata::<F, E, DigestOf<H>>(context.child("metadata"), &metadata_partition)
231 .await?;
232
233 let metrics = db::Metrics::new(context);
234 let current_db = db::Db {
235 any,
236 grafted_tree,
237 metadata: AsyncMutex::new(metadata),
238 strategy,
239 root,
240 metrics,
241 };
242 current_db.update_metrics();
243
244 current_db.sync_metadata().await?;
246
247 Ok(current_db)
248}
249
250macro_rules! impl_current_sync_database {
253 ($db:ident, $op:ident, $update:ident,
254 $journal:ty, $config:ty,
255 $key_bound:path, $value_bound:ident
256 $(; $($where_extra:tt)+)?) => {
257 impl<F, E, K, V, H, T, const N: usize, S> Database for $db<F, E, K, V, H, T, N, S>
258 where
259 F: Graftable,
260 E: Context,
261 K: $key_bound,
262 V: $value_bound + 'static,
263 H: Hasher,
264 T: Translator,
265 S: Strategy,
266 $($($where_extra)+)?
267 {
268 type Family = F;
269 type Context = E;
270 type Op = $op<F, K, V>;
271 type Journal = $journal;
272 type Hasher = H;
273 type Config = $config;
274 type Digest = H::Digest;
275
276 async fn from_sync_result(
277 context: Self::Context,
278 config: Self::Config,
279 log: Self::Journal,
280 pinned_nodes: Option<Vec<Self::Digest>>,
281 range: NonEmptyRange<Location<F>>,
282 apply_batch_size: usize,
283 ) -> Result<Self, qmdb::Error<F>> {
284 let merkle_config = config.merkle_config.clone();
285 let metadata_partition = config.grafted_metadata_partition.clone();
286 let strategy = config.merkle_config.strategy.clone();
287 let translator = config.translator.clone();
288 build_db::<F, _, $update<K, V>, _, H, _, T, N, _>(
289 context,
290 merkle_config,
291 log,
292 translator,
293 pinned_nodes,
294 range,
295 apply_batch_size,
296 metadata_partition,
297 strategy,
298 )
299 .await
300 }
301
302 async fn local_boundary_nodes(
303 context: Self::Context,
304 config: &Self::Config,
305 target: &qmdb::sync::Target<Self::Family, Self::Digest>,
306 journal: &Self::Journal,
307 ) -> Result<Option<Vec<Self::Digest>>, qmdb::Error<F>> {
308 if target.range.start() == Location::new(0) {
309 return Ok(None);
310 }
311
312 let reader = journal.reader().await;
313 let bounds = reader.bounds();
314 if Location::new(bounds.start) > target.range.start()
315 || Location::new(bounds.end) != target.range.end()
316 {
317 return Ok(None);
318 }
319
320 let inactivity_floor = qmdb::find_inactivity_floor_at::<F, _>(
321 &reader,
322 target.range.end(),
323 |op| op.has_floor(),
324 )
325 .await?;
326 drop(reader);
327
328 let hasher = qmdb::hasher::<H>();
329 let merkle = Merkle::<F, _, _, S>::init(
330 context.child("local_boundary_merkle"),
331 &hasher,
332 config.merkle_config.clone(),
333 )
334 .await?;
335 let bounds = merkle.bounds();
336 if bounds.start > target.range.start() || bounds.end != target.range.end() {
337 return Ok(None);
338 }
339
340 let inactive_peaks = F::inactive_peaks(
341 F::location_to_position(target.range.end()),
342 inactivity_floor,
343 );
344 if merkle.root(&hasher, inactive_peaks)? != target.root {
345 return Ok(None);
346 }
347
348 merkle
349 .pinned_nodes_at(target.range.start())
350 .await
351 .map(Some)
352 .map_err(Into::into)
353 }
354
355 fn root(&self) -> Self::Digest {
358 self.any.root()
359 }
360 }
361 };
362}
363
364impl_current_sync_database!(
365 CurrentUnorderedFixedDb, UnorderedFixedOp, UnorderedFixedUpdate,
366 fixed::Journal<E, Self::Op>, FixedConfig<T, S>,
367 Array, FixedValue
368);
369
370impl_current_sync_database!(
371 CurrentUnorderedVariableDb, UnorderedVariableOp, UnorderedVariableUpdate,
372 variable::Journal<E, Self::Op>,
373 VariableConfig<T, <UnorderedVariableOp<F, K, V> as CodecRead>::Cfg, S>,
374 Key, VariableValue;
375 UnorderedVariableOp<F, K, V>: CodecShared
376);
377
378impl_current_sync_database!(
379 CurrentOrderedFixedDb, OrderedFixedOp, OrderedFixedUpdate,
380 fixed::Journal<E, Self::Op>, FixedConfig<T, S>,
381 Array, FixedValue
382);
383
384impl_current_sync_database!(
385 CurrentOrderedVariableDb, OrderedVariableOp, OrderedVariableUpdate,
386 variable::Journal<E, Self::Op>,
387 VariableConfig<T, <OrderedVariableOp<F, K, V> as CodecRead>::Cfg, S>,
388 Key, VariableValue;
389 OrderedVariableOp<F, K, V>: CodecShared
390);
391
392macro_rules! impl_current_resolver {
398 ($db:ident, $op:ident, $val_bound:ident, $key_bound:path $(; $($where_extra:tt)+)?) => {
399 impl<F, E, K, V, H, T, const N: usize, S> crate::qmdb::sync::Resolver
400 for std::sync::Arc<$db<F, E, K, V, H, T, N, S>>
401 where
402 F: Graftable,
403 E: Context,
404 K: $key_bound,
405 V: $val_bound + Send + Sync + 'static,
406 H: Hasher,
407 T: Translator + Send + Sync + 'static,
408 T::Key: Send + Sync,
409 S: Strategy,
410 $($($where_extra)+)?
411 {
412 type Family = F;
413 type Digest = H::Digest;
414 type Op = $op<F, K, V>;
415 type Error = qmdb::Error<F>;
416
417 async fn get_operations(
418 &self,
419 op_count: Location<F>,
420 start_loc: Location<F>,
421 max_ops: std::num::NonZeroU64,
422 include_pinned_nodes: bool,
423 _cancel_rx: oneshot::Receiver<()>,
424 ) -> Result<crate::qmdb::sync::FetchResult<F, Self::Op, Self::Digest>, Self::Error> {
425 fetch_operations(
426 op_count,
427 start_loc,
428 max_ops,
429 include_pinned_nodes,
430 |op_count, start_loc, max_ops| {
431 self.any.historical_proof(op_count, start_loc, max_ops)
432 },
433 |start_loc| self.any.pinned_nodes_at(start_loc),
434 )
435 .await
436 }
437 }
438
439 impl<F, E, K, V, H, T, const N: usize, S> crate::qmdb::sync::Resolver
440 for std::sync::Arc<
441 commonware_utils::sync::AsyncRwLock<
442 $db<F, E, K, V, H, T, N, S>,
443 >,
444 >
445 where
446 F: Graftable,
447 E: Context,
448 K: $key_bound,
449 V: $val_bound + Send + Sync + 'static,
450 H: Hasher,
451 T: Translator + Send + Sync + 'static,
452 T::Key: Send + Sync,
453 S: Strategy,
454 $($($where_extra)+)?
455 {
456 type Family = F;
457 type Digest = H::Digest;
458 type Op = $op<F, K, V>;
459 type Error = qmdb::Error<F>;
460
461 async fn get_operations(
462 &self,
463 op_count: Location<F>,
464 start_loc: Location<F>,
465 max_ops: std::num::NonZeroU64,
466 include_pinned_nodes: bool,
467 _cancel_rx: oneshot::Receiver<()>,
468 ) -> Result<crate::qmdb::sync::FetchResult<F, Self::Op, Self::Digest>, qmdb::Error<F>> {
469 let db = self.read().await;
470 fetch_operations(
471 op_count,
472 start_loc,
473 max_ops,
474 include_pinned_nodes,
475 |op_count, start_loc, max_ops| {
476 db.any.historical_proof(op_count, start_loc, max_ops)
477 },
478 |start_loc| db.any.pinned_nodes_at(start_loc),
479 )
480 .await
481 }
482 }
483
484 impl<F, E, K, V, H, T, const N: usize, S> crate::qmdb::sync::Resolver
485 for std::sync::Arc<
486 commonware_utils::sync::AsyncRwLock<
487 Option<$db<F, E, K, V, H, T, N, S>>,
488 >,
489 >
490 where
491 F: Graftable,
492 E: Context,
493 K: $key_bound,
494 V: $val_bound + Send + Sync + 'static,
495 H: Hasher,
496 T: Translator + Send + Sync + 'static,
497 T::Key: Send + Sync,
498 S: Strategy,
499 $($($where_extra)+)?
500 {
501 type Family = F;
502 type Digest = H::Digest;
503 type Op = $op<F, K, V>;
504 type Error = qmdb::Error<F>;
505
506 async fn get_operations(
507 &self,
508 op_count: Location<F>,
509 start_loc: Location<F>,
510 max_ops: std::num::NonZeroU64,
511 include_pinned_nodes: bool,
512 _cancel_rx: oneshot::Receiver<()>,
513 ) -> Result<crate::qmdb::sync::FetchResult<F, Self::Op, Self::Digest>, qmdb::Error<F>> {
514 let guard = self.read().await;
515 let db = guard.as_ref().ok_or(qmdb::Error::<F>::KeyNotFound)?;
516 fetch_operations(
517 op_count,
518 start_loc,
519 max_ops,
520 include_pinned_nodes,
521 |op_count, start_loc, max_ops| {
522 db.any.historical_proof(op_count, start_loc, max_ops)
523 },
524 |start_loc| db.any.pinned_nodes_at(start_loc),
525 )
526 .await
527 }
528 }
529 };
530}
531
532impl_current_resolver!(CurrentUnorderedFixedDb, UnorderedFixedOp, FixedValue, Array);
534
535impl_current_resolver!(
537 CurrentUnorderedVariableDb, UnorderedVariableOp, VariableValue, Key;
538 UnorderedVariableOp<F, K, V>: CodecShared,
539);
540
541impl_current_resolver!(CurrentOrderedFixedDb, OrderedFixedOp, FixedValue, Array);
543
544impl_current_resolver!(
546 CurrentOrderedVariableDb, OrderedVariableOp, VariableValue, Key;
547 OrderedVariableOp<F, K, V>: CodecShared,
548);