1use crate::{
28 index::{ordered, unordered},
29 journal::{
30 authenticated,
31 contiguous::{fixed, variable, Mutable},
32 },
33 mmr::{
34 self, hasher::Hasher as _, journaled::Config as MmrConfig, Location, Position,
35 StandardHasher,
36 },
37 qmdb::{
38 self,
39 any::{
40 db::Db as AnyDb,
41 operation::{update::Update, Operation},
42 ordered::{
43 fixed::{Operation as OrderedFixedOp, Update as OrderedFixedUpdate},
44 variable::{Operation as OrderedVariableOp, Update as OrderedVariableUpdate},
45 },
46 unordered::{
47 fixed::{Operation as UnorderedFixedOp, Update as UnorderedFixedUpdate},
48 variable::{Operation as UnorderedVariableOp, Update as UnorderedVariableUpdate},
49 },
50 FixedConfig as AnyFixedConfig, FixedValue, ValueEncoding,
51 VariableConfig as AnyVariableConfig, VariableValue,
52 },
53 current::{
54 db, grafting,
55 ordered::{
56 fixed::Db as CurrentOrderedFixedDb, variable::Db as CurrentOrderedVariableDb,
57 },
58 unordered::{
59 fixed::Db as CurrentUnorderedFixedDb, variable::Db as CurrentUnorderedVariableDb,
60 },
61 FixedConfig, VariableConfig,
62 },
63 operation::{Committable, Key},
64 sync::{Database, DatabaseConfig as Config},
65 },
66 translator::Translator,
67 Persistable,
68};
69use commonware_codec::{Codec, CodecShared, Read as CodecRead};
70use commonware_cryptography::{DigestOf, Hasher};
71use commonware_runtime::{Clock, Metrics, Storage};
72use commonware_utils::{bitmap::Prunable as BitMap, sync::AsyncMutex, Array};
73use std::ops::Range;
74
75#[cfg(test)]
76pub(crate) mod tests;
77
78impl<T: Translator> Config for FixedConfig<T> {
79 type JournalConfig = fixed::Config;
80
81 fn journal_config(&self) -> Self::JournalConfig {
82 let any_config: AnyFixedConfig<T> = self.clone().into();
83 <AnyFixedConfig<T> as Config>::journal_config(&any_config)
84 }
85}
86
87impl<T: Translator, C: Clone> Config for VariableConfig<T, C> {
88 type JournalConfig = variable::Config<C>;
89
90 fn journal_config(&self) -> Self::JournalConfig {
91 let any_config: AnyVariableConfig<T, C> = self.clone().into();
92 <AnyVariableConfig<T, C> as Config>::journal_config(&any_config)
93 }
94}
95
96fn mmr_config_from_fixed<T: Translator>(config: &FixedConfig<T>) -> MmrConfig {
98 MmrConfig {
99 journal_partition: config.mmr_journal_partition.clone(),
100 metadata_partition: config.mmr_metadata_partition.clone(),
101 items_per_blob: config.mmr_items_per_blob,
102 write_buffer: config.mmr_write_buffer,
103 thread_pool: config.thread_pool.clone(),
104 page_cache: config.page_cache.clone(),
105 }
106}
107
108fn mmr_config_from_variable<T: Translator, C>(config: &VariableConfig<T, C>) -> MmrConfig {
110 MmrConfig {
111 journal_partition: config.mmr_journal_partition.clone(),
112 metadata_partition: config.mmr_metadata_partition.clone(),
113 items_per_blob: config.mmr_items_per_blob,
114 write_buffer: config.mmr_write_buffer,
115 thread_pool: config.thread_pool.clone(),
116 page_cache: config.page_cache.clone(),
117 }
118}
119
120#[allow(clippy::too_many_arguments)]
128async fn build_db<E, K, V, U, I, H, J, const N: usize>(
129 context: E,
130 mmr_config: MmrConfig,
131 log: J,
132 index: I,
133 pinned_nodes: Option<Vec<H::Digest>>,
134 range: Range<Location>,
135 apply_batch_size: usize,
136 metadata_partition: String,
137 thread_pool: Option<commonware_parallel::ThreadPool>,
138) -> Result<db::Db<E, J, I, H, U, N>, qmdb::Error>
139where
140 E: Storage + Clock + Metrics,
141 K: Key,
142 V: ValueEncoding,
143 U: Update<K, V> + Send + Sync + 'static,
144 I: crate::index::Unordered<Value = Location>,
145 H: Hasher,
146 J: Mutable<Item = Operation<K, V, U>> + Persistable<Error = crate::journal::Error>,
147 Operation<K, V, U>: Codec + Committable + CodecShared,
148{
149 let mut hasher = StandardHasher::<H>::new();
151 let mmr = mmr::journaled::Mmr::init_sync(
152 context.with_label("mmr"),
153 mmr::journaled::SyncConfig {
154 config: mmr_config,
155 range: range.clone(),
156 pinned_nodes,
157 },
158 &mut hasher,
159 )
160 .await?;
161 let log =
162 authenticated::Journal::from_components(mmr, log, hasher, apply_batch_size as u64).await?;
163
164 let pruned_chunks = (*range.start / BitMap::<N>::CHUNK_SIZE_BITS) as usize;
171 let mut status = BitMap::<N>::new_with_pruned_chunks(pruned_chunks)
172 .map_err(|_| qmdb::Error::DataCorrupted("pruned chunks overflow"))?;
173
174 let known_inactivity_floor = Location::new(status.len());
179 let any: AnyDb<E, J, I, H, U> = AnyDb::init_from_log(
180 index,
181 log,
182 Some(known_inactivity_floor),
183 |is_active: bool, old_loc: Option<Location>| {
184 status.push(is_active);
185 if let Some(loc) = old_loc {
186 status.set_bit(*loc, false);
187 }
188 },
189 )
190 .await?;
191
192 let grafted_pinned_nodes = {
203 let ops_pin_positions = mmr::iterator::nodes_to_pin(Position::try_from(range.start)?);
204 let num_grafted_pins = (pruned_chunks as u64).count_ones() as usize;
205 let mut pins = Vec::with_capacity(num_grafted_pins);
206 for pos in ops_pin_positions.take(num_grafted_pins) {
207 let digest = any
208 .log
209 .mmr
210 .get_node(pos)
211 .await?
212 .ok_or(qmdb::Error::DataCorrupted("missing ops pinned node"))?;
213 pins.push(digest);
214 }
215 pins
216 };
217
218 let mut hasher = StandardHasher::<H>::new();
220 let grafted_mmr = db::build_grafted_mmr::<H, N>(
221 &mut hasher,
222 &status,
223 &grafted_pinned_nodes,
224 &any.log.mmr,
225 thread_pool.as_ref(),
226 )
227 .await?;
228
229 let storage = grafting::Storage::new(&grafted_mmr, grafting::height::<N>(), &any.log.mmr);
233 let partial = db::partial_chunk(&status);
234 let grafted_mmr_root = db::compute_grafted_mmr_root(&mut hasher, &storage).await?;
235 let ops_root = any.log.root();
236 let partial_digest = partial.map(|(chunk, next_bit)| {
237 let digest = hasher.digest(&chunk);
238 (next_bit, digest)
239 });
240 let root = db::combine_roots(
241 &mut hasher,
242 &ops_root,
243 &grafted_mmr_root,
244 partial_digest.as_ref().map(|(nb, d)| (*nb, d)),
245 );
246
247 let (metadata, _, _) =
249 db::init_metadata::<E, DigestOf<H>>(context.with_label("metadata"), &metadata_partition)
250 .await?;
251
252 let current_db = db::Db {
253 any,
254 status,
255 grafted_mmr,
256 metadata: AsyncMutex::new(metadata),
257 thread_pool,
258 root,
259 };
260
261 current_db.sync_metadata().await?;
263
264 Ok(current_db)
265}
266
267impl<E, K, V, H, T, const N: usize> Database for CurrentUnorderedFixedDb<E, K, V, H, T, N>
270where
271 E: Storage + Clock + Metrics,
272 K: Array,
273 V: FixedValue + 'static,
274 H: Hasher,
275 T: Translator,
276{
277 type Context = E;
278 type Op = UnorderedFixedOp<K, V>;
279 type Journal = fixed::Journal<E, Self::Op>;
280 type Hasher = H;
281 type Config = FixedConfig<T>;
282 type Digest = H::Digest;
283
284 async fn from_sync_result(
285 context: Self::Context,
286 config: Self::Config,
287 log: Self::Journal,
288 pinned_nodes: Option<Vec<Self::Digest>>,
289 range: Range<Location>,
290 apply_batch_size: usize,
291 ) -> Result<Self, qmdb::Error> {
292 let mmr_config = mmr_config_from_fixed(&config);
293 let metadata_partition = config.grafted_mmr_metadata_partition.clone();
294 let thread_pool = config.thread_pool.clone();
295 let index = unordered::Index::new(context.with_label("index"), config.translator.clone());
296 build_db::<_, K, _, UnorderedFixedUpdate<K, V>, _, H, _, N>(
297 context,
298 mmr_config,
299 log,
300 index,
301 pinned_nodes,
302 range,
303 apply_batch_size,
304 metadata_partition,
305 thread_pool,
306 )
307 .await
308 }
309
310 fn root(&self) -> Self::Digest {
313 self.any.log.root()
314 }
315}
316
317impl<E, K, V, H, T, const N: usize> Database for CurrentUnorderedVariableDb<E, K, V, H, T, N>
318where
319 E: Storage + Clock + Metrics,
320 K: Key,
321 V: VariableValue + 'static,
322 H: Hasher,
323 T: Translator,
324 UnorderedVariableOp<K, V>: CodecShared,
325{
326 type Context = E;
327 type Op = UnorderedVariableOp<K, V>;
328 type Journal = variable::Journal<E, Self::Op>;
329 type Hasher = H;
330 type Config = VariableConfig<T, <UnorderedVariableOp<K, V> as CodecRead>::Cfg>;
331 type Digest = H::Digest;
332
333 async fn from_sync_result(
334 context: Self::Context,
335 config: Self::Config,
336 log: Self::Journal,
337 pinned_nodes: Option<Vec<Self::Digest>>,
338 range: Range<Location>,
339 apply_batch_size: usize,
340 ) -> Result<Self, qmdb::Error> {
341 let mmr_config = mmr_config_from_variable(&config);
342 let metadata_partition = config.grafted_mmr_metadata_partition.clone();
343 let thread_pool = config.thread_pool.clone();
344 let index = unordered::Index::new(context.with_label("index"), config.translator.clone());
345 build_db::<_, K, _, UnorderedVariableUpdate<K, V>, _, H, _, N>(
346 context,
347 mmr_config,
348 log,
349 index,
350 pinned_nodes,
351 range,
352 apply_batch_size,
353 metadata_partition,
354 thread_pool,
355 )
356 .await
357 }
358
359 fn root(&self) -> Self::Digest {
362 self.any.log.root()
363 }
364}
365
366impl<E, K, V, H, T, const N: usize> Database for CurrentOrderedFixedDb<E, K, V, H, T, N>
367where
368 E: Storage + Clock + Metrics,
369 K: Array,
370 V: FixedValue + 'static,
371 H: Hasher,
372 T: Translator,
373{
374 type Context = E;
375 type Op = OrderedFixedOp<K, V>;
376 type Journal = fixed::Journal<E, Self::Op>;
377 type Hasher = H;
378 type Config = FixedConfig<T>;
379 type Digest = H::Digest;
380
381 async fn from_sync_result(
382 context: Self::Context,
383 config: Self::Config,
384 log: Self::Journal,
385 pinned_nodes: Option<Vec<Self::Digest>>,
386 range: Range<Location>,
387 apply_batch_size: usize,
388 ) -> Result<Self, qmdb::Error> {
389 let mmr_config = mmr_config_from_fixed(&config);
390 let metadata_partition = config.grafted_mmr_metadata_partition.clone();
391 let thread_pool = config.thread_pool.clone();
392 let index = ordered::Index::new(context.with_label("index"), config.translator.clone());
393 build_db::<_, K, _, OrderedFixedUpdate<K, V>, _, H, _, N>(
394 context,
395 mmr_config,
396 log,
397 index,
398 pinned_nodes,
399 range,
400 apply_batch_size,
401 metadata_partition,
402 thread_pool,
403 )
404 .await
405 }
406
407 fn root(&self) -> Self::Digest {
410 self.any.log.root()
411 }
412}
413
414impl<E, K, V, H, T, const N: usize> Database for CurrentOrderedVariableDb<E, K, V, H, T, N>
415where
416 E: Storage + Clock + Metrics,
417 K: Key,
418 V: VariableValue + 'static,
419 H: Hasher,
420 T: Translator,
421 OrderedVariableOp<K, V>: CodecShared,
422{
423 type Context = E;
424 type Op = OrderedVariableOp<K, V>;
425 type Journal = variable::Journal<E, Self::Op>;
426 type Hasher = H;
427 type Config = VariableConfig<T, <OrderedVariableOp<K, V> as CodecRead>::Cfg>;
428 type Digest = H::Digest;
429
430 async fn from_sync_result(
431 context: Self::Context,
432 config: Self::Config,
433 log: Self::Journal,
434 pinned_nodes: Option<Vec<Self::Digest>>,
435 range: Range<Location>,
436 apply_batch_size: usize,
437 ) -> Result<Self, qmdb::Error> {
438 let mmr_config = mmr_config_from_variable(&config);
439 let metadata_partition = config.grafted_mmr_metadata_partition.clone();
440 let thread_pool = config.thread_pool.clone();
441 let index = ordered::Index::new(context.with_label("index"), config.translator.clone());
442 build_db::<_, K, _, OrderedVariableUpdate<K, V>, _, H, _, N>(
443 context,
444 mmr_config,
445 log,
446 index,
447 pinned_nodes,
448 range,
449 apply_batch_size,
450 metadata_partition,
451 thread_pool,
452 )
453 .await
454 }
455
456 fn root(&self) -> Self::Digest {
459 self.any.log.root()
460 }
461}
462
// Generates `crate::qmdb::sync::Resolver` implementations for one `current` database type.
//
// Arguments:
// - `$db`: the database type (generic over `E, K, V, H, T, N`).
// - `$op`: the operation type (generic over `K, V`).
// - `$val_bound`: trait bound applied to `V`.
// - `$key_bound`: trait bound applied to `K`.
// - optional, after a `;`: extra predicates appended to each `where` clause.
//
// Three handle shapes are covered: `Arc<Db>`, `Arc<AsyncRwLock<Db>>` and
// `Arc<AsyncRwLock<Option<Db>>>`. Each implementation serves `get_operations` from the inner
// `any` database's `historical_proof` and fills `success_tx` with the sender half of a
// fresh oneshot channel whose receiver is dropped immediately.
macro_rules! impl_current_resolver {
    ($db:ident, $op:ident, $val_bound:ident, $key_bound:path $(; $($where_extra:tt)+)?) => {
        // Handle shape: shared reference-counted database.
        impl<E, K, V, H, T, const N: usize> crate::qmdb::sync::Resolver
            for std::sync::Arc<$db<E, K, V, H, T, N>>
        where
            E: Storage + Clock + Metrics,
            K: $key_bound,
            V: $val_bound + Send + Sync + 'static,
            H: Hasher,
            T: Translator + Send + Sync + 'static,
            T::Key: Send + Sync,
            $($($where_extra)+)?
        {
            type Digest = H::Digest;
            type Op = $op<K, V>;
            type Error = qmdb::Error;

            async fn get_operations(
                &self,
                op_count: Location,
                start_loc: Location,
                max_ops: std::num::NonZeroU64,
            ) -> Result<crate::qmdb::sync::FetchResult<Self::Op, Self::Digest>, Self::Error> {
                let (proof, operations) = self
                    .any
                    .historical_proof(op_count, start_loc, max_ops)
                    .await?;
                let (success_tx, _) = commonware_utils::channel::oneshot::channel();
                Ok(crate::qmdb::sync::FetchResult {
                    proof,
                    operations,
                    success_tx,
                })
            }
        }

        // Handle shape: database behind an async read-write lock. The read guard is held
        // for the duration of the proof generation.
        impl<E, K, V, H, T, const N: usize> crate::qmdb::sync::Resolver
            for std::sync::Arc<
                commonware_utils::sync::AsyncRwLock<
                    $db<E, K, V, H, T, N>,
                >,
            >
        where
            E: Storage + Clock + Metrics,
            K: $key_bound,
            V: $val_bound + Send + Sync + 'static,
            H: Hasher,
            T: Translator + Send + Sync + 'static,
            T::Key: Send + Sync,
            $($($where_extra)+)?
        {
            type Digest = H::Digest;
            type Op = $op<K, V>;
            type Error = qmdb::Error;

            async fn get_operations(
                &self,
                op_count: Location,
                start_loc: Location,
                max_ops: std::num::NonZeroU64,
            ) -> Result<crate::qmdb::sync::FetchResult<Self::Op, Self::Digest>, Self::Error> {
                let read_guard = self.read().await;
                let (proof, operations) = read_guard
                    .any
                    .historical_proof(op_count, start_loc, max_ops)
                    .await?;
                let (success_tx, _) = commonware_utils::channel::oneshot::channel();
                Ok(crate::qmdb::sync::FetchResult {
                    proof,
                    operations,
                    success_tx,
                })
            }
        }

        // Handle shape: optional database behind an async read-write lock. An empty slot is
        // reported as `qmdb::Error::KeyNotFound`.
        impl<E, K, V, H, T, const N: usize> crate::qmdb::sync::Resolver
            for std::sync::Arc<
                commonware_utils::sync::AsyncRwLock<
                    Option<$db<E, K, V, H, T, N>>,
                >,
            >
        where
            E: Storage + Clock + Metrics,
            K: $key_bound,
            V: $val_bound + Send + Sync + 'static,
            H: Hasher,
            T: Translator + Send + Sync + 'static,
            T::Key: Send + Sync,
            $($($where_extra)+)?
        {
            type Digest = H::Digest;
            type Op = $op<K, V>;
            type Error = qmdb::Error;

            async fn get_operations(
                &self,
                op_count: Location,
                start_loc: Location,
                max_ops: std::num::NonZeroU64,
            ) -> Result<crate::qmdb::sync::FetchResult<Self::Op, Self::Digest>, Self::Error> {
                let read_guard = self.read().await;
                let Some(current) = read_guard.as_ref() else {
                    return Err(qmdb::Error::KeyNotFound);
                };
                let (proof, operations) = current
                    .any
                    .historical_proof(op_count, start_loc, max_ops)
                    .await?;
                let (success_tx, _) = commonware_utils::channel::oneshot::channel();
                Ok(crate::qmdb::sync::FetchResult {
                    proof,
                    operations,
                    success_tx,
                })
            }
        }
    };
}
578
// Resolver implementations for each `current` database variant. Argument order is
// (database type, operation type, bound on `V`, bound on `K`), optionally followed by `;`
// and extra `where` predicates.

// Unordered, fixed-size values.
impl_current_resolver!(CurrentUnorderedFixedDb, UnorderedFixedOp, FixedValue, Array);

// Unordered, variable-size values; adds the `CodecShared` predicate on the operation type.
impl_current_resolver!(
    CurrentUnorderedVariableDb, UnorderedVariableOp, VariableValue, Key;
    UnorderedVariableOp<K, V>: CodecShared,
);

// Ordered, fixed-size values.
impl_current_resolver!(CurrentOrderedFixedDb, OrderedFixedOp, FixedValue, Array);

// Ordered, variable-size values; adds the `CodecShared` predicate on the operation type.
impl_current_resolver!(
    CurrentOrderedVariableDb, OrderedVariableOp, VariableValue, Key;
    OrderedVariableOp<K, V>: CodecShared,
);