1use crate::{
10 journal::{
11 contiguous::{
12 fixed::{Config as JConfig, Journal},
13 Many, Reader,
14 },
15 Error as JError,
16 },
17 merkle::{
18 batch,
19 hasher::Hasher,
20 mem::{Config as MemConfig, Mem},
21 Error, Family, Location, Position, Proof, Readable,
22 },
23 metadata::{Config as MConfig, Metadata},
24};
25use commonware_codec::DecodeExt;
26use commonware_cryptography::Digest;
27use commonware_parallel::Strategy;
28use commonware_runtime::{buffer::paged::CacheRef, Clock, Metrics, Storage as RStorage};
29use commonware_utils::{
30 range::NonEmptyRange,
31 sequence::prefixed_u64::U64,
32 sync::{AsyncMutex, RwLock},
33};
34use std::{
35 collections::BTreeMap,
36 num::{NonZeroU64, NonZeroUsize},
37 sync::Arc,
38};
39use tracing::{debug, error, warn};
40
/// A batch of pending additions that has not been merkleized yet.
///
/// Thin wrapper around [`batch::UnmerkleizedBatch`]: every method on this type forwards to the
/// wrapped batch. Instances are created by [`Merkle::new_batch`].
pub struct UnmerkleizedBatch<F: Family, D: Digest, S: Strategy> {
    /// The wrapped batch that all operations are delegated to.
    inner: batch::UnmerkleizedBatch<F, D, S>,
}
51
52impl<F: Family, D: Digest, S: Strategy> UnmerkleizedBatch<F, D, S> {
53 pub fn add(self, hasher: &impl Hasher<F, Digest = D>, element: &[u8]) -> Self {
55 Self {
56 inner: self.inner.add(hasher, element),
57 }
58 }
59
60 pub fn add_leaf_digest(self, digest: D) -> Self {
62 Self {
63 inner: self.inner.add_leaf_digest(digest),
64 }
65 }
66
67 pub fn leaves(&self) -> Location<F> {
69 self.inner.leaves()
70 }
71
72 pub fn strategy(&self) -> &S {
74 self.inner.strategy()
75 }
76
77 pub fn merkleize(
80 self,
81 base: &Mem<F, D>,
82 hasher: &impl Hasher<F, Digest = D>,
83 ) -> Arc<batch::MerkleizedBatch<F, D, S>> {
84 self.inner.merkleize(base, hasher)
85 }
86}
87
/// State of a [`Merkle`] that is guarded by its `RwLock`.
pub(crate) struct Inner<F: Family, D: Digest> {
    /// In-memory structure. `sync` appends the nodes it holds beyond the journal's size to the
    /// journal and then prunes them from here.
    pub(crate) mem: Mem<F, D>,

    /// Position the structure has been pruned to. Updated by `prune` after the journal has been
    /// pruned; reported (as a location) as the start of `bounds`.
    pub(crate) pruned_to_pos: Position<F>,
}
99
/// Configuration for initializing a [`Merkle`].
#[derive(Clone)]
pub struct Config<S: Strategy> {
    /// Partition name handed to the journal that stores the nodes.
    pub journal_partition: String,

    /// Partition name handed to the metadata store that holds the pruning boundary and the
    /// pinned nodes.
    pub metadata_partition: String,

    /// Passed through to the journal configuration as its `items_per_blob`.
    pub items_per_blob: NonZeroU64,

    /// Passed through to the journal configuration as its `write_buffer`.
    pub write_buffer: NonZeroUsize,

    /// Strategy stored on the [`Merkle`] and cloned into every batch it creates.
    pub strategy: S,

    /// Page cache handed to the journal configuration.
    pub page_cache: CacheRef,
}
124
/// Configuration for [`Merkle::init_sync`].
pub struct SyncConfig<F: Family, D: Digest, S: Strategy> {
    /// Base configuration (partitions, journal parameters, strategy).
    pub config: Config<S>,

    /// Location range being synced. Its start becomes the pruning boundary; `init_sync` rejects
    /// an existing journal whose size exceeds the position of its end.
    pub range: NonEmptyRange<Location<F>>,

    /// Optional digests for the nodes that must be pinned at `range.start()`, in the order
    /// produced by `F::nodes_to_pin`. When provided, the count must match or `init_sync` fails
    /// with `Error::InvalidPinnedNodes`.
    pub pinned_nodes: Option<Vec<D>>,
}
143
/// A Merkle structure whose nodes are persisted in a journal, with a metadata store holding the
/// pruning boundary and the pinned nodes.
pub struct Merkle<F: Family, E: RStorage + Clock + Metrics, D: Digest, S: Strategy> {
    /// In-memory structure plus the current pruning position, behind a read/write lock.
    pub(crate) inner: RwLock<Inner<F, D>>,

    /// Journal holding one digest per node, indexed by node position.
    pub(crate) journal: Journal<E, D>,

    /// Metadata store. Keys are built with `U64::new(prefix, value)` using `NODE_PREFIX` for
    /// pinned node digests and `PRUNED_TO_PREFIX` for the pruning boundary.
    pub(crate) metadata: Metadata<E, U64, Vec<u8>>,

    /// Async mutex acquired for the duration of `sync`.
    pub(crate) sync_lock: AsyncMutex<()>,

    /// Strategy cloned into batches created by `new_batch` and `to_batch`.
    pub(crate) strategy: S,
}
163
/// Metadata key prefix for pinned node digests; the key is `U64::new(NODE_PREFIX, position)`.
const NODE_PREFIX: u8 = 0;

/// Metadata key prefix for the pruning boundary; the key is `U64::new(PRUNED_TO_PREFIX, 0)` and
/// the value is the boundary location as 8 big-endian bytes.
pub(crate) const PRUNED_TO_PREFIX: u8 = 1;
169
170impl<F: Family, E: RStorage + Clock + Metrics, D: Digest, S: Strategy> Merkle<F, E, D, S> {
171 pub fn size(&self) -> Position<F> {
174 self.inner.read().mem.size()
175 }
176
177 pub fn leaves(&self) -> Location<F> {
179 self.inner.read().mem.leaves()
180 }
181
182 async fn get_from_metadata_or_journal(
186 metadata: &Metadata<E, U64, Vec<u8>>,
187 journal: &Journal<E, D>,
188 pos: Position<F>,
189 ) -> Result<D, Error<F>> {
190 if let Some(bytes) = metadata.get(&U64::new(NODE_PREFIX, *pos)) {
191 debug!(?pos, "read node from metadata");
192 let digest = D::decode(bytes.as_ref());
193 let Ok(digest) = digest else {
194 error!(
195 ?pos,
196 err = %digest.expect_err("digest is Err in else branch"),
197 "could not convert node from metadata bytes to digest"
198 );
199 return Err(Error::DataCorrupted(
200 "could not read digest at requested pos",
201 ));
202 };
203 return Ok(digest);
204 }
205
206 debug!(?pos, "reading node from journal");
208 let node = journal.reader().await.read(*pos).await;
209 match node {
210 Ok(node) => Ok(node),
211 Err(JError::ItemPruned(_)) => {
212 error!(?pos, "node is missing from metadata and journal");
213 Err(Error::MissingNode(pos))
214 }
215 Err(e) => Err(Error::Journal(e)),
216 }
217 }
218
219 pub fn bounds(&self) -> std::ops::Range<Location<F>> {
222 let inner = self.inner.read();
223 Location::try_from(inner.pruned_to_pos).expect("valid pruned_to_pos")..inner.mem.leaves()
224 }
225
226 async fn add_extra_pinned_nodes(
228 mem: &mut Mem<F, D>,
229 metadata: &Metadata<E, U64, Vec<u8>>,
230 journal: &Journal<E, D>,
231 prune_pos: Position<F>,
232 ) -> Result<(), Error<F>> {
233 let prune_loc = Location::try_from(prune_pos).expect("valid prune_pos");
234 let mut pinned_nodes = BTreeMap::new();
235 for pos in F::nodes_to_pin(prune_loc) {
236 let digest = Self::get_from_metadata_or_journal(metadata, journal, pos).await?;
237 pinned_nodes.insert(pos, digest);
238 }
239 mem.add_pinned_nodes(pinned_nodes);
240
241 Ok(())
242 }
243
244 pub async fn init(
246 context: E,
247 hasher: &impl Hasher<F, Digest = D>,
248 cfg: Config<S>,
249 ) -> Result<Self, Error<F>> {
250 let journal_cfg = JConfig {
251 partition: cfg.journal_partition,
252 items_per_blob: cfg.items_per_blob,
253 page_cache: cfg.page_cache,
254 write_buffer: cfg.write_buffer,
255 };
256 let journal = Journal::<E, D>::init(context.child("merkle_journal"), journal_cfg).await?;
257 let mut journal_size = Position::<F>::new(journal.size().await);
258
259 let metadata_cfg = MConfig {
260 partition: cfg.metadata_partition,
261 codec_config: ((0..).into(), ()),
262 };
263 let metadata =
264 Metadata::<_, U64, Vec<u8>>::init(context.child("merkle_metadata"), metadata_cfg)
265 .await?;
266
267 if journal_size == 0 {
268 let mem = Mem::init(MemConfig {
269 nodes: vec![],
270 pruning_boundary: Location::new(0),
271 pinned_nodes: vec![],
272 })?;
273 return Ok(Self {
274 inner: RwLock::new(Inner {
275 mem,
276 pruned_to_pos: Position::new(0),
277 }),
278 journal,
279 metadata,
280 sync_lock: AsyncMutex::new(()),
281 strategy: cfg.strategy,
282 });
283 }
284
285 let key: U64 = U64::new(PRUNED_TO_PREFIX, 0);
289 let metadata_pruned_to = Location::<F>::new(metadata.get(&key).map_or(0, |bytes| {
290 u64::from_be_bytes(
291 bytes
292 .as_slice()
293 .try_into()
294 .expect("metadata pruned_to is not 8 bytes"),
295 )
296 }));
297 let metadata_prune_pos = Position::try_from(metadata_pruned_to)?;
298 let journal_bounds_start = journal.reader().await.bounds().start;
299 if *metadata_prune_pos > journal_bounds_start {
300 journal.prune(*metadata_prune_pos).await?;
303 if journal.reader().await.bounds().start != journal_bounds_start {
304 warn!(
307 journal_bounds_start,
308 ?metadata_prune_pos,
309 "journal pruned to match metadata"
310 );
311 }
312 } else if *metadata_prune_pos < journal_bounds_start {
313 warn!(
316 ?metadata_prune_pos,
317 journal_bounds_start, "metadata stale, using journal pruning boundary"
318 );
319 }
320
321 let journal_boundary_pos = Position::<F>::new(journal_bounds_start);
327 let journal_boundary_floor = F::to_nearest_size(journal_boundary_pos);
328 let journal_boundary_leaf_aligned_pos = if journal_boundary_floor == journal_boundary_pos {
329 journal_boundary_floor
332 } else {
333 Position::try_from(Location::try_from(journal_boundary_floor)? + 1)?
336 };
337 let effective_prune_pos =
338 std::cmp::max(metadata_prune_pos, journal_boundary_leaf_aligned_pos);
339
340 let last_valid_size = F::to_nearest_size(journal_size);
341 let mut orphaned_leaf: Option<D> = None;
342 if last_valid_size != journal_size {
343 warn!(
344 ?last_valid_size,
345 "encountered invalid structure, recovering from last valid size"
346 );
347 let recovered_item = journal.reader().await.read(*last_valid_size).await;
350 if let Ok(item) = recovered_item {
351 orphaned_leaf = Some(item);
352 }
353 journal.rewind(*last_valid_size).await?;
354 journal.sync().await?;
355 journal_size = last_valid_size
356 }
357
358 let journal_leaves = Location::try_from(journal_size)?;
360 let mut pinned_nodes = Vec::new();
361 for pos in F::nodes_to_pin(journal_leaves) {
362 let digest = Self::get_from_metadata_or_journal(&metadata, &journal, pos).await?;
363 pinned_nodes.push(digest);
364 }
365 let mut mem = Mem::init(MemConfig {
366 nodes: vec![],
367 pruning_boundary: journal_leaves,
368 pinned_nodes,
369 })?;
370 Self::add_extra_pinned_nodes(&mut mem, &metadata, &journal, effective_prune_pos).await?;
371
372 if let Some(leaf) = orphaned_leaf {
373 let pos = mem.size();
375 warn!(?pos, "recovering orphaned leaf");
376 let batch = mem
377 .new_batch()
378 .add_leaf_digest(leaf)
379 .merkleize(&mem, hasher);
380 mem.apply_batch(&batch)?;
381 assert_eq!(pos, journal_size);
382
383 for p in journal.size().await..*mem.size() {
385 let p = Position::new(p);
386 let node = *mem.get_node_unchecked(p);
387 journal.append(&node).await?;
388 }
389 journal.sync().await?;
390 assert_eq!(mem.size(), journal.size().await);
391
392 let effective_prune_loc =
394 Location::try_from(effective_prune_pos).expect("valid effective_prune_pos");
395 let mut pn = BTreeMap::new();
396 for p in F::nodes_to_pin(effective_prune_loc) {
397 let d = mem.get_node_unchecked(p);
398 pn.insert(p, *d);
399 }
400 mem.prune_all();
401 mem.add_pinned_nodes(pn);
402 }
403
404 Ok(Self {
405 inner: RwLock::new(Inner {
406 mem,
407 pruned_to_pos: effective_prune_pos,
408 }),
409 journal,
410 metadata,
411 sync_lock: AsyncMutex::new(()),
412 strategy: cfg.strategy,
413 })
414 }
415
    /// Initialize a [`Merkle`] for syncing the location range `cfg.range`, reusing whatever
    /// journal data already exists in the configured partition.
    ///
    /// The start of the range becomes the pruning boundary, which is written to metadata
    /// together with any caller-provided pinned nodes. Metadata is synced before the journal is
    /// pruned.
    ///
    /// # Errors
    ///
    /// - A journal `ItemOutOfRange` error if the existing journal holds more nodes than the
    ///   position of the range end.
    /// - `Error::InvalidPinnedNodes` if `cfg.pinned_nodes` is provided with a length different
    ///   from the number of nodes `F::nodes_to_pin` yields for the range start.
    /// - Any journal, metadata, or position/location conversion error.
    pub async fn init_sync(context: E, cfg: SyncConfig<F, D, S>) -> Result<Self, Error<F>> {
        let prune_pos = Position::try_from(cfg.range.start())?;
        let end_pos = Position::try_from(cfg.range.end())?;
        let journal_cfg = JConfig {
            partition: cfg.config.journal_partition.clone(),
            items_per_blob: cfg.config.items_per_blob,
            write_buffer: cfg.config.write_buffer,
            page_cache: cfg.config.page_cache.clone(),
        };

        let journal: Journal<E, D> =
            Journal::init(context.child("merkle_journal"), journal_cfg).await?;
        let mut journal_size = Position::<F>::new(journal.size().await);

        // If the journal does not end on the size returned by `F::to_nearest_size`, rewind to
        // that size. Unlike `init`, no attempt is made to recover a trailing leaf.
        let last_valid_size = F::to_nearest_size(journal_size);
        if last_valid_size != journal_size {
            warn!(
                ?last_valid_size,
                "init_sync: encountered invalid structure, recovering from last valid size"
            );
            journal.rewind(*last_valid_size).await?;
            journal.sync().await?;
            journal_size = last_valid_size;
        }

        // Existing data extending past the end of the sync range is rejected.
        if journal_size > *end_pos {
            return Err(crate::journal::Error::ItemOutOfRange(*journal_size).into());
        }
        // Existing data that does not reach past the (non-zero) pruning boundary is discarded and
        // the journal is reset to start at the boundary.
        if journal_size <= *prune_pos && *prune_pos != 0 {
            journal.clear_to_size(*prune_pos).await?;
            journal_size = Position::new(journal.size().await);
        }

        let metadata_cfg = MConfig {
            partition: cfg.config.metadata_partition,
            codec_config: ((0..).into(), ()),
        };
        let mut metadata = Metadata::init(context.child("merkle_metadata"), metadata_cfg).await?;

        // Record the pruning boundary (range start) as 8 big-endian bytes.
        let pruning_boundary_key = U64::new(PRUNED_TO_PREFIX, 0);
        metadata.put(
            pruning_boundary_key,
            cfg.range.start().as_u64().to_be_bytes().into(),
        );

        let prune_loc = Location::try_from(prune_pos)?;
        let journal_leaves = Location::try_from(journal_size)?;
        if let Some(pinned_nodes) = cfg.pinned_nodes {
            // Caller-provided digests are matched positionally against the nodes to pin at the
            // pruning boundary and written to metadata.
            let nodes_to_pin_persisted: Vec<_> = F::nodes_to_pin(prune_loc).collect();
            if pinned_nodes.len() != nodes_to_pin_persisted.len() {
                return Err(Error::<F>::InvalidPinnedNodes);
            }
            for (pos, digest) in nodes_to_pin_persisted.into_iter().zip(pinned_nodes.iter()) {
                metadata.put(U64::new(NODE_PREFIX, *pos), digest.to_vec());
            }
        }

        // Build the in-memory structure with its pruning boundary at the journal's leaf count,
        // pinning the nodes required for that size.
        let nodes_to_pin_mem = F::nodes_to_pin(journal_leaves);
        let mut mem_pinned_nodes = Vec::new();
        for pos in nodes_to_pin_mem {
            let digest = Self::get_from_metadata_or_journal(&metadata, &journal, pos).await?;
            mem_pinned_nodes.push(digest);
        }
        let mut mem = Mem::init(MemConfig {
            nodes: vec![],
            pruning_boundary: Location::try_from(journal_size)?,
            pinned_nodes: mem_pinned_nodes,
        })?;

        // When the journal extends past the pruning boundary, also pin the nodes required for
        // that boundary.
        if prune_pos < journal_size {
            Self::add_extra_pinned_nodes(&mut mem, &metadata, &journal, prune_pos).await?;
        }

        // Persist metadata before pruning the journal.
        metadata.sync().await?;

        journal.prune(*prune_pos).await?;

        Ok(Self {
            inner: RwLock::new(Inner {
                mem,
                pruned_to_pos: prune_pos,
            }),
            journal,
            metadata,
            sync_lock: AsyncMutex::new(()),
            strategy: cfg.config.strategy,
        })
    }
535
536 async fn update_metadata(
539 &mut self,
540 prune_to_pos: Position<F>,
541 ) -> Result<BTreeMap<Position<F>, D>, Error<F>> {
542 assert!(prune_to_pos >= self.inner.get_mut().pruned_to_pos);
543
544 let prune_loc = Location::try_from(prune_to_pos).expect("valid prune_to_pos");
545 let mut pinned_nodes = BTreeMap::new();
546 for pos in F::nodes_to_pin(prune_loc) {
547 let digest = self.get_node(pos).await?.expect(
548 "pinned node should exist if prune_to_pos is no less than self.pruned_to_pos",
549 );
550 self.metadata
551 .put(U64::new(NODE_PREFIX, *pos), digest.to_vec());
552 pinned_nodes.insert(pos, digest);
553 }
554
555 let key: U64 = U64::new(PRUNED_TO_PREFIX, 0);
556 self.metadata
557 .put_sync(
558 key,
559 Location::try_from(prune_to_pos)?
560 .as_u64()
561 .to_be_bytes()
562 .into(),
563 )
564 .await
565 .map_err(Error::Metadata)?;
566
567 Ok(pinned_nodes)
568 }
569
570 pub async fn get_node(&self, position: Position<F>) -> Result<Option<D>, Error<F>> {
571 {
572 let inner = self.inner.read();
573 if let Some(node) = inner.mem.get_node(position) {
574 return Ok(Some(node));
575 }
576 }
577
578 match self.journal.reader().await.read(*position).await {
579 Ok(item) => Ok(Some(item)),
580 Err(JError::ItemPruned(_)) => Ok(None),
581 Err(e) => Err(Error::Journal(e)),
582 }
583 }
584
585 pub async fn pinned_nodes_at(&self, loc: Location<F>) -> Result<Vec<D>, Error<F>> {
587 if !loc.is_valid() {
588 return Err(Error::LocationOverflow(loc));
589 }
590 let futs = F::nodes_to_pin(loc)
591 .map(|p| async move { self.get_node(p).await?.ok_or(Error::ElementPruned(p)) })
592 .collect::<Vec<_>>();
593 futures::future::try_join_all(futs).await
594 }
595
596 pub async fn sync(&self) -> Result<(), Error<F>> {
598 let _sync_guard = self.sync_lock.lock().await;
599
600 let journal_size = Position::<F>::new(self.journal.size().await);
601
602 let (sync_target_leaves, missing_nodes, pinned_nodes) = {
605 let inner = self.inner.read();
606 let size = inner.mem.size();
607 let sync_target_leaves = inner.mem.leaves();
608
609 assert!(
610 journal_size <= size,
611 "journal size should never exceed in-memory structure size"
612 );
613 if journal_size == size {
614 return Ok(());
615 }
616
617 let mut missing_nodes = Vec::with_capacity((*size - *journal_size) as usize);
618 for pos in *journal_size..*size {
619 let node = *inner.mem.get_node_unchecked(Position::new(pos));
620 missing_nodes.push(node);
621 }
622
623 let prune_loc = Location::try_from(inner.pruned_to_pos).expect("valid pruned_to_pos");
626 let mut pinned_nodes = BTreeMap::new();
627 for pos in F::nodes_to_pin(prune_loc) {
628 let digest = inner.mem.get_node_unchecked(pos);
629 pinned_nodes.insert(pos, *digest);
630 }
631
632 (sync_target_leaves, missing_nodes, pinned_nodes)
633 };
634
635 self.journal.append_many(Many::Flat(&missing_nodes)).await?;
637
638 self.journal.sync().await?;
640
641 {
645 let mut inner = self.inner.write();
646 inner
647 .mem
648 .prune(sync_target_leaves)
649 .expect("captured leaves is in bounds");
650 inner.mem.add_pinned_nodes(pinned_nodes);
651 }
652
653 Ok(())
654 }
655
656 pub async fn prune(&mut self, loc: Location<F>) -> Result<(), Error<F>> {
664 let pos = Position::try_from(loc)?;
665 {
666 let inner = self.inner.get_mut();
667 if loc > inner.mem.leaves() {
668 return Err(Error::LeafOutOfBounds(loc));
669 }
670 if pos <= inner.pruned_to_pos {
671 return Ok(());
672 }
673 }
674
675 self.sync().await?;
677
678 let pinned_nodes = self.update_metadata(pos).await?;
681
682 self.journal.prune(*pos).await?;
683 let inner = self.inner.get_mut();
684 inner.mem.add_pinned_nodes(pinned_nodes);
685 inner.pruned_to_pos = pos;
686
687 Ok(())
688 }
689
690 pub fn root(
692 &self,
693 hasher: &impl Hasher<F, Digest = D>,
694 inactive_peaks: usize,
695 ) -> Result<D, Error<F>> {
696 self.inner.read().mem.root(hasher, inactive_peaks)
697 }
698
699 pub async fn prune_all(&mut self) -> Result<(), Error<F>> {
702 let leaves = self.inner.get_mut().mem.leaves();
703 if leaves != 0 {
704 self.prune(leaves).await?;
705 }
706 Ok(())
707 }
708
709 pub async fn destroy(self) -> Result<(), Error<F>> {
711 self.journal.destroy().await?;
712 self.metadata.destroy().await?;
713
714 Ok(())
715 }
716
717 #[cfg(any(test, feature = "fuzzing"))]
718 pub async fn simulate_partial_sync(&mut self, write_limit: usize) -> Result<(), Error<F>> {
721 if write_limit == 0 {
722 return Ok(());
723 }
724
725 let inner = self.inner.get_mut();
726 let journal_size = Position::<F>::new(self.journal.size().await);
727
728 let mut written_count = 0usize;
731 for i in *journal_size..*inner.mem.size() {
732 let node = *inner.mem.get_node_unchecked(Position::new(i));
733 self.journal.append(&node).await?;
734 written_count += 1;
735 if written_count >= write_limit {
736 break;
737 }
738 }
739 self.journal.sync().await?;
740
741 Ok(())
742 }
743
744 #[cfg(test)]
745 pub fn get_pinned_nodes(&self) -> BTreeMap<Position<F>, D> {
747 self.inner.read().mem.pinned_nodes()
748 }
749
750 #[cfg(test)]
751 pub async fn simulate_pruning_failure(mut self, prune_to: Location<F>) -> Result<(), Error<F>> {
753 let prune_to_pos = Position::try_from(prune_to)?;
754 assert!(prune_to_pos <= self.inner.get_mut().mem.size());
755
756 self.sync().await?;
758
759 self.update_metadata(prune_to_pos).await?;
762
763 Ok(())
765 }
766
767 pub fn apply_batch(&self, batch: &batch::MerkleizedBatch<F, D, S>) -> Result<(), Error<F>> {
774 self.inner.write().mem.apply_batch(batch)?;
775 Ok(())
776 }
777
778 pub(crate) fn to_batch(&self) -> Arc<batch::MerkleizedBatch<F, D, S>> {
783 let inner = self.inner.read();
784 batch::MerkleizedBatch::from_mem_with_strategy(&inner.mem, self.strategy.clone())
785 }
786
787 pub fn with_mem<R>(&self, f: impl FnOnce(&Mem<F, D>) -> R) -> R {
790 let inner = self.inner.read();
791 f(&inner.mem)
792 }
793
794 pub fn new_batch(&self) -> UnmerkleizedBatch<F, D, S> {
796 let inner = self.inner.read();
797 UnmerkleizedBatch {
798 inner: inner.mem.new_batch_with_strategy(self.strategy.clone()),
799 }
800 }
801
802 pub const fn strategy(&self) -> &S {
804 &self.strategy
805 }
806
    /// Remove the last `leaves_to_remove` leaves, shrinking both the journal and the in-memory
    /// structure. Removing zero leaves is a no-op.
    ///
    /// # Errors
    ///
    /// - `Error::Empty` if more leaves are requested than exist and nothing has been pruned.
    /// - `Error::ElementPruned` if more leaves are requested than exist and something has been
    ///   pruned, or if the resulting size would fall before the pruning position.
    /// - Journal, metadata, or in-memory initialization errors.
    ///
    /// The argument checks all happen before any state is modified.
    pub(crate) async fn rewind(&mut self, leaves_to_remove: usize) -> Result<(), Error<F>> {
        if leaves_to_remove == 0 {
            return Ok(());
        }

        let current_leaves = *self.leaves();
        let destination_leaf = match current_leaves.checked_sub(leaves_to_remove as u64) {
            Some(dest) => dest,
            None => {
                // Asked to remove more leaves than exist.
                let pruned_to_pos = self.inner.get_mut().pruned_to_pos;
                return Err(if pruned_to_pos == 0 {
                    Error::Empty
                } else {
                    Error::ElementPruned(pruned_to_pos - 1)
                });
            }
        };

        let destination_loc = Location::new(destination_leaf);
        let new_size = Position::try_from(destination_loc).expect("valid leaf");

        let pruned_to_pos = self.inner.get_mut().pruned_to_pos;
        if new_size < pruned_to_pos {
            return Err(Error::ElementPruned(new_size));
        }

        // Shrink the journal only if it actually extends past the new size; nodes that exist
        // only in memory are handled below.
        let journal_size = Position::<F>::new(self.journal.size().await);
        if new_size < journal_size {
            self.journal.rewind(*new_size).await?;
            self.journal.sync().await?;
        }

        let inner = self.inner.get_mut();
        if new_size >= Position::try_from(inner.mem.bounds().start).expect("valid mem bounds start")
        {
            // The new size is at or after the start of the in-memory bounds: truncate in place.
            inner.mem.truncate(new_size);
        } else {
            // The new size is before the start of the in-memory bounds: rebuild the in-memory
            // structure at the destination from persisted nodes, then re-pin the nodes required
            // for the current pruning position.
            let mut pinned_nodes = Vec::new();
            for pos in F::nodes_to_pin(destination_loc) {
                pinned_nodes.push(
                    Self::get_from_metadata_or_journal(&self.metadata, &self.journal, pos).await?,
                );
            }
            inner.mem = Mem::init(MemConfig {
                nodes: vec![],
                pruning_boundary: destination_loc,
                pinned_nodes,
            })?;
            Self::add_extra_pinned_nodes(
                &mut inner.mem,
                &self.metadata,
                &self.journal,
                inner.pruned_to_pos,
            )
            .await?;
        }

        Ok(())
    }
877}
878
879impl<F: Family, E: RStorage + Clock + Metrics, D: Digest, S: Strategy> Readable
887 for Merkle<F, E, D, S>
888{
889 type Family = F;
890 type Digest = D;
891 type Error = Error<F>;
892
893 fn size(&self) -> Position<F> {
894 self.size()
895 }
896
897 fn get_node(&self, pos: Position<F>) -> Option<D> {
898 self.inner.read().mem.get_node(pos)
899 }
900
901 fn pruning_boundary(&self) -> Location<F> {
902 self.inner.read().mem.pruning_boundary()
903 }
904}
905
906impl<F: Family, E: RStorage + Clock + Metrics + Sync, D: Digest, S: Strategy>
907 crate::merkle::storage::Storage<F> for Merkle<F, E, D, S>
908{
909 type Digest = D;
910
911 async fn size(&self) -> Position<F> {
912 self.size()
913 }
914
915 async fn get_node(&self, position: Position<F>) -> Result<Option<D>, Error<F>> {
916 Self::get_node(self, position).await
917 }
918}
919
920impl<F: Family, E: RStorage + Clock + Metrics, D: Digest, S: Strategy> Merkle<F, E, D, S> {
921 pub async fn historical_proof(
934 &self,
935 hasher: &impl Hasher<F, Digest = D>,
936 leaves: Location<F>,
937 loc: Location<F>,
938 inactive_peaks: usize,
939 ) -> Result<Proof<F, D>, Error<F>> {
940 if !loc.is_valid_index() {
941 return Err(Error::LocationOverflow(loc));
942 }
943 self.historical_range_proof(hasher, leaves, loc..loc + 1, inactive_peaks)
945 .await
946 }
947
948 pub async fn historical_range_proof(
962 &self,
963 hasher: &impl Hasher<F, Digest = D>,
964 leaves: Location<F>,
965 range: core::ops::Range<Location<F>>,
966 inactive_peaks: usize,
967 ) -> Result<Proof<F, D>, Error<F>> {
968 if leaves > self.leaves() {
969 return Err(Error::RangeOutOfBounds(leaves));
970 }
971 crate::merkle::verification::historical_range_proof(
972 hasher,
973 self,
974 leaves,
975 range,
976 inactive_peaks,
977 )
978 .await
979 }
980
981 pub async fn proof(
996 &self,
997 hasher: &impl Hasher<F, Digest = D>,
998 loc: Location<F>,
999 inactive_peaks: usize,
1000 ) -> Result<Proof<F, D>, Error<F>> {
1001 if !loc.is_valid_index() {
1002 return Err(Error::LocationOverflow(loc));
1003 }
1004 self.range_proof(hasher, loc..loc + 1, inactive_peaks).await
1006 }
1007
1008 pub async fn range_proof(
1022 &self,
1023 hasher: &impl Hasher<F, Digest = D>,
1024 range: core::ops::Range<Location<F>>,
1025 inactive_peaks: usize,
1026 ) -> Result<Proof<F, D>, Error<F>> {
1027 self.historical_range_proof(hasher, self.leaves(), range, inactive_peaks)
1028 .await
1029 }
1030}
1031
1032#[cfg(test)]
1033mod tests {
1034 use super::*;
1035 use crate::{
1036 journal::contiguous::fixed::{Config as JConfig, Journal},
1037 merkle::{
1038 hasher::Standard, mmb, mmr, Bagging::ForwardFold, Location, LocationRangeExt as _,
1039 Position, Proof,
1040 },
1041 metadata::{Config as MConfig, Metadata},
1042 };
1043 use commonware_cryptography::{
1044 sha256::{self, Digest},
1045 Hasher as _, Sha256,
1046 };
1047 use commonware_macros::test_traced;
1048 use commonware_parallel::Sequential;
1049 use commonware_runtime::{
1050 buffer::paged::CacheRef, deterministic, BufferPooler, Runner, Supervisor as _,
1051 };
1052 use commonware_utils::{non_empty_range, sequence::prefixed_u64::U64, NZUsize, NZU16, NZU64};
1053 use std::{
1054 collections::BTreeMap,
1055 num::{NonZeroU16, NonZeroUsize},
1056 };
1057
1058 fn test_digest(v: usize) -> Digest {
1059 Sha256::hash(&v.to_be_bytes())
1060 }
1061
    /// Page size passed to `CacheRef::from_pooler` when building test page caches.
    const PAGE_SIZE: NonZeroU16 = NZU16!(111);
    /// Cache size passed to `CacheRef::from_pooler` when building test page caches.
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(5);
1064
1065 fn test_config(pooler: &impl BufferPooler) -> Config<Sequential> {
1066 Config {
1067 journal_partition: "journal-partition".into(),
1068 metadata_partition: "metadata-partition".into(),
1069 items_per_blob: NZU64!(7),
1070 write_buffer: NZUsize!(1024),
1071 strategy: Sequential,
1072 page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
1073 }
1074 }
1075
1076 async fn full_empty_inner<F: Family>(context: deterministic::Context) {
1077 let hasher: Standard<Sha256> = Standard::new(ForwardFold);
1078 let mut mmr = Merkle::<F, _, Digest, Sequential>::init(
1079 context.child("first"),
1080 &hasher,
1081 test_config(&context),
1082 )
1083 .await
1084 .unwrap();
1085 assert_eq!(mmr.size(), 0);
1086 assert!(mmr.get_node(Position::<F>::new(0)).await.is_err());
1087 let bounds = mmr.bounds();
1088 assert!(bounds.is_empty());
1089 assert!(mmr.prune_all().await.is_ok());
1090 assert_eq!(bounds.start, 0);
1091 assert!(mmr.prune(Location::<F>::new(0)).await.is_ok());
1092 assert!(mmr.sync().await.is_ok());
1093 assert!(matches!(mmr.rewind(1).await, Err(Error::Empty)));
1094
1095 let batch = mmr.new_batch().add(&hasher, &test_digest(0));
1096 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1097 mmr.apply_batch(&batch).unwrap();
1098 assert_eq!(mmr.size(), 1);
1099 mmr.sync().await.unwrap();
1100 assert!(mmr.get_node(Position::<F>::new(0)).await.is_ok());
1101 assert!(mmr.rewind(1).await.is_ok());
1102 assert_eq!(mmr.size(), 0);
1103 mmr.sync().await.unwrap();
1104
1105 let mmr = Merkle::<F, _, Digest, Sequential>::init(
1106 context.child("second"),
1107 &hasher,
1108 test_config(&context),
1109 )
1110 .await
1111 .unwrap();
1112 assert_eq!(mmr.size(), 0);
1113
1114 let empty_proof = Proof::<F, Digest>::default();
1115 let hasher: Standard<Sha256> = Standard::new(ForwardFold);
1116 let root = mmr.root(&hasher, 0).unwrap();
1117 assert!(empty_proof.verify_range_inclusion(
1118 &hasher,
1119 &[] as &[Digest],
1120 Location::<F>::new(0),
1121 &root
1122 ));
1123 assert!(empty_proof.verify_multi_inclusion(
1124 &hasher,
1125 &[] as &[(Digest, Location<F>)],
1126 &root
1127 ));
1128
1129 let batch = mmr.new_batch().add(&hasher, &test_digest(0));
1131 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1132 mmr.apply_batch(&batch).unwrap();
1133 let root = mmr.root(&hasher, 0).unwrap();
1134 assert!(!empty_proof.verify_range_inclusion(
1135 &hasher,
1136 &[] as &[Digest],
1137 Location::<F>::new(0),
1138 &root
1139 ));
1140 assert!(!empty_proof.verify_multi_inclusion(
1141 &hasher,
1142 &[] as &[(Digest, Location<F>)],
1143 &root
1144 ));
1145
1146 mmr.destroy().await.unwrap();
1147 }
1148
1149 #[test_traced]
1150 fn test_full_empty_mmr() {
1151 let executor = deterministic::Runner::default();
1152 executor.start(full_empty_inner::<mmr::Family>);
1153 }
1154
1155 #[test_traced]
1156 fn test_full_empty_mmb() {
1157 let executor = deterministic::Runner::default();
1158 executor.start(full_empty_inner::<mmb::Family>);
1159 }
1160
1161 async fn full_prune_out_of_bounds_returns_error_inner<F: Family>(
1162 context: deterministic::Context,
1163 ) {
1164 let hasher = Standard::<Sha256>::new(ForwardFold);
1165 let mut mmr = Merkle::<F, _, Digest, Sequential>::init(
1166 context.child("oob_prune"),
1167 &hasher,
1168 test_config(&context),
1169 )
1170 .await
1171 .unwrap();
1172
1173 let batch = mmr.new_batch().add(&hasher, &test_digest(0));
1174 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1175 mmr.apply_batch(&batch).unwrap();
1176
1177 assert!(matches!(
1178 mmr.prune(Location::<F>::new(2)).await,
1179 Err(Error::LeafOutOfBounds(loc)) if loc == Location::<F>::new(2)
1180 ));
1181
1182 mmr.destroy().await.unwrap();
1183 }
1184
1185 #[test_traced]
1186 fn test_full_prune_out_of_bounds_returns_error_mmr() {
1187 let executor = deterministic::Runner::default();
1188 executor.start(full_prune_out_of_bounds_returns_error_inner::<mmr::Family>);
1189 }
1190
1191 #[test_traced]
1192 fn test_full_prune_out_of_bounds_returns_error_mmb() {
1193 let executor = deterministic::Runner::default();
1194 executor.start(full_prune_out_of_bounds_returns_error_inner::<mmb::Family>);
1195 }
1196
1197 async fn full_rewind_error_leaves_valid_state_inner<F: Family>(
1198 context: deterministic::Context,
1199 ) {
1200 let hasher: Standard<Sha256> = Standard::new(ForwardFold);
1201
1202 let element_pruned_context = context.child("element_pruned_case");
1204 let mut mmr = Merkle::<F, _, Digest, Sequential>::init(
1205 element_pruned_context.child("element_pruned"),
1206 &hasher,
1207 test_config(&element_pruned_context),
1208 )
1209 .await
1210 .unwrap();
1211 let mut batch = mmr.new_batch();
1212 for i in 0u64..32 {
1213 batch = batch.add(&hasher, &i.to_be_bytes());
1214 }
1215 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1216 mmr.apply_batch(&batch).unwrap();
1217 mmr.prune(Location::<F>::new(8)).await.unwrap();
1218 let leaves_before = mmr.leaves();
1219 assert!(matches!(
1220 mmr.rewind(128).await,
1221 Err(Error::ElementPruned(_))
1222 ));
1223 assert!(mmr.leaves() <= leaves_before);
1225 mmr.destroy().await.unwrap();
1226
1227 let empty_context = context.child("empty_case");
1229 let cfg = test_config(&empty_context);
1230 let mut mmr = Merkle::<F, _, Digest, Sequential>::init(empty_context, &hasher, cfg)
1231 .await
1232 .unwrap();
1233 let mut batch = mmr.new_batch();
1234 for i in 0u64..8 {
1235 batch = batch.add(&hasher, &i.to_be_bytes());
1236 }
1237 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1238 mmr.apply_batch(&batch).unwrap();
1239 let leaves_before = mmr.leaves();
1240 assert!(matches!(mmr.rewind(9).await, Err(Error::Empty)));
1241 assert_eq!(mmr.leaves(), leaves_before);
1243 mmr.destroy().await.unwrap();
1244 }
1245
1246 #[test_traced]
1247 fn test_full_rewind_error_leaves_valid_state_mmr() {
1248 let executor = deterministic::Runner::default();
1249 executor.start(full_rewind_error_leaves_valid_state_inner::<mmr::Family>);
1250 }
1251
1252 #[test_traced]
1253 fn test_full_rewind_error_leaves_valid_state_mmb() {
1254 let executor = deterministic::Runner::default();
1255 executor.start(full_rewind_error_leaves_valid_state_inner::<mmb::Family>);
1256 }
1257
1258 async fn full_basic_inner<F: Family>(context: deterministic::Context) {
1259 let hasher: Standard<Sha256> = Standard::new(ForwardFold);
1260 let cfg = test_config(&context);
1261 let mmr = Merkle::<F, _, Digest, Sequential>::init(context, &hasher, cfg)
1262 .await
1263 .unwrap();
1264 const LEAF_COUNT: usize = 255;
1266 let mut leaves = Vec::with_capacity(LEAF_COUNT);
1267 for i in 0..LEAF_COUNT {
1268 leaves.push(test_digest(i));
1269 }
1270 let mut batch = mmr.new_batch();
1271 for leaf in &leaves {
1272 batch = batch.add(&hasher, leaf);
1273 }
1274 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1275 mmr.apply_batch(&batch).unwrap();
1276 let expected_size = Position::<F>::try_from(Location::<F>::new(LEAF_COUNT as u64)).unwrap();
1277 assert_eq!(mmr.size(), expected_size);
1278
1279 const TEST_ELEMENT: usize = 133;
1281 let test_element_loc: Location<F> = Location::new(TEST_ELEMENT as u64);
1282
1283 let proof = mmr.proof(&hasher, test_element_loc, 0).await.unwrap();
1284 let root = mmr.root(&hasher, 0).unwrap();
1285 assert!(proof.verify_element_inclusion(
1286 &hasher,
1287 &leaves[TEST_ELEMENT],
1288 test_element_loc,
1289 &root
1290 ));
1291
1292 mmr.sync().await.unwrap();
1294
1295 let proof2 = mmr.proof(&hasher, test_element_loc, 0).await.unwrap();
1298 assert_eq!(proof, proof2);
1299
1300 let range = Location::<F>::new(TEST_ELEMENT as u64)..Location::<F>::new(LEAF_COUNT as u64);
1302 let proof = mmr.range_proof(&hasher, range.clone(), 0).await.unwrap();
1303 assert!(proof.verify_range_inclusion(
1304 &hasher,
1305 &leaves[range.to_usize_range()],
1306 test_element_loc,
1307 &root
1308 ));
1309
1310 mmr.destroy().await.unwrap();
1311 }
1312
1313 #[test_traced]
1314 fn test_full_basic_mmr() {
1315 let executor = deterministic::Runner::default();
1316 executor.start(full_basic_inner::<mmr::Family>);
1317 }
1318
1319 #[test_traced]
1320 fn test_full_basic_mmb() {
1321 let executor = deterministic::Runner::default();
1322 executor.start(full_basic_inner::<mmb::Family>);
1323 }
1324
1325 async fn full_recovery_inner<F: Family>(context: deterministic::Context) {
1328 use crate::journal::contiguous::fixed::{Config as JConfig, Journal};
1329
1330 let hasher: Standard<Sha256> = Standard::new(ForwardFold);
1331 let mmr = Merkle::<F, _, Digest, Sequential>::init(
1332 context.child("first"),
1333 &hasher,
1334 test_config(&context),
1335 )
1336 .await
1337 .unwrap();
1338 assert_eq!(mmr.size(), 0);
1339
1340 const LEAF_COUNT: usize = 252;
1342 let mut leaves = Vec::with_capacity(LEAF_COUNT);
1343 for i in 0..LEAF_COUNT {
1344 leaves.push(test_digest(i));
1345 }
1346 let mut batch = mmr.new_batch();
1347 for leaf in &leaves {
1348 batch = batch.add(&hasher, leaf);
1349 }
1350 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1351 mmr.apply_batch(&batch).unwrap();
1352 let expected_size = Position::<F>::try_from(Location::<F>::new(LEAF_COUNT as u64)).unwrap();
1353 assert_eq!(mmr.size(), expected_size);
1354 mmr.sync().await.unwrap();
1355 drop(mmr);
1356
1357 {
1360 let journal: Journal<_, Digest> = Journal::init(
1361 context.child("corrupt"),
1362 JConfig {
1363 partition: "journal-partition".into(),
1364 items_per_blob: NZU64!(7),
1365 write_buffer: NZUsize!(1024),
1366 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1367 },
1368 )
1369 .await
1370 .unwrap();
1371 assert_eq!(journal.size().await, expected_size);
1372 journal.append(&Sha256::hash(b"orphan")).await.unwrap();
1373 journal.sync().await.unwrap();
1374 assert_eq!(journal.size().await, expected_size + 1);
1375 }
1376
1377 let mmr = Merkle::<F, _, Digest, Sequential>::init(
1378 context.child("second"),
1379 &hasher,
1380 test_config(&context),
1381 )
1382 .await
1383 .unwrap();
1384 let recovered_size =
1387 Position::<F>::try_from(Location::<F>::new(LEAF_COUNT as u64 + 1)).unwrap();
1388 assert_eq!(mmr.size(), recovered_size);
1389
1390 drop(mmr);
1392 let mmr = Merkle::<F, _, Digest, Sequential>::init(
1393 context.child("third"),
1394 &hasher,
1395 test_config(&context),
1396 )
1397 .await
1398 .unwrap();
1399 assert_eq!(mmr.size(), recovered_size);
1400
1401 mmr.destroy().await.unwrap();
1402 }
1403
1404 #[test_traced]
1405 fn test_full_recovery_mmr() {
1406 let executor = deterministic::Runner::default();
1407 executor.start(full_recovery_inner::<mmr::Family>);
1408 }
1409
1410 #[test_traced]
1411 fn test_full_recovery_mmb() {
1412 let executor = deterministic::Runner::default();
1413 executor.start(full_recovery_inner::<mmb::Family>);
1414 }
1415
1416 async fn full_pruning_inner<F: Family>(context: deterministic::Context) {
1417 let hasher: Standard<Sha256> = Standard::new(ForwardFold);
1418 const LEAF_COUNT: usize = 2000;
1420 let cfg_pruned = test_config(&context);
1421 let mut pruned_mmr = Merkle::<F, _, Digest, Sequential>::init(
1422 context.child("pruned"),
1423 &hasher,
1424 cfg_pruned.clone(),
1425 )
1426 .await
1427 .unwrap();
1428 let cfg_unpruned = Config {
1429 journal_partition: "unpruned-journal-partition".into(),
1430 metadata_partition: "unpruned-metadata-partition".into(),
1431 items_per_blob: NZU64!(7),
1432 write_buffer: NZUsize!(1024),
1433 strategy: Sequential,
1434 page_cache: cfg_pruned.page_cache.clone(),
1435 };
1436 let mmr = Merkle::<F, _, Digest, Sequential>::init(
1437 context.child("unpruned"),
1438 &hasher,
1439 cfg_unpruned,
1440 )
1441 .await
1442 .unwrap();
1443 let mut leaves = Vec::with_capacity(LEAF_COUNT);
1444 for i in 0..LEAF_COUNT {
1445 leaves.push(test_digest(i));
1446 }
1447 let mut batch = mmr.new_batch();
1448 for leaf in &leaves {
1449 batch = batch.add(&hasher, leaf);
1450 }
1451 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1452 mmr.apply_batch(&batch).unwrap();
1453 let mut batch = pruned_mmr.new_batch();
1454 for leaf in &leaves {
1455 batch = batch.add(&hasher, leaf);
1456 }
1457 let batch = pruned_mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1458 pruned_mmr.apply_batch(&batch).unwrap();
1459 let expected_size = Position::<F>::try_from(Location::<F>::new(LEAF_COUNT as u64)).unwrap();
1460 assert_eq!(mmr.size(), expected_size);
1461 assert_eq!(pruned_mmr.size(), expected_size);
1462
1463 for i in 0usize..300 {
1466 let prune_loc = Location::<F>::new(std::cmp::min(i as u64 * 10, *pruned_mmr.leaves()));
1467 pruned_mmr.prune(prune_loc).await.unwrap();
1468 assert_eq!(prune_loc, pruned_mmr.bounds().start);
1469
1470 let digest = test_digest(LEAF_COUNT + i);
1471 leaves.push(digest);
1472 let last_leaf = leaves.last().unwrap();
1473 let batch = pruned_mmr.new_batch().add(&hasher, last_leaf);
1474 let batch = pruned_mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1475 pruned_mmr.apply_batch(&batch).unwrap();
1476 let batch = mmr.new_batch().add(&hasher, last_leaf);
1477 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1478 mmr.apply_batch(&batch).unwrap();
1479 assert_eq!(
1480 pruned_mmr.root(&hasher, 0).unwrap(),
1481 mmr.root(&hasher, 0).unwrap()
1482 );
1483 }
1484
1485 pruned_mmr.sync().await.unwrap();
1487 assert_eq!(
1488 pruned_mmr.root(&hasher, 0).unwrap(),
1489 mmr.root(&hasher, 0).unwrap()
1490 );
1491
1492 pruned_mmr.sync().await.unwrap();
1494 drop(pruned_mmr);
1495 let mut pruned_mmr = Merkle::<F, _, Digest, Sequential>::init(
1496 context.child("pruned_reopen"),
1497 &hasher,
1498 cfg_pruned.clone(),
1499 )
1500 .await
1501 .unwrap();
1502 assert_eq!(
1503 pruned_mmr.root(&hasher, 0).unwrap(),
1504 mmr.root(&hasher, 0).unwrap()
1505 );
1506
1507 let size = pruned_mmr.size();
1509 pruned_mmr.prune_all().await.unwrap();
1510 assert_eq!(
1511 pruned_mmr.root(&hasher, 0).unwrap(),
1512 mmr.root(&hasher, 0).unwrap()
1513 );
1514 let bounds = pruned_mmr.bounds();
1515 assert!(bounds.is_empty());
1516 assert_eq!(bounds.start, Location::<F>::try_from(size).unwrap());
1517
1518 let batch = mmr.new_batch().add(&hasher, &test_digest(LEAF_COUNT));
1521 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1522 mmr.apply_batch(&batch).unwrap();
1523 let batch = pruned_mmr
1524 .new_batch()
1525 .add(&hasher, &test_digest(LEAF_COUNT));
1526 let batch = pruned_mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1527 pruned_mmr.apply_batch(&batch).unwrap();
1528 assert!(*pruned_mmr.size() % cfg_pruned.items_per_blob != 0);
1529 pruned_mmr.sync().await.unwrap();
1530 drop(pruned_mmr);
1531 let mut pruned_mmr = Merkle::<F, _, Digest, Sequential>::init(
1532 context.child("pruned_reopen").with_attribute("index", 2),
1533 &hasher,
1534 cfg_pruned.clone(),
1535 )
1536 .await
1537 .unwrap();
1538 assert_eq!(
1539 pruned_mmr.root(&hasher, 0).unwrap(),
1540 mmr.root(&hasher, 0).unwrap()
1541 );
1542 let bounds = pruned_mmr.bounds();
1543 assert!(!bounds.is_empty());
1544 assert_eq!(bounds.start, Location::<F>::try_from(size).unwrap());
1545
1546 assert!(pruned_mmr
1548 .prune(Location::<F>::try_from(size).unwrap() - 1)
1549 .await
1550 .is_ok());
1551 assert_eq!(
1552 pruned_mmr.bounds().start,
1553 Location::<F>::try_from(size).unwrap()
1554 );
1555
1556 while *pruned_mmr.size() % cfg_pruned.items_per_blob != 0 {
1559 let batch = pruned_mmr
1560 .new_batch()
1561 .add(&hasher, &test_digest(LEAF_COUNT));
1562 let batch = pruned_mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1563 pruned_mmr.apply_batch(&batch).unwrap();
1564 }
1565 pruned_mmr.prune_all().await.unwrap();
1566 assert!(pruned_mmr.bounds().is_empty());
1567
1568 pruned_mmr.destroy().await.unwrap();
1569 mmr.destroy().await.unwrap();
1570 }
1571
1572 #[test_traced]
1573 fn test_full_pruning_mmr() {
1574 let executor = deterministic::Runner::default();
1575 executor.start(full_pruning_inner::<mmr::Family>);
1576 }
1577
1578 #[test_traced]
1579 fn test_full_pruning_mmb() {
1580 let executor = deterministic::Runner::default();
1581 executor.start(full_pruning_inner::<mmb::Family>);
1582 }
1583
1584 async fn full_recovery_with_pruning_inner<F: Family>(context: deterministic::Context) {
1586 let hasher: Standard<Sha256> = Standard::new(ForwardFold);
1588 const LEAF_COUNT: usize = 2000;
1589 let mut leaves = Vec::with_capacity(LEAF_COUNT);
1590 let mmr = Merkle::<F, _, Digest, Sequential>::init(
1591 context.child("init"),
1592 &hasher,
1593 test_config(&context),
1594 )
1595 .await
1596 .unwrap();
1597 for i in 0..LEAF_COUNT {
1598 leaves.push(test_digest(i));
1599 }
1600 let mut batch = mmr.new_batch();
1601 for leaf in &leaves {
1602 batch = batch.add(&hasher, leaf);
1603 }
1604 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1605 mmr.apply_batch(&batch).unwrap();
1606 let expected_size = Position::<F>::try_from(Location::<F>::new(LEAF_COUNT as u64)).unwrap();
1607 assert_eq!(mmr.size(), expected_size);
1608 mmr.sync().await.unwrap();
1609 drop(mmr);
1610
1611 for i in 0usize..200 {
1613 let mut mmr = Merkle::<F, _, Digest, Sequential>::init(
1614 context.child("iter").with_attribute("index", i),
1615 &hasher,
1616 test_config(&context),
1617 )
1618 .await
1619 .unwrap();
1620 let start_size = mmr.size();
1621 let start_leaves = *mmr.leaves();
1622 let prune_loc = Location::<F>::new(std::cmp::min(i as u64 * 50, start_leaves));
1623 if i % 5 == 0 {
1624 mmr.simulate_pruning_failure(prune_loc).await.unwrap();
1625 continue;
1626 }
1627 mmr.prune(prune_loc).await.unwrap();
1628
1629 for j in 0..10 {
1631 let digest = test_digest(100 * (i + 1) + j);
1632 leaves.push(digest);
1633 let batch = mmr
1634 .new_batch()
1635 .add(&hasher, leaves.last().unwrap())
1636 .add(&hasher, leaves.last().unwrap());
1637 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1638 mmr.apply_batch(&batch).unwrap();
1639 let digest = test_digest(LEAF_COUNT + i);
1640 leaves.push(digest);
1641 let batch = mmr
1642 .new_batch()
1643 .add(&hasher, leaves.last().unwrap())
1644 .add(&hasher, leaves.last().unwrap());
1645 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1646 mmr.apply_batch(&batch).unwrap();
1647 }
1648 let end_size = mmr.size();
1649 let total_to_write = (*end_size - *start_size) as usize;
1650 let partial_write_limit = i % total_to_write;
1651 mmr.simulate_partial_sync(partial_write_limit)
1652 .await
1653 .unwrap();
1654 }
1655
1656 let mmr = Merkle::<F, _, Digest, Sequential>::init(
1657 context.child("final"),
1658 &hasher,
1659 test_config(&context),
1660 )
1661 .await
1662 .unwrap();
1663 mmr.destroy().await.unwrap();
1664 }
1665
1666 #[test_traced("WARN")]
1667 fn test_full_recovery_with_pruning_mmr() {
1668 let executor = deterministic::Runner::default();
1669 executor.start(full_recovery_with_pruning_inner::<mmr::Family>);
1670 }
1671
1672 #[test_traced("WARN")]
1673 fn test_full_recovery_with_pruning_mmb() {
1674 let executor = deterministic::Runner::default();
1675 executor.start(full_recovery_with_pruning_inner::<mmb::Family>);
1676 }
1677
1678 async fn full_historical_proof_basic_inner<F: Family>(context: deterministic::Context) {
1679 let hasher = Standard::<Sha256>::new(ForwardFold);
1681 let cfg = test_config(&context);
1682 let mmr = Merkle::<F, _, Digest, Sequential>::init(context, &hasher, cfg)
1683 .await
1684 .unwrap();
1685 let mut elements = Vec::new();
1686 for i in 0..10 {
1687 elements.push(test_digest(i));
1688 }
1689 let mut batch = mmr.new_batch();
1690 for elt in &elements {
1691 batch = batch.add(&hasher, elt);
1692 }
1693 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1694 mmr.apply_batch(&batch).unwrap();
1695 let original_leaves = mmr.leaves();
1696
1697 let historical_proof = mmr
1699 .historical_range_proof(
1700 &hasher,
1701 original_leaves,
1702 Location::<F>::new(2)..Location::<F>::new(6),
1703 0,
1704 )
1705 .await
1706 .unwrap();
1707 assert_eq!(historical_proof.leaves, original_leaves);
1708 let root = mmr.root(&hasher, 0).unwrap();
1709 assert!(historical_proof.verify_range_inclusion(
1710 &hasher,
1711 &elements[2..6],
1712 Location::<F>::new(2),
1713 &root
1714 ));
1715 let regular_proof = mmr
1716 .range_proof(&hasher, Location::<F>::new(2)..Location::<F>::new(6), 0)
1717 .await
1718 .unwrap();
1719 assert_eq!(regular_proof.leaves, historical_proof.leaves);
1720 assert_eq!(regular_proof.digests, historical_proof.digests);
1721
1722 for i in 10..20 {
1724 elements.push(test_digest(i));
1725 }
1726 let mut batch = mmr.new_batch();
1727 for elt in &elements[10..20] {
1728 batch = batch.add(&hasher, elt);
1729 }
1730 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1731 mmr.apply_batch(&batch).unwrap();
1732 let new_historical_proof = mmr
1733 .historical_range_proof(
1734 &hasher,
1735 original_leaves,
1736 Location::<F>::new(2)..Location::<F>::new(6),
1737 0,
1738 )
1739 .await
1740 .unwrap();
1741 assert_eq!(new_historical_proof.leaves, historical_proof.leaves);
1742 assert_eq!(new_historical_proof.digests, historical_proof.digests);
1743
1744 mmr.destroy().await.unwrap();
1745 }
1746
1747 #[test_traced]
1748 fn test_full_historical_proof_basic_mmr() {
1749 let executor = deterministic::Runner::default();
1750 executor.start(full_historical_proof_basic_inner::<mmr::Family>);
1751 }
1752
1753 #[test_traced]
1754 fn test_full_historical_proof_basic_mmb() {
1755 let executor = deterministic::Runner::default();
1756 executor.start(full_historical_proof_basic_inner::<mmb::Family>);
1757 }
1758
1759 async fn full_historical_proof_with_pruning_inner<F: Family>(context: deterministic::Context) {
1760 let hasher = Standard::<Sha256>::new(ForwardFold);
1761 let mut mmr = Merkle::<F, _, Digest, Sequential>::init(
1762 context.child("main"),
1763 &hasher,
1764 test_config(&context),
1765 )
1766 .await
1767 .unwrap();
1768
1769 let mut elements = Vec::new();
1771 for i in 0..50 {
1772 elements.push(test_digest(i));
1773 }
1774 let mut batch = mmr.new_batch();
1775 for elt in &elements {
1776 batch = batch.add(&hasher, elt);
1777 }
1778 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1779 mmr.apply_batch(&batch).unwrap();
1780
1781 let prune_loc = Location::<F>::new(16);
1783 mmr.prune(prune_loc).await.unwrap();
1784
1785 let ref_mmr = Merkle::<F, _, Digest, Sequential>::init(
1787 context.child("ref"),
1788 &hasher,
1789 Config {
1790 journal_partition: "ref-journal-pruned".into(),
1791 metadata_partition: "ref-metadata-pruned".into(),
1792 items_per_blob: NZU64!(7),
1793 write_buffer: NZUsize!(1024),
1794 strategy: Sequential,
1795 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1796 },
1797 )
1798 .await
1799 .unwrap();
1800
1801 let mut batch = ref_mmr.new_batch();
1802 for elt in elements.iter().take(41) {
1803 batch = batch.add(&hasher, elt);
1804 }
1805 let batch = ref_mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1806 ref_mmr.apply_batch(&batch).unwrap();
1807 let historical_leaves = ref_mmr.leaves();
1808 let historical_root = ref_mmr.root(&hasher, 0).unwrap();
1809
1810 let historical_proof = mmr
1812 .historical_range_proof(
1813 &hasher,
1814 historical_leaves,
1815 Location::<F>::new(35)..Location::<F>::new(39),
1816 0,
1817 )
1818 .await
1819 .unwrap();
1820
1821 assert_eq!(historical_proof.leaves, historical_leaves);
1822
1823 assert!(historical_proof.verify_range_inclusion(
1825 &hasher,
1826 &elements[35..39],
1827 Location::<F>::new(35),
1828 &historical_root
1829 ));
1830
1831 ref_mmr.destroy().await.unwrap();
1832 mmr.destroy().await.unwrap();
1833 }
1834
1835 #[test_traced]
1836 fn test_full_historical_proof_with_pruning_mmr() {
1837 let executor = deterministic::Runner::default();
1838 executor.start(full_historical_proof_with_pruning_inner::<mmr::Family>);
1839 }
1840
1841 #[test_traced]
1842 fn test_full_historical_proof_with_pruning_mmb() {
1843 let executor = deterministic::Runner::default();
1844 executor.start(full_historical_proof_with_pruning_inner::<mmb::Family>);
1845 }
1846
1847 async fn full_historical_proof_large_inner<F: Family>(context: deterministic::Context) {
1848 let hasher = Standard::<Sha256>::new(ForwardFold);
1849
1850 let mmr = Merkle::<F, _, Digest, Sequential>::init(
1851 context.child("server"),
1852 &hasher,
1853 Config {
1854 journal_partition: "server-journal".into(),
1855 metadata_partition: "server-metadata".into(),
1856 items_per_blob: NZU64!(7),
1857 write_buffer: NZUsize!(1024),
1858 strategy: Sequential,
1859 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1860 },
1861 )
1862 .await
1863 .unwrap();
1864
1865 let mut elements = Vec::new();
1866 for i in 0..100 {
1867 elements.push(test_digest(i));
1868 }
1869 let mut batch = mmr.new_batch();
1870 for elt in &elements {
1871 batch = batch.add(&hasher, elt);
1872 }
1873 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1874 mmr.apply_batch(&batch).unwrap();
1875
1876 let range = Location::<F>::new(30)..Location::<F>::new(61);
1877
1878 let ref_mmr = Merkle::<F, _, Digest, Sequential>::init(
1880 context.child("client"),
1881 &hasher,
1882 Config {
1883 journal_partition: "client-journal".into(),
1884 metadata_partition: "client-metadata".into(),
1885 items_per_blob: NZU64!(7),
1886 write_buffer: NZUsize!(1024),
1887 strategy: Sequential,
1888 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1889 },
1890 )
1891 .await
1892 .unwrap();
1893
1894 let mut batch = ref_mmr.new_batch();
1896 for elt in elements.iter().take(*range.end as usize) {
1897 batch = batch.add(&hasher, elt);
1898 }
1899 let batch = ref_mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1900 ref_mmr.apply_batch(&batch).unwrap();
1901 let historical_leaves = ref_mmr.leaves();
1902 let expected_root = ref_mmr.root(&hasher, 0).unwrap();
1903
1904 let proof = mmr
1906 .historical_range_proof(&hasher, historical_leaves, range.clone(), 0)
1907 .await
1908 .unwrap();
1909
1910 assert!(proof.verify_range_inclusion(
1911 &hasher,
1912 &elements[range.to_usize_range()],
1913 range.start,
1914 &expected_root, ));
1916
1917 ref_mmr.destroy().await.unwrap();
1918 mmr.destroy().await.unwrap();
1919 }
1920
1921 #[test_traced]
1922 fn test_full_historical_proof_large_mmr() {
1923 let executor = deterministic::Runner::default();
1924 executor.start(full_historical_proof_large_inner::<mmr::Family>);
1925 }
1926
1927 #[test_traced]
1928 fn test_full_historical_proof_large_mmb() {
1929 let executor = deterministic::Runner::default();
1930 executor.start(full_historical_proof_large_inner::<mmb::Family>);
1931 }
1932
1933 async fn full_historical_proof_singleton_inner<F: Family>(context: deterministic::Context) {
1934 let hasher = Standard::<Sha256>::new(ForwardFold);
1935 let cfg = test_config(&context);
1936 let mmr = Merkle::<F, _, Digest, Sequential>::init(context, &hasher, cfg)
1937 .await
1938 .unwrap();
1939
1940 let element = test_digest(0);
1941 let batch = mmr.new_batch().add(&hasher, &element);
1942 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1943 mmr.apply_batch(&batch).unwrap();
1944
1945 let single_proof = mmr
1947 .historical_range_proof(
1948 &hasher,
1949 Location::<F>::new(1),
1950 Location::<F>::new(0)..Location::<F>::new(1),
1951 0,
1952 )
1953 .await
1954 .unwrap();
1955
1956 let root = mmr.root(&hasher, 0).unwrap();
1957 assert!(single_proof.verify_range_inclusion(
1958 &hasher,
1959 &[element],
1960 Location::<F>::new(0),
1961 &root
1962 ));
1963
1964 mmr.destroy().await.unwrap();
1965 }
1966
1967 #[test_traced]
1968 fn test_full_historical_proof_singleton_mmr() {
1969 let executor = deterministic::Runner::default();
1970 executor.start(full_historical_proof_singleton_inner::<mmr::Family>);
1971 }
1972
1973 #[test_traced]
1974 fn test_full_historical_proof_singleton_mmb() {
1975 let executor = deterministic::Runner::default();
1976 executor.start(full_historical_proof_singleton_inner::<mmb::Family>);
1977 }
1978
1979 async fn full_init_sync_empty_inner<F: Family>(context: deterministic::Context) {
1981 let hasher = Standard::<Sha256>::new(ForwardFold);
1982
1983 let sync_cfg = SyncConfig::<F, sha256::Digest, Sequential> {
1985 config: test_config(&context),
1986 range: non_empty_range!(Location::<F>::new(0), Location::<F>::new(52)),
1987 pinned_nodes: None,
1988 };
1989
1990 let sync_mmr =
1991 Merkle::<F, _, Digest, Sequential>::init_sync(context.child("storage"), sync_cfg)
1992 .await
1993 .unwrap();
1994
1995 assert_eq!(sync_mmr.size(), 0);
1997 let bounds = sync_mmr.bounds();
1998 assert_eq!(bounds.start, 0);
1999 assert!(bounds.is_empty());
2000
2001 let new_element = test_digest(999);
2003 let batch = sync_mmr.new_batch().add(&hasher, &new_element);
2004 let batch = sync_mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2005 sync_mmr.apply_batch(&batch).unwrap();
2006
2007 let _root = sync_mmr.root(&hasher, 0).unwrap();
2009
2010 sync_mmr.destroy().await.unwrap();
2011 }
2012
2013 #[test_traced]
2014 fn test_full_init_sync_empty_mmr() {
2015 let executor = deterministic::Runner::default();
2016 executor.start(full_init_sync_empty_inner::<mmr::Family>);
2017 }
2018
2019 #[test_traced]
2020 fn test_full_init_sync_empty_mmb() {
2021 let executor = deterministic::Runner::default();
2022 executor.start(full_init_sync_empty_inner::<mmb::Family>);
2023 }
2024
2025 async fn full_init_sync_nonempty_exact_match_inner<F: Family>(context: deterministic::Context) {
2027 let hasher = Standard::<Sha256>::new(ForwardFold);
2028
2029 let mmr = Merkle::<F, _, Digest, Sequential>::init(
2031 context.child("init"),
2032 &hasher,
2033 test_config(&context),
2034 )
2035 .await
2036 .unwrap();
2037 let mut batch = mmr.new_batch();
2038 for i in 0..50 {
2039 batch = batch.add(&hasher, &test_digest(i));
2040 }
2041 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2042 mmr.apply_batch(&batch).unwrap();
2043 mmr.sync().await.unwrap();
2044 let original_size = mmr.size();
2045 let original_leaves = mmr.leaves();
2046 let original_root = mmr.root(&hasher, 0).unwrap();
2047
2048 let lower_bound_loc = mmr.bounds().start;
2050 let upper_bound_loc = mmr.leaves();
2051 let lower_bound_pos = Position::<F>::try_from(lower_bound_loc).unwrap();
2052 let upper_bound_pos = mmr.size();
2053 let mut expected_nodes = BTreeMap::new();
2054 for i in *lower_bound_pos..*upper_bound_pos {
2055 expected_nodes.insert(
2056 Position::<F>::new(i),
2057 mmr.get_node(Position::<F>::new(i)).await.unwrap().unwrap(),
2058 );
2059 }
2060 let sync_cfg = SyncConfig::<F, sha256::Digest, Sequential> {
2061 config: test_config(&context),
2062 range: non_empty_range!(lower_bound_loc, upper_bound_loc),
2063 pinned_nodes: None,
2064 };
2065
2066 mmr.sync().await.unwrap();
2067 drop(mmr);
2068
2069 let sync_mmr =
2070 Merkle::<F, _, Digest, Sequential>::init_sync(context.child("sync"), sync_cfg)
2071 .await
2072 .unwrap();
2073
2074 assert_eq!(sync_mmr.size(), original_size);
2076 assert_eq!(sync_mmr.leaves(), original_leaves);
2077 let bounds = sync_mmr.bounds();
2078 assert_eq!(bounds.start, lower_bound_loc);
2079 assert!(!bounds.is_empty());
2080 assert_eq!(sync_mmr.root(&hasher, 0).unwrap(), original_root);
2081 for pos in *lower_bound_pos..*upper_bound_pos {
2082 let pos = Position::<F>::new(pos);
2083 assert_eq!(
2084 sync_mmr.get_node(pos).await.unwrap(),
2085 expected_nodes.get(&pos).cloned()
2086 );
2087 }
2088
2089 sync_mmr.destroy().await.unwrap();
2090 }
2091
2092 #[test_traced]
2093 fn test_full_init_sync_nonempty_exact_match_mmr() {
2094 let executor = deterministic::Runner::default();
2095 executor.start(full_init_sync_nonempty_exact_match_inner::<mmr::Family>);
2096 }
2097
2098 #[test_traced]
2099 fn test_full_init_sync_nonempty_exact_match_mmb() {
2100 let executor = deterministic::Runner::default();
2101 executor.start(full_init_sync_nonempty_exact_match_inner::<mmb::Family>);
2102 }
2103
2104 async fn full_init_sync_partial_overlap_inner<F: Family>(context: deterministic::Context) {
2107 let hasher = Standard::<Sha256>::new(ForwardFold);
2108
2109 let mut mmr = Merkle::<F, _, Digest, Sequential>::init(
2111 context.child("init"),
2112 &hasher,
2113 test_config(&context),
2114 )
2115 .await
2116 .unwrap();
2117 let mut batch = mmr.new_batch();
2118 for i in 0..30 {
2119 batch = batch.add(&hasher, &test_digest(i));
2120 }
2121 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2122 mmr.apply_batch(&batch).unwrap();
2123 mmr.sync().await.unwrap();
2124 mmr.prune(Location::<F>::new(6)).await.unwrap();
2125
2126 let original_size = mmr.size();
2127 let original_leaves = mmr.leaves();
2128 let original_root = mmr.root(&hasher, 0).unwrap();
2129 let original_pruning_boundary = mmr.bounds().start;
2130 let original_pruning_pos = Position::<F>::try_from(original_pruning_boundary).unwrap();
2131
2132 let lower_bound_loc = original_pruning_boundary;
2134 let upper_bound_loc = original_leaves + 6; let mut expected_nodes = BTreeMap::new();
2137 for i in *original_pruning_pos..*original_size {
2138 let pos = Position::<F>::new(i);
2139 expected_nodes.insert(pos, mmr.get_node(pos).await.unwrap().unwrap());
2140 }
2141
2142 let sync_cfg = SyncConfig::<F, sha256::Digest, Sequential> {
2143 config: test_config(&context),
2144 range: non_empty_range!(lower_bound_loc, upper_bound_loc),
2145 pinned_nodes: None,
2146 };
2147
2148 mmr.sync().await.unwrap();
2149 drop(mmr);
2150
2151 let sync_mmr =
2152 Merkle::<F, _, Digest, Sequential>::init_sync(context.child("sync"), sync_cfg)
2153 .await
2154 .unwrap();
2155
2156 assert_eq!(sync_mmr.size(), original_size);
2158 let bounds = sync_mmr.bounds();
2159 assert_eq!(bounds.start, lower_bound_loc);
2160 assert!(!bounds.is_empty());
2161 assert_eq!(sync_mmr.root(&hasher, 0).unwrap(), original_root);
2162
2163 for i in *original_pruning_pos..*original_size {
2165 let pos = Position::<F>::new(i);
2166 assert_eq!(
2167 sync_mmr.get_node(pos).await.unwrap(),
2168 expected_nodes.get(&pos).cloned()
2169 );
2170 }
2171
2172 sync_mmr.destroy().await.unwrap();
2173 }
2174
2175 #[test_traced]
2176 fn test_full_init_sync_partial_overlap_mmr() {
2177 let executor = deterministic::Runner::default();
2178 executor.start(full_init_sync_partial_overlap_inner::<mmr::Family>);
2179 }
2180
2181 #[test_traced]
2182 fn test_full_init_sync_partial_overlap_mmb() {
2183 let executor = deterministic::Runner::default();
2184 executor.start(full_init_sync_partial_overlap_inner::<mmb::Family>);
2185 }
2186
2187 async fn full_init_sync_rejects_extra_pinned_nodes_inner<F: Family>(
2188 context: deterministic::Context,
2189 ) {
2190 let sync_cfg = SyncConfig::<F, sha256::Digest, Sequential> {
2191 config: test_config(&context),
2192 range: non_empty_range!(Location::<F>::new(6), Location::<F>::new(20)),
2193 pinned_nodes: Some(vec![test_digest(1), test_digest(2), test_digest(3)]),
2194 };
2195
2196 let result =
2197 Merkle::<F, _, Digest, Sequential>::init_sync(context.child("sync"), sync_cfg).await;
2198 assert!(matches!(result, Err(Error::InvalidPinnedNodes)));
2199 }
2200
2201 #[test_traced]
2202 fn test_full_init_sync_rejects_extra_pinned_nodes_mmr() {
2203 let executor = deterministic::Runner::default();
2204 executor.start(full_init_sync_rejects_extra_pinned_nodes_inner::<mmr::Family>);
2205 }
2206
2207 #[test_traced]
2208 fn test_full_init_sync_rejects_extra_pinned_nodes_mmb() {
2209 let executor = deterministic::Runner::default();
2210 executor.start(full_init_sync_rejects_extra_pinned_nodes_inner::<mmb::Family>);
2211 }
2212
2213 async fn full_init_stale_metadata_returns_error_inner<F: Family>(
2217 context: deterministic::Context,
2218 ) {
2219 let hasher = Standard::<Sha256>::new(ForwardFold);
2220
2221 let mut mmr = Merkle::<F, _, Digest, Sequential>::init(
2223 context.child("init"),
2224 &hasher,
2225 test_config(&context),
2226 )
2227 .await
2228 .unwrap();
2229
2230 let mut batch = mmr.new_batch();
2232 for i in 0..50 {
2233 batch = batch.add(&hasher, &test_digest(i));
2234 }
2235 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2236 mmr.apply_batch(&batch).unwrap();
2237 mmr.sync().await.unwrap();
2238
2239 let prune_loc = Location::<F>::new(25);
2241 mmr.prune(prune_loc).await.unwrap();
2242 drop(mmr);
2243
2244 let meta_cfg = MConfig {
2247 partition: test_config(&context).metadata_partition,
2248 codec_config: ((0..).into(), ()),
2249 };
2250 let mut metadata =
2251 Metadata::<_, U64, Vec<u8>>::init(context.child("meta_tamper"), meta_cfg)
2252 .await
2253 .unwrap();
2254 metadata.clear();
2255 let key = U64::new(PRUNED_TO_PREFIX, 0);
2256 metadata
2257 .put_sync(key, 0u64.to_be_bytes().to_vec())
2258 .await
2259 .unwrap();
2260 drop(metadata);
2261
2262 let result = Merkle::<F, _, Digest, Sequential>::init(
2267 context.child("reopened"),
2268 &hasher,
2269 test_config(&context),
2270 )
2271 .await;
2272
2273 match result {
2274 Err(Error::MissingNode(_)) => {} Ok(_) => panic!("expected MissingNode error, got Ok"),
2276 Err(e) => panic!("expected MissingNode error, got {:?}", e),
2277 }
2278 }
2279
2280 #[test_traced("WARN")]
2281 fn test_full_init_stale_metadata_returns_error_mmr() {
2282 let executor = deterministic::Runner::default();
2283 executor.start(full_init_stale_metadata_returns_error_inner::<mmr::Family>);
2284 }
2285
2286 #[test_traced("WARN")]
2287 fn test_full_init_stale_metadata_returns_error_mmb() {
2288 let executor = deterministic::Runner::default();
2289 executor.start(full_init_stale_metadata_returns_error_inner::<mmb::Family>);
2290 }
2291
2292 async fn full_init_metadata_ahead_inner<F: Family>(context: deterministic::Context) {
2296 let hasher = Standard::<Sha256>::new(ForwardFold);
2297
2298 let mut mmr = Merkle::<F, _, Digest, Sequential>::init(
2300 context.child("init"),
2301 &hasher,
2302 test_config(&context),
2303 )
2304 .await
2305 .unwrap();
2306
2307 let mut batch = mmr.new_batch();
2309 for i in 0..50 {
2310 batch = batch.add(&hasher, &test_digest(i));
2311 }
2312 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2313 mmr.apply_batch(&batch).unwrap();
2314 mmr.sync().await.unwrap();
2315
2316 let prune_loc = Location::<F>::new(16);
2318 mmr.prune(prune_loc).await.unwrap();
2319 let expected_root = mmr.root(&hasher, 0).unwrap();
2320 let expected_size = mmr.size();
2321 drop(mmr);
2322
2323 let mmr = Merkle::<F, _, Digest, Sequential>::init(
2326 context.child("reopened"),
2327 &hasher,
2328 test_config(&context),
2329 )
2330 .await
2331 .unwrap();
2332
2333 assert_eq!(mmr.bounds().start, prune_loc);
2334 assert_eq!(mmr.size(), expected_size);
2335 assert_eq!(mmr.root(&hasher, 0).unwrap(), expected_root);
2336
2337 mmr.destroy().await.unwrap();
2338 }
2339
2340 #[test_traced("WARN")]
2341 fn test_full_init_metadata_ahead_mmr() {
2342 let executor = deterministic::Runner::default();
2343 executor.start(full_init_metadata_ahead_inner::<mmr::Family>);
2344 }
2345
2346 #[test_traced("WARN")]
2347 fn test_full_init_metadata_ahead_mmb() {
2348 let executor = deterministic::Runner::default();
2349 executor.start(full_init_metadata_ahead_inner::<mmb::Family>);
2350 }
2351
2352 async fn full_init_sync_computes_pinned_nodes_before_pruning_inner<F: Family>(
2359 context: deterministic::Context,
2360 ) {
2361 let hasher = Standard::<Sha256>::new(ForwardFold);
2362
2363 let cfg = Config {
2365 journal_partition: "mmr-journal".into(),
2366 metadata_partition: "mmr-metadata".into(),
2367 items_per_blob: NZU64!(7),
2368 write_buffer: NZUsize!(64),
2369 strategy: Sequential,
2370 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2371 };
2372
2373 let mmr =
2375 Merkle::<F, _, Digest, Sequential>::init(context.child("init"), &hasher, cfg.clone())
2376 .await
2377 .unwrap();
2378 let mut batch = mmr.new_batch();
2379 for i in 0..100 {
2380 batch = batch.add(&hasher, &test_digest(i));
2381 }
2382 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2383 mmr.apply_batch(&batch).unwrap();
2384 mmr.sync().await.unwrap();
2385
2386 let original_size = mmr.size();
2389 let original_root = mmr.root(&hasher, 0).unwrap();
2390 drop(mmr);
2391
2392 let prune_loc = Location::<F>::new(32);
2395 let sync_cfg = SyncConfig::<F, sha256::Digest, Sequential> {
2396 config: cfg,
2397 range: non_empty_range!(prune_loc, Location::<F>::new(128)),
2398 pinned_nodes: None, };
2400
2401 let sync_mmr =
2402 Merkle::<F, _, Digest, Sequential>::init_sync(context.child("sync"), sync_cfg)
2403 .await
2404 .unwrap();
2405
2406 assert_eq!(sync_mmr.size(), original_size);
2408 assert_eq!(sync_mmr.root(&hasher, 0).unwrap(), original_root);
2409 assert_eq!(sync_mmr.bounds().start, prune_loc);
2410
2411 sync_mmr.destroy().await.unwrap();
2412 }
2413
2414 #[test_traced]
2415 fn test_full_init_sync_computes_pinned_nodes_before_pruning_mmr() {
2416 let executor = deterministic::Runner::default();
2417 executor.start(full_init_sync_computes_pinned_nodes_before_pruning_inner::<mmr::Family>);
2418 }
2419
2420 #[test_traced]
2421 fn test_full_init_sync_computes_pinned_nodes_before_pruning_mmb() {
2422 let executor = deterministic::Runner::default();
2423 executor.start(full_init_sync_computes_pinned_nodes_before_pruning_inner::<mmb::Family>);
2424 }
2425
2426 async fn full_historical_proof_pruned_elements_inner<F: Family>(
2427 context: deterministic::Context,
2428 ) {
2429 let hasher = Standard::<Sha256>::new(ForwardFold);
2430
2431 let mut mmr = Merkle::<F, _, Digest, Sequential>::init(
2432 context.child("init"),
2433 &hasher,
2434 test_config(&context),
2435 )
2436 .await
2437 .unwrap();
2438
2439 let mut batch = mmr.new_batch();
2440 for i in 0..64 {
2441 batch = batch.add(&hasher, &test_digest(i));
2442 }
2443 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2444 mmr.apply_batch(&batch).unwrap();
2445
2446 let prune_loc = Location::<F>::new(16);
2447 mmr.prune(prune_loc).await.unwrap();
2448
2449 let historical_leaves = mmr.leaves();
2450 let mut pruned_loc = None;
2451 for loc_u64 in 0..*historical_leaves {
2452 let loc = Location::<F>::new(loc_u64);
2453 let result = mmr
2454 .historical_range_proof(&hasher, historical_leaves, loc..loc + 1, 0)
2455 .await;
2456 if matches!(result, Err(Error::ElementPruned(_))) {
2457 pruned_loc = Some(loc);
2458 break;
2459 }
2460 }
2461 let pruned_loc = pruned_loc.expect("expected at least one pruned location");
2462
2463 let mut batch = mmr.new_batch();
2465 for i in 0..8 {
2466 batch = batch.add(&hasher, &test_digest(10_000 + i));
2467 }
2468 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2469 mmr.apply_batch(&batch).unwrap();
2470
2471 let requested = mmr.leaves();
2472 let result = mmr
2473 .historical_range_proof(&hasher, requested, pruned_loc..pruned_loc + 1, 0)
2474 .await;
2475 assert!(matches!(result, Err(Error::ElementPruned(_))));
2476
2477 mmr.destroy().await.unwrap();
2478 }
2479
2480 #[test_traced]
2481 fn test_full_historical_proof_pruned_elements_mmr() {
2482 let executor = deterministic::Runner::default();
2483 executor.start(full_historical_proof_pruned_elements_inner::<mmr::Family>);
2484 }
2485
2486 #[test_traced]
2487 fn test_full_historical_proof_pruned_elements_mmb() {
2488 let executor = deterministic::Runner::default();
2489 executor.start(full_historical_proof_pruned_elements_inner::<mmb::Family>);
2490 }
2491
2492 async fn full_append_while_historical_proof_is_available_inner<F: Family>(
2493 context: deterministic::Context,
2494 ) {
2495 let hasher = Standard::<Sha256>::new(ForwardFold);
2496 let mmr = Merkle::<F, _, Digest, Sequential>::init(
2497 context.child("init"),
2498 &hasher,
2499 test_config(&context),
2500 )
2501 .await
2502 .unwrap();
2503
2504 let mut batch = mmr.new_batch();
2505 for i in 0..20 {
2506 batch = batch.add(&hasher, &test_digest(i));
2507 }
2508 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2509 mmr.apply_batch(&batch).unwrap();
2510
2511 let historical_leaves = Location::<F>::new(10);
2512 let range = Location::<F>::new(2)..Location::<F>::new(8);
2513
2514 let batch = mmr
2516 .new_batch()
2517 .add(&hasher, &test_digest(100))
2518 .add(&hasher, &test_digest(101));
2519 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2520 mmr.apply_batch(&batch).unwrap();
2521
2522 let proof = mmr
2523 .historical_range_proof(&hasher, historical_leaves, range.clone(), 0)
2524 .await
2525 .unwrap();
2526
2527 let expected = mmr
2528 .historical_range_proof(&hasher, historical_leaves, range, 0)
2529 .await
2530 .unwrap();
2531 assert_eq!(proof, expected);
2532
2533 mmr.destroy().await.unwrap();
2534 }
2535
2536 #[test_traced]
2537 fn test_full_append_while_historical_proof_is_available_mmr() {
2538 let executor = deterministic::Runner::default();
2539 executor.start(full_append_while_historical_proof_is_available_inner::<mmr::Family>);
2540 }
2541
2542 #[test_traced]
2543 fn test_full_append_while_historical_proof_is_available_mmb() {
2544 let executor = deterministic::Runner::default();
2545 executor.start(full_append_while_historical_proof_is_available_inner::<mmb::Family>);
2546 }
2547
2548 async fn full_historical_proof_after_sync_reads_from_journal_inner<F: Family>(
2549 context: deterministic::Context,
2550 ) {
2551 let hasher = Standard::<Sha256>::new(ForwardFold);
2552 let mmr = Merkle::<F, _, Digest, Sequential>::init(
2553 context.child("init"),
2554 &hasher,
2555 test_config(&context),
2556 )
2557 .await
2558 .unwrap();
2559
2560 let mut batch = mmr.new_batch();
2561 for i in 0..64 {
2562 batch = batch.add(&hasher, &test_digest(i));
2563 }
2564 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2565 mmr.apply_batch(&batch).unwrap();
2566 mmr.sync().await.unwrap();
2567
2568 let historical_leaves = Location::<F>::new(20);
2569 let range = Location::<F>::new(5)..Location::<F>::new(15);
2570 let expected = mmr
2571 .historical_range_proof(&hasher, historical_leaves, range.clone(), 0)
2572 .await
2573 .unwrap();
2574
2575 let actual = mmr
2576 .historical_range_proof(&hasher, historical_leaves, range, 0)
2577 .await
2578 .unwrap();
2579 assert_eq!(actual, expected);
2580
2581 mmr.destroy().await.unwrap();
2582 }
2583
2584 #[test_traced]
2585 fn test_full_historical_proof_after_sync_reads_from_journal_mmr() {
2586 let executor = deterministic::Runner::default();
2587 executor.start(full_historical_proof_after_sync_reads_from_journal_inner::<mmr::Family>);
2588 }
2589
2590 #[test_traced]
2591 fn test_full_historical_proof_after_sync_reads_from_journal_mmb() {
2592 let executor = deterministic::Runner::default();
2593 executor.start(full_historical_proof_after_sync_reads_from_journal_inner::<mmb::Family>);
2594 }
2595
2596 async fn full_historical_proof_after_pruning_inner<F: Family>(context: deterministic::Context) {
2597 let hasher = Standard::<Sha256>::new(ForwardFold);
2598 let mut mmr = Merkle::<F, _, Digest, Sequential>::init(
2599 context.child("init"),
2600 &hasher,
2601 test_config(&context),
2602 )
2603 .await
2604 .unwrap();
2605
2606 let mut batch = mmr.new_batch();
2607 for i in 0..30 {
2608 batch = batch.add(&hasher, &test_digest(i));
2609 }
2610 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2611 mmr.apply_batch(&batch).unwrap();
2612
2613 let prune_loc = Location::<F>::new(10);
2614 mmr.prune(prune_loc).await.unwrap();
2615
2616 let requested = Location::<F>::new(20);
2617 let range = prune_loc..requested;
2618 let proof = mmr
2619 .historical_range_proof(&hasher, requested, range, 0)
2620 .await
2621 .unwrap();
2622 assert!(proof.leaves > Location::<F>::new(0));
2623
2624 mmr.destroy().await.unwrap();
2625 }
2626
2627 #[test_traced]
2628 fn test_full_historical_proof_after_pruning_mmr() {
2629 let executor = deterministic::Runner::default();
2630 executor.start(full_historical_proof_after_pruning_inner::<mmr::Family>);
2631 }
2632
2633 #[test_traced]
2634 fn test_full_historical_proof_after_pruning_mmb() {
2635 let executor = deterministic::Runner::default();
2636 executor.start(full_historical_proof_after_pruning_inner::<mmb::Family>);
2637 }
2638
2639 async fn full_historical_proof_edge_cases_inner<F: Family>(context: deterministic::Context) {
2640 let hasher = Standard::<Sha256>::new(ForwardFold);
2641
2642 let mmr = Merkle::<F, _, Digest, Sequential>::init(
2644 context.child("empty"),
2645 &hasher,
2646 test_config(&context),
2647 )
2648 .await
2649 .unwrap();
2650 let empty_end = Location::<F>::new(0);
2651 let empty_result = mmr
2652 .historical_range_proof(&hasher, empty_end, empty_end..empty_end, 0)
2653 .await;
2654 assert!(matches!(empty_result, Err(Error::Empty)));
2655 let oob_result = mmr
2656 .historical_range_proof(&hasher, empty_end + 1, empty_end..empty_end + 1, 0)
2657 .await;
2658 assert!(matches!(
2659 oob_result,
2660 Err(Error::RangeOutOfBounds(loc)) if loc == empty_end + 1
2661 ));
2662 mmr.destroy().await.unwrap();
2663
2664 let mut mmr = Merkle::<F, _, Digest, Sequential>::init(
2666 context.child("fully_pruned"),
2667 &hasher,
2668 test_config(&context),
2669 )
2670 .await
2671 .unwrap();
2672 let mut batch = mmr.new_batch();
2673 for i in 0..20 {
2674 batch = batch.add(&hasher, &test_digest(i));
2675 }
2676 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2677 mmr.apply_batch(&batch).unwrap();
2678 let end = mmr.leaves();
2679 mmr.prune_all().await.unwrap();
2680 assert!(mmr.bounds().is_empty());
2681 let pruned_result = mmr
2682 .historical_range_proof(&hasher, end, end - 1..end, 0)
2683 .await;
2684 assert!(matches!(pruned_result, Err(Error::ElementPruned(_))));
2685 let oob_result = mmr
2686 .historical_range_proof(&hasher, end + 1, end - 1..end, 0)
2687 .await;
2688 assert!(matches!(
2689 oob_result,
2690 Err(Error::RangeOutOfBounds(loc)) if loc == end + 1
2691 ));
2692 mmr.destroy().await.unwrap();
2693
2694 let mut mmr = Merkle::<F, _, Digest, Sequential>::init(
2696 context.child("single_leaf"),
2697 &hasher,
2698 test_config(&context),
2699 )
2700 .await
2701 .unwrap();
2702 let mut batch = mmr.new_batch();
2703 for i in 0..11 {
2704 batch = batch.add(&hasher, &test_digest(i));
2705 }
2706 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2707 mmr.apply_batch(&batch).unwrap();
2708 let end = mmr.leaves();
2709 let keep_loc = end - 1;
2710 mmr.prune(keep_loc).await.unwrap();
2711 let ok_result = mmr
2712 .historical_range_proof(&hasher, end, keep_loc..end, 0)
2713 .await;
2714 assert!(ok_result.is_ok());
2715 let pruned_end = keep_loc - 1;
2716 let start_loc = Location::<F>::new(1);
2718 let pruned_result = mmr
2719 .historical_range_proof(&hasher, end, start_loc..pruned_end + 1, 0)
2720 .await;
2721 assert!(matches!(pruned_result, Err(Error::ElementPruned(_))));
2722 let oob_result = mmr
2723 .historical_range_proof(&hasher, end + 1, keep_loc..end, 0)
2724 .await;
2725 assert!(matches!(oob_result, Err(Error::RangeOutOfBounds(_))));
2726 mmr.destroy().await.unwrap();
2727 }
2728
2729 #[test_traced]
2730 fn test_full_historical_proof_edge_cases_mmr() {
2731 let executor = deterministic::Runner::default();
2732 executor.start(full_historical_proof_edge_cases_inner::<mmr::Family>);
2733 }
2734
2735 #[test_traced]
2736 fn test_full_historical_proof_edge_cases_mmb() {
2737 let executor = deterministic::Runner::default();
2738 executor.start(full_historical_proof_edge_cases_inner::<mmb::Family>);
2739 }
2740
2741 async fn full_historical_proof_out_of_bounds_inner<F: Family>(context: deterministic::Context) {
2742 let hasher = Standard::<Sha256>::new(ForwardFold);
2743 let mmr = Merkle::<F, _, Digest, Sequential>::init(
2744 context.child("oob"),
2745 &hasher,
2746 test_config(&context),
2747 )
2748 .await
2749 .unwrap();
2750
2751 let mut batch = mmr.new_batch();
2752 for i in 0..8 {
2753 batch = batch.add(&hasher, &test_digest(i));
2754 }
2755 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2756 mmr.apply_batch(&batch).unwrap();
2757 let requested = mmr.leaves() + 1;
2758
2759 let result = mmr
2760 .historical_range_proof(&hasher, requested, Location::<F>::new(0)..requested, 0)
2761 .await;
2762 assert!(matches!(
2763 result,
2764 Err(Error::RangeOutOfBounds(loc)) if loc == requested
2765 ));
2766
2767 mmr.destroy().await.unwrap();
2768 }
2769
2770 #[test_traced]
2771 fn test_full_historical_proof_out_of_bounds_mmr() {
2772 let executor = deterministic::Runner::default();
2773 executor.start(full_historical_proof_out_of_bounds_inner::<mmr::Family>);
2774 }
2775
2776 #[test_traced]
2777 fn test_full_historical_proof_out_of_bounds_mmb() {
2778 let executor = deterministic::Runner::default();
2779 executor.start(full_historical_proof_out_of_bounds_inner::<mmb::Family>);
2780 }
2781
2782 async fn full_historical_proof_range_validation_inner<F: Family>(
2783 context: deterministic::Context,
2784 ) {
2785 let hasher = Standard::<Sha256>::new(ForwardFold);
2786 let mmr = Merkle::<F, _, Digest, Sequential>::init(
2787 context.child("range_validation"),
2788 &hasher,
2789 test_config(&context),
2790 )
2791 .await
2792 .unwrap();
2793
2794 let mut batch = mmr.new_batch();
2795 for i in 0..32 {
2796 batch = batch.add(&hasher, &test_digest(i));
2797 }
2798 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2799 mmr.apply_batch(&batch).unwrap();
2800
2801 let valid_range = Location::<F>::new(0)..Location::<F>::new(1);
2802
2803 let requested = Location::<F>::new(5);
2805 let empty_range = requested..requested;
2806 let empty_result = mmr
2807 .historical_range_proof(&hasher, requested, empty_range, 0)
2808 .await;
2809 assert!(matches!(empty_result, Err(Error::Empty)));
2810
2811 let leaves_oob = mmr.leaves() + 1;
2813 let result = mmr
2814 .historical_range_proof(&hasher, leaves_oob, valid_range.clone(), 0)
2815 .await;
2816 assert!(matches!(
2817 result,
2818 Err(Error::RangeOutOfBounds(loc)) if loc == leaves_oob
2819 ));
2820
2821 let end_oob = mmr.leaves() + 1;
2823 let range_oob = Location::<F>::new(0)..end_oob;
2824 let result = mmr
2825 .historical_range_proof(&hasher, requested, range_oob, 0)
2826 .await;
2827 assert!(matches!(
2828 result,
2829 Err(Error::RangeOutOfBounds(loc)) if loc == end_oob
2830 ));
2831
2832 let range_end_gt_requested = requested + 1;
2834 let range_oob_at_requested = Location::<F>::new(0)..range_end_gt_requested;
2835 assert!(range_end_gt_requested <= mmr.leaves());
2836 let result = mmr
2837 .historical_range_proof(&hasher, requested, range_oob_at_requested, 0)
2838 .await;
2839 assert!(matches!(
2840 result,
2841 Err(Error::RangeOutOfBounds(loc)) if loc == range_end_gt_requested
2842 ));
2843
2844 let overflow_loc = Location::<F>::new(u64::MAX);
2847 let overflow_range = Location::<F>::new(0)..overflow_loc;
2848 let result = mmr
2849 .historical_range_proof(&hasher, requested, overflow_range, 0)
2850 .await;
2851 assert!(matches!(
2852 result,
2853 Err(Error::RangeOutOfBounds(loc)) if loc == overflow_loc
2854 ));
2855
2856 mmr.destroy().await.unwrap();
2857 }
2858
2859 #[test_traced]
2860 fn test_full_historical_proof_range_validation_mmr() {
2861 let executor = deterministic::Runner::default();
2862 executor.start(full_historical_proof_range_validation_inner::<mmr::Family>);
2863 }
2864
2865 #[test_traced]
2866 fn test_full_historical_proof_range_validation_mmb() {
2867 let executor = deterministic::Runner::default();
2868 executor.start(full_historical_proof_range_validation_inner::<mmb::Family>);
2869 }
2870
2871 async fn full_historical_proof_non_size_prune_excludes_pruned_leaves_inner<F: Family>(
2872 context: deterministic::Context,
2873 ) {
2874 let hasher = Standard::<Sha256>::new(ForwardFold);
2875 let mut mmr = Merkle::<F, _, Digest, Sequential>::init(
2876 context.child("non_size_prune"),
2877 &hasher,
2878 test_config(&context),
2879 )
2880 .await
2881 .unwrap();
2882
2883 let mut batch = mmr.new_batch();
2884 for i in 0..16 {
2885 batch = batch.add(&hasher, &test_digest(i));
2886 }
2887 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2888 mmr.apply_batch(&batch).unwrap();
2889
2890 let end = mmr.leaves();
2891 let mut failures = Vec::new();
2892 for prune_leaf in 1..*end {
2893 let prune_loc = Location::<F>::new(prune_leaf);
2894 mmr.prune(prune_loc).await.unwrap();
2895 for loc_u64 in 0..*end {
2896 let loc = Location::<F>::new(loc_u64);
2897 let range_includes_pruned_leaf = loc < prune_loc;
2898 match mmr.historical_proof(&hasher, end, loc, 0).await {
2899 Ok(_) => {}
2900 Err(Error::ElementPruned(_)) if range_includes_pruned_leaf => {}
2901 Err(Error::ElementPruned(_)) => failures.push(format!(
2902 "prune_loc={prune_loc} loc={loc} returned ElementPruned without a pruned range element"
2903 )),
2904 Err(err) => failures
2905 .push(format!("prune_loc={prune_loc} loc={loc} err={err}")),
2906 }
2907 }
2908 }
2909
2910 assert!(
2911 failures.is_empty(),
2912 "historical proof generation returned unexpected errors: {failures:?}"
2913 );
2914
2915 mmr.destroy().await.unwrap();
2916 }
2917
2918 #[test_traced]
2919 fn test_full_historical_proof_non_size_prune_excludes_pruned_leaves_mmr() {
2920 let executor = deterministic::Runner::default();
2921 executor.start(
2922 full_historical_proof_non_size_prune_excludes_pruned_leaves_inner::<mmr::Family>,
2923 );
2924 }
2925
2926 #[test_traced]
2927 fn test_full_historical_proof_non_size_prune_excludes_pruned_leaves_mmb() {
2928 let executor = deterministic::Runner::default();
2929 executor.start(
2930 full_historical_proof_non_size_prune_excludes_pruned_leaves_inner::<mmb::Family>,
2931 );
2932 }
2933
2934 async fn full_init_sync_recovers_from_invalid_journal_size_inner<F: Family>(
2937 context: deterministic::Context,
2938 ) {
2939 let hasher = Standard::<Sha256>::new(ForwardFold);
2940
2941 let mmr = Merkle::<F, _, Digest, Sequential>::init(
2943 context.child("init"),
2944 &hasher,
2945 test_config(&context),
2946 )
2947 .await
2948 .unwrap();
2949 let mut batch = mmr.new_batch();
2950 for i in 0..3 {
2951 batch = batch.add(&hasher, &test_digest(i));
2952 }
2953 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2954 mmr.apply_batch(&batch).unwrap();
2955 let valid_size = mmr.size();
2956 let valid_root = mmr.root(&hasher, 0).unwrap();
2957 mmr.sync().await.unwrap();
2958 drop(mmr);
2959
2960 {
2964 let journal: Journal<_, Digest> = Journal::init(
2965 context.child("corrupt"),
2966 JConfig {
2967 partition: "journal-partition".into(),
2968 items_per_blob: NZU64!(7),
2969 write_buffer: NZUsize!(1024),
2970 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2971 },
2972 )
2973 .await
2974 .unwrap();
2975 assert_eq!(journal.size().await, valid_size);
2976 journal.append(&Sha256::hash(b"orphan")).await.unwrap();
2977 journal.sync().await.unwrap();
2978 assert_eq!(journal.size().await, valid_size + 1);
2979 }
2980
2981 let sync_cfg = SyncConfig::<F, Digest, Sequential> {
2983 config: test_config(&context),
2984 range: non_empty_range!(Location::<F>::new(0), Location::<F>::new(100)),
2985 pinned_nodes: None,
2986 };
2987 let sync_mmr =
2988 Merkle::<F, _, Digest, Sequential>::init_sync(context.child("sync"), sync_cfg)
2989 .await
2990 .unwrap();
2991
2992 assert_eq!(sync_mmr.size(), valid_size);
2993 assert_eq!(sync_mmr.root(&hasher, 0).unwrap(), valid_root);
2994
2995 sync_mmr.destroy().await.unwrap();
2996 }
2997
2998 #[test_traced]
2999 fn test_init_sync_recovers_from_invalid_journal_size_mmr() {
3000 let executor = deterministic::Runner::default();
3001 executor.start(full_init_sync_recovers_from_invalid_journal_size_inner::<mmr::Family>);
3002 }
3003
3004 #[test_traced]
3005 fn test_init_sync_recovers_from_invalid_journal_size_mmb() {
3006 let executor = deterministic::Runner::default();
3007 executor.start(full_init_sync_recovers_from_invalid_journal_size_inner::<mmb::Family>);
3008 }
3009
3010 async fn full_stale_batch_inner<F: Family>(context: deterministic::Context) {
3011 let hasher: Standard<Sha256> = Standard::new(ForwardFold);
3012 let mmr = Merkle::<F, _, Digest, Sequential>::init(
3013 context.child("storage"),
3014 &Standard::<Sha256>::new(ForwardFold),
3015 test_config(&context),
3016 )
3017 .await
3018 .unwrap();
3019
3020 let batch_a = mmr.new_batch().add(&hasher, b"leaf-a");
3022 let batch_a = mmr.with_mem(|mem| batch_a.merkleize(mem, &hasher));
3023 let batch_b = mmr.new_batch().add(&hasher, b"leaf-b");
3024 let batch_b = mmr.with_mem(|mem| batch_b.merkleize(mem, &hasher));
3025
3026 mmr.apply_batch(&batch_a).unwrap();
3028
3029 let result = mmr.apply_batch(&batch_b);
3031 assert!(
3032 matches!(result, Err(Error::StaleBatch { .. })),
3033 "expected StaleBatch, got {result:?}"
3034 );
3035
3036 mmr.destroy().await.unwrap();
3037 }
3038
3039 #[test]
3040 fn test_stale_batch_mmr() {
3041 let executor = deterministic::Runner::default();
3042 executor.start(full_stale_batch_inner::<mmr::Family>);
3043 }
3044
3045 #[test]
3046 fn test_stale_batch_mmb() {
3047 let executor = deterministic::Runner::default();
3048 executor.start(full_stale_batch_inner::<mmb::Family>);
3049 }
3050
3051 async fn full_new_batch_returns_append_only_wrapper_inner<F: Family>(
3053 context: deterministic::Context,
3054 ) {
3055 let hasher = Standard::<Sha256>::new(ForwardFold);
3056 let mmr = Merkle::<F, _, Digest, Sequential>::init(
3057 context.child("storage"),
3058 &hasher,
3059 test_config(&context),
3060 )
3061 .await
3062 .unwrap();
3063
3064 let _batch: UnmerkleizedBatch<F, Digest, Sequential> = mmr.new_batch();
3065
3066 mmr.destroy().await.unwrap();
3067 }
3068
3069 #[test_traced]
3070 fn test_new_batch_returns_append_only_wrapper_mmr() {
3071 let executor = deterministic::Runner::default();
3072 executor.start(full_new_batch_returns_append_only_wrapper_inner::<mmr::Family>);
3073 }
3074
3075 #[test_traced]
3076 fn test_new_batch_returns_append_only_wrapper_mmb() {
3077 let executor = deterministic::Runner::default();
3078 executor.start(full_new_batch_returns_append_only_wrapper_inner::<mmb::Family>);
3079 }
3080
3081 async fn full_update_leaf_after_sync_returns_pruned_inner<F: Family>(
3086 context: deterministic::Context,
3087 ) {
3088 let hasher = Standard::<Sha256>::new(ForwardFold);
3089 let mmr = Merkle::<F, _, Digest, Sequential>::init(
3090 context.child("storage"),
3091 &hasher,
3092 test_config(&context),
3093 )
3094 .await
3095 .unwrap();
3096
3097 let mut batch = mmr.new_batch();
3099 for i in 0..50 {
3100 batch = batch.add(&hasher, &test_digest(i));
3101 }
3102 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
3103 mmr.apply_batch(&batch).unwrap();
3104 mmr.sync().await.unwrap();
3105
3106 let batch = mmr.to_batch().new_batch();
3110 let result = batch.update_leaf(&hasher, Location::<F>::new(0), b"updated");
3111 assert!(matches!(result, Err(Error::ElementPruned(_))));
3112
3113 mmr.destroy().await.unwrap();
3114 }
3115
3116 #[test_traced]
3117 fn test_update_leaf_after_sync_returns_pruned_mmr() {
3118 let executor = deterministic::Runner::default();
3119 executor.start(full_update_leaf_after_sync_returns_pruned_inner::<mmr::Family>);
3120 }
3121
3122 #[test_traced]
3123 fn test_update_leaf_after_sync_returns_pruned_mmb() {
3124 let executor = deterministic::Runner::default();
3125 executor.start(full_update_leaf_after_sync_returns_pruned_inner::<mmb::Family>);
3126 }
3127}