1use crate::{
9 journal::{
10 contiguous::{fixed, variable, Contiguous, Many, Mutable, Reader},
11 Error as JournalError,
12 },
13 merkle::{
14 self, batch,
15 full::Merkle,
16 hasher::{Hasher as _, Standard as StandardHasher},
17 mem::Mem,
18 Bagging, Family, Location, Position, Proof, Readable,
19 },
20 Context, Persistable,
21};
22use alloc::{
23 sync::{Arc, Weak},
24 vec::Vec,
25};
26use commonware_codec::{CodecFixedShared, CodecShared, Encode, EncodeShared};
27use commonware_cryptography::{Digest, Hasher};
28use commonware_parallel::Strategy;
29use core::num::NonZeroU64;
30use futures::{try_join, TryFutureExt as _};
31use thiserror::Error;
32use tracing::{debug, warn};
33
34#[derive(Error, Debug)]
36pub enum Error<F: Family> {
37 #[error("merkle error: {0}")]
38 Merkle(#[from] merkle::Error<F>),
39
40 #[error("journal error: {0}")]
41 Journal(#[from] super::Error),
42}
43
44type MerkleizedParent<F, H, Item, S> = Arc<MerkleizedBatch<F, <H as Hasher>::Digest, Item, S>>;
46
47pub struct UnmerkleizedBatch<F: Family, H: Hasher, Item: Send + Sync, S: Strategy> {
50 inner: batch::UnmerkleizedBatch<F, H::Digest, S>,
52 hasher: StandardHasher<H>,
54 items: Vec<Item>,
56 parent: Option<MerkleizedParent<F, H, Item, S>>,
58}
59
60type MerkleizedBatchArc<F, H, Item, S> = Arc<MerkleizedBatch<F, <H as Hasher>::Digest, Item, S>>;
61
62impl<F: Family, H: Hasher, Item: Encode + Send + Sync, S: Strategy>
63 UnmerkleizedBatch<F, H, Item, S>
64{
65 #[allow(clippy::should_implement_trait)]
67 pub fn add(mut self, item: Item) -> Self {
68 let encoded = item.encode();
69 self.inner = self.inner.add(&self.hasher, &encoded);
70 self.items.push(item);
71 self
72 }
73
74 fn collect_ancestor_items(
76 parent: &Option<MerkleizedParent<F, H, Item, S>>,
77 ) -> Vec<Arc<Vec<Item>>> {
78 let Some(parent) = parent else {
79 return Vec::new();
80 };
81 let mut items = Vec::new();
82 if !parent.items.is_empty() {
83 items.push(Arc::clone(&parent.items));
84 }
85 let mut current = parent.parent.as_ref().and_then(Weak::upgrade);
86 while let Some(batch) = current {
87 if !batch.items.is_empty() {
88 items.push(Arc::clone(&batch.items));
89 }
90 current = batch.parent.as_ref().and_then(Weak::upgrade);
91 }
92 items.reverse();
93 items
94 }
95
96 pub fn merkleize(self, base: &Mem<F, H::Digest>) -> MerkleizedBatchArc<F, H, Item, S> {
99 let Self {
100 inner,
101 hasher,
102 items,
103 parent,
104 } = self;
105
106 let items = Arc::new(items);
107 let merkle = inner.merkleize(base, &hasher);
108 let ancestor_items = Self::collect_ancestor_items(&parent);
109 Arc::new(MerkleizedBatch {
110 inner: merkle,
111 bagging: hasher.root_bagging(),
112 items,
113 parent: parent.as_ref().map(Arc::downgrade),
114 ancestor_items,
115 })
116 }
117
118 pub(crate) fn merkleize_with(
129 mut self,
130 base: &Mem<F, H::Digest>,
131 items: Arc<Vec<Item>>,
132 ) -> MerkleizedBatchArc<F, H, Item, S> {
133 assert!(
134 self.items.is_empty(),
135 "merkleize_with expects no items added via add"
136 );
137
138 let starting_leaves = self.inner.leaves();
139 let digests: Vec<H::Digest> = self.inner.strategy().map_init_collect_vec(
140 items.iter().enumerate(),
141 || self.hasher.clone(),
142 |h, (i, item)| {
143 let loc = Location::<F>::new(*starting_leaves + i as u64);
144 let pos = Position::try_from(loc).expect("valid leaf location");
145 h.leaf_digest(pos, &item.encode())
146 },
147 );
148 for digest in digests {
149 self.inner = self.inner.add_leaf_digest(digest);
150 }
151
152 let merkle = self.inner.merkleize(base, &self.hasher);
153 let ancestor_items = Self::collect_ancestor_items(&self.parent);
154 Arc::new(MerkleizedBatch {
155 inner: merkle,
156 bagging: self.hasher.root_bagging(),
157 items,
158 parent: self.parent.as_ref().map(Arc::downgrade),
159 ancestor_items,
160 })
161 }
162}
163
164#[derive(Clone, Debug)]
166pub struct MerkleizedBatch<F: Family, D: Digest, Item: Send + Sync, S: Strategy> {
167 pub(crate) inner: Arc<batch::MerkleizedBatch<F, D, S>>,
169 bagging: Bagging,
171 items: Arc<Vec<Item>>,
173 parent: Option<Weak<Self>>,
175 pub(crate) ancestor_items: Vec<Arc<Vec<Item>>>,
177}
178
179impl<F: Family, D: Digest, Item: Send + Sync, S: Strategy> MerkleizedBatch<F, D, Item, S> {
180 pub(crate) fn size(&self) -> u64 {
182 *self.inner.leaves()
183 }
184
185 pub fn root(
190 &self,
191 base: &Mem<F, D>,
192 hasher: &impl merkle::hasher::Hasher<F, Digest = D>,
193 inactive_peaks: usize,
194 ) -> Result<D, merkle::Error<F>> {
195 self.inner.root(base, hasher, inactive_peaks)
196 }
197
198 pub fn proof(
200 &self,
201 hasher: &impl merkle::hasher::Hasher<F, Digest = D>,
202 loc: Location<F>,
203 inactive_peaks: usize,
204 ) -> Result<Proof<F, D>, merkle::Error<F>> {
205 self.inner.proof(hasher, loc, inactive_peaks)
206 }
207
208 pub fn range_proof(
210 &self,
211 hasher: &impl merkle::hasher::Hasher<F, Digest = D>,
212 range: core::ops::Range<Location<F>>,
213 inactive_peaks: usize,
214 ) -> Result<Proof<F, D>, merkle::Error<F>> {
215 self.inner.range_proof(hasher, range, inactive_peaks)
216 }
217
218 pub(crate) const fn items(&self) -> &Arc<Vec<Item>> {
220 &self.items
221 }
222
223 pub fn new_batch<H: Hasher<Digest = D>>(self: &Arc<Self>) -> UnmerkleizedBatch<F, H, Item, S>
228 where
229 Item: Encode,
230 {
231 UnmerkleizedBatch {
232 inner: self.inner.new_batch(),
233 hasher: StandardHasher::new(self.bagging),
234 items: Vec::new(),
235 parent: Some(Arc::clone(self)),
236 }
237 }
238}
239
240impl<F: Family, D: Digest, Item: Send + Sync, S: Strategy> Readable
241 for MerkleizedBatch<F, D, Item, S>
242{
243 type Family = F;
244 type Digest = D;
245 type Error = merkle::Error<F>;
246
247 fn size(&self) -> Position<F> {
248 self.inner.size()
249 }
250
251 fn get_node(&self, pos: Position<F>) -> Option<D> {
252 self.inner.get_node(pos)
253 }
254
255 fn pruning_boundary(&self) -> Location<F> {
256 self.inner.pruning_boundary()
257 }
258}
259
260pub struct Journal<F, E, C, H, S>
265where
266 F: Family,
267 E: Context,
268 C: Contiguous<Item: EncodeShared>,
269 H: Hasher,
270 S: Strategy,
271{
272 pub(crate) merkle: Merkle<F, E, H::Digest, S>,
275
276 pub(crate) journal: C,
279
280 pub(crate) hasher: StandardHasher<H>,
281}
282
283impl<F, E, C, H, S> Journal<F, E, C, H, S>
284where
285 F: Family,
286 E: Context,
287 C: Contiguous<Item: EncodeShared>,
288 H: Hasher,
289 S: Strategy,
290{
291 pub async fn size(&self) -> Location<F> {
293 Location::new(self.journal.size().await)
294 }
295
296 pub fn root(&self, inactive_peaks: usize) -> Result<H::Digest, Error<F>> {
299 self.merkle
300 .root(&self.hasher, inactive_peaks)
301 .map_err(Into::into)
302 }
303
304 fn map_error(error: Error<F>) -> JournalError {
306 match error {
307 Error::Journal(inner) => inner,
308 Error::Merkle(inner) => JournalError::Merkle(anyhow::Error::from(inner)),
309 }
310 }
311
312 pub const fn strategy(&self) -> &S {
314 self.merkle.strategy()
315 }
316
317 pub fn new_batch(&self) -> UnmerkleizedBatch<F, H, C::Item, S>
319 where
320 C::Item: Encode,
321 {
322 let root = self.merkle.to_batch();
323 UnmerkleizedBatch {
324 inner: root.new_batch(),
325 hasher: StandardHasher::new(self.hasher.root_bagging()),
326 items: Vec::new(),
327 parent: None,
328 }
329 }
330
331 pub(crate) fn with_mem<R>(&self, f: impl FnOnce(&Mem<F, H::Digest>) -> R) -> R {
333 self.merkle.with_mem(f)
334 }
335
336 pub(crate) fn to_merkleized_batch(&self) -> Arc<MerkleizedBatch<F, H::Digest, C::Item, S>> {
341 Arc::new(MerkleizedBatch {
342 inner: self.merkle.to_batch(),
343 bagging: self.hasher.root_bagging(),
344 items: Arc::new(Vec::new()),
345 parent: None,
346 ancestor_items: Vec::new(),
347 })
348 }
349}
350
351impl<F, E, C, H, S> Journal<F, E, C, H, S>
352where
353 F: Family,
354 E: Context,
355 C: Contiguous<Item: EncodeShared> + Persistable<Error = JournalError>,
356 H: Hasher,
357 S: Strategy,
358{
359 pub async fn commit(&self) -> Result<(), Error<F>> {
362 self.journal.commit().await.map_err(Error::Journal)
363 }
364}
365
366impl<F, E, C, H, S> Journal<F, E, C, H, S>
367where
368 F: Family,
369 E: Context,
370 C: Mutable<Item: EncodeShared>,
371 H: Hasher,
372 S: Strategy,
373{
374 pub async fn from_components(
377 mut merkle: Merkle<F, E, H::Digest, S>,
378 journal: C,
379 hasher: StandardHasher<H>,
380 apply_batch_size: u64,
381 ) -> Result<Self, Error<F>> {
382 Self::align(&mut merkle, &journal, &hasher, apply_batch_size).await?;
383
384 merkle.sync().await?;
387
388 Ok(Self {
389 merkle,
390 journal,
391 hasher,
392 })
393 }
394
395 async fn align(
400 merkle: &mut Merkle<F, E, H::Digest, S>,
401 journal: &C,
402 hasher: &StandardHasher<H>,
403 apply_batch_size: u64,
404 ) -> Result<(), Error<F>> {
405 let journal_size = journal.size().await;
407 let mut merkle_leaves = merkle.leaves();
408 if merkle_leaves > journal_size {
409 let rewind_count = merkle_leaves - journal_size;
410 warn!(
411 journal_size,
412 ?rewind_count,
413 "rewinding Merkle structure to match journal"
414 );
415 merkle.rewind(*rewind_count as usize).await?;
416 merkle_leaves = Location::new(journal_size);
417 }
418
419 if merkle_leaves < journal_size {
421 let replay_count = journal_size - *merkle_leaves;
422 warn!(
423 ?journal_size,
424 replay_count, "Merkle structure lags behind journal, replaying journal to catch up"
425 );
426
427 let reader = journal.reader().await;
428 while merkle_leaves < journal_size {
429 let batch = {
430 let mut batch = merkle.new_batch();
431 let mut count = 0u64;
432 while count < apply_batch_size && merkle_leaves < journal_size {
433 let op = reader.read(*merkle_leaves).await?;
434 batch = batch.add(hasher, &op.encode());
435 merkle_leaves += 1;
436 count += 1;
437 }
438 batch
439 };
440 let batch = merkle.with_mem(|mem| batch.merkleize(mem, hasher));
441 merkle.apply_batch(&batch)?;
442 }
443 return Ok(());
444 }
445
446 assert_eq!(journal.size().await, *merkle.leaves());
448
449 Ok(())
450 }
451
452 pub async fn append(&mut self, item: &C::Item) -> Result<Location<F>, Error<F>> {
454 let encoded_item = item.encode();
455
456 let loc = self.journal.append(item).await?;
458 let unmerkleized_batch = self.merkle.new_batch().add(&self.hasher, &encoded_item);
459 let batch = self
460 .merkle
461 .with_mem(|mem| unmerkleized_batch.merkleize(mem, &self.hasher));
462 self.merkle.apply_batch(&batch)?;
463
464 Ok(Location::new(loc))
465 }
466
467 pub async fn apply_batch(
474 &mut self,
475 batch: &MerkleizedBatch<F, H::Digest, C::Item, S>,
476 ) -> Result<(), Error<F>> {
477 let merkle_size = self.merkle.size();
478 let base_size = batch.inner.base_size();
479
480 let skip_ancestors = if merkle_size == base_size {
486 false
487 } else if merkle_size > base_size && merkle_size < batch.inner.size() {
488 true
489 } else {
490 return Err(merkle::Error::StaleBatch {
493 expected: base_size,
494 actual: merkle_size,
495 }
496 .into());
497 };
498
499 let committed_leaves = self.journal.size().await;
504 let base_leaves = *Location::<F>::try_from(base_size)?;
505 let mut batch_leaf_end = base_leaves;
506 let mut batches: Vec<&[C::Item]> = Vec::with_capacity(batch.ancestor_items.len() + 1);
507 for ancestor in &batch.ancestor_items {
508 batch_leaf_end += ancestor.len() as u64;
509 if skip_ancestors && batch_leaf_end <= committed_leaves {
510 continue;
511 }
512 batches.push(ancestor);
513 }
514 if !batch.items.is_empty() {
515 batches.push(&batch.items);
516 }
517 if !batches.is_empty() {
518 self.journal.append_many(Many::Nested(&batches)).await?;
519 }
520
521 self.merkle.apply_batch(&batch.inner)?;
522 assert_eq!(*self.merkle.leaves(), self.journal.size().await);
523 Ok(())
524 }
525
526 pub async fn rewind(&mut self, size: u64) -> Result<(), Error<F>> {
528 self.journal.rewind(size).await?;
529
530 let leaves = *self.merkle.leaves();
531 if leaves > size {
532 self.merkle.rewind((leaves - size) as usize).await?;
533 }
534
535 Ok(())
536 }
537
538 pub async fn prune(&mut self, prune_loc: Location<F>) -> Result<Location<F>, Error<F>> {
543 self.prune_inner(prune_loc)
544 .await
545 .map(|(boundary, _)| boundary)
546 }
547
548 async fn prune_inner(
549 &mut self,
550 prune_loc: Location<F>,
551 ) -> Result<(Location<F>, bool), Error<F>> {
552 if self.merkle.size() == 0 {
553 return Ok((Location::new(self.reader().await.bounds().start), false));
555 }
556
557 self.merkle.sync().await?;
561
562 let journal_pruned = self.journal.prune(*prune_loc).await?;
563 let bounds = self.reader().await.bounds();
564 let boundary = Location::new(bounds.start);
565 let merkle_boundary = self.merkle.bounds().start;
566
567 if boundary > merkle_boundary {
568 debug!(size = ?bounds.end, ?prune_loc, boundary = ?bounds.start, "pruned inactive ops");
569 self.merkle.prune(boundary).await?;
570 }
571
572 Ok((boundary, journal_pruned || boundary > merkle_boundary))
573 }
574}
575
576impl<F, E, C, H, S> Journal<F, E, C, H, S>
577where
578 F: Family,
579 E: Context,
580 C: Contiguous<Item: EncodeShared>,
581 H: Hasher,
582 S: Strategy,
583{
584 pub async fn proof(
598 &self,
599 start_loc: Location<F>,
600 max_ops: NonZeroU64,
601 inactive_peaks: usize,
602 ) -> Result<(Proof<F, H::Digest>, Vec<C::Item>), Error<F>> {
603 self.historical_proof(self.size().await, start_loc, max_ops, inactive_peaks)
604 .await
605 }
606
607 pub async fn historical_proof(
620 &self,
621 historical_leaves: Location<F>,
622 start_loc: Location<F>,
623 max_ops: NonZeroU64,
624 inactive_peaks: usize,
625 ) -> Result<(Proof<F, H::Digest>, Vec<C::Item>), Error<F>> {
626 let reader = self.journal.reader().await;
627 let bounds = reader.bounds();
628
629 if *historical_leaves > bounds.end {
630 return Err(merkle::Error::RangeOutOfBounds(Location::new(bounds.end)).into());
631 }
632 if start_loc >= historical_leaves {
633 return Err(merkle::Error::RangeOutOfBounds(start_loc).into());
634 }
635
636 let end_loc = std::cmp::min(historical_leaves, start_loc.saturating_add(max_ops.get()));
637
638 let hasher = self.hasher.clone();
639 let proof = self
640 .merkle
641 .historical_range_proof(
642 &hasher,
643 historical_leaves,
644 start_loc..end_loc,
645 inactive_peaks,
646 )
647 .await?;
648
649 let positions: Vec<u64> = (*start_loc..*end_loc).collect();
650 let ops = reader.read_many(&positions).await?;
651
652 Ok((proof, ops))
653 }
654}
655
656impl<F, E, C, H, S> Journal<F, E, C, H, S>
657where
658 F: Family,
659 E: Context,
660 C: Contiguous<Item: EncodeShared> + Persistable<Error = JournalError>,
661 H: Hasher,
662 S: Strategy,
663{
664 pub async fn destroy(self) -> Result<(), Error<F>> {
666 try_join!(
667 self.journal.destroy().map_err(Error::Journal),
668 self.merkle.destroy().map_err(Error::Merkle),
669 )?;
670
671 Ok(())
672 }
673
674 pub async fn sync(&self) -> Result<(), Error<F>> {
676 try_join!(
677 self.journal.sync().map_err(Error::Journal),
678 self.merkle.sync().map_err(Error::Merkle)
679 )?;
680
681 Ok(())
682 }
683}
684
685const APPLY_BATCH_SIZE: u64 = 1 << 16;
687
688macro_rules! impl_journal_new {
691 ($journal_mod:ident, $cfg_ty:ty, $codec_bound:path) => {
692 impl<F, E, O, H, S> Journal<F, E, $journal_mod::Journal<E, O>, H, S>
693 where
694 F: Family,
695 E: Context,
696 O: $codec_bound,
697 H: Hasher,
698 S: Strategy,
699 {
700 pub async fn new(
705 context: E,
706 merkle_cfg: merkle::full::Config<S>,
707 journal_cfg: $cfg_ty,
708 rewind_predicate: fn(&O) -> bool,
709 bagging: merkle::Bagging,
710 ) -> Result<Self, Error<F>> {
711 let mut journal =
712 $journal_mod::Journal::init(context.child("journal"), journal_cfg).await?;
713 journal.rewind_to(rewind_predicate).await?;
714
715 let hasher = StandardHasher::<H>::new(bagging);
716 let mut merkle = Merkle::init(context.child("merkle"), &hasher, merkle_cfg).await?;
717 Self::align(&mut merkle, &journal, &hasher, APPLY_BATCH_SIZE).await?;
718
719 journal.sync().await?;
720 merkle.sync().await?;
721
722 Ok(Self {
723 merkle,
724 journal,
725 hasher,
726 })
727 }
728 }
729 };
730}
731
732impl_journal_new!(fixed, fixed::Config, CodecFixedShared);
733impl_journal_new!(variable, variable::Config<O::Cfg>, CodecShared);
734
735impl<F, E, C, H, S> Contiguous for Journal<F, E, C, H, S>
736where
737 F: Family,
738 E: Context,
739 C: Contiguous<Item: EncodeShared>,
740 H: Hasher,
741 S: Strategy,
742{
743 type Item = C::Item;
744
745 async fn reader(&self) -> impl Reader<Item = C::Item> + '_ {
746 self.journal.reader().await
747 }
748
749 async fn size(&self) -> u64 {
750 self.journal.size().await
751 }
752}
753
754impl<F, E, C, H, S> Mutable for Journal<F, E, C, H, S>
755where
756 F: Family,
757 E: Context,
758 C: Mutable<Item: EncodeShared>,
759 H: Hasher,
760 S: Strategy,
761{
762 async fn append(&mut self, item: &Self::Item) -> Result<u64, JournalError> {
763 let res = self.append(item).await.map_err(Self::map_error)?;
764
765 Ok(*res)
766 }
767
768 async fn prune(&mut self, min_position: u64) -> Result<bool, JournalError> {
769 let prune_to = {
770 let reader = self.journal.reader().await;
771 let bounds = reader.bounds();
772 min_position.min(bounds.end)
773 };
774
775 let (_, pruned) = self
776 .prune_inner(Location::new(prune_to))
777 .await
778 .map_err(Self::map_error)?;
779 Ok(pruned)
780 }
781
782 async fn rewind(&mut self, size: u64) -> Result<(), JournalError> {
783 self.rewind(size).await.map_err(Self::map_error)
784 }
785}
786
787pub trait Inner<E: Context>: Mutable + Persistable<Error = JournalError> {
789 type Config: Clone + Send;
791
792 fn init<F: Family, H: Hasher, S: Strategy>(
794 context: E,
795 merkle_cfg: merkle::full::Config<S>,
796 journal_cfg: Self::Config,
797 rewind_predicate: fn(&Self::Item) -> bool,
798 bagging: merkle::Bagging,
799 ) -> impl core::future::Future<Output = Result<Journal<F, E, Self, H, S>, Error<F>>> + Send
800 where
801 Self: Sized,
802 Self::Item: EncodeShared;
803}
804
805impl<F, E, C, H, S> Persistable for Journal<F, E, C, H, S>
806where
807 F: Family,
808 E: Context,
809 C: Contiguous<Item: EncodeShared> + Persistable<Error = JournalError>,
810 H: Hasher,
811 S: Strategy,
812{
813 type Error = JournalError;
814
815 async fn commit(&self) -> Result<(), JournalError> {
816 self.commit().await.map_err(Self::map_error)
817 }
818
819 async fn sync(&self) -> Result<(), JournalError> {
820 self.sync().await.map_err(Self::map_error)
821 }
822
823 async fn destroy(self) -> Result<(), JournalError> {
824 self.destroy().await.map_err(Self::map_error)
825 }
826}
827
828#[cfg(test)]
829impl<F, E, C, H, S> Journal<F, E, C, H, S>
830where
831 F: Family,
832 E: Context,
833 C: Contiguous<Item: EncodeShared>,
834 S: Strategy,
835 H: Hasher,
836{
837 pub(crate) async fn read(&self, loc: Location<F>) -> Result<C::Item, Error<F>> {
839 self.journal
840 .reader()
841 .await
842 .read(*loc)
843 .await
844 .map_err(Error::Journal)
845 }
846}
847
848#[cfg(test)]
849mod tests {
850 use super::*;
851 use crate::{
852 journal::contiguous::fixed::{Config as JConfig, Journal as ContiguousJournal},
853 merkle::{
854 full::{Config as MerkleConfig, Merkle},
855 mmb, mmr,
856 Bagging::{BackwardFold, ForwardFold},
857 },
858 qmdb::{
859 any::{
860 operation::{update::Unordered as Update, Unordered as Op},
861 value::FixedEncoding,
862 },
863 operation::Committable,
864 },
865 };
866 use commonware_codec::Encode;
867 use commonware_cryptography::{sha256::Digest, Sha256};
868 use commonware_macros::test_traced;
869 use commonware_parallel::Sequential;
870 use commonware_runtime::{
871 buffer::paged::CacheRef,
872 deterministic::{self, Context},
873 BufferPooler, Runner as _, Supervisor as _,
874 };
875 use commonware_utils::{NZUsize, NZU16, NZU64};
876 use futures::StreamExt as _;
877 use std::num::{NonZeroU16, NonZeroUsize};
878
879 const PAGE_SIZE: NonZeroU16 = NZU16!(101);
880 const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(11);
881
882 type TestOp<F> = Op<F, Digest, FixedEncoding<Digest>>;
884
885 type TestJournal<F> = Journal<
887 F,
888 deterministic::Context,
889 ContiguousJournal<deterministic::Context, TestOp<F>>,
890 Sha256,
891 Sequential,
892 >;
893
894 fn journal_root<F: Family>(journal: &TestJournal<F>) -> Digest {
895 journal.root(0).unwrap()
896 }
897
898 fn batch_root<F: Family>(
899 journal: &TestJournal<F>,
900 batch: &MerkleizedBatch<F, Digest, TestOp<F>, Sequential>,
901 ) -> Digest {
902 journal
903 .merkle
904 .with_mem(|mem| batch.root(mem, &journal.hasher, 0))
905 .unwrap()
906 }
907
908 fn merkle_config(suffix: &str, pooler: &impl BufferPooler) -> MerkleConfig<Sequential> {
910 MerkleConfig {
911 journal_partition: format!("mmr-journal-{suffix}"),
912 metadata_partition: format!("mmr-metadata-{suffix}"),
913 items_per_blob: NZU64!(11),
914 write_buffer: NZUsize!(1024),
915 strategy: Sequential,
916 page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
917 }
918 }
919
920 fn journal_config(suffix: &str, pooler: &impl BufferPooler) -> JConfig {
922 JConfig {
923 partition: format!("journal-{suffix}"),
924 items_per_blob: NZU64!(7),
925 write_buffer: NZUsize!(1024),
926 page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
927 }
928 }
929
930 async fn create_empty_journal<F: Family + PartialEq>(
932 context: Context,
933 suffix: &str,
934 ) -> TestJournal<F> {
935 let merkle_cfg = merkle_config(suffix, &context);
936 let journal_cfg = journal_config(suffix, &context);
937 TestJournal::<F>::new(
938 context,
939 merkle_cfg,
940 journal_cfg,
941 |op: &TestOp<F>| op.is_commit(),
942 ForwardFold,
943 )
944 .await
945 .unwrap()
946 }
947
948 #[test]
949 fn test_batches_inherit_journal_bagging() {
950 deterministic::Runner::default().start(|context| async move {
951 let merkle_cfg = merkle_config("batch-bagging", &context);
952 let journal_cfg = journal_config("batch-bagging", &context);
953 let journal = TestJournal::<mmr::Family>::new(
954 context,
955 merkle_cfg,
956 journal_cfg,
957 |op: &TestOp<mmr::Family>| op.is_commit(),
958 BackwardFold,
959 )
960 .await
961 .unwrap();
962
963 let batch = journal.new_batch();
964 assert_eq!(batch.hasher.root_bagging(), BackwardFold);
965
966 let merkleized = journal.merkle.with_mem(|mem| batch.merkleize(mem));
967 let child: UnmerkleizedBatch<mmr::Family, Sha256, TestOp<mmr::Family>, Sequential> =
968 merkleized.new_batch();
969 assert_eq!(child.hasher.root_bagging(), BackwardFold);
970 });
971 }
972
973 fn create_operation<F: Family + PartialEq>(index: u8) -> TestOp<F> {
975 TestOp::<F>::Update(Update(
976 Sha256::fill(index),
977 Sha256::fill(index.wrapping_add(1)),
978 ))
979 }
980
981 async fn create_journal_with_ops<F: Family + PartialEq>(
985 context: Context,
986 suffix: &str,
987 count: usize,
988 ) -> TestJournal<F> {
989 let mut journal = create_empty_journal::<F>(context, suffix).await;
990
991 for i in 0..count {
992 let op = create_operation::<F>(i as u8);
993 let loc = journal.append(&op).await.unwrap();
994 assert_eq!(loc, Location::<F>::new(i as u64));
995 }
996
997 journal.sync().await.unwrap();
998 journal
999 }
1000
1001 async fn create_components<F: Family + PartialEq>(
1007 context: Context,
1008 suffix: &str,
1009 ) -> (
1010 Merkle<F, deterministic::Context, Digest, Sequential>,
1011 ContiguousJournal<deterministic::Context, TestOp<F>>,
1012 StandardHasher<Sha256>,
1013 ) {
1014 let hasher = StandardHasher::new(ForwardFold);
1015 let merkle = Merkle::<F, _, Digest, Sequential>::init(
1016 context.child("mmr"),
1017 &hasher,
1018 merkle_config(suffix, &context),
1019 )
1020 .await
1021 .unwrap();
1022 let journal =
1023 ContiguousJournal::init(context.child("journal"), journal_config(suffix, &context))
1024 .await
1025 .unwrap();
1026 (merkle, journal, hasher)
1027 }
1028
1029 fn verify_proof<F: Family + PartialEq>(
1032 proof: &Proof<F, <Sha256 as commonware_cryptography::Hasher>::Digest>,
1033 operations: &[TestOp<F>],
1034 start_loc: Location<F>,
1035 root: &<Sha256 as commonware_cryptography::Hasher>::Digest,
1036 hasher: &StandardHasher<Sha256>,
1037 ) -> bool {
1038 let encoded_ops: Vec<_> = operations.iter().map(|op| op.encode()).collect();
1039 proof.verify_range_inclusion(hasher, &encoded_ops, start_loc, root)
1040 }
1041
1042 async fn test_new_creates_empty_journal_inner<F: Family + PartialEq>(context: Context) {
1044 let journal = create_empty_journal::<F>(context, "new-empty").await;
1045
1046 let bounds = journal.reader().await.bounds();
1047 assert_eq!(bounds.end, 0);
1048 assert_eq!(bounds.start, 0);
1049 assert!(bounds.is_empty());
1050 }
1051
1052 #[test_traced("INFO")]
1053 fn test_new_creates_empty_journal_mmr() {
1054 let executor = deterministic::Runner::default();
1055 executor.start(test_new_creates_empty_journal_inner::<mmr::Family>);
1056 }
1057
1058 #[test_traced("INFO")]
1059 fn test_new_creates_empty_journal_mmb() {
1060 let executor = deterministic::Runner::default();
1061 executor.start(test_new_creates_empty_journal_inner::<mmb::Family>);
1062 }
1063
1064 async fn test_align_with_empty_mmr_and_journal_inner<F: Family + PartialEq>(context: Context) {
1066 let (mut merkle, journal, hasher) = create_components::<F>(context, "align-empty").await;
1067
1068 TestJournal::<F>::align(&mut merkle, &journal, &hasher, APPLY_BATCH_SIZE)
1069 .await
1070 .unwrap();
1071
1072 assert_eq!(merkle.leaves(), Location::<F>::new(0));
1073 assert_eq!(journal.size().await, 0);
1074 }
1075
1076 #[test_traced("INFO")]
1077 fn test_align_with_empty_mmr_and_journal_mmr() {
1078 let executor = deterministic::Runner::default();
1079 executor.start(test_align_with_empty_mmr_and_journal_inner::<mmr::Family>);
1080 }
1081
1082 #[test_traced("INFO")]
1083 fn test_align_with_empty_mmr_and_journal_mmb() {
1084 let executor = deterministic::Runner::default();
1085 executor.start(test_align_with_empty_mmr_and_journal_inner::<mmb::Family>);
1086 }
1087
1088 async fn test_align_when_mmr_ahead_inner<F: Family + PartialEq>(context: Context) {
1090 let (mut merkle, journal, hasher) = create_components::<F>(context, "mmr-ahead").await;
1091
1092 {
1094 let batch = {
1095 let mut batch = merkle.new_batch();
1096 for i in 0..20 {
1097 let op = create_operation::<F>(i as u8);
1098 let encoded = op.encode();
1099 batch = batch.add(&hasher, &encoded);
1100 journal.append(&op).await.unwrap();
1101 }
1102 batch
1103 };
1104 let batch = merkle.with_mem(|mem| batch.merkleize(mem, &hasher));
1105 merkle.apply_batch(&batch).unwrap();
1106 }
1107
1108 let commit_op = TestOp::<F>::CommitFloor(None, Location::<F>::new(0));
1110 journal.append(&commit_op).await.unwrap();
1111 journal.sync().await.unwrap();
1112
1113 TestJournal::<F>::align(&mut merkle, &journal, &hasher, APPLY_BATCH_SIZE)
1115 .await
1116 .unwrap();
1117
1118 assert_eq!(merkle.leaves(), Location::<F>::new(21));
1120 assert_eq!(journal.size().await, 21);
1121 }
1122
1123 #[test_traced("WARN")]
1124 fn test_align_when_mmr_ahead_mmr() {
1125 let executor = deterministic::Runner::default();
1126 executor.start(test_align_when_mmr_ahead_inner::<mmr::Family>);
1127 }
1128
1129 #[test_traced("WARN")]
1130 fn test_align_when_mmr_ahead_mmb() {
1131 let executor = deterministic::Runner::default();
1132 executor.start(test_align_when_mmr_ahead_inner::<mmb::Family>);
1133 }
1134
1135 async fn test_align_when_journal_ahead_inner<F: Family + PartialEq>(context: Context) {
1137 let (mut merkle, journal, hasher) = create_components::<F>(context, "journal-ahead").await;
1138
1139 for i in 0..20 {
1141 let op = create_operation::<F>(i as u8);
1142 journal.append(&op).await.unwrap();
1143 }
1144
1145 let commit_op = TestOp::<F>::CommitFloor(None, Location::<F>::new(0));
1147 journal.append(&commit_op).await.unwrap();
1148 journal.sync().await.unwrap();
1149
1150 TestJournal::<F>::align(&mut merkle, &journal, &hasher, APPLY_BATCH_SIZE)
1152 .await
1153 .unwrap();
1154
1155 assert_eq!(merkle.leaves(), Location::<F>::new(21));
1157 assert_eq!(journal.size().await, 21);
1158 }
1159
1160 #[test_traced("WARN")]
1161 fn test_align_when_journal_ahead_mmr() {
1162 let executor = deterministic::Runner::default();
1163 executor.start(test_align_when_journal_ahead_inner::<mmr::Family>);
1164 }
1165
1166 #[test_traced("WARN")]
1167 fn test_align_when_journal_ahead_mmb() {
1168 let executor = deterministic::Runner::default();
1169 executor.start(test_align_when_journal_ahead_inner::<mmb::Family>);
1170 }
1171
1172 async fn test_align_with_mismatched_committed_ops_inner<F: Family + PartialEq>(
1174 context: Context,
1175 ) {
1176 let mut journal = create_empty_journal::<F>(context.child("first"), "mismatched").await;
1177
1178 for i in 0..20 {
1180 let loc = journal
1181 .append(&create_operation::<F>(i as u8))
1182 .await
1183 .unwrap();
1184 assert_eq!(loc, Location::<F>::new(i as u64));
1185 }
1186
1187 let size_before = journal.size().await;
1190 assert_eq!(size_before, 20);
1191
1192 journal.sync().await.unwrap();
1194 drop(journal);
1195 let journal = create_empty_journal::<F>(context.child("second"), "mismatched").await;
1196
1197 assert_eq!(journal.size().await, 0);
1199 }
1200
1201 #[test_traced("INFO")]
1202 fn test_align_with_mismatched_committed_ops_mmr() {
1203 let executor = deterministic::Runner::default();
1204 executor.start(|context| {
1205 test_align_with_mismatched_committed_ops_inner::<mmr::Family>(context)
1206 });
1207 }
1208
1209 #[test_traced("INFO")]
1210 fn test_align_with_mismatched_committed_ops_mmb() {
1211 let executor = deterministic::Runner::default();
1212 executor.start(|context| {
1213 test_align_with_mismatched_committed_ops_inner::<mmb::Family>(context)
1214 });
1215 }
1216
1217 async fn test_rewind_inner<F: Family + PartialEq>(context: Context) {
1218 {
1220 let mut journal = ContiguousJournal::init(
1221 context.child("rewind_match"),
1222 journal_config("rewind-match", &context),
1223 )
1224 .await
1225 .unwrap();
1226
1227 for i in 0..3 {
1229 journal.append(&create_operation::<F>(i)).await.unwrap();
1230 }
1231 journal
1232 .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(0)))
1233 .await
1234 .unwrap();
1235 for i in 4..7 {
1236 journal.append(&create_operation::<F>(i)).await.unwrap();
1237 }
1238
1239 let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
1241 assert_eq!(final_size, 4);
1242 assert_eq!(journal.size().await, 4);
1243
1244 let op = journal.read(3).await.unwrap();
1246 assert!(op.is_commit());
1247 }
1248
1249 {
1251 let mut journal = ContiguousJournal::init(
1252 context.child("rewind_multiple"),
1253 journal_config("rewind-multiple", &context),
1254 )
1255 .await
1256 .unwrap();
1257
1258 journal.append(&create_operation::<F>(0)).await.unwrap();
1260 journal
1261 .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(0)))
1262 .await
1263 .unwrap(); journal.append(&create_operation::<F>(2)).await.unwrap();
1265 journal
1266 .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(1)))
1267 .await
1268 .unwrap(); journal.append(&create_operation::<F>(4)).await.unwrap();
1270
1271 let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
1273 assert_eq!(final_size, 4);
1274
1275 let op = journal.read(3).await.unwrap();
1277 assert!(op.is_commit());
1278
1279 assert!(journal.read(4).await.is_err());
1281 }
1282
1283 {
1285 let mut journal = ContiguousJournal::init(
1286 context.child("rewind_no_match"),
1287 journal_config("rewind-no-match", &context),
1288 )
1289 .await
1290 .unwrap();
1291
1292 for i in 0..10 {
1294 journal.append(&create_operation::<F>(i)).await.unwrap();
1295 }
1296
1297 let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
1299 assert_eq!(final_size, 0, "Should rewind to pruning boundary (0)");
1300 assert_eq!(journal.size().await, 0);
1301 }
1302
1303 {
1305 let mut journal = ContiguousJournal::init(
1306 context.child("rewind_with_pruning"),
1307 journal_config("rewind-with-pruning", &context),
1308 )
1309 .await
1310 .unwrap();
1311
1312 for i in 0..10 {
1314 journal.append(&create_operation::<F>(i)).await.unwrap();
1315 }
1316 journal
1317 .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(0)))
1318 .await
1319 .unwrap(); for i in 11..15 {
1321 journal.append(&create_operation::<F>(i)).await.unwrap();
1322 }
1323 journal.sync().await.unwrap();
1324
1325 journal.prune(8).await.unwrap();
1327 assert_eq!(journal.reader().await.bounds().start, 7);
1328
1329 for i in 15..20 {
1331 journal.append(&create_operation::<F>(i)).await.unwrap();
1332 }
1333
1334 let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
1336 assert_eq!(final_size, 11);
1337
1338 let op = journal.read(10).await.unwrap();
1340 assert!(op.is_commit());
1341 }
1342
1343 {
1345 let mut journal = ContiguousJournal::init(
1346 context.child("rewind_no_match_pruned"),
1347 journal_config("rewind-no-match-pruned", &context),
1348 )
1349 .await
1350 .unwrap();
1351
1352 for i in 0..5 {
1354 journal.append(&create_operation::<F>(i)).await.unwrap();
1355 }
1356 journal
1357 .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(0)))
1358 .await
1359 .unwrap(); for i in 6..10 {
1361 journal.append(&create_operation::<F>(i)).await.unwrap();
1362 }
1363 journal.sync().await.unwrap();
1364
1365 journal.prune(8).await.unwrap();
1368 assert_eq!(journal.reader().await.bounds().start, 7);
1369
1370 for i in 10..14 {
1372 journal.append(&create_operation::<F>(i)).await.unwrap();
1373 }
1374
1375 let final_size = journal.rewind_to(|op| op.is_commit()).await.unwrap();
1378 assert_eq!(final_size, 7);
1379 }
1380
1381 {
1383 let mut journal = ContiguousJournal::init(
1384 context.child("rewind_empty"),
1385 journal_config("rewind-empty", &context),
1386 )
1387 .await
1388 .unwrap();
1389
1390 let final_size = journal
1392 .rewind_to(|op: &TestOp<F>| op.is_commit())
1393 .await
1394 .unwrap();
1395 assert_eq!(final_size, 0);
1396 assert_eq!(journal.size().await, 0);
1397 }
1398
1399 {
1401 let merkle_cfg = merkle_config("rewind", &context);
1402 let journal_cfg = journal_config("rewind", &context);
1403 let mut journal = TestJournal::<F>::new(
1404 context,
1405 merkle_cfg,
1406 journal_cfg,
1407 |op| op.is_commit(),
1408 ForwardFold,
1409 )
1410 .await
1411 .unwrap();
1412
1413 for i in 0..5 {
1415 journal.append(&create_operation::<F>(i)).await.unwrap();
1416 }
1417 journal
1418 .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(0)))
1419 .await
1420 .unwrap(); for i in 6..10 {
1422 journal.append(&create_operation::<F>(i)).await.unwrap();
1423 }
1424 assert_eq!(journal.size().await, 10);
1425
1426 journal.rewind(2).await.unwrap();
1427 assert_eq!(journal.size().await, 2);
1428 assert_eq!(journal.merkle.leaves(), 2);
1429 assert_eq!(journal.merkle.size(), 3);
1430 let bounds = journal.reader().await.bounds();
1431 assert_eq!(bounds.start, 0);
1432 assert!(!bounds.is_empty());
1433
1434 assert!(matches!(
1435 journal.rewind(3).await,
1436 Err(Error::Journal(JournalError::InvalidRewind(_)))
1437 ));
1438
1439 journal.rewind(0).await.unwrap();
1440 assert_eq!(journal.size().await, 0);
1441 assert_eq!(journal.merkle.leaves(), 0);
1442 assert_eq!(journal.merkle.size(), 0);
1443 let bounds = journal.reader().await.bounds();
1444 assert_eq!(bounds.start, 0);
1445 assert!(bounds.is_empty());
1446
1447 for i in 0..255 {
1449 journal.append(&create_operation::<F>(i)).await.unwrap();
1450 }
1451 journal.prune(Location::<F>::new(100)).await.unwrap();
1452 assert_eq!(journal.reader().await.bounds().start, 98);
1453 let res = journal.rewind(97).await;
1454 assert!(matches!(
1455 res,
1456 Err(Error::Journal(JournalError::InvalidRewind(97)))
1457 ));
1458 journal.rewind(98).await.unwrap();
1459 let bounds = journal.reader().await.bounds();
1460 assert_eq!(bounds.end, 98);
1461 assert_eq!(journal.merkle.leaves(), 98);
1462 assert_eq!(bounds.start, 98);
1463 assert!(bounds.is_empty());
1464 }
1465 }
1466
1467 #[test_traced("INFO")]
1468 fn test_rewind_mmr() {
1469 let executor = deterministic::Runner::default();
1470 executor.start(test_rewind_inner::<mmr::Family>);
1471 }
1472
1473 #[test_traced("INFO")]
1474 fn test_rewind_mmb() {
1475 let executor = deterministic::Runner::default();
1476 executor.start(test_rewind_inner::<mmb::Family>);
1477 }
1478
1479 async fn test_apply_op_and_read_operations_inner<F: Family + PartialEq>(context: Context) {
1482 let mut journal = create_empty_journal::<F>(context, "apply_op").await;
1483
1484 assert_eq!(journal.size().await, 0);
1485
1486 let expected_ops: Vec<_> = (0..50).map(|i| create_operation::<F>(i as u8)).collect();
1488 for (i, op) in expected_ops.iter().enumerate() {
1489 let loc = journal.append(op).await.unwrap();
1490 assert_eq!(loc, Location::<F>::new(i as u64));
1491 assert_eq!(journal.size().await, (i + 1) as u64);
1492 }
1493
1494 assert_eq!(journal.size().await, 50);
1495
1496 journal.sync().await.unwrap();
1498 for (i, expected_op) in expected_ops.iter().enumerate() {
1499 let read_op = journal.read(Location::<F>::new(i as u64)).await.unwrap();
1500 assert_eq!(read_op, *expected_op);
1501 }
1502 }
1503
1504 #[test_traced("INFO")]
1505 fn test_apply_op_and_read_operations_mmr() {
1506 let executor = deterministic::Runner::default();
1507 executor.start(test_apply_op_and_read_operations_inner::<mmr::Family>);
1508 }
1509
1510 #[test_traced("INFO")]
1511 fn test_apply_op_and_read_operations_mmb() {
1512 let executor = deterministic::Runner::default();
1513 executor.start(test_apply_op_and_read_operations_inner::<mmb::Family>);
1514 }
1515
1516 async fn test_read_operations_at_various_positions_inner<F: Family + PartialEq>(
1518 context: Context,
1519 ) {
1520 let journal = create_journal_with_ops::<F>(context, "read", 50).await;
1521
1522 let first_op = journal.read(Location::<F>::new(0)).await.unwrap();
1524 assert_eq!(first_op, create_operation::<F>(0));
1525
1526 let middle_op = journal.read(Location::<F>::new(25)).await.unwrap();
1528 assert_eq!(middle_op, create_operation::<F>(25));
1529
1530 let last_op = journal.read(Location::<F>::new(49)).await.unwrap();
1532 assert_eq!(last_op, create_operation::<F>(49));
1533
1534 for i in 0..50 {
1536 let op = journal.read(Location::<F>::new(i)).await.unwrap();
1537 assert_eq!(op, create_operation::<F>(i as u8));
1538 }
1539 }
1540
1541 #[test_traced("INFO")]
1542 fn test_read_operations_at_various_positions_mmr() {
1543 let executor = deterministic::Runner::default();
1544 executor.start(|context| {
1545 test_read_operations_at_various_positions_inner::<mmr::Family>(context)
1546 });
1547 }
1548
1549 #[test_traced("INFO")]
1550 fn test_read_operations_at_various_positions_mmb() {
1551 let executor = deterministic::Runner::default();
1552 executor.start(|context| {
1553 test_read_operations_at_various_positions_inner::<mmb::Family>(context)
1554 });
1555 }
1556
1557 async fn test_read_pruned_operation_returns_error_inner<F: Family + PartialEq>(
1559 context: Context,
1560 ) {
1561 let mut journal = create_journal_with_ops::<F>(context, "read_pruned", 100).await;
1562
1563 journal
1565 .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(50)))
1566 .await
1567 .unwrap();
1568 journal.sync().await.unwrap();
1569 let pruned_boundary = journal.prune(Location::<F>::new(50)).await.unwrap();
1570
1571 let read_loc = Location::<F>::new(0);
1573 if read_loc < pruned_boundary {
1574 let result = journal.read(read_loc).await;
1575 assert!(matches!(
1576 result,
1577 Err(Error::Journal(crate::journal::Error::ItemPruned(_)))
1578 ));
1579 }
1580 }
1581
1582 #[test_traced("INFO")]
1583 fn test_read_pruned_operation_returns_error_mmr() {
1584 let executor = deterministic::Runner::default();
1585 executor.start(|context| {
1586 test_read_pruned_operation_returns_error_inner::<mmr::Family>(context)
1587 });
1588 }
1589
1590 #[test_traced("INFO")]
1591 fn test_read_pruned_operation_returns_error_mmb() {
1592 let executor = deterministic::Runner::default();
1593 executor.start(|context| {
1594 test_read_pruned_operation_returns_error_inner::<mmb::Family>(context)
1595 });
1596 }
1597
1598 async fn test_read_out_of_range_returns_error_inner<F: Family + PartialEq>(context: Context) {
1600 let journal = create_journal_with_ops::<F>(context, "read_oob", 3).await;
1601
1602 let result = journal.read(Location::<F>::new(10)).await;
1604 assert!(matches!(
1605 result,
1606 Err(Error::Journal(crate::journal::Error::ItemOutOfRange(_)))
1607 ));
1608 }
1609
1610 #[test_traced("INFO")]
1611 fn test_read_out_of_range_returns_error_mmr() {
1612 let executor = deterministic::Runner::default();
1613 executor.start(test_read_out_of_range_returns_error_inner::<mmr::Family>);
1614 }
1615
1616 #[test_traced("INFO")]
1617 fn test_read_out_of_range_returns_error_mmb() {
1618 let executor = deterministic::Runner::default();
1619 executor.start(test_read_out_of_range_returns_error_inner::<mmb::Family>);
1620 }
1621
1622 async fn test_read_all_operations_back_correctly_inner<F: Family + PartialEq>(
1624 context: Context,
1625 ) {
1626 let journal = create_journal_with_ops::<F>(context, "read_all", 50).await;
1627
1628 assert_eq!(journal.size().await, 50);
1629
1630 for i in 0..50 {
1632 let op = journal.read(Location::<F>::new(i)).await.unwrap();
1633 assert_eq!(op, create_operation::<F>(i as u8));
1634 }
1635 }
1636
1637 #[test_traced("INFO")]
1638 fn test_read_all_operations_back_correctly_mmr() {
1639 let executor = deterministic::Runner::default();
1640 executor.start(test_read_all_operations_back_correctly_inner::<mmr::Family>);
1641 }
1642
1643 #[test_traced("INFO")]
1644 fn test_read_all_operations_back_correctly_mmb() {
1645 let executor = deterministic::Runner::default();
1646 executor.start(test_read_all_operations_back_correctly_inner::<mmb::Family>);
1647 }
1648
1649 async fn test_sync_inner<F: Family + PartialEq>(context: Context) {
1651 let mut journal = create_empty_journal::<F>(context.child("first"), "close_pending").await;
1652
1653 let expected_ops: Vec<_> = (0..20).map(|i| create_operation::<F>(i as u8)).collect();
1655 for (i, op) in expected_ops.iter().enumerate() {
1656 let loc = journal.append(op).await.unwrap();
1657 assert_eq!(loc, Location::<F>::new(i as u64),);
1658 }
1659
1660 let commit_loc = journal
1662 .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(0)))
1663 .await
1664 .unwrap();
1665 assert_eq!(
1666 commit_loc,
1667 Location::<F>::new(20),
1668 "commit should be at location 20"
1669 );
1670 journal.sync().await.unwrap();
1671
1672 drop(journal);
1674 let journal = create_empty_journal::<F>(context.child("second"), "close_pending").await;
1675 assert_eq!(journal.size().await, 21);
1676
1677 for (i, expected_op) in expected_ops.iter().enumerate() {
1679 let read_op = journal.read(Location::<F>::new(i as u64)).await.unwrap();
1680 assert_eq!(read_op, *expected_op);
1681 }
1682 }
1683
1684 #[test_traced("INFO")]
1685 fn test_sync_mmr() {
1686 let executor = deterministic::Runner::default();
1687 executor.start(test_sync_inner::<mmr::Family>);
1688 }
1689
1690 #[test_traced("INFO")]
1691 fn test_sync_mmb() {
1692 let executor = deterministic::Runner::default();
1693 executor.start(test_sync_inner::<mmb::Family>);
1694 }
1695
1696 async fn test_prune_empty_journal_inner<F: Family + PartialEq>(context: Context) {
1698 let mut journal = create_empty_journal::<F>(context, "prune_empty").await;
1699
1700 let boundary = journal.prune(Location::<F>::new(0)).await.unwrap();
1701
1702 assert_eq!(boundary, Location::<F>::new(0));
1703 }
1704
1705 #[test_traced("INFO")]
1706 fn test_prune_empty_journal_mmr() {
1707 let executor = deterministic::Runner::default();
1708 executor.start(test_prune_empty_journal_inner::<mmr::Family>);
1709 }
1710
1711 #[test_traced("INFO")]
1712 fn test_prune_empty_journal_mmb() {
1713 let executor = deterministic::Runner::default();
1714 executor.start(test_prune_empty_journal_inner::<mmb::Family>);
1715 }
1716
1717 async fn test_prune_to_location_inner<F: Family + PartialEq>(context: Context) {
1719 let mut journal = create_journal_with_ops::<F>(context, "prune_to", 100).await;
1720
1721 journal
1723 .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(50)))
1724 .await
1725 .unwrap();
1726 journal.sync().await.unwrap();
1727
1728 let boundary = journal.prune(Location::<F>::new(50)).await.unwrap();
1729
1730 assert!(boundary <= Location::<F>::new(50));
1732 }
1733
1734 #[test_traced("INFO")]
1735 fn test_prune_to_location_mmr() {
1736 let executor = deterministic::Runner::default();
1737 executor.start(test_prune_to_location_inner::<mmr::Family>);
1738 }
1739
1740 #[test_traced("INFO")]
1741 fn test_prune_to_location_mmb() {
1742 let executor = deterministic::Runner::default();
1743 executor.start(test_prune_to_location_inner::<mmb::Family>);
1744 }
1745
1746 async fn test_prune_returns_actual_boundary_inner<F: Family + PartialEq>(context: Context) {
1748 let mut journal = create_journal_with_ops::<F>(context, "prune_boundary", 100).await;
1749
1750 journal
1751 .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(50)))
1752 .await
1753 .unwrap();
1754 journal.sync().await.unwrap();
1755
1756 let requested = Location::<F>::new(50);
1757 let actual = journal.prune(requested).await.unwrap();
1758
1759 let bounds = journal.reader().await.bounds();
1761 assert!(!bounds.is_empty());
1762 assert_eq!(actual, bounds.start);
1763
1764 assert!(actual <= requested);
1766 }
1767
1768 #[test_traced("INFO")]
1769 fn test_prune_returns_actual_boundary_mmr() {
1770 let executor = deterministic::Runner::default();
1771 executor.start(test_prune_returns_actual_boundary_inner::<mmr::Family>);
1772 }
1773
1774 #[test_traced("INFO")]
1775 fn test_prune_returns_actual_boundary_mmb() {
1776 let executor = deterministic::Runner::default();
1777 executor.start(test_prune_returns_actual_boundary_inner::<mmb::Family>);
1778 }
1779
1780 async fn test_mutable_prune_updates_merkle_boundary_inner<F: Family + PartialEq>(
1782 context: Context,
1783 ) {
1784 let mut journal = create_journal_with_ops::<F>(context, "trait_prune", 100).await;
1785
1786 journal
1787 .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(50)))
1788 .await
1789 .unwrap();
1790 journal.sync().await.unwrap();
1791
1792 let pruned = <TestJournal<F> as Mutable>::prune(&mut journal, 50)
1793 .await
1794 .unwrap();
1795 assert!(pruned);
1796
1797 let item_boundary = journal.reader().await.bounds().start;
1798 let merkle_boundary = journal.merkle.bounds().start;
1799 assert_eq!(Location::<F>::new(item_boundary), merkle_boundary);
1800 assert!(merkle_boundary > Location::<F>::new(0));
1801
1802 let pruned = <TestJournal<F> as Mutable>::prune(&mut journal, 50)
1803 .await
1804 .unwrap();
1805 assert!(!pruned);
1806 assert_eq!(journal.reader().await.bounds().start, item_boundary);
1807 assert_eq!(journal.merkle.bounds().start, merkle_boundary);
1808 }
1809
1810 #[test_traced("INFO")]
1811 fn test_mutable_prune_updates_merkle_boundary_mmr() {
1812 let executor = deterministic::Runner::default();
1813 executor.start(test_mutable_prune_updates_merkle_boundary_inner::<mmr::Family>);
1814 }
1815
1816 #[test_traced("INFO")]
1817 fn test_mutable_prune_updates_merkle_boundary_mmb() {
1818 let executor = deterministic::Runner::default();
1819 executor.start(test_mutable_prune_updates_merkle_boundary_inner::<mmb::Family>);
1820 }
1821
1822 async fn test_prune_preserves_operation_count_inner<F: Family + PartialEq>(context: Context) {
1824 let mut journal = create_journal_with_ops::<F>(context, "prune_count", 100).await;
1825
1826 journal
1827 .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(50)))
1828 .await
1829 .unwrap();
1830 journal.sync().await.unwrap();
1831
1832 let count_before = journal.size().await;
1833 journal.prune(Location::<F>::new(50)).await.unwrap();
1834 let count_after = journal.size().await;
1835
1836 assert_eq!(count_before, count_after);
1837 }
1838
1839 #[test_traced("INFO")]
1840 fn test_prune_preserves_operation_count_mmr() {
1841 let executor = deterministic::Runner::default();
1842 executor.start(test_prune_preserves_operation_count_inner::<mmr::Family>);
1843 }
1844
1845 #[test_traced("INFO")]
1846 fn test_prune_preserves_operation_count_mmb() {
1847 let executor = deterministic::Runner::default();
1848 executor.start(test_prune_preserves_operation_count_inner::<mmb::Family>);
1849 }
1850
1851 async fn test_bounds_empty_and_pruned_inner<F: Family + PartialEq>(context: Context) {
1853 let journal = create_empty_journal::<F>(context.child("empty"), "oldest").await;
1855 assert!(journal.reader().await.bounds().is_empty());
1856 journal.destroy().await.unwrap();
1857
1858 let journal = create_journal_with_ops::<F>(context.child("no_prune"), "oldest", 100).await;
1860 let bounds = journal.reader().await.bounds();
1861 assert!(!bounds.is_empty());
1862 assert_eq!(bounds.start, 0);
1863 journal.destroy().await.unwrap();
1864
1865 let mut journal =
1867 create_journal_with_ops::<F>(context.child("pruned"), "oldest", 100).await;
1868 journal
1869 .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(50)))
1870 .await
1871 .unwrap();
1872 journal.sync().await.unwrap();
1873
1874 let pruned_boundary = journal.prune(Location::<F>::new(50)).await.unwrap();
1875
1876 let bounds = journal.reader().await.bounds();
1878 assert!(!bounds.is_empty());
1879 assert_eq!(bounds.start, pruned_boundary);
1880 assert!(pruned_boundary <= 50);
1882 journal.destroy().await.unwrap();
1883 }
1884
1885 #[test_traced("INFO")]
1886 fn test_bounds_empty_and_pruned_mmr() {
1887 let executor = deterministic::Runner::default();
1888 executor.start(test_bounds_empty_and_pruned_inner::<mmr::Family>);
1889 }
1890
1891 #[test_traced("INFO")]
1892 fn test_bounds_empty_and_pruned_mmb() {
1893 let executor = deterministic::Runner::default();
1894 executor.start(test_bounds_empty_and_pruned_inner::<mmb::Family>);
1895 }
1896
1897 async fn test_bounds_start_after_prune_inner<F: Family + PartialEq>(context: Context) {
1899 let journal = create_empty_journal::<F>(context.child("empty"), "boundary").await;
1901 assert_eq!(journal.reader().await.bounds().start, 0);
1902
1903 let journal =
1905 create_journal_with_ops::<F>(context.child("no_prune"), "boundary", 100).await;
1906 assert_eq!(journal.reader().await.bounds().start, 0);
1907
1908 let mut journal =
1910 create_journal_with_ops::<F>(context.child("pruned"), "boundary", 100).await;
1911 journal
1912 .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(50)))
1913 .await
1914 .unwrap();
1915 journal.sync().await.unwrap();
1916
1917 let pruned_boundary = journal.prune(Location::<F>::new(50)).await.unwrap();
1918
1919 assert_eq!(journal.reader().await.bounds().start, pruned_boundary);
1920 }
1921
1922 #[test_traced("INFO")]
1923 fn test_bounds_start_after_prune_mmr() {
1924 let executor = deterministic::Runner::default();
1925 executor.start(test_bounds_start_after_prune_inner::<mmr::Family>);
1926 }
1927
1928 #[test_traced("INFO")]
1929 fn test_bounds_start_after_prune_mmb() {
1930 let executor = deterministic::Runner::default();
1931 executor.start(test_bounds_start_after_prune_inner::<mmb::Family>);
1932 }
1933
1934 async fn test_mmr_prunes_to_journal_boundary_inner<F: Family + PartialEq>(context: Context) {
1936 let mut journal = create_journal_with_ops::<F>(context, "mmr_boundary", 50).await;
1937
1938 journal
1939 .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(25)))
1940 .await
1941 .unwrap();
1942 journal.sync().await.unwrap();
1943
1944 let pruned_boundary = journal.prune(Location::<F>::new(25)).await.unwrap();
1945
1946 let bounds = journal.reader().await.bounds();
1948 assert!(!bounds.is_empty());
1949 assert_eq!(pruned_boundary, bounds.start);
1950
1951 assert!(pruned_boundary <= Location::<F>::new(25));
1953
1954 assert_eq!(journal.size().await, 51);
1956 }
1957
1958 #[test_traced("INFO")]
1959 fn test_mmr_prunes_to_journal_boundary_mmr() {
1960 let executor = deterministic::Runner::default();
1961 executor.start(test_mmr_prunes_to_journal_boundary_inner::<mmr::Family>);
1962 }
1963
1964 #[test_traced("INFO")]
1965 fn test_mmr_prunes_to_journal_boundary_mmb() {
1966 let executor = deterministic::Runner::default();
1967 executor.start(test_mmr_prunes_to_journal_boundary_inner::<mmb::Family>);
1968 }
1969
1970 async fn test_proof_multiple_operations_inner<F: Family + PartialEq>(context: Context) {
1972 let journal = create_journal_with_ops::<F>(context, "proof_multi", 50).await;
1973
1974 let (proof, ops) = journal
1975 .proof(Location::<F>::new(0), NZU64!(50), 0)
1976 .await
1977 .unwrap();
1978
1979 assert_eq!(ops.len(), 50);
1980 for (i, op) in ops.iter().enumerate() {
1981 assert_eq!(*op, create_operation::<F>(i as u8));
1982 }
1983
1984 let hasher = StandardHasher::new(ForwardFold);
1986 let root = journal_root(&journal);
1987 assert!(verify_proof(
1988 &proof,
1989 &ops,
1990 Location::<F>::new(0),
1991 &root,
1992 &hasher
1993 ));
1994 }
1995
1996 #[test_traced("INFO")]
1997 fn test_proof_multiple_operations_mmr() {
1998 let executor = deterministic::Runner::default();
1999 executor.start(test_proof_multiple_operations_inner::<mmr::Family>);
2000 }
2001
2002 #[test_traced("INFO")]
2003 fn test_proof_multiple_operations_mmb() {
2004 let executor = deterministic::Runner::default();
2005 executor.start(test_proof_multiple_operations_inner::<mmb::Family>);
2006 }
2007
2008 async fn test_historical_proof_limited_by_max_ops_inner<F: Family + PartialEq>(
2010 context: Context,
2011 ) {
2012 let journal = create_journal_with_ops::<F>(context, "proof_limit", 50).await;
2013
2014 let size = journal.size().await;
2015 let (proof, ops) = journal
2016 .historical_proof(size, Location::<F>::new(0), NZU64!(20), 0)
2017 .await
2018 .unwrap();
2019
2020 assert_eq!(ops.len(), 20);
2022 for (i, op) in ops.iter().enumerate() {
2023 assert_eq!(*op, create_operation::<F>(i as u8));
2024 }
2025
2026 let hasher = StandardHasher::new(ForwardFold);
2028 let root = journal_root(&journal);
2029 assert!(verify_proof(
2030 &proof,
2031 &ops,
2032 Location::<F>::new(0),
2033 &root,
2034 &hasher
2035 ));
2036 }
2037
2038 #[test_traced("INFO")]
2039 fn test_historical_proof_limited_by_max_ops_mmr() {
2040 let executor = deterministic::Runner::default();
2041 executor.start(|context| {
2042 test_historical_proof_limited_by_max_ops_inner::<mmr::Family>(context)
2043 });
2044 }
2045
2046 #[test_traced("INFO")]
2047 fn test_historical_proof_limited_by_max_ops_mmb() {
2048 let executor = deterministic::Runner::default();
2049 executor.start(|context| {
2050 test_historical_proof_limited_by_max_ops_inner::<mmb::Family>(context)
2051 });
2052 }
2053
2054 async fn test_historical_proof_at_end_of_journal_inner<F: Family + PartialEq>(
2056 context: Context,
2057 ) {
2058 let journal = create_journal_with_ops::<F>(context, "proof_end", 50).await;
2059
2060 let size = journal.size().await;
2061 let (proof, ops) = journal
2063 .historical_proof(size, Location::<F>::new(40), NZU64!(20), 0)
2064 .await
2065 .unwrap();
2066
2067 assert_eq!(ops.len(), 10);
2069 for (i, op) in ops.iter().enumerate() {
2070 assert_eq!(*op, create_operation::<F>((40 + i) as u8));
2071 }
2072
2073 let hasher = StandardHasher::new(ForwardFold);
2075 let root = journal_root(&journal);
2076 assert!(verify_proof(
2077 &proof,
2078 &ops,
2079 Location::<F>::new(40),
2080 &root,
2081 &hasher
2082 ));
2083 }
2084
2085 #[test_traced("INFO")]
2086 fn test_historical_proof_at_end_of_journal_mmr() {
2087 let executor = deterministic::Runner::default();
2088 executor.start(test_historical_proof_at_end_of_journal_inner::<mmr::Family>);
2089 }
2090
2091 #[test_traced("INFO")]
2092 fn test_historical_proof_at_end_of_journal_mmb() {
2093 let executor = deterministic::Runner::default();
2094 executor.start(test_historical_proof_at_end_of_journal_inner::<mmb::Family>);
2095 }
2096
2097 async fn test_historical_proof_out_of_range_returns_error_inner<F: Family + PartialEq>(
2099 context: Context,
2100 ) {
2101 let journal = create_journal_with_ops::<F>(context, "proof_oob", 5).await;
2102
2103 let result = journal
2105 .historical_proof(Location::<F>::new(10), Location::<F>::new(0), NZU64!(1), 0)
2106 .await;
2107
2108 assert!(matches!(
2109 result,
2110 Err(Error::Merkle(merkle::Error::RangeOutOfBounds(_)))
2111 ));
2112 }
2113
2114 #[test_traced("INFO")]
2115 fn test_historical_proof_out_of_range_returns_error_mmr() {
2116 let executor = deterministic::Runner::default();
2117 executor.start(|context| {
2118 test_historical_proof_out_of_range_returns_error_inner::<mmr::Family>(context)
2119 });
2120 }
2121
2122 #[test_traced("INFO")]
2123 fn test_historical_proof_out_of_range_returns_error_mmb() {
2124 let executor = deterministic::Runner::default();
2125 executor.start(|context| {
2126 test_historical_proof_out_of_range_returns_error_inner::<mmb::Family>(context)
2127 });
2128 }
2129
2130 async fn test_historical_proof_start_too_large_returns_error_inner<F: Family + PartialEq>(
2132 context: Context,
2133 ) {
2134 let journal = create_journal_with_ops::<F>(context, "proof_start_oob", 5).await;
2135
2136 let size = journal.size().await;
2137 let result = journal.historical_proof(size, size, NZU64!(1), 0).await;
2139
2140 assert!(matches!(
2141 result,
2142 Err(Error::Merkle(merkle::Error::RangeOutOfBounds(_)))
2143 ));
2144 }
2145
2146 #[test_traced("INFO")]
2147 fn test_historical_proof_start_too_large_returns_error_mmr() {
2148 let executor = deterministic::Runner::default();
2149 executor.start(|context| {
2150 test_historical_proof_start_too_large_returns_error_inner::<mmr::Family>(context)
2151 });
2152 }
2153
2154 #[test_traced("INFO")]
2155 fn test_historical_proof_start_too_large_returns_error_mmb() {
2156 let executor = deterministic::Runner::default();
2157 executor.start(|context| {
2158 test_historical_proof_start_too_large_returns_error_inner::<mmb::Family>(context)
2159 });
2160 }
2161
2162 async fn test_historical_proof_truly_historical_inner<F: Family + PartialEq>(context: Context) {
2164 let mut journal = create_journal_with_ops::<F>(context, "proof_historical", 50).await;
2166
2167 let hasher = StandardHasher::new(ForwardFold);
2169 let historical_root = journal_root(&journal);
2170 let historical_size = journal.size().await;
2171
2172 for i in 50..100 {
2174 journal
2175 .append(&create_operation::<F>(i as u8))
2176 .await
2177 .unwrap();
2178 }
2179 journal.sync().await.unwrap();
2180
2181 let (proof, ops) = journal
2183 .historical_proof(historical_size, Location::<F>::new(0), NZU64!(50), 0)
2184 .await
2185 .unwrap();
2186
2187 assert_eq!(ops.len(), 50);
2189 for (i, op) in ops.iter().enumerate() {
2190 assert_eq!(*op, create_operation::<F>(i as u8));
2191 }
2192
2193 assert!(verify_proof(
2195 &proof,
2196 &ops,
2197 Location::<F>::new(0),
2198 &historical_root,
2199 &hasher
2200 ));
2201 }
2202
2203 #[test_traced("INFO")]
2204 fn test_historical_proof_truly_historical_mmr() {
2205 let executor = deterministic::Runner::default();
2206 executor.start(test_historical_proof_truly_historical_inner::<mmr::Family>);
2207 }
2208
2209 #[test_traced("INFO")]
2210 fn test_historical_proof_truly_historical_mmb() {
2211 let executor = deterministic::Runner::default();
2212 executor.start(test_historical_proof_truly_historical_inner::<mmb::Family>);
2213 }
2214
2215 async fn test_historical_proof_pruned_location_returns_error_inner<F: Family + PartialEq>(
2217 context: Context,
2218 ) {
2219 let mut journal = create_journal_with_ops::<F>(context, "proof_pruned", 50).await;
2220
2221 journal
2222 .append(&TestOp::<F>::CommitFloor(None, Location::<F>::new(25)))
2223 .await
2224 .unwrap();
2225 journal.sync().await.unwrap();
2226 let pruned_boundary = journal.prune(Location::<F>::new(25)).await.unwrap();
2227
2228 let size = journal.size().await;
2230 let start_loc = Location::<F>::new(0);
2231 if start_loc < pruned_boundary {
2232 let result = journal
2233 .historical_proof(size, start_loc, NZU64!(1), 0)
2234 .await;
2235
2236 assert!(result.is_err());
2238 }
2239 }
2240
2241 #[test_traced("INFO")]
2242 fn test_historical_proof_pruned_location_returns_error_mmr() {
2243 let executor = deterministic::Runner::default();
2244 executor.start(|context| {
2245 test_historical_proof_pruned_location_returns_error_inner::<mmr::Family>(context)
2246 });
2247 }
2248
2249 #[test_traced("INFO")]
2250 fn test_historical_proof_pruned_location_returns_error_mmb() {
2251 let executor = deterministic::Runner::default();
2252 executor.start(|context| {
2253 test_historical_proof_pruned_location_returns_error_inner::<mmb::Family>(context)
2254 });
2255 }
2256
2257 async fn test_replay_operations_inner<F: Family + PartialEq>(context: Context) {
2259 let journal = create_empty_journal::<F>(context.child("empty"), "replay").await;
2261 let reader = journal.reader().await;
2262 let stream = reader.replay(NZUsize!(10), 0).await.unwrap();
2263 futures::pin_mut!(stream);
2264 assert!(stream.next().await.is_none());
2265
2266 let journal = create_journal_with_ops::<F>(context.child("with_ops"), "replay", 50).await;
2268 let reader = journal.reader().await;
2269 let stream = reader.replay(NZUsize!(100), 0).await.unwrap();
2270 futures::pin_mut!(stream);
2271
2272 for i in 0..50 {
2273 let (pos, op) = stream.next().await.unwrap().unwrap();
2274 assert_eq!(pos, i);
2275 assert_eq!(op, create_operation::<F>(i as u8));
2276 }
2277
2278 assert!(stream.next().await.is_none());
2279 }
2280
2281 #[test_traced("INFO")]
2282 fn test_replay_operations_mmr() {
2283 let executor = deterministic::Runner::default();
2284 executor.start(test_replay_operations_inner::<mmr::Family>);
2285 }
2286
2287 #[test_traced("INFO")]
2288 fn test_replay_operations_mmb() {
2289 let executor = deterministic::Runner::default();
2290 executor.start(test_replay_operations_inner::<mmb::Family>);
2291 }
2292
2293 async fn test_replay_from_middle_inner<F: Family + PartialEq>(context: Context) {
2295 let journal = create_journal_with_ops::<F>(context, "replay_middle", 50).await;
2296 let reader = journal.reader().await;
2297 let stream = reader.replay(NZUsize!(100), 25).await.unwrap();
2298 futures::pin_mut!(stream);
2299
2300 let mut count = 0;
2301 while let Some(result) = stream.next().await {
2302 let (pos, op) = result.unwrap();
2303 assert_eq!(pos, 25 + count);
2304 assert_eq!(op, create_operation::<F>((25 + count) as u8));
2305 count += 1;
2306 }
2307
2308 assert_eq!(count, 25);
2310 }
2311
2312 #[test_traced("INFO")]
2313 fn test_replay_from_middle_mmr() {
2314 let executor = deterministic::Runner::default();
2315 executor.start(test_replay_from_middle_inner::<mmr::Family>);
2316 }
2317
2318 #[test_traced("INFO")]
2319 fn test_replay_from_middle_mmb() {
2320 let executor = deterministic::Runner::default();
2321 executor.start(test_replay_from_middle_inner::<mmb::Family>);
2322 }
2323
2324 async fn test_speculative_batch_inner<F: Family + PartialEq>(context: Context) {
2326 let mut journal = create_journal_with_ops::<F>(context, "speculative_batch", 10).await;
2327 let original_root = journal_root(&journal);
2328
2329 let b1 = journal.new_batch();
2331 let b2 = journal.new_batch();
2332
2333 let op_a = create_operation::<F>(100);
2335 let op_b = create_operation::<F>(200);
2336 let b1 = b1.add(op_a.clone());
2337 let b2 = b2.add(op_b);
2338
2339 let m1 = journal.merkle.with_mem(|mem| b1.merkleize(mem));
2341 let m2 = journal.merkle.with_mem(|mem| b2.merkleize(mem));
2342 assert_ne!(batch_root(&journal, &m1), batch_root(&journal, &m2));
2343 assert_ne!(batch_root(&journal, &m1), original_root);
2344 assert_ne!(batch_root(&journal, &m2), original_root);
2345
2346 assert_eq!(journal_root(&journal), original_root);
2348
2349 let expected_root = batch_root(&journal, &m1);
2351 journal.apply_batch(&m1).await.unwrap();
2352
2353 assert_eq!(journal_root(&journal), expected_root);
2355 assert_eq!(*journal.size().await, 11);
2356 }
2357
2358 #[test_traced("INFO")]
2359 fn test_speculative_batch_mmr() {
2360 let executor = deterministic::Runner::default();
2361 executor.start(test_speculative_batch_inner::<mmr::Family>);
2362 }
2363
2364 #[test_traced("INFO")]
2365 fn test_speculative_batch_mmb() {
2366 let executor = deterministic::Runner::default();
2367 executor.start(test_speculative_batch_inner::<mmb::Family>);
2368 }
2369
2370 async fn test_speculative_batch_stacking_inner<F: Family + PartialEq>(context: Context) {
2373 let mut journal = create_journal_with_ops::<F>(context, "batch_stacking", 10).await;
2374
2375 let op_a = create_operation::<F>(100);
2376 let op_b = create_operation::<F>(200);
2377
2378 let (merkleized_a, merkleized_b) = {
2379 let batch_a = journal.new_batch().add(op_a.clone());
2380 let merkleized_a = journal.merkle.with_mem(|mem| batch_a.merkleize(mem));
2381
2382 let batch_b = merkleized_a.new_batch::<Sha256>().add(op_b.clone());
2383 let merkleized_b = journal.merkle.with_mem(|mem| batch_b.merkleize(mem));
2384 (merkleized_a, merkleized_b)
2385 };
2386
2387 let expected_root = batch_root(&journal, &merkleized_b);
2388 journal.apply_batch(&merkleized_b).await.unwrap();
2389 drop(merkleized_a);
2390
2391 assert_eq!(journal_root(&journal), expected_root);
2392 assert_eq!(*journal.size().await, 12);
2393
2394 let read_a = journal.read(Location::<F>::new(10)).await.unwrap();
2396 assert_eq!(read_a, op_a);
2397 let read_b = journal.read(Location::<F>::new(11)).await.unwrap();
2398 assert_eq!(read_b, op_b);
2399 }
2400
2401 #[test_traced("INFO")]
2402 fn test_speculative_batch_stacking_mmr() {
2403 let executor = deterministic::Runner::default();
2404 executor.start(test_speculative_batch_stacking_inner::<mmr::Family>);
2405 }
2406
2407 #[test_traced("INFO")]
2408 fn test_speculative_batch_stacking_mmb() {
2409 let executor = deterministic::Runner::default();
2410 executor.start(test_speculative_batch_stacking_inner::<mmb::Family>);
2411 }
2412
2413 async fn test_speculative_batch_sequential_inner<F: Family + PartialEq>(context: Context) {
2416 let mut journal = create_journal_with_ops::<F>(context, "batch_sequential", 10).await;
2417
2418 let op_a = create_operation::<F>(100);
2419 let op_b = create_operation::<F>(200);
2420
2421 let batch_a = journal.new_batch().add(op_a.clone());
2423 let merkleized_a = journal.merkle.with_mem(|mem| batch_a.merkleize(mem));
2424 journal.apply_batch(&merkleized_a).await.unwrap();
2425 assert_eq!(*journal.size().await, 11);
2426
2427 let batch_b = journal.new_batch().add(op_b.clone());
2429 let merkleized_b = journal.merkle.with_mem(|mem| batch_b.merkleize(mem));
2430 let expected_root = batch_root(&journal, &merkleized_b);
2431 journal.apply_batch(&merkleized_b).await.unwrap();
2432
2433 assert_eq!(journal_root(&journal), expected_root);
2434 assert_eq!(*journal.size().await, 12);
2435
2436 let read_a = journal.read(Location::<F>::new(10)).await.unwrap();
2438 assert_eq!(read_a, op_a);
2439 let read_b = journal.read(Location::<F>::new(11)).await.unwrap();
2440 assert_eq!(read_b, op_b);
2441 }
2442
2443 #[test_traced("INFO")]
2444 fn test_speculative_batch_sequential_mmr() {
2445 let executor = deterministic::Runner::default();
2446 executor.start(test_speculative_batch_sequential_inner::<mmr::Family>);
2447 }
2448
2449 #[test_traced("INFO")]
2450 fn test_speculative_batch_sequential_mmb() {
2451 let executor = deterministic::Runner::default();
2452 executor.start(test_speculative_batch_sequential_inner::<mmb::Family>);
2453 }
2454
2455 async fn test_stale_batch_sibling_inner<F: Family + PartialEq>(context: Context) {
2456 let mut journal = create_empty_journal::<F>(context, "stale-sibling").await;
2457 let op_a = create_operation::<F>(1);
2458 let op_b = create_operation::<F>(2);
2459
2460 let batch_a = journal.new_batch().add(op_a.clone());
2462 let merkleized_a = journal.merkle.with_mem(|mem| batch_a.merkleize(mem));
2463 let batch_b = journal.new_batch().add(op_b);
2464 let merkleized_b = journal.merkle.with_mem(|mem| batch_b.merkleize(mem));
2465
2466 journal.apply_batch(&merkleized_a).await.unwrap();
2468 let expected_root = journal_root(&journal);
2469 let expected_size = journal.size().await;
2470
2471 let result = journal.apply_batch(&merkleized_b).await;
2473 assert!(
2474 matches!(
2475 result,
2476 Err(super::Error::Merkle(merkle::Error::StaleBatch { .. }))
2477 ),
2478 "expected StaleBatch, got {result:?}"
2479 );
2480
2481 assert_eq!(journal_root(&journal), expected_root);
2483 assert_eq!(journal.size().await, expected_size);
2484 let (_, ops) = journal
2485 .proof(Location::<F>::new(0), NZU64!(1), 0)
2486 .await
2487 .unwrap();
2488 assert_eq!(ops, vec![op_a]);
2489 }
2490
2491 #[test_traced("INFO")]
2492 fn test_stale_batch_sibling_mmr() {
2493 let executor = deterministic::Runner::default();
2494 executor.start(test_stale_batch_sibling_inner::<mmr::Family>);
2495 }
2496
2497 #[test_traced("INFO")]
2498 fn test_stale_batch_sibling_mmb() {
2499 let executor = deterministic::Runner::default();
2500 executor.start(test_stale_batch_sibling_inner::<mmb::Family>);
2501 }
2502
2503 async fn test_stale_batch_chained_inner<F: Family + PartialEq>(context: Context) {
2504 let mut journal = create_journal_with_ops::<F>(context, "stale-chained", 5).await;
2505
2506 let parent_batch = journal.new_batch().add(create_operation::<F>(10));
2508 let parent = journal.merkle.with_mem(|mem| parent_batch.merkleize(mem));
2509 let batch_a = parent.new_batch::<Sha256>().add(create_operation::<F>(20));
2510 let child_a = journal.merkle.with_mem(|mem| batch_a.merkleize(mem));
2511 let batch_b = parent.new_batch::<Sha256>().add(create_operation::<F>(30));
2512 let child_b = journal.merkle.with_mem(|mem| batch_b.merkleize(mem));
2513
2514 journal.apply_batch(&child_a).await.unwrap();
2516 let result = journal.apply_batch(&child_b).await;
2517 drop(parent);
2518 assert!(
2519 matches!(
2520 result,
2521 Err(super::Error::Merkle(merkle::Error::StaleBatch { .. }))
2522 ),
2523 "expected StaleBatch for sibling, got {result:?}"
2524 );
2525 }
2526
2527 #[test_traced("INFO")]
2528 fn test_stale_batch_chained_mmr() {
2529 let executor = deterministic::Runner::default();
2530 executor.start(test_stale_batch_chained_inner::<mmr::Family>);
2531 }
2532
2533 #[test_traced("INFO")]
2534 fn test_stale_batch_chained_mmb() {
2535 let executor = deterministic::Runner::default();
2536 executor.start(test_stale_batch_chained_inner::<mmb::Family>);
2537 }
2538
2539 async fn test_stale_batch_parent_before_child_inner<F: Family + PartialEq>(context: Context) {
2540 let mut journal = create_empty_journal::<F>(context, "stale-parent-first").await;
2541
2542 let parent_batch = journal.new_batch().add(create_operation::<F>(1));
2544 let parent = journal.merkle.with_mem(|mem| parent_batch.merkleize(mem));
2545 let child_batch = parent.new_batch::<Sha256>().add(create_operation::<F>(2));
2546 let child = journal.merkle.with_mem(|mem| child_batch.merkleize(mem));
2547
2548 let expected_root = batch_root(&journal, &child);
2549
2550 journal.apply_batch(&parent).await.unwrap();
2552 journal.apply_batch(&child).await.unwrap();
2553
2554 assert_eq!(journal_root(&journal), expected_root);
2555 assert_eq!(*journal.size().await, 2);
2556 }
2557
2558 #[test_traced("INFO")]
2559 fn test_stale_batch_parent_before_child_mmr() {
2560 let executor = deterministic::Runner::default();
2561 executor.start(test_stale_batch_parent_before_child_inner::<mmr::Family>);
2562 }
2563
2564 #[test_traced("INFO")]
2565 fn test_stale_batch_parent_before_child_mmb() {
2566 let executor = deterministic::Runner::default();
2567 executor.start(test_stale_batch_parent_before_child_inner::<mmb::Family>);
2568 }
2569
2570 async fn test_stale_batch_child_before_parent_inner<F: Family + PartialEq>(context: Context) {
2571 let mut journal = create_empty_journal::<F>(context, "stale-child-first").await;
2572
2573 let parent_batch = journal.new_batch().add(create_operation::<F>(1));
2575 let parent = journal.merkle.with_mem(|mem| parent_batch.merkleize(mem));
2576 let child_batch = parent.new_batch::<Sha256>().add(create_operation::<F>(2));
2577 let child = journal.merkle.with_mem(|mem| child_batch.merkleize(mem));
2578
2579 journal.apply_batch(&child).await.unwrap();
2581 let result = journal.apply_batch(&parent).await;
2582 assert!(
2583 matches!(
2584 result,
2585 Err(super::Error::Merkle(merkle::Error::StaleBatch { .. }))
2586 ),
2587 "expected StaleBatch for parent after child applied, got {result:?}"
2588 );
2589 }
2590
2591 #[test_traced("INFO")]
2592 fn test_stale_batch_child_before_parent_mmr() {
2593 let executor = deterministic::Runner::default();
2594 executor.start(test_stale_batch_child_before_parent_inner::<mmr::Family>);
2595 }
2596
2597 #[test_traced("INFO")]
2598 fn test_stale_batch_child_before_parent_mmb() {
2599 let executor = deterministic::Runner::default();
2600 executor.start(test_stale_batch_child_before_parent_inner::<mmb::Family>);
2601 }
2602
2603 async fn test_apply_batch_skip_ancestor_items_inner<F: Family + PartialEq>(context: Context) {
2605 let mut journal = create_journal_with_ops::<F>(context, "rp-skip", 3).await;
2606
2607 let parent_batch = journal
2609 .new_batch()
2610 .add(create_operation::<F>(10))
2611 .add(create_operation::<F>(11));
2612 let parent = journal.merkle.with_mem(|mem| parent_batch.merkleize(mem));
2613
2614 let child_batch = parent
2616 .new_batch::<Sha256>()
2617 .add(create_operation::<F>(20))
2618 .add(create_operation::<F>(21))
2619 .add(create_operation::<F>(22));
2620 let child = journal.merkle.with_mem(|mem| child_batch.merkleize(mem));
2621
2622 journal.apply_batch(&parent).await.unwrap();
2624
2625 journal.apply_batch(&child).await.unwrap();
2627
2628 let (_, ops) = journal
2630 .proof(Location::<F>::new(3), NZU64!(5), 0)
2631 .await
2632 .unwrap();
2633 assert_eq!(ops.len(), 5);
2634 }
2635
2636 #[test_traced("INFO")]
2637 fn test_apply_batch_skip_ancestor_items_mmr() {
2638 let executor = deterministic::Runner::default();
2639 executor.start(test_apply_batch_skip_ancestor_items_inner::<mmr::Family>);
2640 }
2641
2642 #[test_traced("INFO")]
2643 fn test_apply_batch_skip_ancestor_items_mmb() {
2644 let executor = deterministic::Runner::default();
2645 executor.start(test_apply_batch_skip_ancestor_items_inner::<mmb::Family>);
2646 }
2647
2648 async fn test_apply_batch_cross_batch_inner<F: Family + PartialEq>(context: Context) {
2650 let mut journal = create_journal_with_ops::<F>(context, "rp-cross", 2).await;
2651
2652 let grandparent_batch = journal
2654 .new_batch()
2655 .add(create_operation::<F>(3))
2656 .add(create_operation::<F>(4))
2657 .add(create_operation::<F>(5));
2658 let grandparent = journal
2659 .merkle
2660 .with_mem(|mem| grandparent_batch.merkleize(mem));
2661
2662 let parent_batch = grandparent
2664 .new_batch::<Sha256>()
2665 .add(create_operation::<F>(6))
2666 .add(create_operation::<F>(7));
2667 let parent = journal.merkle.with_mem(|mem| parent_batch.merkleize(mem));
2668
2669 let child_batch = parent.new_batch::<Sha256>().add(create_operation::<F>(8));
2671 let child = journal.merkle.with_mem(|mem| child_batch.merkleize(mem));
2672
2673 journal.apply_batch(&grandparent).await.unwrap();
2675
2676 journal.apply_batch(&parent).await.unwrap();
2678
2679 journal.apply_batch(&child).await.unwrap();
2681
2682 assert_eq!(*journal.size().await, 8);
2684
2685 let (_, ops) = journal
2687 .proof(Location::<F>::new(2), NZU64!(6), 0)
2688 .await
2689 .unwrap();
2690 for (i, op) in ops.iter().enumerate() {
2691 assert_eq!(*op, create_operation::<F>((i + 3) as u8));
2692 }
2693 }
2694
2695 #[test_traced("INFO")]
2696 fn test_apply_batch_cross_batch_mmr() {
2697 let executor = deterministic::Runner::default();
2698 executor.start(test_apply_batch_cross_batch_inner::<mmr::Family>);
2699 }
2700
2701 #[test_traced("INFO")]
2702 fn test_apply_batch_cross_batch_mmb() {
2703 let executor = deterministic::Runner::default();
2704 executor.start(test_apply_batch_cross_batch_inner::<mmb::Family>);
2705 }
2706
2707 async fn test_merkleize_with_matches_add_inner<F: Family + PartialEq>(context: Context) {
2709 let journal = create_journal_with_ops::<F>(context, "mw-matches", 5).await;
2710
2711 let ops = vec![
2712 create_operation::<F>(10),
2713 create_operation::<F>(11),
2714 create_operation::<F>(12),
2715 ];
2716
2717 let mut batch = journal.new_batch();
2719 for op in &ops {
2720 batch = batch.add(op.clone());
2721 }
2722 let expected = journal.merkle.with_mem(|mem| batch.merkleize(mem));
2723
2724 let batch = journal.new_batch();
2726 let actual = journal
2727 .merkle
2728 .with_mem(|mem| batch.merkleize_with(mem, Arc::new(ops)));
2729
2730 assert_eq!(
2731 batch_root(&journal, &actual),
2732 batch_root(&journal, &expected)
2733 );
2734 }
2735
2736 #[test_traced("INFO")]
2737 fn test_merkleize_with_matches_add_mmr() {
2738 let executor = deterministic::Runner::default();
2739 executor.start(test_merkleize_with_matches_add_inner::<mmr::Family>);
2740 }
2741
2742 #[test_traced("INFO")]
2743 fn test_merkleize_with_matches_add_mmb() {
2744 let executor = deterministic::Runner::default();
2745 executor.start(test_merkleize_with_matches_add_inner::<mmb::Family>);
2746 }
2747
2748 async fn test_merkleize_with_apply_inner<F: Family + PartialEq>(context: Context) {
2750 let mut journal = create_journal_with_ops::<F>(context, "mw-apply", 5).await;
2751
2752 let ops = vec![create_operation::<F>(10), create_operation::<F>(11)];
2753 let batch = journal.new_batch();
2754 let merkleized = journal
2755 .merkle
2756 .with_mem(|mem| batch.merkleize_with(mem, Arc::new(ops.clone())));
2757
2758 let expected_root = batch_root(&journal, &merkleized);
2759 journal.apply_batch(&merkleized).await.unwrap();
2760
2761 assert_eq!(journal_root(&journal), expected_root);
2762 assert_eq!(*journal.size().await, 7);
2763
2764 let reader = journal.reader().await;
2765 assert_eq!(reader.read(5).await.unwrap(), ops[0]);
2766 assert_eq!(reader.read(6).await.unwrap(), ops[1]);
2767 }
2768
2769 #[test_traced("INFO")]
2770 fn test_merkleize_with_apply_mmr() {
2771 let executor = deterministic::Runner::default();
2772 executor.start(test_merkleize_with_apply_inner::<mmr::Family>);
2773 }
2774
2775 #[test_traced("INFO")]
2776 fn test_merkleize_with_apply_mmb() {
2777 let executor = deterministic::Runner::default();
2778 executor.start(test_merkleize_with_apply_inner::<mmb::Family>);
2779 }
2780
2781 async fn test_merkleize_with_shares_arc_inner<F: Family + PartialEq>(context: Context) {
2783 let journal = create_journal_with_ops::<F>(context, "mw-arc", 3).await;
2784
2785 let ops = Arc::new(vec![create_operation::<F>(20), create_operation::<F>(21)]);
2786 let ops_clone = Arc::clone(&ops);
2787 let batch = journal.new_batch();
2788 let merkleized = journal
2789 .merkle
2790 .with_mem(|mem| batch.merkleize_with(mem, ops_clone));
2791
2792 assert!(Arc::ptr_eq(&merkleized.items, &ops));
2794 }
2795
2796 #[test_traced("INFO")]
2797 fn test_merkleize_with_shares_arc_mmr() {
2798 let executor = deterministic::Runner::default();
2799 executor.start(test_merkleize_with_shares_arc_inner::<mmr::Family>);
2800 }
2801
2802 #[test_traced("INFO")]
2803 fn test_merkleize_with_shares_arc_mmb() {
2804 let executor = deterministic::Runner::default();
2805 executor.start(test_merkleize_with_shares_arc_inner::<mmb::Family>);
2806 }
2807
2808 async fn test_apply_batch_skips_only_committed_ancestor_items_inner<F: Family + PartialEq>(
2811 context: Context,
2812 ) {
2813 let mut journal = create_empty_journal::<F>(context.child("storage"), "skip-partial").await;
2814
2815 let a_batch = journal.new_batch().add(create_operation::<F>(1));
2817 let a = journal.merkle.with_mem(|mem| a_batch.merkleize(mem));
2818 let b_batch = a.new_batch::<Sha256>().add(create_operation::<F>(2));
2819 let b = journal.merkle.with_mem(|mem| b_batch.merkleize(mem));
2820 let c_batch = b.new_batch::<Sha256>().add(create_operation::<F>(3));
2821 let c = journal.merkle.with_mem(|mem| c_batch.merkleize(mem));
2822
2823 journal.apply_batch(&a).await.unwrap();
2825 journal.apply_batch(&c).await.unwrap();
2826
2827 assert_eq!(*journal.size().await, 3);
2829
2830 let mut reference =
2832 create_empty_journal::<F>(context.child("ref"), "skip-partial-ref").await;
2833 for i in 1..=3u8 {
2834 reference.append(&create_operation::<F>(i)).await.unwrap();
2835 }
2836 assert_eq!(journal_root(&journal), journal_root(&reference));
2837 }
2838
2839 #[test_traced("INFO")]
2840 fn test_apply_batch_skips_only_committed_ancestor_items_mmr() {
2841 let executor = deterministic::Runner::default();
2842 executor.start(test_apply_batch_skips_only_committed_ancestor_items_inner::<mmr::Family>);
2843 }
2844
2845 #[test_traced("INFO")]
2846 fn test_apply_batch_skips_only_committed_ancestor_items_mmb() {
2847 let executor = deterministic::Runner::default();
2848 executor.start(test_apply_batch_skips_only_committed_ancestor_items_inner::<mmb::Family>);
2849 }
2850}