use crate::{
    journal::{
        contiguous::fixed::{Config as JConfig, Journal},
        Error as JError,
    },
    metadata::{Config as MConfig, Metadata},
    mmr::{
        hasher::Hasher,
        iterator::{nodes_to_pin, PeakIterator},
        location::Location,
        mem::{
            Clean, CleanMmr as CleanMemMmr, Config as MemConfig, Dirty, DirtyMmr as DirtyMemMmr,
            Mmr as MemMmr, State,
        },
        position::Position,
        storage::Storage,
        verification,
        Error::{self, *},
        Proof,
    },
    qmdb::any::unordered::sync::{init_journal, init_journal_at_size},
};
use commonware_codec::DecodeExt;
use commonware_cryptography::Digest;
use commonware_runtime::{buffer::PoolRef, Clock, Metrics, Storage as RStorage, ThreadPool};
use commonware_utils::sequence::prefixed_u64::U64;
use core::ops::Range;
use std::{
    collections::BTreeMap,
    num::{NonZeroU64, NonZeroUsize},
};
use tracing::{debug, error, warn};

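/// A journaled MMR in the [Dirty] state: new leaves can be staged cheaply, but the root is
/// unavailable until [DirtyMmr::merkleize] converts it into a [CleanMmr].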
pub type DirtyMmr<E, D> = Mmr<E, D, Dirty>;
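/// A journaled MMR in the [Clean] state: fully merkleized, so the root and proof
/// generation are available.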
pub type CleanMmr<E, D> = Mmr<E, D, Clean<D>>;

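/// Configuration for a journal-backed [Mmr].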
#[derive(Clone)]
pub struct Config {
    pub journal_partition: String,

    pub metadata_partition: String,

    pub items_per_blob: NonZeroU64,

    pub write_buffer: NonZeroUsize,

    pub thread_pool: Option<ThreadPool>,

    pub buffer_pool: PoolRef,
}

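/// Configuration for initializing an [Mmr] for state sync via [CleanMmr::init_sync]:
/// the base [Config], the range of node positions to sync, and (optionally) the digests
/// of the nodes pinned at the range's start.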
pub struct SyncConfig<D: Digest> {
    pub config: Config,

    pub range: std::ops::Range<Position>,

    pub pinned_nodes: Option<Vec<D>>,
}

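/// An MMR backed by a fixed-item-length [Journal] for persistence, with a [Metadata] store
/// holding the pruning boundary and any pinned nodes the journal no longer retains.
/// Unsynced nodes live only in the in-memory MMR (`mem_mmr`) until flushed; `journal_size`
/// tracks how many nodes have been persisted, and `pruned_to_pos` is the pruning boundary.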
pub struct Mmr<E: RStorage + Clock + Metrics, D: Digest, S: State<D> = Dirty> {
    mem_mmr: MemMmr<D, S>,

    journal: Journal<E, D>,

    journal_size: Position,

    metadata: Metadata<E, U64, Vec<u8>>,

    pruned_to_pos: Position,

    pool: Option<ThreadPool>,
}

impl<E: RStorage + Clock + Metrics, D: Digest> From<CleanMmr<E, D>> for DirtyMmr<E, D> {
    fn from(clean: Mmr<E, D, Clean<D>>) -> Self {
        Self {
            mem_mmr: clean.mem_mmr.into(),
            journal: clean.journal,
            journal_size: clean.journal_size,
            metadata: clean.metadata,
            pruned_to_pos: clean.pruned_to_pos,
            pool: clean.pool,
        }
    }
}

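/// Prefix for metadata keys that store pinned node digests.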
const NODE_PREFIX: u8 = 0;

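/// Prefix for the metadata key that stores the pruning boundary.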
const PRUNE_TO_POS_PREFIX: u8 = 1;

impl<E: RStorage + Clock + Metrics, D: Digest, S: State<D>> Mmr<E, D, S> {
    pub fn size(&self) -> Position {
        self.mem_mmr.size()
    }

    pub fn leaves(&self) -> Location {
        self.mem_mmr.leaves()
    }

    pub fn last_leaf_pos(&self) -> Option<Position> {
        self.mem_mmr.last_leaf_pos()
    }

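    /// Read the node at `pos` from metadata, falling back to the journal. Returns
    /// [Error::MissingNode] if the node is in neither, or if its bytes fail to decode.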
    async fn get_from_metadata_or_journal(
        metadata: &Metadata<E, U64, Vec<u8>>,
        journal: &Journal<E, D>,
        pos: Position,
    ) -> Result<D, Error> {
        if let Some(bytes) = metadata.get(&U64::new(NODE_PREFIX, *pos)) {
            debug!(?pos, "read node from metadata");
            // Use a match rather than let-else so the decode error remains available for
            // logging (the scrutinee would be moved out of scope in a let-else branch).
            return match D::decode(bytes.as_ref()) {
                Ok(digest) => Ok(digest),
                Err(err) => {
                    error!(
                        ?pos,
                        %err,
                        "could not convert node from metadata bytes to digest"
                    );
                    Err(Error::MissingNode(pos))
                }
            };
        }

        debug!(?pos, "reading node from journal");
        let node = journal.read(*pos).await;
        match node {
            Ok(node) => Ok(node),
            Err(JError::ItemPruned(_)) => {
                error!(?pos, "node is missing from metadata and journal");
                Err(Error::MissingNode(pos))
            }
            Err(e) => Err(Error::JournalError(e)),
        }
    }

    pub async fn add(&mut self, h: &mut impl Hasher<D>, element: &[u8]) -> Result<Position, Error> {
        Ok(self.mem_mmr.add(h, element))
    }

    pub const fn pruned_to_pos(&self) -> Position {
        self.pruned_to_pos
    }

    pub fn oldest_retained_pos(&self) -> Option<Position> {
        if self.pruned_to_pos == self.size() {
            return None;
        }

        Some(self.pruned_to_pos)
    }

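    /// Pin the nodes required at pruning boundary `prune_pos` to the in-memory MMR,
    /// sourcing each digest from metadata or the journal.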
    async fn add_extra_pinned_nodes(
        mem_mmr: &mut MemMmr<D, S>,
        metadata: &Metadata<E, U64, Vec<u8>>,
        journal: &Journal<E, D>,
        prune_pos: Position,
    ) -> Result<(), Error> {
        let mut pinned_nodes = BTreeMap::new();
        for pos in nodes_to_pin(prune_pos) {
            let digest =
                Mmr::<E, D, Clean<D>>::get_from_metadata_or_journal(metadata, journal, pos).await?;
            pinned_nodes.insert(pos, digest);
        }
        mem_mmr.add_pinned_nodes(pinned_nodes);

        Ok(())
    }
}

impl<E: RStorage + Clock + Metrics, D: Digest> CleanMmr<E, D> {
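    /// Build a fully pruned [Mmr] of size `mmr_size` from the digests in `pinned_nodes`,
    /// wiping any existing data in the configured partitions first.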
    pub async fn init_from_pinned_nodes(
        context: E,
        pinned_nodes: Vec<D>,
        mmr_size: Position,
        config: Config,
        hasher: &mut impl Hasher<D>,
    ) -> Result<Self, Error> {
        context.remove(&config.journal_partition, None).await.ok();
        context.remove(&config.metadata_partition, None).await.ok();

        let journal_cfg = JConfig {
            partition: config.journal_partition.clone(),
            items_per_blob: config.items_per_blob,
            buffer_pool: config.buffer_pool.clone(),
            write_buffer: config.write_buffer,
        };
        let journal =
            init_journal_at_size(context.with_label("mmr_journal"), journal_cfg, *mmr_size).await?;

        let metadata_cfg = MConfig {
            partition: config.metadata_partition.clone(),
            codec_config: ((0..).into(), ()),
        };
        let mut metadata = Metadata::init(context.with_label("mmr_metadata"), metadata_cfg).await?;

        let pruning_boundary_key = U64::new(PRUNE_TO_POS_PREFIX, 0);
        metadata.put(pruning_boundary_key, mmr_size.to_be_bytes().into());

        let nodes_to_pin_positions = nodes_to_pin(mmr_size);
        for (pos, digest) in nodes_to_pin_positions.zip(pinned_nodes.iter()) {
            metadata.put(U64::new(NODE_PREFIX, *pos), digest.to_vec());
        }

        metadata.sync().await.map_err(Error::MetadataError)?;

        let mem_mmr = MemMmr::init(
            MemConfig {
                nodes: vec![],
                pruned_to_pos: mmr_size,
                pinned_nodes,
            },
            hasher,
        )?;

        Ok(Self {
            mem_mmr,
            journal,
            journal_size: mmr_size,
            metadata,
            pruned_to_pos: mmr_size,
            pool: config.thread_pool,
        })
    }

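    /// Initialize an [Mmr] from the configured partitions, recovering from any partially
    /// written state: the journal is pruned to match the metadata's pruning boundary,
    /// trailing nodes that don't form a valid MMR are rewound, and an orphaned leaf, if
    /// one is found, is re-applied.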
    pub async fn init(context: E, hasher: &mut impl Hasher<D>, cfg: Config) -> Result<Self, Error> {
        let journal_cfg = JConfig {
            partition: cfg.journal_partition,
            items_per_blob: cfg.items_per_blob,
            buffer_pool: cfg.buffer_pool,
            write_buffer: cfg.write_buffer,
        };
        let mut journal =
            Journal::<E, D>::init(context.with_label("mmr_journal"), journal_cfg).await?;
        let mut journal_size = Position::new(journal.size());

        let metadata_cfg = MConfig {
            partition: cfg.metadata_partition,
            codec_config: ((0..).into(), ()),
        };
        let metadata =
            Metadata::<_, U64, Vec<u8>>::init(context.with_label("mmr_metadata"), metadata_cfg)
                .await?;

        if journal_size == 0 {
            let mem_mmr = MemMmr::init(
                MemConfig {
                    nodes: vec![],
                    pruned_to_pos: Position::new(0),
                    pinned_nodes: vec![],
                },
                hasher,
            )?;
            return Ok(Self {
                mem_mmr,
                journal,
                journal_size,
                metadata,
                pruned_to_pos: Position::new(0),
                pool: cfg.thread_pool,
            });
        }

        // Bring the journal's pruning boundary in line with the one recorded in metadata.
        let key: U64 = U64::new(PRUNE_TO_POS_PREFIX, 0);
        let metadata_prune_pos = metadata.get(&key).map_or(0, |bytes| {
            u64::from_be_bytes(
                bytes
                    .as_slice()
                    .try_into()
                    .expect("metadata prune position is not 8 bytes"),
            )
        });
        let oldest_retained_pos = journal.oldest_retained_pos().unwrap_or(0);
        if metadata_prune_pos != oldest_retained_pos {
            assert!(metadata_prune_pos >= oldest_retained_pos);
            journal.prune(metadata_prune_pos).await?;
            if journal.oldest_retained_pos().unwrap_or(0) != oldest_retained_pos {
                warn!(
                    oldest_retained_pos,
                    metadata_prune_pos, "journal pruned to match metadata"
                );
            }
        }

        // Rewind any trailing nodes that don't form a valid MMR, remembering an orphaned
        // leaf (one appended without its parents) so it can be re-applied below.
        let last_valid_size = PeakIterator::to_nearest_size(journal_size);
        let mut orphaned_leaf: Option<D> = None;
        if last_valid_size != journal_size {
            warn!(
                ?last_valid_size,
                "encountered invalid MMR structure, recovering from last valid size"
            );
            let recovered_item = journal.read(*last_valid_size).await;
            if let Ok(item) = recovered_item {
                orphaned_leaf = Some(item);
            }
            journal.rewind(*last_valid_size).await?;
            journal.sync().await?;
            journal_size = last_valid_size;
        }

        let mut pinned_nodes = Vec::new();
        for pos in nodes_to_pin(journal_size) {
            let digest =
                Mmr::<E, D>::get_from_metadata_or_journal(&metadata, &journal, pos).await?;
            pinned_nodes.push(digest);
        }
        let mut mem_mmr = MemMmr::init(
            MemConfig {
                nodes: vec![],
                pruned_to_pos: journal_size,
                pinned_nodes,
            },
            hasher,
        )?;
        let prune_pos = Position::new(metadata_prune_pos);
        Self::add_extra_pinned_nodes(&mut mem_mmr, &metadata, &journal, prune_pos).await?;

        let mut s = Self {
            mem_mmr,
            journal,
            journal_size,
            metadata,
            pruned_to_pos: prune_pos,
            pool: cfg.thread_pool,
        };

        if let Some(leaf) = orphaned_leaf {
            // Recover the orphaned leaf by re-adding it, then persist the result.
            let pos = s.mem_mmr.size();
            warn!(?pos, "recovering orphaned leaf");
            s.mem_mmr.add_leaf_digest(hasher, leaf);
            assert_eq!(pos, journal_size);
            s.sync().await?;
            assert_eq!(s.size(), s.journal.size());
        }

        Ok(s)
    }

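    /// Initialize an [Mmr] whose journal is prepared to sync the node range `cfg.range`,
    /// reusing any locally persisted nodes that fall within it.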
    pub async fn init_sync(
        context: E,
        cfg: SyncConfig<D>,
        hasher: &mut impl Hasher<D>,
    ) -> Result<Self, crate::qmdb::Error> {
        let journal = init_journal(
            context.with_label("mmr_journal"),
            JConfig {
                partition: cfg.config.journal_partition,
                items_per_blob: cfg.config.items_per_blob,
                write_buffer: cfg.config.write_buffer,
                buffer_pool: cfg.config.buffer_pool.clone(),
            },
            *cfg.range.start..*cfg.range.end,
        )
        .await?;
        let journal_size = Position::new(journal.size());
        assert!(journal_size <= *cfg.range.end);

        let metadata_cfg = MConfig {
            partition: cfg.config.metadata_partition,
            codec_config: ((0..).into(), ()),
        };
        let mut metadata = Metadata::init(context.with_label("mmr_metadata"), metadata_cfg).await?;

        let pruning_boundary_key = U64::new(PRUNE_TO_POS_PREFIX, 0);
        metadata.put(
            pruning_boundary_key,
            (*cfg.range.start).to_be_bytes().into(),
        );

        if let Some(pinned_nodes) = cfg.pinned_nodes {
            let nodes_to_pin_persisted = nodes_to_pin(cfg.range.start);
            for (pos, digest) in nodes_to_pin_persisted.zip(pinned_nodes.iter()) {
                metadata.put(U64::new(NODE_PREFIX, *pos), digest.to_vec());
            }
        }

        let nodes_to_pin_mem = nodes_to_pin(journal_size);
        let mut mem_pinned_nodes = Vec::new();
        for pos in nodes_to_pin_mem {
            let digest =
                Mmr::<E, D>::get_from_metadata_or_journal(&metadata, &journal, pos).await?;
            mem_pinned_nodes.push(digest);
        }
        let mut mem_mmr = MemMmr::init(
            MemConfig {
                nodes: vec![],
                pruned_to_pos: journal_size,
                pinned_nodes: mem_pinned_nodes,
            },
            hasher,
        )?;

        if cfg.range.start < journal_size {
            Self::add_extra_pinned_nodes(&mut mem_mmr, &metadata, &journal, cfg.range.start)
                .await?;
        }
        metadata.sync().await?;

        Ok(Self {
            mem_mmr,
            journal,
            journal_size,
            metadata,
            pruned_to_pos: cfg.range.start,
            pool: cfg.config.thread_pool,
        })
    }

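    /// Write the nodes to pin at `prune_to_pos`, along with the new pruning boundary
    /// itself, to metadata and sync it, returning the pinned nodes by position.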
    async fn update_metadata(
        &mut self,
        prune_to_pos: Position,
    ) -> Result<BTreeMap<Position, D>, Error> {
        assert!(prune_to_pos >= self.pruned_to_pos);

        let mut pinned_nodes = BTreeMap::new();
        for pos in nodes_to_pin(prune_to_pos) {
            let digest = self.get_node(pos).await?.expect(
                "pinned node should exist if prune_to_pos is no less than self.pruned_to_pos",
            );
            self.metadata
                .put(U64::new(NODE_PREFIX, *pos), digest.to_vec());
            pinned_nodes.insert(pos, digest);
        }

        let key: U64 = U64::new(PRUNE_TO_POS_PREFIX, 0);
        self.metadata.put(key, (*prune_to_pos).to_be_bytes().into());

        self.metadata.sync().await.map_err(Error::MetadataError)?;

        Ok(pinned_nodes)
    }

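    /// Return the node at `position`, or `None` if it has been pruned from both the
    /// in-memory MMR and the journal.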
    pub async fn get_node(&self, position: Position) -> Result<Option<D>, Error> {
        if let Some(node) = self.mem_mmr.get_node(position) {
            return Ok(Some(node));
        }

        match self.journal.read(*position).await {
            Ok(item) => Ok(Some(item)),
            Err(JError::ItemPruned(_)) => Ok(None),
            Err(e) => Err(Error::JournalError(e)),
        }
    }

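    /// Flush all unpersisted nodes to the journal, then prune the in-memory MMR down to
    /// its pinned-node set.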
    pub async fn sync(&mut self) -> Result<(), Error> {
        for pos in *self.journal_size..*self.size() {
            let pos = Position::new(pos);
            let node = *self.mem_mmr.get_node_unchecked(pos);
            self.journal.append(node).await?;
        }
        self.journal_size = self.size();
        self.journal.sync().await?;
        assert_eq!(self.journal_size, self.journal.size());

        let mut pinned_nodes = BTreeMap::new();
        for pos in nodes_to_pin(self.pruned_to_pos) {
            let digest = self.mem_mmr.get_node_unchecked(pos);
            pinned_nodes.insert(pos, *digest);
        }

        self.mem_mmr.prune_all();
        self.mem_mmr.add_pinned_nodes(pinned_nodes);

        Ok(())
    }

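    /// Prune all nodes before `pos`, syncing first so no required node is lost. A no-op
    /// if `pos` is at or before the current pruning boundary.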
    pub async fn prune_to_pos(&mut self, pos: Position) -> Result<(), Error> {
        assert!(pos <= self.size());
        if pos <= self.pruned_to_pos {
            return Ok(());
        }

        self.sync().await?;

        let pinned_nodes = self.update_metadata(pos).await?;

        self.journal.prune(*pos).await?;
        self.mem_mmr.add_pinned_nodes(pinned_nodes);
        self.pruned_to_pos = pos;

        Ok(())
    }

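    /// Pop `leaves_to_pop` leaves off the MMR, rewinding the journal for any that were
    /// already persisted. Fails with [Error::Empty] if there are too few leaves, or
    /// [Error::ElementPruned] if a leaf to pop lies behind the pruning boundary.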
    pub async fn pop(
        &mut self,
        hasher: &mut impl Hasher<D>,
        mut leaves_to_pop: usize,
    ) -> Result<(), Error> {
        // Pop as many leaves as possible from the in-memory MMR.
        while leaves_to_pop > 0 {
            match self.mem_mmr.pop(hasher) {
                Ok(_) => {
                    leaves_to_pop -= 1;
                }
                Err(ElementPruned(_)) => break,
                Err(Empty) => {
                    return Err(Error::Empty);
                }
                _ => unreachable!(),
            }
        }
        if leaves_to_pop == 0 {
            return Ok(());
        }

        // The remaining leaves must be popped by rewinding the journal.
        let mut new_size = self.size();
        while leaves_to_pop > 0 {
            if new_size == 0 {
                return Err(Error::Empty);
            }
            new_size -= 1;
            if new_size < self.pruned_to_pos {
                return Err(Error::ElementPruned(new_size));
            }
            if new_size.is_mmr_size() {
                leaves_to_pop -= 1;
            }
        }

        self.journal.rewind(*new_size).await?;
        self.journal.sync().await?;
        self.journal_size = new_size;

        let mut pinned_nodes = Vec::new();
        for pos in nodes_to_pin(new_size) {
            let digest =
                Self::get_from_metadata_or_journal(&self.metadata, &self.journal, pos).await?;
            pinned_nodes.push(digest);
        }

        self.mem_mmr = CleanMemMmr::from_components(hasher, vec![], new_size, pinned_nodes);
        Self::add_extra_pinned_nodes(
            &mut self.mem_mmr,
            &self.metadata,
            &self.journal,
            self.pruned_to_pos,
        )
        .await?;

        Ok(())
    }

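    /// The current root digest of the MMR.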
    pub const fn root(&self) -> D {
        *self.mem_mmr.root()
    }

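    /// Return an inclusion proof for the element at location `loc` against the current root.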
    pub async fn proof(&self, loc: Location) -> Result<Proof<D>, Error> {
        if !loc.is_valid() {
            return Err(Error::LocationOverflow(loc));
        }
        self.range_proof(loc..loc + 1).await
    }

    pub async fn range_proof(&self, range: Range<Location>) -> Result<Proof<D>, Error> {
        verification::range_proof(self, range).await
    }

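    /// Analogous to [Self::range_proof], but generated against the root the MMR had when
    /// it contained exactly `size` nodes.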
    pub async fn historical_range_proof(
        &self,
        size: Position,
        range: Range<Location>,
    ) -> Result<Proof<D>, Error> {
        verification::historical_range_proof(self, size, range).await
    }

    pub async fn prune_all(&mut self) -> Result<(), Error> {
        if self.size() != 0 {
            self.prune_to_pos(self.size()).await?;
            return Ok(());
        }
        Ok(())
    }

    pub async fn close(mut self) -> Result<(), Error> {
        self.sync().await?;
        self.journal.close().await?;
        self.metadata.close().await.map_err(Error::MetadataError)
    }

    pub async fn destroy(self) -> Result<(), Error> {
        self.journal.destroy().await?;
        self.metadata.destroy().await?;

        Ok(())
    }

    pub fn into_dirty(self) -> DirtyMmr<E, D> {
        self.into()
    }

    #[cfg(any(test, feature = "fuzzing"))]
    pub async fn simulate_partial_sync(&mut self, write_limit: usize) -> Result<(), Error> {
        if write_limit == 0 {
            return Ok(());
        }

        // Write at most `write_limit` unsynced nodes to the journal, then stop without
        // updating `journal_size`, simulating a crash mid-sync.
        let mut written_count = 0usize;
        for i in *self.journal_size..*self.size() {
            let node = *self.mem_mmr.get_node_unchecked(Position::new(i));
            self.journal.append(node).await?;
            written_count += 1;
            if written_count >= write_limit {
                break;
            }
        }
        self.journal.sync().await?;

        Ok(())
    }

    #[cfg(test)]
    pub fn get_pinned_nodes(&self) -> BTreeMap<Position, D> {
        self.mem_mmr.pinned_nodes()
    }

    #[cfg(test)]
    pub async fn simulate_pruning_failure(mut self, prune_to_pos: Position) -> Result<(), Error> {
        assert!(prune_to_pos <= self.size());

        self.sync().await?;

        // Update metadata as pruning would, but "fail" before pruning the journal.
        self.update_metadata(prune_to_pos).await?;

        Ok(())
    }
}

impl<E: RStorage + Clock + Metrics, D: Digest> DirtyMmr<E, D> {
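    /// Compute the digests of all unmerkleized nodes (on the thread pool, if one was
    /// configured) and transition into the [Clean] state.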
    pub fn merkleize(self, h: &mut impl Hasher<D>) -> CleanMmr<E, D> {
        CleanMmr {
            mem_mmr: self.mem_mmr.merkleize(h, self.pool.clone()),
            journal: self.journal,
            journal_size: self.journal_size,
            metadata: self.metadata,
            pruned_to_pos: self.pruned_to_pos,
            pool: self.pool,
        }
    }

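    /// Pop `leaves_to_pop` leaves off the MMR. Mirrors [CleanMmr::pop], but rebuilds the
    /// in-memory MMR in the [Dirty] state.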
    pub async fn pop(&mut self, mut leaves_to_pop: usize) -> Result<(), Error> {
        while leaves_to_pop > 0 {
            match self.mem_mmr.pop() {
                Ok(_) => leaves_to_pop -= 1,
                Err(ElementPruned(_)) => break,
                Err(Empty) => return Err(Error::Empty),
                Err(e) => return Err(e),
            }
        }
        if leaves_to_pop == 0 {
            return Ok(());
        }

        let mut new_size = self.size();
        while leaves_to_pop > 0 {
            if new_size == 0 {
                return Err(Error::Empty);
            }
            new_size -= 1;
            if new_size < self.pruned_to_pos {
                return Err(Error::ElementPruned(new_size));
            }
            if new_size.is_mmr_size() {
                leaves_to_pop -= 1;
            }
        }

        self.journal.rewind(*new_size).await?;
        self.journal.sync().await?;
        self.journal_size = new_size;

        let mut pinned_nodes = Vec::new();
        for pos in nodes_to_pin(new_size) {
            let digest = Mmr::<E, D, Clean<D>>::get_from_metadata_or_journal(
                &self.metadata,
                &self.journal,
                pos,
            )
            .await?;
            pinned_nodes.push(digest);
        }

        self.mem_mmr = DirtyMemMmr::from_components(vec![], new_size, pinned_nodes);
        Self::add_extra_pinned_nodes(
            &mut self.mem_mmr,
            &self.metadata,
            &self.journal,
            self.pruned_to_pos,
        )
        .await?;

        Ok(())
    }

    #[cfg(any(test, feature = "fuzzing"))]
    pub async fn simulate_partial_sync(
        self,
        hasher: &mut impl Hasher<D>,
        write_limit: usize,
    ) -> Result<(), Error> {
        if write_limit == 0 {
            return Ok(());
        }

        // Merkleize first so node digests exist, then write at most `write_limit` of the
        // unsynced nodes, simulating a crash mid-sync.
        let mut clean_mmr = self.merkleize(hasher);

        let mut written_count = 0usize;
        for i in *clean_mmr.journal_size..*clean_mmr.size() {
            let node = *clean_mmr.mem_mmr.get_node_unchecked(Position::new(i));
            clean_mmr.journal.append(node).await?;
            written_count += 1;
            if written_count >= write_limit {
                break;
            }
        }
        clean_mmr.journal.sync().await?;

        Ok(())
    }
}

impl<E: RStorage + Clock + Metrics, D: Digest> Storage<D> for Mmr<E, D, Clean<D>> {
    fn size(&self) -> Position {
        self.size()
    }

    async fn get_node(&self, position: Position) -> Result<Option<D>, Error> {
        self.get_node(position).await
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::mmr::{
        hasher::Hasher as _, location::LocationRangeExt as _, stability::ROOTS, Location,
        StandardHasher as Standard,
    };
    use commonware_cryptography::{
        sha256::{self, Digest},
        Hasher, Sha256,
    };
    use commonware_macros::test_traced;
    use commonware_runtime::{buffer::PoolRef, deterministic, Blob as _, Runner};
    use commonware_utils::{hex, NZUsize, NZU64};

    fn test_digest(v: usize) -> Digest {
        Sha256::hash(&v.to_be_bytes())
    }

    const PAGE_SIZE: usize = 111;
    const PAGE_CACHE_SIZE: usize = 5;

    fn test_config() -> Config {
        Config {
            journal_partition: "journal_partition".into(),
            metadata_partition: "metadata_partition".into(),
            items_per_blob: NZU64!(7),
            write_buffer: NZUsize!(1024),
            thread_pool: None,
            buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
        }
    }

    pub async fn build_batched_and_check_test_roots_journaled<E: RStorage + Clock + Metrics>(
        journaled_mmr: CleanMmr<E, sha256::Digest>,
    ) -> CleanMmr<E, sha256::Digest> {
        let mut hasher: Standard<Sha256> = Standard::new();

        let mut dirty_mmr = journaled_mmr.into_dirty();
        hasher.inner().update(&0u64.to_be_bytes());
        let element = hasher.inner().finalize();
        dirty_mmr.add(&mut hasher, &element).await.unwrap();

        for i in 1u64..199 {
            hasher.inner().update(&i.to_be_bytes());
            let element = hasher.inner().finalize();
            dirty_mmr.add(&mut hasher, &element).await.unwrap();
        }

        let journaled_mmr = dirty_mmr.merkleize(&mut hasher);

        assert_eq!(
            hex(&journaled_mmr.root()),
            ROOTS[199],
            "Root after 199 elements"
        );

        journaled_mmr
    }

    #[test]
    fn test_journaled_mmr_root_stability() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mmr = Mmr::init(
                context.clone(),
                &mut Standard::<Sha256>::new(),
                test_config(),
            )
            .await
            .unwrap();
            let mmr = build_batched_and_check_test_roots_journaled(mmr).await;
            mmr.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journaled_mmr_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher: Standard<Sha256> = Standard::new();
            let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
                .await
                .unwrap();
            assert_eq!(mmr.size(), 0);
            assert!(mmr.get_node(Position::new(0)).await.is_err());
            assert_eq!(mmr.oldest_retained_pos(), None);
            assert!(mmr.prune_all().await.is_ok());
            assert_eq!(mmr.pruned_to_pos(), 0);
            assert!(mmr.prune_to_pos(Position::new(0)).await.is_ok());
            assert!(mmr.sync().await.is_ok());
            assert!(matches!(mmr.pop(&mut hasher, 1).await, Err(Error::Empty)));

            mmr.add(&mut hasher, &test_digest(0)).await.unwrap();
            assert_eq!(mmr.size(), 1);
            mmr.sync().await.unwrap();
            assert!(mmr.get_node(Position::new(0)).await.is_ok());
            assert!(mmr.pop(&mut hasher, 1).await.is_ok());
            assert_eq!(mmr.size(), 0);
            mmr.sync().await.unwrap();

            let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
                .await
                .unwrap();
            assert_eq!(mmr.size(), 0);

            let empty_proof = Proof::default();
            let mut hasher: Standard<Sha256> = Standard::new();
            let root = mmr.root();
            assert!(empty_proof.verify_range_inclusion(
                &mut hasher,
                &[] as &[Digest],
                Location::new_unchecked(0),
                &root
            ));
            assert!(empty_proof.verify_multi_inclusion(
                &mut hasher,
                &[] as &[(Digest, Location)],
                &root
            ));

            mmr.add(&mut hasher, &test_digest(0)).await.unwrap();
            let root = mmr.root();
            assert!(!empty_proof.verify_range_inclusion(
                &mut hasher,
                &[] as &[Digest],
                Location::new_unchecked(0),
                &root
            ));
            assert!(!empty_proof.verify_multi_inclusion(
                &mut hasher,
                &[] as &[(Digest, Location)],
                &root
            ));

            mmr.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journaled_mmr_pop() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher: Standard<Sha256> = Standard::new();
            let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
                .await
                .unwrap();

            let mut c_hasher = Sha256::new();
            for i in 0u64..199 {
                c_hasher.update(&i.to_be_bytes());
                let element = c_hasher.finalize();
                mmr.add(&mut hasher, &element).await.unwrap();
            }
            assert_eq!(ROOTS[199], hex(&mmr.root()));

            // Pop the elements one at a time, checking the root against the stability
            // table after each pop.
            for i in (0..199u64).rev() {
                assert!(mmr.pop(&mut hasher, 1).await.is_ok());
                let root = mmr.root();
                let expected_root = ROOTS[i as usize];
                assert_eq!(hex(&root), expected_root);
            }
            assert!(matches!(mmr.pop(&mut hasher, 1).await, Err(Error::Empty)));
            assert!(mmr.pop(&mut hasher, 0).await.is_ok());

            // Re-add the elements, syncing partway through, then pop two at a time.
            for i in 0u64..199 {
                c_hasher.update(&i.to_be_bytes());
                let element = c_hasher.finalize();
                mmr.add(&mut hasher, &element).await.unwrap();
                if i == 101 {
                    mmr.sync().await.unwrap();
                }
            }
            for i in (0..198u64).rev().step_by(2) {
                assert!(mmr.pop(&mut hasher, 2).await.is_ok(), "at position {i:?}");
                let root = mmr.root();
                let expected_root = ROOTS[i as usize];
                assert_eq!(hex(&root), expected_root, "at position {i:?}");
            }
            assert_eq!(mmr.size(), 1);
            assert!(mmr.pop(&mut hasher, 1).await.is_ok());
            assert!(matches!(mmr.pop(&mut hasher, 99).await, Err(Error::Empty)));

            for i in 0u64..199 {
                c_hasher.update(&i.to_be_bytes());
                let element = c_hasher.finalize();
                mmr.add(&mut hasher, &element).await.unwrap();
                if i == 101 {
                    mmr.sync().await.unwrap();
                }
            }
            let leaf_pos = Position::try_from(Location::new_unchecked(50)).unwrap();
            mmr.prune_to_pos(leaf_pos).await.unwrap();
            mmr.pop(&mut hasher, 80).await.unwrap();
            mmr.proof(Location::try_from(leaf_pos).unwrap())
                .await
                .unwrap();
            while mmr.size() > leaf_pos {
                assert!(mmr.pop(&mut hasher, 1).await.is_ok());
            }
            assert!(matches!(
                mmr.pop(&mut hasher, 1).await,
                Err(Error::ElementPruned(_))
            ));

            // Pruning to an earlier position than the current boundary is a no-op.
            assert!(mmr.prune_to_pos(leaf_pos - 1).await.is_ok());
            assert_eq!(mmr.pruned_to_pos(), leaf_pos);

            mmr.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journaled_mmr_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher: Standard<Sha256> = Standard::new();
            let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
                .await
                .unwrap();
            const LEAF_COUNT: usize = 255;
            let mut leaves = Vec::with_capacity(LEAF_COUNT);
            let mut positions = Vec::with_capacity(LEAF_COUNT);
            for i in 0..LEAF_COUNT {
                let digest = test_digest(i);
                leaves.push(digest);
                let pos = mmr.add(&mut hasher, leaves.last().unwrap()).await.unwrap();
                positions.push(pos);
            }
            assert_eq!(mmr.size(), Position::new(502));
            assert_eq!(mmr.journal_size, Position::new(0));

            const TEST_ELEMENT: usize = 133;
            const TEST_ELEMENT_LOC: Location = Location::new_unchecked(TEST_ELEMENT as u64);

            let proof = mmr.proof(TEST_ELEMENT_LOC).await.unwrap();
            let root = mmr.root();
            assert!(proof.verify_element_inclusion(
                &mut hasher,
                &leaves[TEST_ELEMENT],
                TEST_ELEMENT_LOC,
                &root,
            ));

            mmr.sync().await.unwrap();
            assert_eq!(mmr.journal_size, Position::new(502));
            assert_eq!(mmr.mem_mmr.oldest_retained_pos(), None);

            let proof2 = mmr.proof(TEST_ELEMENT_LOC).await.unwrap();
            assert_eq!(proof, proof2);

            let range = Location::new_unchecked(TEST_ELEMENT as u64)
                ..Location::new_unchecked(LEAF_COUNT as u64);
            let proof = mmr.range_proof(range.clone()).await.unwrap();
            assert!(proof.verify_range_inclusion(
                &mut hasher,
                &leaves[range.to_usize_range()],
                TEST_ELEMENT_LOC,
                &root
            ));

            mmr.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journaled_mmr_recovery() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher: Standard<Sha256> = Standard::new();
            let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
                .await
                .unwrap();
            assert_eq!(mmr.size(), 0);

            const LEAF_COUNT: usize = 252;
            let mut leaves = Vec::with_capacity(LEAF_COUNT);
            let mut positions = Vec::with_capacity(LEAF_COUNT);
            for i in 0..LEAF_COUNT {
                let digest = test_digest(i);
                leaves.push(digest);
                let pos = mmr.add(&mut hasher, leaves.last().unwrap()).await.unwrap();
                positions.push(pos);
            }
            assert_eq!(mmr.size(), 498);
            let root = mmr.root();
            mmr.close().await.unwrap();

            // Simulate a partial write by truncating the last byte of the final blob.
            let partition: String = "journal_partition".into();
            let (blob, len) = context
                .open(&partition, &71u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(len, 36);
            blob.resize(len - 1).await.expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            let mmr = Mmr::init(context.clone(), &mut hasher, test_config())
                .await
                .unwrap();
            assert_eq!(mmr.size(), 498);
            assert_eq!(mmr.root(), root);

            mmr.close().await.unwrap();
            let mmr = Mmr::init(context.clone(), &mut hasher, test_config())
                .await
                .unwrap();
            assert_eq!(mmr.size(), 498);
            mmr.close().await.unwrap();

            // Remove the final blob entirely and truncate the previous one mid-node.
            context
                .remove(&partition, Some(&71u64.to_be_bytes()))
                .await
                .expect("Failed to remove blob");
            let (blob, len) = context
                .open(&partition, &70u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(len, 36 * 7);
            blob.resize(36 * 5 + 35)
                .await
                .expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            let mmr = Mmr::init(context.clone(), &mut hasher, test_config())
                .await
                .unwrap();
            assert_eq!(mmr.size(), 495);

            mmr.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journaled_mmr_pruning() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher: Standard<Sha256> = Standard::new();
            const LEAF_COUNT: usize = 2000;
            let cfg_pruned = test_config();
            let mut pruned_mmr = Mmr::init(context.clone(), &mut hasher, cfg_pruned.clone())
                .await
                .unwrap();
            let cfg_unpruned = Config {
                journal_partition: "unpruned_journal_partition".into(),
                metadata_partition: "unpruned_metadata_partition".into(),
                items_per_blob: NZU64!(7),
                write_buffer: NZUsize!(1024),
                thread_pool: None,
                buffer_pool: cfg_pruned.buffer_pool.clone(),
            };
            let mut mmr = Mmr::init(context.clone(), &mut hasher, cfg_unpruned)
                .await
                .unwrap();
            let mut leaves = Vec::with_capacity(LEAF_COUNT);
            let mut positions = Vec::with_capacity(LEAF_COUNT);
            for i in 0..LEAF_COUNT {
                let digest = test_digest(i);
                leaves.push(digest);
                let last_leaf = leaves.last().unwrap();
                let pos = mmr.add(&mut hasher, last_leaf).await.unwrap();
                positions.push(pos);
                pruned_mmr.add(&mut hasher, last_leaf).await.unwrap();
            }
            assert_eq!(mmr.size(), 3994);
            assert_eq!(pruned_mmr.size(), 3994);

            // Repeatedly prune while adding elements, confirming the pruned MMR keeps the
            // same root as its unpruned twin.
            for i in 0usize..300 {
                let prune_pos = i as u64 * 10;
                pruned_mmr
                    .prune_to_pos(Position::new(prune_pos))
                    .await
                    .unwrap();
                assert_eq!(prune_pos, pruned_mmr.pruned_to_pos());

                let digest = test_digest(LEAF_COUNT + i);
                leaves.push(digest);
                let last_leaf = leaves.last().unwrap();
                let pos = pruned_mmr.add(&mut hasher, last_leaf).await.unwrap();
                positions.push(pos);
                mmr.add(&mut hasher, last_leaf).await.unwrap();
                assert_eq!(pruned_mmr.root(), mmr.root());
            }

            pruned_mmr.sync().await.unwrap();
            assert_eq!(pruned_mmr.root(), mmr.root());

            pruned_mmr.close().await.unwrap();
            let mut pruned_mmr = Mmr::init(context.clone(), &mut hasher, cfg_pruned.clone())
                .await
                .unwrap();
            assert_eq!(pruned_mmr.root(), mmr.root());

            let size = pruned_mmr.size();
            pruned_mmr.prune_all().await.unwrap();
            assert_eq!(pruned_mmr.root(), mmr.root());
            assert_eq!(pruned_mmr.oldest_retained_pos(), None);
            assert_eq!(pruned_mmr.pruned_to_pos(), size);

            mmr.add(&mut hasher, &test_digest(LEAF_COUNT))
                .await
                .unwrap();
            pruned_mmr
                .add(&mut hasher, &test_digest(LEAF_COUNT))
                .await
                .unwrap();
            assert!(*pruned_mmr.size() % cfg_pruned.items_per_blob != 0);
            pruned_mmr.close().await.unwrap();
            let mut pruned_mmr = Mmr::init(context.clone(), &mut hasher, cfg_pruned.clone())
                .await
                .unwrap();
            assert_eq!(pruned_mmr.root(), mmr.root());
            assert_eq!(pruned_mmr.oldest_retained_pos(), Some(size));
            assert_eq!(pruned_mmr.pruned_to_pos(), size);

            assert!(pruned_mmr.prune_to_pos(size - 1).await.is_ok());
            assert_eq!(pruned_mmr.pruned_to_pos(), size);

            while *pruned_mmr.size() % cfg_pruned.items_per_blob != 0 {
                pruned_mmr
                    .add(&mut hasher, &test_digest(LEAF_COUNT))
                    .await
                    .unwrap();
            }
            pruned_mmr.prune_all().await.unwrap();
            assert_eq!(pruned_mmr.oldest_retained_pos(), None);

            pruned_mmr.destroy().await.unwrap();
            mmr.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    fn test_journaled_mmr_recovery_with_pruning() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher: Standard<Sha256> = Standard::new();
            const LEAF_COUNT: usize = 2000;
            let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
                .await
                .unwrap();
            let mut leaves = Vec::with_capacity(LEAF_COUNT);
            let mut positions = Vec::with_capacity(LEAF_COUNT);
            for i in 0..LEAF_COUNT {
                let digest = test_digest(i);
                leaves.push(digest);
                let last_leaf = leaves.last().unwrap();
                let pos = mmr.add(&mut hasher, last_leaf).await.unwrap();
                positions.push(pos);
            }
            assert_eq!(mmr.size(), 3994);
            mmr.close().await.unwrap();

            // Repeatedly reopen the MMR, prune (sometimes simulating a pruning failure),
            // add more elements, and sync only partially, to exercise recovery.
            for i in 0usize..200 {
                let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
                    .await
                    .unwrap();
                let start_size = mmr.size();
                let prune_pos = std::cmp::min(i as u64 * 50, *start_size);
                let prune_pos = Position::new(prune_pos);
                if i % 5 == 0 {
                    mmr.simulate_pruning_failure(prune_pos).await.unwrap();
                    continue;
                }
                mmr.prune_to_pos(prune_pos).await.unwrap();

                for j in 0..10 {
                    let digest = test_digest(100 * (i + 1) + j);
                    leaves.push(digest);
                    let last_leaf = leaves.last().unwrap();
                    let pos = mmr.add(&mut hasher, last_leaf).await.unwrap();
                    positions.push(pos);
                    mmr.add(&mut hasher, last_leaf).await.unwrap();
                    assert_eq!(mmr.root(), mmr.root());
                    let digest = test_digest(LEAF_COUNT + i);
                    leaves.push(digest);
                    let last_leaf = leaves.last().unwrap();
                    let pos = mmr.add(&mut hasher, last_leaf).await.unwrap();
                    positions.push(pos);
                    mmr.add(&mut hasher, last_leaf).await.unwrap();
                }
                let end_size = mmr.size();
                let total_to_write = (*end_size - *start_size) as usize;
                let partial_write_limit = i % total_to_write;
                mmr.simulate_partial_sync(partial_write_limit)
                    .await
                    .unwrap();
            }

            let mmr = Mmr::init(context.clone(), &mut hasher, test_config())
                .await
                .unwrap();
            mmr.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journaled_mmr_historical_range_proof_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();
            let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
                .await
                .unwrap();
            let mut elements = Vec::new();
            let mut positions = Vec::new();
            for i in 0..10 {
                elements.push(test_digest(i));
                positions.push(mmr.add(&mut hasher, &elements[i]).await.unwrap());
            }
            let original_size = mmr.size();

            let historical_proof = mmr
                .historical_range_proof(
                    original_size,
                    Location::new_unchecked(2)..Location::new_unchecked(6),
                )
                .await
                .unwrap();
            assert_eq!(historical_proof.size, original_size);
            let root = mmr.root();
            assert!(historical_proof.verify_range_inclusion(
                &mut hasher,
                &elements[2..6],
                Location::new_unchecked(2),
                &root
            ));
            let regular_proof = mmr
                .range_proof(Location::new_unchecked(2)..Location::new_unchecked(6))
                .await
                .unwrap();
            assert_eq!(regular_proof.size, historical_proof.size);
            assert_eq!(regular_proof.digests, historical_proof.digests);

            // Adding more elements must not change the historical proof.
            for i in 10..20 {
                elements.push(test_digest(i));
                positions.push(mmr.add(&mut hasher, &elements[i]).await.unwrap());
            }
            let new_historical_proof = mmr
                .historical_range_proof(
                    original_size,
                    Location::new_unchecked(2)..Location::new_unchecked(6),
                )
                .await
                .unwrap();
            assert_eq!(new_historical_proof.size, historical_proof.size);
            assert_eq!(new_historical_proof.digests, historical_proof.digests);

            mmr.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journaled_mmr_historical_range_proof_with_pruning() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();
            let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
                .await
                .unwrap();

            let mut elements = Vec::new();
            let mut positions = Vec::new();
            for i in 0..50 {
                elements.push(test_digest(i));
                positions.push(mmr.add(&mut hasher, &elements[i]).await.unwrap());
            }

            let prune_pos = Position::new(30);
            mmr.prune_to_pos(prune_pos).await.unwrap();

            // Build an unpruned reference MMR over the first 41 elements to obtain the
            // historical size and root.
            let mut ref_mmr = Mmr::init(
                context.clone(),
                &mut hasher,
                Config {
                    journal_partition: "ref_journal_pruned".into(),
                    metadata_partition: "ref_metadata_pruned".into(),
                    items_per_blob: NZU64!(7),
                    write_buffer: NZUsize!(1024),
                    thread_pool: None,
                    buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
                },
            )
            .await
            .unwrap();

            for elt in elements.iter().take(41) {
                ref_mmr.add(&mut hasher, elt).await.unwrap();
            }
            let historical_size = ref_mmr.size();
            let historical_root = ref_mmr.root();

            let historical_proof = mmr
                .historical_range_proof(
                    historical_size,
                    Location::new_unchecked(35)..Location::new_unchecked(39),
                )
                .await
                .unwrap();

            assert_eq!(historical_proof.size, historical_size);

            assert!(historical_proof.verify_range_inclusion(
                &mut hasher,
                &elements[35..39],
                Location::new_unchecked(35),
                &historical_root
            ));

            ref_mmr.destroy().await.unwrap();
            mmr.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journaled_mmr_historical_range_proof_large() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();

            let mut mmr = Mmr::init(
                context.clone(),
                &mut hasher,
                Config {
                    journal_partition: "server_journal".into(),
                    metadata_partition: "server_metadata".into(),
                    items_per_blob: NZU64!(7),
                    write_buffer: NZUsize!(1024),
                    thread_pool: None,
                    buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
                },
            )
            .await
            .unwrap();

            let mut elements = Vec::new();
            let mut positions = Vec::new();
            for i in 0..100 {
                elements.push(test_digest(i));
                positions.push(mmr.add(&mut hasher, &elements[i]).await.unwrap());
            }

            let range = Location::new_unchecked(30)..Location::new_unchecked(61);

            let mut ref_mmr = Mmr::init(
                context.clone(),
                &mut hasher,
                Config {
                    journal_partition: "client_journal".into(),
                    metadata_partition: "client_metadata".into(),
                    items_per_blob: NZU64!(7),
                    write_buffer: NZUsize!(1024),
                    thread_pool: None,
                    buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
                },
            )
            .await
            .unwrap();

            for elt in elements.iter().take(*range.end as usize) {
                ref_mmr.add(&mut hasher, elt).await.unwrap();
            }
            let historical_size = ref_mmr.size();
            let expected_root = ref_mmr.root();

            let proof = mmr
                .historical_range_proof(historical_size, range.clone())
                .await
                .unwrap();

            assert!(proof.verify_range_inclusion(
                &mut hasher,
                &elements[range.to_usize_range()],
                range.start,
                &expected_root
            ));

            ref_mmr.destroy().await.unwrap();
            mmr.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journaled_mmr_historical_range_proof_singleton() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();
            let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
                .await
                .unwrap();

            let element = test_digest(0);
            mmr.add(&mut hasher, &element).await.unwrap();

            let single_proof = mmr
                .historical_range_proof(
                    Position::new(1),
                    Location::new_unchecked(0)..Location::new_unchecked(1),
                )
                .await
                .unwrap();

            let root = mmr.root();
            assert!(single_proof.verify_range_inclusion(
                &mut hasher,
                &[element],
                Location::new_unchecked(0),
                &root
            ));

            mmr.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journaled_mmr_init_from_pinned_nodes() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();

            let mut original_mmr = Mmr::init(
                context.clone(),
                &mut hasher,
                Config {
                    journal_partition: "original_journal".into(),
                    metadata_partition: "original_metadata".into(),
                    items_per_blob: NZU64!(7),
                    write_buffer: NZUsize!(1024),
                    thread_pool: None,
                    buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
                },
            )
            .await
            .unwrap();

            const NUM_ELEMENTS: u64 = 1_000;
            for i in 0..NUM_ELEMENTS {
                original_mmr
                    .add(&mut hasher, &test_digest(i as usize))
                    .await
                    .unwrap();
            }
            original_mmr.sync().await.unwrap();
            let original_size = original_mmr.size();
            original_mmr.prune_to_pos(original_size).await.unwrap();

            let mut hasher = Standard::<Sha256>::new();
            let original_journal_digest = original_mmr.root();

            let pinned_nodes_map = original_mmr.get_pinned_nodes();
            let pinned_nodes: Vec<_> = nodes_to_pin(original_size)
                .map(|pos| pinned_nodes_map[&pos])
                .collect();

            let new_mmr_config = Config {
                journal_partition: "new_journal".into(),
                metadata_partition: "new_metadata".into(),
                items_per_blob: NZU64!(7),
                write_buffer: NZUsize!(1024),
                thread_pool: None,
                buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
            };
            let mut new_mmr =
                Mmr::<_, sha256::Digest, Clean<sha256::Digest>>::init_from_pinned_nodes(
                    context.clone(),
                    pinned_nodes,
                    original_size,
                    new_mmr_config.clone(),
                    &mut hasher,
                )
                .await
                .unwrap();

            assert_eq!(new_mmr.size(), original_size);
            assert_eq!(new_mmr.pruned_to_pos(), original_size);
            assert_eq!(new_mmr.oldest_retained_pos(), None);
            let new_journal_digest = new_mmr.root();
            assert_eq!(new_journal_digest, original_journal_digest);

            // Both MMRs should evolve identically when the same element is added.
            let new_element = test_digest(10);

            let original_mmr_pos = original_mmr.add(&mut hasher, &new_element).await.unwrap();
            assert_eq!(original_mmr_pos, original_size);

            let new_mmr_pos = new_mmr.add(&mut hasher, &new_element).await.unwrap();
            assert_eq!(new_mmr_pos, original_size);

            let original_mmr_root = original_mmr.root();
            let new_mmr_root = new_mmr.root();
            assert_eq!(new_mmr_root, original_mmr_root);

            new_mmr.close().await.unwrap();
            let new_mmr = Mmr::<_, sha256::Digest, Clean<sha256::Digest>>::init(
                context.clone(),
                &mut hasher,
                new_mmr_config,
            )
            .await
            .unwrap();

            let new_mmr_root = new_mmr.root();
            assert_eq!(new_mmr_root, original_mmr_root);

            assert_eq!(new_mmr.size(), original_size + 1);
            assert_eq!(new_mmr.pruned_to_pos(), original_size);
            assert_eq!(new_mmr.oldest_retained_pos(), Some(original_size));

            let proof = new_mmr
                .proof(Location::new_unchecked(NUM_ELEMENTS))
                .await
                .unwrap();
            let original_proof = original_mmr
                .proof(Location::new_unchecked(NUM_ELEMENTS))
                .await
                .unwrap();
            assert_eq!(proof.digests, original_proof.digests);
            assert_eq!(proof.size, original_proof.size);

            original_mmr.destroy().await.unwrap();
            new_mmr.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journaled_mmr_init_from_pinned_nodes_edge_cases() {
        use crate::mmr::mem::CleanMmr as CleanMemMmr;

        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();

            // An empty MMR initialized from no pinned nodes.
            let mut empty_mmr =
                Mmr::<_, sha256::Digest, Clean<sha256::Digest>>::init_from_pinned_nodes(
                    context.clone(),
                    vec![],
                    Position::new(0),
                    Config {
                        journal_partition: "empty_journal".into(),
                        metadata_partition: "empty_metadata".into(),
                        items_per_blob: NZU64!(7),
                        write_buffer: NZUsize!(1024),
                        thread_pool: None,
                        buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
                    },
                    &mut hasher,
                )
                .await
                .unwrap();

            assert_eq!(empty_mmr.size(), 0);
            assert_eq!(empty_mmr.pruned_to_pos(), Position::new(0));
            assert_eq!(empty_mmr.oldest_retained_pos(), None);

            let pos = empty_mmr.add(&mut hasher, &test_digest(0)).await.unwrap();
            assert_eq!(pos, 0);
            assert_eq!(empty_mmr.size(), 1);

            empty_mmr.destroy().await.unwrap();

            // A single-element MMR built in memory, then re-created from its pinned nodes.
            let mut single_mem_mmr = CleanMemMmr::new(&mut hasher);
            single_mem_mmr.add(&mut hasher, &test_digest(42));
            let single_size = single_mem_mmr.size();
            let single_root = single_mem_mmr.root();
            let single_pinned = single_mem_mmr.node_digests_to_pin(single_size);

            let single_journaled_mmr =
                Mmr::<_, sha256::Digest, Clean<sha256::Digest>>::init_from_pinned_nodes(
                    context.clone(),
                    single_pinned,
                    single_size,
                    Config {
                        journal_partition: "single_journal".into(),
                        metadata_partition: "single_metadata".into(),
                        items_per_blob: NZU64!(7),
                        write_buffer: NZUsize!(1024),
                        thread_pool: None,
                        buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
                    },
                    &mut hasher,
                )
                .await
                .unwrap();

            assert_eq!(single_journaled_mmr.size(), single_size);
            assert_eq!(single_journaled_mmr.root(), *single_root);

            single_journaled_mmr.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journaled_mmr_init_sync_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();

            let sync_cfg = SyncConfig::<sha256::Digest> {
                config: test_config(),
                range: Position::new(0)..Position::new(100),
                pinned_nodes: None,
            };

            let sync_mmr = Mmr::init_sync(context.clone(), sync_cfg, &mut hasher)
                .await
                .unwrap();

            assert_eq!(sync_mmr.size(), 0);
            assert_eq!(sync_mmr.pruned_to_pos(), 0);
            assert_eq!(sync_mmr.oldest_retained_pos(), None);

            let mut sync_mmr = sync_mmr;
            let new_element = test_digest(999);
            sync_mmr.add(&mut hasher, &new_element).await.unwrap();

            let _root = sync_mmr.root();

            sync_mmr.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journaled_mmr_init_sync_nonempty_exact_match() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();

            let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
                .await
                .unwrap();
            for i in 0..50 {
                mmr.add(&mut hasher, &test_digest(i)).await.unwrap();
            }
            mmr.sync().await.unwrap();
            let original_size = mmr.size();
            let original_leaves = mmr.leaves();
            let original_root = mmr.root();

            let lower_bound_pos = mmr.pruned_to_pos();
            let upper_bound_pos = mmr.size();
            let mut expected_nodes = BTreeMap::new();
            for i in *lower_bound_pos..*upper_bound_pos {
                expected_nodes.insert(
                    Position::new(i),
                    mmr.get_node(Position::new(i)).await.unwrap().unwrap(),
                );
            }
            let sync_cfg = SyncConfig::<sha256::Digest> {
                config: test_config(),
                range: lower_bound_pos..upper_bound_pos,
                pinned_nodes: None,
            };

            mmr.close().await.unwrap();

            let sync_mmr = Mmr::init_sync(context.clone(), sync_cfg, &mut hasher)
                .await
                .unwrap();

            assert_eq!(sync_mmr.size(), original_size);
            assert_eq!(sync_mmr.leaves(), original_leaves);
            assert_eq!(sync_mmr.pruned_to_pos(), lower_bound_pos);
            assert_eq!(sync_mmr.oldest_retained_pos(), Some(lower_bound_pos));
            assert_eq!(sync_mmr.root(), original_root);
            for pos in *lower_bound_pos..*upper_bound_pos {
                let pos = Position::new(pos);
                assert_eq!(
                    sync_mmr.get_node(pos).await.unwrap(),
                    expected_nodes.get(&pos).cloned()
                );
            }

            sync_mmr.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journaled_mmr_init_sync_partial_overlap() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();

            let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
                .await
                .unwrap();
            for i in 0..30 {
                mmr.add(&mut hasher, &test_digest(i)).await.unwrap();
            }
            mmr.sync().await.unwrap();
            mmr.prune_to_pos(Position::new(10)).await.unwrap();

            let original_size = mmr.size();
            let original_root = mmr.root();
            let original_pruned_to = mmr.pruned_to_pos();

            let lower_bound_pos = original_pruned_to;
            let upper_bound_pos = original_size + 11;

            let mut expected_nodes = BTreeMap::new();
            for pos in *lower_bound_pos..*original_size {
                let pos = Position::new(pos);
                expected_nodes.insert(pos, mmr.get_node(pos).await.unwrap().unwrap());
            }

            let sync_cfg = SyncConfig::<sha256::Digest> {
                config: test_config(),
                range: lower_bound_pos..upper_bound_pos,
                pinned_nodes: None,
            };

            mmr.close().await.unwrap();

            let sync_mmr = Mmr::init_sync(context.clone(), sync_cfg, &mut hasher)
                .await
                .unwrap();

            assert_eq!(sync_mmr.size(), original_size);
            assert_eq!(sync_mmr.pruned_to_pos(), lower_bound_pos);
            assert_eq!(sync_mmr.oldest_retained_pos(), Some(lower_bound_pos));
            assert_eq!(sync_mmr.root(), original_root);

            for pos in *lower_bound_pos..*original_size {
                let pos = Position::new(pos);
                assert_eq!(
                    sync_mmr.get_node(pos).await.unwrap(),
                    expected_nodes.get(&pos).cloned()
                );
            }

            sync_mmr.destroy().await.unwrap();
        });
    }
}