1use crate::{
9 journal::{
10 contiguous::{fixed, variable, Contiguous, Many, Mutable, Reader},
11 Error as JournalError,
12 },
13 merkle::{
14 self, batch,
15 hasher::{Hasher as _, Standard as StandardHasher},
16 journaled::Journaled,
17 Family, Location, Position, Proof, Readable,
18 },
19 Context, Persistable,
20};
21use alloc::{
22 sync::{Arc, Weak},
23 vec::Vec,
24};
25use commonware_codec::{CodecFixedShared, CodecShared, Encode, EncodeShared};
26use commonware_cryptography::{Digest, Hasher};
27use core::num::NonZeroU64;
28use futures::{future::try_join_all, try_join, TryFutureExt as _};
29use thiserror::Error;
30use tracing::{debug, warn};
31
32#[derive(Error, Debug)]
34pub enum Error<F: Family> {
35 #[error("merkle error: {0}")]
36 Merkle(#[from] merkle::Error<F>),
37
38 #[error("journal error: {0}")]
39 Journal(#[from] super::Error),
40}
41
42pub struct UnmerkleizedBatch<F: Family, H: Hasher, Item: Send + Sync> {
45 inner: batch::UnmerkleizedBatch<F, H::Digest>,
47 hasher: StandardHasher<H>,
49 items: Vec<Item>,
51 parent: Option<Arc<MerkleizedBatch<F, H::Digest, Item>>>,
53}
54
55impl<F: Family, H: Hasher, Item: Encode + Send + Sync> UnmerkleizedBatch<F, H, Item> {
56 #[allow(clippy::should_implement_trait)]
58 pub fn add(mut self, item: Item) -> Self {
59 let encoded = item.encode();
60 self.inner = self.inner.add(&self.hasher, &encoded);
61 self.items.push(item);
62 self
63 }
64
65 fn collect_ancestor_items(
67 parent: &Option<Arc<MerkleizedBatch<F, H::Digest, Item>>>,
68 ) -> Vec<Arc<Vec<Item>>> {
69 let Some(parent) = parent else {
70 return Vec::new();
71 };
72 let mut items = Vec::new();
73 if !parent.items.is_empty() {
74 items.push(Arc::clone(&parent.items));
75 }
76 let mut current = parent.parent.as_ref().and_then(Weak::upgrade);
77 while let Some(batch) = current {
78 if !batch.items.is_empty() {
79 items.push(Arc::clone(&batch.items));
80 }
81 current = batch.parent.as_ref().and_then(Weak::upgrade);
82 }
83 items.reverse();
84 items
85 }
86
87 pub fn merkleize(
90 self,
91 base: &merkle::mem::Mem<F, H::Digest>,
92 ) -> Arc<MerkleizedBatch<F, H::Digest, Item>> {
93 let merkle = self.inner.merkleize(base, &self.hasher);
94 let ancestor_items = Self::collect_ancestor_items(&self.parent);
95 Arc::new(MerkleizedBatch {
96 inner: merkle,
97 items: Arc::new(self.items),
98 parent: self.parent.as_ref().map(Arc::downgrade),
99 ancestor_items,
100 })
101 }
102
103 pub(crate) fn merkleize_with(
114 mut self,
115 base: &merkle::mem::Mem<F, H::Digest>,
116 items: Arc<Vec<Item>>,
117 ) -> Arc<MerkleizedBatch<F, H::Digest, Item>> {
118 assert!(
119 self.items.is_empty(),
120 "merkleize_with expects no items added via add"
121 );
122
123 #[cfg(feature = "std")]
124 if let Some(pool) = self
125 .inner
126 .pool()
127 .filter(|_| items.len() >= batch::MIN_TO_PARALLELIZE)
128 {
129 use rayon::prelude::*;
132
133 let starting_leaves = self.inner.leaves();
134 let digests: Vec<H::Digest> = pool.install(|| {
135 items
136 .par_iter()
137 .enumerate()
138 .map_init(
139 || self.hasher.clone(),
140 |h, (i, item)| {
141 let loc = Location::<F>::new(*starting_leaves + i as u64);
142 let pos = Position::try_from(loc).expect("valid leaf location");
143 h.leaf_digest(pos, &item.encode())
144 },
145 )
146 .collect()
147 });
148 for digest in digests {
149 self.inner = self.inner.add_leaf_digest(digest);
150 }
151 } else {
152 for item in &*items {
153 let encoded = item.encode();
154 self.inner = self.inner.add(&self.hasher, &encoded);
155 }
156 }
157
158 #[cfg(not(feature = "std"))]
159 for item in &*items {
160 let encoded = item.encode();
161 self.inner = self.inner.add(&self.hasher, &encoded);
162 }
163
164 let merkle = self.inner.merkleize(base, &self.hasher);
165 let ancestor_items = Self::collect_ancestor_items(&self.parent);
166 Arc::new(MerkleizedBatch {
167 inner: merkle,
168 items,
169 parent: self.parent.as_ref().map(Arc::downgrade),
170 ancestor_items,
171 })
172 }
173}
174
175#[derive(Clone, Debug)]
177pub struct MerkleizedBatch<F: Family, D: Digest, Item: Send + Sync> {
178 pub(crate) inner: Arc<batch::MerkleizedBatch<F, D>>,
180 items: Arc<Vec<Item>>,
182 parent: Option<Weak<Self>>,
184 pub(crate) ancestor_items: Vec<Arc<Vec<Item>>>,
186}
187
188impl<F: Family, D: Digest, Item: Send + Sync> MerkleizedBatch<F, D, Item> {
189 pub fn root(&self) -> D {
191 self.inner.root()
192 }
193
194 pub(crate) fn size(&self) -> u64 {
196 *self.inner.leaves()
197 }
198
199 pub(crate) const fn items(&self) -> &Arc<Vec<Item>> {
201 &self.items
202 }
203
204 pub fn new_batch<H: Hasher<Digest = D>>(self: &Arc<Self>) -> UnmerkleizedBatch<F, H, Item>
210 where
211 Item: Encode,
212 {
213 UnmerkleizedBatch {
214 inner: self.inner.new_batch(),
215 hasher: StandardHasher::new(),
216 items: Vec::new(),
217 parent: Some(Arc::clone(self)),
218 }
219 }
220}
221
222impl<F: Family, D: Digest, Item: Send + Sync> Readable for MerkleizedBatch<F, D, Item> {
223 type Family = F;
224 type Digest = D;
225 type Error = merkle::Error<F>;
226
227 fn size(&self) -> Position<F> {
228 self.inner.size()
229 }
230
231 fn get_node(&self, pos: Position<F>) -> Option<D> {
232 self.inner.get_node(pos)
233 }
234
235 fn root(&self) -> D {
236 self.inner.root()
237 }
238
239 fn pruning_boundary(&self) -> Location<F> {
240 self.inner.pruning_boundary()
241 }
242
243 fn proof(
244 &self,
245 hasher: &impl crate::merkle::hasher::Hasher<F, Digest = D>,
246 loc: Location<F>,
247 ) -> Result<Proof<F, D>, merkle::Error<F>> {
248 self.inner.proof(hasher, loc)
249 }
250
251 fn range_proof(
252 &self,
253 hasher: &impl crate::merkle::hasher::Hasher<F, Digest = D>,
254 range: core::ops::Range<Location<F>>,
255 ) -> Result<Proof<F, D>, merkle::Error<F>> {
256 self.inner.range_proof(hasher, range)
257 }
258}
259
260pub struct Journal<F, E, C, H>
265where
266 F: Family,
267 E: Context,
268 C: Contiguous<Item: EncodeShared>,
269 H: Hasher,
270{
271 pub(crate) merkle: Journaled<F, E, H::Digest>,
274
275 pub(crate) journal: C,
278
279 pub(crate) hasher: StandardHasher<H>,
280}
281
282impl<F, E, C, H> Journal<F, E, C, H>
283where
284 F: Family,
285 E: Context,
286 C: Contiguous<Item: EncodeShared>,
287 H: Hasher,
288{
289 pub async fn size(&self) -> Location<F> {
291 Location::new(self.journal.size().await)
292 }
293
294 pub fn root(&self) -> H::Digest {
296 self.merkle.root()
297 }
298
299 pub fn new_batch(&self) -> UnmerkleizedBatch<F, H, C::Item>
301 where
302 C::Item: Encode,
303 {
304 let root = self.merkle.to_batch();
305 UnmerkleizedBatch {
306 inner: root.new_batch(),
307 hasher: StandardHasher::new(),
308 items: Vec::new(),
309 parent: None,
310 }
311 }
312
313 pub(crate) fn with_mem<R>(&self, f: impl FnOnce(&merkle::mem::Mem<F, H::Digest>) -> R) -> R {
315 self.merkle.with_mem(f)
316 }
317
318 pub(crate) fn to_merkleized_batch(&self) -> Arc<MerkleizedBatch<F, H::Digest, C::Item>> {
323 Arc::new(MerkleizedBatch {
324 inner: self.merkle.to_batch(),
325 items: Arc::new(Vec::new()),
326 parent: None,
327 ancestor_items: Vec::new(),
328 })
329 }
330}
331
332impl<F, E, C, H> Journal<F, E, C, H>
333where
334 F: Family,
335 E: Context,
336 C: Contiguous<Item: EncodeShared> + Persistable<Error = JournalError>,
337 H: Hasher,
338{
339 pub async fn commit(&self) -> Result<(), Error<F>> {
342 self.journal.commit().await.map_err(Error::Journal)
343 }
344}
345
346impl<F, E, C, H> Journal<F, E, C, H>
347where
348 F: Family,
349 E: Context,
350 C: Mutable<Item: EncodeShared>,
351 H: Hasher,
352{
353 pub async fn from_components(
356 mut merkle: Journaled<F, E, H::Digest>,
357 journal: C,
358 hasher: StandardHasher<H>,
359 apply_batch_size: u64,
360 ) -> Result<Self, Error<F>> {
361 Self::align(&mut merkle, &journal, &hasher, apply_batch_size).await?;
362
363 merkle.sync().await?;
366
367 Ok(Self {
368 merkle,
369 journal,
370 hasher,
371 })
372 }
373
374 async fn align(
379 merkle: &mut Journaled<F, E, H::Digest>,
380 journal: &C,
381 hasher: &StandardHasher<H>,
382 apply_batch_size: u64,
383 ) -> Result<(), Error<F>> {
384 let journal_size = journal.size().await;
386 let mut merkle_leaves = merkle.leaves();
387 if merkle_leaves > journal_size {
388 let rewind_count = merkle_leaves - journal_size;
389 warn!(
390 journal_size,
391 ?rewind_count,
392 "rewinding Merkle structure to match journal"
393 );
394 merkle.rewind(*rewind_count as usize, hasher).await?;
395 merkle_leaves = Location::new(journal_size);
396 }
397
398 if merkle_leaves < journal_size {
400 let replay_count = journal_size - *merkle_leaves;
401 warn!(
402 ?journal_size,
403 replay_count, "Merkle structure lags behind journal, replaying journal to catch up"
404 );
405
406 let reader = journal.reader().await;
407 while merkle_leaves < journal_size {
408 let batch = {
409 let mut batch = merkle.new_batch();
410 let mut count = 0u64;
411 while count < apply_batch_size && merkle_leaves < journal_size {
412 let op = reader.read(*merkle_leaves).await?;
413 batch = batch.add(hasher, &op.encode());
414 merkle_leaves += 1;
415 count += 1;
416 }
417 batch
418 };
419 let batch = merkle.with_mem(|mem| batch.merkleize(mem, hasher));
420 merkle.apply_batch(&batch)?;
421 }
422 return Ok(());
423 }
424
425 assert_eq!(journal.size().await, *merkle.leaves());
427
428 Ok(())
429 }
430
431 pub async fn append(&mut self, item: &C::Item) -> Result<Location<F>, Error<F>> {
433 let encoded_item = item.encode();
434
435 let loc = self.journal.append(item).await?;
437 let unmerkleized_batch = self.merkle.new_batch().add(&self.hasher, &encoded_item);
438 let batch = self
439 .merkle
440 .with_mem(|mem| unmerkleized_batch.merkleize(mem, &self.hasher));
441 self.merkle.apply_batch(&batch)?;
442
443 Ok(Location::new(loc))
444 }
445
446 pub async fn apply_batch(
453 &mut self,
454 batch: &MerkleizedBatch<F, H::Digest, C::Item>,
455 ) -> Result<(), Error<F>> {
456 let merkle_size = self.merkle.size();
457 let base_size = batch.inner.base_size();
458
459 let skip_ancestors = if merkle_size == base_size {
465 false
466 } else if merkle_size > base_size && merkle_size < batch.inner.size() {
467 true
468 } else {
469 return Err(merkle::Error::StaleBatch {
472 expected: base_size,
473 actual: merkle_size,
474 }
475 .into());
476 };
477
478 let committed_leaves = self.journal.size().await;
483 let base_leaves = *Location::<F>::try_from(base_size)?;
484 let mut batch_leaf_end = base_leaves;
485 let mut batches: Vec<&[C::Item]> = Vec::with_capacity(batch.ancestor_items.len() + 1);
486 for ancestor in &batch.ancestor_items {
487 batch_leaf_end += ancestor.len() as u64;
488 if skip_ancestors && batch_leaf_end <= committed_leaves {
489 continue;
490 }
491 batches.push(ancestor);
492 }
493 if !batch.items.is_empty() {
494 batches.push(&batch.items);
495 }
496 if !batches.is_empty() {
497 self.journal.append_many(Many::Nested(&batches)).await?;
498 }
499
500 self.merkle.apply_batch(&batch.inner)?;
501 assert_eq!(*self.merkle.leaves(), self.journal.size().await);
502 Ok(())
503 }
504
505 pub async fn prune(&mut self, prune_loc: Location<F>) -> Result<Location<F>, Error<F>> {
510 if self.merkle.size() == 0 {
511 return Ok(Location::new(self.reader().await.bounds().start));
513 }
514
515 self.merkle.sync().await?;
519
520 if !self.journal.prune(*prune_loc).await? {
522 return Ok(Location::new(self.reader().await.bounds().start));
523 }
524
525 let bounds = self.reader().await.bounds();
526 debug!(size = ?bounds.end, ?prune_loc, boundary = ?bounds.start, "pruned inactive ops");
527
528 self.merkle.prune(Location::from(bounds.start)).await?;
530
531 Ok(Location::new(bounds.start))
532 }
533}
534
535impl<F, E, C, H> Journal<F, E, C, H>
536where
537 F: Family,
538 E: Context,
539 C: Contiguous<Item: EncodeShared>,
540 H: Hasher,
541{
542 pub async fn proof(
556 &self,
557 start_loc: Location<F>,
558 max_ops: NonZeroU64,
559 ) -> Result<(Proof<F, H::Digest>, Vec<C::Item>), Error<F>> {
560 self.historical_proof(self.size().await, start_loc, max_ops)
561 .await
562 }
563
564 pub async fn historical_proof(
577 &self,
578 historical_leaves: Location<F>,
579 start_loc: Location<F>,
580 max_ops: NonZeroU64,
581 ) -> Result<(Proof<F, H::Digest>, Vec<C::Item>), Error<F>> {
582 let reader = self.journal.reader().await;
584 let bounds = reader.bounds();
585
586 if *historical_leaves > bounds.end {
587 return Err(merkle::Error::RangeOutOfBounds(Location::new(bounds.end)).into());
588 }
589 if start_loc >= historical_leaves {
590 return Err(merkle::Error::RangeOutOfBounds(start_loc).into());
591 }
592
593 let end_loc = std::cmp::min(historical_leaves, start_loc.saturating_add(max_ops.get()));
594
595 let hasher = self.hasher.clone();
596 let proof = self
597 .merkle
598 .historical_range_proof(&hasher, historical_leaves, start_loc..end_loc)
599 .await?;
600
601 let mut ops = Vec::with_capacity((*end_loc - *start_loc) as usize);
602 let futures = (*start_loc..*end_loc)
603 .map(|i| reader.read(i))
604 .collect::<Vec<_>>();
605 try_join_all(futures)
606 .await?
607 .into_iter()
608 .for_each(|op| ops.push(op));
609
610 Ok((proof, ops))
611 }
612}
613
614impl<F, E, C, H> Journal<F, E, C, H>
615where
616 F: Family,
617 E: Context,
618 C: Contiguous<Item: EncodeShared> + Persistable<Error = JournalError>,
619 H: Hasher,
620{
621 pub async fn destroy(self) -> Result<(), Error<F>> {
623 try_join!(
624 self.journal.destroy().map_err(Error::Journal),
625 self.merkle.destroy().map_err(Error::Merkle),
626 )?;
627
628 Ok(())
629 }
630
631 pub async fn sync(&self) -> Result<(), Error<F>> {
633 try_join!(
634 self.journal.sync().map_err(Error::Journal),
635 self.merkle.sync().map_err(Error::Merkle)
636 )?;
637
638 Ok(())
639 }
640}
641
642const APPLY_BATCH_SIZE: u64 = 1 << 16;
644
645macro_rules! impl_journal_new {
648 ($journal_mod:ident, $cfg_ty:ty, $codec_bound:path) => {
649 impl<F, E, O, H> Journal<F, E, $journal_mod::Journal<E, O>, H>
650 where
651 F: Family,
652 E: Context,
653 O: $codec_bound,
654 H: Hasher,
655 {
656 pub async fn new(
661 context: E,
662 merkle_cfg: merkle::journaled::Config,
663 journal_cfg: $cfg_ty,
664 rewind_predicate: fn(&O) -> bool,
665 ) -> Result<Self, Error<F>> {
666 let mut journal =
667 $journal_mod::Journal::init(context.with_label("journal"), journal_cfg).await?;
668 journal.rewind_to(rewind_predicate).await?;
669
670 let hasher = StandardHasher::<H>::new();
671 let mut merkle =
672 Journaled::init(context.with_label("merkle"), &hasher, merkle_cfg).await?;
673 Self::align(&mut merkle, &journal, &hasher, APPLY_BATCH_SIZE).await?;
674
675 journal.sync().await?;
676 merkle.sync().await?;
677
678 Ok(Self {
679 merkle,
680 journal,
681 hasher,
682 })
683 }
684 }
685 };
686}
687
688impl_journal_new!(fixed, fixed::Config, CodecFixedShared);
689impl_journal_new!(variable, variable::Config<O::Cfg>, CodecShared);
690
691impl<F, E, C, H> Contiguous for Journal<F, E, C, H>
692where
693 F: Family,
694 E: Context,
695 C: Contiguous<Item: EncodeShared>,
696 H: Hasher,
697{
698 type Item = C::Item;
699
700 async fn reader(&self) -> impl Reader<Item = C::Item> + '_ {
701 self.journal.reader().await
702 }
703
704 async fn size(&self) -> u64 {
705 self.journal.size().await
706 }
707}
708
709impl<F, E, C, H> Mutable for Journal<F, E, C, H>
710where
711 F: Family,
712 E: Context,
713 C: Mutable<Item: EncodeShared>,
714 H: Hasher,
715{
716 async fn append(&mut self, item: &Self::Item) -> Result<u64, JournalError> {
717 let res = self.append(item).await.map_err(|e| match e {
718 Error::Journal(inner) => inner,
719 Error::Merkle(inner) => JournalError::Merkle(anyhow::Error::from(inner)),
720 })?;
721
722 Ok(*res)
723 }
724
725 async fn prune(&mut self, min_position: u64) -> Result<bool, JournalError> {
726 self.journal.prune(min_position).await
727 }
728
729 async fn rewind(&mut self, size: u64) -> Result<(), JournalError> {
730 self.journal.rewind(size).await?;
731
732 let leaves = *self.merkle.leaves();
733 if leaves > size {
734 self.merkle
735 .rewind((leaves - size) as usize, &self.hasher)
736 .await
737 .map_err(|error| JournalError::Merkle(anyhow::Error::from(error)))?;
738 }
739
740 Ok(())
741 }
742}
743
744pub trait Inner<E: Context>: Mutable + Persistable<Error = JournalError> {
746 type Config: Clone + Send;
748
749 fn init<F: Family, H: Hasher>(
751 context: E,
752 merkle_cfg: merkle::journaled::Config,
753 journal_cfg: Self::Config,
754 rewind_predicate: fn(&Self::Item) -> bool,
755 ) -> impl core::future::Future<Output = Result<Journal<F, E, Self, H>, Error<F>>> + Send
756 where
757 Self: Sized,
758 Self::Item: EncodeShared;
759}
760
761impl<F, E, C, H> Persistable for Journal<F, E, C, H>
762where
763 F: Family,
764 E: Context,
765 C: Contiguous<Item: EncodeShared> + Persistable<Error = JournalError>,
766 H: Hasher,
767{
768 type Error = JournalError;
769
770 async fn commit(&self) -> Result<(), JournalError> {
771 self.commit().await.map_err(|e| match e {
772 Error::Journal(inner) => inner,
773 Error::Merkle(inner) => JournalError::Merkle(anyhow::Error::from(inner)),
774 })
775 }
776
777 async fn sync(&self) -> Result<(), JournalError> {
778 self.sync().await.map_err(|e| match e {
779 Error::Journal(inner) => inner,
780 Error::Merkle(inner) => JournalError::Merkle(anyhow::Error::from(inner)),
781 })
782 }
783
784 async fn destroy(self) -> Result<(), JournalError> {
785 self.destroy().await.map_err(|e| match e {
786 Error::Journal(inner) => inner,
787 Error::Merkle(inner) => JournalError::Merkle(anyhow::Error::from(inner)),
788 })
789 }
790}
791
792#[cfg(test)]
793impl<F, E, C, H> Journal<F, E, C, H>
794where
795 F: Family,
796 E: Context,
797 C: Contiguous<Item: EncodeShared>,
798 H: Hasher,
799{
800 pub(crate) async fn read(&self, loc: Location<F>) -> Result<C::Item, Error<F>> {
802 self.journal
803 .reader()
804 .await
805 .read(*loc)
806 .await
807 .map_err(Error::Journal)
808 }
809}
810
811#[cfg(test)]
812mod tests {
813 use super::*;
814 use crate::{
815 journal::contiguous::fixed::{Config as JConfig, Journal as ContiguousJournal},
816 merkle::{
817 journaled::{Config as MerkleConfig, Journaled},
818 mmb, mmr,
819 },
820 qmdb::{
821 any::{
822 operation::{update::Unordered as Update, Unordered as Op},
823 value::FixedEncoding,
824 },
825 operation::Committable,
826 },
827 };
828 use commonware_codec::Encode;
829 use commonware_cryptography::{sha256::Digest, Sha256};
830 use commonware_macros::test_traced;
831 use commonware_runtime::{
832 buffer::paged::CacheRef,
833 deterministic::{self, Context},
834 BufferPooler, Metrics, Runner as _,
835 };
836 use commonware_utils::{NZUsize, NZU16, NZU64};
837 use futures::StreamExt as _;
838 use std::num::{NonZeroU16, NonZeroUsize};
839
840 const PAGE_SIZE: NonZeroU16 = NZU16!(101);
841 const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(11);
842
843 type TestOp<F> = Op<F, Digest, FixedEncoding<Digest>>;
845
846 type TestJournal<F> = Journal<
848 F,
849 deterministic::Context,
850 ContiguousJournal<deterministic::Context, TestOp<F>>,
851 Sha256,
852 >;
853
854 fn merkle_config(suffix: &str, pooler: &impl BufferPooler) -> MerkleConfig {
856 MerkleConfig {
857 journal_partition: format!("mmr-journal-{suffix}"),
858 metadata_partition: format!("mmr-metadata-{suffix}"),
859 items_per_blob: NZU64!(11),
860 write_buffer: NZUsize!(1024),
861 thread_pool: None,
862 page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
863 }
864 }
865
866 fn journal_config(suffix: &str, pooler: &impl BufferPooler) -> JConfig {
868 JConfig {
869 partition: format!("journal-{suffix}"),
870 items_per_blob: NZU64!(7),
871 write_buffer: NZUsize!(1024),
872 page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
873 }
874 }
875
876 async fn create_empty_journal<F: Family + PartialEq>(
878 context: Context,
879 suffix: &str,
880 ) -> TestJournal<F> {
881 let merkle_cfg = merkle_config(suffix, &context);
882 let journal_cfg = journal_config(suffix, &context);
883 TestJournal::<F>::new(context, merkle_cfg, journal_cfg, |op: &TestOp<F>| {
884 op.is_commit()
885 })
886 .await
887 .unwrap()
888 }
889
890 fn create_operation<F: Family + PartialEq>(index: u8) -> TestOp<F> {
892 TestOp::<F>::Update(Update(
893 Sha256::fill(index),
894 Sha256::fill(index.wrapping_add(1)),
895 ))
896 }
897
898 async fn create_journal_with_ops<F: Family + PartialEq>(
902 context: Context,
903 suffix: &str,
904 count: usize,
905 ) -> TestJournal<F> {
906 let mut journal = create_empty_journal::<F>(context, suffix).await;
907
908 for i in 0..count {
909 let op = create_operation::<F>(i as u8);
910 let loc = journal.append(&op).await.unwrap();
911 assert_eq!(loc, Location::<F>::new(i as u64));
912 }
913
914 journal.sync().await.unwrap();
915 journal
916 }
917
918 async fn create_components<F: Family + PartialEq>(
924 context: Context,
925 suffix: &str,
926 ) -> (
927 Journaled<F, deterministic::Context, Digest>,
928 ContiguousJournal<deterministic::Context, TestOp<F>>,
929 StandardHasher<Sha256>,
930 ) {
931 let hasher = StandardHasher::new();
932 let merkle = Journaled::<F, _, Digest>::init(
933 context.with_label("mmr"),
934 &hasher,
935 merkle_config(suffix, &context),
936 )
937 .await
938 .unwrap();
939 let journal = ContiguousJournal::init(
940 context.with_label("journal"),
941 journal_config(suffix, &context),
942 )
943 .await
944 .unwrap();
945 (merkle, journal, hasher)
946 }
947
948 fn verify_proof<F: Family + PartialEq>(
951 proof: &Proof<F, <Sha256 as commonware_cryptography::Hasher>::Digest>,
952 operations: &[TestOp<F>],
953 start_loc: Location<F>,
954 root: &<Sha256 as commonware_cryptography::Hasher>::Digest,
955 hasher: &StandardHasher<Sha256>,
956 ) -> bool {
957 let encoded_ops: Vec<_> = operations.iter().map(|op| op.encode()).collect();
958 proof.verify_range_inclusion(hasher, &encoded_ops, start_loc, root)
959 }
960
961 async fn test_new_creates_empty_journal_inner<F: Family + PartialEq>(context: Context) {
963 let journal = create_empty_journal::<F>(context, "new-empty").await;
964
965 let bounds = journal.reader().await.bounds();
966 assert_eq!(bounds.end, 0);
967 assert_eq!(bounds.start, 0);
968 assert!(bounds.is_empty());
969 }
970
971 #[test_traced("INFO")]
972 fn test_new_creates_empty_journal_mmr() {
973 let executor = deterministic::Runner::default();
974 executor.start(test_new_creates_empty_journal_inner::<mmr::Family>);
975 }
976
977 #[test_traced("INFO")]
978 fn test_new_creates_empty_journal_mmb() {
979 let executor = deterministic::Runner::default();
980 executor.start(test_new_creates_empty_journal_inner::<mmb::Family>);
981 }
982
983 async fn test_align_with_empty_mmr_and_journal_inner<F: Family + PartialEq>(context: Context) {
985 let (mut merkle, journal, hasher) = create_components::<F>(context, "align-empty").await;
986
987 TestJournal::<F>::align(&mut merkle, &journal, &hasher, APPLY_BATCH_SIZE)
988 .await
989 .unwrap();
990
991 assert_eq!(merkle.leaves(), Location::<F>::new(0));
992 assert_eq!(journal.size().await, 0);
993 }
994
995 #[test_traced("INFO")]
996 fn test_align_with_empty_mmr_and_journal_mmr() {
997 let executor = deterministic::Runner::default();
998 executor.start(test_align_with_empty_mmr_and_journal_inner::<mmr::Family>);
999 }
1000
1001 #[test_traced("INFO")]
1002 fn test_align_with_empty_mmr_and_journal_mmb() {
1003 let executor = deterministic::Runner::default();
1004 executor.start(test_align_with_empty_mmr_and_journal_inner::<mmb::Family>);
1005 }
1006
1007 async fn test_align_when_mmr_ahead_inner<F: Family + PartialEq>(context: Context) {
1009 let (mut merkle, journal, hasher) = create_components::<F>(context, "mmr-ahead").await;
1010
1011 {
1013 let batch = {
1014 let mut batch = merkle.new_batch();
1015 for i in 0..20 {
1016 let op = create_operation::<F>(i as u8);
1017 let encoded = op.encode();
1018 batch = batch.add(&hasher, &encoded);
1019 journal.append(&op).await.unwrap();
1020 }
1021 batch
1022 };
1023 let batch = merkle.with_mem(|mem| batch.merkleize(mem, &hasher));
1024 merkle.apply_batch(&batch).unwrap();
1025 }
1026
1027 let commit_op = TestOp::<F>::CommitFloor(None, Location::<F>::new(0));
1029 journal.append(&commit_op).await.unwrap();
1030 journal.sync().await.unwrap();
1031
1032 TestJournal::<F>::align(&mut merkle, &journal, &hasher, APPLY_BATCH_SIZE)
1034 .await
1035 .unwrap();
1036
1037 assert_eq!(merkle.leaves(), Location::<F>::new(21));
1039 assert_eq!(journal.size().await, 21);
1040 }
1041
1042 #[test_traced("WARN")]
1043 fn test_align_when_mmr_ahead_mmr() {
1044 let executor = deterministic::Runner::default();
1045 executor.start(test_align_when_mmr_ahead_inner::<mmr::Family>);
1046 }
1047
1048 #[test_traced("WARN")]
1049 fn test_align_when_mmr_ahead_mmb() {
1050 let executor = deterministic::Runner::default();
1051 executor.start(test_align_when_mmr_ahead_inner::<mmb::Family>);
1052 }
1053
1054 async fn test_align_when_journal_ahead_inner<F: Family + PartialEq>(context: Context) {
1056 let (mut merkle, journal, hasher) = create_components::<F>(context, "journal-ahead").await;
1057
1058 for i in 0..20 {
1060 let op = create_operation::<F>(i as u8);
1061 journal.append(&op).await.unwrap();
1062 }
1063
1064 let commit_op = TestOp::<F>::CommitFloor(None, Location::<F>::new(0));
1066 journal.append(&commit_op).await.unwrap();
1067 journal.sync().await.unwrap();
1068
1069 TestJournal::<F>::align(&mut merkle, &journal, &hasher, APPLY_BATCH_SIZE)
1071 .await
1072 .unwrap();
1073
1074 assert_eq!(merkle.leaves(), Location::<F>::new(21));
1076 assert_eq!(journal.size().await, 21);
1077 }
1078
1079 #[test_traced("WARN")]
1080 fn test_align_when_journal_ahead_mmr() {
1081 let executor = deterministic::Runner::default();
1082 executor.start(test_align_when_journal_ahead_inner::<mmr::Family>);
1083 }
1084
1085 #[test_traced("WARN")]
1086 fn test_align_when_journal_ahead_mmb() {
1087 let executor = deterministic::Runner::default();
1088 executor.start(test_align_when_journal_ahead_inner::<mmb::Family>);
1089 }
1090
1091 async fn test_align_with_mismatched_committed_ops_inner<F: Family + PartialEq>(
1093 context: Context,
1094 ) {
1095 let mut journal =
1096 create_empty_journal::<F>(context.with_label("first"), "mismatched").await;
1097
1098 for i in 0..20 {
1100 let loc = journal
1101 .append(&create_operation::<F>(i as u8))
1102 .await
1103 .unwrap();
1104 assert_eq!(loc, Location::<F>::new(i as u64));
1105 }
1106
1107 let size_before = journal.size().await;
1110 assert_eq!(size_before, 20);
1111
1112 journal.sync().await.unwrap();
1114 drop(journal);
1115 let journal = create_empty_journal::<F>(context.with_label("second"), "mismatched").await;
1116
1117 assert_eq!(journal.size().await, 0);
1119 }
1120
1121 #[test_traced("INFO")]
1122 fn test_align_with_mismatched_committed_ops_mmr() {
1123 let executor = deterministic::Runner::default();
1124 executor.start(|context| {
1125 test_align_with_mismatched_committed_ops_inner::<mmr::Family>(context)
1126 });
1127 }
1128
1129 #[test_traced("INFO")]
1130 fn test_align_with_mismatched_committed_ops_mmb() {
1131 let executor = deterministic::Runner::default();
1132 executor.start(|context| {
1133 test_align_with_mismatched_committed_ops_inner::<mmb::Family>(context)
1134 });
1135 }
1136
1137 async fn test_rewind_inner<F: Family + PartialEq>(context: Context) {
1138 {
1140 let mut journal = ContiguousJournal::init(
1141 context.with_label("rewind_match"),
1142 journal_config("rewind-match", &context),
1143 )
1144 .await
1145 .unwrap();
1146
1147 for i in 0..3 {
1149 journal.append(&create_operation::<F>(i)).await.unwrap();
1150 }
1151 journal
1152 .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(0)))
1153 .await
1154 .unwrap();
1155 for i in 4..7 {
1156 journal.append(&create_operation::<F>(i)).await.unwrap();
1157 }
1158
1159 let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
1161 assert_eq!(final_size, 4);
1162 assert_eq!(journal.size().await, 4);
1163
1164 let op = journal.read(3).await.unwrap();
1166 assert!(op.is_commit());
1167 }
1168
1169 {
1171 let mut journal = ContiguousJournal::init(
1172 context.with_label("rewind_multiple"),
1173 journal_config("rewind-multiple", &context),
1174 )
1175 .await
1176 .unwrap();
1177
1178 journal.append(&create_operation::<F>(0)).await.unwrap();
1180 journal
1181 .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(0)))
1182 .await
1183 .unwrap(); journal.append(&create_operation::<F>(2)).await.unwrap();
1185 journal
1186 .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(1)))
1187 .await
1188 .unwrap(); journal.append(&create_operation::<F>(4)).await.unwrap();
1190
1191 let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
1193 assert_eq!(final_size, 4);
1194
1195 let op = journal.read(3).await.unwrap();
1197 assert!(op.is_commit());
1198
1199 assert!(journal.read(4).await.is_err());
1201 }
1202
1203 {
1205 let mut journal = ContiguousJournal::init(
1206 context.with_label("rewind_no_match"),
1207 journal_config("rewind-no-match", &context),
1208 )
1209 .await
1210 .unwrap();
1211
1212 for i in 0..10 {
1214 journal.append(&create_operation::<F>(i)).await.unwrap();
1215 }
1216
1217 let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
1219 assert_eq!(final_size, 0, "Should rewind to pruning boundary (0)");
1220 assert_eq!(journal.size().await, 0);
1221 }
1222
1223 {
1225 let mut journal = ContiguousJournal::init(
1226 context.with_label("rewind_with_pruning"),
1227 journal_config("rewind-with-pruning", &context),
1228 )
1229 .await
1230 .unwrap();
1231
1232 for i in 0..10 {
1234 journal.append(&create_operation::<F>(i)).await.unwrap();
1235 }
1236 journal
1237 .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(0)))
1238 .await
1239 .unwrap(); for i in 11..15 {
1241 journal.append(&create_operation::<F>(i)).await.unwrap();
1242 }
1243 journal.sync().await.unwrap();
1244
1245 journal.prune(8).await.unwrap();
1247 assert_eq!(journal.reader().await.bounds().start, 7);
1248
1249 for i in 15..20 {
1251 journal.append(&create_operation::<F>(i)).await.unwrap();
1252 }
1253
1254 let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
1256 assert_eq!(final_size, 11);
1257
1258 let op = journal.read(10).await.unwrap();
1260 assert!(op.is_commit());
1261 }
1262
1263 {
1265 let mut journal = ContiguousJournal::init(
1266 context.with_label("rewind_no_match_pruned"),
1267 journal_config("rewind-no-match-pruned", &context),
1268 )
1269 .await
1270 .unwrap();
1271
1272 for i in 0..5 {
1274 journal.append(&create_operation::<F>(i)).await.unwrap();
1275 }
1276 journal
1277 .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(0)))
1278 .await
1279 .unwrap(); for i in 6..10 {
1281 journal.append(&create_operation::<F>(i)).await.unwrap();
1282 }
1283 journal.sync().await.unwrap();
1284
1285 journal.prune(8).await.unwrap();
1288 assert_eq!(journal.reader().await.bounds().start, 7);
1289
1290 for i in 10..14 {
1292 journal.append(&create_operation::<F>(i)).await.unwrap();
1293 }
1294
1295 let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
1298 assert_eq!(final_size, 7);
1299 }
1300
1301 {
1303 let mut journal = ContiguousJournal::init(
1304 context.with_label("rewind_empty"),
1305 journal_config("rewind-empty", &context),
1306 )
1307 .await
1308 .unwrap();
1309
1310 let final_size = journal
1312 .rewind_to(|op: &TestOp<F>| op.is_commit())
1313 .await
1314 .unwrap();
1315 assert_eq!(final_size, 0);
1316 assert_eq!(journal.size().await, 0);
1317 }
1318
1319 {
1321 let merkle_cfg = merkle_config("rewind", &context);
1322 let journal_cfg = journal_config("rewind", &context);
1323 let mut journal =
1324 TestJournal::<F>::new(context, merkle_cfg, journal_cfg, |op| op.is_commit())
1325 .await
1326 .unwrap();
1327
1328 for i in 0..5 {
1330 journal.append(&create_operation::<F>(i)).await.unwrap();
1331 }
1332 journal
1333 .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(0)))
1334 .await
1335 .unwrap(); for i in 6..10 {
1337 journal.append(&create_operation::<F>(i)).await.unwrap();
1338 }
1339 assert_eq!(journal.size().await, 10);
1340
1341 journal.rewind(2).await.unwrap();
1342 assert_eq!(journal.size().await, 2);
1343 assert_eq!(journal.merkle.leaves(), 2);
1344 assert_eq!(journal.merkle.size(), 3);
1345 let bounds = journal.reader().await.bounds();
1346 assert_eq!(bounds.start, 0);
1347 assert!(!bounds.is_empty());
1348
1349 assert!(matches!(
1350 journal.rewind(3).await,
1351 Err(JournalError::InvalidRewind(_))
1352 ));
1353
1354 journal.rewind(0).await.unwrap();
1355 assert_eq!(journal.size().await, 0);
1356 assert_eq!(journal.merkle.leaves(), 0);
1357 assert_eq!(journal.merkle.size(), 0);
1358 let bounds = journal.reader().await.bounds();
1359 assert_eq!(bounds.start, 0);
1360 assert!(bounds.is_empty());
1361
1362 for i in 0..255 {
1364 journal.append(&create_operation::<F>(i)).await.unwrap();
1365 }
1366 journal.prune(Location::<F>::new(100)).await.unwrap();
1367 assert_eq!(journal.reader().await.bounds().start, 98);
1368 let res = journal.rewind(97).await;
1369 assert!(matches!(res, Err(JournalError::InvalidRewind(97))));
1370 journal.rewind(98).await.unwrap();
1371 let bounds = journal.reader().await.bounds();
1372 assert_eq!(bounds.end, 98);
1373 assert_eq!(journal.merkle.leaves(), 98);
1374 assert_eq!(bounds.start, 98);
1375 assert!(bounds.is_empty());
1376 }
1377 }
1378
1379 #[test_traced("INFO")]
1380 fn test_rewind_mmr() {
1381 let executor = deterministic::Runner::default();
1382 executor.start(test_rewind_inner::<mmr::Family>);
1383 }
1384
1385 #[test_traced("INFO")]
1386 fn test_rewind_mmb() {
1387 let executor = deterministic::Runner::default();
1388 executor.start(test_rewind_inner::<mmb::Family>);
1389 }
1390
1391 async fn test_apply_op_and_read_operations_inner<F: Family + PartialEq>(context: Context) {
1394 let mut journal = create_empty_journal::<F>(context, "apply_op").await;
1395
1396 assert_eq!(journal.size().await, 0);
1397
1398 let expected_ops: Vec<_> = (0..50).map(|i| create_operation::<F>(i as u8)).collect();
1400 for (i, op) in expected_ops.iter().enumerate() {
1401 let loc = journal.append(op).await.unwrap();
1402 assert_eq!(loc, Location::<F>::new(i as u64));
1403 assert_eq!(journal.size().await, (i + 1) as u64);
1404 }
1405
1406 assert_eq!(journal.size().await, 50);
1407
1408 journal.sync().await.unwrap();
1410 for (i, expected_op) in expected_ops.iter().enumerate() {
1411 let read_op = journal.read(Location::<F>::new(i as u64)).await.unwrap();
1412 assert_eq!(read_op, *expected_op);
1413 }
1414 }
1415
1416 #[test_traced("INFO")]
1417 fn test_apply_op_and_read_operations_mmr() {
1418 let executor = deterministic::Runner::default();
1419 executor.start(test_apply_op_and_read_operations_inner::<mmr::Family>);
1420 }
1421
1422 #[test_traced("INFO")]
1423 fn test_apply_op_and_read_operations_mmb() {
1424 let executor = deterministic::Runner::default();
1425 executor.start(test_apply_op_and_read_operations_inner::<mmb::Family>);
1426 }
1427
1428 async fn test_read_operations_at_various_positions_inner<F: Family + PartialEq>(
1430 context: Context,
1431 ) {
1432 let journal = create_journal_with_ops::<F>(context, "read", 50).await;
1433
1434 let first_op = journal.read(Location::<F>::new(0)).await.unwrap();
1436 assert_eq!(first_op, create_operation::<F>(0));
1437
1438 let middle_op = journal.read(Location::<F>::new(25)).await.unwrap();
1440 assert_eq!(middle_op, create_operation::<F>(25));
1441
1442 let last_op = journal.read(Location::<F>::new(49)).await.unwrap();
1444 assert_eq!(last_op, create_operation::<F>(49));
1445
1446 for i in 0..50 {
1448 let op = journal.read(Location::<F>::new(i)).await.unwrap();
1449 assert_eq!(op, create_operation::<F>(i as u8));
1450 }
1451 }
1452
1453 #[test_traced("INFO")]
1454 fn test_read_operations_at_various_positions_mmr() {
1455 let executor = deterministic::Runner::default();
1456 executor.start(|context| {
1457 test_read_operations_at_various_positions_inner::<mmr::Family>(context)
1458 });
1459 }
1460
1461 #[test_traced("INFO")]
1462 fn test_read_operations_at_various_positions_mmb() {
1463 let executor = deterministic::Runner::default();
1464 executor.start(|context| {
1465 test_read_operations_at_various_positions_inner::<mmb::Family>(context)
1466 });
1467 }
1468
1469 async fn test_read_pruned_operation_returns_error_inner<F: Family + PartialEq>(
1471 context: Context,
1472 ) {
1473 let mut journal = create_journal_with_ops::<F>(context, "read_pruned", 100).await;
1474
1475 journal
1477 .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(50)))
1478 .await
1479 .unwrap();
1480 journal.sync().await.unwrap();
1481 let pruned_boundary = journal.prune(Location::<F>::new(50)).await.unwrap();
1482
1483 let read_loc = Location::<F>::new(0);
1485 if read_loc < pruned_boundary {
1486 let result = journal.read(read_loc).await;
1487 assert!(matches!(
1488 result,
1489 Err(Error::Journal(crate::journal::Error::ItemPruned(_)))
1490 ));
1491 }
1492 }
1493
1494 #[test_traced("INFO")]
1495 fn test_read_pruned_operation_returns_error_mmr() {
1496 let executor = deterministic::Runner::default();
1497 executor.start(|context| {
1498 test_read_pruned_operation_returns_error_inner::<mmr::Family>(context)
1499 });
1500 }
1501
1502 #[test_traced("INFO")]
1503 fn test_read_pruned_operation_returns_error_mmb() {
1504 let executor = deterministic::Runner::default();
1505 executor.start(|context| {
1506 test_read_pruned_operation_returns_error_inner::<mmb::Family>(context)
1507 });
1508 }
1509
1510 async fn test_read_out_of_range_returns_error_inner<F: Family + PartialEq>(context: Context) {
1512 let journal = create_journal_with_ops::<F>(context, "read_oob", 3).await;
1513
1514 let result = journal.read(Location::<F>::new(10)).await;
1516 assert!(matches!(
1517 result,
1518 Err(Error::Journal(crate::journal::Error::ItemOutOfRange(_)))
1519 ));
1520 }
1521
1522 #[test_traced("INFO")]
1523 fn test_read_out_of_range_returns_error_mmr() {
1524 let executor = deterministic::Runner::default();
1525 executor.start(test_read_out_of_range_returns_error_inner::<mmr::Family>);
1526 }
1527
1528 #[test_traced("INFO")]
1529 fn test_read_out_of_range_returns_error_mmb() {
1530 let executor = deterministic::Runner::default();
1531 executor.start(test_read_out_of_range_returns_error_inner::<mmb::Family>);
1532 }
1533
1534 async fn test_read_all_operations_back_correctly_inner<F: Family + PartialEq>(
1536 context: Context,
1537 ) {
1538 let journal = create_journal_with_ops::<F>(context, "read_all", 50).await;
1539
1540 assert_eq!(journal.size().await, 50);
1541
1542 for i in 0..50 {
1544 let op = journal.read(Location::<F>::new(i)).await.unwrap();
1545 assert_eq!(op, create_operation::<F>(i as u8));
1546 }
1547 }
1548
1549 #[test_traced("INFO")]
1550 fn test_read_all_operations_back_correctly_mmr() {
1551 let executor = deterministic::Runner::default();
1552 executor.start(test_read_all_operations_back_correctly_inner::<mmr::Family>);
1553 }
1554
1555 #[test_traced("INFO")]
1556 fn test_read_all_operations_back_correctly_mmb() {
1557 let executor = deterministic::Runner::default();
1558 executor.start(test_read_all_operations_back_correctly_inner::<mmb::Family>);
1559 }
1560
1561 async fn test_sync_inner<F: Family + PartialEq>(context: Context) {
1563 let mut journal =
1564 create_empty_journal::<F>(context.with_label("first"), "close_pending").await;
1565
1566 let expected_ops: Vec<_> = (0..20).map(|i| create_operation::<F>(i as u8)).collect();
1568 for (i, op) in expected_ops.iter().enumerate() {
1569 let loc = journal.append(op).await.unwrap();
1570 assert_eq!(loc, Location::<F>::new(i as u64),);
1571 }
1572
1573 let commit_loc = journal
1575 .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(0)))
1576 .await
1577 .unwrap();
1578 assert_eq!(
1579 commit_loc,
1580 Location::<F>::new(20),
1581 "commit should be at location 20"
1582 );
1583 journal.sync().await.unwrap();
1584
1585 drop(journal);
1587 let journal =
1588 create_empty_journal::<F>(context.with_label("second"), "close_pending").await;
1589 assert_eq!(journal.size().await, 21);
1590
1591 for (i, expected_op) in expected_ops.iter().enumerate() {
1593 let read_op = journal.read(Location::<F>::new(i as u64)).await.unwrap();
1594 assert_eq!(read_op, *expected_op);
1595 }
1596 }
1597
1598 #[test_traced("INFO")]
1599 fn test_sync_mmr() {
1600 let executor = deterministic::Runner::default();
1601 executor.start(test_sync_inner::<mmr::Family>);
1602 }
1603
1604 #[test_traced("INFO")]
1605 fn test_sync_mmb() {
1606 let executor = deterministic::Runner::default();
1607 executor.start(test_sync_inner::<mmb::Family>);
1608 }
1609
1610 async fn test_prune_empty_journal_inner<F: Family + PartialEq>(context: Context) {
1612 let mut journal = create_empty_journal::<F>(context, "prune_empty").await;
1613
1614 let boundary = journal.prune(Location::<F>::new(0)).await.unwrap();
1615
1616 assert_eq!(boundary, Location::<F>::new(0));
1617 }
1618
1619 #[test_traced("INFO")]
1620 fn test_prune_empty_journal_mmr() {
1621 let executor = deterministic::Runner::default();
1622 executor.start(test_prune_empty_journal_inner::<mmr::Family>);
1623 }
1624
1625 #[test_traced("INFO")]
1626 fn test_prune_empty_journal_mmb() {
1627 let executor = deterministic::Runner::default();
1628 executor.start(test_prune_empty_journal_inner::<mmb::Family>);
1629 }
1630
1631 async fn test_prune_to_location_inner<F: Family + PartialEq>(context: Context) {
1633 let mut journal = create_journal_with_ops::<F>(context, "prune_to", 100).await;
1634
1635 journal
1637 .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(50)))
1638 .await
1639 .unwrap();
1640 journal.sync().await.unwrap();
1641
1642 let boundary = journal.prune(Location::<F>::new(50)).await.unwrap();
1643
1644 assert!(boundary <= Location::<F>::new(50));
1646 }
1647
1648 #[test_traced("INFO")]
1649 fn test_prune_to_location_mmr() {
1650 let executor = deterministic::Runner::default();
1651 executor.start(test_prune_to_location_inner::<mmr::Family>);
1652 }
1653
1654 #[test_traced("INFO")]
1655 fn test_prune_to_location_mmb() {
1656 let executor = deterministic::Runner::default();
1657 executor.start(test_prune_to_location_inner::<mmb::Family>);
1658 }
1659
1660 async fn test_prune_returns_actual_boundary_inner<F: Family + PartialEq>(context: Context) {
1662 let mut journal = create_journal_with_ops::<F>(context, "prune_boundary", 100).await;
1663
1664 journal
1665 .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(50)))
1666 .await
1667 .unwrap();
1668 journal.sync().await.unwrap();
1669
1670 let requested = Location::<F>::new(50);
1671 let actual = journal.prune(requested).await.unwrap();
1672
1673 let bounds = journal.reader().await.bounds();
1675 assert!(!bounds.is_empty());
1676 assert_eq!(actual, bounds.start);
1677
1678 assert!(actual <= requested);
1680 }
1681
1682 #[test_traced("INFO")]
1683 fn test_prune_returns_actual_boundary_mmr() {
1684 let executor = deterministic::Runner::default();
1685 executor.start(test_prune_returns_actual_boundary_inner::<mmr::Family>);
1686 }
1687
1688 #[test_traced("INFO")]
1689 fn test_prune_returns_actual_boundary_mmb() {
1690 let executor = deterministic::Runner::default();
1691 executor.start(test_prune_returns_actual_boundary_inner::<mmb::Family>);
1692 }
1693
1694 async fn test_prune_preserves_operation_count_inner<F: Family + PartialEq>(context: Context) {
1696 let mut journal = create_journal_with_ops::<F>(context, "prune_count", 100).await;
1697
1698 journal
1699 .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(50)))
1700 .await
1701 .unwrap();
1702 journal.sync().await.unwrap();
1703
1704 let count_before = journal.size().await;
1705 journal.prune(Location::<F>::new(50)).await.unwrap();
1706 let count_after = journal.size().await;
1707
1708 assert_eq!(count_before, count_after);
1709 }
1710
1711 #[test_traced("INFO")]
1712 fn test_prune_preserves_operation_count_mmr() {
1713 let executor = deterministic::Runner::default();
1714 executor.start(test_prune_preserves_operation_count_inner::<mmr::Family>);
1715 }
1716
1717 #[test_traced("INFO")]
1718 fn test_prune_preserves_operation_count_mmb() {
1719 let executor = deterministic::Runner::default();
1720 executor.start(test_prune_preserves_operation_count_inner::<mmb::Family>);
1721 }
1722
1723 async fn test_bounds_empty_and_pruned_inner<F: Family + PartialEq>(context: Context) {
1725 let journal = create_empty_journal::<F>(context.with_label("empty"), "oldest").await;
1727 assert!(journal.reader().await.bounds().is_empty());
1728 journal.destroy().await.unwrap();
1729
1730 let journal =
1732 create_journal_with_ops::<F>(context.with_label("no_prune"), "oldest", 100).await;
1733 let bounds = journal.reader().await.bounds();
1734 assert!(!bounds.is_empty());
1735 assert_eq!(bounds.start, 0);
1736 journal.destroy().await.unwrap();
1737
1738 let mut journal =
1740 create_journal_with_ops::<F>(context.with_label("pruned"), "oldest", 100).await;
1741 journal
1742 .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(50)))
1743 .await
1744 .unwrap();
1745 journal.sync().await.unwrap();
1746
1747 let pruned_boundary = journal.prune(Location::<F>::new(50)).await.unwrap();
1748
1749 let bounds = journal.reader().await.bounds();
1751 assert!(!bounds.is_empty());
1752 assert_eq!(bounds.start, pruned_boundary);
1753 assert!(pruned_boundary <= 50);
1755 journal.destroy().await.unwrap();
1756 }
1757
1758 #[test_traced("INFO")]
1759 fn test_bounds_empty_and_pruned_mmr() {
1760 let executor = deterministic::Runner::default();
1761 executor.start(test_bounds_empty_and_pruned_inner::<mmr::Family>);
1762 }
1763
1764 #[test_traced("INFO")]
1765 fn test_bounds_empty_and_pruned_mmb() {
1766 let executor = deterministic::Runner::default();
1767 executor.start(test_bounds_empty_and_pruned_inner::<mmb::Family>);
1768 }
1769
1770 async fn test_bounds_start_after_prune_inner<F: Family + PartialEq>(context: Context) {
1772 let journal = create_empty_journal::<F>(context.with_label("empty"), "boundary").await;
1774 assert_eq!(journal.reader().await.bounds().start, 0);
1775
1776 let journal =
1778 create_journal_with_ops::<F>(context.with_label("no_prune"), "boundary", 100).await;
1779 assert_eq!(journal.reader().await.bounds().start, 0);
1780
1781 let mut journal =
1783 create_journal_with_ops::<F>(context.with_label("pruned"), "boundary", 100).await;
1784 journal
1785 .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(50)))
1786 .await
1787 .unwrap();
1788 journal.sync().await.unwrap();
1789
1790 let pruned_boundary = journal.prune(Location::<F>::new(50)).await.unwrap();
1791
1792 assert_eq!(journal.reader().await.bounds().start, pruned_boundary);
1793 }
1794
1795 #[test_traced("INFO")]
1796 fn test_bounds_start_after_prune_mmr() {
1797 let executor = deterministic::Runner::default();
1798 executor.start(test_bounds_start_after_prune_inner::<mmr::Family>);
1799 }
1800
1801 #[test_traced("INFO")]
1802 fn test_bounds_start_after_prune_mmb() {
1803 let executor = deterministic::Runner::default();
1804 executor.start(test_bounds_start_after_prune_inner::<mmb::Family>);
1805 }
1806
1807 async fn test_mmr_prunes_to_journal_boundary_inner<F: Family + PartialEq>(context: Context) {
1809 let mut journal = create_journal_with_ops::<F>(context, "mmr_boundary", 50).await;
1810
1811 journal
1812 .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(25)))
1813 .await
1814 .unwrap();
1815 journal.sync().await.unwrap();
1816
1817 let pruned_boundary = journal.prune(Location::<F>::new(25)).await.unwrap();
1818
1819 let bounds = journal.reader().await.bounds();
1821 assert!(!bounds.is_empty());
1822 assert_eq!(pruned_boundary, bounds.start);
1823
1824 assert!(pruned_boundary <= Location::<F>::new(25));
1826
1827 assert_eq!(journal.size().await, 51);
1829 }
1830
1831 #[test_traced("INFO")]
1832 fn test_mmr_prunes_to_journal_boundary_mmr() {
1833 let executor = deterministic::Runner::default();
1834 executor.start(test_mmr_prunes_to_journal_boundary_inner::<mmr::Family>);
1835 }
1836
1837 #[test_traced("INFO")]
1838 fn test_mmr_prunes_to_journal_boundary_mmb() {
1839 let executor = deterministic::Runner::default();
1840 executor.start(test_mmr_prunes_to_journal_boundary_inner::<mmb::Family>);
1841 }
1842
1843 async fn test_proof_multiple_operations_inner<F: Family + PartialEq>(context: Context) {
1845 let journal = create_journal_with_ops::<F>(context, "proof_multi", 50).await;
1846
1847 let (proof, ops) = journal
1848 .proof(Location::<F>::new(0), NZU64!(50))
1849 .await
1850 .unwrap();
1851
1852 assert_eq!(ops.len(), 50);
1853 for (i, op) in ops.iter().enumerate() {
1854 assert_eq!(*op, create_operation::<F>(i as u8));
1855 }
1856
1857 let hasher = StandardHasher::new();
1859 let root = journal.root();
1860 assert!(verify_proof(
1861 &proof,
1862 &ops,
1863 Location::<F>::new(0),
1864 &root,
1865 &hasher
1866 ));
1867 }
1868
1869 #[test_traced("INFO")]
1870 fn test_proof_multiple_operations_mmr() {
1871 let executor = deterministic::Runner::default();
1872 executor.start(test_proof_multiple_operations_inner::<mmr::Family>);
1873 }
1874
1875 #[test_traced("INFO")]
1876 fn test_proof_multiple_operations_mmb() {
1877 let executor = deterministic::Runner::default();
1878 executor.start(test_proof_multiple_operations_inner::<mmb::Family>);
1879 }
1880
1881 async fn test_historical_proof_limited_by_max_ops_inner<F: Family + PartialEq>(
1883 context: Context,
1884 ) {
1885 let journal = create_journal_with_ops::<F>(context, "proof_limit", 50).await;
1886
1887 let size = journal.size().await;
1888 let (proof, ops) = journal
1889 .historical_proof(size, Location::<F>::new(0), NZU64!(20))
1890 .await
1891 .unwrap();
1892
1893 assert_eq!(ops.len(), 20);
1895 for (i, op) in ops.iter().enumerate() {
1896 assert_eq!(*op, create_operation::<F>(i as u8));
1897 }
1898
1899 let hasher = StandardHasher::new();
1901 let root = journal.root();
1902 assert!(verify_proof(
1903 &proof,
1904 &ops,
1905 Location::<F>::new(0),
1906 &root,
1907 &hasher
1908 ));
1909 }
1910
1911 #[test_traced("INFO")]
1912 fn test_historical_proof_limited_by_max_ops_mmr() {
1913 let executor = deterministic::Runner::default();
1914 executor.start(|context| {
1915 test_historical_proof_limited_by_max_ops_inner::<mmr::Family>(context)
1916 });
1917 }
1918
1919 #[test_traced("INFO")]
1920 fn test_historical_proof_limited_by_max_ops_mmb() {
1921 let executor = deterministic::Runner::default();
1922 executor.start(|context| {
1923 test_historical_proof_limited_by_max_ops_inner::<mmb::Family>(context)
1924 });
1925 }
1926
1927 async fn test_historical_proof_at_end_of_journal_inner<F: Family + PartialEq>(
1929 context: Context,
1930 ) {
1931 let journal = create_journal_with_ops::<F>(context, "proof_end", 50).await;
1932
1933 let size = journal.size().await;
1934 let (proof, ops) = journal
1936 .historical_proof(size, Location::<F>::new(40), NZU64!(20))
1937 .await
1938 .unwrap();
1939
1940 assert_eq!(ops.len(), 10);
1942 for (i, op) in ops.iter().enumerate() {
1943 assert_eq!(*op, create_operation::<F>((40 + i) as u8));
1944 }
1945
1946 let hasher = StandardHasher::new();
1948 let root = journal.root();
1949 assert!(verify_proof(
1950 &proof,
1951 &ops,
1952 Location::<F>::new(40),
1953 &root,
1954 &hasher
1955 ));
1956 }
1957
1958 #[test_traced("INFO")]
1959 fn test_historical_proof_at_end_of_journal_mmr() {
1960 let executor = deterministic::Runner::default();
1961 executor.start(test_historical_proof_at_end_of_journal_inner::<mmr::Family>);
1962 }
1963
1964 #[test_traced("INFO")]
1965 fn test_historical_proof_at_end_of_journal_mmb() {
1966 let executor = deterministic::Runner::default();
1967 executor.start(test_historical_proof_at_end_of_journal_inner::<mmb::Family>);
1968 }
1969
1970 async fn test_historical_proof_out_of_range_returns_error_inner<F: Family + PartialEq>(
1972 context: Context,
1973 ) {
1974 let journal = create_journal_with_ops::<F>(context, "proof_oob", 5).await;
1975
1976 let result = journal
1978 .historical_proof(Location::<F>::new(10), Location::<F>::new(0), NZU64!(1))
1979 .await;
1980
1981 assert!(matches!(
1982 result,
1983 Err(Error::Merkle(merkle::Error::RangeOutOfBounds(_)))
1984 ));
1985 }
1986
1987 #[test_traced("INFO")]
1988 fn test_historical_proof_out_of_range_returns_error_mmr() {
1989 let executor = deterministic::Runner::default();
1990 executor.start(|context| {
1991 test_historical_proof_out_of_range_returns_error_inner::<mmr::Family>(context)
1992 });
1993 }
1994
1995 #[test_traced("INFO")]
1996 fn test_historical_proof_out_of_range_returns_error_mmb() {
1997 let executor = deterministic::Runner::default();
1998 executor.start(|context| {
1999 test_historical_proof_out_of_range_returns_error_inner::<mmb::Family>(context)
2000 });
2001 }
2002
2003 async fn test_historical_proof_start_too_large_returns_error_inner<F: Family + PartialEq>(
2005 context: Context,
2006 ) {
2007 let journal = create_journal_with_ops::<F>(context, "proof_start_oob", 5).await;
2008
2009 let size = journal.size().await;
2010 let result = journal.historical_proof(size, size, NZU64!(1)).await;
2012
2013 assert!(matches!(
2014 result,
2015 Err(Error::Merkle(merkle::Error::RangeOutOfBounds(_)))
2016 ));
2017 }
2018
2019 #[test_traced("INFO")]
2020 fn test_historical_proof_start_too_large_returns_error_mmr() {
2021 let executor = deterministic::Runner::default();
2022 executor.start(|context| {
2023 test_historical_proof_start_too_large_returns_error_inner::<mmr::Family>(context)
2024 });
2025 }
2026
2027 #[test_traced("INFO")]
2028 fn test_historical_proof_start_too_large_returns_error_mmb() {
2029 let executor = deterministic::Runner::default();
2030 executor.start(|context| {
2031 test_historical_proof_start_too_large_returns_error_inner::<mmb::Family>(context)
2032 });
2033 }
2034
2035 async fn test_historical_proof_truly_historical_inner<F: Family + PartialEq>(context: Context) {
2037 let mut journal = create_journal_with_ops::<F>(context, "proof_historical", 50).await;
2039
2040 let hasher = StandardHasher::new();
2042 let historical_root = journal.root();
2043 let historical_size = journal.size().await;
2044
2045 for i in 50..100 {
2047 journal
2048 .append(&create_operation::<F>(i as u8))
2049 .await
2050 .unwrap();
2051 }
2052 journal.sync().await.unwrap();
2053
2054 let (proof, ops) = journal
2056 .historical_proof(historical_size, Location::<F>::new(0), NZU64!(50))
2057 .await
2058 .unwrap();
2059
2060 assert_eq!(ops.len(), 50);
2062 for (i, op) in ops.iter().enumerate() {
2063 assert_eq!(*op, create_operation::<F>(i as u8));
2064 }
2065
2066 assert!(verify_proof(
2068 &proof,
2069 &ops,
2070 Location::<F>::new(0),
2071 &historical_root,
2072 &hasher
2073 ));
2074 }
2075
2076 #[test_traced("INFO")]
2077 fn test_historical_proof_truly_historical_mmr() {
2078 let executor = deterministic::Runner::default();
2079 executor.start(test_historical_proof_truly_historical_inner::<mmr::Family>);
2080 }
2081
2082 #[test_traced("INFO")]
2083 fn test_historical_proof_truly_historical_mmb() {
2084 let executor = deterministic::Runner::default();
2085 executor.start(test_historical_proof_truly_historical_inner::<mmb::Family>);
2086 }
2087
2088 async fn test_historical_proof_pruned_location_returns_error_inner<F: Family + PartialEq>(
2090 context: Context,
2091 ) {
2092 let mut journal = create_journal_with_ops::<F>(context, "proof_pruned", 50).await;
2093
2094 journal
2095 .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(25)))
2096 .await
2097 .unwrap();
2098 journal.sync().await.unwrap();
2099 let pruned_boundary = journal.prune(Location::<F>::new(25)).await.unwrap();
2100
2101 let size = journal.size().await;
2103 let start_loc = Location::<F>::new(0);
2104 if start_loc < pruned_boundary {
2105 let result = journal.historical_proof(size, start_loc, NZU64!(1)).await;
2106
2107 assert!(result.is_err());
2109 }
2110 }
2111
2112 #[test_traced("INFO")]
2113 fn test_historical_proof_pruned_location_returns_error_mmr() {
2114 let executor = deterministic::Runner::default();
2115 executor.start(|context| {
2116 test_historical_proof_pruned_location_returns_error_inner::<mmr::Family>(context)
2117 });
2118 }
2119
2120 #[test_traced("INFO")]
2121 fn test_historical_proof_pruned_location_returns_error_mmb() {
2122 let executor = deterministic::Runner::default();
2123 executor.start(|context| {
2124 test_historical_proof_pruned_location_returns_error_inner::<mmb::Family>(context)
2125 });
2126 }
2127
2128 async fn test_replay_operations_inner<F: Family + PartialEq>(context: Context) {
2130 let journal = create_empty_journal::<F>(context.with_label("empty"), "replay").await;
2132 let reader = journal.reader().await;
2133 let stream = reader.replay(NZUsize!(10), 0).await.unwrap();
2134 futures::pin_mut!(stream);
2135 assert!(stream.next().await.is_none());
2136
2137 let journal =
2139 create_journal_with_ops::<F>(context.with_label("with_ops"), "replay", 50).await;
2140 let reader = journal.reader().await;
2141 let stream = reader.replay(NZUsize!(100), 0).await.unwrap();
2142 futures::pin_mut!(stream);
2143
2144 for i in 0..50 {
2145 let (pos, op) = stream.next().await.unwrap().unwrap();
2146 assert_eq!(pos, i);
2147 assert_eq!(op, create_operation::<F>(i as u8));
2148 }
2149
2150 assert!(stream.next().await.is_none());
2151 }
2152
2153 #[test_traced("INFO")]
2154 fn test_replay_operations_mmr() {
2155 let executor = deterministic::Runner::default();
2156 executor.start(test_replay_operations_inner::<mmr::Family>);
2157 }
2158
2159 #[test_traced("INFO")]
2160 fn test_replay_operations_mmb() {
2161 let executor = deterministic::Runner::default();
2162 executor.start(test_replay_operations_inner::<mmb::Family>);
2163 }
2164
2165 async fn test_replay_from_middle_inner<F: Family + PartialEq>(context: Context) {
2167 let journal = create_journal_with_ops::<F>(context, "replay_middle", 50).await;
2168 let reader = journal.reader().await;
2169 let stream = reader.replay(NZUsize!(100), 25).await.unwrap();
2170 futures::pin_mut!(stream);
2171
2172 let mut count = 0;
2173 while let Some(result) = stream.next().await {
2174 let (pos, op) = result.unwrap();
2175 assert_eq!(pos, 25 + count);
2176 assert_eq!(op, create_operation::<F>((25 + count) as u8));
2177 count += 1;
2178 }
2179
2180 assert_eq!(count, 25);
2182 }
2183
2184 #[test_traced("INFO")]
2185 fn test_replay_from_middle_mmr() {
2186 let executor = deterministic::Runner::default();
2187 executor.start(test_replay_from_middle_inner::<mmr::Family>);
2188 }
2189
2190 #[test_traced("INFO")]
2191 fn test_replay_from_middle_mmb() {
2192 let executor = deterministic::Runner::default();
2193 executor.start(test_replay_from_middle_inner::<mmb::Family>);
2194 }
2195
2196 async fn test_speculative_batch_inner<F: Family + PartialEq>(context: Context) {
2198 let mut journal = create_journal_with_ops::<F>(context, "speculative_batch", 10).await;
2199 let original_root = journal.root();
2200
2201 let b1 = journal.new_batch();
2203 let b2 = journal.new_batch();
2204
2205 let op_a = create_operation::<F>(100);
2207 let op_b = create_operation::<F>(200);
2208 let b1 = b1.add(op_a.clone());
2209 let b2 = b2.add(op_b);
2210
2211 let m1 = journal.merkle.with_mem(|mem| b1.merkleize(mem));
2213 let m2 = journal.merkle.with_mem(|mem| b2.merkleize(mem));
2214 assert_ne!(m1.root(), m2.root());
2215 assert_ne!(m1.root(), original_root);
2216 assert_ne!(m2.root(), original_root);
2217
2218 assert_eq!(journal.root(), original_root);
2220
2221 let expected_root = m1.root();
2223 journal.apply_batch(&m1).await.unwrap();
2224
2225 assert_eq!(journal.root(), expected_root);
2227 assert_eq!(*journal.size().await, 11);
2228 }
2229
2230 #[test_traced("INFO")]
2231 fn test_speculative_batch_mmr() {
2232 let executor = deterministic::Runner::default();
2233 executor.start(test_speculative_batch_inner::<mmr::Family>);
2234 }
2235
2236 #[test_traced("INFO")]
2237 fn test_speculative_batch_mmb() {
2238 let executor = deterministic::Runner::default();
2239 executor.start(test_speculative_batch_inner::<mmb::Family>);
2240 }
2241
2242 async fn test_speculative_batch_stacking_inner<F: Family + PartialEq>(context: Context) {
2245 let mut journal = create_journal_with_ops::<F>(context, "batch_stacking", 10).await;
2246
2247 let op_a = create_operation::<F>(100);
2248 let op_b = create_operation::<F>(200);
2249
2250 let merkleized_b = {
2251 let batch_a = journal.new_batch().add(op_a.clone());
2252 let merkleized_a = journal.merkle.with_mem(|mem| batch_a.merkleize(mem));
2253
2254 let batch_b = merkleized_a.new_batch::<Sha256>().add(op_b.clone());
2255 journal.merkle.with_mem(|mem| batch_b.merkleize(mem))
2256 };
2257
2258 let expected_root = merkleized_b.root();
2259 journal.apply_batch(&merkleized_b).await.unwrap();
2260
2261 assert_eq!(journal.root(), expected_root);
2262 assert_eq!(*journal.size().await, 12);
2263
2264 let read_a = journal.read(Location::<F>::new(10)).await.unwrap();
2266 assert_eq!(read_a, op_a);
2267 let read_b = journal.read(Location::<F>::new(11)).await.unwrap();
2268 assert_eq!(read_b, op_b);
2269 }
2270
2271 #[test_traced("INFO")]
2272 fn test_speculative_batch_stacking_mmr() {
2273 let executor = deterministic::Runner::default();
2274 executor.start(test_speculative_batch_stacking_inner::<mmr::Family>);
2275 }
2276
2277 #[test_traced("INFO")]
2278 fn test_speculative_batch_stacking_mmb() {
2279 let executor = deterministic::Runner::default();
2280 executor.start(test_speculative_batch_stacking_inner::<mmb::Family>);
2281 }
2282
2283 async fn test_speculative_batch_sequential_inner<F: Family + PartialEq>(context: Context) {
2286 let mut journal = create_journal_with_ops::<F>(context, "batch_sequential", 10).await;
2287
2288 let op_a = create_operation::<F>(100);
2289 let op_b = create_operation::<F>(200);
2290
2291 let batch_a = journal.new_batch().add(op_a.clone());
2293 let merkleized_a = journal.merkle.with_mem(|mem| batch_a.merkleize(mem));
2294 journal.apply_batch(&merkleized_a).await.unwrap();
2295 assert_eq!(*journal.size().await, 11);
2296
2297 let batch_b = journal.new_batch().add(op_b.clone());
2299 let merkleized_b = journal.merkle.with_mem(|mem| batch_b.merkleize(mem));
2300 let expected_root = merkleized_b.root();
2301 journal.apply_batch(&merkleized_b).await.unwrap();
2302
2303 assert_eq!(journal.root(), expected_root);
2304 assert_eq!(*journal.size().await, 12);
2305
2306 let read_a = journal.read(Location::<F>::new(10)).await.unwrap();
2308 assert_eq!(read_a, op_a);
2309 let read_b = journal.read(Location::<F>::new(11)).await.unwrap();
2310 assert_eq!(read_b, op_b);
2311 }
2312
2313 #[test_traced("INFO")]
2314 fn test_speculative_batch_sequential_mmr() {
2315 let executor = deterministic::Runner::default();
2316 executor.start(test_speculative_batch_sequential_inner::<mmr::Family>);
2317 }
2318
2319 #[test_traced("INFO")]
2320 fn test_speculative_batch_sequential_mmb() {
2321 let executor = deterministic::Runner::default();
2322 executor.start(test_speculative_batch_sequential_inner::<mmb::Family>);
2323 }
2324
2325 async fn test_stale_batch_sibling_inner<F: Family + PartialEq>(context: Context) {
2326 let mut journal = create_empty_journal::<F>(context, "stale-sibling").await;
2327 let op_a = create_operation::<F>(1);
2328 let op_b = create_operation::<F>(2);
2329
2330 let batch_a = journal.new_batch().add(op_a.clone());
2332 let merkleized_a = journal.merkle.with_mem(|mem| batch_a.merkleize(mem));
2333 let batch_b = journal.new_batch().add(op_b);
2334 let merkleized_b = journal.merkle.with_mem(|mem| batch_b.merkleize(mem));
2335
2336 journal.apply_batch(&merkleized_a).await.unwrap();
2338 let expected_root = journal.root();
2339 let expected_size = journal.size().await;
2340
2341 let result = journal.apply_batch(&merkleized_b).await;
2343 assert!(
2344 matches!(
2345 result,
2346 Err(super::Error::Merkle(merkle::Error::StaleBatch { .. }))
2347 ),
2348 "expected StaleBatch, got {result:?}"
2349 );
2350
2351 assert_eq!(journal.root(), expected_root);
2353 assert_eq!(journal.size().await, expected_size);
2354 let (_, ops) = journal
2355 .proof(Location::<F>::new(0), NZU64!(1))
2356 .await
2357 .unwrap();
2358 assert_eq!(ops, vec![op_a]);
2359 }
2360
2361 #[test_traced("INFO")]
2362 fn test_stale_batch_sibling_mmr() {
2363 let executor = deterministic::Runner::default();
2364 executor.start(test_stale_batch_sibling_inner::<mmr::Family>);
2365 }
2366
2367 #[test_traced("INFO")]
2368 fn test_stale_batch_sibling_mmb() {
2369 let executor = deterministic::Runner::default();
2370 executor.start(test_stale_batch_sibling_inner::<mmb::Family>);
2371 }
2372
2373 async fn test_stale_batch_chained_inner<F: Family + PartialEq>(context: Context) {
2374 let mut journal = create_journal_with_ops::<F>(context, "stale-chained", 5).await;
2375
2376 let parent_batch = journal.new_batch().add(create_operation::<F>(10));
2378 let parent = journal.merkle.with_mem(|mem| parent_batch.merkleize(mem));
2379 let batch_a = parent.new_batch::<Sha256>().add(create_operation::<F>(20));
2380 let child_a = journal.merkle.with_mem(|mem| batch_a.merkleize(mem));
2381 let batch_b = parent.new_batch::<Sha256>().add(create_operation::<F>(30));
2382 let child_b = journal.merkle.with_mem(|mem| batch_b.merkleize(mem));
2383 drop(parent);
2384
2385 journal.apply_batch(&child_a).await.unwrap();
2387 let result = journal.apply_batch(&child_b).await;
2388 assert!(
2389 matches!(
2390 result,
2391 Err(super::Error::Merkle(merkle::Error::StaleBatch { .. }))
2392 ),
2393 "expected StaleBatch for sibling, got {result:?}"
2394 );
2395 }
2396
2397 #[test_traced("INFO")]
2398 fn test_stale_batch_chained_mmr() {
2399 let executor = deterministic::Runner::default();
2400 executor.start(test_stale_batch_chained_inner::<mmr::Family>);
2401 }
2402
2403 #[test_traced("INFO")]
2404 fn test_stale_batch_chained_mmb() {
2405 let executor = deterministic::Runner::default();
2406 executor.start(test_stale_batch_chained_inner::<mmb::Family>);
2407 }
2408
2409 async fn test_stale_batch_parent_before_child_inner<F: Family + PartialEq>(context: Context) {
2410 let mut journal = create_empty_journal::<F>(context, "stale-parent-first").await;
2411
2412 let parent_batch = journal.new_batch().add(create_operation::<F>(1));
2414 let parent = journal.merkle.with_mem(|mem| parent_batch.merkleize(mem));
2415 let child_batch = parent.new_batch::<Sha256>().add(create_operation::<F>(2));
2416 let child = journal.merkle.with_mem(|mem| child_batch.merkleize(mem));
2417
2418 let expected_root = child.root();
2419
2420 journal.apply_batch(&parent).await.unwrap();
2422 journal.apply_batch(&child).await.unwrap();
2423
2424 assert_eq!(journal.root(), expected_root);
2425 assert_eq!(*journal.size().await, 2);
2426 }
2427
2428 #[test_traced("INFO")]
2429 fn test_stale_batch_parent_before_child_mmr() {
2430 let executor = deterministic::Runner::default();
2431 executor.start(test_stale_batch_parent_before_child_inner::<mmr::Family>);
2432 }
2433
2434 #[test_traced("INFO")]
2435 fn test_stale_batch_parent_before_child_mmb() {
2436 let executor = deterministic::Runner::default();
2437 executor.start(test_stale_batch_parent_before_child_inner::<mmb::Family>);
2438 }
2439
2440 async fn test_stale_batch_child_before_parent_inner<F: Family + PartialEq>(context: Context) {
2441 let mut journal = create_empty_journal::<F>(context, "stale-child-first").await;
2442
2443 let parent_batch = journal.new_batch().add(create_operation::<F>(1));
2445 let parent = journal.merkle.with_mem(|mem| parent_batch.merkleize(mem));
2446 let child_batch = parent.new_batch::<Sha256>().add(create_operation::<F>(2));
2447 let child = journal.merkle.with_mem(|mem| child_batch.merkleize(mem));
2448
2449 journal.apply_batch(&child).await.unwrap();
2451 let result = journal.apply_batch(&parent).await;
2452 assert!(
2453 matches!(
2454 result,
2455 Err(super::Error::Merkle(merkle::Error::StaleBatch { .. }))
2456 ),
2457 "expected StaleBatch for parent after child applied, got {result:?}"
2458 );
2459 }
2460
2461 #[test_traced("INFO")]
2462 fn test_stale_batch_child_before_parent_mmr() {
2463 let executor = deterministic::Runner::default();
2464 executor.start(test_stale_batch_child_before_parent_inner::<mmr::Family>);
2465 }
2466
2467 #[test_traced("INFO")]
2468 fn test_stale_batch_child_before_parent_mmb() {
2469 let executor = deterministic::Runner::default();
2470 executor.start(test_stale_batch_child_before_parent_inner::<mmb::Family>);
2471 }
2472
2473 async fn test_apply_batch_skip_ancestor_items_inner<F: Family + PartialEq>(context: Context) {
2475 let mut journal = create_journal_with_ops::<F>(context, "rp-skip", 3).await;
2476
2477 let parent_batch = journal
2479 .new_batch()
2480 .add(create_operation::<F>(10))
2481 .add(create_operation::<F>(11));
2482 let parent = journal.merkle.with_mem(|mem| parent_batch.merkleize(mem));
2483
2484 let child_batch = parent
2486 .new_batch::<Sha256>()
2487 .add(create_operation::<F>(20))
2488 .add(create_operation::<F>(21))
2489 .add(create_operation::<F>(22));
2490 let child = journal.merkle.with_mem(|mem| child_batch.merkleize(mem));
2491
2492 journal.apply_batch(&parent).await.unwrap();
2494
2495 journal.apply_batch(&child).await.unwrap();
2497
2498 let (_, ops) = journal
2500 .proof(Location::<F>::new(3), NZU64!(5))
2501 .await
2502 .unwrap();
2503 assert_eq!(ops.len(), 5);
2504 }
2505
2506 #[test_traced("INFO")]
2507 fn test_apply_batch_skip_ancestor_items_mmr() {
2508 let executor = deterministic::Runner::default();
2509 executor.start(test_apply_batch_skip_ancestor_items_inner::<mmr::Family>);
2510 }
2511
2512 #[test_traced("INFO")]
2513 fn test_apply_batch_skip_ancestor_items_mmb() {
2514 let executor = deterministic::Runner::default();
2515 executor.start(test_apply_batch_skip_ancestor_items_inner::<mmb::Family>);
2516 }
2517
2518 async fn test_apply_batch_cross_batch_inner<F: Family + PartialEq>(context: Context) {
2520 let mut journal = create_journal_with_ops::<F>(context, "rp-cross", 2).await;
2521
2522 let grandparent_batch = journal
2524 .new_batch()
2525 .add(create_operation::<F>(3))
2526 .add(create_operation::<F>(4))
2527 .add(create_operation::<F>(5));
2528 let grandparent = journal
2529 .merkle
2530 .with_mem(|mem| grandparent_batch.merkleize(mem));
2531
2532 let parent_batch = grandparent
2534 .new_batch::<Sha256>()
2535 .add(create_operation::<F>(6))
2536 .add(create_operation::<F>(7));
2537 let parent = journal.merkle.with_mem(|mem| parent_batch.merkleize(mem));
2538
2539 let child_batch = parent.new_batch::<Sha256>().add(create_operation::<F>(8));
2541 let child = journal.merkle.with_mem(|mem| child_batch.merkleize(mem));
2542
2543 journal.apply_batch(&grandparent).await.unwrap();
2545
2546 journal.apply_batch(&parent).await.unwrap();
2548
2549 journal.apply_batch(&child).await.unwrap();
2551
2552 assert_eq!(*journal.size().await, 8);
2554
2555 let (_, ops) = journal
2557 .proof(Location::<F>::new(2), NZU64!(6))
2558 .await
2559 .unwrap();
2560 for (i, op) in ops.iter().enumerate() {
2561 assert_eq!(*op, create_operation::<F>((i + 3) as u8));
2562 }
2563 }
2564
2565 #[test_traced("INFO")]
2566 fn test_apply_batch_cross_batch_mmr() {
2567 let executor = deterministic::Runner::default();
2568 executor.start(test_apply_batch_cross_batch_inner::<mmr::Family>);
2569 }
2570
2571 #[test_traced("INFO")]
2572 fn test_apply_batch_cross_batch_mmb() {
2573 let executor = deterministic::Runner::default();
2574 executor.start(test_apply_batch_cross_batch_inner::<mmb::Family>);
2575 }
2576
2577 async fn test_merkleize_with_matches_add_inner<F: Family + PartialEq>(context: Context) {
2579 let journal = create_journal_with_ops::<F>(context, "mw-matches", 5).await;
2580
2581 let ops = vec![
2582 create_operation::<F>(10),
2583 create_operation::<F>(11),
2584 create_operation::<F>(12),
2585 ];
2586
2587 let mut batch = journal.new_batch();
2589 for op in &ops {
2590 batch = batch.add(op.clone());
2591 }
2592 let expected = journal.merkle.with_mem(|mem| batch.merkleize(mem));
2593
2594 let batch = journal.new_batch();
2596 let actual = journal
2597 .merkle
2598 .with_mem(|mem| batch.merkleize_with(mem, Arc::new(ops)));
2599
2600 assert_eq!(actual.root(), expected.root());
2601 }
2602
2603 #[test_traced("INFO")]
2604 fn test_merkleize_with_matches_add_mmr() {
2605 let executor = deterministic::Runner::default();
2606 executor.start(test_merkleize_with_matches_add_inner::<mmr::Family>);
2607 }
2608
2609 #[test_traced("INFO")]
2610 fn test_merkleize_with_matches_add_mmb() {
2611 let executor = deterministic::Runner::default();
2612 executor.start(test_merkleize_with_matches_add_inner::<mmb::Family>);
2613 }
2614
2615 async fn test_merkleize_with_apply_inner<F: Family + PartialEq>(context: Context) {
2617 let mut journal = create_journal_with_ops::<F>(context, "mw-apply", 5).await;
2618
2619 let ops = vec![create_operation::<F>(10), create_operation::<F>(11)];
2620 let batch = journal.new_batch();
2621 let merkleized = journal
2622 .merkle
2623 .with_mem(|mem| batch.merkleize_with(mem, Arc::new(ops.clone())));
2624
2625 let expected_root = merkleized.root();
2626 journal.apply_batch(&merkleized).await.unwrap();
2627
2628 assert_eq!(journal.root(), expected_root);
2629 assert_eq!(*journal.size().await, 7);
2630
2631 let reader = journal.reader().await;
2632 assert_eq!(reader.read(5).await.unwrap(), ops[0]);
2633 assert_eq!(reader.read(6).await.unwrap(), ops[1]);
2634 }
2635
2636 #[test_traced("INFO")]
2637 fn test_merkleize_with_apply_mmr() {
2638 let executor = deterministic::Runner::default();
2639 executor.start(test_merkleize_with_apply_inner::<mmr::Family>);
2640 }
2641
2642 #[test_traced("INFO")]
2643 fn test_merkleize_with_apply_mmb() {
2644 let executor = deterministic::Runner::default();
2645 executor.start(test_merkleize_with_apply_inner::<mmb::Family>);
2646 }
2647
2648 async fn test_merkleize_with_shares_arc_inner<F: Family + PartialEq>(context: Context) {
2650 let journal = create_journal_with_ops::<F>(context, "mw-arc", 3).await;
2651
2652 let ops = Arc::new(vec![create_operation::<F>(20), create_operation::<F>(21)]);
2653 let ops_clone = Arc::clone(&ops);
2654 let batch = journal.new_batch();
2655 let merkleized = journal
2656 .merkle
2657 .with_mem(|mem| batch.merkleize_with(mem, ops_clone));
2658
2659 assert!(Arc::ptr_eq(&merkleized.items, &ops));
2661 }
2662
2663 #[test_traced("INFO")]
2664 fn test_merkleize_with_shares_arc_mmr() {
2665 let executor = deterministic::Runner::default();
2666 executor.start(test_merkleize_with_shares_arc_inner::<mmr::Family>);
2667 }
2668
2669 #[test_traced("INFO")]
2670 fn test_merkleize_with_shares_arc_mmb() {
2671 let executor = deterministic::Runner::default();
2672 executor.start(test_merkleize_with_shares_arc_inner::<mmb::Family>);
2673 }
2674
2675 async fn test_apply_batch_skips_only_committed_ancestor_items_inner<F: Family + PartialEq>(
2678 context: Context,
2679 ) {
2680 let mut journal = create_empty_journal::<F>(context.clone(), "skip-partial").await;
2681
2682 let a_batch = journal.new_batch().add(create_operation::<F>(1));
2684 let a = journal.merkle.with_mem(|mem| a_batch.merkleize(mem));
2685 let b_batch = a.new_batch::<Sha256>().add(create_operation::<F>(2));
2686 let b = journal.merkle.with_mem(|mem| b_batch.merkleize(mem));
2687 let c_batch = b.new_batch::<Sha256>().add(create_operation::<F>(3));
2688 let c = journal.merkle.with_mem(|mem| c_batch.merkleize(mem));
2689
2690 journal.apply_batch(&a).await.unwrap();
2692 journal.apply_batch(&c).await.unwrap();
2693
2694 assert_eq!(*journal.size().await, 3);
2696
2697 let mut reference =
2699 create_empty_journal::<F>(context.with_label("ref"), "skip-partial-ref").await;
2700 for i in 1..=3u8 {
2701 reference.append(&create_operation::<F>(i)).await.unwrap();
2702 }
2703 assert_eq!(journal.root(), reference.root());
2704 }
2705
2706 #[test_traced("INFO")]
2707 fn test_apply_batch_skips_only_committed_ancestor_items_mmr() {
2708 let executor = deterministic::Runner::default();
2709 executor.start(test_apply_batch_skips_only_committed_ancestor_items_inner::<mmr::Family>);
2710 }
2711
2712 #[test_traced("INFO")]
2713 fn test_apply_batch_skips_only_committed_ancestor_items_mmb() {
2714 let executor = deterministic::Runner::default();
2715 executor.start(test_apply_batch_skips_only_committed_ancestor_items_inner::<mmb::Family>);
2716 }
2717}