use crate::{
    journal::{
        contiguous::fixed::{Config as JConfig, Journal},
        Error as JError,
    },
    metadata::{Config as MConfig, Metadata},
    mmr::{
        hasher::Hasher,
        iterator::{nodes_to_pin, PeakIterator},
        location::Location,
        mem::{Clean, Config as MemConfig, Dirty, DirtyMmr as DirtyMemMmr, Mmr as MemMmr, State},
        position::Position,
        storage::Storage,
        verification,
        Error::{self, *},
        Proof,
    },
};
use commonware_codec::DecodeExt;
use commonware_cryptography::Digest;
use commonware_parallel::ThreadPool;
use commonware_runtime::{buffer::paged::CacheRef, Clock, Metrics, Storage as RStorage};
use commonware_utils::sequence::prefixed_u64::U64;
use core::ops::Range;
use std::{
    collections::BTreeMap,
    num::{NonZeroU64, NonZeroUsize},
};
use tracing::{debug, error, warn};

/// A journaled MMR in its dirty (unmerkleized) state.
pub type DirtyMmr<E, D> = Mmr<E, D, Dirty>;
/// A journaled MMR in its clean (merkleized) state.
pub type CleanMmr<E, D> = Mmr<E, D, Clean<D>>;

/// Configuration for a journaled [Mmr].
#[derive(Clone)]
pub struct Config {
    /// The storage partition backing the MMR's node journal.
    pub journal_partition: String,

    /// The storage partition backing the MMR's metadata (pinned nodes and the pruning boundary).
    pub metadata_partition: String,

    /// The maximum number of journal items to store in each blob.
    pub items_per_blob: NonZeroU64,

    /// The size of the journal's write buffer.
    pub write_buffer: NonZeroUsize,

    /// An optional thread pool used to parallelize merkleization.
    pub thread_pool: Option<ThreadPool>,

    /// The page cache used to buffer journal reads.
    pub page_cache: CacheRef,
}
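
// A minimal sketch of how this configuration might be filled in (the partition names and sizes
// below are illustrative assumptions, not defaults):
//
//   let cfg = Config {
//       journal_partition: "mmr_journal".into(),
//       metadata_partition: "mmr_metadata".into(),
//       items_per_blob: NZU64!(4096),
//       write_buffer: NZUsize!(1024),
//       thread_pool: None,
//       page_cache: CacheRef::new(page_size, page_cache_size),
//   };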

/// Configuration for initializing an MMR during state sync.
pub struct SyncConfig<D: Digest> {
    /// The configuration of the journaled MMR.
    pub config: Config,

    /// The range of node positions being synced. `range.start` becomes the pruning boundary; an
    /// existing journal larger than `range.end` is rejected.
    pub range: std::ops::Range<Position>,

    /// Optional digests of the nodes that must be pinned at `range.start` (persisted to metadata
    /// when provided).
    pub pinned_nodes: Option<Vec<D>>,
}
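
// Sketch of a sync setup (the positions and the availability of pinned digests are assumptions
// for illustration):
//
//   let sync_cfg = SyncConfig::<D> {
//       config: cfg.clone(),
//       range: Position::new(50)..Position::new(200),
//       pinned_nodes: None, // or Some(digests) received from a sync peer
//   };
//   let mmr = Mmr::init_sync(context, sync_cfg, &mut hasher).await?;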

/// A [MemMmr]-backed MMR whose older nodes are persisted in a [Journal], with pinned nodes and the
/// pruning boundary tracked in [Metadata].
pub struct Mmr<E: RStorage + Clock + Metrics, D: Digest, S: State<D> + Send + Sync = Dirty> {
    /// The in-memory MMR holding all nodes not yet written to the journal, plus any pinned nodes.
    mem_mmr: MemMmr<D, S>,

    /// The journal of persisted MMR nodes.
    journal: Journal<E, D>,

    /// The current size of the journal (the position of the next node to append to it).
    journal_size: Position,

    /// Metadata storing pinned node digests and the pruning boundary.
    metadata: Metadata<E, U64, Vec<u8>>,

    /// The position up to which this MMR has been pruned.
    pruned_to_pos: Position,

    /// An optional thread pool used to parallelize merkleization.
    pool: Option<ThreadPool>,
}

impl<E: RStorage + Clock + Metrics, D: Digest> From<CleanMmr<E, D>> for DirtyMmr<E, D> {
    fn from(clean: Mmr<E, D, Clean<D>>) -> Self {
        Self {
            mem_mmr: clean.mem_mmr.into(),
            journal: clean.journal,
            journal_size: clean.journal_size,
            metadata: clean.metadata,
            pruned_to_pos: clean.pruned_to_pos,
            pool: clean.pool,
        }
    }
}

/// Prefix used for metadata keys that store pinned node digests.
const NODE_PREFIX: u8 = 0;

/// Prefix used for the metadata key that stores the pruning boundary.
const PRUNE_TO_POS_PREFIX: u8 = 1;

impl<E: RStorage + Clock + Metrics, D: Digest, S: State<D> + Send + Sync> Mmr<E, D, S> {
    /// Returns the total number of nodes in the MMR.
    pub fn size(&self) -> Position {
        self.mem_mmr.size()
    }

    /// Returns the number of leaves in the MMR (as a [Location]).
    pub fn leaves(&self) -> Location {
        self.mem_mmr.leaves()
    }

    /// Returns the position of the last leaf, if any.
    pub fn last_leaf_pos(&self) -> Option<Position> {
        self.mem_mmr.last_leaf_pos()
    }

    /// Returns the digest of the node at `pos`, preferring metadata and falling back to the
    /// journal. Returns [Error::MissingNode] if it is found in neither.
    async fn get_from_metadata_or_journal(
        metadata: &Metadata<E, U64, Vec<u8>>,
        journal: &Journal<E, D>,
        pos: Position,
    ) -> Result<D, Error> {
        if let Some(bytes) = metadata.get(&U64::new(NODE_PREFIX, *pos)) {
            debug!(?pos, "read node from metadata");
            let digest = D::decode(bytes.as_ref());
            let Ok(digest) = digest else {
                error!(
                    ?pos,
                    err = %digest.expect_err("digest is Err in else branch"),
                    "could not convert node from metadata bytes to digest"
                );
                return Err(Error::MissingNode(pos));
            };
            return Ok(digest);
        }

        debug!(?pos, "reading node from journal");
        let node = journal.read(*pos).await;
        match node {
            Ok(node) => Ok(node),
            Err(JError::ItemPruned(_)) => {
                error!(?pos, "node is missing from metadata and journal");
                Err(Error::MissingNode(pos))
            }
            Err(e) => Err(Error::JournalError(e)),
        }
    }

    /// Returns the range of node positions currently retained (pruning boundary to size).
    pub fn bounds(&self) -> std::ops::Range<Position> {
        self.pruned_to_pos..self.size()
    }

    /// Pins the nodes required to prove elements above `prune_pos`, sourcing their digests from
    /// metadata or the journal.
    async fn add_extra_pinned_nodes(
        mem_mmr: &mut MemMmr<D, S>,
        metadata: &Metadata<E, U64, Vec<u8>>,
        journal: &Journal<E, D>,
        prune_pos: Position,
    ) -> Result<(), Error> {
        let mut pinned_nodes = BTreeMap::new();
        for pos in nodes_to_pin(prune_pos) {
            let digest =
                Mmr::<E, D, Clean<D>>::get_from_metadata_or_journal(metadata, journal, pos).await?;
            pinned_nodes.insert(pos, digest);
        }
        mem_mmr.add_pinned_nodes(pinned_nodes);

        Ok(())
    }
}

impl<E: RStorage + Clock + Metrics, D: Digest> CleanMmr<E, D> {
    /// Initialize the MMR from `cfg`, replaying any persisted state and recovering from a
    /// partially written (unclean) shutdown.
    pub async fn init(
        context: E,
        hasher: &mut impl Hasher<Digest = D>,
        cfg: Config,
    ) -> Result<Self, Error> {
        let journal_cfg = JConfig {
            partition: cfg.journal_partition,
            items_per_blob: cfg.items_per_blob,
            page_cache: cfg.page_cache,
            write_buffer: cfg.write_buffer,
        };
        let mut journal =
            Journal::<E, D>::init(context.with_label("mmr_journal"), journal_cfg).await?;
        let mut journal_size = Position::new(journal.bounds().end);

        let metadata_cfg = MConfig {
            partition: cfg.metadata_partition,
            codec_config: ((0..).into(), ()),
        };
        let metadata =
            Metadata::<_, U64, Vec<u8>>::init(context.with_label("mmr_metadata"), metadata_cfg)
                .await?;

        if journal_size == 0 {
            let mem_mmr = MemMmr::init(
                MemConfig {
                    nodes: vec![],
                    pruned_to_pos: Position::new(0),
                    pinned_nodes: vec![],
                },
                hasher,
            )?;
            return Ok(Self {
                mem_mmr,
                journal,
                journal_size,
                metadata,
                pruned_to_pos: Position::new(0),
                pool: cfg.thread_pool,
            });
        }

        // Recover the pruning boundary from metadata and reconcile it with the journal.
        let key: U64 = U64::new(PRUNE_TO_POS_PREFIX, 0);
        let metadata_prune_pos = metadata.get(&key).map_or(0, |bytes| {
            u64::from_be_bytes(
                bytes
                    .as_slice()
                    .try_into()
                    .expect("metadata prune position is not 8 bytes"),
            )
        });
        let journal_bounds_start = journal.bounds().start;
        if metadata_prune_pos > journal_bounds_start {
            journal.prune(metadata_prune_pos).await?;
            if journal.bounds().start != journal_bounds_start {
                warn!(
                    journal_bounds_start,
                    metadata_prune_pos, "journal pruned to match metadata"
                );
            }
        } else if metadata_prune_pos < journal_bounds_start {
            warn!(
                metadata_prune_pos,
                journal_bounds_start, "metadata stale, using journal pruning boundary"
            );
        }

        let effective_prune_pos = std::cmp::max(metadata_prune_pos, journal_bounds_start);

        // If the journal does not end on a valid MMR size, rewind it to the last valid size and
        // remember the orphaned leaf (if readable) so it can be re-added below.
        let last_valid_size = PeakIterator::to_nearest_size(journal_size);
        let mut orphaned_leaf: Option<D> = None;
        if last_valid_size != journal_size {
            warn!(
                ?last_valid_size,
                "encountered invalid MMR structure, recovering from last valid size"
            );
            let recovered_item = journal.read(*last_valid_size).await;
            if let Ok(item) = recovered_item {
                orphaned_leaf = Some(item);
            }
            journal.rewind(*last_valid_size).await?;
            journal.sync().await?;
            journal_size = last_valid_size
        }

        let mut pinned_nodes = Vec::new();
        for pos in nodes_to_pin(journal_size) {
            let digest =
                Mmr::<E, D>::get_from_metadata_or_journal(&metadata, &journal, pos).await?;
            pinned_nodes.push(digest);
        }
        let mut mem_mmr = MemMmr::init(
            MemConfig {
                nodes: vec![],
                pruned_to_pos: journal_size,
                pinned_nodes,
            },
            hasher,
        )?;
        let prune_pos = Position::new(effective_prune_pos);
        Self::add_extra_pinned_nodes(&mut mem_mmr, &metadata, &journal, prune_pos).await?;

        let mut s = Self {
            mem_mmr,
            journal,
            journal_size,
            metadata,
            pruned_to_pos: prune_pos,
            pool: cfg.thread_pool,
        };

        if let Some(leaf) = orphaned_leaf {
            let pos = s.mem_mmr.size();
            warn!(?pos, "recovering orphaned leaf");
            let mut dirty_mmr = s.mem_mmr.into_dirty();
            dirty_mmr.add_leaf_digest(leaf);
            s.mem_mmr = dirty_mmr.merkleize(hasher, None);
            assert_eq!(pos, journal_size);
            s.sync().await?;
            assert_eq!(s.size(), s.journal.bounds().end);
        }

        Ok(s)
    }
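
    // A minimal usage sketch (assumes a deterministic runtime `context` and a `cfg` built as
    // above; error handling elided):
    //
    //   let mut hasher = Standard::<Sha256>::new();
    //   let mmr = Mmr::init(context, &mut hasher, cfg).await?;
    //   let mut mmr = mmr.into_dirty();
    //   mmr.add(&mut hasher, b"element").await?;
    //   let mut mmr = mmr.merkleize(&mut hasher);
    //   mmr.sync().await?; // persist new nodes to the journal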

    /// Initialize the MMR for state sync over `cfg.range`, reusing any locally persisted nodes
    /// that fall within the range and discarding those that do not.
    pub async fn init_sync(
        context: E,
        cfg: SyncConfig<D>,
        hasher: &mut impl Hasher<Digest = D>,
    ) -> Result<Self, crate::qmdb::Error> {
        let journal_cfg = JConfig {
            partition: cfg.config.journal_partition.clone(),
            items_per_blob: cfg.config.items_per_blob,
            write_buffer: cfg.config.write_buffer,
            page_cache: cfg.config.page_cache.clone(),
        };

        assert!(!cfg.range.is_empty(), "range must not be empty");
        let mut journal: Journal<E, D> =
            Journal::init(context.with_label("mmr_journal"), journal_cfg).await?;
        let size = journal.size();

        if size > *cfg.range.end {
            return Err(crate::journal::Error::ItemOutOfRange(size).into());
        }
        if size <= *cfg.range.start && *cfg.range.start != 0 {
            journal.clear_to_size(*cfg.range.start).await?;
        }

        let journal_size = Position::new(journal.size());

        let metadata_cfg = MConfig {
            partition: cfg.config.metadata_partition,
            codec_config: ((0..).into(), ()),
        };
        let mut metadata = Metadata::init(context.with_label("mmr_metadata"), metadata_cfg).await?;

        // Persist the pruning boundary implied by the sync range.
        let pruning_boundary_key = U64::new(PRUNE_TO_POS_PREFIX, 0);
        metadata.put(
            pruning_boundary_key,
            (*cfg.range.start).to_be_bytes().into(),
        );

        // Persist any pinned node digests supplied by the caller.
        if let Some(pinned_nodes) = cfg.pinned_nodes {
            let nodes_to_pin_persisted = nodes_to_pin(cfg.range.start);
            for (pos, digest) in nodes_to_pin_persisted.zip(pinned_nodes.iter()) {
                metadata.put(U64::new(NODE_PREFIX, *pos), digest.to_vec());
            }
        }

        // Pin the nodes required by the in-memory MMR at the journal boundary.
        let nodes_to_pin_mem = nodes_to_pin(journal_size);
        let mut mem_pinned_nodes = Vec::new();
        for pos in nodes_to_pin_mem {
            let digest =
                Mmr::<E, D>::get_from_metadata_or_journal(&metadata, &journal, pos).await?;
            mem_pinned_nodes.push(digest);
        }
        let mut mem_mmr = MemMmr::init(
            MemConfig {
                nodes: vec![],
                pruned_to_pos: journal_size,
                pinned_nodes: mem_pinned_nodes,
            },
            hasher,
        )?;

        if cfg.range.start < journal_size {
            Self::add_extra_pinned_nodes(&mut mem_mmr, &metadata, &journal, cfg.range.start)
                .await?;
        }

        metadata.sync().await?;

        journal.prune(*cfg.range.start).await?;

        Ok(Self {
            mem_mmr,
            journal,
            journal_size,
            metadata,
            pruned_to_pos: cfg.range.start,
            pool: cfg.config.thread_pool,
        })
    }

    /// Persist to metadata the digests of the nodes that must remain pinned after pruning to
    /// `prune_to_pos`, along with the new pruning boundary, and return those digests.
    async fn update_metadata(
        &mut self,
        prune_to_pos: Position,
    ) -> Result<BTreeMap<Position, D>, Error> {
        assert!(prune_to_pos >= self.pruned_to_pos);

        let mut pinned_nodes = BTreeMap::new();
        for pos in nodes_to_pin(prune_to_pos) {
            let digest = self.get_node(pos).await?.expect(
                "pinned node should exist if prune_to_pos is no less than self.pruned_to_pos",
            );
            self.metadata
                .put(U64::new(NODE_PREFIX, *pos), digest.to_vec());
            pinned_nodes.insert(pos, digest);
        }

        let key: U64 = U64::new(PRUNE_TO_POS_PREFIX, 0);
        self.metadata.put(key, (*prune_to_pos).to_be_bytes().into());

        self.metadata.sync().await.map_err(Error::MetadataError)?;

        Ok(pinned_nodes)
    }

    /// Returns the node at `position`, or `None` if it has been pruned.
    pub async fn get_node(&self, position: Position) -> Result<Option<D>, Error> {
        if let Some(node) = self.mem_mmr.get_node(position) {
            return Ok(Some(node));
        }

        match self.journal.read(*position).await {
            Ok(item) => Ok(Some(item)),
            Err(JError::ItemPruned(_)) => Ok(None),
            Err(e) => Err(Error::JournalError(e)),
        }
    }

    /// Flush all nodes not yet persisted to the journal, then prune the in-memory MMR down to its
    /// pinned nodes.
    pub async fn sync(&mut self) -> Result<(), Error> {
        for pos in *self.journal_size..*self.size() {
            let pos = Position::new(pos);
            let node = *self.mem_mmr.get_node_unchecked(pos);
            self.journal.append(node).await?;
        }
        self.journal_size = self.size();
        self.journal.sync().await?;
        assert_eq!(self.journal_size, self.journal.bounds().end);

        // Preserve the digests that must remain pinned before pruning the in-memory MMR.
        let mut pinned_nodes = BTreeMap::new();
        for pos in nodes_to_pin(self.pruned_to_pos) {
            let digest = self.mem_mmr.get_node_unchecked(pos);
            pinned_nodes.insert(pos, *digest);
        }

        self.mem_mmr.prune_all();
        self.mem_mmr.add_pinned_nodes(pinned_nodes);

        Ok(())
    }
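
    // Informal flow of `sync`: the nodes at positions `journal_size..size()` are appended to the
    // journal and synced, after which the in-memory MMR retains only its pinned nodes. A sketch:
    //
    //   mmr.sync().await?; // afterwards, every node of the MMR is durable in the journal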

    /// Prune all nodes before `pos`, retaining (pinning) the nodes required to prove any
    /// remaining element.
    pub async fn prune_to_pos(&mut self, pos: Position) -> Result<(), Error> {
        assert!(pos <= self.size());
        if pos <= self.pruned_to_pos {
            return Ok(());
        }

        // Ensure all nodes are durable before updating metadata and pruning the journal.
        self.sync().await?;

        let pinned_nodes = self.update_metadata(pos).await?;

        self.journal.prune(*pos).await?;
        self.mem_mmr.add_pinned_nodes(pinned_nodes);
        self.pruned_to_pos = pos;

        Ok(())
    }
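
    // Pruning sketch (positions are illustrative): proofs over retained elements keep working
    // after a prune because the required boundary digests stay pinned in memory and in metadata.
    //
    //   mmr.prune_to_pos(Position::new(30)).await?;
    //   assert_eq!(mmr.bounds().start, Position::new(30));
    //   let proof = mmr.proof(some_location_at_or_after_the_boundary).await?;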

    /// Returns the root digest of the MMR.
    pub const fn root(&self) -> D {
        *self.mem_mmr.root()
    }

    /// Returns an inclusion proof for the element at location `loc`.
    pub async fn proof(&self, loc: Location) -> Result<Proof<D>, Error> {
        if !loc.is_valid() {
            return Err(Error::LocationOverflow(loc));
        }
        self.range_proof(loc..loc + 1).await
    }
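
    // Proof usage sketch (the location value is illustrative):
    //
    //   let loc = Location::new_unchecked(7);
    //   let proof = mmr.proof(loc).await?;
    //   let root = mmr.root();
    //   assert!(proof.verify_element_inclusion(&mut hasher, &element, loc, &root));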

    /// Returns an inclusion proof for the elements in the location `range`.
    pub async fn range_proof(&self, range: Range<Location>) -> Result<Proof<D>, Error> {
        verification::range_proof(self, range).await
    }

    /// Returns an inclusion proof for the elements in `range` against the historical root of the
    /// MMR at the point it contained exactly `leaves` leaves.
    pub async fn historical_range_proof(
        &self,
        leaves: Location,
        range: Range<Location>,
    ) -> Result<Proof<D>, Error> {
        verification::historical_range_proof(self, leaves, range).await
    }

    /// Prune as much of the MMR as possible (everything up to its current size).
    pub async fn prune_all(&mut self) -> Result<(), Error> {
        if self.size() != 0 {
            self.prune_to_pos(self.size()).await?;
            return Ok(());
        }
        Ok(())
    }

    /// Destroy the MMR, removing its journal and metadata from storage.
    pub async fn destroy(self) -> Result<(), Error> {
        self.journal.destroy().await?;
        self.metadata.destroy().await?;

        Ok(())
    }

    /// Convert this MMR into its [DirtyMmr] form so new elements can be added.
    pub fn into_dirty(self) -> DirtyMmr<E, D> {
        self.into()
    }
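
    // Typical add/merkleize lifecycle (a sketch; batching many `add` calls before a single
    // `merkleize` is what lets a configured thread pool parallelize the hashing):
    //
    //   let mut dirty = mmr.into_dirty();
    //   for element in batch {
    //       dirty.add(&mut hasher, element).await?;
    //   }
    //   let mut mmr = dirty.merkleize(&mut hasher);
    //   mmr.sync().await?;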

    /// Simulate a partial write failure by writing at most `write_limit` unsynced nodes to the
    /// journal before syncing it.
    #[cfg(any(test, feature = "fuzzing"))]
    pub async fn simulate_partial_sync(&mut self, write_limit: usize) -> Result<(), Error> {
        if write_limit == 0 {
            return Ok(());
        }

        let mut written_count = 0usize;
        for i in *self.journal_size..*self.size() {
            let node = *self.mem_mmr.get_node_unchecked(Position::new(i));
            self.journal.append(node).await?;
            written_count += 1;
            if written_count >= write_limit {
                break;
            }
        }
        self.journal.sync().await?;

        Ok(())
    }

    #[cfg(test)]
    pub fn get_pinned_nodes(&self) -> BTreeMap<Position, D> {
        self.mem_mmr.pinned_nodes()
    }

    /// Simulate a failure during pruning: metadata is updated but the journal is never pruned.
    #[cfg(test)]
    pub async fn simulate_pruning_failure(mut self, prune_to_pos: Position) -> Result<(), Error> {
        assert!(prune_to_pos <= self.size());

        self.sync().await?;

        self.update_metadata(prune_to_pos).await?;

        Ok(())
    }
}

impl<E: RStorage + Clock + Metrics, D: Digest> DirtyMmr<E, D> {
    /// Compute the digests of all unmerkleized nodes (using the thread pool if one was
    /// configured), returning the [CleanMmr] form of this MMR.
    pub fn merkleize(self, h: &mut impl Hasher<Digest = D>) -> CleanMmr<E, D> {
        CleanMmr {
            mem_mmr: self.mem_mmr.merkleize(h, self.pool.clone()),
            journal: self.journal,
            journal_size: self.journal_size,
            metadata: self.metadata,
            pruned_to_pos: self.pruned_to_pos,
            pool: self.pool,
        }
    }
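
    // If parallel hashing is desired, the pool is supplied at construction time via
    // `Config::thread_pool` and forwarded here to the in-memory MMR. A sketch (assumes a
    // `ThreadPool` value named `pool` is already available):
    //
    //   let cfg = Config { thread_pool: Some(pool), ..cfg };
    //   let mmr = Mmr::init(context, &mut hasher, cfg).await?;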

    /// Add `element` as a new leaf, returning its position. The new nodes remain dirty (and
    /// unpersisted) until the MMR is merkleized and synced.
    pub async fn add(
        &mut self,
        h: &mut impl Hasher<Digest = D>,
        element: &[u8],
    ) -> Result<Position, Error> {
        Ok(self.mem_mmr.add(h, element))
    }

    /// Pop the most recent `leaves_to_pop` leaves from the MMR, rewinding the journal if any of
    /// them have already been persisted.
    pub async fn pop(&mut self, mut leaves_to_pop: usize) -> Result<(), Error> {
        // First pop whatever still resides in the in-memory MMR.
        while leaves_to_pop > 0 {
            match self.mem_mmr.pop() {
                Ok(_) => leaves_to_pop -= 1,
                Err(ElementPruned(_)) => break,
                Err(Empty) => return Err(Error::Empty),
                Err(e) => return Err(e),
            }
        }
        if leaves_to_pop == 0 {
            return Ok(());
        }

        // Compute the size the MMR must rewind to in order to pop the remaining leaves.
        let mut new_size = self.size();
        while leaves_to_pop > 0 {
            if new_size == 0 {
                return Err(Error::Empty);
            }
            new_size -= 1;
            if new_size < self.pruned_to_pos {
                return Err(Error::ElementPruned(new_size));
            }
            if new_size.is_mmr_size() {
                leaves_to_pop -= 1;
            }
        }

        self.journal.rewind(*new_size).await?;
        self.journal.sync().await?;
        self.journal_size = new_size;

        // Rebuild the in-memory MMR over the rewound journal, restoring the pinned nodes.
        let mut pinned_nodes = Vec::new();
        for pos in nodes_to_pin(new_size) {
            let digest = Mmr::<E, D, Clean<D>>::get_from_metadata_or_journal(
                &self.metadata,
                &self.journal,
                pos,
            )
            .await?;
            pinned_nodes.push(digest);
        }

        self.mem_mmr = DirtyMemMmr::from_components(vec![], new_size, pinned_nodes);
        Self::add_extra_pinned_nodes(
            &mut self.mem_mmr,
            &self.metadata,
            &self.journal,
            self.pruned_to_pos,
        )
        .await?;

        Ok(())
    }
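
    // Pop usage sketch (counts are illustrative): popping more leaves than exist returns
    // `Error::Empty`, and popping past the pruning boundary returns `Error::ElementPruned`.
    //
    //   mmr.pop(2).await?; // undo the two most recent leaves
    //   assert!(matches!(mmr.pop(usize::MAX).await, Err(_)));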

    /// Merkleize the MMR and write at most `write_limit` of its unpersisted nodes to the journal,
    /// simulating a crash in the middle of a sync.
    #[cfg(any(test, feature = "fuzzing"))]
    pub async fn simulate_partial_sync(
        self,
        hasher: &mut impl Hasher<Digest = D>,
        write_limit: usize,
    ) -> Result<(), Error> {
        if write_limit == 0 {
            return Ok(());
        }

        let mut clean_mmr = self.merkleize(hasher);

        let mut written_count = 0usize;
        for i in *clean_mmr.journal_size..*clean_mmr.size() {
            let node = *clean_mmr.mem_mmr.get_node_unchecked(Position::new(i));
            clean_mmr.journal.append(node).await?;
            written_count += 1;
            if written_count >= write_limit {
                break;
            }
        }
        clean_mmr.journal.sync().await?;

        Ok(())
    }
}

impl<E: RStorage + Clock + Metrics + Sync, D: Digest> Storage<D> for CleanMmr<E, D> {
    fn size(&self) -> Position {
        self.size()
    }

    async fn get_node(&self, position: Position) -> Result<Option<D>, Error> {
        self.get_node(position).await
    }
}
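
// Implementing [Storage] lets generic proof-generation code (e.g. `verification::range_proof`,
// called above) read nodes from this MMR without knowing whether a given node lives in memory or
// in the journal.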

#[cfg(test)]
mod tests {
    use super::*;
    use crate::mmr::{
        conformance::build_test_mmr, hasher::Hasher as _, location::LocationRangeExt as _, mem,
        Location, StandardHasher as Standard,
    };
    use commonware_cryptography::{
        sha256::{self, Digest},
        Hasher, Sha256,
    };
    use commonware_macros::test_traced;
    use commonware_runtime::{buffer::paged::CacheRef, deterministic, Blob as _, Runner};
    use commonware_utils::{NZUsize, NZU16, NZU64};
    use std::num::NonZeroU16;

    fn test_digest(v: usize) -> Digest {
        Sha256::hash(&v.to_be_bytes())
    }

    const PAGE_SIZE: NonZeroU16 = NZU16!(111);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(5);

    fn test_config() -> Config {
        Config {
            journal_partition: "journal_partition".into(),
            metadata_partition: "metadata_partition".into(),
            items_per_blob: NZU64!(7),
            write_buffer: NZUsize!(1024),
            thread_pool: None,
            page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
        }
    }

    #[test]
    fn test_journaled_mmr_batched_root() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            const NUM_ELEMENTS: u64 = 199;
            let mut hasher: Standard<Sha256> = Standard::new();
            let test_mmr = mem::CleanMmr::new(&mut hasher);
            let test_mmr = build_test_mmr(&mut hasher, test_mmr, NUM_ELEMENTS);
            let expected_root = test_mmr.root();

            let mut journaled_mmr = Mmr::init(
                context.clone(),
                &mut Standard::<Sha256>::new(),
                test_config(),
            )
            .await
            .unwrap()
            .into_dirty();

            for i in 0u64..NUM_ELEMENTS {
                hasher.inner().update(&i.to_be_bytes());
                let element = hasher.inner().finalize();
                journaled_mmr.add(&mut hasher, &element).await.unwrap();
            }

            let journaled_mmr = journaled_mmr.merkleize(&mut hasher);
            assert_eq!(journaled_mmr.root(), *expected_root);

            journaled_mmr.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journaled_mmr_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher: Standard<Sha256> = Standard::new();
            let mut mmr = Mmr::init(context.with_label("first"), &mut hasher, test_config())
                .await
                .unwrap();
            assert_eq!(mmr.size(), 0);
            assert!(mmr.get_node(Position::new(0)).await.is_err());
            let bounds = mmr.bounds();
            assert!(bounds.is_empty());
            assert!(mmr.prune_all().await.is_ok());
            assert_eq!(bounds.start, 0);
            assert!(mmr.prune_to_pos(Position::new(0)).await.is_ok());
            assert!(mmr.sync().await.is_ok());
            let mut mmr = mmr.into_dirty();
            assert!(matches!(mmr.pop(1).await, Err(Error::Empty)));

            mmr.add(&mut hasher, &test_digest(0)).await.unwrap();
            assert_eq!(mmr.size(), 1);
            let mut mmr = mmr.merkleize(&mut hasher);
            mmr.sync().await.unwrap();
            assert!(mmr.get_node(Position::new(0)).await.is_ok());
            let mut mmr = mmr.into_dirty();
            assert!(mmr.pop(1).await.is_ok());
            assert_eq!(mmr.size(), 0);
            let mut mmr = mmr.merkleize(&mut hasher);
            mmr.sync().await.unwrap();

            let mmr = Mmr::init(context.with_label("second"), &mut hasher, test_config())
                .await
                .unwrap();
            assert_eq!(mmr.size(), 0);

            let empty_proof = Proof::default();
            let mut hasher: Standard<Sha256> = Standard::new();
            let root = mmr.root();
            assert!(empty_proof.verify_range_inclusion(
                &mut hasher,
                &[] as &[Digest],
                Location::new_unchecked(0),
                &root
            ));
            assert!(empty_proof.verify_multi_inclusion(
                &mut hasher,
                &[] as &[(Digest, Location)],
                &root
            ));

            let mut mmr = mmr.into_dirty();
            mmr.add(&mut hasher, &test_digest(0)).await.unwrap();
            let mmr = mmr.merkleize(&mut hasher);
            let root = mmr.root();
            assert!(!empty_proof.verify_range_inclusion(
                &mut hasher,
                &[] as &[Digest],
                Location::new_unchecked(0),
                &root
            ));
            assert!(!empty_proof.verify_multi_inclusion(
                &mut hasher,
                &[] as &[(Digest, Location)],
                &root
            ));

            mmr.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journaled_mmr_pop() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            const NUM_ELEMENTS: u64 = 200;

            let mut hasher: Standard<Sha256> = Standard::new();
            let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
                .await
                .unwrap()
                .into_dirty();

            let mut c_hasher = Sha256::new();
            for i in 0u64..NUM_ELEMENTS {
                c_hasher.update(&i.to_be_bytes());
                let element = c_hasher.finalize();
                mmr.add(&mut hasher, &element).await.unwrap();
            }

            for i in (0..NUM_ELEMENTS).rev() {
                assert!(mmr.pop(1).await.is_ok());
                let clean_mmr = mmr.merkleize(&mut hasher);
                let root = clean_mmr.root();
                let mut reference_mmr = mem::DirtyMmr::new();
                for j in 0..i {
                    c_hasher.update(&j.to_be_bytes());
                    let element = c_hasher.finalize();
                    reference_mmr.add(&mut hasher, &element);
                }
                let reference_mmr = reference_mmr.merkleize(&mut hasher, None);
                assert_eq!(
                    root,
                    *reference_mmr.root(),
                    "root mismatch after pop at {i}"
                );
                mmr = clean_mmr.into_dirty();
            }
            assert!(matches!(mmr.pop(1).await, Err(Error::Empty)));
            assert!(mmr.pop(0).await.is_ok());

            for i in 0u64..NUM_ELEMENTS {
                c_hasher.update(&i.to_be_bytes());
                let element = c_hasher.finalize();
                mmr.add(&mut hasher, &element).await.unwrap();
                if i == 101 {
                    let mut clean_mmr = mmr.merkleize(&mut hasher);
                    clean_mmr.sync().await.unwrap();
                    mmr = clean_mmr.into_dirty();
                }
            }

            for i in (0..NUM_ELEMENTS - 1).rev().step_by(2) {
                assert!(mmr.pop(2).await.is_ok(), "at position {i:?}");
                let clean_mmr = mmr.merkleize(&mut hasher);
                let root = clean_mmr.root();
                let reference_mmr = mem::CleanMmr::new(&mut hasher);
                let reference_mmr = build_test_mmr(&mut hasher, reference_mmr, i);
                assert_eq!(
                    root,
                    *reference_mmr.root(),
                    "root mismatch at position {i:?}"
                );
                mmr = clean_mmr.into_dirty();
            }
            assert!(matches!(mmr.pop(99).await, Err(Error::Empty)));

            for i in 0u64..NUM_ELEMENTS {
                c_hasher.update(&i.to_be_bytes());
                let element = c_hasher.finalize();
                mmr.add(&mut hasher, &element).await.unwrap();
                if i == 101 {
                    let mut clean_mmr = mmr.merkleize(&mut hasher);
                    clean_mmr.sync().await.unwrap();
                    mmr = clean_mmr.into_dirty();
                }
            }
            let mut mmr = mmr.merkleize(&mut hasher);
            let leaf_pos = Position::try_from(Location::new_unchecked(50)).unwrap();
            mmr.prune_to_pos(leaf_pos).await.unwrap();
            let mut mmr = mmr.into_dirty();
            mmr.pop(80).await.unwrap();
            let mmr = mmr.merkleize(&mut hasher);
            mmr.proof(Location::try_from(leaf_pos).unwrap())
                .await
                .unwrap();
            let mut mmr = mmr.into_dirty();
            while mmr.size() > leaf_pos {
                assert!(mmr.pop(1).await.is_ok());
            }
            assert!(matches!(mmr.pop(1).await, Err(Error::ElementPruned(_))));

            let mut mmr = mmr.merkleize(&mut hasher);
            assert!(mmr.prune_to_pos(leaf_pos - 1).await.is_ok());
            assert_eq!(mmr.bounds().start, leaf_pos);

            mmr.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journaled_mmr_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher: Standard<Sha256> = Standard::new();
            let mmr = Mmr::init(context.clone(), &mut hasher, test_config())
                .await
                .unwrap();
            const LEAF_COUNT: usize = 255;
            let mut leaves = Vec::with_capacity(LEAF_COUNT);
            let mut positions = Vec::with_capacity(LEAF_COUNT);
            let mut mmr = mmr.into_dirty();
            for i in 0..LEAF_COUNT {
                let digest = test_digest(i);
                leaves.push(digest);
                let pos = mmr.add(&mut hasher, leaves.last().unwrap()).await.unwrap();
                positions.push(pos);
            }
            let mut mmr = mmr.merkleize(&mut hasher);
            assert_eq!(mmr.size(), Position::new(502));
            assert_eq!(mmr.journal_size, Position::new(0));

            const TEST_ELEMENT: usize = 133;
            const TEST_ELEMENT_LOC: Location = Location::new_unchecked(TEST_ELEMENT as u64);

            let proof = mmr.proof(TEST_ELEMENT_LOC).await.unwrap();
            let root = mmr.root();
            assert!(proof.verify_element_inclusion(
                &mut hasher,
                &leaves[TEST_ELEMENT],
                TEST_ELEMENT_LOC,
                &root,
            ));

            mmr.sync().await.unwrap();
            assert_eq!(mmr.journal_size, Position::new(502));
            assert!(mmr.mem_mmr.bounds().is_empty());

            let proof2 = mmr.proof(TEST_ELEMENT_LOC).await.unwrap();
            assert_eq!(proof, proof2);

            let range = Location::new_unchecked(TEST_ELEMENT as u64)
                ..Location::new_unchecked(LEAF_COUNT as u64);
            let proof = mmr.range_proof(range.clone()).await.unwrap();
            assert!(proof.verify_range_inclusion(
                &mut hasher,
                &leaves[range.to_usize_range()],
                TEST_ELEMENT_LOC,
                &root
            ));

            mmr.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journaled_mmr_recovery() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher: Standard<Sha256> = Standard::new();
            let mut mmr = Mmr::init(context.with_label("first"), &mut hasher, test_config())
                .await
                .unwrap()
                .into_dirty();
            assert_eq!(mmr.size(), 0);

            const LEAF_COUNT: usize = 252;
            let mut leaves = Vec::with_capacity(LEAF_COUNT);
            let mut positions = Vec::with_capacity(LEAF_COUNT);
            for i in 0..LEAF_COUNT {
                let digest = test_digest(i);
                leaves.push(digest);
                let pos = mmr.add(&mut hasher, leaves.last().unwrap()).await.unwrap();
                positions.push(pos);
            }
            let mut mmr = mmr.merkleize(&mut hasher);
            assert_eq!(mmr.size(), 498);
            let root = mmr.root();
            mmr.sync().await.unwrap();
            drop(mmr);

            let partition: String = "journal_partition-blobs".into();
            let (blob, len) = context
                .open(&partition, &71u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(len, PAGE_SIZE.get() as u64 + 12);

            blob.resize(len - 1).await.expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            let mmr = Mmr::init(context.with_label("second"), &mut hasher, test_config())
                .await
                .unwrap();
            assert_eq!(mmr.size(), 498);
            assert_eq!(mmr.root(), root);

            drop(mmr);
            let mmr = Mmr::init(context.with_label("third"), &mut hasher, test_config())
                .await
                .unwrap();
            assert_eq!(mmr.size(), 498);

            mmr.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journaled_mmr_pruning() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher: Standard<Sha256> = Standard::new();
            const LEAF_COUNT: usize = 2000;
            let cfg_pruned = test_config();
            let pruned_mmr = Mmr::init(
                context.with_label("pruned"),
                &mut hasher,
                cfg_pruned.clone(),
            )
            .await
            .unwrap();
            let cfg_unpruned = Config {
                journal_partition: "unpruned_journal_partition".into(),
                metadata_partition: "unpruned_metadata_partition".into(),
                items_per_blob: NZU64!(7),
                write_buffer: NZUsize!(1024),
                thread_pool: None,
                page_cache: cfg_pruned.page_cache.clone(),
            };
            let mut mmr = Mmr::init(context.with_label("unpruned"), &mut hasher, cfg_unpruned)
                .await
                .unwrap()
                .into_dirty();
            let mut leaves = Vec::with_capacity(LEAF_COUNT);
            let mut positions = Vec::with_capacity(LEAF_COUNT);
            let mut pruned_mmr = pruned_mmr.into_dirty();
            for i in 0..LEAF_COUNT {
                let digest = test_digest(i);
                leaves.push(digest);
                let last_leaf = leaves.last().unwrap();
                let pos = mmr.add(&mut hasher, last_leaf).await.unwrap();
                positions.push(pos);
                pruned_mmr.add(&mut hasher, last_leaf).await.unwrap();
            }
            let mut mmr = mmr.merkleize(&mut hasher);
            let mut pruned_mmr = pruned_mmr.merkleize(&mut hasher);
            assert_eq!(mmr.size(), 3994);
            assert_eq!(pruned_mmr.size(), 3994);

            for i in 0usize..300 {
                let prune_pos = i as u64 * 10;
                pruned_mmr
                    .prune_to_pos(Position::new(prune_pos))
                    .await
                    .unwrap();
                assert_eq!(prune_pos, pruned_mmr.bounds().start);

                let digest = test_digest(LEAF_COUNT + i);
                leaves.push(digest);
                let last_leaf = leaves.last().unwrap();
                let mut dirty_pruned_mmr = pruned_mmr.into_dirty();
                let pos = dirty_pruned_mmr.add(&mut hasher, last_leaf).await.unwrap();
                pruned_mmr = dirty_pruned_mmr.merkleize(&mut hasher);
                positions.push(pos);
                let mut dirty_mmr = mmr.into_dirty();
                dirty_mmr.add(&mut hasher, last_leaf).await.unwrap();
                mmr = dirty_mmr.merkleize(&mut hasher);
                assert_eq!(pruned_mmr.root(), mmr.root());
            }

            pruned_mmr.sync().await.unwrap();
            assert_eq!(pruned_mmr.root(), mmr.root());

            pruned_mmr.sync().await.unwrap();
            drop(pruned_mmr);
            let mut pruned_mmr = Mmr::init(
                context.with_label("pruned_reopen"),
                &mut hasher,
                cfg_pruned.clone(),
            )
            .await
            .unwrap();
            assert_eq!(pruned_mmr.root(), mmr.root());

            let size = pruned_mmr.size();
            pruned_mmr.prune_all().await.unwrap();
            assert_eq!(pruned_mmr.root(), mmr.root());
            let bounds = pruned_mmr.bounds();
            assert!(bounds.is_empty());
            assert_eq!(bounds.start, size);

            let mut mmr = mmr.into_dirty();
            mmr.add(&mut hasher, &test_digest(LEAF_COUNT))
                .await
                .unwrap();
            let mmr = mmr.merkleize(&mut hasher);
            let mut dirty_pruned = pruned_mmr.into_dirty();
            dirty_pruned
                .add(&mut hasher, &test_digest(LEAF_COUNT))
                .await
                .unwrap();
            let mut pruned_mmr = dirty_pruned.merkleize(&mut hasher);
            assert!(*pruned_mmr.size() % cfg_pruned.items_per_blob != 0);
            pruned_mmr.sync().await.unwrap();
            drop(pruned_mmr);
            let mut pruned_mmr = Mmr::init(
                context.with_label("pruned_reopen2"),
                &mut hasher,
                cfg_pruned.clone(),
            )
            .await
            .unwrap();
            assert_eq!(pruned_mmr.root(), mmr.root());
            let bounds = pruned_mmr.bounds();
            assert!(!bounds.is_empty());
            assert_eq!(bounds.start, size);

            assert!(pruned_mmr.prune_to_pos(size - 1).await.is_ok());
            assert_eq!(pruned_mmr.bounds().start, size);

            while *pruned_mmr.size() % cfg_pruned.items_per_blob != 0 {
                let mut dirty_pruned_mmr = pruned_mmr.into_dirty();
                dirty_pruned_mmr
                    .add(&mut hasher, &test_digest(LEAF_COUNT))
                    .await
                    .unwrap();
                pruned_mmr = dirty_pruned_mmr.merkleize(&mut hasher);
            }
            pruned_mmr.prune_all().await.unwrap();
            assert!(pruned_mmr.bounds().is_empty());

            pruned_mmr.destroy().await.unwrap();
            mmr.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    fn test_journaled_mmr_recovery_with_pruning() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher: Standard<Sha256> = Standard::new();
            const LEAF_COUNT: usize = 2000;
            let mut mmr = Mmr::init(context.with_label("init"), &mut hasher, test_config())
                .await
                .unwrap()
                .into_dirty();
            let mut leaves = Vec::with_capacity(LEAF_COUNT);
            let mut positions = Vec::with_capacity(LEAF_COUNT);
            for i in 0..LEAF_COUNT {
                let digest = test_digest(i);
                leaves.push(digest);
                let last_leaf = leaves.last().unwrap();
                let pos = mmr.add(&mut hasher, last_leaf).await.unwrap();
                positions.push(pos);
            }
            let mut mmr = mmr.merkleize(&mut hasher);
            assert_eq!(mmr.size(), 3994);
            mmr.sync().await.unwrap();
            drop(mmr);

            for i in 0usize..200 {
                let label = format!("iter_{i}");
                let mut mmr = Mmr::init(context.with_label(&label), &mut hasher, test_config())
                    .await
                    .unwrap();
                let start_size = mmr.size();
                let prune_pos = std::cmp::min(i as u64 * 50, *start_size);
                let prune_pos = Position::new(prune_pos);
                if i % 5 == 0 {
                    mmr.simulate_pruning_failure(prune_pos).await.unwrap();
                    continue;
                }
                mmr.prune_to_pos(prune_pos).await.unwrap();

                for j in 0..10 {
                    let digest = test_digest(100 * (i + 1) + j);
                    leaves.push(digest);
                    let last_leaf = leaves.last().unwrap();
                    let mut dirty_mmr = mmr.into_dirty();
                    let pos = dirty_mmr.add(&mut hasher, last_leaf).await.unwrap();
                    positions.push(pos);
                    dirty_mmr.add(&mut hasher, last_leaf).await.unwrap();
                    mmr = dirty_mmr.merkleize(&mut hasher);
                    assert_eq!(mmr.root(), mmr.root());
                    let digest = test_digest(LEAF_COUNT + i);
                    leaves.push(digest);
                    let last_leaf = leaves.last().unwrap();
                    let mut dirty_mmr = mmr.into_dirty();
                    let pos = dirty_mmr.add(&mut hasher, last_leaf).await.unwrap();
                    positions.push(pos);
                    dirty_mmr.add(&mut hasher, last_leaf).await.unwrap();
                    mmr = dirty_mmr.merkleize(&mut hasher);
                }
                let end_size = mmr.size();
                let total_to_write = (*end_size - *start_size) as usize;
                let partial_write_limit = i % total_to_write;
                mmr.simulate_partial_sync(partial_write_limit)
                    .await
                    .unwrap();
            }

            let mmr = Mmr::init(context.with_label("final"), &mut hasher, test_config())
                .await
                .unwrap();
            mmr.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journaled_mmr_historical_range_proof_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();
            let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
                .await
                .unwrap()
                .into_dirty();
            let mut elements = Vec::new();
            let mut positions = Vec::new();
            for i in 0..10 {
                elements.push(test_digest(i));
                positions.push(mmr.add(&mut hasher, &elements[i]).await.unwrap());
            }
            let mmr = mmr.merkleize(&mut hasher);
            let original_leaves = mmr.leaves();

            let historical_proof = mmr
                .historical_range_proof(
                    original_leaves,
                    Location::new_unchecked(2)..Location::new_unchecked(6),
                )
                .await
                .unwrap();
            assert_eq!(historical_proof.leaves, original_leaves);
            let root = mmr.root();
            assert!(historical_proof.verify_range_inclusion(
                &mut hasher,
                &elements[2..6],
                Location::new_unchecked(2),
                &root
            ));
            let regular_proof = mmr
                .range_proof(Location::new_unchecked(2)..Location::new_unchecked(6))
                .await
                .unwrap();
            assert_eq!(regular_proof.leaves, historical_proof.leaves);
            assert_eq!(regular_proof.digests, historical_proof.digests);

            let mut mmr = mmr.into_dirty();
            for i in 10..20 {
                elements.push(test_digest(i));
                positions.push(mmr.add(&mut hasher, &elements[i]).await.unwrap());
            }
            let mmr = mmr.merkleize(&mut hasher);
            let new_historical_proof = mmr
                .historical_range_proof(
                    original_leaves,
                    Location::new_unchecked(2)..Location::new_unchecked(6),
                )
                .await
                .unwrap();
            assert_eq!(new_historical_proof.leaves, historical_proof.leaves);
            assert_eq!(new_historical_proof.digests, historical_proof.digests);

            mmr.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journaled_mmr_historical_range_proof_with_pruning() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();
            let mmr = Mmr::init(context.with_label("main"), &mut hasher, test_config())
                .await
                .unwrap();

            let mut elements = Vec::new();
            let mut positions = Vec::new();
            let mut mmr = mmr.into_dirty();
            for i in 0..50 {
                elements.push(test_digest(i));
                positions.push(mmr.add(&mut hasher, &elements[i]).await.unwrap());
            }
            let mut mmr = mmr.merkleize(&mut hasher);

            let prune_pos = Position::new(30);
            mmr.prune_to_pos(prune_pos).await.unwrap();

            let ref_mmr = Mmr::init(
                context.with_label("ref"),
                &mut hasher,
                Config {
                    journal_partition: "ref_journal_pruned".into(),
                    metadata_partition: "ref_metadata_pruned".into(),
                    items_per_blob: NZU64!(7),
                    write_buffer: NZUsize!(1024),
                    thread_pool: None,
                    page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                },
            )
            .await
            .unwrap();

            let mut ref_mmr = ref_mmr.into_dirty();
            for elt in elements.iter().take(41) {
                ref_mmr.add(&mut hasher, elt).await.unwrap();
            }
            let ref_mmr = ref_mmr.merkleize(&mut hasher);
            let historical_leaves = ref_mmr.leaves();
            let historical_root = ref_mmr.root();

            let historical_proof = mmr
                .historical_range_proof(
                    historical_leaves,
                    Location::new_unchecked(35)..Location::new_unchecked(39),
                )
                .await
                .unwrap();

            assert_eq!(historical_proof.leaves, historical_leaves);

            assert!(historical_proof.verify_range_inclusion(
                &mut hasher,
                &elements[35..39],
                Location::new_unchecked(35),
                &historical_root
            ));

            ref_mmr.destroy().await.unwrap();
            mmr.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journaled_mmr_historical_range_proof_large() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();

            let mmr = Mmr::init(
                context.with_label("server"),
                &mut hasher,
                Config {
                    journal_partition: "server_journal".into(),
                    metadata_partition: "server_metadata".into(),
                    items_per_blob: NZU64!(7),
                    write_buffer: NZUsize!(1024),
                    thread_pool: None,
                    page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                },
            )
            .await
            .unwrap();

            let mut elements = Vec::new();
            let mut positions = Vec::new();
            let mut mmr = mmr.into_dirty();
            for i in 0..100 {
                elements.push(test_digest(i));
                positions.push(mmr.add(&mut hasher, &elements[i]).await.unwrap());
            }
            let mmr = mmr.merkleize(&mut hasher);

            let range = Location::new_unchecked(30)..Location::new_unchecked(61);

            let ref_mmr = Mmr::init(
                context.with_label("client"),
                &mut hasher,
                Config {
                    journal_partition: "client_journal".into(),
                    metadata_partition: "client_metadata".into(),
                    items_per_blob: NZU64!(7),
                    write_buffer: NZUsize!(1024),
                    thread_pool: None,
                    page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
                },
            )
            .await
            .unwrap();

            let mut ref_mmr = ref_mmr.into_dirty();
            for elt in elements.iter().take(*range.end as usize) {
                ref_mmr.add(&mut hasher, elt).await.unwrap();
            }
            let ref_mmr = ref_mmr.merkleize(&mut hasher);
            let historical_leaves = ref_mmr.leaves();
            let expected_root = ref_mmr.root();

            let proof = mmr
                .historical_range_proof(historical_leaves, range.clone())
                .await
                .unwrap();

            assert!(proof.verify_range_inclusion(
                &mut hasher,
                &elements[range.to_usize_range()],
                range.start,
                &expected_root
            ));

            ref_mmr.destroy().await.unwrap();
            mmr.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journaled_mmr_historical_range_proof_singleton() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();
            let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
                .await
                .unwrap()
                .into_dirty();

            let element = test_digest(0);
            mmr.add(&mut hasher, &element).await.unwrap();
            let mmr = mmr.merkleize(&mut hasher);

            let single_proof = mmr
                .historical_range_proof(
                    Location::new_unchecked(1),
                    Location::new_unchecked(0)..Location::new_unchecked(1),
                )
                .await
                .unwrap();

            let root = mmr.root();
            assert!(single_proof.verify_range_inclusion(
                &mut hasher,
                &[element],
                Location::new_unchecked(0),
                &root
            ));

            mmr.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journaled_mmr_init_sync_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();

            let sync_cfg = SyncConfig::<sha256::Digest> {
                config: test_config(),
                range: Position::new(0)..Position::new(100),
                pinned_nodes: None,
            };

            let sync_mmr = Mmr::init_sync(context.clone(), sync_cfg, &mut hasher)
                .await
                .unwrap();

            assert_eq!(sync_mmr.size(), 0);
            let bounds = sync_mmr.bounds();
            assert_eq!(bounds.start, 0);
            assert!(bounds.is_empty());

            let new_element = test_digest(999);
            let mut sync_mmr = sync_mmr.into_dirty();
            sync_mmr.add(&mut hasher, &new_element).await.unwrap();
            let sync_mmr = sync_mmr.merkleize(&mut hasher);

            let _root = sync_mmr.root();

            sync_mmr.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journaled_mmr_init_sync_nonempty_exact_match() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();

            let mmr = Mmr::init(context.with_label("init"), &mut hasher, test_config())
                .await
                .unwrap();
            let mut mmr = mmr.into_dirty();
            for i in 0..50 {
                mmr.add(&mut hasher, &test_digest(i)).await.unwrap();
            }
            let mut mmr = mmr.merkleize(&mut hasher);
            mmr.sync().await.unwrap();
            let original_size = mmr.size();
            let original_leaves = mmr.leaves();
            let original_root = mmr.root();

            let lower_bound_pos = mmr.bounds().start;
            let upper_bound_pos = mmr.size();
            let mut expected_nodes = BTreeMap::new();
            for i in *lower_bound_pos..*upper_bound_pos {
                expected_nodes.insert(
                    Position::new(i),
                    mmr.get_node(Position::new(i)).await.unwrap().unwrap(),
                );
            }
            let sync_cfg = SyncConfig::<sha256::Digest> {
                config: test_config(),
                range: lower_bound_pos..upper_bound_pos,
                pinned_nodes: None,
            };

            mmr.sync().await.unwrap();
            drop(mmr);

            let sync_mmr = Mmr::init_sync(context.with_label("sync"), sync_cfg, &mut hasher)
                .await
                .unwrap();

            assert_eq!(sync_mmr.size(), original_size);
            assert_eq!(sync_mmr.leaves(), original_leaves);
            let bounds = sync_mmr.bounds();
            assert_eq!(bounds.start, lower_bound_pos);
            assert!(!bounds.is_empty());
            assert_eq!(sync_mmr.root(), original_root);
            for pos in *lower_bound_pos..*upper_bound_pos {
                let pos = Position::new(pos);
                assert_eq!(
                    sync_mmr.get_node(pos).await.unwrap(),
                    expected_nodes.get(&pos).cloned()
                );
            }

            sync_mmr.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journaled_mmr_init_sync_partial_overlap() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();

            let mmr = Mmr::init(context.with_label("init"), &mut hasher, test_config())
                .await
                .unwrap();
            let mut mmr = mmr.into_dirty();
            for i in 0..30 {
                mmr.add(&mut hasher, &test_digest(i)).await.unwrap();
            }
            let mut mmr = mmr.merkleize(&mut hasher);
            mmr.sync().await.unwrap();
            mmr.prune_to_pos(Position::new(10)).await.unwrap();

            let original_size = mmr.size();
            let original_root = mmr.root();
            let original_pruned_to = mmr.bounds().start;

            let lower_bound_pos = original_pruned_to;
            let upper_bound_pos = original_size + 11;
            let mut expected_nodes = BTreeMap::new();
            for pos in *lower_bound_pos..*original_size {
                let pos = Position::new(pos);
                expected_nodes.insert(pos, mmr.get_node(pos).await.unwrap().unwrap());
            }

            let sync_cfg = SyncConfig::<sha256::Digest> {
                config: test_config(),
                range: lower_bound_pos..upper_bound_pos,
                pinned_nodes: None,
            };

            mmr.sync().await.unwrap();
            drop(mmr);

            let sync_mmr = Mmr::init_sync(context.with_label("sync"), sync_cfg, &mut hasher)
                .await
                .unwrap();

            assert_eq!(sync_mmr.size(), original_size);
            let bounds = sync_mmr.bounds();
            assert_eq!(bounds.start, lower_bound_pos);
            assert!(!bounds.is_empty());
            assert_eq!(sync_mmr.root(), original_root);

            for pos in *lower_bound_pos..*original_size {
                let pos = Position::new(pos);
                assert_eq!(
                    sync_mmr.get_node(pos).await.unwrap(),
                    expected_nodes.get(&pos).cloned()
                );
            }

            sync_mmr.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    fn test_journaled_mmr_init_stale_metadata_returns_error() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();

            let mmr = Mmr::init(context.with_label("init"), &mut hasher, test_config())
                .await
                .unwrap();

            let mut mmr = mmr.into_dirty();
            for i in 0..50 {
                mmr.add(&mut hasher, &test_digest(i)).await.unwrap();
            }
            let mut mmr = mmr.merkleize(&mut hasher);
            mmr.sync().await.unwrap();

            let prune_pos = Position::new(20);
            mmr.prune_to_pos(prune_pos).await.unwrap();
            drop(mmr);

            let meta_cfg = MConfig {
                partition: test_config().metadata_partition,
                codec_config: ((0..).into(), ()),
            };
            let mut metadata =
                Metadata::<_, U64, Vec<u8>>::init(context.with_label("meta_tamper"), meta_cfg)
                    .await
                    .unwrap();

            let key = U64::new(PRUNE_TO_POS_PREFIX, 0);
            metadata.put(key, 0u64.to_be_bytes().to_vec());
            metadata.sync().await.unwrap();
            drop(metadata);

            let result = CleanMmr::<_, Digest>::init(
                context.with_label("reopened"),
                &mut hasher,
                test_config(),
            )
            .await;

            match result {
                Err(Error::MissingNode(_)) => {}
                Ok(_) => panic!("expected MissingNode error, got Ok"),
                Err(e) => panic!("expected MissingNode error, got {:?}", e),
            }
        });
    }

    #[test_traced("WARN")]
    fn test_journaled_mmr_init_metadata_ahead() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();

            let mut mmr = Mmr::init(context.with_label("init"), &mut hasher, test_config())
                .await
                .unwrap()
                .into_dirty();

            for i in 0..50 {
                mmr.add(&mut hasher, &test_digest(i)).await.unwrap();
            }
            let mut mmr = mmr.merkleize(&mut hasher);
            mmr.sync().await.unwrap();

            let prune_pos = Position::new(30);
            mmr.prune_to_pos(prune_pos).await.unwrap();
            let expected_root = mmr.root();
            let expected_size = mmr.size();
            drop(mmr);

            let mmr = Mmr::init(context.with_label("reopened"), &mut hasher, test_config())
                .await
                .unwrap();

            assert_eq!(mmr.bounds().start, prune_pos);
            assert_eq!(mmr.size(), expected_size);
            assert_eq!(mmr.root(), expected_root);

            mmr.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journaled_mmr_init_sync_computes_pinned_nodes_before_pruning() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();

            let cfg = Config {
                journal_partition: "mmr_journal".to_string(),
                metadata_partition: "mmr_metadata".to_string(),
                items_per_blob: NZU64!(7),
                write_buffer: NZUsize!(64),
                thread_pool: None,
                page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
            };

            let mmr = Mmr::init(context.with_label("init"), &mut hasher, cfg.clone())
                .await
                .unwrap();
            let mut mmr = mmr.into_dirty();
            for i in 0..100 {
                mmr.add(&mut hasher, &test_digest(i)).await.unwrap();
            }
            let mut mmr = mmr.merkleize(&mut hasher);
            mmr.sync().await.unwrap();

            let original_size = mmr.size();
            let original_root = mmr.root();
            drop(mmr);

            let prune_pos = Position::new(50);
            let sync_cfg = SyncConfig::<sha256::Digest> {
                config: cfg,
                range: prune_pos..Position::new(200),
                pinned_nodes: None,
            };

            let sync_mmr = Mmr::init_sync(context.with_label("sync"), sync_cfg, &mut hasher)
                .await
                .unwrap();

            assert_eq!(sync_mmr.size(), original_size);
            assert_eq!(sync_mmr.root(), original_root);
            assert_eq!(sync_mmr.bounds().start, prune_pos);

            sync_mmr.destroy().await.unwrap();
        });
    }
}