use crate::{
    adb::any::fixed::sync::{init_journal, init_journal_at_size},
    journal::{
        fixed::{Config as JConfig, Journal},
        Error as JError,
    },
    metadata::{Config as MConfig, Metadata},
    mmr::{
        iterator::PeakIterator,
        mem::{Config as MemConfig, Mmr as MemMmr},
        verification::Proof,
        Builder, Error, Hasher,
    },
};
use commonware_codec::DecodeExt;
use commonware_cryptography::{Digest, Hasher as CHasher};
use commonware_runtime::{buffer::PoolRef, Clock, Metrics, Storage as RStorage, ThreadPool};
use commonware_utils::sequence::prefixed_u64::U64;
use std::{
    collections::HashMap,
    num::{NonZeroU64, NonZeroUsize},
};
use tracing::{debug, error, warn};

/// Configuration for a journaled MMR.
#[derive(Clone)]
pub struct Config {
    /// The name of the storage partition backing the MMR journal.
    pub journal_partition: String,

    /// The name of the storage partition backing the MMR metadata.
    pub metadata_partition: String,

    /// The maximum number of items to store in each blob of the backing journal.
    pub items_per_blob: NonZeroU64,

    /// The size of the write buffer to use with the backing journal.
    pub write_buffer: NonZeroUsize,

    /// An optional thread pool for parallelizing MMR operations.
    pub thread_pool: Option<ThreadPool>,

    /// The buffer pool to use for caching journal data.
    pub buffer_pool: PoolRef,
}

/// Configuration for initializing a journaled MMR for sync.
pub struct SyncConfig<D: Digest> {
    /// The configuration for the underlying MMR.
    pub config: Config,

    /// The position of the first node to be synced; everything below it is pruned.
    pub lower_bound: u64,

    /// The position of the last node to be synced (inclusive).
    pub upper_bound: u64,

    /// The digests of the nodes to pin at `lower_bound`, if known. When provided, they are
    /// persisted to metadata during initialization.
    pub pinned_nodes: Option<Vec<D>>,
}

/// A journal-backed MMR that caches all un-synced nodes in memory and offloads the rest to
/// disk.
pub struct Mmr<E: RStorage + Clock + Metrics, H: CHasher> {
    /// The in-memory MMR containing all nodes not yet written to the journal.
    mem_mmr: MemMmr<H>,

    /// The journal containing all persisted MMR nodes.
    journal: Journal<E, H::Digest>,

    /// The size of the journal, not counting any un-synced nodes still cached in `mem_mmr`.
    journal_size: u64,

    /// The metadata store persisting the pruning boundary and any pinned nodes that are no
    /// longer retrievable from the journal.
    metadata: Metadata<E, U64, Vec<u8>>,

    /// The position up to which this MMR has been pruned.
    pruned_to_pos: u64,
}

impl<E: RStorage + Clock + Metrics, H: CHasher> Builder<H> for Mmr<E, H> {
    async fn add(&mut self, hasher: &mut impl Hasher<H>, element: &[u8]) -> Result<u64, Error> {
        self.add(hasher, element).await
    }

    fn root(&self, hasher: &mut impl Hasher<H>) -> H::Digest {
        self.root(hasher)
    }
}

/// Prefix used for metadata keys that store pinned node digests.
const NODE_PREFIX: u8 = 0;

/// Prefix used for the metadata key that stores the pruning boundary.
const PRUNE_TO_POS_PREFIX: u8 = 1;

impl<E: RStorage + Clock + Metrics, H: CHasher> Mmr<E, H> {
    /// Initialize a fully pruned MMR of size `mmr_size` from the given pinned nodes, removing
    /// any existing data in the configured partitions.
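    ///
    /// A minimal sketch of cloning another MMR's fully pruned state, mirroring the tests
    /// below (`source`, `pinned_map`, `context`, and `config` are assumed names):
    ///
    /// ```ignore
    /// let pinned: Vec<_> = Proof::<H::Digest>::nodes_to_pin(source.size())
    ///     .map(|pos| pinned_map[&pos])
    ///     .collect();
    /// let mmr =
    ///     Mmr::<E, H>::init_from_pinned_nodes(context, pinned, source.size(), config).await?;
    /// assert_eq!(mmr.pruned_to_pos(), source.size());
    /// ```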
    pub async fn init_from_pinned_nodes(
        context: E,
        pinned_nodes: Vec<H::Digest>,
        mmr_size: u64,
        config: Config,
    ) -> Result<Self, Error> {
        // Remove any existing journal and metadata in the configured partitions.
        context.remove(&config.journal_partition, None).await.ok();
        context.remove(&config.metadata_partition, None).await.ok();

        // Initialize the journal at the target size.
        let journal_cfg = JConfig {
            partition: config.journal_partition.clone(),
            items_per_blob: config.items_per_blob,
            buffer_pool: config.buffer_pool.clone(),
            write_buffer: config.write_buffer,
        };
        let journal =
            init_journal_at_size(context.with_label("mmr_journal"), journal_cfg, mmr_size).await?;

        let metadata_cfg = MConfig {
            partition: config.metadata_partition.clone(),
            codec_config: ((0..).into(), ()),
        };
        let mut metadata = Metadata::init(context.with_label("mmr_metadata"), metadata_cfg).await?;

        // Persist the pruning boundary.
        let pruning_boundary_key = U64::new(PRUNE_TO_POS_PREFIX, 0);
        metadata.put(pruning_boundary_key, mmr_size.to_be_bytes().into());

        // Persist the pinned nodes.
        let nodes_to_pin_positions = Proof::<H::Digest>::nodes_to_pin(mmr_size);
        for (pos, digest) in nodes_to_pin_positions.zip(pinned_nodes.iter()) {
            metadata.put(U64::new(NODE_PREFIX, pos), digest.to_vec());
        }

        metadata.sync().await.map_err(Error::MetadataError)?;

        // Build the in-memory MMR, fully pruned to `mmr_size`.
        let mem_mmr = MemMmr::init(MemConfig {
            nodes: vec![],
            pruned_to_pos: mmr_size,
            pinned_nodes,
            pool: config.thread_pool,
        });

        Ok(Self {
            mem_mmr,
            journal,
            journal_size: mmr_size,
            metadata,
            pruned_to_pos: mmr_size,
        })
    }

    /// Initialize a new journaled MMR from the content (if any) in the given partitions,
    /// recovering gracefully from any unclean shutdown.
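    ///
    /// A minimal usage sketch (assumes a runtime `context` and a `Standard<Sha256>` hasher, as
    /// in the tests below):
    ///
    /// ```ignore
    /// let mut hasher: Standard<Sha256> = Standard::new();
    /// let mut mmr = Mmr::init(context, &mut hasher, cfg).await?;
    /// let pos = mmr.add(&mut hasher, b"element").await?;
    /// mmr.sync(&mut hasher).await?; // make the addition durable
    /// let root = mmr.root(&mut hasher);
    /// ```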
    pub async fn init(context: E, hasher: &mut impl Hasher<H>, cfg: Config) -> Result<Self, Error> {
        let journal_cfg = JConfig {
            partition: cfg.journal_partition,
            items_per_blob: cfg.items_per_blob,
            buffer_pool: cfg.buffer_pool,
            write_buffer: cfg.write_buffer,
        };
        let mut journal =
            Journal::<E, H::Digest>::init(context.with_label("mmr_journal"), journal_cfg).await?;
        let mut journal_size = journal.size().await?;

        let metadata_cfg = MConfig {
            partition: cfg.metadata_partition,
            codec_config: ((0..).into(), ()),
        };
        let metadata =
            Metadata::<_, U64, Vec<u8>>::init(context.with_label("mmr_metadata"), metadata_cfg)
                .await?;

        if journal_size == 0 {
            return Ok(Self {
                mem_mmr: MemMmr::init(MemConfig {
                    nodes: vec![],
                    pruned_to_pos: 0,
                    pinned_nodes: vec![],
                    pool: cfg.thread_pool,
                }),
                journal,
                journal_size,
                metadata,
                pruned_to_pos: 0,
            });
        }

        // Recover the pruning boundary from metadata, defaulting to 0 if it was never written.
        let key: U64 = U64::new(PRUNE_TO_POS_PREFIX, 0);
        let metadata_prune_pos = match metadata.get(&key) {
            Some(bytes) => u64::from_be_bytes(
                bytes
                    .as_slice()
                    .try_into()
                    .expect("metadata prune position is not 8 bytes"),
            ),
            None => 0,
        };
        let oldest_retained_pos = journal.oldest_retained_pos().await?.unwrap_or(0);
        if metadata_prune_pos != oldest_retained_pos {
            // Metadata is always synced before the journal is pruned, so its boundary should
            // never lag the journal's.
            assert!(metadata_prune_pos >= oldest_retained_pos);
            // Prune the journal forward to the boundary recorded in metadata, completing any
            // pruning operation that was interrupted.
            journal.prune(metadata_prune_pos).await?;
            if journal.oldest_retained_pos().await?.unwrap_or(0) != oldest_retained_pos {
                warn!(
                    oldest_retained_pos,
                    metadata_prune_pos, "journal pruned to match metadata"
                );
            }
        }

        let last_valid_size = PeakIterator::to_nearest_size(journal_size);
        let mut orphaned_leaf: Option<H::Digest> = None;
        if last_valid_size != journal_size {
            warn!(
                last_valid_size,
                "encountered invalid MMR structure, recovering from last valid size"
            );
            // The node at `last_valid_size` is an orphaned leaf whose parents were never
            // fully written. Recover it so it can be re-applied after rewinding.
            let recovered_item = journal.read(last_valid_size).await;
            if let Ok(item) = recovered_item {
                orphaned_leaf = Some(item);
            }
            journal.rewind(last_valid_size).await?;
            journal.sync().await?;
            journal_size = last_valid_size
        }

        // Initialize the in-memory MMR with the pinned nodes it requires at `journal_size`.
        let mut pinned_nodes = Vec::new();
        for pos in Proof::<H::Digest>::nodes_to_pin(journal_size) {
            let digest =
                Mmr::<E, H>::get_from_metadata_or_journal(&metadata, &journal, pos).await?;
            pinned_nodes.push(digest);
        }
        let mut mem_mmr = MemMmr::init(MemConfig {
            nodes: vec![],
            pruned_to_pos: journal_size,
            pinned_nodes,
            pool: cfg.thread_pool,
        });
        Self::add_extra_pinned_nodes(&mut mem_mmr, &metadata, &journal, metadata_prune_pos).await?;

        let mut s = Self {
            mem_mmr,
            journal,
            journal_size,
            metadata,
            pruned_to_pos: metadata_prune_pos,
        };

        if let Some(leaf) = orphaned_leaf {
            // Re-apply the orphaned leaf, regenerating its missing parents.
            let pos = s.mem_mmr.size();
            warn!(pos, "recovering orphaned leaf");
            s.mem_mmr.add_leaf_digest(hasher, leaf);
            assert_eq!(pos, journal_size);
            s.sync(hasher).await?;
            assert_eq!(s.size(), s.journal.size().await?);
        }

        Ok(s)
    }

    /// Add the pinned nodes required at the pruning boundary `prune_pos` to `mem_mmr`, in case
    /// they differ from those already pinned at its own boundary.
    async fn add_extra_pinned_nodes(
        mem_mmr: &mut MemMmr<H>,
        metadata: &Metadata<E, U64, Vec<u8>>,
        journal: &Journal<E, H::Digest>,
        prune_pos: u64,
    ) -> Result<(), Error> {
        let mut pinned_nodes = HashMap::new();
        for pos in Proof::<H::Digest>::nodes_to_pin(prune_pos) {
            let digest = Mmr::<E, H>::get_from_metadata_or_journal(metadata, journal, pos).await?;
            pinned_nodes.insert(pos, digest);
        }
        mem_mmr.add_pinned_nodes(pinned_nodes);

        Ok(())
    }

    /// Initialize an MMR for sync, reusing any persisted nodes that fall within the given
    /// bounds and pruning everything below `cfg.lower_bound`.
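    ///
    /// A minimal sync setup sketch (the bounds are illustrative assumptions; see the sync
    /// tests below):
    ///
    /// ```ignore
    /// let sync_cfg = SyncConfig::<Digest> {
    ///     config,
    ///     lower_bound: 0,
    ///     upper_bound: 100,
    ///     pinned_nodes: None,
    /// };
    /// let mmr = Mmr::<_, Sha256>::init_sync(context, sync_cfg).await?;
    /// assert!(mmr.size() <= 101);
    /// ```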
    pub async fn init_sync(context: E, cfg: SyncConfig<H::Digest>) -> Result<Self, Error> {
        let journal = init_journal(
            context.with_label("mmr_journal"),
            JConfig {
                partition: cfg.config.journal_partition,
                items_per_blob: cfg.config.items_per_blob,
                write_buffer: cfg.config.write_buffer,
                buffer_pool: cfg.config.buffer_pool.clone(),
            },
            cfg.lower_bound,
            cfg.upper_bound,
        )
        .await?;
        let journal_size = journal.size().await?;
        assert!(journal_size <= cfg.upper_bound + 1);

        let metadata_cfg = MConfig {
            partition: cfg.config.metadata_partition,
            codec_config: ((0..).into(), ()),
        };
        let mut metadata = Metadata::init(context.with_label("mmr_metadata"), metadata_cfg).await?;

        // Persist the pruning boundary.
        let pruning_boundary_key = U64::new(PRUNE_TO_POS_PREFIX, 0);
        metadata.put(pruning_boundary_key, cfg.lower_bound.to_be_bytes().into());

        // Persist any provided pinned nodes so they survive restarts.
        if let Some(pinned_nodes) = cfg.pinned_nodes {
            let nodes_to_pin_persisted = Proof::<H::Digest>::nodes_to_pin(cfg.lower_bound);
            for (pos, digest) in nodes_to_pin_persisted.zip(pinned_nodes.iter()) {
                metadata.put(U64::new(NODE_PREFIX, pos), digest.to_vec());
            }
        }

        // Pin the nodes the in-memory MMR requires at `journal_size`.
        let nodes_to_pin_mem = Proof::<H::Digest>::nodes_to_pin(journal_size);
        let mut mem_pinned_nodes = Vec::new();
        for pos in nodes_to_pin_mem {
            let digest =
                Mmr::<E, H>::get_from_metadata_or_journal(&metadata, &journal, pos).await?;
            mem_pinned_nodes.push(digest);
        }
        let mut mem_mmr = MemMmr::init(MemConfig {
            nodes: vec![],
            pruned_to_pos: journal_size,
            pinned_nodes: mem_pinned_nodes,
            pool: cfg.config.thread_pool,
        });

        // If the journal already extends beyond the lower bound, also pin the nodes required
        // at the (earlier) pruning boundary.
        if cfg.lower_bound < journal_size {
            Self::add_extra_pinned_nodes(&mut mem_mmr, &metadata, &journal, cfg.lower_bound)
                .await?;
        }
        metadata.sync().await?;

        Ok(Self {
            mem_mmr,
            journal,
            journal_size,
            metadata,
            pruned_to_pos: cfg.lower_bound,
        })
    }

    /// Return the total number of nodes in the MMR, irrespective of any pruning.
    pub fn size(&self) -> u64 {
        self.mem_mmr.size()
    }

    /// Return the number of leaves in the MMR.
    pub fn leaves(&self) -> u64 {
        self.mem_mmr.leaves()
    }

    /// Return the position of the last leaf in the MMR, or None if it is empty.
    pub fn last_leaf_pos(&self) -> Option<u64> {
        self.mem_mmr.last_leaf_pos()
    }

    /// Return true if there are batched updates that have not yet been processed.
    pub fn is_dirty(&self) -> bool {
        self.mem_mmr.is_dirty()
    }

    /// Return the node at `position`, or None if it has been pruned.
    pub async fn get_node(&self, position: u64) -> Result<Option<H::Digest>, Error> {
        if let Some(node) = self.mem_mmr.get_node(position) {
            return Ok(Some(node));
        }

        match self.journal.read(position).await {
            Ok(item) => Ok(Some(item)),
            Err(JError::ItemPruned(_)) => Ok(None),
            Err(e) => Err(Error::JournalError(e)),
        }
    }

    /// Retrieve the digest of the node at `pos` from metadata if present, otherwise from the
    /// journal. Returns [Error::MissingNode] if it is in neither.
    async fn get_from_metadata_or_journal(
        metadata: &Metadata<E, U64, Vec<u8>>,
        journal: &Journal<E, H::Digest>,
        pos: u64,
    ) -> Result<H::Digest, Error> {
        if let Some(bytes) = metadata.get(&U64::new(NODE_PREFIX, pos)) {
            debug!(pos, "read node from metadata");
            let digest = H::Digest::decode(bytes.as_ref());
            let Ok(digest) = digest else {
                error!(
                    pos,
                    err = %digest.err().unwrap(),
                    "could not convert node from metadata bytes to digest"
                );
                return Err(Error::MissingNode(pos));
            };
            return Ok(digest);
        }

        debug!(pos, "reading node from journal");
        let node = journal.read(pos).await;
        match node {
            Ok(node) => Ok(node),
            Err(JError::ItemPruned(_)) => {
                error!(pos, "node is missing from metadata and journal");
                Err(Error::MissingNode(pos))
            }
            Err(e) => Err(Error::JournalError(e)),
        }
    }

    /// Add `element` to the MMR, returning its position. The change is not durable until the
    /// next `sync`.
    pub async fn add(&mut self, h: &mut impl Hasher<H>, element: &[u8]) -> Result<u64, Error> {
        Ok(self.mem_mmr.add(h, element))
    }

    /// Add `element` to the MMR in batched mode, returning its position. Ancestor digests are
    /// deferred until the next call to `process_updates` or `sync`.
    pub async fn add_batched(
        &mut self,
        h: &mut impl Hasher<H>,
        element: &[u8],
    ) -> Result<u64, Error> {
        Ok(self.mem_mmr.add_batched(h, element))
    }

    /// Pop the given number of elements from the tip of the MMR, returning `Error::Empty` if
    /// the MMR has too few elements, or `Error::ElementPruned` if the pop would cross the
    /// pruning boundary.
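    ///
    /// Sketch of popping back to a prior root (mirrors `test_journaled_mmr_pop` below; `mmr`
    /// and `hasher` are assumed):
    ///
    /// ```ignore
    /// let before = mmr.root(&mut hasher);
    /// mmr.add(&mut hasher, b"element").await?;
    /// mmr.pop(1).await?;
    /// assert_eq!(mmr.root(&mut hasher), before);
    /// ```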
    pub async fn pop(&mut self, mut leaves_to_pop: usize) -> Result<(), Error> {
        // First pop any leaves that are still cached in the in-memory MMR.
        while leaves_to_pop > 0 {
            match self.mem_mmr.pop() {
                Ok(_) => {
                    leaves_to_pop -= 1;
                }
                Err(Error::ElementPruned(_)) => break,
                Err(Error::Empty) => {
                    return Err(Error::Empty);
                }
                _ => unreachable!(),
            }
        }
        if leaves_to_pop == 0 {
            return Ok(());
        }

        // Any remaining leaves must be popped from the journal: find the largest valid MMR
        // size containing that many fewer leaves, then rewind the journal to it.
        let mut new_size = self.size();
        while leaves_to_pop > 0 {
            if new_size == 0 {
                return Err(Error::Empty);
            }
            new_size -= 1;
            if new_size < self.pruned_to_pos {
                return Err(Error::ElementPruned(new_size));
            }
            if PeakIterator::check_validity(new_size) {
                leaves_to_pop -= 1;
            }
        }

        self.journal.rewind(new_size).await?;
        self.journal.sync().await?;
        self.journal_size = new_size;

        // Rebuild the in-memory MMR with the pinned nodes required at the new size.
        let mut pinned_nodes = Vec::new();
        for pos in Proof::<H::Digest>::nodes_to_pin(new_size) {
            let digest =
                Mmr::<E, H>::get_from_metadata_or_journal(&self.metadata, &self.journal, pos)
                    .await?;
            pinned_nodes.push(digest);
        }
        self.mem_mmr = MemMmr::init(MemConfig {
            nodes: vec![],
            pruned_to_pos: new_size,
            pinned_nodes,
            pool: self.mem_mmr.thread_pool.take(),
        });
        Self::add_extra_pinned_nodes(
            &mut self.mem_mmr,
            &self.metadata,
            &self.journal,
            self.pruned_to_pos,
        )
        .await?;

        Ok(())
    }

    /// Return the root of the MMR.
    pub fn root(&self, h: &mut impl Hasher<H>) -> H::Digest {
        self.mem_mmr.root(h)
    }

    /// Process all batched updates, computing any deferred parent digests.
    pub fn process_updates(&mut self, h: &mut impl Hasher<H>) {
        self.mem_mmr.sync(h)
    }

    /// Process any batched updates and write all un-synced nodes to the journal, syncing it to
    /// disk. The in-memory cache is then pruned down to its required pinned nodes.
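    ///
    /// Durability sketch (a minimal example assuming `mmr` and `hasher` as in the tests below):
    ///
    /// ```ignore
    /// mmr.add(&mut hasher, b"element").await?;
    /// mmr.sync(&mut hasher).await?; // all nodes are now durable in the journal
    /// assert!(!mmr.is_dirty());
    /// ```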
    pub async fn sync(&mut self, h: &mut impl Hasher<H>) -> Result<(), Error> {
        self.process_updates(h);

        // Write the un-synced nodes to the journal.
        for i in self.journal_size..self.size() {
            let node = *self.mem_mmr.get_node_unchecked(i);
            self.journal.append(node).await?;
        }
        self.journal_size = self.size();
        self.journal.sync().await?;
        assert_eq!(self.journal_size, self.journal.size().await?);

        // Recompute the nodes to pin at the current pruning boundary before pruning the
        // in-memory MMR, since they must survive the prune.
        let mut pinned_nodes = HashMap::new();
        for pos in Proof::<H::Digest>::nodes_to_pin(self.pruned_to_pos) {
            let digest = self.mem_mmr.get_node_unchecked(pos);
            pinned_nodes.insert(pos, *digest);
        }

        // Now that all nodes are durable, it is safe to prune the in-memory cache.
        self.mem_mmr.prune_all();
        self.mem_mmr.add_pinned_nodes(pinned_nodes);

        Ok(())
    }

    /// Write the digests of all nodes that require pinning at `prune_to_pos` to metadata,
    /// along with the new pruning boundary, then sync the metadata. Returns the pinned digests
    /// keyed by position.
    async fn update_metadata(
        &mut self,
        prune_to_pos: u64,
    ) -> Result<HashMap<u64, H::Digest>, Error> {
        let mut pinned_nodes = HashMap::new();
        for pos in Proof::<H::Digest>::nodes_to_pin(prune_to_pos) {
            let digest = self.get_node(pos).await?.unwrap();
            self.metadata
                .put(U64::new(NODE_PREFIX, pos), digest.to_vec());
            pinned_nodes.insert(pos, digest);
        }

        let key: U64 = U64::new(PRUNE_TO_POS_PREFIX, 0);
        self.metadata.put(key, prune_to_pos.to_be_bytes().into());

        self.metadata.sync().await.map_err(Error::MetadataError)?;

        Ok(pinned_nodes)
    }

    /// Return an inclusion proof for the element at position `element_pos`.
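    ///
    /// Verification sketch (mirrors `test_journaled_mmr_basic` below; `mmr`, `hasher`,
    /// `element`, and `pos` are assumed):
    ///
    /// ```ignore
    /// let proof = mmr.proof(pos).await?;
    /// let root = mmr.root(&mut hasher);
    /// assert!(proof.verify_element_inclusion(&mut hasher, &element, pos, &root));
    /// ```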
    pub async fn proof(&self, element_pos: u64) -> Result<Proof<H::Digest>, Error> {
        self.range_proof(element_pos, element_pos).await
    }

    /// Return an inclusion proof for the elements spanning positions `start_element_pos`
    /// through `end_element_pos` (inclusive).
    ///
    /// # Panics
    ///
    /// Panics if there are unprocessed batched updates.
    pub async fn range_proof(
        &self,
        start_element_pos: u64,
        end_element_pos: u64,
    ) -> Result<Proof<H::Digest>, Error> {
        assert!(!self.mem_mmr.is_dirty());
        Proof::<H::Digest>::range_proof::<Mmr<E, H>>(self, start_element_pos, end_element_pos).await
    }

    /// Analogous to `range_proof`, but produces a proof against the root the MMR had when its
    /// size was `size`.
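    ///
    /// Sketch (mirrors the historical-proof tests below; `mmr`, `hasher`, and `positions` are
    /// assumed):
    ///
    /// ```ignore
    /// let size = mmr.size(); // snapshot the current size
    /// let proof = mmr.historical_range_proof(size, positions[2], positions[5]).await?;
    /// assert_eq!(proof.size, size);
    /// // Still verifiable against the root at `size`, even after more elements are added.
    /// ```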
    pub async fn historical_range_proof(
        &self,
        size: u64,
        start_element_pos: u64,
        end_element_pos: u64,
    ) -> Result<Proof<H::Digest>, Error> {
        assert!(!self.mem_mmr.is_dirty());
        Proof::<H::Digest>::historical_range_proof::<Mmr<E, H>>(
            self,
            size,
            start_element_pos,
            end_element_pos,
        )
        .await
    }

    /// Prune the MMR of all nodes, pinning those still required to metadata.
    pub async fn prune_all(&mut self, h: &mut impl Hasher<H>) -> Result<(), Error> {
        if self.size() != 0 {
            self.prune_to_pos(h, self.size()).await?;
            return Ok(());
        }
        Ok(())
    }

    /// Prune all nodes up to (but not including) position `pos`, syncing the MMR first and
    /// updating metadata with the nodes that must remain pinned.
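    ///
    /// Pruning sketch (mirrors the pruning tests below; `mmr` and `hasher` are assumed):
    ///
    /// ```ignore
    /// let pos = leaf_num_to_pos(50);
    /// mmr.prune_to_pos(&mut hasher, pos).await?;
    /// assert_eq!(mmr.pruned_to_pos(), pos);
    /// let _proof = mmr.proof(pos).await?; // proofs over retained elements still work
    /// ```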
    pub async fn prune_to_pos(&mut self, h: &mut impl Hasher<H>, pos: u64) -> Result<(), Error> {
        assert!(pos <= self.size());
        if self.size() == 0 {
            return Ok(());
        }

        // Flush any un-synced nodes to the journal so they are prunable.
        self.sync(h).await?;

        // Update metadata before pruning the journal, so we can always recover if pruning is
        // interrupted.
        let pinned_nodes = self.update_metadata(pos).await?;

        self.journal.prune(pos).await?;
        self.mem_mmr.add_pinned_nodes(pinned_nodes);
        self.pruned_to_pos = pos;

        Ok(())
    }

    /// Return the position up to which the MMR has been pruned.
    pub fn pruned_to_pos(&self) -> u64 {
        self.pruned_to_pos
    }

    /// Return the position of the oldest retained node, or None if every node is pruned.
    pub fn oldest_retained_pos(&self) -> Option<u64> {
        if self.pruned_to_pos == self.size() {
            return None;
        }

        Some(self.pruned_to_pos)
    }

    /// Sync the MMR and close the underlying journal and metadata.
    pub async fn close(mut self, h: &mut impl Hasher<H>) -> Result<(), Error> {
        self.sync(h).await?;
        self.journal.close().await?;
        self.metadata.close().await.map_err(Error::MetadataError)
    }

    /// Destroy the MMR, removing its backing journal and metadata.
    pub async fn destroy(self) -> Result<(), Error> {
        self.journal.destroy().await?;
        self.metadata.destroy().await?;

        Ok(())
    }

    /// Simulate a crash during `sync` that writes at most `write_limit` un-synced nodes to the
    /// journal, without cleanly closing it.
    #[cfg(test)]
    pub async fn simulate_partial_sync(
        mut self,
        hasher: &mut impl Hasher<H>,
        write_limit: usize,
    ) -> Result<(), Error> {
        if write_limit == 0 {
            return Ok(());
        }

        // Write at most `write_limit` un-synced nodes, then stop as if we crashed.
        let mut written_count = 0usize;
        self.mem_mmr.sync(hasher);
        for i in self.journal_size..self.size() {
            let node = *self.mem_mmr.get_node_unchecked(i);
            self.journal.append(node).await?;
            written_count += 1;
            if written_count >= write_limit {
                break;
            }
        }
        self.journal.sync().await?;

        Ok(())
    }

    /// Return a copy of the in-memory MMR's pinned nodes.
    #[cfg(test)]
    pub fn get_pinned_nodes(&self) -> HashMap<u64, H::Digest> {
        self.mem_mmr.pinned_nodes.clone()
    }

    /// Simulate a crash that happens after metadata is updated for pruning to `prune_to_pos`,
    /// but before the journal itself is pruned.
    #[cfg(test)]
    pub async fn simulate_pruning_failure(
        mut self,
        h: &mut impl Hasher<H>,
        prune_to_pos: u64,
    ) -> Result<(), Error> {
        assert!(prune_to_pos <= self.size());

        // Flush any un-synced nodes to the journal.
        self.sync(h).await?;

        // Update metadata as if pruning were about to happen, then "crash".
        self.update_metadata(prune_to_pos).await?;

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::mmr::{
        hasher::Standard,
        iterator::leaf_num_to_pos,
        tests::{
            build_and_check_test_roots_mmr, build_batched_and_check_test_roots_journaled, ROOTS,
        },
    };
    use commonware_cryptography::{sha256::Digest, Hasher, Sha256};
    use commonware_macros::test_traced;
    use commonware_runtime::{buffer::PoolRef, deterministic, Blob as _, Runner};
    use commonware_utils::{hex, NZUsize, NZU64};

    fn test_digest(v: usize) -> Digest {
        Sha256::hash(&v.to_be_bytes())
    }

    const PAGE_SIZE: usize = 111;
    const PAGE_CACHE_SIZE: usize = 5;

    fn test_config() -> Config {
        Config {
            journal_partition: "journal_partition".into(),
            metadata_partition: "metadata_partition".into(),
            items_per_blob: NZU64!(7),
            write_buffer: NZUsize!(1024),
            thread_pool: None,
            buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
        }
    }

    #[test]
    fn test_journaled_mmr_root_stability() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut mmr = Mmr::init(context.clone(), &mut Standard::new(), test_config())
                .await
                .unwrap();
            build_and_check_test_roots_mmr(&mut mmr).await;
            mmr.destroy().await.unwrap();
        });
    }

    #[test]
    fn test_journaled_mmr_root_stability_batched() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut std_hasher = Standard::new();
            let mut mmr = Mmr::init(context.clone(), &mut std_hasher, test_config())
                .await
                .unwrap();
            build_batched_and_check_test_roots_journaled(&mut mmr).await;
            mmr.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journaled_mmr_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher: Standard<Sha256> = Standard::new();
            let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
                .await
                .unwrap();
            assert_eq!(mmr.size(), 0);
            assert!(mmr.get_node(0).await.is_err());
            assert_eq!(mmr.oldest_retained_pos(), None);
            assert!(mmr.prune_all(&mut hasher).await.is_ok());
            assert_eq!(mmr.pruned_to_pos(), 0);
            assert!(mmr.prune_to_pos(&mut hasher, 0).await.is_ok());
            assert!(mmr.sync(&mut hasher).await.is_ok());
            assert!(matches!(mmr.pop(1).await, Err(Error::Empty)));

            mmr.add(&mut hasher, &test_digest(0)).await.unwrap();
            assert_eq!(mmr.size(), 1);
            mmr.sync(&mut hasher).await.unwrap();
            assert!(mmr.get_node(0).await.is_ok());
            assert!(mmr.pop(1).await.is_ok());
            assert_eq!(mmr.size(), 0);
            mmr.sync(&mut hasher).await.unwrap();

            // Reopening the (now empty) MMR should work.
            let mmr = Mmr::init(context.clone(), &mut hasher, test_config())
                .await
                .unwrap();
            assert_eq!(mmr.size(), 0);

            mmr.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journaled_mmr_pop() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher: Standard<Sha256> = Standard::new();
            let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
                .await
                .unwrap();

            let mut c_hasher = Sha256::new();
            for i in 0u64..199 {
                c_hasher.update(&i.to_be_bytes());
                let element = c_hasher.finalize();
                mmr.add(&mut hasher, &element).await.unwrap();
            }
            assert_eq!(ROOTS[199], hex(&mmr.root(&mut hasher)));

            // Pop elements one at a time, confirming the root reverts to its previous value.
            for i in (0..199u64).rev() {
                assert!(mmr.pop(1).await.is_ok());
                let root = mmr.root(&mut hasher);
                let expected_root = ROOTS[i as usize];
                assert_eq!(hex(&root), expected_root);
            }
            assert!(matches!(mmr.pop(1).await, Err(Error::Empty)));
            assert!(mmr.pop(0).await.is_ok());

            // Rebuild the MMR, syncing part-way through so pops must cross the
            // journaled/cached boundary.
            for i in 0u64..199 {
                c_hasher.update(&i.to_be_bytes());
                let element = c_hasher.finalize();
                mmr.add(&mut hasher, &element).await.unwrap();
                if i == 101 {
                    mmr.sync(&mut hasher).await.unwrap();
                }
            }
            for i in (0..198u64).rev().step_by(2) {
                assert!(mmr.pop(2).await.is_ok());
                let root = mmr.root(&mut hasher);
                let expected_root = ROOTS[i as usize];
                assert_eq!(hex(&root), expected_root);
            }
            assert_eq!(mmr.size(), 1);
            assert!(mmr.pop(1).await.is_ok());
            assert!(matches!(mmr.pop(99).await, Err(Error::Empty)));

            // Rebuild once more and confirm pruning interacts correctly with popping.
            for i in 0u64..199 {
                c_hasher.update(&i.to_be_bytes());
                let element = c_hasher.finalize();
                mmr.add(&mut hasher, &element).await.unwrap();
                if i == 101 {
                    mmr.sync(&mut hasher).await.unwrap();
                }
            }
            let leaf_pos = leaf_num_to_pos(50);
            mmr.prune_to_pos(&mut hasher, leaf_pos).await.unwrap();
            mmr.pop(80).await.unwrap();
            // Proof generation over the retained elements should still work.
            mmr.proof(leaf_pos).await.unwrap();
            // Pop the rest until only pruned elements remain.
            while mmr.size() > leaf_pos {
                assert!(mmr.pop(1).await.is_ok());
            }
            assert!(matches!(mmr.pop(1).await, Err(Error::ElementPruned(_))));

            mmr.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journaled_mmr_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher: Standard<Sha256> = Standard::new();
            let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
                .await
                .unwrap();

            const LEAF_COUNT: usize = 255;
            let mut leaves = Vec::with_capacity(LEAF_COUNT);
            let mut positions = Vec::with_capacity(LEAF_COUNT);
            for i in 0..LEAF_COUNT {
                let digest = test_digest(i);
                leaves.push(digest);
                let pos = mmr.add(&mut hasher, leaves.last().unwrap()).await.unwrap();
                positions.push(pos);
            }
            assert_eq!(mmr.size(), 502);
            assert_eq!(mmr.journal_size, 0);

            // Generate and verify a proof for an element that has not yet been synced.
            const TEST_ELEMENT: usize = 133;
            let test_element_pos = positions[TEST_ELEMENT];

            let proof = mmr.proof(test_element_pos).await.unwrap();
            let root = mmr.root(&mut hasher);
            assert!(proof.verify_element_inclusion(
                &mut hasher,
                &leaves[TEST_ELEMENT],
                test_element_pos,
                &root,
            ));

            // Sync to disk; the proof over the journaled nodes should be unchanged.
            mmr.sync(&mut hasher).await.unwrap();
            assert_eq!(mmr.journal_size, 502);
            assert_eq!(mmr.mem_mmr.oldest_retained_pos(), None);

            let proof2 = mmr.proof(test_element_pos).await.unwrap();
            assert_eq!(proof, proof2);

            // Generate and verify a range proof up to the last element.
            let last_element = LEAF_COUNT - 1;
            let last_element_pos = positions[last_element];
            let proof = mmr
                .range_proof(test_element_pos, last_element_pos)
                .await
                .unwrap();
            assert!(proof.verify_range_inclusion(
                &mut hasher,
                &leaves[TEST_ELEMENT..last_element + 1],
                test_element_pos,
                &root
            ));

            mmr.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journaled_mmr_recovery() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher: Standard<Sha256> = Standard::new();
            let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
                .await
                .unwrap();
            assert_eq!(mmr.size(), 0);

            const LEAF_COUNT: usize = 252;
            let mut leaves = Vec::with_capacity(LEAF_COUNT);
            let mut positions = Vec::with_capacity(LEAF_COUNT);
            for i in 0..LEAF_COUNT {
                let digest = test_digest(i);
                leaves.push(digest);
                let pos = mmr.add(&mut hasher, leaves.last().unwrap()).await.unwrap();
                positions.push(pos);
            }
            assert_eq!(mmr.size(), 498);
            let root = mmr.root(&mut hasher);
            mmr.close(&mut hasher).await.unwrap();

            // Simulate a partial write by corrupting the very last blob of the journal.
            let partition: String = "journal_partition".into();
            let (blob, len) = context
                .open(&partition, &71u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(len, 36); // one 32-byte digest plus 4 bytes of per-item overhead
            blob.resize(len - 1).await.expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            // Initialization should recover the orphaned leaf, restoring the root.
            let mmr = Mmr::init(context.clone(), &mut hasher, test_config())
                .await
                .unwrap();
            assert_eq!(mmr.size(), 498);
            assert_eq!(mmr.root(&mut hasher), root);

            // Close and reopen without corruption; nothing should change.
            mmr.close(&mut hasher).await.unwrap();
            let mmr = Mmr::init(context.clone(), &mut hasher, test_config())
                .await
                .unwrap();
            assert_eq!(mmr.size(), 498);
            mmr.close(&mut hasher).await.unwrap();

            // Remove the last blob entirely and corrupt the previous one, forcing recovery
            // to rewind across a blob boundary.
            context
                .remove(&partition, Some(&71u64.to_be_bytes()))
                .await
                .expect("Failed to remove blob");
            let (blob, len) = context
                .open(&partition, &70u64.to_be_bytes())
                .await
                .expect("Failed to open blob");
            assert_eq!(len, 36 * 7); // a full blob of 7 items
            blob.resize(36 * 5 + 35)
                .await
                .expect("Failed to corrupt blob");
            blob.sync().await.expect("Failed to sync blob");

            let mmr = Mmr::init(context.clone(), &mut hasher, test_config())
                .await
                .unwrap();
            assert_eq!(mmr.size(), 495);

            mmr.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journaled_mmr_pruning() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher: Standard<Sha256> = Standard::new();

            // Build two identical MMRs: one we will prune, and an unpruned reference.
            const LEAF_COUNT: usize = 2000;
            let cfg_pruned = test_config();
            let mut pruned_mmr = Mmr::init(context.clone(), &mut hasher, cfg_pruned.clone())
                .await
                .unwrap();
            let cfg_unpruned = Config {
                journal_partition: "unpruned_journal_partition".into(),
                metadata_partition: "unpruned_metadata_partition".into(),
                items_per_blob: NZU64!(7),
                write_buffer: NZUsize!(1024),
                thread_pool: None,
                buffer_pool: cfg_pruned.buffer_pool.clone(),
            };
            let mut mmr = Mmr::init(context.clone(), &mut hasher, cfg_unpruned)
                .await
                .unwrap();
            let mut leaves = Vec::with_capacity(LEAF_COUNT);
            let mut positions = Vec::with_capacity(LEAF_COUNT);
            for i in 0..LEAF_COUNT {
                let digest = test_digest(i);
                leaves.push(digest);
                let last_leaf = leaves.last().unwrap();
                let pos = mmr.add(&mut hasher, last_leaf).await.unwrap();
                positions.push(pos);
                pruned_mmr.add(&mut hasher, last_leaf).await.unwrap();
            }
            assert_eq!(mmr.size(), 3994);
            assert_eq!(pruned_mmr.size(), 3994);

            // Prune to a succession of increasing positions while adding new leaves, and
            // confirm the roots always match the reference.
            for i in 0usize..300 {
                let prune_pos = i as u64 * 10;
                pruned_mmr
                    .prune_to_pos(&mut hasher, prune_pos)
                    .await
                    .unwrap();
                assert_eq!(prune_pos, pruned_mmr.pruned_to_pos());

                let digest = test_digest(LEAF_COUNT + i);
                leaves.push(digest);
                let last_leaf = leaves.last().unwrap();
                let pos = pruned_mmr.add(&mut hasher, last_leaf).await.unwrap();
                positions.push(pos);
                mmr.add(&mut hasher, last_leaf).await.unwrap();
                assert_eq!(pruned_mmr.root(&mut hasher), mmr.root(&mut hasher));
            }

            pruned_mmr.sync(&mut hasher).await.unwrap();
            assert_eq!(pruned_mmr.root(&mut hasher), mmr.root(&mut hasher));

            // Close the pruned MMR and make sure it reopens with the same state.
            pruned_mmr.close(&mut hasher).await.unwrap();
            let mut pruned_mmr = Mmr::init(context.clone(), &mut hasher, cfg_pruned.clone())
                .await
                .unwrap();
            assert_eq!(pruned_mmr.root(&mut hasher), mmr.root(&mut hasher));

            // Prune everything; the root must remain computable from the pinned nodes.
            let size = pruned_mmr.size();
            pruned_mmr.prune_all(&mut hasher).await.unwrap();
            assert_eq!(pruned_mmr.root(&mut hasher), mmr.root(&mut hasher));
            assert_eq!(pruned_mmr.oldest_retained_pos(), None);
            assert_eq!(pruned_mmr.pruned_to_pos(), size);

            // Add a new leaf, then close and reopen with a journal size that doesn't land
            // on a blob boundary.
            mmr.add(&mut hasher, &test_digest(LEAF_COUNT))
                .await
                .unwrap();
            pruned_mmr
                .add(&mut hasher, &test_digest(LEAF_COUNT))
                .await
                .unwrap();
            assert!(pruned_mmr.size() % cfg_pruned.items_per_blob != 0);
            pruned_mmr.close(&mut hasher).await.unwrap();
            let mut pruned_mmr = Mmr::init(context.clone(), &mut hasher, cfg_pruned.clone())
                .await
                .unwrap();
            assert_eq!(pruned_mmr.root(&mut hasher), mmr.root(&mut hasher));
            assert_eq!(pruned_mmr.oldest_retained_pos(), Some(size));
            assert_eq!(pruned_mmr.pruned_to_pos(), size);

            // Prune everything again once the journal size lands exactly on a blob boundary.
            while pruned_mmr.size() % cfg_pruned.items_per_blob != 0 {
                pruned_mmr
                    .add(&mut hasher, &test_digest(LEAF_COUNT))
                    .await
                    .unwrap();
            }
            pruned_mmr.prune_all(&mut hasher).await.unwrap();
            assert_eq!(pruned_mmr.oldest_retained_pos(), None);

            pruned_mmr.destroy().await.unwrap();
            mmr.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    fn test_journaled_mmr_recovery_with_pruning() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher: Standard<Sha256> = Standard::new();

            // Build an MMR with a bunch of elements, then close it.
            const LEAF_COUNT: usize = 2000;
            let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
                .await
                .unwrap();
            let mut leaves = Vec::with_capacity(LEAF_COUNT);
            let mut positions = Vec::with_capacity(LEAF_COUNT);
            for i in 0..LEAF_COUNT {
                let digest = test_digest(i);
                leaves.push(digest);
                let last_leaf = leaves.last().unwrap();
                let pos = mmr.add(&mut hasher, last_leaf).await.unwrap();
                positions.push(pos);
            }
            assert_eq!(mmr.size(), 3994);
            mmr.close(&mut hasher).await.unwrap();

            // Repeatedly reopen the MMR, simulating pruning failures and partial syncs along
            // the way, and confirm it always recovers.
            for i in 0usize..200 {
                let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
                    .await
                    .unwrap();
                let start_size = mmr.size();
                let prune_pos = std::cmp::min(i as u64 * 50, start_size);
                if i % 5 == 0 {
                    mmr.simulate_pruning_failure(&mut hasher, prune_pos)
                        .await
                        .unwrap();
                    continue;
                }
                mmr.prune_to_pos(&mut hasher, prune_pos).await.unwrap();

                // Add more leaves.
                for j in 0..10 {
                    let digest = test_digest(100 * (i + 1) + j);
                    leaves.push(digest);
                    let last_leaf = leaves.last().unwrap();
                    let pos = mmr.add(&mut hasher, last_leaf).await.unwrap();
                    positions.push(pos);
                    mmr.add(&mut hasher, last_leaf).await.unwrap();
                    assert_eq!(mmr.root(&mut hasher), mmr.root(&mut hasher));
                    let digest = test_digest(LEAF_COUNT + i);
                    leaves.push(digest);
                    let last_leaf = leaves.last().unwrap();
                    let pos = mmr.add(&mut hasher, last_leaf).await.unwrap();
                    positions.push(pos);
                    mmr.add(&mut hasher, last_leaf).await.unwrap();
                }
                let end_size = mmr.size();
                let total_to_write = (end_size - start_size) as usize;
                let partial_write_limit = i % total_to_write;
                mmr.simulate_partial_sync(&mut hasher, partial_write_limit)
                    .await
                    .unwrap();
            }

            // A final init should recover cleanly.
            let mmr = Mmr::init(context.clone(), &mut hasher, test_config())
                .await
                .unwrap();
            mmr.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journaled_mmr_historical_range_proof_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();

            let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
                .await
                .unwrap();
            let mut elements = Vec::new();
            let mut positions = Vec::new();
            for i in 0..10 {
                elements.push(test_digest(i));
                positions.push(mmr.add(&mut hasher, &elements[i]).await.unwrap());
            }
            let original_size = mmr.size();

            // A historical proof at the current size should match a regular range proof.
            let historical_proof = mmr
                .historical_range_proof(original_size, positions[2], positions[5])
                .await
                .unwrap();
            assert_eq!(historical_proof.size, original_size);
            let root = mmr.root(&mut hasher);
            assert!(historical_proof.verify_range_inclusion(
                &mut hasher,
                &elements[2..=5],
                positions[2],
                &root
            ));
            let regular_proof = mmr.range_proof(positions[2], positions[5]).await.unwrap();
            assert_eq!(regular_proof.size, historical_proof.size);
            assert_eq!(regular_proof.digests, historical_proof.digests);

            // Adding more elements shouldn't change the historical proof.
            for i in 10..20 {
                elements.push(test_digest(i));
                positions.push(mmr.add(&mut hasher, &elements[i]).await.unwrap());
            }
            let new_historical_proof = mmr
                .historical_range_proof(original_size, positions[2], positions[5])
                .await
                .unwrap();
            assert_eq!(new_historical_proof.size, historical_proof.size);
            assert_eq!(new_historical_proof.digests, historical_proof.digests);

            mmr.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journaled_mmr_historical_range_proof_with_pruning() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();
            let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
                .await
                .unwrap();

            // Add elements and prune part of the MMR.
            let mut elements = Vec::new();
            let mut positions = Vec::new();
            for i in 0..50 {
                elements.push(test_digest(i));
                positions.push(mmr.add(&mut hasher, &elements[i]).await.unwrap());
            }

            let prune_pos = 30;
            mmr.prune_to_pos(&mut hasher, prune_pos).await.unwrap();

            // Build a reference MMR capturing the historical state (first 41 elements).
            let mut ref_mmr = Mmr::init(
                context.clone(),
                &mut hasher,
                Config {
                    journal_partition: "ref_journal_pruned".into(),
                    metadata_partition: "ref_metadata_pruned".into(),
                    items_per_blob: NZU64!(7),
                    write_buffer: NZUsize!(1024),
                    thread_pool: None,
                    buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
                },
            )
            .await
            .unwrap();

            for elt in elements.iter().take(41) {
                ref_mmr.add(&mut hasher, elt).await.unwrap();
            }
            let historical_size = ref_mmr.size();
            let historical_root = ref_mmr.root(&mut hasher);

            // Generate a historical proof over elements beyond the pruning boundary.
            let historical_proof = mmr
                .historical_range_proof(
                    historical_size,
                    positions[35], // start
                    positions[38], // end
                )
                .await
                .unwrap();

            assert_eq!(historical_proof.size, historical_size);

            // Verify the proof against the historical root.
            assert!(historical_proof.verify_range_inclusion(
                &mut hasher,
                &elements[35..=38],
                positions[35],
                &historical_root
            ));

            ref_mmr.destroy().await.unwrap();
            mmr.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journaled_mmr_historical_range_proof_large() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();

            let mut mmr = Mmr::init(
                context.clone(),
                &mut hasher,
                Config {
                    journal_partition: "server_journal".into(),
                    metadata_partition: "server_metadata".into(),
                    items_per_blob: NZU64!(7),
                    write_buffer: NZUsize!(1024),
                    thread_pool: None,
                    buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
                },
            )
            .await
            .unwrap();

            let mut elements = Vec::new();
            let mut positions = Vec::new();
            for i in 0..100 {
                elements.push(test_digest(i));
                positions.push(mmr.add(&mut hasher, &elements[i]).await.unwrap());
            }

            let start_pos = 30;
            let end_pos = 60;

            // Build a reference MMR containing only the first `end_pos + 1` elements.
            let mut ref_mmr = Mmr::init(
                context.clone(),
                &mut hasher,
                Config {
                    journal_partition: "client_journal".into(),
                    metadata_partition: "client_metadata".into(),
                    items_per_blob: NZU64!(7),
                    write_buffer: NZUsize!(1024),
                    thread_pool: None,
                    buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
                },
            )
            .await
            .unwrap();

            for elt in elements.iter().take(end_pos + 1) {
                ref_mmr.add(&mut hasher, elt).await.unwrap();
            }
            let historical_size = ref_mmr.size();
            let expected_root = ref_mmr.root(&mut hasher);

            // The larger MMR's historical proof should verify against the reference root.
            let proof = mmr
                .historical_range_proof(historical_size, positions[start_pos], positions[end_pos])
                .await
                .unwrap();

            assert!(proof.verify_range_inclusion(
                &mut hasher,
                &elements[start_pos..=end_pos],
                positions[start_pos],
                &expected_root
            ));

            ref_mmr.destroy().await.unwrap();
            mmr.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journaled_mmr_historical_range_proof_singleton() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();
            let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
                .await
                .unwrap();

            let element = test_digest(0);
            let position = mmr.add(&mut hasher, &element).await.unwrap();

            // A single-element range proof at the historical size of one.
            let single_proof = mmr
                .historical_range_proof(
                    position + 1, // size
                    position,     // start
                    position,     // end
                )
                .await
                .unwrap();

            let root = mmr.root(&mut hasher);
            assert!(single_proof.verify_range_inclusion(&mut hasher, &[element], position, &root));

            mmr.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journaled_mmr_init_from_pinned_nodes() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();

            // Build a source MMR, then fully prune it.
            let mut original_mmr = Mmr::init(
                context.clone(),
                &mut hasher,
                Config {
                    journal_partition: "original_journal".into(),
                    metadata_partition: "original_metadata".into(),
                    items_per_blob: NZU64!(7),
                    write_buffer: NZUsize!(1024),
                    thread_pool: None,
                    buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
                },
            )
            .await
            .unwrap();

            for i in 0..1_000 {
                original_mmr
                    .add(&mut hasher, &test_digest(i))
                    .await
                    .unwrap();
            }
            original_mmr.sync(&mut hasher).await.unwrap();
            let original_size = original_mmr.size();
            original_mmr
                .prune_to_pos(&mut hasher, original_size)
                .await
                .unwrap();

            let mut hasher = Standard::<Sha256>::new();
            let original_journal_digest = original_mmr.root(&mut hasher);

            // Extract the pinned nodes in the order `nodes_to_pin` returns them.
            let pinned_nodes_map = original_mmr.get_pinned_nodes();
            let pinned_nodes: Vec<_> = Proof::<Digest>::nodes_to_pin(original_size)
                .map(|pos| pinned_nodes_map[&pos])
                .collect();

            // Initialize a new MMR from the pinned nodes alone.
            let new_mmr_config = Config {
                journal_partition: "new_journal".into(),
                metadata_partition: "new_metadata".into(),
                items_per_blob: NZU64!(7),
                write_buffer: NZUsize!(1024),
                thread_pool: None,
                buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
            };
            let mut new_mmr = Mmr::<_, Sha256>::init_from_pinned_nodes(
                context.clone(),
                pinned_nodes,
                original_size,
                new_mmr_config.clone(),
            )
            .await
            .unwrap();

            // The new MMR should look like the fully pruned original.
            assert_eq!(new_mmr.size(), original_size);
            assert_eq!(new_mmr.pruned_to_pos(), original_size);
            assert_eq!(new_mmr.oldest_retained_pos(), None);
            let new_journal_digest = new_mmr.root(&mut hasher);
            assert_eq!(new_journal_digest, original_journal_digest);

            // Both MMRs should accept new elements and stay in agreement.
            let new_element = test_digest(10);

            let original_mmr_pos = original_mmr.add(&mut hasher, &new_element).await.unwrap();
            assert_eq!(original_mmr_pos, original_size);

            let new_mmr_pos = new_mmr.add(&mut hasher, &new_element).await.unwrap();
            assert_eq!(new_mmr_pos, original_size);

            let original_mmr_root = original_mmr.root(&mut hasher);
            let new_mmr_root = new_mmr.root(&mut hasher);
            assert_eq!(new_mmr_root, original_mmr_root);

            // Close and reopen the new MMR; its state should persist.
            new_mmr.close(&mut hasher).await.unwrap();
            let new_mmr = Mmr::<_, Sha256>::init(context.clone(), &mut hasher, new_mmr_config)
                .await
                .unwrap();

            let new_mmr_root = new_mmr.root(&mut hasher);
            assert_eq!(new_mmr_root, original_mmr_root);

            assert_eq!(new_mmr.size(), original_size + 1);
            assert_eq!(new_mmr.pruned_to_pos(), original_size);
            assert_eq!(new_mmr.oldest_retained_pos(), Some(original_size));

            // Proofs over the new element should match those from the original MMR.
            let proof = new_mmr.proof(original_size).await.unwrap();
            let original_proof = original_mmr.proof(original_size).await.unwrap();
            assert_eq!(proof.digests, original_proof.digests);
            assert_eq!(proof.size, original_proof.size);

            original_mmr.destroy().await.unwrap();
            new_mmr.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journaled_mmr_init_from_pinned_nodes_edge_cases() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();

            // An empty MMR can be initialized from no pinned nodes.
            let mut empty_mmr = Mmr::<_, Sha256>::init_from_pinned_nodes(
                context.clone(),
                vec![], // no pinned nodes
                0,      // empty MMR
                Config {
                    journal_partition: "empty_journal".into(),
                    metadata_partition: "empty_metadata".into(),
                    items_per_blob: NZU64!(7),
                    write_buffer: NZUsize!(1024),
                    thread_pool: None,
                    buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
                },
            )
            .await
            .unwrap();

            assert_eq!(empty_mmr.size(), 0);
            assert_eq!(empty_mmr.pruned_to_pos(), 0);
            assert_eq!(empty_mmr.oldest_retained_pos(), None);

            // The empty MMR should accept new elements.
            let pos = empty_mmr.add(&mut hasher, &test_digest(0)).await.unwrap();
            assert_eq!(pos, 0);
            assert_eq!(empty_mmr.size(), 1);

            empty_mmr.destroy().await.unwrap();

            // A single-element MMR round-trips through its pinned nodes.
            let mut single_mem_mmr = MemMmr::new();
            single_mem_mmr.add(&mut hasher, &test_digest(42));
            let single_size = single_mem_mmr.size();
            let single_root = single_mem_mmr.root(&mut hasher);
            let single_pinned = single_mem_mmr.node_digests_to_pin(single_size);

            let single_journaled_mmr = Mmr::<_, Sha256>::init_from_pinned_nodes(
                context.clone(),
                single_pinned,
                single_size,
                Config {
                    journal_partition: "single_journal".into(),
                    metadata_partition: "single_metadata".into(),
                    items_per_blob: NZU64!(7),
                    write_buffer: NZUsize!(1024),
                    thread_pool: None,
                    buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
                },
            )
            .await
            .unwrap();

            assert_eq!(single_journaled_mmr.size(), single_size);
            assert_eq!(single_journaled_mmr.root(&mut hasher), single_root);

            single_journaled_mmr.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journaled_mmr_init_sync_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();

            // Sync into a fresh (empty) set of partitions.
            let sync_cfg = SyncConfig::<Digest> {
                config: test_config(),
                lower_bound: 0,
                upper_bound: 100,
                pinned_nodes: None,
            };

            let sync_mmr = Mmr::<_, Sha256>::init_sync(context.clone(), sync_cfg)
                .await
                .unwrap();

            // Nothing exists yet, so the MMR starts empty.
            assert_eq!(sync_mmr.size(), 0);
            assert_eq!(sync_mmr.pruned_to_pos(), 0);
            assert_eq!(sync_mmr.oldest_retained_pos(), None);

            // The MMR should remain usable.
            let mut sync_mmr = sync_mmr;
            let new_element = test_digest(999);
            sync_mmr.add(&mut hasher, &new_element).await.unwrap();

            let _root = sync_mmr.root(&mut hasher);

            sync_mmr.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journaled_mmr_init_sync_nonempty_exact_match() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();

            // Populate an MMR and persist it.
            let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
                .await
                .unwrap();
            for i in 0..50 {
                mmr.add(&mut hasher, &test_digest(i)).await.unwrap();
            }
            mmr.sync(&mut hasher).await.unwrap();
            let original_size = mmr.size();
            let original_root = mmr.root(&mut hasher);

            // Sync bounds that exactly match the existing journal.
            let lower_bound = mmr.pruned_to_pos();
            let upper_bound = mmr.size() - 1;
            let mut expected_nodes = HashMap::new();
            for i in lower_bound..=upper_bound {
                expected_nodes.insert(i, mmr.get_node(i).await.unwrap().unwrap());
            }
            let sync_cfg = SyncConfig::<Digest> {
                config: test_config(),
                lower_bound,
                upper_bound,
                pinned_nodes: None,
            };

            mmr.close(&mut hasher).await.unwrap();

            let sync_mmr = Mmr::<_, Sha256>::init_sync(context.clone(), sync_cfg)
                .await
                .unwrap();

            // All existing nodes should be reused.
            assert_eq!(sync_mmr.size(), original_size);
            assert_eq!(sync_mmr.pruned_to_pos(), lower_bound);
            assert_eq!(sync_mmr.oldest_retained_pos(), Some(lower_bound));
            assert_eq!(sync_mmr.root(&mut hasher), original_root);
            for i in lower_bound..=upper_bound {
                assert_eq!(
                    sync_mmr.get_node(i).await.unwrap(),
                    expected_nodes.get(&i).cloned()
                );
            }

            sync_mmr.destroy().await.unwrap();
        });
    }

    #[test_traced]
    fn test_journaled_mmr_init_sync_partial_overlap() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();

            // Populate a pruned MMR and persist it.
            let mut mmr = Mmr::init(context.clone(), &mut hasher, test_config())
                .await
                .unwrap();
            for i in 0..30 {
                mmr.add(&mut hasher, &test_digest(i)).await.unwrap();
            }
            mmr.sync(&mut hasher).await.unwrap();
            mmr.prune_to_pos(&mut hasher, 10).await.unwrap();

            let original_size = mmr.size();
            let original_root = mmr.root(&mut hasher);
            let original_pruned_to = mmr.pruned_to_pos();

            // Sync bounds that only partially overlap the existing journal.
            let lower_bound = original_pruned_to;
            let upper_bound = original_size + 10; // extends beyond the existing journal

            let mut expected_nodes = HashMap::new();
            for i in lower_bound..original_size {
                expected_nodes.insert(i, mmr.get_node(i).await.unwrap().unwrap());
            }

            let sync_cfg = SyncConfig::<Digest> {
                config: test_config(),
                lower_bound,
                upper_bound,
                pinned_nodes: None,
            };

            mmr.close(&mut hasher).await.unwrap();

            let sync_mmr = Mmr::<_, Sha256>::init_sync(context.clone(), sync_cfg)
                .await
                .unwrap();

            // The overlapping nodes should be reused.
            assert_eq!(sync_mmr.size(), original_size);
            assert_eq!(sync_mmr.pruned_to_pos(), lower_bound);
            assert_eq!(sync_mmr.oldest_retained_pos(), Some(lower_bound));
            assert_eq!(sync_mmr.root(&mut hasher), original_root);

            for i in lower_bound..original_size {
                assert_eq!(
                    sync_mmr.get_node(i).await.unwrap(),
                    expected_nodes.get(&i).cloned()
                );
            }

            sync_mmr.destroy().await.unwrap();
        });
    }
}