1use crate::{
6 index::Unordered as UnorderedIndex,
7 journal::{
8 contiguous::{Contiguous, Mutable, Reader},
9 Error as JournalError,
10 },
11 merkle::{
12 self,
13 hasher::{Hasher as MerkleHasher, Standard as StandardHasher},
14 mem::Mem,
15 storage::Storage as MerkleStorage,
16 Graftable, Location, Position,
17 },
18 metadata::{Config as MConfig, Metadata},
19 qmdb::{
20 self,
21 any::{
22 self,
23 operation::{update::Update, Operation},
24 },
25 current::{
26 batch::BitmapBatch,
27 grafting,
28 proof::{OperationProof, OpsRootWitness, RangeProof, RangeProofSpec},
29 },
30 operation::Operation as _,
31 Error,
32 },
33 Context, Persistable,
34};
35use commonware_codec::{Codec, CodecShared, DecodeExt};
36use commonware_cryptography::{Digest, DigestOf, Hasher};
37use commonware_parallel::Strategy;
38use commonware_runtime::telemetry::metrics::{
39 histogram::{duration_histogram, ScopedTimer, Timed},
40 Counter, Gauge, GaugeExt as _, MetricsExt as _,
41};
42use commonware_utils::{
43 bitmap::{self, Readable as _},
44 sequence::prefixed_u64::U64,
45 sync::AsyncMutex,
46};
47use core::{num::NonZeroU64, ops::Range};
48use futures::future::try_join_all;
49use std::{collections::BTreeMap, sync::Arc};
50use tracing::{error, warn};
51
/// Key prefix for metadata entries that hold the digests of the grafted tree's pinned nodes.
/// Entry `i` stores the `i`-th node yielded by `F::nodes_to_pin`.
const NODE_PREFIX: u8 = 0;

/// Key prefix for the single metadata entry (index 0) that holds the number of pruned bitmap
/// chunks, encoded as a big-endian `u64`.
const PRUNED_CHUNKS_PREFIX: u8 = 1;
57
/// Telemetry handles for this database layer: two gauges plus a call counter and a duration
/// histogram for each of the apply-batch, sync, and prune paths.
pub(crate) struct Metrics<E: Context> {
    /// Shared context passed to each histogram when a scoped timer is created.
    clock: Arc<E>,
    /// Number of pruned bitmap chunks.
    pruned_chunks: Gauge,
    /// Most recent safe sync boundary location.
    sync_boundary: Gauge,
    /// Number of apply-batch calls.
    pub apply_batch_calls: Counter,
    /// Duration of apply-batch calls.
    apply_batch_duration: Timed,
    /// Number of sync calls.
    pub sync_calls: Counter,
    /// Duration of sync calls.
    sync_duration: Timed,
    /// Number of prune calls.
    pub prune_calls: Counter,
    /// Duration of prune calls.
    prune_duration: Timed,
}
79
80impl<E: Context> Metrics<E> {
81 pub fn new(context: E) -> Self {
83 let pruned_chunks = context.gauge("pruned_chunks", "Number of pruned bitmap chunks");
84 let sync_boundary =
85 context.gauge("sync_boundary", "Most recent safe sync boundary location");
86 let apply_batch_calls = context.counter("apply_batch_calls", "Number of apply-batch calls");
87 let apply_batch_duration = duration_histogram(
88 &context,
89 "apply_batch_duration",
90 "Duration of apply-batch calls",
91 );
92 let sync_calls = context.counter("sync_calls", "Number of sync calls");
93 let sync_duration = duration_histogram(&context, "sync_duration", "Duration of sync calls");
94 let prune_calls = context.counter("prune_calls", "Number of prune calls");
95 let prune_duration =
96 duration_histogram(&context, "prune_duration", "Duration of prune calls");
97 let clock = Arc::new(context);
98
99 Self {
100 clock,
101 pruned_chunks,
102 sync_boundary,
103 apply_batch_calls,
104 apply_batch_duration: Timed::new(apply_batch_duration),
105 sync_calls,
106 sync_duration: Timed::new(sync_duration),
107 prune_calls,
108 prune_duration: Timed::new(prune_duration),
109 }
110 }
111
112 pub fn apply_batch_timer(&self) -> ScopedTimer<E> {
113 self.apply_batch_duration.scoped(&self.clock)
114 }
115
116 pub fn sync_timer(&self) -> ScopedTimer<E> {
117 self.sync_duration.scoped(&self.clock)
118 }
119
120 pub fn prune_timer(&self) -> ScopedTimer<E> {
121 self.prune_duration.scoped(&self.clock)
122 }
123
124 pub fn update(&self, pruned_chunks: u64, sync_boundary: u64) {
126 let _ = self.pruned_chunks.try_set(pruned_chunks);
127 let _ = self.sync_boundary.try_set(sync_boundary);
128 }
129}
130
/// A database that wraps an `any` database and additionally maintains an in-memory grafted
/// tree, a metadata store, and a cached root digest.
pub struct Db<
    F: merkle::Graftable,
    E: Context,
    C: Contiguous<Item: CodecShared>,
    I: UnorderedIndex<Value = Location<F>>,
    H: Hasher,
    U: Send + Sync,
    const N: usize,
    S: Strategy,
> {
    /// The wrapped `any` database. It owns the operation log, the activity bitmap, and the
    /// inactivity floor that the methods on this type read.
    pub(super) any: any::db::Db<F, E, C, I, H, U, N, S>,

    /// In-memory grafted tree; combined with the ops tree through `grafting::Storage` when
    /// computing roots and proofs.
    pub(super) grafted_tree: Mem<F, H::Digest>,

    /// Metadata store that persists the pruned-chunk count and the grafted tree's pinned node
    /// digests.
    pub(super) metadata: AsyncMutex<Metadata<E, U64, Vec<u8>>>,

    /// Strategy handed to batch and grafted-tree construction.
    pub(super) strategy: S,

    /// Cached root digest returned by `root()`; replaced by `apply_batch` and `rewind`.
    pub(super) root: DigestOf<H>,

    /// Telemetry for the apply-batch, sync, and prune paths.
    pub(super) metrics: Metrics<E>,
}
170
171impl<F, E, C, I, H, U, const N: usize, S> Db<F, E, C, I, H, U, N, S>
173where
174 F: merkle::Graftable,
175 E: Context,
176 U: Update,
177 C: Contiguous<Item = Operation<F, U>>,
178 I: UnorderedIndex<Value = Location<F>>,
179 H: Hasher,
180 S: Strategy,
181 Operation<F, U>: Codec,
182{
183 #[cfg(any(test, feature = "test-traits"))]
186 pub(crate) const fn inactivity_floor_loc(&self) -> Location<F> {
187 self.any.inactivity_floor_loc()
188 }
189
190 pub const fn is_empty(&self) -> bool {
192 self.any.is_empty()
193 }
194
195 pub async fn get_metadata(&self) -> Result<Option<U::Value>, Error<F>> {
197 self.any.get_metadata().await
198 }
199
200 pub async fn bounds(&self) -> std::ops::Range<Location<F>> {
203 self.any.bounds().await
204 }
205
206 pub fn verify_range_proof(
209 hasher: &StandardHasher<H>,
210 proof: &RangeProof<F, H::Digest>,
211 start_loc: Location<F>,
212 ops: &[Operation<F, U>],
213 chunks: &[[u8; N]],
214 root: &H::Digest,
215 ) -> bool {
216 proof.verify(hasher, start_loc, ops, chunks, root)
217 }
218}
219
220impl<F, E, U, C, I, H, const N: usize, S> Db<F, E, C, I, H, U, N, S>
222where
223 F: merkle::Graftable,
224 E: Context,
225 U: Update,
226 C: Contiguous<Item = Operation<F, U>>,
227 I: UnorderedIndex<Value = Location<F>>,
228 H: Hasher,
229 S: Strategy,
230 Operation<F, U>: Codec,
231{
232 fn grafted_storage(&self) -> impl MerkleStorage<F, Digest = H::Digest> + '_ {
236 grafting::Storage::new(
237 &self.grafted_tree,
238 grafting::height::<N>(),
239 &self.any.log.merkle,
240 qmdb::hasher::<H>(),
241 )
242 }
243
244 pub const fn root(&self) -> H::Digest {
247 self.root
248 }
249
250 pub const fn strategy(&self) -> &S {
252 &self.strategy
253 }
254
255 pub const fn ops_root(&self) -> H::Digest {
266 self.any.root()
267 }
268
269 pub async fn ops_root_witness(
273 &self,
274 hasher: &StandardHasher<H>,
275 ) -> Result<OpsRootWitness<F, H::Digest>, Error<F>> {
276 let storage = self.grafted_storage();
277 let ops_size = storage.size().await;
278 let ops_leaves = Location::<F>::try_from(ops_size)?;
279 let grafted_root = compute_grafted_root::<F, H, _, _, N>(
280 hasher,
281 self.any.bitmap.as_ref(),
282 &storage,
283 ops_leaves,
284 self.any.inactivity_floor_loc,
285 )
286 .await?;
287 let partial_chunk = partial_chunk::<_, N>(self.any.bitmap.as_ref())
288 .map(|(chunk, next_bit)| (next_bit, hasher.digest(&chunk)));
289 let pending_chunk_digest: F::PendingChunk<H::Digest> = pending_chunk::<F, _, N>(
290 self.any.bitmap.as_ref(),
291 ops_leaves,
292 grafting::height::<N>(),
293 )?
294 .map(|chunk| hasher.digest(&chunk))
295 .try_into()
296 .expect("pending_chunk must be consistent with family");
297 Ok(OpsRootWitness {
298 grafted_root,
299 pending_chunk_digest,
300 partial_chunk,
301 })
302 }
303
304 pub(super) fn grafted_snapshot(&self) -> Arc<merkle::batch::MerkleizedBatch<F, H::Digest, S>> {
306 merkle::batch::MerkleizedBatch::from_mem_with_strategy(
307 &self.grafted_tree,
308 self.strategy.clone(),
309 )
310 }
311
312 pub fn new_batch(&self) -> super::batch::UnmerkleizedBatch<F, H, U, N, S> {
314 super::batch::UnmerkleizedBatch::new(
315 self.any.new_batch(),
316 self.grafted_snapshot(),
317 BitmapBatch::Base(Arc::clone(&self.any.bitmap)),
318 )
319 }
320
321 pub(super) async fn operation_proof(
323 &self,
324 hasher: &StandardHasher<H>,
325 loc: Location<F>,
326 ) -> Result<OperationProof<F, H::Digest, N>, Error<F>> {
327 let storage = self.grafted_storage();
328 let ops_root = self.any.root();
329 OperationProof::new(
330 hasher,
331 self.any.bitmap.as_ref(),
332 &storage,
333 self.any.inactivity_floor_loc,
334 loc,
335 ops_root,
336 )
337 .await
338 }
339
340 pub async fn range_proof(
352 &self,
353 hasher: &StandardHasher<H>,
354 start_loc: Location<F>,
355 max_ops: NonZeroU64,
356 ) -> Result<(RangeProof<F, H::Digest>, Vec<Operation<F, U>>, Vec<[u8; N]>), Error<F>> {
357 let storage = self.grafted_storage();
358 let ops_root = self.any.root();
359 RangeProof::new_with_ops(
360 hasher,
361 self.any.bitmap.as_ref(),
362 &storage,
363 &self.any.log,
364 RangeProofSpec {
365 start_loc,
366 max_ops,
367 inactivity_floor: self.any.inactivity_floor_loc,
368 ops_root,
369 },
370 )
371 .await
372 }
373}
374
375impl<F, E, U, C, I, H, const N: usize, S> Db<F, E, C, I, H, U, N, S>
377where
378 F: merkle::Graftable,
379 E: Context,
380 U: Update,
381 C: Mutable<Item = Operation<F, U>>,
382 I: UnorderedIndex<Value = Location<F>>,
383 H: Hasher,
384 S: Strategy,
385 Operation<F, U>: Codec,
386{
387 pub async fn ops_historical_proof(
393 &self,
394 historical_size: Location<F>,
395 start_loc: Location<F>,
396 max_ops: NonZeroU64,
397 ) -> Result<(merkle::Proof<F, H::Digest>, Vec<Operation<F, U>>), Error<F>> {
398 self.any
399 .historical_proof(historical_size, start_loc, max_ops)
400 .await
401 }
402
403 pub async fn pinned_nodes_at(&self, loc: Location<F>) -> Result<Vec<H::Digest>, Error<F>> {
405 self.any.pinned_nodes_at(loc).await
406 }
407
408 pub fn sync_boundary(&self) -> Location<F> {
437 sync_boundary::<F, N>(
438 *self.any.inactivity_floor_loc / bitmap::Prunable::<N>::CHUNK_SIZE_BITS,
439 *self.any.last_commit_loc + 1,
440 )
441 }
442
443 pub(super) fn update_metrics(&self) {
445 self.metrics.update(
446 self.any.bitmap.pruned_chunks() as u64,
447 *self.sync_boundary(),
448 );
449 }
450
451 fn delayed_merge_rewind_floor(&self) -> Option<u64> {
462 pair_absorption_threshold::<F, N>(self.any.bitmap.pruned_chunks() as u64)
463 }
464
465 fn prune_grafted_tree_to_bitmap(&mut self) -> Result<(), Error<F>> {
467 let pruned_chunks = self.any.bitmap.pruned_chunks() as u64;
468 if pruned_chunks == 0 {
469 return Ok(());
470 }
471
472 let prune_loc = Location::<F>::new(pruned_chunks);
473 if prune_loc <= self.grafted_tree.bounds().start {
474 return Ok(());
475 }
476
477 let prune_pos = Position::try_from(prune_loc)
478 .map_err(|_| Error::<F>::DataCorrupted("prune location overflow"))?;
479 let size = self.grafted_tree.size();
480
481 let mut pinned = BTreeMap::new();
482 for pos in F::nodes_to_pin(prune_loc) {
483 let digest = self
484 .grafted_tree
485 .get_node(pos)
486 .ok_or(Error::<F>::DataCorrupted("missing grafted pinned node"))?;
487 pinned.insert(pos, digest);
488 }
489
490 let mut retained = Vec::with_capacity((*size - *prune_pos) as usize);
491 for p in *prune_pos..*size {
492 let digest = self
493 .grafted_tree
494 .get_node(Position::new(p))
495 .ok_or(Error::<F>::DataCorrupted("missing retained grafted node"))?;
496 retained.push(digest);
497 }
498
499 self.grafted_tree = Mem::from_pruned_with_retained(prune_pos, pinned, retained);
500 Ok(())
501 }
502
503 pub async fn prune(&mut self, prune_loc: Location<F>) -> Result<(), Error<F>> {
518 let _timer = self.metrics.prune_timer();
519 self.metrics.prune_calls.inc();
520 let sync_boundary = self.sync_boundary();
521 if prune_loc > sync_boundary {
522 return Err(Error::PruneBeyondMinRequired(prune_loc, sync_boundary));
523 }
524
525 self.any.prune_bitmap(sync_boundary);
527 self.prune_grafted_tree_to_bitmap()?;
528
529 self.sync_metadata().await?;
535
536 self.any.prune_log(prune_loc).await?;
537 self.any.update_metrics().await;
538 self.update_metrics();
539 Ok(())
540 }
541
542 pub async fn rewind(&mut self, size: Location<F>) -> Result<(), Error<F>> {
563 let rewind_size = *size;
564 let current_size = *self.any.last_commit_loc + 1;
565 if rewind_size == current_size {
568 return Ok(());
569 }
570 if rewind_size == 0 || rewind_size > current_size {
574 return Err(Error::Journal(JournalError::InvalidRewind(rewind_size)));
575 }
576
577 let pruned_chunks = self.any.bitmap.pruned_chunks();
578 let pruned_bits = (pruned_chunks as u64)
579 .checked_mul(bitmap::Prunable::<N>::CHUNK_SIZE_BITS)
580 .ok_or_else(|| Error::DataCorrupted("pruned ops leaves overflow"))?;
581 if rewind_size < pruned_bits {
582 return Err(Error::Journal(JournalError::ItemPruned(rewind_size - 1)));
583 }
584 if let Some(rewind_floor) = self.delayed_merge_rewind_floor() {
585 if rewind_size < rewind_floor {
586 return Err(Error::Journal(JournalError::ItemPruned(rewind_size - 1)));
587 }
588 }
589
590 {
595 let reader = self.any.log.reader().await;
596 let rewind_last_loc = Location::<F>::new(rewind_size - 1);
597 let rewind_last_op = reader.read(*rewind_last_loc).await?;
598 let Some(rewind_floor) = rewind_last_op.has_floor() else {
599 return Err(Error::<F>::UnexpectedData(rewind_last_loc));
600 };
601 if *rewind_floor < pruned_bits {
602 return Err(Error::<F>::Journal(JournalError::ItemPruned(*rewind_floor)));
603 }
604 }
605
606 let pinned_nodes = if pruned_chunks > 0 {
608 let grafted_leaves = Location::<F>::new(pruned_chunks as u64);
609 let mut pinned_nodes = Vec::new();
610 for pos in F::nodes_to_pin(grafted_leaves) {
611 let digest = self
612 .grafted_tree
613 .get_node(pos)
614 .ok_or(Error::<F>::DataCorrupted("missing grafted pinned node"))?;
615 pinned_nodes.push(digest);
616 }
617 pinned_nodes
618 } else {
619 Vec::new()
620 };
621
622 self.any.rewind(size).await?;
626
627 let hasher = qmdb::hasher::<H>();
628 let ops_size = self.any.log.merkle.size();
629 let ops_leaves = Location::<F>::try_from(ops_size)?;
630 let grafted_tree = build_grafted_tree::<F, H, S, N>(
631 &hasher,
632 self.any.bitmap.as_ref(),
633 &pinned_nodes,
634 &self.any.log.merkle,
635 ops_leaves,
636 &self.strategy,
637 )
638 .await?;
639 let storage = grafting::Storage::new(
640 &grafted_tree,
641 grafting::height::<N>(),
642 &self.any.log.merkle,
643 hasher.clone(),
644 );
645 let partial_chunk = partial_chunk(self.any.bitmap.as_ref());
646 let ops_root = self.any.root();
647 let root = compute_db_root(
648 &hasher,
649 self.any.bitmap.as_ref(),
650 &storage,
651 ops_leaves,
652 partial_chunk,
653 self.any.inactivity_floor_loc,
654 &ops_root,
655 )
656 .await?;
657
658 self.grafted_tree = grafted_tree;
659 self.root = root;
660 self.update_metrics();
661
662 Ok(())
663 }
664
665 pub(crate) async fn sync_metadata(&self) -> Result<(), Error<F>> {
667 let mut metadata = self.metadata.lock().await;
668 metadata.clear();
669
670 let pruned_chunks_u64 = self.any.bitmap.pruned_chunks() as u64;
672
673 let key = U64::new(PRUNED_CHUNKS_PREFIX, 0);
675 metadata.put(key, pruned_chunks_u64.to_be_bytes().to_vec());
676
677 let pruned_chunks = Location::<F>::new(pruned_chunks_u64);
679 for (i, grafted_pos) in F::nodes_to_pin(pruned_chunks).enumerate() {
680 let digest = self
681 .grafted_tree
682 .get_node(grafted_pos)
683 .ok_or(Error::<F>::DataCorrupted("missing grafted pinned node"))?;
684 let key = U64::new(NODE_PREFIX, i as u64);
685 metadata.put(key, digest.to_vec());
686 }
687
688 metadata.sync().await?;
689
690 Ok(())
691 }
692}
693
694pub(crate) fn sync_boundary<F: Graftable, const N: usize>(
698 mut pruned_chunks: u64,
699 ops_leaves: u64,
700) -> Location<F> {
701 let chunk_bits = bitmap::Prunable::<N>::CHUNK_SIZE_BITS;
702 let grafting_height = grafting::height::<N>();
703
704 while pruned_chunks > 0 {
705 let required_ops = pair_absorption_threshold::<F, N>(pruned_chunks).unwrap_or_else(|| {
706 let youngest_start = (pruned_chunks - 1) * chunk_bits;
707 let pos = F::subtree_root_position(Location::<F>::new(youngest_start), grafting_height);
708 F::peak_birth_size(pos, grafting_height)
709 });
710
711 if ops_leaves >= required_ops {
712 break;
713 }
714 pruned_chunks -= 1;
715 }
716
717 Location::new(pruned_chunks * chunk_bits)
718}
719
720fn pair_absorption_threshold<F: Graftable, const N: usize>(pruned_chunks: u64) -> Option<u64> {
724 if pruned_chunks == 0 {
725 return None;
726 }
727
728 let grafting_height = grafting::height::<N>();
729 let youngest = pruned_chunks - 1;
730 let youngest_start = youngest << grafting_height;
731 let youngest_end = (youngest + 1) << grafting_height;
732 let youngest_pos =
733 F::subtree_root_position(Location::<F>::new(youngest_start), grafting_height);
734
735 if F::peak_birth_size(youngest_pos, grafting_height) <= youngest_end {
736 return None;
737 }
738
739 let pair_chunk = youngest & !1;
740 let pair_start = pair_chunk << grafting_height;
741 let pair_pos = F::subtree_root_position(Location::<F>::new(pair_start), grafting_height + 1);
742 Some(F::peak_birth_size(pair_pos, grafting_height + 1))
743}
744
745impl<F, E, U, C, I, H, const N: usize, S> Db<F, E, C, I, H, U, N, S>
747where
748 F: merkle::Graftable,
749 E: Context,
750 U: Update,
751 C: Mutable<Item = Operation<F, U>> + Persistable<Error = JournalError>,
752 I: UnorderedIndex<Value = Location<F>>,
753 H: Hasher,
754 S: Strategy,
755 Operation<F, U>: Codec,
756{
757 pub async fn commit(&self) -> Result<(), Error<F>> {
760 self.any.commit().await
761 }
762
763 pub async fn sync(&self) -> Result<(), Error<F>> {
765 let _timer = self.metrics.sync_timer();
766 self.metrics.sync_calls.inc();
767 self.any.sync().await?;
768
769 self.sync_metadata().await?;
772 self.update_metrics();
773 Ok(())
774 }
775
776 pub async fn destroy(self) -> Result<(), Error<F>> {
778 self.metadata.into_inner().destroy().await?;
779 self.any.destroy().await
780 }
781}
782
783impl<F, E, U, C, I, H, const N: usize, S> Db<F, E, C, I, H, U, N, S>
784where
785 F: merkle::Graftable,
786 E: Context,
787 U: Update + 'static,
788 C: Mutable<Item = Operation<F, U>> + Persistable<Error = JournalError>,
789 I: UnorderedIndex<Value = Location<F>>,
790 H: Hasher,
791 S: Strategy,
792 Operation<F, U>: Codec,
793{
794 pub async fn apply_batch(
804 &mut self,
805 batch: Arc<super::batch::MerkleizedBatch<F, H::Digest, U, N, S>>,
806 ) -> Result<Range<Location<F>>, Error<F>> {
807 let _timer = self.metrics.apply_batch_timer();
808 self.metrics.apply_batch_calls.inc();
809 let range = self.any.apply_batch(Arc::clone(&batch.inner)).await?;
810 self.grafted_tree.apply_batch(&batch.grafted)?;
811 self.root = batch.canonical_root;
812 self.update_metrics();
813 Ok(range)
814 }
815}
816
817impl<F, E, U, C, I, H, const N: usize, S> Persistable for Db<F, E, C, I, H, U, N, S>
818where
819 F: merkle::Graftable,
820 E: Context,
821 U: Update,
822 C: Mutable<Item = Operation<F, U>> + Persistable<Error = JournalError>,
823 I: UnorderedIndex<Value = Location<F>>,
824 H: Hasher,
825 S: Strategy,
826 Operation<F, U>: Codec,
827{
828 type Error = Error<F>;
829
830 async fn commit(&self) -> Result<(), Error<F>> {
831 Self::commit(self).await
832 }
833
834 async fn sync(&self) -> Result<(), Error<F>> {
835 Self::sync(self).await
836 }
837
838 async fn destroy(self) -> Result<(), Error<F>> {
839 self.destroy().await
840 }
841}
842
843pub(super) fn partial_chunk<B: bitmap::Readable<N>, const N: usize>(
846 bitmap: &B,
847) -> Option<([u8; N], u64)> {
848 let (last_chunk, next_bit) = bitmap.last_chunk();
849 if next_bit == bitmap::Prunable::<N>::CHUNK_SIZE_BITS {
850 None
851 } else {
852 Some((last_chunk, next_bit))
853 }
854}
855
856fn graftable_chunk_window<F: merkle::Graftable, B: bitmap::Readable<N>, const N: usize>(
861 bitmap: &B,
862 ops_leaves: Location<F>,
863 grafting_height: u32,
864) -> Result<(u64, u64), Error<F>> {
865 let complete = bitmap.complete_chunks() as u64;
866 let graftable = grafting::graftable_chunks::<F>(*ops_leaves, grafting_height).min(complete);
867 let pending = complete - graftable;
868 if pending > 1 {
869 return Err(Error::DataCorrupted("multiple pending bitmap chunks"));
870 }
871
872 let pruned = bitmap.pruned_chunks() as u64;
873 if pruned > graftable {
874 return Err(Error::DataCorrupted(
875 "pruned chunks exceed graftable chunks",
876 ));
877 }
878
879 Ok((complete, graftable))
880}
881
882pub(super) fn pending_chunk<F: merkle::Graftable, B: bitmap::Readable<N>, const N: usize>(
895 bitmap: &B,
896 ops_leaves: Location<F>,
897 grafting_height: u32,
898) -> Result<Option<[u8; N]>, Error<F>> {
899 let (complete, graftable) =
900 graftable_chunk_window::<F, B, N>(bitmap, ops_leaves, grafting_height)?;
901 if complete - graftable != 1 {
902 return Ok(None);
903 }
904 Ok(Some(bitmap.get_chunk(graftable as usize)))
905}
906
907pub(super) fn combine_roots<H: Hasher>(
920 hasher: &StandardHasher<H>,
921 ops_root: &H::Digest,
922 grafted_root: &H::Digest,
923 pending: Option<&H::Digest>,
924 partial: Option<(u64, &H::Digest)>,
925) -> H::Digest {
926 match (pending, partial) {
927 (None, None) => hasher.hash([ops_root.as_ref(), grafted_root.as_ref()]),
928 (Some(pe), None) => hasher.hash([ops_root.as_ref(), grafted_root.as_ref(), pe.as_ref()]),
929 (None, Some((nb, p))) => {
930 let nb_bytes = nb.to_be_bytes();
931 hasher.hash([
932 ops_root.as_ref(),
933 grafted_root.as_ref(),
934 nb_bytes.as_slice(),
935 p.as_ref(),
936 ])
937 }
938 (Some(pe), Some((nb, p))) => {
939 let nb_bytes = nb.to_be_bytes();
940 hasher.hash([
941 ops_root.as_ref(),
942 grafted_root.as_ref(),
943 pe.as_ref(),
944 nb_bytes.as_slice(),
945 p.as_ref(),
946 ])
947 }
948 }
949}
950
951#[allow(clippy::too_many_arguments)]
961pub(super) async fn compute_db_root<
962 F: merkle::Graftable,
963 H: Hasher,
964 B: bitmap::Readable<N>,
965 S: MerkleStorage<F, Digest = H::Digest>,
966 const N: usize,
967>(
968 hasher: &StandardHasher<H>,
969 status: &B,
970 storage: &S,
971 ops_leaves: Location<F>,
972 partial_chunk: Option<([u8; N], u64)>,
973 inactivity_floor: Location<F>,
974 ops_root: &H::Digest,
975) -> Result<H::Digest, Error<F>> {
976 let grafted_root =
977 compute_grafted_root(hasher, status, storage, ops_leaves, inactivity_floor).await?;
978 let pending = pending_chunk::<F, B, N>(status, ops_leaves, grafting::height::<N>())?
979 .map(|chunk| hasher.digest(&chunk));
980 let partial = partial_chunk.map(|(chunk, next_bit)| {
981 let digest = hasher.digest(&chunk);
982 (next_bit, digest)
983 });
984 Ok(combine_roots(
985 hasher,
986 ops_root,
987 &grafted_root,
988 pending.as_ref(),
989 partial.as_ref().map(|(nb, d)| (*nb, d)),
990 ))
991}
992
993pub(super) async fn compute_grafted_root<
1003 F: merkle::Graftable,
1004 H: Hasher,
1005 B: bitmap::Readable<N>,
1006 S: MerkleStorage<F, Digest = H::Digest>,
1007 const N: usize,
1008>(
1009 hasher: &StandardHasher<H>,
1010 status: &B,
1011 storage: &S,
1012 ops_leaves: Location<F>,
1013 inactivity_floor: Location<F>,
1014) -> Result<H::Digest, Error<F>> {
1015 let size = storage.size().await;
1016 let leaves = Location::try_from(size)?;
1017
1018 let mut peaks: Vec<H::Digest> = Vec::new();
1020 for (peak_pos, _) in F::peaks(size) {
1021 let digest = storage
1022 .get_node(peak_pos)
1023 .await?
1024 .ok_or_else(|| merkle::Error::<F>::MissingNode(peak_pos))?;
1025 peaks.push(digest);
1026 }
1027
1028 let grafting_height = grafting::height::<N>();
1030 let (_complete_chunks, _graftable_chunks) =
1031 graftable_chunk_window::<F, B, N>(status, ops_leaves, grafting_height)?;
1032
1033 let inactive_peaks =
1034 grafting::chunk_aligned_inactive_peaks::<F>(leaves, inactivity_floor, grafting_height)?;
1035
1036 Ok(hasher.root(leaves, inactive_peaks, peaks.iter())?)
1042}
1043
1044pub(super) async fn compute_grafted_leaves<
1054 F: merkle::Graftable,
1055 H: Hasher,
1056 S: Strategy,
1057 const N: usize,
1058>(
1059 hasher: &StandardHasher<H>,
1060 ops_tree: &impl MerkleStorage<F, Digest = H::Digest>,
1061 chunks: impl IntoIterator<Item = (usize, [u8; N])>,
1062 strategy: &S,
1063) -> Result<Vec<(usize, H::Digest)>, Error<F>> {
1064 let grafting_height = grafting::height::<N>();
1065
1066 let inputs = try_join_all(chunks.into_iter().map(|(chunk_idx, chunk)| async move {
1069 let leaf_start = Location::<F>::new((chunk_idx as u64) << grafting_height);
1070 let pos = F::subtree_root_position(leaf_start, grafting_height);
1071 let chunk_ops_digest = ops_tree
1072 .get_node(pos)
1073 .await?
1074 .ok_or(merkle::Error::<F>::MissingGraftedLeaf(pos))?;
1075 Ok::<_, Error<F>>((chunk_idx, chunk_ops_digest, chunk))
1076 }))
1077 .await?;
1078
1079 let zero_chunk = [0u8; N];
1082 Ok(strategy.map_init_collect_vec(
1083 inputs,
1084 || hasher.clone(),
1085 |h, (chunk_idx, chunk_ops_digest, chunk)| {
1086 if chunk == zero_chunk {
1087 (chunk_idx, chunk_ops_digest)
1088 } else {
1089 (
1090 chunk_idx,
1091 h.hash([chunk.as_slice(), chunk_ops_digest.as_ref()]),
1092 )
1093 }
1094 },
1095 ))
1096}
1097
1098pub(super) async fn build_grafted_tree<
1113 F: merkle::Graftable,
1114 H: Hasher,
1115 S: Strategy,
1116 const N: usize,
1117>(
1118 hasher: &StandardHasher<H>,
1119 bitmap: &impl bitmap::Readable<N>,
1120 pinned_nodes: &[H::Digest],
1121 ops_tree: &impl MerkleStorage<F, Digest = H::Digest>,
1122 ops_leaves: Location<F>,
1123 strategy: &S,
1124) -> Result<Mem<F, H::Digest>, Error<F>> {
1125 let grafting_height = grafting::height::<N>();
1126 let pruned_chunks = bitmap.pruned_chunks();
1127 let complete_chunks = bitmap.complete_chunks();
1128 let graftable_chunks = grafting::graftable_chunks::<F>(*ops_leaves, grafting_height)
1129 .min(complete_chunks as u64) as usize;
1130 assert!(
1131 pruned_chunks <= graftable_chunks && graftable_chunks <= complete_chunks,
1132 "invariant violated: pruned={pruned_chunks} graftable={graftable_chunks} complete={complete_chunks}"
1133 );
1134
1135 let leaves = compute_grafted_leaves::<F, H, S, N>(
1139 hasher,
1140 ops_tree,
1141 (pruned_chunks..graftable_chunks).map(|chunk_idx| (chunk_idx, bitmap.get_chunk(chunk_idx))),
1142 strategy,
1143 )
1144 .await?;
1145
1146 let grafted_hasher = grafting::GraftedHasher::<F, _>::new(hasher.clone(), grafting_height);
1148 let mut grafted_tree = if pruned_chunks > 0 {
1149 let grafted_pruning_boundary = Location::<F>::new(pruned_chunks as u64);
1150 Mem::from_components(Vec::new(), grafted_pruning_boundary, pinned_nodes.to_vec())
1151 .map_err(|_| Error::<F>::DataCorrupted("grafted tree rebuild failed"))?
1152 } else {
1153 Mem::new()
1154 };
1155
1156 if !leaves.is_empty() {
1158 let batch = {
1159 let mut batch = grafted_tree.new_batch_with_strategy(strategy.clone());
1160 for &(_ops_pos, digest) in &leaves {
1161 batch = batch.add_leaf_digest(digest);
1162 }
1163 batch.merkleize(&grafted_tree, &grafted_hasher)
1164 };
1165 grafted_tree.apply_batch(&batch)?;
1166 }
1167
1168 Ok(grafted_tree)
1169}
1170
1171pub(super) async fn init_metadata<F: merkle::Graftable, E: Context, D: Digest>(
1182 context: E,
1183 partition: &str,
1184) -> Result<(Metadata<E, U64, Vec<u8>>, usize, Vec<D>), Error<F>> {
1185 let metadata_cfg = MConfig {
1186 partition: partition.into(),
1187 codec_config: ((0..).into(), ()),
1188 };
1189 let metadata =
1190 Metadata::<_, U64, Vec<u8>>::init(context.child("metadata"), metadata_cfg).await?;
1191
1192 let key = U64::new(PRUNED_CHUNKS_PREFIX, 0);
1193 let pruned_chunks = match metadata.get(&key) {
1194 Some(bytes) => u64::from_be_bytes(bytes.as_slice().try_into().map_err(|_| {
1195 error!("pruned chunks value not a valid u64");
1196 Error::<F>::DataCorrupted("pruned chunks value not a valid u64")
1197 })?),
1198 None => {
1199 warn!("bitmap metadata does not contain pruned chunks, initializing as empty");
1200 0
1201 }
1202 } as usize;
1203
1204 let pinned_nodes = if pruned_chunks > 0 {
1208 let pruned_loc = Location::<F>::new(pruned_chunks as u64);
1209 if !pruned_loc.is_valid() {
1210 return Err(Error::DataCorrupted("pruned chunks exceeds MAX_LEAVES"));
1211 }
1212 let mut pinned = Vec::new();
1213 for (index, _pos) in F::nodes_to_pin(pruned_loc).enumerate() {
1214 let metadata_key = U64::new(NODE_PREFIX, index as u64);
1215 let Some(bytes) = metadata.get(&metadata_key) else {
1216 return Err(Error::DataCorrupted(
1217 "missing pinned node in grafted tree metadata",
1218 ));
1219 };
1220 let digest = D::decode(bytes.as_ref())
1221 .map_err(|_| Error::<F>::DataCorrupted("invalid pinned node digest"))?;
1222 pinned.push(digest);
1223 }
1224 pinned
1225 } else {
1226 Vec::new()
1227 };
1228
1229 Ok((metadata, pruned_chunks, pinned_nodes))
1230}
1231
1232#[cfg(test)]
1233mod tests {
1234 use super::*;
1235 use crate::{
1236 merkle::{mmb, mmr, Bagging::ForwardFold},
1237 qmdb::{
1238 any::traits::{DbAny, UnmerkleizedBatch as _},
1239 current::{tests::fixed_config, unordered::fixed},
1240 },
1241 translator::OneCap,
1242 };
1243 use commonware_codec::FixedSize;
1244 use commonware_cryptography::{sha256, Sha256};
1245 use commonware_macros::test_traced;
1246 use commonware_runtime::{deterministic, Runner as _, Supervisor as _};
1247 use commonware_utils::bitmap::Prunable as PrunableBitMap;
1248
/// Bitmap chunk size in bytes used by these tests; equal to the SHA-256 digest size.
const N: usize = sha256::Digest::SIZE;
1250
/// A bitmap holding a single set bit has a partial chunk whose next-bit index is 1 and whose
/// first byte is 1.
#[test]
fn partial_chunk_single_bit() {
    let mut bitmap = PrunableBitMap::<N>::new();
    bitmap.push(true);

    let partial = partial_chunk::<PrunableBitMap<N>, N>(&bitmap);
    assert!(partial.is_some());

    let (chunk, next_bit) = partial.unwrap();
    assert_eq!(next_bit, 1);
    assert_eq!(chunk[0], 1);
}
1261
/// A bitmap filled to exactly one chunk has no partial chunk.
#[test]
fn partial_chunk_aligned() {
    let mut bitmap = PrunableBitMap::<N>::new();
    (0..PrunableBitMap::<N>::CHUNK_SIZE_BITS).for_each(|_| bitmap.push(true));

    let partial = partial_chunk::<PrunableBitMap<N>, N>(&bitmap);
    assert!(partial.is_none());
}
1271
/// Five bits past a full chunk leave a partial chunk whose next-bit index is 5.
#[test]
fn partial_chunk_partial() {
    let total_bits = PrunableBitMap::<N>::CHUNK_SIZE_BITS + 5;
    let mut bitmap = PrunableBitMap::<N>::new();
    (0..total_bits).for_each(|_| bitmap.push(true));

    let partial = partial_chunk::<PrunableBitMap<N>, N>(&bitmap);
    assert!(partial.is_some());

    let (_chunk, next_bit) = partial.unwrap();
    assert_eq!(next_bit, 5);
}
1283
/// Combining the same inputs twice yields the same digest.
#[test]
fn combine_roots_deterministic() {
    let hasher = StandardHasher::<Sha256>::new(ForwardFold);
    let ops = Sha256::hash(b"ops");
    let grafted = Sha256::hash(b"grafted");

    let first = combine_roots(&hasher, &ops, &grafted, None, None);
    let second = combine_roots(&hasher, &ops, &grafted, None, None);
    assert_eq!(first, second);
}
1293
/// Adding a partial chunk component changes the combined root.
#[test]
fn combine_roots_with_partial_differs() {
    let hasher = StandardHasher::<Sha256>::new(ForwardFold);
    let ops = Sha256::hash(b"ops");
    let grafted = Sha256::hash(b"grafted");
    let partial_digest = Sha256::hash(b"partial");

    let bare = combine_roots(&hasher, &ops, &grafted, None, None);
    let with_partial = combine_roots(&hasher, &ops, &grafted, None, Some((5, &partial_digest)));
    assert_ne!(bare, with_partial);
}
1305
/// Adding a pending chunk component changes the combined root.
#[test]
fn combine_roots_with_pending_differs() {
    let hasher = StandardHasher::<Sha256>::new(ForwardFold);
    let ops = Sha256::hash(b"ops");
    let grafted = Sha256::hash(b"grafted");
    let pending_digest = Sha256::hash(b"pending");

    let bare = combine_roots(&hasher, &ops, &grafted, None, None);
    let with_pending = combine_roots(&hasher, &ops, &grafted, Some(&pending_digest), None);
    assert_ne!(bare, with_pending);
}
1317
/// Pending-only, partial-only, and pending-plus-partial inputs produce three distinct roots.
#[test]
fn combine_roots_pending_and_partial_independent() {
    let hasher = StandardHasher::<Sha256>::new(ForwardFold);
    let ops = Sha256::hash(b"ops");
    let grafted = Sha256::hash(b"grafted");
    let pending_digest = Sha256::hash(b"pending");
    let partial_digest = Sha256::hash(b"partial");

    let pending = Some(&pending_digest);
    let partial = Some((5, &partial_digest));

    let only_pending = combine_roots(&hasher, &ops, &grafted, pending, None);
    let only_partial = combine_roots(&hasher, &ops, &grafted, None, partial);
    let both = combine_roots(&hasher, &ops, &grafted, pending, partial);

    assert_ne!(only_pending, only_partial);
    assert_ne!(only_pending, both);
    assert_ne!(only_partial, both);
}
1339
/// Changing only the ops root changes the combined root.
#[test]
fn combine_roots_different_ops_root() {
    let hasher = StandardHasher::<Sha256>::new(ForwardFold);
    let grafted = Sha256::hash(b"grafted");
    let ops_a = Sha256::hash(b"ops_a");
    let ops_b = Sha256::hash(b"ops_b");

    let root_a = combine_roots(&hasher, &ops_a, &grafted, None, None);
    let root_b = combine_roots(&hasher, &ops_b, &grafted, None, None);
    assert_ne!(root_a, root_b);
}
1351
/// Pins the exact byte layout hashed by `combine_roots` for each of the four combinations of
/// pending and partial components.
#[test]
fn combine_roots_format_golden() {
    let hasher = StandardHasher::<Sha256>::new(ForwardFold);
    let ops = Sha256::hash(b"ops");
    let grafted = Sha256::hash(b"grafted");
    let pending = Sha256::hash(b"pending");
    let partial = Sha256::hash(b"partial");
    let next_bit: u64 = 0x1122_3344_5566_7788;
    let next_bit_bytes = next_bit.to_be_bytes();

    // Neither pending nor partial: ops || grafted.
    let neither = combine_roots(&hasher, &ops, &grafted, None, None);
    assert_eq!(neither, hasher.hash([ops.as_ref(), grafted.as_ref()]));

    // Pending only: ops || grafted || pending.
    let pending_only = combine_roots(&hasher, &ops, &grafted, Some(&pending), None);
    assert_eq!(
        pending_only,
        hasher.hash([ops.as_ref(), grafted.as_ref(), pending.as_ref()])
    );

    // Partial only: ops || grafted || next_bit (big-endian) || partial.
    let partial_only = combine_roots(&hasher, &ops, &grafted, None, Some((next_bit, &partial)));
    assert_eq!(
        partial_only,
        hasher.hash([
            ops.as_ref(),
            grafted.as_ref(),
            next_bit_bytes.as_slice(),
            partial.as_ref(),
        ])
    );

    // Both: ops || grafted || pending || next_bit (big-endian) || partial.
    let both = combine_roots(
        &hasher,
        &ops,
        &grafted,
        Some(&pending),
        Some((next_bit, &partial)),
    );
    assert_eq!(
        both,
        hasher.hash([
            ops.as_ref(),
            grafted.as_ref(),
            pending.as_ref(),
            next_bit_bytes.as_slice(),
            partial.as_ref(),
        ])
    );
}
1405
/// Test database alias: `fixed::Db` over the `mmr::Family` merkle family, running on the
/// deterministic runtime context with SHA-256 hashing and the sequential strategy.
///
/// NOTE(review): the two `sha256::Digest` parameters are presumably the key and value types
/// (they match the `Key`/`Value` bounds on `populate_fixed_db`), and `32` is presumably the
/// bitmap chunk size (the tests call `partial_chunk::<_, 32>`) — confirm against `fixed::Db`.
type MmrDb = fixed::Db<
    mmr::Family,
    deterministic::Context,
    sha256::Digest,
    sha256::Digest,
    Sha256,
    OneCap,
    32,
    commonware_parallel::Sequential,
>;
/// Test database alias: `fixed::Db` over the `mmb::Family` merkle family, running on the
/// deterministic runtime context with SHA-256 hashing and the sequential strategy.
///
/// NOTE(review): the two `sha256::Digest` parameters are presumably the key and value types
/// (they match the `Key`/`Value` bounds on `populate_fixed_db`), and `32` is presumably the
/// bitmap chunk size — confirm against `fixed::Db`.
type MmbDb = fixed::Db<
    mmb::Family,
    deterministic::Context,
    sha256::Digest,
    sha256::Digest,
    Sha256,
    OneCap,
    32,
    commonware_parallel::Sequential,
>;
1426
1427 async fn populate_fixed_db<F, DB>(db: &mut DB, start: u64, count: u64)
1428 where
1429 F: merkle::Graftable,
1430 DB: DbAny<F, Key = sha256::Digest, Value = sha256::Digest>,
1431 {
1432 let mut batch = db.new_batch();
1433 for idx in start..start + count {
1434 let key = Sha256::hash(&idx.to_be_bytes());
1435 let value = Sha256::hash(&(idx + count).to_be_bytes());
1436 batch = batch.write(key, Some(value));
1437 }
1438 let merkleized = batch.merkleize(db, None).await.unwrap();
1439 db.apply_batch(merkleized).await.unwrap();
1440 db.commit().await.unwrap();
1441 }
1442
1443 #[test_traced]
1444 fn test_ops_root_witness_verifies_without_partial_chunk() {
1445 let executor = deterministic::Runner::default();
1446 executor.start(|ctx| async move {
1447 let mut db = MmrDb::init(
1448 ctx.child("storage"),
1449 fixed_config::<OneCap>("ops-root-witness-full", &ctx),
1450 )
1451 .await
1452 .unwrap();
1453 let mut next_idx = 0;
1454 populate_fixed_db::<mmr::Family, _>(&mut db, next_idx, 256).await;
1455 next_idx += 256;
1456 while partial_chunk::<_, 32>(db.any.bitmap.as_ref()).is_some() {
1457 populate_fixed_db::<mmr::Family, _>(&mut db, next_idx, 1).await;
1458 next_idx += 1;
1459 }
1460
1461 let hasher = qmdb::hasher::<Sha256>();
1462 let witness = db.ops_root_witness(&hasher).await.unwrap();
1463 let ops_root = db.ops_root();
1464 let canonical_root = db.root();
1465
1466 assert!(witness.partial_chunk.is_none());
1467 assert!(witness.verify(&hasher, &ops_root, &canonical_root));
1468
1469 let wrong_ops_root = Sha256::hash(b"wrong ops root");
1470 assert!(!witness.verify(&hasher, &wrong_ops_root, &canonical_root));
1471
1472 let wrong_canonical_root = Sha256::hash(b"wrong canonical root");
1473 assert!(!witness.verify(&hasher, &ops_root, &wrong_canonical_root));
1474
1475 let mut tampered = witness;
1476 tampered.grafted_root = Sha256::hash(b"wrong grafted root");
1477 assert!(!tampered.verify(&hasher, &ops_root, &canonical_root));
1478 });
1479 }
1480
1481 #[test_traced]
1482 fn test_ops_root_witness_verifies_with_partial_chunk() {
1483 let executor = deterministic::Runner::default();
1484 executor.start(|ctx| async move {
1485 let mut db = MmbDb::init(
1486 ctx.child("storage"),
1487 fixed_config::<OneCap>("ops-root-witness-partial", &ctx),
1488 )
1489 .await
1490 .unwrap();
1491 populate_fixed_db::<mmb::Family, _>(&mut db, 0, 260).await;
1492
1493 let hasher = qmdb::hasher::<Sha256>();
1494 let witness = db.ops_root_witness(&hasher).await.unwrap();
1495 let ops_root = db.ops_root();
1496 let canonical_root = db.root();
1497
1498 assert!(witness.partial_chunk.is_some());
1499 assert!(witness.verify(&hasher, &ops_root, &canonical_root));
1500
1501 let wrong_ops_root = Sha256::hash(b"wrong ops root");
1502 assert!(!witness.verify(&hasher, &wrong_ops_root, &canonical_root));
1503
1504 let wrong_canonical_root = Sha256::hash(b"wrong canonical root");
1505 assert!(!witness.verify(&hasher, &ops_root, &wrong_canonical_root));
1506
1507 let mut tampered = witness.clone();
1508 tampered.grafted_root = Sha256::hash(b"wrong grafted root");
1509 assert!(!tampered.verify(&hasher, &ops_root, &canonical_root));
1510
1511 let mut tampered = witness.clone();
1512 tampered.partial_chunk.as_mut().unwrap().0 += 1;
1513 assert!(!tampered.verify(&hasher, &ops_root, &canonical_root));
1514
1515 let mut tampered = witness;
1516 tampered.partial_chunk.as_mut().unwrap().1 = Sha256::hash(b"wrong partial chunk");
1517 assert!(!tampered.verify(&hasher, &ops_root, &canonical_root));
1518 });
1519 }
1520
1521 #[test_traced]
1522 fn test_ops_root_witness_verifies_with_pruned_db() {
1523 let executor = deterministic::Runner::default();
1524 executor.start(|ctx| async move {
1525 let mut db = MmrDb::init(
1526 ctx.child("storage"),
1527 fixed_config::<OneCap>("ops-root-witness-pruned", &ctx),
1528 )
1529 .await
1530 .unwrap();
1531
1532 for _ in 0..5 {
1534 populate_fixed_db::<mmr::Family, _>(&mut db, 0, 512).await;
1535 }
1536 db.prune(db.sync_boundary()).await.unwrap();
1537 assert!(
1538 db.any.bitmap.pruned_chunks() > 0,
1539 "test requires at least one pruned chunk to exercise the zero-chunk path"
1540 );
1541
1542 let hasher = qmdb::hasher::<Sha256>();
1543 let witness = db.ops_root_witness(&hasher).await.unwrap();
1544 let ops_root = db.ops_root();
1545 let canonical_root = db.root();
1546
1547 assert!(witness.verify(&hasher, &ops_root, &canonical_root));
1548
1549 let wrong_canonical_root = Sha256::hash(b"wrong canonical root");
1550 assert!(!witness.verify(&hasher, &ops_root, &wrong_canonical_root));
1551
1552 let mut tampered = witness;
1553 tampered.grafted_root = Sha256::hash(b"wrong grafted root");
1554 assert!(!tampered.verify(&hasher, &ops_root, &canonical_root));
1555 });
1556 }
1557
1558 #[test_traced]
1559 fn test_ops_root_witness_verifies_on_fresh_db() {
1560 let executor = deterministic::Runner::default();
1561 executor.start(|ctx| async move {
1562 let db = MmrDb::init(
1563 ctx.child("storage"),
1564 fixed_config::<OneCap>("ops-root-witness-fresh", &ctx),
1565 )
1566 .await
1567 .unwrap();
1568
1569 let hasher = qmdb::hasher::<Sha256>();
1570 let witness = db.ops_root_witness(&hasher).await.unwrap();
1571 let ops_root = db.ops_root();
1572 let canonical_root = db.root();
1573
1574 assert!(witness.verify(&hasher, &ops_root, &canonical_root));
1575 });
1576 }
1577}