1use crate::{
10 journal::{
11 contiguous::{
12 fixed::{Config as JConfig, Journal},
13 Many, Reader,
14 },
15 Error as JError,
16 },
17 merkle::{
18 batch,
19 hasher::Hasher,
20 mem::{Config as MemConfig, Mem},
21 Error, Family, Location, Position, Proof, Readable,
22 },
23 metadata::{Config as MConfig, Metadata},
24};
25use commonware_codec::DecodeExt;
26use commonware_cryptography::Digest;
27use commonware_parallel::ThreadPool;
28use commonware_runtime::{buffer::paged::CacheRef, Clock, Metrics, Storage as RStorage};
29use commonware_utils::{
30 sequence::prefixed_u64::U64,
31 sync::{AsyncMutex, RwLock},
32};
33use std::{
34 collections::BTreeMap,
35 num::{NonZeroU64, NonZeroUsize},
36 sync::Arc,
37};
38use tracing::{debug, error, warn};
39
/// A batch of pending additions that has not been merkleized yet.
///
/// Thin wrapper around [`batch::UnmerkleizedBatch`]; it is what [`Journaled::new_batch`] hands
/// out, and every method forwards to the wrapped batch.
pub struct UnmerkleizedBatch<F: Family, D: Digest> {
    /// The wrapped batch that all operations are forwarded to.
    inner: batch::UnmerkleizedBatch<F, D>,
}
50
51impl<F: Family, D: Digest> UnmerkleizedBatch<F, D> {
52 pub fn add(self, hasher: &impl Hasher<F, Digest = D>, element: &[u8]) -> Self {
54 Self {
55 inner: self.inner.add(hasher, element),
56 }
57 }
58
59 pub fn add_leaf_digest(self, digest: D) -> Self {
61 Self {
62 inner: self.inner.add_leaf_digest(digest),
63 }
64 }
65
66 pub fn leaves(&self) -> Location<F> {
68 self.inner.leaves()
69 }
70
71 #[cfg(feature = "std")]
73 pub fn with_pool(self, pool: Option<ThreadPool>) -> Self {
74 Self {
75 inner: self.inner.with_pool(pool),
76 }
77 }
78
79 pub fn merkleize(
82 self,
83 base: &Mem<F, D>,
84 hasher: &impl Hasher<F, Digest = D>,
85 ) -> Arc<batch::MerkleizedBatch<F, D>> {
86 self.inner.merkleize(base, hasher)
87 }
88}
89
/// The mutable state of a [`Journaled`] structure, kept behind its `inner` lock.
pub(crate) struct Inner<F: Family, D: Digest> {
    /// The in-memory structure. `sync` copies the nodes the journal does not have yet out of it,
    /// then prunes it and re-adds the pinned nodes.
    pub(crate) mem: Mem<F, D>,

    /// The current pruning boundary, as a position. Set by `init`/`init_sync` and advanced by
    /// `prune`.
    pub(crate) pruned_to_pos: Position<F>,
}
101
/// Configuration for a [`Journaled`] structure.
#[derive(Clone)]
pub struct Config {
    /// Partition name passed to the journal that stores node digests.
    pub journal_partition: String,

    /// Partition name passed to the metadata store (pruning boundary and pinned node digests).
    pub metadata_partition: String,

    /// Forwarded unchanged to the journal's `items_per_blob` setting.
    pub items_per_blob: NonZeroU64,

    /// Forwarded unchanged to the journal's `write_buffer` setting.
    pub write_buffer: NonZeroUsize,

    /// Optional thread pool; stored on the structure and attached to batches it creates.
    pub thread_pool: Option<ThreadPool>,

    /// Forwarded unchanged to the journal's `page_cache` setting.
    pub page_cache: CacheRef,
}
126
/// Configuration for [`Journaled::init_sync`].
pub struct SyncConfig<F: Family, D: Digest> {
    /// The base configuration (partitions, journal settings, thread pool).
    pub config: Config,

    /// Range of leaf locations being synced. `range.start` becomes the pruning boundary and
    /// existing journal data larger than the position of `range.end` is rejected. `init_sync`
    /// panics if the range is empty.
    pub range: std::ops::Range<Location<F>>,

    /// Optional digests for the nodes returned by `F::nodes_to_pin(range.start)`, in that order.
    /// When provided they are written to metadata; a length mismatch yields
    /// `Error::InvalidPinnedNodes`.
    pub pinned_nodes: Option<Vec<D>>,
}
145
/// A Merkle structure whose node digests are persisted in a journal, with its pruning boundary
/// and pinned node digests persisted in a metadata store.
pub struct Journaled<F: Family, E: RStorage + Clock + Metrics, D: Digest> {
    /// In-memory state (the `Mem` structure plus the pruning boundary) behind a read/write lock.
    pub(crate) inner: RwLock<Inner<F, D>>,

    /// Journal of node digests, read and written by node position.
    pub(crate) journal: Journal<E, D>,

    /// Metadata store. Keys with `NODE_PREFIX` hold pinned node digests; the key with
    /// `PRUNED_TO_PREFIX` holds the pruning boundary as 8 big-endian bytes.
    pub(crate) metadata: Metadata<E, U64, Vec<u8>>,

    /// Async mutex held for the whole of `sync`, so only one sync runs at a time.
    pub(crate) sync_lock: AsyncMutex<()>,

    /// Optional thread pool attached to batches created by `new_batch` and `to_batch`.
    pub(crate) pool: Option<ThreadPool>,
}
165
/// Prefix of the metadata keys under which pinned node digests are stored (keyed by position).
const NODE_PREFIX: u8 = 0;

/// Prefix of the metadata key (`U64::new(PRUNED_TO_PREFIX, 0)`) under which the pruning boundary
/// is stored.
pub(crate) const PRUNED_TO_PREFIX: u8 = 1;
171
172impl<F: Family, E: RStorage + Clock + Metrics, D: Digest> Journaled<F, E, D> {
173 pub fn size(&self) -> Position<F> {
176 self.inner.read().mem.size()
177 }
178
179 pub fn leaves(&self) -> Location<F> {
181 self.inner.read().mem.leaves()
182 }
183
184 async fn get_from_metadata_or_journal(
188 metadata: &Metadata<E, U64, Vec<u8>>,
189 journal: &Journal<E, D>,
190 pos: Position<F>,
191 ) -> Result<D, Error<F>> {
192 if let Some(bytes) = metadata.get(&U64::new(NODE_PREFIX, *pos)) {
193 debug!(?pos, "read node from metadata");
194 let digest = D::decode(bytes.as_ref());
195 let Ok(digest) = digest else {
196 error!(
197 ?pos,
198 err = %digest.expect_err("digest is Err in else branch"),
199 "could not convert node from metadata bytes to digest"
200 );
201 return Err(Error::DataCorrupted(
202 "could not read digest at requested pos",
203 ));
204 };
205 return Ok(digest);
206 }
207
208 debug!(?pos, "reading node from journal");
210 let node = journal.reader().await.read(*pos).await;
211 match node {
212 Ok(node) => Ok(node),
213 Err(JError::ItemPruned(_)) => {
214 error!(?pos, "node is missing from metadata and journal");
215 Err(Error::MissingNode(pos))
216 }
217 Err(e) => Err(Error::Journal(e)),
218 }
219 }
220
221 pub fn bounds(&self) -> std::ops::Range<Location<F>> {
224 let inner = self.inner.read();
225 Location::try_from(inner.pruned_to_pos).expect("valid pruned_to_pos")..inner.mem.leaves()
226 }
227
228 async fn add_extra_pinned_nodes(
230 mem: &mut Mem<F, D>,
231 metadata: &Metadata<E, U64, Vec<u8>>,
232 journal: &Journal<E, D>,
233 prune_pos: Position<F>,
234 ) -> Result<(), Error<F>> {
235 let prune_loc = Location::try_from(prune_pos).expect("valid prune_pos");
236 let mut pinned_nodes = BTreeMap::new();
237 for pos in F::nodes_to_pin(prune_loc) {
238 let digest = Self::get_from_metadata_or_journal(metadata, journal, pos).await?;
239 pinned_nodes.insert(pos, digest);
240 }
241 mem.add_pinned_nodes(pinned_nodes);
242
243 Ok(())
244 }
245
246 pub async fn init(
248 context: E,
249 hasher: &impl Hasher<F, Digest = D>,
250 cfg: Config,
251 ) -> Result<Self, Error<F>> {
252 let journal_cfg = JConfig {
253 partition: cfg.journal_partition,
254 items_per_blob: cfg.items_per_blob,
255 page_cache: cfg.page_cache,
256 write_buffer: cfg.write_buffer,
257 };
258 let journal =
259 Journal::<E, D>::init(context.with_label("merkle_journal"), journal_cfg).await?;
260 let mut journal_size = Position::<F>::new(journal.size().await);
261
262 let metadata_cfg = MConfig {
263 partition: cfg.metadata_partition,
264 codec_config: ((0..).into(), ()),
265 };
266 let metadata =
267 Metadata::<_, U64, Vec<u8>>::init(context.with_label("merkle_metadata"), metadata_cfg)
268 .await?;
269
270 if journal_size == 0 {
271 let mem = Mem::init(
272 MemConfig {
273 nodes: vec![],
274 pruning_boundary: Location::new(0),
275 pinned_nodes: vec![],
276 },
277 hasher,
278 )?;
279 return Ok(Self {
280 inner: RwLock::new(Inner {
281 mem,
282 pruned_to_pos: Position::new(0),
283 }),
284 journal,
285 metadata,
286 sync_lock: AsyncMutex::new(()),
287 pool: cfg.thread_pool,
288 });
289 }
290
291 let key: U64 = U64::new(PRUNED_TO_PREFIX, 0);
295 let metadata_pruned_to = Location::<F>::new(metadata.get(&key).map_or(0, |bytes| {
296 u64::from_be_bytes(
297 bytes
298 .as_slice()
299 .try_into()
300 .expect("metadata pruned_to is not 8 bytes"),
301 )
302 }));
303 let metadata_prune_pos = Position::try_from(metadata_pruned_to)?;
304 let journal_bounds_start = journal.reader().await.bounds().start;
305 if *metadata_prune_pos > journal_bounds_start {
306 journal.prune(*metadata_prune_pos).await?;
309 if journal.reader().await.bounds().start != journal_bounds_start {
310 warn!(
313 journal_bounds_start,
314 ?metadata_prune_pos,
315 "journal pruned to match metadata"
316 );
317 }
318 } else if *metadata_prune_pos < journal_bounds_start {
319 warn!(
322 ?metadata_prune_pos,
323 journal_bounds_start, "metadata stale, using journal pruning boundary"
324 );
325 }
326
327 let journal_boundary_pos = Position::<F>::new(journal_bounds_start);
333 let journal_boundary_floor = F::to_nearest_size(journal_boundary_pos);
334 let journal_boundary_leaf_aligned_pos = if journal_boundary_floor == journal_boundary_pos {
335 journal_boundary_floor
338 } else {
339 Position::try_from(Location::try_from(journal_boundary_floor)? + 1)?
342 };
343 let effective_prune_pos =
344 std::cmp::max(metadata_prune_pos, journal_boundary_leaf_aligned_pos);
345
346 let last_valid_size = F::to_nearest_size(journal_size);
347 let mut orphaned_leaf: Option<D> = None;
348 if last_valid_size != journal_size {
349 warn!(
350 ?last_valid_size,
351 "encountered invalid structure, recovering from last valid size"
352 );
353 let recovered_item = journal.reader().await.read(*last_valid_size).await;
356 if let Ok(item) = recovered_item {
357 orphaned_leaf = Some(item);
358 }
359 journal.rewind(*last_valid_size).await?;
360 journal.sync().await?;
361 journal_size = last_valid_size
362 }
363
364 let journal_leaves = Location::try_from(journal_size)?;
366 let mut pinned_nodes = Vec::new();
367 for pos in F::nodes_to_pin(journal_leaves) {
368 let digest = Self::get_from_metadata_or_journal(&metadata, &journal, pos).await?;
369 pinned_nodes.push(digest);
370 }
371 let mut mem = Mem::init(
372 MemConfig {
373 nodes: vec![],
374 pruning_boundary: journal_leaves,
375 pinned_nodes,
376 },
377 hasher,
378 )?;
379 Self::add_extra_pinned_nodes(&mut mem, &metadata, &journal, effective_prune_pos).await?;
380
381 if let Some(leaf) = orphaned_leaf {
382 let pos = mem.size();
384 warn!(?pos, "recovering orphaned leaf");
385 let batch = mem
386 .new_batch()
387 .add_leaf_digest(leaf)
388 .merkleize(&mem, hasher);
389 mem.apply_batch(&batch)?;
390 assert_eq!(pos, journal_size);
391
392 for p in journal.size().await..*mem.size() {
394 let p = Position::new(p);
395 let node = *mem.get_node_unchecked(p);
396 journal.append(&node).await?;
397 }
398 journal.sync().await?;
399 assert_eq!(mem.size(), journal.size().await);
400
401 let effective_prune_loc =
403 Location::try_from(effective_prune_pos).expect("valid effective_prune_pos");
404 let mut pn = BTreeMap::new();
405 for p in F::nodes_to_pin(effective_prune_loc) {
406 let d = mem.get_node_unchecked(p);
407 pn.insert(p, *d);
408 }
409 mem.prune_all();
410 mem.add_pinned_nodes(pn);
411 }
412
413 Ok(Self {
414 inner: RwLock::new(Inner {
415 mem,
416 pruned_to_pos: effective_prune_pos,
417 }),
418 journal,
419 metadata,
420 sync_lock: AsyncMutex::new(()),
421 pool: cfg.thread_pool,
422 })
423 }
424
    /// Initializes a [`Journaled`] structure for syncing the leaf range `cfg.range`.
    ///
    /// Existing journal data is reused when its size lies within the range; otherwise the journal
    /// is cleared to the start of the range. The pruning boundary (`cfg.range.start`) and any
    /// provided pinned nodes are written to metadata, which is synced before the journal is
    /// pruned to the boundary.
    ///
    /// # Errors
    ///
    /// - A journal `ItemOutOfRange` error if the existing journal is larger than the position of
    ///   `cfg.range.end`.
    /// - `Error::InvalidPinnedNodes` if `cfg.pinned_nodes` has the wrong length.
    /// - Any journal, metadata, or conversion error encountered along the way.
    ///
    /// # Panics
    ///
    /// Panics if `cfg.range` is empty.
    pub async fn init_sync(
        context: E,
        cfg: SyncConfig<F, D>,
        hasher: &impl Hasher<F, Digest = D>,
    ) -> Result<Self, Error<F>> {
        let prune_pos = Position::try_from(cfg.range.start)?;
        let end_pos = Position::try_from(cfg.range.end)?;
        let journal_cfg = JConfig {
            partition: cfg.config.journal_partition.clone(),
            items_per_blob: cfg.config.items_per_blob,
            write_buffer: cfg.config.write_buffer,
            page_cache: cfg.config.page_cache.clone(),
        };

        let journal: Journal<E, D> =
            Journal::init(context.with_label("merkle_journal"), journal_cfg).await?;
        let mut journal_size = Position::<F>::new(journal.size().await);

        // If the journal's size is not a valid structure size, rewind it to the nearest valid
        // size.
        let last_valid_size = F::to_nearest_size(journal_size);
        if last_valid_size != journal_size {
            warn!(
                ?last_valid_size,
                "init_sync: encountered invalid structure, recovering from last valid size"
            );
            journal.rewind(*last_valid_size).await?;
            journal.sync().await?;
            journal_size = last_valid_size;
        }

        // Reconcile existing journal data with the requested range.
        assert!(!cfg.range.is_empty(), "range must not be empty");
        if journal_size > *end_pos {
            return Err(crate::journal::Error::ItemOutOfRange(*journal_size).into());
        }
        // Existing data ends at or before the range start: discard it and restart the journal at
        // the pruning boundary.
        if journal_size <= *prune_pos && *prune_pos != 0 {
            journal.clear_to_size(*prune_pos).await?;
            journal_size = Position::new(journal.size().await);
        }

        let metadata_cfg = MConfig {
            partition: cfg.config.metadata_partition,
            codec_config: ((0..).into(), ()),
        };
        let mut metadata =
            Metadata::init(context.with_label("merkle_metadata"), metadata_cfg).await?;

        // Record the pruning boundary (as a location, 8 big-endian bytes).
        let pruning_boundary_key = U64::new(PRUNED_TO_PREFIX, 0);
        metadata.put(
            pruning_boundary_key,
            cfg.range.start.as_u64().to_be_bytes().into(),
        );

        let prune_loc = Location::try_from(prune_pos)?;
        let journal_leaves = Location::try_from(journal_size)?;
        // Write the caller-provided pinned nodes for the pruning boundary to metadata, after
        // checking that one digest was provided per required node.
        if let Some(pinned_nodes) = cfg.pinned_nodes {
            let nodes_to_pin_persisted: Vec<_> = F::nodes_to_pin(prune_loc).collect();
            if pinned_nodes.len() != nodes_to_pin_persisted.len() {
                return Err(Error::<F>::InvalidPinnedNodes);
            }
            for (pos, digest) in nodes_to_pin_persisted.into_iter().zip(pinned_nodes.iter()) {
                metadata.put(U64::new(NODE_PREFIX, *pos), digest.to_vec());
            }
        }

        // Build the in-memory structure fully pruned up to the journal's size, holding only the
        // nodes pinned for that size (read from metadata or the journal).
        let nodes_to_pin_mem = F::nodes_to_pin(journal_leaves);
        let mut mem_pinned_nodes = Vec::new();
        for pos in nodes_to_pin_mem {
            let digest = Self::get_from_metadata_or_journal(&metadata, &journal, pos).await?;
            mem_pinned_nodes.push(digest);
        }
        let mut mem = Mem::init(
            MemConfig {
                nodes: vec![],
                pruning_boundary: Location::try_from(journal_size)?,
                pinned_nodes: mem_pinned_nodes,
            },
            hasher,
        )?;

        // When the journal extends past the pruning boundary, also pin the nodes required for
        // the boundary itself.
        if prune_pos < journal_size {
            Self::add_extra_pinned_nodes(&mut mem, &metadata, &journal, prune_pos).await?;
        }

        // Persist metadata before pruning the journal.
        metadata.sync().await?;

        journal.prune(*prune_pos).await?;

        Ok(Self {
            inner: RwLock::new(Inner {
                mem,
                pruned_to_pos: prune_pos,
            }),
            journal,
            metadata,
            sync_lock: AsyncMutex::new(()),
            pool: cfg.config.thread_pool,
        })
    }
553
554 async fn update_metadata(
557 &mut self,
558 prune_to_pos: Position<F>,
559 ) -> Result<BTreeMap<Position<F>, D>, Error<F>> {
560 assert!(prune_to_pos >= self.inner.get_mut().pruned_to_pos);
561
562 let prune_loc = Location::try_from(prune_to_pos).expect("valid prune_to_pos");
563 let mut pinned_nodes = BTreeMap::new();
564 for pos in F::nodes_to_pin(prune_loc) {
565 let digest = self.get_node(pos).await?.expect(
566 "pinned node should exist if prune_to_pos is no less than self.pruned_to_pos",
567 );
568 self.metadata
569 .put(U64::new(NODE_PREFIX, *pos), digest.to_vec());
570 pinned_nodes.insert(pos, digest);
571 }
572
573 let key: U64 = U64::new(PRUNED_TO_PREFIX, 0);
574 self.metadata.put(
575 key,
576 Location::try_from(prune_to_pos)?
577 .as_u64()
578 .to_be_bytes()
579 .into(),
580 );
581
582 self.metadata.sync().await.map_err(Error::Metadata)?;
583
584 Ok(pinned_nodes)
585 }
586
587 pub async fn get_node(&self, position: Position<F>) -> Result<Option<D>, Error<F>> {
588 {
589 let inner = self.inner.read();
590 if let Some(node) = inner.mem.get_node(position) {
591 return Ok(Some(node));
592 }
593 }
594
595 match self.journal.reader().await.read(*position).await {
596 Ok(item) => Ok(Some(item)),
597 Err(JError::ItemPruned(_)) => Ok(None),
598 Err(e) => Err(Error::Journal(e)),
599 }
600 }
601
    /// Writes every node the journal does not have yet from memory to the journal, syncs the
    /// journal, then prunes the in-memory structure up to the leaves that were just persisted
    /// (keeping the nodes pinned for the current pruning boundary).
    ///
    /// Returns immediately if the journal is already as large as the in-memory structure.
    ///
    /// # Panics
    ///
    /// Panics if the journal is larger than the in-memory structure.
    pub async fn sync(&self) -> Result<(), Error<F>> {
        // Held for the whole call so that only one sync runs at a time.
        let _sync_guard = self.sync_lock.lock().await;

        let journal_size = Position::<F>::new(self.journal.size().await);

        // Snapshot everything needed under the read lock; the lock is released at the end of
        // this block, before any journal I/O.
        let (sync_target_leaves, missing_nodes, pinned_nodes) = {
            let inner = self.inner.read();
            let size = inner.mem.size();
            let sync_target_leaves = inner.mem.leaves();

            assert!(
                journal_size <= size,
                "journal size should never exceed in-memory structure size"
            );
            if journal_size == size {
                return Ok(());
            }

            // Nodes present in memory but not yet in the journal.
            let mut missing_nodes = Vec::with_capacity((*size - *journal_size) as usize);
            for pos in *journal_size..*size {
                let node = *inner.mem.get_node_unchecked(Position::new(pos));
                missing_nodes.push(node);
            }

            // Nodes pinned for the current pruning boundary; captured now so they can be
            // re-added after the in-memory prune below.
            let prune_loc = Location::try_from(inner.pruned_to_pos).expect("valid pruned_to_pos");
            let mut pinned_nodes = BTreeMap::new();
            for pos in F::nodes_to_pin(prune_loc) {
                let digest = inner.mem.get_node_unchecked(pos);
                pinned_nodes.insert(pos, *digest);
            }

            (sync_target_leaves, missing_nodes, pinned_nodes)
        };

        self.journal.append_many(Many::Flat(&missing_nodes)).await?;

        self.journal.sync().await?;

        {
            // Prune only up to the leaf count captured above, then restore the pinned nodes.
            let mut inner = self.inner.write();
            inner
                .mem
                .prune(sync_target_leaves)
                .expect("captured leaves is in bounds");
            inner.mem.add_pinned_nodes(pinned_nodes);
        }

        Ok(())
    }
661
662 pub async fn prune(&mut self, loc: Location<F>) -> Result<(), Error<F>> {
670 let pos = Position::try_from(loc)?;
671 {
672 let inner = self.inner.get_mut();
673 if loc > inner.mem.leaves() {
674 return Err(Error::LeafOutOfBounds(loc));
675 }
676 if pos <= inner.pruned_to_pos {
677 return Ok(());
678 }
679 }
680
681 self.sync().await?;
683
684 let pinned_nodes = self.update_metadata(pos).await?;
687
688 self.journal.prune(*pos).await?;
689 let inner = self.inner.get_mut();
690 inner.mem.add_pinned_nodes(pinned_nodes);
691 inner.pruned_to_pos = pos;
692
693 Ok(())
694 }
695
696 pub fn root(&self) -> D {
698 *self.inner.read().mem.root()
699 }
700
701 pub async fn prune_all(&mut self) -> Result<(), Error<F>> {
704 let leaves = self.inner.get_mut().mem.leaves();
705 if leaves != 0 {
706 self.prune(leaves).await?;
707 }
708 Ok(())
709 }
710
711 pub async fn destroy(self) -> Result<(), Error<F>> {
713 self.journal.destroy().await?;
714 self.metadata.destroy().await?;
715
716 Ok(())
717 }
718
719 #[cfg(any(test, feature = "fuzzing"))]
720 pub async fn simulate_partial_sync(&mut self, write_limit: usize) -> Result<(), Error<F>> {
723 if write_limit == 0 {
724 return Ok(());
725 }
726
727 let inner = self.inner.get_mut();
728 let journal_size = Position::<F>::new(self.journal.size().await);
729
730 let mut written_count = 0usize;
733 for i in *journal_size..*inner.mem.size() {
734 let node = *inner.mem.get_node_unchecked(Position::new(i));
735 self.journal.append(&node).await?;
736 written_count += 1;
737 if written_count >= write_limit {
738 break;
739 }
740 }
741 self.journal.sync().await?;
742
743 Ok(())
744 }
745
746 #[cfg(test)]
747 pub fn get_pinned_nodes(&self) -> BTreeMap<Position<F>, D> {
748 self.inner.read().mem.pinned_nodes()
749 }
750
751 #[cfg(test)]
752 pub async fn simulate_pruning_failure(mut self, prune_to: Location<F>) -> Result<(), Error<F>> {
753 let prune_to_pos = Position::try_from(prune_to)?;
754 assert!(prune_to_pos <= self.inner.get_mut().mem.size());
755
756 self.sync().await?;
758
759 self.update_metadata(prune_to_pos).await?;
762
763 Ok(())
765 }
766
767 pub fn apply_batch(&mut self, batch: &batch::MerkleizedBatch<F, D>) -> Result<(), Error<F>> {
774 self.inner.get_mut().mem.apply_batch(batch)?;
775 Ok(())
776 }
777
778 pub(crate) fn to_batch(&self) -> Arc<batch::MerkleizedBatch<F, D>> {
783 let inner = self.inner.read();
784 let mut batch = batch::MerkleizedBatch::from_mem(&inner.mem);
785 #[cfg(feature = "std")]
786 if let Some(pool) = &self.pool {
787 Arc::get_mut(&mut batch).expect("just created").pool = Some(pool.clone());
788 }
789 batch
790 }
791
792 pub fn with_mem<R>(&self, f: impl FnOnce(&Mem<F, D>) -> R) -> R {
795 let inner = self.inner.read();
796 f(&inner.mem)
797 }
798
799 pub fn new_batch(&self) -> UnmerkleizedBatch<F, D> {
801 let inner = self.inner.read();
802 let root = batch::MerkleizedBatch::from_mem(&inner.mem);
803 drop(inner);
804 UnmerkleizedBatch {
805 inner: root.new_batch(),
806 }
807 .with_pool(self.pool())
808 }
809
810 pub fn pool(&self) -> Option<ThreadPool> {
812 self.pool.clone()
813 }
814
    /// Removes the last `leaves_to_remove` leaves from the structure, rewinding the journal when
    /// the new size is smaller than what has been persisted. Does nothing when
    /// `leaves_to_remove` is 0.
    ///
    /// # Errors
    ///
    /// - `Error::Empty` if more leaves are requested than exist and nothing has been pruned.
    /// - `Error::ElementPruned` if more leaves are requested than exist and something has been
    ///   pruned, or if the new size would fall below the pruning boundary.
    /// - Any journal or metadata error encountered while rewinding or rebuilding.
    ///
    /// # Panics
    ///
    /// Panics if the destination leaf or the in-memory bounds start does not convert to a
    /// position.
    pub(crate) async fn rewind(
        &mut self,
        leaves_to_remove: usize,
        hasher: &impl Hasher<F, Digest = D>,
    ) -> Result<(), Error<F>> {
        if leaves_to_remove == 0 {
            return Ok(());
        }

        let current_leaves = *self.leaves();
        let destination_leaf = match current_leaves.checked_sub(leaves_to_remove as u64) {
            Some(dest) => dest,
            None => {
                // Asked to remove more leaves than exist; nothing has been modified yet.
                let pruned_to_pos = self.inner.get_mut().pruned_to_pos;
                return Err(if pruned_to_pos == 0 {
                    Error::Empty
                } else {
                    Error::ElementPruned(pruned_to_pos - 1)
                });
            }
        };

        let destination_loc = Location::new(destination_leaf);
        let new_size = Position::try_from(destination_loc).expect("valid leaf");

        // Refuse to rewind past the pruning boundary.
        let pruned_to_pos = self.inner.get_mut().pruned_to_pos;
        if new_size < pruned_to_pos {
            return Err(Error::ElementPruned(new_size));
        }

        // Only touch the journal if it holds nodes beyond the new size.
        let journal_size = Position::<F>::new(self.journal.size().await);
        if new_size < journal_size {
            self.journal.rewind(*new_size).await?;
            self.journal.sync().await?;
        }

        let inner = self.inner.get_mut();
        if new_size >= Position::try_from(inner.mem.bounds().start).expect("valid mem bounds start")
        {
            // The new size is at or after the start of the in-memory bounds: truncate in place.
            inner.mem.truncate(new_size, hasher);
        } else {
            // The new size is before the start of the in-memory bounds: rebuild the in-memory
            // structure at the destination from the nodes pinned for it, then restore the nodes
            // pinned for the pruning boundary.
            let mut pinned_nodes = Vec::new();
            for pos in F::nodes_to_pin(destination_loc) {
                pinned_nodes.push(
                    Self::get_from_metadata_or_journal(&self.metadata, &self.journal, pos).await?,
                );
            }
            inner.mem = Mem::from_components(hasher, vec![], destination_loc, pinned_nodes)?;
            Self::add_extra_pinned_nodes(
                &mut inner.mem,
                &self.metadata,
                &self.journal,
                inner.pruned_to_pos,
            )
            .await?;
        }

        Ok(())
    }
885}
886
887impl<F: Family, E: RStorage + Clock + Metrics, D: Digest> Readable for Journaled<F, E, D> {
895 type Family = F;
896 type Digest = D;
897 type Error = Error<F>;
898
899 fn size(&self) -> Position<F> {
900 self.size()
901 }
902
903 fn get_node(&self, pos: Position<F>) -> Option<D> {
904 self.inner.read().mem.get_node(pos)
905 }
906
907 fn root(&self) -> D {
908 *self.inner.read().mem.root()
909 }
910
911 fn pruning_boundary(&self) -> Location<F> {
912 self.inner.read().mem.pruning_boundary()
913 }
914
915 fn proof(
916 &self,
917 hasher: &impl Hasher<F, Digest = D>,
918 loc: Location<F>,
919 ) -> Result<Proof<F, D>, Error<F>> {
920 if !loc.is_valid_index() {
921 return Err(Error::LocationOverflow(loc));
922 }
923 crate::merkle::proof::build_range_proof(
924 hasher,
925 self.leaves(),
926 loc..loc + 1,
927 |pos| <Self as Readable>::get_node(self, pos),
928 Error::ElementPruned,
929 )
930 .map_err(|e| match e {
931 Error::RangeOutOfBounds(_) => Error::LeafOutOfBounds(loc),
932 _ => e,
933 })
934 }
935
936 fn range_proof(
937 &self,
938 hasher: &impl Hasher<F, Digest = D>,
939 range: core::ops::Range<Location<F>>,
940 ) -> Result<Proof<F, D>, Error<F>> {
941 crate::merkle::proof::build_range_proof(
942 hasher,
943 self.leaves(),
944 range,
945 |pos| <Self as Readable>::get_node(self, pos),
946 Error::ElementPruned,
947 )
948 }
949}
950
951impl<F: Family, E: RStorage + Clock + Metrics + Sync, D: Digest> crate::merkle::storage::Storage<F>
952 for Journaled<F, E, D>
953{
954 type Digest = D;
955
956 async fn size(&self) -> Position<F> {
957 self.size()
958 }
959
960 async fn get_node(&self, position: Position<F>) -> Result<Option<D>, Error<F>> {
961 Self::get_node(self, position).await
962 }
963}
964
965impl<F: Family, E: RStorage + Clock + Metrics, D: Digest> Journaled<F, E, D> {
966 pub async fn historical_proof(
977 &self,
978 hasher: &impl Hasher<F, Digest = D>,
979 leaves: Location<F>,
980 loc: Location<F>,
981 ) -> Result<Proof<F, D>, Error<F>> {
982 if !loc.is_valid_index() {
983 return Err(Error::LocationOverflow(loc));
984 }
985 self.historical_range_proof(hasher, leaves, loc..loc + 1)
987 .await
988 }
989
990 pub async fn historical_range_proof(
1002 &self,
1003 hasher: &impl Hasher<F, Digest = D>,
1004 leaves: Location<F>,
1005 range: core::ops::Range<Location<F>>,
1006 ) -> Result<Proof<F, D>, Error<F>> {
1007 if leaves > self.leaves() {
1008 return Err(Error::RangeOutOfBounds(leaves));
1009 }
1010 crate::merkle::verification::historical_range_proof(hasher, self, leaves, range).await
1011 }
1012
1013 pub async fn proof(
1026 &self,
1027 hasher: &impl Hasher<F, Digest = D>,
1028 loc: Location<F>,
1029 ) -> Result<Proof<F, D>, Error<F>> {
1030 if !loc.is_valid_index() {
1031 return Err(Error::LocationOverflow(loc));
1032 }
1033 self.range_proof(hasher, loc..loc + 1).await
1035 }
1036
1037 pub async fn range_proof(
1049 &self,
1050 hasher: &impl Hasher<F, Digest = D>,
1051 range: core::ops::Range<Location<F>>,
1052 ) -> Result<Proof<F, D>, Error<F>> {
1053 self.historical_range_proof(hasher, self.leaves(), range)
1054 .await
1055 }
1056}
1057
1058#[cfg(test)]
1059mod tests {
1060 use super::*;
1061 use crate::{
1062 journal::contiguous::fixed::{Config as JConfig, Journal},
1063 merkle::{hasher::Standard, mmb, mmr, Location, LocationRangeExt as _, Position, Proof},
1064 metadata::{Config as MConfig, Metadata},
1065 };
1066 use commonware_cryptography::{
1067 sha256::{self, Digest},
1068 Hasher as _, Sha256,
1069 };
1070 use commonware_macros::test_traced;
1071 use commonware_runtime::{buffer::paged::CacheRef, deterministic, BufferPooler, Runner};
1072 use commonware_utils::{sequence::prefixed_u64::U64, NZUsize, NZU16, NZU64};
1073 use std::{
1074 collections::BTreeMap,
1075 num::{NonZeroU16, NonZeroUsize},
1076 };
1077
1078 fn test_digest(v: usize) -> Digest {
1079 Sha256::hash(&v.to_be_bytes())
1080 }
1081
    /// Page size passed to `CacheRef::from_pooler` by the tests.
    const PAGE_SIZE: NonZeroU16 = NZU16!(111);
    /// Page cache size passed to `CacheRef::from_pooler` by the tests.
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(5);
1084
1085 fn test_config(pooler: &impl BufferPooler) -> Config {
1086 Config {
1087 journal_partition: "journal-partition".into(),
1088 metadata_partition: "metadata-partition".into(),
1089 items_per_blob: NZU64!(7),
1090 write_buffer: NZUsize!(1024),
1091 thread_pool: None,
1092 page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
1093 }
1094 }
1095
    /// Exercises an empty structure: operations on it must not fail unexpectedly, a single
    /// add/rewind round trip must return it to empty, and the empty state must survive a
    /// re-initialization. Also checks that an empty proof verifies only against the empty root.
    async fn journaled_empty_inner<F: Family>(context: deterministic::Context) {
        let hasher: Standard<Sha256> = Standard::new();
        let mut mmr = Journaled::<F, _, Digest>::init(
            context.with_label("first"),
            &hasher,
            test_config(&context),
        )
        .await
        .unwrap();
        // A fresh structure is empty, and pruning/syncing it are no-ops that succeed.
        assert_eq!(mmr.size(), 0);
        assert!(mmr.get_node(Position::<F>::new(0)).await.is_err());
        let bounds = mmr.bounds();
        assert!(bounds.is_empty());
        assert!(mmr.prune_all().await.is_ok());
        assert_eq!(bounds.start, 0);
        assert!(mmr.prune(Location::<F>::new(0)).await.is_ok());
        assert!(mmr.sync().await.is_ok());
        // Rewinding an empty, unpruned structure reports `Empty`.
        assert!(matches!(mmr.rewind(1, &hasher).await, Err(Error::Empty)));

        // Add one element, persist it, then rewind it away again.
        let batch = mmr.new_batch().add(&hasher, &test_digest(0));
        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
        mmr.apply_batch(&batch).unwrap();
        assert_eq!(mmr.size(), 1);
        mmr.sync().await.unwrap();
        assert!(mmr.get_node(Position::<F>::new(0)).await.is_ok());
        assert!(mmr.rewind(1, &hasher).await.is_ok());
        assert_eq!(mmr.size(), 0);
        mmr.sync().await.unwrap();

        // Re-initialize from the same partitions: the structure must still be empty.
        let mut mmr = Journaled::<F, _, Digest>::init(
            context.with_label("second"),
            &hasher,
            test_config(&context),
        )
        .await
        .unwrap();
        assert_eq!(mmr.size(), 0);

        // An empty proof verifies against the root of an empty structure.
        let empty_proof = Proof::<F, Digest>::default();
        let hasher: Standard<Sha256> = Standard::new();
        let root = mmr.root();
        assert!(empty_proof.verify_range_inclusion(
            &hasher,
            &[] as &[Digest],
            Location::<F>::new(0),
            &root
        ));
        assert!(empty_proof.verify_multi_inclusion(
            &hasher,
            &[] as &[(Digest, Location<F>)],
            &root
        ));

        // Once an element has been added, the empty proof must no longer verify.
        let batch = mmr.new_batch().add(&hasher, &test_digest(0));
        let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
        mmr.apply_batch(&batch).unwrap();
        let root = mmr.root();
        assert!(!empty_proof.verify_range_inclusion(
            &hasher,
            &[] as &[Digest],
            Location::<F>::new(0),
            &root
        ));
        assert!(!empty_proof.verify_multi_inclusion(
            &hasher,
            &[] as &[(Digest, Location<F>)],
            &root
        ));

        mmr.destroy().await.unwrap();
    }
1168
1169 #[test_traced]
1170 fn test_journaled_empty_mmr() {
1171 let executor = deterministic::Runner::default();
1172 executor.start(journaled_empty_inner::<mmr::Family>);
1173 }
1174
1175 #[test_traced]
1176 fn test_journaled_empty_mmb() {
1177 let executor = deterministic::Runner::default();
1178 executor.start(journaled_empty_inner::<mmb::Family>);
1179 }
1180
1181 async fn journaled_prune_out_of_bounds_returns_error_inner<F: Family>(
1182 context: deterministic::Context,
1183 ) {
1184 let hasher = Standard::<Sha256>::new();
1185 let mut mmr = Journaled::<F, _, Digest>::init(
1186 context.with_label("oob_prune"),
1187 &hasher,
1188 test_config(&context),
1189 )
1190 .await
1191 .unwrap();
1192
1193 let batch = mmr.new_batch().add(&hasher, &test_digest(0));
1194 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1195 mmr.apply_batch(&batch).unwrap();
1196
1197 assert!(matches!(
1198 mmr.prune(Location::<F>::new(2)).await,
1199 Err(Error::LeafOutOfBounds(loc)) if loc == Location::<F>::new(2)
1200 ));
1201
1202 mmr.destroy().await.unwrap();
1203 }
1204
1205 #[test_traced]
1206 fn test_journaled_prune_out_of_bounds_returns_error_mmr() {
1207 let executor = deterministic::Runner::default();
1208 executor.start(journaled_prune_out_of_bounds_returns_error_inner::<mmr::Family>);
1209 }
1210
1211 #[test_traced]
1212 fn test_journaled_prune_out_of_bounds_returns_error_mmb() {
1213 let executor = deterministic::Runner::default();
1214 executor.start(journaled_prune_out_of_bounds_returns_error_inner::<mmb::Family>);
1215 }
1216
1217 async fn journaled_rewind_error_leaves_valid_state_inner<F: Family>(
1218 context: deterministic::Context,
1219 ) {
1220 let hasher: Standard<Sha256> = Standard::new();
1221
1222 let element_pruned_context = context.with_label("element_pruned_case");
1224 let mut mmr = Journaled::<F, _, Digest>::init(
1225 element_pruned_context.clone(),
1226 &hasher,
1227 test_config(&element_pruned_context),
1228 )
1229 .await
1230 .unwrap();
1231 let mut batch = mmr.new_batch();
1232 for i in 0u64..32 {
1233 batch = batch.add(&hasher, &i.to_be_bytes());
1234 }
1235 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1236 mmr.apply_batch(&batch).unwrap();
1237 mmr.prune(Location::<F>::new(8)).await.unwrap();
1238 let leaves_before = mmr.leaves();
1239 assert!(matches!(
1240 mmr.rewind(128, &hasher).await,
1241 Err(Error::ElementPruned(_))
1242 ));
1243 assert!(mmr.leaves() <= leaves_before);
1245 mmr.destroy().await.unwrap();
1246
1247 let empty_context = context.with_label("empty_case");
1249 let cfg = test_config(&empty_context);
1250 let mut mmr = Journaled::<F, _, Digest>::init(empty_context, &hasher, cfg)
1251 .await
1252 .unwrap();
1253 let mut batch = mmr.new_batch();
1254 for i in 0u64..8 {
1255 batch = batch.add(&hasher, &i.to_be_bytes());
1256 }
1257 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1258 mmr.apply_batch(&batch).unwrap();
1259 let leaves_before = mmr.leaves();
1260 assert!(matches!(mmr.rewind(9, &hasher).await, Err(Error::Empty)));
1261 assert_eq!(mmr.leaves(), leaves_before);
1263 mmr.destroy().await.unwrap();
1264 }
1265
1266 #[test_traced]
1267 fn test_journaled_rewind_error_leaves_valid_state_mmr() {
1268 let executor = deterministic::Runner::default();
1269 executor.start(journaled_rewind_error_leaves_valid_state_inner::<mmr::Family>);
1270 }
1271
1272 #[test_traced]
1273 fn test_journaled_rewind_error_leaves_valid_state_mmb() {
1274 let executor = deterministic::Runner::default();
1275 executor.start(journaled_rewind_error_leaves_valid_state_inner::<mmb::Family>);
1276 }
1277
1278 async fn journaled_basic_inner<F: Family>(context: deterministic::Context) {
1279 let hasher: Standard<Sha256> = Standard::new();
1280 let cfg = test_config(&context);
1281 let mut mmr = Journaled::<F, _, Digest>::init(context, &hasher, cfg)
1282 .await
1283 .unwrap();
1284 const LEAF_COUNT: usize = 255;
1286 let mut leaves = Vec::with_capacity(LEAF_COUNT);
1287 for i in 0..LEAF_COUNT {
1288 leaves.push(test_digest(i));
1289 }
1290 let mut batch = mmr.new_batch();
1291 for leaf in &leaves {
1292 batch = batch.add(&hasher, leaf);
1293 }
1294 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1295 mmr.apply_batch(&batch).unwrap();
1296 let expected_size = Position::<F>::try_from(Location::<F>::new(LEAF_COUNT as u64)).unwrap();
1297 assert_eq!(mmr.size(), expected_size);
1298
1299 const TEST_ELEMENT: usize = 133;
1301 let test_element_loc: Location<F> = Location::new(TEST_ELEMENT as u64);
1302
1303 let proof = mmr.proof(&hasher, test_element_loc).await.unwrap();
1304 let root = mmr.root();
1305 assert!(proof.verify_element_inclusion(
1306 &hasher,
1307 &leaves[TEST_ELEMENT],
1308 test_element_loc,
1309 &root,
1310 ));
1311
1312 mmr.sync().await.unwrap();
1314
1315 let proof2 = mmr.proof(&hasher, test_element_loc).await.unwrap();
1318 assert_eq!(proof, proof2);
1319
1320 let range = Location::<F>::new(TEST_ELEMENT as u64)..Location::<F>::new(LEAF_COUNT as u64);
1322 let proof = mmr.range_proof(&hasher, range.clone()).await.unwrap();
1323 assert!(proof.verify_range_inclusion(
1324 &hasher,
1325 &leaves[range.to_usize_range()],
1326 test_element_loc,
1327 &root
1328 ));
1329
1330 mmr.destroy().await.unwrap();
1331 }
1332
1333 #[test_traced]
1334 fn test_journaled_basic_mmr() {
1335 let executor = deterministic::Runner::default();
1336 executor.start(journaled_basic_inner::<mmr::Family>);
1337 }
1338
1339 #[test_traced]
1340 fn test_journaled_basic_mmb() {
1341 let executor = deterministic::Runner::default();
1342 executor.start(journaled_basic_inner::<mmb::Family>);
1343 }
1344
1345 async fn journaled_recovery_inner<F: Family>(context: deterministic::Context) {
1348 use crate::journal::contiguous::fixed::{Config as JConfig, Journal};
1349
1350 let hasher: Standard<Sha256> = Standard::new();
1351 let mut mmr = Journaled::<F, _, Digest>::init(
1352 context.with_label("first"),
1353 &hasher,
1354 test_config(&context),
1355 )
1356 .await
1357 .unwrap();
1358 assert_eq!(mmr.size(), 0);
1359
1360 const LEAF_COUNT: usize = 252;
1362 let mut leaves = Vec::with_capacity(LEAF_COUNT);
1363 for i in 0..LEAF_COUNT {
1364 leaves.push(test_digest(i));
1365 }
1366 let mut batch = mmr.new_batch();
1367 for leaf in &leaves {
1368 batch = batch.add(&hasher, leaf);
1369 }
1370 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1371 mmr.apply_batch(&batch).unwrap();
1372 let expected_size = Position::<F>::try_from(Location::<F>::new(LEAF_COUNT as u64)).unwrap();
1373 assert_eq!(mmr.size(), expected_size);
1374 mmr.sync().await.unwrap();
1375 drop(mmr);
1376
1377 {
1380 let journal: Journal<_, Digest> = Journal::init(
1381 context.with_label("corrupt"),
1382 JConfig {
1383 partition: "journal-partition".into(),
1384 items_per_blob: NZU64!(7),
1385 write_buffer: NZUsize!(1024),
1386 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1387 },
1388 )
1389 .await
1390 .unwrap();
1391 assert_eq!(journal.size().await, expected_size);
1392 journal.append(&Sha256::hash(b"orphan")).await.unwrap();
1393 journal.sync().await.unwrap();
1394 assert_eq!(journal.size().await, expected_size + 1);
1395 }
1396
1397 let mmr = Journaled::<F, _, Digest>::init(
1398 context.with_label("second"),
1399 &hasher,
1400 test_config(&context),
1401 )
1402 .await
1403 .unwrap();
1404 let recovered_size =
1407 Position::<F>::try_from(Location::<F>::new(LEAF_COUNT as u64 + 1)).unwrap();
1408 assert_eq!(mmr.size(), recovered_size);
1409
1410 drop(mmr);
1412 let mmr = Journaled::<F, _, Digest>::init(
1413 context.with_label("third"),
1414 &hasher,
1415 test_config(&context),
1416 )
1417 .await
1418 .unwrap();
1419 assert_eq!(mmr.size(), recovered_size);
1420
1421 mmr.destroy().await.unwrap();
1422 }
1423
1424 #[test_traced]
1425 fn test_journaled_recovery_mmr() {
1426 let executor = deterministic::Runner::default();
1427 executor.start(journaled_recovery_inner::<mmr::Family>);
1428 }
1429
1430 #[test_traced]
1431 fn test_journaled_recovery_mmb() {
1432 let executor = deterministic::Runner::default();
1433 executor.start(journaled_recovery_inner::<mmb::Family>);
1434 }
1435
1436 async fn journaled_pruning_inner<F: Family>(context: deterministic::Context) {
1437 let hasher: Standard<Sha256> = Standard::new();
1438 const LEAF_COUNT: usize = 2000;
1440 let cfg_pruned = test_config(&context);
1441 let mut pruned_mmr = Journaled::<F, _, Digest>::init(
1442 context.with_label("pruned"),
1443 &hasher,
1444 cfg_pruned.clone(),
1445 )
1446 .await
1447 .unwrap();
1448 let cfg_unpruned = Config {
1449 journal_partition: "unpruned-journal-partition".into(),
1450 metadata_partition: "unpruned-metadata-partition".into(),
1451 items_per_blob: NZU64!(7),
1452 write_buffer: NZUsize!(1024),
1453 thread_pool: None,
1454 page_cache: cfg_pruned.page_cache.clone(),
1455 };
1456 let mut mmr =
1457 Journaled::<F, _, Digest>::init(context.with_label("unpruned"), &hasher, cfg_unpruned)
1458 .await
1459 .unwrap();
1460 let mut leaves = Vec::with_capacity(LEAF_COUNT);
1461 for i in 0..LEAF_COUNT {
1462 leaves.push(test_digest(i));
1463 }
1464 let mut batch = mmr.new_batch();
1465 for leaf in &leaves {
1466 batch = batch.add(&hasher, leaf);
1467 }
1468 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1469 mmr.apply_batch(&batch).unwrap();
1470 let mut batch = pruned_mmr.new_batch();
1471 for leaf in &leaves {
1472 batch = batch.add(&hasher, leaf);
1473 }
1474 let batch = pruned_mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1475 pruned_mmr.apply_batch(&batch).unwrap();
1476 let expected_size = Position::<F>::try_from(Location::<F>::new(LEAF_COUNT as u64)).unwrap();
1477 assert_eq!(mmr.size(), expected_size);
1478 assert_eq!(pruned_mmr.size(), expected_size);
1479
1480 for i in 0usize..300 {
1483 let prune_loc = Location::<F>::new(std::cmp::min(i as u64 * 10, *pruned_mmr.leaves()));
1484 pruned_mmr.prune(prune_loc).await.unwrap();
1485 assert_eq!(prune_loc, pruned_mmr.bounds().start);
1486
1487 let digest = test_digest(LEAF_COUNT + i);
1488 leaves.push(digest);
1489 let last_leaf = leaves.last().unwrap();
1490 let batch = pruned_mmr.new_batch().add(&hasher, last_leaf);
1491 let batch = pruned_mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1492 pruned_mmr.apply_batch(&batch).unwrap();
1493 let batch = mmr.new_batch().add(&hasher, last_leaf);
1494 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1495 mmr.apply_batch(&batch).unwrap();
1496 assert_eq!(pruned_mmr.root(), mmr.root());
1497 }
1498
1499 pruned_mmr.sync().await.unwrap();
1501 assert_eq!(pruned_mmr.root(), mmr.root());
1502
1503 pruned_mmr.sync().await.unwrap();
1505 drop(pruned_mmr);
1506 let mut pruned_mmr = Journaled::<F, _, Digest>::init(
1507 context.with_label("pruned_reopen"),
1508 &hasher,
1509 cfg_pruned.clone(),
1510 )
1511 .await
1512 .unwrap();
1513 assert_eq!(pruned_mmr.root(), mmr.root());
1514
1515 let size = pruned_mmr.size();
1517 pruned_mmr.prune_all().await.unwrap();
1518 assert_eq!(pruned_mmr.root(), mmr.root());
1519 let bounds = pruned_mmr.bounds();
1520 assert!(bounds.is_empty());
1521 assert_eq!(bounds.start, Location::<F>::try_from(size).unwrap());
1522
1523 let batch = mmr.new_batch().add(&hasher, &test_digest(LEAF_COUNT));
1526 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1527 mmr.apply_batch(&batch).unwrap();
1528 let batch = pruned_mmr
1529 .new_batch()
1530 .add(&hasher, &test_digest(LEAF_COUNT));
1531 let batch = pruned_mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1532 pruned_mmr.apply_batch(&batch).unwrap();
1533 assert!(*pruned_mmr.size() % cfg_pruned.items_per_blob != 0);
1534 pruned_mmr.sync().await.unwrap();
1535 drop(pruned_mmr);
1536 let mut pruned_mmr = Journaled::<F, _, Digest>::init(
1537 context.with_label("pruned_reopen2"),
1538 &hasher,
1539 cfg_pruned.clone(),
1540 )
1541 .await
1542 .unwrap();
1543 assert_eq!(pruned_mmr.root(), mmr.root());
1544 let bounds = pruned_mmr.bounds();
1545 assert!(!bounds.is_empty());
1546 assert_eq!(bounds.start, Location::<F>::try_from(size).unwrap());
1547
1548 assert!(pruned_mmr
1550 .prune(Location::<F>::try_from(size).unwrap() - 1)
1551 .await
1552 .is_ok());
1553 assert_eq!(
1554 pruned_mmr.bounds().start,
1555 Location::<F>::try_from(size).unwrap()
1556 );
1557
1558 while *pruned_mmr.size() % cfg_pruned.items_per_blob != 0 {
1561 let batch = pruned_mmr
1562 .new_batch()
1563 .add(&hasher, &test_digest(LEAF_COUNT));
1564 let batch = pruned_mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1565 pruned_mmr.apply_batch(&batch).unwrap();
1566 }
1567 pruned_mmr.prune_all().await.unwrap();
1568 assert!(pruned_mmr.bounds().is_empty());
1569
1570 pruned_mmr.destroy().await.unwrap();
1571 mmr.destroy().await.unwrap();
1572 }
1573
1574 #[test_traced]
1575 fn test_journaled_pruning_mmr() {
1576 let executor = deterministic::Runner::default();
1577 executor.start(journaled_pruning_inner::<mmr::Family>);
1578 }
1579
1580 #[test_traced]
1581 fn test_journaled_pruning_mmb() {
1582 let executor = deterministic::Runner::default();
1583 executor.start(journaled_pruning_inner::<mmb::Family>);
1584 }
1585
1586 async fn journaled_recovery_with_pruning_inner<F: Family>(context: deterministic::Context) {
1588 let hasher: Standard<Sha256> = Standard::new();
1590 const LEAF_COUNT: usize = 2000;
1591 let mut leaves = Vec::with_capacity(LEAF_COUNT);
1592 let mut mmr = Journaled::<F, _, Digest>::init(
1593 context.with_label("init"),
1594 &hasher,
1595 test_config(&context),
1596 )
1597 .await
1598 .unwrap();
1599 for i in 0..LEAF_COUNT {
1600 leaves.push(test_digest(i));
1601 }
1602 let mut batch = mmr.new_batch();
1603 for leaf in &leaves {
1604 batch = batch.add(&hasher, leaf);
1605 }
1606 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1607 mmr.apply_batch(&batch).unwrap();
1608 let expected_size = Position::<F>::try_from(Location::<F>::new(LEAF_COUNT as u64)).unwrap();
1609 assert_eq!(mmr.size(), expected_size);
1610 mmr.sync().await.unwrap();
1611 drop(mmr);
1612
1613 for i in 0usize..200 {
1615 let label = format!("iter_{i}");
1616 let mut mmr = Journaled::<F, _, Digest>::init(
1617 context.with_label(&label),
1618 &hasher,
1619 test_config(&context),
1620 )
1621 .await
1622 .unwrap();
1623 let start_size = mmr.size();
1624 let start_leaves = *mmr.leaves();
1625 let prune_loc = Location::<F>::new(std::cmp::min(i as u64 * 50, start_leaves));
1626 if i % 5 == 0 {
1627 mmr.simulate_pruning_failure(prune_loc).await.unwrap();
1628 continue;
1629 }
1630 mmr.prune(prune_loc).await.unwrap();
1631
1632 for j in 0..10 {
1634 let digest = test_digest(100 * (i + 1) + j);
1635 leaves.push(digest);
1636 let batch = mmr
1637 .new_batch()
1638 .add(&hasher, leaves.last().unwrap())
1639 .add(&hasher, leaves.last().unwrap());
1640 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1641 mmr.apply_batch(&batch).unwrap();
1642 let digest = test_digest(LEAF_COUNT + i);
1643 leaves.push(digest);
1644 let batch = mmr
1645 .new_batch()
1646 .add(&hasher, leaves.last().unwrap())
1647 .add(&hasher, leaves.last().unwrap());
1648 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1649 mmr.apply_batch(&batch).unwrap();
1650 }
1651 let end_size = mmr.size();
1652 let total_to_write = (*end_size - *start_size) as usize;
1653 let partial_write_limit = i % total_to_write;
1654 mmr.simulate_partial_sync(partial_write_limit)
1655 .await
1656 .unwrap();
1657 }
1658
1659 let mmr = Journaled::<F, _, Digest>::init(
1660 context.with_label("final"),
1661 &hasher,
1662 test_config(&context),
1663 )
1664 .await
1665 .unwrap();
1666 mmr.destroy().await.unwrap();
1667 }
1668
1669 #[test_traced("WARN")]
1670 fn test_journaled_recovery_with_pruning_mmr() {
1671 let executor = deterministic::Runner::default();
1672 executor.start(journaled_recovery_with_pruning_inner::<mmr::Family>);
1673 }
1674
1675 #[test_traced("WARN")]
1676 fn test_journaled_recovery_with_pruning_mmb() {
1677 let executor = deterministic::Runner::default();
1678 executor.start(journaled_recovery_with_pruning_inner::<mmb::Family>);
1679 }
1680
1681 async fn journaled_historical_proof_basic_inner<F: Family>(context: deterministic::Context) {
1682 let hasher = Standard::<Sha256>::new();
1684 let cfg = test_config(&context);
1685 let mut mmr = Journaled::<F, _, Digest>::init(context, &hasher, cfg)
1686 .await
1687 .unwrap();
1688 let mut elements = Vec::new();
1689 for i in 0..10 {
1690 elements.push(test_digest(i));
1691 }
1692 let mut batch = mmr.new_batch();
1693 for elt in &elements {
1694 batch = batch.add(&hasher, elt);
1695 }
1696 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1697 mmr.apply_batch(&batch).unwrap();
1698 let original_leaves = mmr.leaves();
1699
1700 let historical_proof = mmr
1702 .historical_range_proof(
1703 &hasher,
1704 original_leaves,
1705 Location::<F>::new(2)..Location::<F>::new(6),
1706 )
1707 .await
1708 .unwrap();
1709 assert_eq!(historical_proof.leaves, original_leaves);
1710 let root = mmr.root();
1711 assert!(historical_proof.verify_range_inclusion(
1712 &hasher,
1713 &elements[2..6],
1714 Location::<F>::new(2),
1715 &root
1716 ));
1717 let regular_proof = mmr
1718 .range_proof(&hasher, Location::<F>::new(2)..Location::<F>::new(6))
1719 .await
1720 .unwrap();
1721 assert_eq!(regular_proof.leaves, historical_proof.leaves);
1722 assert_eq!(regular_proof.digests, historical_proof.digests);
1723
1724 for i in 10..20 {
1726 elements.push(test_digest(i));
1727 }
1728 let mut batch = mmr.new_batch();
1729 for elt in &elements[10..20] {
1730 batch = batch.add(&hasher, elt);
1731 }
1732 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1733 mmr.apply_batch(&batch).unwrap();
1734 let new_historical_proof = mmr
1735 .historical_range_proof(
1736 &hasher,
1737 original_leaves,
1738 Location::<F>::new(2)..Location::<F>::new(6),
1739 )
1740 .await
1741 .unwrap();
1742 assert_eq!(new_historical_proof.leaves, historical_proof.leaves);
1743 assert_eq!(new_historical_proof.digests, historical_proof.digests);
1744
1745 mmr.destroy().await.unwrap();
1746 }
1747
1748 #[test_traced]
1749 fn test_journaled_historical_proof_basic_mmr() {
1750 let executor = deterministic::Runner::default();
1751 executor.start(journaled_historical_proof_basic_inner::<mmr::Family>);
1752 }
1753
1754 #[test_traced]
1755 fn test_journaled_historical_proof_basic_mmb() {
1756 let executor = deterministic::Runner::default();
1757 executor.start(journaled_historical_proof_basic_inner::<mmb::Family>);
1758 }
1759
1760 async fn journaled_historical_proof_with_pruning_inner<F: Family>(
1761 context: deterministic::Context,
1762 ) {
1763 let hasher = Standard::<Sha256>::new();
1764 let mut mmr = Journaled::<F, _, Digest>::init(
1765 context.with_label("main"),
1766 &hasher,
1767 test_config(&context),
1768 )
1769 .await
1770 .unwrap();
1771
1772 let mut elements = Vec::new();
1774 for i in 0..50 {
1775 elements.push(test_digest(i));
1776 }
1777 let mut batch = mmr.new_batch();
1778 for elt in &elements {
1779 batch = batch.add(&hasher, elt);
1780 }
1781 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1782 mmr.apply_batch(&batch).unwrap();
1783
1784 let prune_loc = Location::<F>::new(16);
1786 mmr.prune(prune_loc).await.unwrap();
1787
1788 let mut ref_mmr = Journaled::<F, _, Digest>::init(
1790 context.with_label("ref"),
1791 &hasher,
1792 Config {
1793 journal_partition: "ref-journal-pruned".into(),
1794 metadata_partition: "ref-metadata-pruned".into(),
1795 items_per_blob: NZU64!(7),
1796 write_buffer: NZUsize!(1024),
1797 thread_pool: None,
1798 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1799 },
1800 )
1801 .await
1802 .unwrap();
1803
1804 let mut batch = ref_mmr.new_batch();
1805 for elt in elements.iter().take(41) {
1806 batch = batch.add(&hasher, elt);
1807 }
1808 let batch = ref_mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1809 ref_mmr.apply_batch(&batch).unwrap();
1810 let historical_leaves = ref_mmr.leaves();
1811 let historical_root = ref_mmr.root();
1812
1813 let historical_proof = mmr
1815 .historical_range_proof(
1816 &hasher,
1817 historical_leaves,
1818 Location::<F>::new(35)..Location::<F>::new(39),
1819 )
1820 .await
1821 .unwrap();
1822
1823 assert_eq!(historical_proof.leaves, historical_leaves);
1824
1825 assert!(historical_proof.verify_range_inclusion(
1827 &hasher,
1828 &elements[35..39],
1829 Location::<F>::new(35),
1830 &historical_root
1831 ));
1832
1833 ref_mmr.destroy().await.unwrap();
1834 mmr.destroy().await.unwrap();
1835 }
1836
1837 #[test_traced]
1838 fn test_journaled_historical_proof_with_pruning_mmr() {
1839 let executor = deterministic::Runner::default();
1840 executor.start(journaled_historical_proof_with_pruning_inner::<mmr::Family>);
1841 }
1842
1843 #[test_traced]
1844 fn test_journaled_historical_proof_with_pruning_mmb() {
1845 let executor = deterministic::Runner::default();
1846 executor.start(journaled_historical_proof_with_pruning_inner::<mmb::Family>);
1847 }
1848
1849 async fn journaled_historical_proof_large_inner<F: Family>(context: deterministic::Context) {
1850 let hasher = Standard::<Sha256>::new();
1851
1852 let mut mmr = Journaled::<F, _, Digest>::init(
1853 context.with_label("server"),
1854 &hasher,
1855 Config {
1856 journal_partition: "server-journal".into(),
1857 metadata_partition: "server-metadata".into(),
1858 items_per_blob: NZU64!(7),
1859 write_buffer: NZUsize!(1024),
1860 thread_pool: None,
1861 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1862 },
1863 )
1864 .await
1865 .unwrap();
1866
1867 let mut elements = Vec::new();
1868 for i in 0..100 {
1869 elements.push(test_digest(i));
1870 }
1871 let mut batch = mmr.new_batch();
1872 for elt in &elements {
1873 batch = batch.add(&hasher, elt);
1874 }
1875 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1876 mmr.apply_batch(&batch).unwrap();
1877
1878 let range = Location::<F>::new(30)..Location::<F>::new(61);
1879
1880 let mut ref_mmr = Journaled::<F, _, Digest>::init(
1882 context.with_label("client"),
1883 &hasher,
1884 Config {
1885 journal_partition: "client-journal".into(),
1886 metadata_partition: "client-metadata".into(),
1887 items_per_blob: NZU64!(7),
1888 write_buffer: NZUsize!(1024),
1889 thread_pool: None,
1890 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
1891 },
1892 )
1893 .await
1894 .unwrap();
1895
1896 let mut batch = ref_mmr.new_batch();
1898 for elt in elements.iter().take(*range.end as usize) {
1899 batch = batch.add(&hasher, elt);
1900 }
1901 let batch = ref_mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1902 ref_mmr.apply_batch(&batch).unwrap();
1903 let historical_leaves = ref_mmr.leaves();
1904 let expected_root = ref_mmr.root();
1905
1906 let proof = mmr
1908 .historical_range_proof(&hasher, historical_leaves, range.clone())
1909 .await
1910 .unwrap();
1911
1912 assert!(proof.verify_range_inclusion(
1913 &hasher,
1914 &elements[range.to_usize_range()],
1915 range.start,
1916 &expected_root ));
1918
1919 ref_mmr.destroy().await.unwrap();
1920 mmr.destroy().await.unwrap();
1921 }
1922
1923 #[test_traced]
1924 fn test_journaled_historical_proof_large_mmr() {
1925 let executor = deterministic::Runner::default();
1926 executor.start(journaled_historical_proof_large_inner::<mmr::Family>);
1927 }
1928
1929 #[test_traced]
1930 fn test_journaled_historical_proof_large_mmb() {
1931 let executor = deterministic::Runner::default();
1932 executor.start(journaled_historical_proof_large_inner::<mmb::Family>);
1933 }
1934
1935 async fn journaled_historical_proof_singleton_inner<F: Family>(
1936 context: deterministic::Context,
1937 ) {
1938 let hasher = Standard::<Sha256>::new();
1939 let cfg = test_config(&context);
1940 let mut mmr = Journaled::<F, _, Digest>::init(context, &hasher, cfg)
1941 .await
1942 .unwrap();
1943
1944 let element = test_digest(0);
1945 let batch = mmr.new_batch().add(&hasher, &element);
1946 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
1947 mmr.apply_batch(&batch).unwrap();
1948
1949 let single_proof = mmr
1951 .historical_range_proof(
1952 &hasher,
1953 Location::<F>::new(1),
1954 Location::<F>::new(0)..Location::<F>::new(1),
1955 )
1956 .await
1957 .unwrap();
1958
1959 let root = mmr.root();
1960 assert!(single_proof.verify_range_inclusion(
1961 &hasher,
1962 &[element],
1963 Location::<F>::new(0),
1964 &root
1965 ));
1966
1967 mmr.destroy().await.unwrap();
1968 }
1969
1970 #[test_traced]
1971 fn test_journaled_historical_proof_singleton_mmr() {
1972 let executor = deterministic::Runner::default();
1973 executor.start(journaled_historical_proof_singleton_inner::<mmr::Family>);
1974 }
1975
1976 #[test_traced]
1977 fn test_journaled_historical_proof_singleton_mmb() {
1978 let executor = deterministic::Runner::default();
1979 executor.start(journaled_historical_proof_singleton_inner::<mmb::Family>);
1980 }
1981
1982 async fn journaled_init_sync_empty_inner<F: Family>(context: deterministic::Context) {
1984 let hasher = Standard::<Sha256>::new();
1985
1986 let sync_cfg = SyncConfig::<F, sha256::Digest> {
1988 config: test_config(&context),
1989 range: Location::<F>::new(0)..Location::<F>::new(52),
1990 pinned_nodes: None,
1991 };
1992
1993 let mut sync_mmr = Journaled::<F, _, Digest>::init_sync(context.clone(), sync_cfg, &hasher)
1994 .await
1995 .unwrap();
1996
1997 assert_eq!(sync_mmr.size(), 0);
1999 let bounds = sync_mmr.bounds();
2000 assert_eq!(bounds.start, 0);
2001 assert!(bounds.is_empty());
2002
2003 let new_element = test_digest(999);
2005 let batch = sync_mmr.new_batch().add(&hasher, &new_element);
2006 let batch = sync_mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2007 sync_mmr.apply_batch(&batch).unwrap();
2008
2009 let _root = sync_mmr.root();
2011
2012 sync_mmr.destroy().await.unwrap();
2013 }
2014
2015 #[test_traced]
2016 fn test_journaled_init_sync_empty_mmr() {
2017 let executor = deterministic::Runner::default();
2018 executor.start(journaled_init_sync_empty_inner::<mmr::Family>);
2019 }
2020
2021 #[test_traced]
2022 fn test_journaled_init_sync_empty_mmb() {
2023 let executor = deterministic::Runner::default();
2024 executor.start(journaled_init_sync_empty_inner::<mmb::Family>);
2025 }
2026
2027 async fn journaled_init_sync_nonempty_exact_match_inner<F: Family>(
2029 context: deterministic::Context,
2030 ) {
2031 let hasher = Standard::<Sha256>::new();
2032
2033 let mut mmr = Journaled::<F, _, Digest>::init(
2035 context.with_label("init"),
2036 &hasher,
2037 test_config(&context),
2038 )
2039 .await
2040 .unwrap();
2041 let mut batch = mmr.new_batch();
2042 for i in 0..50 {
2043 batch = batch.add(&hasher, &test_digest(i));
2044 }
2045 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2046 mmr.apply_batch(&batch).unwrap();
2047 mmr.sync().await.unwrap();
2048 let original_size = mmr.size();
2049 let original_leaves = mmr.leaves();
2050 let original_root = mmr.root();
2051
2052 let lower_bound_loc = mmr.bounds().start;
2054 let upper_bound_loc = mmr.leaves();
2055 let lower_bound_pos = Position::<F>::try_from(lower_bound_loc).unwrap();
2056 let upper_bound_pos = mmr.size();
2057 let mut expected_nodes = BTreeMap::new();
2058 for i in *lower_bound_pos..*upper_bound_pos {
2059 expected_nodes.insert(
2060 Position::<F>::new(i),
2061 mmr.get_node(Position::<F>::new(i)).await.unwrap().unwrap(),
2062 );
2063 }
2064 let sync_cfg = SyncConfig::<F, sha256::Digest> {
2065 config: test_config(&context),
2066 range: lower_bound_loc..upper_bound_loc,
2067 pinned_nodes: None,
2068 };
2069
2070 mmr.sync().await.unwrap();
2071 drop(mmr);
2072
2073 let sync_mmr =
2074 Journaled::<F, _, Digest>::init_sync(context.with_label("sync"), sync_cfg, &hasher)
2075 .await
2076 .unwrap();
2077
2078 assert_eq!(sync_mmr.size(), original_size);
2080 assert_eq!(sync_mmr.leaves(), original_leaves);
2081 let bounds = sync_mmr.bounds();
2082 assert_eq!(bounds.start, lower_bound_loc);
2083 assert!(!bounds.is_empty());
2084 assert_eq!(sync_mmr.root(), original_root);
2085 for pos in *lower_bound_pos..*upper_bound_pos {
2086 let pos = Position::<F>::new(pos);
2087 assert_eq!(
2088 sync_mmr.get_node(pos).await.unwrap(),
2089 expected_nodes.get(&pos).cloned()
2090 );
2091 }
2092
2093 sync_mmr.destroy().await.unwrap();
2094 }
2095
2096 #[test_traced]
2097 fn test_journaled_init_sync_nonempty_exact_match_mmr() {
2098 let executor = deterministic::Runner::default();
2099 executor.start(journaled_init_sync_nonempty_exact_match_inner::<mmr::Family>);
2100 }
2101
2102 #[test_traced]
2103 fn test_journaled_init_sync_nonempty_exact_match_mmb() {
2104 let executor = deterministic::Runner::default();
2105 executor.start(journaled_init_sync_nonempty_exact_match_inner::<mmb::Family>);
2106 }
2107
2108 async fn journaled_init_sync_partial_overlap_inner<F: Family>(context: deterministic::Context) {
2111 let hasher = Standard::<Sha256>::new();
2112
2113 let mut mmr = Journaled::<F, _, Digest>::init(
2115 context.with_label("init"),
2116 &hasher,
2117 test_config(&context),
2118 )
2119 .await
2120 .unwrap();
2121 let mut batch = mmr.new_batch();
2122 for i in 0..30 {
2123 batch = batch.add(&hasher, &test_digest(i));
2124 }
2125 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2126 mmr.apply_batch(&batch).unwrap();
2127 mmr.sync().await.unwrap();
2128 mmr.prune(Location::<F>::new(6)).await.unwrap();
2129
2130 let original_size = mmr.size();
2131 let original_leaves = mmr.leaves();
2132 let original_root = mmr.root();
2133 let original_pruning_boundary = mmr.bounds().start;
2134 let original_pruning_pos = Position::<F>::try_from(original_pruning_boundary).unwrap();
2135
2136 let lower_bound_loc = original_pruning_boundary;
2138 let upper_bound_loc = original_leaves + 6; let mut expected_nodes = BTreeMap::new();
2141 for i in *original_pruning_pos..*original_size {
2142 let pos = Position::<F>::new(i);
2143 expected_nodes.insert(pos, mmr.get_node(pos).await.unwrap().unwrap());
2144 }
2145
2146 let sync_cfg = SyncConfig::<F, sha256::Digest> {
2147 config: test_config(&context),
2148 range: lower_bound_loc..upper_bound_loc,
2149 pinned_nodes: None,
2150 };
2151
2152 mmr.sync().await.unwrap();
2153 drop(mmr);
2154
2155 let sync_mmr =
2156 Journaled::<F, _, Digest>::init_sync(context.with_label("sync"), sync_cfg, &hasher)
2157 .await
2158 .unwrap();
2159
2160 assert_eq!(sync_mmr.size(), original_size);
2162 let bounds = sync_mmr.bounds();
2163 assert_eq!(bounds.start, lower_bound_loc);
2164 assert!(!bounds.is_empty());
2165 assert_eq!(sync_mmr.root(), original_root);
2166
2167 for i in *original_pruning_pos..*original_size {
2169 let pos = Position::<F>::new(i);
2170 assert_eq!(
2171 sync_mmr.get_node(pos).await.unwrap(),
2172 expected_nodes.get(&pos).cloned()
2173 );
2174 }
2175
2176 sync_mmr.destroy().await.unwrap();
2177 }
2178
2179 #[test_traced]
2180 fn test_journaled_init_sync_partial_overlap_mmr() {
2181 let executor = deterministic::Runner::default();
2182 executor.start(journaled_init_sync_partial_overlap_inner::<mmr::Family>);
2183 }
2184
2185 #[test_traced]
2186 fn test_journaled_init_sync_partial_overlap_mmb() {
2187 let executor = deterministic::Runner::default();
2188 executor.start(journaled_init_sync_partial_overlap_inner::<mmb::Family>);
2189 }
2190
2191 async fn journaled_init_sync_rejects_extra_pinned_nodes_inner<F: Family>(
2192 context: deterministic::Context,
2193 ) {
2194 let hasher = Standard::<Sha256>::new();
2195
2196 let sync_cfg = SyncConfig::<F, sha256::Digest> {
2197 config: test_config(&context),
2198 range: Location::<F>::new(6)..Location::<F>::new(20),
2199 pinned_nodes: Some(vec![test_digest(1), test_digest(2), test_digest(3)]),
2200 };
2201
2202 let result =
2203 Journaled::<F, _, Digest>::init_sync(context.with_label("sync"), sync_cfg, &hasher)
2204 .await;
2205 assert!(matches!(result, Err(Error::InvalidPinnedNodes)));
2206 }
2207
2208 #[test_traced]
2209 fn test_journaled_init_sync_rejects_extra_pinned_nodes_mmr() {
2210 let executor = deterministic::Runner::default();
2211 executor.start(journaled_init_sync_rejects_extra_pinned_nodes_inner::<mmr::Family>);
2212 }
2213
2214 #[test_traced]
2215 fn test_journaled_init_sync_rejects_extra_pinned_nodes_mmb() {
2216 let executor = deterministic::Runner::default();
2217 executor.start(journaled_init_sync_rejects_extra_pinned_nodes_inner::<mmb::Family>);
2218 }
2219
2220 async fn journaled_init_stale_metadata_returns_error_inner<F: Family>(
2224 context: deterministic::Context,
2225 ) {
2226 let hasher = Standard::<Sha256>::new();
2227
2228 let mut mmr = Journaled::<F, _, Digest>::init(
2230 context.with_label("init"),
2231 &hasher,
2232 test_config(&context),
2233 )
2234 .await
2235 .unwrap();
2236
2237 let mut batch = mmr.new_batch();
2239 for i in 0..50 {
2240 batch = batch.add(&hasher, &test_digest(i));
2241 }
2242 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2243 mmr.apply_batch(&batch).unwrap();
2244 mmr.sync().await.unwrap();
2245
2246 let prune_loc = Location::<F>::new(25);
2248 mmr.prune(prune_loc).await.unwrap();
2249 drop(mmr);
2250
2251 let meta_cfg = MConfig {
2254 partition: test_config(&context).metadata_partition,
2255 codec_config: ((0..).into(), ()),
2256 };
2257 let mut metadata =
2258 Metadata::<_, U64, Vec<u8>>::init(context.with_label("meta_tamper"), meta_cfg)
2259 .await
2260 .unwrap();
2261 metadata.clear();
2262 let key = U64::new(PRUNED_TO_PREFIX, 0);
2263 metadata.put(key, 0u64.to_be_bytes().to_vec());
2264 metadata.sync().await.unwrap();
2265 drop(metadata);
2266
2267 let result = Journaled::<F, _, Digest>::init(
2272 context.with_label("reopened"),
2273 &hasher,
2274 test_config(&context),
2275 )
2276 .await;
2277
2278 match result {
2279 Err(Error::MissingNode(_)) => {} Ok(_) => panic!("expected MissingNode error, got Ok"),
2281 Err(e) => panic!("expected MissingNode error, got {:?}", e),
2282 }
2283 }
2284
2285 #[test_traced("WARN")]
2286 fn test_journaled_init_stale_metadata_returns_error_mmr() {
2287 let executor = deterministic::Runner::default();
2288 executor.start(journaled_init_stale_metadata_returns_error_inner::<mmr::Family>);
2289 }
2290
2291 #[test_traced("WARN")]
2292 fn test_journaled_init_stale_metadata_returns_error_mmb() {
2293 let executor = deterministic::Runner::default();
2294 executor.start(journaled_init_stale_metadata_returns_error_inner::<mmb::Family>);
2295 }
2296
2297 async fn journaled_init_metadata_ahead_inner<F: Family>(context: deterministic::Context) {
2301 let hasher = Standard::<Sha256>::new();
2302
2303 let mut mmr = Journaled::<F, _, Digest>::init(
2305 context.with_label("init"),
2306 &hasher,
2307 test_config(&context),
2308 )
2309 .await
2310 .unwrap();
2311
2312 let mut batch = mmr.new_batch();
2314 for i in 0..50 {
2315 batch = batch.add(&hasher, &test_digest(i));
2316 }
2317 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2318 mmr.apply_batch(&batch).unwrap();
2319 mmr.sync().await.unwrap();
2320
2321 let prune_loc = Location::<F>::new(16);
2323 mmr.prune(prune_loc).await.unwrap();
2324 let expected_root = mmr.root();
2325 let expected_size = mmr.size();
2326 drop(mmr);
2327
2328 let mmr = Journaled::<F, _, Digest>::init(
2331 context.with_label("reopened"),
2332 &hasher,
2333 test_config(&context),
2334 )
2335 .await
2336 .unwrap();
2337
2338 assert_eq!(mmr.bounds().start, prune_loc);
2339 assert_eq!(mmr.size(), expected_size);
2340 assert_eq!(mmr.root(), expected_root);
2341
2342 mmr.destroy().await.unwrap();
2343 }
2344
2345 #[test_traced("WARN")]
2346 fn test_journaled_init_metadata_ahead_mmr() {
2347 let executor = deterministic::Runner::default();
2348 executor.start(journaled_init_metadata_ahead_inner::<mmr::Family>);
2349 }
2350
2351 #[test_traced("WARN")]
2352 fn test_journaled_init_metadata_ahead_mmb() {
2353 let executor = deterministic::Runner::default();
2354 executor.start(journaled_init_metadata_ahead_inner::<mmb::Family>);
2355 }
2356
2357 async fn journaled_init_sync_computes_pinned_nodes_before_pruning_inner<F: Family>(
2364 context: deterministic::Context,
2365 ) {
2366 let hasher = Standard::<Sha256>::new();
2367
2368 let cfg = Config {
2370 journal_partition: "mmr-journal".into(),
2371 metadata_partition: "mmr-metadata".into(),
2372 items_per_blob: NZU64!(7),
2373 write_buffer: NZUsize!(64),
2374 thread_pool: None,
2375 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2376 };
2377
2378 let mut mmr =
2380 Journaled::<F, _, Digest>::init(context.with_label("init"), &hasher, cfg.clone())
2381 .await
2382 .unwrap();
2383 let mut batch = mmr.new_batch();
2384 for i in 0..100 {
2385 batch = batch.add(&hasher, &test_digest(i));
2386 }
2387 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2388 mmr.apply_batch(&batch).unwrap();
2389 mmr.sync().await.unwrap();
2390
2391 let original_size = mmr.size();
2394 let original_root = mmr.root();
2395 drop(mmr);
2396
2397 let prune_loc = Location::<F>::new(32);
2400 let sync_cfg = SyncConfig::<F, sha256::Digest> {
2401 config: cfg,
2402 range: prune_loc..Location::<F>::new(128),
2403 pinned_nodes: None, };
2405
2406 let sync_mmr =
2407 Journaled::<F, _, Digest>::init_sync(context.with_label("sync"), sync_cfg, &hasher)
2408 .await
2409 .unwrap();
2410
2411 assert_eq!(sync_mmr.size(), original_size);
2413 assert_eq!(sync_mmr.root(), original_root);
2414 assert_eq!(sync_mmr.bounds().start, prune_loc);
2415
2416 sync_mmr.destroy().await.unwrap();
2417 }
2418
2419 #[test_traced]
2420 fn test_journaled_init_sync_computes_pinned_nodes_before_pruning_mmr() {
2421 let executor = deterministic::Runner::default();
2422 executor
2423 .start(journaled_init_sync_computes_pinned_nodes_before_pruning_inner::<mmr::Family>);
2424 }
2425
2426 #[test_traced]
2427 fn test_journaled_init_sync_computes_pinned_nodes_before_pruning_mmb() {
2428 let executor = deterministic::Runner::default();
2429 executor
2430 .start(journaled_init_sync_computes_pinned_nodes_before_pruning_inner::<mmb::Family>);
2431 }
2432
2433 async fn journaled_historical_proof_pruned_elements_inner<F: Family>(
2434 context: deterministic::Context,
2435 ) {
2436 let hasher = Standard::<Sha256>::new();
2437
2438 let mut mmr = Journaled::<F, _, Digest>::init(
2439 context.with_label("init"),
2440 &hasher,
2441 test_config(&context),
2442 )
2443 .await
2444 .unwrap();
2445
2446 let mut batch = mmr.new_batch();
2447 for i in 0..64 {
2448 batch = batch.add(&hasher, &test_digest(i));
2449 }
2450 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2451 mmr.apply_batch(&batch).unwrap();
2452
2453 let prune_loc = Location::<F>::new(16);
2454 mmr.prune(prune_loc).await.unwrap();
2455
2456 let historical_leaves = mmr.leaves();
2457 let mut pruned_loc = None;
2458 for loc_u64 in 0..*historical_leaves {
2459 let loc = Location::<F>::new(loc_u64);
2460 let result = mmr
2461 .historical_range_proof(&hasher, historical_leaves, loc..loc + 1)
2462 .await;
2463 if matches!(result, Err(Error::ElementPruned(_))) {
2464 pruned_loc = Some(loc);
2465 break;
2466 }
2467 }
2468 let pruned_loc = pruned_loc.expect("expected at least one pruned location");
2469
2470 let mut batch = mmr.new_batch();
2472 for i in 0..8 {
2473 batch = batch.add(&hasher, &test_digest(10_000 + i));
2474 }
2475 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2476 mmr.apply_batch(&batch).unwrap();
2477
2478 let requested = mmr.leaves();
2479 let result = mmr
2480 .historical_range_proof(&hasher, requested, pruned_loc..pruned_loc + 1)
2481 .await;
2482 assert!(matches!(result, Err(Error::ElementPruned(_))));
2483
2484 mmr.destroy().await.unwrap();
2485 }
2486
2487 #[test_traced]
2488 fn test_journaled_historical_proof_pruned_elements_mmr() {
2489 let executor = deterministic::Runner::default();
2490 executor.start(journaled_historical_proof_pruned_elements_inner::<mmr::Family>);
2491 }
2492
2493 #[test_traced]
2494 fn test_journaled_historical_proof_pruned_elements_mmb() {
2495 let executor = deterministic::Runner::default();
2496 executor.start(journaled_historical_proof_pruned_elements_inner::<mmb::Family>);
2497 }
2498
2499 async fn journaled_append_while_historical_proof_is_available_inner<F: Family>(
2500 context: deterministic::Context,
2501 ) {
2502 let hasher = Standard::<Sha256>::new();
2503 let mut mmr = Journaled::<F, _, Digest>::init(
2504 context.with_label("init"),
2505 &hasher,
2506 test_config(&context),
2507 )
2508 .await
2509 .unwrap();
2510
2511 let mut batch = mmr.new_batch();
2512 for i in 0..20 {
2513 batch = batch.add(&hasher, &test_digest(i));
2514 }
2515 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2516 mmr.apply_batch(&batch).unwrap();
2517
2518 let historical_leaves = Location::<F>::new(10);
2519 let range = Location::<F>::new(2)..Location::<F>::new(8);
2520
2521 let batch = mmr
2523 .new_batch()
2524 .add(&hasher, &test_digest(100))
2525 .add(&hasher, &test_digest(101));
2526 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2527 mmr.apply_batch(&batch).unwrap();
2528
2529 let proof = mmr
2530 .historical_range_proof(&hasher, historical_leaves, range.clone())
2531 .await
2532 .unwrap();
2533
2534 let expected = mmr
2535 .historical_range_proof(&hasher, historical_leaves, range)
2536 .await
2537 .unwrap();
2538 assert_eq!(proof, expected);
2539
2540 mmr.destroy().await.unwrap();
2541 }
2542
2543 #[test_traced]
2544 fn test_journaled_append_while_historical_proof_is_available_mmr() {
2545 let executor = deterministic::Runner::default();
2546 executor.start(journaled_append_while_historical_proof_is_available_inner::<mmr::Family>);
2547 }
2548
2549 #[test_traced]
2550 fn test_journaled_append_while_historical_proof_is_available_mmb() {
2551 let executor = deterministic::Runner::default();
2552 executor.start(journaled_append_while_historical_proof_is_available_inner::<mmb::Family>);
2553 }
2554
2555 async fn journaled_historical_proof_after_sync_reads_from_journal_inner<F: Family>(
2556 context: deterministic::Context,
2557 ) {
2558 let hasher = Standard::<Sha256>::new();
2559 let mut mmr = Journaled::<F, _, Digest>::init(
2560 context.with_label("init"),
2561 &hasher,
2562 test_config(&context),
2563 )
2564 .await
2565 .unwrap();
2566
2567 let mut batch = mmr.new_batch();
2568 for i in 0..64 {
2569 batch = batch.add(&hasher, &test_digest(i));
2570 }
2571 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2572 mmr.apply_batch(&batch).unwrap();
2573 mmr.sync().await.unwrap();
2574
2575 let historical_leaves = Location::<F>::new(20);
2576 let range = Location::<F>::new(5)..Location::<F>::new(15);
2577 let expected = mmr
2578 .historical_range_proof(&hasher, historical_leaves, range.clone())
2579 .await
2580 .unwrap();
2581
2582 let actual = mmr
2583 .historical_range_proof(&hasher, historical_leaves, range)
2584 .await
2585 .unwrap();
2586 assert_eq!(actual, expected);
2587
2588 mmr.destroy().await.unwrap();
2589 }
2590
2591 #[test_traced]
2592 fn test_journaled_historical_proof_after_sync_reads_from_journal_mmr() {
2593 let executor = deterministic::Runner::default();
2594 executor
2595 .start(journaled_historical_proof_after_sync_reads_from_journal_inner::<mmr::Family>);
2596 }
2597
2598 #[test_traced]
2599 fn test_journaled_historical_proof_after_sync_reads_from_journal_mmb() {
2600 let executor = deterministic::Runner::default();
2601 executor
2602 .start(journaled_historical_proof_after_sync_reads_from_journal_inner::<mmb::Family>);
2603 }
2604
2605 async fn journaled_historical_proof_after_pruning_inner<F: Family>(
2606 context: deterministic::Context,
2607 ) {
2608 let hasher = Standard::<Sha256>::new();
2609 let mut mmr = Journaled::<F, _, Digest>::init(
2610 context.with_label("init"),
2611 &hasher,
2612 test_config(&context),
2613 )
2614 .await
2615 .unwrap();
2616
2617 let mut batch = mmr.new_batch();
2618 for i in 0..30 {
2619 batch = batch.add(&hasher, &test_digest(i));
2620 }
2621 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2622 mmr.apply_batch(&batch).unwrap();
2623
2624 let prune_loc = Location::<F>::new(10);
2625 mmr.prune(prune_loc).await.unwrap();
2626
2627 let requested = Location::<F>::new(20);
2628 let range = prune_loc..requested;
2629 let proof = mmr
2630 .historical_range_proof(&hasher, requested, range)
2631 .await
2632 .unwrap();
2633 assert!(proof.leaves > Location::<F>::new(0));
2634
2635 mmr.destroy().await.unwrap();
2636 }
2637
2638 #[test_traced]
2639 fn test_journaled_historical_proof_after_pruning_mmr() {
2640 let executor = deterministic::Runner::default();
2641 executor.start(journaled_historical_proof_after_pruning_inner::<mmr::Family>);
2642 }
2643
2644 #[test_traced]
2645 fn test_journaled_historical_proof_after_pruning_mmb() {
2646 let executor = deterministic::Runner::default();
2647 executor.start(journaled_historical_proof_after_pruning_inner::<mmb::Family>);
2648 }
2649
2650 async fn journaled_historical_proof_edge_cases_inner<F: Family>(
2651 context: deterministic::Context,
2652 ) {
2653 let hasher = Standard::<Sha256>::new();
2654
2655 let mmr = Journaled::<F, _, Digest>::init(
2657 context.with_label("empty"),
2658 &hasher,
2659 test_config(&context),
2660 )
2661 .await
2662 .unwrap();
2663 let empty_end = Location::<F>::new(0);
2664 let empty_result = mmr
2665 .historical_range_proof(&hasher, empty_end, empty_end..empty_end)
2666 .await;
2667 assert!(matches!(empty_result, Err(Error::Empty)));
2668 let oob_result = mmr
2669 .historical_range_proof(&hasher, empty_end + 1, empty_end..empty_end + 1)
2670 .await;
2671 assert!(matches!(
2672 oob_result,
2673 Err(Error::RangeOutOfBounds(loc)) if loc == empty_end + 1
2674 ));
2675 mmr.destroy().await.unwrap();
2676
2677 let mut mmr = Journaled::<F, _, Digest>::init(
2679 context.with_label("fully_pruned"),
2680 &hasher,
2681 test_config(&context),
2682 )
2683 .await
2684 .unwrap();
2685 let mut batch = mmr.new_batch();
2686 for i in 0..20 {
2687 batch = batch.add(&hasher, &test_digest(i));
2688 }
2689 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2690 mmr.apply_batch(&batch).unwrap();
2691 let end = mmr.leaves();
2692 mmr.prune_all().await.unwrap();
2693 assert!(mmr.bounds().is_empty());
2694 let pruned_result = mmr.historical_range_proof(&hasher, end, end - 1..end).await;
2695 assert!(matches!(pruned_result, Err(Error::ElementPruned(_))));
2696 let oob_result = mmr
2697 .historical_range_proof(&hasher, end + 1, end - 1..end)
2698 .await;
2699 assert!(matches!(
2700 oob_result,
2701 Err(Error::RangeOutOfBounds(loc)) if loc == end + 1
2702 ));
2703 mmr.destroy().await.unwrap();
2704
2705 let mut mmr = Journaled::<F, _, Digest>::init(
2707 context.with_label("single_leaf"),
2708 &hasher,
2709 test_config(&context),
2710 )
2711 .await
2712 .unwrap();
2713 let mut batch = mmr.new_batch();
2714 for i in 0..11 {
2715 batch = batch.add(&hasher, &test_digest(i));
2716 }
2717 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2718 mmr.apply_batch(&batch).unwrap();
2719 let end = mmr.leaves();
2720 let keep_loc = end - 1;
2721 mmr.prune(keep_loc).await.unwrap();
2722 let ok_result = mmr
2723 .historical_range_proof(&hasher, end, keep_loc..end)
2724 .await;
2725 assert!(ok_result.is_ok());
2726 let pruned_end = keep_loc - 1;
2727 let start_loc = Location::<F>::new(1);
2729 let pruned_result = mmr
2730 .historical_range_proof(&hasher, end, start_loc..pruned_end + 1)
2731 .await;
2732 assert!(matches!(pruned_result, Err(Error::ElementPruned(_))));
2733 let oob_result = mmr
2734 .historical_range_proof(&hasher, end + 1, keep_loc..end)
2735 .await;
2736 assert!(matches!(oob_result, Err(Error::RangeOutOfBounds(_))));
2737 mmr.destroy().await.unwrap();
2738 }
2739
2740 #[test_traced]
2741 fn test_journaled_historical_proof_edge_cases_mmr() {
2742 let executor = deterministic::Runner::default();
2743 executor.start(journaled_historical_proof_edge_cases_inner::<mmr::Family>);
2744 }
2745
2746 #[test_traced]
2747 fn test_journaled_historical_proof_edge_cases_mmb() {
2748 let executor = deterministic::Runner::default();
2749 executor.start(journaled_historical_proof_edge_cases_inner::<mmb::Family>);
2750 }
2751
2752 async fn journaled_historical_proof_out_of_bounds_inner<F: Family>(
2753 context: deterministic::Context,
2754 ) {
2755 let hasher = Standard::<Sha256>::new();
2756 let mut mmr = Journaled::<F, _, Digest>::init(
2757 context.with_label("oob"),
2758 &hasher,
2759 test_config(&context),
2760 )
2761 .await
2762 .unwrap();
2763
2764 let mut batch = mmr.new_batch();
2765 for i in 0..8 {
2766 batch = batch.add(&hasher, &test_digest(i));
2767 }
2768 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2769 mmr.apply_batch(&batch).unwrap();
2770 let requested = mmr.leaves() + 1;
2771
2772 let result = mmr
2773 .historical_range_proof(&hasher, requested, Location::<F>::new(0)..requested)
2774 .await;
2775 assert!(matches!(
2776 result,
2777 Err(Error::RangeOutOfBounds(loc)) if loc == requested
2778 ));
2779
2780 mmr.destroy().await.unwrap();
2781 }
2782
2783 #[test_traced]
2784 fn test_journaled_historical_proof_out_of_bounds_mmr() {
2785 let executor = deterministic::Runner::default();
2786 executor.start(journaled_historical_proof_out_of_bounds_inner::<mmr::Family>);
2787 }
2788
2789 #[test_traced]
2790 fn test_journaled_historical_proof_out_of_bounds_mmb() {
2791 let executor = deterministic::Runner::default();
2792 executor.start(journaled_historical_proof_out_of_bounds_inner::<mmb::Family>);
2793 }
2794
2795 async fn journaled_historical_proof_range_validation_inner<F: Family>(
2796 context: deterministic::Context,
2797 ) {
2798 let hasher = Standard::<Sha256>::new();
2799 let mut mmr = Journaled::<F, _, Digest>::init(
2800 context.with_label("range_validation"),
2801 &hasher,
2802 test_config(&context),
2803 )
2804 .await
2805 .unwrap();
2806
2807 let mut batch = mmr.new_batch();
2808 for i in 0..32 {
2809 batch = batch.add(&hasher, &test_digest(i));
2810 }
2811 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2812 mmr.apply_batch(&batch).unwrap();
2813
2814 let valid_range = Location::<F>::new(0)..Location::<F>::new(1);
2815
2816 let requested = Location::<F>::new(5);
2818 let empty_range = requested..requested;
2819 let empty_result = mmr
2820 .historical_range_proof(&hasher, requested, empty_range)
2821 .await;
2822 assert!(matches!(empty_result, Err(Error::Empty)));
2823
2824 let leaves_oob = mmr.leaves() + 1;
2826 let result = mmr
2827 .historical_range_proof(&hasher, leaves_oob, valid_range.clone())
2828 .await;
2829 assert!(matches!(
2830 result,
2831 Err(Error::RangeOutOfBounds(loc)) if loc == leaves_oob
2832 ));
2833
2834 let end_oob = mmr.leaves() + 1;
2836 let range_oob = Location::<F>::new(0)..end_oob;
2837 let result = mmr
2838 .historical_range_proof(&hasher, requested, range_oob)
2839 .await;
2840 assert!(matches!(
2841 result,
2842 Err(Error::RangeOutOfBounds(loc)) if loc == end_oob
2843 ));
2844
2845 let range_end_gt_requested = requested + 1;
2847 let range_oob_at_requested = Location::<F>::new(0)..range_end_gt_requested;
2848 assert!(range_end_gt_requested <= mmr.leaves());
2849 let result = mmr
2850 .historical_range_proof(&hasher, requested, range_oob_at_requested)
2851 .await;
2852 assert!(matches!(
2853 result,
2854 Err(Error::RangeOutOfBounds(loc)) if loc == range_end_gt_requested
2855 ));
2856
2857 let overflow_loc = Location::<F>::new(u64::MAX);
2860 let overflow_range = Location::<F>::new(0)..overflow_loc;
2861 let result = mmr
2862 .historical_range_proof(&hasher, requested, overflow_range)
2863 .await;
2864 assert!(matches!(
2865 result,
2866 Err(Error::RangeOutOfBounds(loc)) if loc == overflow_loc
2867 ));
2868
2869 mmr.destroy().await.unwrap();
2870 }
2871
2872 #[test_traced]
2873 fn test_journaled_historical_proof_range_validation_mmr() {
2874 let executor = deterministic::Runner::default();
2875 executor.start(journaled_historical_proof_range_validation_inner::<mmr::Family>);
2876 }
2877
2878 #[test_traced]
2879 fn test_journaled_historical_proof_range_validation_mmb() {
2880 let executor = deterministic::Runner::default();
2881 executor.start(journaled_historical_proof_range_validation_inner::<mmb::Family>);
2882 }
2883
2884 async fn journaled_historical_proof_non_size_prune_excludes_pruned_leaves_inner<F: Family>(
2885 context: deterministic::Context,
2886 ) {
2887 let hasher = Standard::<Sha256>::new();
2888 let mut mmr = Journaled::<F, _, Digest>::init(
2889 context.with_label("non_size_prune"),
2890 &hasher,
2891 test_config(&context),
2892 )
2893 .await
2894 .unwrap();
2895
2896 let mut batch = mmr.new_batch();
2897 for i in 0..16 {
2898 batch = batch.add(&hasher, &test_digest(i));
2899 }
2900 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2901 mmr.apply_batch(&batch).unwrap();
2902
2903 let end = mmr.leaves();
2904 let mut failures = Vec::new();
2905 for prune_leaf in 1..*end {
2906 let prune_loc = Location::<F>::new(prune_leaf);
2907 mmr.prune(prune_loc).await.unwrap();
2908 for loc_u64 in 0..*end {
2909 let loc = Location::<F>::new(loc_u64);
2910 let range_includes_pruned_leaf = loc < prune_loc;
2911 match mmr.historical_proof(&hasher, end, loc).await {
2912 Ok(_) => {}
2913 Err(Error::ElementPruned(_)) if range_includes_pruned_leaf => {}
2914 Err(Error::ElementPruned(_)) => failures.push(format!(
2915 "prune_loc={prune_loc} loc={loc} returned ElementPruned without a pruned range element"
2916 )),
2917 Err(err) => failures
2918 .push(format!("prune_loc={prune_loc} loc={loc} err={err}")),
2919 }
2920 }
2921 }
2922
2923 assert!(
2924 failures.is_empty(),
2925 "historical proof generation returned unexpected errors: {failures:?}"
2926 );
2927
2928 mmr.destroy().await.unwrap();
2929 }
2930
2931 #[test_traced]
2932 fn test_journaled_historical_proof_non_size_prune_excludes_pruned_leaves_mmr() {
2933 let executor = deterministic::Runner::default();
2934 executor.start(
2935 journaled_historical_proof_non_size_prune_excludes_pruned_leaves_inner::<mmr::Family>,
2936 );
2937 }
2938
2939 #[test_traced]
2940 fn test_journaled_historical_proof_non_size_prune_excludes_pruned_leaves_mmb() {
2941 let executor = deterministic::Runner::default();
2942 executor.start(
2943 journaled_historical_proof_non_size_prune_excludes_pruned_leaves_inner::<mmb::Family>,
2944 );
2945 }
2946
2947 async fn journaled_init_sync_recovers_from_invalid_journal_size_inner<F: Family>(
2950 context: deterministic::Context,
2951 ) {
2952 let hasher = Standard::<Sha256>::new();
2953
2954 let mut mmr = Journaled::<F, _, Digest>::init(
2956 context.with_label("init"),
2957 &hasher,
2958 test_config(&context),
2959 )
2960 .await
2961 .unwrap();
2962 let mut batch = mmr.new_batch();
2963 for i in 0..3 {
2964 batch = batch.add(&hasher, &test_digest(i));
2965 }
2966 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
2967 mmr.apply_batch(&batch).unwrap();
2968 let valid_size = mmr.size();
2969 let valid_root = mmr.root();
2970 mmr.sync().await.unwrap();
2971 drop(mmr);
2972
2973 {
2977 let journal: Journal<_, Digest> = Journal::init(
2978 context.with_label("corrupt"),
2979 JConfig {
2980 partition: "journal-partition".into(),
2981 items_per_blob: NZU64!(7),
2982 write_buffer: NZUsize!(1024),
2983 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
2984 },
2985 )
2986 .await
2987 .unwrap();
2988 assert_eq!(journal.size().await, valid_size);
2989 journal.append(&Sha256::hash(b"orphan")).await.unwrap();
2990 journal.sync().await.unwrap();
2991 assert_eq!(journal.size().await, valid_size + 1);
2992 }
2993
2994 let sync_cfg = SyncConfig::<F, Digest> {
2996 config: test_config(&context),
2997 range: Location::<F>::new(0)..Location::<F>::new(100),
2998 pinned_nodes: None,
2999 };
3000 let sync_mmr =
3001 Journaled::<F, _, Digest>::init_sync(context.with_label("sync"), sync_cfg, &hasher)
3002 .await
3003 .unwrap();
3004
3005 assert_eq!(sync_mmr.size(), valid_size);
3006 assert_eq!(sync_mmr.root(), valid_root);
3007
3008 sync_mmr.destroy().await.unwrap();
3009 }
3010
3011 #[test_traced]
3012 fn test_init_sync_recovers_from_invalid_journal_size_mmr() {
3013 let executor = deterministic::Runner::default();
3014 executor.start(journaled_init_sync_recovers_from_invalid_journal_size_inner::<mmr::Family>);
3015 }
3016
3017 #[test_traced]
3018 fn test_init_sync_recovers_from_invalid_journal_size_mmb() {
3019 let executor = deterministic::Runner::default();
3020 executor.start(journaled_init_sync_recovers_from_invalid_journal_size_inner::<mmb::Family>);
3021 }
3022
3023 async fn journaled_stale_batch_inner<F: Family>(context: deterministic::Context) {
3024 let hasher: Standard<Sha256> = Standard::new();
3025 let mut mmr = Journaled::<F, _, Digest>::init(
3026 context.clone(),
3027 &Standard::<Sha256>::new(),
3028 test_config(&context),
3029 )
3030 .await
3031 .unwrap();
3032
3033 let batch_a = mmr.new_batch().add(&hasher, b"leaf-a");
3035 let batch_a = mmr.with_mem(|mem| batch_a.merkleize(mem, &hasher));
3036 let batch_b = mmr.new_batch().add(&hasher, b"leaf-b");
3037 let batch_b = mmr.with_mem(|mem| batch_b.merkleize(mem, &hasher));
3038
3039 mmr.apply_batch(&batch_a).unwrap();
3041
3042 let result = mmr.apply_batch(&batch_b);
3044 assert!(
3045 matches!(result, Err(Error::StaleBatch { .. })),
3046 "expected StaleBatch, got {result:?}"
3047 );
3048
3049 mmr.destroy().await.unwrap();
3050 }
3051
/// Runs `journaled_stale_batch_inner` with `mmr::Family` on a default deterministic runner.
#[test]
fn test_stale_batch_mmr() {
    deterministic::Runner::default().start(journaled_stale_batch_inner::<mmr::Family>);
}
3057
/// Runs `journaled_stale_batch_inner` with `mmb::Family` on a default deterministic runner.
#[test]
fn test_stale_batch_mmb() {
    deterministic::Runner::default().start(journaled_stale_batch_inner::<mmb::Family>);
}
3063
3064 async fn journaled_new_batch_returns_append_only_wrapper_inner<F: Family>(
3066 context: deterministic::Context,
3067 ) {
3068 let hasher = Standard::<Sha256>::new();
3069 let mmr = Journaled::<F, _, Digest>::init(context.clone(), &hasher, test_config(&context))
3070 .await
3071 .unwrap();
3072
3073 let _batch: UnmerkleizedBatch<F, Digest> = mmr.new_batch();
3074
3075 mmr.destroy().await.unwrap();
3076 }
3077
3078 #[test_traced]
3079 fn test_new_batch_returns_append_only_wrapper_mmr() {
3080 let executor = deterministic::Runner::default();
3081 executor.start(journaled_new_batch_returns_append_only_wrapper_inner::<mmr::Family>);
3082 }
3083
3084 #[test_traced]
3085 fn test_new_batch_returns_append_only_wrapper_mmb() {
3086 let executor = deterministic::Runner::default();
3087 executor.start(journaled_new_batch_returns_append_only_wrapper_inner::<mmb::Family>);
3088 }
3089
3090 async fn journaled_update_leaf_after_sync_returns_pruned_inner<F: Family>(
3095 context: deterministic::Context,
3096 ) {
3097 let hasher = Standard::<Sha256>::new();
3098 let mut mmr =
3099 Journaled::<F, _, Digest>::init(context.clone(), &hasher, test_config(&context))
3100 .await
3101 .unwrap();
3102
3103 let mut batch = mmr.new_batch();
3105 for i in 0..50 {
3106 batch = batch.add(&hasher, &test_digest(i));
3107 }
3108 let batch = mmr.with_mem(|mem| batch.merkleize(mem, &hasher));
3109 mmr.apply_batch(&batch).unwrap();
3110 mmr.sync().await.unwrap();
3111
3112 let batch = mmr.to_batch().new_batch();
3116 let result = batch.update_leaf(&hasher, Location::<F>::new(0), b"updated");
3117 assert!(matches!(result, Err(Error::ElementPruned(_))));
3118
3119 mmr.destroy().await.unwrap();
3120 }
3121
3122 #[test_traced]
3123 fn test_update_leaf_after_sync_returns_pruned_mmr() {
3124 let executor = deterministic::Runner::default();
3125 executor.start(journaled_update_leaf_after_sync_returns_pruned_inner::<mmr::Family>);
3126 }
3127
3128 #[test_traced]
3129 fn test_update_leaf_after_sync_returns_pruned_mmb() {
3130 let executor = deterministic::Runner::default();
3131 executor.start(journaled_update_leaf_after_sync_returns_pruned_inner::<mmb::Family>);
3132 }
3133}